1use datafusion::arrow::datatypes::{
2 DECIMAL128_MAX_PRECISION, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, TimeUnit,
3};
4use std::sync::{Arc, LazyLock};
5
6use crate::RemoteDbType;
7
8#[derive(Debug, Clone)]
9pub enum RemoteType {
10 Postgres(PostgresType),
11 Mysql(MysqlType),
12 Oracle(OracleType),
13 Sqlite(SqliteType),
14 Dm(DmType),
15}
16
17impl RemoteType {
18 pub fn to_arrow_type(&self) -> DataType {
19 match self {
20 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
21 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
22 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
23 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
24 RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
25 }
26 }
27
28 pub fn db_type(&self) -> RemoteDbType {
29 match self {
30 RemoteType::Postgres(_) => RemoteDbType::Postgres,
31 RemoteType::Mysql(_) => RemoteDbType::Mysql,
32 RemoteType::Oracle(_) => RemoteDbType::Oracle,
33 RemoteType::Sqlite(_) => RemoteDbType::Sqlite,
34 RemoteType::Dm(_) => RemoteDbType::Dm,
35 }
36 }
37}
38
/// A PostgreSQL column type; see `PostgresType::to_arrow_type` for the
/// Arrow representation of each variant.
#[derive(Clone, Debug)]
pub enum PostgresType {
    // Integer and floating-point scalars.
    Int2,
    Int4,
    Int8,
    Float4,
    Float8,
    /// `NUMERIC(precision, scale)`.
    Numeric(u8, i8),
    // Identifiers and character data.
    Oid,
    Name,
    Varchar,
    Bpchar,
    Text,
    // Binary data.
    Bytea,
    // Date/time types.
    Date,
    Timestamp,
    TimestampTz,
    Time,
    Interval,
    // Miscellaneous scalars.
    Bool,
    Json,
    Jsonb,
    // One-dimensional arrays of the scalar types above.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    VarcharArray,
    BpcharArray,
    TextArray,
    ByteaArray,
    BoolArray,
    // Extension and special-purpose types.
    PostGisGeometry,
    Xml,
    Uuid,
}
85
86impl PostgresType {
87 pub fn to_arrow_type(&self) -> DataType {
88 match self {
89 PostgresType::Int2 => DataType::Int16,
90 PostgresType::Int4 => DataType::Int32,
91 PostgresType::Int8 => DataType::Int64,
92 PostgresType::Float4 => DataType::Float32,
93 PostgresType::Float8 => DataType::Float64,
94 PostgresType::Numeric(precision, scale) => {
95 if *precision <= DECIMAL128_MAX_PRECISION {
96 DataType::Decimal128(*precision, *scale)
97 } else {
98 DataType::Decimal256(*precision, *scale)
99 }
100 }
101 PostgresType::Oid => DataType::UInt32,
102 PostgresType::Name
103 | PostgresType::Text
104 | PostgresType::Varchar
105 | PostgresType::Bpchar
106 | PostgresType::Xml => DataType::Utf8,
107 PostgresType::Bytea => DataType::Binary,
108 PostgresType::Date => DataType::Date32,
109 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
110 PostgresType::TimestampTz => {
111 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
112 }
113 PostgresType::Time => DataType::Time64(TimeUnit::Microsecond),
114 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
115 PostgresType::Bool => DataType::Boolean,
116 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
117 PostgresType::Int2Array => {
118 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
119 }
120 PostgresType::Int4Array => {
121 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
122 }
123 PostgresType::Int8Array => {
124 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
125 }
126 PostgresType::Float4Array => {
127 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
128 }
129 PostgresType::Float8Array => {
130 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
131 }
132 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
133 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
134 }
135 PostgresType::ByteaArray => {
136 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
137 }
138 PostgresType::BoolArray => {
139 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
140 }
141 PostgresType::PostGisGeometry => DataType::Binary,
142 PostgresType::Uuid => DataType::FixedSizeBinary(16),
143 }
144 }
145}
146
/// A MySQL column type; see `MysqlType::to_arrow_type` for the Arrow
/// representation of each variant.
#[derive(Clone, Debug)]
pub enum MysqlType {
    // Signed and unsigned integers.
    TinyInt,
    TinyIntUnsigned,
    SmallInt,
    SmallIntUnsigned,
    MediumInt,
    MediumIntUnsigned,
    Integer,
    IntegerUnsigned,
    BigInt,
    BigIntUnsigned,
    // Floating point and fixed point.
    Float,
    Double,
    /// `DECIMAL(precision, scale)`.
    Decimal(u8, u8),
    // Date/time types.
    Date,
    Datetime,
    Time,
    Timestamp,
    Year,
    // Character and binary strings.
    Char,
    Varchar,
    Binary,
    Varbinary,
    /// Text column with its column length; lengths beyond `i32::MAX` are
    /// mapped to a large Arrow string type.
    Text(u32),
    /// Blob column with its column length; lengths beyond `i32::MAX` are
    /// mapped to a large Arrow binary type.
    Blob(u32),
    // Structured and spatial data.
    Json,
    Geometry,
}
179
180impl MysqlType {
181 pub fn to_arrow_type(&self) -> DataType {
182 match self {
183 MysqlType::TinyInt => DataType::Int8,
184 MysqlType::TinyIntUnsigned => DataType::UInt8,
185 MysqlType::SmallInt => DataType::Int16,
186 MysqlType::SmallIntUnsigned => DataType::UInt16,
187 MysqlType::MediumInt => DataType::Int32,
188 MysqlType::MediumIntUnsigned => DataType::UInt32,
189 MysqlType::Integer => DataType::Int32,
190 MysqlType::IntegerUnsigned => DataType::UInt32,
191 MysqlType::BigInt => DataType::Int64,
192 MysqlType::BigIntUnsigned => DataType::UInt64,
193 MysqlType::Float => DataType::Float32,
194 MysqlType::Double => DataType::Float64,
195 MysqlType::Decimal(precision, scale) => {
196 assert!(*scale <= (i8::MAX as u8));
197 if *precision > 38 {
198 DataType::Decimal256(*precision, *scale as i8)
199 } else {
200 DataType::Decimal128(*precision, *scale as i8)
201 }
202 }
203 MysqlType::Date => DataType::Date32,
204 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
205 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
206 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
207 MysqlType::Year => DataType::Int16,
208 MysqlType::Char => DataType::Utf8,
209 MysqlType::Varchar => DataType::Utf8,
210 MysqlType::Binary => DataType::Binary,
211 MysqlType::Varbinary => DataType::Binary,
212 MysqlType::Text(col_len) => {
213 if (*col_len as usize) > (i32::MAX as usize) {
214 DataType::LargeUtf8
215 } else {
216 DataType::Utf8
217 }
218 }
219 MysqlType::Blob(col_len) => {
220 if (*col_len as usize) > (i32::MAX as usize) {
221 DataType::LargeBinary
222 } else {
223 DataType::Binary
224 }
225 }
226 MysqlType::Json => DataType::LargeUtf8,
227 MysqlType::Geometry => DataType::LargeBinary,
228 }
229 }
230}
231
/// An Oracle column type; see `OracleType::to_arrow_type` for the Arrow
/// representation of each variant.
#[derive(Clone, Debug)]
pub enum OracleType {
    // Binary floating point.
    BinaryFloat,
    BinaryDouble,
    /// `NUMBER(precision, scale)`.
    Number(u8, i8),
    /// `FLOAT(precision)`.
    Float(u8),
    // Character types; the payload is presumably the declared length (TODO confirm).
    Varchar2(u32),
    NVarchar2(u32),
    Char(u32),
    NChar(u32),
    Long,
    // Large character objects.
    Clob,
    NClob,
    // Binary types; the `Raw` payload is presumably the declared length (TODO confirm).
    Raw(u32),
    LongRaw,
    Blob,
    // Date/time types.
    Date,
    Timestamp,
    // Miscellaneous.
    Boolean,
    SdeGeometry,
}
255
256impl OracleType {
257 pub fn to_arrow_type(&self) -> DataType {
258 match self {
259 OracleType::BinaryFloat => DataType::Float32,
260 OracleType::BinaryDouble => DataType::Float64,
261 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
262 OracleType::Float(_precision) => DataType::Float64,
263 OracleType::Varchar2(_) => DataType::Utf8,
264 OracleType::NVarchar2(_) => DataType::Utf8,
265 OracleType::Char(_) => DataType::Utf8,
266 OracleType::NChar(_) => DataType::Utf8,
267 OracleType::Long => DataType::Utf8,
268 OracleType::Clob => DataType::LargeUtf8,
269 OracleType::NClob => DataType::LargeUtf8,
270 OracleType::Raw(_) => DataType::Binary,
271 OracleType::LongRaw => DataType::Binary,
272 OracleType::Blob => DataType::LargeBinary,
273 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
274 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
275 OracleType::Boolean => DataType::Boolean,
276 OracleType::SdeGeometry => {
277 static ENTITY: LazyLock<FieldRef> = LazyLock::new(|| {
278 Arc::new(Field::new("ENTITY", DataType::Decimal128(38, 0), true))
279 });
280 static NUMPTS: LazyLock<FieldRef> = LazyLock::new(|| {
281 Arc::new(Field::new("NUMPTS", DataType::Decimal128(38, 0), true))
282 });
283 static MINX: LazyLock<FieldRef> =
284 LazyLock::new(|| Arc::new(Field::new("MINX", DataType::Float64, true)));
285 static MINY: LazyLock<FieldRef> =
286 LazyLock::new(|| Arc::new(Field::new("MINY", DataType::Float64, true)));
287 static MAXX: LazyLock<FieldRef> =
288 LazyLock::new(|| Arc::new(Field::new("MAXX", DataType::Float64, true)));
289 static MAXY: LazyLock<FieldRef> =
290 LazyLock::new(|| Arc::new(Field::new("MAXY", DataType::Float64, true)));
291 static MINZ: LazyLock<FieldRef> =
292 LazyLock::new(|| Arc::new(Field::new("MINZ", DataType::Float64, true)));
293 static MAXZ: LazyLock<FieldRef> =
294 LazyLock::new(|| Arc::new(Field::new("MAXZ", DataType::Float64, true)));
295 static MINM: LazyLock<FieldRef> =
296 LazyLock::new(|| Arc::new(Field::new("MINM", DataType::Float64, true)));
297 static MAXM: LazyLock<FieldRef> =
298 LazyLock::new(|| Arc::new(Field::new("MAXM", DataType::Float64, true)));
299 static AREA: LazyLock<FieldRef> =
300 LazyLock::new(|| Arc::new(Field::new("AREA", DataType::Float64, true)));
301 static LEN: LazyLock<FieldRef> =
302 LazyLock::new(|| Arc::new(Field::new("LEN", DataType::Float64, true)));
303 static SRID: LazyLock<FieldRef> = LazyLock::new(|| {
304 Arc::new(Field::new("SRID", DataType::Decimal128(38, 0), true))
305 });
306 static POINTS: LazyLock<FieldRef> =
307 LazyLock::new(|| Arc::new(Field::new("POINTS", DataType::LargeBinary, true)));
308
309 DataType::Struct(Fields::from(vec![
310 ENTITY.clone(),
311 NUMPTS.clone(),
312 MINX.clone(),
313 MINY.clone(),
314 MAXX.clone(),
315 MAXY.clone(),
316 MINZ.clone(),
317 MAXZ.clone(),
318 MINM.clone(),
319 MAXM.clone(),
320 AREA.clone(),
321 LEN.clone(),
322 SRID.clone(),
323 POINTS.clone(),
324 ]))
325 }
326 }
327 }
328}
329
/// A SQLite column type; see `SqliteType::to_arrow_type` for the Arrow
/// representation of each variant.
#[derive(Clone, Debug)]
pub enum SqliteType {
    /// Maps to Arrow's `Null` type.
    Null,
    /// Maps to a 64-bit signed integer.
    Integer,
    /// Maps to a 64-bit float.
    Real,
    /// Maps to a UTF-8 string.
    Text,
    /// Maps to variable-length binary.
    Blob,
}
339
340impl SqliteType {
341 pub fn to_arrow_type(&self) -> DataType {
342 match self {
343 SqliteType::Null => DataType::Null,
344 SqliteType::Integer => DataType::Int64,
345 SqliteType::Real => DataType::Float64,
346 SqliteType::Text => DataType::Utf8,
347 SqliteType::Blob => DataType::Binary,
348 }
349 }
350}
351
/// A DM column type; see `DmType::to_arrow_type` for the Arrow
/// representation of each variant.
#[derive(Clone, Debug)]
pub enum DmType {
    // Integers and floating point.
    TinyInt,
    SmallInt,
    Integer,
    BigInt,
    Real,
    Double,
    /// `NUMERIC(precision, scale)`.
    Numeric(u8, i8),
    /// `DECIMAL(precision, scale)`.
    Decimal(u8, i8),
    // Character types; the payload is presumably the declared length, if any (TODO confirm).
    Char(Option<u16>),
    Varchar(Option<u16>),
    Text,
    /// Fixed-width binary; the payload is used as the byte width.
    Binary(u16),
    Varbinary(Option<u16>),
    Image,
    Bit,
    /// Timestamp with the given fractional-seconds precision.
    Timestamp(u8),
    /// Time of day with the given fractional-seconds precision.
    Time(u8),
    Date,
}
375
376impl DmType {
377 pub fn to_arrow_type(&self) -> DataType {
378 match self {
379 DmType::TinyInt => DataType::Int8,
380 DmType::SmallInt => DataType::Int16,
381 DmType::Integer => DataType::Int32,
382 DmType::BigInt => DataType::Int64,
383 DmType::Real => DataType::Float32,
384 DmType::Double => DataType::Float64,
385 DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
386 DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
387 DmType::Char(_) => DataType::Utf8,
388 DmType::Varchar(_) => DataType::Utf8,
389 DmType::Text => DataType::Utf8,
390 DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
391 DmType::Varbinary(_) => DataType::Binary,
392 DmType::Image => DataType::Binary,
393 DmType::Bit => DataType::Boolean,
394 DmType::Timestamp(precision) => {
395 if *precision == 0 {
396 DataType::Timestamp(TimeUnit::Second, None)
397 } else if *precision <= 3 {
398 DataType::Timestamp(TimeUnit::Millisecond, None)
399 } else if *precision <= 6 {
400 DataType::Timestamp(TimeUnit::Microsecond, None)
401 } else {
402 DataType::Timestamp(TimeUnit::Nanosecond, None)
403 }
404 }
405 DmType::Time(precision) => {
406 if *precision == 0 {
407 DataType::Time32(TimeUnit::Second)
408 } else if *precision <= 3 {
409 DataType::Time32(TimeUnit::Millisecond)
410 } else {
411 DataType::Time64(TimeUnit::Microsecond)
412 }
413 }
414 DmType::Date => DataType::Date32,
415 }
416 }
417}
418
419#[derive(Debug, Clone)]
420pub struct RemoteField {
421 pub name: String,
422 pub remote_type: RemoteType,
423 pub nullable: bool,
424 pub auto_increment: bool,
425}
426
427impl RemoteField {
428 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
429 RemoteField {
430 name: name.into(),
431 remote_type,
432 nullable,
433 auto_increment: false,
434 }
435 }
436
437 pub fn with_auto_increment(mut self, auto_increment: bool) -> Self {
438 self.auto_increment = auto_increment;
439 self
440 }
441
442 pub fn to_arrow_field(&self) -> Field {
443 Field::new(
444 self.name.clone(),
445 self.remote_type.to_arrow_type(),
446 self.nullable,
447 )
448 }
449}
450
451pub type RemoteSchemaRef = Arc<RemoteSchema>;
452
453#[derive(Debug, Clone)]
454pub struct RemoteSchema {
455 pub fields: Vec<RemoteField>,
456}
457
458impl RemoteSchema {
459 pub fn empty() -> Self {
460 RemoteSchema { fields: vec![] }
461 }
462
463 pub fn new(fields: Vec<RemoteField>) -> Self {
464 RemoteSchema { fields }
465 }
466
467 pub fn to_arrow_schema(&self) -> Schema {
468 let mut fields = vec![];
469 for remote_field in self.fields.iter() {
470 fields.push(remote_field.to_arrow_field());
471 }
472 Schema::new(fields)
473 }
474}