1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6 Postgres(PostgresType),
7 Mysql(MysqlType),
8 Oracle(OracleType),
9 Sqlite(SqliteType),
10}
11
12impl RemoteType {
13 pub fn to_arrow_type(&self) -> DataType {
14 match self {
15 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19 }
20 }
21}
22
/// PostgreSQL column types that this module can map to Arrow.
///
/// Equality and hashing are derived so values can be compared directly and
/// used as keys in sets or maps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PostgresType {
    // Fixed-width numbers.
    /// Mapped to a 16-bit signed integer.
    Int2,
    /// Mapped to a 32-bit signed integer.
    Int4,
    /// Mapped to a 64-bit signed integer.
    Int8,
    /// Mapped to a 32-bit float.
    Float4,
    /// Mapped to a 64-bit float.
    Float8,
    /// Decimal column; the payload is used as the decimal scale when the
    /// type is mapped to Arrow.
    Numeric(i8),

    // Character and binary data.
    /// Mapped to a UTF-8 string.
    Name,
    /// Mapped to a UTF-8 string.
    Varchar,
    /// Mapped to a UTF-8 string.
    Bpchar,
    /// Mapped to a UTF-8 string.
    Text,
    /// Mapped to binary.
    Bytea,

    // Temporal types.
    /// Mapped to a 32-bit date.
    Date,
    /// Mapped to a nanosecond timestamp without a time zone.
    Timestamp,
    /// Mapped to a nanosecond timestamp tagged with the `UTC` time zone.
    TimestampTz,
    /// Mapped to a nanosecond time of day.
    Time,
    /// Mapped to a month/day/nanosecond interval.
    Interval,

    /// Mapped to a boolean.
    Bool,

    // JSON documents, both mapped to a large UTF-8 string.
    /// JSON document.
    Json,
    /// JSONB document.
    Jsonb,

    // One-dimensional arrays, each mapped to a list of the element type.
    /// Array of [`PostgresType::Int2`].
    Int2Array,
    /// Array of [`PostgresType::Int4`].
    Int4Array,
    /// Array of [`PostgresType::Int8`].
    Int8Array,
    /// Array of [`PostgresType::Float4`].
    Float4Array,
    /// Array of [`PostgresType::Float8`].
    Float8Array,
    /// Array of [`PostgresType::Varchar`].
    VarcharArray,
    /// Array of [`PostgresType::Bpchar`].
    BpcharArray,
    /// Array of [`PostgresType::Text`].
    TextArray,
    /// Array of [`PostgresType::Bytea`].
    ByteaArray,
    /// Array of [`PostgresType::Bool`].
    BoolArray,

    /// PostGIS geometry value; mapped to binary.
    PostGisGeometry,
}
66
67impl PostgresType {
68 pub fn to_arrow_type(&self) -> DataType {
69 match self {
70 PostgresType::Int2 => DataType::Int16,
71 PostgresType::Int4 => DataType::Int32,
72 PostgresType::Int8 => DataType::Int64,
73 PostgresType::Float4 => DataType::Float32,
74 PostgresType::Float8 => DataType::Float64,
75 PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
76 PostgresType::Name
77 | PostgresType::Text
78 | PostgresType::Varchar
79 | PostgresType::Bpchar => DataType::Utf8,
80 PostgresType::Bytea => DataType::Binary,
81 PostgresType::Date => DataType::Date32,
82 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
83 PostgresType::TimestampTz => {
84 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
85 }
86 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
87 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
88 PostgresType::Bool => DataType::Boolean,
89 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
90 PostgresType::Int2Array => {
91 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
92 }
93 PostgresType::Int4Array => {
94 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
95 }
96 PostgresType::Int8Array => {
97 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
98 }
99 PostgresType::Float4Array => {
100 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
101 }
102 PostgresType::Float8Array => {
103 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
104 }
105 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
106 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
107 }
108 PostgresType::ByteaArray => {
109 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
110 }
111 PostgresType::BoolArray => {
112 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
113 }
114 PostgresType::PostGisGeometry => DataType::Binary,
115 }
116 }
117}
118
/// MySQL column types that this module can map to Arrow.
///
/// Equality and hashing are derived so values can be compared directly and
/// used as keys in sets or maps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MysqlType {
    // Integers, signed and unsigned.
    /// Mapped to an 8-bit signed integer.
    TinyInt,
    /// Mapped to an 8-bit unsigned integer.
    TinyIntUnsigned,
    /// Mapped to a 16-bit signed integer.
    SmallInt,
    /// Mapped to a 16-bit unsigned integer.
    SmallIntUnsigned,
    /// Mapped to a 32-bit signed integer.
    MediumInt,
    /// Mapped to a 32-bit unsigned integer.
    MediumIntUnsigned,
    /// Mapped to a 32-bit signed integer.
    Integer,
    /// Mapped to a 32-bit unsigned integer.
    IntegerUnsigned,
    /// Mapped to a 64-bit signed integer.
    BigInt,
    /// Mapped to a 64-bit unsigned integer.
    BigIntUnsigned,

    // Floating point and decimal.
    /// Mapped to a 32-bit float.
    Float,
    /// Mapped to a 64-bit float.
    Double,
    /// Decimal column carrying `(precision, scale)`.
    Decimal(u8, u8),

    // Temporal types.
    /// Mapped to a 32-bit date.
    Date,
    /// Mapped to a microsecond timestamp without a time zone.
    Datetime,
    /// Mapped to a nanosecond time of day.
    Time,
    /// Mapped to a microsecond timestamp without a time zone.
    Timestamp,
    /// Mapped to a 16-bit signed integer.
    Year,

    // Character and binary data.
    /// Mapped to a UTF-8 string.
    Char,
    /// Mapped to a UTF-8 string.
    Varchar,
    /// Mapped to binary.
    Binary,
    /// Mapped to binary.
    Varbinary,
    /// Text column; the payload is the column length, which decides between
    /// the regular and the large Arrow string type.
    Text(u32),
    /// Blob column; the payload is the column length, which decides between
    /// the regular and the large Arrow binary type.
    Blob(u32),

    /// JSON document; mapped to a large UTF-8 string.
    Json,
    /// Geometry value; mapped to large binary.
    Geometry,
}
151
152impl MysqlType {
153 pub fn to_arrow_type(&self) -> DataType {
154 match self {
155 MysqlType::TinyInt => DataType::Int8,
156 MysqlType::TinyIntUnsigned => DataType::UInt8,
157 MysqlType::SmallInt => DataType::Int16,
158 MysqlType::SmallIntUnsigned => DataType::UInt16,
159 MysqlType::MediumInt => DataType::Int32,
160 MysqlType::MediumIntUnsigned => DataType::UInt32,
161 MysqlType::Integer => DataType::Int32,
162 MysqlType::IntegerUnsigned => DataType::UInt32,
163 MysqlType::BigInt => DataType::Int64,
164 MysqlType::BigIntUnsigned => DataType::UInt64,
165 MysqlType::Float => DataType::Float32,
166 MysqlType::Double => DataType::Float64,
167 MysqlType::Decimal(precision, scale) => {
168 assert!(*scale <= (i8::MAX as u8));
169 if *precision > 38 {
170 DataType::Decimal256(*precision, *scale as i8)
171 } else {
172 DataType::Decimal128(*precision, *scale as i8)
173 }
174 }
175 MysqlType::Date => DataType::Date32,
176 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
177 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
178 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
179 MysqlType::Year => DataType::Int16,
180 MysqlType::Char => DataType::Utf8,
181 MysqlType::Varchar => DataType::Utf8,
182 MysqlType::Binary => DataType::Binary,
183 MysqlType::Varbinary => DataType::Binary,
184 MysqlType::Text(col_len) => {
185 if (*col_len as usize) > (i32::MAX as usize) {
186 DataType::LargeUtf8
187 } else {
188 DataType::Utf8
189 }
190 }
191 MysqlType::Blob(col_len) => {
192 if (*col_len as usize) > (i32::MAX as usize) {
193 DataType::LargeBinary
194 } else {
195 DataType::Binary
196 }
197 }
198 MysqlType::Json => DataType::LargeUtf8,
199 MysqlType::Geometry => DataType::LargeBinary,
200 }
201 }
202}
203
/// Oracle column types that this module can map to Arrow.
///
/// Equality and hashing are derived so values can be compared directly and
/// used as keys in sets or maps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OracleType {
    /// Mapped to a UTF-8 string. The payload is not consulted by the Arrow
    /// mapping; presumably the declared length (TODO confirm against the
    /// code that constructs it).
    Varchar2(u32),
    /// Mapped to a UTF-8 string. The payload is not consulted by the Arrow
    /// mapping; presumably the declared length (TODO confirm against the
    /// code that constructs it).
    Char(u32),
    /// Numeric column carrying `(precision, scale)`.
    Number(u8, i8),
    /// Mapped to a second-resolution timestamp without a time zone.
    Date,
    /// Mapped to a nanosecond-resolution timestamp without a time zone.
    Timestamp,
}
214
215impl OracleType {
216 pub fn to_arrow_type(&self) -> DataType {
217 match self {
218 OracleType::Varchar2(_) => DataType::Utf8,
219 OracleType::Char(_) => DataType::Utf8,
220 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
221 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
222 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
223 }
224 }
225}
226
/// SQLite column types that this module can map to Arrow.
///
/// Equality and hashing are derived so values can be compared directly and
/// used as keys in sets or maps.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SqliteType {
    /// Mapped to the Arrow null type.
    Null,
    /// Mapped to a 64-bit signed integer.
    Integer,
    /// Mapped to a 64-bit float.
    Real,
    /// Mapped to a UTF-8 string.
    Text,
    /// Mapped to binary.
    Blob,
}
236
237impl SqliteType {
238 pub fn to_arrow_type(&self) -> DataType {
239 match self {
240 SqliteType::Null => DataType::Null,
241 SqliteType::Integer => DataType::Int64,
242 SqliteType::Real => DataType::Float64,
243 SqliteType::Text => DataType::Utf8,
244 SqliteType::Blob => DataType::Binary,
245 }
246 }
247}
248
249#[derive(Debug, Clone)]
250pub struct RemoteField {
251 pub name: String,
252 pub remote_type: RemoteType,
253 pub nullable: bool,
254}
255
256impl RemoteField {
257 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
258 RemoteField {
259 name: name.into(),
260 remote_type,
261 nullable,
262 }
263 }
264
265 pub fn to_arrow_field(&self) -> Field {
266 Field::new(
267 self.name.clone(),
268 self.remote_type.to_arrow_type(),
269 self.nullable,
270 )
271 }
272}
273
274pub type RemoteSchemaRef = Arc<RemoteSchema>;
275
276#[derive(Debug, Clone)]
277pub struct RemoteSchema {
278 pub fields: Vec<RemoteField>,
279}
280
281impl RemoteSchema {
282 pub fn empty() -> Self {
283 RemoteSchema { fields: vec![] }
284 }
285 pub fn new(fields: Vec<RemoteField>) -> Self {
286 RemoteSchema { fields }
287 }
288
289 pub fn to_arrow_schema(&self) -> Schema {
290 let mut fields = vec![];
291 for remote_field in self.fields.iter() {
292 fields.push(remote_field.to_arrow_field());
293 }
294 Schema::new(fields)
295 }
296}
297
298pub fn project_remote_schema(
299 schema: &RemoteSchema,
300 projection: Option<&Vec<usize>>,
301) -> RemoteSchema {
302 match projection {
303 Some(projection) => {
304 let fields = projection
305 .iter()
306 .map(|i| schema.fields[*i].clone())
307 .collect::<Vec<_>>();
308 RemoteSchema::new(fields)
309 }
310 None => schema.clone(),
311 }
312}