datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    Connection, ConnectionOptions, DFResult, DefaultTransform, DefaultUnparser, DmType, MysqlType,
14    OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource,
15    RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform, Unparse,
16    connect,
17};
18use datafusion::arrow::array::RecordBatch;
19use datafusion::arrow::datatypes::{Schema, SchemaRef};
20use datafusion::arrow::ipc::reader::StreamReader;
21use datafusion::arrow::ipc::writer::StreamWriter;
22use datafusion::catalog::memory::{DataSourceExec, MemorySourceConfig};
23use datafusion::common::DataFusionError;
24use datafusion::datasource::source::DataSource;
25use datafusion::execution::FunctionRegistry;
26use datafusion::physical_plan::ExecutionPlan;
27use datafusion_proto::convert_required;
28use datafusion_proto::physical_plan::PhysicalExtensionCodec;
29use datafusion_proto::physical_plan::from_proto::parse_physical_sort_exprs;
30use datafusion_proto::physical_plan::to_proto::serialize_physical_sort_exprs;
31use datafusion_proto::protobuf::proto_error;
32use derive_with::With;
33use log::debug;
34use prost::Message;
35use std::fmt::Debug;
36use std::sync::Arc;
37
/// Converts [`Transform`] implementations to and from a byte representation.
///
/// [`RemotePhysicalCodec`] uses an implementation of this trait for the
/// `transform` field of a serialized remote table scan.
pub trait TransformCodec: Debug + Send + Sync {
    /// Encodes `value` into bytes, or returns an error if it cannot be encoded.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Rebuilds a shared [`Transform`] from the bytes in `value`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
42
/// [`TransformCodec`] that can only round-trip [`DefaultTransform`].
#[derive(Debug)]
pub struct DefaultTransformCodec {}

/// Marker whose UTF-8 bytes stand in for a serialized [`DefaultTransform`].
const DEFAULT_TRANSFORM_ID: &str = "__default";
47
48impl TransformCodec for DefaultTransformCodec {
49    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
50        if value.as_any().is::<DefaultTransform>() {
51            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
52        } else {
53            Err(DataFusionError::Execution(format!(
54                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
55            )))
56        }
57    }
58
59    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
60        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
61            Ok(Arc::new(DefaultTransform {}))
62        } else {
63            Err(DataFusionError::Execution(
64                "DefaultTransformCodec only supports DefaultTransform".to_string(),
65            ))
66        }
67    }
68}
69
/// Converts [`Unparse`] implementations to and from a byte representation.
///
/// [`RemotePhysicalCodec`] uses an implementation of this trait for the
/// `unparser` field of a serialized remote table insert.
pub trait UnparseCodec: Debug + Send + Sync {
    /// Encodes `value` into bytes, or returns an error if it cannot be encoded.
    fn try_encode(&self, value: &dyn Unparse) -> DFResult<Vec<u8>>;
    /// Rebuilds a shared [`Unparse`] from the bytes in `value`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Unparse>>;
}
74
/// [`UnparseCodec`] that can only round-trip [`DefaultUnparser`].
#[derive(Debug)]
pub struct DefaultUnparseCodec {}

/// Marker whose UTF-8 bytes stand in for a serialized [`DefaultUnparser`].
const DEFAULT_UNPARSE_ID: &str = "__default";
79
80impl UnparseCodec for DefaultUnparseCodec {
81    fn try_encode(&self, value: &dyn Unparse) -> DFResult<Vec<u8>> {
82        if value.as_any().is::<DefaultUnparser>() {
83            Ok(DEFAULT_UNPARSE_ID.as_bytes().to_vec())
84        } else {
85            Err(DataFusionError::Execution(format!(
86                "DefaultUnparseCodec does not support unparse: {value:?}, please implement a custom UnparseCodec."
87            )))
88        }
89    }
90
91    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Unparse>> {
92        if value == DEFAULT_UNPARSE_ID.as_bytes() {
93            Ok(Arc::new(DefaultUnparser {}))
94        } else {
95            Err(DataFusionError::Execution(
96                "DefaultUnparseCodec only supports DefaultUnparser".to_string(),
97            ))
98        }
99    }
100}
101
/// Converts a [`Connection`] to and from a byte representation.
///
/// Both directions also receive the [`ConnectionOptions`] of the plan node
/// being serialized, so an implementation may use them instead of (or in
/// addition to) the encoded bytes.
pub trait ConnectionCodec: Debug + Send + Sync {
    /// Encodes `value` into bytes.
    fn try_encode(
        &self,
        value: &dyn Connection,
        conn_options: &ConnectionOptions,
    ) -> DFResult<Vec<u8>>;
    /// Produces a shared [`Connection`] from the bytes in `value` and `conn_options`.
    fn try_decode(
        &self,
        value: &[u8],
        conn_options: &ConnectionOptions,
    ) -> DFResult<Arc<dyn Connection>>;
}
114
/// [`ConnectionCodec`] that serializes only a marker and opens a fresh
/// connection from the connection options when decoding.
#[derive(Debug)]
pub struct DefaultConnectionCodec;

/// Marker whose UTF-8 bytes are written by [`DefaultConnectionCodec`].
const DEFAULT_CONNECTION_VALUE: &str = "__default";
119
120impl ConnectionCodec for DefaultConnectionCodec {
121    fn try_encode(
122        &self,
123        _value: &dyn Connection,
124        _conn_options: &ConnectionOptions,
125    ) -> DFResult<Vec<u8>> {
126        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
127    }
128
129    fn try_decode(
130        &self,
131        value: &[u8],
132        conn_options: &ConnectionOptions,
133    ) -> DFResult<Arc<dyn Connection>> {
134        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
135            return Err(DataFusionError::Execution(format!(
136                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
137            )));
138        }
139        let conn_options = conn_options.clone().with_pool_max_size(1);
140        tokio::task::block_in_place(|| {
141            tokio::runtime::Handle::current().block_on(async {
142                let pool = connect(&conn_options).await?;
143                let conn = pool.get().await?;
144                Ok::<_, DataFusionError>(conn)
145            })
146        })
147    }
148}
149
/// Physical extension codec for remote table plans.
///
/// Bundles the three pluggable sub-codecs used while (de)serializing plan nodes.
// NOTE(review): the `With` derive (from `derive_with`) presumably generates
// `with_*` builder methods for each field — confirm against the crate docs.
#[derive(Debug, With)]
pub struct RemotePhysicalCodec {
    /// Codec for the transform attached to a remote table scan.
    pub transform_codec: Arc<dyn TransformCodec>,
    /// Codec for the unparser attached to a remote table insert.
    pub unparse_codec: Arc<dyn UnparseCodec>,
    /// Codec for the connection held by scan and insert nodes.
    pub connection_codec: Arc<dyn ConnectionCodec>,
}
156
157impl RemotePhysicalCodec {
158    pub fn new() -> Self {
159        Self {
160            transform_codec: Arc::new(DefaultTransformCodec {}),
161            unparse_codec: Arc::new(DefaultUnparseCodec {}),
162            connection_codec: Arc::new(DefaultConnectionCodec {}),
163        }
164    }
165}
166
impl Default for RemotePhysicalCodec {
    /// Same as [`RemotePhysicalCodec::new`].
    fn default() -> Self {
        Self::new()
    }
}
172
173impl PhysicalExtensionCodec for RemotePhysicalCodec {
174    fn try_decode(
175        &self,
176        buf: &[u8],
177        inputs: &[Arc<dyn ExecutionPlan>],
178        registry: &dyn FunctionRegistry,
179    ) -> DFResult<Arc<dyn ExecutionPlan>> {
180        let remote_table_node =
181            protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
182                DataFusionError::Internal(format!(
183                    "Failed to decode remote table physical plan node: {e:?}"
184                ))
185            })?;
186        let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
187            DataFusionError::Internal(
188                "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
189            )
190        })?;
191
192        match remote_table_plan {
193            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
194                let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
195                    Arc::new(DefaultTransform {})
196                } else {
197                    self.transform_codec.try_decode(&proto.transform)?
198                };
199
200                let source = parse_remote_source(proto.source.as_ref().ok_or(DataFusionError::Internal(
201                    "remote source is not set".to_string(),
202                ))?)?;
203
204                let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
205                let remote_schema = proto
206                    .remote_schema
207                    .map(|schema| Arc::new(parse_remote_schema(&schema)));
208
209                let projection: Option<Vec<usize>> = proto
210                    .projection
211                    .map(|p| p.projection.iter().map(|n| *n as usize).collect());
212
213                let limit = proto.limit.map(|l| l as usize);
214
215                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
216
217                let now = std::time::Instant::now();
218                let conn = self
219                    .connection_codec
220                    .try_decode(&proto.connection, &conn_options)?;
221                debug!(
222                    "[remote-table] Decoding connection cost {}ms",
223                    now.elapsed().as_millis()
224                );
225
226                Ok(Arc::new(RemoteTableScanExec::try_new(
227                    conn_options,
228                    source,
229                    table_schema,
230                    remote_schema,
231                    projection,
232                    proto.unparsed_filters,
233                    limit,
234                    transform,
235                    conn,
236                )?))
237            }
238            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(proto) => {
239
240                if inputs.len() != 1 {
241                    return Err(DataFusionError::Internal(
242                        "RemoteTableInsertExec only support one input".to_string(),
243                    ));
244                }
245
246                let input = inputs[0].clone();
247
248                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
249                let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
250                let table = proto.table.unwrap().idents;
251
252                let unparser = if proto.unparser == DEFAULT_UNPARSE_ID.as_bytes() {
253                    Arc::new(DefaultUnparser {})
254                } else {
255                    self.unparse_codec.try_decode(&proto.unparser)?
256                };
257
258                let now = std::time::Instant::now();
259                let conn = self
260                    .connection_codec
261                    .try_decode(&proto.connection, &conn_options)?;
262                debug!(
263                    "[remote-table] Decoding connection cost {}ms",
264                    now.elapsed().as_millis()
265                );
266
267                Ok(Arc::new(RemoteTableInsertExec::new(
268                    input,
269                    conn_options,
270                    unparser,
271                    table,
272                    remote_schema,
273                    conn,
274                )))
275            }
276            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(proto) => {
277                let partitions = parse_partitions(&proto.partitions)?;
278                let schema = Schema::try_from(&proto.schema.unwrap())?;
279                let projection = parse_projection(proto.projection.as_ref());
280
281                let sort_information = proto
282                    .sort_information
283                    .iter()
284                    .map(|sort_exprs| {
285                        let sort_exprs = parse_physical_sort_exprs(
286                            sort_exprs.physical_sort_expr_nodes.as_slice(),
287                            registry,
288                            &schema,
289                            self,
290                        )?;
291                        Ok::<_, DataFusionError>(sort_exprs)
292                    })
293                    .collect::<Result<Vec<_>, _>>()?;
294
295                let show_sizes = proto.show_sizes;
296                let fetch = proto.fetch.map(|f| f as usize);
297                let memory_source =
298                    MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
299                        .with_show_sizes(show_sizes)
300                        .with_limit(fetch);
301
302                let memory_source =
303                    MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
304                Ok(DataSourceExec::from_data_source(memory_source))
305            }
306        }
307    }
308
309    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
310        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
311            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
312                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
313            } else {
314                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
315                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
316                bytes
317            };
318
319            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
320            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
321
322            let serialized_conn = self
323                .connection_codec
324                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
325
326            let serialized_source = serialize_remote_source(&exec.source);
327
328            let proto = protobuf::RemoteTablePhysicalPlanNode {
329                remote_table_physical_plan_type: Some(
330                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
331                        protobuf::RemoteTableScanExec {
332                            conn_options: Some(serialized_connection_options),
333                            source: Some(serialized_source),
334                            table_schema: Some(exec.table_schema.as_ref().try_into()?),
335                            remote_schema,
336                            projection: serialize_projection(exec.projection.as_ref()),
337                            unparsed_filters: exec.unparsed_filters.clone(),
338                            limit: exec.limit.map(|l| l as u32),
339                            transform: serialized_transform,
340                            connection: serialized_conn,
341                        },
342                    ),
343                ),
344            };
345
346            proto.encode(buf).map_err(|e| {
347                DataFusionError::Internal(format!(
348                    "Failed to encode remote table scan exec plan: {e:?}"
349                ))
350            })?;
351            Ok(())
352        } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
353            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
354            let remote_schema = serialize_remote_schema(&exec.remote_schema);
355            let serialized_table = protobuf::Identifiers {
356                idents: exec.table.clone(),
357            };
358            let serialized_unparser = self.unparse_codec.try_encode(exec.unparser.as_ref())?;
359            let serialized_conn = self
360                .connection_codec
361                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
362
363            let proto = protobuf::RemoteTablePhysicalPlanNode {
364                remote_table_physical_plan_type: Some(
365                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
366                        protobuf::RemoteTableInsertExec {
367                            conn_options: Some(serialized_connection_options),
368                            table: Some(serialized_table),
369                            remote_schema: Some(remote_schema),
370                            unparser: serialized_unparser,
371                            connection: serialized_conn,
372                        },
373                    ),
374                ),
375            };
376
377            proto.encode(buf).map_err(|e| {
378                DataFusionError::Internal(format!(
379                    "Failed to encode remote table insert exec node: {e:?}"
380                ))
381            })?;
382
383            Ok(())
384        } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
385            let source = exec.data_source();
386            if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
387                let proto_partitions = serialize_partitions(memory_source.partitions())?;
388                let projection = serialize_projection(memory_source.projection().as_ref());
389                let sort_information = memory_source
390                    .sort_information()
391                    .iter()
392                    .map(|ordering| {
393                        let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
394                        Ok::<_, DataFusionError>(
395                            datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
396                                physical_sort_expr_nodes: sort_exprs,
397                            },
398                        )
399                    })
400                    .collect::<Result<Vec<_>, _>>()?;
401
402                let proto = protobuf::RemoteTablePhysicalPlanNode {
403                    remote_table_physical_plan_type: Some(
404                        protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(protobuf::MemoryDatasourceNode {
405                            partitions: proto_partitions,
406                            schema: Some(memory_source.original_schema().try_into()?),
407                            projection,
408                            sort_information,
409                            show_sizes: memory_source.show_sizes(),
410                            fetch: memory_source.fetch().map(|f| f as u32),
411                        }),
412                    ),
413                };
414
415                proto.encode(buf).map_err(|e| {
416                    DataFusionError::Internal(format!(
417                        "Failed to encode memory datasource node: {e:?}"
418                    ))
419                })?;
420
421                Ok(())
422            } else {
423                Err(DataFusionError::NotImplemented(format!(
424                    "RemotePhysicalCodec only support encoding MemorySourceConfig, got {source:?}",
425                )))
426            }
427        } else {
428            Err(DataFusionError::NotImplemented(format!(
429                "RemotePhysicalCodec does not support encoding {}",
430                node.name()
431            )))
432        }
433    }
434}
435
436fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
437    match options {
438        #[cfg(feature = "postgres")]
439        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
440            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
441                protobuf::PostgresConnectionOptions {
442                    host: options.host.clone(),
443                    port: options.port as u32,
444                    username: options.username.clone(),
445                    password: options.password.clone(),
446                    database: options.database.clone(),
447                    pool_max_size: options.pool_max_size as u32,
448                    stream_chunk_size: options.stream_chunk_size as u32,
449                    default_numeric_scale: options.default_numeric_scale as i32,
450                },
451            )),
452        },
453        #[cfg(feature = "mysql")]
454        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
455            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
456                protobuf::MysqlConnectionOptions {
457                    host: options.host.clone(),
458                    port: options.port as u32,
459                    username: options.username.clone(),
460                    password: options.password.clone(),
461                    database: options.database.clone(),
462                    pool_max_size: options.pool_max_size as u32,
463                    stream_chunk_size: options.stream_chunk_size as u32,
464                },
465            )),
466        },
467        #[cfg(feature = "oracle")]
468        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
469            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
470                protobuf::OracleConnectionOptions {
471                    host: options.host.clone(),
472                    port: options.port as u32,
473                    username: options.username.clone(),
474                    password: options.password.clone(),
475                    service_name: options.service_name.clone(),
476                    pool_max_size: options.pool_max_size as u32,
477                    stream_chunk_size: options.stream_chunk_size as u32,
478                },
479            )),
480        },
481        #[cfg(feature = "sqlite")]
482        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
483            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
484                protobuf::SqliteConnectionOptions {
485                    path: options.path.to_str().unwrap().to_string(),
486                    stream_chunk_size: options.stream_chunk_size as u32,
487                },
488            )),
489        },
490        #[cfg(feature = "dm")]
491        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
492            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
493                protobuf::DmConnectionOptions {
494                    host: options.host.clone(),
495                    port: options.port as u32,
496                    username: options.username.clone(),
497                    password: options.password.clone(),
498                    schema: options.schema.clone(),
499                    stream_chunk_size: options.stream_chunk_size as u32,
500                    driver: options.driver.clone(),
501                },
502            )),
503        },
504    }
505}
506
507fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
508    match options.connection_options {
509        #[cfg(feature = "postgres")]
510        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
511            ConnectionOptions::Postgres(PostgresConnectionOptions {
512                host: options.host,
513                port: options.port as u16,
514                username: options.username,
515                password: options.password,
516                database: options.database,
517                pool_max_size: options.pool_max_size as usize,
518                stream_chunk_size: options.stream_chunk_size as usize,
519                default_numeric_scale: options.default_numeric_scale as i8,
520            })
521        }
522        #[cfg(feature = "mysql")]
523        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
524            ConnectionOptions::Mysql(MysqlConnectionOptions {
525                host: options.host,
526                port: options.port as u16,
527                username: options.username,
528                password: options.password,
529                database: options.database,
530                pool_max_size: options.pool_max_size as usize,
531                stream_chunk_size: options.stream_chunk_size as usize,
532            })
533        }
534        #[cfg(feature = "oracle")]
535        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
536            ConnectionOptions::Oracle(OracleConnectionOptions {
537                host: options.host,
538                port: options.port as u16,
539                username: options.username,
540                password: options.password,
541                service_name: options.service_name,
542                pool_max_size: options.pool_max_size as usize,
543                stream_chunk_size: options.stream_chunk_size as usize,
544            })
545        }
546        #[cfg(feature = "sqlite")]
547        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
548            ConnectionOptions::Sqlite(SqliteConnectionOptions {
549                path: std::path::Path::new(&options.path).to_path_buf(),
550                stream_chunk_size: options.stream_chunk_size as usize,
551            })
552        }
553        #[cfg(feature = "dm")]
554        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
555            ConnectionOptions::Dm(DmConnectionOptions {
556                host: options.host,
557                port: options.port as u16,
558                username: options.username,
559                password: options.password,
560                schema: options.schema,
561                stream_chunk_size: options.stream_chunk_size as usize,
562                driver: options.driver,
563            })
564        }
565        _ => panic!("Failed to parse connection options: {options:?}"),
566    }
567}
568
569fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
570    projection.map(|p| protobuf::Projection {
571        projection: p.iter().map(|n| *n as u32).collect(),
572    })
573}
574
575fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
576    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
577}
578
579fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
580    let fields = remote_schema
581        .fields
582        .iter()
583        .map(serialize_remote_field)
584        .collect::<Vec<_>>();
585
586    protobuf::RemoteSchema { fields }
587}
588
589fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
590    protobuf::RemoteField {
591        name: remote_field.name.clone(),
592        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
593        nullable: remote_field.nullable,
594        auto_increment: remote_field.auto_increment,
595    }
596}
597
598fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
599    match remote_type {
600        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
601            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
602                protobuf::PostgresInt2 {},
603            )),
604        },
605        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
606            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
607                protobuf::PostgresInt4 {},
608            )),
609        },
610        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
611            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
612                protobuf::PostgresInt8 {},
613            )),
614        },
615        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
616            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
617                protobuf::PostgresFloat4 {},
618            )),
619        },
620        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
621            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
622                protobuf::PostgresFloat8 {},
623            )),
624        },
625        RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
626            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
627                protobuf::PostgresNumeric {
628                    precision: *precision as u32,
629                    scale: *scale as i32,
630                },
631            )),
632        },
633        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
634            r#type: Some(protobuf::remote_type::Type::PostgresName(
635                protobuf::PostgresName {},
636            )),
637        },
638        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
639            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
640                protobuf::PostgresVarchar {},
641            )),
642        },
643        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
644            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
645                protobuf::PostgresBpchar {},
646            )),
647        },
648        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
649            r#type: Some(protobuf::remote_type::Type::PostgresText(
650                protobuf::PostgresText {},
651            )),
652        },
653        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
654            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
655                protobuf::PostgresBytea {},
656            )),
657        },
658        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
659            r#type: Some(protobuf::remote_type::Type::PostgresDate(
660                protobuf::PostgresDate {},
661            )),
662        },
663        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
664            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
665                protobuf::PostgresTimestamp {},
666            )),
667        },
668        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
669            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
670                protobuf::PostgresTimestampTz {},
671            )),
672        },
673        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
674            r#type: Some(protobuf::remote_type::Type::PostgresTime(
675                protobuf::PostgresTime {},
676            )),
677        },
678        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
679            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
680                protobuf::PostgresInterval {},
681            )),
682        },
683        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
684            r#type: Some(protobuf::remote_type::Type::PostgresBool(
685                protobuf::PostgresBool {},
686            )),
687        },
688        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
689            r#type: Some(protobuf::remote_type::Type::PostgresJson(
690                protobuf::PostgresJson {},
691            )),
692        },
693        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
694            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
695                protobuf::PostgresJsonb {},
696            )),
697        },
698        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
699            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
700                protobuf::PostgresInt2Array {},
701            )),
702        },
703        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
704            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
705                protobuf::PostgresInt4Array {},
706            )),
707        },
708        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
709            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
710                protobuf::PostgresInt8Array {},
711            )),
712        },
713        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
714            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
715                protobuf::PostgresFloat4Array {},
716            )),
717        },
718        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
719            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
720                protobuf::PostgresFloat8Array {},
721            )),
722        },
723        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
724            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
725                protobuf::PostgresVarcharArray {},
726            )),
727        },
728        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
729            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
730                protobuf::PostgresBpcharArray {},
731            )),
732        },
733        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
734            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
735                protobuf::PostgresTextArray {},
736            )),
737        },
738        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
739            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
740                protobuf::PostgresByteaArray {},
741            )),
742        },
743        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
744            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
745                protobuf::PostgresBoolArray {},
746            )),
747        },
748        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
749            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
750                protobuf::PostgresPostGisGeometry {},
751            )),
752        },
753        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
754            r#type: Some(protobuf::remote_type::Type::PostgresOid(
755                protobuf::PostgresOid {},
756            )),
757        },
758        RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
759            r#type: Some(protobuf::remote_type::Type::PostgresXml(
760                protobuf::PostgresXml {},
761            )),
762        },
763        RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
764            r#type: Some(protobuf::remote_type::Type::PostgresUuid(
765                protobuf::PostgresUuid {},
766            )),
767        },
768
769        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
770            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
771                protobuf::MysqlTinyInt {},
772            )),
773        },
774        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
775            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
776                protobuf::MysqlTinyIntUnsigned {},
777            )),
778        },
779        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
780            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
781                protobuf::MysqlSmallInt {},
782            )),
783        },
784        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
785            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
786                protobuf::MysqlSmallIntUnsigned {},
787            )),
788        },
789        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
790            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
791                protobuf::MysqlMediumInt {},
792            )),
793        },
794        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
795            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
796                protobuf::MysqlMediumIntUnsigned {},
797            )),
798        },
799        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
800            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
801                protobuf::MysqlInteger {},
802            )),
803        },
804        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
805            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
806                protobuf::MysqlIntegerUnsigned {},
807            )),
808        },
809        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
810            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
811                protobuf::MysqlBigInt {},
812            )),
813        },
814        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
815            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
816                protobuf::MysqlBigIntUnsigned {},
817            )),
818        },
819        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
820            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
821                protobuf::MysqlFloat {},
822            )),
823        },
824        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
825            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
826                protobuf::MysqlDouble {},
827            )),
828        },
829        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
830            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
831                protobuf::MysqlDecimal {
832                    precision: *precision as u32,
833                    scale: *scale as u32,
834                },
835            )),
836        },
837        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
838            r#type: Some(protobuf::remote_type::Type::MysqlDate(
839                protobuf::MysqlDate {},
840            )),
841        },
842        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
843            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
844                protobuf::MysqlDateTime {},
845            )),
846        },
847        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
848            r#type: Some(protobuf::remote_type::Type::MysqlTime(
849                protobuf::MysqlTime {},
850            )),
851        },
852        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
853            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
854                protobuf::MysqlTimestamp {},
855            )),
856        },
857        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
858            r#type: Some(protobuf::remote_type::Type::MysqlYear(
859                protobuf::MysqlYear {},
860            )),
861        },
862        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
863            r#type: Some(protobuf::remote_type::Type::MysqlChar(
864                protobuf::MysqlChar {},
865            )),
866        },
867        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
868            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
869                protobuf::MysqlVarchar {},
870            )),
871        },
872        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
873            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
874                protobuf::MysqlBinary {},
875            )),
876        },
877        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
878            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
879                protobuf::MysqlVarbinary {},
880            )),
881        },
882        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
883            r#type: Some(protobuf::remote_type::Type::MysqlText(
884                protobuf::MysqlText { length: *len },
885            )),
886        },
887        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
888            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
889                protobuf::MysqlBlob { length: *len },
890            )),
891        },
892        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
893            r#type: Some(protobuf::remote_type::Type::MysqlJson(
894                protobuf::MysqlJson {},
895            )),
896        },
897        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
898            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
899                protobuf::MysqlGeometry {},
900            )),
901        },
902
903        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
904            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
905                protobuf::OracleVarchar2 { length: *len },
906            )),
907        },
908        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
909            r#type: Some(protobuf::remote_type::Type::OracleChar(
910                protobuf::OracleChar { length: *len },
911            )),
912        },
913        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
914            r#type: Some(protobuf::remote_type::Type::OracleNumber(
915                protobuf::OracleNumber {
916                    precision: *precision as u32,
917                    scale: *scale as i32,
918                },
919            )),
920        },
921        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
922            r#type: Some(protobuf::remote_type::Type::OracleDate(
923                protobuf::OracleDate {},
924            )),
925        },
926        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
927            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
928                protobuf::OracleTimestamp {},
929            )),
930        },
931        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
932            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
933                protobuf::OracleBoolean {},
934            )),
935        },
936        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
937            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
938                protobuf::OracleBinaryFloat {},
939            )),
940        },
941        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
942            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
943                protobuf::OracleBinaryDouble {},
944            )),
945        },
946        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
947            r#type: Some(protobuf::remote_type::Type::OracleBlob(
948                protobuf::OracleBlob {},
949            )),
950        },
951        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
952            r#type: Some(protobuf::remote_type::Type::OracleFloat(
953                protobuf::OracleFloat {
954                    precision: *precision as u32,
955                },
956            )),
957        },
958        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
959            r#type: Some(protobuf::remote_type::Type::OracleNchar(
960                protobuf::OracleNChar { length: *len },
961            )),
962        },
963        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
964            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
965                protobuf::OracleNVarchar2 { length: *len },
966            )),
967        },
968        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
969            r#type: Some(protobuf::remote_type::Type::OracleRaw(
970                protobuf::OracleRaw { length: *len },
971            )),
972        },
973        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
974            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
975                protobuf::OracleLongRaw {},
976            )),
977        },
978        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
979            r#type: Some(protobuf::remote_type::Type::OracleLong(
980                protobuf::OracleLong {},
981            )),
982        },
983        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
984            r#type: Some(protobuf::remote_type::Type::OracleClob(
985                protobuf::OracleClob {},
986            )),
987        },
988        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
989            r#type: Some(protobuf::remote_type::Type::OracleNclob(
990                protobuf::OracleNClob {},
991            )),
992        },
993        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
994            r#type: Some(protobuf::remote_type::Type::SqliteNull(
995                protobuf::SqliteNull {},
996            )),
997        },
998        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
999            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
1000                protobuf::SqliteInteger {},
1001            )),
1002        },
1003        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
1004            r#type: Some(protobuf::remote_type::Type::SqliteReal(
1005                protobuf::SqliteReal {},
1006            )),
1007        },
1008        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
1009            r#type: Some(protobuf::remote_type::Type::SqliteText(
1010                protobuf::SqliteText {},
1011            )),
1012        },
1013        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
1014            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
1015                protobuf::SqliteBlob {},
1016            )),
1017        },
1018        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
1019            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
1020                protobuf::DmTinyInt {},
1021            )),
1022        },
1023        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
1024            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
1025                protobuf::DmSmallInt {},
1026            )),
1027        },
1028        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
1029            r#type: Some(protobuf::remote_type::Type::DmInteger(
1030                protobuf::DmInteger {},
1031            )),
1032        },
1033        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
1034            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
1035        },
1036        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
1037            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
1038        },
1039        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
1040            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
1041        },
1042        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
1043            r#type: Some(protobuf::remote_type::Type::DmNumeric(
1044                protobuf::DmNumeric {
1045                    precision: *precision as u32,
1046                    scale: *scale as i32,
1047                },
1048            )),
1049        },
1050        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
1051            r#type: Some(protobuf::remote_type::Type::DmDecimal(
1052                protobuf::DmDecimal {
1053                    precision: *precision as u32,
1054                    scale: *scale as i32,
1055                },
1056            )),
1057        },
1058        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
1059            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
1060                length: len.map(|s| s as u32),
1061            })),
1062        },
1063        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
1064            r#type: Some(protobuf::remote_type::Type::DmVarchar(
1065                protobuf::DmVarchar {
1066                    length: len.map(|s| s as u32),
1067                },
1068            )),
1069        },
1070        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
1071            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
1072        },
1073        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
1074            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
1075                length: *len as u32,
1076            })),
1077        },
1078        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
1079            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
1080                protobuf::DmVarbinary {
1081                    length: len.map(|s| s as u32),
1082                },
1083            )),
1084        },
1085        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
1086            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
1087        },
1088        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
1089            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
1090        },
1091        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
1092            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
1093                protobuf::DmTimestamp {
1094                    precision: *precision as u32,
1095                },
1096            )),
1097        },
1098        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1099            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1100                precision: *precision as u32,
1101            })),
1102        },
1103        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1104            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
1105        },
1106    }
1107}
1108
1109fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1110    let fields = remote_schema
1111        .fields
1112        .iter()
1113        .map(parse_remote_field)
1114        .collect::<Vec<_>>();
1115
1116    RemoteSchema { fields }
1117}
1118
1119fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1120    RemoteField {
1121        name: field.name.clone(),
1122        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1123        nullable: field.nullable,
1124        auto_increment: field.auto_increment,
1125    }
1126}
1127
1128fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1129    match remote_type.r#type.as_ref().unwrap() {
1130        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1131        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1132        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1133        protobuf::remote_type::Type::PostgresFloat4(_) => {
1134            RemoteType::Postgres(PostgresType::Float4)
1135        }
1136        protobuf::remote_type::Type::PostgresFloat8(_) => {
1137            RemoteType::Postgres(PostgresType::Float8)
1138        }
1139        protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1140            PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1141        ),
1142        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1143        protobuf::remote_type::Type::PostgresVarchar(_) => {
1144            RemoteType::Postgres(PostgresType::Varchar)
1145        }
1146        protobuf::remote_type::Type::PostgresBpchar(_) => {
1147            RemoteType::Postgres(PostgresType::Bpchar)
1148        }
1149        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1150        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1151        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1152        protobuf::remote_type::Type::PostgresTimestamp(_) => {
1153            RemoteType::Postgres(PostgresType::Timestamp)
1154        }
1155        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1156            RemoteType::Postgres(PostgresType::TimestampTz)
1157        }
1158        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1159        protobuf::remote_type::Type::PostgresInterval(_) => {
1160            RemoteType::Postgres(PostgresType::Interval)
1161        }
1162        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1163        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1164        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1165        protobuf::remote_type::Type::PostgresInt2Array(_) => {
1166            RemoteType::Postgres(PostgresType::Int2Array)
1167        }
1168        protobuf::remote_type::Type::PostgresInt4Array(_) => {
1169            RemoteType::Postgres(PostgresType::Int4Array)
1170        }
1171        protobuf::remote_type::Type::PostgresInt8Array(_) => {
1172            RemoteType::Postgres(PostgresType::Int8Array)
1173        }
1174        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1175            RemoteType::Postgres(PostgresType::Float4Array)
1176        }
1177        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1178            RemoteType::Postgres(PostgresType::Float8Array)
1179        }
1180        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1181            RemoteType::Postgres(PostgresType::VarcharArray)
1182        }
1183        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1184            RemoteType::Postgres(PostgresType::BpcharArray)
1185        }
1186        protobuf::remote_type::Type::PostgresTextArray(_) => {
1187            RemoteType::Postgres(PostgresType::TextArray)
1188        }
1189        protobuf::remote_type::Type::PostgresByteaArray(_) => {
1190            RemoteType::Postgres(PostgresType::ByteaArray)
1191        }
1192        protobuf::remote_type::Type::PostgresBoolArray(_) => {
1193            RemoteType::Postgres(PostgresType::BoolArray)
1194        }
1195        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1196            RemoteType::Postgres(PostgresType::PostGisGeometry)
1197        }
1198        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1199        protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1200        protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1201        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1202        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1203            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1204        }
1205        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1206        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1207            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1208        }
1209        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1210        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1211            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1212        }
1213        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1214        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1215            RemoteType::Mysql(MysqlType::IntegerUnsigned)
1216        }
1217        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1218        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1219            RemoteType::Mysql(MysqlType::BigIntUnsigned)
1220        }
1221        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1222        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1223        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1224            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1225        ),
1226        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1227        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1228        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1229        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1230        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1231        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1232        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1233        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1234        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1235        protobuf::remote_type::Type::MysqlText(text) => {
1236            RemoteType::Mysql(MysqlType::Text(text.length))
1237        }
1238        protobuf::remote_type::Type::MysqlBlob(blob) => {
1239            RemoteType::Mysql(MysqlType::Blob(blob.length))
1240        }
1241        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1242        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1243        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1244            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1245        }
1246        protobuf::remote_type::Type::OracleChar(char) => {
1247            RemoteType::Oracle(OracleType::Char(char.length))
1248        }
1249        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1250            OracleType::Number(number.precision as u8, number.scale as i8),
1251        ),
1252        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1253        protobuf::remote_type::Type::OracleTimestamp(_) => {
1254            RemoteType::Oracle(OracleType::Timestamp)
1255        }
1256        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1257        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1258            RemoteType::Oracle(OracleType::BinaryFloat)
1259        }
1260        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1261            RemoteType::Oracle(OracleType::BinaryDouble)
1262        }
1263        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1264            RemoteType::Oracle(OracleType::Float(*precision as u8))
1265        }
1266        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1267            RemoteType::Oracle(OracleType::NChar(*length))
1268        }
1269        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1270            RemoteType::Oracle(OracleType::NVarchar2(*length))
1271        }
1272        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1273            RemoteType::Oracle(OracleType::Raw(*length))
1274        }
1275        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1276        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1277        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1278        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1279        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1280        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1281        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1282        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1283        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1284        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1285        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1286        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1287        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1288        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1289        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1290        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1291        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1292            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1293        }
1294        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1295            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1296        }
1297        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1298            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1299        }
1300        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1301            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1302        }
1303        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1304        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1305            RemoteType::Dm(DmType::Binary(*length as u16))
1306        }
1307        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1308            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1309        }
1310        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1311        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1312        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1313            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1314        }
1315        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1316            RemoteType::Dm(DmType::Time(*precision as u8))
1317        }
1318        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1319    }
1320}
1321
1322fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1323    match source {
1324        RemoteSource::Query(query) => protobuf::RemoteSource {
1325            source: Some(protobuf::remote_source::Source::Query(query.clone())),
1326        },
1327        RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1328            source: Some(protobuf::remote_source::Source::Table(
1329                protobuf::Identifiers {
1330                    idents: table_identifiers.clone(),
1331                },
1332            )),
1333        },
1334    }
1335}
1336
1337fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1338    let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1339        "remote source is not set".to_string(),
1340    ))?;
1341    match source {
1342        protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1343        protobuf::remote_source::Source::Table(table_identifiers) => {
1344            Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1345        }
1346    }
1347}
1348
1349fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> Result<Vec<Vec<u8>>, DataFusionError> {
1350    let mut proto_partitions = vec![];
1351    for partition in partitions {
1352        if partition.is_empty() {
1353            proto_partitions.push(vec![]);
1354            continue;
1355        }
1356        let mut proto_partition = vec![];
1357        let mut stream_writer =
1358            StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
1359        for batch in partition {
1360            stream_writer.write(batch)?;
1361        }
1362        stream_writer.finish()?;
1363        proto_partitions.push(proto_partition);
1364    }
1365    Ok(proto_partitions)
1366}
1367
1368fn parse_partitions(
1369    proto_partitions: &[Vec<u8>],
1370) -> Result<Vec<Vec<RecordBatch>>, DataFusionError> {
1371    let mut partitions = vec![];
1372    for proto_partition in proto_partitions {
1373        if proto_partition.is_empty() {
1374            partitions.push(vec![]);
1375            continue;
1376        }
1377        let mut partition = vec![];
1378        let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
1379        for batch in stream_reader {
1380            partition.push(batch?);
1381        }
1382        partitions.push(partition);
1383    }
1384    Ok(partitions)
1385}