datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    Connection, ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType,
14    PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType,
15    SqliteType, Transform, connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use log::debug;
26use prost::Message;
27use std::fmt::Debug;
28#[cfg(feature = "sqlite")]
29use std::path::Path;
30use std::sync::Arc;
31
32pub trait TransformCodec: Debug + Send + Sync {
33    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
34    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
35}
36
37#[derive(Debug)]
38pub struct DefaultTransformCodec {}
39
40const DEFAULT_TRANSFORM_ID: &str = "__default";
41
42impl TransformCodec for DefaultTransformCodec {
43    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
44        if value.as_any().is::<DefaultTransform>() {
45            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
46        } else {
47            Err(DataFusionError::Execution(format!(
48                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
49            )))
50        }
51    }
52
53    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
54        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
55            Ok(Arc::new(DefaultTransform {}))
56        } else {
57            Err(DataFusionError::Execution(
58                "DefaultTransformCodec only supports DefaultTransform".to_string(),
59            ))
60        }
61    }
62}
63
64pub trait ConnectionCodec: Debug + Send + Sync {
65    fn try_encode(
66        &self,
67        value: &dyn Connection,
68        conn_options: &ConnectionOptions,
69    ) -> DFResult<Vec<u8>>;
70    fn try_decode(
71        &self,
72        value: &[u8],
73        conn_options: &ConnectionOptions,
74    ) -> DFResult<Arc<dyn Connection>>;
75}
76
77#[derive(Debug)]
78pub struct DefaultConnectionCodec;
79
80const DEFAULT_CONNECTION_VALUE: &str = "__default";
81
82impl ConnectionCodec for DefaultConnectionCodec {
83    fn try_encode(
84        &self,
85        _value: &dyn Connection,
86        _conn_options: &ConnectionOptions,
87    ) -> DFResult<Vec<u8>> {
88        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
89    }
90
91    fn try_decode(
92        &self,
93        value: &[u8],
94        conn_options: &ConnectionOptions,
95    ) -> DFResult<Arc<dyn Connection>> {
96        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
97            return Err(DataFusionError::Execution(format!(
98                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
99            )));
100        }
101        let conn_options = conn_options.clone().with_pool_max_size(1);
102        tokio::task::block_in_place(|| {
103            tokio::runtime::Handle::current().block_on(async {
104                let pool = connect(&conn_options).await?;
105                let conn = pool.get().await?;
106                Ok::<_, DataFusionError>(conn)
107            })
108        })
109    }
110}
111
112#[derive(Debug, With)]
113pub struct RemotePhysicalCodec {
114    transform_codec: Arc<dyn TransformCodec>,
115    connection_codec: Arc<dyn ConnectionCodec>,
116}
117
118impl RemotePhysicalCodec {
119    pub fn new() -> Self {
120        Self {
121            transform_codec: Arc::new(DefaultTransformCodec {}),
122            connection_codec: Arc::new(DefaultConnectionCodec {}),
123        }
124    }
125}
126
127impl Default for RemotePhysicalCodec {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl PhysicalExtensionCodec for RemotePhysicalCodec {
134    fn try_decode(
135        &self,
136        buf: &[u8],
137        _inputs: &[Arc<dyn ExecutionPlan>],
138        _registry: &dyn FunctionRegistry,
139    ) -> DFResult<Arc<dyn ExecutionPlan>> {
140        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
141            DataFusionError::Internal(format!(
142                "Failed to decode remote table execution plan: {e:?}"
143            ))
144        })?;
145
146        let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
147            Arc::new(DefaultTransform {})
148        } else {
149            self.transform_codec.try_decode(&proto.transform)?
150        };
151
152        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
153        let remote_schema = proto
154            .remote_schema
155            .map(|schema| Arc::new(parse_remote_schema(&schema)));
156
157        let projection: Option<Vec<usize>> = proto
158            .projection
159            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
160
161        let limit = proto.limit.map(|l| l as usize);
162
163        let conn_options = parse_connection_options(proto.conn_options.unwrap());
164
165        let now = std::time::Instant::now();
166        let conn = self
167            .connection_codec
168            .try_decode(&proto.connection, &conn_options)?;
169        debug!(
170            "[remote-table] Decoding connection cost {}ms",
171            now.elapsed().as_millis()
172        );
173
174        Ok(Arc::new(RemoteTableExec::try_new(
175            conn_options,
176            proto.sql,
177            table_schema,
178            remote_schema,
179            projection,
180            proto.unparsed_filters,
181            limit,
182            transform,
183            conn,
184        )?))
185    }
186
187    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
188        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
189            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
190                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
191            } else {
192                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
193                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
194                bytes
195            };
196
197            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
198            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
199
200            let serialized_conn = self
201                .connection_codec
202                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
203
204            let proto = protobuf::RemoteTableExec {
205                conn_options: Some(serialized_connection_options),
206                sql: exec.sql.clone(),
207                table_schema: Some(exec.table_schema.as_ref().try_into()?),
208                remote_schema,
209                projection: exec
210                    .projection
211                    .as_ref()
212                    .map(|p| serialize_projection(p.as_slice())),
213                unparsed_filters: exec.unparsed_filters.clone(),
214                limit: exec.limit.map(|l| l as u32),
215                transform: serialized_transform,
216                connection: serialized_conn,
217            };
218
219            proto.encode(buf).map_err(|e| {
220                DataFusionError::Internal(format!(
221                    "Failed to encode remote table execution plan: {e:?}"
222                ))
223            })?;
224            Ok(())
225        } else {
226            Err(DataFusionError::Execution(format!(
227                "Failed to encode {}",
228                RemoteTableExec::static_name()
229            )))
230        }
231    }
232}
233
234fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
235    match options {
236        #[cfg(feature = "postgres")]
237        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
238            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
239                protobuf::PostgresConnectionOptions {
240                    host: options.host.clone(),
241                    port: options.port as u32,
242                    username: options.username.clone(),
243                    password: options.password.clone(),
244                    database: options.database.clone(),
245                    pool_max_size: options.pool_max_size as u32,
246                    stream_chunk_size: options.stream_chunk_size as u32,
247                },
248            )),
249        },
250        #[cfg(feature = "mysql")]
251        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
252            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
253                protobuf::MysqlConnectionOptions {
254                    host: options.host.clone(),
255                    port: options.port as u32,
256                    username: options.username.clone(),
257                    password: options.password.clone(),
258                    database: options.database.clone(),
259                    pool_max_size: options.pool_max_size as u32,
260                    stream_chunk_size: options.stream_chunk_size as u32,
261                },
262            )),
263        },
264        #[cfg(feature = "oracle")]
265        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
266            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
267                protobuf::OracleConnectionOptions {
268                    host: options.host.clone(),
269                    port: options.port as u32,
270                    username: options.username.clone(),
271                    password: options.password.clone(),
272                    service_name: options.service_name.clone(),
273                    pool_max_size: options.pool_max_size as u32,
274                    stream_chunk_size: options.stream_chunk_size as u32,
275                },
276            )),
277        },
278        #[cfg(feature = "sqlite")]
279        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
280            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
281                protobuf::SqliteConnectionOptions {
282                    path: options.path.to_str().unwrap().to_string(),
283                    stream_chunk_size: options.stream_chunk_size as u32,
284                },
285            )),
286        },
287        #[cfg(feature = "dm")]
288        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
289            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
290                protobuf::DmConnectionOptions {
291                    host: options.host.clone(),
292                    port: options.port as u32,
293                    username: options.username.clone(),
294                    password: options.password.clone(),
295                    schema: options.schema.clone(),
296                    stream_chunk_size: options.stream_chunk_size as u32,
297                    driver: options.driver.clone(),
298                },
299            )),
300        },
301    }
302}
303
304fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
305    match options.connection_options {
306        #[cfg(feature = "postgres")]
307        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
308            ConnectionOptions::Postgres(PostgresConnectionOptions {
309                host: options.host,
310                port: options.port as u16,
311                username: options.username,
312                password: options.password,
313                database: options.database,
314                pool_max_size: options.pool_max_size as usize,
315                stream_chunk_size: options.stream_chunk_size as usize,
316            })
317        }
318        #[cfg(feature = "mysql")]
319        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
320            ConnectionOptions::Mysql(MysqlConnectionOptions {
321                host: options.host,
322                port: options.port as u16,
323                username: options.username,
324                password: options.password,
325                database: options.database,
326                pool_max_size: options.pool_max_size as usize,
327                stream_chunk_size: options.stream_chunk_size as usize,
328            })
329        }
330        #[cfg(feature = "oracle")]
331        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
332            ConnectionOptions::Oracle(OracleConnectionOptions {
333                host: options.host,
334                port: options.port as u16,
335                username: options.username,
336                password: options.password,
337                service_name: options.service_name,
338                pool_max_size: options.pool_max_size as usize,
339                stream_chunk_size: options.stream_chunk_size as usize,
340            })
341        }
342        #[cfg(feature = "sqlite")]
343        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
344            ConnectionOptions::Sqlite(SqliteConnectionOptions {
345                path: Path::new(&options.path).to_path_buf(),
346                stream_chunk_size: options.stream_chunk_size as usize,
347            })
348        }
349        #[cfg(feature = "dm")]
350        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
351            ConnectionOptions::Dm(DmConnectionOptions {
352                host: options.host,
353                port: options.port as u16,
354                username: options.username,
355                password: options.password,
356                schema: options.schema,
357                stream_chunk_size: options.stream_chunk_size as usize,
358                driver: options.driver,
359            })
360        }
361        _ => panic!("Failed to parse connection options: {options:?}"),
362    }
363}
364
365fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
366    protobuf::Projection {
367        projection: projection.iter().map(|n| *n as u32).collect(),
368    }
369}
370
371fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
372    let fields = remote_schema
373        .fields
374        .iter()
375        .map(serialize_remote_field)
376        .collect::<Vec<_>>();
377
378    protobuf::RemoteSchema { fields }
379}
380
381fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
382    protobuf::RemoteField {
383        name: remote_field.name.clone(),
384        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
385        nullable: remote_field.nullable,
386    }
387}
388
389fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
390    match remote_type {
391        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
392            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
393                protobuf::PostgresInt2 {},
394            )),
395        },
396        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
397            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
398                protobuf::PostgresInt4 {},
399            )),
400        },
401        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
402            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
403                protobuf::PostgresInt8 {},
404            )),
405        },
406        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
407            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
408                protobuf::PostgresFloat4 {},
409            )),
410        },
411        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
412            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
413                protobuf::PostgresFloat8 {},
414            )),
415        },
416        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
417            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
418                protobuf::PostgresNumeric {
419                    scale: *scale as i32,
420                },
421            )),
422        },
423        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
424            r#type: Some(protobuf::remote_type::Type::PostgresName(
425                protobuf::PostgresName {},
426            )),
427        },
428        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
429            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
430                protobuf::PostgresVarchar {},
431            )),
432        },
433        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
434            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
435                protobuf::PostgresBpchar {},
436            )),
437        },
438        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
439            r#type: Some(protobuf::remote_type::Type::PostgresText(
440                protobuf::PostgresText {},
441            )),
442        },
443        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
444            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
445                protobuf::PostgresBytea {},
446            )),
447        },
448        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
449            r#type: Some(protobuf::remote_type::Type::PostgresDate(
450                protobuf::PostgresDate {},
451            )),
452        },
453        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
454            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
455                protobuf::PostgresTimestamp {},
456            )),
457        },
458        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
459            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
460                protobuf::PostgresTimestampTz {},
461            )),
462        },
463        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
464            r#type: Some(protobuf::remote_type::Type::PostgresTime(
465                protobuf::PostgresTime {},
466            )),
467        },
468        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
469            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
470                protobuf::PostgresInterval {},
471            )),
472        },
473        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
474            r#type: Some(protobuf::remote_type::Type::PostgresBool(
475                protobuf::PostgresBool {},
476            )),
477        },
478        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
479            r#type: Some(protobuf::remote_type::Type::PostgresJson(
480                protobuf::PostgresJson {},
481            )),
482        },
483        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
484            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
485                protobuf::PostgresJsonb {},
486            )),
487        },
488        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
489            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
490                protobuf::PostgresInt2Array {},
491            )),
492        },
493        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
494            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
495                protobuf::PostgresInt4Array {},
496            )),
497        },
498        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
499            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
500                protobuf::PostgresInt8Array {},
501            )),
502        },
503        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
504            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
505                protobuf::PostgresFloat4Array {},
506            )),
507        },
508        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
509            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
510                protobuf::PostgresFloat8Array {},
511            )),
512        },
513        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
514            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
515                protobuf::PostgresVarcharArray {},
516            )),
517        },
518        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
519            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
520                protobuf::PostgresBpcharArray {},
521            )),
522        },
523        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
524            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
525                protobuf::PostgresTextArray {},
526            )),
527        },
528        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
529            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
530                protobuf::PostgresByteaArray {},
531            )),
532        },
533        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
534            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
535                protobuf::PostgresBoolArray {},
536            )),
537        },
538        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
539            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
540                protobuf::PostgresPostGisGeometry {},
541            )),
542        },
543        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
544            r#type: Some(protobuf::remote_type::Type::PostgresOid(
545                protobuf::PostgresOid {},
546            )),
547        },
548
549        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
550            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
551                protobuf::MysqlTinyInt {},
552            )),
553        },
554        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
555            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
556                protobuf::MysqlTinyIntUnsigned {},
557            )),
558        },
559        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
560            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
561                protobuf::MysqlSmallInt {},
562            )),
563        },
564        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
565            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
566                protobuf::MysqlSmallIntUnsigned {},
567            )),
568        },
569        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
570            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
571                protobuf::MysqlMediumInt {},
572            )),
573        },
574        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
575            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
576                protobuf::MysqlMediumIntUnsigned {},
577            )),
578        },
579        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
580            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
581                protobuf::MysqlInteger {},
582            )),
583        },
584        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
585            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
586                protobuf::MysqlIntegerUnsigned {},
587            )),
588        },
589        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
590            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
591                protobuf::MysqlBigInt {},
592            )),
593        },
594        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
595            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
596                protobuf::MysqlBigIntUnsigned {},
597            )),
598        },
599        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
600            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
601                protobuf::MysqlFloat {},
602            )),
603        },
604        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
605            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
606                protobuf::MysqlDouble {},
607            )),
608        },
609        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
610            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
611                protobuf::MysqlDecimal {
612                    precision: *precision as u32,
613                    scale: *scale as u32,
614                },
615            )),
616        },
617        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
618            r#type: Some(protobuf::remote_type::Type::MysqlDate(
619                protobuf::MysqlDate {},
620            )),
621        },
622        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
623            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
624                protobuf::MysqlDateTime {},
625            )),
626        },
627        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
628            r#type: Some(protobuf::remote_type::Type::MysqlTime(
629                protobuf::MysqlTime {},
630            )),
631        },
632        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
633            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
634                protobuf::MysqlTimestamp {},
635            )),
636        },
637        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
638            r#type: Some(protobuf::remote_type::Type::MysqlYear(
639                protobuf::MysqlYear {},
640            )),
641        },
642        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
643            r#type: Some(protobuf::remote_type::Type::MysqlChar(
644                protobuf::MysqlChar {},
645            )),
646        },
647        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
648            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
649                protobuf::MysqlVarchar {},
650            )),
651        },
652        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
653            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
654                protobuf::MysqlBinary {},
655            )),
656        },
657        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
658            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
659                protobuf::MysqlVarbinary {},
660            )),
661        },
662        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
663            r#type: Some(protobuf::remote_type::Type::MysqlText(
664                protobuf::MysqlText { length: *len },
665            )),
666        },
667        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
668            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
669                protobuf::MysqlBlob { length: *len },
670            )),
671        },
672        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
673            r#type: Some(protobuf::remote_type::Type::MysqlJson(
674                protobuf::MysqlJson {},
675            )),
676        },
677        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
678            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
679                protobuf::MysqlGeometry {},
680            )),
681        },
682
683        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
684            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
685                protobuf::OracleVarchar2 { length: *len },
686            )),
687        },
688        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
689            r#type: Some(protobuf::remote_type::Type::OracleChar(
690                protobuf::OracleChar { length: *len },
691            )),
692        },
693        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
694            r#type: Some(protobuf::remote_type::Type::OracleNumber(
695                protobuf::OracleNumber {
696                    precision: *precision as u32,
697                    scale: *scale as i32,
698                },
699            )),
700        },
701        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
702            r#type: Some(protobuf::remote_type::Type::OracleDate(
703                protobuf::OracleDate {},
704            )),
705        },
706        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
707            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
708                protobuf::OracleTimestamp {},
709            )),
710        },
711        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
712            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
713                protobuf::OracleBoolean {},
714            )),
715        },
716        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
717            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
718                protobuf::OracleBinaryFloat {},
719            )),
720        },
721        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
722            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
723                protobuf::OracleBinaryDouble {},
724            )),
725        },
726        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
727            r#type: Some(protobuf::remote_type::Type::OracleBlob(
728                protobuf::OracleBlob {},
729            )),
730        },
731        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
732            r#type: Some(protobuf::remote_type::Type::OracleFloat(
733                protobuf::OracleFloat {
734                    precision: *precision as u32,
735                },
736            )),
737        },
738        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
739            r#type: Some(protobuf::remote_type::Type::OracleNchar(
740                protobuf::OracleNChar { length: *len },
741            )),
742        },
743        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
744            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
745                protobuf::OracleNVarchar2 { length: *len },
746            )),
747        },
748        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
749            r#type: Some(protobuf::remote_type::Type::OracleRaw(
750                protobuf::OracleRaw { length: *len },
751            )),
752        },
753        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
754            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
755                protobuf::OracleLongRaw {},
756            )),
757        },
758        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
759            r#type: Some(protobuf::remote_type::Type::OracleLong(
760                protobuf::OracleLong {},
761            )),
762        },
763        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
764            r#type: Some(protobuf::remote_type::Type::OracleClob(
765                protobuf::OracleClob {},
766            )),
767        },
768        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
769            r#type: Some(protobuf::remote_type::Type::OracleNclob(
770                protobuf::OracleNClob {},
771            )),
772        },
773        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
774            r#type: Some(protobuf::remote_type::Type::SqliteNull(
775                protobuf::SqliteNull {},
776            )),
777        },
778        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
779            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
780                protobuf::SqliteInteger {},
781            )),
782        },
783        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
784            r#type: Some(protobuf::remote_type::Type::SqliteReal(
785                protobuf::SqliteReal {},
786            )),
787        },
788        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
789            r#type: Some(protobuf::remote_type::Type::SqliteText(
790                protobuf::SqliteText {},
791            )),
792        },
793        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
794            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
795                protobuf::SqliteBlob {},
796            )),
797        },
798        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
799            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
800                protobuf::DmTinyInt {},
801            )),
802        },
803        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
804            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
805                protobuf::DmSmallInt {},
806            )),
807        },
808        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
809            r#type: Some(protobuf::remote_type::Type::DmInteger(
810                protobuf::DmInteger {},
811            )),
812        },
813        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
814            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
815        },
816        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
817            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
818        },
819        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
820            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
821        },
822        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
823            r#type: Some(protobuf::remote_type::Type::DmNumeric(
824                protobuf::DmNumeric {
825                    precision: *precision as u32,
826                    scale: *scale as i32,
827                },
828            )),
829        },
830        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
831            r#type: Some(protobuf::remote_type::Type::DmDecimal(
832                protobuf::DmDecimal {
833                    precision: *precision as u32,
834                    scale: *scale as i32,
835                },
836            )),
837        },
838        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
839            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
840                length: len.map(|s| s as u32),
841            })),
842        },
843        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
844            r#type: Some(protobuf::remote_type::Type::DmVarchar(
845                protobuf::DmVarchar {
846                    length: len.map(|s| s as u32),
847                },
848            )),
849        },
850        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
851            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
852        },
853        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
854            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
855                length: *len as u32,
856            })),
857        },
858        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
859            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
860                protobuf::DmVarbinary {
861                    length: len.map(|s| s as u32),
862                },
863            )),
864        },
865        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
866            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
867        },
868        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
869            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
870        },
871        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
872            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
873                protobuf::DmTimestamp {
874                    precision: *precision as u32,
875                },
876            )),
877        },
878        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
879            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
880                precision: *precision as u32,
881            })),
882        },
883        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
884            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
885        },
886    }
887}
888
889fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
890    let fields = remote_schema
891        .fields
892        .iter()
893        .map(parse_remote_field)
894        .collect::<Vec<_>>();
895
896    RemoteSchema { fields }
897}
898
899fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
900    RemoteField {
901        name: field.name.clone(),
902        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
903        nullable: field.nullable,
904    }
905}
906
907fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
908    match remote_type.r#type.as_ref().unwrap() {
909        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
910        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
911        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
912        protobuf::remote_type::Type::PostgresFloat4(_) => {
913            RemoteType::Postgres(PostgresType::Float4)
914        }
915        protobuf::remote_type::Type::PostgresFloat8(_) => {
916            RemoteType::Postgres(PostgresType::Float8)
917        }
918        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
919            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
920        }
921        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
922        protobuf::remote_type::Type::PostgresVarchar(_) => {
923            RemoteType::Postgres(PostgresType::Varchar)
924        }
925        protobuf::remote_type::Type::PostgresBpchar(_) => {
926            RemoteType::Postgres(PostgresType::Bpchar)
927        }
928        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
929        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
930        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
931        protobuf::remote_type::Type::PostgresTimestamp(_) => {
932            RemoteType::Postgres(PostgresType::Timestamp)
933        }
934        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
935            RemoteType::Postgres(PostgresType::TimestampTz)
936        }
937        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
938        protobuf::remote_type::Type::PostgresInterval(_) => {
939            RemoteType::Postgres(PostgresType::Interval)
940        }
941        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
942        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
943        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
944        protobuf::remote_type::Type::PostgresInt2Array(_) => {
945            RemoteType::Postgres(PostgresType::Int2Array)
946        }
947        protobuf::remote_type::Type::PostgresInt4Array(_) => {
948            RemoteType::Postgres(PostgresType::Int4Array)
949        }
950        protobuf::remote_type::Type::PostgresInt8Array(_) => {
951            RemoteType::Postgres(PostgresType::Int8Array)
952        }
953        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
954            RemoteType::Postgres(PostgresType::Float4Array)
955        }
956        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
957            RemoteType::Postgres(PostgresType::Float8Array)
958        }
959        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
960            RemoteType::Postgres(PostgresType::VarcharArray)
961        }
962        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
963            RemoteType::Postgres(PostgresType::BpcharArray)
964        }
965        protobuf::remote_type::Type::PostgresTextArray(_) => {
966            RemoteType::Postgres(PostgresType::TextArray)
967        }
968        protobuf::remote_type::Type::PostgresByteaArray(_) => {
969            RemoteType::Postgres(PostgresType::ByteaArray)
970        }
971        protobuf::remote_type::Type::PostgresBoolArray(_) => {
972            RemoteType::Postgres(PostgresType::BoolArray)
973        }
974        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
975            RemoteType::Postgres(PostgresType::PostGisGeometry)
976        }
977        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
978        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
979        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
980            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
981        }
982        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
983        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
984            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
985        }
986        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
987        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
988            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
989        }
990        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
991        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
992            RemoteType::Mysql(MysqlType::IntegerUnsigned)
993        }
994        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
995        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
996            RemoteType::Mysql(MysqlType::BigIntUnsigned)
997        }
998        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
999        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1000        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1001            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1002        ),
1003        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1004        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1005        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1006        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1007        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1008        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1009        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1010        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1011        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1012        protobuf::remote_type::Type::MysqlText(text) => {
1013            RemoteType::Mysql(MysqlType::Text(text.length))
1014        }
1015        protobuf::remote_type::Type::MysqlBlob(blob) => {
1016            RemoteType::Mysql(MysqlType::Blob(blob.length))
1017        }
1018        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1019        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1020        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1021            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1022        }
1023        protobuf::remote_type::Type::OracleChar(char) => {
1024            RemoteType::Oracle(OracleType::Char(char.length))
1025        }
1026        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1027            OracleType::Number(number.precision as u8, number.scale as i8),
1028        ),
1029        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1030        protobuf::remote_type::Type::OracleTimestamp(_) => {
1031            RemoteType::Oracle(OracleType::Timestamp)
1032        }
1033        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1034        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1035            RemoteType::Oracle(OracleType::BinaryFloat)
1036        }
1037        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1038            RemoteType::Oracle(OracleType::BinaryDouble)
1039        }
1040        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1041            RemoteType::Oracle(OracleType::Float(*precision as u8))
1042        }
1043        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1044            RemoteType::Oracle(OracleType::NChar(*length))
1045        }
1046        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1047            RemoteType::Oracle(OracleType::NVarchar2(*length))
1048        }
1049        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1050            RemoteType::Oracle(OracleType::Raw(*length))
1051        }
1052        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1053        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1054        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1055        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1056        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1057        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1058        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1059        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1060        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1061        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1062        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1063        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1064        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1065        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1066        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1067        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1068        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1069            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1070        }
1071        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1072            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1073        }
1074        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1075            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1076        }
1077        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1078            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1079        }
1080        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1081        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1082            RemoteType::Dm(DmType::Binary(*length as u16))
1083        }
1084        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1085            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1086        }
1087        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1088        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1089        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1090            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1091        }
1092        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1093            RemoteType::Dm(DmType::Time(*precision as u8))
1094        }
1095        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1096    }
1097}