datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
/// A column type as declared by a remote database, tagged by the database it comes from.
///
/// Each variant wraps that database's own type enum; [`RemoteType::to_arrow_type`]
/// forwards to the wrapped value to obtain the Arrow `DataType`.
#[derive(Debug, Clone)]
pub enum RemoteType {
    /// A PostgreSQL column type.
    Postgres(PostgresType),
    /// A MySQL column type.
    Mysql(MysqlType),
    /// An Oracle column type.
    Oracle(OracleType),
    /// A SQLite column type.
    Sqlite(SqliteType),
}
11
12impl RemoteType {
13    pub fn to_arrow_type(&self) -> DataType {
14        match self {
15            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19        }
20    }
21}
22
/// PostgreSQL column types understood by this crate.
///
/// See <https://www.postgresql.org/docs/current/datatype.html>.
///
/// The enum is plain data, so it derives equality and hashing in addition to
/// `Debug`/`Clone`; this lets callers compare types and use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PostgresType {
    /// `smallint`
    Int2,
    /// `integer`
    Int4,
    /// `bigint`
    Int8,
    /// `real`
    Float4,
    /// `double precision`
    Float8,
    /// `numeric(p, s)`, `decimal(p, s)`.
    ///
    /// The payload is the scale; the precision is a fixed value (38).
    Numeric(i8),
    /// `name`
    Name,
    /// `varchar(n)`
    Varchar,
    /// `char`, `char(n)`, `bpchar(n)`, `bpchar`
    Bpchar,
    /// `text`
    Text,
    /// `bytea`
    Bytea,
    /// `date`
    Date,
    /// `timestamp`
    Timestamp,
    /// `timestamptz`
    TimestampTz,
    /// `time`
    Time,
    /// `interval`
    Interval,
    /// `bool`
    Bool,
    /// `json`
    Json,
    /// `jsonb`
    Jsonb,
    /// Array of [`PostgresType::Int2`].
    Int2Array,
    /// Array of [`PostgresType::Int4`].
    Int4Array,
    /// Array of [`PostgresType::Int8`].
    Int8Array,
    /// Array of [`PostgresType::Float4`].
    Float4Array,
    /// Array of [`PostgresType::Float8`].
    Float8Array,
    /// Array of [`PostgresType::Varchar`].
    VarcharArray,
    /// Array of [`PostgresType::Bpchar`].
    BpcharArray,
    /// Array of [`PostgresType::Text`].
    TextArray,
    /// Array of [`PostgresType::Bytea`].
    ByteaArray,
    /// Array of [`PostgresType::Bool`].
    BoolArray,
    /// PostGIS `geometry`.
    PostGisGeometry,
}
66
67impl PostgresType {
68    pub fn to_arrow_type(&self) -> DataType {
69        match self {
70            PostgresType::Int2 => DataType::Int16,
71            PostgresType::Int4 => DataType::Int32,
72            PostgresType::Int8 => DataType::Int64,
73            PostgresType::Float4 => DataType::Float32,
74            PostgresType::Float8 => DataType::Float64,
75            PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
76            PostgresType::Name
77            | PostgresType::Text
78            | PostgresType::Varchar
79            | PostgresType::Bpchar => DataType::Utf8,
80            PostgresType::Bytea => DataType::Binary,
81            PostgresType::Date => DataType::Date32,
82            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
83            PostgresType::TimestampTz => {
84                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
85            }
86            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
87            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
88            PostgresType::Bool => DataType::Boolean,
89            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
90            PostgresType::Int2Array => {
91                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
92            }
93            PostgresType::Int4Array => {
94                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
95            }
96            PostgresType::Int8Array => {
97                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
98            }
99            PostgresType::Float4Array => {
100                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
101            }
102            PostgresType::Float8Array => {
103                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
104            }
105            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
106                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
107            }
108            PostgresType::ByteaArray => {
109                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
110            }
111            PostgresType::BoolArray => {
112                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
113            }
114            PostgresType::PostGisGeometry => DataType::Binary,
115        }
116    }
117}
118
/// MySQL column types understood by this crate.
///
/// See <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>.
///
/// The enum is plain data, so it derives equality and hashing in addition to
/// `Debug`/`Clone`; this lets callers compare types and use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MysqlType {
    /// `TINYINT`
    TinyInt,
    /// `TINYINT UNSIGNED`
    TinyIntUnsigned,
    /// `SMALLINT`
    SmallInt,
    /// `SMALLINT UNSIGNED`
    SmallIntUnsigned,
    /// `MEDIUMINT`
    MediumInt,
    /// `MEDIUMINT UNSIGNED`
    MediumIntUnsigned,
    /// `INTEGER`
    Integer,
    /// `INTEGER UNSIGNED`
    IntegerUnsigned,
    /// `BIGINT`
    BigInt,
    /// `BIGINT UNSIGNED`
    BigIntUnsigned,
    /// `FLOAT`
    Float,
    /// `DOUBLE`
    Double,
    /// `DECIMAL`; the payload is `(precision, scale)`.
    Decimal(u8, u8),
    /// `DATE`
    Date,
    /// `DATETIME`
    Datetime,
    /// `TIME`
    Time,
    /// `TIMESTAMP`
    Timestamp,
    /// `YEAR`
    Year,
    /// `CHAR`
    Char,
    /// `VARCHAR`
    Varchar,
    /// `BINARY`
    Binary,
    /// `VARBINARY`
    Varbinary,
    /// TinyText, Text, MediumText, LongText.
    ///
    /// The payload is a column length; `to_arrow_type` uses it to choose between
    /// `Utf8` and `LargeUtf8`.
    Text(u32),
    /// TinyBlob, Blob, MediumBlob, LongBlob.
    ///
    /// The payload is a column length; `to_arrow_type` uses it to choose between
    /// `Binary` and `LargeBinary`.
    Blob(u32),
    /// `JSON`
    Json,
    /// `GEOMETRY`
    Geometry,
}
151
152impl MysqlType {
153    pub fn to_arrow_type(&self) -> DataType {
154        match self {
155            MysqlType::TinyInt => DataType::Int8,
156            MysqlType::TinyIntUnsigned => DataType::UInt8,
157            MysqlType::SmallInt => DataType::Int16,
158            MysqlType::SmallIntUnsigned => DataType::UInt16,
159            MysqlType::MediumInt => DataType::Int32,
160            MysqlType::MediumIntUnsigned => DataType::UInt32,
161            MysqlType::Integer => DataType::Int32,
162            MysqlType::IntegerUnsigned => DataType::UInt32,
163            MysqlType::BigInt => DataType::Int64,
164            MysqlType::BigIntUnsigned => DataType::UInt64,
165            MysqlType::Float => DataType::Float32,
166            MysqlType::Double => DataType::Float64,
167            MysqlType::Decimal(precision, scale) => {
168                assert!(*scale <= (i8::MAX as u8));
169                if *precision > 38 {
170                    DataType::Decimal256(*precision, *scale as i8)
171                } else {
172                    DataType::Decimal128(*precision, *scale as i8)
173                }
174            }
175            MysqlType::Date => DataType::Date32,
176            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
177            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
178            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
179            MysqlType::Year => DataType::Int16,
180            MysqlType::Char => DataType::Utf8,
181            MysqlType::Varchar => DataType::Utf8,
182            MysqlType::Binary => DataType::Binary,
183            MysqlType::Varbinary => DataType::Binary,
184            MysqlType::Text(col_len) => {
185                if (*col_len as usize) > (i32::MAX as usize) {
186                    DataType::LargeUtf8
187                } else {
188                    DataType::Utf8
189                }
190            }
191            MysqlType::Blob(col_len) => {
192                if (*col_len as usize) > (i32::MAX as usize) {
193                    DataType::LargeBinary
194                } else {
195                    DataType::Binary
196                }
197            }
198            MysqlType::Json => DataType::LargeUtf8,
199            MysqlType::Geometry => DataType::LargeBinary,
200        }
201    }
202}
203
/// Oracle column types understood by this crate.
///
/// See <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>
/// and <https://docs.oracle.com/en/database/oracle/oracle-database/21/lnoci/data-types.html>.
///
/// The enum is plain data, so it derives equality and hashing in addition to
/// `Debug`/`Clone`; this lets callers compare types and use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OracleType {
    /// `VARCHAR2`.
    ///
    /// NOTE(review): the payload is presumably the declared size; it is not used by
    /// `to_arrow_type` — confirm against the code that builds this value.
    Varchar2(u32),
    /// `CHAR`.
    ///
    /// NOTE(review): the payload is presumably the declared size; it is not used by
    /// `to_arrow_type` — confirm against the code that builds this value.
    Char(u32),
    /// `NUMBER`; the payload is `(precision, scale)`.
    Number(u8, i8),
    /// `DATE`
    Date,
    /// `TIMESTAMP`
    Timestamp,
}
214
215impl OracleType {
216    pub fn to_arrow_type(&self) -> DataType {
217        match self {
218            OracleType::Varchar2(_) => DataType::Utf8,
219            OracleType::Char(_) => DataType::Utf8,
220            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
221            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
222            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
223        }
224    }
225}
226
/// SQLite storage classes understood by this crate.
///
/// See <https://www.sqlite.org/datatype3.html>.
///
/// The enum is plain data, so it derives equality and hashing in addition to
/// `Debug`/`Clone`; this lets callers compare types and use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SqliteType {
    /// `NULL`
    Null,
    /// `INTEGER`
    Integer,
    /// `REAL`
    Real,
    /// `TEXT`
    Text,
    /// `BLOB`
    Blob,
}
236
237impl SqliteType {
238    pub fn to_arrow_type(&self) -> DataType {
239        match self {
240            SqliteType::Null => DataType::Null,
241            SqliteType::Integer => DataType::Int64,
242            SqliteType::Real => DataType::Float64,
243            SqliteType::Text => DataType::Utf8,
244            SqliteType::Blob => DataType::Binary,
245        }
246    }
247}
248
/// One column of a remote table: its name, its remote type and whether it may be null.
#[derive(Debug, Clone)]
pub struct RemoteField {
    /// Column name; reused as the Arrow field name by `to_arrow_field`.
    pub name: String,
    /// The column's type in the remote database.
    pub remote_type: RemoteType,
    /// Nullability flag; passed through unchanged to the Arrow field.
    pub nullable: bool,
}
255
256impl RemoteField {
257    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
258        RemoteField {
259            name: name.into(),
260            remote_type,
261            nullable,
262        }
263    }
264
265    pub fn to_arrow_field(&self) -> Field {
266        Field::new(
267            self.name.clone(),
268            self.remote_type.to_arrow_type(),
269            self.nullable,
270        )
271    }
272}
273
/// Shared, reference-counted handle to a [`RemoteSchema`].
pub type RemoteSchemaRef = Arc<RemoteSchema>;

/// The ordered list of columns of a remote table.
#[derive(Debug, Clone)]
pub struct RemoteSchema {
    /// Columns in order; `to_arrow_schema` emits Arrow fields in this same order.
    pub fields: Vec<RemoteField>,
}
280
281impl RemoteSchema {
282    pub fn empty() -> Self {
283        RemoteSchema { fields: vec![] }
284    }
285    pub fn new(fields: Vec<RemoteField>) -> Self {
286        RemoteSchema { fields }
287    }
288
289    pub fn to_arrow_schema(&self) -> Schema {
290        let mut fields = vec![];
291        for remote_field in self.fields.iter() {
292            fields.push(remote_field.to_arrow_field());
293        }
294        Schema::new(fields)
295    }
296}
297
298pub fn project_remote_schema(
299    schema: &RemoteSchema,
300    projection: Option<&Vec<usize>>,
301) -> RemoteSchema {
302    match projection {
303        Some(projection) => {
304            let fields = projection
305                .iter()
306                .map(|i| schema.fields[*i].clone())
307                .collect::<Vec<_>>();
308            RemoteSchema::new(fields)
309        }
310        None => schema.clone(),
311    }
312}