datafusion_remote_table/
codec.rs

1use crate::DmConnectionOptions;
2use crate::LazyPool;
3use crate::MysqlConnectionOptions;
4use crate::OracleConnectionOptions;
5use crate::PostgresConnectionOptions;
6use crate::SqliteConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9    ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType, Literalize,
10    MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource,
11    RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
12};
13use arrow::datatypes::SchemaRef;
14use datafusion_common::DataFusionError;
15use datafusion_execution::TaskContext;
16use datafusion_physical_plan::ExecutionPlan;
17use datafusion_proto::convert_required;
18use datafusion_proto::physical_plan::PhysicalExtensionCodec;
19use datafusion_proto::protobuf::proto_error;
20use derive_with::With;
21use prost::Message;
22use std::fmt::Debug;
23use std::sync::Arc;
24use std::time::Duration;
25
26pub trait TransformCodec: Debug + Send + Sync {
27    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
28    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
29}
30
31#[derive(Debug)]
32pub struct DefaultTransformCodec {}
33
34const DEFAULT_TRANSFORM_ID: &str = "__default";
35
36impl TransformCodec for DefaultTransformCodec {
37    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
38        if value.as_any().is::<DefaultTransform>() {
39            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
40        } else {
41            Err(DataFusionError::Execution(format!(
42                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
43            )))
44        }
45    }
46
47    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
48        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
49            Ok(Arc::new(DefaultTransform {}))
50        } else {
51            Err(DataFusionError::Execution(
52                "DefaultTransformCodec only supports DefaultTransform".to_string(),
53            ))
54        }
55    }
56}
57
58pub trait LiteralizeCodec: Debug + Send + Sync {
59    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
60    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
61}
62
63#[derive(Debug)]
64pub struct DefaultLiteralizeCodec {}
65
66const DEFAULT_LITERALIZE_ID: &str = "__default";
67
68impl LiteralizeCodec for DefaultLiteralizeCodec {
69    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
70        if value.as_any().is::<DefaultLiteralizer>() {
71            Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
72        } else {
73            Err(DataFusionError::Execution(format!(
74                "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
75            )))
76        }
77    }
78
79    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
80        if value == DEFAULT_LITERALIZE_ID.as_bytes() {
81            Ok(Arc::new(DefaultLiteralizer {}))
82        } else {
83            Err(DataFusionError::Execution(
84                "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
85            ))
86        }
87    }
88}
89
90#[derive(Debug, With)]
91pub struct RemotePhysicalCodec {
92    pub transform_codec: Arc<dyn TransformCodec>,
93    pub literalize_codec: Arc<dyn LiteralizeCodec>,
94}
95
96impl RemotePhysicalCodec {
97    pub fn new() -> Self {
98        Self {
99            transform_codec: Arc::new(DefaultTransformCodec {}),
100            literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
101        }
102    }
103}
104
105impl Default for RemotePhysicalCodec {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl PhysicalExtensionCodec for RemotePhysicalCodec {
112    fn try_decode(
113        &self,
114        buf: &[u8],
115        inputs: &[Arc<dyn ExecutionPlan>],
116        _ctx: &TaskContext,
117    ) -> DFResult<Arc<dyn ExecutionPlan>> {
118        let remote_table_node =
119            protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
120                DataFusionError::Internal(format!(
121                    "Failed to decode remote table physical plan node: {e:?}"
122                ))
123            })?;
124        let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
125            DataFusionError::Internal(
126                "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
127            )
128        })?;
129
130        match remote_table_plan {
131            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
132                let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
133                    Arc::new(DefaultTransform {})
134                } else {
135                    self.transform_codec.try_decode(&proto.transform)?
136                };
137
138                let source = parse_remote_source(proto.source.as_ref().ok_or(
139                    DataFusionError::Internal("remote source is not set".to_string()),
140                )?)?;
141
142                let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
143                let remote_schema = proto
144                    .remote_schema
145                    .map(|schema| Arc::new(parse_remote_schema(&schema)));
146
147                let projection = parse_projection(proto.projection.as_ref());
148
149                let limit = proto.limit.map(|l| l as usize);
150
151                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
152                let pool = LazyPool::new(conn_options.clone());
153
154                let row_count = proto.row_count.map(|c| c as usize);
155
156                Ok(Arc::new(RemoteTableScanExec::try_new(
157                    conn_options,
158                    pool,
159                    source,
160                    table_schema,
161                    remote_schema,
162                    projection,
163                    proto.unparsed_filters,
164                    limit,
165                    transform,
166                    row_count,
167                )?))
168            }
169            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
170                proto,
171            ) => {
172                if inputs.len() != 1 {
173                    return Err(DataFusionError::Internal(
174                        "RemoteTableInsertExec only support one input".to_string(),
175                    ));
176                }
177
178                let input = inputs[0].clone();
179
180                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
181                let pool = LazyPool::new(conn_options.clone());
182                let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
183                let table = proto.table.unwrap().idents;
184
185                let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
186                    Arc::new(DefaultLiteralizer {})
187                } else {
188                    self.literalize_codec.try_decode(&proto.literalizer)?
189                };
190
191                Ok(Arc::new(RemoteTableInsertExec::new(
192                    input,
193                    conn_options,
194                    pool,
195                    literalizer,
196                    table,
197                    remote_schema,
198                )))
199            }
200        }
201    }
202
203    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
204        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
205            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
206                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
207            } else {
208                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
209                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
210                bytes
211            };
212
213            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
214            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
215            let serialized_source = serialize_remote_source(&exec.source);
216
217            let proto = protobuf::RemoteTablePhysicalPlanNode {
218                remote_table_physical_plan_type: Some(
219                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
220                        protobuf::RemoteTableScanExec {
221                            conn_options: Some(serialized_connection_options),
222                            source: Some(serialized_source),
223                            table_schema: Some(exec.table_schema.as_ref().try_into()?),
224                            remote_schema,
225                            projection: serialize_projection(exec.projection.as_ref()),
226                            unparsed_filters: exec.unparsed_filters.clone(),
227                            limit: exec.limit.map(|l| l as u32),
228                            transform: serialized_transform,
229                            row_count: exec.row_count.map(|c| c as u32),
230                        },
231                    ),
232                ),
233            };
234
235            proto.encode(buf).map_err(|e| {
236                DataFusionError::Internal(format!(
237                    "Failed to encode remote table scan exec plan: {e:?}"
238                ))
239            })?;
240            Ok(())
241        } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
242            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
243            let remote_schema = serialize_remote_schema(&exec.remote_schema);
244            let serialized_table = protobuf::Identifiers {
245                idents: exec.table.clone(),
246            };
247            let serialized_literalizer = self
248                .literalize_codec
249                .try_encode(exec.literalizer.as_ref())?;
250
251            let proto = protobuf::RemoteTablePhysicalPlanNode {
252                remote_table_physical_plan_type: Some(
253                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
254                        protobuf::RemoteTableInsertExec {
255                            conn_options: Some(serialized_connection_options),
256                            table: Some(serialized_table),
257                            remote_schema: Some(remote_schema),
258                            literalizer: serialized_literalizer,
259                        },
260                    ),
261                ),
262            };
263
264            proto.encode(buf).map_err(|e| {
265                DataFusionError::Internal(format!(
266                    "Failed to encode remote table insert exec node: {e:?}"
267                ))
268            })?;
269
270            Ok(())
271        } else {
272            Err(DataFusionError::NotImplemented(format!(
273                "RemotePhysicalCodec does not support encoding {}",
274                node.name()
275            )))
276        }
277    }
278}
279
280fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
281    match options {
282        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
283            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
284                protobuf::PostgresConnectionOptions {
285                    host: options.host.clone(),
286                    port: options.port as u32,
287                    username: options.username.clone(),
288                    password: options.password.clone(),
289                    database: options.database.clone(),
290                    pool_max_size: options.pool_max_size as u32,
291                    pool_min_idle: options.pool_min_idle as u32,
292                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
293                    pool_ttl_check_interval: Some(serialize_duration(
294                        &options.pool_ttl_check_interval,
295                    )),
296                    stream_chunk_size: options.stream_chunk_size as u32,
297                    default_numeric_scale: options.default_numeric_scale as i32,
298                },
299            )),
300        },
301        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
302            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
303                protobuf::MysqlConnectionOptions {
304                    host: options.host.clone(),
305                    port: options.port as u32,
306                    username: options.username.clone(),
307                    password: options.password.clone(),
308                    database: options.database.clone(),
309                    pool_max_size: options.pool_max_size as u32,
310                    pool_min_idle: options.pool_min_idle as u32,
311                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
312                    pool_ttl_check_interval: Some(serialize_duration(
313                        &options.pool_ttl_check_interval,
314                    )),
315                    stream_chunk_size: options.stream_chunk_size as u32,
316                },
317            )),
318        },
319        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
320            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
321                protobuf::OracleConnectionOptions {
322                    host: options.host.clone(),
323                    port: options.port as u32,
324                    username: options.username.clone(),
325                    password: options.password.clone(),
326                    service_name: options.service_name.clone(),
327                    pool_max_size: options.pool_max_size as u32,
328                    pool_min_idle: options.pool_min_idle as u32,
329                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
330                    pool_ttl_check_interval: Some(serialize_duration(
331                        &options.pool_ttl_check_interval,
332                    )),
333                    stream_chunk_size: options.stream_chunk_size as u32,
334                },
335            )),
336        },
337        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
338            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
339                protobuf::SqliteConnectionOptions {
340                    path: options.path.to_str().unwrap().to_string(),
341                    stream_chunk_size: options.stream_chunk_size as u32,
342                },
343            )),
344        },
345        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
346            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
347                protobuf::DmConnectionOptions {
348                    host: options.host.clone(),
349                    port: options.port as u32,
350                    username: options.username.clone(),
351                    password: options.password.clone(),
352                    schema: options.schema.clone(),
353                    stream_chunk_size: options.stream_chunk_size as u32,
354                    driver: options.driver.clone(),
355                },
356            )),
357        },
358    }
359}
360
361fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
362    match options.connection_options {
363        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
364            ConnectionOptions::Postgres(PostgresConnectionOptions {
365                host: options.host,
366                port: options.port as u16,
367                username: options.username,
368                password: options.password,
369                database: options.database,
370                pool_max_size: options.pool_max_size as usize,
371                pool_min_idle: options.pool_min_idle as usize,
372                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
373                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
374                stream_chunk_size: options.stream_chunk_size as usize,
375                default_numeric_scale: options.default_numeric_scale as i8,
376            })
377        }
378        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
379            ConnectionOptions::Mysql(MysqlConnectionOptions {
380                host: options.host,
381                port: options.port as u16,
382                username: options.username,
383                password: options.password,
384                database: options.database,
385                pool_max_size: options.pool_max_size as usize,
386                pool_min_idle: options.pool_min_idle as usize,
387                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
388                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
389                stream_chunk_size: options.stream_chunk_size as usize,
390            })
391        }
392        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
393            ConnectionOptions::Oracle(OracleConnectionOptions {
394                host: options.host,
395                port: options.port as u16,
396                username: options.username,
397                password: options.password,
398                service_name: options.service_name,
399                pool_max_size: options.pool_max_size as usize,
400                pool_min_idle: options.pool_min_idle as usize,
401                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
402                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
403                stream_chunk_size: options.stream_chunk_size as usize,
404            })
405        }
406        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
407            ConnectionOptions::Sqlite(SqliteConnectionOptions {
408                path: std::path::Path::new(&options.path).to_path_buf(),
409                stream_chunk_size: options.stream_chunk_size as usize,
410            })
411        }
412        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
413            ConnectionOptions::Dm(DmConnectionOptions {
414                host: options.host,
415                port: options.port as u16,
416                username: options.username,
417                password: options.password,
418                schema: options.schema,
419                stream_chunk_size: options.stream_chunk_size as usize,
420                driver: options.driver,
421            })
422        }
423        _ => panic!("Failed to parse connection options: {options:?}"),
424    }
425}
426
427fn serialize_duration(duration: &Duration) -> protobuf::Duration {
428    protobuf::Duration {
429        secs: duration.as_secs(),
430        nanos: duration.subsec_nanos(),
431    }
432}
433
434fn parse_duration(duration: &protobuf::Duration) -> Duration {
435    Duration::new(duration.secs, duration.nanos)
436}
437
438fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
439    projection.map(|p| protobuf::Projection {
440        projection: p.iter().map(|n| *n as u32).collect(),
441    })
442}
443
444fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
445    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
446}
447
448fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
449    let fields = remote_schema
450        .fields
451        .iter()
452        .map(serialize_remote_field)
453        .collect::<Vec<_>>();
454
455    protobuf::RemoteSchema { fields }
456}
457
458fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
459    protobuf::RemoteField {
460        name: remote_field.name.clone(),
461        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
462        nullable: remote_field.nullable,
463        auto_increment: remote_field.auto_increment,
464    }
465}
466
467fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
468    match remote_type {
469        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
470            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
471                protobuf::Empty {},
472            )),
473        },
474        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
475            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
476                protobuf::Empty {},
477            )),
478        },
479        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
480            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
481                protobuf::Empty {},
482            )),
483        },
484        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
485            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
486                protobuf::Empty {},
487            )),
488        },
489        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
490            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
491                protobuf::Empty {},
492            )),
493        },
494        RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
495            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
496                protobuf::PostgresNumeric {
497                    precision: *precision as u32,
498                    scale: *scale as i32,
499                },
500            )),
501        },
502        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
503            r#type: Some(protobuf::remote_type::Type::PostgresName(
504                protobuf::Empty {},
505            )),
506        },
507        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
508            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
509                protobuf::Empty {},
510            )),
511        },
512        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
513            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
514                protobuf::Empty {},
515            )),
516        },
517        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
518            r#type: Some(protobuf::remote_type::Type::PostgresText(
519                protobuf::Empty {},
520            )),
521        },
522        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
523            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
524                protobuf::Empty {},
525            )),
526        },
527        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
528            r#type: Some(protobuf::remote_type::Type::PostgresDate(
529                protobuf::Empty {},
530            )),
531        },
532        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
533            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
534                protobuf::Empty {},
535            )),
536        },
537        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
538            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
539                protobuf::Empty {},
540            )),
541        },
542        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
543            r#type: Some(protobuf::remote_type::Type::PostgresTime(
544                protobuf::Empty {},
545            )),
546        },
547        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
548            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
549                protobuf::Empty {},
550            )),
551        },
552        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
553            r#type: Some(protobuf::remote_type::Type::PostgresBool(
554                protobuf::Empty {},
555            )),
556        },
557        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
558            r#type: Some(protobuf::remote_type::Type::PostgresJson(
559                protobuf::Empty {},
560            )),
561        },
562        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
563            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
564                protobuf::Empty {},
565            )),
566        },
567        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
568            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
569                protobuf::Empty {},
570            )),
571        },
572        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
573            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
574                protobuf::Empty {},
575            )),
576        },
577        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
578            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
579                protobuf::Empty {},
580            )),
581        },
582        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
583            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
584                protobuf::Empty {},
585            )),
586        },
587        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
588            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
589                protobuf::Empty {},
590            )),
591        },
592        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
593            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
594                protobuf::Empty {},
595            )),
596        },
597        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
598            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
599                protobuf::Empty {},
600            )),
601        },
602        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
603            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
604                protobuf::Empty {},
605            )),
606        },
607        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
608            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
609                protobuf::Empty {},
610            )),
611        },
612        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
613            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
614                protobuf::Empty {},
615            )),
616        },
617        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
618            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
619                protobuf::Empty {},
620            )),
621        },
622        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
623            r#type: Some(protobuf::remote_type::Type::PostgresOid(protobuf::Empty {})),
624        },
625        RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
626            r#type: Some(protobuf::remote_type::Type::PostgresXml(protobuf::Empty {})),
627        },
628        RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
629            r#type: Some(protobuf::remote_type::Type::PostgresUuid(
630                protobuf::Empty {},
631            )),
632        },
633
634        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
635            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
636                protobuf::Empty {},
637            )),
638        },
639        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
640            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
641                protobuf::Empty {},
642            )),
643        },
644        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
645            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
646                protobuf::Empty {},
647            )),
648        },
649        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
650            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
651                protobuf::Empty {},
652            )),
653        },
654        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
655            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
656                protobuf::Empty {},
657            )),
658        },
659        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
660            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
661                protobuf::Empty {},
662            )),
663        },
664        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
665            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
666                protobuf::Empty {},
667            )),
668        },
669        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
670            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
671                protobuf::Empty {},
672            )),
673        },
674        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
675            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(protobuf::Empty {})),
676        },
677        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
678            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
679                protobuf::Empty {},
680            )),
681        },
682        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
683            r#type: Some(protobuf::remote_type::Type::MysqlFloat(protobuf::Empty {})),
684        },
685        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
686            r#type: Some(protobuf::remote_type::Type::MysqlDouble(protobuf::Empty {})),
687        },
688        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
689            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
690                protobuf::MysqlDecimal {
691                    precision: *precision as u32,
692                    scale: *scale as u32,
693                },
694            )),
695        },
696        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
697            r#type: Some(protobuf::remote_type::Type::MysqlDate(protobuf::Empty {})),
698        },
699        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
700            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
701                protobuf::Empty {},
702            )),
703        },
704        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
705            r#type: Some(protobuf::remote_type::Type::MysqlTime(protobuf::Empty {})),
706        },
707        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
708            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
709                protobuf::Empty {},
710            )),
711        },
712        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
713            r#type: Some(protobuf::remote_type::Type::MysqlYear(protobuf::Empty {})),
714        },
715        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
716            r#type: Some(protobuf::remote_type::Type::MysqlChar(protobuf::Empty {})),
717        },
718        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
719            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
720                protobuf::Empty {},
721            )),
722        },
723        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
724            r#type: Some(protobuf::remote_type::Type::MysqlBinary(protobuf::Empty {})),
725        },
726        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
727            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
728                protobuf::Empty {},
729            )),
730        },
731        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
732            r#type: Some(protobuf::remote_type::Type::MysqlText(
733                protobuf::MysqlText { length: *len },
734            )),
735        },
736        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
737            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
738                protobuf::MysqlBlob { length: *len },
739            )),
740        },
741        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
742            r#type: Some(protobuf::remote_type::Type::MysqlJson(protobuf::Empty {})),
743        },
744        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
745            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
746                protobuf::Empty {},
747            )),
748        },
749
750        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
751            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
752                protobuf::OracleVarchar2 { length: *len },
753            )),
754        },
755        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
756            r#type: Some(protobuf::remote_type::Type::OracleChar(
757                protobuf::OracleChar { length: *len },
758            )),
759        },
760        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
761            r#type: Some(protobuf::remote_type::Type::OracleNumber(
762                protobuf::OracleNumber {
763                    precision: *precision as u32,
764                    scale: *scale as i32,
765                },
766            )),
767        },
768        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
769            r#type: Some(protobuf::remote_type::Type::OracleDate(protobuf::Empty {})),
770        },
771        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
772            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
773                protobuf::Empty {},
774            )),
775        },
776        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
777            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
778                protobuf::Empty {},
779            )),
780        },
781        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
782            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
783                protobuf::Empty {},
784            )),
785        },
786        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
787            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
788                protobuf::Empty {},
789            )),
790        },
791        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
792            r#type: Some(protobuf::remote_type::Type::OracleBlob(protobuf::Empty {})),
793        },
794        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
795            r#type: Some(protobuf::remote_type::Type::OracleFloat(
796                protobuf::OracleFloat {
797                    precision: *precision as u32,
798                },
799            )),
800        },
801        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
802            r#type: Some(protobuf::remote_type::Type::OracleNchar(
803                protobuf::OracleNChar { length: *len },
804            )),
805        },
806        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
807            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
808                protobuf::OracleNVarchar2 { length: *len },
809            )),
810        },
811        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
812            r#type: Some(protobuf::remote_type::Type::OracleRaw(
813                protobuf::OracleRaw { length: *len },
814            )),
815        },
816        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
817            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
818                protobuf::Empty {},
819            )),
820        },
821        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
822            r#type: Some(protobuf::remote_type::Type::OracleLong(protobuf::Empty {})),
823        },
824        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
825            r#type: Some(protobuf::remote_type::Type::OracleClob(protobuf::Empty {})),
826        },
827        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
828            r#type: Some(protobuf::remote_type::Type::OracleNclob(protobuf::Empty {})),
829        },
830        RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
831            r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
832                protobuf::Empty {},
833            )),
834        },
835        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
836            r#type: Some(protobuf::remote_type::Type::SqliteNull(protobuf::Empty {})),
837        },
838        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
839            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
840                protobuf::Empty {},
841            )),
842        },
843        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
844            r#type: Some(protobuf::remote_type::Type::SqliteReal(protobuf::Empty {})),
845        },
846        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
847            r#type: Some(protobuf::remote_type::Type::SqliteText(protobuf::Empty {})),
848        },
849        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
850            r#type: Some(protobuf::remote_type::Type::SqliteBlob(protobuf::Empty {})),
851        },
852        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
853            r#type: Some(protobuf::remote_type::Type::DmTinyInt(protobuf::Empty {})),
854        },
855        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
856            r#type: Some(protobuf::remote_type::Type::DmSmallInt(protobuf::Empty {})),
857        },
858        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
859            r#type: Some(protobuf::remote_type::Type::DmInteger(protobuf::Empty {})),
860        },
861        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
862            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::Empty {})),
863        },
864        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
865            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::Empty {})),
866        },
867        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
868            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::Empty {})),
869        },
870        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
871            r#type: Some(protobuf::remote_type::Type::DmNumeric(
872                protobuf::DmNumeric {
873                    precision: *precision as u32,
874                    scale: *scale as i32,
875                },
876            )),
877        },
878        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
879            r#type: Some(protobuf::remote_type::Type::DmDecimal(
880                protobuf::DmDecimal {
881                    precision: *precision as u32,
882                    scale: *scale as i32,
883                },
884            )),
885        },
886        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
887            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
888                length: len.map(|s| s as u32),
889            })),
890        },
891        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
892            r#type: Some(protobuf::remote_type::Type::DmVarchar(
893                protobuf::DmVarchar {
894                    length: len.map(|s| s as u32),
895                },
896            )),
897        },
898        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
899            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::Empty {})),
900        },
901        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
902            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
903                length: *len as u32,
904            })),
905        },
906        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
907            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
908                protobuf::DmVarbinary {
909                    length: len.map(|s| s as u32),
910                },
911            )),
912        },
913        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
914            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::Empty {})),
915        },
916        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
917            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::Empty {})),
918        },
919        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
920            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
921                protobuf::DmTimestamp {
922                    precision: *precision as u32,
923                },
924            )),
925        },
926        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
927            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
928                precision: *precision as u32,
929            })),
930        },
931        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
932            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::Empty {})),
933        },
934    }
935}
936
937fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
938    let fields = remote_schema
939        .fields
940        .iter()
941        .map(parse_remote_field)
942        .collect::<Vec<_>>();
943
944    RemoteSchema { fields }
945}
946
947fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
948    RemoteField {
949        name: field.name.clone(),
950        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
951        nullable: field.nullable,
952        auto_increment: field.auto_increment,
953    }
954}
955
956fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
957    match remote_type.r#type.as_ref().unwrap() {
958        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
959        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
960        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
961        protobuf::remote_type::Type::PostgresFloat4(_) => {
962            RemoteType::Postgres(PostgresType::Float4)
963        }
964        protobuf::remote_type::Type::PostgresFloat8(_) => {
965            RemoteType::Postgres(PostgresType::Float8)
966        }
967        protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
968            PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
969        ),
970        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
971        protobuf::remote_type::Type::PostgresVarchar(_) => {
972            RemoteType::Postgres(PostgresType::Varchar)
973        }
974        protobuf::remote_type::Type::PostgresBpchar(_) => {
975            RemoteType::Postgres(PostgresType::Bpchar)
976        }
977        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
978        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
979        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
980        protobuf::remote_type::Type::PostgresTimestamp(_) => {
981            RemoteType::Postgres(PostgresType::Timestamp)
982        }
983        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
984            RemoteType::Postgres(PostgresType::TimestampTz)
985        }
986        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
987        protobuf::remote_type::Type::PostgresInterval(_) => {
988            RemoteType::Postgres(PostgresType::Interval)
989        }
990        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
991        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
992        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
993        protobuf::remote_type::Type::PostgresInt2Array(_) => {
994            RemoteType::Postgres(PostgresType::Int2Array)
995        }
996        protobuf::remote_type::Type::PostgresInt4Array(_) => {
997            RemoteType::Postgres(PostgresType::Int4Array)
998        }
999        protobuf::remote_type::Type::PostgresInt8Array(_) => {
1000            RemoteType::Postgres(PostgresType::Int8Array)
1001        }
1002        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1003            RemoteType::Postgres(PostgresType::Float4Array)
1004        }
1005        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1006            RemoteType::Postgres(PostgresType::Float8Array)
1007        }
1008        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1009            RemoteType::Postgres(PostgresType::VarcharArray)
1010        }
1011        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1012            RemoteType::Postgres(PostgresType::BpcharArray)
1013        }
1014        protobuf::remote_type::Type::PostgresTextArray(_) => {
1015            RemoteType::Postgres(PostgresType::TextArray)
1016        }
1017        protobuf::remote_type::Type::PostgresByteaArray(_) => {
1018            RemoteType::Postgres(PostgresType::ByteaArray)
1019        }
1020        protobuf::remote_type::Type::PostgresBoolArray(_) => {
1021            RemoteType::Postgres(PostgresType::BoolArray)
1022        }
1023        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1024            RemoteType::Postgres(PostgresType::PostGisGeometry)
1025        }
1026        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1027        protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1028        protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1029        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1030        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1031            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1032        }
1033        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1034        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1035            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1036        }
1037        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1038        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1039            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1040        }
1041        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1042        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1043            RemoteType::Mysql(MysqlType::IntegerUnsigned)
1044        }
1045        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1046        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1047            RemoteType::Mysql(MysqlType::BigIntUnsigned)
1048        }
1049        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1050        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1051        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1052            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1053        ),
1054        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1055        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1056        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1057        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1058        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1059        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1060        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1061        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1062        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1063        protobuf::remote_type::Type::MysqlText(text) => {
1064            RemoteType::Mysql(MysqlType::Text(text.length))
1065        }
1066        protobuf::remote_type::Type::MysqlBlob(blob) => {
1067            RemoteType::Mysql(MysqlType::Blob(blob.length))
1068        }
1069        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1070        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1071        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1072            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1073        }
1074        protobuf::remote_type::Type::OracleChar(char) => {
1075            RemoteType::Oracle(OracleType::Char(char.length))
1076        }
1077        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1078            OracleType::Number(number.precision as u8, number.scale as i8),
1079        ),
1080        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1081        protobuf::remote_type::Type::OracleTimestamp(_) => {
1082            RemoteType::Oracle(OracleType::Timestamp)
1083        }
1084        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1085        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1086            RemoteType::Oracle(OracleType::BinaryFloat)
1087        }
1088        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1089            RemoteType::Oracle(OracleType::BinaryDouble)
1090        }
1091        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1092            RemoteType::Oracle(OracleType::Float(*precision as u8))
1093        }
1094        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1095            RemoteType::Oracle(OracleType::NChar(*length))
1096        }
1097        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1098            RemoteType::Oracle(OracleType::NVarchar2(*length))
1099        }
1100        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1101            RemoteType::Oracle(OracleType::Raw(*length))
1102        }
1103        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1104        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1105        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1106        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1107        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1108        protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1109            RemoteType::Oracle(OracleType::SdeGeometry)
1110        }
1111        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1112        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1113        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1114        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1115        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1116        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1117        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1118        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1119        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1120        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1121        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1122        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1123            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1124        }
1125        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1126            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1127        }
1128        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1129            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1130        }
1131        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1132            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1133        }
1134        protobuf::remote_type::Type::DmText(_) => RemoteType::Dm(DmType::Text),
1135        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1136            RemoteType::Dm(DmType::Binary(*length as u16))
1137        }
1138        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1139            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1140        }
1141        protobuf::remote_type::Type::DmImage(_) => RemoteType::Dm(DmType::Image),
1142        protobuf::remote_type::Type::DmBit(_) => RemoteType::Dm(DmType::Bit),
1143        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1144            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1145        }
1146        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1147            RemoteType::Dm(DmType::Time(*precision as u8))
1148        }
1149        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1150    }
1151}
1152
1153fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1154    match source {
1155        RemoteSource::Query(query) => protobuf::RemoteSource {
1156            source: Some(protobuf::remote_source::Source::Query(query.clone())),
1157        },
1158        RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1159            source: Some(protobuf::remote_source::Source::Table(
1160                protobuf::Identifiers {
1161                    idents: table_identifiers.clone(),
1162                },
1163            )),
1164        },
1165    }
1166}
1167
1168fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1169    let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1170        "remote source is not set".to_string(),
1171    ))?;
1172    match source {
1173        protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1174        protobuf::remote_source::Source::Table(table_identifiers) => {
1175            Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1176        }
1177    }
1178}