datafusion_remote_table/
schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6    Postgres(PostgresType),
7    Mysql(MysqlType),
8    Oracle(OracleType),
9    Sqlite(SqliteType),
10}
11
12impl RemoteType {
13    pub fn to_arrow_type(&self) -> DataType {
14        match self {
15            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19        }
20    }
21}
22
/// Column types supported for PostgreSQL sources.
///
/// Variant names follow PostgreSQL's internal type names; see
/// <https://www.postgresql.org/docs/current/datatype.html>.
///
/// The type is comparable and hashable so it can be used in assertions and as
/// a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PostgresType {
    /// `boolean`.
    Bool,
    /// `"char"` — presumably the single-byte internal type, since `Bpchar`
    /// covers `character(n)`; confirm against the code that builds this enum.
    Char,
    /// `smallint`.
    Int2,
    /// `integer`.
    Int4,
    /// `bigint`.
    Int8,
    /// `real`.
    Float4,
    /// `double precision`.
    Float8,
    /// `text`.
    Text,
    /// `character varying`.
    Varchar,
    /// `character(n)` (blank-padded).
    Bpchar,
    /// `bytea`.
    Bytea,
    /// `date`.
    Date,
    /// `timestamp` (without time zone).
    Timestamp,
    /// `timestamp with time zone`.
    TimestampTz,
    /// `time`.
    Time,
    /// `smallint[]`.
    Int2Array,
    /// `integer[]`.
    Int4Array,
    /// `bigint[]`.
    Int8Array,
    /// `real[]`.
    Float4Array,
    /// `double precision[]`.
    Float8Array,
    /// `text[]`.
    TextArray,
    /// `character varying[]`.
    VarcharArray,
    /// `bytea[]`.
    ByteaArray,
    /// PostGIS `geometry`.
    PostGisGeometry,
}
51
52impl PostgresType {
53    pub fn to_arrow_type(&self) -> DataType {
54        match self {
55            PostgresType::Bool => DataType::Boolean,
56            PostgresType::Char => DataType::Utf8,
57            PostgresType::Int2 => DataType::Int16,
58            PostgresType::Int4 => DataType::Int32,
59            PostgresType::Int8 => DataType::Int64,
60            PostgresType::Float4 => DataType::Float32,
61            PostgresType::Float8 => DataType::Float64,
62            PostgresType::Text | PostgresType::Varchar | PostgresType::Bpchar => DataType::Utf8,
63            PostgresType::Bytea => DataType::Binary,
64            PostgresType::Date => DataType::Date32,
65            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
66            PostgresType::TimestampTz => {
67                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
68            }
69            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
70            PostgresType::Int2Array => {
71                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
72            }
73            PostgresType::Int4Array => {
74                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
75            }
76            PostgresType::Int8Array => {
77                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
78            }
79            PostgresType::Float4Array => {
80                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
81            }
82            PostgresType::Float8Array => {
83                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
84            }
85            PostgresType::TextArray | PostgresType::VarcharArray => {
86                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
87            }
88            PostgresType::ByteaArray => {
89                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
90            }
91            PostgresType::PostGisGeometry => DataType::Binary,
92        }
93    }
94}
95
/// Column types supported for MySQL sources.
///
/// Variant names follow the MySQL data type names; see
/// <https://dev.mysql.com/doc/refman/8.4/en/data-types.html>.
///
/// The type is comparable and hashable so it can be used in assertions and as
/// a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MysqlType {
    /// `TINYINT`.
    TinyInt,
    /// `SMALLINT`.
    SmallInt,
    /// `MEDIUMINT`.
    MediumInt,
    /// `INT` / `INTEGER`.
    Integer,
    /// `BIGINT`.
    BigInt,
    /// `FLOAT`.
    Float,
    /// `DOUBLE`.
    Double,
    /// `DATE`.
    Date,
    /// `DATETIME`.
    Datetime,
    /// `TIME`.
    Time,
    /// `TIMESTAMP`.
    Timestamp,
    /// `CHAR`.
    Char,
    /// `VARCHAR`.
    Varchar,
    /// `BINARY`.
    Binary,
    /// `VARBINARY`.
    Varbinary,
    /// `TINYTEXT`.
    TinyText,
    /// `TEXT`.
    Text,
    /// `MEDIUMTEXT`.
    MediumText,
    /// `LONGTEXT`.
    LongText,
    /// `TINYBLOB`.
    TinyBlob,
    /// `BLOB`.
    Blob,
    /// `MEDIUMBLOB`.
    MediumBlob,
    /// `LONGBLOB`.
    LongBlob,
    /// `JSON`.
    Json,
    /// `GEOMETRY`.
    Geometry,
}
125
126impl MysqlType {
127    pub fn to_arrow_type(&self) -> DataType {
128        match self {
129            MysqlType::TinyInt => DataType::Int8,
130            MysqlType::SmallInt => DataType::Int16,
131            MysqlType::MediumInt => DataType::Int32,
132            MysqlType::Integer => DataType::Int32,
133            MysqlType::BigInt => DataType::Int64,
134            MysqlType::Float => DataType::Float32,
135            MysqlType::Double => DataType::Float64,
136            MysqlType::Date => DataType::Date32,
137            MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
138            MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
139            MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
140            MysqlType::Char => DataType::Utf8,
141            MysqlType::Varchar => DataType::Utf8,
142            MysqlType::Binary => DataType::Binary,
143            MysqlType::Varbinary => DataType::Binary,
144            MysqlType::TinyText => DataType::Utf8,
145            MysqlType::Text => DataType::Utf8,
146            MysqlType::MediumText => DataType::Utf8,
147            MysqlType::LongText => DataType::LargeUtf8,
148            MysqlType::TinyBlob => DataType::Binary,
149            MysqlType::Blob => DataType::Binary,
150            MysqlType::MediumBlob => DataType::Binary,
151            MysqlType::LongBlob => DataType::LargeBinary,
152            MysqlType::Json => DataType::LargeUtf8,
153            MysqlType::Geometry => DataType::LargeBinary,
154        }
155    }
156}
157
/// Column types supported for Oracle sources.
///
/// See <https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330>.
///
/// The type is comparable and hashable so it can be used in assertions and as
/// a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OracleType {
    /// `VARCHAR2`. The payload is presumably the declared maximum length —
    /// confirm against the code that constructs this variant.
    Varchar2(u32),
}
163
164impl OracleType {
165    pub fn to_arrow_type(&self) -> DataType {
166        match self {
167            OracleType::Varchar2(_) => DataType::Utf8,
168        }
169    }
170}
171
/// Column types supported for SQLite sources.
///
/// Variant names match the storage classes described at
/// <https://www.sqlite.org/datatype3.html>.
///
/// The type is comparable and hashable so it can be used in assertions and as
/// a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SqliteType {
    /// `NULL`.
    Null,
    /// `INTEGER`.
    Integer,
    /// `REAL`.
    Real,
    /// `TEXT`.
    Text,
    /// `BLOB`.
    Blob,
}
181
182impl SqliteType {
183    pub fn to_arrow_type(&self) -> DataType {
184        match self {
185            SqliteType::Null => DataType::Null,
186            SqliteType::Integer => DataType::Int64,
187            SqliteType::Real => DataType::Float64,
188            SqliteType::Text => DataType::Utf8,
189            SqliteType::Blob => DataType::Binary,
190        }
191    }
192}
193
194#[derive(Debug, Clone)]
195pub struct RemoteField {
196    pub name: String,
197    pub remote_type: RemoteType,
198    pub nullable: bool,
199}
200
201impl RemoteField {
202    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
203        RemoteField {
204            name: name.into(),
205            remote_type,
206            nullable,
207        }
208    }
209
210    pub fn to_arrow_field(&self) -> Field {
211        Field::new(
212            self.name.clone(),
213            self.remote_type.to_arrow_type(),
214            self.nullable,
215        )
216    }
217}
218
219#[derive(Debug, Clone)]
220pub struct RemoteSchema {
221    pub fields: Vec<RemoteField>,
222}
223
224impl RemoteSchema {
225    pub fn empty() -> Self {
226        RemoteSchema { fields: vec![] }
227    }
228    pub fn new(fields: Vec<RemoteField>) -> Self {
229        RemoteSchema { fields }
230    }
231
232    pub fn to_arrow_schema(&self) -> Schema {
233        let mut fields = vec![];
234        for remote_field in self.fields.iter() {
235            fields.push(remote_field.to_arrow_field());
236        }
237        Schema::new(fields)
238    }
239}
240
241pub fn project_remote_schema(
242    schema: &RemoteSchema,
243    projection: Option<&Vec<usize>>,
244) -> RemoteSchema {
245    match projection {
246        Some(projection) => {
247            let fields = projection
248                .iter()
249                .map(|i| schema.fields[*i].clone())
250                .collect::<Vec<_>>();
251            RemoteSchema::new(fields)
252        }
253        None => schema.clone(),
254    }
255}