1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 Connection, ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType,
14 PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType,
15 SqliteType, Transform, connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use log::debug;
26use prost::Message;
27use std::fmt::Debug;
28#[cfg(feature = "sqlite")]
29use std::path::Path;
30use std::sync::Arc;
31
32pub trait TransformCodec: Debug + Send + Sync {
33 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
34 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
35}
36
37#[derive(Debug)]
38pub struct DefaultTransformCodec {}
39
40const DEFAULT_TRANSFORM_ID: &str = "__default";
41
42impl TransformCodec for DefaultTransformCodec {
43 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
44 if value.as_any().is::<DefaultTransform>() {
45 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
46 } else {
47 Err(DataFusionError::Execution(format!(
48 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
49 )))
50 }
51 }
52
53 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
54 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
55 Ok(Arc::new(DefaultTransform {}))
56 } else {
57 Err(DataFusionError::Execution(
58 "DefaultTransformCodec only supports DefaultTransform".to_string(),
59 ))
60 }
61 }
62}
63
64pub trait ConnectionCodec: Debug + Send + Sync {
65 fn try_encode(
66 &self,
67 value: &dyn Connection,
68 conn_options: &ConnectionOptions,
69 ) -> DFResult<Vec<u8>>;
70 fn try_decode(
71 &self,
72 value: &[u8],
73 conn_options: &ConnectionOptions,
74 ) -> DFResult<Arc<dyn Connection>>;
75}
76
77#[derive(Debug)]
78pub struct DefaultConnectionCodec;
79
80const DEFAULT_CONNECTION_VALUE: &str = "__default";
81
82impl ConnectionCodec for DefaultConnectionCodec {
83 fn try_encode(
84 &self,
85 _value: &dyn Connection,
86 _conn_options: &ConnectionOptions,
87 ) -> DFResult<Vec<u8>> {
88 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
89 }
90
91 fn try_decode(
92 &self,
93 value: &[u8],
94 conn_options: &ConnectionOptions,
95 ) -> DFResult<Arc<dyn Connection>> {
96 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
97 return Err(DataFusionError::Execution(format!(
98 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
99 )));
100 }
101 tokio::task::block_in_place(|| {
102 tokio::runtime::Handle::current().block_on(async {
103 let pool = connect(conn_options).await?;
104 let conn = pool.get().await?;
105 Ok::<_, DataFusionError>(conn)
106 })
107 })
108 }
109}
110
111#[derive(Debug, With)]
112pub struct RemotePhysicalCodec {
113 transform_codec: Arc<dyn TransformCodec>,
114 connection_codec: Arc<dyn ConnectionCodec>,
115}
116
117impl RemotePhysicalCodec {
118 pub fn new() -> Self {
119 Self {
120 transform_codec: Arc::new(DefaultTransformCodec {}),
121 connection_codec: Arc::new(DefaultConnectionCodec {}),
122 }
123 }
124}
125
126impl Default for RemotePhysicalCodec {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132impl PhysicalExtensionCodec for RemotePhysicalCodec {
133 fn try_decode(
134 &self,
135 buf: &[u8],
136 _inputs: &[Arc<dyn ExecutionPlan>],
137 _registry: &dyn FunctionRegistry,
138 ) -> DFResult<Arc<dyn ExecutionPlan>> {
139 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
140 DataFusionError::Internal(format!(
141 "Failed to decode remote table execution plan: {e:?}"
142 ))
143 })?;
144
145 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
146 Arc::new(DefaultTransform {})
147 } else {
148 self.transform_codec.try_decode(&proto.transform)?
149 };
150
151 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
152 let remote_schema = proto
153 .remote_schema
154 .map(|schema| Arc::new(parse_remote_schema(&schema)));
155
156 let projection: Option<Vec<usize>> = proto
157 .projection
158 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
159
160 let limit = proto.limit.map(|l| l as usize);
161
162 let conn_options = parse_connection_options(proto.conn_options.unwrap());
163
164 let now = std::time::Instant::now();
165 let conn = self
166 .connection_codec
167 .try_decode(&proto.connection, &conn_options)?;
168 debug!(
169 "[remote-table] Decoding connection cost {}ms",
170 now.elapsed().as_millis()
171 );
172
173 Ok(Arc::new(RemoteTableExec::try_new(
174 conn_options,
175 proto.sql,
176 table_schema,
177 remote_schema,
178 projection,
179 proto.unparsed_filters,
180 limit,
181 transform,
182 conn,
183 )?))
184 }
185
186 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
187 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
188 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
189 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
190 } else {
191 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
192 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
193 bytes
194 };
195
196 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
197 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
198
199 let serialized_conn = self
200 .connection_codec
201 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
202
203 let proto = protobuf::RemoteTableExec {
204 conn_options: Some(serialized_connection_options),
205 sql: exec.sql.clone(),
206 table_schema: Some(exec.table_schema.as_ref().try_into()?),
207 remote_schema,
208 projection: exec
209 .projection
210 .as_ref()
211 .map(|p| serialize_projection(p.as_slice())),
212 unparsed_filters: exec.unparsed_filters.clone(),
213 limit: exec.limit.map(|l| l as u32),
214 transform: serialized_transform,
215 connection: serialized_conn,
216 };
217
218 proto.encode(buf).map_err(|e| {
219 DataFusionError::Internal(format!(
220 "Failed to encode remote table execution plan: {e:?}"
221 ))
222 })?;
223 Ok(())
224 } else {
225 Err(DataFusionError::Execution(format!(
226 "Failed to encode {}",
227 RemoteTableExec::static_name()
228 )))
229 }
230 }
231}
232
233fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
234 match options {
235 #[cfg(feature = "postgres")]
236 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
237 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
238 protobuf::PostgresConnectionOptions {
239 host: options.host.clone(),
240 port: options.port as u32,
241 username: options.username.clone(),
242 password: options.password.clone(),
243 database: options.database.clone(),
244 pool_max_size: options.pool_max_size as u32,
245 stream_chunk_size: options.stream_chunk_size as u32,
246 },
247 )),
248 },
249 #[cfg(feature = "mysql")]
250 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
251 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
252 protobuf::MysqlConnectionOptions {
253 host: options.host.clone(),
254 port: options.port as u32,
255 username: options.username.clone(),
256 password: options.password.clone(),
257 database: options.database.clone(),
258 pool_max_size: options.pool_max_size as u32,
259 stream_chunk_size: options.stream_chunk_size as u32,
260 },
261 )),
262 },
263 #[cfg(feature = "oracle")]
264 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
265 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
266 protobuf::OracleConnectionOptions {
267 host: options.host.clone(),
268 port: options.port as u32,
269 username: options.username.clone(),
270 password: options.password.clone(),
271 service_name: options.service_name.clone(),
272 pool_max_size: options.pool_max_size as u32,
273 stream_chunk_size: options.stream_chunk_size as u32,
274 },
275 )),
276 },
277 #[cfg(feature = "sqlite")]
278 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
279 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
280 protobuf::SqliteConnectionOptions {
281 path: options.path.to_str().unwrap().to_string(),
282 stream_chunk_size: options.stream_chunk_size as u32,
283 },
284 )),
285 },
286 #[cfg(feature = "dm")]
287 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
288 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
289 protobuf::DmConnectionOptions {
290 host: options.host.clone(),
291 port: options.port as u32,
292 username: options.username.clone(),
293 password: options.password.clone(),
294 schema: options.schema.clone(),
295 stream_chunk_size: options.stream_chunk_size as u32,
296 driver: options.driver.clone(),
297 },
298 )),
299 },
300 }
301}
302
303fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
304 match options.connection_options {
305 #[cfg(feature = "postgres")]
306 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
307 ConnectionOptions::Postgres(PostgresConnectionOptions {
308 host: options.host,
309 port: options.port as u16,
310 username: options.username,
311 password: options.password,
312 database: options.database,
313 pool_max_size: options.pool_max_size as usize,
314 stream_chunk_size: options.stream_chunk_size as usize,
315 })
316 }
317 #[cfg(feature = "mysql")]
318 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
319 ConnectionOptions::Mysql(MysqlConnectionOptions {
320 host: options.host,
321 port: options.port as u16,
322 username: options.username,
323 password: options.password,
324 database: options.database,
325 pool_max_size: options.pool_max_size as usize,
326 stream_chunk_size: options.stream_chunk_size as usize,
327 })
328 }
329 #[cfg(feature = "oracle")]
330 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
331 ConnectionOptions::Oracle(OracleConnectionOptions {
332 host: options.host,
333 port: options.port as u16,
334 username: options.username,
335 password: options.password,
336 service_name: options.service_name,
337 pool_max_size: options.pool_max_size as usize,
338 stream_chunk_size: options.stream_chunk_size as usize,
339 })
340 }
341 #[cfg(feature = "sqlite")]
342 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
343 ConnectionOptions::Sqlite(SqliteConnectionOptions {
344 path: Path::new(&options.path).to_path_buf(),
345 stream_chunk_size: options.stream_chunk_size as usize,
346 })
347 }
348 #[cfg(feature = "dm")]
349 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
350 ConnectionOptions::Dm(DmConnectionOptions {
351 host: options.host,
352 port: options.port as u16,
353 username: options.username,
354 password: options.password,
355 schema: options.schema,
356 stream_chunk_size: options.stream_chunk_size as usize,
357 driver: options.driver,
358 })
359 }
360 _ => panic!("Failed to parse connection options: {options:?}"),
361 }
362}
363
364fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
365 protobuf::Projection {
366 projection: projection.iter().map(|n| *n as u32).collect(),
367 }
368}
369
370fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
371 let fields = remote_schema
372 .fields
373 .iter()
374 .map(serialize_remote_field)
375 .collect::<Vec<_>>();
376
377 protobuf::RemoteSchema { fields }
378}
379
380fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
381 protobuf::RemoteField {
382 name: remote_field.name.clone(),
383 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
384 nullable: remote_field.nullable,
385 }
386}
387
388fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
389 match remote_type {
390 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
391 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
392 protobuf::PostgresInt2 {},
393 )),
394 },
395 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
396 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
397 protobuf::PostgresInt4 {},
398 )),
399 },
400 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
401 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
402 protobuf::PostgresInt8 {},
403 )),
404 },
405 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
406 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
407 protobuf::PostgresFloat4 {},
408 )),
409 },
410 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
411 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
412 protobuf::PostgresFloat8 {},
413 )),
414 },
415 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
416 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
417 protobuf::PostgresNumeric {
418 scale: *scale as i32,
419 },
420 )),
421 },
422 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
423 r#type: Some(protobuf::remote_type::Type::PostgresName(
424 protobuf::PostgresName {},
425 )),
426 },
427 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
428 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
429 protobuf::PostgresVarchar {},
430 )),
431 },
432 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
433 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
434 protobuf::PostgresBpchar {},
435 )),
436 },
437 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
438 r#type: Some(protobuf::remote_type::Type::PostgresText(
439 protobuf::PostgresText {},
440 )),
441 },
442 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
443 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
444 protobuf::PostgresBytea {},
445 )),
446 },
447 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
448 r#type: Some(protobuf::remote_type::Type::PostgresDate(
449 protobuf::PostgresDate {},
450 )),
451 },
452 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
453 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
454 protobuf::PostgresTimestamp {},
455 )),
456 },
457 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
458 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
459 protobuf::PostgresTimestampTz {},
460 )),
461 },
462 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
463 r#type: Some(protobuf::remote_type::Type::PostgresTime(
464 protobuf::PostgresTime {},
465 )),
466 },
467 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
468 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
469 protobuf::PostgresInterval {},
470 )),
471 },
472 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
473 r#type: Some(protobuf::remote_type::Type::PostgresBool(
474 protobuf::PostgresBool {},
475 )),
476 },
477 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
478 r#type: Some(protobuf::remote_type::Type::PostgresJson(
479 protobuf::PostgresJson {},
480 )),
481 },
482 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
483 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
484 protobuf::PostgresJsonb {},
485 )),
486 },
487 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
488 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
489 protobuf::PostgresInt2Array {},
490 )),
491 },
492 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
493 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
494 protobuf::PostgresInt4Array {},
495 )),
496 },
497 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
498 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
499 protobuf::PostgresInt8Array {},
500 )),
501 },
502 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
503 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
504 protobuf::PostgresFloat4Array {},
505 )),
506 },
507 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
508 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
509 protobuf::PostgresFloat8Array {},
510 )),
511 },
512 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
513 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
514 protobuf::PostgresVarcharArray {},
515 )),
516 },
517 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
518 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
519 protobuf::PostgresBpcharArray {},
520 )),
521 },
522 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
523 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
524 protobuf::PostgresTextArray {},
525 )),
526 },
527 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
528 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
529 protobuf::PostgresByteaArray {},
530 )),
531 },
532 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
533 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
534 protobuf::PostgresBoolArray {},
535 )),
536 },
537 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
538 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
539 protobuf::PostgresPostGisGeometry {},
540 )),
541 },
542 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
543 r#type: Some(protobuf::remote_type::Type::PostgresOid(
544 protobuf::PostgresOid {},
545 )),
546 },
547
548 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
549 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
550 protobuf::MysqlTinyInt {},
551 )),
552 },
553 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
554 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
555 protobuf::MysqlTinyIntUnsigned {},
556 )),
557 },
558 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
559 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
560 protobuf::MysqlSmallInt {},
561 )),
562 },
563 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
564 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
565 protobuf::MysqlSmallIntUnsigned {},
566 )),
567 },
568 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
569 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
570 protobuf::MysqlMediumInt {},
571 )),
572 },
573 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
574 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
575 protobuf::MysqlMediumIntUnsigned {},
576 )),
577 },
578 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
579 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
580 protobuf::MysqlInteger {},
581 )),
582 },
583 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
584 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
585 protobuf::MysqlIntegerUnsigned {},
586 )),
587 },
588 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
589 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
590 protobuf::MysqlBigInt {},
591 )),
592 },
593 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
594 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
595 protobuf::MysqlBigIntUnsigned {},
596 )),
597 },
598 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
599 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
600 protobuf::MysqlFloat {},
601 )),
602 },
603 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
604 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
605 protobuf::MysqlDouble {},
606 )),
607 },
608 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
609 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
610 protobuf::MysqlDecimal {
611 precision: *precision as u32,
612 scale: *scale as u32,
613 },
614 )),
615 },
616 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
617 r#type: Some(protobuf::remote_type::Type::MysqlDate(
618 protobuf::MysqlDate {},
619 )),
620 },
621 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
622 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
623 protobuf::MysqlDateTime {},
624 )),
625 },
626 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
627 r#type: Some(protobuf::remote_type::Type::MysqlTime(
628 protobuf::MysqlTime {},
629 )),
630 },
631 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
632 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
633 protobuf::MysqlTimestamp {},
634 )),
635 },
636 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
637 r#type: Some(protobuf::remote_type::Type::MysqlYear(
638 protobuf::MysqlYear {},
639 )),
640 },
641 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
642 r#type: Some(protobuf::remote_type::Type::MysqlChar(
643 protobuf::MysqlChar {},
644 )),
645 },
646 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
647 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
648 protobuf::MysqlVarchar {},
649 )),
650 },
651 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
652 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
653 protobuf::MysqlBinary {},
654 )),
655 },
656 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
657 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
658 protobuf::MysqlVarbinary {},
659 )),
660 },
661 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
662 r#type: Some(protobuf::remote_type::Type::MysqlText(
663 protobuf::MysqlText { length: *len },
664 )),
665 },
666 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
667 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
668 protobuf::MysqlBlob { length: *len },
669 )),
670 },
671 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
672 r#type: Some(protobuf::remote_type::Type::MysqlJson(
673 protobuf::MysqlJson {},
674 )),
675 },
676 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
677 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
678 protobuf::MysqlGeometry {},
679 )),
680 },
681
682 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
683 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
684 protobuf::OracleVarchar2 { length: *len },
685 )),
686 },
687 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
688 r#type: Some(protobuf::remote_type::Type::OracleChar(
689 protobuf::OracleChar { length: *len },
690 )),
691 },
692 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
693 r#type: Some(protobuf::remote_type::Type::OracleNumber(
694 protobuf::OracleNumber {
695 precision: *precision as u32,
696 scale: *scale as i32,
697 },
698 )),
699 },
700 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
701 r#type: Some(protobuf::remote_type::Type::OracleDate(
702 protobuf::OracleDate {},
703 )),
704 },
705 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
706 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
707 protobuf::OracleTimestamp {},
708 )),
709 },
710 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
711 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
712 protobuf::OracleBoolean {},
713 )),
714 },
715 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
716 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
717 protobuf::OracleBinaryFloat {},
718 )),
719 },
720 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
721 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
722 protobuf::OracleBinaryDouble {},
723 )),
724 },
725 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
726 r#type: Some(protobuf::remote_type::Type::OracleBlob(
727 protobuf::OracleBlob {},
728 )),
729 },
730 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
731 r#type: Some(protobuf::remote_type::Type::OracleFloat(
732 protobuf::OracleFloat {
733 precision: *precision as u32,
734 },
735 )),
736 },
737 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
738 r#type: Some(protobuf::remote_type::Type::OracleNchar(
739 protobuf::OracleNChar { length: *len },
740 )),
741 },
742 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
743 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
744 protobuf::OracleNVarchar2 { length: *len },
745 )),
746 },
747 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
748 r#type: Some(protobuf::remote_type::Type::OracleRaw(
749 protobuf::OracleRaw { length: *len },
750 )),
751 },
752 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
753 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
754 protobuf::OracleLongRaw {},
755 )),
756 },
757 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
758 r#type: Some(protobuf::remote_type::Type::OracleLong(
759 protobuf::OracleLong {},
760 )),
761 },
762 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
763 r#type: Some(protobuf::remote_type::Type::OracleClob(
764 protobuf::OracleClob {},
765 )),
766 },
767 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
768 r#type: Some(protobuf::remote_type::Type::OracleNclob(
769 protobuf::OracleNClob {},
770 )),
771 },
772 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
773 r#type: Some(protobuf::remote_type::Type::SqliteNull(
774 protobuf::SqliteNull {},
775 )),
776 },
777 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
778 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
779 protobuf::SqliteInteger {},
780 )),
781 },
782 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
783 r#type: Some(protobuf::remote_type::Type::SqliteReal(
784 protobuf::SqliteReal {},
785 )),
786 },
787 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
788 r#type: Some(protobuf::remote_type::Type::SqliteText(
789 protobuf::SqliteText {},
790 )),
791 },
792 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
793 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
794 protobuf::SqliteBlob {},
795 )),
796 },
797 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
798 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
799 protobuf::DmTinyInt {},
800 )),
801 },
802 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
803 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
804 protobuf::DmSmallInt {},
805 )),
806 },
807 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
808 r#type: Some(protobuf::remote_type::Type::DmInteger(
809 protobuf::DmInteger {},
810 )),
811 },
812 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
813 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
814 },
815 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
816 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
817 },
818 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
819 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
820 },
821 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
822 r#type: Some(protobuf::remote_type::Type::DmNumeric(
823 protobuf::DmNumeric {
824 precision: *precision as u32,
825 scale: *scale as i32,
826 },
827 )),
828 },
829 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
830 r#type: Some(protobuf::remote_type::Type::DmDecimal(
831 protobuf::DmDecimal {
832 precision: *precision as u32,
833 scale: *scale as i32,
834 },
835 )),
836 },
837 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
838 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
839 length: len.map(|s| s as u32),
840 })),
841 },
842 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
843 r#type: Some(protobuf::remote_type::Type::DmVarchar(
844 protobuf::DmVarchar {
845 length: len.map(|s| s as u32),
846 },
847 )),
848 },
849 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
850 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
851 },
852 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
853 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
854 length: *len as u32,
855 })),
856 },
857 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
858 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
859 protobuf::DmVarbinary {
860 length: len.map(|s| s as u32),
861 },
862 )),
863 },
864 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
865 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
866 },
867 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
868 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
869 },
870 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
871 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
872 protobuf::DmTimestamp {
873 precision: *precision as u32,
874 },
875 )),
876 },
877 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
878 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
879 precision: *precision as u32,
880 })),
881 },
882 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
883 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
884 },
885 }
886}
887
888fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
889 let fields = remote_schema
890 .fields
891 .iter()
892 .map(parse_remote_field)
893 .collect::<Vec<_>>();
894
895 RemoteSchema { fields }
896}
897
898fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
899 RemoteField {
900 name: field.name.clone(),
901 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
902 nullable: field.nullable,
903 }
904}
905
906fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
907 match remote_type.r#type.as_ref().unwrap() {
908 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
909 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
910 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
911 protobuf::remote_type::Type::PostgresFloat4(_) => {
912 RemoteType::Postgres(PostgresType::Float4)
913 }
914 protobuf::remote_type::Type::PostgresFloat8(_) => {
915 RemoteType::Postgres(PostgresType::Float8)
916 }
917 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
918 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
919 }
920 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
921 protobuf::remote_type::Type::PostgresVarchar(_) => {
922 RemoteType::Postgres(PostgresType::Varchar)
923 }
924 protobuf::remote_type::Type::PostgresBpchar(_) => {
925 RemoteType::Postgres(PostgresType::Bpchar)
926 }
927 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
928 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
929 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
930 protobuf::remote_type::Type::PostgresTimestamp(_) => {
931 RemoteType::Postgres(PostgresType::Timestamp)
932 }
933 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
934 RemoteType::Postgres(PostgresType::TimestampTz)
935 }
936 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
937 protobuf::remote_type::Type::PostgresInterval(_) => {
938 RemoteType::Postgres(PostgresType::Interval)
939 }
940 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
941 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
942 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
943 protobuf::remote_type::Type::PostgresInt2Array(_) => {
944 RemoteType::Postgres(PostgresType::Int2Array)
945 }
946 protobuf::remote_type::Type::PostgresInt4Array(_) => {
947 RemoteType::Postgres(PostgresType::Int4Array)
948 }
949 protobuf::remote_type::Type::PostgresInt8Array(_) => {
950 RemoteType::Postgres(PostgresType::Int8Array)
951 }
952 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
953 RemoteType::Postgres(PostgresType::Float4Array)
954 }
955 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
956 RemoteType::Postgres(PostgresType::Float8Array)
957 }
958 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
959 RemoteType::Postgres(PostgresType::VarcharArray)
960 }
961 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
962 RemoteType::Postgres(PostgresType::BpcharArray)
963 }
964 protobuf::remote_type::Type::PostgresTextArray(_) => {
965 RemoteType::Postgres(PostgresType::TextArray)
966 }
967 protobuf::remote_type::Type::PostgresByteaArray(_) => {
968 RemoteType::Postgres(PostgresType::ByteaArray)
969 }
970 protobuf::remote_type::Type::PostgresBoolArray(_) => {
971 RemoteType::Postgres(PostgresType::BoolArray)
972 }
973 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
974 RemoteType::Postgres(PostgresType::PostGisGeometry)
975 }
976 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
977 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
978 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
979 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
980 }
981 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
982 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
983 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
984 }
985 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
986 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
987 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
988 }
989 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
990 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
991 RemoteType::Mysql(MysqlType::IntegerUnsigned)
992 }
993 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
994 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
995 RemoteType::Mysql(MysqlType::BigIntUnsigned)
996 }
997 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
998 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
999 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1000 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1001 ),
1002 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1003 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1004 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1005 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1006 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1007 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1008 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1009 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1010 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1011 protobuf::remote_type::Type::MysqlText(text) => {
1012 RemoteType::Mysql(MysqlType::Text(text.length))
1013 }
1014 protobuf::remote_type::Type::MysqlBlob(blob) => {
1015 RemoteType::Mysql(MysqlType::Blob(blob.length))
1016 }
1017 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1018 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1019 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1020 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1021 }
1022 protobuf::remote_type::Type::OracleChar(char) => {
1023 RemoteType::Oracle(OracleType::Char(char.length))
1024 }
1025 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1026 OracleType::Number(number.precision as u8, number.scale as i8),
1027 ),
1028 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1029 protobuf::remote_type::Type::OracleTimestamp(_) => {
1030 RemoteType::Oracle(OracleType::Timestamp)
1031 }
1032 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1033 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1034 RemoteType::Oracle(OracleType::BinaryFloat)
1035 }
1036 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1037 RemoteType::Oracle(OracleType::BinaryDouble)
1038 }
1039 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1040 RemoteType::Oracle(OracleType::Float(*precision as u8))
1041 }
1042 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1043 RemoteType::Oracle(OracleType::NChar(*length))
1044 }
1045 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1046 RemoteType::Oracle(OracleType::NVarchar2(*length))
1047 }
1048 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1049 RemoteType::Oracle(OracleType::Raw(*length))
1050 }
1051 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1052 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1053 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1054 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1055 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1056 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1057 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1058 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1059 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1060 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1061 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1062 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1063 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1064 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1065 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1066 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1067 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1068 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1069 }
1070 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1071 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1072 }
1073 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1074 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1075 }
1076 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1077 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1078 }
1079 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1080 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1081 RemoteType::Dm(DmType::Binary(*length as u16))
1082 }
1083 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1084 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1085 }
1086 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1087 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1088 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1089 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1090 }
1091 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1092 RemoteType::Dm(DmType::Time(*precision as u8))
1093 }
1094 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1095 }
1096}