// datafusion_remote_table/schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6    Postgres(PostgresType),
7    Mysql(MysqlType),
8    Oracle(OracleType),
9    Sqlite(SqliteType),
10    Dm(DmType),
11}
12
13impl RemoteType {
14    pub fn to_arrow_type(&self) -> DataType {
15        match self {
16            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
17            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
18            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
19            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
20            RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
21        }
22    }
23}
24
25/// https://www.postgresql.org/docs/current/datatype.html
26#[derive(Debug, Clone)]
27pub enum PostgresType {
28    // smallint
29    Int2,
30    // integer
31    Int4,
32    // bigint
33    Int8,
34    // real
35    Float4,
36    // double precision
37    Float8,
38    // numeric(p, s), decimal(p, s)
39    // precision is a fixed value(38)
40    Numeric(i8),
41    Oid,
42    Name,
43    // varchar(n)
44    Varchar,
45    // char, char(n), bpchar(n), bpchar
46    Bpchar,
47    Text,
48    Bytea,
49    Date,
50    Timestamp,
51    TimestampTz,
52    Time,
53    Interval,
54    Bool,
55    Json,
56    Jsonb,
57    Int2Array,
58    Int4Array,
59    Int8Array,
60    Float4Array,
61    Float8Array,
62    VarcharArray,
63    BpcharArray,
64    TextArray,
65    ByteaArray,
66    BoolArray,
67    PostGisGeometry,
68    Xml,
69    Uuid,
70}
71
72impl PostgresType {
73    pub fn to_arrow_type(&self) -> DataType {
74        match self {
75            PostgresType::Int2 => DataType::Int16,
76            PostgresType::Int4 => DataType::Int32,
77            PostgresType::Int8 => DataType::Int64,
78            PostgresType::Float4 => DataType::Float32,
79            PostgresType::Float8 => DataType::Float64,
80            PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
81            PostgresType::Oid => DataType::UInt32,
82            PostgresType::Name
83            | PostgresType::Text
84            | PostgresType::Varchar
85            | PostgresType::Bpchar
86            | PostgresType::Xml => DataType::Utf8,
87            PostgresType::Bytea => DataType::Binary,
88            PostgresType::Date => DataType::Date32,
89            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
90            PostgresType::TimestampTz => {
91                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
92            }
93            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
94            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
95            PostgresType::Bool => DataType::Boolean,
96            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
97            PostgresType::Int2Array => {
98                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
99            }
100            PostgresType::Int4Array => {
101                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
102            }
103            PostgresType::Int8Array => {
104                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
105            }
106            PostgresType::Float4Array => {
107                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
108            }
109            PostgresType::Float8Array => {
110                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
111            }
112            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
113                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
114            }
115            PostgresType::ByteaArray => {
116                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
117            }
118            PostgresType::BoolArray => {
119                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
120            }
121            PostgresType::PostGisGeometry => DataType::Binary,
122            PostgresType::Uuid => DataType::FixedSizeBinary(16),
123        }
124    }
125}
126
127// https://dev.mysql.com/doc/refman/8.4/en/data-types.html
128#[derive(Debug, Clone)]
129pub enum MysqlType {
130    TinyInt,
131    TinyIntUnsigned,
132    SmallInt,
133    SmallIntUnsigned,
134    MediumInt,
135    MediumIntUnsigned,
136    Integer,
137    IntegerUnsigned,
138    BigInt,
139    BigIntUnsigned,
140    Float,
141    Double,
142    Decimal(u8, u8),
143    Date,
144    Datetime,
145    Time,
146    Timestamp,
147    Year,
148    Char,
149    Varchar,
150    Binary,
151    Varbinary,
152    // TinyText, Text, MediumText, LongText
153    Text(u32),
154    // TinyBlob, Blob, MediumBlob, LongBlob
155    Blob(u32),
156    Json,
157    Geometry,
158}
159
160impl MysqlType {
161    pub fn to_arrow_type(&self) -> DataType {
162        match self {
163            MysqlType::TinyInt => DataType::Int8,
164            MysqlType::TinyIntUnsigned => DataType::UInt8,
165            MysqlType::SmallInt => DataType::Int16,
166            MysqlType::SmallIntUnsigned => DataType::UInt16,
167            MysqlType::MediumInt => DataType::Int32,
168            MysqlType::MediumIntUnsigned => DataType::UInt32,
169            MysqlType::Integer => DataType::Int32,
170            MysqlType::IntegerUnsigned => DataType::UInt32,
171            MysqlType::BigInt => DataType::Int64,
172            MysqlType::BigIntUnsigned => DataType::UInt64,
173            MysqlType::Float => DataType::Float32,
174            MysqlType::Double => DataType::Float64,
175            MysqlType::Decimal(precision, scale) => {
176                assert!(*scale <= (i8::MAX as u8));
177                if *precision > 38 {
178                    DataType::Decimal256(*precision, *scale as i8)
179                } else {
180                    DataType::Decimal128(*precision, *scale as i8)
181                }
182            }
183            MysqlType::Date => DataType::Date32,
184            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
185            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
186            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
187            MysqlType::Year => DataType::Int16,
188            MysqlType::Char => DataType::Utf8,
189            MysqlType::Varchar => DataType::Utf8,
190            MysqlType::Binary => DataType::Binary,
191            MysqlType::Varbinary => DataType::Binary,
192            MysqlType::Text(col_len) => {
193                if (*col_len as usize) > (i32::MAX as usize) {
194                    DataType::LargeUtf8
195                } else {
196                    DataType::Utf8
197                }
198            }
199            MysqlType::Blob(col_len) => {
200                if (*col_len as usize) > (i32::MAX as usize) {
201                    DataType::LargeBinary
202                } else {
203                    DataType::Binary
204                }
205            }
206            MysqlType::Json => DataType::LargeUtf8,
207            MysqlType::Geometry => DataType::LargeBinary,
208        }
209    }
210}
211
212// https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330
213// https://docs.oracle.com/en/database/oracle/oracle-database/21/lnoci/data-types.html
214#[derive(Debug, Clone)]
215pub enum OracleType {
216    BinaryFloat,
217    BinaryDouble,
218    Number(u8, i8),
219    Float(u8),
220    Varchar2(u32),
221    NVarchar2(u32),
222    Char(u32),
223    NChar(u32),
224    Long,
225    Clob,
226    NClob,
227    Raw(u32),
228    LongRaw,
229    Blob,
230    Date,
231    Timestamp,
232    Boolean,
233}
234
235impl OracleType {
236    pub fn to_arrow_type(&self) -> DataType {
237        match self {
238            OracleType::BinaryFloat => DataType::Float32,
239            OracleType::BinaryDouble => DataType::Float64,
240            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
241            OracleType::Float(_precision) => DataType::Float64,
242            OracleType::Varchar2(_) => DataType::Utf8,
243            OracleType::NVarchar2(_) => DataType::Utf8,
244            OracleType::Char(_) => DataType::Utf8,
245            OracleType::NChar(_) => DataType::Utf8,
246            OracleType::Long => DataType::Utf8,
247            OracleType::Clob => DataType::LargeUtf8,
248            OracleType::NClob => DataType::LargeUtf8,
249            OracleType::Raw(_) => DataType::Binary,
250            OracleType::LongRaw => DataType::Binary,
251            OracleType::Blob => DataType::LargeBinary,
252            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
253            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
254            OracleType::Boolean => DataType::Boolean,
255        }
256    }
257}
258
259// https://www.sqlite.org/datatype3.html
260#[derive(Debug, Clone)]
261pub enum SqliteType {
262    Null,
263    Integer,
264    Real,
265    Text,
266    Blob,
267}
268
269impl SqliteType {
270    pub fn to_arrow_type(&self) -> DataType {
271        match self {
272            SqliteType::Null => DataType::Null,
273            SqliteType::Integer => DataType::Int64,
274            SqliteType::Real => DataType::Float64,
275            SqliteType::Text => DataType::Utf8,
276            SqliteType::Blob => DataType::Binary,
277        }
278    }
279}
280
281// https://eco.dameng.com/document/dm/zh-cn/pm/odbc-rogramming-guide.html
282// https://eco.dameng.com/document/dm/zh-cn/pm/dm8_sql-data-types-operators.html
283#[derive(Debug, Clone)]
284pub enum DmType {
285    TinyInt,
286    SmallInt,
287    Integer,
288    BigInt,
289    Real,
290    Double,
291    Numeric(u8, i8),
292    Decimal(u8, i8),
293    Char(Option<u16>),
294    Varchar(Option<u16>),
295    Text,
296    Binary(u16),
297    Varbinary(Option<u16>),
298    Image,
299    Bit,
300    Timestamp(u8),
301    Time(u8),
302    Date,
303}
304
305impl DmType {
306    pub fn to_arrow_type(&self) -> DataType {
307        match self {
308            DmType::TinyInt => DataType::Int8,
309            DmType::SmallInt => DataType::Int16,
310            DmType::Integer => DataType::Int32,
311            DmType::BigInt => DataType::Int64,
312            DmType::Real => DataType::Float32,
313            DmType::Double => DataType::Float64,
314            DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
315            DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
316            DmType::Char(_) => DataType::Utf8,
317            DmType::Varchar(_) => DataType::Utf8,
318            DmType::Text => DataType::Utf8,
319            DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
320            DmType::Varbinary(_) => DataType::Binary,
321            DmType::Image => DataType::Binary,
322            DmType::Bit => DataType::Boolean,
323            DmType::Timestamp(precision) => {
324                if *precision == 0 {
325                    DataType::Timestamp(TimeUnit::Second, None)
326                } else if *precision <= 3 {
327                    DataType::Timestamp(TimeUnit::Millisecond, None)
328                } else if *precision <= 6 {
329                    DataType::Timestamp(TimeUnit::Microsecond, None)
330                } else {
331                    DataType::Timestamp(TimeUnit::Nanosecond, None)
332                }
333            }
334            DmType::Time(precision) => {
335                if *precision == 0 {
336                    DataType::Time32(TimeUnit::Second)
337                } else if *precision <= 3 {
338                    DataType::Time32(TimeUnit::Millisecond)
339                } else {
340                    DataType::Time64(TimeUnit::Microsecond)
341                }
342            }
343            DmType::Date => DataType::Date32,
344        }
345    }
346}
347
348#[derive(Debug, Clone)]
349pub struct RemoteField {
350    pub name: String,
351    pub remote_type: RemoteType,
352    pub nullable: bool,
353}
354
355impl RemoteField {
356    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
357        RemoteField {
358            name: name.into(),
359            remote_type,
360            nullable,
361        }
362    }
363
364    pub fn to_arrow_field(&self) -> Field {
365        Field::new(
366            self.name.clone(),
367            self.remote_type.to_arrow_type(),
368            self.nullable,
369        )
370    }
371}
372
373pub type RemoteSchemaRef = Arc<RemoteSchema>;
374
375#[derive(Debug, Clone)]
376pub struct RemoteSchema {
377    pub fields: Vec<RemoteField>,
378}
379
380impl RemoteSchema {
381    pub fn empty() -> Self {
382        RemoteSchema { fields: vec![] }
383    }
384    pub fn new(fields: Vec<RemoteField>) -> Self {
385        RemoteSchema { fields }
386    }
387
388    pub fn to_arrow_schema(&self) -> Schema {
389        let mut fields = vec![];
390        for remote_field in self.fields.iter() {
391            fields.push(remote_field.to_arrow_field());
392        }
393        Schema::new(fields)
394    }
395}
396
397pub fn project_remote_schema(
398    schema: &RemoteSchema,
399    projection: Option<&Vec<usize>>,
400) -> RemoteSchema {
401    match projection {
402        Some(projection) => {
403            let fields = projection
404                .iter()
405                .map(|i| schema.fields[*i].clone())
406                .collect::<Vec<_>>();
407            RemoteSchema::new(fields)
408        }
409        None => schema.clone(),
410    }
411}