datafusion_remote_table/
codec.rs

1use crate::generated::prost as protobuf;
2use crate::{
3    connect, ConnectionOptions, DFResult, MysqlConnectionOptions, MysqlType,
4    OracleConnectionOptions, OracleType, PostgresConnectionOptions, PostgresType, RemoteField,
5    RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
6};
7use datafusion::arrow::datatypes::SchemaRef;
8use datafusion::common::DataFusionError;
9use datafusion::execution::FunctionRegistry;
10use datafusion::physical_plan::ExecutionPlan;
11use datafusion_proto::convert_required;
12use datafusion_proto::physical_plan::PhysicalExtensionCodec;
13use datafusion_proto::protobuf::proto_error;
14use prost::Message;
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18
19pub trait TransformCodec: Debug + Send + Sync {
20    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
21    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
22}
23
24#[derive(Debug)]
25pub struct RemotePhysicalCodec {
26    transform_codec: Option<Arc<dyn TransformCodec>>,
27}
28
29impl RemotePhysicalCodec {
30    pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
31        Self { transform_codec }
32    }
33}
34
35impl PhysicalExtensionCodec for RemotePhysicalCodec {
36    fn try_decode(
37        &self,
38        buf: &[u8],
39        _inputs: &[Arc<dyn ExecutionPlan>],
40        _registry: &dyn FunctionRegistry,
41    ) -> DFResult<Arc<dyn ExecutionPlan>> {
42        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
43            DataFusionError::Internal(format!(
44                "Failed to decode remote table execution plan: {e:?}"
45            ))
46        })?;
47
48        let transform = if let Some(bytes) = proto.transform {
49            let Some(transform_codec) = self.transform_codec.as_ref() else {
50                return Err(DataFusionError::Execution(
51                    "No transform codec found".to_string(),
52                ));
53            };
54            Some(transform_codec.try_decode(&bytes)?)
55        } else {
56            None
57        };
58
59        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
60        let remote_schema = proto
61            .remote_schema
62            .map(|schema| Arc::new(parse_remote_schema(&schema)));
63
64        let projection: Option<Vec<usize>> = proto
65            .projection
66            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
67
68        let conn_options = parse_connection_options(proto.conn_options.unwrap());
69        let conn = tokio::task::block_in_place(|| {
70            tokio::runtime::Handle::current().block_on(async {
71                let pool = connect(&conn_options).await?;
72                let conn = pool.get().await?;
73                Ok::<_, DataFusionError>(conn)
74            })
75        })?;
76
77        Ok(Arc::new(RemoteTableExec::try_new(
78            conn_options,
79            proto.sql,
80            table_schema,
81            remote_schema,
82            projection,
83            transform,
84            conn,
85        )?))
86    }
87
88    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
89        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
90            let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
91                let Some(transform_codec) = self.transform_codec.as_ref() else {
92                    return Err(DataFusionError::Execution(
93                        "No transform codec found".to_string(),
94                    ));
95                };
96                let bytes = transform_codec.try_encode(transform.as_ref())?;
97                Some(bytes)
98            } else {
99                None
100            };
101
102            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
103            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
104
105            let proto = protobuf::RemoteTableExec {
106                conn_options: Some(serialized_connection_options),
107                sql: exec.sql.clone(),
108                table_schema: Some(exec.table_schema.as_ref().try_into()?),
109                remote_schema,
110                projection: exec
111                    .projection
112                    .as_ref()
113                    .map(|p| serialize_projection(p.as_slice())),
114                transform: serialized_transform,
115            };
116
117            proto.encode(buf).map_err(|e| {
118                DataFusionError::Internal(format!(
119                    "Failed to encode remote table execution plan: {e:?}"
120                ))
121            })?;
122            Ok(())
123        } else {
124            Err(DataFusionError::Execution(format!(
125                "Failed to encode {}",
126                RemoteTableExec::static_name()
127            )))
128        }
129    }
130}
131
132fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
133    match options {
134        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
135            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
136                protobuf::PostgresConnectionOptions {
137                    host: options.host.clone(),
138                    port: options.port as u32,
139                    username: options.username.clone(),
140                    password: options.password.clone(),
141                    database: options.database.clone(),
142                },
143            )),
144        },
145        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
146            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
147                protobuf::MysqlConnectionOptions {
148                    host: options.host.clone(),
149                    port: options.port as u32,
150                    username: options.username.clone(),
151                    password: options.password.clone(),
152                    database: options.database.clone(),
153                },
154            )),
155        },
156        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
157            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
158                protobuf::OracleConnectionOptions {
159                    host: options.host.clone(),
160                    port: options.port as u32,
161                    username: options.username.clone(),
162                    password: options.password.clone(),
163                    service_name: options.service_name.clone(),
164                },
165            )),
166        },
167        ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
168            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
169                protobuf::SqliteConnectionOptions {
170                    path: path.to_str().unwrap().to_string(),
171                },
172            )),
173        },
174    }
175}
176
177fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
178    match options.connection_options {
179        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
180            ConnectionOptions::Postgres(PostgresConnectionOptions {
181                host: options.host,
182                port: options.port as u16,
183                username: options.username,
184                password: options.password,
185                database: options.database,
186            })
187        }
188        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
189            ConnectionOptions::Mysql(MysqlConnectionOptions {
190                host: options.host,
191                port: options.port as u16,
192                username: options.username,
193                password: options.password,
194                database: options.database,
195            })
196        }
197        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
198            ConnectionOptions::Oracle(OracleConnectionOptions {
199                host: options.host,
200                port: options.port as u16,
201                username: options.username,
202                password: options.password,
203                service_name: options.service_name,
204            })
205        }
206        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
207            ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
208        }
209        _ => panic!("Failed to parse connection options: {options:?}"),
210    }
211}
212
213fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
214    protobuf::Projection {
215        projection: projection.iter().map(|n| *n as u32).collect(),
216    }
217}
218
219fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
220    let fields = remote_schema
221        .fields
222        .iter()
223        .map(serialize_remote_field)
224        .collect::<Vec<_>>();
225
226    protobuf::RemoteSchema { fields }
227}
228
229fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
230    protobuf::RemoteField {
231        name: remote_field.name.clone(),
232        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
233        nullable: remote_field.nullable,
234    }
235}
236
237fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
238    match remote_type {
239        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
240            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
241                protobuf::PostgresInt2 {},
242            )),
243        },
244        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
245            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
246                protobuf::PostgresInt4 {},
247            )),
248        },
249        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
250            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
251                protobuf::PostgresInt8 {},
252            )),
253        },
254        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
255            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
256                protobuf::PostgresFloat4 {},
257            )),
258        },
259        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
260            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
261                protobuf::PostgresFloat8 {},
262            )),
263        },
264        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
265            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
266                protobuf::PostgresNumeric {
267                    scale: *scale as i32,
268                },
269            )),
270        },
271        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
272            r#type: Some(protobuf::remote_type::Type::PostgresName(
273                protobuf::PostgresName {},
274            )),
275        },
276        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
277            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
278                protobuf::PostgresVarchar {},
279            )),
280        },
281        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
282            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
283                protobuf::PostgresBpchar {},
284            )),
285        },
286        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
287            r#type: Some(protobuf::remote_type::Type::PostgresText(
288                protobuf::PostgresText {},
289            )),
290        },
291        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
292            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
293                protobuf::PostgresBytea {},
294            )),
295        },
296        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
297            r#type: Some(protobuf::remote_type::Type::PostgresDate(
298                protobuf::PostgresDate {},
299            )),
300        },
301        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
302            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
303                protobuf::PostgresTimestamp {},
304            )),
305        },
306        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
307            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
308                protobuf::PostgresTimestampTz {},
309            )),
310        },
311        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
312            r#type: Some(protobuf::remote_type::Type::PostgresTime(
313                protobuf::PostgresTime {},
314            )),
315        },
316        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
317            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
318                protobuf::PostgresInterval {},
319            )),
320        },
321        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
322            r#type: Some(protobuf::remote_type::Type::PostgresBool(
323                protobuf::PostgresBool {},
324            )),
325        },
326        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
327            r#type: Some(protobuf::remote_type::Type::PostgresJson(
328                protobuf::PostgresJson {},
329            )),
330        },
331        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
332            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
333                protobuf::PostgresJsonb {},
334            )),
335        },
336        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
337            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
338                protobuf::PostgresInt2Array {},
339            )),
340        },
341        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
342            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
343                protobuf::PostgresInt4Array {},
344            )),
345        },
346        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
347            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
348                protobuf::PostgresInt8Array {},
349            )),
350        },
351        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
352            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
353                protobuf::PostgresFloat4Array {},
354            )),
355        },
356        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
357            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
358                protobuf::PostgresFloat8Array {},
359            )),
360        },
361        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
362            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
363                protobuf::PostgresVarcharArray {},
364            )),
365        },
366        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
367            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
368                protobuf::PostgresBpcharArray {},
369            )),
370        },
371        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
372            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
373                protobuf::PostgresTextArray {},
374            )),
375        },
376        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
377            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
378                protobuf::PostgresByteaArray {},
379            )),
380        },
381        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
382            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
383                protobuf::PostgresBoolArray {},
384            )),
385        },
386        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
387            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
388                protobuf::PostgresPostGisGeometry {},
389            )),
390        },
391
392        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
393            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
394                protobuf::MysqlTinyInt {},
395            )),
396        },
397        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
398            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
399                protobuf::MysqlTinyIntUnsigned {},
400            )),
401        },
402        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
403            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
404                protobuf::MysqlSmallInt {},
405            )),
406        },
407        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
408            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
409                protobuf::MysqlSmallIntUnsigned {},
410            )),
411        },
412        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
413            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
414                protobuf::MysqlMediumInt {},
415            )),
416        },
417        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
418            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
419                protobuf::MysqlMediumIntUnsigned {},
420            )),
421        },
422        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
423            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
424                protobuf::MysqlInteger {},
425            )),
426        },
427        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
428            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
429                protobuf::MysqlIntegerUnsigned {},
430            )),
431        },
432        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
433            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
434                protobuf::MysqlBigInt {},
435            )),
436        },
437        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
438            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
439                protobuf::MysqlBigIntUnsigned {},
440            )),
441        },
442        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
443            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
444                protobuf::MysqlFloat {},
445            )),
446        },
447        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
448            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
449                protobuf::MysqlDouble {},
450            )),
451        },
452        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
453            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
454                protobuf::MysqlDecimal {
455                    precision: *precision as u32,
456                    scale: *scale as u32,
457                },
458            )),
459        },
460        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
461            r#type: Some(protobuf::remote_type::Type::MysqlDate(
462                protobuf::MysqlDate {},
463            )),
464        },
465        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
466            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
467                protobuf::MysqlDateTime {},
468            )),
469        },
470        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
471            r#type: Some(protobuf::remote_type::Type::MysqlTime(
472                protobuf::MysqlTime {},
473            )),
474        },
475        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
476            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
477                protobuf::MysqlTimestamp {},
478            )),
479        },
480        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
481            r#type: Some(protobuf::remote_type::Type::MysqlYear(
482                protobuf::MysqlYear {},
483            )),
484        },
485        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
486            r#type: Some(protobuf::remote_type::Type::MysqlChar(
487                protobuf::MysqlChar {},
488            )),
489        },
490        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
491            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
492                protobuf::MysqlVarchar {},
493            )),
494        },
495        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
496            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
497                protobuf::MysqlBinary {},
498            )),
499        },
500        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
501            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
502                protobuf::MysqlVarbinary {},
503            )),
504        },
505        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
506            r#type: Some(protobuf::remote_type::Type::MysqlText(
507                protobuf::MysqlText { length: *len },
508            )),
509        },
510        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
511            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
512                protobuf::MysqlBlob { length: *len },
513            )),
514        },
515        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
516            r#type: Some(protobuf::remote_type::Type::MysqlJson(
517                protobuf::MysqlJson {},
518            )),
519        },
520        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
521            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
522                protobuf::MysqlGeometry {},
523            )),
524        },
525
526        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
527            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
528                protobuf::OracleVarchar2 { length: *len },
529            )),
530        },
531        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
532            r#type: Some(protobuf::remote_type::Type::OracleChar(
533                protobuf::OracleChar { length: *len },
534            )),
535        },
536        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
537            r#type: Some(protobuf::remote_type::Type::OracleNumber(
538                protobuf::OracleNumber {
539                    precision: *precision as u32,
540                    scale: *scale as i32,
541                },
542            )),
543        },
544        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
545            r#type: Some(protobuf::remote_type::Type::OracleDate(
546                protobuf::OracleDate {},
547            )),
548        },
549        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
550            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
551                protobuf::OracleTimestamp {},
552            )),
553        },
554        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
555            r#type: Some(protobuf::remote_type::Type::SqliteNull(
556                protobuf::SqliteNull {},
557            )),
558        },
559        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
560            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
561                protobuf::SqliteInteger {},
562            )),
563        },
564        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
565            r#type: Some(protobuf::remote_type::Type::SqliteReal(
566                protobuf::SqliteReal {},
567            )),
568        },
569        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
570            r#type: Some(protobuf::remote_type::Type::SqliteText(
571                protobuf::SqliteText {},
572            )),
573        },
574        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
575            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
576                protobuf::SqliteBlob {},
577            )),
578        },
579    }
580}
581
582fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
583    let fields = remote_schema
584        .fields
585        .iter()
586        .map(parse_remote_field)
587        .collect::<Vec<_>>();
588
589    RemoteSchema { fields }
590}
591
592fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
593    RemoteField {
594        name: field.name.clone(),
595        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
596        nullable: field.nullable,
597    }
598}
599
600fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
601    match remote_type.r#type.as_ref().unwrap() {
602        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
603        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
604        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
605        protobuf::remote_type::Type::PostgresFloat4(_) => {
606            RemoteType::Postgres(PostgresType::Float4)
607        }
608        protobuf::remote_type::Type::PostgresFloat8(_) => {
609            RemoteType::Postgres(PostgresType::Float8)
610        }
611        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
612            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
613        }
614        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
615        protobuf::remote_type::Type::PostgresVarchar(_) => {
616            RemoteType::Postgres(PostgresType::Varchar)
617        }
618        protobuf::remote_type::Type::PostgresBpchar(_) => {
619            RemoteType::Postgres(PostgresType::Bpchar)
620        }
621        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
622        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
623        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
624        protobuf::remote_type::Type::PostgresTimestamp(_) => {
625            RemoteType::Postgres(PostgresType::Timestamp)
626        }
627        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
628            RemoteType::Postgres(PostgresType::TimestampTz)
629        }
630        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
631        protobuf::remote_type::Type::PostgresInterval(_) => {
632            RemoteType::Postgres(PostgresType::Interval)
633        }
634        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
635        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
636        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
637        protobuf::remote_type::Type::PostgresInt2Array(_) => {
638            RemoteType::Postgres(PostgresType::Int2Array)
639        }
640        protobuf::remote_type::Type::PostgresInt4Array(_) => {
641            RemoteType::Postgres(PostgresType::Int4Array)
642        }
643        protobuf::remote_type::Type::PostgresInt8Array(_) => {
644            RemoteType::Postgres(PostgresType::Int8Array)
645        }
646        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
647            RemoteType::Postgres(PostgresType::Float4Array)
648        }
649        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
650            RemoteType::Postgres(PostgresType::Float8Array)
651        }
652        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
653            RemoteType::Postgres(PostgresType::VarcharArray)
654        }
655        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
656            RemoteType::Postgres(PostgresType::BpcharArray)
657        }
658        protobuf::remote_type::Type::PostgresTextArray(_) => {
659            RemoteType::Postgres(PostgresType::TextArray)
660        }
661        protobuf::remote_type::Type::PostgresByteaArray(_) => {
662            RemoteType::Postgres(PostgresType::ByteaArray)
663        }
664        protobuf::remote_type::Type::PostgresBoolArray(_) => {
665            RemoteType::Postgres(PostgresType::BoolArray)
666        }
667        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
668            RemoteType::Postgres(PostgresType::PostGisGeometry)
669        }
670        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
671        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
672            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
673        }
674        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
675        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
676            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
677        }
678        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
679        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
680            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
681        }
682        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
683        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
684            RemoteType::Mysql(MysqlType::IntegerUnsigned)
685        }
686        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
687        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
688            RemoteType::Mysql(MysqlType::BigIntUnsigned)
689        }
690        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
691        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
692        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
693            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
694        ),
695        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
696        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
697        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
698        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
699        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
700        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
701        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
702        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
703        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
704        protobuf::remote_type::Type::MysqlText(text) => {
705            RemoteType::Mysql(MysqlType::Text(text.length))
706        }
707        protobuf::remote_type::Type::MysqlBlob(blob) => {
708            RemoteType::Mysql(MysqlType::Blob(blob.length))
709        }
710        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
711        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
712        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
713            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
714        }
715        protobuf::remote_type::Type::OracleChar(char) => {
716            RemoteType::Oracle(OracleType::Char(char.length))
717        }
718        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
719            OracleType::Number(number.precision as u8, number.scale as i8),
720        ),
721        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
722        protobuf::remote_type::Type::OracleTimestamp(_) => {
723            RemoteType::Oracle(OracleType::Timestamp)
724        }
725        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
726        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
727        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
728        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
729        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
730    }
731}