datafusion_remote_table/
codec.rs

1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13    Connection, ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType,
14    PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType,
15    SqliteType, Transform, connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use log::debug;
26use prost::Message;
27use std::fmt::Debug;
28#[cfg(feature = "sqlite")]
29use std::path::Path;
30use std::sync::Arc;
31
32pub trait TransformCodec: Debug + Send + Sync {
33    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
34    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
35}
36
37#[derive(Debug)]
38pub struct DefaultTransformCodec {}
39
40const DEFAULT_TRANSFORM_ID: &str = "__default";
41
42impl TransformCodec for DefaultTransformCodec {
43    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
44        if value.as_any().is::<DefaultTransform>() {
45            Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
46        } else {
47            Err(DataFusionError::Execution(format!(
48                "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
49            )))
50        }
51    }
52
53    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
54        if value == DEFAULT_TRANSFORM_ID.as_bytes() {
55            Ok(Arc::new(DefaultTransform {}))
56        } else {
57            Err(DataFusionError::Execution(
58                "DefaultTransformCodec only supports DefaultTransform".to_string(),
59            ))
60        }
61    }
62}
63
64pub trait ConnectionCodec: Debug + Send + Sync {
65    fn try_encode(
66        &self,
67        value: &dyn Connection,
68        conn_options: &ConnectionOptions,
69    ) -> DFResult<Vec<u8>>;
70    fn try_decode(
71        &self,
72        value: &[u8],
73        conn_options: &ConnectionOptions,
74    ) -> DFResult<Arc<dyn Connection>>;
75}
76
77#[derive(Debug)]
78pub struct DefaultConnectionCodec;
79
80const DEFAULT_CONNECTION_VALUE: &str = "__default";
81
82impl ConnectionCodec for DefaultConnectionCodec {
83    fn try_encode(
84        &self,
85        _value: &dyn Connection,
86        _conn_options: &ConnectionOptions,
87    ) -> DFResult<Vec<u8>> {
88        Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
89    }
90
91    fn try_decode(
92        &self,
93        value: &[u8],
94        conn_options: &ConnectionOptions,
95    ) -> DFResult<Arc<dyn Connection>> {
96        if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
97            return Err(DataFusionError::Execution(format!(
98                "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
99            )));
100        }
101        let conn_options = conn_options.clone().with_pool_max_size(1);
102        tokio::task::block_in_place(|| {
103            tokio::runtime::Handle::current().block_on(async {
104                let pool = connect(&conn_options).await?;
105                let conn = pool.get().await?;
106                Ok::<_, DataFusionError>(conn)
107            })
108        })
109    }
110}
111
112#[derive(Debug, With)]
113pub struct RemotePhysicalCodec {
114    transform_codec: Arc<dyn TransformCodec>,
115    connection_codec: Arc<dyn ConnectionCodec>,
116}
117
118impl RemotePhysicalCodec {
119    pub fn new() -> Self {
120        Self {
121            transform_codec: Arc::new(DefaultTransformCodec {}),
122            connection_codec: Arc::new(DefaultConnectionCodec {}),
123        }
124    }
125}
126
127impl Default for RemotePhysicalCodec {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl PhysicalExtensionCodec for RemotePhysicalCodec {
134    fn try_decode(
135        &self,
136        buf: &[u8],
137        _inputs: &[Arc<dyn ExecutionPlan>],
138        _registry: &dyn FunctionRegistry,
139    ) -> DFResult<Arc<dyn ExecutionPlan>> {
140        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
141            DataFusionError::Internal(format!(
142                "Failed to decode remote table execution plan: {e:?}"
143            ))
144        })?;
145
146        let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
147            Arc::new(DefaultTransform {})
148        } else {
149            self.transform_codec.try_decode(&proto.transform)?
150        };
151
152        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
153        let remote_schema = proto
154            .remote_schema
155            .map(|schema| Arc::new(parse_remote_schema(&schema)));
156
157        let projection: Option<Vec<usize>> = proto
158            .projection
159            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
160
161        let limit = proto.limit.map(|l| l as usize);
162
163        let conn_options = parse_connection_options(proto.conn_options.unwrap());
164
165        let now = std::time::Instant::now();
166        let conn = self
167            .connection_codec
168            .try_decode(&proto.connection, &conn_options)?;
169        debug!(
170            "[remote-table] Decoding connection cost {}ms",
171            now.elapsed().as_millis()
172        );
173
174        Ok(Arc::new(RemoteTableExec::try_new(
175            conn_options,
176            proto.sql,
177            table_schema,
178            remote_schema,
179            projection,
180            proto.unparsed_filters,
181            limit,
182            transform,
183            conn,
184        )?))
185    }
186
187    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
188        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
189            let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
190                DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
191            } else {
192                let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
193                assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
194                bytes
195            };
196
197            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
198            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
199
200            let serialized_conn = self
201                .connection_codec
202                .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
203
204            let proto = protobuf::RemoteTableExec {
205                conn_options: Some(serialized_connection_options),
206                sql: exec.sql.clone(),
207                table_schema: Some(exec.table_schema.as_ref().try_into()?),
208                remote_schema,
209                projection: exec
210                    .projection
211                    .as_ref()
212                    .map(|p| serialize_projection(p.as_slice())),
213                unparsed_filters: exec.unparsed_filters.clone(),
214                limit: exec.limit.map(|l| l as u32),
215                transform: serialized_transform,
216                connection: serialized_conn,
217            };
218
219            proto.encode(buf).map_err(|e| {
220                DataFusionError::Internal(format!(
221                    "Failed to encode remote table execution plan: {e:?}"
222                ))
223            })?;
224            Ok(())
225        } else {
226            Err(DataFusionError::Execution(format!(
227                "Failed to encode {}",
228                RemoteTableExec::static_name()
229            )))
230        }
231    }
232}
233
234fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
235    match options {
236        #[cfg(feature = "postgres")]
237        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
238            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
239                protobuf::PostgresConnectionOptions {
240                    host: options.host.clone(),
241                    port: options.port as u32,
242                    username: options.username.clone(),
243                    password: options.password.clone(),
244                    database: options.database.clone(),
245                    pool_max_size: options.pool_max_size as u32,
246                    stream_chunk_size: options.stream_chunk_size as u32,
247                },
248            )),
249        },
250        #[cfg(feature = "mysql")]
251        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
252            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
253                protobuf::MysqlConnectionOptions {
254                    host: options.host.clone(),
255                    port: options.port as u32,
256                    username: options.username.clone(),
257                    password: options.password.clone(),
258                    database: options.database.clone(),
259                    pool_max_size: options.pool_max_size as u32,
260                    stream_chunk_size: options.stream_chunk_size as u32,
261                },
262            )),
263        },
264        #[cfg(feature = "oracle")]
265        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
266            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
267                protobuf::OracleConnectionOptions {
268                    host: options.host.clone(),
269                    port: options.port as u32,
270                    username: options.username.clone(),
271                    password: options.password.clone(),
272                    service_name: options.service_name.clone(),
273                    pool_max_size: options.pool_max_size as u32,
274                    stream_chunk_size: options.stream_chunk_size as u32,
275                },
276            )),
277        },
278        #[cfg(feature = "sqlite")]
279        ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
280            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
281                protobuf::SqliteConnectionOptions {
282                    path: options.path.to_str().unwrap().to_string(),
283                    stream_chunk_size: options.stream_chunk_size as u32,
284                },
285            )),
286        },
287        #[cfg(feature = "dm")]
288        ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
289            connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
290                protobuf::DmConnectionOptions {
291                    host: options.host.clone(),
292                    port: options.port as u32,
293                    username: options.username.clone(),
294                    password: options.password.clone(),
295                    schema: options.schema.clone(),
296                    stream_chunk_size: options.stream_chunk_size as u32,
297                    driver: options.driver.clone(),
298                },
299            )),
300        },
301    }
302}
303
304fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
305    match options.connection_options {
306        #[cfg(feature = "postgres")]
307        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
308            ConnectionOptions::Postgres(PostgresConnectionOptions {
309                host: options.host,
310                port: options.port as u16,
311                username: options.username,
312                password: options.password,
313                database: options.database,
314                pool_max_size: options.pool_max_size as usize,
315                stream_chunk_size: options.stream_chunk_size as usize,
316            })
317        }
318        #[cfg(feature = "mysql")]
319        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
320            ConnectionOptions::Mysql(MysqlConnectionOptions {
321                host: options.host,
322                port: options.port as u16,
323                username: options.username,
324                password: options.password,
325                database: options.database,
326                pool_max_size: options.pool_max_size as usize,
327                stream_chunk_size: options.stream_chunk_size as usize,
328            })
329        }
330        #[cfg(feature = "oracle")]
331        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
332            ConnectionOptions::Oracle(OracleConnectionOptions {
333                host: options.host,
334                port: options.port as u16,
335                username: options.username,
336                password: options.password,
337                service_name: options.service_name,
338                pool_max_size: options.pool_max_size as usize,
339                stream_chunk_size: options.stream_chunk_size as usize,
340            })
341        }
342        #[cfg(feature = "sqlite")]
343        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
344            ConnectionOptions::Sqlite(SqliteConnectionOptions {
345                path: Path::new(&options.path).to_path_buf(),
346                stream_chunk_size: options.stream_chunk_size as usize,
347            })
348        }
349        #[cfg(feature = "dm")]
350        Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
351            ConnectionOptions::Dm(DmConnectionOptions {
352                host: options.host,
353                port: options.port as u16,
354                username: options.username,
355                password: options.password,
356                schema: options.schema,
357                stream_chunk_size: options.stream_chunk_size as usize,
358                driver: options.driver,
359            })
360        }
361        _ => panic!("Failed to parse connection options: {options:?}"),
362    }
363}
364
365fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
366    protobuf::Projection {
367        projection: projection.iter().map(|n| *n as u32).collect(),
368    }
369}
370
371fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
372    let fields = remote_schema
373        .fields
374        .iter()
375        .map(serialize_remote_field)
376        .collect::<Vec<_>>();
377
378    protobuf::RemoteSchema { fields }
379}
380
381fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
382    protobuf::RemoteField {
383        name: remote_field.name.clone(),
384        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
385        nullable: remote_field.nullable,
386    }
387}
388
389fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
390    match remote_type {
391        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
392            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
393                protobuf::PostgresInt2 {},
394            )),
395        },
396        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
397            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
398                protobuf::PostgresInt4 {},
399            )),
400        },
401        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
402            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
403                protobuf::PostgresInt8 {},
404            )),
405        },
406        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
407            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
408                protobuf::PostgresFloat4 {},
409            )),
410        },
411        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
412            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
413                protobuf::PostgresFloat8 {},
414            )),
415        },
416        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
417            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
418                protobuf::PostgresNumeric {
419                    scale: *scale as i32,
420                },
421            )),
422        },
423        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
424            r#type: Some(protobuf::remote_type::Type::PostgresName(
425                protobuf::PostgresName {},
426            )),
427        },
428        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
429            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
430                protobuf::PostgresVarchar {},
431            )),
432        },
433        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
434            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
435                protobuf::PostgresBpchar {},
436            )),
437        },
438        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
439            r#type: Some(protobuf::remote_type::Type::PostgresText(
440                protobuf::PostgresText {},
441            )),
442        },
443        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
444            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
445                protobuf::PostgresBytea {},
446            )),
447        },
448        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
449            r#type: Some(protobuf::remote_type::Type::PostgresDate(
450                protobuf::PostgresDate {},
451            )),
452        },
453        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
454            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
455                protobuf::PostgresTimestamp {},
456            )),
457        },
458        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
459            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
460                protobuf::PostgresTimestampTz {},
461            )),
462        },
463        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
464            r#type: Some(protobuf::remote_type::Type::PostgresTime(
465                protobuf::PostgresTime {},
466            )),
467        },
468        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
469            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
470                protobuf::PostgresInterval {},
471            )),
472        },
473        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
474            r#type: Some(protobuf::remote_type::Type::PostgresBool(
475                protobuf::PostgresBool {},
476            )),
477        },
478        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
479            r#type: Some(protobuf::remote_type::Type::PostgresJson(
480                protobuf::PostgresJson {},
481            )),
482        },
483        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
484            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
485                protobuf::PostgresJsonb {},
486            )),
487        },
488        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
489            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
490                protobuf::PostgresInt2Array {},
491            )),
492        },
493        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
494            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
495                protobuf::PostgresInt4Array {},
496            )),
497        },
498        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
499            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
500                protobuf::PostgresInt8Array {},
501            )),
502        },
503        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
504            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
505                protobuf::PostgresFloat4Array {},
506            )),
507        },
508        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
509            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
510                protobuf::PostgresFloat8Array {},
511            )),
512        },
513        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
514            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
515                protobuf::PostgresVarcharArray {},
516            )),
517        },
518        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
519            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
520                protobuf::PostgresBpcharArray {},
521            )),
522        },
523        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
524            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
525                protobuf::PostgresTextArray {},
526            )),
527        },
528        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
529            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
530                protobuf::PostgresByteaArray {},
531            )),
532        },
533        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
534            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
535                protobuf::PostgresBoolArray {},
536            )),
537        },
538        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
539            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
540                protobuf::PostgresPostGisGeometry {},
541            )),
542        },
543        RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
544            r#type: Some(protobuf::remote_type::Type::PostgresOid(
545                protobuf::PostgresOid {},
546            )),
547        },
548        RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
549            r#type: Some(protobuf::remote_type::Type::PostgresXml(
550                protobuf::PostgresXml {},
551            )),
552        },
553        RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
554            r#type: Some(protobuf::remote_type::Type::PostgresUuid(
555                protobuf::PostgresUuid {},
556            )),
557        },
558
559        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
560            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
561                protobuf::MysqlTinyInt {},
562            )),
563        },
564        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
565            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
566                protobuf::MysqlTinyIntUnsigned {},
567            )),
568        },
569        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
570            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
571                protobuf::MysqlSmallInt {},
572            )),
573        },
574        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
575            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
576                protobuf::MysqlSmallIntUnsigned {},
577            )),
578        },
579        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
580            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
581                protobuf::MysqlMediumInt {},
582            )),
583        },
584        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
585            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
586                protobuf::MysqlMediumIntUnsigned {},
587            )),
588        },
589        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
590            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
591                protobuf::MysqlInteger {},
592            )),
593        },
594        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
595            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
596                protobuf::MysqlIntegerUnsigned {},
597            )),
598        },
599        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
600            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
601                protobuf::MysqlBigInt {},
602            )),
603        },
604        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
605            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
606                protobuf::MysqlBigIntUnsigned {},
607            )),
608        },
609        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
610            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
611                protobuf::MysqlFloat {},
612            )),
613        },
614        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
615            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
616                protobuf::MysqlDouble {},
617            )),
618        },
619        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
620            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
621                protobuf::MysqlDecimal {
622                    precision: *precision as u32,
623                    scale: *scale as u32,
624                },
625            )),
626        },
627        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
628            r#type: Some(protobuf::remote_type::Type::MysqlDate(
629                protobuf::MysqlDate {},
630            )),
631        },
632        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
633            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
634                protobuf::MysqlDateTime {},
635            )),
636        },
637        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
638            r#type: Some(protobuf::remote_type::Type::MysqlTime(
639                protobuf::MysqlTime {},
640            )),
641        },
642        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
643            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
644                protobuf::MysqlTimestamp {},
645            )),
646        },
647        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
648            r#type: Some(protobuf::remote_type::Type::MysqlYear(
649                protobuf::MysqlYear {},
650            )),
651        },
652        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
653            r#type: Some(protobuf::remote_type::Type::MysqlChar(
654                protobuf::MysqlChar {},
655            )),
656        },
657        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
658            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
659                protobuf::MysqlVarchar {},
660            )),
661        },
662        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
663            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
664                protobuf::MysqlBinary {},
665            )),
666        },
667        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
668            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
669                protobuf::MysqlVarbinary {},
670            )),
671        },
672        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
673            r#type: Some(protobuf::remote_type::Type::MysqlText(
674                protobuf::MysqlText { length: *len },
675            )),
676        },
677        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
678            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
679                protobuf::MysqlBlob { length: *len },
680            )),
681        },
682        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
683            r#type: Some(protobuf::remote_type::Type::MysqlJson(
684                protobuf::MysqlJson {},
685            )),
686        },
687        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
688            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
689                protobuf::MysqlGeometry {},
690            )),
691        },
692
693        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
694            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
695                protobuf::OracleVarchar2 { length: *len },
696            )),
697        },
698        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
699            r#type: Some(protobuf::remote_type::Type::OracleChar(
700                protobuf::OracleChar { length: *len },
701            )),
702        },
703        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
704            r#type: Some(protobuf::remote_type::Type::OracleNumber(
705                protobuf::OracleNumber {
706                    precision: *precision as u32,
707                    scale: *scale as i32,
708                },
709            )),
710        },
711        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
712            r#type: Some(protobuf::remote_type::Type::OracleDate(
713                protobuf::OracleDate {},
714            )),
715        },
716        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
717            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
718                protobuf::OracleTimestamp {},
719            )),
720        },
721        RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
722            r#type: Some(protobuf::remote_type::Type::OracleBoolean(
723                protobuf::OracleBoolean {},
724            )),
725        },
726        RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
727            r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
728                protobuf::OracleBinaryFloat {},
729            )),
730        },
731        RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
732            r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
733                protobuf::OracleBinaryDouble {},
734            )),
735        },
736        RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
737            r#type: Some(protobuf::remote_type::Type::OracleBlob(
738                protobuf::OracleBlob {},
739            )),
740        },
741        RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
742            r#type: Some(protobuf::remote_type::Type::OracleFloat(
743                protobuf::OracleFloat {
744                    precision: *precision as u32,
745                },
746            )),
747        },
748        RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
749            r#type: Some(protobuf::remote_type::Type::OracleNchar(
750                protobuf::OracleNChar { length: *len },
751            )),
752        },
753        RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
754            r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
755                protobuf::OracleNVarchar2 { length: *len },
756            )),
757        },
758        RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
759            r#type: Some(protobuf::remote_type::Type::OracleRaw(
760                protobuf::OracleRaw { length: *len },
761            )),
762        },
763        RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
764            r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
765                protobuf::OracleLongRaw {},
766            )),
767        },
768        RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
769            r#type: Some(protobuf::remote_type::Type::OracleLong(
770                protobuf::OracleLong {},
771            )),
772        },
773        RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
774            r#type: Some(protobuf::remote_type::Type::OracleClob(
775                protobuf::OracleClob {},
776            )),
777        },
778        RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
779            r#type: Some(protobuf::remote_type::Type::OracleNclob(
780                protobuf::OracleNClob {},
781            )),
782        },
783        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
784            r#type: Some(protobuf::remote_type::Type::SqliteNull(
785                protobuf::SqliteNull {},
786            )),
787        },
788        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
789            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
790                protobuf::SqliteInteger {},
791            )),
792        },
793        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
794            r#type: Some(protobuf::remote_type::Type::SqliteReal(
795                protobuf::SqliteReal {},
796            )),
797        },
798        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
799            r#type: Some(protobuf::remote_type::Type::SqliteText(
800                protobuf::SqliteText {},
801            )),
802        },
803        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
804            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
805                protobuf::SqliteBlob {},
806            )),
807        },
808        RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
809            r#type: Some(protobuf::remote_type::Type::DmTinyInt(
810                protobuf::DmTinyInt {},
811            )),
812        },
813        RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
814            r#type: Some(protobuf::remote_type::Type::DmSmallInt(
815                protobuf::DmSmallInt {},
816            )),
817        },
818        RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
819            r#type: Some(protobuf::remote_type::Type::DmInteger(
820                protobuf::DmInteger {},
821            )),
822        },
823        RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
824            r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
825        },
826        RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
827            r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
828        },
829        RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
830            r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
831        },
832        RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
833            r#type: Some(protobuf::remote_type::Type::DmNumeric(
834                protobuf::DmNumeric {
835                    precision: *precision as u32,
836                    scale: *scale as i32,
837                },
838            )),
839        },
840        RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
841            r#type: Some(protobuf::remote_type::Type::DmDecimal(
842                protobuf::DmDecimal {
843                    precision: *precision as u32,
844                    scale: *scale as i32,
845                },
846            )),
847        },
848        RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
849            r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
850                length: len.map(|s| s as u32),
851            })),
852        },
853        RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
854            r#type: Some(protobuf::remote_type::Type::DmVarchar(
855                protobuf::DmVarchar {
856                    length: len.map(|s| s as u32),
857                },
858            )),
859        },
860        RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
861            r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
862        },
863        RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
864            r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
865                length: *len as u32,
866            })),
867        },
868        RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
869            r#type: Some(protobuf::remote_type::Type::DmVarbinary(
870                protobuf::DmVarbinary {
871                    length: len.map(|s| s as u32),
872                },
873            )),
874        },
875        RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
876            r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
877        },
878        RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
879            r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
880        },
881        RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
882            r#type: Some(protobuf::remote_type::Type::DmTimestamp(
883                protobuf::DmTimestamp {
884                    precision: *precision as u32,
885                },
886            )),
887        },
888        RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
889            r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
890                precision: *precision as u32,
891            })),
892        },
893        RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
894            r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
895        },
896    }
897}
898
899fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
900    let fields = remote_schema
901        .fields
902        .iter()
903        .map(parse_remote_field)
904        .collect::<Vec<_>>();
905
906    RemoteSchema { fields }
907}
908
909fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
910    RemoteField {
911        name: field.name.clone(),
912        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
913        nullable: field.nullable,
914    }
915}
916
917fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
918    match remote_type.r#type.as_ref().unwrap() {
919        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
920        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
921        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
922        protobuf::remote_type::Type::PostgresFloat4(_) => {
923            RemoteType::Postgres(PostgresType::Float4)
924        }
925        protobuf::remote_type::Type::PostgresFloat8(_) => {
926            RemoteType::Postgres(PostgresType::Float8)
927        }
928        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
929            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
930        }
931        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
932        protobuf::remote_type::Type::PostgresVarchar(_) => {
933            RemoteType::Postgres(PostgresType::Varchar)
934        }
935        protobuf::remote_type::Type::PostgresBpchar(_) => {
936            RemoteType::Postgres(PostgresType::Bpchar)
937        }
938        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
939        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
940        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
941        protobuf::remote_type::Type::PostgresTimestamp(_) => {
942            RemoteType::Postgres(PostgresType::Timestamp)
943        }
944        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
945            RemoteType::Postgres(PostgresType::TimestampTz)
946        }
947        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
948        protobuf::remote_type::Type::PostgresInterval(_) => {
949            RemoteType::Postgres(PostgresType::Interval)
950        }
951        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
952        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
953        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
954        protobuf::remote_type::Type::PostgresInt2Array(_) => {
955            RemoteType::Postgres(PostgresType::Int2Array)
956        }
957        protobuf::remote_type::Type::PostgresInt4Array(_) => {
958            RemoteType::Postgres(PostgresType::Int4Array)
959        }
960        protobuf::remote_type::Type::PostgresInt8Array(_) => {
961            RemoteType::Postgres(PostgresType::Int8Array)
962        }
963        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
964            RemoteType::Postgres(PostgresType::Float4Array)
965        }
966        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
967            RemoteType::Postgres(PostgresType::Float8Array)
968        }
969        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
970            RemoteType::Postgres(PostgresType::VarcharArray)
971        }
972        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
973            RemoteType::Postgres(PostgresType::BpcharArray)
974        }
975        protobuf::remote_type::Type::PostgresTextArray(_) => {
976            RemoteType::Postgres(PostgresType::TextArray)
977        }
978        protobuf::remote_type::Type::PostgresByteaArray(_) => {
979            RemoteType::Postgres(PostgresType::ByteaArray)
980        }
981        protobuf::remote_type::Type::PostgresBoolArray(_) => {
982            RemoteType::Postgres(PostgresType::BoolArray)
983        }
984        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
985            RemoteType::Postgres(PostgresType::PostGisGeometry)
986        }
987        protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
988        protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
989        protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
990        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
991        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
992            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
993        }
994        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
995        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
996            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
997        }
998        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
999        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1000            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1001        }
1002        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1003        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1004            RemoteType::Mysql(MysqlType::IntegerUnsigned)
1005        }
1006        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1007        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1008            RemoteType::Mysql(MysqlType::BigIntUnsigned)
1009        }
1010        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1011        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1012        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1013            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1014        ),
1015        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1016        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1017        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1018        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1019        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1020        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1021        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1022        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1023        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1024        protobuf::remote_type::Type::MysqlText(text) => {
1025            RemoteType::Mysql(MysqlType::Text(text.length))
1026        }
1027        protobuf::remote_type::Type::MysqlBlob(blob) => {
1028            RemoteType::Mysql(MysqlType::Blob(blob.length))
1029        }
1030        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1031        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1032        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1033            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1034        }
1035        protobuf::remote_type::Type::OracleChar(char) => {
1036            RemoteType::Oracle(OracleType::Char(char.length))
1037        }
1038        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1039            OracleType::Number(number.precision as u8, number.scale as i8),
1040        ),
1041        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1042        protobuf::remote_type::Type::OracleTimestamp(_) => {
1043            RemoteType::Oracle(OracleType::Timestamp)
1044        }
1045        protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1046        protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1047            RemoteType::Oracle(OracleType::BinaryFloat)
1048        }
1049        protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1050            RemoteType::Oracle(OracleType::BinaryDouble)
1051        }
1052        protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1053            RemoteType::Oracle(OracleType::Float(*precision as u8))
1054        }
1055        protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1056            RemoteType::Oracle(OracleType::NChar(*length))
1057        }
1058        protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1059            RemoteType::Oracle(OracleType::NVarchar2(*length))
1060        }
1061        protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1062            RemoteType::Oracle(OracleType::Raw(*length))
1063        }
1064        protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1065        protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1066        protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1067        protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1068        protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1069        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1070        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1071        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1072        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1073        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1074        protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1075        protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1076        protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1077        protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1078        protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1079        protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1080        protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1081            RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1082        }
1083        protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1084            RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1085        }
1086        protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1087            RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1088        }
1089        protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1090            RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1091        }
1092        protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1093        protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1094            RemoteType::Dm(DmType::Binary(*length as u16))
1095        }
1096        protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1097            RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1098        }
1099        protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1100        protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1101        protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1102            RemoteType::Dm(DmType::Timestamp(*precision as u8))
1103        }
1104        protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1105            RemoteType::Dm(DmType::Time(*precision as u8))
1106        }
1107        protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1108    }
1109}