datafusion_remote_table/
codec.rs

1#[cfg(feature = "mysql")]
2use crate::MysqlConnectionOptions;
3#[cfg(feature = "oracle")]
4use crate::OracleConnectionOptions;
5#[cfg(feature = "postgres")]
6use crate::PostgresConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9    ConnectionOptions, DFResult, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema,
10    RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
11};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::common::DataFusionError;
14use datafusion::execution::FunctionRegistry;
15use datafusion::physical_plan::ExecutionPlan;
16use datafusion_proto::convert_required;
17use datafusion_proto::physical_plan::PhysicalExtensionCodec;
18use datafusion_proto::protobuf::proto_error;
19use prost::Message;
20use std::fmt::Debug;
21#[cfg(feature = "sqlite")]
22use std::path::Path;
23use std::sync::Arc;
24
25pub trait TransformCodec: Debug + Send + Sync {
26    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
27    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
28}
29
30#[derive(Debug)]
31pub struct RemotePhysicalCodec {
32    transform_codec: Option<Arc<dyn TransformCodec>>,
33}
34
35impl RemotePhysicalCodec {
36    pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
37        Self { transform_codec }
38    }
39}
40
41impl PhysicalExtensionCodec for RemotePhysicalCodec {
42    fn try_decode(
43        &self,
44        buf: &[u8],
45        _inputs: &[Arc<dyn ExecutionPlan>],
46        _registry: &dyn FunctionRegistry,
47    ) -> DFResult<Arc<dyn ExecutionPlan>> {
48        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
49            DataFusionError::Internal(format!(
50                "Failed to decode remote table execution plan: {e:?}"
51            ))
52        })?;
53
54        let transform = if let Some(bytes) = proto.transform {
55            let Some(transform_codec) = self.transform_codec.as_ref() else {
56                return Err(DataFusionError::Execution(
57                    "No transform codec found".to_string(),
58                ));
59            };
60            Some(transform_codec.try_decode(&bytes)?)
61        } else {
62            None
63        };
64
65        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
66        let remote_schema = proto
67            .remote_schema
68            .map(|schema| Arc::new(parse_remote_schema(&schema)));
69
70        let projection: Option<Vec<usize>> = proto
71            .projection
72            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
73
74        let limit = proto.limit.map(|l| l as usize);
75
76        let conn_options = parse_connection_options(proto.conn_options.unwrap());
77        let conn = tokio::task::block_in_place(|| {
78            tokio::runtime::Handle::current().block_on(async {
79                let pool = connect(&conn_options).await?;
80                let conn = pool.get().await?;
81                Ok::<_, DataFusionError>(conn)
82            })
83        })?;
84
85        Ok(Arc::new(RemoteTableExec::try_new(
86            conn_options,
87            proto.sql,
88            table_schema,
89            remote_schema,
90            projection,
91            limit,
92            transform,
93            conn,
94        )?))
95    }
96
97    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
98        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
99            let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
100                let Some(transform_codec) = self.transform_codec.as_ref() else {
101                    return Err(DataFusionError::Execution(
102                        "No transform codec found".to_string(),
103                    ));
104                };
105                let bytes = transform_codec.try_encode(transform.as_ref())?;
106                Some(bytes)
107            } else {
108                None
109            };
110
111            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
112            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
113
114            let proto = protobuf::RemoteTableExec {
115                conn_options: Some(serialized_connection_options),
116                sql: exec.sql.clone(),
117                table_schema: Some(exec.table_schema.as_ref().try_into()?),
118                remote_schema,
119                projection: exec
120                    .projection
121                    .as_ref()
122                    .map(|p| serialize_projection(p.as_slice())),
123                limit: exec.limit.map(|l| l as u32),
124                transform: serialized_transform,
125            };
126
127            proto.encode(buf).map_err(|e| {
128                DataFusionError::Internal(format!(
129                    "Failed to encode remote table execution plan: {e:?}"
130                ))
131            })?;
132            Ok(())
133        } else {
134            Err(DataFusionError::Execution(format!(
135                "Failed to encode {}",
136                RemoteTableExec::static_name()
137            )))
138        }
139    }
140}
141
142fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
143    match options {
144        #[cfg(feature = "postgres")]
145        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
146            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
147                protobuf::PostgresConnectionOptions {
148                    host: options.host.clone(),
149                    port: options.port as u32,
150                    username: options.username.clone(),
151                    password: options.password.clone(),
152                    database: options.database.clone(),
153                    pool_max_size: options.pool_max_size as u32,
154                    stream_chunk_size: options.stream_chunk_size as u32,
155                },
156            )),
157        },
158        #[cfg(feature = "mysql")]
159        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
160            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
161                protobuf::MysqlConnectionOptions {
162                    host: options.host.clone(),
163                    port: options.port as u32,
164                    username: options.username.clone(),
165                    password: options.password.clone(),
166                    database: options.database.clone(),
167                    pool_max_size: options.pool_max_size as u32,
168                    stream_chunk_size: options.stream_chunk_size as u32,
169                },
170            )),
171        },
172        #[cfg(feature = "oracle")]
173        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
174            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
175                protobuf::OracleConnectionOptions {
176                    host: options.host.clone(),
177                    port: options.port as u32,
178                    username: options.username.clone(),
179                    password: options.password.clone(),
180                    service_name: options.service_name.clone(),
181                    pool_max_size: options.pool_max_size as u32,
182                    stream_chunk_size: options.stream_chunk_size as u32,
183                },
184            )),
185        },
186        #[cfg(feature = "sqlite")]
187        ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
188            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
189                protobuf::SqliteConnectionOptions {
190                    path: path.to_str().unwrap().to_string(),
191                },
192            )),
193        },
194    }
195}
196
197fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
198    match options.connection_options {
199        #[cfg(feature = "postgres")]
200        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
201            ConnectionOptions::Postgres(PostgresConnectionOptions {
202                host: options.host,
203                port: options.port as u16,
204                username: options.username,
205                password: options.password,
206                database: options.database,
207                pool_max_size: options.pool_max_size as usize,
208                stream_chunk_size: options.stream_chunk_size as usize,
209            })
210        }
211        #[cfg(feature = "mysql")]
212        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
213            ConnectionOptions::Mysql(MysqlConnectionOptions {
214                host: options.host,
215                port: options.port as u16,
216                username: options.username,
217                password: options.password,
218                database: options.database,
219                pool_max_size: options.pool_max_size as usize,
220                stream_chunk_size: options.stream_chunk_size as usize,
221            })
222        }
223        #[cfg(feature = "oracle")]
224        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
225            ConnectionOptions::Oracle(OracleConnectionOptions {
226                host: options.host,
227                port: options.port as u16,
228                username: options.username,
229                password: options.password,
230                service_name: options.service_name,
231                pool_max_size: options.pool_max_size as usize,
232                stream_chunk_size: options.stream_chunk_size as usize,
233            })
234        }
235        #[cfg(feature = "sqlite")]
236        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
237            ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
238        }
239        _ => panic!("Failed to parse connection options: {options:?}"),
240    }
241}
242
243fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
244    protobuf::Projection {
245        projection: projection.iter().map(|n| *n as u32).collect(),
246    }
247}
248
249fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
250    let fields = remote_schema
251        .fields
252        .iter()
253        .map(serialize_remote_field)
254        .collect::<Vec<_>>();
255
256    protobuf::RemoteSchema { fields }
257}
258
259fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
260    protobuf::RemoteField {
261        name: remote_field.name.clone(),
262        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
263        nullable: remote_field.nullable,
264    }
265}
266
267fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
268    match remote_type {
269        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
270            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
271                protobuf::PostgresInt2 {},
272            )),
273        },
274        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
275            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
276                protobuf::PostgresInt4 {},
277            )),
278        },
279        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
280            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
281                protobuf::PostgresInt8 {},
282            )),
283        },
284        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
285            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
286                protobuf::PostgresFloat4 {},
287            )),
288        },
289        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
290            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
291                protobuf::PostgresFloat8 {},
292            )),
293        },
294        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
295            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
296                protobuf::PostgresNumeric {
297                    scale: *scale as i32,
298                },
299            )),
300        },
301        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
302            r#type: Some(protobuf::remote_type::Type::PostgresName(
303                protobuf::PostgresName {},
304            )),
305        },
306        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
307            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
308                protobuf::PostgresVarchar {},
309            )),
310        },
311        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
312            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
313                protobuf::PostgresBpchar {},
314            )),
315        },
316        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
317            r#type: Some(protobuf::remote_type::Type::PostgresText(
318                protobuf::PostgresText {},
319            )),
320        },
321        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
322            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
323                protobuf::PostgresBytea {},
324            )),
325        },
326        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
327            r#type: Some(protobuf::remote_type::Type::PostgresDate(
328                protobuf::PostgresDate {},
329            )),
330        },
331        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
332            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
333                protobuf::PostgresTimestamp {},
334            )),
335        },
336        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
337            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
338                protobuf::PostgresTimestampTz {},
339            )),
340        },
341        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
342            r#type: Some(protobuf::remote_type::Type::PostgresTime(
343                protobuf::PostgresTime {},
344            )),
345        },
346        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
347            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
348                protobuf::PostgresInterval {},
349            )),
350        },
351        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
352            r#type: Some(protobuf::remote_type::Type::PostgresBool(
353                protobuf::PostgresBool {},
354            )),
355        },
356        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
357            r#type: Some(protobuf::remote_type::Type::PostgresJson(
358                protobuf::PostgresJson {},
359            )),
360        },
361        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
362            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
363                protobuf::PostgresJsonb {},
364            )),
365        },
366        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
367            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
368                protobuf::PostgresInt2Array {},
369            )),
370        },
371        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
372            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
373                protobuf::PostgresInt4Array {},
374            )),
375        },
376        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
377            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
378                protobuf::PostgresInt8Array {},
379            )),
380        },
381        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
382            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
383                protobuf::PostgresFloat4Array {},
384            )),
385        },
386        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
387            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
388                protobuf::PostgresFloat8Array {},
389            )),
390        },
391        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
392            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
393                protobuf::PostgresVarcharArray {},
394            )),
395        },
396        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
397            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
398                protobuf::PostgresBpcharArray {},
399            )),
400        },
401        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
402            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
403                protobuf::PostgresTextArray {},
404            )),
405        },
406        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
407            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
408                protobuf::PostgresByteaArray {},
409            )),
410        },
411        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
412            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
413                protobuf::PostgresBoolArray {},
414            )),
415        },
416        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
417            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
418                protobuf::PostgresPostGisGeometry {},
419            )),
420        },
421        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
422            r#type: Some(protobuf::remote_type::Type::PostgresOid(
423                protobuf::PostgresOid {},
424            )),
425        },
426
427        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
428            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
429                protobuf::MysqlTinyInt {},
430            )),
431        },
432        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
433            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
434                protobuf::MysqlTinyIntUnsigned {},
435            )),
436        },
437        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
438            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
439                protobuf::MysqlSmallInt {},
440            )),
441        },
442        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
443            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
444                protobuf::MysqlSmallIntUnsigned {},
445            )),
446        },
447        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
448            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
449                protobuf::MysqlMediumInt {},
450            )),
451        },
452        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
453            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
454                protobuf::MysqlMediumIntUnsigned {},
455            )),
456        },
457        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
458            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
459                protobuf::MysqlInteger {},
460            )),
461        },
462        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
463            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
464                protobuf::MysqlIntegerUnsigned {},
465            )),
466        },
467        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
468            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
469                protobuf::MysqlBigInt {},
470            )),
471        },
472        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
473            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
474                protobuf::MysqlBigIntUnsigned {},
475            )),
476        },
477        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
478            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
479                protobuf::MysqlFloat {},
480            )),
481        },
482        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
483            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
484                protobuf::MysqlDouble {},
485            )),
486        },
487        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
488            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
489                protobuf::MysqlDecimal {
490                    precision: *precision as u32,
491                    scale: *scale as u32,
492                },
493            )),
494        },
495        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
496            r#type: Some(protobuf::remote_type::Type::MysqlDate(
497                protobuf::MysqlDate {},
498            )),
499        },
500        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
501            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
502                protobuf::MysqlDateTime {},
503            )),
504        },
505        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
506            r#type: Some(protobuf::remote_type::Type::MysqlTime(
507                protobuf::MysqlTime {},
508            )),
509        },
510        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
511            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
512                protobuf::MysqlTimestamp {},
513            )),
514        },
515        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
516            r#type: Some(protobuf::remote_type::Type::MysqlYear(
517                protobuf::MysqlYear {},
518            )),
519        },
520        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
521            r#type: Some(protobuf::remote_type::Type::MysqlChar(
522                protobuf::MysqlChar {},
523            )),
524        },
525        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
526            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
527                protobuf::MysqlVarchar {},
528            )),
529        },
530        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
531            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
532                protobuf::MysqlBinary {},
533            )),
534        },
535        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
536            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
537                protobuf::MysqlVarbinary {},
538            )),
539        },
540        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
541            r#type: Some(protobuf::remote_type::Type::MysqlText(
542                protobuf::MysqlText { length: *len },
543            )),
544        },
545        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
546            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
547                protobuf::MysqlBlob { length: *len },
548            )),
549        },
550        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
551            r#type: Some(protobuf::remote_type::Type::MysqlJson(
552                protobuf::MysqlJson {},
553            )),
554        },
555        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
556            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
557                protobuf::MysqlGeometry {},
558            )),
559        },
560
561        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
562            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
563                protobuf::OracleVarchar2 { length: *len },
564            )),
565        },
566        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
567            r#type: Some(protobuf::remote_type::Type::OracleChar(
568                protobuf::OracleChar { length: *len },
569            )),
570        },
571        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
572            r#type: Some(protobuf::remote_type::Type::OracleNumber(
573                protobuf::OracleNumber {
574                    precision: *precision as u32,
575                    scale: *scale as i32,
576                },
577            )),
578        },
579        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
580            r#type: Some(protobuf::remote_type::Type::OracleDate(
581                protobuf::OracleDate {},
582            )),
583        },
584        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
585            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
586                protobuf::OracleTimestamp {},
587            )),
588        },
589        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
590            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
591                protobuf::OracleBoolean {},
592            )),
593        },
594        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
595            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
596                protobuf::OracleBinaryFloat {},
597            )),
598        },
599        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
600            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
601                protobuf::OracleBinaryDouble {},
602            )),
603        },
604        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
605            r#type: Some(protobuf::remote_type::Type::OracleBlob(
606                protobuf::OracleBlob {},
607            )),
608        },
609        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
610            r#type: Some(protobuf::remote_type::Type::OracleFloat(
611                protobuf::OracleFloat {
612                    precision: *precision as u32,
613                },
614            )),
615        },
616        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
617            r#type: Some(protobuf::remote_type::Type::OracleNchar(
618                protobuf::OracleNChar { length: *len },
619            )),
620        },
621        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
622            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
623                protobuf::OracleNVarchar2 { length: *len },
624            )),
625        },
626        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
627            r#type: Some(protobuf::remote_type::Type::OracleRaw(
628                protobuf::OracleRaw { length: *len },
629            )),
630        },
631        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
632            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
633                protobuf::OracleLongRaw {},
634            )),
635        },
636        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
637            r#type: Some(protobuf::remote_type::Type::OracleLong(
638                protobuf::OracleLong {},
639            )),
640        },
641        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
642            r#type: Some(protobuf::remote_type::Type::OracleClob(
643                protobuf::OracleClob {},
644            )),
645        },
646        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
647            r#type: Some(protobuf::remote_type::Type::OracleNclob(
648                protobuf::OracleNClob {},
649            )),
650        },
651        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
652            r#type: Some(protobuf::remote_type::Type::SqliteNull(
653                protobuf::SqliteNull {},
654            )),
655        },
656        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
657            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
658                protobuf::SqliteInteger {},
659            )),
660        },
661        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
662            r#type: Some(protobuf::remote_type::Type::SqliteReal(
663                protobuf::SqliteReal {},
664            )),
665        },
666        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
667            r#type: Some(protobuf::remote_type::Type::SqliteText(
668                protobuf::SqliteText {},
669            )),
670        },
671        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
672            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
673                protobuf::SqliteBlob {},
674            )),
675        },
676    }
677}
678
679fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
680    let fields = remote_schema
681        .fields
682        .iter()
683        .map(parse_remote_field)
684        .collect::<Vec<_>>();
685
686    RemoteSchema { fields }
687}
688
689fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
690    RemoteField {
691        name: field.name.clone(),
692        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
693        nullable: field.nullable,
694    }
695}
696
697fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
698    match remote_type.r#type.as_ref().unwrap() {
699        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
700        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
701        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
702        protobuf::remote_type::Type::PostgresFloat4(_) => {
703            RemoteType::Postgres(PostgresType::Float4)
704        }
705        protobuf::remote_type::Type::PostgresFloat8(_) => {
706            RemoteType::Postgres(PostgresType::Float8)
707        }
708        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
709            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
710        }
711        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
712        protobuf::remote_type::Type::PostgresVarchar(_) => {
713            RemoteType::Postgres(PostgresType::Varchar)
714        }
715        protobuf::remote_type::Type::PostgresBpchar(_) => {
716            RemoteType::Postgres(PostgresType::Bpchar)
717        }
718        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
719        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
720        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
721        protobuf::remote_type::Type::PostgresTimestamp(_) => {
722            RemoteType::Postgres(PostgresType::Timestamp)
723        }
724        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
725            RemoteType::Postgres(PostgresType::TimestampTz)
726        }
727        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
728        protobuf::remote_type::Type::PostgresInterval(_) => {
729            RemoteType::Postgres(PostgresType::Interval)
730        }
731        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
732        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
733        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
734        protobuf::remote_type::Type::PostgresInt2Array(_) => {
735            RemoteType::Postgres(PostgresType::Int2Array)
736        }
737        protobuf::remote_type::Type::PostgresInt4Array(_) => {
738            RemoteType::Postgres(PostgresType::Int4Array)
739        }
740        protobuf::remote_type::Type::PostgresInt8Array(_) => {
741            RemoteType::Postgres(PostgresType::Int8Array)
742        }
743        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
744            RemoteType::Postgres(PostgresType::Float4Array)
745        }
746        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
747            RemoteType::Postgres(PostgresType::Float8Array)
748        }
749        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
750            RemoteType::Postgres(PostgresType::VarcharArray)
751        }
752        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
753            RemoteType::Postgres(PostgresType::BpcharArray)
754        }
755        protobuf::remote_type::Type::PostgresTextArray(_) => {
756            RemoteType::Postgres(PostgresType::TextArray)
757        }
758        protobuf::remote_type::Type::PostgresByteaArray(_) => {
759            RemoteType::Postgres(PostgresType::ByteaArray)
760        }
761        protobuf::remote_type::Type::PostgresBoolArray(_) => {
762            RemoteType::Postgres(PostgresType::BoolArray)
763        }
764        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
765            RemoteType::Postgres(PostgresType::PostGisGeometry)
766        }
767        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
768        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
769        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
770            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
771        }
772        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
773        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
774            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
775        }
776        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
777        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
778            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
779        }
780        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
781        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
782            RemoteType::Mysql(MysqlType::IntegerUnsigned)
783        }
784        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
785        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
786            RemoteType::Mysql(MysqlType::BigIntUnsigned)
787        }
788        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
789        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
790        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
791            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
792        ),
793        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
794        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
795        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
796        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
797        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
798        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
799        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
800        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
801        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
802        protobuf::remote_type::Type::MysqlText(text) => {
803            RemoteType::Mysql(MysqlType::Text(text.length))
804        }
805        protobuf::remote_type::Type::MysqlBlob(blob) => {
806            RemoteType::Mysql(MysqlType::Blob(blob.length))
807        }
808        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
809        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
810        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
811            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
812        }
813        protobuf::remote_type::Type::OracleChar(char) => {
814            RemoteType::Oracle(OracleType::Char(char.length))
815        }
816        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
817            OracleType::Number(number.precision as u8, number.scale as i8),
818        ),
819        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
820        protobuf::remote_type::Type::OracleTimestamp(_) => {
821            RemoteType::Oracle(OracleType::Timestamp)
822        }
823        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
824        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
825            RemoteType::Oracle(OracleType::BinaryFloat)
826        }
827        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
828            RemoteType::Oracle(OracleType::BinaryDouble)
829        }
830        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
831            RemoteType::Oracle(OracleType::Float(*precision as u8))
832        }
833        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
834            RemoteType::Oracle(OracleType::NChar(*length))
835        }
836        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
837            RemoteType::Oracle(OracleType::NVarchar2(*length))
838        }
839        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
840            RemoteType::Oracle(OracleType::Raw(*length))
841        }
842        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
843        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
844        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
845        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
846        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
847        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
848        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
849        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
850        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
851        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
852    }
853}