// datafusion_remote_table/schema.rs

1use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6    Postgres(PostgresType),
7    Mysql(MysqlType),
8    Oracle(OracleType),
9    Sqlite(SqliteType),
10}
11
12impl RemoteType {
13    pub fn to_arrow_type(&self) -> DataType {
14        match self {
15            RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16            RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17            RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18            RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19        }
20    }
21}
22
23/// https://www.postgresql.org/docs/current/datatype.html
24#[derive(Debug, Clone)]
25pub enum PostgresType {
26    Bool,
27    Char,
28    Int2,
29    Int4,
30    Int8,
31    Float4,
32    Float8,
33    Text,
34    Varchar,
35    Bpchar,
36    Bytea,
37    Date,
38    Timestamp,
39    TimestampTz,
40    Time,
41    Int2Array,
42    Int4Array,
43    Int8Array,
44    Float4Array,
45    Float8Array,
46    TextArray,
47    VarcharArray,
48    ByteaArray,
49    PostGisGeometry,
50}
51
52impl PostgresType {
53    pub fn to_arrow_type(&self) -> DataType {
54        match self {
55            PostgresType::Bool => DataType::Boolean,
56            PostgresType::Char => DataType::Utf8,
57            PostgresType::Int2 => DataType::Int16,
58            PostgresType::Int4 => DataType::Int32,
59            PostgresType::Int8 => DataType::Int64,
60            PostgresType::Float4 => DataType::Float32,
61            PostgresType::Float8 => DataType::Float64,
62            PostgresType::Text | PostgresType::Varchar | PostgresType::Bpchar => DataType::Utf8,
63            PostgresType::Bytea => DataType::Binary,
64            PostgresType::Date => DataType::Date32,
65            PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
66            PostgresType::TimestampTz => {
67                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
68            }
69            PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
70            PostgresType::Int2Array => {
71                DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
72            }
73            PostgresType::Int4Array => {
74                DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
75            }
76            PostgresType::Int8Array => {
77                DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
78            }
79            PostgresType::Float4Array => {
80                DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
81            }
82            PostgresType::Float8Array => {
83                DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
84            }
85            PostgresType::TextArray | PostgresType::VarcharArray => {
86                DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
87            }
88            PostgresType::ByteaArray => {
89                DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
90            }
91            PostgresType::PostGisGeometry => DataType::Binary,
92        }
93    }
94}
95
96#[derive(Debug, Clone)]
97pub enum MysqlType {
98    TinyInt,
99    SmallInt,
100    Integer,
101    BigInt,
102    Float,
103    Double,
104}
105
106impl MysqlType {
107    pub fn to_arrow_type(&self) -> DataType {
108        match self {
109            MysqlType::TinyInt => DataType::Int8,
110            MysqlType::SmallInt => DataType::Int16,
111            MysqlType::Integer => DataType::Int32,
112            MysqlType::BigInt => DataType::Int64,
113            MysqlType::Float => DataType::Float32,
114            MysqlType::Double => DataType::Float64,
115        }
116    }
117}
118
119// https://docs.oracle.com/cd/B28359_01/server.111/b28286/sql_elements001.htm#i54330
120#[derive(Debug, Clone)]
121pub enum OracleType {
122    Varchar2(u32),
123}
124
125impl OracleType {
126    pub fn to_arrow_type(&self) -> DataType {
127        match self {
128            OracleType::Varchar2(_) => DataType::Utf8,
129        }
130    }
131}
132
133// https://www.sqlite.org/datatype3.html
134#[derive(Debug, Clone)]
135pub enum SqliteType {
136    Null,
137    Integer,
138    Real,
139    Text,
140    Blob,
141}
142
143impl SqliteType {
144    pub fn to_arrow_type(&self) -> DataType {
145        match self {
146            SqliteType::Null => DataType::Null,
147            SqliteType::Integer => DataType::Int64,
148            SqliteType::Real => DataType::Float64,
149            SqliteType::Text => DataType::Utf8,
150            SqliteType::Blob => DataType::Binary,
151        }
152    }
153}
154
155#[derive(Debug, Clone)]
156pub struct RemoteField {
157    pub name: String,
158    pub remote_type: RemoteType,
159    pub nullable: bool,
160}
161
162impl RemoteField {
163    pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
164        RemoteField {
165            name: name.into(),
166            remote_type,
167            nullable,
168        }
169    }
170
171    pub fn to_arrow_field(&self) -> Field {
172        Field::new(
173            self.name.clone(),
174            self.remote_type.to_arrow_type(),
175            self.nullable,
176        )
177    }
178}
179
180#[derive(Debug, Clone)]
181pub struct RemoteSchema {
182    pub fields: Vec<RemoteField>,
183}
184
185impl RemoteSchema {
186    pub fn empty() -> Self {
187        RemoteSchema { fields: vec![] }
188    }
189    pub fn new(fields: Vec<RemoteField>) -> Self {
190        RemoteSchema { fields }
191    }
192
193    pub fn to_arrow_schema(&self) -> Schema {
194        let mut fields = vec![];
195        for remote_field in self.fields.iter() {
196            fields.push(remote_field.to_arrow_field());
197        }
198        Schema::new(fields)
199    }
200}
201
202pub fn project_remote_schema(
203    schema: &RemoteSchema,
204    projection: Option<&Vec<usize>>,
205) -> RemoteSchema {
206    match projection {
207        Some(projection) => {
208            let fields = projection
209                .iter()
210                .map(|i| schema.fields[*i].clone())
211                .collect::<Vec<_>>();
212            RemoteSchema::new(fields)
213        }
214        None => schema.clone(),
215    }
216}