datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6    Postgres(PostgresType),
7    Mysql(MysqlType),
8    Oracle(OracleType),
9    Sqlite(SqliteType),
10}
11
12impl RemoteType {
13    pub fn to_arrow_type(&self) -> DataType {
14        match self {
15            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19        }
20    }
21}
22
/// PostgreSQL column types.
///
/// Reference: <https://www.postgresql.org/docs/current/datatype.html>
#[derive(Debug, Clone)]
pub enum PostgresType {
    /// `smallint`
    Int2,
    /// `integer`
    Int4,
    /// `bigint`
    Int8,
    /// `real`
    Float4,
    /// `double precision`
    Float8,
    /// `numeric(p, s)` / `decimal(p, s)`.
    ///
    /// The payload is the scale; the precision is a fixed value (38).
    Numeric(i8),
    Oid,
    Name,
    /// `varchar(n)`
    Varchar,
    /// `char`, `char(n)`, `bpchar(n)`, `bpchar`
    Bpchar,
    Text,
    Bytea,
    Date,
    Timestamp,
    TimestampTz,
    Time,
    Interval,
    Bool,
    Json,
    Jsonb,
    // Array variants; each is converted to an Arrow list by `to_arrow_type`.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    VarcharArray,
    BpcharArray,
    TextArray,
    ByteaArray,
    BoolArray,
    /// Converted to Arrow `Binary` by `to_arrow_type`.
    PostGisGeometry,
}
67
68impl PostgresType {
69    pub fn to_arrow_type(&self) -> DataType {
70        match self {
71            PostgresType::Int2 => DataType::Int16,
72            PostgresType::Int4 => DataType::Int32,
73            PostgresType::Int8 => DataType::Int64,
74            PostgresType::Float4 => DataType::Float32,
75            PostgresType::Float8 => DataType::Float64,
76            PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
77            PostgresType::Oid => DataType::UInt32,
78            PostgresType::Name
79            | PostgresType::Text
80            | PostgresType::Varchar
81            | PostgresType::Bpchar => DataType::Utf8,
82            PostgresType::Bytea => DataType::Binary,
83            PostgresType::Date => DataType::Date32,
84            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
85            PostgresType::TimestampTz => {
86                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
87            }
88            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
89            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
90            PostgresType::Bool => DataType::Boolean,
91            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
92            PostgresType::Int2Array => {
93                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
94            }
95            PostgresType::Int4Array => {
96                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
97            }
98            PostgresType::Int8Array => {
99                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
100            }
101            PostgresType::Float4Array => {
102                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
103            }
104            PostgresType::Float8Array => {
105                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
106            }
107            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
108                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
109            }
110            PostgresType::ByteaArray => {
111                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
112            }
113            PostgresType::BoolArray => {
114                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
115            }
116            PostgresType::PostGisGeometry => DataType::Binary,
117        }
118    }
119}
120
/// MySQL column types.
///
/// Reference: <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>
#[derive(Debug, Clone)]
pub enum MysqlType {
    TinyInt,
    TinyIntUnsigned,
    SmallInt,
    SmallIntUnsigned,
    MediumInt,
    MediumIntUnsigned,
    Integer,
    IntegerUnsigned,
    BigInt,
    BigIntUnsigned,
    Float,
    Double,
    /// Fixed-point decimal; the payload is `(precision, scale)`.
    Decimal(u8, u8),
    Date,
    Datetime,
    Time,
    Timestamp,
    Year,
    Char,
    Varchar,
    Binary,
    Varbinary,
    /// `TinyText`, `Text`, `MediumText`, `LongText`.
    ///
    /// The payload is the column length that `to_arrow_type` uses to choose
    /// between `Utf8` and `LargeUtf8`.
    Text(u32),
    /// `TinyBlob`, `Blob`, `MediumBlob`, `LongBlob`.
    ///
    /// The payload is the column length that `to_arrow_type` uses to choose
    /// between `Binary` and `LargeBinary`.
    Blob(u32),
    Json,
    Geometry,
}
153
154impl MysqlType {
155    pub fn to_arrow_type(&self) -> DataType {
156        match self {
157            MysqlType::TinyInt => DataType::Int8,
158            MysqlType::TinyIntUnsigned => DataType::UInt8,
159            MysqlType::SmallInt => DataType::Int16,
160            MysqlType::SmallIntUnsigned => DataType::UInt16,
161            MysqlType::MediumInt => DataType::Int32,
162            MysqlType::MediumIntUnsigned => DataType::UInt32,
163            MysqlType::Integer => DataType::Int32,
164            MysqlType::IntegerUnsigned => DataType::UInt32,
165            MysqlType::BigInt => DataType::Int64,
166            MysqlType::BigIntUnsigned => DataType::UInt64,
167            MysqlType::Float => DataType::Float32,
168            MysqlType::Double => DataType::Float64,
169            MysqlType::Decimal(precision, scale) => {
170                assert!(*scale <= (i8::MAX as u8));
171                if *precision > 38 {
172                    DataType::Decimal256(*precision, *scale as i8)
173                } else {
174                    DataType::Decimal128(*precision, *scale as i8)
175                }
176            }
177            MysqlType::Date => DataType::Date32,
178            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
179            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
180            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
181            MysqlType::Year => DataType::Int16,
182            MysqlType::Char => DataType::Utf8,
183            MysqlType::Varchar => DataType::Utf8,
184            MysqlType::Binary => DataType::Binary,
185            MysqlType::Varbinary => DataType::Binary,
186            MysqlType::Text(col_len) => {
187                if (*col_len as usize) > (i32::MAX as usize) {
188                    DataType::LargeUtf8
189                } else {
190                    DataType::Utf8
191                }
192            }
193            MysqlType::Blob(col_len) => {
194                if (*col_len as usize) > (i32::MAX as usize) {
195                    DataType::LargeBinary
196                } else {
197                    DataType::Binary
198                }
199            }
200            MysqlType::Json => DataType::LargeUtf8,
201            MysqlType::Geometry => DataType::LargeBinary,
202        }
203    }
204}
205
/// Oracle column types.
///
/// References:
/// * <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>
/// * <https://docs.oracle.com/en/database/oracle/oracle-database/21/lnoci/data-types.html>
#[derive(Debug, Clone)]
pub enum OracleType {
    BinaryFloat,
    BinaryDouble,
    /// The payload is `(precision, scale)`.
    Number(u8, i8),
    /// The payload is a precision; `to_arrow_type` does not use it.
    Float(u8),
    // The `u32` payloads of the variants below are not used by `to_arrow_type`.
    Varchar2(u32),
    NVarchar2(u32),
    Char(u32),
    NChar(u32),
    Long,
    Clob,
    NClob,
    Raw(u32),
    LongRaw,
    Blob,
    Date,
    Timestamp,
    Boolean,
}
228
229impl OracleType {
230    pub fn to_arrow_type(&self) -> DataType {
231        match self {
232            OracleType::BinaryFloat => DataType::Float32,
233            OracleType::BinaryDouble => DataType::Float64,
234            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
235            OracleType::Float(_precision) => DataType::Float64,
236            OracleType::Varchar2(_) => DataType::Utf8,
237            OracleType::NVarchar2(_) => DataType::Utf8,
238            OracleType::Char(_) => DataType::Utf8,
239            OracleType::NChar(_) => DataType::Utf8,
240            OracleType::Long => DataType::Utf8,
241            OracleType::Clob => DataType::LargeUtf8,
242            OracleType::NClob => DataType::LargeUtf8,
243            OracleType::Raw(_) => DataType::Binary,
244            OracleType::LongRaw => DataType::Binary,
245            OracleType::Blob => DataType::LargeBinary,
246            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
247            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
248            OracleType::Boolean => DataType::Boolean,
249        }
250    }
251}
252
/// SQLite column types.
///
/// Reference: <https://www.sqlite.org/datatype3.html>
#[derive(Debug, Clone)]
pub enum SqliteType {
    /// Converted to Arrow `Null`.
    Null,
    /// Converted to Arrow `Int64`.
    Integer,
    /// Converted to Arrow `Float64`.
    Real,
    /// Converted to Arrow `Utf8`.
    Text,
    /// Converted to Arrow `Binary`.
    Blob,
}
262
263impl SqliteType {
264    pub fn to_arrow_type(&self) -> DataType {
265        match self {
266            SqliteType::Null => DataType::Null,
267            SqliteType::Integer => DataType::Int64,
268            SqliteType::Real => DataType::Float64,
269            SqliteType::Text => DataType::Utf8,
270            SqliteType::Blob => DataType::Binary,
271        }
272    }
273}
274
275#[derive(Debug, Clone)]
276pub struct RemoteField {
277    pub name: String,
278    pub remote_type: RemoteType,
279    pub nullable: bool,
280}
281
282impl RemoteField {
283    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
284        RemoteField {
285            name: name.into(),
286            remote_type,
287            nullable,
288        }
289    }
290
291    pub fn to_arrow_field(&self) -> Field {
292        Field::new(
293            self.name.clone(),
294            self.remote_type.to_arrow_type(),
295            self.nullable,
296        )
297    }
298}
299
300pub type RemoteSchemaRef = Arc<RemoteSchema>;
301
302#[derive(Debug, Clone)]
303pub struct RemoteSchema {
304    pub fields: Vec<RemoteField>,
305}
306
307impl RemoteSchema {
308    pub fn empty() -> Self {
309        RemoteSchema { fields: vec![] }
310    }
311    pub fn new(fields: Vec<RemoteField>) -> Self {
312        RemoteSchema { fields }
313    }
314
315    pub fn to_arrow_schema(&self) -> Schema {
316        let mut fields = vec![];
317        for remote_field in self.fields.iter() {
318            fields.push(remote_field.to_arrow_field());
319        }
320        Schema::new(fields)
321    }
322}
323
324pub fn project_remote_schema(
325    schema: &RemoteSchema,
326    projection: Option<&Vec<usize>>,
327) -> RemoteSchema {
328    match projection {
329        Some(projection) => {
330            let fields = projection
331                .iter()
332                .map(|i| schema.fields[*i].clone())
333                .collect::<Vec<_>>();
334            RemoteSchema::new(fields)
335        }
336        None => schema.clone(),
337    }
338}