datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{
2    DECIMAL128_MAX_PRECISION, DataType, Field, IntervalUnit, Schema, TimeUnit,
3};
4use std::sync::Arc;
5
6use crate::RemoteDbType;
7
8#[derive(Debug, Clone)]
9pub enum RemoteType {
10    Postgres(PostgresType),
11    Mysql(MysqlType),
12    Oracle(OracleType),
13    Sqlite(SqliteType),
14    Dm(DmType),
15}
16
17impl RemoteType {
18    pub fn to_arrow_type(&self) -> DataType {
19        match self {
20            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
21            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
22            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
23            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
24            RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
25        }
26    }
27
28    pub fn db_type(&self) -> RemoteDbType {
29        match self {
30            RemoteType::Postgres(_) => RemoteDbType::Postgres,
31            RemoteType::Mysql(_) => RemoteDbType::Mysql,
32            RemoteType::Oracle(_) => RemoteDbType::Oracle,
33            RemoteType::Sqlite(_) => RemoteDbType::Sqlite,
34            RemoteType::Dm(_) => RemoteDbType::Dm,
35        }
36    }
37}
38
39/// https://www.postgresql.org/docs/current/datatype.html
40#[derive(Debug, Clone)]
41pub enum PostgresType {
42    // smallint
43    Int2,
44    // integer
45    Int4,
46    // bigint
47    Int8,
48    // real
49    Float4,
50    // double precision
51    Float8,
52    // numeric(p, s), decimal(p, s)
53    // precision is a fixed value(38)
54    Numeric(u8, i8),
55    Oid,
56    Name,
57    // varchar(n)
58    Varchar,
59    // char, char(n), bpchar(n), bpchar
60    Bpchar,
61    Text,
62    Bytea,
63    Date,
64    Timestamp,
65    TimestampTz,
66    Time,
67    Interval,
68    Bool,
69    Json,
70    Jsonb,
71    Int2Array,
72    Int4Array,
73    Int8Array,
74    Float4Array,
75    Float8Array,
76    VarcharArray,
77    BpcharArray,
78    TextArray,
79    ByteaArray,
80    BoolArray,
81    PostGisGeometry,
82    Xml,
83    Uuid,
84}
85
86impl PostgresType {
87    pub fn to_arrow_type(&self) -> DataType {
88        match self {
89            PostgresType::Int2 => DataType::Int16,
90            PostgresType::Int4 => DataType::Int32,
91            PostgresType::Int8 => DataType::Int64,
92            PostgresType::Float4 => DataType::Float32,
93            PostgresType::Float8 => DataType::Float64,
94            PostgresType::Numeric(precision, scale) => {
95                if *precision <= DECIMAL128_MAX_PRECISION {
96                    DataType::Decimal128(*precision, *scale)
97                } else {
98                    DataType::Decimal256(*precision, *scale)
99                }
100            }
101            PostgresType::Oid => DataType::UInt32,
102            PostgresType::Name
103            | PostgresType::Text
104            | PostgresType::Varchar
105            | PostgresType::Bpchar
106            | PostgresType::Xml => DataType::Utf8,
107            PostgresType::Bytea => DataType::Binary,
108            PostgresType::Date => DataType::Date32,
109            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
110            PostgresType::TimestampTz => {
111                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
112            }
113            PostgresType::Time => DataType::Time64(TimeUnit::Microsecond),
114            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
115            PostgresType::Bool => DataType::Boolean,
116            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
117            PostgresType::Int2Array => {
118                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
119            }
120            PostgresType::Int4Array => {
121                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
122            }
123            PostgresType::Int8Array => {
124                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
125            }
126            PostgresType::Float4Array => {
127                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
128            }
129            PostgresType::Float8Array => {
130                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
131            }
132            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
133                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
134            }
135            PostgresType::ByteaArray => {
136                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
137            }
138            PostgresType::BoolArray => {
139                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
140            }
141            PostgresType::PostGisGeometry => DataType::Binary,
142            PostgresType::Uuid => DataType::FixedSizeBinary(16),
143        }
144    }
145}
146
147// https://dev.mysql.com/doc/refman/8.4/en/data-types.html
148#[derive(Debug, Clone)]
149pub enum MysqlType {
150    TinyInt,
151    TinyIntUnsigned,
152    SmallInt,
153    SmallIntUnsigned,
154    MediumInt,
155    MediumIntUnsigned,
156    Integer,
157    IntegerUnsigned,
158    BigInt,
159    BigIntUnsigned,
160    Float,
161    Double,
162    Decimal(u8, u8),
163    Date,
164    Datetime,
165    Time,
166    Timestamp,
167    Year,
168    Char,
169    Varchar,
170    Binary,
171    Varbinary,
172    // TinyText, Text, MediumText, LongText
173    Text(u32),
174    // TinyBlob, Blob, MediumBlob, LongBlob
175    Blob(u32),
176    Json,
177    Geometry,
178}
179
180impl MysqlType {
181    pub fn to_arrow_type(&self) -> DataType {
182        match self {
183            MysqlType::TinyInt => DataType::Int8,
184            MysqlType::TinyIntUnsigned => DataType::UInt8,
185            MysqlType::SmallInt => DataType::Int16,
186            MysqlType::SmallIntUnsigned => DataType::UInt16,
187            MysqlType::MediumInt => DataType::Int32,
188            MysqlType::MediumIntUnsigned => DataType::UInt32,
189            MysqlType::Integer => DataType::Int32,
190            MysqlType::IntegerUnsigned => DataType::UInt32,
191            MysqlType::BigInt => DataType::Int64,
192            MysqlType::BigIntUnsigned => DataType::UInt64,
193            MysqlType::Float => DataType::Float32,
194            MysqlType::Double => DataType::Float64,
195            MysqlType::Decimal(precision, scale) => {
196                assert!(*scale <= (i8::MAX as u8));
197                if *precision > 38 {
198                    DataType::Decimal256(*precision, *scale as i8)
199                } else {
200                    DataType::Decimal128(*precision, *scale as i8)
201                }
202            }
203            MysqlType::Date => DataType::Date32,
204            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
205            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
206            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
207            MysqlType::Year => DataType::Int16,
208            MysqlType::Char => DataType::Utf8,
209            MysqlType::Varchar => DataType::Utf8,
210            MysqlType::Binary => DataType::Binary,
211            MysqlType::Varbinary => DataType::Binary,
212            MysqlType::Text(col_len) => {
213                if (*col_len as usize) > (i32::MAX as usize) {
214                    DataType::LargeUtf8
215                } else {
216                    DataType::Utf8
217                }
218            }
219            MysqlType::Blob(col_len) => {
220                if (*col_len as usize) > (i32::MAX as usize) {
221                    DataType::LargeBinary
222                } else {
223                    DataType::Binary
224                }
225            }
226            MysqlType::Json => DataType::LargeUtf8,
227            MysqlType::Geometry => DataType::LargeBinary,
228        }
229    }
230}
231
232// https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330
233// https://docs.oracle.com/en/database/oracle/oracle-database/21/lnoci/data-types.html
234#[derive(Debug, Clone)]
235pub enum OracleType {
236    BinaryFloat,
237    BinaryDouble,
238    Number(u8, i8),
239    Float(u8),
240    Varchar2(u32),
241    NVarchar2(u32),
242    Char(u32),
243    NChar(u32),
244    Long,
245    Clob,
246    NClob,
247    Raw(u32),
248    LongRaw,
249    Blob,
250    Date,
251    Timestamp,
252    Boolean,
253}
254
255impl OracleType {
256    pub fn to_arrow_type(&self) -> DataType {
257        match self {
258            OracleType::BinaryFloat => DataType::Float32,
259            OracleType::BinaryDouble => DataType::Float64,
260            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
261            OracleType::Float(_precision) => DataType::Float64,
262            OracleType::Varchar2(_) => DataType::Utf8,
263            OracleType::NVarchar2(_) => DataType::Utf8,
264            OracleType::Char(_) => DataType::Utf8,
265            OracleType::NChar(_) => DataType::Utf8,
266            OracleType::Long => DataType::Utf8,
267            OracleType::Clob => DataType::LargeUtf8,
268            OracleType::NClob => DataType::LargeUtf8,
269            OracleType::Raw(_) => DataType::Binary,
270            OracleType::LongRaw => DataType::Binary,
271            OracleType::Blob => DataType::LargeBinary,
272            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
273            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
274            OracleType::Boolean => DataType::Boolean,
275        }
276    }
277}
278
279// https://www.sqlite.org/datatype3.html
280#[derive(Debug, Clone)]
281pub enum SqliteType {
282    Null,
283    Integer,
284    Real,
285    Text,
286    Blob,
287}
288
289impl SqliteType {
290    pub fn to_arrow_type(&self) -> DataType {
291        match self {
292            SqliteType::Null => DataType::Null,
293            SqliteType::Integer => DataType::Int64,
294            SqliteType::Real => DataType::Float64,
295            SqliteType::Text => DataType::Utf8,
296            SqliteType::Blob => DataType::Binary,
297        }
298    }
299}
300
301// https://eco.dameng.com/document/dm/zh-cn/pm/odbc-rogramming-guide.html
302// https://eco.dameng.com/document/dm/zh-cn/pm/dm8_sql-data-types-operators.html
303#[derive(Debug, Clone)]
304pub enum DmType {
305    TinyInt,
306    SmallInt,
307    Integer,
308    BigInt,
309    Real,
310    Double,
311    Numeric(u8, i8),
312    Decimal(u8, i8),
313    Char(Option<u16>),
314    Varchar(Option<u16>),
315    Text,
316    Binary(u16),
317    Varbinary(Option<u16>),
318    Image,
319    Bit,
320    Timestamp(u8),
321    Time(u8),
322    Date,
323}
324
325impl DmType {
326    pub fn to_arrow_type(&self) -> DataType {
327        match self {
328            DmType::TinyInt => DataType::Int8,
329            DmType::SmallInt => DataType::Int16,
330            DmType::Integer => DataType::Int32,
331            DmType::BigInt => DataType::Int64,
332            DmType::Real => DataType::Float32,
333            DmType::Double => DataType::Float64,
334            DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
335            DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
336            DmType::Char(_) => DataType::Utf8,
337            DmType::Varchar(_) => DataType::Utf8,
338            DmType::Text => DataType::Utf8,
339            DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
340            DmType::Varbinary(_) => DataType::Binary,
341            DmType::Image => DataType::Binary,
342            DmType::Bit => DataType::Boolean,
343            DmType::Timestamp(precision) => {
344                if *precision == 0 {
345                    DataType::Timestamp(TimeUnit::Second, None)
346                } else if *precision <= 3 {
347                    DataType::Timestamp(TimeUnit::Millisecond, None)
348                } else if *precision <= 6 {
349                    DataType::Timestamp(TimeUnit::Microsecond, None)
350                } else {
351                    DataType::Timestamp(TimeUnit::Nanosecond, None)
352                }
353            }
354            DmType::Time(precision) => {
355                if *precision == 0 {
356                    DataType::Time32(TimeUnit::Second)
357                } else if *precision <= 3 {
358                    DataType::Time32(TimeUnit::Millisecond)
359                } else {
360                    DataType::Time64(TimeUnit::Microsecond)
361                }
362            }
363            DmType::Date => DataType::Date32,
364        }
365    }
366}
367
368#[derive(Debug, Clone)]
369pub struct RemoteField {
370    pub name: String,
371    pub remote_type: RemoteType,
372    pub nullable: bool,
373    pub auto_increment: bool,
374}
375
376impl RemoteField {
377    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
378        RemoteField {
379            name: name.into(),
380            remote_type,
381            nullable,
382            auto_increment: false,
383        }
384    }
385
386    pub fn with_auto_increment(mut self, auto_increment: bool) -> Self {
387        self.auto_increment = auto_increment;
388        self
389    }
390
391    pub fn to_arrow_field(&self) -> Field {
392        Field::new(
393            self.name.clone(),
394            self.remote_type.to_arrow_type(),
395            self.nullable,
396        )
397    }
398}
399
400pub type RemoteSchemaRef = Arc<RemoteSchema>;
401
402#[derive(Debug, Clone)]
403pub struct RemoteSchema {
404    pub fields: Vec<RemoteField>,
405}
406
407impl RemoteSchema {
408    pub fn empty() -> Self {
409        RemoteSchema { fields: vec![] }
410    }
411
412    pub fn new(fields: Vec<RemoteField>) -> Self {
413        RemoteSchema { fields }
414    }
415
416    pub fn to_arrow_schema(&self) -> Schema {
417        let mut fields = vec![];
418        for remote_field in self.fields.iter() {
419            fields.push(remote_field.to_arrow_field());
420        }
421        Schema::new(fields)
422    }
423}