1use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
2use std::sync::Arc;
3
4#[derive(Debug, Clone)]
5pub enum RemoteType {
6 Postgres(PostgresType),
7 Mysql(MysqlType),
8 Oracle(OracleType),
9 Sqlite(SqliteType),
10}
11
12impl RemoteType {
13 pub fn to_arrow_type(&self) -> DataType {
14 match self {
15 RemoteType::Postgres(postgres_type) => postgres_type.to_arrow_type(),
16 RemoteType::Mysql(mysql_type) => mysql_type.to_arrow_type(),
17 RemoteType::Oracle(oracle_type) => oracle_type.to_arrow_type(),
18 RemoteType::Sqlite(sqlite_type) => sqlite_type.to_arrow_type(),
19 }
20 }
21}
22
23#[derive(Debug, Clone)]
25pub enum PostgresType {
26 Int2,
28 Int4,
30 Int8,
32 Float4,
34 Float8,
36 Numeric(i8),
39 Varchar,
41 Bpchar,
43 Text,
44 Bytea,
45 Date,
46 Timestamp,
47 TimestampTz,
48 Time,
49 Interval,
50 Bool,
51 Json,
52 Jsonb,
53 Int2Array,
54 Int4Array,
55 Int8Array,
56 Float4Array,
57 Float8Array,
58 VarcharArray,
59 BpcharArray,
60 TextArray,
61 ByteaArray,
62 BoolArray,
63 PostGisGeometry,
64}
65
66impl PostgresType {
67 pub fn to_arrow_type(&self) -> DataType {
68 match self {
69 PostgresType::Int2 => DataType::Int16,
70 PostgresType::Int4 => DataType::Int32,
71 PostgresType::Int8 => DataType::Int64,
72 PostgresType::Float4 => DataType::Float32,
73 PostgresType::Float8 => DataType::Float64,
74 PostgresType::Numeric(scale) => DataType::Decimal128(38, *scale),
75 PostgresType::Text | PostgresType::Varchar | PostgresType::Bpchar => DataType::Utf8,
76 PostgresType::Bytea => DataType::Binary,
77 PostgresType::Date => DataType::Date32,
78 PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
79 PostgresType::TimestampTz => {
80 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
81 }
82 PostgresType::Time => DataType::Time64(TimeUnit::Nanosecond),
83 PostgresType::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
84 PostgresType::Bool => DataType::Boolean,
85 PostgresType::Json | PostgresType::Jsonb => DataType::LargeUtf8,
86 PostgresType::Int2Array => {
87 DataType::List(Arc::new(Field::new("", DataType::Int16, true)))
88 }
89 PostgresType::Int4Array => {
90 DataType::List(Arc::new(Field::new("", DataType::Int32, true)))
91 }
92 PostgresType::Int8Array => {
93 DataType::List(Arc::new(Field::new("", DataType::Int64, true)))
94 }
95 PostgresType::Float4Array => {
96 DataType::List(Arc::new(Field::new("", DataType::Float32, true)))
97 }
98 PostgresType::Float8Array => {
99 DataType::List(Arc::new(Field::new("", DataType::Float64, true)))
100 }
101 PostgresType::VarcharArray | PostgresType::BpcharArray | PostgresType::TextArray => {
102 DataType::List(Arc::new(Field::new("", DataType::Utf8, true)))
103 }
104 PostgresType::ByteaArray => {
105 DataType::List(Arc::new(Field::new("", DataType::Binary, true)))
106 }
107 PostgresType::BoolArray => {
108 DataType::List(Arc::new(Field::new("", DataType::Boolean, true)))
109 }
110 PostgresType::PostGisGeometry => DataType::Binary,
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
117pub enum MysqlType {
118 TinyInt,
119 SmallInt,
120 MediumInt,
121 Integer,
122 BigInt,
123 Float,
124 Double,
125 Decimal(u8, u8),
126 Date,
127 Datetime,
128 Time,
129 Timestamp,
130 Year,
131 Char,
132 Varchar,
133 Binary,
134 Varbinary,
135 TinyText,
136 Text,
137 MediumText,
138 LongText,
139 TinyBlob,
140 Blob,
141 MediumBlob,
142 LongBlob,
143 Json,
144 Geometry,
145}
146
147impl MysqlType {
148 pub fn to_arrow_type(&self) -> DataType {
149 match self {
150 MysqlType::TinyInt => DataType::Int8,
151 MysqlType::SmallInt => DataType::Int16,
152 MysqlType::MediumInt => DataType::Int32,
153 MysqlType::Integer => DataType::Int32,
154 MysqlType::BigInt => DataType::Int64,
155 MysqlType::Float => DataType::Float32,
156 MysqlType::Double => DataType::Float64,
157 MysqlType::Decimal(precision, scale) => {
158 assert!(*scale <= (i8::MAX as u8));
159 if *precision > 38 {
160 DataType::Decimal256(*precision, *scale as i8)
161 } else {
162 DataType::Decimal128(*precision, *scale as i8)
163 }
164 }
165 MysqlType::Date => DataType::Date32,
166 MysqlType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None),
167 MysqlType::Time => DataType::Time64(TimeUnit::Nanosecond),
168 MysqlType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
169 MysqlType::Year => DataType::Int16,
170 MysqlType::Char => DataType::Utf8,
171 MysqlType::Varchar => DataType::Utf8,
172 MysqlType::Binary => DataType::Binary,
173 MysqlType::Varbinary => DataType::Binary,
174 MysqlType::TinyText => DataType::Utf8,
175 MysqlType::Text => DataType::Utf8,
176 MysqlType::MediumText => DataType::Utf8,
177 MysqlType::LongText => DataType::LargeUtf8,
178 MysqlType::TinyBlob => DataType::Binary,
179 MysqlType::Blob => DataType::Binary,
180 MysqlType::MediumBlob => DataType::Binary,
181 MysqlType::LongBlob => DataType::LargeBinary,
182 MysqlType::Json => DataType::LargeUtf8,
183 MysqlType::Geometry => DataType::LargeBinary,
184 }
185 }
186}
187
188#[derive(Debug, Clone)]
190pub enum OracleType {
191 Varchar2(u32),
192 Char(u32),
193 Number(u8, i8),
194 Date,
195 Timestamp,
196}
197
198impl OracleType {
199 pub fn to_arrow_type(&self) -> DataType {
200 match self {
201 OracleType::Varchar2(_) => DataType::Utf8,
202 OracleType::Char(_) => DataType::Utf8,
203 OracleType::Number(precision, scale) => DataType::Decimal128(*precision, *scale),
204 OracleType::Date => DataType::Timestamp(TimeUnit::Second, None),
205 OracleType::Timestamp => DataType::Timestamp(TimeUnit::Nanosecond, None),
206 }
207 }
208}
209
210#[derive(Debug, Clone)]
212pub enum SqliteType {
213 Null,
214 Integer,
215 Real,
216 Text,
217 Blob,
218}
219
220impl SqliteType {
221 pub fn to_arrow_type(&self) -> DataType {
222 match self {
223 SqliteType::Null => DataType::Null,
224 SqliteType::Integer => DataType::Int64,
225 SqliteType::Real => DataType::Float64,
226 SqliteType::Text => DataType::Utf8,
227 SqliteType::Blob => DataType::Binary,
228 }
229 }
230}
231
232#[derive(Debug, Clone)]
233pub struct RemoteField {
234 pub name: String,
235 pub remote_type: RemoteType,
236 pub nullable: bool,
237}
238
239impl RemoteField {
240 pub fn new(name: impl Into<String>, remote_type: RemoteType, nullable: bool) -> Self {
241 RemoteField {
242 name: name.into(),
243 remote_type,
244 nullable,
245 }
246 }
247
248 pub fn to_arrow_field(&self) -> Field {
249 Field::new(
250 self.name.clone(),
251 self.remote_type.to_arrow_type(),
252 self.nullable,
253 )
254 }
255}
256
257#[derive(Debug, Clone)]
258pub struct RemoteSchema {
259 pub fields: Vec<RemoteField>,
260}
261
262impl RemoteSchema {
263 pub fn empty() -> Self {
264 RemoteSchema { fields: vec![] }
265 }
266 pub fn new(fields: Vec<RemoteField>) -> Self {
267 RemoteSchema { fields }
268 }
269
270 pub fn to_arrow_schema(&self) -> Schema {
271 let mut fields = vec![];
272 for remote_field in self.fields.iter() {
273 fields.push(remote_field.to_arrow_field());
274 }
275 Schema::new(fields)
276 }
277}
278
279pub fn project_remote_schema(
280 schema: &RemoteSchema,
281 projection: Option<&Vec<usize>>,
282) -> RemoteSchema {
283 match projection {
284 Some(projection) => {
285 let fields = projection
286 .iter()
287 .map(|i| schema.fields[*i].clone())
288 .collect::<Vec<_>>();
289 RemoteSchema::new(fields)
290 }
291 None => schema.clone(),
292 }
293}