1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 Connection, ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType,
14 Literalize, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
15 RemoteSource, RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
16 connect,
17};
18use datafusion::arrow::array::RecordBatch;
19use datafusion::arrow::datatypes::{Schema, SchemaRef};
20use datafusion::arrow::ipc::reader::StreamReader;
21use datafusion::arrow::ipc::writer::StreamWriter;
22use datafusion::catalog::memory::{DataSourceExec, MemorySourceConfig};
23use datafusion::common::DataFusionError;
24use datafusion::datasource::source::DataSource;
25use datafusion::execution::FunctionRegistry;
26use datafusion::physical_expr::LexOrdering;
27use datafusion::physical_plan::ExecutionPlan;
28use datafusion_proto::convert_required;
29use datafusion_proto::physical_plan::PhysicalExtensionCodec;
30use datafusion_proto::physical_plan::from_proto::parse_physical_sort_exprs;
31use datafusion_proto::physical_plan::to_proto::serialize_physical_sort_exprs;
32use datafusion_proto::protobuf::proto_error;
33use derive_with::With;
34use log::debug;
35use prost::Message;
36use std::fmt::Debug;
37use std::sync::Arc;
38
/// Converts [`Transform`] implementations to and from an opaque byte
/// representation so they can be carried inside a serialized physical plan.
pub trait TransformCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes, or returns an error when the codec does
    /// not support the given transform.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Rebuilds a transform from bytes previously produced by `try_encode`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
43
/// [`TransformCodec`] that only understands [`DefaultTransform`]; any other
/// transform is rejected with an error asking for a custom codec.
#[derive(Debug)]
pub struct DefaultTransformCodec {}
46
/// Marker whose UTF-8 bytes stand for [`DefaultTransform`] in serialized plans.
const DEFAULT_TRANSFORM_ID: &str = "__default";
48
49impl TransformCodec for DefaultTransformCodec {
50 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
51 if value.as_any().is::<DefaultTransform>() {
52 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
53 } else {
54 Err(DataFusionError::Execution(format!(
55 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
56 )))
57 }
58 }
59
60 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
61 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
62 Ok(Arc::new(DefaultTransform {}))
63 } else {
64 Err(DataFusionError::Execution(
65 "DefaultTransformCodec only supports DefaultTransform".to_string(),
66 ))
67 }
68 }
69}
70
/// Converts [`Literalize`] implementations to and from an opaque byte
/// representation so they can be carried inside a serialized physical plan.
pub trait LiteralizeCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes, or returns an error when the codec does
    /// not support the given literalizer.
    fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
    /// Rebuilds a literalizer from bytes previously produced by `try_encode`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
}
75
/// [`LiteralizeCodec`] that only understands [`DefaultLiteralizer`]; any other
/// literalizer is rejected with an error asking for a custom codec.
#[derive(Debug)]
pub struct DefaultLiteralizeCodec {}
78
/// Marker whose UTF-8 bytes stand for [`DefaultLiteralizer`] in serialized plans.
const DEFAULT_LITERALIZE_ID: &str = "__default";
80
81impl LiteralizeCodec for DefaultLiteralizeCodec {
82 fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
83 if value.as_any().is::<DefaultLiteralizer>() {
84 Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
85 } else {
86 Err(DataFusionError::Execution(format!(
87 "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
88 )))
89 }
90 }
91
92 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
93 if value == DEFAULT_LITERALIZE_ID.as_bytes() {
94 Ok(Arc::new(DefaultLiteralizer {}))
95 } else {
96 Err(DataFusionError::Execution(
97 "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
98 ))
99 }
100 }
101}
102
/// Converts a [`Connection`] to and from an opaque byte representation so a
/// plan that holds a connection can be serialized and later rebuilt.
pub trait ConnectionCodec: Debug + Send + Sync {
    /// Serializes `value` into bytes; `conn_options` are the options that
    /// accompany the connection in the plan node being encoded.
    fn try_encode(
        &self,
        value: &dyn Connection,
        conn_options: &ConnectionOptions,
    ) -> DFResult<Vec<u8>>;
    /// Rebuilds a connection from the bytes produced by `try_encode` together
    /// with the connection options decoded from the same plan node.
    fn try_decode(
        &self,
        value: &[u8],
        conn_options: &ConnectionOptions,
    ) -> DFResult<Arc<dyn Connection>>;
}
115
/// [`ConnectionCodec`] that writes a fixed marker when encoding and, when
/// decoding, opens a new connection from the supplied connection options.
#[derive(Debug)]
pub struct DefaultConnectionCodec;
118
/// Marker whose UTF-8 bytes are the only payload [`DefaultConnectionCodec`]
/// emits and accepts.
const DEFAULT_CONNECTION_VALUE: &str = "__default";
120
121impl ConnectionCodec for DefaultConnectionCodec {
122 fn try_encode(
123 &self,
124 _value: &dyn Connection,
125 _conn_options: &ConnectionOptions,
126 ) -> DFResult<Vec<u8>> {
127 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
128 }
129
130 fn try_decode(
131 &self,
132 value: &[u8],
133 conn_options: &ConnectionOptions,
134 ) -> DFResult<Arc<dyn Connection>> {
135 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
136 return Err(DataFusionError::Execution(format!(
137 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
138 )));
139 }
140 let conn_options = conn_options.clone().with_pool_max_size(1);
141 tokio::task::block_in_place(|| {
142 tokio::runtime::Handle::current().block_on(async {
143 let pool = connect(&conn_options).await?;
144 let conn = pool.get().await?;
145 Ok::<_, DataFusionError>(conn)
146 })
147 })
148 }
149}
150
/// Physical-plan extension codec for the remote-table execution nodes; it
/// bundles the pluggable codecs used for the parts of a plan that cannot be
/// serialized generically.
#[derive(Debug, With)]
pub struct RemotePhysicalCodec {
    /// Codec for scan-node transforms other than the default transform.
    pub transform_codec: Arc<dyn TransformCodec>,
    /// Codec for the literalizer carried by insert nodes.
    pub literalize_codec: Arc<dyn LiteralizeCodec>,
    /// Codec for the connection held by scan and insert nodes.
    pub connection_codec: Arc<dyn ConnectionCodec>,
}
157
158impl RemotePhysicalCodec {
159 pub fn new() -> Self {
160 Self {
161 transform_codec: Arc::new(DefaultTransformCodec {}),
162 literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
163 connection_codec: Arc::new(DefaultConnectionCodec {}),
164 }
165 }
166}
167
168impl Default for RemotePhysicalCodec {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174impl PhysicalExtensionCodec for RemotePhysicalCodec {
175 fn try_decode(
176 &self,
177 buf: &[u8],
178 inputs: &[Arc<dyn ExecutionPlan>],
179 registry: &dyn FunctionRegistry,
180 ) -> DFResult<Arc<dyn ExecutionPlan>> {
181 let remote_table_node =
182 protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
183 DataFusionError::Internal(format!(
184 "Failed to decode remote table physical plan node: {e:?}"
185 ))
186 })?;
187 let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
188 DataFusionError::Internal(
189 "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
190 )
191 })?;
192
193 match remote_table_plan {
194 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
195 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
196 Arc::new(DefaultTransform {})
197 } else {
198 self.transform_codec.try_decode(&proto.transform)?
199 };
200
201 let source = parse_remote_source(proto.source.as_ref().ok_or(DataFusionError::Internal(
202 "remote source is not set".to_string(),
203 ))?)?;
204
205 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
206 let remote_schema = proto
207 .remote_schema
208 .map(|schema| Arc::new(parse_remote_schema(&schema)));
209
210 let projection: Option<Vec<usize>> = proto
211 .projection
212 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
213
214 let limit = proto.limit.map(|l| l as usize);
215
216 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
217
218 let now = std::time::Instant::now();
219 let conn = self
220 .connection_codec
221 .try_decode(&proto.connection, &conn_options)?;
222 debug!(
223 "[remote-table] Decoding connection cost {}ms",
224 now.elapsed().as_millis()
225 );
226
227 Ok(Arc::new(RemoteTableScanExec::try_new(
228 conn_options,
229 source,
230 table_schema,
231 remote_schema,
232 projection,
233 proto.unparsed_filters,
234 limit,
235 transform,
236 conn,
237 )?))
238 }
239 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(proto) => {
240
241 if inputs.len() != 1 {
242 return Err(DataFusionError::Internal(
243 "RemoteTableInsertExec only support one input".to_string(),
244 ));
245 }
246
247 let input = inputs[0].clone();
248
249 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
250 let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
251 let table = proto.table.unwrap().idents;
252
253 let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
254 Arc::new(DefaultLiteralizer {})
255 } else {
256 self.literalize_codec.try_decode(&proto.literalizer)?
257 };
258
259 let now = std::time::Instant::now();
260 let conn = self
261 .connection_codec
262 .try_decode(&proto.connection, &conn_options)?;
263 debug!(
264 "[remote-table] Decoding connection cost {}ms",
265 now.elapsed().as_millis()
266 );
267
268 Ok(Arc::new(RemoteTableInsertExec::new(
269 input,
270 conn_options,
271 literalizer,
272 table,
273 remote_schema,
274 conn,
275 )))
276 }
277 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(proto) => {
278 let partitions = parse_partitions(&proto.partitions)?;
279 let schema = Schema::try_from(&proto.schema.unwrap())?;
280 let projection = parse_projection(proto.projection.as_ref());
281
282 let sort_information = proto
283 .sort_information
284 .iter()
285 .map(|sort_exprs| {
286 let sort_exprs = parse_physical_sort_exprs(
287 sort_exprs.physical_sort_expr_nodes.as_slice(),
288 registry,
289 &schema,
290 self,
291 )?;
292 let lex_ordering = LexOrdering::new(sort_exprs).expect("lex ordering is not empty");
293 Ok::<_, DataFusionError>(lex_ordering)
294 })
295 .collect::<Result<Vec<_>, _>>()?;
296
297 let show_sizes = proto.show_sizes;
298 let fetch = proto.fetch.map(|f| f as usize);
299 let memory_source =
300 MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
301 .with_show_sizes(show_sizes)
302 .with_limit(fetch);
303
304 let memory_source =
305 MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
306 Ok(DataSourceExec::from_data_source(memory_source))
307 }
308 }
309 }
310
311 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
312 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
313 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
314 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
315 } else {
316 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
317 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
318 bytes
319 };
320
321 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
322 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
323
324 let serialized_conn = self
325 .connection_codec
326 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
327
328 let serialized_source = serialize_remote_source(&exec.source);
329
330 let proto = protobuf::RemoteTablePhysicalPlanNode {
331 remote_table_physical_plan_type: Some(
332 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
333 protobuf::RemoteTableScanExec {
334 conn_options: Some(serialized_connection_options),
335 source: Some(serialized_source),
336 table_schema: Some(exec.table_schema.as_ref().try_into()?),
337 remote_schema,
338 projection: serialize_projection(exec.projection.as_ref()),
339 unparsed_filters: exec.unparsed_filters.clone(),
340 limit: exec.limit.map(|l| l as u32),
341 transform: serialized_transform,
342 connection: serialized_conn,
343 },
344 ),
345 ),
346 };
347
348 proto.encode(buf).map_err(|e| {
349 DataFusionError::Internal(format!(
350 "Failed to encode remote table scan exec plan: {e:?}"
351 ))
352 })?;
353 Ok(())
354 } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
355 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
356 let remote_schema = serialize_remote_schema(&exec.remote_schema);
357 let serialized_table = protobuf::Identifiers {
358 idents: exec.table.clone(),
359 };
360 let serialized_literalizer = self
361 .literalize_codec
362 .try_encode(exec.literalizer.as_ref())?;
363 let serialized_conn = self
364 .connection_codec
365 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
366
367 let proto = protobuf::RemoteTablePhysicalPlanNode {
368 remote_table_physical_plan_type: Some(
369 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
370 protobuf::RemoteTableInsertExec {
371 conn_options: Some(serialized_connection_options),
372 table: Some(serialized_table),
373 remote_schema: Some(remote_schema),
374 literalizer: serialized_literalizer,
375 connection: serialized_conn,
376 },
377 ),
378 ),
379 };
380
381 proto.encode(buf).map_err(|e| {
382 DataFusionError::Internal(format!(
383 "Failed to encode remote table insert exec node: {e:?}"
384 ))
385 })?;
386
387 Ok(())
388 } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
389 let source = exec.data_source();
390 if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
391 let proto_partitions = serialize_partitions(memory_source.partitions())?;
392 let projection = serialize_projection(memory_source.projection().as_ref());
393 let sort_information = memory_source
394 .sort_information()
395 .iter()
396 .map(|ordering| {
397 let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
398 Ok::<_, DataFusionError>(
399 datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
400 physical_sort_expr_nodes: sort_exprs,
401 },
402 )
403 })
404 .collect::<Result<Vec<_>, _>>()?;
405
406 let proto = protobuf::RemoteTablePhysicalPlanNode {
407 remote_table_physical_plan_type: Some(
408 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(protobuf::MemoryDatasourceNode {
409 partitions: proto_partitions,
410 schema: Some(memory_source.original_schema().try_into()?),
411 projection,
412 sort_information,
413 show_sizes: memory_source.show_sizes(),
414 fetch: memory_source.fetch().map(|f| f as u32),
415 }),
416 ),
417 };
418
419 proto.encode(buf).map_err(|e| {
420 DataFusionError::Internal(format!(
421 "Failed to encode memory datasource node: {e:?}"
422 ))
423 })?;
424
425 Ok(())
426 } else {
427 Err(DataFusionError::NotImplemented(format!(
428 "RemotePhysicalCodec only support encoding MemorySourceConfig, got {source:?}",
429 )))
430 }
431 } else {
432 Err(DataFusionError::NotImplemented(format!(
433 "RemotePhysicalCodec does not support encoding {}",
434 node.name()
435 )))
436 }
437 }
438}
439
440fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
441 match options {
442 #[cfg(feature = "postgres")]
443 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
444 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
445 protobuf::PostgresConnectionOptions {
446 host: options.host.clone(),
447 port: options.port as u32,
448 username: options.username.clone(),
449 password: options.password.clone(),
450 database: options.database.clone(),
451 pool_max_size: options.pool_max_size as u32,
452 stream_chunk_size: options.stream_chunk_size as u32,
453 default_numeric_scale: options.default_numeric_scale as i32,
454 },
455 )),
456 },
457 #[cfg(feature = "mysql")]
458 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
459 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
460 protobuf::MysqlConnectionOptions {
461 host: options.host.clone(),
462 port: options.port as u32,
463 username: options.username.clone(),
464 password: options.password.clone(),
465 database: options.database.clone(),
466 pool_max_size: options.pool_max_size as u32,
467 stream_chunk_size: options.stream_chunk_size as u32,
468 },
469 )),
470 },
471 #[cfg(feature = "oracle")]
472 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
473 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
474 protobuf::OracleConnectionOptions {
475 host: options.host.clone(),
476 port: options.port as u32,
477 username: options.username.clone(),
478 password: options.password.clone(),
479 service_name: options.service_name.clone(),
480 pool_max_size: options.pool_max_size as u32,
481 stream_chunk_size: options.stream_chunk_size as u32,
482 },
483 )),
484 },
485 #[cfg(feature = "sqlite")]
486 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
487 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
488 protobuf::SqliteConnectionOptions {
489 path: options.path.to_str().unwrap().to_string(),
490 stream_chunk_size: options.stream_chunk_size as u32,
491 },
492 )),
493 },
494 #[cfg(feature = "dm")]
495 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
496 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
497 protobuf::DmConnectionOptions {
498 host: options.host.clone(),
499 port: options.port as u32,
500 username: options.username.clone(),
501 password: options.password.clone(),
502 schema: options.schema.clone(),
503 stream_chunk_size: options.stream_chunk_size as u32,
504 driver: options.driver.clone(),
505 },
506 )),
507 },
508 }
509}
510
511fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
512 match options.connection_options {
513 #[cfg(feature = "postgres")]
514 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
515 ConnectionOptions::Postgres(PostgresConnectionOptions {
516 host: options.host,
517 port: options.port as u16,
518 username: options.username,
519 password: options.password,
520 database: options.database,
521 pool_max_size: options.pool_max_size as usize,
522 stream_chunk_size: options.stream_chunk_size as usize,
523 default_numeric_scale: options.default_numeric_scale as i8,
524 })
525 }
526 #[cfg(feature = "mysql")]
527 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
528 ConnectionOptions::Mysql(MysqlConnectionOptions {
529 host: options.host,
530 port: options.port as u16,
531 username: options.username,
532 password: options.password,
533 database: options.database,
534 pool_max_size: options.pool_max_size as usize,
535 stream_chunk_size: options.stream_chunk_size as usize,
536 })
537 }
538 #[cfg(feature = "oracle")]
539 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
540 ConnectionOptions::Oracle(OracleConnectionOptions {
541 host: options.host,
542 port: options.port as u16,
543 username: options.username,
544 password: options.password,
545 service_name: options.service_name,
546 pool_max_size: options.pool_max_size as usize,
547 stream_chunk_size: options.stream_chunk_size as usize,
548 })
549 }
550 #[cfg(feature = "sqlite")]
551 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
552 ConnectionOptions::Sqlite(SqliteConnectionOptions {
553 path: std::path::Path::new(&options.path).to_path_buf(),
554 stream_chunk_size: options.stream_chunk_size as usize,
555 })
556 }
557 #[cfg(feature = "dm")]
558 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
559 ConnectionOptions::Dm(DmConnectionOptions {
560 host: options.host,
561 port: options.port as u16,
562 username: options.username,
563 password: options.password,
564 schema: options.schema,
565 stream_chunk_size: options.stream_chunk_size as usize,
566 driver: options.driver,
567 })
568 }
569 _ => panic!("Failed to parse connection options: {options:?}"),
570 }
571}
572
573fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
574 projection.map(|p| protobuf::Projection {
575 projection: p.iter().map(|n| *n as u32).collect(),
576 })
577}
578
579fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
580 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
581}
582
583fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
584 let fields = remote_schema
585 .fields
586 .iter()
587 .map(serialize_remote_field)
588 .collect::<Vec<_>>();
589
590 protobuf::RemoteSchema { fields }
591}
592
593fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
594 protobuf::RemoteField {
595 name: remote_field.name.clone(),
596 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
597 nullable: remote_field.nullable,
598 auto_increment: remote_field.auto_increment,
599 }
600}
601
602fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
603 match remote_type {
604 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
605 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
606 protobuf::PostgresInt2 {},
607 )),
608 },
609 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
610 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
611 protobuf::PostgresInt4 {},
612 )),
613 },
614 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
615 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
616 protobuf::PostgresInt8 {},
617 )),
618 },
619 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
620 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
621 protobuf::PostgresFloat4 {},
622 )),
623 },
624 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
625 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
626 protobuf::PostgresFloat8 {},
627 )),
628 },
629 RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
630 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
631 protobuf::PostgresNumeric {
632 precision: *precision as u32,
633 scale: *scale as i32,
634 },
635 )),
636 },
637 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
638 r#type: Some(protobuf::remote_type::Type::PostgresName(
639 protobuf::PostgresName {},
640 )),
641 },
642 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
643 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
644 protobuf::PostgresVarchar {},
645 )),
646 },
647 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
648 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
649 protobuf::PostgresBpchar {},
650 )),
651 },
652 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
653 r#type: Some(protobuf::remote_type::Type::PostgresText(
654 protobuf::PostgresText {},
655 )),
656 },
657 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
658 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
659 protobuf::PostgresBytea {},
660 )),
661 },
662 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
663 r#type: Some(protobuf::remote_type::Type::PostgresDate(
664 protobuf::PostgresDate {},
665 )),
666 },
667 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
668 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
669 protobuf::PostgresTimestamp {},
670 )),
671 },
672 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
673 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
674 protobuf::PostgresTimestampTz {},
675 )),
676 },
677 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
678 r#type: Some(protobuf::remote_type::Type::PostgresTime(
679 protobuf::PostgresTime {},
680 )),
681 },
682 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
683 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
684 protobuf::PostgresInterval {},
685 )),
686 },
687 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
688 r#type: Some(protobuf::remote_type::Type::PostgresBool(
689 protobuf::PostgresBool {},
690 )),
691 },
692 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
693 r#type: Some(protobuf::remote_type::Type::PostgresJson(
694 protobuf::PostgresJson {},
695 )),
696 },
697 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
698 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
699 protobuf::PostgresJsonb {},
700 )),
701 },
702 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
703 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
704 protobuf::PostgresInt2Array {},
705 )),
706 },
707 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
708 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
709 protobuf::PostgresInt4Array {},
710 )),
711 },
712 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
713 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
714 protobuf::PostgresInt8Array {},
715 )),
716 },
717 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
718 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
719 protobuf::PostgresFloat4Array {},
720 )),
721 },
722 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
723 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
724 protobuf::PostgresFloat8Array {},
725 )),
726 },
727 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
728 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
729 protobuf::PostgresVarcharArray {},
730 )),
731 },
732 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
733 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
734 protobuf::PostgresBpcharArray {},
735 )),
736 },
737 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
738 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
739 protobuf::PostgresTextArray {},
740 )),
741 },
742 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
743 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
744 protobuf::PostgresByteaArray {},
745 )),
746 },
747 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
748 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
749 protobuf::PostgresBoolArray {},
750 )),
751 },
752 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
753 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
754 protobuf::PostgresPostGisGeometry {},
755 )),
756 },
757 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
758 r#type: Some(protobuf::remote_type::Type::PostgresOid(
759 protobuf::PostgresOid {},
760 )),
761 },
762 RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
763 r#type: Some(protobuf::remote_type::Type::PostgresXml(
764 protobuf::PostgresXml {},
765 )),
766 },
767 RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
768 r#type: Some(protobuf::remote_type::Type::PostgresUuid(
769 protobuf::PostgresUuid {},
770 )),
771 },
772
773 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
774 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
775 protobuf::MysqlTinyInt {},
776 )),
777 },
778 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
779 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
780 protobuf::MysqlTinyIntUnsigned {},
781 )),
782 },
783 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
784 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
785 protobuf::MysqlSmallInt {},
786 )),
787 },
788 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
789 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
790 protobuf::MysqlSmallIntUnsigned {},
791 )),
792 },
793 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
794 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
795 protobuf::MysqlMediumInt {},
796 )),
797 },
798 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
799 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
800 protobuf::MysqlMediumIntUnsigned {},
801 )),
802 },
803 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
804 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
805 protobuf::MysqlInteger {},
806 )),
807 },
808 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
809 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
810 protobuf::MysqlIntegerUnsigned {},
811 )),
812 },
813 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
814 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
815 protobuf::MysqlBigInt {},
816 )),
817 },
818 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
819 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
820 protobuf::MysqlBigIntUnsigned {},
821 )),
822 },
823 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
824 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
825 protobuf::MysqlFloat {},
826 )),
827 },
828 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
829 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
830 protobuf::MysqlDouble {},
831 )),
832 },
833 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
834 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
835 protobuf::MysqlDecimal {
836 precision: *precision as u32,
837 scale: *scale as u32,
838 },
839 )),
840 },
841 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
842 r#type: Some(protobuf::remote_type::Type::MysqlDate(
843 protobuf::MysqlDate {},
844 )),
845 },
846 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
847 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
848 protobuf::MysqlDateTime {},
849 )),
850 },
851 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
852 r#type: Some(protobuf::remote_type::Type::MysqlTime(
853 protobuf::MysqlTime {},
854 )),
855 },
856 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
857 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
858 protobuf::MysqlTimestamp {},
859 )),
860 },
861 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
862 r#type: Some(protobuf::remote_type::Type::MysqlYear(
863 protobuf::MysqlYear {},
864 )),
865 },
866 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
867 r#type: Some(protobuf::remote_type::Type::MysqlChar(
868 protobuf::MysqlChar {},
869 )),
870 },
871 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
872 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
873 protobuf::MysqlVarchar {},
874 )),
875 },
876 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
877 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
878 protobuf::MysqlBinary {},
879 )),
880 },
881 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
882 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
883 protobuf::MysqlVarbinary {},
884 )),
885 },
886 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
887 r#type: Some(protobuf::remote_type::Type::MysqlText(
888 protobuf::MysqlText { length: *len },
889 )),
890 },
891 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
892 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
893 protobuf::MysqlBlob { length: *len },
894 )),
895 },
896 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
897 r#type: Some(protobuf::remote_type::Type::MysqlJson(
898 protobuf::MysqlJson {},
899 )),
900 },
901 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
902 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
903 protobuf::MysqlGeometry {},
904 )),
905 },
906
907 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
908 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
909 protobuf::OracleVarchar2 { length: *len },
910 )),
911 },
912 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
913 r#type: Some(protobuf::remote_type::Type::OracleChar(
914 protobuf::OracleChar { length: *len },
915 )),
916 },
917 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
918 r#type: Some(protobuf::remote_type::Type::OracleNumber(
919 protobuf::OracleNumber {
920 precision: *precision as u32,
921 scale: *scale as i32,
922 },
923 )),
924 },
925 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
926 r#type: Some(protobuf::remote_type::Type::OracleDate(
927 protobuf::OracleDate {},
928 )),
929 },
930 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
931 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
932 protobuf::OracleTimestamp {},
933 )),
934 },
935 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
936 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
937 protobuf::OracleBoolean {},
938 )),
939 },
940 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
941 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
942 protobuf::OracleBinaryFloat {},
943 )),
944 },
945 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
946 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
947 protobuf::OracleBinaryDouble {},
948 )),
949 },
950 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
951 r#type: Some(protobuf::remote_type::Type::OracleBlob(
952 protobuf::OracleBlob {},
953 )),
954 },
955 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
956 r#type: Some(protobuf::remote_type::Type::OracleFloat(
957 protobuf::OracleFloat {
958 precision: *precision as u32,
959 },
960 )),
961 },
962 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
963 r#type: Some(protobuf::remote_type::Type::OracleNchar(
964 protobuf::OracleNChar { length: *len },
965 )),
966 },
967 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
968 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
969 protobuf::OracleNVarchar2 { length: *len },
970 )),
971 },
972 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
973 r#type: Some(protobuf::remote_type::Type::OracleRaw(
974 protobuf::OracleRaw { length: *len },
975 )),
976 },
977 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
978 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
979 protobuf::OracleLongRaw {},
980 )),
981 },
982 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
983 r#type: Some(protobuf::remote_type::Type::OracleLong(
984 protobuf::OracleLong {},
985 )),
986 },
987 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
988 r#type: Some(protobuf::remote_type::Type::OracleClob(
989 protobuf::OracleClob {},
990 )),
991 },
992 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
993 r#type: Some(protobuf::remote_type::Type::OracleNclob(
994 protobuf::OracleNClob {},
995 )),
996 },
997 RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
998 r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
999 protobuf::OracleSdeGeometry {},
1000 )),
1001 },
1002 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
1003 r#type: Some(protobuf::remote_type::Type::SqliteNull(
1004 protobuf::SqliteNull {},
1005 )),
1006 },
1007 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
1008 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
1009 protobuf::SqliteInteger {},
1010 )),
1011 },
1012 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
1013 r#type: Some(protobuf::remote_type::Type::SqliteReal(
1014 protobuf::SqliteReal {},
1015 )),
1016 },
1017 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
1018 r#type: Some(protobuf::remote_type::Type::SqliteText(
1019 protobuf::SqliteText {},
1020 )),
1021 },
1022 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
1023 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
1024 protobuf::SqliteBlob {},
1025 )),
1026 },
1027 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
1028 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
1029 protobuf::DmTinyInt {},
1030 )),
1031 },
1032 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
1033 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
1034 protobuf::DmSmallInt {},
1035 )),
1036 },
1037 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
1038 r#type: Some(protobuf::remote_type::Type::DmInteger(
1039 protobuf::DmInteger {},
1040 )),
1041 },
1042 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
1043 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
1044 },
1045 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
1046 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
1047 },
1048 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
1049 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
1050 },
1051 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
1052 r#type: Some(protobuf::remote_type::Type::DmNumeric(
1053 protobuf::DmNumeric {
1054 precision: *precision as u32,
1055 scale: *scale as i32,
1056 },
1057 )),
1058 },
1059 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
1060 r#type: Some(protobuf::remote_type::Type::DmDecimal(
1061 protobuf::DmDecimal {
1062 precision: *precision as u32,
1063 scale: *scale as i32,
1064 },
1065 )),
1066 },
1067 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
1068 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
1069 length: len.map(|s| s as u32),
1070 })),
1071 },
1072 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
1073 r#type: Some(protobuf::remote_type::Type::DmVarchar(
1074 protobuf::DmVarchar {
1075 length: len.map(|s| s as u32),
1076 },
1077 )),
1078 },
1079 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
1080 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
1081 },
1082 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
1083 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
1084 length: *len as u32,
1085 })),
1086 },
1087 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
1088 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
1089 protobuf::DmVarbinary {
1090 length: len.map(|s| s as u32),
1091 },
1092 )),
1093 },
1094 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
1095 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
1096 },
1097 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
1098 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
1099 },
1100 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
1101 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
1102 protobuf::DmTimestamp {
1103 precision: *precision as u32,
1104 },
1105 )),
1106 },
1107 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1108 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1109 precision: *precision as u32,
1110 })),
1111 },
1112 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1113 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
1114 },
1115 }
1116}
1117
1118fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1119 let fields = remote_schema
1120 .fields
1121 .iter()
1122 .map(parse_remote_field)
1123 .collect::<Vec<_>>();
1124
1125 RemoteSchema { fields }
1126}
1127
1128fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1129 RemoteField {
1130 name: field.name.clone(),
1131 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1132 nullable: field.nullable,
1133 auto_increment: field.auto_increment,
1134 }
1135}
1136
1137fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1138 match remote_type.r#type.as_ref().unwrap() {
1139 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1140 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1141 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1142 protobuf::remote_type::Type::PostgresFloat4(_) => {
1143 RemoteType::Postgres(PostgresType::Float4)
1144 }
1145 protobuf::remote_type::Type::PostgresFloat8(_) => {
1146 RemoteType::Postgres(PostgresType::Float8)
1147 }
1148 protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1149 PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1150 ),
1151 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1152 protobuf::remote_type::Type::PostgresVarchar(_) => {
1153 RemoteType::Postgres(PostgresType::Varchar)
1154 }
1155 protobuf::remote_type::Type::PostgresBpchar(_) => {
1156 RemoteType::Postgres(PostgresType::Bpchar)
1157 }
1158 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1159 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1160 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1161 protobuf::remote_type::Type::PostgresTimestamp(_) => {
1162 RemoteType::Postgres(PostgresType::Timestamp)
1163 }
1164 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1165 RemoteType::Postgres(PostgresType::TimestampTz)
1166 }
1167 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1168 protobuf::remote_type::Type::PostgresInterval(_) => {
1169 RemoteType::Postgres(PostgresType::Interval)
1170 }
1171 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1172 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1173 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1174 protobuf::remote_type::Type::PostgresInt2Array(_) => {
1175 RemoteType::Postgres(PostgresType::Int2Array)
1176 }
1177 protobuf::remote_type::Type::PostgresInt4Array(_) => {
1178 RemoteType::Postgres(PostgresType::Int4Array)
1179 }
1180 protobuf::remote_type::Type::PostgresInt8Array(_) => {
1181 RemoteType::Postgres(PostgresType::Int8Array)
1182 }
1183 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1184 RemoteType::Postgres(PostgresType::Float4Array)
1185 }
1186 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1187 RemoteType::Postgres(PostgresType::Float8Array)
1188 }
1189 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1190 RemoteType::Postgres(PostgresType::VarcharArray)
1191 }
1192 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1193 RemoteType::Postgres(PostgresType::BpcharArray)
1194 }
1195 protobuf::remote_type::Type::PostgresTextArray(_) => {
1196 RemoteType::Postgres(PostgresType::TextArray)
1197 }
1198 protobuf::remote_type::Type::PostgresByteaArray(_) => {
1199 RemoteType::Postgres(PostgresType::ByteaArray)
1200 }
1201 protobuf::remote_type::Type::PostgresBoolArray(_) => {
1202 RemoteType::Postgres(PostgresType::BoolArray)
1203 }
1204 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1205 RemoteType::Postgres(PostgresType::PostGisGeometry)
1206 }
1207 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1208 protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1209 protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1210 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1211 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1212 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1213 }
1214 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1215 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1216 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1217 }
1218 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1219 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1220 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1221 }
1222 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1223 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1224 RemoteType::Mysql(MysqlType::IntegerUnsigned)
1225 }
1226 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1227 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1228 RemoteType::Mysql(MysqlType::BigIntUnsigned)
1229 }
1230 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1231 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1232 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1233 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1234 ),
1235 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1236 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1237 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1238 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1239 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1240 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1241 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1242 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1243 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1244 protobuf::remote_type::Type::MysqlText(text) => {
1245 RemoteType::Mysql(MysqlType::Text(text.length))
1246 }
1247 protobuf::remote_type::Type::MysqlBlob(blob) => {
1248 RemoteType::Mysql(MysqlType::Blob(blob.length))
1249 }
1250 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1251 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1252 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1253 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1254 }
1255 protobuf::remote_type::Type::OracleChar(char) => {
1256 RemoteType::Oracle(OracleType::Char(char.length))
1257 }
1258 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1259 OracleType::Number(number.precision as u8, number.scale as i8),
1260 ),
1261 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1262 protobuf::remote_type::Type::OracleTimestamp(_) => {
1263 RemoteType::Oracle(OracleType::Timestamp)
1264 }
1265 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1266 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1267 RemoteType::Oracle(OracleType::BinaryFloat)
1268 }
1269 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1270 RemoteType::Oracle(OracleType::BinaryDouble)
1271 }
1272 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1273 RemoteType::Oracle(OracleType::Float(*precision as u8))
1274 }
1275 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1276 RemoteType::Oracle(OracleType::NChar(*length))
1277 }
1278 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1279 RemoteType::Oracle(OracleType::NVarchar2(*length))
1280 }
1281 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1282 RemoteType::Oracle(OracleType::Raw(*length))
1283 }
1284 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1285 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1286 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1287 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1288 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1289 protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1290 RemoteType::Oracle(OracleType::SdeGeometry)
1291 }
1292 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1293 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1294 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1295 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1296 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1297 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1298 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1299 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1300 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1301 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1302 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1303 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1304 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1305 }
1306 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1307 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1308 }
1309 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1310 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1311 }
1312 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1313 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1314 }
1315 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1316 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1317 RemoteType::Dm(DmType::Binary(*length as u16))
1318 }
1319 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1320 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1321 }
1322 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1323 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1324 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1325 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1326 }
1327 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1328 RemoteType::Dm(DmType::Time(*precision as u8))
1329 }
1330 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1331 }
1332}
1333
1334fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1335 match source {
1336 RemoteSource::Query(query) => protobuf::RemoteSource {
1337 source: Some(protobuf::remote_source::Source::Query(query.clone())),
1338 },
1339 RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1340 source: Some(protobuf::remote_source::Source::Table(
1341 protobuf::Identifiers {
1342 idents: table_identifiers.clone(),
1343 },
1344 )),
1345 },
1346 }
1347}
1348
1349fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1350 let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1351 "remote source is not set".to_string(),
1352 ))?;
1353 match source {
1354 protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1355 protobuf::remote_source::Source::Table(table_identifiers) => {
1356 Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1357 }
1358 }
1359}
1360
1361fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> Result<Vec<Vec<u8>>, DataFusionError> {
1362 let mut proto_partitions = vec![];
1363 for partition in partitions {
1364 if partition.is_empty() {
1365 proto_partitions.push(vec![]);
1366 continue;
1367 }
1368 let mut proto_partition = vec![];
1369 let mut stream_writer =
1370 StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
1371 for batch in partition {
1372 stream_writer.write(batch)?;
1373 }
1374 stream_writer.finish()?;
1375 proto_partitions.push(proto_partition);
1376 }
1377 Ok(proto_partitions)
1378}
1379
1380fn parse_partitions(
1381 proto_partitions: &[Vec<u8>],
1382) -> Result<Vec<Vec<RecordBatch>>, DataFusionError> {
1383 let mut partitions = vec![];
1384 for proto_partition in proto_partitions {
1385 if proto_partition.is_empty() {
1386 partitions.push(vec![]);
1387 continue;
1388 }
1389 let mut partition = vec![];
1390 let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
1391 for batch in stream_reader {
1392 partition.push(batch?);
1393 }
1394 partitions.push(partition);
1395 }
1396 Ok(partitions)
1397}