1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 Connection, ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType,
14 PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType,
15 SqliteType, Transform, connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use log::debug;
26use prost::Message;
27use std::fmt::Debug;
28#[cfg(feature = "sqlite")]
29use std::path::Path;
30use std::sync::Arc;
31
/// Converts [`Transform`] implementations to and from an opaque byte representation.
///
/// Implementations must be `Debug + Send + Sync` because the codec is stored behind an
/// `Arc<dyn TransformCodec>`.
pub trait TransformCodec: Debug + Send + Sync {
    /// Encodes `value` into bytes, or returns an error if the transform is not supported.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Decodes bytes (as produced by `try_encode`) back into a shared transform instance.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
36
/// [`TransformCodec`] that supports only [`DefaultTransform`]; any other transform is
/// rejected with an error asking for a custom codec.
#[derive(Debug)]
pub struct DefaultTransformCodec {}
39
/// Marker whose bytes stand in for a serialized [`DefaultTransform`].
const DEFAULT_TRANSFORM_ID: &str = "__default";
41
42impl TransformCodec for DefaultTransformCodec {
43 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
44 if value.as_any().is::<DefaultTransform>() {
45 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
46 } else {
47 Err(DataFusionError::Execution(format!(
48 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
49 )))
50 }
51 }
52
53 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
54 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
55 Ok(Arc::new(DefaultTransform {}))
56 } else {
57 Err(DataFusionError::Execution(
58 "DefaultTransformCodec only supports DefaultTransform".to_string(),
59 ))
60 }
61 }
62}
63
/// Converts a [`Connection`] to and from an opaque byte representation.
///
/// Both directions also receive the [`ConnectionOptions`] associated with the connection.
pub trait ConnectionCodec: Debug + Send + Sync {
    /// Encodes `value` (a connection created from `conn_options`) into bytes.
    fn try_encode(
        &self,
        value: &dyn Connection,
        conn_options: &ConnectionOptions,
    ) -> DFResult<Vec<u8>>;
    /// Decodes bytes into a connection, using `conn_options` as needed.
    fn try_decode(
        &self,
        value: &[u8],
        conn_options: &ConnectionOptions,
    ) -> DFResult<Arc<dyn Connection>>;
}
76
/// [`ConnectionCodec`] that does not serialize the connection itself: it writes a fixed
/// marker when encoding and opens a new connection from the options when decoding.
#[derive(Debug)]
pub struct DefaultConnectionCodec;
79
/// Marker whose bytes are the only payload [`DefaultConnectionCodec`] writes and accepts.
const DEFAULT_CONNECTION_VALUE: &str = "__default";
81
82impl ConnectionCodec for DefaultConnectionCodec {
83 fn try_encode(
84 &self,
85 _value: &dyn Connection,
86 _conn_options: &ConnectionOptions,
87 ) -> DFResult<Vec<u8>> {
88 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
89 }
90
91 fn try_decode(
92 &self,
93 value: &[u8],
94 conn_options: &ConnectionOptions,
95 ) -> DFResult<Arc<dyn Connection>> {
96 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
97 return Err(DataFusionError::Execution(format!(
98 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
99 )));
100 }
101 let conn_options = conn_options.clone().with_pool_max_size(1);
102 tokio::task::block_in_place(|| {
103 tokio::runtime::Handle::current().block_on(async {
104 let pool = connect(&conn_options).await?;
105 let conn = pool.get().await?;
106 Ok::<_, DataFusionError>(conn)
107 })
108 })
109 }
110}
111
/// Physical extension codec that (de)serializes [`RemoteTableExec`] plan nodes.
///
/// Transform and connection payloads are delegated to the two pluggable codecs below.
// NOTE(review): the `With` derive presumably generates `with_*` builder methods for the
// fields — confirm against the `derive_with` crate.
#[derive(Debug, With)]
pub struct RemotePhysicalCodec {
    // Codec used for the plan node's transform payload.
    transform_codec: Arc<dyn TransformCodec>,
    // Codec used for the plan node's connection payload.
    connection_codec: Arc<dyn ConnectionCodec>,
}
117
118impl RemotePhysicalCodec {
119 pub fn new() -> Self {
120 Self {
121 transform_codec: Arc::new(DefaultTransformCodec {}),
122 connection_codec: Arc::new(DefaultConnectionCodec {}),
123 }
124 }
125}
126
127impl Default for RemotePhysicalCodec {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl PhysicalExtensionCodec for RemotePhysicalCodec {
134 fn try_decode(
135 &self,
136 buf: &[u8],
137 _inputs: &[Arc<dyn ExecutionPlan>],
138 _registry: &dyn FunctionRegistry,
139 ) -> DFResult<Arc<dyn ExecutionPlan>> {
140 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
141 DataFusionError::Internal(format!(
142 "Failed to decode remote table execution plan: {e:?}"
143 ))
144 })?;
145
146 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
147 Arc::new(DefaultTransform {})
148 } else {
149 self.transform_codec.try_decode(&proto.transform)?
150 };
151
152 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
153 let remote_schema = proto
154 .remote_schema
155 .map(|schema| Arc::new(parse_remote_schema(&schema)));
156
157 let projection: Option<Vec<usize>> = proto
158 .projection
159 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
160
161 let limit = proto.limit.map(|l| l as usize);
162
163 let conn_options = parse_connection_options(proto.conn_options.unwrap());
164
165 let now = std::time::Instant::now();
166 let conn = self
167 .connection_codec
168 .try_decode(&proto.connection, &conn_options)?;
169 debug!(
170 "[remote-table] Decoding connection cost {}ms",
171 now.elapsed().as_millis()
172 );
173
174 Ok(Arc::new(RemoteTableExec::try_new(
175 conn_options,
176 proto.sql,
177 table_schema,
178 remote_schema,
179 projection,
180 proto.unparsed_filters,
181 limit,
182 transform,
183 conn,
184 )?))
185 }
186
187 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
188 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
189 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
190 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
191 } else {
192 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
193 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
194 bytes
195 };
196
197 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
198 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
199
200 let serialized_conn = self
201 .connection_codec
202 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
203
204 let proto = protobuf::RemoteTableExec {
205 conn_options: Some(serialized_connection_options),
206 sql: exec.sql.clone(),
207 table_schema: Some(exec.table_schema.as_ref().try_into()?),
208 remote_schema,
209 projection: exec
210 .projection
211 .as_ref()
212 .map(|p| serialize_projection(p.as_slice())),
213 unparsed_filters: exec.unparsed_filters.clone(),
214 limit: exec.limit.map(|l| l as u32),
215 transform: serialized_transform,
216 connection: serialized_conn,
217 };
218
219 proto.encode(buf).map_err(|e| {
220 DataFusionError::Internal(format!(
221 "Failed to encode remote table execution plan: {e:?}"
222 ))
223 })?;
224 Ok(())
225 } else {
226 Err(DataFusionError::Execution(format!(
227 "Failed to encode {}",
228 RemoteTableExec::static_name()
229 )))
230 }
231 }
232}
233
234fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
235 match options {
236 #[cfg(feature = "postgres")]
237 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
238 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
239 protobuf::PostgresConnectionOptions {
240 host: options.host.clone(),
241 port: options.port as u32,
242 username: options.username.clone(),
243 password: options.password.clone(),
244 database: options.database.clone(),
245 pool_max_size: options.pool_max_size as u32,
246 stream_chunk_size: options.stream_chunk_size as u32,
247 },
248 )),
249 },
250 #[cfg(feature = "mysql")]
251 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
252 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
253 protobuf::MysqlConnectionOptions {
254 host: options.host.clone(),
255 port: options.port as u32,
256 username: options.username.clone(),
257 password: options.password.clone(),
258 database: options.database.clone(),
259 pool_max_size: options.pool_max_size as u32,
260 stream_chunk_size: options.stream_chunk_size as u32,
261 },
262 )),
263 },
264 #[cfg(feature = "oracle")]
265 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
266 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
267 protobuf::OracleConnectionOptions {
268 host: options.host.clone(),
269 port: options.port as u32,
270 username: options.username.clone(),
271 password: options.password.clone(),
272 service_name: options.service_name.clone(),
273 pool_max_size: options.pool_max_size as u32,
274 stream_chunk_size: options.stream_chunk_size as u32,
275 },
276 )),
277 },
278 #[cfg(feature = "sqlite")]
279 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
280 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
281 protobuf::SqliteConnectionOptions {
282 path: options.path.to_str().unwrap().to_string(),
283 stream_chunk_size: options.stream_chunk_size as u32,
284 },
285 )),
286 },
287 #[cfg(feature = "dm")]
288 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
289 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
290 protobuf::DmConnectionOptions {
291 host: options.host.clone(),
292 port: options.port as u32,
293 username: options.username.clone(),
294 password: options.password.clone(),
295 schema: options.schema.clone(),
296 stream_chunk_size: options.stream_chunk_size as u32,
297 driver: options.driver.clone(),
298 },
299 )),
300 },
301 }
302}
303
304fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
305 match options.connection_options {
306 #[cfg(feature = "postgres")]
307 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
308 ConnectionOptions::Postgres(PostgresConnectionOptions {
309 host: options.host,
310 port: options.port as u16,
311 username: options.username,
312 password: options.password,
313 database: options.database,
314 pool_max_size: options.pool_max_size as usize,
315 stream_chunk_size: options.stream_chunk_size as usize,
316 })
317 }
318 #[cfg(feature = "mysql")]
319 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
320 ConnectionOptions::Mysql(MysqlConnectionOptions {
321 host: options.host,
322 port: options.port as u16,
323 username: options.username,
324 password: options.password,
325 database: options.database,
326 pool_max_size: options.pool_max_size as usize,
327 stream_chunk_size: options.stream_chunk_size as usize,
328 })
329 }
330 #[cfg(feature = "oracle")]
331 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
332 ConnectionOptions::Oracle(OracleConnectionOptions {
333 host: options.host,
334 port: options.port as u16,
335 username: options.username,
336 password: options.password,
337 service_name: options.service_name,
338 pool_max_size: options.pool_max_size as usize,
339 stream_chunk_size: options.stream_chunk_size as usize,
340 })
341 }
342 #[cfg(feature = "sqlite")]
343 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
344 ConnectionOptions::Sqlite(SqliteConnectionOptions {
345 path: Path::new(&options.path).to_path_buf(),
346 stream_chunk_size: options.stream_chunk_size as usize,
347 })
348 }
349 #[cfg(feature = "dm")]
350 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
351 ConnectionOptions::Dm(DmConnectionOptions {
352 host: options.host,
353 port: options.port as u16,
354 username: options.username,
355 password: options.password,
356 schema: options.schema,
357 stream_chunk_size: options.stream_chunk_size as usize,
358 driver: options.driver,
359 })
360 }
361 _ => panic!("Failed to parse connection options: {options:?}"),
362 }
363}
364
365fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
366 protobuf::Projection {
367 projection: projection.iter().map(|n| *n as u32).collect(),
368 }
369}
370
371fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
372 let fields = remote_schema
373 .fields
374 .iter()
375 .map(serialize_remote_field)
376 .collect::<Vec<_>>();
377
378 protobuf::RemoteSchema { fields }
379}
380
381fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
382 protobuf::RemoteField {
383 name: remote_field.name.clone(),
384 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
385 nullable: remote_field.nullable,
386 }
387}
388
389fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
390 match remote_type {
391 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
392 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
393 protobuf::PostgresInt2 {},
394 )),
395 },
396 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
397 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
398 protobuf::PostgresInt4 {},
399 )),
400 },
401 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
402 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
403 protobuf::PostgresInt8 {},
404 )),
405 },
406 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
407 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
408 protobuf::PostgresFloat4 {},
409 )),
410 },
411 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
412 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
413 protobuf::PostgresFloat8 {},
414 )),
415 },
416 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
417 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
418 protobuf::PostgresNumeric {
419 scale: *scale as i32,
420 },
421 )),
422 },
423 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
424 r#type: Some(protobuf::remote_type::Type::PostgresName(
425 protobuf::PostgresName {},
426 )),
427 },
428 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
429 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
430 protobuf::PostgresVarchar {},
431 )),
432 },
433 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
434 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
435 protobuf::PostgresBpchar {},
436 )),
437 },
438 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
439 r#type: Some(protobuf::remote_type::Type::PostgresText(
440 protobuf::PostgresText {},
441 )),
442 },
443 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
444 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
445 protobuf::PostgresBytea {},
446 )),
447 },
448 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
449 r#type: Some(protobuf::remote_type::Type::PostgresDate(
450 protobuf::PostgresDate {},
451 )),
452 },
453 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
454 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
455 protobuf::PostgresTimestamp {},
456 )),
457 },
458 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
459 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
460 protobuf::PostgresTimestampTz {},
461 )),
462 },
463 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
464 r#type: Some(protobuf::remote_type::Type::PostgresTime(
465 protobuf::PostgresTime {},
466 )),
467 },
468 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
469 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
470 protobuf::PostgresInterval {},
471 )),
472 },
473 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
474 r#type: Some(protobuf::remote_type::Type::PostgresBool(
475 protobuf::PostgresBool {},
476 )),
477 },
478 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
479 r#type: Some(protobuf::remote_type::Type::PostgresJson(
480 protobuf::PostgresJson {},
481 )),
482 },
483 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
484 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
485 protobuf::PostgresJsonb {},
486 )),
487 },
488 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
489 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
490 protobuf::PostgresInt2Array {},
491 )),
492 },
493 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
494 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
495 protobuf::PostgresInt4Array {},
496 )),
497 },
498 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
499 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
500 protobuf::PostgresInt8Array {},
501 )),
502 },
503 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
504 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
505 protobuf::PostgresFloat4Array {},
506 )),
507 },
508 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
509 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
510 protobuf::PostgresFloat8Array {},
511 )),
512 },
513 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
514 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
515 protobuf::PostgresVarcharArray {},
516 )),
517 },
518 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
519 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
520 protobuf::PostgresBpcharArray {},
521 )),
522 },
523 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
524 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
525 protobuf::PostgresTextArray {},
526 )),
527 },
528 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
529 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
530 protobuf::PostgresByteaArray {},
531 )),
532 },
533 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
534 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
535 protobuf::PostgresBoolArray {},
536 )),
537 },
538 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
539 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
540 protobuf::PostgresPostGisGeometry {},
541 )),
542 },
543 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
544 r#type: Some(protobuf::remote_type::Type::PostgresOid(
545 protobuf::PostgresOid {},
546 )),
547 },
548 RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
549 r#type: Some(protobuf::remote_type::Type::PostgresXml(
550 protobuf::PostgresXml {},
551 )),
552 },
553 RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
554 r#type: Some(protobuf::remote_type::Type::PostgresUuid(
555 protobuf::PostgresUuid {},
556 )),
557 },
558
559 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
560 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
561 protobuf::MysqlTinyInt {},
562 )),
563 },
564 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
565 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
566 protobuf::MysqlTinyIntUnsigned {},
567 )),
568 },
569 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
570 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
571 protobuf::MysqlSmallInt {},
572 )),
573 },
574 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
575 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
576 protobuf::MysqlSmallIntUnsigned {},
577 )),
578 },
579 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
580 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
581 protobuf::MysqlMediumInt {},
582 )),
583 },
584 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
585 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
586 protobuf::MysqlMediumIntUnsigned {},
587 )),
588 },
589 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
590 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
591 protobuf::MysqlInteger {},
592 )),
593 },
594 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
595 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
596 protobuf::MysqlIntegerUnsigned {},
597 )),
598 },
599 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
600 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
601 protobuf::MysqlBigInt {},
602 )),
603 },
604 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
605 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
606 protobuf::MysqlBigIntUnsigned {},
607 )),
608 },
609 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
610 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
611 protobuf::MysqlFloat {},
612 )),
613 },
614 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
615 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
616 protobuf::MysqlDouble {},
617 )),
618 },
619 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
620 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
621 protobuf::MysqlDecimal {
622 precision: *precision as u32,
623 scale: *scale as u32,
624 },
625 )),
626 },
627 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
628 r#type: Some(protobuf::remote_type::Type::MysqlDate(
629 protobuf::MysqlDate {},
630 )),
631 },
632 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
633 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
634 protobuf::MysqlDateTime {},
635 )),
636 },
637 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
638 r#type: Some(protobuf::remote_type::Type::MysqlTime(
639 protobuf::MysqlTime {},
640 )),
641 },
642 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
643 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
644 protobuf::MysqlTimestamp {},
645 )),
646 },
647 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
648 r#type: Some(protobuf::remote_type::Type::MysqlYear(
649 protobuf::MysqlYear {},
650 )),
651 },
652 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
653 r#type: Some(protobuf::remote_type::Type::MysqlChar(
654 protobuf::MysqlChar {},
655 )),
656 },
657 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
658 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
659 protobuf::MysqlVarchar {},
660 )),
661 },
662 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
663 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
664 protobuf::MysqlBinary {},
665 )),
666 },
667 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
668 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
669 protobuf::MysqlVarbinary {},
670 )),
671 },
672 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
673 r#type: Some(protobuf::remote_type::Type::MysqlText(
674 protobuf::MysqlText { length: *len },
675 )),
676 },
677 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
678 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
679 protobuf::MysqlBlob { length: *len },
680 )),
681 },
682 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
683 r#type: Some(protobuf::remote_type::Type::MysqlJson(
684 protobuf::MysqlJson {},
685 )),
686 },
687 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
688 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
689 protobuf::MysqlGeometry {},
690 )),
691 },
692
693 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
694 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
695 protobuf::OracleVarchar2 { length: *len },
696 )),
697 },
698 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
699 r#type: Some(protobuf::remote_type::Type::OracleChar(
700 protobuf::OracleChar { length: *len },
701 )),
702 },
703 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
704 r#type: Some(protobuf::remote_type::Type::OracleNumber(
705 protobuf::OracleNumber {
706 precision: *precision as u32,
707 scale: *scale as i32,
708 },
709 )),
710 },
711 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
712 r#type: Some(protobuf::remote_type::Type::OracleDate(
713 protobuf::OracleDate {},
714 )),
715 },
716 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
717 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
718 protobuf::OracleTimestamp {},
719 )),
720 },
721 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
722 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
723 protobuf::OracleBoolean {},
724 )),
725 },
726 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
727 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
728 protobuf::OracleBinaryFloat {},
729 )),
730 },
731 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
732 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
733 protobuf::OracleBinaryDouble {},
734 )),
735 },
736 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
737 r#type: Some(protobuf::remote_type::Type::OracleBlob(
738 protobuf::OracleBlob {},
739 )),
740 },
741 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
742 r#type: Some(protobuf::remote_type::Type::OracleFloat(
743 protobuf::OracleFloat {
744 precision: *precision as u32,
745 },
746 )),
747 },
748 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
749 r#type: Some(protobuf::remote_type::Type::OracleNchar(
750 protobuf::OracleNChar { length: *len },
751 )),
752 },
753 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
754 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
755 protobuf::OracleNVarchar2 { length: *len },
756 )),
757 },
758 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
759 r#type: Some(protobuf::remote_type::Type::OracleRaw(
760 protobuf::OracleRaw { length: *len },
761 )),
762 },
763 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
764 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
765 protobuf::OracleLongRaw {},
766 )),
767 },
768 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
769 r#type: Some(protobuf::remote_type::Type::OracleLong(
770 protobuf::OracleLong {},
771 )),
772 },
773 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
774 r#type: Some(protobuf::remote_type::Type::OracleClob(
775 protobuf::OracleClob {},
776 )),
777 },
778 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
779 r#type: Some(protobuf::remote_type::Type::OracleNclob(
780 protobuf::OracleNClob {},
781 )),
782 },
783 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
784 r#type: Some(protobuf::remote_type::Type::SqliteNull(
785 protobuf::SqliteNull {},
786 )),
787 },
788 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
789 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
790 protobuf::SqliteInteger {},
791 )),
792 },
793 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
794 r#type: Some(protobuf::remote_type::Type::SqliteReal(
795 protobuf::SqliteReal {},
796 )),
797 },
798 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
799 r#type: Some(protobuf::remote_type::Type::SqliteText(
800 protobuf::SqliteText {},
801 )),
802 },
803 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
804 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
805 protobuf::SqliteBlob {},
806 )),
807 },
808 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
809 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
810 protobuf::DmTinyInt {},
811 )),
812 },
813 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
814 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
815 protobuf::DmSmallInt {},
816 )),
817 },
818 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
819 r#type: Some(protobuf::remote_type::Type::DmInteger(
820 protobuf::DmInteger {},
821 )),
822 },
823 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
824 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
825 },
826 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
827 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
828 },
829 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
830 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
831 },
832 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
833 r#type: Some(protobuf::remote_type::Type::DmNumeric(
834 protobuf::DmNumeric {
835 precision: *precision as u32,
836 scale: *scale as i32,
837 },
838 )),
839 },
840 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
841 r#type: Some(protobuf::remote_type::Type::DmDecimal(
842 protobuf::DmDecimal {
843 precision: *precision as u32,
844 scale: *scale as i32,
845 },
846 )),
847 },
848 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
849 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
850 length: len.map(|s| s as u32),
851 })),
852 },
853 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
854 r#type: Some(protobuf::remote_type::Type::DmVarchar(
855 protobuf::DmVarchar {
856 length: len.map(|s| s as u32),
857 },
858 )),
859 },
860 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
861 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
862 },
863 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
864 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
865 length: *len as u32,
866 })),
867 },
868 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
869 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
870 protobuf::DmVarbinary {
871 length: len.map(|s| s as u32),
872 },
873 )),
874 },
875 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
876 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
877 },
878 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
879 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
880 },
881 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
882 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
883 protobuf::DmTimestamp {
884 precision: *precision as u32,
885 },
886 )),
887 },
888 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
889 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
890 precision: *precision as u32,
891 })),
892 },
893 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
894 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
895 },
896 }
897}
898
899fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
900 let fields = remote_schema
901 .fields
902 .iter()
903 .map(parse_remote_field)
904 .collect::<Vec<_>>();
905
906 RemoteSchema { fields }
907}
908
909fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
910 RemoteField {
911 name: field.name.clone(),
912 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
913 nullable: field.nullable,
914 }
915}
916
917fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
918 match remote_type.r#type.as_ref().unwrap() {
919 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
920 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
921 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
922 protobuf::remote_type::Type::PostgresFloat4(_) => {
923 RemoteType::Postgres(PostgresType::Float4)
924 }
925 protobuf::remote_type::Type::PostgresFloat8(_) => {
926 RemoteType::Postgres(PostgresType::Float8)
927 }
928 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
929 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
930 }
931 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
932 protobuf::remote_type::Type::PostgresVarchar(_) => {
933 RemoteType::Postgres(PostgresType::Varchar)
934 }
935 protobuf::remote_type::Type::PostgresBpchar(_) => {
936 RemoteType::Postgres(PostgresType::Bpchar)
937 }
938 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
939 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
940 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
941 protobuf::remote_type::Type::PostgresTimestamp(_) => {
942 RemoteType::Postgres(PostgresType::Timestamp)
943 }
944 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
945 RemoteType::Postgres(PostgresType::TimestampTz)
946 }
947 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
948 protobuf::remote_type::Type::PostgresInterval(_) => {
949 RemoteType::Postgres(PostgresType::Interval)
950 }
951 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
952 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
953 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
954 protobuf::remote_type::Type::PostgresInt2Array(_) => {
955 RemoteType::Postgres(PostgresType::Int2Array)
956 }
957 protobuf::remote_type::Type::PostgresInt4Array(_) => {
958 RemoteType::Postgres(PostgresType::Int4Array)
959 }
960 protobuf::remote_type::Type::PostgresInt8Array(_) => {
961 RemoteType::Postgres(PostgresType::Int8Array)
962 }
963 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
964 RemoteType::Postgres(PostgresType::Float4Array)
965 }
966 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
967 RemoteType::Postgres(PostgresType::Float8Array)
968 }
969 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
970 RemoteType::Postgres(PostgresType::VarcharArray)
971 }
972 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
973 RemoteType::Postgres(PostgresType::BpcharArray)
974 }
975 protobuf::remote_type::Type::PostgresTextArray(_) => {
976 RemoteType::Postgres(PostgresType::TextArray)
977 }
978 protobuf::remote_type::Type::PostgresByteaArray(_) => {
979 RemoteType::Postgres(PostgresType::ByteaArray)
980 }
981 protobuf::remote_type::Type::PostgresBoolArray(_) => {
982 RemoteType::Postgres(PostgresType::BoolArray)
983 }
984 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
985 RemoteType::Postgres(PostgresType::PostGisGeometry)
986 }
987 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
988 protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
989 protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
990 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
991 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
992 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
993 }
994 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
995 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
996 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
997 }
998 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
999 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1000 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1001 }
1002 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1003 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1004 RemoteType::Mysql(MysqlType::IntegerUnsigned)
1005 }
1006 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1007 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1008 RemoteType::Mysql(MysqlType::BigIntUnsigned)
1009 }
1010 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1011 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1012 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1013 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1014 ),
1015 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1016 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1017 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1018 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1019 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1020 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1021 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1022 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1023 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1024 protobuf::remote_type::Type::MysqlText(text) => {
1025 RemoteType::Mysql(MysqlType::Text(text.length))
1026 }
1027 protobuf::remote_type::Type::MysqlBlob(blob) => {
1028 RemoteType::Mysql(MysqlType::Blob(blob.length))
1029 }
1030 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1031 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1032 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1033 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1034 }
1035 protobuf::remote_type::Type::OracleChar(char) => {
1036 RemoteType::Oracle(OracleType::Char(char.length))
1037 }
1038 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1039 OracleType::Number(number.precision as u8, number.scale as i8),
1040 ),
1041 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1042 protobuf::remote_type::Type::OracleTimestamp(_) => {
1043 RemoteType::Oracle(OracleType::Timestamp)
1044 }
1045 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1046 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1047 RemoteType::Oracle(OracleType::BinaryFloat)
1048 }
1049 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1050 RemoteType::Oracle(OracleType::BinaryDouble)
1051 }
1052 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1053 RemoteType::Oracle(OracleType::Float(*precision as u8))
1054 }
1055 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1056 RemoteType::Oracle(OracleType::NChar(*length))
1057 }
1058 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1059 RemoteType::Oracle(OracleType::NVarchar2(*length))
1060 }
1061 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1062 RemoteType::Oracle(OracleType::Raw(*length))
1063 }
1064 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1065 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1066 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1067 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1068 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1069 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1070 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1071 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1072 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1073 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1074 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1075 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1076 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1077 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1078 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1079 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1080 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1081 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1082 }
1083 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1084 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1085 }
1086 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1087 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1088 }
1089 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1090 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1091 }
1092 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1093 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1094 RemoteType::Dm(DmType::Binary(*length as u16))
1095 }
1096 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1097 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1098 }
1099 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1100 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1101 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1102 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1103 }
1104 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1105 RemoteType::Dm(DmType::Time(*precision as u8))
1106 }
1107 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1108 }
1109}