datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType, PostgresType,
14    RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
15    connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use prost::Message;
26use std::fmt::Debug;
27#[cfg(feature = "sqlite")]
28use std::path::Path;
29use std::sync::Arc;
30
31pub trait TransformCodec: Debug + Send + Sync {
32    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
33    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
34}
35
36#[derive(Debug)]
37pub struct DefaultTransformCodec {}
38
39const DEFAULT_TRANSFORM_ID: &str = "__default";
40
41impl TransformCodec for DefaultTransformCodec {
42    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
43        if value.as_any().is::<DefaultTransform>() {
44            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
45        } else {
46            Err(DataFusionError::Execution(format!(
47                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
48            )))
49        }
50    }
51
52    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
53        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
54            Ok(Arc::new(DefaultTransform {}))
55        } else {
56            Err(DataFusionError::Execution(
57                "DefaultTransformCodec only supports DefaultTransform".to_string(),
58            ))
59        }
60    }
61}
62
63#[derive(Debug, With)]
64pub struct RemotePhysicalCodec {
65    transform_codec: Arc<dyn TransformCodec>,
66}
67
68impl RemotePhysicalCodec {
69    pub fn new() -> Self {
70        Self {
71            transform_codec: Arc::new(DefaultTransformCodec {}),
72        }
73    }
74}
75
76impl Default for RemotePhysicalCodec {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl PhysicalExtensionCodec for RemotePhysicalCodec {
83    fn try_decode(
84        &self,
85        buf: &[u8],
86        _inputs: &[Arc<dyn ExecutionPlan>],
87        _registry: &dyn FunctionRegistry,
88    ) -> DFResult<Arc<dyn ExecutionPlan>> {
89        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
90            DataFusionError::Internal(format!(
91                "Failed to decode remote table execution plan: {e:?}"
92            ))
93        })?;
94
95        let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
96            Arc::new(DefaultTransform {})
97        } else {
98            self.transform_codec.try_decode(&proto.transform)?
99        };
100
101        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
102        let remote_schema = proto
103            .remote_schema
104            .map(|schema| Arc::new(parse_remote_schema(&schema)));
105
106        let projection: Option<Vec<usize>> = proto
107            .projection
108            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
109
110        let limit = proto.limit.map(|l| l as usize);
111
112        let conn_options = parse_connection_options(proto.conn_options.unwrap());
113        let conn = tokio::task::block_in_place(|| {
114            tokio::runtime::Handle::current().block_on(async {
115                let pool = connect(&conn_options).await?;
116                let conn = pool.get().await?;
117                Ok::<_, DataFusionError>(conn)
118            })
119        })?;
120
121        Ok(Arc::new(RemoteTableExec::try_new(
122            conn_options,
123            proto.sql,
124            table_schema,
125            remote_schema,
126            projection,
127            proto.unparsed_filters,
128            limit,
129            transform,
130            conn,
131        )?))
132    }
133
134    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
135        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
136            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
137                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
138            } else {
139                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
140                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
141                bytes
142            };
143
144            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
145            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
146
147            let proto = protobuf::RemoteTableExec {
148                conn_options: Some(serialized_connection_options),
149                sql: exec.sql.clone(),
150                table_schema: Some(exec.table_schema.as_ref().try_into()?),
151                remote_schema,
152                projection: exec
153                    .projection
154                    .as_ref()
155                    .map(|p| serialize_projection(p.as_slice())),
156                unparsed_filters: exec.unparsed_filters.clone(),
157                limit: exec.limit.map(|l| l as u32),
158                transform: serialized_transform,
159            };
160
161            proto.encode(buf).map_err(|e| {
162                DataFusionError::Internal(format!(
163                    "Failed to encode remote table execution plan: {e:?}"
164                ))
165            })?;
166            Ok(())
167        } else {
168            Err(DataFusionError::Execution(format!(
169                "Failed to encode {}",
170                RemoteTableExec::static_name()
171            )))
172        }
173    }
174}
175
176fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
177    match options {
178        #[cfg(feature = "postgres")]
179        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
180            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
181                protobuf::PostgresConnectionOptions {
182                    host: options.host.clone(),
183                    port: options.port as u32,
184                    username: options.username.clone(),
185                    password: options.password.clone(),
186                    database: options.database.clone(),
187                    pool_max_size: options.pool_max_size as u32,
188                    stream_chunk_size: options.stream_chunk_size as u32,
189                },
190            )),
191        },
192        #[cfg(feature = "mysql")]
193        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
194            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
195                protobuf::MysqlConnectionOptions {
196                    host: options.host.clone(),
197                    port: options.port as u32,
198                    username: options.username.clone(),
199                    password: options.password.clone(),
200                    database: options.database.clone(),
201                    pool_max_size: options.pool_max_size as u32,
202                    stream_chunk_size: options.stream_chunk_size as u32,
203                },
204            )),
205        },
206        #[cfg(feature = "oracle")]
207        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
208            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
209                protobuf::OracleConnectionOptions {
210                    host: options.host.clone(),
211                    port: options.port as u32,
212                    username: options.username.clone(),
213                    password: options.password.clone(),
214                    service_name: options.service_name.clone(),
215                    pool_max_size: options.pool_max_size as u32,
216                    stream_chunk_size: options.stream_chunk_size as u32,
217                },
218            )),
219        },
220        #[cfg(feature = "sqlite")]
221        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
222            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
223                protobuf::SqliteConnectionOptions {
224                    path: options.path.to_str().unwrap().to_string(),
225                    stream_chunk_size: options.stream_chunk_size as u32,
226                },
227            )),
228        },
229        #[cfg(feature = "dm")]
230        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
231            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
232                protobuf::DmConnectionOptions {
233                    host: options.host.clone(),
234                    port: options.port as u32,
235                    username: options.username.clone(),
236                    password: options.password.clone(),
237                    schema: options.schema.clone(),
238                    stream_chunk_size: options.stream_chunk_size as u32,
239                    driver: options.driver.clone(),
240                },
241            )),
242        },
243    }
244}
245
246fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
247    match options.connection_options {
248        #[cfg(feature = "postgres")]
249        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
250            ConnectionOptions::Postgres(PostgresConnectionOptions {
251                host: options.host,
252                port: options.port as u16,
253                username: options.username,
254                password: options.password,
255                database: options.database,
256                pool_max_size: options.pool_max_size as usize,
257                stream_chunk_size: options.stream_chunk_size as usize,
258            })
259        }
260        #[cfg(feature = "mysql")]
261        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
262            ConnectionOptions::Mysql(MysqlConnectionOptions {
263                host: options.host,
264                port: options.port as u16,
265                username: options.username,
266                password: options.password,
267                database: options.database,
268                pool_max_size: options.pool_max_size as usize,
269                stream_chunk_size: options.stream_chunk_size as usize,
270            })
271        }
272        #[cfg(feature = "oracle")]
273        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
274            ConnectionOptions::Oracle(OracleConnectionOptions {
275                host: options.host,
276                port: options.port as u16,
277                username: options.username,
278                password: options.password,
279                service_name: options.service_name,
280                pool_max_size: options.pool_max_size as usize,
281                stream_chunk_size: options.stream_chunk_size as usize,
282            })
283        }
284        #[cfg(feature = "sqlite")]
285        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
286            ConnectionOptions::Sqlite(SqliteConnectionOptions {
287                path: Path::new(&options.path).to_path_buf(),
288                stream_chunk_size: options.stream_chunk_size as usize,
289            })
290        }
291        #[cfg(feature = "dm")]
292        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
293            ConnectionOptions::Dm(DmConnectionOptions {
294                host: options.host,
295                port: options.port as u16,
296                username: options.username,
297                password: options.password,
298                schema: options.schema,
299                stream_chunk_size: options.stream_chunk_size as usize,
300                driver: options.driver,
301            })
302        }
303        _ => panic!("Failed to parse connection options: {options:?}"),
304    }
305}
306
307fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
308    protobuf::Projection {
309        projection: projection.iter().map(|n| *n as u32).collect(),
310    }
311}
312
313fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
314    let fields = remote_schema
315        .fields
316        .iter()
317        .map(serialize_remote_field)
318        .collect::<Vec<_>>();
319
320    protobuf::RemoteSchema { fields }
321}
322
323fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
324    protobuf::RemoteField {
325        name: remote_field.name.clone(),
326        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
327        nullable: remote_field.nullable,
328    }
329}
330
331fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
332    match remote_type {
333        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
334            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
335                protobuf::PostgresInt2 {},
336            )),
337        },
338        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
339            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
340                protobuf::PostgresInt4 {},
341            )),
342        },
343        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
344            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
345                protobuf::PostgresInt8 {},
346            )),
347        },
348        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
349            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
350                protobuf::PostgresFloat4 {},
351            )),
352        },
353        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
354            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
355                protobuf::PostgresFloat8 {},
356            )),
357        },
358        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
359            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
360                protobuf::PostgresNumeric {
361                    scale: *scale as i32,
362                },
363            )),
364        },
365        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
366            r#type: Some(protobuf::remote_type::Type::PostgresName(
367                protobuf::PostgresName {},
368            )),
369        },
370        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
371            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
372                protobuf::PostgresVarchar {},
373            )),
374        },
375        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
376            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
377                protobuf::PostgresBpchar {},
378            )),
379        },
380        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
381            r#type: Some(protobuf::remote_type::Type::PostgresText(
382                protobuf::PostgresText {},
383            )),
384        },
385        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
386            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
387                protobuf::PostgresBytea {},
388            )),
389        },
390        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
391            r#type: Some(protobuf::remote_type::Type::PostgresDate(
392                protobuf::PostgresDate {},
393            )),
394        },
395        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
396            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
397                protobuf::PostgresTimestamp {},
398            )),
399        },
400        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
401            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
402                protobuf::PostgresTimestampTz {},
403            )),
404        },
405        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
406            r#type: Some(protobuf::remote_type::Type::PostgresTime(
407                protobuf::PostgresTime {},
408            )),
409        },
410        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
411            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
412                protobuf::PostgresInterval {},
413            )),
414        },
415        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
416            r#type: Some(protobuf::remote_type::Type::PostgresBool(
417                protobuf::PostgresBool {},
418            )),
419        },
420        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
421            r#type: Some(protobuf::remote_type::Type::PostgresJson(
422                protobuf::PostgresJson {},
423            )),
424        },
425        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
426            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
427                protobuf::PostgresJsonb {},
428            )),
429        },
430        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
431            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
432                protobuf::PostgresInt2Array {},
433            )),
434        },
435        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
436            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
437                protobuf::PostgresInt4Array {},
438            )),
439        },
440        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
441            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
442                protobuf::PostgresInt8Array {},
443            )),
444        },
445        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
446            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
447                protobuf::PostgresFloat4Array {},
448            )),
449        },
450        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
451            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
452                protobuf::PostgresFloat8Array {},
453            )),
454        },
455        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
456            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
457                protobuf::PostgresVarcharArray {},
458            )),
459        },
460        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
461            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
462                protobuf::PostgresBpcharArray {},
463            )),
464        },
465        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
466            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
467                protobuf::PostgresTextArray {},
468            )),
469        },
470        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
471            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
472                protobuf::PostgresByteaArray {},
473            )),
474        },
475        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
476            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
477                protobuf::PostgresBoolArray {},
478            )),
479        },
480        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
481            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
482                protobuf::PostgresPostGisGeometry {},
483            )),
484        },
485        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
486            r#type: Some(protobuf::remote_type::Type::PostgresOid(
487                protobuf::PostgresOid {},
488            )),
489        },
490
491        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
492            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
493                protobuf::MysqlTinyInt {},
494            )),
495        },
496        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
497            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
498                protobuf::MysqlTinyIntUnsigned {},
499            )),
500        },
501        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
502            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
503                protobuf::MysqlSmallInt {},
504            )),
505        },
506        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
507            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
508                protobuf::MysqlSmallIntUnsigned {},
509            )),
510        },
511        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
512            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
513                protobuf::MysqlMediumInt {},
514            )),
515        },
516        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
517            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
518                protobuf::MysqlMediumIntUnsigned {},
519            )),
520        },
521        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
522            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
523                protobuf::MysqlInteger {},
524            )),
525        },
526        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
527            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
528                protobuf::MysqlIntegerUnsigned {},
529            )),
530        },
531        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
532            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
533                protobuf::MysqlBigInt {},
534            )),
535        },
536        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
537            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
538                protobuf::MysqlBigIntUnsigned {},
539            )),
540        },
541        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
542            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
543                protobuf::MysqlFloat {},
544            )),
545        },
546        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
547            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
548                protobuf::MysqlDouble {},
549            )),
550        },
551        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
552            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
553                protobuf::MysqlDecimal {
554                    precision: *precision as u32,
555                    scale: *scale as u32,
556                },
557            )),
558        },
559        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
560            r#type: Some(protobuf::remote_type::Type::MysqlDate(
561                protobuf::MysqlDate {},
562            )),
563        },
564        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
565            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
566                protobuf::MysqlDateTime {},
567            )),
568        },
569        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
570            r#type: Some(protobuf::remote_type::Type::MysqlTime(
571                protobuf::MysqlTime {},
572            )),
573        },
574        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
575            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
576                protobuf::MysqlTimestamp {},
577            )),
578        },
579        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
580            r#type: Some(protobuf::remote_type::Type::MysqlYear(
581                protobuf::MysqlYear {},
582            )),
583        },
584        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
585            r#type: Some(protobuf::remote_type::Type::MysqlChar(
586                protobuf::MysqlChar {},
587            )),
588        },
589        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
590            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
591                protobuf::MysqlVarchar {},
592            )),
593        },
594        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
595            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
596                protobuf::MysqlBinary {},
597            )),
598        },
599        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
600            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
601                protobuf::MysqlVarbinary {},
602            )),
603        },
604        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
605            r#type: Some(protobuf::remote_type::Type::MysqlText(
606                protobuf::MysqlText { length: *len },
607            )),
608        },
609        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
610            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
611                protobuf::MysqlBlob { length: *len },
612            )),
613        },
614        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
615            r#type: Some(protobuf::remote_type::Type::MysqlJson(
616                protobuf::MysqlJson {},
617            )),
618        },
619        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
620            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
621                protobuf::MysqlGeometry {},
622            )),
623        },
624
625        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
626            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
627                protobuf::OracleVarchar2 { length: *len },
628            )),
629        },
630        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
631            r#type: Some(protobuf::remote_type::Type::OracleChar(
632                protobuf::OracleChar { length: *len },
633            )),
634        },
635        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
636            r#type: Some(protobuf::remote_type::Type::OracleNumber(
637                protobuf::OracleNumber {
638                    precision: *precision as u32,
639                    scale: *scale as i32,
640                },
641            )),
642        },
643        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
644            r#type: Some(protobuf::remote_type::Type::OracleDate(
645                protobuf::OracleDate {},
646            )),
647        },
648        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
649            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
650                protobuf::OracleTimestamp {},
651            )),
652        },
653        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
654            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
655                protobuf::OracleBoolean {},
656            )),
657        },
658        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
659            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
660                protobuf::OracleBinaryFloat {},
661            )),
662        },
663        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
664            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
665                protobuf::OracleBinaryDouble {},
666            )),
667        },
668        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
669            r#type: Some(protobuf::remote_type::Type::OracleBlob(
670                protobuf::OracleBlob {},
671            )),
672        },
673        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
674            r#type: Some(protobuf::remote_type::Type::OracleFloat(
675                protobuf::OracleFloat {
676                    precision: *precision as u32,
677                },
678            )),
679        },
680        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
681            r#type: Some(protobuf::remote_type::Type::OracleNchar(
682                protobuf::OracleNChar { length: *len },
683            )),
684        },
685        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
686            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
687                protobuf::OracleNVarchar2 { length: *len },
688            )),
689        },
690        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
691            r#type: Some(protobuf::remote_type::Type::OracleRaw(
692                protobuf::OracleRaw { length: *len },
693            )),
694        },
695        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
696            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
697                protobuf::OracleLongRaw {},
698            )),
699        },
700        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
701            r#type: Some(protobuf::remote_type::Type::OracleLong(
702                protobuf::OracleLong {},
703            )),
704        },
705        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
706            r#type: Some(protobuf::remote_type::Type::OracleClob(
707                protobuf::OracleClob {},
708            )),
709        },
710        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
711            r#type: Some(protobuf::remote_type::Type::OracleNclob(
712                protobuf::OracleNClob {},
713            )),
714        },
715        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
716            r#type: Some(protobuf::remote_type::Type::SqliteNull(
717                protobuf::SqliteNull {},
718            )),
719        },
720        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
721            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
722                protobuf::SqliteInteger {},
723            )),
724        },
725        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
726            r#type: Some(protobuf::remote_type::Type::SqliteReal(
727                protobuf::SqliteReal {},
728            )),
729        },
730        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
731            r#type: Some(protobuf::remote_type::Type::SqliteText(
732                protobuf::SqliteText {},
733            )),
734        },
735        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
736            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
737                protobuf::SqliteBlob {},
738            )),
739        },
740        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
741            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
742                protobuf::DmTinyInt {},
743            )),
744        },
745        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
746            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
747                protobuf::DmSmallInt {},
748            )),
749        },
750        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
751            r#type: Some(protobuf::remote_type::Type::DmInteger(
752                protobuf::DmInteger {},
753            )),
754        },
755        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
756            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
757        },
758        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
759            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
760        },
761        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
762            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
763        },
764        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
765            r#type: Some(protobuf::remote_type::Type::DmNumeric(
766                protobuf::DmNumeric {
767                    precision: *precision as u32,
768                    scale: *scale as i32,
769                },
770            )),
771        },
772        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
773            r#type: Some(protobuf::remote_type::Type::DmDecimal(
774                protobuf::DmDecimal {
775                    precision: *precision as u32,
776                    scale: *scale as i32,
777                },
778            )),
779        },
780        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
781            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
782                length: len.map(|s| s as u32),
783            })),
784        },
785        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
786            r#type: Some(protobuf::remote_type::Type::DmVarchar(
787                protobuf::DmVarchar {
788                    length: len.map(|s| s as u32),
789                },
790            )),
791        },
792        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
793            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
794        },
795        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
796            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
797                length: *len as u32,
798            })),
799        },
800        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
801            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
802                protobuf::DmVarbinary {
803                    length: len.map(|s| s as u32),
804                },
805            )),
806        },
807        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
808            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
809        },
810        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
811            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
812        },
813        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
814            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
815                protobuf::DmTimestamp {
816                    precision: *precision as u32,
817                },
818            )),
819        },
820        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
821            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
822                precision: *precision as u32,
823            })),
824        },
825        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
826            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
827        },
828    }
829}
830
831fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
832    let fields = remote_schema
833        .fields
834        .iter()
835        .map(parse_remote_field)
836        .collect::<Vec<_>>();
837
838    RemoteSchema { fields }
839}
840
841fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
842    RemoteField {
843        name: field.name.clone(),
844        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
845        nullable: field.nullable,
846    }
847}
848
849fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
850    match remote_type.r#type.as_ref().unwrap() {
851        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
852        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
853        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
854        protobuf::remote_type::Type::PostgresFloat4(_) => {
855            RemoteType::Postgres(PostgresType::Float4)
856        }
857        protobuf::remote_type::Type::PostgresFloat8(_) => {
858            RemoteType::Postgres(PostgresType::Float8)
859        }
860        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
861            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
862        }
863        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
864        protobuf::remote_type::Type::PostgresVarchar(_) => {
865            RemoteType::Postgres(PostgresType::Varchar)
866        }
867        protobuf::remote_type::Type::PostgresBpchar(_) => {
868            RemoteType::Postgres(PostgresType::Bpchar)
869        }
870        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
871        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
872        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
873        protobuf::remote_type::Type::PostgresTimestamp(_) => {
874            RemoteType::Postgres(PostgresType::Timestamp)
875        }
876        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
877            RemoteType::Postgres(PostgresType::TimestampTz)
878        }
879        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
880        protobuf::remote_type::Type::PostgresInterval(_) => {
881            RemoteType::Postgres(PostgresType::Interval)
882        }
883        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
884        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
885        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
886        protobuf::remote_type::Type::PostgresInt2Array(_) => {
887            RemoteType::Postgres(PostgresType::Int2Array)
888        }
889        protobuf::remote_type::Type::PostgresInt4Array(_) => {
890            RemoteType::Postgres(PostgresType::Int4Array)
891        }
892        protobuf::remote_type::Type::PostgresInt8Array(_) => {
893            RemoteType::Postgres(PostgresType::Int8Array)
894        }
895        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
896            RemoteType::Postgres(PostgresType::Float4Array)
897        }
898        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
899            RemoteType::Postgres(PostgresType::Float8Array)
900        }
901        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
902            RemoteType::Postgres(PostgresType::VarcharArray)
903        }
904        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
905            RemoteType::Postgres(PostgresType::BpcharArray)
906        }
907        protobuf::remote_type::Type::PostgresTextArray(_) => {
908            RemoteType::Postgres(PostgresType::TextArray)
909        }
910        protobuf::remote_type::Type::PostgresByteaArray(_) => {
911            RemoteType::Postgres(PostgresType::ByteaArray)
912        }
913        protobuf::remote_type::Type::PostgresBoolArray(_) => {
914            RemoteType::Postgres(PostgresType::BoolArray)
915        }
916        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
917            RemoteType::Postgres(PostgresType::PostGisGeometry)
918        }
919        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
920        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
921        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
922            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
923        }
924        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
925        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
926            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
927        }
928        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
929        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
930            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
931        }
932        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
933        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
934            RemoteType::Mysql(MysqlType::IntegerUnsigned)
935        }
936        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
937        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
938            RemoteType::Mysql(MysqlType::BigIntUnsigned)
939        }
940        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
941        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
942        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
943            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
944        ),
945        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
946        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
947        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
948        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
949        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
950        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
951        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
952        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
953        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
954        protobuf::remote_type::Type::MysqlText(text) => {
955            RemoteType::Mysql(MysqlType::Text(text.length))
956        }
957        protobuf::remote_type::Type::MysqlBlob(blob) => {
958            RemoteType::Mysql(MysqlType::Blob(blob.length))
959        }
960        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
961        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
962        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
963            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
964        }
965        protobuf::remote_type::Type::OracleChar(char) => {
966            RemoteType::Oracle(OracleType::Char(char.length))
967        }
968        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
969            OracleType::Number(number.precision as u8, number.scale as i8),
970        ),
971        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
972        protobuf::remote_type::Type::OracleTimestamp(_) => {
973            RemoteType::Oracle(OracleType::Timestamp)
974        }
975        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
976        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
977            RemoteType::Oracle(OracleType::BinaryFloat)
978        }
979        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
980            RemoteType::Oracle(OracleType::BinaryDouble)
981        }
982        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
983            RemoteType::Oracle(OracleType::Float(*precision as u8))
984        }
985        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
986            RemoteType::Oracle(OracleType::NChar(*length))
987        }
988        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
989            RemoteType::Oracle(OracleType::NVarchar2(*length))
990        }
991        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
992            RemoteType::Oracle(OracleType::Raw(*length))
993        }
994        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
995        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
996        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
997        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
998        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
999        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1000        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1001        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1002        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1003        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1004        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1005        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1006        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1007        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1008        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1009        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1010        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1011            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1012        }
1013        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1014            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1015        }
1016        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1017            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1018        }
1019        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1020            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1021        }
1022        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1023        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1024            RemoteType::Dm(DmType::Binary(*length as u16))
1025        }
1026        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1027            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1028        }
1029        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1030        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1031        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1032            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1033        }
1034        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1035            RemoteType::Dm(DmType::Time(*precision as u8))
1036        }
1037        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1038    }
1039}