datafusion_remote_table/
codec.rs

1use crate::DmConnectionOptions;
2use crate::MysqlConnectionOptions;
3use crate::OracleConnectionOptions;
4use crate::PostgresConnectionOptions;
5use crate::SqliteConnectionOptions;
6use crate::generated::prost as protobuf;
7use crate::{
8    Connection, ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType,
9    Literalize, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
10    RemoteSource, RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
11    connect,
12};
13use datafusion::arrow::array::RecordBatch;
14use datafusion::arrow::datatypes::{Schema, SchemaRef};
15use datafusion::arrow::ipc::reader::StreamReader;
16use datafusion::arrow::ipc::writer::StreamWriter;
17use datafusion::catalog::memory::{DataSourceExec, MemorySourceConfig};
18use datafusion::common::DataFusionError;
19use datafusion::datasource::source::DataSource;
20use datafusion::execution::FunctionRegistry;
21use datafusion::physical_expr::LexOrdering;
22use datafusion::physical_plan::ExecutionPlan;
23use datafusion_proto::convert_required;
24use datafusion_proto::physical_plan::PhysicalExtensionCodec;
25use datafusion_proto::physical_plan::from_proto::parse_physical_sort_exprs;
26use datafusion_proto::physical_plan::to_proto::serialize_physical_sort_exprs;
27use datafusion_proto::protobuf::proto_error;
28use derive_with::With;
29use log::debug;
30use prost::Message;
31use std::fmt::Debug;
32use std::sync::Arc;
33use std::time::Duration;
34
/// Converts [`Transform`] implementations to and from an opaque byte payload.
///
/// [`RemotePhysicalCodec`] stores one of these behind an `Arc` and consults it when it
/// encodes or decodes a remote table scan plan.
pub trait TransformCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes, or returns an error if the transform is unsupported.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Rebuilds a transform from the bytes in `value`, or returns an error if they are
    /// not recognized.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
39
/// [`TransformCodec`] that only knows how to handle [`DefaultTransform`].
#[derive(Debug)]
pub struct DefaultTransformCodec {}
42
/// Marker whose UTF-8 bytes stand for [`DefaultTransform`] in an encoded plan.
const DEFAULT_TRANSFORM_ID: &str = "__default";
44
45impl TransformCodec for DefaultTransformCodec {
46    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
47        if value.as_any().is::<DefaultTransform>() {
48            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
49        } else {
50            Err(DataFusionError::Execution(format!(
51                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
52            )))
53        }
54    }
55
56    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
57        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
58            Ok(Arc::new(DefaultTransform {}))
59        } else {
60            Err(DataFusionError::Execution(
61                "DefaultTransformCodec only supports DefaultTransform".to_string(),
62            ))
63        }
64    }
65}
66
/// Converts [`Literalize`] implementations to and from an opaque byte payload.
///
/// [`RemotePhysicalCodec`] stores one of these behind an `Arc` and consults it when it
/// encodes or decodes a remote table insert plan.
pub trait LiteralizeCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes, or returns an error if the literalizer is unsupported.
    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
    /// Rebuilds a literalizer from the bytes in `value`, or returns an error if they are
    /// not recognized.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
}
71
/// [`LiteralizeCodec`] that only knows how to handle [`DefaultLiteralizer`].
#[derive(Debug)]
pub struct DefaultLiteralizeCodec {}
74
/// Marker whose UTF-8 bytes stand for [`DefaultLiteralizer`] in an encoded plan.
const DEFAULT_LITERALIZE_ID: &str = "__default";
76
77impl LiteralizeCodec for DefaultLiteralizeCodec {
78    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
79        if value.as_any().is::<DefaultLiteralizer>() {
80            Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
81        } else {
82            Err(DataFusionError::Execution(format!(
83                "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
84            )))
85        }
86    }
87
88    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
89        if value == DEFAULT_LITERALIZE_ID.as_bytes() {
90            Ok(Arc::new(DefaultLiteralizer {}))
91        } else {
92            Err(DataFusionError::Execution(
93                "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
94            ))
95        }
96    }
97}
98
/// Converts a [`Connection`] to and from an opaque byte payload.
///
/// Both methods also receive the [`ConnectionOptions`] of the plan being encoded or
/// decoded, so an implementation may use them instead of (or in addition to) the bytes.
pub trait ConnectionCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes; `conn_options` are the options of the owning plan.
    fn try_encode(
        &self,
        value: &dyn Connection,
        conn_options: &ConnectionOptions,
    ) -> DFResult<Vec<u8>>;
    /// Produces a connection from the bytes in `value` and the plan's `conn_options`.
    fn try_decode(
        &self,
        value: &[u8],
        conn_options: &ConnectionOptions,
    ) -> DFResult<Arc<dyn Connection>>;
}
111
/// [`ConnectionCodec`] that writes a fixed marker on encode and opens a fresh connection
/// from the connection options on decode.
#[derive(Debug)]
pub struct DefaultConnectionCodec;
114
/// Marker whose UTF-8 bytes are the only payload [`DefaultConnectionCodec`] writes and accepts.
const DEFAULT_CONNECTION_VALUE: &str = "__default";
116
117impl ConnectionCodec for DefaultConnectionCodec {
118    fn try_encode(
119        &self,
120        _value: &dyn Connection,
121        _conn_options: &ConnectionOptions,
122    ) -> DFResult<Vec<u8>> {
123        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
124    }
125
126    fn try_decode(
127        &self,
128        value: &[u8],
129        conn_options: &ConnectionOptions,
130    ) -> DFResult<Arc<dyn Connection>> {
131        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
132            return Err(DataFusionError::Execution(format!(
133                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
134            )));
135        }
136        let conn_options = conn_options.clone().with_pool_max_size(1);
137        tokio::task::block_in_place(|| {
138            tokio::runtime::Handle::current().block_on(async {
139                let pool = connect(&conn_options).await?;
140                let conn = pool.get().await?;
141                Ok::<_, DataFusionError>(conn)
142            })
143        })
144    }
145}
146
/// Physical-plan extension codec for remote table plans.
///
/// It bundles the three pluggable sub-codecs used while encoding/decoding plan nodes.
/// NOTE(review): the `With` derive presumably generates `with_*` setters for each field so
/// callers can swap in custom codecs — confirm against the `derive_with` crate.
#[derive(Debug, With)]
pub struct RemotePhysicalCodec {
    /// Codec for the scan plan's transform.
    pub transform_codec: Arc<dyn TransformCodec>,
    /// Codec for the insert plan's literalizer.
    pub literalize_codec: Arc<dyn LiteralizeCodec>,
    /// Codec for the connection attached to scan and insert plans.
    pub connection_codec: Arc<dyn ConnectionCodec>,
}
153
154impl RemotePhysicalCodec {
155    pub fn new() -> Self {
156        Self {
157            transform_codec: Arc::new(DefaultTransformCodec {}),
158            literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
159            connection_codec: Arc::new(DefaultConnectionCodec {}),
160        }
161    }
162}
163
164impl Default for RemotePhysicalCodec {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170impl PhysicalExtensionCodec for RemotePhysicalCodec {
171    fn try_decode(
172        &self,
173        buf: &[u8],
174        inputs: &[Arc<dyn ExecutionPlan>],
175        registry: &dyn FunctionRegistry,
176    ) -> DFResult<Arc<dyn ExecutionPlan>> {
177        let remote_table_node =
178            protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
179                DataFusionError::Internal(format!(
180                    "Failed to decode remote table physical plan node: {e:?}"
181                ))
182            })?;
183        let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
184            DataFusionError::Internal(
185                "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
186            )
187        })?;
188
189        match remote_table_plan {
190            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
191                let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
192                    Arc::new(DefaultTransform {})
193                } else {
194                    self.transform_codec.try_decode(&proto.transform)?
195                };
196
197                let source = parse_remote_source(proto.source.as_ref().ok_or(DataFusionError::Internal(
198                    "remote source is not set".to_string(),
199                ))?)?;
200
201                let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
202                let remote_schema = proto
203                    .remote_schema
204                    .map(|schema| Arc::new(parse_remote_schema(&schema)));
205
206                let projection: Option<Vec<usize>> = proto
207                    .projection
208                    .map(|p| p.projection.iter().map(|n| *n as usize).collect());
209
210                let limit = proto.limit.map(|l| l as usize);
211
212                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
213
214                let now = std::time::Instant::now();
215                let conn = self
216                    .connection_codec
217                    .try_decode(&proto.connection, &conn_options)?;
218                debug!(
219                    "[remote-table] Decoding connection cost {}ms",
220                    now.elapsed().as_millis()
221                );
222
223                Ok(Arc::new(RemoteTableScanExec::try_new(
224                    conn_options,
225                    source,
226                    table_schema,
227                    remote_schema,
228                    projection,
229                    proto.unparsed_filters,
230                    limit,
231                    transform,
232                    conn,
233                )?))
234            }
235            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(proto) => {
236
237                if inputs.len() != 1 {
238                    return Err(DataFusionError::Internal(
239                        "RemoteTableInsertExec only support one input".to_string(),
240                    ));
241                }
242
243                let input = inputs[0].clone();
244
245                let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
246                let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
247                let table = proto.table.unwrap().idents;
248
249                let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
250                    Arc::new(DefaultLiteralizer {})
251                } else {
252                    self.literalize_codec.try_decode(&proto.literalizer)?
253                };
254
255                let now = std::time::Instant::now();
256                let conn = self
257                    .connection_codec
258                    .try_decode(&proto.connection, &conn_options)?;
259                debug!(
260                    "[remote-table] Decoding connection cost {}ms",
261                    now.elapsed().as_millis()
262                );
263
264                Ok(Arc::new(RemoteTableInsertExec::new(
265                    input,
266                    conn_options,
267                    literalizer,
268                    table,
269                    remote_schema,
270                    conn,
271                )))
272            }
273            protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(proto) => {
274                let partitions = parse_partitions(&proto.partitions)?;
275                let schema = Schema::try_from(&proto.schema.unwrap())?;
276                let projection = parse_projection(proto.projection.as_ref());
277
278                let sort_information = proto
279                    .sort_information
280                    .iter()
281                    .map(|sort_exprs| {
282                        let sort_exprs = parse_physical_sort_exprs(
283                            sort_exprs.physical_sort_expr_nodes.as_slice(),
284                            registry,
285                            &schema,
286                            self,
287                        )?;
288                        let lex_ordering = LexOrdering::new(sort_exprs).expect("lex ordering is not empty");
289                        Ok::<_, DataFusionError>(lex_ordering)
290                    })
291                    .collect::<Result<Vec<_>, _>>()?;
292
293                let show_sizes = proto.show_sizes;
294                let fetch = proto.fetch.map(|f| f as usize);
295                let memory_source =
296                    MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
297                        .with_show_sizes(show_sizes)
298                        .with_limit(fetch);
299
300                let memory_source =
301                    MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
302                Ok(DataSourceExec::from_data_source(memory_source))
303            }
304        }
305    }
306
307    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
308        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
309            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
310                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
311            } else {
312                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
313                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
314                bytes
315            };
316
317            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
318            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
319
320            let serialized_conn = self
321                .connection_codec
322                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
323
324            let serialized_source = serialize_remote_source(&exec.source);
325
326            let proto = protobuf::RemoteTablePhysicalPlanNode {
327                remote_table_physical_plan_type: Some(
328                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
329                        protobuf::RemoteTableScanExec {
330                            conn_options: Some(serialized_connection_options),
331                            source: Some(serialized_source),
332                            table_schema: Some(exec.table_schema.as_ref().try_into()?),
333                            remote_schema,
334                            projection: serialize_projection(exec.projection.as_ref()),
335                            unparsed_filters: exec.unparsed_filters.clone(),
336                            limit: exec.limit.map(|l| l as u32),
337                            transform: serialized_transform,
338                            connection: serialized_conn,
339                        },
340                    ),
341                ),
342            };
343
344            proto.encode(buf).map_err(|e| {
345                DataFusionError::Internal(format!(
346                    "Failed to encode remote table scan exec plan: {e:?}"
347                ))
348            })?;
349            Ok(())
350        } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
351            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
352            let remote_schema = serialize_remote_schema(&exec.remote_schema);
353            let serialized_table = protobuf::Identifiers {
354                idents: exec.table.clone(),
355            };
356            let serialized_literalizer = self
357                .literalize_codec
358                .try_encode(exec.literalizer.as_ref())?;
359            let serialized_conn = self
360                .connection_codec
361                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
362
363            let proto = protobuf::RemoteTablePhysicalPlanNode {
364                remote_table_physical_plan_type: Some(
365                    protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
366                        protobuf::RemoteTableInsertExec {
367                            conn_options: Some(serialized_connection_options),
368                            table: Some(serialized_table),
369                            remote_schema: Some(remote_schema),
370                            literalizer: serialized_literalizer,
371                            connection: serialized_conn,
372                        },
373                    ),
374                ),
375            };
376
377            proto.encode(buf).map_err(|e| {
378                DataFusionError::Internal(format!(
379                    "Failed to encode remote table insert exec node: {e:?}"
380                ))
381            })?;
382
383            Ok(())
384        } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
385            let source = exec.data_source();
386            if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
387                let proto_partitions = serialize_partitions(memory_source.partitions())?;
388                let projection = serialize_projection(memory_source.projection().as_ref());
389                let sort_information = memory_source
390                    .sort_information()
391                    .iter()
392                    .map(|ordering| {
393                        let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
394                        Ok::<_, DataFusionError>(
395                            datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
396                                physical_sort_expr_nodes: sort_exprs,
397                            },
398                        )
399                    })
400                    .collect::<Result<Vec<_>, _>>()?;
401
402                let proto = protobuf::RemoteTablePhysicalPlanNode {
403                    remote_table_physical_plan_type: Some(
404                        protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(protobuf::MemoryDatasourceNode {
405                            partitions: proto_partitions,
406                            schema: Some(memory_source.original_schema().try_into()?),
407                            projection,
408                            sort_information,
409                            show_sizes: memory_source.show_sizes(),
410                            fetch: memory_source.fetch().map(|f| f as u32),
411                        }),
412                    ),
413                };
414
415                proto.encode(buf).map_err(|e| {
416                    DataFusionError::Internal(format!(
417                        "Failed to encode memory datasource node: {e:?}"
418                    ))
419                })?;
420
421                Ok(())
422            } else {
423                Err(DataFusionError::NotImplemented(format!(
424                    "RemotePhysicalCodec only support encoding MemorySourceConfig, got {source:?}",
425                )))
426            }
427        } else {
428            Err(DataFusionError::NotImplemented(format!(
429                "RemotePhysicalCodec does not support encoding {}",
430                node.name()
431            )))
432        }
433    }
434}
435
436fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
437    match options {
438        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
439            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
440                protobuf::PostgresConnectionOptions {
441                    host: options.host.clone(),
442                    port: options.port as u32,
443                    username: options.username.clone(),
444                    password: options.password.clone(),
445                    database: options.database.clone(),
446                    pool_max_size: options.pool_max_size as u32,
447                    pool_min_idle: options.pool_min_idle as u32,
448                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
449                    pool_ttl_check_interval: Some(serialize_duration(
450                        &options.pool_ttl_check_interval,
451                    )),
452                    stream_chunk_size: options.stream_chunk_size as u32,
453                    default_numeric_scale: options.default_numeric_scale as i32,
454                },
455            )),
456        },
457        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
458            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
459                protobuf::MysqlConnectionOptions {
460                    host: options.host.clone(),
461                    port: options.port as u32,
462                    username: options.username.clone(),
463                    password: options.password.clone(),
464                    database: options.database.clone(),
465                    pool_max_size: options.pool_max_size as u32,
466                    pool_min_idle: options.pool_min_idle as u32,
467                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
468                    pool_ttl_check_interval: Some(serialize_duration(
469                        &options.pool_ttl_check_interval,
470                    )),
471                    stream_chunk_size: options.stream_chunk_size as u32,
472                },
473            )),
474        },
475        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
476            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
477                protobuf::OracleConnectionOptions {
478                    host: options.host.clone(),
479                    port: options.port as u32,
480                    username: options.username.clone(),
481                    password: options.password.clone(),
482                    service_name: options.service_name.clone(),
483                    pool_max_size: options.pool_max_size as u32,
484                    pool_min_idle: options.pool_min_idle as u32,
485                    pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
486                    pool_ttl_check_interval: Some(serialize_duration(
487                        &options.pool_ttl_check_interval,
488                    )),
489                    stream_chunk_size: options.stream_chunk_size as u32,
490                },
491            )),
492        },
493        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
494            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
495                protobuf::SqliteConnectionOptions {
496                    path: options.path.to_str().unwrap().to_string(),
497                    stream_chunk_size: options.stream_chunk_size as u32,
498                },
499            )),
500        },
501        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
502            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
503                protobuf::DmConnectionOptions {
504                    host: options.host.clone(),
505                    port: options.port as u32,
506                    username: options.username.clone(),
507                    password: options.password.clone(),
508                    schema: options.schema.clone(),
509                    stream_chunk_size: options.stream_chunk_size as u32,
510                    driver: options.driver.clone(),
511                },
512            )),
513        },
514    }
515}
516
517fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
518    match options.connection_options {
519        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
520            ConnectionOptions::Postgres(PostgresConnectionOptions {
521                host: options.host,
522                port: options.port as u16,
523                username: options.username,
524                password: options.password,
525                database: options.database,
526                pool_max_size: options.pool_max_size as usize,
527                pool_min_idle: options.pool_min_idle as usize,
528                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
529                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
530                stream_chunk_size: options.stream_chunk_size as usize,
531                default_numeric_scale: options.default_numeric_scale as i8,
532            })
533        }
534        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
535            ConnectionOptions::Mysql(MysqlConnectionOptions {
536                host: options.host,
537                port: options.port as u16,
538                username: options.username,
539                password: options.password,
540                database: options.database,
541                pool_max_size: options.pool_max_size as usize,
542                pool_min_idle: options.pool_min_idle as usize,
543                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
544                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
545                stream_chunk_size: options.stream_chunk_size as usize,
546            })
547        }
548        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
549            ConnectionOptions::Oracle(OracleConnectionOptions {
550                host: options.host,
551                port: options.port as u16,
552                username: options.username,
553                password: options.password,
554                service_name: options.service_name,
555                pool_max_size: options.pool_max_size as usize,
556                pool_min_idle: options.pool_min_idle as usize,
557                pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
558                pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
559                stream_chunk_size: options.stream_chunk_size as usize,
560            })
561        }
562        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
563            ConnectionOptions::Sqlite(SqliteConnectionOptions {
564                path: std::path::Path::new(&options.path).to_path_buf(),
565                stream_chunk_size: options.stream_chunk_size as usize,
566            })
567        }
568        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
569            ConnectionOptions::Dm(DmConnectionOptions {
570                host: options.host,
571                port: options.port as u16,
572                username: options.username,
573                password: options.password,
574                schema: options.schema,
575                stream_chunk_size: options.stream_chunk_size as usize,
576                driver: options.driver,
577            })
578        }
579        _ => panic!("Failed to parse connection options: {options:?}"),
580    }
581}
582
583fn serialize_duration(duration: &Duration) -> protobuf::Duration {
584    protobuf::Duration {
585        secs: duration.as_secs(),
586        nanos: duration.subsec_nanos(),
587    }
588}
589
590fn parse_duration(duration: &protobuf::Duration) -> Duration {
591    Duration::new(duration.secs, duration.nanos)
592}
593
594fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
595    projection.map(|p| protobuf::Projection {
596        projection: p.iter().map(|n| *n as u32).collect(),
597    })
598}
599
600fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
601    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
602}
603
604fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
605    let fields = remote_schema
606        .fields
607        .iter()
608        .map(serialize_remote_field)
609        .collect::<Vec<_>>();
610
611    protobuf::RemoteSchema { fields }
612}
613
614fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
615    protobuf::RemoteField {
616        name: remote_field.name.clone(),
617        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
618        nullable: remote_field.nullable,
619        auto_increment: remote_field.auto_increment,
620    }
621}
622
623fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
624    match remote_type {
625        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
626            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
627                protobuf::PostgresInt2 {},
628            )),
629        },
630        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
631            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
632                protobuf::PostgresInt4 {},
633            )),
634        },
635        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
636            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
637                protobuf::PostgresInt8 {},
638            )),
639        },
640        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
641            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
642                protobuf::PostgresFloat4 {},
643            )),
644        },
645        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
646            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
647                protobuf::PostgresFloat8 {},
648            )),
649        },
650        RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
651            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
652                protobuf::PostgresNumeric {
653                    precision: *precision as u32,
654                    scale: *scale as i32,
655                },
656            )),
657        },
658        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
659            r#type: Some(protobuf::remote_type::Type::PostgresName(
660                protobuf::PostgresName {},
661            )),
662        },
663        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
664            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
665                protobuf::PostgresVarchar {},
666            )),
667        },
668        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
669            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
670                protobuf::PostgresBpchar {},
671            )),
672        },
673        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
674            r#type: Some(protobuf::remote_type::Type::PostgresText(
675                protobuf::PostgresText {},
676            )),
677        },
678        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
679            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
680                protobuf::PostgresBytea {},
681            )),
682        },
683        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
684            r#type: Some(protobuf::remote_type::Type::PostgresDate(
685                protobuf::PostgresDate {},
686            )),
687        },
688        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
689            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
690                protobuf::PostgresTimestamp {},
691            )),
692        },
693        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
694            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
695                protobuf::PostgresTimestampTz {},
696            )),
697        },
698        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
699            r#type: Some(protobuf::remote_type::Type::PostgresTime(
700                protobuf::PostgresTime {},
701            )),
702        },
703        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
704            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
705                protobuf::PostgresInterval {},
706            )),
707        },
708        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
709            r#type: Some(protobuf::remote_type::Type::PostgresBool(
710                protobuf::PostgresBool {},
711            )),
712        },
713        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
714            r#type: Some(protobuf::remote_type::Type::PostgresJson(
715                protobuf::PostgresJson {},
716            )),
717        },
718        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
719            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
720                protobuf::PostgresJsonb {},
721            )),
722        },
723        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
724            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
725                protobuf::PostgresInt2Array {},
726            )),
727        },
728        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
729            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
730                protobuf::PostgresInt4Array {},
731            )),
732        },
733        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
734            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
735                protobuf::PostgresInt8Array {},
736            )),
737        },
738        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
739            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
740                protobuf::PostgresFloat4Array {},
741            )),
742        },
743        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
744            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
745                protobuf::PostgresFloat8Array {},
746            )),
747        },
748        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
749            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
750                protobuf::PostgresVarcharArray {},
751            )),
752        },
753        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
754            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
755                protobuf::PostgresBpcharArray {},
756            )),
757        },
758        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
759            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
760                protobuf::PostgresTextArray {},
761            )),
762        },
763        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
764            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
765                protobuf::PostgresByteaArray {},
766            )),
767        },
768        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
769            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
770                protobuf::PostgresBoolArray {},
771            )),
772        },
773        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
774            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
775                protobuf::PostgresPostGisGeometry {},
776            )),
777        },
778        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
779            r#type: Some(protobuf::remote_type::Type::PostgresOid(
780                protobuf::PostgresOid {},
781            )),
782        },
783        RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
784            r#type: Some(protobuf::remote_type::Type::PostgresXml(
785                protobuf::PostgresXml {},
786            )),
787        },
788        RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
789            r#type: Some(protobuf::remote_type::Type::PostgresUuid(
790                protobuf::PostgresUuid {},
791            )),
792        },
793
794        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
795            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
796                protobuf::MysqlTinyInt {},
797            )),
798        },
799        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
800            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
801                protobuf::MysqlTinyIntUnsigned {},
802            )),
803        },
804        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
805            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
806                protobuf::MysqlSmallInt {},
807            )),
808        },
809        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
810            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
811                protobuf::MysqlSmallIntUnsigned {},
812            )),
813        },
814        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
815            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
816                protobuf::MysqlMediumInt {},
817            )),
818        },
819        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
820            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
821                protobuf::MysqlMediumIntUnsigned {},
822            )),
823        },
824        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
825            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
826                protobuf::MysqlInteger {},
827            )),
828        },
829        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
830            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
831                protobuf::MysqlIntegerUnsigned {},
832            )),
833        },
834        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
835            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
836                protobuf::MysqlBigInt {},
837            )),
838        },
839        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
840            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
841                protobuf::MysqlBigIntUnsigned {},
842            )),
843        },
844        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
845            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
846                protobuf::MysqlFloat {},
847            )),
848        },
849        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
850            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
851                protobuf::MysqlDouble {},
852            )),
853        },
854        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
855            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
856                protobuf::MysqlDecimal {
857                    precision: *precision as u32,
858                    scale: *scale as u32,
859                },
860            )),
861        },
862        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
863            r#type: Some(protobuf::remote_type::Type::MysqlDate(
864                protobuf::MysqlDate {},
865            )),
866        },
867        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
868            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
869                protobuf::MysqlDateTime {},
870            )),
871        },
872        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
873            r#type: Some(protobuf::remote_type::Type::MysqlTime(
874                protobuf::MysqlTime {},
875            )),
876        },
877        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
878            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
879                protobuf::MysqlTimestamp {},
880            )),
881        },
882        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
883            r#type: Some(protobuf::remote_type::Type::MysqlYear(
884                protobuf::MysqlYear {},
885            )),
886        },
887        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
888            r#type: Some(protobuf::remote_type::Type::MysqlChar(
889                protobuf::MysqlChar {},
890            )),
891        },
892        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
893            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
894                protobuf::MysqlVarchar {},
895            )),
896        },
897        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
898            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
899                protobuf::MysqlBinary {},
900            )),
901        },
902        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
903            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
904                protobuf::MysqlVarbinary {},
905            )),
906        },
907        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
908            r#type: Some(protobuf::remote_type::Type::MysqlText(
909                protobuf::MysqlText { length: *len },
910            )),
911        },
912        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
913            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
914                protobuf::MysqlBlob { length: *len },
915            )),
916        },
917        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
918            r#type: Some(protobuf::remote_type::Type::MysqlJson(
919                protobuf::MysqlJson {},
920            )),
921        },
922        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
923            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
924                protobuf::MysqlGeometry {},
925            )),
926        },
927
928        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
929            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
930                protobuf::OracleVarchar2 { length: *len },
931            )),
932        },
933        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
934            r#type: Some(protobuf::remote_type::Type::OracleChar(
935                protobuf::OracleChar { length: *len },
936            )),
937        },
938        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
939            r#type: Some(protobuf::remote_type::Type::OracleNumber(
940                protobuf::OracleNumber {
941                    precision: *precision as u32,
942                    scale: *scale as i32,
943                },
944            )),
945        },
946        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
947            r#type: Some(protobuf::remote_type::Type::OracleDate(
948                protobuf::OracleDate {},
949            )),
950        },
951        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
952            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
953                protobuf::OracleTimestamp {},
954            )),
955        },
956        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
957            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
958                protobuf::OracleBoolean {},
959            )),
960        },
961        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
962            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
963                protobuf::OracleBinaryFloat {},
964            )),
965        },
966        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
967            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
968                protobuf::OracleBinaryDouble {},
969            )),
970        },
971        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
972            r#type: Some(protobuf::remote_type::Type::OracleBlob(
973                protobuf::OracleBlob {},
974            )),
975        },
976        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
977            r#type: Some(protobuf::remote_type::Type::OracleFloat(
978                protobuf::OracleFloat {
979                    precision: *precision as u32,
980                },
981            )),
982        },
983        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
984            r#type: Some(protobuf::remote_type::Type::OracleNchar(
985                protobuf::OracleNChar { length: *len },
986            )),
987        },
988        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
989            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
990                protobuf::OracleNVarchar2 { length: *len },
991            )),
992        },
993        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
994            r#type: Some(protobuf::remote_type::Type::OracleRaw(
995                protobuf::OracleRaw { length: *len },
996            )),
997        },
998        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
999            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
1000                protobuf::OracleLongRaw {},
1001            )),
1002        },
1003        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
1004            r#type: Some(protobuf::remote_type::Type::OracleLong(
1005                protobuf::OracleLong {},
1006            )),
1007        },
1008        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
1009            r#type: Some(protobuf::remote_type::Type::OracleClob(
1010                protobuf::OracleClob {},
1011            )),
1012        },
1013        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
1014            r#type: Some(protobuf::remote_type::Type::OracleNclob(
1015                protobuf::OracleNClob {},
1016            )),
1017        },
1018        RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
1019            r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
1020                protobuf::OracleSdeGeometry {},
1021            )),
1022        },
1023        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
1024            r#type: Some(protobuf::remote_type::Type::SqliteNull(
1025                protobuf::SqliteNull {},
1026            )),
1027        },
1028        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
1029            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
1030                protobuf::SqliteInteger {},
1031            )),
1032        },
1033        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
1034            r#type: Some(protobuf::remote_type::Type::SqliteReal(
1035                protobuf::SqliteReal {},
1036            )),
1037        },
1038        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
1039            r#type: Some(protobuf::remote_type::Type::SqliteText(
1040                protobuf::SqliteText {},
1041            )),
1042        },
1043        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
1044            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
1045                protobuf::SqliteBlob {},
1046            )),
1047        },
1048        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
1049            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
1050                protobuf::DmTinyInt {},
1051            )),
1052        },
1053        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
1054            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
1055                protobuf::DmSmallInt {},
1056            )),
1057        },
1058        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
1059            r#type: Some(protobuf::remote_type::Type::DmInteger(
1060                protobuf::DmInteger {},
1061            )),
1062        },
1063        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
1064            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
1065        },
1066        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
1067            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
1068        },
1069        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
1070            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
1071        },
1072        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
1073            r#type: Some(protobuf::remote_type::Type::DmNumeric(
1074                protobuf::DmNumeric {
1075                    precision: *precision as u32,
1076                    scale: *scale as i32,
1077                },
1078            )),
1079        },
1080        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
1081            r#type: Some(protobuf::remote_type::Type::DmDecimal(
1082                protobuf::DmDecimal {
1083                    precision: *precision as u32,
1084                    scale: *scale as i32,
1085                },
1086            )),
1087        },
1088        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
1089            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
1090                length: len.map(|s| s as u32),
1091            })),
1092        },
1093        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
1094            r#type: Some(protobuf::remote_type::Type::DmVarchar(
1095                protobuf::DmVarchar {
1096                    length: len.map(|s| s as u32),
1097                },
1098            )),
1099        },
1100        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
1101            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
1102        },
1103        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
1104            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
1105                length: *len as u32,
1106            })),
1107        },
1108        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
1109            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
1110                protobuf::DmVarbinary {
1111                    length: len.map(|s| s as u32),
1112                },
1113            )),
1114        },
1115        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
1116            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
1117        },
1118        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
1119            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
1120        },
1121        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
1122            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
1123                protobuf::DmTimestamp {
1124                    precision: *precision as u32,
1125                },
1126            )),
1127        },
1128        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1129            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1130                precision: *precision as u32,
1131            })),
1132        },
1133        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1134            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
1135        },
1136    }
1137}
1138
1139fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1140    let fields = remote_schema
1141        .fields
1142        .iter()
1143        .map(parse_remote_field)
1144        .collect::<Vec<_>>();
1145
1146    RemoteSchema { fields }
1147}
1148
1149fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1150    RemoteField {
1151        name: field.name.clone(),
1152        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1153        nullable: field.nullable,
1154        auto_increment: field.auto_increment,
1155    }
1156}
1157
1158fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1159    match remote_type.r#type.as_ref().unwrap() {
1160        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1161        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1162        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1163        protobuf::remote_type::Type::PostgresFloat4(_) => {
1164            RemoteType::Postgres(PostgresType::Float4)
1165        }
1166        protobuf::remote_type::Type::PostgresFloat8(_) => {
1167            RemoteType::Postgres(PostgresType::Float8)
1168        }
1169        protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1170            PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1171        ),
1172        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1173        protobuf::remote_type::Type::PostgresVarchar(_) => {
1174            RemoteType::Postgres(PostgresType::Varchar)
1175        }
1176        protobuf::remote_type::Type::PostgresBpchar(_) => {
1177            RemoteType::Postgres(PostgresType::Bpchar)
1178        }
1179        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1180        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1181        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1182        protobuf::remote_type::Type::PostgresTimestamp(_) => {
1183            RemoteType::Postgres(PostgresType::Timestamp)
1184        }
1185        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1186            RemoteType::Postgres(PostgresType::TimestampTz)
1187        }
1188        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1189        protobuf::remote_type::Type::PostgresInterval(_) => {
1190            RemoteType::Postgres(PostgresType::Interval)
1191        }
1192        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1193        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1194        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1195        protobuf::remote_type::Type::PostgresInt2Array(_) => {
1196            RemoteType::Postgres(PostgresType::Int2Array)
1197        }
1198        protobuf::remote_type::Type::PostgresInt4Array(_) => {
1199            RemoteType::Postgres(PostgresType::Int4Array)
1200        }
1201        protobuf::remote_type::Type::PostgresInt8Array(_) => {
1202            RemoteType::Postgres(PostgresType::Int8Array)
1203        }
1204        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1205            RemoteType::Postgres(PostgresType::Float4Array)
1206        }
1207        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1208            RemoteType::Postgres(PostgresType::Float8Array)
1209        }
1210        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1211            RemoteType::Postgres(PostgresType::VarcharArray)
1212        }
1213        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1214            RemoteType::Postgres(PostgresType::BpcharArray)
1215        }
1216        protobuf::remote_type::Type::PostgresTextArray(_) => {
1217            RemoteType::Postgres(PostgresType::TextArray)
1218        }
1219        protobuf::remote_type::Type::PostgresByteaArray(_) => {
1220            RemoteType::Postgres(PostgresType::ByteaArray)
1221        }
1222        protobuf::remote_type::Type::PostgresBoolArray(_) => {
1223            RemoteType::Postgres(PostgresType::BoolArray)
1224        }
1225        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1226            RemoteType::Postgres(PostgresType::PostGisGeometry)
1227        }
1228        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1229        protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1230        protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1231        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1232        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1233            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1234        }
1235        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1236        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1237            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1238        }
1239        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1240        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1241            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1242        }
1243        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1244        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1245            RemoteType::Mysql(MysqlType::IntegerUnsigned)
1246        }
1247        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1248        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1249            RemoteType::Mysql(MysqlType::BigIntUnsigned)
1250        }
1251        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1252        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1253        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1254            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1255        ),
1256        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1257        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1258        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1259        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1260        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1261        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1262        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1263        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1264        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1265        protobuf::remote_type::Type::MysqlText(text) => {
1266            RemoteType::Mysql(MysqlType::Text(text.length))
1267        }
1268        protobuf::remote_type::Type::MysqlBlob(blob) => {
1269            RemoteType::Mysql(MysqlType::Blob(blob.length))
1270        }
1271        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1272        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1273        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1274            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1275        }
1276        protobuf::remote_type::Type::OracleChar(char) => {
1277            RemoteType::Oracle(OracleType::Char(char.length))
1278        }
1279        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1280            OracleType::Number(number.precision as u8, number.scale as i8),
1281        ),
1282        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1283        protobuf::remote_type::Type::OracleTimestamp(_) => {
1284            RemoteType::Oracle(OracleType::Timestamp)
1285        }
1286        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1287        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1288            RemoteType::Oracle(OracleType::BinaryFloat)
1289        }
1290        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1291            RemoteType::Oracle(OracleType::BinaryDouble)
1292        }
1293        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1294            RemoteType::Oracle(OracleType::Float(*precision as u8))
1295        }
1296        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1297            RemoteType::Oracle(OracleType::NChar(*length))
1298        }
1299        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1300            RemoteType::Oracle(OracleType::NVarchar2(*length))
1301        }
1302        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1303            RemoteType::Oracle(OracleType::Raw(*length))
1304        }
1305        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1306        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1307        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1308        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1309        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1310        protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1311            RemoteType::Oracle(OracleType::SdeGeometry)
1312        }
1313        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1314        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1315        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1316        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1317        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1318        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1319        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1320        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1321        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1322        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1323        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1324        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1325            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1326        }
1327        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1328            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1329        }
1330        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1331            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1332        }
1333        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1334            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1335        }
1336        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1337        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1338            RemoteType::Dm(DmType::Binary(*length as u16))
1339        }
1340        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1341            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1342        }
1343        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1344        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1345        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1346            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1347        }
1348        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1349            RemoteType::Dm(DmType::Time(*precision as u8))
1350        }
1351        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1352    }
1353}
1354
1355fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1356    match source {
1357        RemoteSource::Query(query) => protobuf::RemoteSource {
1358            source: Some(protobuf::remote_source::Source::Query(query.clone())),
1359        },
1360        RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1361            source: Some(protobuf::remote_source::Source::Table(
1362                protobuf::Identifiers {
1363                    idents: table_identifiers.clone(),
1364                },
1365            )),
1366        },
1367    }
1368}
1369
1370fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1371    let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1372        "remote source is not set".to_string(),
1373    ))?;
1374    match source {
1375        protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1376        protobuf::remote_source::Source::Table(table_identifiers) => {
1377            Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1378        }
1379    }
1380}
1381
1382fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> Result<Vec<Vec<u8>>, DataFusionError> {
1383    let mut proto_partitions = vec![];
1384    for partition in partitions {
1385        if partition.is_empty() {
1386            proto_partitions.push(vec![]);
1387            continue;
1388        }
1389        let mut proto_partition = vec![];
1390        let mut stream_writer =
1391            StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
1392        for batch in partition {
1393            stream_writer.write(batch)?;
1394        }
1395        stream_writer.finish()?;
1396        proto_partitions.push(proto_partition);
1397    }
1398    Ok(proto_partitions)
1399}
1400
1401fn parse_partitions(
1402    proto_partitions: &[Vec<u8>],
1403) -> Result<Vec<Vec<RecordBatch>>, DataFusionError> {
1404    let mut partitions = vec![];
1405    for proto_partition in proto_partitions {
1406        if proto_partition.is_empty() {
1407            partitions.push(vec![]);
1408            continue;
1409        }
1410        let mut partition = vec![];
1411        let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
1412        for batch in stream_reader {
1413            partition.push(batch?);
1414        }
1415        partitions.push(partition);
1416    }
1417    Ok(partitions)
1418}