1use datafusion::arrow::datatypes::{
2 DECIMAL128_MAX_PRECISION, DataType, Field, IntervalUnit, Schema, TimeUnit,
3};
4use std::sync::Arc;
5
6use crate::RemoteDbType;
7
8#[derive(Debug, Clone)]
9pub enum RemoteType {
10 Postgres(PostgresType),
11 Mysql(MysqlType),
12 Oracle(OracleType),
13 Sqlite(SqliteType),
14 Dm(DmType),
15}
16
17impl RemoteType {
18 pub fn to_arrow_type(&self) -> DataType {
19 match self {
20 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
21 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
22 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
23 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
24 RemoteType::Dm(dm_type) => dm_type.to_arrow_type(),
25 }
26 }
27
28 pub fn db_type(&self) -> RemoteDbType {
29 match self {
30 RemoteType::Postgres(_) => RemoteDbType::Postgres,
31 RemoteType::Mysql(_) => RemoteDbType::Mysql,
32 RemoteType::Oracle(_) => RemoteDbType::Oracle,
33 RemoteType::Sqlite(_) => RemoteDbType::Sqlite,
34 RemoteType::Dm(_) => RemoteDbType::Dm,
35 }
36 }
37}
38
/// Column types recognised for PostgreSQL sources.
///
/// Each variant is translated to an Arrow type by `PostgresType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum PostgresType {
    Int2,
    Int4,
    Int8,
    Float4,
    Float8,
    /// `Numeric(precision, scale)`.
    Numeric(u8, i8),
    Oid,
    Name,
    Varchar,
    Bpchar,
    Text,
    Bytea,
    Date,
    Timestamp,
    TimestampTz,
    Time,
    Interval,
    Bool,
    Json,
    Jsonb,
    // Array variants: each one becomes an Arrow list of the matching element type.
    Int2Array,
    Int4Array,
    Int8Array,
    Float4Array,
    Float8Array,
    VarcharArray,
    BpcharArray,
    TextArray,
    ByteaArray,
    BoolArray,
    /// Geometry value; exposed to Arrow as plain binary.
    PostGisGeometry,
    Xml,
    /// Exposed to Arrow as a 16-byte fixed-size binary.
    Uuid,
}
85
86impl PostgresType {
87 pub fn to_arrow_type(&self) -> DataType {
88 match self {
89 PostgresType::Int2 => DataType::Int16,
90 PostgresType::Int4 => DataType::Int32,
91 PostgresType::Int8 => DataType::Int64,
92 PostgresType::Float4 => DataType::Float32,
93 PostgresType::Float8 => DataType::Float64,
94 PostgresType::Numeric(precision, scale) => {
95 if *precision <= DECIMAL128_MAX_PRECISION {
96 DataType::Decimal128(*precision, *scale)
97 } else {
98 DataType::Decimal256(*precision, *scale)
99 }
100 }
101 PostgresType::Oid => DataType::UInt32,
102 PostgresType::Name
103 | PostgresType::Text
104 | PostgresType::Varchar
105 | PostgresType::Bpchar
106 | PostgresType::Xml => DataType::Utf8,
107 PostgresType::Bytea => DataType::Binary,
108 PostgresType::Date => DataType::Date32,
109 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
110 PostgresType::TimestampTz => {
111 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
112 }
113 PostgresType::Time => DataType::Time64(TimeUnit::Microsecond),
114 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
115 PostgresType::Bool => DataType::Boolean,
116 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
117 PostgresType::Int2Array => {
118 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
119 }
120 PostgresType::Int4Array => {
121 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
122 }
123 PostgresType::Int8Array => {
124 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
125 }
126 PostgresType::Float4Array => {
127 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
128 }
129 PostgresType::Float8Array => {
130 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
131 }
132 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
133 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
134 }
135 PostgresType::ByteaArray => {
136 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
137 }
138 PostgresType::BoolArray => {
139 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
140 }
141 PostgresType::PostGisGeometry => DataType::Binary,
142 PostgresType::Uuid => DataType::FixedSizeBinary(16),
143 }
144 }
145}
146
/// Column types recognised for MySQL sources.
///
/// Each variant is translated to an Arrow type by `MysqlType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum MysqlType {
    TinyInt,
    TinyIntUnsigned,
    SmallInt,
    SmallIntUnsigned,
    MediumInt,
    MediumIntUnsigned,
    Integer,
    IntegerUnsigned,
    BigInt,
    BigIntUnsigned,
    Float,
    Double,
    /// `Decimal(precision, scale)`. `to_arrow_type` asserts that the scale fits in an `i8`.
    Decimal(u8, u8),
    Date,
    Datetime,
    Time,
    Timestamp,
    Year,
    Char,
    Varchar,
    Binary,
    Varbinary,
    /// Text column with its column length; lengths above `i32::MAX` map to `LargeUtf8`.
    Text(u32),
    /// Blob column with its column length; lengths above `i32::MAX` map to `LargeBinary`.
    Blob(u32),
    Json,
    Geometry,
}
179
180impl MysqlType {
181 pub fn to_arrow_type(&self) -> DataType {
182 match self {
183 MysqlType::TinyInt => DataType::Int8,
184 MysqlType::TinyIntUnsigned => DataType::UInt8,
185 MysqlType::SmallInt => DataType::Int16,
186 MysqlType::SmallIntUnsigned => DataType::UInt16,
187 MysqlType::MediumInt => DataType::Int32,
188 MysqlType::MediumIntUnsigned => DataType::UInt32,
189 MysqlType::Integer => DataType::Int32,
190 MysqlType::IntegerUnsigned => DataType::UInt32,
191 MysqlType::BigInt => DataType::Int64,
192 MysqlType::BigIntUnsigned => DataType::UInt64,
193 MysqlType::Float => DataType::Float32,
194 MysqlType::Double => DataType::Float64,
195 MysqlType::Decimal(precision, scale) => {
196 assert!(*scale <= (i8::MAX as u8));
197 if *precision > 38 {
198 DataType::Decimal256(*precision, *scale as i8)
199 } else {
200 DataType::Decimal128(*precision, *scale as i8)
201 }
202 }
203 MysqlType::Date => DataType::Date32,
204 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
205 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
206 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
207 MysqlType::Year => DataType::Int16,
208 MysqlType::Char => DataType::Utf8,
209 MysqlType::Varchar => DataType::Utf8,
210 MysqlType::Binary => DataType::Binary,
211 MysqlType::Varbinary => DataType::Binary,
212 MysqlType::Text(col_len) => {
213 if (*col_len as usize) > (i32::MAX as usize) {
214 DataType::LargeUtf8
215 } else {
216 DataType::Utf8
217 }
218 }
219 MysqlType::Blob(col_len) => {
220 if (*col_len as usize) > (i32::MAX as usize) {
221 DataType::LargeBinary
222 } else {
223 DataType::Binary
224 }
225 }
226 MysqlType::Json => DataType::LargeUtf8,
227 MysqlType::Geometry => DataType::LargeBinary,
228 }
229 }
230}
231
/// Column types recognised for Oracle sources.
///
/// Each variant is translated to an Arrow type by `OracleType::to_arrow_type`.
/// The `u32` payloads on the character and raw variants are not consulted by that
/// mapping (presumably they carry the declared column size; confirm against the
/// code that constructs them).
#[derive(Debug, Clone)]
pub enum OracleType {
    BinaryFloat,
    BinaryDouble,
    /// `Number(precision, scale)`.
    Number(u8, i8),
    /// `Float(precision)`; the precision does not affect the Arrow mapping.
    Float(u8),
    Varchar2(u32),
    NVarchar2(u32),
    Char(u32),
    NChar(u32),
    Long,
    Clob,
    NClob,
    Raw(u32),
    LongRaw,
    Blob,
    Date,
    Timestamp,
    Boolean,
}
254
255impl OracleType {
256 pub fn to_arrow_type(&self) -> DataType {
257 match self {
258 OracleType::BinaryFloat => DataType::Float32,
259 OracleType::BinaryDouble => DataType::Float64,
260 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
261 OracleType::Float(_precision) => DataType::Float64,
262 OracleType::Varchar2(_) => DataType::Utf8,
263 OracleType::NVarchar2(_) => DataType::Utf8,
264 OracleType::Char(_) => DataType::Utf8,
265 OracleType::NChar(_) => DataType::Utf8,
266 OracleType::Long => DataType::Utf8,
267 OracleType::Clob => DataType::LargeUtf8,
268 OracleType::NClob => DataType::LargeUtf8,
269 OracleType::Raw(_) => DataType::Binary,
270 OracleType::LongRaw => DataType::Binary,
271 OracleType::Blob => DataType::LargeBinary,
272 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
273 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
274 OracleType::Boolean => DataType::Boolean,
275 }
276 }
277}
278
/// Column types recognised for SQLite sources.
///
/// Each variant is translated to an Arrow type by `SqliteType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum SqliteType {
    /// Mapped to Arrow `Null`.
    Null,
    /// Mapped to Arrow `Int64`.
    Integer,
    /// Mapped to Arrow `Float64`.
    Real,
    /// Mapped to Arrow `Utf8`.
    Text,
    /// Mapped to Arrow `Binary`.
    Blob,
}
288
289impl SqliteType {
290 pub fn to_arrow_type(&self) -> DataType {
291 match self {
292 SqliteType::Null => DataType::Null,
293 SqliteType::Integer => DataType::Int64,
294 SqliteType::Real => DataType::Float64,
295 SqliteType::Text => DataType::Utf8,
296 SqliteType::Blob => DataType::Binary,
297 }
298 }
299}
300
/// Column types recognised for Dm sources.
///
/// Each variant is translated to an Arrow type by `DmType::to_arrow_type`.
#[derive(Debug, Clone)]
pub enum DmType {
    TinyInt,
    SmallInt,
    Integer,
    BigInt,
    Real,
    Double,
    /// `Numeric(precision, scale)`.
    Numeric(u8, i8),
    /// `Decimal(precision, scale)`.
    Decimal(u8, i8),
    /// The payload is not consulted by the Arrow mapping (presumably an optional length).
    Char(Option<u16>),
    /// The payload is not consulted by the Arrow mapping (presumably an optional length).
    Varchar(Option<u16>),
    Text,
    /// The payload becomes the byte width of the Arrow `FixedSizeBinary` type.
    Binary(u16),
    /// The payload is not consulted by the Arrow mapping (presumably an optional length).
    Varbinary(Option<u16>),
    Image,
    Bit,
    /// `Timestamp(precision)`; the precision selects the Arrow time unit.
    Timestamp(u8),
    /// `Time(precision)`; the precision selects the Arrow time type and unit.
    Time(u8),
    Date,
}
324
325impl DmType {
326 pub fn to_arrow_type(&self) -> DataType {
327 match self {
328 DmType::TinyInt => DataType::Int8,
329 DmType::SmallInt => DataType::Int16,
330 DmType::Integer => DataType::Int32,
331 DmType::BigInt => DataType::Int64,
332 DmType::Real => DataType::Float32,
333 DmType::Double => DataType::Float64,
334 DmType::Numeric(precision, scale) => DataType::Decimal128(*precision, *scale),
335 DmType::Decimal(precision, scale) => DataType::Decimal128(*precision, *scale),
336 DmType::Char(_) => DataType::Utf8,
337 DmType::Varchar(_) => DataType::Utf8,
338 DmType::Text => DataType::Utf8,
339 DmType::Binary(len) => DataType::FixedSizeBinary(*len as i32),
340 DmType::Varbinary(_) => DataType::Binary,
341 DmType::Image => DataType::Binary,
342 DmType::Bit => DataType::Boolean,
343 DmType::Timestamp(precision) => {
344 if *precision == 0 {
345 DataType::Timestamp(TimeUnit::Second, None)
346 } else if *precision <= 3 {
347 DataType::Timestamp(TimeUnit::Millisecond, None)
348 } else if *precision <= 6 {
349 DataType::Timestamp(TimeUnit::Microsecond, None)
350 } else {
351 DataType::Timestamp(TimeUnit::Nanosecond, None)
352 }
353 }
354 DmType::Time(precision) => {
355 if *precision == 0 {
356 DataType::Time32(TimeUnit::Second)
357 } else if *precision <= 3 {
358 DataType::Time32(TimeUnit::Millisecond)
359 } else {
360 DataType::Time64(TimeUnit::Microsecond)
361 }
362 }
363 DmType::Date => DataType::Date32,
364 }
365 }
366}
367
368#[derive(Debug, Clone)]
369pub struct RemoteField {
370 pub name: String,
371 pub remote_type: RemoteType,
372 pub nullable: bool,
373 pub auto_increment: bool,
374}
375
376impl RemoteField {
377 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
378 RemoteField {
379 name: name.into(),
380 remote_type,
381 nullable,
382 auto_increment: false,
383 }
384 }
385
386 pub fn with_auto_increment(mut self, auto_increment: bool) -> Self {
387 self.auto_increment = auto_increment;
388 self
389 }
390
391 pub fn to_arrow_field(&self) -> Field {
392 Field::new(
393 self.name.clone(),
394 self.remote_type.to_arrow_type(),
395 self.nullable,
396 )
397 }
398}
399
400pub type RemoteSchemaRef = Arc<RemoteSchema>;
401
402#[derive(Debug, Clone)]
403pub struct RemoteSchema {
404 pub fields: Vec<RemoteField>,
405}
406
407impl RemoteSchema {
408 pub fn empty() -> Self {
409 RemoteSchema { fields: vec![] }
410 }
411
412 pub fn new(fields: Vec<RemoteField>) -> Self {
413 RemoteSchema { fields }
414 }
415
416 pub fn to_arrow_schema(&self) -> Schema {
417 let mut fields = vec![];
418 for remote_field in self.fields.iter() {
419 fields.push(remote_field.to_arrow_field());
420 }
421 Schema::new(fields)
422 }
423}