datafusion_remote_table/
codec.rs

1#[cfg(feature = "mysql")]
2use crate::MysqlConnectionOptions;
3#[cfg(feature = "oracle")]
4use crate::OracleConnectionOptions;
5#[cfg(feature = "postgres")]
6use crate::PostgresConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9    ConnectionOptions, DFResult, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema,
10    RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
11};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::common::DataFusionError;
14use datafusion::execution::FunctionRegistry;
15use datafusion::physical_plan::ExecutionPlan;
16use datafusion_proto::convert_required;
17use datafusion_proto::logical_plan::from_proto::parse_exprs;
18use datafusion_proto::logical_plan::to_proto::serialize_exprs;
19use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
20use datafusion_proto::physical_plan::PhysicalExtensionCodec;
21use datafusion_proto::protobuf::proto_error;
22use derive_with::With;
23use prost::Message;
24use std::fmt::Debug;
25#[cfg(feature = "sqlite")]
26use std::path::Path;
27use std::sync::Arc;
28
29pub trait TransformCodec: Debug + Send + Sync {
30    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
31    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
32}
33
34#[derive(Debug, With)]
35pub struct RemotePhysicalCodec {
36    transform_codec: Option<Arc<dyn TransformCodec>>,
37    logical_extension_codec: Arc<dyn LogicalExtensionCodec>,
38}
39
40impl RemotePhysicalCodec {
41    pub fn new() -> Self {
42        Self {
43            transform_codec: None,
44            logical_extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
45        }
46    }
47}
48
49impl Default for RemotePhysicalCodec {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl PhysicalExtensionCodec for RemotePhysicalCodec {
56    fn try_decode(
57        &self,
58        buf: &[u8],
59        _inputs: &[Arc<dyn ExecutionPlan>],
60        registry: &dyn FunctionRegistry,
61    ) -> DFResult<Arc<dyn ExecutionPlan>> {
62        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
63            DataFusionError::Internal(format!(
64                "Failed to decode remote table execution plan: {e:?}"
65            ))
66        })?;
67
68        let transform = if let Some(bytes) = proto.transform {
69            let Some(transform_codec) = self.transform_codec.as_ref() else {
70                return Err(DataFusionError::Execution(
71                    "No transform codec found".to_string(),
72                ));
73            };
74            Some(transform_codec.try_decode(&bytes)?)
75        } else {
76            None
77        };
78
79        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
80        let remote_schema = proto
81            .remote_schema
82            .map(|schema| Arc::new(parse_remote_schema(&schema)));
83
84        let projection: Option<Vec<usize>> = proto
85            .projection
86            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
87
88        let filters = parse_exprs(
89            &proto.filters,
90            registry,
91            self.logical_extension_codec.as_ref(),
92        )?;
93        let limit = proto.limit.map(|l| l as usize);
94
95        let conn_options = parse_connection_options(proto.conn_options.unwrap());
96        let conn = tokio::task::block_in_place(|| {
97            tokio::runtime::Handle::current().block_on(async {
98                let pool = connect(&conn_options).await?;
99                let conn = pool.get().await?;
100                Ok::<_, DataFusionError>(conn)
101            })
102        })?;
103
104        Ok(Arc::new(RemoteTableExec::try_new(
105            conn_options,
106            proto.sql,
107            table_schema,
108            remote_schema,
109            projection,
110            filters,
111            limit,
112            transform,
113            conn,
114        )?))
115    }
116
117    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
118        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
119            let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
120                let Some(transform_codec) = self.transform_codec.as_ref() else {
121                    return Err(DataFusionError::Execution(
122                        "No transform codec found".to_string(),
123                    ));
124                };
125                let bytes = transform_codec.try_encode(transform.as_ref())?;
126                Some(bytes)
127            } else {
128                None
129            };
130
131            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
132            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
133            let serialized_filters =
134                serialize_exprs(&exec.filters, self.logical_extension_codec.as_ref())?;
135
136            let proto = protobuf::RemoteTableExec {
137                conn_options: Some(serialized_connection_options),
138                sql: exec.sql.clone(),
139                table_schema: Some(exec.table_schema.as_ref().try_into()?),
140                remote_schema,
141                projection: exec
142                    .projection
143                    .as_ref()
144                    .map(|p| serialize_projection(p.as_slice())),
145                filters: serialized_filters,
146                limit: exec.limit.map(|l| l as u32),
147                transform: serialized_transform,
148            };
149
150            proto.encode(buf).map_err(|e| {
151                DataFusionError::Internal(format!(
152                    "Failed to encode remote table execution plan: {e:?}"
153                ))
154            })?;
155            Ok(())
156        } else {
157            Err(DataFusionError::Execution(format!(
158                "Failed to encode {}",
159                RemoteTableExec::static_name()
160            )))
161        }
162    }
163}
164
165fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
166    match options {
167        #[cfg(feature = "postgres")]
168        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
169            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
170                protobuf::PostgresConnectionOptions {
171                    host: options.host.clone(),
172                    port: options.port as u32,
173                    username: options.username.clone(),
174                    password: options.password.clone(),
175                    database: options.database.clone(),
176                    pool_max_size: options.pool_max_size as u32,
177                    stream_chunk_size: options.stream_chunk_size as u32,
178                },
179            )),
180        },
181        #[cfg(feature = "mysql")]
182        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
183            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
184                protobuf::MysqlConnectionOptions {
185                    host: options.host.clone(),
186                    port: options.port as u32,
187                    username: options.username.clone(),
188                    password: options.password.clone(),
189                    database: options.database.clone(),
190                    pool_max_size: options.pool_max_size as u32,
191                    stream_chunk_size: options.stream_chunk_size as u32,
192                },
193            )),
194        },
195        #[cfg(feature = "oracle")]
196        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
197            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
198                protobuf::OracleConnectionOptions {
199                    host: options.host.clone(),
200                    port: options.port as u32,
201                    username: options.username.clone(),
202                    password: options.password.clone(),
203                    service_name: options.service_name.clone(),
204                    pool_max_size: options.pool_max_size as u32,
205                    stream_chunk_size: options.stream_chunk_size as u32,
206                },
207            )),
208        },
209        #[cfg(feature = "sqlite")]
210        ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
211            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
212                protobuf::SqliteConnectionOptions {
213                    path: path.to_str().unwrap().to_string(),
214                },
215            )),
216        },
217    }
218}
219
220fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
221    match options.connection_options {
222        #[cfg(feature = "postgres")]
223        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
224            ConnectionOptions::Postgres(PostgresConnectionOptions {
225                host: options.host,
226                port: options.port as u16,
227                username: options.username,
228                password: options.password,
229                database: options.database,
230                pool_max_size: options.pool_max_size as usize,
231                stream_chunk_size: options.stream_chunk_size as usize,
232            })
233        }
234        #[cfg(feature = "mysql")]
235        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
236            ConnectionOptions::Mysql(MysqlConnectionOptions {
237                host: options.host,
238                port: options.port as u16,
239                username: options.username,
240                password: options.password,
241                database: options.database,
242                pool_max_size: options.pool_max_size as usize,
243                stream_chunk_size: options.stream_chunk_size as usize,
244            })
245        }
246        #[cfg(feature = "oracle")]
247        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
248            ConnectionOptions::Oracle(OracleConnectionOptions {
249                host: options.host,
250                port: options.port as u16,
251                username: options.username,
252                password: options.password,
253                service_name: options.service_name,
254                pool_max_size: options.pool_max_size as usize,
255                stream_chunk_size: options.stream_chunk_size as usize,
256            })
257        }
258        #[cfg(feature = "sqlite")]
259        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
260            ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
261        }
262        _ => panic!("Failed to parse connection options: {options:?}"),
263    }
264}
265
266fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
267    protobuf::Projection {
268        projection: projection.iter().map(|n| *n as u32).collect(),
269    }
270}
271
272fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
273    let fields = remote_schema
274        .fields
275        .iter()
276        .map(serialize_remote_field)
277        .collect::<Vec<_>>();
278
279    protobuf::RemoteSchema { fields }
280}
281
282fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
283    protobuf::RemoteField {
284        name: remote_field.name.clone(),
285        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
286        nullable: remote_field.nullable,
287    }
288}
289
290fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
291    match remote_type {
292        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
293            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
294                protobuf::PostgresInt2 {},
295            )),
296        },
297        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
298            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
299                protobuf::PostgresInt4 {},
300            )),
301        },
302        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
303            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
304                protobuf::PostgresInt8 {},
305            )),
306        },
307        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
308            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
309                protobuf::PostgresFloat4 {},
310            )),
311        },
312        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
313            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
314                protobuf::PostgresFloat8 {},
315            )),
316        },
317        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
318            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
319                protobuf::PostgresNumeric {
320                    scale: *scale as i32,
321                },
322            )),
323        },
324        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
325            r#type: Some(protobuf::remote_type::Type::PostgresName(
326                protobuf::PostgresName {},
327            )),
328        },
329        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
330            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
331                protobuf::PostgresVarchar {},
332            )),
333        },
334        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
335            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
336                protobuf::PostgresBpchar {},
337            )),
338        },
339        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
340            r#type: Some(protobuf::remote_type::Type::PostgresText(
341                protobuf::PostgresText {},
342            )),
343        },
344        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
345            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
346                protobuf::PostgresBytea {},
347            )),
348        },
349        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
350            r#type: Some(protobuf::remote_type::Type::PostgresDate(
351                protobuf::PostgresDate {},
352            )),
353        },
354        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
355            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
356                protobuf::PostgresTimestamp {},
357            )),
358        },
359        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
360            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
361                protobuf::PostgresTimestampTz {},
362            )),
363        },
364        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
365            r#type: Some(protobuf::remote_type::Type::PostgresTime(
366                protobuf::PostgresTime {},
367            )),
368        },
369        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
370            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
371                protobuf::PostgresInterval {},
372            )),
373        },
374        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
375            r#type: Some(protobuf::remote_type::Type::PostgresBool(
376                protobuf::PostgresBool {},
377            )),
378        },
379        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
380            r#type: Some(protobuf::remote_type::Type::PostgresJson(
381                protobuf::PostgresJson {},
382            )),
383        },
384        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
385            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
386                protobuf::PostgresJsonb {},
387            )),
388        },
389        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
390            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
391                protobuf::PostgresInt2Array {},
392            )),
393        },
394        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
395            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
396                protobuf::PostgresInt4Array {},
397            )),
398        },
399        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
400            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
401                protobuf::PostgresInt8Array {},
402            )),
403        },
404        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
405            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
406                protobuf::PostgresFloat4Array {},
407            )),
408        },
409        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
410            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
411                protobuf::PostgresFloat8Array {},
412            )),
413        },
414        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
415            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
416                protobuf::PostgresVarcharArray {},
417            )),
418        },
419        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
420            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
421                protobuf::PostgresBpcharArray {},
422            )),
423        },
424        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
425            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
426                protobuf::PostgresTextArray {},
427            )),
428        },
429        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
430            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
431                protobuf::PostgresByteaArray {},
432            )),
433        },
434        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
435            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
436                protobuf::PostgresBoolArray {},
437            )),
438        },
439        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
440            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
441                protobuf::PostgresPostGisGeometry {},
442            )),
443        },
444        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
445            r#type: Some(protobuf::remote_type::Type::PostgresOid(
446                protobuf::PostgresOid {},
447            )),
448        },
449
450        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
451            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
452                protobuf::MysqlTinyInt {},
453            )),
454        },
455        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
456            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
457                protobuf::MysqlTinyIntUnsigned {},
458            )),
459        },
460        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
461            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
462                protobuf::MysqlSmallInt {},
463            )),
464        },
465        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
466            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
467                protobuf::MysqlSmallIntUnsigned {},
468            )),
469        },
470        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
471            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
472                protobuf::MysqlMediumInt {},
473            )),
474        },
475        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
476            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
477                protobuf::MysqlMediumIntUnsigned {},
478            )),
479        },
480        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
481            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
482                protobuf::MysqlInteger {},
483            )),
484        },
485        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
486            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
487                protobuf::MysqlIntegerUnsigned {},
488            )),
489        },
490        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
491            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
492                protobuf::MysqlBigInt {},
493            )),
494        },
495        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
496            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
497                protobuf::MysqlBigIntUnsigned {},
498            )),
499        },
500        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
501            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
502                protobuf::MysqlFloat {},
503            )),
504        },
505        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
506            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
507                protobuf::MysqlDouble {},
508            )),
509        },
510        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
511            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
512                protobuf::MysqlDecimal {
513                    precision: *precision as u32,
514                    scale: *scale as u32,
515                },
516            )),
517        },
518        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
519            r#type: Some(protobuf::remote_type::Type::MysqlDate(
520                protobuf::MysqlDate {},
521            )),
522        },
523        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
524            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
525                protobuf::MysqlDateTime {},
526            )),
527        },
528        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
529            r#type: Some(protobuf::remote_type::Type::MysqlTime(
530                protobuf::MysqlTime {},
531            )),
532        },
533        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
534            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
535                protobuf::MysqlTimestamp {},
536            )),
537        },
538        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
539            r#type: Some(protobuf::remote_type::Type::MysqlYear(
540                protobuf::MysqlYear {},
541            )),
542        },
543        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
544            r#type: Some(protobuf::remote_type::Type::MysqlChar(
545                protobuf::MysqlChar {},
546            )),
547        },
548        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
549            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
550                protobuf::MysqlVarchar {},
551            )),
552        },
553        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
554            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
555                protobuf::MysqlBinary {},
556            )),
557        },
558        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
559            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
560                protobuf::MysqlVarbinary {},
561            )),
562        },
563        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
564            r#type: Some(protobuf::remote_type::Type::MysqlText(
565                protobuf::MysqlText { length: *len },
566            )),
567        },
568        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
569            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
570                protobuf::MysqlBlob { length: *len },
571            )),
572        },
573        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
574            r#type: Some(protobuf::remote_type::Type::MysqlJson(
575                protobuf::MysqlJson {},
576            )),
577        },
578        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
579            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
580                protobuf::MysqlGeometry {},
581            )),
582        },
583
584        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
585            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
586                protobuf::OracleVarchar2 { length: *len },
587            )),
588        },
589        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
590            r#type: Some(protobuf::remote_type::Type::OracleChar(
591                protobuf::OracleChar { length: *len },
592            )),
593        },
594        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
595            r#type: Some(protobuf::remote_type::Type::OracleNumber(
596                protobuf::OracleNumber {
597                    precision: *precision as u32,
598                    scale: *scale as i32,
599                },
600            )),
601        },
602        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
603            r#type: Some(protobuf::remote_type::Type::OracleDate(
604                protobuf::OracleDate {},
605            )),
606        },
607        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
608            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
609                protobuf::OracleTimestamp {},
610            )),
611        },
612        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
613            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
614                protobuf::OracleBoolean {},
615            )),
616        },
617        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
618            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
619                protobuf::OracleBinaryFloat {},
620            )),
621        },
622        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
623            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
624                protobuf::OracleBinaryDouble {},
625            )),
626        },
627        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
628            r#type: Some(protobuf::remote_type::Type::OracleBlob(
629                protobuf::OracleBlob {},
630            )),
631        },
632        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
633            r#type: Some(protobuf::remote_type::Type::OracleFloat(
634                protobuf::OracleFloat {
635                    precision: *precision as u32,
636                },
637            )),
638        },
639        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
640            r#type: Some(protobuf::remote_type::Type::OracleNchar(
641                protobuf::OracleNChar { length: *len },
642            )),
643        },
644        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
645            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
646                protobuf::OracleNVarchar2 { length: *len },
647            )),
648        },
649        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
650            r#type: Some(protobuf::remote_type::Type::OracleRaw(
651                protobuf::OracleRaw { length: *len },
652            )),
653        },
654        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
655            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
656                protobuf::OracleLongRaw {},
657            )),
658        },
659        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
660            r#type: Some(protobuf::remote_type::Type::OracleLong(
661                protobuf::OracleLong {},
662            )),
663        },
664        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
665            r#type: Some(protobuf::remote_type::Type::OracleClob(
666                protobuf::OracleClob {},
667            )),
668        },
669        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
670            r#type: Some(protobuf::remote_type::Type::OracleNclob(
671                protobuf::OracleNClob {},
672            )),
673        },
674        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
675            r#type: Some(protobuf::remote_type::Type::SqliteNull(
676                protobuf::SqliteNull {},
677            )),
678        },
679        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
680            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
681                protobuf::SqliteInteger {},
682            )),
683        },
684        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
685            r#type: Some(protobuf::remote_type::Type::SqliteReal(
686                protobuf::SqliteReal {},
687            )),
688        },
689        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
690            r#type: Some(protobuf::remote_type::Type::SqliteText(
691                protobuf::SqliteText {},
692            )),
693        },
694        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
695            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
696                protobuf::SqliteBlob {},
697            )),
698        },
699    }
700}
701
702fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
703    let fields = remote_schema
704        .fields
705        .iter()
706        .map(parse_remote_field)
707        .collect::<Vec<_>>();
708
709    RemoteSchema { fields }
710}
711
712fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
713    RemoteField {
714        name: field.name.clone(),
715        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
716        nullable: field.nullable,
717    }
718}
719
720fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
721    match remote_type.r#type.as_ref().unwrap() {
722        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
723        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
724        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
725        protobuf::remote_type::Type::PostgresFloat4(_) => {
726            RemoteType::Postgres(PostgresType::Float4)
727        }
728        protobuf::remote_type::Type::PostgresFloat8(_) => {
729            RemoteType::Postgres(PostgresType::Float8)
730        }
731        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
732            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
733        }
734        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
735        protobuf::remote_type::Type::PostgresVarchar(_) => {
736            RemoteType::Postgres(PostgresType::Varchar)
737        }
738        protobuf::remote_type::Type::PostgresBpchar(_) => {
739            RemoteType::Postgres(PostgresType::Bpchar)
740        }
741        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
742        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
743        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
744        protobuf::remote_type::Type::PostgresTimestamp(_) => {
745            RemoteType::Postgres(PostgresType::Timestamp)
746        }
747        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
748            RemoteType::Postgres(PostgresType::TimestampTz)
749        }
750        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
751        protobuf::remote_type::Type::PostgresInterval(_) => {
752            RemoteType::Postgres(PostgresType::Interval)
753        }
754        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
755        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
756        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
757        protobuf::remote_type::Type::PostgresInt2Array(_) => {
758            RemoteType::Postgres(PostgresType::Int2Array)
759        }
760        protobuf::remote_type::Type::PostgresInt4Array(_) => {
761            RemoteType::Postgres(PostgresType::Int4Array)
762        }
763        protobuf::remote_type::Type::PostgresInt8Array(_) => {
764            RemoteType::Postgres(PostgresType::Int8Array)
765        }
766        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
767            RemoteType::Postgres(PostgresType::Float4Array)
768        }
769        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
770            RemoteType::Postgres(PostgresType::Float8Array)
771        }
772        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
773            RemoteType::Postgres(PostgresType::VarcharArray)
774        }
775        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
776            RemoteType::Postgres(PostgresType::BpcharArray)
777        }
778        protobuf::remote_type::Type::PostgresTextArray(_) => {
779            RemoteType::Postgres(PostgresType::TextArray)
780        }
781        protobuf::remote_type::Type::PostgresByteaArray(_) => {
782            RemoteType::Postgres(PostgresType::ByteaArray)
783        }
784        protobuf::remote_type::Type::PostgresBoolArray(_) => {
785            RemoteType::Postgres(PostgresType::BoolArray)
786        }
787        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
788            RemoteType::Postgres(PostgresType::PostGisGeometry)
789        }
790        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
791        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
792        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
793            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
794        }
795        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
796        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
797            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
798        }
799        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
800        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
801            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
802        }
803        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
804        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
805            RemoteType::Mysql(MysqlType::IntegerUnsigned)
806        }
807        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
808        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
809            RemoteType::Mysql(MysqlType::BigIntUnsigned)
810        }
811        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
812        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
813        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
814            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
815        ),
816        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
817        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
818        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
819        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
820        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
821        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
822        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
823        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
824        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
825        protobuf::remote_type::Type::MysqlText(text) => {
826            RemoteType::Mysql(MysqlType::Text(text.length))
827        }
828        protobuf::remote_type::Type::MysqlBlob(blob) => {
829            RemoteType::Mysql(MysqlType::Blob(blob.length))
830        }
831        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
832        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
833        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
834            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
835        }
836        protobuf::remote_type::Type::OracleChar(char) => {
837            RemoteType::Oracle(OracleType::Char(char.length))
838        }
839        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
840            OracleType::Number(number.precision as u8, number.scale as i8),
841        ),
842        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
843        protobuf::remote_type::Type::OracleTimestamp(_) => {
844            RemoteType::Oracle(OracleType::Timestamp)
845        }
846        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
847        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
848            RemoteType::Oracle(OracleType::BinaryFloat)
849        }
850        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
851            RemoteType::Oracle(OracleType::BinaryDouble)
852        }
853        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
854            RemoteType::Oracle(OracleType::Float(*precision as u8))
855        }
856        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
857            RemoteType::Oracle(OracleType::NChar(*length))
858        }
859        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
860            RemoteType::Oracle(OracleType::NVarchar2(*length))
861        }
862        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
863            RemoteType::Oracle(OracleType::Raw(*length))
864        }
865        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
866        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
867        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
868        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
869        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
870        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
871        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
872        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
873        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
874        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
875    }
876}