1use crate::DmConnectionOptions;
2use crate::MysqlConnectionOptions;
3use crate::OracleConnectionOptions;
4use crate::PostgresConnectionOptions;
5use crate::SqliteConnectionOptions;
6use crate::generated::prost as protobuf;
7use crate::{
8 Connection, ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType,
9 Literalize, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
10 RemoteSource, RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
11 connect,
12};
13use datafusion::arrow::datatypes::SchemaRef;
14use datafusion::common::DataFusionError;
15use datafusion::execution::FunctionRegistry;
16use datafusion::physical_plan::ExecutionPlan;
17use datafusion_proto::convert_required;
18use datafusion_proto::physical_plan::PhysicalExtensionCodec;
19use datafusion_proto::protobuf::proto_error;
20use derive_with::With;
21use log::debug;
22use prost::Message;
23use std::fmt::Debug;
24use std::sync::Arc;
25use std::time::Duration;
26
/// Converts [`Transform`] implementations to and from an opaque byte payload.
///
/// [`RemotePhysicalCodec`] stores the encoded bytes in the serialized scan
/// plan and hands them back to `try_decode` when the plan is rebuilt.
pub trait TransformCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes, or returns an error if this codec does
    /// not know how to represent it.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Rebuilds a transform from bytes previously produced by `try_encode`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
31
/// [`TransformCodec`] that only understands [`DefaultTransform`].
#[derive(Debug)]
pub struct DefaultTransformCodec {}

/// Byte marker (as UTF-8) that stands for [`DefaultTransform`] in encoded plans.
const DEFAULT_TRANSFORM_ID: &str = "__default";
36
37impl TransformCodec for DefaultTransformCodec {
38 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
39 if value.as_any().is::<DefaultTransform>() {
40 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
41 } else {
42 Err(DataFusionError::Execution(format!(
43 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
44 )))
45 }
46 }
47
48 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
49 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
50 Ok(Arc::new(DefaultTransform {}))
51 } else {
52 Err(DataFusionError::Execution(
53 "DefaultTransformCodec only supports DefaultTransform".to_string(),
54 ))
55 }
56 }
57}
58
/// Converts [`Literalize`] implementations to and from an opaque byte payload.
///
/// [`RemotePhysicalCodec`] stores the encoded bytes in the serialized insert
/// plan and hands them back to `try_decode` when the plan is rebuilt.
pub trait LiteralizeCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes, or returns an error if this codec does
    /// not know how to represent it.
    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
    /// Rebuilds a literalizer from bytes previously produced by `try_encode`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
}
63
/// [`LiteralizeCodec`] that only understands [`DefaultLiteralizer`].
#[derive(Debug)]
pub struct DefaultLiteralizeCodec {}

/// Byte marker (as UTF-8) that stands for [`DefaultLiteralizer`] in encoded plans.
const DEFAULT_LITERALIZE_ID: &str = "__default";
68
69impl LiteralizeCodec for DefaultLiteralizeCodec {
70 fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
71 if value.as_any().is::<DefaultLiteralizer>() {
72 Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
73 } else {
74 Err(DataFusionError::Execution(format!(
75 "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
76 )))
77 }
78 }
79
80 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
81 if value == DEFAULT_LITERALIZE_ID.as_bytes() {
82 Ok(Arc::new(DefaultLiteralizer {}))
83 } else {
84 Err(DataFusionError::Execution(
85 "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
86 ))
87 }
88 }
89}
90
/// Converts a [`Connection`] to and from an opaque byte payload.
///
/// Both directions also receive the [`ConnectionOptions`] of the plan being
/// (de)serialized, so an implementation may rebuild the connection from the
/// options instead of from the bytes.
pub trait ConnectionCodec: Debug + Send + Sync {
    /// Serializes `value` (a connection created from `conn_options`) into bytes.
    fn try_encode(
        &self,
        value: &dyn Connection,
        conn_options: &ConnectionOptions,
    ) -> DFResult<Vec<u8>>;
    /// Produces a usable connection from bytes previously returned by
    /// `try_encode` and the decoded `conn_options`.
    fn try_decode(
        &self,
        value: &[u8],
        conn_options: &ConnectionOptions,
    ) -> DFResult<Arc<dyn Connection>>;
}
103
/// [`ConnectionCodec`] that does not serialize any connection state: encoding
/// writes a fixed marker and decoding opens a new connection from the options.
#[derive(Debug)]
pub struct DefaultConnectionCodec;

/// Byte marker (as UTF-8) written by [`DefaultConnectionCodec`] in place of a connection.
const DEFAULT_CONNECTION_VALUE: &str = "__default";
108
109impl ConnectionCodec for DefaultConnectionCodec {
110 fn try_encode(
111 &self,
112 _value: &dyn Connection,
113 _conn_options: &ConnectionOptions,
114 ) -> DFResult<Vec<u8>> {
115 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
116 }
117
118 fn try_decode(
119 &self,
120 value: &[u8],
121 conn_options: &ConnectionOptions,
122 ) -> DFResult<Arc<dyn Connection>> {
123 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
124 return Err(DataFusionError::Execution(format!(
125 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
126 )));
127 }
128 let conn_options = conn_options.clone().with_pool_max_size(1);
129 tokio::task::block_in_place(|| {
130 tokio::runtime::Handle::current().block_on(async {
131 let pool = connect(&conn_options).await?;
132 let conn = pool.get().await?;
133 Ok::<_, DataFusionError>(conn)
134 })
135 })
136 }
137}
138
/// Physical extension codec for the remote table scan and insert plans.
///
/// The three pluggable codecs control how the parts of a plan that are trait
/// objects (transform, literalizer, connection) are turned into bytes and back.
#[derive(Debug, With)]
pub struct RemotePhysicalCodec {
    /// Codec used for scan transforms.
    pub transform_codec: Arc<dyn TransformCodec>,
    /// Codec used for insert literalizers.
    pub literalize_codec: Arc<dyn LiteralizeCodec>,
    /// Codec used for the connection attached to a plan.
    pub connection_codec: Arc<dyn ConnectionCodec>,
}
145
146impl RemotePhysicalCodec {
147 pub fn new() -> Self {
148 Self {
149 transform_codec: Arc::new(DefaultTransformCodec {}),
150 literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
151 connection_codec: Arc::new(DefaultConnectionCodec {}),
152 }
153 }
154}
155
156impl Default for RemotePhysicalCodec {
157 fn default() -> Self {
158 Self::new()
159 }
160}
161
162impl PhysicalExtensionCodec for RemotePhysicalCodec {
163 fn try_decode(
164 &self,
165 buf: &[u8],
166 inputs: &[Arc<dyn ExecutionPlan>],
167 _registry: &dyn FunctionRegistry,
168 ) -> DFResult<Arc<dyn ExecutionPlan>> {
169 let remote_table_node =
170 protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
171 DataFusionError::Internal(format!(
172 "Failed to decode remote table physical plan node: {e:?}"
173 ))
174 })?;
175 let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
176 DataFusionError::Internal(
177 "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
178 )
179 })?;
180
181 match remote_table_plan {
182 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
183 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
184 Arc::new(DefaultTransform {})
185 } else {
186 self.transform_codec.try_decode(&proto.transform)?
187 };
188
189 let source = parse_remote_source(proto.source.as_ref().ok_or(
190 DataFusionError::Internal("remote source is not set".to_string()),
191 )?)?;
192
193 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
194 let remote_schema = proto
195 .remote_schema
196 .map(|schema| Arc::new(parse_remote_schema(&schema)));
197
198 let projection = parse_projection(proto.projection.as_ref());
199
200 let limit = proto.limit.map(|l| l as usize);
201
202 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
203
204 let now = std::time::Instant::now();
205 let conn = self
206 .connection_codec
207 .try_decode(&proto.connection, &conn_options)?;
208 debug!(
209 "[remote-table] Decoding connection cost {}ms",
210 now.elapsed().as_millis()
211 );
212
213 Ok(Arc::new(RemoteTableScanExec::try_new(
214 conn_options,
215 source,
216 table_schema,
217 remote_schema,
218 projection,
219 proto.unparsed_filters,
220 limit,
221 transform,
222 conn,
223 )?))
224 }
225 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
226 proto,
227 ) => {
228 if inputs.len() != 1 {
229 return Err(DataFusionError::Internal(
230 "RemoteTableInsertExec only support one input".to_string(),
231 ));
232 }
233
234 let input = inputs[0].clone();
235
236 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
237 let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
238 let table = proto.table.unwrap().idents;
239
240 let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
241 Arc::new(DefaultLiteralizer {})
242 } else {
243 self.literalize_codec.try_decode(&proto.literalizer)?
244 };
245
246 let now = std::time::Instant::now();
247 let conn = self
248 .connection_codec
249 .try_decode(&proto.connection, &conn_options)?;
250 debug!(
251 "[remote-table] Decoding connection cost {}ms",
252 now.elapsed().as_millis()
253 );
254
255 Ok(Arc::new(RemoteTableInsertExec::new(
256 input,
257 conn_options,
258 literalizer,
259 table,
260 remote_schema,
261 conn,
262 )))
263 }
264 }
265 }
266
267 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
268 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
269 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
270 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
271 } else {
272 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
273 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
274 bytes
275 };
276
277 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
278 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
279
280 let serialized_conn = self
281 .connection_codec
282 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
283
284 let serialized_source = serialize_remote_source(&exec.source);
285
286 let proto = protobuf::RemoteTablePhysicalPlanNode {
287 remote_table_physical_plan_type: Some(
288 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
289 protobuf::RemoteTableScanExec {
290 conn_options: Some(serialized_connection_options),
291 source: Some(serialized_source),
292 table_schema: Some(exec.table_schema.as_ref().try_into()?),
293 remote_schema,
294 projection: serialize_projection(exec.projection.as_ref()),
295 unparsed_filters: exec.unparsed_filters.clone(),
296 limit: exec.limit.map(|l| l as u32),
297 transform: serialized_transform,
298 connection: serialized_conn,
299 },
300 ),
301 ),
302 };
303
304 proto.encode(buf).map_err(|e| {
305 DataFusionError::Internal(format!(
306 "Failed to encode remote table scan exec plan: {e:?}"
307 ))
308 })?;
309 Ok(())
310 } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
311 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
312 let remote_schema = serialize_remote_schema(&exec.remote_schema);
313 let serialized_table = protobuf::Identifiers {
314 idents: exec.table.clone(),
315 };
316 let serialized_literalizer = self
317 .literalize_codec
318 .try_encode(exec.literalizer.as_ref())?;
319 let serialized_conn = self
320 .connection_codec
321 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
322
323 let proto = protobuf::RemoteTablePhysicalPlanNode {
324 remote_table_physical_plan_type: Some(
325 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
326 protobuf::RemoteTableInsertExec {
327 conn_options: Some(serialized_connection_options),
328 table: Some(serialized_table),
329 remote_schema: Some(remote_schema),
330 literalizer: serialized_literalizer,
331 connection: serialized_conn,
332 },
333 ),
334 ),
335 };
336
337 proto.encode(buf).map_err(|e| {
338 DataFusionError::Internal(format!(
339 "Failed to encode remote table insert exec node: {e:?}"
340 ))
341 })?;
342
343 Ok(())
344 } else {
345 Err(DataFusionError::NotImplemented(format!(
346 "RemotePhysicalCodec does not support encoding {}",
347 node.name()
348 )))
349 }
350 }
351}
352
353fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
354 match options {
355 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
356 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
357 protobuf::PostgresConnectionOptions {
358 host: options.host.clone(),
359 port: options.port as u32,
360 username: options.username.clone(),
361 password: options.password.clone(),
362 database: options.database.clone(),
363 pool_max_size: options.pool_max_size as u32,
364 pool_min_idle: options.pool_min_idle as u32,
365 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
366 pool_ttl_check_interval: Some(serialize_duration(
367 &options.pool_ttl_check_interval,
368 )),
369 stream_chunk_size: options.stream_chunk_size as u32,
370 default_numeric_scale: options.default_numeric_scale as i32,
371 },
372 )),
373 },
374 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
375 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
376 protobuf::MysqlConnectionOptions {
377 host: options.host.clone(),
378 port: options.port as u32,
379 username: options.username.clone(),
380 password: options.password.clone(),
381 database: options.database.clone(),
382 pool_max_size: options.pool_max_size as u32,
383 pool_min_idle: options.pool_min_idle as u32,
384 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
385 pool_ttl_check_interval: Some(serialize_duration(
386 &options.pool_ttl_check_interval,
387 )),
388 stream_chunk_size: options.stream_chunk_size as u32,
389 },
390 )),
391 },
392 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
393 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
394 protobuf::OracleConnectionOptions {
395 host: options.host.clone(),
396 port: options.port as u32,
397 username: options.username.clone(),
398 password: options.password.clone(),
399 service_name: options.service_name.clone(),
400 pool_max_size: options.pool_max_size as u32,
401 pool_min_idle: options.pool_min_idle as u32,
402 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
403 pool_ttl_check_interval: Some(serialize_duration(
404 &options.pool_ttl_check_interval,
405 )),
406 stream_chunk_size: options.stream_chunk_size as u32,
407 },
408 )),
409 },
410 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
411 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
412 protobuf::SqliteConnectionOptions {
413 path: options.path.to_str().unwrap().to_string(),
414 stream_chunk_size: options.stream_chunk_size as u32,
415 },
416 )),
417 },
418 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
419 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
420 protobuf::DmConnectionOptions {
421 host: options.host.clone(),
422 port: options.port as u32,
423 username: options.username.clone(),
424 password: options.password.clone(),
425 schema: options.schema.clone(),
426 stream_chunk_size: options.stream_chunk_size as u32,
427 driver: options.driver.clone(),
428 },
429 )),
430 },
431 }
432}
433
434fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
435 match options.connection_options {
436 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
437 ConnectionOptions::Postgres(PostgresConnectionOptions {
438 host: options.host,
439 port: options.port as u16,
440 username: options.username,
441 password: options.password,
442 database: options.database,
443 pool_max_size: options.pool_max_size as usize,
444 pool_min_idle: options.pool_min_idle as usize,
445 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
446 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
447 stream_chunk_size: options.stream_chunk_size as usize,
448 default_numeric_scale: options.default_numeric_scale as i8,
449 })
450 }
451 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
452 ConnectionOptions::Mysql(MysqlConnectionOptions {
453 host: options.host,
454 port: options.port as u16,
455 username: options.username,
456 password: options.password,
457 database: options.database,
458 pool_max_size: options.pool_max_size as usize,
459 pool_min_idle: options.pool_min_idle as usize,
460 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
461 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
462 stream_chunk_size: options.stream_chunk_size as usize,
463 })
464 }
465 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
466 ConnectionOptions::Oracle(OracleConnectionOptions {
467 host: options.host,
468 port: options.port as u16,
469 username: options.username,
470 password: options.password,
471 service_name: options.service_name,
472 pool_max_size: options.pool_max_size as usize,
473 pool_min_idle: options.pool_min_idle as usize,
474 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
475 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
476 stream_chunk_size: options.stream_chunk_size as usize,
477 })
478 }
479 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
480 ConnectionOptions::Sqlite(SqliteConnectionOptions {
481 path: std::path::Path::new(&options.path).to_path_buf(),
482 stream_chunk_size: options.stream_chunk_size as usize,
483 })
484 }
485 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
486 ConnectionOptions::Dm(DmConnectionOptions {
487 host: options.host,
488 port: options.port as u16,
489 username: options.username,
490 password: options.password,
491 schema: options.schema,
492 stream_chunk_size: options.stream_chunk_size as usize,
493 driver: options.driver,
494 })
495 }
496 _ => panic!("Failed to parse connection options: {options:?}"),
497 }
498}
499
500fn serialize_duration(duration: &Duration) -> protobuf::Duration {
501 protobuf::Duration {
502 secs: duration.as_secs(),
503 nanos: duration.subsec_nanos(),
504 }
505}
506
507fn parse_duration(duration: &protobuf::Duration) -> Duration {
508 Duration::new(duration.secs, duration.nanos)
509}
510
511fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
512 projection.map(|p| protobuf::Projection {
513 projection: p.iter().map(|n| *n as u32).collect(),
514 })
515}
516
517fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
518 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
519}
520
521fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
522 let fields = remote_schema
523 .fields
524 .iter()
525 .map(serialize_remote_field)
526 .collect::<Vec<_>>();
527
528 protobuf::RemoteSchema { fields }
529}
530
531fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
532 protobuf::RemoteField {
533 name: remote_field.name.clone(),
534 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
535 nullable: remote_field.nullable,
536 auto_increment: remote_field.auto_increment,
537 }
538}
539
540fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
541 match remote_type {
542 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
543 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
544 protobuf::Empty {},
545 )),
546 },
547 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
548 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
549 protobuf::Empty {},
550 )),
551 },
552 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
553 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
554 protobuf::Empty {},
555 )),
556 },
557 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
558 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
559 protobuf::Empty {},
560 )),
561 },
562 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
563 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
564 protobuf::Empty {},
565 )),
566 },
567 RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
568 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
569 protobuf::PostgresNumeric {
570 precision: *precision as u32,
571 scale: *scale as i32,
572 },
573 )),
574 },
575 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
576 r#type: Some(protobuf::remote_type::Type::PostgresName(
577 protobuf::Empty {},
578 )),
579 },
580 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
581 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
582 protobuf::Empty {},
583 )),
584 },
585 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
586 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
587 protobuf::Empty {},
588 )),
589 },
590 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
591 r#type: Some(protobuf::remote_type::Type::PostgresText(
592 protobuf::Empty {},
593 )),
594 },
595 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
596 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
597 protobuf::Empty {},
598 )),
599 },
600 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
601 r#type: Some(protobuf::remote_type::Type::PostgresDate(
602 protobuf::Empty {},
603 )),
604 },
605 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
606 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
607 protobuf::Empty {},
608 )),
609 },
610 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
611 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
612 protobuf::Empty {},
613 )),
614 },
615 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
616 r#type: Some(protobuf::remote_type::Type::PostgresTime(
617 protobuf::Empty {},
618 )),
619 },
620 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
621 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
622 protobuf::Empty {},
623 )),
624 },
625 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
626 r#type: Some(protobuf::remote_type::Type::PostgresBool(
627 protobuf::Empty {},
628 )),
629 },
630 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
631 r#type: Some(protobuf::remote_type::Type::PostgresJson(
632 protobuf::Empty {},
633 )),
634 },
635 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
636 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
637 protobuf::Empty {},
638 )),
639 },
640 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
641 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
642 protobuf::Empty {},
643 )),
644 },
645 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
646 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
647 protobuf::Empty {},
648 )),
649 },
650 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
651 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
652 protobuf::Empty {},
653 )),
654 },
655 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
656 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
657 protobuf::Empty {},
658 )),
659 },
660 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
661 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
662 protobuf::Empty {},
663 )),
664 },
665 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
666 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
667 protobuf::Empty {},
668 )),
669 },
670 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
671 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
672 protobuf::Empty {},
673 )),
674 },
675 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
676 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
677 protobuf::Empty {},
678 )),
679 },
680 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
681 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
682 protobuf::Empty {},
683 )),
684 },
685 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
686 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
687 protobuf::Empty {},
688 )),
689 },
690 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
691 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
692 protobuf::Empty {},
693 )),
694 },
695 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
696 r#type: Some(protobuf::remote_type::Type::PostgresOid(protobuf::Empty {})),
697 },
698 RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
699 r#type: Some(protobuf::remote_type::Type::PostgresXml(protobuf::Empty {})),
700 },
701 RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
702 r#type: Some(protobuf::remote_type::Type::PostgresUuid(
703 protobuf::Empty {},
704 )),
705 },
706
707 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
708 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
709 protobuf::Empty {},
710 )),
711 },
712 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
713 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
714 protobuf::Empty {},
715 )),
716 },
717 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
718 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
719 protobuf::Empty {},
720 )),
721 },
722 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
723 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
724 protobuf::Empty {},
725 )),
726 },
727 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
728 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
729 protobuf::Empty {},
730 )),
731 },
732 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
733 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
734 protobuf::Empty {},
735 )),
736 },
737 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
738 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
739 protobuf::Empty {},
740 )),
741 },
742 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
743 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
744 protobuf::Empty {},
745 )),
746 },
747 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
748 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(protobuf::Empty {})),
749 },
750 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
751 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
752 protobuf::Empty {},
753 )),
754 },
755 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
756 r#type: Some(protobuf::remote_type::Type::MysqlFloat(protobuf::Empty {})),
757 },
758 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
759 r#type: Some(protobuf::remote_type::Type::MysqlDouble(protobuf::Empty {})),
760 },
761 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
762 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
763 protobuf::MysqlDecimal {
764 precision: *precision as u32,
765 scale: *scale as u32,
766 },
767 )),
768 },
769 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
770 r#type: Some(protobuf::remote_type::Type::MysqlDate(protobuf::Empty {})),
771 },
772 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
773 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
774 protobuf::Empty {},
775 )),
776 },
777 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
778 r#type: Some(protobuf::remote_type::Type::MysqlTime(protobuf::Empty {})),
779 },
780 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
781 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
782 protobuf::Empty {},
783 )),
784 },
785 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
786 r#type: Some(protobuf::remote_type::Type::MysqlYear(protobuf::Empty {})),
787 },
788 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
789 r#type: Some(protobuf::remote_type::Type::MysqlChar(protobuf::Empty {})),
790 },
791 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
792 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
793 protobuf::Empty {},
794 )),
795 },
796 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
797 r#type: Some(protobuf::remote_type::Type::MysqlBinary(protobuf::Empty {})),
798 },
799 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
800 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
801 protobuf::Empty {},
802 )),
803 },
804 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
805 r#type: Some(protobuf::remote_type::Type::MysqlText(
806 protobuf::MysqlText { length: *len },
807 )),
808 },
809 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
810 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
811 protobuf::MysqlBlob { length: *len },
812 )),
813 },
814 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
815 r#type: Some(protobuf::remote_type::Type::MysqlJson(protobuf::Empty {})),
816 },
817 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
818 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
819 protobuf::Empty {},
820 )),
821 },
822
823 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
824 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
825 protobuf::OracleVarchar2 { length: *len },
826 )),
827 },
828 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
829 r#type: Some(protobuf::remote_type::Type::OracleChar(
830 protobuf::OracleChar { length: *len },
831 )),
832 },
833 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
834 r#type: Some(protobuf::remote_type::Type::OracleNumber(
835 protobuf::OracleNumber {
836 precision: *precision as u32,
837 scale: *scale as i32,
838 },
839 )),
840 },
841 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
842 r#type: Some(protobuf::remote_type::Type::OracleDate(protobuf::Empty {})),
843 },
844 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
845 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
846 protobuf::Empty {},
847 )),
848 },
849 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
850 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
851 protobuf::Empty {},
852 )),
853 },
854 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
855 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
856 protobuf::Empty {},
857 )),
858 },
859 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
860 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
861 protobuf::Empty {},
862 )),
863 },
864 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
865 r#type: Some(protobuf::remote_type::Type::OracleBlob(protobuf::Empty {})),
866 },
867 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
868 r#type: Some(protobuf::remote_type::Type::OracleFloat(
869 protobuf::OracleFloat {
870 precision: *precision as u32,
871 },
872 )),
873 },
874 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
875 r#type: Some(protobuf::remote_type::Type::OracleNchar(
876 protobuf::OracleNChar { length: *len },
877 )),
878 },
879 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
880 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
881 protobuf::OracleNVarchar2 { length: *len },
882 )),
883 },
884 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
885 r#type: Some(protobuf::remote_type::Type::OracleRaw(
886 protobuf::OracleRaw { length: *len },
887 )),
888 },
889 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
890 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
891 protobuf::Empty {},
892 )),
893 },
894 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
895 r#type: Some(protobuf::remote_type::Type::OracleLong(protobuf::Empty {})),
896 },
897 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
898 r#type: Some(protobuf::remote_type::Type::OracleClob(protobuf::Empty {})),
899 },
900 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
901 r#type: Some(protobuf::remote_type::Type::OracleNclob(protobuf::Empty {})),
902 },
903 RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
904 r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
905 protobuf::Empty {},
906 )),
907 },
908 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
909 r#type: Some(protobuf::remote_type::Type::SqliteNull(protobuf::Empty {})),
910 },
911 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
912 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
913 protobuf::Empty {},
914 )),
915 },
916 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
917 r#type: Some(protobuf::remote_type::Type::SqliteReal(protobuf::Empty {})),
918 },
919 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
920 r#type: Some(protobuf::remote_type::Type::SqliteText(protobuf::Empty {})),
921 },
922 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
923 r#type: Some(protobuf::remote_type::Type::SqliteBlob(protobuf::Empty {})),
924 },
925 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
926 r#type: Some(protobuf::remote_type::Type::DmTinyInt(protobuf::Empty {})),
927 },
928 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
929 r#type: Some(protobuf::remote_type::Type::DmSmallInt(protobuf::Empty {})),
930 },
931 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
932 r#type: Some(protobuf::remote_type::Type::DmInteger(protobuf::Empty {})),
933 },
934 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
935 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::Empty {})),
936 },
937 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
938 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::Empty {})),
939 },
940 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
941 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::Empty {})),
942 },
943 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
944 r#type: Some(protobuf::remote_type::Type::DmNumeric(
945 protobuf::DmNumeric {
946 precision: *precision as u32,
947 scale: *scale as i32,
948 },
949 )),
950 },
951 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
952 r#type: Some(protobuf::remote_type::Type::DmDecimal(
953 protobuf::DmDecimal {
954 precision: *precision as u32,
955 scale: *scale as i32,
956 },
957 )),
958 },
959 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
960 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
961 length: len.map(|s| s as u32),
962 })),
963 },
964 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
965 r#type: Some(protobuf::remote_type::Type::DmVarchar(
966 protobuf::DmVarchar {
967 length: len.map(|s| s as u32),
968 },
969 )),
970 },
971 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
972 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::Empty {})),
973 },
974 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
975 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
976 length: *len as u32,
977 })),
978 },
979 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
980 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
981 protobuf::DmVarbinary {
982 length: len.map(|s| s as u32),
983 },
984 )),
985 },
986 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
987 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::Empty {})),
988 },
989 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
990 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::Empty {})),
991 },
992 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
993 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
994 protobuf::DmTimestamp {
995 precision: *precision as u32,
996 },
997 )),
998 },
999 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1000 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1001 precision: *precision as u32,
1002 })),
1003 },
1004 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1005 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::Empty {})),
1006 },
1007 }
1008}
1009
1010fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1011 let fields = remote_schema
1012 .fields
1013 .iter()
1014 .map(parse_remote_field)
1015 .collect::<Vec<_>>();
1016
1017 RemoteSchema { fields }
1018}
1019
1020fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1021 RemoteField {
1022 name: field.name.clone(),
1023 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1024 nullable: field.nullable,
1025 auto_increment: field.auto_increment,
1026 }
1027}
1028
1029fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1030 match remote_type.r#type.as_ref().unwrap() {
1031 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1032 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1033 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1034 protobuf::remote_type::Type::PostgresFloat4(_) => {
1035 RemoteType::Postgres(PostgresType::Float4)
1036 }
1037 protobuf::remote_type::Type::PostgresFloat8(_) => {
1038 RemoteType::Postgres(PostgresType::Float8)
1039 }
1040 protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1041 PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1042 ),
1043 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1044 protobuf::remote_type::Type::PostgresVarchar(_) => {
1045 RemoteType::Postgres(PostgresType::Varchar)
1046 }
1047 protobuf::remote_type::Type::PostgresBpchar(_) => {
1048 RemoteType::Postgres(PostgresType::Bpchar)
1049 }
1050 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1051 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1052 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1053 protobuf::remote_type::Type::PostgresTimestamp(_) => {
1054 RemoteType::Postgres(PostgresType::Timestamp)
1055 }
1056 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1057 RemoteType::Postgres(PostgresType::TimestampTz)
1058 }
1059 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1060 protobuf::remote_type::Type::PostgresInterval(_) => {
1061 RemoteType::Postgres(PostgresType::Interval)
1062 }
1063 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1064 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1065 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1066 protobuf::remote_type::Type::PostgresInt2Array(_) => {
1067 RemoteType::Postgres(PostgresType::Int2Array)
1068 }
1069 protobuf::remote_type::Type::PostgresInt4Array(_) => {
1070 RemoteType::Postgres(PostgresType::Int4Array)
1071 }
1072 protobuf::remote_type::Type::PostgresInt8Array(_) => {
1073 RemoteType::Postgres(PostgresType::Int8Array)
1074 }
1075 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1076 RemoteType::Postgres(PostgresType::Float4Array)
1077 }
1078 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1079 RemoteType::Postgres(PostgresType::Float8Array)
1080 }
1081 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1082 RemoteType::Postgres(PostgresType::VarcharArray)
1083 }
1084 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1085 RemoteType::Postgres(PostgresType::BpcharArray)
1086 }
1087 protobuf::remote_type::Type::PostgresTextArray(_) => {
1088 RemoteType::Postgres(PostgresType::TextArray)
1089 }
1090 protobuf::remote_type::Type::PostgresByteaArray(_) => {
1091 RemoteType::Postgres(PostgresType::ByteaArray)
1092 }
1093 protobuf::remote_type::Type::PostgresBoolArray(_) => {
1094 RemoteType::Postgres(PostgresType::BoolArray)
1095 }
1096 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1097 RemoteType::Postgres(PostgresType::PostGisGeometry)
1098 }
1099 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1100 protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1101 protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1102 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1103 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1104 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1105 }
1106 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1107 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1108 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1109 }
1110 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1111 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1112 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1113 }
1114 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1115 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1116 RemoteType::Mysql(MysqlType::IntegerUnsigned)
1117 }
1118 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1119 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1120 RemoteType::Mysql(MysqlType::BigIntUnsigned)
1121 }
1122 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1123 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1124 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1125 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1126 ),
1127 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1128 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1129 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1130 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1131 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1132 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1133 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1134 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1135 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1136 protobuf::remote_type::Type::MysqlText(text) => {
1137 RemoteType::Mysql(MysqlType::Text(text.length))
1138 }
1139 protobuf::remote_type::Type::MysqlBlob(blob) => {
1140 RemoteType::Mysql(MysqlType::Blob(blob.length))
1141 }
1142 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1143 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1144 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1145 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1146 }
1147 protobuf::remote_type::Type::OracleChar(char) => {
1148 RemoteType::Oracle(OracleType::Char(char.length))
1149 }
1150 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1151 OracleType::Number(number.precision as u8, number.scale as i8),
1152 ),
1153 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1154 protobuf::remote_type::Type::OracleTimestamp(_) => {
1155 RemoteType::Oracle(OracleType::Timestamp)
1156 }
1157 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1158 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1159 RemoteType::Oracle(OracleType::BinaryFloat)
1160 }
1161 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1162 RemoteType::Oracle(OracleType::BinaryDouble)
1163 }
1164 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1165 RemoteType::Oracle(OracleType::Float(*precision as u8))
1166 }
1167 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1168 RemoteType::Oracle(OracleType::NChar(*length))
1169 }
1170 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1171 RemoteType::Oracle(OracleType::NVarchar2(*length))
1172 }
1173 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1174 RemoteType::Oracle(OracleType::Raw(*length))
1175 }
1176 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1177 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1178 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1179 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1180 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1181 protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1182 RemoteType::Oracle(OracleType::SdeGeometry)
1183 }
1184 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1185 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1186 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1187 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1188 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1189 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1190 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1191 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1192 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1193 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1194 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1195 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1196 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1197 }
1198 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1199 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1200 }
1201 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1202 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1203 }
1204 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1205 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1206 }
1207 protobuf::remote_type::Type::DmText(_) => RemoteType::Dm(DmType::Text),
1208 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1209 RemoteType::Dm(DmType::Binary(*length as u16))
1210 }
1211 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1212 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1213 }
1214 protobuf::remote_type::Type::DmImage(_) => RemoteType::Dm(DmType::Image),
1215 protobuf::remote_type::Type::DmBit(_) => RemoteType::Dm(DmType::Bit),
1216 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1217 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1218 }
1219 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1220 RemoteType::Dm(DmType::Time(*precision as u8))
1221 }
1222 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1223 }
1224}
1225
1226fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1227 match source {
1228 RemoteSource::Query(query) => protobuf::RemoteSource {
1229 source: Some(protobuf::remote_source::Source::Query(query.clone())),
1230 },
1231 RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1232 source: Some(protobuf::remote_source::Source::Table(
1233 protobuf::Identifiers {
1234 idents: table_identifiers.clone(),
1235 },
1236 )),
1237 },
1238 }
1239}
1240
1241fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1242 let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1243 "remote source is not set".to_string(),
1244 ))?;
1245 match source {
1246 protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1247 protobuf::remote_source::Source::Table(table_identifiers) => {
1248 Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1249 }
1250 }
1251}