1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 Connection, ConnectionOptions, DFResult, DefaultTransform, DefaultUnparser, DmType, MysqlType,
14 OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource,
15 RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform, Unparse,
16 connect,
17};
18use datafusion::arrow::array::RecordBatch;
19use datafusion::arrow::datatypes::{Schema, SchemaRef};
20use datafusion::arrow::ipc::reader::StreamReader;
21use datafusion::arrow::ipc::writer::StreamWriter;
22use datafusion::catalog::memory::{DataSourceExec, MemorySourceConfig};
23use datafusion::common::DataFusionError;
24use datafusion::datasource::source::DataSource;
25use datafusion::execution::FunctionRegistry;
26use datafusion::physical_plan::ExecutionPlan;
27use datafusion_proto::convert_required;
28use datafusion_proto::physical_plan::PhysicalExtensionCodec;
29use datafusion_proto::physical_plan::from_proto::parse_physical_sort_exprs;
30use datafusion_proto::physical_plan::to_proto::serialize_physical_sort_exprs;
31use datafusion_proto::protobuf::proto_error;
32use derive_with::With;
33use log::debug;
34use prost::Message;
35use std::fmt::Debug;
36use std::sync::Arc;
37
38pub trait TransformCodec: Debug + Send + Sync {
39 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
40 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
41}
42
43#[derive(Debug)]
44pub struct DefaultTransformCodec {}
45
46const DEFAULT_TRANSFORM_ID: &str = "__default";
47
48impl TransformCodec for DefaultTransformCodec {
49 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
50 if value.as_any().is::<DefaultTransform>() {
51 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
52 } else {
53 Err(DataFusionError::Execution(format!(
54 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
55 )))
56 }
57 }
58
59 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
60 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
61 Ok(Arc::new(DefaultTransform {}))
62 } else {
63 Err(DataFusionError::Execution(
64 "DefaultTransformCodec only supports DefaultTransform".to_string(),
65 ))
66 }
67 }
68}
69
70pub trait UnparseCodec: Debug + Send + Sync {
71 fn try_encode(&self, value: &dyn Unparse) -> DFResult<Vec<u8>>;
72 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Unparse>>;
73}
74
75#[derive(Debug)]
76pub struct DefaultUnparseCodec {}
77
78const DEFAULT_UNPARSE_ID: &str = "__default";
79
80impl UnparseCodec for DefaultUnparseCodec {
81 fn try_encode(&self, value: &dyn Unparse) -> DFResult<Vec<u8>> {
82 if value.as_any().is::<DefaultUnparser>() {
83 Ok(DEFAULT_UNPARSE_ID.as_bytes().to_vec())
84 } else {
85 Err(DataFusionError::Execution(format!(
86 "DefaultUnparseCodec does not support unparse: {value:?}, please implement a custom UnparseCodec."
87 )))
88 }
89 }
90
91 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Unparse>> {
92 if value == DEFAULT_UNPARSE_ID.as_bytes() {
93 Ok(Arc::new(DefaultUnparser {}))
94 } else {
95 Err(DataFusionError::Execution(
96 "DefaultUnparseCodec only supports DefaultUnparser".to_string(),
97 ))
98 }
99 }
100}
101
102pub trait ConnectionCodec: Debug + Send + Sync {
103 fn try_encode(
104 &self,
105 value: &dyn Connection,
106 conn_options: &ConnectionOptions,
107 ) -> DFResult<Vec<u8>>;
108 fn try_decode(
109 &self,
110 value: &[u8],
111 conn_options: &ConnectionOptions,
112 ) -> DFResult<Arc<dyn Connection>>;
113}
114
115#[derive(Debug)]
116pub struct DefaultConnectionCodec;
117
118const DEFAULT_CONNECTION_VALUE: &str = "__default";
119
120impl ConnectionCodec for DefaultConnectionCodec {
121 fn try_encode(
122 &self,
123 _value: &dyn Connection,
124 _conn_options: &ConnectionOptions,
125 ) -> DFResult<Vec<u8>> {
126 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
127 }
128
129 fn try_decode(
130 &self,
131 value: &[u8],
132 conn_options: &ConnectionOptions,
133 ) -> DFResult<Arc<dyn Connection>> {
134 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
135 return Err(DataFusionError::Execution(format!(
136 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
137 )));
138 }
139 let conn_options = conn_options.clone().with_pool_max_size(1);
140 tokio::task::block_in_place(|| {
141 tokio::runtime::Handle::current().block_on(async {
142 let pool = connect(&conn_options).await?;
143 let conn = pool.get().await?;
144 Ok::<_, DataFusionError>(conn)
145 })
146 })
147 }
148}
149
150#[derive(Debug, With)]
151pub struct RemotePhysicalCodec {
152 pub transform_codec: Arc<dyn TransformCodec>,
153 pub unparse_codec: Arc<dyn UnparseCodec>,
154 pub connection_codec: Arc<dyn ConnectionCodec>,
155}
156
157impl RemotePhysicalCodec {
158 pub fn new() -> Self {
159 Self {
160 transform_codec: Arc::new(DefaultTransformCodec {}),
161 unparse_codec: Arc::new(DefaultUnparseCodec {}),
162 connection_codec: Arc::new(DefaultConnectionCodec {}),
163 }
164 }
165}
166
167impl Default for RemotePhysicalCodec {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173impl PhysicalExtensionCodec for RemotePhysicalCodec {
174 fn try_decode(
175 &self,
176 buf: &[u8],
177 inputs: &[Arc<dyn ExecutionPlan>],
178 registry: &dyn FunctionRegistry,
179 ) -> DFResult<Arc<dyn ExecutionPlan>> {
180 let remote_table_node =
181 protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
182 DataFusionError::Internal(format!(
183 "Failed to decode remote table physical plan node: {e:?}"
184 ))
185 })?;
186 let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
187 DataFusionError::Internal(
188 "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
189 )
190 })?;
191
192 match remote_table_plan {
193 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
194 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
195 Arc::new(DefaultTransform {})
196 } else {
197 self.transform_codec.try_decode(&proto.transform)?
198 };
199
200 let source = parse_remote_source(proto.source.as_ref().ok_or(DataFusionError::Internal(
201 "remote source is not set".to_string(),
202 ))?)?;
203
204 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
205 let remote_schema = proto
206 .remote_schema
207 .map(|schema| Arc::new(parse_remote_schema(&schema)));
208
209 let projection: Option<Vec<usize>> = proto
210 .projection
211 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
212
213 let limit = proto.limit.map(|l| l as usize);
214
215 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
216
217 let now = std::time::Instant::now();
218 let conn = self
219 .connection_codec
220 .try_decode(&proto.connection, &conn_options)?;
221 debug!(
222 "[remote-table] Decoding connection cost {}ms",
223 now.elapsed().as_millis()
224 );
225
226 Ok(Arc::new(RemoteTableScanExec::try_new(
227 conn_options,
228 source,
229 table_schema,
230 remote_schema,
231 projection,
232 proto.unparsed_filters,
233 limit,
234 transform,
235 conn,
236 )?))
237 }
238 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(proto) => {
239
240 if inputs.len() != 1 {
241 return Err(DataFusionError::Internal(
242 "RemoteTableInsertExec only support one input".to_string(),
243 ));
244 }
245
246 let input = inputs[0].clone();
247
248 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
249 let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
250 let table = proto.table.unwrap().idents;
251
252 let unparser = if proto.unparser == DEFAULT_UNPARSE_ID.as_bytes() {
253 Arc::new(DefaultUnparser {})
254 } else {
255 self.unparse_codec.try_decode(&proto.unparser)?
256 };
257
258 let now = std::time::Instant::now();
259 let conn = self
260 .connection_codec
261 .try_decode(&proto.connection, &conn_options)?;
262 debug!(
263 "[remote-table] Decoding connection cost {}ms",
264 now.elapsed().as_millis()
265 );
266
267 Ok(Arc::new(RemoteTableInsertExec::new(
268 input,
269 conn_options,
270 unparser,
271 table,
272 remote_schema,
273 conn,
274 )))
275 }
276 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(proto) => {
277 let partitions = parse_partitions(&proto.partitions)?;
278 let schema = Schema::try_from(&proto.schema.unwrap())?;
279 let projection = parse_projection(proto.projection.as_ref());
280
281 let sort_information = proto
282 .sort_information
283 .iter()
284 .map(|sort_exprs| {
285 let sort_exprs = parse_physical_sort_exprs(
286 sort_exprs.physical_sort_expr_nodes.as_slice(),
287 registry,
288 &schema,
289 self,
290 )?;
291 Ok::<_, DataFusionError>(sort_exprs)
292 })
293 .collect::<Result<Vec<_>, _>>()?;
294
295 let show_sizes = proto.show_sizes;
296 let fetch = proto.fetch.map(|f| f as usize);
297 let memory_source =
298 MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
299 .with_show_sizes(show_sizes)
300 .with_limit(fetch);
301
302 let memory_source =
303 MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
304 Ok(DataSourceExec::from_data_source(memory_source))
305 }
306 }
307 }
308
309 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
310 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
311 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
312 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
313 } else {
314 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
315 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
316 bytes
317 };
318
319 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
320 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
321
322 let serialized_conn = self
323 .connection_codec
324 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
325
326 let serialized_source = serialize_remote_source(&exec.source);
327
328 let proto = protobuf::RemoteTablePhysicalPlanNode {
329 remote_table_physical_plan_type: Some(
330 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
331 protobuf::RemoteTableScanExec {
332 conn_options: Some(serialized_connection_options),
333 source: Some(serialized_source),
334 table_schema: Some(exec.table_schema.as_ref().try_into()?),
335 remote_schema,
336 projection: serialize_projection(exec.projection.as_ref()),
337 unparsed_filters: exec.unparsed_filters.clone(),
338 limit: exec.limit.map(|l| l as u32),
339 transform: serialized_transform,
340 connection: serialized_conn,
341 },
342 ),
343 ),
344 };
345
346 proto.encode(buf).map_err(|e| {
347 DataFusionError::Internal(format!(
348 "Failed to encode remote table scan exec plan: {e:?}"
349 ))
350 })?;
351 Ok(())
352 } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
353 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
354 let remote_schema = serialize_remote_schema(&exec.remote_schema);
355 let serialized_table = protobuf::Identifiers {
356 idents: exec.table.clone(),
357 };
358 let serialized_unparser = self.unparse_codec.try_encode(exec.unparser.as_ref())?;
359 let serialized_conn = self
360 .connection_codec
361 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
362
363 let proto = protobuf::RemoteTablePhysicalPlanNode {
364 remote_table_physical_plan_type: Some(
365 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
366 protobuf::RemoteTableInsertExec {
367 conn_options: Some(serialized_connection_options),
368 table: Some(serialized_table),
369 remote_schema: Some(remote_schema),
370 unparser: serialized_unparser,
371 connection: serialized_conn,
372 },
373 ),
374 ),
375 };
376
377 proto.encode(buf).map_err(|e| {
378 DataFusionError::Internal(format!(
379 "Failed to encode remote table insert exec node: {e:?}"
380 ))
381 })?;
382
383 Ok(())
384 } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
385 let source = exec.data_source();
386 if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
387 let proto_partitions = serialize_partitions(memory_source.partitions())?;
388 let projection = serialize_projection(memory_source.projection().as_ref());
389 let sort_information = memory_source
390 .sort_information()
391 .iter()
392 .map(|ordering| {
393 let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
394 Ok::<_, DataFusionError>(
395 datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
396 physical_sort_expr_nodes: sort_exprs,
397 },
398 )
399 })
400 .collect::<Result<Vec<_>, _>>()?;
401
402 let proto = protobuf::RemoteTablePhysicalPlanNode {
403 remote_table_physical_plan_type: Some(
404 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(protobuf::MemoryDatasourceNode {
405 partitions: proto_partitions,
406 schema: Some(memory_source.original_schema().try_into()?),
407 projection,
408 sort_information,
409 show_sizes: memory_source.show_sizes(),
410 fetch: memory_source.fetch().map(|f| f as u32),
411 }),
412 ),
413 };
414
415 proto.encode(buf).map_err(|e| {
416 DataFusionError::Internal(format!(
417 "Failed to encode memory datasource node: {e:?}"
418 ))
419 })?;
420
421 Ok(())
422 } else {
423 Err(DataFusionError::NotImplemented(format!(
424 "RemotePhysicalCodec only support encoding MemorySourceConfig, got {source:?}",
425 )))
426 }
427 } else {
428 Err(DataFusionError::NotImplemented(format!(
429 "RemotePhysicalCodec does not support encoding {}",
430 node.name()
431 )))
432 }
433 }
434}
435
436fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
437 match options {
438 #[cfg(feature = "postgres")]
439 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
440 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
441 protobuf::PostgresConnectionOptions {
442 host: options.host.clone(),
443 port: options.port as u32,
444 username: options.username.clone(),
445 password: options.password.clone(),
446 database: options.database.clone(),
447 pool_max_size: options.pool_max_size as u32,
448 stream_chunk_size: options.stream_chunk_size as u32,
449 default_numeric_scale: options.default_numeric_scale as i32,
450 },
451 )),
452 },
453 #[cfg(feature = "mysql")]
454 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
455 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
456 protobuf::MysqlConnectionOptions {
457 host: options.host.clone(),
458 port: options.port as u32,
459 username: options.username.clone(),
460 password: options.password.clone(),
461 database: options.database.clone(),
462 pool_max_size: options.pool_max_size as u32,
463 stream_chunk_size: options.stream_chunk_size as u32,
464 },
465 )),
466 },
467 #[cfg(feature = "oracle")]
468 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
469 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
470 protobuf::OracleConnectionOptions {
471 host: options.host.clone(),
472 port: options.port as u32,
473 username: options.username.clone(),
474 password: options.password.clone(),
475 service_name: options.service_name.clone(),
476 pool_max_size: options.pool_max_size as u32,
477 stream_chunk_size: options.stream_chunk_size as u32,
478 },
479 )),
480 },
481 #[cfg(feature = "sqlite")]
482 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
483 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
484 protobuf::SqliteConnectionOptions {
485 path: options.path.to_str().unwrap().to_string(),
486 stream_chunk_size: options.stream_chunk_size as u32,
487 },
488 )),
489 },
490 #[cfg(feature = "dm")]
491 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
492 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
493 protobuf::DmConnectionOptions {
494 host: options.host.clone(),
495 port: options.port as u32,
496 username: options.username.clone(),
497 password: options.password.clone(),
498 schema: options.schema.clone(),
499 stream_chunk_size: options.stream_chunk_size as u32,
500 driver: options.driver.clone(),
501 },
502 )),
503 },
504 }
505}
506
507fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
508 match options.connection_options {
509 #[cfg(feature = "postgres")]
510 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
511 ConnectionOptions::Postgres(PostgresConnectionOptions {
512 host: options.host,
513 port: options.port as u16,
514 username: options.username,
515 password: options.password,
516 database: options.database,
517 pool_max_size: options.pool_max_size as usize,
518 stream_chunk_size: options.stream_chunk_size as usize,
519 default_numeric_scale: options.default_numeric_scale as i8,
520 })
521 }
522 #[cfg(feature = "mysql")]
523 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
524 ConnectionOptions::Mysql(MysqlConnectionOptions {
525 host: options.host,
526 port: options.port as u16,
527 username: options.username,
528 password: options.password,
529 database: options.database,
530 pool_max_size: options.pool_max_size as usize,
531 stream_chunk_size: options.stream_chunk_size as usize,
532 })
533 }
534 #[cfg(feature = "oracle")]
535 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
536 ConnectionOptions::Oracle(OracleConnectionOptions {
537 host: options.host,
538 port: options.port as u16,
539 username: options.username,
540 password: options.password,
541 service_name: options.service_name,
542 pool_max_size: options.pool_max_size as usize,
543 stream_chunk_size: options.stream_chunk_size as usize,
544 })
545 }
546 #[cfg(feature = "sqlite")]
547 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
548 ConnectionOptions::Sqlite(SqliteConnectionOptions {
549 path: std::path::Path::new(&options.path).to_path_buf(),
550 stream_chunk_size: options.stream_chunk_size as usize,
551 })
552 }
553 #[cfg(feature = "dm")]
554 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
555 ConnectionOptions::Dm(DmConnectionOptions {
556 host: options.host,
557 port: options.port as u16,
558 username: options.username,
559 password: options.password,
560 schema: options.schema,
561 stream_chunk_size: options.stream_chunk_size as usize,
562 driver: options.driver,
563 })
564 }
565 _ => panic!("Failed to parse connection options: {options:?}"),
566 }
567}
568
569fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
570 projection.map(|p| protobuf::Projection {
571 projection: p.iter().map(|n| *n as u32).collect(),
572 })
573}
574
575fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
576 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
577}
578
579fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
580 let fields = remote_schema
581 .fields
582 .iter()
583 .map(serialize_remote_field)
584 .collect::<Vec<_>>();
585
586 protobuf::RemoteSchema { fields }
587}
588
589fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
590 protobuf::RemoteField {
591 name: remote_field.name.clone(),
592 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
593 nullable: remote_field.nullable,
594 auto_increment: remote_field.auto_increment,
595 }
596}
597
598fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
599 match remote_type {
600 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
601 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
602 protobuf::PostgresInt2 {},
603 )),
604 },
605 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
606 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
607 protobuf::PostgresInt4 {},
608 )),
609 },
610 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
611 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
612 protobuf::PostgresInt8 {},
613 )),
614 },
615 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
616 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
617 protobuf::PostgresFloat4 {},
618 )),
619 },
620 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
621 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
622 protobuf::PostgresFloat8 {},
623 )),
624 },
625 RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
626 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
627 protobuf::PostgresNumeric {
628 precision: *precision as u32,
629 scale: *scale as i32,
630 },
631 )),
632 },
633 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
634 r#type: Some(protobuf::remote_type::Type::PostgresName(
635 protobuf::PostgresName {},
636 )),
637 },
638 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
639 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
640 protobuf::PostgresVarchar {},
641 )),
642 },
643 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
644 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
645 protobuf::PostgresBpchar {},
646 )),
647 },
648 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
649 r#type: Some(protobuf::remote_type::Type::PostgresText(
650 protobuf::PostgresText {},
651 )),
652 },
653 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
654 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
655 protobuf::PostgresBytea {},
656 )),
657 },
658 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
659 r#type: Some(protobuf::remote_type::Type::PostgresDate(
660 protobuf::PostgresDate {},
661 )),
662 },
663 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
664 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
665 protobuf::PostgresTimestamp {},
666 )),
667 },
668 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
669 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
670 protobuf::PostgresTimestampTz {},
671 )),
672 },
673 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
674 r#type: Some(protobuf::remote_type::Type::PostgresTime(
675 protobuf::PostgresTime {},
676 )),
677 },
678 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
679 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
680 protobuf::PostgresInterval {},
681 )),
682 },
683 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
684 r#type: Some(protobuf::remote_type::Type::PostgresBool(
685 protobuf::PostgresBool {},
686 )),
687 },
688 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
689 r#type: Some(protobuf::remote_type::Type::PostgresJson(
690 protobuf::PostgresJson {},
691 )),
692 },
693 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
694 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
695 protobuf::PostgresJsonb {},
696 )),
697 },
698 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
699 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
700 protobuf::PostgresInt2Array {},
701 )),
702 },
703 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
704 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
705 protobuf::PostgresInt4Array {},
706 )),
707 },
708 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
709 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
710 protobuf::PostgresInt8Array {},
711 )),
712 },
713 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
714 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
715 protobuf::PostgresFloat4Array {},
716 )),
717 },
718 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
719 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
720 protobuf::PostgresFloat8Array {},
721 )),
722 },
723 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
724 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
725 protobuf::PostgresVarcharArray {},
726 )),
727 },
728 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
729 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
730 protobuf::PostgresBpcharArray {},
731 )),
732 },
733 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
734 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
735 protobuf::PostgresTextArray {},
736 )),
737 },
738 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
739 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
740 protobuf::PostgresByteaArray {},
741 )),
742 },
743 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
744 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
745 protobuf::PostgresBoolArray {},
746 )),
747 },
748 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
749 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
750 protobuf::PostgresPostGisGeometry {},
751 )),
752 },
753 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
754 r#type: Some(protobuf::remote_type::Type::PostgresOid(
755 protobuf::PostgresOid {},
756 )),
757 },
758 RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
759 r#type: Some(protobuf::remote_type::Type::PostgresXml(
760 protobuf::PostgresXml {},
761 )),
762 },
763 RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
764 r#type: Some(protobuf::remote_type::Type::PostgresUuid(
765 protobuf::PostgresUuid {},
766 )),
767 },
768
769 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
770 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
771 protobuf::MysqlTinyInt {},
772 )),
773 },
774 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
775 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
776 protobuf::MysqlTinyIntUnsigned {},
777 )),
778 },
779 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
780 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
781 protobuf::MysqlSmallInt {},
782 )),
783 },
784 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
785 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
786 protobuf::MysqlSmallIntUnsigned {},
787 )),
788 },
789 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
790 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
791 protobuf::MysqlMediumInt {},
792 )),
793 },
794 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
795 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
796 protobuf::MysqlMediumIntUnsigned {},
797 )),
798 },
799 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
800 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
801 protobuf::MysqlInteger {},
802 )),
803 },
804 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
805 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
806 protobuf::MysqlIntegerUnsigned {},
807 )),
808 },
809 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
810 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
811 protobuf::MysqlBigInt {},
812 )),
813 },
814 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
815 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
816 protobuf::MysqlBigIntUnsigned {},
817 )),
818 },
819 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
820 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
821 protobuf::MysqlFloat {},
822 )),
823 },
824 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
825 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
826 protobuf::MysqlDouble {},
827 )),
828 },
829 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
830 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
831 protobuf::MysqlDecimal {
832 precision: *precision as u32,
833 scale: *scale as u32,
834 },
835 )),
836 },
837 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
838 r#type: Some(protobuf::remote_type::Type::MysqlDate(
839 protobuf::MysqlDate {},
840 )),
841 },
842 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
843 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
844 protobuf::MysqlDateTime {},
845 )),
846 },
847 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
848 r#type: Some(protobuf::remote_type::Type::MysqlTime(
849 protobuf::MysqlTime {},
850 )),
851 },
852 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
853 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
854 protobuf::MysqlTimestamp {},
855 )),
856 },
857 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
858 r#type: Some(protobuf::remote_type::Type::MysqlYear(
859 protobuf::MysqlYear {},
860 )),
861 },
862 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
863 r#type: Some(protobuf::remote_type::Type::MysqlChar(
864 protobuf::MysqlChar {},
865 )),
866 },
867 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
868 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
869 protobuf::MysqlVarchar {},
870 )),
871 },
872 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
873 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
874 protobuf::MysqlBinary {},
875 )),
876 },
877 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
878 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
879 protobuf::MysqlVarbinary {},
880 )),
881 },
882 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
883 r#type: Some(protobuf::remote_type::Type::MysqlText(
884 protobuf::MysqlText { length: *len },
885 )),
886 },
887 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
888 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
889 protobuf::MysqlBlob { length: *len },
890 )),
891 },
892 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
893 r#type: Some(protobuf::remote_type::Type::MysqlJson(
894 protobuf::MysqlJson {},
895 )),
896 },
897 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
898 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
899 protobuf::MysqlGeometry {},
900 )),
901 },
902
903 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
904 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
905 protobuf::OracleVarchar2 { length: *len },
906 )),
907 },
908 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
909 r#type: Some(protobuf::remote_type::Type::OracleChar(
910 protobuf::OracleChar { length: *len },
911 )),
912 },
913 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
914 r#type: Some(protobuf::remote_type::Type::OracleNumber(
915 protobuf::OracleNumber {
916 precision: *precision as u32,
917 scale: *scale as i32,
918 },
919 )),
920 },
921 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
922 r#type: Some(protobuf::remote_type::Type::OracleDate(
923 protobuf::OracleDate {},
924 )),
925 },
926 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
927 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
928 protobuf::OracleTimestamp {},
929 )),
930 },
931 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
932 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
933 protobuf::OracleBoolean {},
934 )),
935 },
936 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
937 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
938 protobuf::OracleBinaryFloat {},
939 )),
940 },
941 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
942 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
943 protobuf::OracleBinaryDouble {},
944 )),
945 },
946 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
947 r#type: Some(protobuf::remote_type::Type::OracleBlob(
948 protobuf::OracleBlob {},
949 )),
950 },
951 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
952 r#type: Some(protobuf::remote_type::Type::OracleFloat(
953 protobuf::OracleFloat {
954 precision: *precision as u32,
955 },
956 )),
957 },
958 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
959 r#type: Some(protobuf::remote_type::Type::OracleNchar(
960 protobuf::OracleNChar { length: *len },
961 )),
962 },
963 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
964 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
965 protobuf::OracleNVarchar2 { length: *len },
966 )),
967 },
968 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
969 r#type: Some(protobuf::remote_type::Type::OracleRaw(
970 protobuf::OracleRaw { length: *len },
971 )),
972 },
973 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
974 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
975 protobuf::OracleLongRaw {},
976 )),
977 },
978 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
979 r#type: Some(protobuf::remote_type::Type::OracleLong(
980 protobuf::OracleLong {},
981 )),
982 },
983 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
984 r#type: Some(protobuf::remote_type::Type::OracleClob(
985 protobuf::OracleClob {},
986 )),
987 },
988 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
989 r#type: Some(protobuf::remote_type::Type::OracleNclob(
990 protobuf::OracleNClob {},
991 )),
992 },
993 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
994 r#type: Some(protobuf::remote_type::Type::SqliteNull(
995 protobuf::SqliteNull {},
996 )),
997 },
998 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
999 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
1000 protobuf::SqliteInteger {},
1001 )),
1002 },
1003 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
1004 r#type: Some(protobuf::remote_type::Type::SqliteReal(
1005 protobuf::SqliteReal {},
1006 )),
1007 },
1008 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
1009 r#type: Some(protobuf::remote_type::Type::SqliteText(
1010 protobuf::SqliteText {},
1011 )),
1012 },
1013 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
1014 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
1015 protobuf::SqliteBlob {},
1016 )),
1017 },
1018 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
1019 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
1020 protobuf::DmTinyInt {},
1021 )),
1022 },
1023 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
1024 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
1025 protobuf::DmSmallInt {},
1026 )),
1027 },
1028 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
1029 r#type: Some(protobuf::remote_type::Type::DmInteger(
1030 protobuf::DmInteger {},
1031 )),
1032 },
1033 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
1034 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
1035 },
1036 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
1037 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
1038 },
1039 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
1040 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
1041 },
1042 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
1043 r#type: Some(protobuf::remote_type::Type::DmNumeric(
1044 protobuf::DmNumeric {
1045 precision: *precision as u32,
1046 scale: *scale as i32,
1047 },
1048 )),
1049 },
1050 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
1051 r#type: Some(protobuf::remote_type::Type::DmDecimal(
1052 protobuf::DmDecimal {
1053 precision: *precision as u32,
1054 scale: *scale as i32,
1055 },
1056 )),
1057 },
1058 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
1059 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
1060 length: len.map(|s| s as u32),
1061 })),
1062 },
1063 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
1064 r#type: Some(protobuf::remote_type::Type::DmVarchar(
1065 protobuf::DmVarchar {
1066 length: len.map(|s| s as u32),
1067 },
1068 )),
1069 },
1070 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
1071 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
1072 },
1073 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
1074 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
1075 length: *len as u32,
1076 })),
1077 },
1078 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
1079 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
1080 protobuf::DmVarbinary {
1081 length: len.map(|s| s as u32),
1082 },
1083 )),
1084 },
1085 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
1086 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
1087 },
1088 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
1089 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
1090 },
1091 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
1092 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
1093 protobuf::DmTimestamp {
1094 precision: *precision as u32,
1095 },
1096 )),
1097 },
1098 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1099 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1100 precision: *precision as u32,
1101 })),
1102 },
1103 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1104 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
1105 },
1106 }
1107}
1108
1109fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1110 let fields = remote_schema
1111 .fields
1112 .iter()
1113 .map(parse_remote_field)
1114 .collect::<Vec<_>>();
1115
1116 RemoteSchema { fields }
1117}
1118
1119fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1120 RemoteField {
1121 name: field.name.clone(),
1122 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1123 nullable: field.nullable,
1124 auto_increment: field.auto_increment,
1125 }
1126}
1127
1128fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1129 match remote_type.r#type.as_ref().unwrap() {
1130 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1131 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1132 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1133 protobuf::remote_type::Type::PostgresFloat4(_) => {
1134 RemoteType::Postgres(PostgresType::Float4)
1135 }
1136 protobuf::remote_type::Type::PostgresFloat8(_) => {
1137 RemoteType::Postgres(PostgresType::Float8)
1138 }
1139 protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1140 PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1141 ),
1142 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1143 protobuf::remote_type::Type::PostgresVarchar(_) => {
1144 RemoteType::Postgres(PostgresType::Varchar)
1145 }
1146 protobuf::remote_type::Type::PostgresBpchar(_) => {
1147 RemoteType::Postgres(PostgresType::Bpchar)
1148 }
1149 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1150 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1151 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1152 protobuf::remote_type::Type::PostgresTimestamp(_) => {
1153 RemoteType::Postgres(PostgresType::Timestamp)
1154 }
1155 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1156 RemoteType::Postgres(PostgresType::TimestampTz)
1157 }
1158 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1159 protobuf::remote_type::Type::PostgresInterval(_) => {
1160 RemoteType::Postgres(PostgresType::Interval)
1161 }
1162 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1163 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1164 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1165 protobuf::remote_type::Type::PostgresInt2Array(_) => {
1166 RemoteType::Postgres(PostgresType::Int2Array)
1167 }
1168 protobuf::remote_type::Type::PostgresInt4Array(_) => {
1169 RemoteType::Postgres(PostgresType::Int4Array)
1170 }
1171 protobuf::remote_type::Type::PostgresInt8Array(_) => {
1172 RemoteType::Postgres(PostgresType::Int8Array)
1173 }
1174 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1175 RemoteType::Postgres(PostgresType::Float4Array)
1176 }
1177 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1178 RemoteType::Postgres(PostgresType::Float8Array)
1179 }
1180 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1181 RemoteType::Postgres(PostgresType::VarcharArray)
1182 }
1183 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1184 RemoteType::Postgres(PostgresType::BpcharArray)
1185 }
1186 protobuf::remote_type::Type::PostgresTextArray(_) => {
1187 RemoteType::Postgres(PostgresType::TextArray)
1188 }
1189 protobuf::remote_type::Type::PostgresByteaArray(_) => {
1190 RemoteType::Postgres(PostgresType::ByteaArray)
1191 }
1192 protobuf::remote_type::Type::PostgresBoolArray(_) => {
1193 RemoteType::Postgres(PostgresType::BoolArray)
1194 }
1195 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1196 RemoteType::Postgres(PostgresType::PostGisGeometry)
1197 }
1198 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1199 protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1200 protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1201 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1202 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1203 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1204 }
1205 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1206 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1207 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1208 }
1209 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1210 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1211 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1212 }
1213 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1214 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1215 RemoteType::Mysql(MysqlType::IntegerUnsigned)
1216 }
1217 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1218 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1219 RemoteType::Mysql(MysqlType::BigIntUnsigned)
1220 }
1221 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1222 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1223 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1224 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1225 ),
1226 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1227 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1228 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1229 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1230 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1231 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1232 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1233 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1234 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1235 protobuf::remote_type::Type::MysqlText(text) => {
1236 RemoteType::Mysql(MysqlType::Text(text.length))
1237 }
1238 protobuf::remote_type::Type::MysqlBlob(blob) => {
1239 RemoteType::Mysql(MysqlType::Blob(blob.length))
1240 }
1241 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1242 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1243 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1244 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1245 }
1246 protobuf::remote_type::Type::OracleChar(char) => {
1247 RemoteType::Oracle(OracleType::Char(char.length))
1248 }
1249 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1250 OracleType::Number(number.precision as u8, number.scale as i8),
1251 ),
1252 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1253 protobuf::remote_type::Type::OracleTimestamp(_) => {
1254 RemoteType::Oracle(OracleType::Timestamp)
1255 }
1256 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1257 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1258 RemoteType::Oracle(OracleType::BinaryFloat)
1259 }
1260 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1261 RemoteType::Oracle(OracleType::BinaryDouble)
1262 }
1263 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1264 RemoteType::Oracle(OracleType::Float(*precision as u8))
1265 }
1266 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1267 RemoteType::Oracle(OracleType::NChar(*length))
1268 }
1269 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1270 RemoteType::Oracle(OracleType::NVarchar2(*length))
1271 }
1272 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1273 RemoteType::Oracle(OracleType::Raw(*length))
1274 }
1275 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1276 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1277 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1278 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1279 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1280 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1281 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1282 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1283 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1284 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1285 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1286 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1287 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1288 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1289 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1290 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1291 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1292 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1293 }
1294 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1295 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1296 }
1297 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1298 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1299 }
1300 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1301 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1302 }
1303 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1304 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1305 RemoteType::Dm(DmType::Binary(*length as u16))
1306 }
1307 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1308 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1309 }
1310 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1311 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1312 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1313 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1314 }
1315 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1316 RemoteType::Dm(DmType::Time(*precision as u8))
1317 }
1318 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1319 }
1320}
1321
1322fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1323 match source {
1324 RemoteSource::Query(query) => protobuf::RemoteSource {
1325 source: Some(protobuf::remote_source::Source::Query(query.clone())),
1326 },
1327 RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1328 source: Some(protobuf::remote_source::Source::Table(
1329 protobuf::Identifiers {
1330 idents: table_identifiers.clone(),
1331 },
1332 )),
1333 },
1334 }
1335}
1336
1337fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1338 let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1339 "remote source is not set".to_string(),
1340 ))?;
1341 match source {
1342 protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1343 protobuf::remote_source::Source::Table(table_identifiers) => {
1344 Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1345 }
1346 }
1347}
1348
1349fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> Result<Vec<Vec<u8>>, DataFusionError> {
1350 let mut proto_partitions = vec![];
1351 for partition in partitions {
1352 if partition.is_empty() {
1353 proto_partitions.push(vec![]);
1354 continue;
1355 }
1356 let mut proto_partition = vec![];
1357 let mut stream_writer =
1358 StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
1359 for batch in partition {
1360 stream_writer.write(batch)?;
1361 }
1362 stream_writer.finish()?;
1363 proto_partitions.push(proto_partition);
1364 }
1365 Ok(proto_partitions)
1366}
1367
1368fn parse_partitions(
1369 proto_partitions: &[Vec<u8>],
1370) -> Result<Vec<Vec<RecordBatch>>, DataFusionError> {
1371 let mut partitions = vec![];
1372 for proto_partition in proto_partitions {
1373 if proto_partition.is_empty() {
1374 partitions.push(vec![]);
1375 continue;
1376 }
1377 let mut partition = vec![];
1378 let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
1379 for batch in stream_reader {
1380 partition.push(batch?);
1381 }
1382 partitions.push(partition);
1383 }
1384 Ok(partitions)
1385}