1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6 Postgres(PostgresType),
7 Mysql(MysqlType),
8 Oracle(OracleType),
9 Sqlite(SqliteType),
10 Dm(DmType),
11}
12
13impl RemoteType {
14 pub fn to_arrow_type(&self) -> DataType {
15 match self {
16 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
17 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
18 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
19 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
20 RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
21 }
22 }
23}
24
/// PostgreSQL column types that can be converted to Arrow.
///
/// Variant names follow the PostgreSQL internal type names (`int2`, `bpchar`, ...).
/// The `*Array` variants describe one-dimensional array columns of the named element type.
#[derive(Debug, Clone)]
pub enum PostgresType {
    /// `int2`: 16-bit signed integer.
    Int2,
    /// `int4`: 32-bit signed integer.
    Int4,
    /// `int8`: 64-bit signed integer.
    Int8,
    /// `float4`: single-precision float.
    Float4,
    /// `float8`: double-precision float.
    Float8,
    /// `numeric`; the payload is the scale applied to the Arrow decimal.
    Numeric(i8),
    /// `oid`: object identifier, surfaced as an unsigned 32-bit integer.
    Oid,
    /// `name`: surfaced as a string.
    Name,
    /// `varchar`.
    Varchar,
    /// `bpchar` (blank-padded `char(n)`).
    Bpchar,
    /// `text`.
    Text,
    /// `bytea`: binary data.
    Bytea,
    /// `date`.
    Date,
    /// `timestamp` without time zone.
    Timestamp,
    /// `timestamptz`; surfaced with a `"UTC"` time zone.
    TimestampTz,
    /// `time`.
    Time,
    /// `interval`.
    Interval,
    /// `bool`.
    Bool,
    /// `json`; surfaced as a string.
    Json,
    /// `jsonb`; surfaced as a string.
    Jsonb,
    /// Array of `int2`.
    Int2Array,
    /// Array of `int4`.
    Int4Array,
    /// Array of `int8`.
    Int8Array,
    /// Array of `float4`.
    Float4Array,
    /// Array of `float8`.
    Float8Array,
    /// Array of `varchar`.
    VarcharArray,
    /// Array of `bpchar`.
    BpcharArray,
    /// Array of `text`.
    TextArray,
    /// Array of `bytea`.
    ByteaArray,
    /// Array of `bool`.
    BoolArray,
    /// PostGIS geometry value; surfaced as raw binary.
    PostGisGeometry,
}
69
70impl PostgresType {
71 pub fn to_arrow_type(&self) -> DataType {
72 match self {
73 PostgresType::Int2 => DataType::Int16,
74 PostgresType::Int4 => DataType::Int32,
75 PostgresType::Int8 => DataType::Int64,
76 PostgresType::Float4 => DataType::Float32,
77 PostgresType::Float8 => DataType::Float64,
78 PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
79 PostgresType::Oid => DataType::UInt32,
80 PostgresType::Name
81 | PostgresType::Text
82 | PostgresType::Varchar
83 | PostgresType::Bpchar => DataType::Utf8,
84 PostgresType::Bytea => DataType::Binary,
85 PostgresType::Date => DataType::Date32,
86 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
87 PostgresType::TimestampTz => {
88 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
89 }
90 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
91 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
92 PostgresType::Bool => DataType::Boolean,
93 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
94 PostgresType::Int2Array => {
95 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
96 }
97 PostgresType::Int4Array => {
98 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
99 }
100 PostgresType::Int8Array => {
101 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
102 }
103 PostgresType::Float4Array => {
104 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
105 }
106 PostgresType::Float8Array => {
107 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
108 }
109 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
110 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
111 }
112 PostgresType::ByteaArray => {
113 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
114 }
115 PostgresType::BoolArray => {
116 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
117 }
118 PostgresType::PostGisGeometry => DataType::Binary,
119 }
120 }
121}
122
/// MySQL column types that can be converted to Arrow.
///
/// Integer types come in signed and `*Unsigned` flavours, which map to signed and
/// unsigned Arrow integers respectively.
#[derive(Debug, Clone)]
pub enum MysqlType {
    /// `TINYINT`.
    TinyInt,
    /// `TINYINT UNSIGNED`.
    TinyIntUnsigned,
    /// `SMALLINT`.
    SmallInt,
    /// `SMALLINT UNSIGNED`.
    SmallIntUnsigned,
    /// `MEDIUMINT`; widened to a 32-bit integer in Arrow.
    MediumInt,
    /// `MEDIUMINT UNSIGNED`; widened to an unsigned 32-bit integer in Arrow.
    MediumIntUnsigned,
    /// `INT`.
    Integer,
    /// `INT UNSIGNED`.
    IntegerUnsigned,
    /// `BIGINT`.
    BigInt,
    /// `BIGINT UNSIGNED`.
    BigIntUnsigned,
    /// `FLOAT`.
    Float,
    /// `DOUBLE`.
    Double,
    /// `DECIMAL`; the payload is `(precision, scale)`.
    Decimal(u8, u8),
    /// `DATE`.
    Date,
    /// `DATETIME`.
    Datetime,
    /// `TIME`.
    Time,
    /// `TIMESTAMP`.
    Timestamp,
    /// `YEAR`.
    Year,
    /// `CHAR`.
    Char,
    /// `VARCHAR`.
    Varchar,
    /// `BINARY`.
    Binary,
    /// `VARBINARY`.
    Varbinary,
    /// Text column; the payload is the column length, which selects between
    /// `Utf8` and `LargeUtf8`.
    Text(u32),
    /// Blob column; the payload is the column length, which selects between
    /// `Binary` and `LargeBinary`.
    Blob(u32),
    /// `JSON`; surfaced as a string.
    Json,
    /// Geometry value; surfaced as raw binary.
    Geometry,
}
155
156impl MysqlType {
157 pub fn to_arrow_type(&self) -> DataType {
158 match self {
159 MysqlType::TinyInt => DataType::Int8,
160 MysqlType::TinyIntUnsigned => DataType::UInt8,
161 MysqlType::SmallInt => DataType::Int16,
162 MysqlType::SmallIntUnsigned => DataType::UInt16,
163 MysqlType::MediumInt => DataType::Int32,
164 MysqlType::MediumIntUnsigned => DataType::UInt32,
165 MysqlType::Integer => DataType::Int32,
166 MysqlType::IntegerUnsigned => DataType::UInt32,
167 MysqlType::BigInt => DataType::Int64,
168 MysqlType::BigIntUnsigned => DataType::UInt64,
169 MysqlType::Float => DataType::Float32,
170 MysqlType::Double => DataType::Float64,
171 MysqlType::Decimal(precision, scale) => {
172 assert!(*scale <= (i8::MAX as u8));
173 if *precision > 38 {
174 DataType::Decimal256(*precision, *scale as i8)
175 } else {
176 DataType::Decimal128(*precision, *scale as i8)
177 }
178 }
179 MysqlType::Date => DataType::Date32,
180 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
181 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
182 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
183 MysqlType::Year => DataType::Int16,
184 MysqlType::Char => DataType::Utf8,
185 MysqlType::Varchar => DataType::Utf8,
186 MysqlType::Binary => DataType::Binary,
187 MysqlType::Varbinary => DataType::Binary,
188 MysqlType::Text(col_len) => {
189 if (*col_len as usize) > (i32::MAX as usize) {
190 DataType::LargeUtf8
191 } else {
192 DataType::Utf8
193 }
194 }
195 MysqlType::Blob(col_len) => {
196 if (*col_len as usize) > (i32::MAX as usize) {
197 DataType::LargeBinary
198 } else {
199 DataType::Binary
200 }
201 }
202 MysqlType::Json => DataType::LargeUtf8,
203 MysqlType::Geometry => DataType::LargeBinary,
204 }
205 }
206}
207
/// Oracle column types that can be converted to Arrow.
///
/// The `u32` payloads on the character and raw variants are not consulted when
/// choosing the Arrow type (presumably they hold the declared length; confirm
/// against the code that constructs them).
#[derive(Debug, Clone)]
pub enum OracleType {
    /// `BINARY_FLOAT`.
    BinaryFloat,
    /// `BINARY_DOUBLE`.
    BinaryDouble,
    /// `NUMBER`; the payload is `(precision, scale)`.
    Number(u8, i8),
    /// `FLOAT`; the payload is the precision, which does not affect the Arrow type.
    Float(u8),
    /// `VARCHAR2`.
    Varchar2(u32),
    /// `NVARCHAR2`.
    NVarchar2(u32),
    /// `CHAR`.
    Char(u32),
    /// `NCHAR`.
    NChar(u32),
    /// `LONG`.
    Long,
    /// `CLOB`.
    Clob,
    /// `NCLOB`.
    NClob,
    /// `RAW`.
    Raw(u32),
    /// `LONG RAW`.
    LongRaw,
    /// `BLOB`.
    Blob,
    /// `DATE`; surfaced as a second-resolution timestamp.
    Date,
    /// `TIMESTAMP`.
    Timestamp,
    /// `BOOLEAN`.
    Boolean,
}
230
231impl OracleType {
232 pub fn to_arrow_type(&self) -> DataType {
233 match self {
234 OracleType::BinaryFloat => DataType::Float32,
235 OracleType::BinaryDouble => DataType::Float64,
236 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
237 OracleType::Float(_precision) => DataType::Float64,
238 OracleType::Varchar2(_) => DataType::Utf8,
239 OracleType::NVarchar2(_) => DataType::Utf8,
240 OracleType::Char(_) => DataType::Utf8,
241 OracleType::NChar(_) => DataType::Utf8,
242 OracleType::Long => DataType::Utf8,
243 OracleType::Clob => DataType::LargeUtf8,
244 OracleType::NClob => DataType::LargeUtf8,
245 OracleType::Raw(_) => DataType::Binary,
246 OracleType::LongRaw => DataType::Binary,
247 OracleType::Blob => DataType::LargeBinary,
248 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
249 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
250 OracleType::Boolean => DataType::Boolean,
251 }
252 }
253}
254
/// SQLite column types that can be converted to Arrow.
///
/// The variants mirror SQLite's storage classes.
#[derive(Debug, Clone)]
pub enum SqliteType {
    /// `NULL`.
    Null,
    /// `INTEGER`; surfaced as a 64-bit signed integer.
    Integer,
    /// `REAL`; surfaced as a 64-bit float.
    Real,
    /// `TEXT`.
    Text,
    /// `BLOB`.
    Blob,
}
264
265impl SqliteType {
266 pub fn to_arrow_type(&self) -> DataType {
267 match self {
268 SqliteType::Null => DataType::Null,
269 SqliteType::Integer => DataType::Int64,
270 SqliteType::Real => DataType::Float64,
271 SqliteType::Text => DataType::Utf8,
272 SqliteType::Blob => DataType::Binary,
273 }
274 }
275}
276
/// Column types of the `Dm` backend that can be converted to Arrow
/// (presumably the Dameng database; confirm against the connector).
///
/// The optional `u16` payloads on `Char`, `Varchar` and `Varbinary` are not
/// consulted when choosing the Arrow type (presumably the declared length; confirm).
#[derive(Debug, Clone)]
pub enum DmType {
    /// 8-bit signed integer.
    TinyInt,
    /// 16-bit signed integer.
    SmallInt,
    /// 32-bit signed integer.
    Integer,
    /// 64-bit signed integer.
    BigInt,
    /// Single-precision float.
    Real,
    /// Double-precision float.
    Double,
    /// Fixed-point number; the payload is `(precision, scale)`.
    Numeric(u8, i8),
    /// Fixed-point number; the payload is `(precision, scale)`.
    Decimal(u8, i8),
    /// Character column.
    Char(Option<u16>),
    /// Variable-length character column.
    Varchar(Option<u16>),
    /// Text column.
    Text,
    /// Fixed-length binary column; the payload is the byte width used for the
    /// Arrow `FixedSizeBinary` type.
    Binary(u16),
    /// Variable-length binary column.
    Varbinary(Option<u16>),
    /// Image column; surfaced as binary.
    Image,
    /// Bit column; surfaced as a boolean.
    Bit,
    /// Timestamp; the payload is the fractional-second precision, which selects
    /// the Arrow time unit.
    Timestamp(u8),
    /// Time of day; the payload is the fractional-second precision, which selects
    /// the Arrow time unit.
    Time(u8),
    /// Calendar date.
    Date,
}
300
301impl DmType {
302 pub fn to_arrow_type(&self) -> DataType {
303 match self {
304 DmType::TinyInt => DataType::Int8,
305 DmType::SmallInt => DataType::Int16,
306 DmType::Integer => DataType::Int32,
307 DmType::BigInt => DataType::Int64,
308 DmType::Real => DataType::Float32,
309 DmType::Double => DataType::Float64,
310 DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
311 DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
312 DmType::Char(_) => DataType::Utf8,
313 DmType::Varchar(_) => DataType::Utf8,
314 DmType::Text => DataType::Utf8,
315 DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
316 DmType::Varbinary(_) => DataType::Binary,
317 DmType::Image => DataType::Binary,
318 DmType::Bit => DataType::Boolean,
319 DmType::Timestamp(precision) => {
320 if *precision == 0 {
321 DataType::Timestamp(TimeUnit::Second, None)
322 } else if *precision <= 3 {
323 DataType::Timestamp(TimeUnit::Millisecond, None)
324 } else if *precision <= 6 {
325 DataType::Timestamp(TimeUnit::Microsecond, None)
326 } else {
327 DataType::Timestamp(TimeUnit::Nanosecond, None)
328 }
329 }
330 DmType::Time(precision) => {
331 if *precision == 0 {
332 DataType::Time32(TimeUnit::Second)
333 } else if *precision <= 3 {
334 DataType::Time32(TimeUnit::Millisecond)
335 } else {
336 DataType::Time64(TimeUnit::Microsecond)
337 }
338 }
339 DmType::Date => DataType::Date32,
340 }
341 }
342}
343
344#[derive(Debug, Clone)]
345pub struct RemoteField {
346 pub name: String,
347 pub remote_type: RemoteType,
348 pub nullable: bool,
349}
350
351impl RemoteField {
352 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
353 RemoteField {
354 name: name.into(),
355 remote_type,
356 nullable,
357 }
358 }
359
360 pub fn to_arrow_field(&self) -> Field {
361 Field::new(
362 self.name.clone(),
363 self.remote_type.to_arrow_type(),
364 self.nullable,
365 )
366 }
367}
368
369pub type RemoteSchemaRef = Arc<RemoteSchema>;
370
371#[derive(Debug, Clone)]
372pub struct RemoteSchema {
373 pub fields: Vec<RemoteField>,
374}
375
376impl RemoteSchema {
377 pub fn empty() -> Self {
378 RemoteSchema { fields: vec![] }
379 }
380 pub fn new(fields: Vec<RemoteField>) -> Self {
381 RemoteSchema { fields }
382 }
383
384 pub fn to_arrow_schema(&self) -> Schema {
385 let mut fields = vec![];
386 for remote_field in self.fields.iter() {
387 fields.push(remote_field.to_arrow_field());
388 }
389 Schema::new(fields)
390 }
391}
392
393pub fn project_remote_schema(
394 schema: &RemoteSchema,
395 projection: Option<&Vec<usize>>,
396) -> RemoteSchema {
397 match projection {
398 Some(projection) => {
399 let fields = projection
400 .iter()
401 .map(|i| schema.fields[*i].clone())
402 .collect::<Vec<_>>();
403 RemoteSchema::new(fields)
404 }
405 None => schema.clone(),
406 }
407}