1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6 Postgres(PostgresType),
7 Mysql(MysqlType),
8 Oracle(OracleType),
9 Sqlite(SqliteType),
10}
11
12impl RemoteType {
13 pub fn to_arrow_type(&self) -> DataType {
14 match self {
15 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19 }
20 }
21}
22
23#[derive(Debug, Clone)]
25pub enum PostgresType {
26 Int2,
28 Int4,
30 Int8,
32 Float4,
34 Float8,
36 Numeric(i8),
39 Name,
40 Varchar,
42 Bpchar,
44 Text,
45 Bytea,
46 Date,
47 Timestamp,
48 TimestampTz,
49 Time,
50 Interval,
51 Bool,
52 Json,
53 Jsonb,
54 Int2Array,
55 Int4Array,
56 Int8Array,
57 Float4Array,
58 Float8Array,
59 VarcharArray,
60 BpcharArray,
61 TextArray,
62 ByteaArray,
63 BoolArray,
64 PostGisGeometry,
65}
66
67impl PostgresType {
68 pub fn to_arrow_type(&self) -> DataType {
69 match self {
70 PostgresType::Int2 => DataType::Int16,
71 PostgresType::Int4 => DataType::Int32,
72 PostgresType::Int8 => DataType::Int64,
73 PostgresType::Float4 => DataType::Float32,
74 PostgresType::Float8 => DataType::Float64,
75 PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
76 PostgresType::Name
77 | PostgresType::Text
78 | PostgresType::Varchar
79 | PostgresType::Bpchar => DataType::Utf8,
80 PostgresType::Bytea => DataType::Binary,
81 PostgresType::Date => DataType::Date32,
82 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
83 PostgresType::TimestampTz => {
84 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
85 }
86 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
87 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
88 PostgresType::Bool => DataType::Boolean,
89 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
90 PostgresType::Int2Array => {
91 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
92 }
93 PostgresType::Int4Array => {
94 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
95 }
96 PostgresType::Int8Array => {
97 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
98 }
99 PostgresType::Float4Array => {
100 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
101 }
102 PostgresType::Float8Array => {
103 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
104 }
105 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
106 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
107 }
108 PostgresType::ByteaArray => {
109 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
110 }
111 PostgresType::BoolArray => {
112 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
113 }
114 PostgresType::PostGisGeometry => DataType::Binary,
115 }
116 }
117}
118
119#[derive(Debug, Clone)]
121pub enum MysqlType {
122 TinyInt,
123 TinyIntUnsigned,
124 SmallInt,
125 SmallIntUnsigned,
126 MediumInt,
127 MediumIntUnsigned,
128 Integer,
129 IntegerUnsigned,
130 BigInt,
131 BigIntUnsigned,
132 Float,
133 Double,
134 Decimal(u8, u8),
135 Date,
136 Datetime,
137 Time,
138 Timestamp,
139 Year,
140 Char,
141 Varchar,
142 Binary,
143 Varbinary,
144 Text(u32),
146 Blob(u32),
148 Json,
149 Geometry,
150}
151
152impl MysqlType {
153 pub fn to_arrow_type(&self) -> DataType {
154 match self {
155 MysqlType::TinyInt => DataType::Int8,
156 MysqlType::TinyIntUnsigned => DataType::UInt8,
157 MysqlType::SmallInt => DataType::Int16,
158 MysqlType::SmallIntUnsigned => DataType::UInt16,
159 MysqlType::MediumInt => DataType::Int32,
160 MysqlType::MediumIntUnsigned => DataType::UInt32,
161 MysqlType::Integer => DataType::Int32,
162 MysqlType::IntegerUnsigned => DataType::UInt32,
163 MysqlType::BigInt => DataType::Int64,
164 MysqlType::BigIntUnsigned => DataType::UInt64,
165 MysqlType::Float => DataType::Float32,
166 MysqlType::Double => DataType::Float64,
167 MysqlType::Decimal(precision, scale) => {
168 assert!(*scale <= (i8::MAX as u8));
169 if *precision > 38 {
170 DataType::Decimal256(*precision, *scale as i8)
171 } else {
172 DataType::Decimal128(*precision, *scale as i8)
173 }
174 }
175 MysqlType::Date => DataType::Date32,
176 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
177 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
178 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
179 MysqlType::Year => DataType::Int16,
180 MysqlType::Char => DataType::Utf8,
181 MysqlType::Varchar => DataType::Utf8,
182 MysqlType::Binary => DataType::Binary,
183 MysqlType::Varbinary => DataType::Binary,
184 MysqlType::Text(col_len) => {
185 if (*col_len as usize) > (i32::MAX as usize) {
186 DataType::LargeUtf8
187 } else {
188 DataType::Utf8
189 }
190 }
191 MysqlType::Blob(col_len) => {
192 if (*col_len as usize) > (i32::MAX as usize) {
193 DataType::LargeBinary
194 } else {
195 DataType::Binary
196 }
197 }
198 MysqlType::Json => DataType::LargeUtf8,
199 MysqlType::Geometry => DataType::LargeBinary,
200 }
201 }
202}
203
204#[derive(Debug, Clone)]
206pub enum OracleType {
207 Varchar2(u32),
208 Char(u32),
209 Number(u8, i8),
210 Date,
211 Timestamp,
212}
213
214impl OracleType {
215 pub fn to_arrow_type(&self) -> DataType {
216 match self {
217 OracleType::Varchar2(_) => DataType::Utf8,
218 OracleType::Char(_) => DataType::Utf8,
219 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
220 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
221 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
222 }
223 }
224}
225
226#[derive(Debug, Clone)]
228pub enum SqliteType {
229 Null,
230 Integer,
231 Real,
232 Text,
233 Blob,
234}
235
236impl SqliteType {
237 pub fn to_arrow_type(&self) -> DataType {
238 match self {
239 SqliteType::Null => DataType::Null,
240 SqliteType::Integer => DataType::Int64,
241 SqliteType::Real => DataType::Float64,
242 SqliteType::Text => DataType::Utf8,
243 SqliteType::Blob => DataType::Binary,
244 }
245 }
246}
247
248#[derive(Debug, Clone)]
249pub struct RemoteField {
250 pub name: String,
251 pub remote_type: RemoteType,
252 pub nullable: bool,
253}
254
255impl RemoteField {
256 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
257 RemoteField {
258 name: name.into(),
259 remote_type,
260 nullable,
261 }
262 }
263
264 pub fn to_arrow_field(&self) -> Field {
265 Field::new(
266 self.name.clone(),
267 self.remote_type.to_arrow_type(),
268 self.nullable,
269 )
270 }
271}
272
273pub type RemoteSchemaRef = Arc<RemoteSchema>;
274
275#[derive(Debug, Clone)]
276pub struct RemoteSchema {
277 pub fields: Vec<RemoteField>,
278}
279
280impl RemoteSchema {
281 pub fn empty() -> Self {
282 RemoteSchema { fields: vec![] }
283 }
284 pub fn new(fields: Vec<RemoteField>) -> Self {
285 RemoteSchema { fields }
286 }
287
288 pub fn to_arrow_schema(&self) -> Schema {
289 let mut fields = vec![];
290 for remote_field in self.fields.iter() {
291 fields.push(remote_field.to_arrow_field());
292 }
293 Schema::new(fields)
294 }
295}
296
297pub fn project_remote_schema(
298 schema: &RemoteSchema,
299 projection: Option<&Vec<usize>>,
300) -> RemoteSchema {
301 match projection {
302 Some(projection) => {
303 let fields = projection
304 .iter()
305 .map(|i| schema.fields[*i].clone())
306 .collect::<Vec<_>>();
307 RemoteSchema::new(fields)
308 }
309 None => schema.clone(),
310 }
311}