datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6    Postgres(PostgresType),
7    Mysql(MysqlType),
8    Oracle(OracleType),
9    Sqlite(SqliteType),
10}
11
12impl RemoteType {
13    pub fn to_arrow_type(&self) -> DataType {
14        match self {
15            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19        }
20    }
21}
22
/// PostgreSQL column types understood by this crate.
///
/// See <https://www.postgresql.org/docs/current/datatype.html>.
#[derive(Debug, Clone)]
pub enum PostgresType {
    /// `smallint`
    Int2,
    /// `integer`
    Int4,
    /// `bigint`
    Int8,
    /// `real`
    Float4,
    /// `double precision`
    Float8,
    /// `numeric(p, s)` / `decimal(p, s)`.
    ///
    /// The payload is the scale; the precision is always treated as 38.
    Numeric(i8),
    /// `varchar(n)`
    Varchar,
    /// `char`, `char(n)`, `bpchar(n)`, `bpchar`
    Bpchar,
    Text,
    Bytea,
    Date,
    Timestamp,
    TimestampTz,
    Time,
    Interval,
    Bool,
    Json,
    Jsonb,

    // Array variants; `to_arrow_type` maps each of them to an Arrow `List`.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    VarcharArray,
    BpcharArray,
    TextArray,
    ByteaArray,
    BoolArray,

    /// PostGIS geometry; `to_arrow_type` maps it to Arrow `Binary`.
    PostGisGeometry,
}
65
66impl PostgresType {
67    pub fn to_arrow_type(&self) -> DataType {
68        match self {
69            PostgresType::Int2 => DataType::Int16,
70            PostgresType::Int4 => DataType::Int32,
71            PostgresType::Int8 => DataType::Int64,
72            PostgresType::Float4 => DataType::Float32,
73            PostgresType::Float8 => DataType::Float64,
74            PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
75            PostgresType::Text | PostgresType::Varchar | PostgresType::Bpchar => DataType::Utf8,
76            PostgresType::Bytea => DataType::Binary,
77            PostgresType::Date => DataType::Date32,
78            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
79            PostgresType::TimestampTz => {
80                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
81            }
82            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
83            PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
84            PostgresType::Bool => DataType::Boolean,
85            PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
86            PostgresType::Int2Array => {
87                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
88            }
89            PostgresType::Int4Array => {
90                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
91            }
92            PostgresType::Int8Array => {
93                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
94            }
95            PostgresType::Float4Array => {
96                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
97            }
98            PostgresType::Float8Array => {
99                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
100            }
101            PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
102                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
103            }
104            PostgresType::ByteaArray => {
105                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
106            }
107            PostgresType::BoolArray => {
108                DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
109            }
110            PostgresType::PostGisGeometry => DataType::Binary,
111        }
112    }
113}
114
/// MySQL column types understood by this crate.
///
/// See <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>.
#[derive(Debug, Clone)]
pub enum MysqlType {
    // Integers.
    TinyInt,
    SmallInt,
    MediumInt,
    Integer,
    BigInt,

    // Floating point and fixed point.
    Float,
    Double,
    /// Fixed-point number; the payload is `(precision, scale)`.
    Decimal(u8, u8),

    // Date and time.
    Date,
    Datetime,
    Time,
    Timestamp,
    Year,

    // Character and binary strings.
    Char,
    Varchar,
    Binary,
    Varbinary,
    TinyText,
    Text,
    MediumText,
    LongText,
    TinyBlob,
    Blob,
    MediumBlob,
    LongBlob,

    Json,
    Geometry,
}
146
147impl MysqlType {
148    pub fn to_arrow_type(&self) -> DataType {
149        match self {
150            MysqlType::TinyInt => DataType::Int8,
151            MysqlType::SmallInt => DataType::Int16,
152            MysqlType::MediumInt => DataType::Int32,
153            MysqlType::Integer => DataType::Int32,
154            MysqlType::BigInt => DataType::Int64,
155            MysqlType::Float => DataType::Float32,
156            MysqlType::Double => DataType::Float64,
157            MysqlType::Decimal(precision, scale) => {
158                assert!(*scale <= (i8::MAX as u8));
159                if *precision > 38 {
160                    DataType::Decimal256(*precision, *scale as i8)
161                } else {
162                    DataType::Decimal128(*precision, *scale as i8)
163                }
164            }
165            MysqlType::Date => DataType::Date32,
166            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
167            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
168            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
169            MysqlType::Year => DataType::Int16,
170            MysqlType::Char => DataType::Utf8,
171            MysqlType::Varchar => DataType::Utf8,
172            MysqlType::Binary => DataType::Binary,
173            MysqlType::Varbinary => DataType::Binary,
174            MysqlType::TinyText => DataType::Utf8,
175            MysqlType::Text => DataType::Utf8,
176            MysqlType::MediumText => DataType::Utf8,
177            MysqlType::LongText => DataType::LargeUtf8,
178            MysqlType::TinyBlob => DataType::Binary,
179            MysqlType::Blob => DataType::Binary,
180            MysqlType::MediumBlob => DataType::Binary,
181            MysqlType::LongBlob => DataType::LargeBinary,
182            MysqlType::Json => DataType::LargeUtf8,
183            MysqlType::Geometry => DataType::LargeBinary,
184        }
185    }
186}
187
/// Oracle column types understood by this crate.
///
/// See <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>.
#[derive(Debug, Clone)]
pub enum OracleType {
    /// `VARCHAR2`. NOTE(review): the payload is presumably the declared length — confirm.
    Varchar2(u32),
    /// `CHAR`. NOTE(review): the payload is presumably the declared length — confirm.
    Char(u32),
    /// `NUMBER`; the payload is `(precision, scale)`.
    Number(u8, i8),
    Date,
    Timestamp,
}
197
198impl OracleType {
199    pub fn to_arrow_type(&self) -> DataType {
200        match self {
201            OracleType::Varchar2(_) => DataType::Utf8,
202            OracleType::Char(_) => DataType::Utf8,
203            OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
204            OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
205            OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
206        }
207    }
208}
209
/// SQLite column types understood by this crate.
///
/// See <https://www.sqlite.org/datatype3.html>.
#[derive(Debug, Clone)]
pub enum SqliteType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
}
219
220impl SqliteType {
221    pub fn to_arrow_type(&self) -> DataType {
222        match self {
223            SqliteType::Null => DataType::Null,
224            SqliteType::Integer => DataType::Int64,
225            SqliteType::Real => DataType::Float64,
226            SqliteType::Text => DataType::Utf8,
227            SqliteType::Blob => DataType::Binary,
228        }
229    }
230}
231
232#[derive(Debug, Clone)]
233pub struct RemoteField {
234    pub name: String,
235    pub remote_type: RemoteType,
236    pub nullable: bool,
237}
238
239impl RemoteField {
240    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
241        RemoteField {
242            name: name.into(),
243            remote_type,
244            nullable,
245        }
246    }
247
248    pub fn to_arrow_field(&self) -> Field {
249        Field::new(
250            self.name.clone(),
251            self.remote_type.to_arrow_type(),
252            self.nullable,
253        )
254    }
255}
256
257#[derive(Debug, Clone)]
258pub struct RemoteSchema {
259    pub fields: Vec<RemoteField>,
260}
261
262impl RemoteSchema {
263    pub fn empty() -> Self {
264        RemoteSchema { fields: vec![] }
265    }
266    pub fn new(fields: Vec<RemoteField>) -> Self {
267        RemoteSchema { fields }
268    }
269
270    pub fn to_arrow_schema(&self) -> Schema {
271        let mut fields = vec![];
272        for remote_field in self.fields.iter() {
273            fields.push(remote_field.to_arrow_field());
274        }
275        Schema::new(fields)
276    }
277}
278
279pub fn project_remote_schema(
280    schema: &RemoteSchema,
281    projection: Option<&Vec<usize>>,
282) -> RemoteSchema {
283    match projection {
284        Some(projection) => {
285            let fields = projection
286                .iter()
287                .map(|i| schema.fields[*i].clone())
288                .collect::<Vec<_>>();
289            RemoteSchema::new(fields)
290        }
291        None => schema.clone(),
292    }
293}