datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{
2    DECIMAL128_MAX_PRECISION, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, TimeUnit,
3};
4use std::sync::{Arc, LazyLock};
5
6use crate::RemoteDbType;
7
/// A column type reported by a remote database, tagged with the database
/// family it comes from.
///
/// Every variant wraps the type enum that belongs to that database.
#[derive(Debug, Clone)]
pub enum RemoteType {
    /// A PostgreSQL column type (see [`PostgresType`]).
    Postgres(PostgresType),
    /// A MySQL column type (see [`MysqlType`]).
    Mysql(MysqlType),
    /// An Oracle column type (see [`OracleType`]).
    Oracle(OracleType),
    /// A SQLite column type (see [`SqliteType`]).
    Sqlite(SqliteType),
    /// A Dameng (DM) column type (see [`DmType`]).
    Dm(DmType),
}
16
17impl RemoteType {
18    pub fn to_arrow_type(&self) -> DataType {
19        match self {
20            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
21            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
22            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
23            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
24            RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
25        }
26    }
27
28    pub fn db_type(&self) -> RemoteDbType {
29        match self {
30            RemoteType::Postgres(_) => RemoteDbType::Postgres,
31            RemoteType::Mysql(_) => RemoteDbType::Mysql,
32            RemoteType::Oracle(_) => RemoteDbType::Oracle,
33            RemoteType::Sqlite(_) => RemoteDbType::Sqlite,
34            RemoteType::Dm(_) => RemoteDbType::Dm,
35        }
36    }
37}
38
/// Column types recognised for PostgreSQL sources.
///
/// Reference: <https://www.postgresql.org/docs/current/datatype.html>
///
/// The enum is plain data, so it derives `PartialEq`, `Eq` and `Hash` in
/// addition to `Debug` and `Clone`; values can be compared directly and used
/// as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PostgresType {
    /// `smallint`
    Int2,
    /// `integer`
    Int4,
    /// `bigint`
    Int8,
    /// `real`
    Float4,
    /// `double precision`
    Float8,
    /// `numeric(p, s)` / `decimal(p, s)`, carried as `(precision, scale)`.
    ///
    /// NOTE(review): the previous comment here read "precision is a fixed
    /// value(38)"; confirm against the code that constructs this variant.
    Numeric(u8, i8),
    /// `oid`
    Oid,
    /// `name`
    Name,
    /// `varchar(n)`
    Varchar,
    /// `char`, `char(n)`, `bpchar(n)`, `bpchar`
    Bpchar,
    /// `text`
    Text,
    /// `bytea`
    Bytea,
    /// `date`
    Date,
    /// `timestamp` (without time zone)
    Timestamp,
    /// `timestamptz` (with time zone)
    TimestampTz,
    /// `time`
    Time,
    /// `interval`
    Interval,
    /// `boolean`
    Bool,
    /// `json`
    Json,
    /// `jsonb`
    Jsonb,
    /// Array of `smallint`
    Int2Array,
    /// Array of `integer`
    Int4Array,
    /// Array of `bigint`
    Int8Array,
    /// Array of `real`
    Float4Array,
    /// Array of `double precision`
    Float8Array,
    /// Array of `varchar`
    VarcharArray,
    /// Array of `bpchar`
    BpcharArray,
    /// Array of `text`
    TextArray,
    /// Array of `bytea`
    ByteaArray,
    /// Array of `boolean`
    BoolArray,
    /// PostGIS `geometry`
    PostGisGeometry,
    /// `xml`
    Xml,
    /// `uuid`
    Uuid,
}
85
86impl PostgresType {
87    pub fn to_arrow_type(&self) -> DataType {
88        match self {
89            PostgresType::Int2 => DataType::Int16,
90            PostgresType::Int4 => DataType::Int32,
91            PostgresType::Int8 => DataType::Int64,
92            PostgresType::Float4 => DataType::Float32,
93            PostgresType::Float8 => DataType::Float64,
94            PostgresType::Numeric(precision, scale) => {
95                if *precision <= DECIMAL128_MAX_PRECISION {
96                    DataType::Decimal128(*precision, *scale)
97                } else {
98                    DataType::Decimal256(*precision, *scale)
99                }
100            }
101            PostgresType::Oid => DataType::UInt32,
102            PostgresType::Name
103            | PostgresType::Text
104            | PostgresType::Varchar
105            | PostgresType::Bpchar
106            | PostgresType::Xml => DataType::Utf8,
107            PostgresType::Bytea => DataType::Binary,
108            PostgresType::Date => DataType::Date32,
109            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
110            PostgresType::TimestampTz => {
111                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
112            }
113            PostgresType::Time => DataType::Time64(TimeUnit::Microsecond),
114            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
115            PostgresType::Bool => DataType::Boolean,
116            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
117            PostgresType::Int2Array => {
118                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
119            }
120            PostgresType::Int4Array => {
121                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
122            }
123            PostgresType::Int8Array => {
124                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
125            }
126            PostgresType::Float4Array => {
127                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
128            }
129            PostgresType::Float8Array => {
130                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
131            }
132            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
133                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
134            }
135            PostgresType::ByteaArray => {
136                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
137            }
138            PostgresType::BoolArray => {
139                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
140            }
141            PostgresType::PostGisGeometry => DataType::Binary,
142            PostgresType::Uuid => DataType::FixedSizeBinary(16),
143        }
144    }
145}
146
/// Column types recognised for MySQL sources.
///
/// Reference: <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>
///
/// The enum is plain data, so it derives `PartialEq`, `Eq` and `Hash` in
/// addition to `Debug` and `Clone`; values can be compared directly and used
/// as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MysqlType {
    TinyInt,
    TinyIntUnsigned,
    SmallInt,
    SmallIntUnsigned,
    MediumInt,
    MediumIntUnsigned,
    Integer,
    IntegerUnsigned,
    BigInt,
    BigIntUnsigned,
    Float,
    Double,
    /// `DECIMAL`, carried as `(precision, scale)`.
    Decimal(u8, u8),
    Date,
    Datetime,
    Time,
    Timestamp,
    Year,
    Char,
    Varchar,
    Binary,
    Varbinary,
    /// TinyText, Text, MediumText, LongText; the payload is the column
    /// length (presumably as reported by the server; confirm at the call
    /// site that builds this variant).
    Text(u32),
    /// TinyBlob, Blob, MediumBlob, LongBlob; the payload is the column
    /// length (presumably as reported by the server; confirm at the call
    /// site that builds this variant).
    Blob(u32),
    Json,
    Geometry,
}
179
180impl MysqlType {
181    pub fn to_arrow_type(&self) -> DataType {
182        match self {
183            MysqlType::TinyInt => DataType::Int8,
184            MysqlType::TinyIntUnsigned => DataType::UInt8,
185            MysqlType::SmallInt => DataType::Int16,
186            MysqlType::SmallIntUnsigned => DataType::UInt16,
187            MysqlType::MediumInt => DataType::Int32,
188            MysqlType::MediumIntUnsigned => DataType::UInt32,
189            MysqlType::Integer => DataType::Int32,
190            MysqlType::IntegerUnsigned => DataType::UInt32,
191            MysqlType::BigInt => DataType::Int64,
192            MysqlType::BigIntUnsigned => DataType::UInt64,
193            MysqlType::Float => DataType::Float32,
194            MysqlType::Double => DataType::Float64,
195            MysqlType::Decimal(precision, scale) => {
196                assert!(*scale <= (i8::MAX as u8));
197                if *precision > 38 {
198                    DataType::Decimal256(*precision, *scale as i8)
199                } else {
200                    DataType::Decimal128(*precision, *scale as i8)
201                }
202            }
203            MysqlType::Date => DataType::Date32,
204            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
205            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
206            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
207            MysqlType::Year => DataType::Int16,
208            MysqlType::Char => DataType::Utf8,
209            MysqlType::Varchar => DataType::Utf8,
210            MysqlType::Binary => DataType::Binary,
211            MysqlType::Varbinary => DataType::Binary,
212            MysqlType::Text(col_len) => {
213                if (*col_len as usize) > (i32::MAX as usize) {
214                    DataType::LargeUtf8
215                } else {
216                    DataType::Utf8
217                }
218            }
219            MysqlType::Blob(col_len) => {
220                if (*col_len as usize) > (i32::MAX as usize) {
221                    DataType::LargeBinary
222                } else {
223                    DataType::Binary
224                }
225            }
226            MysqlType::Json => DataType::LargeUtf8,
227            MysqlType::Geometry => DataType::LargeBinary,
228        }
229    }
230}
231
/// Column types recognised for Oracle sources.
///
/// References:
/// * <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>
/// * <https://docs.oracle.com/en/database/oracle/oracle-database/21/lnoci/data-types.html>
///
/// The enum is plain data, so it derives `PartialEq`, `Eq` and `Hash` in
/// addition to `Debug` and `Clone`; values can be compared directly and used
/// as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OracleType {
    BinaryFloat,
    BinaryDouble,
    /// `NUMBER`, carried as `(precision, scale)`.
    Number(u8, i8),
    /// `FLOAT`, carrying its precision.
    Float(u8),
    /// `VARCHAR2`; the payload is presumably the declared size (confirm at
    /// the call site that builds this variant).
    Varchar2(u32),
    /// `NVARCHAR2`; payload presumably the declared size (confirm).
    NVarchar2(u32),
    /// `CHAR`; payload presumably the declared size (confirm).
    Char(u32),
    /// `NCHAR`; payload presumably the declared size (confirm).
    NChar(u32),
    Long,
    Clob,
    NClob,
    /// `RAW`; payload presumably the declared size (confirm).
    Raw(u32),
    LongRaw,
    Blob,
    Date,
    Timestamp,
    Boolean,
    SdeGeometry,
}
255
256impl OracleType {
257    pub fn to_arrow_type(&self) -> DataType {
258        match self {
259            OracleType::BinaryFloat => DataType::Float32,
260            OracleType::BinaryDouble => DataType::Float64,
261            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
262            OracleType::Float(_precision) => DataType::Float64,
263            OracleType::Varchar2(_) => DataType::Utf8,
264            OracleType::NVarchar2(_) => DataType::Utf8,
265            OracleType::Char(_) => DataType::Utf8,
266            OracleType::NChar(_) => DataType::Utf8,
267            OracleType::Long => DataType::Utf8,
268            OracleType::Clob => DataType::LargeUtf8,
269            OracleType::NClob => DataType::LargeUtf8,
270            OracleType::Raw(_) => DataType::Binary,
271            OracleType::LongRaw => DataType::Binary,
272            OracleType::Blob => DataType::LargeBinary,
273            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
274            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
275            OracleType::Boolean => DataType::Boolean,
276            OracleType::SdeGeometry => {
277                static ENTITY: LazyLock<FieldRef> = LazyLock::new(|| {
278                    Arc::new(Field::new("ENTITY", DataType::Decimal128(38, 0), true))
279                });
280                static NUMPTS: LazyLock<FieldRef> = LazyLock::new(|| {
281                    Arc::new(Field::new("NUMPTS", DataType::Decimal128(38, 0), true))
282                });
283                static MINX: LazyLock<FieldRef> =
284                    LazyLock::new(|| Arc::new(Field::new("MINX", DataType::Float64, true)));
285                static MINY: LazyLock<FieldRef> =
286                    LazyLock::new(|| Arc::new(Field::new("MINY", DataType::Float64, true)));
287                static MAXX: LazyLock<FieldRef> =
288                    LazyLock::new(|| Arc::new(Field::new("MAXX", DataType::Float64, true)));
289                static MAXY: LazyLock<FieldRef> =
290                    LazyLock::new(|| Arc::new(Field::new("MAXY", DataType::Float64, true)));
291                static MINZ: LazyLock<FieldRef> =
292                    LazyLock::new(|| Arc::new(Field::new("MINZ", DataType::Float64, true)));
293                static MAXZ: LazyLock<FieldRef> =
294                    LazyLock::new(|| Arc::new(Field::new("MAXZ", DataType::Float64, true)));
295                static MINM: LazyLock<FieldRef> =
296                    LazyLock::new(|| Arc::new(Field::new("MINM", DataType::Float64, true)));
297                static MAXM: LazyLock<FieldRef> =
298                    LazyLock::new(|| Arc::new(Field::new("MAXM", DataType::Float64, true)));
299                static AREA: LazyLock<FieldRef> =
300                    LazyLock::new(|| Arc::new(Field::new("AREA", DataType::Float64, true)));
301                static LEN: LazyLock<FieldRef> =
302                    LazyLock::new(|| Arc::new(Field::new("LEN", DataType::Float64, true)));
303                static SRID: LazyLock<FieldRef> = LazyLock::new(|| {
304                    Arc::new(Field::new("SRID", DataType::Decimal128(38, 0), true))
305                });
306                static POINTS: LazyLock<FieldRef> =
307                    LazyLock::new(|| Arc::new(Field::new("POINTS", DataType::LargeBinary, true)));
308
309                DataType::Struct(Fields::from(vec![
310                    ENTITY.clone(),
311                    NUMPTS.clone(),
312                    MINX.clone(),
313                    MINY.clone(),
314                    MAXX.clone(),
315                    MAXY.clone(),
316                    MINZ.clone(),
317                    MAXZ.clone(),
318                    MINM.clone(),
319                    MAXM.clone(),
320                    AREA.clone(),
321                    LEN.clone(),
322                    SRID.clone(),
323                    POINTS.clone(),
324                ]))
325            }
326        }
327    }
328}
329
/// Column types recognised for SQLite sources.
///
/// Reference: <https://www.sqlite.org/datatype3.html>
///
/// The enum is plain data, so it derives `PartialEq`, `Eq` and `Hash` in
/// addition to `Debug` and `Clone`; values can be compared directly and used
/// as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SqliteType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
}
339
340impl SqliteType {
341    pub fn to_arrow_type(&self) -> DataType {
342        match self {
343            SqliteType::Null => DataType::Null,
344            SqliteType::Integer => DataType::Int64,
345            SqliteType::Real => DataType::Float64,
346            SqliteType::Text => DataType::Utf8,
347            SqliteType::Blob => DataType::Binary,
348        }
349    }
350}
351
/// Column types recognised for Dameng (DM) sources.
///
/// References:
/// * <https://eco.dameng.com/document/dm/zh-cn/pm/odbc-rogramming-guide.html>
/// * <https://eco.dameng.com/document/dm/zh-cn/pm/dm8_sql-data-types-operators.html>
///
/// The enum is plain data, so it derives `PartialEq`, `Eq` and `Hash` in
/// addition to `Debug` and `Clone`; values can be compared directly and used
/// as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DmType {
    TinyInt,
    SmallInt,
    Integer,
    BigInt,
    Real,
    Double,
    /// `NUMERIC`, carried as `(precision, scale)`.
    Numeric(u8, i8),
    /// `DECIMAL`, carried as `(precision, scale)`.
    Decimal(u8, i8),
    /// `CHAR`; the payload is presumably the optional declared length
    /// (confirm at the call site that builds this variant).
    Char(Option<u16>),
    /// `VARCHAR`; payload presumably the optional declared length (confirm).
    Varchar(Option<u16>),
    Text,
    /// `BINARY`, carrying the length that becomes the width of the Arrow
    /// fixed-size binary type.
    Binary(u16),
    /// `VARBINARY`; payload presumably the optional declared length
    /// (confirm).
    Varbinary(Option<u16>),
    Image,
    Bit,
    /// `TIMESTAMP`, carrying its precision (used to pick the Arrow time
    /// unit).
    Timestamp(u8),
    /// `TIME`, carrying its precision (used to pick the Arrow time unit).
    Time(u8),
    Date,
}
375
376impl DmType {
377    pub fn to_arrow_type(&self) -> DataType {
378        match self {
379            DmType::TinyInt => DataType::Int8,
380            DmType::SmallInt => DataType::Int16,
381            DmType::Integer => DataType::Int32,
382            DmType::BigInt => DataType::Int64,
383            DmType::Real => DataType::Float32,
384            DmType::Double => DataType::Float64,
385            DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
386            DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
387            DmType::Char(_) => DataType::Utf8,
388            DmType::Varchar(_) => DataType::Utf8,
389            DmType::Text => DataType::Utf8,
390            DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
391            DmType::Varbinary(_) => DataType::Binary,
392            DmType::Image => DataType::Binary,
393            DmType::Bit => DataType::Boolean,
394            DmType::Timestamp(precision) => {
395                if *precision == 0 {
396                    DataType::Timestamp(TimeUnit::Second, None)
397                } else if *precision <= 3 {
398                    DataType::Timestamp(TimeUnit::Millisecond, None)
399                } else if *precision <= 6 {
400                    DataType::Timestamp(TimeUnit::Microsecond, None)
401                } else {
402                    DataType::Timestamp(TimeUnit::Nanosecond, None)
403                }
404            }
405            DmType::Time(precision) => {
406                if *precision == 0 {
407                    DataType::Time32(TimeUnit::Second)
408                } else if *precision <= 3 {
409                    DataType::Time32(TimeUnit::Millisecond)
410                } else {
411                    DataType::Time64(TimeUnit::Microsecond)
412                }
413            }
414            DmType::Date => DataType::Date32,
415        }
416    }
417}
418
/// Description of a single column of a remote table.
#[derive(Debug, Clone)]
pub struct RemoteField {
    /// Column name; reused as the Arrow field name.
    pub name: String,
    /// Column type as reported by the remote database.
    pub remote_type: RemoteType,
    /// Whether the column may contain nulls; copied onto the Arrow field.
    pub nullable: bool,
    /// Whether the column is marked auto-increment. `RemoteField::new`
    /// initialises this to `false`; it is not carried over to the Arrow
    /// field.
    pub auto_increment: bool,
}
426
427impl RemoteField {
428    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
429        RemoteField {
430            name: name.into(),
431            remote_type,
432            nullable,
433            auto_increment: false,
434        }
435    }
436
437    pub fn with_auto_increment(mut self, auto_increment: bool) -> Self {
438        self.auto_increment = auto_increment;
439        self
440    }
441
442    pub fn to_arrow_field(&self) -> Field {
443        Field::new(
444            self.name.clone(),
445            self.remote_type.to_arrow_type(),
446            self.nullable,
447        )
448    }
449}
450
/// Shared, reference-counted handle to a [`RemoteSchema`].
pub type RemoteSchemaRef = Arc<RemoteSchema>;
452
/// Schema of a remote table: its columns, kept in the order they were
/// supplied.
#[derive(Debug, Clone)]
pub struct RemoteSchema {
    /// Column descriptions; the Arrow schema lists its fields in this order.
    pub fields: Vec<RemoteField>,
}
457
458impl RemoteSchema {
459    pub fn empty() -> Self {
460        RemoteSchema { fields: vec![] }
461    }
462
463    pub fn new(fields: Vec<RemoteField>) -> Self {
464        RemoteSchema { fields }
465    }
466
467    pub fn to_arrow_schema(&self) -> Schema {
468        let mut fields = vec![];
469        for remote_field in self.fields.iter() {
470            fields.push(remote_field.to_arrow_field());
471        }
472        Schema::new(fields)
473    }
474}