1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
/// A column type reported by one of the supported remote sources.
///
/// Each variant wraps the type enum of a single source. `to_arrow_type`
/// forwards to the wrapped value to obtain the Arrow `DataType`.
#[derive(Debug, Clone)]
pub enum RemoteType {
    /// A type described by `PostgresType`.
    Postgres(PostgresType),
    /// A type described by `MysqlType`.
    Mysql(MysqlType),
    /// A type described by `OracleType`.
    Oracle(OracleType),
    /// A type described by `SqliteType`.
    Sqlite(SqliteType),
    /// A type described by `DmType`.
    Dm(DmType),
}
12
13impl RemoteType {
14 pub fn to_arrow_type(&self) -> DataType {
15 match self {
16 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
17 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
18 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
19 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
20 RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
21 }
22 }
23}
24
/// Column types recognised for a PostgreSQL source.
///
/// The Arrow representation of each variant is defined by
/// `PostgresType::to_arrow_type`. Values can be compared for equality and
/// hashed, so they are usable in assertions and as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PostgresType {
    Int2,
    Int4,
    Int8,
    Float4,
    Float8,
    /// The payload is the scale handed to Arrow `Decimal128`; the conversion
    /// always uses a precision of 38.
    Numeric(i8),
    /// Converted to Arrow `UInt32`.
    Oid,
    Name,
    Varchar,
    Bpchar,
    Text,
    Bytea,
    Date,
    /// Converted to a nanosecond Arrow timestamp without a time zone.
    Timestamp,
    /// Converted to a nanosecond Arrow timestamp tagged with the `"UTC"` zone.
    TimestampTz,
    Time,
    Interval,
    Bool,
    /// Converted to Arrow `LargeUtf8`.
    Json,
    /// Converted to Arrow `LargeUtf8`.
    Jsonb,
    // The `*Array` variants are converted to an Arrow `List` whose item field
    // is nullable and has an empty name.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    VarcharArray,
    BpcharArray,
    TextArray,
    ByteaArray,
    BoolArray,
    /// Converted to Arrow `Binary`.
    PostGisGeometry,
    /// Converted to Arrow `Utf8`.
    Xml,
    /// Converted to Arrow `FixedSizeBinary(16)`.
    Uuid,
}
71
72impl PostgresType {
73 pub fn to_arrow_type(&self) -> DataType {
74 match self {
75 PostgresType::Int2 => DataType::Int16,
76 PostgresType::Int4 => DataType::Int32,
77 PostgresType::Int8 => DataType::Int64,
78 PostgresType::Float4 => DataType::Float32,
79 PostgresType::Float8 => DataType::Float64,
80 PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
81 PostgresType::Oid => DataType::UInt32,
82 PostgresType::Name
83 | PostgresType::Text
84 | PostgresType::Varchar
85 | PostgresType::Bpchar
86 | PostgresType::Xml => DataType::Utf8,
87 PostgresType::Bytea => DataType::Binary,
88 PostgresType::Date => DataType::Date32,
89 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
90 PostgresType::TimestampTz => {
91 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
92 }
93 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
94 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
95 PostgresType::Bool => DataType::Boolean,
96 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
97 PostgresType::Int2Array => {
98 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
99 }
100 PostgresType::Int4Array => {
101 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
102 }
103 PostgresType::Int8Array => {
104 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
105 }
106 PostgresType::Float4Array => {
107 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
108 }
109 PostgresType::Float8Array => {
110 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
111 }
112 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
113 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
114 }
115 PostgresType::ByteaArray => {
116 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
117 }
118 PostgresType::BoolArray => {
119 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
120 }
121 PostgresType::PostGisGeometry => DataType::Binary,
122 PostgresType::Uuid => DataType::FixedSizeBinary(16),
123 }
124 }
125}
126
/// Column types recognised for a MySQL source.
///
/// The Arrow representation of each variant is defined by
/// `MysqlType::to_arrow_type`. Values can be compared for equality and
/// hashed, so they are usable in assertions and as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MysqlType {
    TinyInt,
    TinyIntUnsigned,
    SmallInt,
    SmallIntUnsigned,
    /// Converted to Arrow `Int32`.
    MediumInt,
    /// Converted to Arrow `UInt32`.
    MediumIntUnsigned,
    Integer,
    IntegerUnsigned,
    BigInt,
    BigIntUnsigned,
    Float,
    Double,
    /// `(precision, scale)`. The conversion asserts that the scale fits in an
    /// `i8` and selects `Decimal256` when the precision exceeds 38, otherwise
    /// `Decimal128`.
    Decimal(u8, u8),
    Date,
    /// Converted to a microsecond Arrow timestamp without a time zone.
    Datetime,
    Time,
    /// Converted to a microsecond Arrow timestamp tagged with the `"UTC"` zone.
    Timestamp,
    /// Converted to Arrow `Int16`.
    Year,
    Char,
    Varchar,
    Binary,
    Varbinary,
    /// The payload is a column length: values above `i32::MAX` select
    /// `LargeUtf8`, anything else `Utf8`.
    Text(u32),
    /// The payload is a column length: values above `i32::MAX` select
    /// `LargeBinary`, anything else `Binary`.
    Blob(u32),
    /// Converted to Arrow `LargeUtf8`.
    Json,
    /// Converted to Arrow `LargeBinary`.
    Geometry,
}
159
160impl MysqlType {
161 pub fn to_arrow_type(&self) -> DataType {
162 match self {
163 MysqlType::TinyInt => DataType::Int8,
164 MysqlType::TinyIntUnsigned => DataType::UInt8,
165 MysqlType::SmallInt => DataType::Int16,
166 MysqlType::SmallIntUnsigned => DataType::UInt16,
167 MysqlType::MediumInt => DataType::Int32,
168 MysqlType::MediumIntUnsigned => DataType::UInt32,
169 MysqlType::Integer => DataType::Int32,
170 MysqlType::IntegerUnsigned => DataType::UInt32,
171 MysqlType::BigInt => DataType::Int64,
172 MysqlType::BigIntUnsigned => DataType::UInt64,
173 MysqlType::Float => DataType::Float32,
174 MysqlType::Double => DataType::Float64,
175 MysqlType::Decimal(precision, scale) => {
176 assert!(*scale <= (i8::MAX as u8));
177 if *precision > 38 {
178 DataType::Decimal256(*precision, *scale as i8)
179 } else {
180 DataType::Decimal128(*precision, *scale as i8)
181 }
182 }
183 MysqlType::Date => DataType::Date32,
184 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
185 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
186 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
187 MysqlType::Year => DataType::Int16,
188 MysqlType::Char => DataType::Utf8,
189 MysqlType::Varchar => DataType::Utf8,
190 MysqlType::Binary => DataType::Binary,
191 MysqlType::Varbinary => DataType::Binary,
192 MysqlType::Text(col_len) => {
193 if (*col_len as usize) > (i32::MAX as usize) {
194 DataType::LargeUtf8
195 } else {
196 DataType::Utf8
197 }
198 }
199 MysqlType::Blob(col_len) => {
200 if (*col_len as usize) > (i32::MAX as usize) {
201 DataType::LargeBinary
202 } else {
203 DataType::Binary
204 }
205 }
206 MysqlType::Json => DataType::LargeUtf8,
207 MysqlType::Geometry => DataType::LargeBinary,
208 }
209 }
210}
211
/// Column types recognised for an Oracle source.
///
/// The Arrow representation of each variant is defined by
/// `OracleType::to_arrow_type`. Values can be compared for equality and
/// hashed, so they are usable in assertions and as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OracleType {
    BinaryFloat,
    BinaryDouble,
    /// `(precision, scale)`, passed unchanged to Arrow `Decimal128`.
    Number(u8, i8),
    /// The payload is not consulted by the conversion, which always yields
    /// Arrow `Float64`.
    Float(u8),
    /// The payload is not consulted by the conversion (always `Utf8`).
    Varchar2(u32),
    /// The payload is not consulted by the conversion (always `Utf8`).
    NVarchar2(u32),
    /// The payload is not consulted by the conversion (always `Utf8`).
    Char(u32),
    /// The payload is not consulted by the conversion (always `Utf8`).
    NChar(u32),
    /// Converted to Arrow `Utf8`.
    Long,
    /// Converted to Arrow `LargeUtf8`.
    Clob,
    /// Converted to Arrow `LargeUtf8`.
    NClob,
    /// The payload is not consulted by the conversion (always `Binary`).
    Raw(u32),
    /// Converted to Arrow `Binary`.
    LongRaw,
    /// Converted to Arrow `LargeBinary`.
    Blob,
    /// Converted to a second-resolution Arrow timestamp without a time zone.
    Date,
    /// Converted to a nanosecond Arrow timestamp without a time zone.
    Timestamp,
    Boolean,
}
234
235impl OracleType {
236 pub fn to_arrow_type(&self) -> DataType {
237 match self {
238 OracleType::BinaryFloat => DataType::Float32,
239 OracleType::BinaryDouble => DataType::Float64,
240 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
241 OracleType::Float(_precision) => DataType::Float64,
242 OracleType::Varchar2(_) => DataType::Utf8,
243 OracleType::NVarchar2(_) => DataType::Utf8,
244 OracleType::Char(_) => DataType::Utf8,
245 OracleType::NChar(_) => DataType::Utf8,
246 OracleType::Long => DataType::Utf8,
247 OracleType::Clob => DataType::LargeUtf8,
248 OracleType::NClob => DataType::LargeUtf8,
249 OracleType::Raw(_) => DataType::Binary,
250 OracleType::LongRaw => DataType::Binary,
251 OracleType::Blob => DataType::LargeBinary,
252 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
253 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
254 OracleType::Boolean => DataType::Boolean,
255 }
256 }
257}
258
/// Column types recognised for a SQLite source.
///
/// The Arrow representation of each variant is defined by
/// `SqliteType::to_arrow_type`. Values can be compared for equality and
/// hashed, so they are usable in assertions and as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SqliteType {
    /// Converted to Arrow `Null`.
    Null,
    /// Converted to Arrow `Int64`.
    Integer,
    /// Converted to Arrow `Float64`.
    Real,
    /// Converted to Arrow `Utf8`.
    Text,
    /// Converted to Arrow `Binary`.
    Blob,
}
268
269impl SqliteType {
270 pub fn to_arrow_type(&self) -> DataType {
271 match self {
272 SqliteType::Null => DataType::Null,
273 SqliteType::Integer => DataType::Int64,
274 SqliteType::Real => DataType::Float64,
275 SqliteType::Text => DataType::Utf8,
276 SqliteType::Blob => DataType::Binary,
277 }
278 }
279}
280
/// Column types recognised for a source described by the `Dm` variant of
/// `RemoteType`.
///
/// The Arrow representation of each variant is defined by
/// `DmType::to_arrow_type`. Values can be compared for equality and hashed,
/// so they are usable in assertions and as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DmType {
    TinyInt,
    SmallInt,
    Integer,
    BigInt,
    Real,
    Double,
    /// `(precision, scale)`, passed unchanged to Arrow `Decimal128`.
    Numeric(u8, i8),
    /// `(precision, scale)`, passed unchanged to Arrow `Decimal128`.
    Decimal(u8, i8),
    /// The payload is not consulted by the conversion (always `Utf8`).
    Char(Option<u16>),
    /// The payload is not consulted by the conversion (always `Utf8`).
    Varchar(Option<u16>),
    Text,
    /// The payload becomes the byte width of Arrow `FixedSizeBinary`.
    Binary(u16),
    /// The payload is not consulted by the conversion (always `Binary`).
    Varbinary(Option<u16>),
    /// Converted to Arrow `Binary`.
    Image,
    /// Converted to Arrow `Boolean`.
    Bit,
    /// The payload is a precision that selects the timestamp unit: 0 gives
    /// seconds, 1-3 milliseconds, 4-6 microseconds, anything larger
    /// nanoseconds.
    Timestamp(u8),
    /// The payload is a precision that selects the time type: 0 gives
    /// `Time32` seconds, 1-3 `Time32` milliseconds, anything larger `Time64`
    /// microseconds.
    Time(u8),
    Date,
}
304
305impl DmType {
306 pub fn to_arrow_type(&self) -> DataType {
307 match self {
308 DmType::TinyInt => DataType::Int8,
309 DmType::SmallInt => DataType::Int16,
310 DmType::Integer => DataType::Int32,
311 DmType::BigInt => DataType::Int64,
312 DmType::Real => DataType::Float32,
313 DmType::Double => DataType::Float64,
314 DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
315 DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
316 DmType::Char(_) => DataType::Utf8,
317 DmType::Varchar(_) => DataType::Utf8,
318 DmType::Text => DataType::Utf8,
319 DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
320 DmType::Varbinary(_) => DataType::Binary,
321 DmType::Image => DataType::Binary,
322 DmType::Bit => DataType::Boolean,
323 DmType::Timestamp(precision) => {
324 if *precision == 0 {
325 DataType::Timestamp(TimeUnit::Second, None)
326 } else if *precision <= 3 {
327 DataType::Timestamp(TimeUnit::Millisecond, None)
328 } else if *precision <= 6 {
329 DataType::Timestamp(TimeUnit::Microsecond, None)
330 } else {
331 DataType::Timestamp(TimeUnit::Nanosecond, None)
332 }
333 }
334 DmType::Time(precision) => {
335 if *precision == 0 {
336 DataType::Time32(TimeUnit::Second)
337 } else if *precision <= 3 {
338 DataType::Time32(TimeUnit::Millisecond)
339 } else {
340 DataType::Time64(TimeUnit::Microsecond)
341 }
342 }
343 DmType::Date => DataType::Date32,
344 }
345 }
346}
347
/// A single column of a remote table: its name, remote type and nullability.
#[derive(Debug, Clone)]
pub struct RemoteField {
    /// Column name; reused as the Arrow field name by `to_arrow_field`.
    pub name: String,
    /// Type of the column as reported by the remote source.
    pub remote_type: RemoteType,
    /// Nullability flag; copied onto the Arrow field by `to_arrow_field`.
    pub nullable: bool,
}
354
355impl RemoteField {
356 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
357 RemoteField {
358 name: name.into(),
359 remote_type,
360 nullable,
361 }
362 }
363
364 pub fn to_arrow_field(&self) -> Field {
365 Field::new(
366 self.name.clone(),
367 self.remote_type.to_arrow_type(),
368 self.nullable,
369 )
370 }
371}
372
/// Reference-counted (`Arc`) handle to a `RemoteSchema`.
pub type RemoteSchemaRef = Arc<RemoteSchema>;
374
/// An ordered list of `RemoteField`s describing the columns of a remote table.
#[derive(Debug, Clone)]
pub struct RemoteSchema {
    /// The fields, in order; `to_arrow_schema` converts them in this order.
    pub fields: Vec<RemoteField>,
}
379
380impl RemoteSchema {
381 pub fn empty() -> Self {
382 RemoteSchema { fields: vec![] }
383 }
384 pub fn new(fields: Vec<RemoteField>) -> Self {
385 RemoteSchema { fields }
386 }
387
388 pub fn to_arrow_schema(&self) -> Schema {
389 let mut fields = vec![];
390 for remote_field in self.fields.iter() {
391 fields.push(remote_field.to_arrow_field());
392 }
393 Schema::new(fields)
394 }
395}
396
397pub fn project_remote_schema(
398 schema: &RemoteSchema,
399 projection: Option<&Vec<usize>>,
400) -> RemoteSchema {
401 match projection {
402 Some(projection) => {
403 let fields = projection
404 .iter()
405 .map(|i| schema.fields[*i].clone())
406 .collect::<Vec<_>>();
407 RemoteSchema::new(fields)
408 }
409 None => schema.clone(),
410 }
411}