1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 Connection, ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType,
14 PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType,
15 SqliteType, Transform, connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use log::debug;
26use prost::Message;
27use std::fmt::Debug;
28#[cfg(feature = "sqlite")]
29use std::path::Path;
30use std::sync::Arc;
31
32pub trait TransformCodec: Debug + Send + Sync {
33 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
34 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
35}
36
37#[derive(Debug)]
38pub struct DefaultTransformCodec {}
39
40const DEFAULT_TRANSFORM_ID: &str = "__default";
41
42impl TransformCodec for DefaultTransformCodec {
43 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
44 if value.as_any().is::<DefaultTransform>() {
45 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
46 } else {
47 Err(DataFusionError::Execution(format!(
48 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
49 )))
50 }
51 }
52
53 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
54 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
55 Ok(Arc::new(DefaultTransform {}))
56 } else {
57 Err(DataFusionError::Execution(
58 "DefaultTransformCodec only supports DefaultTransform".to_string(),
59 ))
60 }
61 }
62}
63
64pub trait ConnectionCodec: Debug + Send + Sync {
65 fn try_encode(
66 &self,
67 value: &dyn Connection,
68 conn_options: &ConnectionOptions,
69 ) -> DFResult<Vec<u8>>;
70 fn try_decode(
71 &self,
72 value: &[u8],
73 conn_options: &ConnectionOptions,
74 ) -> DFResult<Arc<dyn Connection>>;
75}
76
77#[derive(Debug)]
78pub struct DefaultConnectionCodec;
79
80const DEFAULT_CONNECTION_VALUE: &str = "__default";
81
82impl ConnectionCodec for DefaultConnectionCodec {
83 fn try_encode(
84 &self,
85 _value: &dyn Connection,
86 _conn_options: &ConnectionOptions,
87 ) -> DFResult<Vec<u8>> {
88 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
89 }
90
91 fn try_decode(
92 &self,
93 value: &[u8],
94 conn_options: &ConnectionOptions,
95 ) -> DFResult<Arc<dyn Connection>> {
96 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
97 return Err(DataFusionError::Execution(format!(
98 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
99 )));
100 }
101 let conn_options = conn_options.clone().with_pool_max_size(1);
102 tokio::task::block_in_place(|| {
103 tokio::runtime::Handle::current().block_on(async {
104 let pool = connect(&conn_options).await?;
105 let conn = pool.get().await?;
106 Ok::<_, DataFusionError>(conn)
107 })
108 })
109 }
110}
111
112#[derive(Debug, With)]
113pub struct RemotePhysicalCodec {
114 transform_codec: Arc<dyn TransformCodec>,
115 connection_codec: Arc<dyn ConnectionCodec>,
116}
117
118impl RemotePhysicalCodec {
119 pub fn new() -> Self {
120 Self {
121 transform_codec: Arc::new(DefaultTransformCodec {}),
122 connection_codec: Arc::new(DefaultConnectionCodec {}),
123 }
124 }
125}
126
127impl Default for RemotePhysicalCodec {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl PhysicalExtensionCodec for RemotePhysicalCodec {
134 fn try_decode(
135 &self,
136 buf: &[u8],
137 _inputs: &[Arc<dyn ExecutionPlan>],
138 _registry: &dyn FunctionRegistry,
139 ) -> DFResult<Arc<dyn ExecutionPlan>> {
140 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
141 DataFusionError::Internal(format!(
142 "Failed to decode remote table execution plan: {e:?}"
143 ))
144 })?;
145
146 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
147 Arc::new(DefaultTransform {})
148 } else {
149 self.transform_codec.try_decode(&proto.transform)?
150 };
151
152 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
153 let remote_schema = proto
154 .remote_schema
155 .map(|schema| Arc::new(parse_remote_schema(&schema)));
156
157 let projection: Option<Vec<usize>> = proto
158 .projection
159 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
160
161 let limit = proto.limit.map(|l| l as usize);
162
163 let conn_options = parse_connection_options(proto.conn_options.unwrap());
164
165 let now = std::time::Instant::now();
166 let conn = self
167 .connection_codec
168 .try_decode(&proto.connection, &conn_options)?;
169 debug!(
170 "[remote-table] Decoding connection cost {}ms",
171 now.elapsed().as_millis()
172 );
173
174 Ok(Arc::new(RemoteTableExec::try_new(
175 conn_options,
176 proto.sql,
177 table_schema,
178 remote_schema,
179 projection,
180 proto.unparsed_filters,
181 limit,
182 transform,
183 conn,
184 )?))
185 }
186
187 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
188 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
189 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
190 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
191 } else {
192 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
193 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
194 bytes
195 };
196
197 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
198 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
199
200 let serialized_conn = self
201 .connection_codec
202 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
203
204 let proto = protobuf::RemoteTableExec {
205 conn_options: Some(serialized_connection_options),
206 sql: exec.sql.clone(),
207 table_schema: Some(exec.table_schema.as_ref().try_into()?),
208 remote_schema,
209 projection: exec
210 .projection
211 .as_ref()
212 .map(|p| serialize_projection(p.as_slice())),
213 unparsed_filters: exec.unparsed_filters.clone(),
214 limit: exec.limit.map(|l| l as u32),
215 transform: serialized_transform,
216 connection: serialized_conn,
217 };
218
219 proto.encode(buf).map_err(|e| {
220 DataFusionError::Internal(format!(
221 "Failed to encode remote table execution plan: {e:?}"
222 ))
223 })?;
224 Ok(())
225 } else {
226 Err(DataFusionError::Execution(format!(
227 "Failed to encode {}",
228 RemoteTableExec::static_name()
229 )))
230 }
231 }
232}
233
234fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
235 match options {
236 #[cfg(feature = "postgres")]
237 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
238 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
239 protobuf::PostgresConnectionOptions {
240 host: options.host.clone(),
241 port: options.port as u32,
242 username: options.username.clone(),
243 password: options.password.clone(),
244 database: options.database.clone(),
245 pool_max_size: options.pool_max_size as u32,
246 stream_chunk_size: options.stream_chunk_size as u32,
247 },
248 )),
249 },
250 #[cfg(feature = "mysql")]
251 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
252 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
253 protobuf::MysqlConnectionOptions {
254 host: options.host.clone(),
255 port: options.port as u32,
256 username: options.username.clone(),
257 password: options.password.clone(),
258 database: options.database.clone(),
259 pool_max_size: options.pool_max_size as u32,
260 stream_chunk_size: options.stream_chunk_size as u32,
261 },
262 )),
263 },
264 #[cfg(feature = "oracle")]
265 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
266 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
267 protobuf::OracleConnectionOptions {
268 host: options.host.clone(),
269 port: options.port as u32,
270 username: options.username.clone(),
271 password: options.password.clone(),
272 service_name: options.service_name.clone(),
273 pool_max_size: options.pool_max_size as u32,
274 stream_chunk_size: options.stream_chunk_size as u32,
275 },
276 )),
277 },
278 #[cfg(feature = "sqlite")]
279 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
280 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
281 protobuf::SqliteConnectionOptions {
282 path: options.path.to_str().unwrap().to_string(),
283 stream_chunk_size: options.stream_chunk_size as u32,
284 },
285 )),
286 },
287 #[cfg(feature = "dm")]
288 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
289 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
290 protobuf::DmConnectionOptions {
291 host: options.host.clone(),
292 port: options.port as u32,
293 username: options.username.clone(),
294 password: options.password.clone(),
295 schema: options.schema.clone(),
296 stream_chunk_size: options.stream_chunk_size as u32,
297 driver: options.driver.clone(),
298 },
299 )),
300 },
301 }
302}
303
304fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
305 match options.connection_options {
306 #[cfg(feature = "postgres")]
307 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
308 ConnectionOptions::Postgres(PostgresConnectionOptions {
309 host: options.host,
310 port: options.port as u16,
311 username: options.username,
312 password: options.password,
313 database: options.database,
314 pool_max_size: options.pool_max_size as usize,
315 stream_chunk_size: options.stream_chunk_size as usize,
316 })
317 }
318 #[cfg(feature = "mysql")]
319 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
320 ConnectionOptions::Mysql(MysqlConnectionOptions {
321 host: options.host,
322 port: options.port as u16,
323 username: options.username,
324 password: options.password,
325 database: options.database,
326 pool_max_size: options.pool_max_size as usize,
327 stream_chunk_size: options.stream_chunk_size as usize,
328 })
329 }
330 #[cfg(feature = "oracle")]
331 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
332 ConnectionOptions::Oracle(OracleConnectionOptions {
333 host: options.host,
334 port: options.port as u16,
335 username: options.username,
336 password: options.password,
337 service_name: options.service_name,
338 pool_max_size: options.pool_max_size as usize,
339 stream_chunk_size: options.stream_chunk_size as usize,
340 })
341 }
342 #[cfg(feature = "sqlite")]
343 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
344 ConnectionOptions::Sqlite(SqliteConnectionOptions {
345 path: Path::new(&options.path).to_path_buf(),
346 stream_chunk_size: options.stream_chunk_size as usize,
347 })
348 }
349 #[cfg(feature = "dm")]
350 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
351 ConnectionOptions::Dm(DmConnectionOptions {
352 host: options.host,
353 port: options.port as u16,
354 username: options.username,
355 password: options.password,
356 schema: options.schema,
357 stream_chunk_size: options.stream_chunk_size as usize,
358 driver: options.driver,
359 })
360 }
361 _ => panic!("Failed to parse connection options: {options:?}"),
362 }
363}
364
365fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
366 protobuf::Projection {
367 projection: projection.iter().map(|n| *n as u32).collect(),
368 }
369}
370
371fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
372 let fields = remote_schema
373 .fields
374 .iter()
375 .map(serialize_remote_field)
376 .collect::<Vec<_>>();
377
378 protobuf::RemoteSchema { fields }
379}
380
381fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
382 protobuf::RemoteField {
383 name: remote_field.name.clone(),
384 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
385 nullable: remote_field.nullable,
386 }
387}
388
389fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
390 match remote_type {
391 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
392 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
393 protobuf::PostgresInt2 {},
394 )),
395 },
396 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
397 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
398 protobuf::PostgresInt4 {},
399 )),
400 },
401 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
402 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
403 protobuf::PostgresInt8 {},
404 )),
405 },
406 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
407 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
408 protobuf::PostgresFloat4 {},
409 )),
410 },
411 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
412 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
413 protobuf::PostgresFloat8 {},
414 )),
415 },
416 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
417 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
418 protobuf::PostgresNumeric {
419 scale: *scale as i32,
420 },
421 )),
422 },
423 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
424 r#type: Some(protobuf::remote_type::Type::PostgresName(
425 protobuf::PostgresName {},
426 )),
427 },
428 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
429 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
430 protobuf::PostgresVarchar {},
431 )),
432 },
433 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
434 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
435 protobuf::PostgresBpchar {},
436 )),
437 },
438 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
439 r#type: Some(protobuf::remote_type::Type::PostgresText(
440 protobuf::PostgresText {},
441 )),
442 },
443 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
444 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
445 protobuf::PostgresBytea {},
446 )),
447 },
448 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
449 r#type: Some(protobuf::remote_type::Type::PostgresDate(
450 protobuf::PostgresDate {},
451 )),
452 },
453 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
454 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
455 protobuf::PostgresTimestamp {},
456 )),
457 },
458 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
459 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
460 protobuf::PostgresTimestampTz {},
461 )),
462 },
463 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
464 r#type: Some(protobuf::remote_type::Type::PostgresTime(
465 protobuf::PostgresTime {},
466 )),
467 },
468 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
469 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
470 protobuf::PostgresInterval {},
471 )),
472 },
473 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
474 r#type: Some(protobuf::remote_type::Type::PostgresBool(
475 protobuf::PostgresBool {},
476 )),
477 },
478 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
479 r#type: Some(protobuf::remote_type::Type::PostgresJson(
480 protobuf::PostgresJson {},
481 )),
482 },
483 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
484 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
485 protobuf::PostgresJsonb {},
486 )),
487 },
488 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
489 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
490 protobuf::PostgresInt2Array {},
491 )),
492 },
493 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
494 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
495 protobuf::PostgresInt4Array {},
496 )),
497 },
498 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
499 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
500 protobuf::PostgresInt8Array {},
501 )),
502 },
503 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
504 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
505 protobuf::PostgresFloat4Array {},
506 )),
507 },
508 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
509 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
510 protobuf::PostgresFloat8Array {},
511 )),
512 },
513 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
514 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
515 protobuf::PostgresVarcharArray {},
516 )),
517 },
518 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
519 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
520 protobuf::PostgresBpcharArray {},
521 )),
522 },
523 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
524 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
525 protobuf::PostgresTextArray {},
526 )),
527 },
528 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
529 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
530 protobuf::PostgresByteaArray {},
531 )),
532 },
533 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
534 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
535 protobuf::PostgresBoolArray {},
536 )),
537 },
538 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
539 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
540 protobuf::PostgresPostGisGeometry {},
541 )),
542 },
543 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
544 r#type: Some(protobuf::remote_type::Type::PostgresOid(
545 protobuf::PostgresOid {},
546 )),
547 },
548
549 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
550 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
551 protobuf::MysqlTinyInt {},
552 )),
553 },
554 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
555 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
556 protobuf::MysqlTinyIntUnsigned {},
557 )),
558 },
559 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
560 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
561 protobuf::MysqlSmallInt {},
562 )),
563 },
564 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
565 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
566 protobuf::MysqlSmallIntUnsigned {},
567 )),
568 },
569 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
570 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
571 protobuf::MysqlMediumInt {},
572 )),
573 },
574 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
575 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
576 protobuf::MysqlMediumIntUnsigned {},
577 )),
578 },
579 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
580 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
581 protobuf::MysqlInteger {},
582 )),
583 },
584 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
585 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
586 protobuf::MysqlIntegerUnsigned {},
587 )),
588 },
589 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
590 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
591 protobuf::MysqlBigInt {},
592 )),
593 },
594 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
595 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
596 protobuf::MysqlBigIntUnsigned {},
597 )),
598 },
599 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
600 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
601 protobuf::MysqlFloat {},
602 )),
603 },
604 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
605 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
606 protobuf::MysqlDouble {},
607 )),
608 },
609 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
610 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
611 protobuf::MysqlDecimal {
612 precision: *precision as u32,
613 scale: *scale as u32,
614 },
615 )),
616 },
617 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
618 r#type: Some(protobuf::remote_type::Type::MysqlDate(
619 protobuf::MysqlDate {},
620 )),
621 },
622 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
623 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
624 protobuf::MysqlDateTime {},
625 )),
626 },
627 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
628 r#type: Some(protobuf::remote_type::Type::MysqlTime(
629 protobuf::MysqlTime {},
630 )),
631 },
632 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
633 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
634 protobuf::MysqlTimestamp {},
635 )),
636 },
637 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
638 r#type: Some(protobuf::remote_type::Type::MysqlYear(
639 protobuf::MysqlYear {},
640 )),
641 },
642 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
643 r#type: Some(protobuf::remote_type::Type::MysqlChar(
644 protobuf::MysqlChar {},
645 )),
646 },
647 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
648 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
649 protobuf::MysqlVarchar {},
650 )),
651 },
652 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
653 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
654 protobuf::MysqlBinary {},
655 )),
656 },
657 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
658 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
659 protobuf::MysqlVarbinary {},
660 )),
661 },
662 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
663 r#type: Some(protobuf::remote_type::Type::MysqlText(
664 protobuf::MysqlText { length: *len },
665 )),
666 },
667 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
668 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
669 protobuf::MysqlBlob { length: *len },
670 )),
671 },
672 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
673 r#type: Some(protobuf::remote_type::Type::MysqlJson(
674 protobuf::MysqlJson {},
675 )),
676 },
677 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
678 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
679 protobuf::MysqlGeometry {},
680 )),
681 },
682
683 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
684 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
685 protobuf::OracleVarchar2 { length: *len },
686 )),
687 },
688 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
689 r#type: Some(protobuf::remote_type::Type::OracleChar(
690 protobuf::OracleChar { length: *len },
691 )),
692 },
693 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
694 r#type: Some(protobuf::remote_type::Type::OracleNumber(
695 protobuf::OracleNumber {
696 precision: *precision as u32,
697 scale: *scale as i32,
698 },
699 )),
700 },
701 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
702 r#type: Some(protobuf::remote_type::Type::OracleDate(
703 protobuf::OracleDate {},
704 )),
705 },
706 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
707 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
708 protobuf::OracleTimestamp {},
709 )),
710 },
711 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
712 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
713 protobuf::OracleBoolean {},
714 )),
715 },
716 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
717 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
718 protobuf::OracleBinaryFloat {},
719 )),
720 },
721 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
722 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
723 protobuf::OracleBinaryDouble {},
724 )),
725 },
726 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
727 r#type: Some(protobuf::remote_type::Type::OracleBlob(
728 protobuf::OracleBlob {},
729 )),
730 },
731 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
732 r#type: Some(protobuf::remote_type::Type::OracleFloat(
733 protobuf::OracleFloat {
734 precision: *precision as u32,
735 },
736 )),
737 },
738 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
739 r#type: Some(protobuf::remote_type::Type::OracleNchar(
740 protobuf::OracleNChar { length: *len },
741 )),
742 },
743 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
744 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
745 protobuf::OracleNVarchar2 { length: *len },
746 )),
747 },
748 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
749 r#type: Some(protobuf::remote_type::Type::OracleRaw(
750 protobuf::OracleRaw { length: *len },
751 )),
752 },
753 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
754 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
755 protobuf::OracleLongRaw {},
756 )),
757 },
758 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
759 r#type: Some(protobuf::remote_type::Type::OracleLong(
760 protobuf::OracleLong {},
761 )),
762 },
763 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
764 r#type: Some(protobuf::remote_type::Type::OracleClob(
765 protobuf::OracleClob {},
766 )),
767 },
768 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
769 r#type: Some(protobuf::remote_type::Type::OracleNclob(
770 protobuf::OracleNClob {},
771 )),
772 },
773 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
774 r#type: Some(protobuf::remote_type::Type::SqliteNull(
775 protobuf::SqliteNull {},
776 )),
777 },
778 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
779 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
780 protobuf::SqliteInteger {},
781 )),
782 },
783 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
784 r#type: Some(protobuf::remote_type::Type::SqliteReal(
785 protobuf::SqliteReal {},
786 )),
787 },
788 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
789 r#type: Some(protobuf::remote_type::Type::SqliteText(
790 protobuf::SqliteText {},
791 )),
792 },
793 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
794 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
795 protobuf::SqliteBlob {},
796 )),
797 },
798 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
799 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
800 protobuf::DmTinyInt {},
801 )),
802 },
803 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
804 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
805 protobuf::DmSmallInt {},
806 )),
807 },
808 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
809 r#type: Some(protobuf::remote_type::Type::DmInteger(
810 protobuf::DmInteger {},
811 )),
812 },
813 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
814 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
815 },
816 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
817 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
818 },
819 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
820 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
821 },
822 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
823 r#type: Some(protobuf::remote_type::Type::DmNumeric(
824 protobuf::DmNumeric {
825 precision: *precision as u32,
826 scale: *scale as i32,
827 },
828 )),
829 },
830 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
831 r#type: Some(protobuf::remote_type::Type::DmDecimal(
832 protobuf::DmDecimal {
833 precision: *precision as u32,
834 scale: *scale as i32,
835 },
836 )),
837 },
838 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
839 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
840 length: len.map(|s| s as u32),
841 })),
842 },
843 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
844 r#type: Some(protobuf::remote_type::Type::DmVarchar(
845 protobuf::DmVarchar {
846 length: len.map(|s| s as u32),
847 },
848 )),
849 },
850 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
851 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
852 },
853 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
854 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
855 length: *len as u32,
856 })),
857 },
858 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
859 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
860 protobuf::DmVarbinary {
861 length: len.map(|s| s as u32),
862 },
863 )),
864 },
865 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
866 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
867 },
868 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
869 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
870 },
871 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
872 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
873 protobuf::DmTimestamp {
874 precision: *precision as u32,
875 },
876 )),
877 },
878 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
879 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
880 precision: *precision as u32,
881 })),
882 },
883 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
884 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
885 },
886 }
887}
888
889fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
890 let fields = remote_schema
891 .fields
892 .iter()
893 .map(parse_remote_field)
894 .collect::<Vec<_>>();
895
896 RemoteSchema { fields }
897}
898
899fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
900 RemoteField {
901 name: field.name.clone(),
902 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
903 nullable: field.nullable,
904 }
905}
906
907fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
908 match remote_type.r#type.as_ref().unwrap() {
909 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
910 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
911 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
912 protobuf::remote_type::Type::PostgresFloat4(_) => {
913 RemoteType::Postgres(PostgresType::Float4)
914 }
915 protobuf::remote_type::Type::PostgresFloat8(_) => {
916 RemoteType::Postgres(PostgresType::Float8)
917 }
918 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
919 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
920 }
921 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
922 protobuf::remote_type::Type::PostgresVarchar(_) => {
923 RemoteType::Postgres(PostgresType::Varchar)
924 }
925 protobuf::remote_type::Type::PostgresBpchar(_) => {
926 RemoteType::Postgres(PostgresType::Bpchar)
927 }
928 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
929 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
930 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
931 protobuf::remote_type::Type::PostgresTimestamp(_) => {
932 RemoteType::Postgres(PostgresType::Timestamp)
933 }
934 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
935 RemoteType::Postgres(PostgresType::TimestampTz)
936 }
937 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
938 protobuf::remote_type::Type::PostgresInterval(_) => {
939 RemoteType::Postgres(PostgresType::Interval)
940 }
941 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
942 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
943 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
944 protobuf::remote_type::Type::PostgresInt2Array(_) => {
945 RemoteType::Postgres(PostgresType::Int2Array)
946 }
947 protobuf::remote_type::Type::PostgresInt4Array(_) => {
948 RemoteType::Postgres(PostgresType::Int4Array)
949 }
950 protobuf::remote_type::Type::PostgresInt8Array(_) => {
951 RemoteType::Postgres(PostgresType::Int8Array)
952 }
953 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
954 RemoteType::Postgres(PostgresType::Float4Array)
955 }
956 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
957 RemoteType::Postgres(PostgresType::Float8Array)
958 }
959 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
960 RemoteType::Postgres(PostgresType::VarcharArray)
961 }
962 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
963 RemoteType::Postgres(PostgresType::BpcharArray)
964 }
965 protobuf::remote_type::Type::PostgresTextArray(_) => {
966 RemoteType::Postgres(PostgresType::TextArray)
967 }
968 protobuf::remote_type::Type::PostgresByteaArray(_) => {
969 RemoteType::Postgres(PostgresType::ByteaArray)
970 }
971 protobuf::remote_type::Type::PostgresBoolArray(_) => {
972 RemoteType::Postgres(PostgresType::BoolArray)
973 }
974 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
975 RemoteType::Postgres(PostgresType::PostGisGeometry)
976 }
977 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
978 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
979 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
980 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
981 }
982 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
983 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
984 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
985 }
986 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
987 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
988 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
989 }
990 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
991 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
992 RemoteType::Mysql(MysqlType::IntegerUnsigned)
993 }
994 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
995 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
996 RemoteType::Mysql(MysqlType::BigIntUnsigned)
997 }
998 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
999 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1000 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1001 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1002 ),
1003 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1004 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1005 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1006 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1007 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1008 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1009 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1010 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1011 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1012 protobuf::remote_type::Type::MysqlText(text) => {
1013 RemoteType::Mysql(MysqlType::Text(text.length))
1014 }
1015 protobuf::remote_type::Type::MysqlBlob(blob) => {
1016 RemoteType::Mysql(MysqlType::Blob(blob.length))
1017 }
1018 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1019 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1020 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1021 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1022 }
1023 protobuf::remote_type::Type::OracleChar(char) => {
1024 RemoteType::Oracle(OracleType::Char(char.length))
1025 }
1026 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1027 OracleType::Number(number.precision as u8, number.scale as i8),
1028 ),
1029 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1030 protobuf::remote_type::Type::OracleTimestamp(_) => {
1031 RemoteType::Oracle(OracleType::Timestamp)
1032 }
1033 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1034 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1035 RemoteType::Oracle(OracleType::BinaryFloat)
1036 }
1037 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1038 RemoteType::Oracle(OracleType::BinaryDouble)
1039 }
1040 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1041 RemoteType::Oracle(OracleType::Float(*precision as u8))
1042 }
1043 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1044 RemoteType::Oracle(OracleType::NChar(*length))
1045 }
1046 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1047 RemoteType::Oracle(OracleType::NVarchar2(*length))
1048 }
1049 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1050 RemoteType::Oracle(OracleType::Raw(*length))
1051 }
1052 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1053 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1054 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1055 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1056 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1057 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1058 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1059 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1060 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1061 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1062 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1063 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1064 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1065 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1066 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1067 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1068 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1069 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1070 }
1071 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1072 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1073 }
1074 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1075 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1076 }
1077 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1078 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1079 }
1080 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1081 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1082 RemoteType::Dm(DmType::Binary(*length as u16))
1083 }
1084 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1085 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1086 }
1087 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1088 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1089 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1090 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1091 }
1092 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1093 RemoteType::Dm(DmType::Time(*precision as u8))
1094 }
1095 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1096 }
1097}