datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
/// Column type of a remote database, tagged by the database it belongs to.
///
/// Each variant wraps the type enum of one database; `to_arrow_type` forwards
/// to the wrapped value.
#[derive(Debug, Clone)]
pub enum RemoteType {
    /// A Postgres column type.
    Postgres(PostgresType),
    /// A MySQL column type.
    Mysql(MysqlType),
    /// An Oracle column type.
    Oracle(OracleType),
    /// A SQLite column type.
    Sqlite(SqliteType),
}
11
12impl RemoteType {
13    pub fn to_arrow_type(&self) -> DataType {
14        match self {
15            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19        }
20    }
21}
22
/// Postgres column types understood by this crate.
///
/// See <https://www.postgresql.org/docs/current/datatype.html>.
///
/// Values can be compared with `==` and used as hash-map / hash-set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PostgresType {
    /// `smallint`
    Int2,
    /// `integer`
    Int4,
    /// `bigint`
    Int8,
    /// `real`
    Float4,
    /// `double precision`
    Float8,
    /// `numeric(p, s)`, `decimal(p, s)`.
    ///
    /// Only the scale is stored; the precision is a fixed value (38).
    Numeric(i8),
    Name,
    /// `varchar(n)`
    Varchar,
    /// `char`, `char(n)`, `bpchar(n)`, `bpchar`
    Bpchar,
    Text,
    Bytea,
    Date,
    Timestamp,
    TimestampTz,
    Time,
    Interval,
    Bool,
    Json,
    Jsonb,
    // Array variants: one per supported element type.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    VarcharArray,
    BpcharArray,
    TextArray,
    ByteaArray,
    BoolArray,
    PostGisGeometry,
}
66
67impl PostgresType {
68    pub fn to_arrow_type(&self) -> DataType {
69        match self {
70            PostgresType::Int2 => DataType::Int16,
71            PostgresType::Int4 => DataType::Int32,
72            PostgresType::Int8 => DataType::Int64,
73            PostgresType::Float4 => DataType::Float32,
74            PostgresType::Float8 => DataType::Float64,
75            PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
76            PostgresType::Name
77            | PostgresType::Text
78            | PostgresType::Varchar
79            | PostgresType::Bpchar => DataType::Utf8,
80            PostgresType::Bytea => DataType::Binary,
81            PostgresType::Date => DataType::Date32,
82            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
83            PostgresType::TimestampTz => {
84                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
85            }
86            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
87            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
88            PostgresType::Bool => DataType::Boolean,
89            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
90            PostgresType::Int2Array => {
91                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
92            }
93            PostgresType::Int4Array => {
94                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
95            }
96            PostgresType::Int8Array => {
97                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
98            }
99            PostgresType::Float4Array => {
100                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
101            }
102            PostgresType::Float8Array => {
103                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
104            }
105            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
106                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
107            }
108            PostgresType::ByteaArray => {
109                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
110            }
111            PostgresType::BoolArray => {
112                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
113            }
114            PostgresType::PostGisGeometry => DataType::Binary,
115        }
116    }
117}
118
/// MySQL column types understood by this crate.
///
/// See <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>.
///
/// Values can be compared with `==` and used as hash-map / hash-set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MysqlType {
    TinyInt,
    TinyIntUnsigned,
    SmallInt,
    SmallIntUnsigned,
    MediumInt,
    MediumIntUnsigned,
    Integer,
    IntegerUnsigned,
    BigInt,
    BigIntUnsigned,
    Float,
    Double,
    /// Decimal with `(precision, scale)`.
    Decimal(u8, u8),
    Date,
    Datetime,
    Time,
    Timestamp,
    Year,
    Char,
    Varchar,
    Binary,
    Varbinary,
    /// TinyText, Text, MediumText, LongText; the payload is the column length.
    Text(u32),
    /// TinyBlob, Blob, MediumBlob, LongBlob; the payload is the column length.
    Blob(u32),
    Json,
    Geometry,
}
151
152impl MysqlType {
153    pub fn to_arrow_type(&self) -> DataType {
154        match self {
155            MysqlType::TinyInt => DataType::Int8,
156            MysqlType::TinyIntUnsigned => DataType::UInt8,
157            MysqlType::SmallInt => DataType::Int16,
158            MysqlType::SmallIntUnsigned => DataType::UInt16,
159            MysqlType::MediumInt => DataType::Int32,
160            MysqlType::MediumIntUnsigned => DataType::UInt32,
161            MysqlType::Integer => DataType::Int32,
162            MysqlType::IntegerUnsigned => DataType::UInt32,
163            MysqlType::BigInt => DataType::Int64,
164            MysqlType::BigIntUnsigned => DataType::UInt64,
165            MysqlType::Float => DataType::Float32,
166            MysqlType::Double => DataType::Float64,
167            MysqlType::Decimal(precision, scale) => {
168                assert!(*scale <= (i8::MAX as u8));
169                if *precision > 38 {
170                    DataType::Decimal256(*precision, *scale as i8)
171                } else {
172                    DataType::Decimal128(*precision, *scale as i8)
173                }
174            }
175            MysqlType::Date => DataType::Date32,
176            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
177            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
178            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
179            MysqlType::Year => DataType::Int16,
180            MysqlType::Char => DataType::Utf8,
181            MysqlType::Varchar => DataType::Utf8,
182            MysqlType::Binary => DataType::Binary,
183            MysqlType::Varbinary => DataType::Binary,
184            MysqlType::Text(col_len) => {
185                if (*col_len as usize) > (i32::MAX as usize) {
186                    DataType::LargeUtf8
187                } else {
188                    DataType::Utf8
189                }
190            }
191            MysqlType::Blob(col_len) => {
192                if (*col_len as usize) > (i32::MAX as usize) {
193                    DataType::LargeBinary
194                } else {
195                    DataType::Binary
196                }
197            }
198            MysqlType::Json => DataType::LargeUtf8,
199            MysqlType::Geometry => DataType::LargeBinary,
200        }
201    }
202}
203
/// Oracle column types understood by this crate.
///
/// See <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>.
///
/// Values can be compared with `==` and used as hash-map / hash-set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OracleType {
    /// `VARCHAR2`; the payload is presumably the declared size (it is not used
    /// by the Arrow mapping) — confirm against the code that builds this value.
    Varchar2(u32),
    /// `CHAR`; the payload is presumably the declared size (it is not used by
    /// the Arrow mapping) — confirm against the code that builds this value.
    Char(u32),
    /// `NUMBER` with `(precision, scale)`.
    Number(u8, i8),
    Date,
    Timestamp,
}
213
214impl OracleType {
215    pub fn to_arrow_type(&self) -> DataType {
216        match self {
217            OracleType::Varchar2(_) => DataType::Utf8,
218            OracleType::Char(_) => DataType::Utf8,
219            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
220            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
221            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
222        }
223    }
224}
225
/// SQLite column types understood by this crate.
///
/// See <https://www.sqlite.org/datatype3.html>.
///
/// Values can be compared with `==` and used as hash-map / hash-set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SqliteType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
}
235
236impl SqliteType {
237    pub fn to_arrow_type(&self) -> DataType {
238        match self {
239            SqliteType::Null => DataType::Null,
240            SqliteType::Integer => DataType::Int64,
241            SqliteType::Real => DataType::Float64,
242            SqliteType::Text => DataType::Utf8,
243            SqliteType::Blob => DataType::Binary,
244        }
245    }
246}
247
/// One field of a [`RemoteSchema`]: a name, a remote type and a nullability flag.
#[derive(Debug, Clone)]
pub struct RemoteField {
    /// Field name; reused as the Arrow field name by `to_arrow_field`.
    pub name: String,
    /// Type of the field in the remote database.
    pub remote_type: RemoteType,
    /// Nullability flag; passed through unchanged to the Arrow field by `to_arrow_field`.
    pub nullable: bool,
}
254
255impl RemoteField {
256    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
257        RemoteField {
258            name: name.into(),
259            remote_type,
260            nullable,
261        }
262    }
263
264    pub fn to_arrow_field(&self) -> Field {
265        Field::new(
266            self.name.clone(),
267            self.remote_type.to_arrow_type(),
268            self.nullable,
269        )
270    }
271}
272
/// A [`RemoteSchema`] behind an [`Arc`], for sharing without copying the fields.
pub type RemoteSchemaRef = Arc<RemoteSchema>;

/// An ordered list of [`RemoteField`]s.
#[derive(Debug, Clone)]
pub struct RemoteSchema {
    /// The fields, in schema order; `to_arrow_schema` preserves this order.
    pub fields: Vec<RemoteField>,
}
279
280impl RemoteSchema {
281    pub fn empty() -> Self {
282        RemoteSchema { fields: vec![] }
283    }
284    pub fn new(fields: Vec<RemoteField>) -> Self {
285        RemoteSchema { fields }
286    }
287
288    pub fn to_arrow_schema(&self) -> Schema {
289        let mut fields = vec![];
290        for remote_field in self.fields.iter() {
291            fields.push(remote_field.to_arrow_field());
292        }
293        Schema::new(fields)
294    }
295}
296
297pub fn project_remote_schema(
298    schema: &RemoteSchema,
299    projection: Option<&Vec<usize>>,
300) -> RemoteSchema {
301    match projection {
302        Some(projection) => {
303            let fields = projection
304                .iter()
305                .map(|i| schema.fields[*i].clone())
306                .collect::<Vec<_>>();
307            RemoteSchema::new(fields)
308        }
309        None => schema.clone(),
310    }
311}