datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    Connection, ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType,
14    PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType,
15    SqliteType, Transform, connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use log::debug;
26use prost::Message;
27use std::fmt::Debug;
28#[cfg(feature = "sqlite")]
29use std::path::Path;
30use std::sync::Arc;
31
32pub trait TransformCodec: Debug + Send + Sync {
33    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
34    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
35}
36
37#[derive(Debug)]
38pub struct DefaultTransformCodec {}
39
40const DEFAULT_TRANSFORM_ID: &str = "__default";
41
42impl TransformCodec for DefaultTransformCodec {
43    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
44        if value.as_any().is::<DefaultTransform>() {
45            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
46        } else {
47            Err(DataFusionError::Execution(format!(
48                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
49            )))
50        }
51    }
52
53    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
54        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
55            Ok(Arc::new(DefaultTransform {}))
56        } else {
57            Err(DataFusionError::Execution(
58                "DefaultTransformCodec only supports DefaultTransform".to_string(),
59            ))
60        }
61    }
62}
63
64pub trait ConnectionCodec: Debug + Send + Sync {
65    fn try_encode(
66        &self,
67        value: &dyn Connection,
68        conn_options: &ConnectionOptions,
69    ) -> DFResult<Vec<u8>>;
70    fn try_decode(
71        &self,
72        value: &[u8],
73        conn_options: &ConnectionOptions,
74    ) -> DFResult<Arc<dyn Connection>>;
75}
76
77#[derive(Debug)]
78pub struct DefaultConnectionCodec;
79
80const DEFAULT_CONNECTION_VALUE: &str = "__default";
81
82impl ConnectionCodec for DefaultConnectionCodec {
83    fn try_encode(
84        &self,
85        _value: &dyn Connection,
86        _conn_options: &ConnectionOptions,
87    ) -> DFResult<Vec<u8>> {
88        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
89    }
90
91    fn try_decode(
92        &self,
93        value: &[u8],
94        conn_options: &ConnectionOptions,
95    ) -> DFResult<Arc<dyn Connection>> {
96        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
97            return Err(DataFusionError::Execution(format!(
98                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
99            )));
100        }
101        tokio::task::block_in_place(|| {
102            tokio::runtime::Handle::current().block_on(async {
103                let pool = connect(conn_options).await?;
104                let conn = pool.get().await?;
105                Ok::<_, DataFusionError>(conn)
106            })
107        })
108    }
109}
110
111#[derive(Debug, With)]
112pub struct RemotePhysicalCodec {
113    transform_codec: Arc<dyn TransformCodec>,
114    connection_codec: Arc<dyn ConnectionCodec>,
115}
116
117impl RemotePhysicalCodec {
118    pub fn new() -> Self {
119        Self {
120            transform_codec: Arc::new(DefaultTransformCodec {}),
121            connection_codec: Arc::new(DefaultConnectionCodec {}),
122        }
123    }
124}
125
126impl Default for RemotePhysicalCodec {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132impl PhysicalExtensionCodec for RemotePhysicalCodec {
133    fn try_decode(
134        &self,
135        buf: &[u8],
136        _inputs: &[Arc<dyn ExecutionPlan>],
137        _registry: &dyn FunctionRegistry,
138    ) -> DFResult<Arc<dyn ExecutionPlan>> {
139        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
140            DataFusionError::Internal(format!(
141                "Failed to decode remote table execution plan: {e:?}"
142            ))
143        })?;
144
145        let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
146            Arc::new(DefaultTransform {})
147        } else {
148            self.transform_codec.try_decode(&proto.transform)?
149        };
150
151        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
152        let remote_schema = proto
153            .remote_schema
154            .map(|schema| Arc::new(parse_remote_schema(&schema)));
155
156        let projection: Option<Vec<usize>> = proto
157            .projection
158            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
159
160        let limit = proto.limit.map(|l| l as usize);
161
162        let conn_options = parse_connection_options(proto.conn_options.unwrap());
163
164        let now = std::time::Instant::now();
165        let conn = self
166            .connection_codec
167            .try_decode(&proto.connection, &conn_options)?;
168        debug!(
169            "[remote-table] Decoding connection cost {}ms",
170            now.elapsed().as_millis()
171        );
172
173        Ok(Arc::new(RemoteTableExec::try_new(
174            conn_options,
175            proto.sql,
176            table_schema,
177            remote_schema,
178            projection,
179            proto.unparsed_filters,
180            limit,
181            transform,
182            conn,
183        )?))
184    }
185
186    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
187        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
188            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
189                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
190            } else {
191                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
192                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
193                bytes
194            };
195
196            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
197            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
198
199            let serialized_conn = self
200                .connection_codec
201                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
202
203            let proto = protobuf::RemoteTableExec {
204                conn_options: Some(serialized_connection_options),
205                sql: exec.sql.clone(),
206                table_schema: Some(exec.table_schema.as_ref().try_into()?),
207                remote_schema,
208                projection: exec
209                    .projection
210                    .as_ref()
211                    .map(|p| serialize_projection(p.as_slice())),
212                unparsed_filters: exec.unparsed_filters.clone(),
213                limit: exec.limit.map(|l| l as u32),
214                transform: serialized_transform,
215                connection: serialized_conn,
216            };
217
218            proto.encode(buf).map_err(|e| {
219                DataFusionError::Internal(format!(
220                    "Failed to encode remote table execution plan: {e:?}"
221                ))
222            })?;
223            Ok(())
224        } else {
225            Err(DataFusionError::Execution(format!(
226                "Failed to encode {}",
227                RemoteTableExec::static_name()
228            )))
229        }
230    }
231}
232
233fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
234    match options {
235        #[cfg(feature = "postgres")]
236        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
237            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
238                protobuf::PostgresConnectionOptions {
239                    host: options.host.clone(),
240                    port: options.port as u32,
241                    username: options.username.clone(),
242                    password: options.password.clone(),
243                    database: options.database.clone(),
244                    pool_max_size: options.pool_max_size as u32,
245                    stream_chunk_size: options.stream_chunk_size as u32,
246                },
247            )),
248        },
249        #[cfg(feature = "mysql")]
250        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
251            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
252                protobuf::MysqlConnectionOptions {
253                    host: options.host.clone(),
254                    port: options.port as u32,
255                    username: options.username.clone(),
256                    password: options.password.clone(),
257                    database: options.database.clone(),
258                    pool_max_size: options.pool_max_size as u32,
259                    stream_chunk_size: options.stream_chunk_size as u32,
260                },
261            )),
262        },
263        #[cfg(feature = "oracle")]
264        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
265            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
266                protobuf::OracleConnectionOptions {
267                    host: options.host.clone(),
268                    port: options.port as u32,
269                    username: options.username.clone(),
270                    password: options.password.clone(),
271                    service_name: options.service_name.clone(),
272                    pool_max_size: options.pool_max_size as u32,
273                    stream_chunk_size: options.stream_chunk_size as u32,
274                },
275            )),
276        },
277        #[cfg(feature = "sqlite")]
278        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
279            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
280                protobuf::SqliteConnectionOptions {
281                    path: options.path.to_str().unwrap().to_string(),
282                    stream_chunk_size: options.stream_chunk_size as u32,
283                },
284            )),
285        },
286        #[cfg(feature = "dm")]
287        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
288            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
289                protobuf::DmConnectionOptions {
290                    host: options.host.clone(),
291                    port: options.port as u32,
292                    username: options.username.clone(),
293                    password: options.password.clone(),
294                    schema: options.schema.clone(),
295                    stream_chunk_size: options.stream_chunk_size as u32,
296                    driver: options.driver.clone(),
297                },
298            )),
299        },
300    }
301}
302
303fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
304    match options.connection_options {
305        #[cfg(feature = "postgres")]
306        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
307            ConnectionOptions::Postgres(PostgresConnectionOptions {
308                host: options.host,
309                port: options.port as u16,
310                username: options.username,
311                password: options.password,
312                database: options.database,
313                pool_max_size: options.pool_max_size as usize,
314                stream_chunk_size: options.stream_chunk_size as usize,
315            })
316        }
317        #[cfg(feature = "mysql")]
318        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
319            ConnectionOptions::Mysql(MysqlConnectionOptions {
320                host: options.host,
321                port: options.port as u16,
322                username: options.username,
323                password: options.password,
324                database: options.database,
325                pool_max_size: options.pool_max_size as usize,
326                stream_chunk_size: options.stream_chunk_size as usize,
327            })
328        }
329        #[cfg(feature = "oracle")]
330        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
331            ConnectionOptions::Oracle(OracleConnectionOptions {
332                host: options.host,
333                port: options.port as u16,
334                username: options.username,
335                password: options.password,
336                service_name: options.service_name,
337                pool_max_size: options.pool_max_size as usize,
338                stream_chunk_size: options.stream_chunk_size as usize,
339            })
340        }
341        #[cfg(feature = "sqlite")]
342        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
343            ConnectionOptions::Sqlite(SqliteConnectionOptions {
344                path: Path::new(&options.path).to_path_buf(),
345                stream_chunk_size: options.stream_chunk_size as usize,
346            })
347        }
348        #[cfg(feature = "dm")]
349        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
350            ConnectionOptions::Dm(DmConnectionOptions {
351                host: options.host,
352                port: options.port as u16,
353                username: options.username,
354                password: options.password,
355                schema: options.schema,
356                stream_chunk_size: options.stream_chunk_size as usize,
357                driver: options.driver,
358            })
359        }
360        _ => panic!("Failed to parse connection options: {options:?}"),
361    }
362}
363
364fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
365    protobuf::Projection {
366        projection: projection.iter().map(|n| *n as u32).collect(),
367    }
368}
369
370fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
371    let fields = remote_schema
372        .fields
373        .iter()
374        .map(serialize_remote_field)
375        .collect::<Vec<_>>();
376
377    protobuf::RemoteSchema { fields }
378}
379
380fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
381    protobuf::RemoteField {
382        name: remote_field.name.clone(),
383        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
384        nullable: remote_field.nullable,
385    }
386}
387
388fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
389    match remote_type {
390        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
391            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
392                protobuf::PostgresInt2 {},
393            )),
394        },
395        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
396            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
397                protobuf::PostgresInt4 {},
398            )),
399        },
400        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
401            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
402                protobuf::PostgresInt8 {},
403            )),
404        },
405        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
406            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
407                protobuf::PostgresFloat4 {},
408            )),
409        },
410        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
411            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
412                protobuf::PostgresFloat8 {},
413            )),
414        },
415        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
416            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
417                protobuf::PostgresNumeric {
418                    scale: *scale as i32,
419                },
420            )),
421        },
422        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
423            r#type: Some(protobuf::remote_type::Type::PostgresName(
424                protobuf::PostgresName {},
425            )),
426        },
427        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
428            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
429                protobuf::PostgresVarchar {},
430            )),
431        },
432        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
433            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
434                protobuf::PostgresBpchar {},
435            )),
436        },
437        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
438            r#type: Some(protobuf::remote_type::Type::PostgresText(
439                protobuf::PostgresText {},
440            )),
441        },
442        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
443            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
444                protobuf::PostgresBytea {},
445            )),
446        },
447        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
448            r#type: Some(protobuf::remote_type::Type::PostgresDate(
449                protobuf::PostgresDate {},
450            )),
451        },
452        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
453            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
454                protobuf::PostgresTimestamp {},
455            )),
456        },
457        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
458            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
459                protobuf::PostgresTimestampTz {},
460            )),
461        },
462        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
463            r#type: Some(protobuf::remote_type::Type::PostgresTime(
464                protobuf::PostgresTime {},
465            )),
466        },
467        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
468            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
469                protobuf::PostgresInterval {},
470            )),
471        },
472        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
473            r#type: Some(protobuf::remote_type::Type::PostgresBool(
474                protobuf::PostgresBool {},
475            )),
476        },
477        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
478            r#type: Some(protobuf::remote_type::Type::PostgresJson(
479                protobuf::PostgresJson {},
480            )),
481        },
482        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
483            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
484                protobuf::PostgresJsonb {},
485            )),
486        },
487        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
488            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
489                protobuf::PostgresInt2Array {},
490            )),
491        },
492        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
493            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
494                protobuf::PostgresInt4Array {},
495            )),
496        },
497        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
498            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
499                protobuf::PostgresInt8Array {},
500            )),
501        },
502        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
503            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
504                protobuf::PostgresFloat4Array {},
505            )),
506        },
507        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
508            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
509                protobuf::PostgresFloat8Array {},
510            )),
511        },
512        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
513            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
514                protobuf::PostgresVarcharArray {},
515            )),
516        },
517        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
518            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
519                protobuf::PostgresBpcharArray {},
520            )),
521        },
522        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
523            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
524                protobuf::PostgresTextArray {},
525            )),
526        },
527        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
528            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
529                protobuf::PostgresByteaArray {},
530            )),
531        },
532        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
533            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
534                protobuf::PostgresBoolArray {},
535            )),
536        },
537        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
538            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
539                protobuf::PostgresPostGisGeometry {},
540            )),
541        },
542        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
543            r#type: Some(protobuf::remote_type::Type::PostgresOid(
544                protobuf::PostgresOid {},
545            )),
546        },
547
548        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
549            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
550                protobuf::MysqlTinyInt {},
551            )),
552        },
553        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
554            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
555                protobuf::MysqlTinyIntUnsigned {},
556            )),
557        },
558        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
559            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
560                protobuf::MysqlSmallInt {},
561            )),
562        },
563        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
564            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
565                protobuf::MysqlSmallIntUnsigned {},
566            )),
567        },
568        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
569            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
570                protobuf::MysqlMediumInt {},
571            )),
572        },
573        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
574            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
575                protobuf::MysqlMediumIntUnsigned {},
576            )),
577        },
578        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
579            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
580                protobuf::MysqlInteger {},
581            )),
582        },
583        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
584            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
585                protobuf::MysqlIntegerUnsigned {},
586            )),
587        },
588        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
589            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
590                protobuf::MysqlBigInt {},
591            )),
592        },
593        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
594            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
595                protobuf::MysqlBigIntUnsigned {},
596            )),
597        },
598        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
599            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
600                protobuf::MysqlFloat {},
601            )),
602        },
603        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
604            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
605                protobuf::MysqlDouble {},
606            )),
607        },
608        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
609            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
610                protobuf::MysqlDecimal {
611                    precision: *precision as u32,
612                    scale: *scale as u32,
613                },
614            )),
615        },
616        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
617            r#type: Some(protobuf::remote_type::Type::MysqlDate(
618                protobuf::MysqlDate {},
619            )),
620        },
621        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
622            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
623                protobuf::MysqlDateTime {},
624            )),
625        },
626        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
627            r#type: Some(protobuf::remote_type::Type::MysqlTime(
628                protobuf::MysqlTime {},
629            )),
630        },
631        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
632            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
633                protobuf::MysqlTimestamp {},
634            )),
635        },
636        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
637            r#type: Some(protobuf::remote_type::Type::MysqlYear(
638                protobuf::MysqlYear {},
639            )),
640        },
641        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
642            r#type: Some(protobuf::remote_type::Type::MysqlChar(
643                protobuf::MysqlChar {},
644            )),
645        },
646        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
647            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
648                protobuf::MysqlVarchar {},
649            )),
650        },
651        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
652            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
653                protobuf::MysqlBinary {},
654            )),
655        },
656        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
657            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
658                protobuf::MysqlVarbinary {},
659            )),
660        },
661        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
662            r#type: Some(protobuf::remote_type::Type::MysqlText(
663                protobuf::MysqlText { length: *len },
664            )),
665        },
666        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
667            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
668                protobuf::MysqlBlob { length: *len },
669            )),
670        },
671        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
672            r#type: Some(protobuf::remote_type::Type::MysqlJson(
673                protobuf::MysqlJson {},
674            )),
675        },
676        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
677            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
678                protobuf::MysqlGeometry {},
679            )),
680        },
681
682        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
683            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
684                protobuf::OracleVarchar2 { length: *len },
685            )),
686        },
687        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
688            r#type: Some(protobuf::remote_type::Type::OracleChar(
689                protobuf::OracleChar { length: *len },
690            )),
691        },
692        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
693            r#type: Some(protobuf::remote_type::Type::OracleNumber(
694                protobuf::OracleNumber {
695                    precision: *precision as u32,
696                    scale: *scale as i32,
697                },
698            )),
699        },
700        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
701            r#type: Some(protobuf::remote_type::Type::OracleDate(
702                protobuf::OracleDate {},
703            )),
704        },
705        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
706            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
707                protobuf::OracleTimestamp {},
708            )),
709        },
710        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
711            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
712                protobuf::OracleBoolean {},
713            )),
714        },
715        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
716            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
717                protobuf::OracleBinaryFloat {},
718            )),
719        },
720        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
721            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
722                protobuf::OracleBinaryDouble {},
723            )),
724        },
725        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
726            r#type: Some(protobuf::remote_type::Type::OracleBlob(
727                protobuf::OracleBlob {},
728            )),
729        },
730        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
731            r#type: Some(protobuf::remote_type::Type::OracleFloat(
732                protobuf::OracleFloat {
733                    precision: *precision as u32,
734                },
735            )),
736        },
737        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
738            r#type: Some(protobuf::remote_type::Type::OracleNchar(
739                protobuf::OracleNChar { length: *len },
740            )),
741        },
742        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
743            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
744                protobuf::OracleNVarchar2 { length: *len },
745            )),
746        },
747        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
748            r#type: Some(protobuf::remote_type::Type::OracleRaw(
749                protobuf::OracleRaw { length: *len },
750            )),
751        },
752        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
753            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
754                protobuf::OracleLongRaw {},
755            )),
756        },
757        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
758            r#type: Some(protobuf::remote_type::Type::OracleLong(
759                protobuf::OracleLong {},
760            )),
761        },
762        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
763            r#type: Some(protobuf::remote_type::Type::OracleClob(
764                protobuf::OracleClob {},
765            )),
766        },
767        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
768            r#type: Some(protobuf::remote_type::Type::OracleNclob(
769                protobuf::OracleNClob {},
770            )),
771        },
772        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
773            r#type: Some(protobuf::remote_type::Type::SqliteNull(
774                protobuf::SqliteNull {},
775            )),
776        },
777        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
778            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
779                protobuf::SqliteInteger {},
780            )),
781        },
782        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
783            r#type: Some(protobuf::remote_type::Type::SqliteReal(
784                protobuf::SqliteReal {},
785            )),
786        },
787        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
788            r#type: Some(protobuf::remote_type::Type::SqliteText(
789                protobuf::SqliteText {},
790            )),
791        },
792        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
793            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
794                protobuf::SqliteBlob {},
795            )),
796        },
797        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
798            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
799                protobuf::DmTinyInt {},
800            )),
801        },
802        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
803            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
804                protobuf::DmSmallInt {},
805            )),
806        },
807        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
808            r#type: Some(protobuf::remote_type::Type::DmInteger(
809                protobuf::DmInteger {},
810            )),
811        },
812        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
813            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
814        },
815        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
816            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
817        },
818        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
819            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
820        },
821        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
822            r#type: Some(protobuf::remote_type::Type::DmNumeric(
823                protobuf::DmNumeric {
824                    precision: *precision as u32,
825                    scale: *scale as i32,
826                },
827            )),
828        },
829        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
830            r#type: Some(protobuf::remote_type::Type::DmDecimal(
831                protobuf::DmDecimal {
832                    precision: *precision as u32,
833                    scale: *scale as i32,
834                },
835            )),
836        },
837        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
838            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
839                length: len.map(|s| s as u32),
840            })),
841        },
842        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
843            r#type: Some(protobuf::remote_type::Type::DmVarchar(
844                protobuf::DmVarchar {
845                    length: len.map(|s| s as u32),
846                },
847            )),
848        },
849        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
850            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
851        },
852        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
853            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
854                length: *len as u32,
855            })),
856        },
857        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
858            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
859                protobuf::DmVarbinary {
860                    length: len.map(|s| s as u32),
861                },
862            )),
863        },
864        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
865            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
866        },
867        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
868            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
869        },
870        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
871            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
872                protobuf::DmTimestamp {
873                    precision: *precision as u32,
874                },
875            )),
876        },
877        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
878            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
879                precision: *precision as u32,
880            })),
881        },
882        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
883            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
884        },
885    }
886}
887
888fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
889    let fields = remote_schema
890        .fields
891        .iter()
892        .map(parse_remote_field)
893        .collect::<Vec<_>>();
894
895    RemoteSchema { fields }
896}
897
898fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
899    RemoteField {
900        name: field.name.clone(),
901        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
902        nullable: field.nullable,
903    }
904}
905
906fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
907    match remote_type.r#type.as_ref().unwrap() {
908        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
909        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
910        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
911        protobuf::remote_type::Type::PostgresFloat4(_) => {
912            RemoteType::Postgres(PostgresType::Float4)
913        }
914        protobuf::remote_type::Type::PostgresFloat8(_) => {
915            RemoteType::Postgres(PostgresType::Float8)
916        }
917        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
918            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
919        }
920        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
921        protobuf::remote_type::Type::PostgresVarchar(_) => {
922            RemoteType::Postgres(PostgresType::Varchar)
923        }
924        protobuf::remote_type::Type::PostgresBpchar(_) => {
925            RemoteType::Postgres(PostgresType::Bpchar)
926        }
927        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
928        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
929        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
930        protobuf::remote_type::Type::PostgresTimestamp(_) => {
931            RemoteType::Postgres(PostgresType::Timestamp)
932        }
933        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
934            RemoteType::Postgres(PostgresType::TimestampTz)
935        }
936        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
937        protobuf::remote_type::Type::PostgresInterval(_) => {
938            RemoteType::Postgres(PostgresType::Interval)
939        }
940        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
941        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
942        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
943        protobuf::remote_type::Type::PostgresInt2Array(_) => {
944            RemoteType::Postgres(PostgresType::Int2Array)
945        }
946        protobuf::remote_type::Type::PostgresInt4Array(_) => {
947            RemoteType::Postgres(PostgresType::Int4Array)
948        }
949        protobuf::remote_type::Type::PostgresInt8Array(_) => {
950            RemoteType::Postgres(PostgresType::Int8Array)
951        }
952        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
953            RemoteType::Postgres(PostgresType::Float4Array)
954        }
955        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
956            RemoteType::Postgres(PostgresType::Float8Array)
957        }
958        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
959            RemoteType::Postgres(PostgresType::VarcharArray)
960        }
961        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
962            RemoteType::Postgres(PostgresType::BpcharArray)
963        }
964        protobuf::remote_type::Type::PostgresTextArray(_) => {
965            RemoteType::Postgres(PostgresType::TextArray)
966        }
967        protobuf::remote_type::Type::PostgresByteaArray(_) => {
968            RemoteType::Postgres(PostgresType::ByteaArray)
969        }
970        protobuf::remote_type::Type::PostgresBoolArray(_) => {
971            RemoteType::Postgres(PostgresType::BoolArray)
972        }
973        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
974            RemoteType::Postgres(PostgresType::PostGisGeometry)
975        }
976        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
977        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
978        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
979            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
980        }
981        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
982        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
983            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
984        }
985        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
986        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
987            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
988        }
989        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
990        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
991            RemoteType::Mysql(MysqlType::IntegerUnsigned)
992        }
993        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
994        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
995            RemoteType::Mysql(MysqlType::BigIntUnsigned)
996        }
997        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
998        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
999        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1000            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1001        ),
1002        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1003        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1004        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1005        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1006        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1007        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1008        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1009        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1010        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1011        protobuf::remote_type::Type::MysqlText(text) => {
1012            RemoteType::Mysql(MysqlType::Text(text.length))
1013        }
1014        protobuf::remote_type::Type::MysqlBlob(blob) => {
1015            RemoteType::Mysql(MysqlType::Blob(blob.length))
1016        }
1017        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1018        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1019        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1020            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1021        }
1022        protobuf::remote_type::Type::OracleChar(char) => {
1023            RemoteType::Oracle(OracleType::Char(char.length))
1024        }
1025        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1026            OracleType::Number(number.precision as u8, number.scale as i8),
1027        ),
1028        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1029        protobuf::remote_type::Type::OracleTimestamp(_) => {
1030            RemoteType::Oracle(OracleType::Timestamp)
1031        }
1032        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1033        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1034            RemoteType::Oracle(OracleType::BinaryFloat)
1035        }
1036        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1037            RemoteType::Oracle(OracleType::BinaryDouble)
1038        }
1039        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1040            RemoteType::Oracle(OracleType::Float(*precision as u8))
1041        }
1042        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1043            RemoteType::Oracle(OracleType::NChar(*length))
1044        }
1045        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1046            RemoteType::Oracle(OracleType::NVarchar2(*length))
1047        }
1048        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1049            RemoteType::Oracle(OracleType::Raw(*length))
1050        }
1051        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1052        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1053        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1054        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1055        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1056        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1057        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1058        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1059        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1060        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1061        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1062        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1063        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1064        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1065        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1066        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1067        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1068            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1069        }
1070        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1071            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1072        }
1073        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1074            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1075        }
1076        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1077            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1078        }
1079        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1080        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1081            RemoteType::Dm(DmType::Binary(*length as u16))
1082        }
1083        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1084            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1085        }
1086        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1087        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1088        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1089            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1090        }
1091        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1092            RemoteType::Dm(DmType::Time(*precision as u8))
1093        }
1094        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1095    }
1096}