datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    Connection, ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType,
14    Literalize, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
15    RemoteSource, RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
16    connect,
17};
18use datafusion::arrow::array::RecordBatch;
19use datafusion::arrow::datatypes::{Schema, SchemaRef};
20use datafusion::arrow::ipc::reader::StreamReader;
21use datafusion::arrow::ipc::writer::StreamWriter;
22use datafusion::catalog::memory::{DataSourceExec, MemorySourceConfig};
23use datafusion::common::DataFusionError;
24use datafusion::datasource::source::DataSource;
25use datafusion::execution::FunctionRegistry;
26use datafusion::physical_expr::LexOrdering;
27use datafusion::physical_plan::ExecutionPlan;
28use datafusion_proto::convert_required;
29use datafusion_proto::physical_plan::PhysicalExtensionCodec;
30use datafusion_proto::physical_plan::from_proto::parse_physical_sort_exprs;
31use datafusion_proto::physical_plan::to_proto::serialize_physical_sort_exprs;
32use datafusion_proto::protobuf::proto_error;
33use derive_with::With;
34use log::debug;
35use prost::Message;
36use std::fmt::Debug;
37use std::sync::Arc;
38
39pub trait TransformCodec: Debug + Send + Sync {
40    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
41    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
42}
43
44#[derive(Debug)]
45pub struct DefaultTransformCodec {}
46
47const DEFAULT_TRANSFORM_ID: &str = "__default";
48
49impl TransformCodec for DefaultTransformCodec {
50    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
51        if value.as_any().is::<DefaultTransform>() {
52            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
53        } else {
54            Err(DataFusionError::Execution(format!(
55                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
56            )))
57        }
58    }
59
60    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
61        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
62            Ok(Arc::new(DefaultTransform {}))
63        } else {
64            Err(DataFusionError::Execution(
65                "DefaultTransformCodec only supports DefaultTransform".to_string(),
66            ))
67        }
68    }
69}
70
71pub trait LiteralizeCodec: Debug + Send + Sync {
72    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
73    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
74}
75
76#[derive(Debug)]
77pub struct DefaultLiteralizeCodec {}
78
79const DEFAULT_LITERALIZE_ID: &str = "__default";
80
81impl LiteralizeCodec for DefaultLiteralizeCodec {
82    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
83        if value.as_any().is::<DefaultLiteralizer>() {
84            Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
85        } else {
86            Err(DataFusionError::Execution(format!(
87                "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
88            )))
89        }
90    }
91
92    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
93        if value == DEFAULT_LITERALIZE_ID.as_bytes() {
94            Ok(Arc::new(DefaultLiteralizer {}))
95        } else {
96            Err(DataFusionError::Execution(
97                "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
98            ))
99        }
100    }
101}
102
103pub trait ConnectionCodec: Debug + Send + Sync {
104    fn try_encode(
105        &self,
106        value: &dyn Connection,
107        conn_options: &ConnectionOptions,
108    ) -> DFResult<Vec<u8>>;
109    fn try_decode(
110        &self,
111        value: &[u8],
112        conn_options: &ConnectionOptions,
113    ) -> DFResult<Arc<dyn Connection>>;
114}
115
116#[derive(Debug)]
117pub struct DefaultConnectionCodec;
118
119const DEFAULT_CONNECTION_VALUE: &str = "__default";
120
121impl ConnectionCodec for DefaultConnectionCodec {
122    fn try_encode(
123        &self,
124        _value: &dyn Connection,
125        _conn_options: &ConnectionOptions,
126    ) -> DFResult<Vec<u8>> {
127        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
128    }
129
130    fn try_decode(
131        &self,
132        value: &[u8],
133        conn_options: &ConnectionOptions,
134    ) -> DFResult<Arc<dyn Connection>> {
135        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
136            return Err(DataFusionError::Execution(format!(
137                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
138            )));
139        }
140        let conn_options = conn_options.clone().with_pool_max_size(1);
141        tokio::task::block_in_place(|| {
142            tokio::runtime::Handle::current().block_on(async {
143                let pool = connect(&conn_options).await?;
144                let conn = pool.get().await?;
145                Ok::<_, DataFusionError>(conn)
146            })
147        })
148    }
149}
150
151#[derive(Debug, With)]
152pub struct RemotePhysicalCodec {
153    pub transform_codec: Arc<dyn TransformCodec>,
154    pub literalize_codec: Arc<dyn LiteralizeCodec>,
155    pub connection_codec: Arc<dyn ConnectionCodec>,
156}
157
158impl RemotePhysicalCodec {
159    pub fn new() -> Self {
160        Self {
161            transform_codec: Arc::new(DefaultTransformCodec {}),
162            literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
163            connection_codec: Arc::new(DefaultConnectionCodec {}),
164        }
165    }
166}
167
168impl Default for RemotePhysicalCodec {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl PhysicalExtensionCodec for RemotePhysicalCodec {
175    fn try_decode(
176        &self,
177        buf: &[u8],
178        inputs: &[Arc<dyn ExecutionPlan>],
179        registry: &dyn FunctionRegistry,
180    ) -> DFResult<Arc<dyn ExecutionPlan>> {
181        let remote_table_node =
182            protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
183                DataFusionError::Internal(format!(
184                    "Failed to decode remote table physical plan node: {e:?}"
185                ))
186            })?;
187        let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
188            DataFusionError::Internal(
189                "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
190            )
191        })?;
192
193        match remote_table_plan {
194            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
195                let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
196                    Arc::new(DefaultTransform {})
197                } else {
198                    self.transform_codec.try_decode(&proto.transform)?
199                };
200
201                let source = parse_remote_source(proto.source.as_ref().ok_or(DataFusionError::Internal(
202                    "remote source is not set".to_string(),
203                ))?)?;
204
205                let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
206                let remote_schema = proto
207                    .remote_schema
208                    .map(|schema| Arc::new(parse_remote_schema(&schema)));
209
210                let projection: Option<Vec<usize>> = proto
211                    .projection
212                    .map(|p| p.projection.iter().map(|n| *n as usize).collect());
213
214                let limit = proto.limit.map(|l| l as usize);
215
216                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
217
218                let now = std::time::Instant::now();
219                let conn = self
220                    .connection_codec
221                    .try_decode(&proto.connection, &conn_options)?;
222                debug!(
223                    "[remote-table] Decoding connection cost {}ms",
224                    now.elapsed().as_millis()
225                );
226
227                Ok(Arc::new(RemoteTableScanExec::try_new(
228                    conn_options,
229                    source,
230                    table_schema,
231                    remote_schema,
232                    projection,
233                    proto.unparsed_filters,
234                    limit,
235                    transform,
236                    conn,
237                )?))
238            }
239            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(proto) => {
240
241                if inputs.len() != 1 {
242                    return Err(DataFusionError::Internal(
243                        "RemoteTableInsertExec only support one input".to_string(),
244                    ));
245                }
246
247                let input = inputs[0].clone();
248
249                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
250                let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
251                let table = proto.table.unwrap().idents;
252
253                let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
254                    Arc::new(DefaultLiteralizer {})
255                } else {
256                    self.literalize_codec.try_decode(&proto.literalizer)?
257                };
258
259                let now = std::time::Instant::now();
260                let conn = self
261                    .connection_codec
262                    .try_decode(&proto.connection, &conn_options)?;
263                debug!(
264                    "[remote-table] Decoding connection cost {}ms",
265                    now.elapsed().as_millis()
266                );
267
268                Ok(Arc::new(RemoteTableInsertExec::new(
269                    input,
270                    conn_options,
271                    literalizer,
272                    table,
273                    remote_schema,
274                    conn,
275                )))
276            }
277            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(proto) => {
278                let partitions = parse_partitions(&proto.partitions)?;
279                let schema = Schema::try_from(&proto.schema.unwrap())?;
280                let projection = parse_projection(proto.projection.as_ref());
281
282                let sort_information = proto
283                    .sort_information
284                    .iter()
285                    .map(|sort_exprs| {
286                        let sort_exprs = parse_physical_sort_exprs(
287                            sort_exprs.physical_sort_expr_nodes.as_slice(),
288                            registry,
289                            &schema,
290                            self,
291                        )?;
292                        let lex_ordering = LexOrdering::new(sort_exprs).expect("lex ordering is not empty");
293                        Ok::<_, DataFusionError>(lex_ordering)
294                    })
295                    .collect::<Result<Vec<_>, _>>()?;
296
297                let show_sizes = proto.show_sizes;
298                let fetch = proto.fetch.map(|f| f as usize);
299                let memory_source =
300                    MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
301                        .with_show_sizes(show_sizes)
302                        .with_limit(fetch);
303
304                let memory_source =
305                    MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
306                Ok(DataSourceExec::from_data_source(memory_source))
307            }
308        }
309    }
310
311    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
312        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
313            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
314                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
315            } else {
316                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
317                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
318                bytes
319            };
320
321            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
322            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
323
324            let serialized_conn = self
325                .connection_codec
326                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
327
328            let serialized_source = serialize_remote_source(&exec.source);
329
330            let proto = protobuf::RemoteTablePhysicalPlanNode {
331                remote_table_physical_plan_type: Some(
332                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
333                        protobuf::RemoteTableScanExec {
334                            conn_options: Some(serialized_connection_options),
335                            source: Some(serialized_source),
336                            table_schema: Some(exec.table_schema.as_ref().try_into()?),
337                            remote_schema,
338                            projection: serialize_projection(exec.projection.as_ref()),
339                            unparsed_filters: exec.unparsed_filters.clone(),
340                            limit: exec.limit.map(|l| l as u32),
341                            transform: serialized_transform,
342                            connection: serialized_conn,
343                        },
344                    ),
345                ),
346            };
347
348            proto.encode(buf).map_err(|e| {
349                DataFusionError::Internal(format!(
350                    "Failed to encode remote table scan exec plan: {e:?}"
351                ))
352            })?;
353            Ok(())
354        } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
355            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
356            let remote_schema = serialize_remote_schema(&exec.remote_schema);
357            let serialized_table = protobuf::Identifiers {
358                idents: exec.table.clone(),
359            };
360            let serialized_literalizer = self
361                .literalize_codec
362                .try_encode(exec.literalizer.as_ref())?;
363            let serialized_conn = self
364                .connection_codec
365                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
366
367            let proto = protobuf::RemoteTablePhysicalPlanNode {
368                remote_table_physical_plan_type: Some(
369                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
370                        protobuf::RemoteTableInsertExec {
371                            conn_options: Some(serialized_connection_options),
372                            table: Some(serialized_table),
373                            remote_schema: Some(remote_schema),
374                            literalizer: serialized_literalizer,
375                            connection: serialized_conn,
376                        },
377                    ),
378                ),
379            };
380
381            proto.encode(buf).map_err(|e| {
382                DataFusionError::Internal(format!(
383                    "Failed to encode remote table insert exec node: {e:?}"
384                ))
385            })?;
386
387            Ok(())
388        } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
389            let source = exec.data_source();
390            if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
391                let proto_partitions = serialize_partitions(memory_source.partitions())?;
392                let projection = serialize_projection(memory_source.projection().as_ref());
393                let sort_information = memory_source
394                    .sort_information()
395                    .iter()
396                    .map(|ordering| {
397                        let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
398                        Ok::<_, DataFusionError>(
399                            datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
400                                physical_sort_expr_nodes: sort_exprs,
401                            },
402                        )
403                    })
404                    .collect::<Result<Vec<_>, _>>()?;
405
406                let proto = protobuf::RemoteTablePhysicalPlanNode {
407                    remote_table_physical_plan_type: Some(
408                        protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(protobuf::MemoryDatasourceNode {
409                            partitions: proto_partitions,
410                            schema: Some(memory_source.original_schema().try_into()?),
411                            projection,
412                            sort_information,
413                            show_sizes: memory_source.show_sizes(),
414                            fetch: memory_source.fetch().map(|f| f as u32),
415                        }),
416                    ),
417                };
418
419                proto.encode(buf).map_err(|e| {
420                    DataFusionError::Internal(format!(
421                        "Failed to encode memory datasource node: {e:?}"
422                    ))
423                })?;
424
425                Ok(())
426            } else {
427                Err(DataFusionError::NotImplemented(format!(
428                    "RemotePhysicalCodec only support encoding MemorySourceConfig, got {source:?}",
429                )))
430            }
431        } else {
432            Err(DataFusionError::NotImplemented(format!(
433                "RemotePhysicalCodec does not support encoding {}",
434                node.name()
435            )))
436        }
437    }
438}
439
440fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
441    match options {
442        #[cfg(feature = "postgres")]
443        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
444            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
445                protobuf::PostgresConnectionOptions {
446                    host: options.host.clone(),
447                    port: options.port as u32,
448                    username: options.username.clone(),
449                    password: options.password.clone(),
450                    database: options.database.clone(),
451                    pool_max_size: options.pool_max_size as u32,
452                    stream_chunk_size: options.stream_chunk_size as u32,
453                    default_numeric_scale: options.default_numeric_scale as i32,
454                },
455            )),
456        },
457        #[cfg(feature = "mysql")]
458        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
459            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
460                protobuf::MysqlConnectionOptions {
461                    host: options.host.clone(),
462                    port: options.port as u32,
463                    username: options.username.clone(),
464                    password: options.password.clone(),
465                    database: options.database.clone(),
466                    pool_max_size: options.pool_max_size as u32,
467                    stream_chunk_size: options.stream_chunk_size as u32,
468                },
469            )),
470        },
471        #[cfg(feature = "oracle")]
472        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
473            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
474                protobuf::OracleConnectionOptions {
475                    host: options.host.clone(),
476                    port: options.port as u32,
477                    username: options.username.clone(),
478                    password: options.password.clone(),
479                    service_name: options.service_name.clone(),
480                    pool_max_size: options.pool_max_size as u32,
481                    stream_chunk_size: options.stream_chunk_size as u32,
482                },
483            )),
484        },
485        #[cfg(feature = "sqlite")]
486        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
487            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
488                protobuf::SqliteConnectionOptions {
489                    path: options.path.to_str().unwrap().to_string(),
490                    stream_chunk_size: options.stream_chunk_size as u32,
491                },
492            )),
493        },
494        #[cfg(feature = "dm")]
495        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
496            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
497                protobuf::DmConnectionOptions {
498                    host: options.host.clone(),
499                    port: options.port as u32,
500                    username: options.username.clone(),
501                    password: options.password.clone(),
502                    schema: options.schema.clone(),
503                    stream_chunk_size: options.stream_chunk_size as u32,
504                    driver: options.driver.clone(),
505                },
506            )),
507        },
508    }
509}
510
511fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
512    match options.connection_options {
513        #[cfg(feature = "postgres")]
514        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
515            ConnectionOptions::Postgres(PostgresConnectionOptions {
516                host: options.host,
517                port: options.port as u16,
518                username: options.username,
519                password: options.password,
520                database: options.database,
521                pool_max_size: options.pool_max_size as usize,
522                stream_chunk_size: options.stream_chunk_size as usize,
523                default_numeric_scale: options.default_numeric_scale as i8,
524            })
525        }
526        #[cfg(feature = "mysql")]
527        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
528            ConnectionOptions::Mysql(MysqlConnectionOptions {
529                host: options.host,
530                port: options.port as u16,
531                username: options.username,
532                password: options.password,
533                database: options.database,
534                pool_max_size: options.pool_max_size as usize,
535                stream_chunk_size: options.stream_chunk_size as usize,
536            })
537        }
538        #[cfg(feature = "oracle")]
539        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
540            ConnectionOptions::Oracle(OracleConnectionOptions {
541                host: options.host,
542                port: options.port as u16,
543                username: options.username,
544                password: options.password,
545                service_name: options.service_name,
546                pool_max_size: options.pool_max_size as usize,
547                stream_chunk_size: options.stream_chunk_size as usize,
548            })
549        }
550        #[cfg(feature = "sqlite")]
551        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
552            ConnectionOptions::Sqlite(SqliteConnectionOptions {
553                path: std::path::Path::new(&options.path).to_path_buf(),
554                stream_chunk_size: options.stream_chunk_size as usize,
555            })
556        }
557        #[cfg(feature = "dm")]
558        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
559            ConnectionOptions::Dm(DmConnectionOptions {
560                host: options.host,
561                port: options.port as u16,
562                username: options.username,
563                password: options.password,
564                schema: options.schema,
565                stream_chunk_size: options.stream_chunk_size as usize,
566                driver: options.driver,
567            })
568        }
569        _ => panic!("Failed to parse connection options: {options:?}"),
570    }
571}
572
573fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
574    projection.map(|p| protobuf::Projection {
575        projection: p.iter().map(|n| *n as u32).collect(),
576    })
577}
578
579fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
580    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
581}
582
583fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
584    let fields = remote_schema
585        .fields
586        .iter()
587        .map(serialize_remote_field)
588        .collect::<Vec<_>>();
589
590    protobuf::RemoteSchema { fields }
591}
592
593fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
594    protobuf::RemoteField {
595        name: remote_field.name.clone(),
596        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
597        nullable: remote_field.nullable,
598        auto_increment: remote_field.auto_increment,
599    }
600}
601
602fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
603    match remote_type {
604        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
605            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
606                protobuf::PostgresInt2 {},
607            )),
608        },
609        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
610            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
611                protobuf::PostgresInt4 {},
612            )),
613        },
614        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
615            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
616                protobuf::PostgresInt8 {},
617            )),
618        },
619        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
620            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
621                protobuf::PostgresFloat4 {},
622            )),
623        },
624        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
625            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
626                protobuf::PostgresFloat8 {},
627            )),
628        },
629        RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
630            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
631                protobuf::PostgresNumeric {
632                    precision: *precision as u32,
633                    scale: *scale as i32,
634                },
635            )),
636        },
637        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
638            r#type: Some(protobuf::remote_type::Type::PostgresName(
639                protobuf::PostgresName {},
640            )),
641        },
642        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
643            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
644                protobuf::PostgresVarchar {},
645            )),
646        },
647        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
648            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
649                protobuf::PostgresBpchar {},
650            )),
651        },
652        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
653            r#type: Some(protobuf::remote_type::Type::PostgresText(
654                protobuf::PostgresText {},
655            )),
656        },
657        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
658            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
659                protobuf::PostgresBytea {},
660            )),
661        },
662        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
663            r#type: Some(protobuf::remote_type::Type::PostgresDate(
664                protobuf::PostgresDate {},
665            )),
666        },
667        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
668            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
669                protobuf::PostgresTimestamp {},
670            )),
671        },
672        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
673            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
674                protobuf::PostgresTimestampTz {},
675            )),
676        },
677        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
678            r#type: Some(protobuf::remote_type::Type::PostgresTime(
679                protobuf::PostgresTime {},
680            )),
681        },
682        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
683            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
684                protobuf::PostgresInterval {},
685            )),
686        },
687        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
688            r#type: Some(protobuf::remote_type::Type::PostgresBool(
689                protobuf::PostgresBool {},
690            )),
691        },
692        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
693            r#type: Some(protobuf::remote_type::Type::PostgresJson(
694                protobuf::PostgresJson {},
695            )),
696        },
697        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
698            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
699                protobuf::PostgresJsonb {},
700            )),
701        },
702        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
703            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
704                protobuf::PostgresInt2Array {},
705            )),
706        },
707        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
708            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
709                protobuf::PostgresInt4Array {},
710            )),
711        },
712        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
713            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
714                protobuf::PostgresInt8Array {},
715            )),
716        },
717        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
718            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
719                protobuf::PostgresFloat4Array {},
720            )),
721        },
722        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
723            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
724                protobuf::PostgresFloat8Array {},
725            )),
726        },
727        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
728            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
729                protobuf::PostgresVarcharArray {},
730            )),
731        },
732        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
733            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
734                protobuf::PostgresBpcharArray {},
735            )),
736        },
737        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
738            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
739                protobuf::PostgresTextArray {},
740            )),
741        },
742        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
743            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
744                protobuf::PostgresByteaArray {},
745            )),
746        },
747        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
748            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
749                protobuf::PostgresBoolArray {},
750            )),
751        },
752        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
753            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
754                protobuf::PostgresPostGisGeometry {},
755            )),
756        },
757        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
758            r#type: Some(protobuf::remote_type::Type::PostgresOid(
759                protobuf::PostgresOid {},
760            )),
761        },
762        RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
763            r#type: Some(protobuf::remote_type::Type::PostgresXml(
764                protobuf::PostgresXml {},
765            )),
766        },
767        RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
768            r#type: Some(protobuf::remote_type::Type::PostgresUuid(
769                protobuf::PostgresUuid {},
770            )),
771        },
772
773        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
774            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
775                protobuf::MysqlTinyInt {},
776            )),
777        },
778        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
779            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
780                protobuf::MysqlTinyIntUnsigned {},
781            )),
782        },
783        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
784            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
785                protobuf::MysqlSmallInt {},
786            )),
787        },
788        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
789            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
790                protobuf::MysqlSmallIntUnsigned {},
791            )),
792        },
793        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
794            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
795                protobuf::MysqlMediumInt {},
796            )),
797        },
798        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
799            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
800                protobuf::MysqlMediumIntUnsigned {},
801            )),
802        },
803        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
804            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
805                protobuf::MysqlInteger {},
806            )),
807        },
808        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
809            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
810                protobuf::MysqlIntegerUnsigned {},
811            )),
812        },
813        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
814            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
815                protobuf::MysqlBigInt {},
816            )),
817        },
818        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
819            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
820                protobuf::MysqlBigIntUnsigned {},
821            )),
822        },
823        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
824            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
825                protobuf::MysqlFloat {},
826            )),
827        },
828        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
829            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
830                protobuf::MysqlDouble {},
831            )),
832        },
833        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
834            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
835                protobuf::MysqlDecimal {
836                    precision: *precision as u32,
837                    scale: *scale as u32,
838                },
839            )),
840        },
841        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
842            r#type: Some(protobuf::remote_type::Type::MysqlDate(
843                protobuf::MysqlDate {},
844            )),
845        },
846        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
847            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
848                protobuf::MysqlDateTime {},
849            )),
850        },
851        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
852            r#type: Some(protobuf::remote_type::Type::MysqlTime(
853                protobuf::MysqlTime {},
854            )),
855        },
856        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
857            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
858                protobuf::MysqlTimestamp {},
859            )),
860        },
861        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
862            r#type: Some(protobuf::remote_type::Type::MysqlYear(
863                protobuf::MysqlYear {},
864            )),
865        },
866        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
867            r#type: Some(protobuf::remote_type::Type::MysqlChar(
868                protobuf::MysqlChar {},
869            )),
870        },
871        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
872            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
873                protobuf::MysqlVarchar {},
874            )),
875        },
876        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
877            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
878                protobuf::MysqlBinary {},
879            )),
880        },
881        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
882            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
883                protobuf::MysqlVarbinary {},
884            )),
885        },
886        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
887            r#type: Some(protobuf::remote_type::Type::MysqlText(
888                protobuf::MysqlText { length: *len },
889            )),
890        },
891        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
892            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
893                protobuf::MysqlBlob { length: *len },
894            )),
895        },
896        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
897            r#type: Some(protobuf::remote_type::Type::MysqlJson(
898                protobuf::MysqlJson {},
899            )),
900        },
901        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
902            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
903                protobuf::MysqlGeometry {},
904            )),
905        },
906
907        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
908            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
909                protobuf::OracleVarchar2 { length: *len },
910            )),
911        },
912        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
913            r#type: Some(protobuf::remote_type::Type::OracleChar(
914                protobuf::OracleChar { length: *len },
915            )),
916        },
917        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
918            r#type: Some(protobuf::remote_type::Type::OracleNumber(
919                protobuf::OracleNumber {
920                    precision: *precision as u32,
921                    scale: *scale as i32,
922                },
923            )),
924        },
925        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
926            r#type: Some(protobuf::remote_type::Type::OracleDate(
927                protobuf::OracleDate {},
928            )),
929        },
930        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
931            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
932                protobuf::OracleTimestamp {},
933            )),
934        },
935        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
936            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
937                protobuf::OracleBoolean {},
938            )),
939        },
940        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
941            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
942                protobuf::OracleBinaryFloat {},
943            )),
944        },
945        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
946            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
947                protobuf::OracleBinaryDouble {},
948            )),
949        },
950        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
951            r#type: Some(protobuf::remote_type::Type::OracleBlob(
952                protobuf::OracleBlob {},
953            )),
954        },
955        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
956            r#type: Some(protobuf::remote_type::Type::OracleFloat(
957                protobuf::OracleFloat {
958                    precision: *precision as u32,
959                },
960            )),
961        },
962        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
963            r#type: Some(protobuf::remote_type::Type::OracleNchar(
964                protobuf::OracleNChar { length: *len },
965            )),
966        },
967        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
968            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
969                protobuf::OracleNVarchar2 { length: *len },
970            )),
971        },
972        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
973            r#type: Some(protobuf::remote_type::Type::OracleRaw(
974                protobuf::OracleRaw { length: *len },
975            )),
976        },
977        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
978            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
979                protobuf::OracleLongRaw {},
980            )),
981        },
982        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
983            r#type: Some(protobuf::remote_type::Type::OracleLong(
984                protobuf::OracleLong {},
985            )),
986        },
987        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
988            r#type: Some(protobuf::remote_type::Type::OracleClob(
989                protobuf::OracleClob {},
990            )),
991        },
992        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
993            r#type: Some(protobuf::remote_type::Type::OracleNclob(
994                protobuf::OracleNClob {},
995            )),
996        },
997        RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
998            r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
999                protobuf::OracleSdeGeometry {},
1000            )),
1001        },
1002        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
1003            r#type: Some(protobuf::remote_type::Type::SqliteNull(
1004                protobuf::SqliteNull {},
1005            )),
1006        },
1007        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
1008            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
1009                protobuf::SqliteInteger {},
1010            )),
1011        },
1012        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
1013            r#type: Some(protobuf::remote_type::Type::SqliteReal(
1014                protobuf::SqliteReal {},
1015            )),
1016        },
1017        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
1018            r#type: Some(protobuf::remote_type::Type::SqliteText(
1019                protobuf::SqliteText {},
1020            )),
1021        },
1022        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
1023            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
1024                protobuf::SqliteBlob {},
1025            )),
1026        },
1027        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
1028            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
1029                protobuf::DmTinyInt {},
1030            )),
1031        },
1032        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
1033            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
1034                protobuf::DmSmallInt {},
1035            )),
1036        },
1037        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
1038            r#type: Some(protobuf::remote_type::Type::DmInteger(
1039                protobuf::DmInteger {},
1040            )),
1041        },
1042        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
1043            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
1044        },
1045        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
1046            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
1047        },
1048        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
1049            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
1050        },
1051        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
1052            r#type: Some(protobuf::remote_type::Type::DmNumeric(
1053                protobuf::DmNumeric {
1054                    precision: *precision as u32,
1055                    scale: *scale as i32,
1056                },
1057            )),
1058        },
1059        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
1060            r#type: Some(protobuf::remote_type::Type::DmDecimal(
1061                protobuf::DmDecimal {
1062                    precision: *precision as u32,
1063                    scale: *scale as i32,
1064                },
1065            )),
1066        },
1067        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
1068            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
1069                length: len.map(|s| s as u32),
1070            })),
1071        },
1072        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
1073            r#type: Some(protobuf::remote_type::Type::DmVarchar(
1074                protobuf::DmVarchar {
1075                    length: len.map(|s| s as u32),
1076                },
1077            )),
1078        },
1079        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
1080            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
1081        },
1082        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
1083            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
1084                length: *len as u32,
1085            })),
1086        },
1087        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
1088            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
1089                protobuf::DmVarbinary {
1090                    length: len.map(|s| s as u32),
1091                },
1092            )),
1093        },
1094        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
1095            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
1096        },
1097        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
1098            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
1099        },
1100        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
1101            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
1102                protobuf::DmTimestamp {
1103                    precision: *precision as u32,
1104                },
1105            )),
1106        },
1107        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1108            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1109                precision: *precision as u32,
1110            })),
1111        },
1112        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1113            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
1114        },
1115    }
1116}
1117
1118fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1119    let fields = remote_schema
1120        .fields
1121        .iter()
1122        .map(parse_remote_field)
1123        .collect::<Vec<_>>();
1124
1125    RemoteSchema { fields }
1126}
1127
1128fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1129    RemoteField {
1130        name: field.name.clone(),
1131        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1132        nullable: field.nullable,
1133        auto_increment: field.auto_increment,
1134    }
1135}
1136
1137fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1138    match remote_type.r#type.as_ref().unwrap() {
1139        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1140        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1141        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1142        protobuf::remote_type::Type::PostgresFloat4(_) => {
1143            RemoteType::Postgres(PostgresType::Float4)
1144        }
1145        protobuf::remote_type::Type::PostgresFloat8(_) => {
1146            RemoteType::Postgres(PostgresType::Float8)
1147        }
1148        protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1149            PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1150        ),
1151        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1152        protobuf::remote_type::Type::PostgresVarchar(_) => {
1153            RemoteType::Postgres(PostgresType::Varchar)
1154        }
1155        protobuf::remote_type::Type::PostgresBpchar(_) => {
1156            RemoteType::Postgres(PostgresType::Bpchar)
1157        }
1158        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1159        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1160        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1161        protobuf::remote_type::Type::PostgresTimestamp(_) => {
1162            RemoteType::Postgres(PostgresType::Timestamp)
1163        }
1164        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1165            RemoteType::Postgres(PostgresType::TimestampTz)
1166        }
1167        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1168        protobuf::remote_type::Type::PostgresInterval(_) => {
1169            RemoteType::Postgres(PostgresType::Interval)
1170        }
1171        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1172        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1173        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1174        protobuf::remote_type::Type::PostgresInt2Array(_) => {
1175            RemoteType::Postgres(PostgresType::Int2Array)
1176        }
1177        protobuf::remote_type::Type::PostgresInt4Array(_) => {
1178            RemoteType::Postgres(PostgresType::Int4Array)
1179        }
1180        protobuf::remote_type::Type::PostgresInt8Array(_) => {
1181            RemoteType::Postgres(PostgresType::Int8Array)
1182        }
1183        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1184            RemoteType::Postgres(PostgresType::Float4Array)
1185        }
1186        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1187            RemoteType::Postgres(PostgresType::Float8Array)
1188        }
1189        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1190            RemoteType::Postgres(PostgresType::VarcharArray)
1191        }
1192        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1193            RemoteType::Postgres(PostgresType::BpcharArray)
1194        }
1195        protobuf::remote_type::Type::PostgresTextArray(_) => {
1196            RemoteType::Postgres(PostgresType::TextArray)
1197        }
1198        protobuf::remote_type::Type::PostgresByteaArray(_) => {
1199            RemoteType::Postgres(PostgresType::ByteaArray)
1200        }
1201        protobuf::remote_type::Type::PostgresBoolArray(_) => {
1202            RemoteType::Postgres(PostgresType::BoolArray)
1203        }
1204        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1205            RemoteType::Postgres(PostgresType::PostGisGeometry)
1206        }
1207        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1208        protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1209        protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1210        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1211        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1212            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1213        }
1214        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1215        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1216            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1217        }
1218        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1219        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1220            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1221        }
1222        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1223        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1224            RemoteType::Mysql(MysqlType::IntegerUnsigned)
1225        }
1226        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1227        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1228            RemoteType::Mysql(MysqlType::BigIntUnsigned)
1229        }
1230        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1231        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1232        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1233            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1234        ),
1235        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1236        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1237        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1238        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1239        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1240        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1241        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1242        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1243        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1244        protobuf::remote_type::Type::MysqlText(text) => {
1245            RemoteType::Mysql(MysqlType::Text(text.length))
1246        }
1247        protobuf::remote_type::Type::MysqlBlob(blob) => {
1248            RemoteType::Mysql(MysqlType::Blob(blob.length))
1249        }
1250        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1251        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1252        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1253            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1254        }
1255        protobuf::remote_type::Type::OracleChar(char) => {
1256            RemoteType::Oracle(OracleType::Char(char.length))
1257        }
1258        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1259            OracleType::Number(number.precision as u8, number.scale as i8),
1260        ),
1261        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1262        protobuf::remote_type::Type::OracleTimestamp(_) => {
1263            RemoteType::Oracle(OracleType::Timestamp)
1264        }
1265        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1266        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1267            RemoteType::Oracle(OracleType::BinaryFloat)
1268        }
1269        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1270            RemoteType::Oracle(OracleType::BinaryDouble)
1271        }
1272        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1273            RemoteType::Oracle(OracleType::Float(*precision as u8))
1274        }
1275        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1276            RemoteType::Oracle(OracleType::NChar(*length))
1277        }
1278        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1279            RemoteType::Oracle(OracleType::NVarchar2(*length))
1280        }
1281        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1282            RemoteType::Oracle(OracleType::Raw(*length))
1283        }
1284        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1285        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1286        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1287        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1288        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1289        protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1290            RemoteType::Oracle(OracleType::SdeGeometry)
1291        }
1292        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1293        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1294        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1295        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1296        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1297        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1298        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1299        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1300        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1301        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1302        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1303        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1304            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1305        }
1306        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1307            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1308        }
1309        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1310            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1311        }
1312        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1313            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1314        }
1315        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1316        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1317            RemoteType::Dm(DmType::Binary(*length as u16))
1318        }
1319        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1320            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1321        }
1322        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1323        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1324        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1325            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1326        }
1327        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1328            RemoteType::Dm(DmType::Time(*precision as u8))
1329        }
1330        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1331    }
1332}
1333
1334fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1335    match source {
1336        RemoteSource::Query(query) => protobuf::RemoteSource {
1337            source: Some(protobuf::remote_source::Source::Query(query.clone())),
1338        },
1339        RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1340            source: Some(protobuf::remote_source::Source::Table(
1341                protobuf::Identifiers {
1342                    idents: table_identifiers.clone(),
1343                },
1344            )),
1345        },
1346    }
1347}
1348
1349fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1350    let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1351        "remote source is not set".to_string(),
1352    ))?;
1353    match source {
1354        protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1355        protobuf::remote_source::Source::Table(table_identifiers) => {
1356            Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1357        }
1358    }
1359}
1360
1361fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> Result<Vec<Vec<u8>>, DataFusionError> {
1362    let mut proto_partitions = vec![];
1363    for partition in partitions {
1364        if partition.is_empty() {
1365            proto_partitions.push(vec![]);
1366            continue;
1367        }
1368        let mut proto_partition = vec![];
1369        let mut stream_writer =
1370            StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
1371        for batch in partition {
1372            stream_writer.write(batch)?;
1373        }
1374        stream_writer.finish()?;
1375        proto_partitions.push(proto_partition);
1376    }
1377    Ok(proto_partitions)
1378}
1379
1380fn parse_partitions(
1381    proto_partitions: &[Vec<u8>],
1382) -> Result<Vec<Vec<RecordBatch>>, DataFusionError> {
1383    let mut partitions = vec![];
1384    for proto_partition in proto_partitions {
1385        if proto_partition.is_empty() {
1386            partitions.push(vec![]);
1387            continue;
1388        }
1389        let mut partition = vec![];
1390        let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
1391        for batch in stream_reader {
1392            partition.push(batch?);
1393        }
1394        partitions.push(partition);
1395    }
1396    Ok(partitions)
1397}