datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6    Postgres(PostgresType),
7    Mysql(MysqlType),
8    Oracle(OracleType),
9    Sqlite(SqliteType),
10    Dm(DmType),
11}
12
13impl RemoteType {
14    pub fn to_arrow_type(&self) -> DataType {
15        match self {
16            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
17            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
18            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
19            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
20            RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
21        }
22    }
23}
24
/// PostgreSQL column types understood by this crate.
///
/// Reference: <https://www.postgresql.org/docs/current/datatype.html>
#[derive(Debug, Clone)]
pub enum PostgresType {
    /// `smallint`
    Int2,
    /// `integer`
    Int4,
    /// `bigint`
    Int8,
    /// `real`
    Float4,
    /// `double precision`
    Float8,
    /// `numeric(p, s)` / `decimal(p, s)`.
    ///
    /// Only the scale is carried; the precision is a fixed value (38).
    Numeric(i8),
    /// `oid`
    Oid,
    /// `name`
    Name,
    /// `varchar(n)`
    Varchar,
    /// `char`, `char(n)`, `bpchar(n)`, `bpchar`
    Bpchar,
    /// `text`
    Text,
    /// `bytea`
    Bytea,
    /// `date`
    Date,
    /// `timestamp`
    Timestamp,
    /// `timestamptz`
    TimestampTz,
    /// `time`
    Time,
    /// `interval`
    Interval,
    /// `bool`
    Bool,
    /// `json`
    Json,
    /// `jsonb`
    Jsonb,
    /// Array of [`PostgresType::Int2`].
    Int2Array,
    /// Array of [`PostgresType::Int4`].
    Int4Array,
    /// Array of [`PostgresType::Int8`].
    Int8Array,
    /// Array of [`PostgresType::Float4`].
    Float4Array,
    /// Array of [`PostgresType::Float8`].
    Float8Array,
    /// Array of [`PostgresType::Varchar`].
    VarcharArray,
    /// Array of [`PostgresType::Bpchar`].
    BpcharArray,
    /// Array of [`PostgresType::Text`].
    TextArray,
    /// Array of [`PostgresType::Bytea`].
    ByteaArray,
    /// Array of [`PostgresType::Bool`].
    BoolArray,
    /// PostGIS `geometry`.
    PostGisGeometry,
}
69
70impl PostgresType {
71    pub fn to_arrow_type(&self) -> DataType {
72        match self {
73            PostgresType::Int2 => DataType::Int16,
74            PostgresType::Int4 => DataType::Int32,
75            PostgresType::Int8 => DataType::Int64,
76            PostgresType::Float4 => DataType::Float32,
77            PostgresType::Float8 => DataType::Float64,
78            PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
79            PostgresType::Oid => DataType::UInt32,
80            PostgresType::Name
81            | PostgresType::Text
82            | PostgresType::Varchar
83            | PostgresType::Bpchar => DataType::Utf8,
84            PostgresType::Bytea => DataType::Binary,
85            PostgresType::Date => DataType::Date32,
86            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
87            PostgresType::TimestampTz => {
88                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
89            }
90            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
91            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
92            PostgresType::Bool => DataType::Boolean,
93            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
94            PostgresType::Int2Array => {
95                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
96            }
97            PostgresType::Int4Array => {
98                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
99            }
100            PostgresType::Int8Array => {
101                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
102            }
103            PostgresType::Float4Array => {
104                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
105            }
106            PostgresType::Float8Array => {
107                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
108            }
109            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
110                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
111            }
112            PostgresType::ByteaArray => {
113                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
114            }
115            PostgresType::BoolArray => {
116                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
117            }
118            PostgresType::PostGisGeometry => DataType::Binary,
119        }
120    }
121}
122
/// MySQL column types understood by this crate.
///
/// Reference: <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>
#[derive(Debug, Clone)]
pub enum MysqlType {
    /// `TINYINT`
    TinyInt,
    /// `TINYINT UNSIGNED`
    TinyIntUnsigned,
    /// `SMALLINT`
    SmallInt,
    /// `SMALLINT UNSIGNED`
    SmallIntUnsigned,
    /// `MEDIUMINT`
    MediumInt,
    /// `MEDIUMINT UNSIGNED`
    MediumIntUnsigned,
    /// `INTEGER`
    Integer,
    /// `INTEGER UNSIGNED`
    IntegerUnsigned,
    /// `BIGINT`
    BigInt,
    /// `BIGINT UNSIGNED`
    BigIntUnsigned,
    /// `FLOAT`
    Float,
    /// `DOUBLE`
    Double,
    /// `DECIMAL`, carrying `(precision, scale)`.
    Decimal(u8, u8),
    /// `DATE`
    Date,
    /// `DATETIME`
    Datetime,
    /// `TIME`
    Time,
    /// `TIMESTAMP`
    Timestamp,
    /// `YEAR`
    Year,
    /// `CHAR`
    Char,
    /// `VARCHAR`
    Varchar,
    /// `BINARY`
    Binary,
    /// `VARBINARY`
    Varbinary,
    /// `TINYTEXT`, `TEXT`, `MEDIUMTEXT` or `LONGTEXT`; the payload is the column length.
    Text(u32),
    /// `TINYBLOB`, `BLOB`, `MEDIUMBLOB` or `LONGBLOB`; the payload is the column length.
    Blob(u32),
    /// `JSON`
    Json,
    /// `GEOMETRY`
    Geometry,
}
155
156impl MysqlType {
157    pub fn to_arrow_type(&self) -> DataType {
158        match self {
159            MysqlType::TinyInt => DataType::Int8,
160            MysqlType::TinyIntUnsigned => DataType::UInt8,
161            MysqlType::SmallInt => DataType::Int16,
162            MysqlType::SmallIntUnsigned => DataType::UInt16,
163            MysqlType::MediumInt => DataType::Int32,
164            MysqlType::MediumIntUnsigned => DataType::UInt32,
165            MysqlType::Integer => DataType::Int32,
166            MysqlType::IntegerUnsigned => DataType::UInt32,
167            MysqlType::BigInt => DataType::Int64,
168            MysqlType::BigIntUnsigned => DataType::UInt64,
169            MysqlType::Float => DataType::Float32,
170            MysqlType::Double => DataType::Float64,
171            MysqlType::Decimal(precision, scale) => {
172                assert!(*scale <= (i8::MAX as u8));
173                if *precision > 38 {
174                    DataType::Decimal256(*precision, *scale as i8)
175                } else {
176                    DataType::Decimal128(*precision, *scale as i8)
177                }
178            }
179            MysqlType::Date => DataType::Date32,
180            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
181            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
182            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
183            MysqlType::Year => DataType::Int16,
184            MysqlType::Char => DataType::Utf8,
185            MysqlType::Varchar => DataType::Utf8,
186            MysqlType::Binary => DataType::Binary,
187            MysqlType::Varbinary => DataType::Binary,
188            MysqlType::Text(col_len) => {
189                if (*col_len as usize) > (i32::MAX as usize) {
190                    DataType::LargeUtf8
191                } else {
192                    DataType::Utf8
193                }
194            }
195            MysqlType::Blob(col_len) => {
196                if (*col_len as usize) > (i32::MAX as usize) {
197                    DataType::LargeBinary
198                } else {
199                    DataType::Binary
200                }
201            }
202            MysqlType::Json => DataType::LargeUtf8,
203            MysqlType::Geometry => DataType::LargeBinary,
204        }
205    }
206}
207
/// Oracle column types understood by this crate.
///
/// References:
/// - <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>
/// - <https://docs.oracle.com/en/database/oracle/oracle-database/21/lnoci/data-types.html>
#[derive(Debug, Clone)]
pub enum OracleType {
    /// `BINARY_FLOAT`
    BinaryFloat,
    /// `BINARY_DOUBLE`
    BinaryDouble,
    /// `NUMBER`, carrying `(precision, scale)`.
    Number(u8, i8),
    /// `FLOAT`, carrying a precision.
    Float(u8),
    /// `VARCHAR2`; payload presumably the declared size (TODO confirm against the producer).
    Varchar2(u32),
    /// `NVARCHAR2`; payload presumably the declared size (TODO confirm against the producer).
    NVarchar2(u32),
    /// `CHAR`; payload presumably the declared size (TODO confirm against the producer).
    Char(u32),
    /// `NCHAR`; payload presumably the declared size (TODO confirm against the producer).
    NChar(u32),
    /// `LONG`
    Long,
    /// `CLOB`
    Clob,
    /// `NCLOB`
    NClob,
    /// `RAW`; payload presumably the declared size (TODO confirm against the producer).
    Raw(u32),
    /// `LONG RAW`
    LongRaw,
    /// `BLOB`
    Blob,
    /// `DATE`
    Date,
    /// `TIMESTAMP`
    Timestamp,
    /// `BOOLEAN`
    Boolean,
}
230
231impl OracleType {
232    pub fn to_arrow_type(&self) -> DataType {
233        match self {
234            OracleType::BinaryFloat => DataType::Float32,
235            OracleType::BinaryDouble => DataType::Float64,
236            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
237            OracleType::Float(_precision) => DataType::Float64,
238            OracleType::Varchar2(_) => DataType::Utf8,
239            OracleType::NVarchar2(_) => DataType::Utf8,
240            OracleType::Char(_) => DataType::Utf8,
241            OracleType::NChar(_) => DataType::Utf8,
242            OracleType::Long => DataType::Utf8,
243            OracleType::Clob => DataType::LargeUtf8,
244            OracleType::NClob => DataType::LargeUtf8,
245            OracleType::Raw(_) => DataType::Binary,
246            OracleType::LongRaw => DataType::Binary,
247            OracleType::Blob => DataType::LargeBinary,
248            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
249            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
250            OracleType::Boolean => DataType::Boolean,
251        }
252    }
253}
254
/// SQLite column types understood by this crate.
///
/// Reference: <https://www.sqlite.org/datatype3.html>
#[derive(Debug, Clone)]
pub enum SqliteType {
    /// `NULL`
    Null,
    /// `INTEGER`
    Integer,
    /// `REAL`
    Real,
    /// `TEXT`
    Text,
    /// `BLOB`
    Blob,
}
264
265impl SqliteType {
266    pub fn to_arrow_type(&self) -> DataType {
267        match self {
268            SqliteType::Null => DataType::Null,
269            SqliteType::Integer => DataType::Int64,
270            SqliteType::Real => DataType::Float64,
271            SqliteType::Text => DataType::Utf8,
272            SqliteType::Blob => DataType::Binary,
273        }
274    }
275}
276
/// DM column types understood by this crate.
///
/// References:
/// - <https://eco.dameng.com/document/dm/zh-cn/pm/odbc-rogramming-guide.html>
/// - <https://eco.dameng.com/document/dm/zh-cn/pm/dm8_sql-data-types-operators.html>
#[derive(Debug, Clone)]
pub enum DmType {
    /// `TINYINT`
    TinyInt,
    /// `SMALLINT`
    SmallInt,
    /// `INTEGER`
    Integer,
    /// `BIGINT`
    BigInt,
    /// `REAL`
    Real,
    /// `DOUBLE`
    Double,
    /// `NUMERIC`, carrying `(precision, scale)`.
    Numeric(u8, i8),
    /// `DECIMAL`, carrying `(precision, scale)`.
    Decimal(u8, i8),
    /// `CHAR`; payload presumably the optional declared length (TODO confirm against the producer).
    Char(Option<u16>),
    /// `VARCHAR`; payload presumably the optional declared length (TODO confirm against the producer).
    Varchar(Option<u16>),
    /// `TEXT`
    Text,
    /// `BINARY`, carrying the length used as the Arrow fixed-size binary width.
    Binary(u16),
    /// `VARBINARY`; payload presumably the optional declared length (TODO confirm against the producer).
    Varbinary(Option<u16>),
    /// `IMAGE`
    Image,
    /// `BIT`
    Bit,
    /// `TIMESTAMP`, carrying the precision that selects the Arrow time unit.
    Timestamp(u8),
    /// `TIME`, carrying the precision that selects the Arrow time unit.
    Time(u8),
    /// `DATE`
    Date,
}
300
301impl DmType {
302    pub fn to_arrow_type(&self) -> DataType {
303        match self {
304            DmType::TinyInt => DataType::Int8,
305            DmType::SmallInt => DataType::Int16,
306            DmType::Integer => DataType::Int32,
307            DmType::BigInt => DataType::Int64,
308            DmType::Real => DataType::Float32,
309            DmType::Double => DataType::Float64,
310            DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
311            DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
312            DmType::Char(_) => DataType::Utf8,
313            DmType::Varchar(_) => DataType::Utf8,
314            DmType::Text => DataType::Utf8,
315            DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
316            DmType::Varbinary(_) => DataType::Binary,
317            DmType::Image => DataType::Binary,
318            DmType::Bit => DataType::Boolean,
319            DmType::Timestamp(precision) => {
320                if *precision == 0 {
321                    DataType::Timestamp(TimeUnit::Second, None)
322                } else if *precision <= 3 {
323                    DataType::Timestamp(TimeUnit::Millisecond, None)
324                } else if *precision <= 6 {
325                    DataType::Timestamp(TimeUnit::Microsecond, None)
326                } else {
327                    DataType::Timestamp(TimeUnit::Nanosecond, None)
328                }
329            }
330            DmType::Time(precision) => {
331                if *precision == 0 {
332                    DataType::Time32(TimeUnit::Second)
333                } else if *precision <= 3 {
334                    DataType::Time32(TimeUnit::Millisecond)
335                } else {
336                    DataType::Time64(TimeUnit::Microsecond)
337                }
338            }
339            DmType::Date => DataType::Date32,
340        }
341    }
342}
343
344#[derive(Debug, Clone)]
345pub struct RemoteField {
346    pub name: String,
347    pub remote_type: RemoteType,
348    pub nullable: bool,
349}
350
351impl RemoteField {
352    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
353        RemoteField {
354            name: name.into(),
355            remote_type,
356            nullable,
357        }
358    }
359
360    pub fn to_arrow_field(&self) -> Field {
361        Field::new(
362            self.name.clone(),
363            self.remote_type.to_arrow_type(),
364            self.nullable,
365        )
366    }
367}
368
369pub type RemoteSchemaRef = Arc<RemoteSchema>;
370
371#[derive(Debug, Clone)]
372pub struct RemoteSchema {
373    pub fields: Vec<RemoteField>,
374}
375
376impl RemoteSchema {
377    pub fn empty() -> Self {
378        RemoteSchema { fields: vec![] }
379    }
380    pub fn new(fields: Vec<RemoteField>) -> Self {
381        RemoteSchema { fields }
382    }
383
384    pub fn to_arrow_schema(&self) -> Schema {
385        let mut fields = vec![];
386        for remote_field in self.fields.iter() {
387            fields.push(remote_field.to_arrow_field());
388        }
389        Schema::new(fields)
390    }
391}
392
393pub fn project_remote_schema(
394    schema: &RemoteSchema,
395    projection: Option<&Vec<usize>>,
396) -> RemoteSchema {
397    match projection {
398        Some(projection) => {
399            let fields = projection
400                .iter()
401                .map(|i| schema.fields[*i].clone())
402                .collect::<Vec<_>>();
403            RemoteSchema::new(fields)
404        }
405        None => schema.clone(),
406    }
407}