datafusion_remote_table/
codec.rs

1use crate::DmConnectionOptions;
2use crate::MysqlConnectionOptions;
3use crate::OracleConnectionOptions;
4use crate::PostgresConnectionOptions;
5use crate::SqliteConnectionOptions;
6use crate::generated::prost as protobuf;
7use crate::{
8    Connection, ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType,
9    Literalize, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
10    RemoteSource, RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
11    connect,
12};
13use datafusion::arrow::datatypes::SchemaRef;
14use datafusion::common::DataFusionError;
15use datafusion::execution::FunctionRegistry;
16use datafusion::physical_plan::ExecutionPlan;
17use datafusion_proto::convert_required;
18use datafusion_proto::physical_plan::PhysicalExtensionCodec;
19use datafusion_proto::protobuf::proto_error;
20use derive_with::With;
21use log::debug;
22use prost::Message;
23use std::fmt::Debug;
24use std::sync::Arc;
25use std::time::Duration;
26
27pub trait TransformCodec: Debug + Send + Sync {
28    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
29    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
30}
31
32#[derive(Debug)]
33pub struct DefaultTransformCodec {}
34
35const DEFAULT_TRANSFORM_ID: &str = "__default";
36
37impl TransformCodec for DefaultTransformCodec {
38    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
39        if value.as_any().is::<DefaultTransform>() {
40            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
41        } else {
42            Err(DataFusionError::Execution(format!(
43                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
44            )))
45        }
46    }
47
48    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
49        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
50            Ok(Arc::new(DefaultTransform {}))
51        } else {
52            Err(DataFusionError::Execution(
53                "DefaultTransformCodec only supports DefaultTransform".to_string(),
54            ))
55        }
56    }
57}
58
59pub trait LiteralizeCodec: Debug + Send + Sync {
60    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
61    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
62}
63
64#[derive(Debug)]
65pub struct DefaultLiteralizeCodec {}
66
67const DEFAULT_LITERALIZE_ID: &str = "__default";
68
69impl LiteralizeCodec for DefaultLiteralizeCodec {
70    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
71        if value.as_any().is::<DefaultLiteralizer>() {
72            Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
73        } else {
74            Err(DataFusionError::Execution(format!(
75                "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
76            )))
77        }
78    }
79
80    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
81        if value == DEFAULT_LITERALIZE_ID.as_bytes() {
82            Ok(Arc::new(DefaultLiteralizer {}))
83        } else {
84            Err(DataFusionError::Execution(
85                "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
86            ))
87        }
88    }
89}
90
91pub trait ConnectionCodec: Debug + Send + Sync {
92    fn try_encode(
93        &self,
94        value: &dyn Connection,
95        conn_options: &ConnectionOptions,
96    ) -> DFResult<Vec<u8>>;
97    fn try_decode(
98        &self,
99        value: &[u8],
100        conn_options: &ConnectionOptions,
101    ) -> DFResult<Arc<dyn Connection>>;
102}
103
104#[derive(Debug)]
105pub struct DefaultConnectionCodec;
106
107const DEFAULT_CONNECTION_VALUE: &str = "__default";
108
109impl ConnectionCodec for DefaultConnectionCodec {
110    fn try_encode(
111        &self,
112        _value: &dyn Connection,
113        _conn_options: &ConnectionOptions,
114    ) -> DFResult<Vec<u8>> {
115        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
116    }
117
118    fn try_decode(
119        &self,
120        value: &[u8],
121        conn_options: &ConnectionOptions,
122    ) -> DFResult<Arc<dyn Connection>> {
123        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
124            return Err(DataFusionError::Execution(format!(
125                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
126            )));
127        }
128        let conn_options = conn_options.clone().with_pool_max_size(1);
129        tokio::task::block_in_place(|| {
130            tokio::runtime::Handle::current().block_on(async {
131                let pool = connect(&conn_options).await?;
132                let conn = pool.get().await?;
133                Ok::<_, DataFusionError>(conn)
134            })
135        })
136    }
137}
138
139#[derive(Debug, With)]
140pub struct RemotePhysicalCodec {
141    pub transform_codec: Arc<dyn TransformCodec>,
142    pub literalize_codec: Arc<dyn LiteralizeCodec>,
143    pub connection_codec: Arc<dyn ConnectionCodec>,
144}
145
146impl RemotePhysicalCodec {
147    pub fn new() -> Self {
148        Self {
149            transform_codec: Arc::new(DefaultTransformCodec {}),
150            literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
151            connection_codec: Arc::new(DefaultConnectionCodec {}),
152        }
153    }
154}
155
156impl Default for RemotePhysicalCodec {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl PhysicalExtensionCodec for RemotePhysicalCodec {
163    fn try_decode(
164        &self,
165        buf: &[u8],
166        inputs: &[Arc<dyn ExecutionPlan>],
167        _registry: &dyn FunctionRegistry,
168    ) -> DFResult<Arc<dyn ExecutionPlan>> {
169        let remote_table_node =
170            protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
171                DataFusionError::Internal(format!(
172                    "Failed to decode remote table physical plan node: {e:?}"
173                ))
174            })?;
175        let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
176            DataFusionError::Internal(
177                "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
178            )
179        })?;
180
181        match remote_table_plan {
182            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
183                let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
184                    Arc::new(DefaultTransform {})
185                } else {
186                    self.transform_codec.try_decode(&proto.transform)?
187                };
188
189                let source = parse_remote_source(proto.source.as_ref().ok_or(
190                    DataFusionError::Internal("remote source is not set".to_string()),
191                )?)?;
192
193                let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
194                let remote_schema = proto
195                    .remote_schema
196                    .map(|schema| Arc::new(parse_remote_schema(&schema)));
197
198                let projection = parse_projection(proto.projection.as_ref());
199
200                let limit = proto.limit.map(|l| l as usize);
201
202                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
203
204                let now = std::time::Instant::now();
205                let conn = self
206                    .connection_codec
207                    .try_decode(&proto.connection, &conn_options)?;
208                debug!(
209                    "[remote-table] Decoding connection cost {}ms",
210                    now.elapsed().as_millis()
211                );
212
213                Ok(Arc::new(RemoteTableScanExec::try_new(
214                    conn_options,
215                    source,
216                    table_schema,
217                    remote_schema,
218                    projection,
219                    proto.unparsed_filters,
220                    limit,
221                    transform,
222                    conn,
223                )?))
224            }
225            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
226                proto,
227            ) => {
228                if inputs.len() != 1 {
229                    return Err(DataFusionError::Internal(
230                        "RemoteTableInsertExec only support one input".to_string(),
231                    ));
232                }
233
234                let input = inputs[0].clone();
235
236                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
237                let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
238                let table = proto.table.unwrap().idents;
239
240                let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
241                    Arc::new(DefaultLiteralizer {})
242                } else {
243                    self.literalize_codec.try_decode(&proto.literalizer)?
244                };
245
246                let now = std::time::Instant::now();
247                let conn = self
248                    .connection_codec
249                    .try_decode(&proto.connection, &conn_options)?;
250                debug!(
251                    "[remote-table] Decoding connection cost {}ms",
252                    now.elapsed().as_millis()
253                );
254
255                Ok(Arc::new(RemoteTableInsertExec::new(
256                    input,
257                    conn_options,
258                    literalizer,
259                    table,
260                    remote_schema,
261                    conn,
262                )))
263            }
264        }
265    }
266
267    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
268        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
269            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
270                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
271            } else {
272                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
273                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
274                bytes
275            };
276
277            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
278            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
279
280            let serialized_conn = self
281                .connection_codec
282                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
283
284            let serialized_source = serialize_remote_source(&exec.source);
285
286            let proto = protobuf::RemoteTablePhysicalPlanNode {
287                remote_table_physical_plan_type: Some(
288                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
289                        protobuf::RemoteTableScanExec {
290                            conn_options: Some(serialized_connection_options),
291                            source: Some(serialized_source),
292                            table_schema: Some(exec.table_schema.as_ref().try_into()?),
293                            remote_schema,
294                            projection: serialize_projection(exec.projection.as_ref()),
295                            unparsed_filters: exec.unparsed_filters.clone(),
296                            limit: exec.limit.map(|l| l as u32),
297                            transform: serialized_transform,
298                            connection: serialized_conn,
299                        },
300                    ),
301                ),
302            };
303
304            proto.encode(buf).map_err(|e| {
305                DataFusionError::Internal(format!(
306                    "Failed to encode remote table scan exec plan: {e:?}"
307                ))
308            })?;
309            Ok(())
310        } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
311            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
312            let remote_schema = serialize_remote_schema(&exec.remote_schema);
313            let serialized_table = protobuf::Identifiers {
314                idents: exec.table.clone(),
315            };
316            let serialized_literalizer = self
317                .literalize_codec
318                .try_encode(exec.literalizer.as_ref())?;
319            let serialized_conn = self
320                .connection_codec
321                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
322
323            let proto = protobuf::RemoteTablePhysicalPlanNode {
324                remote_table_physical_plan_type: Some(
325                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
326                        protobuf::RemoteTableInsertExec {
327                            conn_options: Some(serialized_connection_options),
328                            table: Some(serialized_table),
329                            remote_schema: Some(remote_schema),
330                            literalizer: serialized_literalizer,
331                            connection: serialized_conn,
332                        },
333                    ),
334                ),
335            };
336
337            proto.encode(buf).map_err(|e| {
338                DataFusionError::Internal(format!(
339                    "Failed to encode remote table insert exec node: {e:?}"
340                ))
341            })?;
342
343            Ok(())
344        } else {
345            Err(DataFusionError::NotImplemented(format!(
346                "RemotePhysicalCodec does not support encoding {}",
347                node.name()
348            )))
349        }
350    }
351}
352
353fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
354    match options {
355        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
356            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
357                protobuf::PostgresConnectionOptions {
358                    host: options.host.clone(),
359                    port: options.port as u32,
360                    username: options.username.clone(),
361                    password: options.password.clone(),
362                    database: options.database.clone(),
363                    pool_max_size: options.pool_max_size as u32,
364                    pool_min_idle: options.pool_min_idle as u32,
365                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
366                    pool_ttl_check_interval: Some(serialize_duration(
367                        &options.pool_ttl_check_interval,
368                    )),
369                    stream_chunk_size: options.stream_chunk_size as u32,
370                    default_numeric_scale: options.default_numeric_scale as i32,
371                },
372            )),
373        },
374        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
375            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
376                protobuf::MysqlConnectionOptions {
377                    host: options.host.clone(),
378                    port: options.port as u32,
379                    username: options.username.clone(),
380                    password: options.password.clone(),
381                    database: options.database.clone(),
382                    pool_max_size: options.pool_max_size as u32,
383                    pool_min_idle: options.pool_min_idle as u32,
384                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
385                    pool_ttl_check_interval: Some(serialize_duration(
386                        &options.pool_ttl_check_interval,
387                    )),
388                    stream_chunk_size: options.stream_chunk_size as u32,
389                },
390            )),
391        },
392        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
393            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
394                protobuf::OracleConnectionOptions {
395                    host: options.host.clone(),
396                    port: options.port as u32,
397                    username: options.username.clone(),
398                    password: options.password.clone(),
399                    service_name: options.service_name.clone(),
400                    pool_max_size: options.pool_max_size as u32,
401                    pool_min_idle: options.pool_min_idle as u32,
402                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
403                    pool_ttl_check_interval: Some(serialize_duration(
404                        &options.pool_ttl_check_interval,
405                    )),
406                    stream_chunk_size: options.stream_chunk_size as u32,
407                },
408            )),
409        },
410        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
411            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
412                protobuf::SqliteConnectionOptions {
413                    path: options.path.to_str().unwrap().to_string(),
414                    stream_chunk_size: options.stream_chunk_size as u32,
415                },
416            )),
417        },
418        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
419            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
420                protobuf::DmConnectionOptions {
421                    host: options.host.clone(),
422                    port: options.port as u32,
423                    username: options.username.clone(),
424                    password: options.password.clone(),
425                    schema: options.schema.clone(),
426                    stream_chunk_size: options.stream_chunk_size as u32,
427                    driver: options.driver.clone(),
428                },
429            )),
430        },
431    }
432}
433
434fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
435    match options.connection_options {
436        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
437            ConnectionOptions::Postgres(PostgresConnectionOptions {
438                host: options.host,
439                port: options.port as u16,
440                username: options.username,
441                password: options.password,
442                database: options.database,
443                pool_max_size: options.pool_max_size as usize,
444                pool_min_idle: options.pool_min_idle as usize,
445                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
446                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
447                stream_chunk_size: options.stream_chunk_size as usize,
448                default_numeric_scale: options.default_numeric_scale as i8,
449            })
450        }
451        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
452            ConnectionOptions::Mysql(MysqlConnectionOptions {
453                host: options.host,
454                port: options.port as u16,
455                username: options.username,
456                password: options.password,
457                database: options.database,
458                pool_max_size: options.pool_max_size as usize,
459                pool_min_idle: options.pool_min_idle as usize,
460                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
461                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
462                stream_chunk_size: options.stream_chunk_size as usize,
463            })
464        }
465        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
466            ConnectionOptions::Oracle(OracleConnectionOptions {
467                host: options.host,
468                port: options.port as u16,
469                username: options.username,
470                password: options.password,
471                service_name: options.service_name,
472                pool_max_size: options.pool_max_size as usize,
473                pool_min_idle: options.pool_min_idle as usize,
474                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
475                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
476                stream_chunk_size: options.stream_chunk_size as usize,
477            })
478        }
479        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
480            ConnectionOptions::Sqlite(SqliteConnectionOptions {
481                path: std::path::Path::new(&options.path).to_path_buf(),
482                stream_chunk_size: options.stream_chunk_size as usize,
483            })
484        }
485        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
486            ConnectionOptions::Dm(DmConnectionOptions {
487                host: options.host,
488                port: options.port as u16,
489                username: options.username,
490                password: options.password,
491                schema: options.schema,
492                stream_chunk_size: options.stream_chunk_size as usize,
493                driver: options.driver,
494            })
495        }
496        _ => panic!("Failed to parse connection options: {options:?}"),
497    }
498}
499
500fn serialize_duration(duration: &Duration) -> protobuf::Duration {
501    protobuf::Duration {
502        secs: duration.as_secs(),
503        nanos: duration.subsec_nanos(),
504    }
505}
506
507fn parse_duration(duration: &protobuf::Duration) -> Duration {
508    Duration::new(duration.secs, duration.nanos)
509}
510
511fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
512    projection.map(|p| protobuf::Projection {
513        projection: p.iter().map(|n| *n as u32).collect(),
514    })
515}
516
517fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
518    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
519}
520
521fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
522    let fields = remote_schema
523        .fields
524        .iter()
525        .map(serialize_remote_field)
526        .collect::<Vec<_>>();
527
528    protobuf::RemoteSchema { fields }
529}
530
531fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
532    protobuf::RemoteField {
533        name: remote_field.name.clone(),
534        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
535        nullable: remote_field.nullable,
536        auto_increment: remote_field.auto_increment,
537    }
538}
539
540fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
541    match remote_type {
542        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
543            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
544                protobuf::Empty {},
545            )),
546        },
547        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
548            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
549                protobuf::Empty {},
550            )),
551        },
552        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
553            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
554                protobuf::Empty {},
555            )),
556        },
557        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
558            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
559                protobuf::Empty {},
560            )),
561        },
562        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
563            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
564                protobuf::Empty {},
565            )),
566        },
567        RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
568            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
569                protobuf::PostgresNumeric {
570                    precision: *precision as u32,
571                    scale: *scale as i32,
572                },
573            )),
574        },
575        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
576            r#type: Some(protobuf::remote_type::Type::PostgresName(
577                protobuf::Empty {},
578            )),
579        },
580        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
581            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
582                protobuf::Empty {},
583            )),
584        },
585        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
586            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
587                protobuf::Empty {},
588            )),
589        },
590        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
591            r#type: Some(protobuf::remote_type::Type::PostgresText(
592                protobuf::Empty {},
593            )),
594        },
595        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
596            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
597                protobuf::Empty {},
598            )),
599        },
600        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
601            r#type: Some(protobuf::remote_type::Type::PostgresDate(
602                protobuf::Empty {},
603            )),
604        },
605        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
606            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
607                protobuf::Empty {},
608            )),
609        },
610        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
611            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
612                protobuf::Empty {},
613            )),
614        },
615        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
616            r#type: Some(protobuf::remote_type::Type::PostgresTime(
617                protobuf::Empty {},
618            )),
619        },
620        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
621            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
622                protobuf::Empty {},
623            )),
624        },
625        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
626            r#type: Some(protobuf::remote_type::Type::PostgresBool(
627                protobuf::Empty {},
628            )),
629        },
630        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
631            r#type: Some(protobuf::remote_type::Type::PostgresJson(
632                protobuf::Empty {},
633            )),
634        },
635        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
636            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
637                protobuf::Empty {},
638            )),
639        },
640        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
641            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
642                protobuf::Empty {},
643            )),
644        },
645        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
646            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
647                protobuf::Empty {},
648            )),
649        },
650        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
651            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
652                protobuf::Empty {},
653            )),
654        },
655        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
656            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
657                protobuf::Empty {},
658            )),
659        },
660        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
661            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
662                protobuf::Empty {},
663            )),
664        },
665        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
666            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
667                protobuf::Empty {},
668            )),
669        },
670        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
671            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
672                protobuf::Empty {},
673            )),
674        },
675        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
676            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
677                protobuf::Empty {},
678            )),
679        },
680        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
681            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
682                protobuf::Empty {},
683            )),
684        },
685        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
686            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
687                protobuf::Empty {},
688            )),
689        },
690        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
691            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
692                protobuf::Empty {},
693            )),
694        },
695        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
696            r#type: Some(protobuf::remote_type::Type::PostgresOid(protobuf::Empty {})),
697        },
698        RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
699            r#type: Some(protobuf::remote_type::Type::PostgresXml(protobuf::Empty {})),
700        },
701        RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
702            r#type: Some(protobuf::remote_type::Type::PostgresUuid(
703                protobuf::Empty {},
704            )),
705        },
706
707        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
708            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
709                protobuf::Empty {},
710            )),
711        },
712        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
713            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
714                protobuf::Empty {},
715            )),
716        },
717        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
718            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
719                protobuf::Empty {},
720            )),
721        },
722        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
723            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
724                protobuf::Empty {},
725            )),
726        },
727        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
728            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
729                protobuf::Empty {},
730            )),
731        },
732        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
733            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
734                protobuf::Empty {},
735            )),
736        },
737        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
738            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
739                protobuf::Empty {},
740            )),
741        },
742        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
743            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
744                protobuf::Empty {},
745            )),
746        },
747        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
748            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(protobuf::Empty {})),
749        },
750        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
751            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
752                protobuf::Empty {},
753            )),
754        },
755        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
756            r#type: Some(protobuf::remote_type::Type::MysqlFloat(protobuf::Empty {})),
757        },
758        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
759            r#type: Some(protobuf::remote_type::Type::MysqlDouble(protobuf::Empty {})),
760        },
761        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
762            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
763                protobuf::MysqlDecimal {
764                    precision: *precision as u32,
765                    scale: *scale as u32,
766                },
767            )),
768        },
769        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
770            r#type: Some(protobuf::remote_type::Type::MysqlDate(protobuf::Empty {})),
771        },
772        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
773            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
774                protobuf::Empty {},
775            )),
776        },
777        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
778            r#type: Some(protobuf::remote_type::Type::MysqlTime(protobuf::Empty {})),
779        },
780        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
781            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
782                protobuf::Empty {},
783            )),
784        },
785        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
786            r#type: Some(protobuf::remote_type::Type::MysqlYear(protobuf::Empty {})),
787        },
788        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
789            r#type: Some(protobuf::remote_type::Type::MysqlChar(protobuf::Empty {})),
790        },
791        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
792            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
793                protobuf::Empty {},
794            )),
795        },
796        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
797            r#type: Some(protobuf::remote_type::Type::MysqlBinary(protobuf::Empty {})),
798        },
799        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
800            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
801                protobuf::Empty {},
802            )),
803        },
804        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
805            r#type: Some(protobuf::remote_type::Type::MysqlText(
806                protobuf::MysqlText { length: *len },
807            )),
808        },
809        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
810            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
811                protobuf::MysqlBlob { length: *len },
812            )),
813        },
814        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
815            r#type: Some(protobuf::remote_type::Type::MysqlJson(protobuf::Empty {})),
816        },
817        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
818            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
819                protobuf::Empty {},
820            )),
821        },
822
823        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
824            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
825                protobuf::OracleVarchar2 { length: *len },
826            )),
827        },
828        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
829            r#type: Some(protobuf::remote_type::Type::OracleChar(
830                protobuf::OracleChar { length: *len },
831            )),
832        },
833        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
834            r#type: Some(protobuf::remote_type::Type::OracleNumber(
835                protobuf::OracleNumber {
836                    precision: *precision as u32,
837                    scale: *scale as i32,
838                },
839            )),
840        },
841        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
842            r#type: Some(protobuf::remote_type::Type::OracleDate(protobuf::Empty {})),
843        },
844        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
845            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
846                protobuf::Empty {},
847            )),
848        },
849        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
850            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
851                protobuf::Empty {},
852            )),
853        },
854        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
855            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
856                protobuf::Empty {},
857            )),
858        },
859        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
860            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
861                protobuf::Empty {},
862            )),
863        },
864        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
865            r#type: Some(protobuf::remote_type::Type::OracleBlob(protobuf::Empty {})),
866        },
867        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
868            r#type: Some(protobuf::remote_type::Type::OracleFloat(
869                protobuf::OracleFloat {
870                    precision: *precision as u32,
871                },
872            )),
873        },
874        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
875            r#type: Some(protobuf::remote_type::Type::OracleNchar(
876                protobuf::OracleNChar { length: *len },
877            )),
878        },
879        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
880            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
881                protobuf::OracleNVarchar2 { length: *len },
882            )),
883        },
884        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
885            r#type: Some(protobuf::remote_type::Type::OracleRaw(
886                protobuf::OracleRaw { length: *len },
887            )),
888        },
889        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
890            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
891                protobuf::Empty {},
892            )),
893        },
894        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
895            r#type: Some(protobuf::remote_type::Type::OracleLong(protobuf::Empty {})),
896        },
897        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
898            r#type: Some(protobuf::remote_type::Type::OracleClob(protobuf::Empty {})),
899        },
900        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
901            r#type: Some(protobuf::remote_type::Type::OracleNclob(protobuf::Empty {})),
902        },
903        RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
904            r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
905                protobuf::Empty {},
906            )),
907        },
908        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
909            r#type: Some(protobuf::remote_type::Type::SqliteNull(protobuf::Empty {})),
910        },
911        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
912            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
913                protobuf::Empty {},
914            )),
915        },
916        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
917            r#type: Some(protobuf::remote_type::Type::SqliteReal(protobuf::Empty {})),
918        },
919        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
920            r#type: Some(protobuf::remote_type::Type::SqliteText(protobuf::Empty {})),
921        },
922        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
923            r#type: Some(protobuf::remote_type::Type::SqliteBlob(protobuf::Empty {})),
924        },
925        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
926            r#type: Some(protobuf::remote_type::Type::DmTinyInt(protobuf::Empty {})),
927        },
928        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
929            r#type: Some(protobuf::remote_type::Type::DmSmallInt(protobuf::Empty {})),
930        },
931        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
932            r#type: Some(protobuf::remote_type::Type::DmInteger(protobuf::Empty {})),
933        },
934        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
935            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::Empty {})),
936        },
937        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
938            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::Empty {})),
939        },
940        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
941            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::Empty {})),
942        },
943        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
944            r#type: Some(protobuf::remote_type::Type::DmNumeric(
945                protobuf::DmNumeric {
946                    precision: *precision as u32,
947                    scale: *scale as i32,
948                },
949            )),
950        },
951        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
952            r#type: Some(protobuf::remote_type::Type::DmDecimal(
953                protobuf::DmDecimal {
954                    precision: *precision as u32,
955                    scale: *scale as i32,
956                },
957            )),
958        },
959        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
960            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
961                length: len.map(|s| s as u32),
962            })),
963        },
964        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
965            r#type: Some(protobuf::remote_type::Type::DmVarchar(
966                protobuf::DmVarchar {
967                    length: len.map(|s| s as u32),
968                },
969            )),
970        },
971        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
972            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::Empty {})),
973        },
974        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
975            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
976                length: *len as u32,
977            })),
978        },
979        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
980            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
981                protobuf::DmVarbinary {
982                    length: len.map(|s| s as u32),
983                },
984            )),
985        },
986        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
987            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::Empty {})),
988        },
989        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
990            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::Empty {})),
991        },
992        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
993            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
994                protobuf::DmTimestamp {
995                    precision: *precision as u32,
996                },
997            )),
998        },
999        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1000            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1001                precision: *precision as u32,
1002            })),
1003        },
1004        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1005            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::Empty {})),
1006        },
1007    }
1008}
1009
1010fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1011    let fields = remote_schema
1012        .fields
1013        .iter()
1014        .map(parse_remote_field)
1015        .collect::<Vec<_>>();
1016
1017    RemoteSchema { fields }
1018}
1019
1020fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1021    RemoteField {
1022        name: field.name.clone(),
1023        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1024        nullable: field.nullable,
1025        auto_increment: field.auto_increment,
1026    }
1027}
1028
1029fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1030    match remote_type.r#type.as_ref().unwrap() {
1031        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1032        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1033        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1034        protobuf::remote_type::Type::PostgresFloat4(_) => {
1035            RemoteType::Postgres(PostgresType::Float4)
1036        }
1037        protobuf::remote_type::Type::PostgresFloat8(_) => {
1038            RemoteType::Postgres(PostgresType::Float8)
1039        }
1040        protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1041            PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1042        ),
1043        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1044        protobuf::remote_type::Type::PostgresVarchar(_) => {
1045            RemoteType::Postgres(PostgresType::Varchar)
1046        }
1047        protobuf::remote_type::Type::PostgresBpchar(_) => {
1048            RemoteType::Postgres(PostgresType::Bpchar)
1049        }
1050        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1051        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1052        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1053        protobuf::remote_type::Type::PostgresTimestamp(_) => {
1054            RemoteType::Postgres(PostgresType::Timestamp)
1055        }
1056        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1057            RemoteType::Postgres(PostgresType::TimestampTz)
1058        }
1059        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1060        protobuf::remote_type::Type::PostgresInterval(_) => {
1061            RemoteType::Postgres(PostgresType::Interval)
1062        }
1063        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1064        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1065        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1066        protobuf::remote_type::Type::PostgresInt2Array(_) => {
1067            RemoteType::Postgres(PostgresType::Int2Array)
1068        }
1069        protobuf::remote_type::Type::PostgresInt4Array(_) => {
1070            RemoteType::Postgres(PostgresType::Int4Array)
1071        }
1072        protobuf::remote_type::Type::PostgresInt8Array(_) => {
1073            RemoteType::Postgres(PostgresType::Int8Array)
1074        }
1075        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1076            RemoteType::Postgres(PostgresType::Float4Array)
1077        }
1078        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1079            RemoteType::Postgres(PostgresType::Float8Array)
1080        }
1081        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1082            RemoteType::Postgres(PostgresType::VarcharArray)
1083        }
1084        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1085            RemoteType::Postgres(PostgresType::BpcharArray)
1086        }
1087        protobuf::remote_type::Type::PostgresTextArray(_) => {
1088            RemoteType::Postgres(PostgresType::TextArray)
1089        }
1090        protobuf::remote_type::Type::PostgresByteaArray(_) => {
1091            RemoteType::Postgres(PostgresType::ByteaArray)
1092        }
1093        protobuf::remote_type::Type::PostgresBoolArray(_) => {
1094            RemoteType::Postgres(PostgresType::BoolArray)
1095        }
1096        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1097            RemoteType::Postgres(PostgresType::PostGisGeometry)
1098        }
1099        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1100        protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1101        protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1102        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1103        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1104            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1105        }
1106        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1107        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1108            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1109        }
1110        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1111        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1112            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1113        }
1114        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1115        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1116            RemoteType::Mysql(MysqlType::IntegerUnsigned)
1117        }
1118        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1119        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1120            RemoteType::Mysql(MysqlType::BigIntUnsigned)
1121        }
1122        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1123        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1124        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1125            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1126        ),
1127        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1128        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1129        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1130        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1131        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1132        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1133        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1134        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1135        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1136        protobuf::remote_type::Type::MysqlText(text) => {
1137            RemoteType::Mysql(MysqlType::Text(text.length))
1138        }
1139        protobuf::remote_type::Type::MysqlBlob(blob) => {
1140            RemoteType::Mysql(MysqlType::Blob(blob.length))
1141        }
1142        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1143        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1144        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1145            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1146        }
1147        protobuf::remote_type::Type::OracleChar(char) => {
1148            RemoteType::Oracle(OracleType::Char(char.length))
1149        }
1150        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1151            OracleType::Number(number.precision as u8, number.scale as i8),
1152        ),
1153        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1154        protobuf::remote_type::Type::OracleTimestamp(_) => {
1155            RemoteType::Oracle(OracleType::Timestamp)
1156        }
1157        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1158        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1159            RemoteType::Oracle(OracleType::BinaryFloat)
1160        }
1161        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1162            RemoteType::Oracle(OracleType::BinaryDouble)
1163        }
1164        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1165            RemoteType::Oracle(OracleType::Float(*precision as u8))
1166        }
1167        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1168            RemoteType::Oracle(OracleType::NChar(*length))
1169        }
1170        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1171            RemoteType::Oracle(OracleType::NVarchar2(*length))
1172        }
1173        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1174            RemoteType::Oracle(OracleType::Raw(*length))
1175        }
1176        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1177        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1178        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1179        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1180        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1181        protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1182            RemoteType::Oracle(OracleType::SdeGeometry)
1183        }
1184        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1185        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1186        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1187        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1188        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1189        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1190        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1191        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1192        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1193        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1194        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1195        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1196            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1197        }
1198        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1199            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1200        }
1201        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1202            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1203        }
1204        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1205            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1206        }
1207        protobuf::remote_type::Type::DmText(_) => RemoteType::Dm(DmType::Text),
1208        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1209            RemoteType::Dm(DmType::Binary(*length as u16))
1210        }
1211        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1212            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1213        }
1214        protobuf::remote_type::Type::DmImage(_) => RemoteType::Dm(DmType::Image),
1215        protobuf::remote_type::Type::DmBit(_) => RemoteType::Dm(DmType::Bit),
1216        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1217            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1218        }
1219        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1220            RemoteType::Dm(DmType::Time(*precision as u8))
1221        }
1222        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1223    }
1224}
1225
1226fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1227    match source {
1228        RemoteSource::Query(query) => protobuf::RemoteSource {
1229            source: Some(protobuf::remote_source::Source::Query(query.clone())),
1230        },
1231        RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1232            source: Some(protobuf::remote_source::Source::Table(
1233                protobuf::Identifiers {
1234                    idents: table_identifiers.clone(),
1235                },
1236            )),
1237        },
1238    }
1239}
1240
1241fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1242    let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1243        "remote source is not set".to_string(),
1244    ))?;
1245    match source {
1246        protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1247        protobuf::remote_source::Source::Table(table_identifiers) => {
1248            Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1249        }
1250    }
1251}