1use crate::DmConnectionOptions;
2use crate::MysqlConnectionOptions;
3use crate::OracleConnectionOptions;
4use crate::PostgresConnectionOptions;
5use crate::SqliteConnectionOptions;
6use crate::generated::prost as protobuf;
7use crate::{
8 Connection, ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType,
9 Literalize, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
10 RemoteSource, RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
11 connect,
12};
13use datafusion::arrow::array::RecordBatch;
14use datafusion::arrow::datatypes::{Schema, SchemaRef};
15use datafusion::arrow::ipc::reader::StreamReader;
16use datafusion::arrow::ipc::writer::StreamWriter;
17use datafusion::catalog::memory::{DataSourceExec, MemorySourceConfig};
18use datafusion::common::DataFusionError;
19use datafusion::datasource::source::DataSource;
20use datafusion::execution::FunctionRegistry;
21use datafusion::physical_expr::LexOrdering;
22use datafusion::physical_plan::ExecutionPlan;
23use datafusion_proto::convert_required;
24use datafusion_proto::physical_plan::PhysicalExtensionCodec;
25use datafusion_proto::physical_plan::from_proto::parse_physical_sort_exprs;
26use datafusion_proto::physical_plan::to_proto::serialize_physical_sort_exprs;
27use datafusion_proto::protobuf::proto_error;
28use derive_with::With;
29use log::debug;
30use prost::Message;
31use std::fmt::Debug;
32use std::sync::Arc;
33use std::time::Duration;
34
35pub trait TransformCodec: Debug + Send + Sync {
36 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
37 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
38}
39
40#[derive(Debug)]
41pub struct DefaultTransformCodec {}
42
43const DEFAULT_TRANSFORM_ID: &str = "__default";
44
45impl TransformCodec for DefaultTransformCodec {
46 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
47 if value.as_any().is::<DefaultTransform>() {
48 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
49 } else {
50 Err(DataFusionError::Execution(format!(
51 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
52 )))
53 }
54 }
55
56 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
57 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
58 Ok(Arc::new(DefaultTransform {}))
59 } else {
60 Err(DataFusionError::Execution(
61 "DefaultTransformCodec only supports DefaultTransform".to_string(),
62 ))
63 }
64 }
65}
66
67pub trait LiteralizeCodec: Debug + Send + Sync {
68 fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
69 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
70}
71
72#[derive(Debug)]
73pub struct DefaultLiteralizeCodec {}
74
75const DEFAULT_LITERALIZE_ID: &str = "__default";
76
77impl LiteralizeCodec for DefaultLiteralizeCodec {
78 fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
79 if value.as_any().is::<DefaultLiteralizer>() {
80 Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
81 } else {
82 Err(DataFusionError::Execution(format!(
83 "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
84 )))
85 }
86 }
87
88 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
89 if value == DEFAULT_LITERALIZE_ID.as_bytes() {
90 Ok(Arc::new(DefaultLiteralizer {}))
91 } else {
92 Err(DataFusionError::Execution(
93 "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
94 ))
95 }
96 }
97}
98
99pub trait ConnectionCodec: Debug + Send + Sync {
100 fn try_encode(
101 &self,
102 value: &dyn Connection,
103 conn_options: &ConnectionOptions,
104 ) -> DFResult<Vec<u8>>;
105 fn try_decode(
106 &self,
107 value: &[u8],
108 conn_options: &ConnectionOptions,
109 ) -> DFResult<Arc<dyn Connection>>;
110}
111
112#[derive(Debug)]
113pub struct DefaultConnectionCodec;
114
115const DEFAULT_CONNECTION_VALUE: &str = "__default";
116
117impl ConnectionCodec for DefaultConnectionCodec {
118 fn try_encode(
119 &self,
120 _value: &dyn Connection,
121 _conn_options: &ConnectionOptions,
122 ) -> DFResult<Vec<u8>> {
123 Ok(DEFAULT_CONNECTION_VALUE.as_bytes().to_vec())
124 }
125
126 fn try_decode(
127 &self,
128 value: &[u8],
129 conn_options: &ConnectionOptions,
130 ) -> DFResult<Arc<dyn Connection>> {
131 if value != DEFAULT_CONNECTION_VALUE.as_bytes() {
132 return Err(DataFusionError::Execution(format!(
133 "DefaultConnectionCodec only supports {DEFAULT_CONNECTION_VALUE} value but got: {value:?}"
134 )));
135 }
136 let conn_options = conn_options.clone().with_pool_max_size(1);
137 tokio::task::block_in_place(|| {
138 tokio::runtime::Handle::current().block_on(async {
139 let pool = connect(&conn_options).await?;
140 let conn = pool.get().await?;
141 Ok::<_, DataFusionError>(conn)
142 })
143 })
144 }
145}
146
147#[derive(Debug, With)]
148pub struct RemotePhysicalCodec {
149 pub transform_codec: Arc<dyn TransformCodec>,
150 pub literalize_codec: Arc<dyn LiteralizeCodec>,
151 pub connection_codec: Arc<dyn ConnectionCodec>,
152}
153
154impl RemotePhysicalCodec {
155 pub fn new() -> Self {
156 Self {
157 transform_codec: Arc::new(DefaultTransformCodec {}),
158 literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
159 connection_codec: Arc::new(DefaultConnectionCodec {}),
160 }
161 }
162}
163
164impl Default for RemotePhysicalCodec {
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170impl PhysicalExtensionCodec for RemotePhysicalCodec {
171 fn try_decode(
172 &self,
173 buf: &[u8],
174 inputs: &[Arc<dyn ExecutionPlan>],
175 registry: &dyn FunctionRegistry,
176 ) -> DFResult<Arc<dyn ExecutionPlan>> {
177 let remote_table_node =
178 protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
179 DataFusionError::Internal(format!(
180 "Failed to decode remote table physical plan node: {e:?}"
181 ))
182 })?;
183 let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
184 DataFusionError::Internal(
185 "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
186 )
187 })?;
188
189 match remote_table_plan {
190 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
191 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
192 Arc::new(DefaultTransform {})
193 } else {
194 self.transform_codec.try_decode(&proto.transform)?
195 };
196
197 let source = parse_remote_source(proto.source.as_ref().ok_or(DataFusionError::Internal(
198 "remote source is not set".to_string(),
199 ))?)?;
200
201 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
202 let remote_schema = proto
203 .remote_schema
204 .map(|schema| Arc::new(parse_remote_schema(&schema)));
205
206 let projection: Option<Vec<usize>> = proto
207 .projection
208 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
209
210 let limit = proto.limit.map(|l| l as usize);
211
212 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
213
214 let now = std::time::Instant::now();
215 let conn = self
216 .connection_codec
217 .try_decode(&proto.connection, &conn_options)?;
218 debug!(
219 "[remote-table] Decoding connection cost {}ms",
220 now.elapsed().as_millis()
221 );
222
223 Ok(Arc::new(RemoteTableScanExec::try_new(
224 conn_options,
225 source,
226 table_schema,
227 remote_schema,
228 projection,
229 proto.unparsed_filters,
230 limit,
231 transform,
232 conn,
233 )?))
234 }
235 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(proto) => {
236
237 if inputs.len() != 1 {
238 return Err(DataFusionError::Internal(
239 "RemoteTableInsertExec only support one input".to_string(),
240 ));
241 }
242
243 let input = inputs[0].clone();
244
245 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
246 let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
247 let table = proto.table.unwrap().idents;
248
249 let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
250 Arc::new(DefaultLiteralizer {})
251 } else {
252 self.literalize_codec.try_decode(&proto.literalizer)?
253 };
254
255 let now = std::time::Instant::now();
256 let conn = self
257 .connection_codec
258 .try_decode(&proto.connection, &conn_options)?;
259 debug!(
260 "[remote-table] Decoding connection cost {}ms",
261 now.elapsed().as_millis()
262 );
263
264 Ok(Arc::new(RemoteTableInsertExec::new(
265 input,
266 conn_options,
267 literalizer,
268 table,
269 remote_schema,
270 conn,
271 )))
272 }
273 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(proto) => {
274 let partitions = parse_partitions(&proto.partitions)?;
275 let schema = Schema::try_from(&proto.schema.unwrap())?;
276 let projection = parse_projection(proto.projection.as_ref());
277
278 let sort_information = proto
279 .sort_information
280 .iter()
281 .map(|sort_exprs| {
282 let sort_exprs = parse_physical_sort_exprs(
283 sort_exprs.physical_sort_expr_nodes.as_slice(),
284 registry,
285 &schema,
286 self,
287 )?;
288 let lex_ordering = LexOrdering::new(sort_exprs).expect("lex ordering is not empty");
289 Ok::<_, DataFusionError>(lex_ordering)
290 })
291 .collect::<Result<Vec<_>, _>>()?;
292
293 let show_sizes = proto.show_sizes;
294 let fetch = proto.fetch.map(|f| f as usize);
295 let memory_source =
296 MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
297 .with_show_sizes(show_sizes)
298 .with_limit(fetch);
299
300 let memory_source =
301 MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
302 Ok(DataSourceExec::from_data_source(memory_source))
303 }
304 }
305 }
306
307 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
308 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
309 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
310 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
311 } else {
312 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
313 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
314 bytes
315 };
316
317 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
318 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
319
320 let serialized_conn = self
321 .connection_codec
322 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
323
324 let serialized_source = serialize_remote_source(&exec.source);
325
326 let proto = protobuf::RemoteTablePhysicalPlanNode {
327 remote_table_physical_plan_type: Some(
328 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
329 protobuf::RemoteTableScanExec {
330 conn_options: Some(serialized_connection_options),
331 source: Some(serialized_source),
332 table_schema: Some(exec.table_schema.as_ref().try_into()?),
333 remote_schema,
334 projection: serialize_projection(exec.projection.as_ref()),
335 unparsed_filters: exec.unparsed_filters.clone(),
336 limit: exec.limit.map(|l| l as u32),
337 transform: serialized_transform,
338 connection: serialized_conn,
339 },
340 ),
341 ),
342 };
343
344 proto.encode(buf).map_err(|e| {
345 DataFusionError::Internal(format!(
346 "Failed to encode remote table scan exec plan: {e:?}"
347 ))
348 })?;
349 Ok(())
350 } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
351 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
352 let remote_schema = serialize_remote_schema(&exec.remote_schema);
353 let serialized_table = protobuf::Identifiers {
354 idents: exec.table.clone(),
355 };
356 let serialized_literalizer = self
357 .literalize_codec
358 .try_encode(exec.literalizer.as_ref())?;
359 let serialized_conn = self
360 .connection_codec
361 .try_encode(exec.conn.as_ref(), &exec.conn_options)?;
362
363 let proto = protobuf::RemoteTablePhysicalPlanNode {
364 remote_table_physical_plan_type: Some(
365 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
366 protobuf::RemoteTableInsertExec {
367 conn_options: Some(serialized_connection_options),
368 table: Some(serialized_table),
369 remote_schema: Some(remote_schema),
370 literalizer: serialized_literalizer,
371 connection: serialized_conn,
372 },
373 ),
374 ),
375 };
376
377 proto.encode(buf).map_err(|e| {
378 DataFusionError::Internal(format!(
379 "Failed to encode remote table insert exec node: {e:?}"
380 ))
381 })?;
382
383 Ok(())
384 } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
385 let source = exec.data_source();
386 if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
387 let proto_partitions = serialize_partitions(memory_source.partitions())?;
388 let projection = serialize_projection(memory_source.projection().as_ref());
389 let sort_information = memory_source
390 .sort_information()
391 .iter()
392 .map(|ordering| {
393 let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
394 Ok::<_, DataFusionError>(
395 datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
396 physical_sort_expr_nodes: sort_exprs,
397 },
398 )
399 })
400 .collect::<Result<Vec<_>, _>>()?;
401
402 let proto = protobuf::RemoteTablePhysicalPlanNode {
403 remote_table_physical_plan_type: Some(
404 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::MemoryDatasource(protobuf::MemoryDatasourceNode {
405 partitions: proto_partitions,
406 schema: Some(memory_source.original_schema().try_into()?),
407 projection,
408 sort_information,
409 show_sizes: memory_source.show_sizes(),
410 fetch: memory_source.fetch().map(|f| f as u32),
411 }),
412 ),
413 };
414
415 proto.encode(buf).map_err(|e| {
416 DataFusionError::Internal(format!(
417 "Failed to encode memory datasource node: {e:?}"
418 ))
419 })?;
420
421 Ok(())
422 } else {
423 Err(DataFusionError::NotImplemented(format!(
424 "RemotePhysicalCodec only support encoding MemorySourceConfig, got {source:?}",
425 )))
426 }
427 } else {
428 Err(DataFusionError::NotImplemented(format!(
429 "RemotePhysicalCodec does not support encoding {}",
430 node.name()
431 )))
432 }
433 }
434}
435
436fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
437 match options {
438 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
439 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
440 protobuf::PostgresConnectionOptions {
441 host: options.host.clone(),
442 port: options.port as u32,
443 username: options.username.clone(),
444 password: options.password.clone(),
445 database: options.database.clone(),
446 pool_max_size: options.pool_max_size as u32,
447 pool_min_idle: options.pool_min_idle as u32,
448 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
449 pool_ttl_check_interval: Some(serialize_duration(
450 &options.pool_ttl_check_interval,
451 )),
452 stream_chunk_size: options.stream_chunk_size as u32,
453 default_numeric_scale: options.default_numeric_scale as i32,
454 },
455 )),
456 },
457 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
458 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
459 protobuf::MysqlConnectionOptions {
460 host: options.host.clone(),
461 port: options.port as u32,
462 username: options.username.clone(),
463 password: options.password.clone(),
464 database: options.database.clone(),
465 pool_max_size: options.pool_max_size as u32,
466 pool_min_idle: options.pool_min_idle as u32,
467 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
468 pool_ttl_check_interval: Some(serialize_duration(
469 &options.pool_ttl_check_interval,
470 )),
471 stream_chunk_size: options.stream_chunk_size as u32,
472 },
473 )),
474 },
475 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
476 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
477 protobuf::OracleConnectionOptions {
478 host: options.host.clone(),
479 port: options.port as u32,
480 username: options.username.clone(),
481 password: options.password.clone(),
482 service_name: options.service_name.clone(),
483 pool_max_size: options.pool_max_size as u32,
484 pool_min_idle: options.pool_min_idle as u32,
485 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
486 pool_ttl_check_interval: Some(serialize_duration(
487 &options.pool_ttl_check_interval,
488 )),
489 stream_chunk_size: options.stream_chunk_size as u32,
490 },
491 )),
492 },
493 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
494 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
495 protobuf::SqliteConnectionOptions {
496 path: options.path.to_str().unwrap().to_string(),
497 stream_chunk_size: options.stream_chunk_size as u32,
498 },
499 )),
500 },
501 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
502 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
503 protobuf::DmConnectionOptions {
504 host: options.host.clone(),
505 port: options.port as u32,
506 username: options.username.clone(),
507 password: options.password.clone(),
508 schema: options.schema.clone(),
509 stream_chunk_size: options.stream_chunk_size as u32,
510 driver: options.driver.clone(),
511 },
512 )),
513 },
514 }
515}
516
517fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
518 match options.connection_options {
519 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
520 ConnectionOptions::Postgres(PostgresConnectionOptions {
521 host: options.host,
522 port: options.port as u16,
523 username: options.username,
524 password: options.password,
525 database: options.database,
526 pool_max_size: options.pool_max_size as usize,
527 pool_min_idle: options.pool_min_idle as usize,
528 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
529 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
530 stream_chunk_size: options.stream_chunk_size as usize,
531 default_numeric_scale: options.default_numeric_scale as i8,
532 })
533 }
534 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
535 ConnectionOptions::Mysql(MysqlConnectionOptions {
536 host: options.host,
537 port: options.port as u16,
538 username: options.username,
539 password: options.password,
540 database: options.database,
541 pool_max_size: options.pool_max_size as usize,
542 pool_min_idle: options.pool_min_idle as usize,
543 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
544 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
545 stream_chunk_size: options.stream_chunk_size as usize,
546 })
547 }
548 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
549 ConnectionOptions::Oracle(OracleConnectionOptions {
550 host: options.host,
551 port: options.port as u16,
552 username: options.username,
553 password: options.password,
554 service_name: options.service_name,
555 pool_max_size: options.pool_max_size as usize,
556 pool_min_idle: options.pool_min_idle as usize,
557 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
558 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
559 stream_chunk_size: options.stream_chunk_size as usize,
560 })
561 }
562 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
563 ConnectionOptions::Sqlite(SqliteConnectionOptions {
564 path: std::path::Path::new(&options.path).to_path_buf(),
565 stream_chunk_size: options.stream_chunk_size as usize,
566 })
567 }
568 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
569 ConnectionOptions::Dm(DmConnectionOptions {
570 host: options.host,
571 port: options.port as u16,
572 username: options.username,
573 password: options.password,
574 schema: options.schema,
575 stream_chunk_size: options.stream_chunk_size as usize,
576 driver: options.driver,
577 })
578 }
579 _ => panic!("Failed to parse connection options: {options:?}"),
580 }
581}
582
583fn serialize_duration(duration: &Duration) -> protobuf::Duration {
584 protobuf::Duration {
585 secs: duration.as_secs(),
586 nanos: duration.subsec_nanos(),
587 }
588}
589
590fn parse_duration(duration: &protobuf::Duration) -> Duration {
591 Duration::new(duration.secs, duration.nanos)
592}
593
594fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
595 projection.map(|p| protobuf::Projection {
596 projection: p.iter().map(|n| *n as u32).collect(),
597 })
598}
599
600fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
601 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
602}
603
604fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
605 let fields = remote_schema
606 .fields
607 .iter()
608 .map(serialize_remote_field)
609 .collect::<Vec<_>>();
610
611 protobuf::RemoteSchema { fields }
612}
613
614fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
615 protobuf::RemoteField {
616 name: remote_field.name.clone(),
617 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
618 nullable: remote_field.nullable,
619 auto_increment: remote_field.auto_increment,
620 }
621}
622
623fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
624 match remote_type {
625 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
626 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
627 protobuf::PostgresInt2 {},
628 )),
629 },
630 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
631 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
632 protobuf::PostgresInt4 {},
633 )),
634 },
635 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
636 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
637 protobuf::PostgresInt8 {},
638 )),
639 },
640 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
641 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
642 protobuf::PostgresFloat4 {},
643 )),
644 },
645 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
646 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
647 protobuf::PostgresFloat8 {},
648 )),
649 },
650 RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
651 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
652 protobuf::PostgresNumeric {
653 precision: *precision as u32,
654 scale: *scale as i32,
655 },
656 )),
657 },
658 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
659 r#type: Some(protobuf::remote_type::Type::PostgresName(
660 protobuf::PostgresName {},
661 )),
662 },
663 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
664 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
665 protobuf::PostgresVarchar {},
666 )),
667 },
668 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
669 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
670 protobuf::PostgresBpchar {},
671 )),
672 },
673 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
674 r#type: Some(protobuf::remote_type::Type::PostgresText(
675 protobuf::PostgresText {},
676 )),
677 },
678 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
679 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
680 protobuf::PostgresBytea {},
681 )),
682 },
683 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
684 r#type: Some(protobuf::remote_type::Type::PostgresDate(
685 protobuf::PostgresDate {},
686 )),
687 },
688 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
689 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
690 protobuf::PostgresTimestamp {},
691 )),
692 },
693 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
694 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
695 protobuf::PostgresTimestampTz {},
696 )),
697 },
698 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
699 r#type: Some(protobuf::remote_type::Type::PostgresTime(
700 protobuf::PostgresTime {},
701 )),
702 },
703 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
704 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
705 protobuf::PostgresInterval {},
706 )),
707 },
708 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
709 r#type: Some(protobuf::remote_type::Type::PostgresBool(
710 protobuf::PostgresBool {},
711 )),
712 },
713 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
714 r#type: Some(protobuf::remote_type::Type::PostgresJson(
715 protobuf::PostgresJson {},
716 )),
717 },
718 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
719 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
720 protobuf::PostgresJsonb {},
721 )),
722 },
723 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
724 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
725 protobuf::PostgresInt2Array {},
726 )),
727 },
728 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
729 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
730 protobuf::PostgresInt4Array {},
731 )),
732 },
733 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
734 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
735 protobuf::PostgresInt8Array {},
736 )),
737 },
738 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
739 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
740 protobuf::PostgresFloat4Array {},
741 )),
742 },
743 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
744 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
745 protobuf::PostgresFloat8Array {},
746 )),
747 },
748 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
749 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
750 protobuf::PostgresVarcharArray {},
751 )),
752 },
753 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
754 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
755 protobuf::PostgresBpcharArray {},
756 )),
757 },
758 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
759 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
760 protobuf::PostgresTextArray {},
761 )),
762 },
763 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
764 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
765 protobuf::PostgresByteaArray {},
766 )),
767 },
768 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
769 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
770 protobuf::PostgresBoolArray {},
771 )),
772 },
773 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
774 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
775 protobuf::PostgresPostGisGeometry {},
776 )),
777 },
778 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
779 r#type: Some(protobuf::remote_type::Type::PostgresOid(
780 protobuf::PostgresOid {},
781 )),
782 },
783 RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
784 r#type: Some(protobuf::remote_type::Type::PostgresXml(
785 protobuf::PostgresXml {},
786 )),
787 },
788 RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
789 r#type: Some(protobuf::remote_type::Type::PostgresUuid(
790 protobuf::PostgresUuid {},
791 )),
792 },
793
794 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
795 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
796 protobuf::MysqlTinyInt {},
797 )),
798 },
799 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
800 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
801 protobuf::MysqlTinyIntUnsigned {},
802 )),
803 },
804 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
805 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
806 protobuf::MysqlSmallInt {},
807 )),
808 },
809 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
810 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
811 protobuf::MysqlSmallIntUnsigned {},
812 )),
813 },
814 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
815 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
816 protobuf::MysqlMediumInt {},
817 )),
818 },
819 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
820 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
821 protobuf::MysqlMediumIntUnsigned {},
822 )),
823 },
824 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
825 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
826 protobuf::MysqlInteger {},
827 )),
828 },
829 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
830 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
831 protobuf::MysqlIntegerUnsigned {},
832 )),
833 },
834 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
835 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
836 protobuf::MysqlBigInt {},
837 )),
838 },
839 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
840 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
841 protobuf::MysqlBigIntUnsigned {},
842 )),
843 },
844 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
845 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
846 protobuf::MysqlFloat {},
847 )),
848 },
849 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
850 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
851 protobuf::MysqlDouble {},
852 )),
853 },
854 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
855 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
856 protobuf::MysqlDecimal {
857 precision: *precision as u32,
858 scale: *scale as u32,
859 },
860 )),
861 },
862 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
863 r#type: Some(protobuf::remote_type::Type::MysqlDate(
864 protobuf::MysqlDate {},
865 )),
866 },
867 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
868 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
869 protobuf::MysqlDateTime {},
870 )),
871 },
872 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
873 r#type: Some(protobuf::remote_type::Type::MysqlTime(
874 protobuf::MysqlTime {},
875 )),
876 },
877 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
878 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
879 protobuf::MysqlTimestamp {},
880 )),
881 },
882 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
883 r#type: Some(protobuf::remote_type::Type::MysqlYear(
884 protobuf::MysqlYear {},
885 )),
886 },
887 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
888 r#type: Some(protobuf::remote_type::Type::MysqlChar(
889 protobuf::MysqlChar {},
890 )),
891 },
892 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
893 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
894 protobuf::MysqlVarchar {},
895 )),
896 },
897 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
898 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
899 protobuf::MysqlBinary {},
900 )),
901 },
902 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
903 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
904 protobuf::MysqlVarbinary {},
905 )),
906 },
907 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
908 r#type: Some(protobuf::remote_type::Type::MysqlText(
909 protobuf::MysqlText { length: *len },
910 )),
911 },
912 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
913 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
914 protobuf::MysqlBlob { length: *len },
915 )),
916 },
917 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
918 r#type: Some(protobuf::remote_type::Type::MysqlJson(
919 protobuf::MysqlJson {},
920 )),
921 },
922 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
923 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
924 protobuf::MysqlGeometry {},
925 )),
926 },
927
928 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
929 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
930 protobuf::OracleVarchar2 { length: *len },
931 )),
932 },
933 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
934 r#type: Some(protobuf::remote_type::Type::OracleChar(
935 protobuf::OracleChar { length: *len },
936 )),
937 },
938 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
939 r#type: Some(protobuf::remote_type::Type::OracleNumber(
940 protobuf::OracleNumber {
941 precision: *precision as u32,
942 scale: *scale as i32,
943 },
944 )),
945 },
946 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
947 r#type: Some(protobuf::remote_type::Type::OracleDate(
948 protobuf::OracleDate {},
949 )),
950 },
951 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
952 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
953 protobuf::OracleTimestamp {},
954 )),
955 },
956 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
957 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
958 protobuf::OracleBoolean {},
959 )),
960 },
961 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
962 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
963 protobuf::OracleBinaryFloat {},
964 )),
965 },
966 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
967 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
968 protobuf::OracleBinaryDouble {},
969 )),
970 },
971 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
972 r#type: Some(protobuf::remote_type::Type::OracleBlob(
973 protobuf::OracleBlob {},
974 )),
975 },
976 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
977 r#type: Some(protobuf::remote_type::Type::OracleFloat(
978 protobuf::OracleFloat {
979 precision: *precision as u32,
980 },
981 )),
982 },
983 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
984 r#type: Some(protobuf::remote_type::Type::OracleNchar(
985 protobuf::OracleNChar { length: *len },
986 )),
987 },
988 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
989 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
990 protobuf::OracleNVarchar2 { length: *len },
991 )),
992 },
993 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
994 r#type: Some(protobuf::remote_type::Type::OracleRaw(
995 protobuf::OracleRaw { length: *len },
996 )),
997 },
998 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
999 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
1000 protobuf::OracleLongRaw {},
1001 )),
1002 },
1003 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
1004 r#type: Some(protobuf::remote_type::Type::OracleLong(
1005 protobuf::OracleLong {},
1006 )),
1007 },
1008 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
1009 r#type: Some(protobuf::remote_type::Type::OracleClob(
1010 protobuf::OracleClob {},
1011 )),
1012 },
1013 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
1014 r#type: Some(protobuf::remote_type::Type::OracleNclob(
1015 protobuf::OracleNClob {},
1016 )),
1017 },
1018 RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
1019 r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
1020 protobuf::OracleSdeGeometry {},
1021 )),
1022 },
1023 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
1024 r#type: Some(protobuf::remote_type::Type::SqliteNull(
1025 protobuf::SqliteNull {},
1026 )),
1027 },
1028 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
1029 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
1030 protobuf::SqliteInteger {},
1031 )),
1032 },
1033 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
1034 r#type: Some(protobuf::remote_type::Type::SqliteReal(
1035 protobuf::SqliteReal {},
1036 )),
1037 },
1038 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
1039 r#type: Some(protobuf::remote_type::Type::SqliteText(
1040 protobuf::SqliteText {},
1041 )),
1042 },
1043 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
1044 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
1045 protobuf::SqliteBlob {},
1046 )),
1047 },
1048 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
1049 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
1050 protobuf::DmTinyInt {},
1051 )),
1052 },
1053 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
1054 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
1055 protobuf::DmSmallInt {},
1056 )),
1057 },
1058 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
1059 r#type: Some(protobuf::remote_type::Type::DmInteger(
1060 protobuf::DmInteger {},
1061 )),
1062 },
1063 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
1064 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
1065 },
1066 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
1067 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
1068 },
1069 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
1070 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
1071 },
1072 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
1073 r#type: Some(protobuf::remote_type::Type::DmNumeric(
1074 protobuf::DmNumeric {
1075 precision: *precision as u32,
1076 scale: *scale as i32,
1077 },
1078 )),
1079 },
1080 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
1081 r#type: Some(protobuf::remote_type::Type::DmDecimal(
1082 protobuf::DmDecimal {
1083 precision: *precision as u32,
1084 scale: *scale as i32,
1085 },
1086 )),
1087 },
1088 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
1089 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
1090 length: len.map(|s| s as u32),
1091 })),
1092 },
1093 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
1094 r#type: Some(protobuf::remote_type::Type::DmVarchar(
1095 protobuf::DmVarchar {
1096 length: len.map(|s| s as u32),
1097 },
1098 )),
1099 },
1100 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
1101 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
1102 },
1103 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
1104 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
1105 length: *len as u32,
1106 })),
1107 },
1108 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
1109 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
1110 protobuf::DmVarbinary {
1111 length: len.map(|s| s as u32),
1112 },
1113 )),
1114 },
1115 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
1116 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
1117 },
1118 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
1119 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
1120 },
1121 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
1122 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
1123 protobuf::DmTimestamp {
1124 precision: *precision as u32,
1125 },
1126 )),
1127 },
1128 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
1129 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
1130 precision: *precision as u32,
1131 })),
1132 },
1133 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
1134 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
1135 },
1136 }
1137}
1138
1139fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
1140 let fields = remote_schema
1141 .fields
1142 .iter()
1143 .map(parse_remote_field)
1144 .collect::<Vec<_>>();
1145
1146 RemoteSchema { fields }
1147}
1148
1149fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
1150 RemoteField {
1151 name: field.name.clone(),
1152 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
1153 nullable: field.nullable,
1154 auto_increment: field.auto_increment,
1155 }
1156}
1157
1158fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
1159 match remote_type.r#type.as_ref().unwrap() {
1160 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
1161 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
1162 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
1163 protobuf::remote_type::Type::PostgresFloat4(_) => {
1164 RemoteType::Postgres(PostgresType::Float4)
1165 }
1166 protobuf::remote_type::Type::PostgresFloat8(_) => {
1167 RemoteType::Postgres(PostgresType::Float8)
1168 }
1169 protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
1170 PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
1171 ),
1172 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
1173 protobuf::remote_type::Type::PostgresVarchar(_) => {
1174 RemoteType::Postgres(PostgresType::Varchar)
1175 }
1176 protobuf::remote_type::Type::PostgresBpchar(_) => {
1177 RemoteType::Postgres(PostgresType::Bpchar)
1178 }
1179 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
1180 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
1181 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
1182 protobuf::remote_type::Type::PostgresTimestamp(_) => {
1183 RemoteType::Postgres(PostgresType::Timestamp)
1184 }
1185 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
1186 RemoteType::Postgres(PostgresType::TimestampTz)
1187 }
1188 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
1189 protobuf::remote_type::Type::PostgresInterval(_) => {
1190 RemoteType::Postgres(PostgresType::Interval)
1191 }
1192 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
1193 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
1194 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
1195 protobuf::remote_type::Type::PostgresInt2Array(_) => {
1196 RemoteType::Postgres(PostgresType::Int2Array)
1197 }
1198 protobuf::remote_type::Type::PostgresInt4Array(_) => {
1199 RemoteType::Postgres(PostgresType::Int4Array)
1200 }
1201 protobuf::remote_type::Type::PostgresInt8Array(_) => {
1202 RemoteType::Postgres(PostgresType::Int8Array)
1203 }
1204 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1205 RemoteType::Postgres(PostgresType::Float4Array)
1206 }
1207 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1208 RemoteType::Postgres(PostgresType::Float8Array)
1209 }
1210 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1211 RemoteType::Postgres(PostgresType::VarcharArray)
1212 }
1213 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1214 RemoteType::Postgres(PostgresType::BpcharArray)
1215 }
1216 protobuf::remote_type::Type::PostgresTextArray(_) => {
1217 RemoteType::Postgres(PostgresType::TextArray)
1218 }
1219 protobuf::remote_type::Type::PostgresByteaArray(_) => {
1220 RemoteType::Postgres(PostgresType::ByteaArray)
1221 }
1222 protobuf::remote_type::Type::PostgresBoolArray(_) => {
1223 RemoteType::Postgres(PostgresType::BoolArray)
1224 }
1225 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1226 RemoteType::Postgres(PostgresType::PostGisGeometry)
1227 }
1228 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1229 protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1230 protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1231 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1232 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1233 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1234 }
1235 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1236 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1237 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1238 }
1239 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1240 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1241 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1242 }
1243 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1244 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1245 RemoteType::Mysql(MysqlType::IntegerUnsigned)
1246 }
1247 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1248 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1249 RemoteType::Mysql(MysqlType::BigIntUnsigned)
1250 }
1251 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1252 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1253 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1254 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1255 ),
1256 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1257 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1258 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1259 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1260 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1261 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1262 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1263 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1264 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1265 protobuf::remote_type::Type::MysqlText(text) => {
1266 RemoteType::Mysql(MysqlType::Text(text.length))
1267 }
1268 protobuf::remote_type::Type::MysqlBlob(blob) => {
1269 RemoteType::Mysql(MysqlType::Blob(blob.length))
1270 }
1271 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1272 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1273 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1274 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1275 }
1276 protobuf::remote_type::Type::OracleChar(char) => {
1277 RemoteType::Oracle(OracleType::Char(char.length))
1278 }
1279 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1280 OracleType::Number(number.precision as u8, number.scale as i8),
1281 ),
1282 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1283 protobuf::remote_type::Type::OracleTimestamp(_) => {
1284 RemoteType::Oracle(OracleType::Timestamp)
1285 }
1286 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1287 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1288 RemoteType::Oracle(OracleType::BinaryFloat)
1289 }
1290 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1291 RemoteType::Oracle(OracleType::BinaryDouble)
1292 }
1293 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1294 RemoteType::Oracle(OracleType::Float(*precision as u8))
1295 }
1296 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1297 RemoteType::Oracle(OracleType::NChar(*length))
1298 }
1299 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1300 RemoteType::Oracle(OracleType::NVarchar2(*length))
1301 }
1302 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1303 RemoteType::Oracle(OracleType::Raw(*length))
1304 }
1305 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1306 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1307 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1308 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1309 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1310 protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1311 RemoteType::Oracle(OracleType::SdeGeometry)
1312 }
1313 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1314 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1315 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1316 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1317 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1318 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1319 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1320 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1321 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1322 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1323 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1324 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1325 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1326 }
1327 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1328 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1329 }
1330 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1331 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1332 }
1333 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1334 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1335 }
1336 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1337 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1338 RemoteType::Dm(DmType::Binary(*length as u16))
1339 }
1340 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1341 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1342 }
1343 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1344 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1345 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1346 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1347 }
1348 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1349 RemoteType::Dm(DmType::Time(*precision as u8))
1350 }
1351 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1352 }
1353}
1354
1355fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1356 match source {
1357 RemoteSource::Query(query) => protobuf::RemoteSource {
1358 source: Some(protobuf::remote_source::Source::Query(query.clone())),
1359 },
1360 RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1361 source: Some(protobuf::remote_source::Source::Table(
1362 protobuf::Identifiers {
1363 idents: table_identifiers.clone(),
1364 },
1365 )),
1366 },
1367 }
1368}
1369
1370fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1371 let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1372 "remote source is not set".to_string(),
1373 ))?;
1374 match source {
1375 protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1376 protobuf::remote_source::Source::Table(table_identifiers) => {
1377 Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1378 }
1379 }
1380}
1381
1382fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> Result<Vec<Vec<u8>>, DataFusionError> {
1383 let mut proto_partitions = vec![];
1384 for partition in partitions {
1385 if partition.is_empty() {
1386 proto_partitions.push(vec![]);
1387 continue;
1388 }
1389 let mut proto_partition = vec![];
1390 let mut stream_writer =
1391 StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
1392 for batch in partition {
1393 stream_writer.write(batch)?;
1394 }
1395 stream_writer.finish()?;
1396 proto_partitions.push(proto_partition);
1397 }
1398 Ok(proto_partitions)
1399}
1400
1401fn parse_partitions(
1402 proto_partitions: &[Vec<u8>],
1403) -> Result<Vec<Vec<RecordBatch>>, DataFusionError> {
1404 let mut partitions = vec![];
1405 for proto_partition in proto_partitions {
1406 if proto_partition.is_empty() {
1407 partitions.push(vec![]);
1408 continue;
1409 }
1410 let mut partition = vec![];
1411 let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
1412 for batch in stream_reader {
1413 partition.push(batch?);
1414 }
1415 partitions.push(partition);
1416 }
1417 Ok(partitions)
1418}