datafusion_remote_table/
codec.rs

1#[cfg(feature = "mysql")]
2use crate::MysqlConnectionOptions;
3#[cfg(feature = "oracle")]
4use crate::OracleConnectionOptions;
5#[cfg(feature = "postgres")]
6use crate::PostgresConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9    ConnectionOptions, DFResult, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema,
10    RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
11};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::common::DataFusionError;
14use datafusion::execution::FunctionRegistry;
15use datafusion::physical_plan::ExecutionPlan;
16use datafusion_proto::convert_required;
17use datafusion_proto::physical_plan::PhysicalExtensionCodec;
18use datafusion_proto::protobuf::proto_error;
19use prost::Message;
20use std::fmt::Debug;
21#[cfg(feature = "sqlite")]
22use std::path::Path;
23use std::sync::Arc;
24
25pub trait TransformCodec: Debug + Send + Sync {
26    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
27    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
28}
29
30#[derive(Debug)]
31pub struct RemotePhysicalCodec {
32    transform_codec: Option<Arc<dyn TransformCodec>>,
33}
34
35impl RemotePhysicalCodec {
36    pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
37        Self { transform_codec }
38    }
39}
40
41impl PhysicalExtensionCodec for RemotePhysicalCodec {
42    fn try_decode(
43        &self,
44        buf: &[u8],
45        _inputs: &[Arc<dyn ExecutionPlan>],
46        _registry: &dyn FunctionRegistry,
47    ) -> DFResult<Arc<dyn ExecutionPlan>> {
48        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
49            DataFusionError::Internal(format!(
50                "Failed to decode remote table execution plan: {e:?}"
51            ))
52        })?;
53
54        let transform = if let Some(bytes) = proto.transform {
55            let Some(transform_codec) = self.transform_codec.as_ref() else {
56                return Err(DataFusionError::Execution(
57                    "No transform codec found".to_string(),
58                ));
59            };
60            Some(transform_codec.try_decode(&bytes)?)
61        } else {
62            None
63        };
64
65        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
66        let remote_schema = proto
67            .remote_schema
68            .map(|schema| Arc::new(parse_remote_schema(&schema)));
69
70        let projection: Option<Vec<usize>> = proto
71            .projection
72            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
73
74        let conn_options = parse_connection_options(proto.conn_options.unwrap());
75        let conn = tokio::task::block_in_place(|| {
76            tokio::runtime::Handle::current().block_on(async {
77                let pool = connect(&conn_options).await?;
78                let conn = pool.get().await?;
79                Ok::<_, DataFusionError>(conn)
80            })
81        })?;
82
83        Ok(Arc::new(RemoteTableExec::try_new(
84            conn_options,
85            proto.sql,
86            table_schema,
87            remote_schema,
88            projection,
89            transform,
90            conn,
91        )?))
92    }
93
94    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
95        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
96            let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
97                let Some(transform_codec) = self.transform_codec.as_ref() else {
98                    return Err(DataFusionError::Execution(
99                        "No transform codec found".to_string(),
100                    ));
101                };
102                let bytes = transform_codec.try_encode(transform.as_ref())?;
103                Some(bytes)
104            } else {
105                None
106            };
107
108            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
109            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
110
111            let proto = protobuf::RemoteTableExec {
112                conn_options: Some(serialized_connection_options),
113                sql: exec.sql.clone(),
114                table_schema: Some(exec.table_schema.as_ref().try_into()?),
115                remote_schema,
116                projection: exec
117                    .projection
118                    .as_ref()
119                    .map(|p| serialize_projection(p.as_slice())),
120                transform: serialized_transform,
121            };
122
123            proto.encode(buf).map_err(|e| {
124                DataFusionError::Internal(format!(
125                    "Failed to encode remote table execution plan: {e:?}"
126                ))
127            })?;
128            Ok(())
129        } else {
130            Err(DataFusionError::Execution(format!(
131                "Failed to encode {}",
132                RemoteTableExec::static_name()
133            )))
134        }
135    }
136}
137
138fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
139    match options {
140        #[cfg(feature = "postgres")]
141        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
142            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
143                protobuf::PostgresConnectionOptions {
144                    host: options.host.clone(),
145                    port: options.port as u32,
146                    username: options.username.clone(),
147                    password: options.password.clone(),
148                    database: options.database.clone(),
149                    pool_max_size: options.pool_max_size as u32,
150                    stream_chunk_size: options.stream_chunk_size as u32,
151                },
152            )),
153        },
154        #[cfg(feature = "mysql")]
155        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
156            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
157                protobuf::MysqlConnectionOptions {
158                    host: options.host.clone(),
159                    port: options.port as u32,
160                    username: options.username.clone(),
161                    password: options.password.clone(),
162                    database: options.database.clone(),
163                    pool_max_size: options.pool_max_size as u32,
164                    stream_chunk_size: options.stream_chunk_size as u32,
165                },
166            )),
167        },
168        #[cfg(feature = "oracle")]
169        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
170            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
171                protobuf::OracleConnectionOptions {
172                    host: options.host.clone(),
173                    port: options.port as u32,
174                    username: options.username.clone(),
175                    password: options.password.clone(),
176                    service_name: options.service_name.clone(),
177                    pool_max_size: options.pool_max_size as u32,
178                    stream_chunk_size: options.stream_chunk_size as u32,
179                },
180            )),
181        },
182        #[cfg(feature = "sqlite")]
183        ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
184            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
185                protobuf::SqliteConnectionOptions {
186                    path: path.to_str().unwrap().to_string(),
187                },
188            )),
189        },
190    }
191}
192
193fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
194    match options.connection_options {
195        #[cfg(feature = "postgres")]
196        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
197            ConnectionOptions::Postgres(PostgresConnectionOptions {
198                host: options.host,
199                port: options.port as u16,
200                username: options.username,
201                password: options.password,
202                database: options.database,
203                pool_max_size: options.pool_max_size as usize,
204                stream_chunk_size: options.stream_chunk_size as usize,
205            })
206        }
207        #[cfg(feature = "mysql")]
208        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
209            ConnectionOptions::Mysql(MysqlConnectionOptions {
210                host: options.host,
211                port: options.port as u16,
212                username: options.username,
213                password: options.password,
214                database: options.database,
215                pool_max_size: options.pool_max_size as usize,
216                stream_chunk_size: options.stream_chunk_size as usize,
217            })
218        }
219        #[cfg(feature = "oracle")]
220        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
221            ConnectionOptions::Oracle(OracleConnectionOptions {
222                host: options.host,
223                port: options.port as u16,
224                username: options.username,
225                password: options.password,
226                service_name: options.service_name,
227                pool_max_size: options.pool_max_size as usize,
228                stream_chunk_size: options.stream_chunk_size as usize,
229            })
230        }
231        #[cfg(feature = "sqlite")]
232        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
233            ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
234        }
235        _ => panic!("Failed to parse connection options: {options:?}"),
236    }
237}
238
239fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
240    protobuf::Projection {
241        projection: projection.iter().map(|n| *n as u32).collect(),
242    }
243}
244
245fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
246    let fields = remote_schema
247        .fields
248        .iter()
249        .map(serialize_remote_field)
250        .collect::<Vec<_>>();
251
252    protobuf::RemoteSchema { fields }
253}
254
255fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
256    protobuf::RemoteField {
257        name: remote_field.name.clone(),
258        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
259        nullable: remote_field.nullable,
260    }
261}
262
263fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
264    match remote_type {
265        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
266            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
267                protobuf::PostgresInt2 {},
268            )),
269        },
270        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
271            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
272                protobuf::PostgresInt4 {},
273            )),
274        },
275        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
276            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
277                protobuf::PostgresInt8 {},
278            )),
279        },
280        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
281            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
282                protobuf::PostgresFloat4 {},
283            )),
284        },
285        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
286            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
287                protobuf::PostgresFloat8 {},
288            )),
289        },
290        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
291            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
292                protobuf::PostgresNumeric {
293                    scale: *scale as i32,
294                },
295            )),
296        },
297        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
298            r#type: Some(protobuf::remote_type::Type::PostgresName(
299                protobuf::PostgresName {},
300            )),
301        },
302        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
303            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
304                protobuf::PostgresVarchar {},
305            )),
306        },
307        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
308            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
309                protobuf::PostgresBpchar {},
310            )),
311        },
312        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
313            r#type: Some(protobuf::remote_type::Type::PostgresText(
314                protobuf::PostgresText {},
315            )),
316        },
317        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
318            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
319                protobuf::PostgresBytea {},
320            )),
321        },
322        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
323            r#type: Some(protobuf::remote_type::Type::PostgresDate(
324                protobuf::PostgresDate {},
325            )),
326        },
327        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
328            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
329                protobuf::PostgresTimestamp {},
330            )),
331        },
332        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
333            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
334                protobuf::PostgresTimestampTz {},
335            )),
336        },
337        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
338            r#type: Some(protobuf::remote_type::Type::PostgresTime(
339                protobuf::PostgresTime {},
340            )),
341        },
342        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
343            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
344                protobuf::PostgresInterval {},
345            )),
346        },
347        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
348            r#type: Some(protobuf::remote_type::Type::PostgresBool(
349                protobuf::PostgresBool {},
350            )),
351        },
352        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
353            r#type: Some(protobuf::remote_type::Type::PostgresJson(
354                protobuf::PostgresJson {},
355            )),
356        },
357        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
358            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
359                protobuf::PostgresJsonb {},
360            )),
361        },
362        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
363            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
364                protobuf::PostgresInt2Array {},
365            )),
366        },
367        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
368            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
369                protobuf::PostgresInt4Array {},
370            )),
371        },
372        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
373            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
374                protobuf::PostgresInt8Array {},
375            )),
376        },
377        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
378            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
379                protobuf::PostgresFloat4Array {},
380            )),
381        },
382        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
383            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
384                protobuf::PostgresFloat8Array {},
385            )),
386        },
387        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
388            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
389                protobuf::PostgresVarcharArray {},
390            )),
391        },
392        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
393            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
394                protobuf::PostgresBpcharArray {},
395            )),
396        },
397        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
398            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
399                protobuf::PostgresTextArray {},
400            )),
401        },
402        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
403            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
404                protobuf::PostgresByteaArray {},
405            )),
406        },
407        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
408            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
409                protobuf::PostgresBoolArray {},
410            )),
411        },
412        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
413            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
414                protobuf::PostgresPostGisGeometry {},
415            )),
416        },
417        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
418            r#type: Some(protobuf::remote_type::Type::PostgresOid(
419                protobuf::PostgresOid {},
420            )),
421        },
422
423        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
424            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
425                protobuf::MysqlTinyInt {},
426            )),
427        },
428        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
429            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
430                protobuf::MysqlTinyIntUnsigned {},
431            )),
432        },
433        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
434            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
435                protobuf::MysqlSmallInt {},
436            )),
437        },
438        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
439            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
440                protobuf::MysqlSmallIntUnsigned {},
441            )),
442        },
443        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
444            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
445                protobuf::MysqlMediumInt {},
446            )),
447        },
448        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
449            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
450                protobuf::MysqlMediumIntUnsigned {},
451            )),
452        },
453        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
454            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
455                protobuf::MysqlInteger {},
456            )),
457        },
458        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
459            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
460                protobuf::MysqlIntegerUnsigned {},
461            )),
462        },
463        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
464            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
465                protobuf::MysqlBigInt {},
466            )),
467        },
468        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
469            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
470                protobuf::MysqlBigIntUnsigned {},
471            )),
472        },
473        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
474            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
475                protobuf::MysqlFloat {},
476            )),
477        },
478        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
479            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
480                protobuf::MysqlDouble {},
481            )),
482        },
483        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
484            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
485                protobuf::MysqlDecimal {
486                    precision: *precision as u32,
487                    scale: *scale as u32,
488                },
489            )),
490        },
491        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
492            r#type: Some(protobuf::remote_type::Type::MysqlDate(
493                protobuf::MysqlDate {},
494            )),
495        },
496        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
497            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
498                protobuf::MysqlDateTime {},
499            )),
500        },
501        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
502            r#type: Some(protobuf::remote_type::Type::MysqlTime(
503                protobuf::MysqlTime {},
504            )),
505        },
506        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
507            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
508                protobuf::MysqlTimestamp {},
509            )),
510        },
511        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
512            r#type: Some(protobuf::remote_type::Type::MysqlYear(
513                protobuf::MysqlYear {},
514            )),
515        },
516        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
517            r#type: Some(protobuf::remote_type::Type::MysqlChar(
518                protobuf::MysqlChar {},
519            )),
520        },
521        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
522            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
523                protobuf::MysqlVarchar {},
524            )),
525        },
526        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
527            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
528                protobuf::MysqlBinary {},
529            )),
530        },
531        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
532            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
533                protobuf::MysqlVarbinary {},
534            )),
535        },
536        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
537            r#type: Some(protobuf::remote_type::Type::MysqlText(
538                protobuf::MysqlText { length: *len },
539            )),
540        },
541        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
542            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
543                protobuf::MysqlBlob { length: *len },
544            )),
545        },
546        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
547            r#type: Some(protobuf::remote_type::Type::MysqlJson(
548                protobuf::MysqlJson {},
549            )),
550        },
551        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
552            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
553                protobuf::MysqlGeometry {},
554            )),
555        },
556
557        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
558            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
559                protobuf::OracleVarchar2 { length: *len },
560            )),
561        },
562        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
563            r#type: Some(protobuf::remote_type::Type::OracleChar(
564                protobuf::OracleChar { length: *len },
565            )),
566        },
567        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
568            r#type: Some(protobuf::remote_type::Type::OracleNumber(
569                protobuf::OracleNumber {
570                    precision: *precision as u32,
571                    scale: *scale as i32,
572                },
573            )),
574        },
575        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
576            r#type: Some(protobuf::remote_type::Type::OracleDate(
577                protobuf::OracleDate {},
578            )),
579        },
580        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
581            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
582                protobuf::OracleTimestamp {},
583            )),
584        },
585        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
586            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
587                protobuf::OracleBoolean {},
588            )),
589        },
590        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
591            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
592                protobuf::OracleBinaryFloat {},
593            )),
594        },
595        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
596            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
597                protobuf::OracleBinaryDouble {},
598            )),
599        },
600        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
601            r#type: Some(protobuf::remote_type::Type::OracleBlob(
602                protobuf::OracleBlob {},
603            )),
604        },
605        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
606            r#type: Some(protobuf::remote_type::Type::OracleFloat(
607                protobuf::OracleFloat {
608                    precision: *precision as u32,
609                },
610            )),
611        },
612        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
613            r#type: Some(protobuf::remote_type::Type::OracleNchar(
614                protobuf::OracleNChar { length: *len },
615            )),
616        },
617        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
618            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
619                protobuf::OracleNVarchar2 { length: *len },
620            )),
621        },
622        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
623            r#type: Some(protobuf::remote_type::Type::OracleRaw(
624                protobuf::OracleRaw { length: *len },
625            )),
626        },
627        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
628            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
629                protobuf::OracleLongRaw {},
630            )),
631        },
632        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
633            r#type: Some(protobuf::remote_type::Type::OracleLong(
634                protobuf::OracleLong {},
635            )),
636        },
637        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
638            r#type: Some(protobuf::remote_type::Type::OracleClob(
639                protobuf::OracleClob {},
640            )),
641        },
642        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
643            r#type: Some(protobuf::remote_type::Type::OracleNclob(
644                protobuf::OracleNClob {},
645            )),
646        },
647        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
648            r#type: Some(protobuf::remote_type::Type::SqliteNull(
649                protobuf::SqliteNull {},
650            )),
651        },
652        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
653            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
654                protobuf::SqliteInteger {},
655            )),
656        },
657        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
658            r#type: Some(protobuf::remote_type::Type::SqliteReal(
659                protobuf::SqliteReal {},
660            )),
661        },
662        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
663            r#type: Some(protobuf::remote_type::Type::SqliteText(
664                protobuf::SqliteText {},
665            )),
666        },
667        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
668            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
669                protobuf::SqliteBlob {},
670            )),
671        },
672    }
673}
674
675fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
676    let fields = remote_schema
677        .fields
678        .iter()
679        .map(parse_remote_field)
680        .collect::<Vec<_>>();
681
682    RemoteSchema { fields }
683}
684
685fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
686    RemoteField {
687        name: field.name.clone(),
688        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
689        nullable: field.nullable,
690    }
691}
692
693fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
694    match remote_type.r#type.as_ref().unwrap() {
695        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
696        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
697        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
698        protobuf::remote_type::Type::PostgresFloat4(_) => {
699            RemoteType::Postgres(PostgresType::Float4)
700        }
701        protobuf::remote_type::Type::PostgresFloat8(_) => {
702            RemoteType::Postgres(PostgresType::Float8)
703        }
704        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
705            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
706        }
707        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
708        protobuf::remote_type::Type::PostgresVarchar(_) => {
709            RemoteType::Postgres(PostgresType::Varchar)
710        }
711        protobuf::remote_type::Type::PostgresBpchar(_) => {
712            RemoteType::Postgres(PostgresType::Bpchar)
713        }
714        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
715        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
716        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
717        protobuf::remote_type::Type::PostgresTimestamp(_) => {
718            RemoteType::Postgres(PostgresType::Timestamp)
719        }
720        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
721            RemoteType::Postgres(PostgresType::TimestampTz)
722        }
723        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
724        protobuf::remote_type::Type::PostgresInterval(_) => {
725            RemoteType::Postgres(PostgresType::Interval)
726        }
727        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
728        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
729        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
730        protobuf::remote_type::Type::PostgresInt2Array(_) => {
731            RemoteType::Postgres(PostgresType::Int2Array)
732        }
733        protobuf::remote_type::Type::PostgresInt4Array(_) => {
734            RemoteType::Postgres(PostgresType::Int4Array)
735        }
736        protobuf::remote_type::Type::PostgresInt8Array(_) => {
737            RemoteType::Postgres(PostgresType::Int8Array)
738        }
739        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
740            RemoteType::Postgres(PostgresType::Float4Array)
741        }
742        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
743            RemoteType::Postgres(PostgresType::Float8Array)
744        }
745        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
746            RemoteType::Postgres(PostgresType::VarcharArray)
747        }
748        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
749            RemoteType::Postgres(PostgresType::BpcharArray)
750        }
751        protobuf::remote_type::Type::PostgresTextArray(_) => {
752            RemoteType::Postgres(PostgresType::TextArray)
753        }
754        protobuf::remote_type::Type::PostgresByteaArray(_) => {
755            RemoteType::Postgres(PostgresType::ByteaArray)
756        }
757        protobuf::remote_type::Type::PostgresBoolArray(_) => {
758            RemoteType::Postgres(PostgresType::BoolArray)
759        }
760        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
761            RemoteType::Postgres(PostgresType::PostGisGeometry)
762        }
763        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
764        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
765        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
766            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
767        }
768        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
769        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
770            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
771        }
772        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
773        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
774            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
775        }
776        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
777        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
778            RemoteType::Mysql(MysqlType::IntegerUnsigned)
779        }
780        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
781        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
782            RemoteType::Mysql(MysqlType::BigIntUnsigned)
783        }
784        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
785        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
786        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
787            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
788        ),
789        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
790        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
791        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
792        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
793        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
794        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
795        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
796        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
797        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
798        protobuf::remote_type::Type::MysqlText(text) => {
799            RemoteType::Mysql(MysqlType::Text(text.length))
800        }
801        protobuf::remote_type::Type::MysqlBlob(blob) => {
802            RemoteType::Mysql(MysqlType::Blob(blob.length))
803        }
804        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
805        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
806        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
807            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
808        }
809        protobuf::remote_type::Type::OracleChar(char) => {
810            RemoteType::Oracle(OracleType::Char(char.length))
811        }
812        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
813            OracleType::Number(number.precision as u8, number.scale as i8),
814        ),
815        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
816        protobuf::remote_type::Type::OracleTimestamp(_) => {
817            RemoteType::Oracle(OracleType::Timestamp)
818        }
819        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
820        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
821            RemoteType::Oracle(OracleType::BinaryFloat)
822        }
823        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
824            RemoteType::Oracle(OracleType::BinaryDouble)
825        }
826        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
827            RemoteType::Oracle(OracleType::Float(*precision as u8))
828        }
829        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
830            RemoteType::Oracle(OracleType::NChar(*length))
831        }
832        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
833            RemoteType::Oracle(OracleType::NVarchar2(*length))
834        }
835        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
836            RemoteType::Oracle(OracleType::Raw(*length))
837        }
838        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
839        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
840        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
841        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
842        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
843        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
844        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
845        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
846        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
847        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
848    }
849}