1use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6 Postgres(PostgresType),
7 Mysql(MysqlType),
8 Oracle(OracleType),
9 Sqlite(SqliteType),
10}
11
12impl RemoteType {
13 pub fn to_arrow_type(&self) -> DataType {
14 match self {
15 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19 }
20 }
21}
22
/// Postgres-side type tags.
///
/// Every variant has an Arrow counterpart given by
/// `PostgresType::to_arrow_type`; the `*Array` variants are the ones that
/// method maps to an Arrow list type.
#[derive(Debug, Clone)]
pub enum PostgresType {
    // Scalar variants.
    Bool,
    Char,
    Int2,
    Int4,
    Int8,
    Float4,
    Float8,
    Text,
    Varchar,
    Bpchar,
    Bytea,
    Date,
    Timestamp,
    TimestampTz,
    Time,

    // Array variants.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    TextArray,
    VarcharArray,
    ByteaArray,

    // Extension type; `to_arrow_type` treats it as plain binary.
    PostGisGeometry,
}
51
52impl PostgresType {
53 pub fn to_arrow_type(&self) -> DataType {
54 match self {
55 PostgresType::Bool => DataType::Boolean,
56 PostgresType::Char => DataType::Utf8,
57 PostgresType::Int2 => DataType::Int16,
58 PostgresType::Int4 => DataType::Int32,
59 PostgresType::Int8 => DataType::Int64,
60 PostgresType::Float4 => DataType::Float32,
61 PostgresType::Float8 => DataType::Float64,
62 PostgresType::Text | PostgresType::Varchar | PostgresType::Bpchar => DataType::Utf8,
63 PostgresType::Bytea => DataType::Binary,
64 PostgresType::Date => DataType::Date32,
65 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
66 PostgresType::TimestampTz => {
67 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
68 }
69 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
70 PostgresType::Int2Array => {
71 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
72 }
73 PostgresType::Int4Array => {
74 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
75 }
76 PostgresType::Int8Array => {
77 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
78 }
79 PostgresType::Float4Array => {
80 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
81 }
82 PostgresType::Float8Array => {
83 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
84 }
85 PostgresType::TextArray | PostgresType::VarcharArray => {
86 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
87 }
88 PostgresType::ByteaArray => {
89 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
90 }
91 PostgresType::PostGisGeometry => DataType::Binary,
92 }
93 }
94}
95
/// MySQL-side type tags.
///
/// Every variant has an Arrow counterpart given by `MysqlType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum MysqlType {
    // Integer variants.
    TinyInt,
    SmallInt,
    MediumInt,
    Integer,
    BigInt,

    // Floating-point variants.
    Float,
    Double,

    // Date and time variants.
    Date,
    Datetime,
    Time,
    Timestamp,

    // Fixed/variable length character and binary variants.
    Char,
    Varchar,
    Binary,
    Varbinary,

    // Text variants.
    TinyText,
    Text,
    MediumText,
    LongText,

    // Blob variants.
    TinyBlob,
    Blob,
    MediumBlob,
    LongBlob,

    // Remaining variants; `to_arrow_type` maps them to the large string and
    // large binary Arrow types respectively.
    Json,
    Geometry,
}
125
126impl MysqlType {
127 pub fn to_arrow_type(&self) -> DataType {
128 match self {
129 MysqlType::TinyInt => DataType::Int8,
130 MysqlType::SmallInt => DataType::Int16,
131 MysqlType::MediumInt => DataType::Int32,
132 MysqlType::Integer => DataType::Int32,
133 MysqlType::BigInt => DataType::Int64,
134 MysqlType::Float => DataType::Float32,
135 MysqlType::Double => DataType::Float64,
136 MysqlType::Date => DataType::Date32,
137 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
138 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
139 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
140 MysqlType::Char => DataType::Utf8,
141 MysqlType::Varchar => DataType::Utf8,
142 MysqlType::Binary => DataType::Binary,
143 MysqlType::Varbinary => DataType::Binary,
144 MysqlType::TinyText => DataType::Utf8,
145 MysqlType::Text => DataType::Utf8,
146 MysqlType::MediumText => DataType::Utf8,
147 MysqlType::LongText => DataType::LargeUtf8,
148 MysqlType::TinyBlob => DataType::Binary,
149 MysqlType::Blob => DataType::Binary,
150 MysqlType::MediumBlob => DataType::Binary,
151 MysqlType::LongBlob => DataType::LargeBinary,
152 MysqlType::Json => DataType::LargeUtf8,
153 MysqlType::Geometry => DataType::LargeBinary,
154 }
155 }
156}
157
/// Oracle-side type tags.
///
/// The Arrow counterpart of a variant is given by `OracleType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum OracleType {
    /// Carries a `u32` payload that `to_arrow_type` does not consult.
    // NOTE(review): the payload is presumably the declared column size; confirm
    // against the code that constructs this variant.
    Varchar2(u32),
}
163
164impl OracleType {
165 pub fn to_arrow_type(&self) -> DataType {
166 match self {
167 OracleType::Varchar2(_) => DataType::Utf8,
168 }
169 }
170}
171
/// SQLite-side type tags.
///
/// Every variant has an Arrow counterpart given by `SqliteType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum SqliteType {
    /// Mapped to the Arrow null type.
    Null,
    /// Mapped to a 64-bit signed integer.
    Integer,
    /// Mapped to a 64-bit float.
    Real,
    /// Mapped to a UTF-8 string.
    Text,
    /// Mapped to binary.
    Blob,
}
181
182impl SqliteType {
183 pub fn to_arrow_type(&self) -> DataType {
184 match self {
185 SqliteType::Null => DataType::Null,
186 SqliteType::Integer => DataType::Int64,
187 SqliteType::Real => DataType::Float64,
188 SqliteType::Text => DataType::Utf8,
189 SqliteType::Blob => DataType::Binary,
190 }
191 }
192}
193
194#[derive(Debug, Clone)]
195pub struct RemoteField {
196 pub name: String,
197 pub remote_type: RemoteType,
198 pub nullable: bool,
199}
200
201impl RemoteField {
202 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
203 RemoteField {
204 name: name.into(),
205 remote_type,
206 nullable,
207 }
208 }
209
210 pub fn to_arrow_field(&self) -> Field {
211 Field::new(
212 self.name.clone(),
213 self.remote_type.to_arrow_type(),
214 self.nullable,
215 )
216 }
217}
218
219#[derive(Debug, Clone)]
220pub struct RemoteSchema {
221 pub fields: Vec<RemoteField>,
222}
223
224impl RemoteSchema {
225 pub fn empty() -> Self {
226 RemoteSchema { fields: vec![] }
227 }
228 pub fn new(fields: Vec<RemoteField>) -> Self {
229 RemoteSchema { fields }
230 }
231
232 pub fn to_arrow_schema(&self) -> Schema {
233 let mut fields = vec![];
234 for remote_field in self.fields.iter() {
235 fields.push(remote_field.to_arrow_field());
236 }
237 Schema::new(fields)
238 }
239}
240
241pub fn project_remote_schema(
242 schema: &RemoteSchema,
243 projection: Option<&Vec<usize>>,
244) -> RemoteSchema {
245 match projection {
246 Some(projection) => {
247 let fields = projection
248 .iter()
249 .map(|i| schema.fields[*i].clone())
250 .collect::<Vec<_>>();
251 RemoteSchema::new(fields)
252 }
253 None => schema.clone(),
254 }
255}