datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    ConnectionOptions, DFResult, DmType, MysqlType, OracleType, PostgresType, RemoteField,
14    RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
15};
16use datafusion::arrow::datatypes::SchemaRef;
17use datafusion::common::DataFusionError;
18use datafusion::execution::FunctionRegistry;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion_proto::convert_required;
21use datafusion_proto::logical_plan::from_proto::parse_exprs;
22use datafusion_proto::logical_plan::to_proto::serialize_exprs;
23use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
24use datafusion_proto::physical_plan::PhysicalExtensionCodec;
25use datafusion_proto::protobuf::proto_error;
26use derive_with::With;
27use prost::Message;
28use std::fmt::Debug;
29#[cfg(feature = "sqlite")]
30use std::path::Path;
31use std::sync::Arc;
32
33pub trait TransformCodec: Debug + Send + Sync {
34    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
35    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
36}
37
38#[derive(Debug, With)]
39pub struct RemotePhysicalCodec {
40    transform_codec: Option<Arc<dyn TransformCodec>>,
41    logical_extension_codec: Arc<dyn LogicalExtensionCodec>,
42}
43
44impl RemotePhysicalCodec {
45    pub fn new() -> Self {
46        Self {
47            transform_codec: None,
48            logical_extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
49        }
50    }
51}
52
53impl Default for RemotePhysicalCodec {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl PhysicalExtensionCodec for RemotePhysicalCodec {
60    fn try_decode(
61        &self,
62        buf: &[u8],
63        _inputs: &[Arc<dyn ExecutionPlan>],
64        registry: &dyn FunctionRegistry,
65    ) -> DFResult<Arc<dyn ExecutionPlan>> {
66        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
67            DataFusionError::Internal(format!(
68                "Failed to decode remote table execution plan: {e:?}"
69            ))
70        })?;
71
72        let transform = if let Some(bytes) = proto.transform {
73            let Some(transform_codec) = self.transform_codec.as_ref() else {
74                return Err(DataFusionError::Execution(
75                    "No transform codec found".to_string(),
76                ));
77            };
78            Some(transform_codec.try_decode(&bytes)?)
79        } else {
80            None
81        };
82
83        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
84        let remote_schema = proto
85            .remote_schema
86            .map(|schema| Arc::new(parse_remote_schema(&schema)));
87
88        let projection: Option<Vec<usize>> = proto
89            .projection
90            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
91
92        let filters = parse_exprs(
93            &proto.filters,
94            registry,
95            self.logical_extension_codec.as_ref(),
96        )?;
97        let limit = proto.limit.map(|l| l as usize);
98
99        let conn_options = parse_connection_options(proto.conn_options.unwrap());
100        let conn = tokio::task::block_in_place(|| {
101            tokio::runtime::Handle::current().block_on(async {
102                let pool = connect(&conn_options).await?;
103                let conn = pool.get().await?;
104                Ok::<_, DataFusionError>(conn)
105            })
106        })?;
107
108        Ok(Arc::new(RemoteTableExec::try_new(
109            conn_options,
110            proto.sql,
111            table_schema,
112            remote_schema,
113            projection,
114            filters,
115            limit,
116            transform,
117            conn,
118        )?))
119    }
120
121    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
122        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
123            let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
124                let Some(transform_codec) = self.transform_codec.as_ref() else {
125                    return Err(DataFusionError::Execution(
126                        "No transform codec found".to_string(),
127                    ));
128                };
129                let bytes = transform_codec.try_encode(transform.as_ref())?;
130                Some(bytes)
131            } else {
132                None
133            };
134
135            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
136            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
137            let serialized_filters =
138                serialize_exprs(&exec.filters, self.logical_extension_codec.as_ref())?;
139
140            let proto = protobuf::RemoteTableExec {
141                conn_options: Some(serialized_connection_options),
142                sql: exec.sql.clone(),
143                table_schema: Some(exec.table_schema.as_ref().try_into()?),
144                remote_schema,
145                projection: exec
146                    .projection
147                    .as_ref()
148                    .map(|p| serialize_projection(p.as_slice())),
149                filters: serialized_filters,
150                limit: exec.limit.map(|l| l as u32),
151                transform: serialized_transform,
152            };
153
154            proto.encode(buf).map_err(|e| {
155                DataFusionError::Internal(format!(
156                    "Failed to encode remote table execution plan: {e:?}"
157                ))
158            })?;
159            Ok(())
160        } else {
161            Err(DataFusionError::Execution(format!(
162                "Failed to encode {}",
163                RemoteTableExec::static_name()
164            )))
165        }
166    }
167}
168
169fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
170    match options {
171        #[cfg(feature = "postgres")]
172        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
173            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
174                protobuf::PostgresConnectionOptions {
175                    host: options.host.clone(),
176                    port: options.port as u32,
177                    username: options.username.clone(),
178                    password: options.password.clone(),
179                    database: options.database.clone(),
180                    pool_max_size: options.pool_max_size as u32,
181                    stream_chunk_size: options.stream_chunk_size as u32,
182                },
183            )),
184        },
185        #[cfg(feature = "mysql")]
186        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
187            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
188                protobuf::MysqlConnectionOptions {
189                    host: options.host.clone(),
190                    port: options.port as u32,
191                    username: options.username.clone(),
192                    password: options.password.clone(),
193                    database: options.database.clone(),
194                    pool_max_size: options.pool_max_size as u32,
195                    stream_chunk_size: options.stream_chunk_size as u32,
196                },
197            )),
198        },
199        #[cfg(feature = "oracle")]
200        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
201            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
202                protobuf::OracleConnectionOptions {
203                    host: options.host.clone(),
204                    port: options.port as u32,
205                    username: options.username.clone(),
206                    password: options.password.clone(),
207                    service_name: options.service_name.clone(),
208                    pool_max_size: options.pool_max_size as u32,
209                    stream_chunk_size: options.stream_chunk_size as u32,
210                },
211            )),
212        },
213        #[cfg(feature = "sqlite")]
214        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
215            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
216                protobuf::SqliteConnectionOptions {
217                    path: options.path.to_str().unwrap().to_string(),
218                    stream_chunk_size: options.stream_chunk_size as u32,
219                },
220            )),
221        },
222        #[cfg(feature = "dm")]
223        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
224            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
225                protobuf::DmConnectionOptions {
226                    host: options.host.clone(),
227                    port: options.port as u32,
228                    username: options.username.clone(),
229                    password: options.password.clone(),
230                    schema: options.schema.clone(),
231                    stream_chunk_size: options.stream_chunk_size as u32,
232                    driver: options.driver.clone(),
233                },
234            )),
235        },
236    }
237}
238
239fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
240    match options.connection_options {
241        #[cfg(feature = "postgres")]
242        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
243            ConnectionOptions::Postgres(PostgresConnectionOptions {
244                host: options.host,
245                port: options.port as u16,
246                username: options.username,
247                password: options.password,
248                database: options.database,
249                pool_max_size: options.pool_max_size as usize,
250                stream_chunk_size: options.stream_chunk_size as usize,
251            })
252        }
253        #[cfg(feature = "mysql")]
254        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
255            ConnectionOptions::Mysql(MysqlConnectionOptions {
256                host: options.host,
257                port: options.port as u16,
258                username: options.username,
259                password: options.password,
260                database: options.database,
261                pool_max_size: options.pool_max_size as usize,
262                stream_chunk_size: options.stream_chunk_size as usize,
263            })
264        }
265        #[cfg(feature = "oracle")]
266        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
267            ConnectionOptions::Oracle(OracleConnectionOptions {
268                host: options.host,
269                port: options.port as u16,
270                username: options.username,
271                password: options.password,
272                service_name: options.service_name,
273                pool_max_size: options.pool_max_size as usize,
274                stream_chunk_size: options.stream_chunk_size as usize,
275            })
276        }
277        #[cfg(feature = "sqlite")]
278        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
279            ConnectionOptions::Sqlite(SqliteConnectionOptions {
280                path: Path::new(&options.path).to_path_buf(),
281                stream_chunk_size: options.stream_chunk_size as usize,
282            })
283        }
284        #[cfg(feature = "dm")]
285        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
286            ConnectionOptions::Dm(DmConnectionOptions {
287                host: options.host,
288                port: options.port as u16,
289                username: options.username,
290                password: options.password,
291                schema: options.schema,
292                stream_chunk_size: options.stream_chunk_size as usize,
293                driver: options.driver,
294            })
295        }
296        _ => panic!("Failed to parse connection options: {options:?}"),
297    }
298}
299
300fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
301    protobuf::Projection {
302        projection: projection.iter().map(|n| *n as u32).collect(),
303    }
304}
305
306fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
307    let fields = remote_schema
308        .fields
309        .iter()
310        .map(serialize_remote_field)
311        .collect::<Vec<_>>();
312
313    protobuf::RemoteSchema { fields }
314}
315
316fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
317    protobuf::RemoteField {
318        name: remote_field.name.clone(),
319        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
320        nullable: remote_field.nullable,
321    }
322}
323
324fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
325    match remote_type {
326        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
327            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
328                protobuf::PostgresInt2 {},
329            )),
330        },
331        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
332            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
333                protobuf::PostgresInt4 {},
334            )),
335        },
336        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
337            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
338                protobuf::PostgresInt8 {},
339            )),
340        },
341        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
342            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
343                protobuf::PostgresFloat4 {},
344            )),
345        },
346        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
347            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
348                protobuf::PostgresFloat8 {},
349            )),
350        },
351        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
352            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
353                protobuf::PostgresNumeric {
354                    scale: *scale as i32,
355                },
356            )),
357        },
358        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
359            r#type: Some(protobuf::remote_type::Type::PostgresName(
360                protobuf::PostgresName {},
361            )),
362        },
363        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
364            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
365                protobuf::PostgresVarchar {},
366            )),
367        },
368        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
369            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
370                protobuf::PostgresBpchar {},
371            )),
372        },
373        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
374            r#type: Some(protobuf::remote_type::Type::PostgresText(
375                protobuf::PostgresText {},
376            )),
377        },
378        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
379            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
380                protobuf::PostgresBytea {},
381            )),
382        },
383        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
384            r#type: Some(protobuf::remote_type::Type::PostgresDate(
385                protobuf::PostgresDate {},
386            )),
387        },
388        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
389            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
390                protobuf::PostgresTimestamp {},
391            )),
392        },
393        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
394            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
395                protobuf::PostgresTimestampTz {},
396            )),
397        },
398        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
399            r#type: Some(protobuf::remote_type::Type::PostgresTime(
400                protobuf::PostgresTime {},
401            )),
402        },
403        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
404            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
405                protobuf::PostgresInterval {},
406            )),
407        },
408        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
409            r#type: Some(protobuf::remote_type::Type::PostgresBool(
410                protobuf::PostgresBool {},
411            )),
412        },
413        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
414            r#type: Some(protobuf::remote_type::Type::PostgresJson(
415                protobuf::PostgresJson {},
416            )),
417        },
418        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
419            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
420                protobuf::PostgresJsonb {},
421            )),
422        },
423        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
424            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
425                protobuf::PostgresInt2Array {},
426            )),
427        },
428        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
429            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
430                protobuf::PostgresInt4Array {},
431            )),
432        },
433        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
434            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
435                protobuf::PostgresInt8Array {},
436            )),
437        },
438        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
439            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
440                protobuf::PostgresFloat4Array {},
441            )),
442        },
443        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
444            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
445                protobuf::PostgresFloat8Array {},
446            )),
447        },
448        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
449            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
450                protobuf::PostgresVarcharArray {},
451            )),
452        },
453        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
454            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
455                protobuf::PostgresBpcharArray {},
456            )),
457        },
458        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
459            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
460                protobuf::PostgresTextArray {},
461            )),
462        },
463        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
464            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
465                protobuf::PostgresByteaArray {},
466            )),
467        },
468        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
469            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
470                protobuf::PostgresBoolArray {},
471            )),
472        },
473        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
474            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
475                protobuf::PostgresPostGisGeometry {},
476            )),
477        },
478        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
479            r#type: Some(protobuf::remote_type::Type::PostgresOid(
480                protobuf::PostgresOid {},
481            )),
482        },
483
484        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
485            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
486                protobuf::MysqlTinyInt {},
487            )),
488        },
489        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
490            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
491                protobuf::MysqlTinyIntUnsigned {},
492            )),
493        },
494        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
495            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
496                protobuf::MysqlSmallInt {},
497            )),
498        },
499        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
500            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
501                protobuf::MysqlSmallIntUnsigned {},
502            )),
503        },
504        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
505            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
506                protobuf::MysqlMediumInt {},
507            )),
508        },
509        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
510            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
511                protobuf::MysqlMediumIntUnsigned {},
512            )),
513        },
514        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
515            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
516                protobuf::MysqlInteger {},
517            )),
518        },
519        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
520            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
521                protobuf::MysqlIntegerUnsigned {},
522            )),
523        },
524        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
525            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
526                protobuf::MysqlBigInt {},
527            )),
528        },
529        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
530            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
531                protobuf::MysqlBigIntUnsigned {},
532            )),
533        },
534        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
535            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
536                protobuf::MysqlFloat {},
537            )),
538        },
539        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
540            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
541                protobuf::MysqlDouble {},
542            )),
543        },
544        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
545            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
546                protobuf::MysqlDecimal {
547                    precision: *precision as u32,
548                    scale: *scale as u32,
549                },
550            )),
551        },
552        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
553            r#type: Some(protobuf::remote_type::Type::MysqlDate(
554                protobuf::MysqlDate {},
555            )),
556        },
557        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
558            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
559                protobuf::MysqlDateTime {},
560            )),
561        },
562        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
563            r#type: Some(protobuf::remote_type::Type::MysqlTime(
564                protobuf::MysqlTime {},
565            )),
566        },
567        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
568            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
569                protobuf::MysqlTimestamp {},
570            )),
571        },
572        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
573            r#type: Some(protobuf::remote_type::Type::MysqlYear(
574                protobuf::MysqlYear {},
575            )),
576        },
577        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
578            r#type: Some(protobuf::remote_type::Type::MysqlChar(
579                protobuf::MysqlChar {},
580            )),
581        },
582        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
583            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
584                protobuf::MysqlVarchar {},
585            )),
586        },
587        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
588            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
589                protobuf::MysqlBinary {},
590            )),
591        },
592        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
593            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
594                protobuf::MysqlVarbinary {},
595            )),
596        },
597        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
598            r#type: Some(protobuf::remote_type::Type::MysqlText(
599                protobuf::MysqlText { length: *len },
600            )),
601        },
602        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
603            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
604                protobuf::MysqlBlob { length: *len },
605            )),
606        },
607        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
608            r#type: Some(protobuf::remote_type::Type::MysqlJson(
609                protobuf::MysqlJson {},
610            )),
611        },
612        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
613            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
614                protobuf::MysqlGeometry {},
615            )),
616        },
617
618        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
619            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
620                protobuf::OracleVarchar2 { length: *len },
621            )),
622        },
623        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
624            r#type: Some(protobuf::remote_type::Type::OracleChar(
625                protobuf::OracleChar { length: *len },
626            )),
627        },
628        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
629            r#type: Some(protobuf::remote_type::Type::OracleNumber(
630                protobuf::OracleNumber {
631                    precision: *precision as u32,
632                    scale: *scale as i32,
633                },
634            )),
635        },
636        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
637            r#type: Some(protobuf::remote_type::Type::OracleDate(
638                protobuf::OracleDate {},
639            )),
640        },
641        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
642            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
643                protobuf::OracleTimestamp {},
644            )),
645        },
646        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
647            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
648                protobuf::OracleBoolean {},
649            )),
650        },
651        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
652            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
653                protobuf::OracleBinaryFloat {},
654            )),
655        },
656        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
657            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
658                protobuf::OracleBinaryDouble {},
659            )),
660        },
661        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
662            r#type: Some(protobuf::remote_type::Type::OracleBlob(
663                protobuf::OracleBlob {},
664            )),
665        },
666        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
667            r#type: Some(protobuf::remote_type::Type::OracleFloat(
668                protobuf::OracleFloat {
669                    precision: *precision as u32,
670                },
671            )),
672        },
673        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
674            r#type: Some(protobuf::remote_type::Type::OracleNchar(
675                protobuf::OracleNChar { length: *len },
676            )),
677        },
678        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
679            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
680                protobuf::OracleNVarchar2 { length: *len },
681            )),
682        },
683        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
684            r#type: Some(protobuf::remote_type::Type::OracleRaw(
685                protobuf::OracleRaw { length: *len },
686            )),
687        },
688        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
689            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
690                protobuf::OracleLongRaw {},
691            )),
692        },
693        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
694            r#type: Some(protobuf::remote_type::Type::OracleLong(
695                protobuf::OracleLong {},
696            )),
697        },
698        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
699            r#type: Some(protobuf::remote_type::Type::OracleClob(
700                protobuf::OracleClob {},
701            )),
702        },
703        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
704            r#type: Some(protobuf::remote_type::Type::OracleNclob(
705                protobuf::OracleNClob {},
706            )),
707        },
708        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
709            r#type: Some(protobuf::remote_type::Type::SqliteNull(
710                protobuf::SqliteNull {},
711            )),
712        },
713        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
714            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
715                protobuf::SqliteInteger {},
716            )),
717        },
718        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
719            r#type: Some(protobuf::remote_type::Type::SqliteReal(
720                protobuf::SqliteReal {},
721            )),
722        },
723        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
724            r#type: Some(protobuf::remote_type::Type::SqliteText(
725                protobuf::SqliteText {},
726            )),
727        },
728        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
729            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
730                protobuf::SqliteBlob {},
731            )),
732        },
733        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
734            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
735                protobuf::DmTinyInt {},
736            )),
737        },
738        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
739            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
740                protobuf::DmSmallInt {},
741            )),
742        },
743        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
744            r#type: Some(protobuf::remote_type::Type::DmInteger(
745                protobuf::DmInteger {},
746            )),
747        },
748        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
749            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
750        },
751        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
752            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
753        },
754        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
755            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
756        },
757        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
758            r#type: Some(protobuf::remote_type::Type::DmNumeric(
759                protobuf::DmNumeric {
760                    precision: *precision as u32,
761                    scale: *scale as i32,
762                },
763            )),
764        },
765        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
766            r#type: Some(protobuf::remote_type::Type::DmDecimal(
767                protobuf::DmDecimal {
768                    precision: *precision as u32,
769                    scale: *scale as i32,
770                },
771            )),
772        },
773        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
774            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
775                length: len.map(|s| s as u32),
776            })),
777        },
778        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
779            r#type: Some(protobuf::remote_type::Type::DmVarchar(
780                protobuf::DmVarchar {
781                    length: len.map(|s| s as u32),
782                },
783            )),
784        },
785        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
786            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
787        },
788        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
789            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
790                length: *len as u32,
791            })),
792        },
793        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
794            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
795                protobuf::DmVarbinary {
796                    length: len.map(|s| s as u32),
797                },
798            )),
799        },
800        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
801            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
802        },
803        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
804            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
805        },
806        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
807            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
808                protobuf::DmTimestamp {
809                    precision: *precision as u32,
810                },
811            )),
812        },
813        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
814            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
815                precision: *precision as u32,
816            })),
817        },
818        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
819            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
820        },
821    }
822}
823
824fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
825    let fields = remote_schema
826        .fields
827        .iter()
828        .map(parse_remote_field)
829        .collect::<Vec<_>>();
830
831    RemoteSchema { fields }
832}
833
834fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
835    RemoteField {
836        name: field.name.clone(),
837        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
838        nullable: field.nullable,
839    }
840}
841
842fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
843    match remote_type.r#type.as_ref().unwrap() {
844        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
845        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
846        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
847        protobuf::remote_type::Type::PostgresFloat4(_) => {
848            RemoteType::Postgres(PostgresType::Float4)
849        }
850        protobuf::remote_type::Type::PostgresFloat8(_) => {
851            RemoteType::Postgres(PostgresType::Float8)
852        }
853        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
854            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
855        }
856        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
857        protobuf::remote_type::Type::PostgresVarchar(_) => {
858            RemoteType::Postgres(PostgresType::Varchar)
859        }
860        protobuf::remote_type::Type::PostgresBpchar(_) => {
861            RemoteType::Postgres(PostgresType::Bpchar)
862        }
863        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
864        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
865        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
866        protobuf::remote_type::Type::PostgresTimestamp(_) => {
867            RemoteType::Postgres(PostgresType::Timestamp)
868        }
869        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
870            RemoteType::Postgres(PostgresType::TimestampTz)
871        }
872        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
873        protobuf::remote_type::Type::PostgresInterval(_) => {
874            RemoteType::Postgres(PostgresType::Interval)
875        }
876        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
877        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
878        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
879        protobuf::remote_type::Type::PostgresInt2Array(_) => {
880            RemoteType::Postgres(PostgresType::Int2Array)
881        }
882        protobuf::remote_type::Type::PostgresInt4Array(_) => {
883            RemoteType::Postgres(PostgresType::Int4Array)
884        }
885        protobuf::remote_type::Type::PostgresInt8Array(_) => {
886            RemoteType::Postgres(PostgresType::Int8Array)
887        }
888        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
889            RemoteType::Postgres(PostgresType::Float4Array)
890        }
891        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
892            RemoteType::Postgres(PostgresType::Float8Array)
893        }
894        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
895            RemoteType::Postgres(PostgresType::VarcharArray)
896        }
897        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
898            RemoteType::Postgres(PostgresType::BpcharArray)
899        }
900        protobuf::remote_type::Type::PostgresTextArray(_) => {
901            RemoteType::Postgres(PostgresType::TextArray)
902        }
903        protobuf::remote_type::Type::PostgresByteaArray(_) => {
904            RemoteType::Postgres(PostgresType::ByteaArray)
905        }
906        protobuf::remote_type::Type::PostgresBoolArray(_) => {
907            RemoteType::Postgres(PostgresType::BoolArray)
908        }
909        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
910            RemoteType::Postgres(PostgresType::PostGisGeometry)
911        }
912        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
913        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
914        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
915            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
916        }
917        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
918        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
919            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
920        }
921        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
922        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
923            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
924        }
925        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
926        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
927            RemoteType::Mysql(MysqlType::IntegerUnsigned)
928        }
929        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
930        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
931            RemoteType::Mysql(MysqlType::BigIntUnsigned)
932        }
933        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
934        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
935        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
936            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
937        ),
938        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
939        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
940        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
941        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
942        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
943        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
944        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
945        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
946        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
947        protobuf::remote_type::Type::MysqlText(text) => {
948            RemoteType::Mysql(MysqlType::Text(text.length))
949        }
950        protobuf::remote_type::Type::MysqlBlob(blob) => {
951            RemoteType::Mysql(MysqlType::Blob(blob.length))
952        }
953        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
954        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
955        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
956            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
957        }
958        protobuf::remote_type::Type::OracleChar(char) => {
959            RemoteType::Oracle(OracleType::Char(char.length))
960        }
961        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
962            OracleType::Number(number.precision as u8, number.scale as i8),
963        ),
964        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
965        protobuf::remote_type::Type::OracleTimestamp(_) => {
966            RemoteType::Oracle(OracleType::Timestamp)
967        }
968        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
969        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
970            RemoteType::Oracle(OracleType::BinaryFloat)
971        }
972        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
973            RemoteType::Oracle(OracleType::BinaryDouble)
974        }
975        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
976            RemoteType::Oracle(OracleType::Float(*precision as u8))
977        }
978        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
979            RemoteType::Oracle(OracleType::NChar(*length))
980        }
981        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
982            RemoteType::Oracle(OracleType::NVarchar2(*length))
983        }
984        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
985            RemoteType::Oracle(OracleType::Raw(*length))
986        }
987        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
988        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
989        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
990        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
991        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
992        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
993        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
994        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
995        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
996        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
997        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
998        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
999        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1000        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1001        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1002        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1003        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1004            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1005        }
1006        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1007            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1008        }
1009        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1010            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1011        }
1012        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1013            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1014        }
1015        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1016        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1017            RemoteType::Dm(DmType::Binary(*length as u16))
1018        }
1019        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1020            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1021        }
1022        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1023        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1024        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1025            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1026        }
1027        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1028            RemoteType::Dm(DmType::Time(*precision as u8))
1029        }
1030        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1031    }
1032}