datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType, PostgresType,
14    RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
15    connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use prost::Message;
26use std::fmt::Debug;
27#[cfg(feature = "sqlite")]
28use std::path::Path;
29use std::sync::Arc;
30
31pub trait TransformCodec: Debug + Send + Sync {
32    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
33    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
34}
35
36#[derive(Debug)]
37pub struct DefaultTransformCodec {}
38
39const DEFAULT_TRANSFORM_ID: &str = "__default";
40
41impl TransformCodec for DefaultTransformCodec {
42    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
43        if value.as_any().is::<DefaultTransform>() {
44            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
45        } else {
46            Err(DataFusionError::Execution(format!(
47                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
48            )))
49        }
50    }
51
52    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
53        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
54            Ok(Arc::new(DefaultTransform {}))
55        } else {
56            Err(DataFusionError::Execution(
57                "DefaultTransformCodec only supports DefaultTransform".to_string(),
58            ))
59        }
60    }
61}
62
63#[derive(Debug, With)]
64pub struct RemotePhysicalCodec {
65    transform_codec: Arc<dyn TransformCodec>,
66}
67
68impl RemotePhysicalCodec {
69    pub fn new() -> Self {
70        Self {
71            transform_codec: Arc::new(DefaultTransformCodec {}),
72        }
73    }
74}
75
76impl Default for RemotePhysicalCodec {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl PhysicalExtensionCodec for RemotePhysicalCodec {
83    fn try_decode(
84        &self,
85        buf: &[u8],
86        _inputs: &[Arc<dyn ExecutionPlan>],
87        _registry: &dyn FunctionRegistry,
88    ) -> DFResult<Arc<dyn ExecutionPlan>> {
89        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
90            DataFusionError::Internal(format!(
91                "Failed to decode remote table execution plan: {e:?}"
92            ))
93        })?;
94
95        let transform = self.transform_codec.try_decode(&proto.transform)?;
96
97        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
98        let remote_schema = proto
99            .remote_schema
100            .map(|schema| Arc::new(parse_remote_schema(&schema)));
101
102        let projection: Option<Vec<usize>> = proto
103            .projection
104            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
105
106        let limit = proto.limit.map(|l| l as usize);
107
108        let conn_options = parse_connection_options(proto.conn_options.unwrap());
109        let conn = tokio::task::block_in_place(|| {
110            tokio::runtime::Handle::current().block_on(async {
111                let pool = connect(&conn_options).await?;
112                let conn = pool.get().await?;
113                Ok::<_, DataFusionError>(conn)
114            })
115        })?;
116
117        Ok(Arc::new(RemoteTableExec::try_new(
118            conn_options,
119            proto.sql,
120            table_schema,
121            remote_schema,
122            projection,
123            proto.unparsed_filters,
124            limit,
125            transform,
126            conn,
127        )?))
128    }
129
130    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
131        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
132            let serialized_transform = self.transform_codec.try_encode(exec.transform.as_ref())?;
133
134            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
135            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
136
137            let proto = protobuf::RemoteTableExec {
138                conn_options: Some(serialized_connection_options),
139                sql: exec.sql.clone(),
140                table_schema: Some(exec.table_schema.as_ref().try_into()?),
141                remote_schema,
142                projection: exec
143                    .projection
144                    .as_ref()
145                    .map(|p| serialize_projection(p.as_slice())),
146                unparsed_filters: exec.unparsed_filters.clone(),
147                limit: exec.limit.map(|l| l as u32),
148                transform: serialized_transform,
149            };
150
151            proto.encode(buf).map_err(|e| {
152                DataFusionError::Internal(format!(
153                    "Failed to encode remote table execution plan: {e:?}"
154                ))
155            })?;
156            Ok(())
157        } else {
158            Err(DataFusionError::Execution(format!(
159                "Failed to encode {}",
160                RemoteTableExec::static_name()
161            )))
162        }
163    }
164}
165
166fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
167    match options {
168        #[cfg(feature = "postgres")]
169        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
170            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
171                protobuf::PostgresConnectionOptions {
172                    host: options.host.clone(),
173                    port: options.port as u32,
174                    username: options.username.clone(),
175                    password: options.password.clone(),
176                    database: options.database.clone(),
177                    pool_max_size: options.pool_max_size as u32,
178                    stream_chunk_size: options.stream_chunk_size as u32,
179                },
180            )),
181        },
182        #[cfg(feature = "mysql")]
183        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
184            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
185                protobuf::MysqlConnectionOptions {
186                    host: options.host.clone(),
187                    port: options.port as u32,
188                    username: options.username.clone(),
189                    password: options.password.clone(),
190                    database: options.database.clone(),
191                    pool_max_size: options.pool_max_size as u32,
192                    stream_chunk_size: options.stream_chunk_size as u32,
193                },
194            )),
195        },
196        #[cfg(feature = "oracle")]
197        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
198            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
199                protobuf::OracleConnectionOptions {
200                    host: options.host.clone(),
201                    port: options.port as u32,
202                    username: options.username.clone(),
203                    password: options.password.clone(),
204                    service_name: options.service_name.clone(),
205                    pool_max_size: options.pool_max_size as u32,
206                    stream_chunk_size: options.stream_chunk_size as u32,
207                },
208            )),
209        },
210        #[cfg(feature = "sqlite")]
211        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
212            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
213                protobuf::SqliteConnectionOptions {
214                    path: options.path.to_str().unwrap().to_string(),
215                    stream_chunk_size: options.stream_chunk_size as u32,
216                },
217            )),
218        },
219        #[cfg(feature = "dm")]
220        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
221            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
222                protobuf::DmConnectionOptions {
223                    host: options.host.clone(),
224                    port: options.port as u32,
225                    username: options.username.clone(),
226                    password: options.password.clone(),
227                    schema: options.schema.clone(),
228                    stream_chunk_size: options.stream_chunk_size as u32,
229                    driver: options.driver.clone(),
230                },
231            )),
232        },
233    }
234}
235
236fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
237    match options.connection_options {
238        #[cfg(feature = "postgres")]
239        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
240            ConnectionOptions::Postgres(PostgresConnectionOptions {
241                host: options.host,
242                port: options.port as u16,
243                username: options.username,
244                password: options.password,
245                database: options.database,
246                pool_max_size: options.pool_max_size as usize,
247                stream_chunk_size: options.stream_chunk_size as usize,
248            })
249        }
250        #[cfg(feature = "mysql")]
251        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
252            ConnectionOptions::Mysql(MysqlConnectionOptions {
253                host: options.host,
254                port: options.port as u16,
255                username: options.username,
256                password: options.password,
257                database: options.database,
258                pool_max_size: options.pool_max_size as usize,
259                stream_chunk_size: options.stream_chunk_size as usize,
260            })
261        }
262        #[cfg(feature = "oracle")]
263        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
264            ConnectionOptions::Oracle(OracleConnectionOptions {
265                host: options.host,
266                port: options.port as u16,
267                username: options.username,
268                password: options.password,
269                service_name: options.service_name,
270                pool_max_size: options.pool_max_size as usize,
271                stream_chunk_size: options.stream_chunk_size as usize,
272            })
273        }
274        #[cfg(feature = "sqlite")]
275        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
276            ConnectionOptions::Sqlite(SqliteConnectionOptions {
277                path: Path::new(&options.path).to_path_buf(),
278                stream_chunk_size: options.stream_chunk_size as usize,
279            })
280        }
281        #[cfg(feature = "dm")]
282        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
283            ConnectionOptions::Dm(DmConnectionOptions {
284                host: options.host,
285                port: options.port as u16,
286                username: options.username,
287                password: options.password,
288                schema: options.schema,
289                stream_chunk_size: options.stream_chunk_size as usize,
290                driver: options.driver,
291            })
292        }
293        _ => panic!("Failed to parse connection options: {options:?}"),
294    }
295}
296
297fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
298    protobuf::Projection {
299        projection: projection.iter().map(|n| *n as u32).collect(),
300    }
301}
302
303fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
304    let fields = remote_schema
305        .fields
306        .iter()
307        .map(serialize_remote_field)
308        .collect::<Vec<_>>();
309
310    protobuf::RemoteSchema { fields }
311}
312
313fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
314    protobuf::RemoteField {
315        name: remote_field.name.clone(),
316        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
317        nullable: remote_field.nullable,
318    }
319}
320
321fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
322    match remote_type {
323        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
324            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
325                protobuf::PostgresInt2 {},
326            )),
327        },
328        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
329            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
330                protobuf::PostgresInt4 {},
331            )),
332        },
333        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
334            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
335                protobuf::PostgresInt8 {},
336            )),
337        },
338        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
339            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
340                protobuf::PostgresFloat4 {},
341            )),
342        },
343        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
344            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
345                protobuf::PostgresFloat8 {},
346            )),
347        },
348        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
349            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
350                protobuf::PostgresNumeric {
351                    scale: *scale as i32,
352                },
353            )),
354        },
355        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
356            r#type: Some(protobuf::remote_type::Type::PostgresName(
357                protobuf::PostgresName {},
358            )),
359        },
360        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
361            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
362                protobuf::PostgresVarchar {},
363            )),
364        },
365        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
366            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
367                protobuf::PostgresBpchar {},
368            )),
369        },
370        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
371            r#type: Some(protobuf::remote_type::Type::PostgresText(
372                protobuf::PostgresText {},
373            )),
374        },
375        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
376            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
377                protobuf::PostgresBytea {},
378            )),
379        },
380        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
381            r#type: Some(protobuf::remote_type::Type::PostgresDate(
382                protobuf::PostgresDate {},
383            )),
384        },
385        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
386            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
387                protobuf::PostgresTimestamp {},
388            )),
389        },
390        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
391            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
392                protobuf::PostgresTimestampTz {},
393            )),
394        },
395        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
396            r#type: Some(protobuf::remote_type::Type::PostgresTime(
397                protobuf::PostgresTime {},
398            )),
399        },
400        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
401            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
402                protobuf::PostgresInterval {},
403            )),
404        },
405        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
406            r#type: Some(protobuf::remote_type::Type::PostgresBool(
407                protobuf::PostgresBool {},
408            )),
409        },
410        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
411            r#type: Some(protobuf::remote_type::Type::PostgresJson(
412                protobuf::PostgresJson {},
413            )),
414        },
415        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
416            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
417                protobuf::PostgresJsonb {},
418            )),
419        },
420        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
421            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
422                protobuf::PostgresInt2Array {},
423            )),
424        },
425        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
426            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
427                protobuf::PostgresInt4Array {},
428            )),
429        },
430        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
431            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
432                protobuf::PostgresInt8Array {},
433            )),
434        },
435        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
436            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
437                protobuf::PostgresFloat4Array {},
438            )),
439        },
440        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
441            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
442                protobuf::PostgresFloat8Array {},
443            )),
444        },
445        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
446            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
447                protobuf::PostgresVarcharArray {},
448            )),
449        },
450        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
451            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
452                protobuf::PostgresBpcharArray {},
453            )),
454        },
455        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
456            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
457                protobuf::PostgresTextArray {},
458            )),
459        },
460        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
461            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
462                protobuf::PostgresByteaArray {},
463            )),
464        },
465        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
466            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
467                protobuf::PostgresBoolArray {},
468            )),
469        },
470        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
471            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
472                protobuf::PostgresPostGisGeometry {},
473            )),
474        },
475        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
476            r#type: Some(protobuf::remote_type::Type::PostgresOid(
477                protobuf::PostgresOid {},
478            )),
479        },
480
481        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
482            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
483                protobuf::MysqlTinyInt {},
484            )),
485        },
486        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
487            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
488                protobuf::MysqlTinyIntUnsigned {},
489            )),
490        },
491        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
492            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
493                protobuf::MysqlSmallInt {},
494            )),
495        },
496        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
497            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
498                protobuf::MysqlSmallIntUnsigned {},
499            )),
500        },
501        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
502            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
503                protobuf::MysqlMediumInt {},
504            )),
505        },
506        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
507            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
508                protobuf::MysqlMediumIntUnsigned {},
509            )),
510        },
511        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
512            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
513                protobuf::MysqlInteger {},
514            )),
515        },
516        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
517            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
518                protobuf::MysqlIntegerUnsigned {},
519            )),
520        },
521        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
522            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
523                protobuf::MysqlBigInt {},
524            )),
525        },
526        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
527            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
528                protobuf::MysqlBigIntUnsigned {},
529            )),
530        },
531        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
532            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
533                protobuf::MysqlFloat {},
534            )),
535        },
536        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
537            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
538                protobuf::MysqlDouble {},
539            )),
540        },
541        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
542            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
543                protobuf::MysqlDecimal {
544                    precision: *precision as u32,
545                    scale: *scale as u32,
546                },
547            )),
548        },
549        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
550            r#type: Some(protobuf::remote_type::Type::MysqlDate(
551                protobuf::MysqlDate {},
552            )),
553        },
554        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
555            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
556                protobuf::MysqlDateTime {},
557            )),
558        },
559        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
560            r#type: Some(protobuf::remote_type::Type::MysqlTime(
561                protobuf::MysqlTime {},
562            )),
563        },
564        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
565            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
566                protobuf::MysqlTimestamp {},
567            )),
568        },
569        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
570            r#type: Some(protobuf::remote_type::Type::MysqlYear(
571                protobuf::MysqlYear {},
572            )),
573        },
574        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
575            r#type: Some(protobuf::remote_type::Type::MysqlChar(
576                protobuf::MysqlChar {},
577            )),
578        },
579        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
580            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
581                protobuf::MysqlVarchar {},
582            )),
583        },
584        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
585            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
586                protobuf::MysqlBinary {},
587            )),
588        },
589        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
590            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
591                protobuf::MysqlVarbinary {},
592            )),
593        },
594        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
595            r#type: Some(protobuf::remote_type::Type::MysqlText(
596                protobuf::MysqlText { length: *len },
597            )),
598        },
599        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
600            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
601                protobuf::MysqlBlob { length: *len },
602            )),
603        },
604        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
605            r#type: Some(protobuf::remote_type::Type::MysqlJson(
606                protobuf::MysqlJson {},
607            )),
608        },
609        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
610            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
611                protobuf::MysqlGeometry {},
612            )),
613        },
614
615        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
616            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
617                protobuf::OracleVarchar2 { length: *len },
618            )),
619        },
620        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
621            r#type: Some(protobuf::remote_type::Type::OracleChar(
622                protobuf::OracleChar { length: *len },
623            )),
624        },
625        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
626            r#type: Some(protobuf::remote_type::Type::OracleNumber(
627                protobuf::OracleNumber {
628                    precision: *precision as u32,
629                    scale: *scale as i32,
630                },
631            )),
632        },
633        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
634            r#type: Some(protobuf::remote_type::Type::OracleDate(
635                protobuf::OracleDate {},
636            )),
637        },
638        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
639            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
640                protobuf::OracleTimestamp {},
641            )),
642        },
643        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
644            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
645                protobuf::OracleBoolean {},
646            )),
647        },
648        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
649            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
650                protobuf::OracleBinaryFloat {},
651            )),
652        },
653        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
654            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
655                protobuf::OracleBinaryDouble {},
656            )),
657        },
658        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
659            r#type: Some(protobuf::remote_type::Type::OracleBlob(
660                protobuf::OracleBlob {},
661            )),
662        },
663        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
664            r#type: Some(protobuf::remote_type::Type::OracleFloat(
665                protobuf::OracleFloat {
666                    precision: *precision as u32,
667                },
668            )),
669        },
670        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
671            r#type: Some(protobuf::remote_type::Type::OracleNchar(
672                protobuf::OracleNChar { length: *len },
673            )),
674        },
675        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
676            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
677                protobuf::OracleNVarchar2 { length: *len },
678            )),
679        },
680        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
681            r#type: Some(protobuf::remote_type::Type::OracleRaw(
682                protobuf::OracleRaw { length: *len },
683            )),
684        },
685        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
686            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
687                protobuf::OracleLongRaw {},
688            )),
689        },
690        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
691            r#type: Some(protobuf::remote_type::Type::OracleLong(
692                protobuf::OracleLong {},
693            )),
694        },
695        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
696            r#type: Some(protobuf::remote_type::Type::OracleClob(
697                protobuf::OracleClob {},
698            )),
699        },
700        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
701            r#type: Some(protobuf::remote_type::Type::OracleNclob(
702                protobuf::OracleNClob {},
703            )),
704        },
705        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
706            r#type: Some(protobuf::remote_type::Type::SqliteNull(
707                protobuf::SqliteNull {},
708            )),
709        },
710        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
711            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
712                protobuf::SqliteInteger {},
713            )),
714        },
715        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
716            r#type: Some(protobuf::remote_type::Type::SqliteReal(
717                protobuf::SqliteReal {},
718            )),
719        },
720        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
721            r#type: Some(protobuf::remote_type::Type::SqliteText(
722                protobuf::SqliteText {},
723            )),
724        },
725        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
726            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
727                protobuf::SqliteBlob {},
728            )),
729        },
730        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
731            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
732                protobuf::DmTinyInt {},
733            )),
734        },
735        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
736            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
737                protobuf::DmSmallInt {},
738            )),
739        },
740        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
741            r#type: Some(protobuf::remote_type::Type::DmInteger(
742                protobuf::DmInteger {},
743            )),
744        },
745        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
746            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
747        },
748        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
749            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
750        },
751        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
752            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
753        },
754        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
755            r#type: Some(protobuf::remote_type::Type::DmNumeric(
756                protobuf::DmNumeric {
757                    precision: *precision as u32,
758                    scale: *scale as i32,
759                },
760            )),
761        },
762        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
763            r#type: Some(protobuf::remote_type::Type::DmDecimal(
764                protobuf::DmDecimal {
765                    precision: *precision as u32,
766                    scale: *scale as i32,
767                },
768            )),
769        },
770        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
771            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
772                length: len.map(|s| s as u32),
773            })),
774        },
775        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
776            r#type: Some(protobuf::remote_type::Type::DmVarchar(
777                protobuf::DmVarchar {
778                    length: len.map(|s| s as u32),
779                },
780            )),
781        },
782        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
783            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
784        },
785        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
786            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
787                length: *len as u32,
788            })),
789        },
790        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
791            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
792                protobuf::DmVarbinary {
793                    length: len.map(|s| s as u32),
794                },
795            )),
796        },
797        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
798            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
799        },
800        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
801            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
802        },
803        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
804            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
805                protobuf::DmTimestamp {
806                    precision: *precision as u32,
807                },
808            )),
809        },
810        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
811            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
812                precision: *precision as u32,
813            })),
814        },
815        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
816            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
817        },
818    }
819}
820
821fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
822    let fields = remote_schema
823        .fields
824        .iter()
825        .map(parse_remote_field)
826        .collect::<Vec<_>>();
827
828    RemoteSchema { fields }
829}
830
831fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
832    RemoteField {
833        name: field.name.clone(),
834        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
835        nullable: field.nullable,
836    }
837}
838
839fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
840    match remote_type.r#type.as_ref().unwrap() {
841        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
842        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
843        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
844        protobuf::remote_type::Type::PostgresFloat4(_) => {
845            RemoteType::Postgres(PostgresType::Float4)
846        }
847        protobuf::remote_type::Type::PostgresFloat8(_) => {
848            RemoteType::Postgres(PostgresType::Float8)
849        }
850        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
851            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
852        }
853        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
854        protobuf::remote_type::Type::PostgresVarchar(_) => {
855            RemoteType::Postgres(PostgresType::Varchar)
856        }
857        protobuf::remote_type::Type::PostgresBpchar(_) => {
858            RemoteType::Postgres(PostgresType::Bpchar)
859        }
860        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
861        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
862        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
863        protobuf::remote_type::Type::PostgresTimestamp(_) => {
864            RemoteType::Postgres(PostgresType::Timestamp)
865        }
866        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
867            RemoteType::Postgres(PostgresType::TimestampTz)
868        }
869        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
870        protobuf::remote_type::Type::PostgresInterval(_) => {
871            RemoteType::Postgres(PostgresType::Interval)
872        }
873        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
874        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
875        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
876        protobuf::remote_type::Type::PostgresInt2Array(_) => {
877            RemoteType::Postgres(PostgresType::Int2Array)
878        }
879        protobuf::remote_type::Type::PostgresInt4Array(_) => {
880            RemoteType::Postgres(PostgresType::Int4Array)
881        }
882        protobuf::remote_type::Type::PostgresInt8Array(_) => {
883            RemoteType::Postgres(PostgresType::Int8Array)
884        }
885        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
886            RemoteType::Postgres(PostgresType::Float4Array)
887        }
888        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
889            RemoteType::Postgres(PostgresType::Float8Array)
890        }
891        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
892            RemoteType::Postgres(PostgresType::VarcharArray)
893        }
894        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
895            RemoteType::Postgres(PostgresType::BpcharArray)
896        }
897        protobuf::remote_type::Type::PostgresTextArray(_) => {
898            RemoteType::Postgres(PostgresType::TextArray)
899        }
900        protobuf::remote_type::Type::PostgresByteaArray(_) => {
901            RemoteType::Postgres(PostgresType::ByteaArray)
902        }
903        protobuf::remote_type::Type::PostgresBoolArray(_) => {
904            RemoteType::Postgres(PostgresType::BoolArray)
905        }
906        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
907            RemoteType::Postgres(PostgresType::PostGisGeometry)
908        }
909        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
910        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
911        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
912            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
913        }
914        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
915        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
916            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
917        }
918        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
919        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
920            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
921        }
922        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
923        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
924            RemoteType::Mysql(MysqlType::IntegerUnsigned)
925        }
926        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
927        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
928            RemoteType::Mysql(MysqlType::BigIntUnsigned)
929        }
930        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
931        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
932        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
933            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
934        ),
935        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
936        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
937        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
938        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
939        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
940        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
941        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
942        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
943        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
944        protobuf::remote_type::Type::MysqlText(text) => {
945            RemoteType::Mysql(MysqlType::Text(text.length))
946        }
947        protobuf::remote_type::Type::MysqlBlob(blob) => {
948            RemoteType::Mysql(MysqlType::Blob(blob.length))
949        }
950        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
951        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
952        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
953            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
954        }
955        protobuf::remote_type::Type::OracleChar(char) => {
956            RemoteType::Oracle(OracleType::Char(char.length))
957        }
958        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
959            OracleType::Number(number.precision as u8, number.scale as i8),
960        ),
961        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
962        protobuf::remote_type::Type::OracleTimestamp(_) => {
963            RemoteType::Oracle(OracleType::Timestamp)
964        }
965        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
966        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
967            RemoteType::Oracle(OracleType::BinaryFloat)
968        }
969        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
970            RemoteType::Oracle(OracleType::BinaryDouble)
971        }
972        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
973            RemoteType::Oracle(OracleType::Float(*precision as u8))
974        }
975        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
976            RemoteType::Oracle(OracleType::NChar(*length))
977        }
978        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
979            RemoteType::Oracle(OracleType::NVarchar2(*length))
980        }
981        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
982            RemoteType::Oracle(OracleType::Raw(*length))
983        }
984        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
985        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
986        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
987        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
988        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
989        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
990        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
991        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
992        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
993        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
994        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
995        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
996        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
997        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
998        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
999        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1000        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1001            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1002        }
1003        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1004            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1005        }
1006        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1007            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1008        }
1009        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1010            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1011        }
1012        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1013        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1014            RemoteType::Dm(DmType::Binary(*length as u16))
1015        }
1016        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1017            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1018        }
1019        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1020        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1021        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1022            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1023        }
1024        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1025            RemoteType::Dm(DmType::Time(*precision as u8))
1026        }
1027        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1028    }
1029}