1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6 Postgres(PostgresType),
7 Mysql(MysqlType),
8 Oracle(OracleType),
9 Sqlite(SqliteType),
10}
11
12impl RemoteType {
13 pub fn to_arrow_type(&self) -> DataType {
14 match self {
15 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19 }
20 }
21}
22
/// Column types recognised for PostgreSQL sources.
///
/// The per-variant notes state the Arrow type produced by
/// [`PostgresType::to_arrow_type`].
#[derive(Debug, Clone)]
pub enum PostgresType {
    /// Read as Arrow `Int16`.
    Int2,
    /// Read as Arrow `Int32`.
    Int4,
    /// Read as Arrow `Int64`.
    Int8,
    /// Read as Arrow `Float32`.
    Float4,
    /// Read as Arrow `Float64`.
    Float8,
    /// Read as Arrow `Decimal128` with a fixed precision of 38; the payload is used as
    /// the decimal scale.
    Numeric(i8),
    /// Read as Arrow `UInt32`.
    Oid,
    /// Read as Arrow `Utf8`.
    Name,
    /// Read as Arrow `Utf8`.
    Varchar,
    /// Read as Arrow `Utf8`.
    Bpchar,
    /// Read as Arrow `Utf8`.
    Text,
    /// Read as Arrow `Binary`.
    Bytea,
    /// Read as Arrow `Date32`.
    Date,
    /// Read as a nanosecond Arrow `Timestamp` without a time zone.
    Timestamp,
    /// Read as a nanosecond Arrow `Timestamp` tagged with the `"UTC"` time zone.
    TimestampTz,
    /// Read as a nanosecond Arrow `Time64`.
    Time,
    /// Read as an Arrow `Interval` with the `MonthDayNano` unit.
    Interval,
    /// Read as Arrow `Boolean`.
    Bool,
    /// Read as Arrow `LargeUtf8`.
    Json,
    /// Read as Arrow `LargeUtf8`.
    Jsonb,
    /// Read as an Arrow `List` of nullable `Int16`.
    Int2Array,
    /// Read as an Arrow `List` of nullable `Int32`.
    Int4Array,
    /// Read as an Arrow `List` of nullable `Int64`.
    Int8Array,
    /// Read as an Arrow `List` of nullable `Float32`.
    Float4Array,
    /// Read as an Arrow `List` of nullable `Float64`.
    Float8Array,
    /// Read as an Arrow `List` of nullable `Utf8`.
    VarcharArray,
    /// Read as an Arrow `List` of nullable `Utf8`.
    BpcharArray,
    /// Read as an Arrow `List` of nullable `Utf8`.
    TextArray,
    /// Read as an Arrow `List` of nullable `Binary`.
    ByteaArray,
    /// Read as an Arrow `List` of nullable `Boolean`.
    BoolArray,
    /// Read as Arrow `Binary`.
    PostGisGeometry,
}
67
68impl PostgresType {
69 pub fn to_arrow_type(&self) -> DataType {
70 match self {
71 PostgresType::Int2 => DataType::Int16,
72 PostgresType::Int4 => DataType::Int32,
73 PostgresType::Int8 => DataType::Int64,
74 PostgresType::Float4 => DataType::Float32,
75 PostgresType::Float8 => DataType::Float64,
76 PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
77 PostgresType::Oid => DataType::UInt32,
78 PostgresType::Name
79 | PostgresType::Text
80 | PostgresType::Varchar
81 | PostgresType::Bpchar => DataType::Utf8,
82 PostgresType::Bytea => DataType::Binary,
83 PostgresType::Date => DataType::Date32,
84 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
85 PostgresType::TimestampTz => {
86 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
87 }
88 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
89 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
90 PostgresType::Bool => DataType::Boolean,
91 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
92 PostgresType::Int2Array => {
93 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
94 }
95 PostgresType::Int4Array => {
96 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
97 }
98 PostgresType::Int8Array => {
99 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
100 }
101 PostgresType::Float4Array => {
102 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
103 }
104 PostgresType::Float8Array => {
105 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
106 }
107 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
108 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
109 }
110 PostgresType::ByteaArray => {
111 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
112 }
113 PostgresType::BoolArray => {
114 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
115 }
116 PostgresType::PostGisGeometry => DataType::Binary,
117 }
118 }
119}
120
/// Column types recognised for MySQL sources.
///
/// The per-variant notes state the Arrow type produced by
/// [`MysqlType::to_arrow_type`].
#[derive(Debug, Clone)]
pub enum MysqlType {
    /// Read as Arrow `Int8`.
    TinyInt,
    /// Read as Arrow `UInt8`.
    TinyIntUnsigned,
    /// Read as Arrow `Int16`.
    SmallInt,
    /// Read as Arrow `UInt16`.
    SmallIntUnsigned,
    /// Read as Arrow `Int32`.
    MediumInt,
    /// Read as Arrow `UInt32`.
    MediumIntUnsigned,
    /// Read as Arrow `Int32`.
    Integer,
    /// Read as Arrow `UInt32`.
    IntegerUnsigned,
    /// Read as Arrow `Int64`.
    BigInt,
    /// Read as Arrow `UInt64`.
    BigIntUnsigned,
    /// Read as Arrow `Float32`.
    Float,
    /// Read as Arrow `Float64`.
    Double,
    /// `(precision, scale)`. Read as Arrow `Decimal128`, or `Decimal256` when the
    /// precision is greater than 38.
    Decimal(u8, u8),
    /// Read as Arrow `Date32`.
    Date,
    /// Read as a microsecond Arrow `Timestamp` without a time zone.
    Datetime,
    /// Read as a nanosecond Arrow `Time64`.
    Time,
    /// Read as a microsecond Arrow `Timestamp` without a time zone.
    Timestamp,
    /// Read as Arrow `Int16`.
    Year,
    /// Read as Arrow `Utf8`.
    Char,
    /// Read as Arrow `Utf8`.
    Varchar,
    /// Read as Arrow `Binary`.
    Binary,
    /// Read as Arrow `Binary`.
    Varbinary,
    /// The payload is treated as the column length: values above `i32::MAX` are read as
    /// Arrow `LargeUtf8`, everything else as `Utf8`.
    Text(u32),
    /// The payload is treated as the column length: values above `i32::MAX` are read as
    /// Arrow `LargeBinary`, everything else as `Binary`.
    Blob(u32),
    /// Read as Arrow `LargeUtf8`.
    Json,
    /// Read as Arrow `LargeBinary`.
    Geometry,
}
153
154impl MysqlType {
155 pub fn to_arrow_type(&self) -> DataType {
156 match self {
157 MysqlType::TinyInt => DataType::Int8,
158 MysqlType::TinyIntUnsigned => DataType::UInt8,
159 MysqlType::SmallInt => DataType::Int16,
160 MysqlType::SmallIntUnsigned => DataType::UInt16,
161 MysqlType::MediumInt => DataType::Int32,
162 MysqlType::MediumIntUnsigned => DataType::UInt32,
163 MysqlType::Integer => DataType::Int32,
164 MysqlType::IntegerUnsigned => DataType::UInt32,
165 MysqlType::BigInt => DataType::Int64,
166 MysqlType::BigIntUnsigned => DataType::UInt64,
167 MysqlType::Float => DataType::Float32,
168 MysqlType::Double => DataType::Float64,
169 MysqlType::Decimal(precision, scale) => {
170 assert!(*scale <= (i8::MAX as u8));
171 if *precision > 38 {
172 DataType::Decimal256(*precision, *scale as i8)
173 } else {
174 DataType::Decimal128(*precision, *scale as i8)
175 }
176 }
177 MysqlType::Date => DataType::Date32,
178 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
179 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
180 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
181 MysqlType::Year => DataType::Int16,
182 MysqlType::Char => DataType::Utf8,
183 MysqlType::Varchar => DataType::Utf8,
184 MysqlType::Binary => DataType::Binary,
185 MysqlType::Varbinary => DataType::Binary,
186 MysqlType::Text(col_len) => {
187 if (*col_len as usize) > (i32::MAX as usize) {
188 DataType::LargeUtf8
189 } else {
190 DataType::Utf8
191 }
192 }
193 MysqlType::Blob(col_len) => {
194 if (*col_len as usize) > (i32::MAX as usize) {
195 DataType::LargeBinary
196 } else {
197 DataType::Binary
198 }
199 }
200 MysqlType::Json => DataType::LargeUtf8,
201 MysqlType::Geometry => DataType::LargeBinary,
202 }
203 }
204}
205
/// Column types recognised for Oracle sources.
///
/// The per-variant notes state the Arrow type produced by
/// [`OracleType::to_arrow_type`].
#[derive(Debug, Clone)]
pub enum OracleType {
    /// Read as Arrow `Float32`.
    BinaryFloat,
    /// Read as Arrow `Float64`.
    BinaryDouble,
    /// `(precision, scale)`, passed through unchanged to Arrow `Decimal128`.
    Number(u8, i8),
    /// Read as Arrow `Float64`; the payload is not used by the conversion.
    Float(u8),
    /// Read as Arrow `Utf8`; the payload (presumably the declared length) is not used
    /// by the conversion.
    Varchar2(u32),
    /// Read as Arrow `Utf8`; the payload (presumably the declared length) is not used
    /// by the conversion.
    NVarchar2(u32),
    /// Read as Arrow `Utf8`; the payload (presumably the declared length) is not used
    /// by the conversion.
    Char(u32),
    /// Read as Arrow `Utf8`; the payload (presumably the declared length) is not used
    /// by the conversion.
    NChar(u32),
    /// Read as Arrow `Utf8`.
    Long,
    /// Read as Arrow `LargeUtf8`.
    Clob,
    /// Read as Arrow `LargeUtf8`.
    NClob,
    /// Read as Arrow `Binary`; the payload (presumably the declared length) is not used
    /// by the conversion.
    Raw(u32),
    /// Read as Arrow `Binary`.
    LongRaw,
    /// Read as Arrow `LargeBinary`.
    Blob,
    /// Read as a second-resolution Arrow `Timestamp` without a time zone.
    Date,
    /// Read as a nanosecond Arrow `Timestamp` without a time zone.
    Timestamp,
    /// Read as Arrow `Boolean`.
    Boolean,
}
228
229impl OracleType {
230 pub fn to_arrow_type(&self) -> DataType {
231 match self {
232 OracleType::BinaryFloat => DataType::Float32,
233 OracleType::BinaryDouble => DataType::Float64,
234 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
235 OracleType::Float(_precision) => DataType::Float64,
236 OracleType::Varchar2(_) => DataType::Utf8,
237 OracleType::NVarchar2(_) => DataType::Utf8,
238 OracleType::Char(_) => DataType::Utf8,
239 OracleType::NChar(_) => DataType::Utf8,
240 OracleType::Long => DataType::Utf8,
241 OracleType::Clob => DataType::LargeUtf8,
242 OracleType::NClob => DataType::LargeUtf8,
243 OracleType::Raw(_) => DataType::Binary,
244 OracleType::LongRaw => DataType::Binary,
245 OracleType::Blob => DataType::LargeBinary,
246 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
247 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
248 OracleType::Boolean => DataType::Boolean,
249 }
250 }
251}
252
/// Column types recognised for SQLite sources.
///
/// The per-variant notes state the Arrow type produced by
/// [`SqliteType::to_arrow_type`].
#[derive(Debug, Clone)]
pub enum SqliteType {
    /// Read as Arrow `Null`.
    Null,
    /// Read as Arrow `Int64`.
    Integer,
    /// Read as Arrow `Float64`.
    Real,
    /// Read as Arrow `Utf8`.
    Text,
    /// Read as Arrow `Binary`.
    Blob,
}
262
263impl SqliteType {
264 pub fn to_arrow_type(&self) -> DataType {
265 match self {
266 SqliteType::Null => DataType::Null,
267 SqliteType::Integer => DataType::Int64,
268 SqliteType::Real => DataType::Float64,
269 SqliteType::Text => DataType::Utf8,
270 SqliteType::Blob => DataType::Binary,
271 }
272 }
273}
274
275#[derive(Debug, Clone)]
276pub struct RemoteField {
277 pub name: String,
278 pub remote_type: RemoteType,
279 pub nullable: bool,
280}
281
282impl RemoteField {
283 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
284 RemoteField {
285 name: name.into(),
286 remote_type,
287 nullable,
288 }
289 }
290
291 pub fn to_arrow_field(&self) -> Field {
292 Field::new(
293 self.name.clone(),
294 self.remote_type.to_arrow_type(),
295 self.nullable,
296 )
297 }
298}
299
300pub type RemoteSchemaRef = Arc<RemoteSchema>;
301
302#[derive(Debug, Clone)]
303pub struct RemoteSchema {
304 pub fields: Vec<RemoteField>,
305}
306
307impl RemoteSchema {
308 pub fn empty() -> Self {
309 RemoteSchema { fields: vec![] }
310 }
311 pub fn new(fields: Vec<RemoteField>) -> Self {
312 RemoteSchema { fields }
313 }
314
315 pub fn to_arrow_schema(&self) -> Schema {
316 let mut fields = vec![];
317 for remote_field in self.fields.iter() {
318 fields.push(remote_field.to_arrow_field());
319 }
320 Schema::new(fields)
321 }
322}
323
324pub fn project_remote_schema(
325 schema: &RemoteSchema,
326 projection: Option<&Vec<usize>>,
327) -> RemoteSchema {
328 match projection {
329 Some(projection) => {
330 let fields = projection
331 .iter()
332 .map(|i| schema.fields[*i].clone())
333 .collect::<Vec<_>>();
334 RemoteSchema::new(fields)
335 }
336 None => schema.clone(),
337 }
338}