1use crate::DmConnectionOptions;
2use crate::LazyPool;
3use crate::MysqlConnectionOptions;
4use crate::OracleConnectionOptions;
5use crate::PostgresConnectionOptions;
6use crate::SqliteConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9 ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, DmType, Literalize,
10 MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource,
11 RemoteTableInsertExec, RemoteTableScanExec, RemoteType, SqliteType, Transform,
12};
13use arrow::datatypes::SchemaRef;
14use datafusion_common::DataFusionError;
15use datafusion_execution::TaskContext;
16use datafusion_physical_plan::ExecutionPlan;
17use datafusion_proto::convert_required;
18use datafusion_proto::physical_plan::PhysicalExtensionCodec;
19use datafusion_proto::protobuf::proto_error;
20use derive_with::With;
21use prost::Message;
22use std::fmt::Debug;
23use std::sync::Arc;
24use std::time::Duration;
25
26pub trait TransformCodec: Debug + Send + Sync {
27 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
28 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
29}
30
31#[derive(Debug)]
32pub struct DefaultTransformCodec {}
33
34const DEFAULT_TRANSFORM_ID: &str = "__default";
35
36impl TransformCodec for DefaultTransformCodec {
37 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
38 if value.as_any().is::<DefaultTransform>() {
39 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
40 } else {
41 Err(DataFusionError::Execution(format!(
42 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
43 )))
44 }
45 }
46
47 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
48 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
49 Ok(Arc::new(DefaultTransform {}))
50 } else {
51 Err(DataFusionError::Execution(
52 "DefaultTransformCodec only supports DefaultTransform".to_string(),
53 ))
54 }
55 }
56}
57
58pub trait LiteralizeCodec: Debug + Send + Sync {
59 fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>>;
60 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>>;
61}
62
63#[derive(Debug)]
64pub struct DefaultLiteralizeCodec {}
65
66const DEFAULT_LITERALIZE_ID: &str = "__default";
67
68impl LiteralizeCodec for DefaultLiteralizeCodec {
69 fn try_encode(&self, value: &dyn Literalize) -> DFResult<Vec<u8>> {
70 if value.as_any().is::<DefaultLiteralizer>() {
71 Ok(DEFAULT_LITERALIZE_ID.as_bytes().to_vec())
72 } else {
73 Err(DataFusionError::Execution(format!(
74 "DefaultLiteralizeCodec does not support literalize: {value:?}, please implement a custom LiteralizeCodec."
75 )))
76 }
77 }
78
79 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Literalize>> {
80 if value == DEFAULT_LITERALIZE_ID.as_bytes() {
81 Ok(Arc::new(DefaultLiteralizer {}))
82 } else {
83 Err(DataFusionError::Execution(
84 "DefaultLiteralizeCodec only supports DefaultLiteralizer".to_string(),
85 ))
86 }
87 }
88}
89
90#[derive(Debug, With)]
91pub struct RemotePhysicalCodec {
92 pub transform_codec: Arc<dyn TransformCodec>,
93 pub literalize_codec: Arc<dyn LiteralizeCodec>,
94}
95
96impl RemotePhysicalCodec {
97 pub fn new() -> Self {
98 Self {
99 transform_codec: Arc::new(DefaultTransformCodec {}),
100 literalize_codec: Arc::new(DefaultLiteralizeCodec {}),
101 }
102 }
103}
104
105impl Default for RemotePhysicalCodec {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl PhysicalExtensionCodec for RemotePhysicalCodec {
112 fn try_decode(
113 &self,
114 buf: &[u8],
115 inputs: &[Arc<dyn ExecutionPlan>],
116 _ctx: &TaskContext,
117 ) -> DFResult<Arc<dyn ExecutionPlan>> {
118 let remote_table_node =
119 protobuf::RemoteTablePhysicalPlanNode::decode(buf).map_err(|e| {
120 DataFusionError::Internal(format!(
121 "Failed to decode remote table physical plan node: {e:?}"
122 ))
123 })?;
124 let remote_table_plan = remote_table_node.remote_table_physical_plan_type.ok_or_else(|| {
125 DataFusionError::Internal(
126 "Failed to decode remote table physical plan node due to physical plan type is none".to_string()
127 )
128 })?;
129
130 match remote_table_plan {
131 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(proto) => {
132 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
133 Arc::new(DefaultTransform {})
134 } else {
135 self.transform_codec.try_decode(&proto.transform)?
136 };
137
138 let source = parse_remote_source(proto.source.as_ref().ok_or(
139 DataFusionError::Internal("remote source is not set".to_string()),
140 )?)?;
141
142 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
143 let remote_schema = proto
144 .remote_schema
145 .map(|schema| Arc::new(parse_remote_schema(&schema)));
146
147 let projection = parse_projection(proto.projection.as_ref());
148
149 let limit = proto.limit.map(|l| l as usize);
150
151 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
152 let pool = LazyPool::new(conn_options.clone());
153
154 let row_count = proto.row_count.map(|c| c as usize);
155
156 Ok(Arc::new(RemoteTableScanExec::try_new(
157 conn_options,
158 pool,
159 source,
160 table_schema,
161 remote_schema,
162 projection,
163 proto.unparsed_filters,
164 limit,
165 transform,
166 row_count,
167 )?))
168 }
169 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
170 proto,
171 ) => {
172 if inputs.len() != 1 {
173 return Err(DataFusionError::Internal(
174 "RemoteTableInsertExec only support one input".to_string(),
175 ));
176 }
177
178 let input = inputs[0].clone();
179
180 let conn_options = Arc::new(parse_connection_options(proto.conn_options.unwrap()));
181 let pool = LazyPool::new(conn_options.clone());
182 let remote_schema = Arc::new(parse_remote_schema(&proto.remote_schema.unwrap()));
183 let table = proto.table.unwrap().idents;
184
185 let literalizer = if proto.literalizer == DEFAULT_LITERALIZE_ID.as_bytes() {
186 Arc::new(DefaultLiteralizer {})
187 } else {
188 self.literalize_codec.try_decode(&proto.literalizer)?
189 };
190
191 Ok(Arc::new(RemoteTableInsertExec::new(
192 input,
193 conn_options,
194 pool,
195 literalizer,
196 table,
197 remote_schema,
198 )))
199 }
200 }
201 }
202
203 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
204 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableScanExec>() {
205 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
206 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
207 } else {
208 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
209 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
210 bytes
211 };
212
213 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
214 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
215 let serialized_source = serialize_remote_source(&exec.source);
216
217 let proto = protobuf::RemoteTablePhysicalPlanNode {
218 remote_table_physical_plan_type: Some(
219 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Scan(
220 protobuf::RemoteTableScanExec {
221 conn_options: Some(serialized_connection_options),
222 source: Some(serialized_source),
223 table_schema: Some(exec.table_schema.as_ref().try_into()?),
224 remote_schema,
225 projection: serialize_projection(exec.projection.as_ref()),
226 unparsed_filters: exec.unparsed_filters.clone(),
227 limit: exec.limit.map(|l| l as u32),
228 transform: serialized_transform,
229 row_count: exec.row_count.map(|c| c as u32),
230 },
231 ),
232 ),
233 };
234
235 proto.encode(buf).map_err(|e| {
236 DataFusionError::Internal(format!(
237 "Failed to encode remote table scan exec plan: {e:?}"
238 ))
239 })?;
240 Ok(())
241 } else if let Some(exec) = node.as_any().downcast_ref::<RemoteTableInsertExec>() {
242 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
243 let remote_schema = serialize_remote_schema(&exec.remote_schema);
244 let serialized_table = protobuf::Identifiers {
245 idents: exec.table.clone(),
246 };
247 let serialized_literalizer = self
248 .literalize_codec
249 .try_encode(exec.literalizer.as_ref())?;
250
251 let proto = protobuf::RemoteTablePhysicalPlanNode {
252 remote_table_physical_plan_type: Some(
253 protobuf::remote_table_physical_plan_node::RemoteTablePhysicalPlanType::Insert(
254 protobuf::RemoteTableInsertExec {
255 conn_options: Some(serialized_connection_options),
256 table: Some(serialized_table),
257 remote_schema: Some(remote_schema),
258 literalizer: serialized_literalizer,
259 },
260 ),
261 ),
262 };
263
264 proto.encode(buf).map_err(|e| {
265 DataFusionError::Internal(format!(
266 "Failed to encode remote table insert exec node: {e:?}"
267 ))
268 })?;
269
270 Ok(())
271 } else {
272 Err(DataFusionError::NotImplemented(format!(
273 "RemotePhysicalCodec does not support encoding {}",
274 node.name()
275 )))
276 }
277 }
278}
279
280fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
281 match options {
282 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
283 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
284 protobuf::PostgresConnectionOptions {
285 host: options.host.clone(),
286 port: options.port as u32,
287 username: options.username.clone(),
288 password: options.password.clone(),
289 database: options.database.clone(),
290 pool_max_size: options.pool_max_size as u32,
291 pool_min_idle: options.pool_min_idle as u32,
292 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
293 pool_ttl_check_interval: Some(serialize_duration(
294 &options.pool_ttl_check_interval,
295 )),
296 stream_chunk_size: options.stream_chunk_size as u32,
297 default_numeric_scale: options.default_numeric_scale as i32,
298 },
299 )),
300 },
301 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
302 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
303 protobuf::MysqlConnectionOptions {
304 host: options.host.clone(),
305 port: options.port as u32,
306 username: options.username.clone(),
307 password: options.password.clone(),
308 database: options.database.clone(),
309 pool_max_size: options.pool_max_size as u32,
310 pool_min_idle: options.pool_min_idle as u32,
311 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
312 pool_ttl_check_interval: Some(serialize_duration(
313 &options.pool_ttl_check_interval,
314 )),
315 stream_chunk_size: options.stream_chunk_size as u32,
316 },
317 )),
318 },
319 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
320 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
321 protobuf::OracleConnectionOptions {
322 host: options.host.clone(),
323 port: options.port as u32,
324 username: options.username.clone(),
325 password: options.password.clone(),
326 service_name: options.service_name.clone(),
327 pool_max_size: options.pool_max_size as u32,
328 pool_min_idle: options.pool_min_idle as u32,
329 pool_idle_timeout: Some(serialize_duration(&options.pool_idle_timeout)),
330 pool_ttl_check_interval: Some(serialize_duration(
331 &options.pool_ttl_check_interval,
332 )),
333 stream_chunk_size: options.stream_chunk_size as u32,
334 },
335 )),
336 },
337 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
338 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
339 protobuf::SqliteConnectionOptions {
340 path: options.path.to_str().unwrap().to_string(),
341 stream_chunk_size: options.stream_chunk_size as u32,
342 },
343 )),
344 },
345 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
346 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
347 protobuf::DmConnectionOptions {
348 host: options.host.clone(),
349 port: options.port as u32,
350 username: options.username.clone(),
351 password: options.password.clone(),
352 schema: options.schema.clone(),
353 stream_chunk_size: options.stream_chunk_size as u32,
354 driver: options.driver.clone(),
355 },
356 )),
357 },
358 }
359}
360
361fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
362 match options.connection_options {
363 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
364 ConnectionOptions::Postgres(PostgresConnectionOptions {
365 host: options.host,
366 port: options.port as u16,
367 username: options.username,
368 password: options.password,
369 database: options.database,
370 pool_max_size: options.pool_max_size as usize,
371 pool_min_idle: options.pool_min_idle as usize,
372 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
373 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
374 stream_chunk_size: options.stream_chunk_size as usize,
375 default_numeric_scale: options.default_numeric_scale as i8,
376 })
377 }
378 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
379 ConnectionOptions::Mysql(MysqlConnectionOptions {
380 host: options.host,
381 port: options.port as u16,
382 username: options.username,
383 password: options.password,
384 database: options.database,
385 pool_max_size: options.pool_max_size as usize,
386 pool_min_idle: options.pool_min_idle as usize,
387 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
388 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
389 stream_chunk_size: options.stream_chunk_size as usize,
390 })
391 }
392 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
393 ConnectionOptions::Oracle(OracleConnectionOptions {
394 host: options.host,
395 port: options.port as u16,
396 username: options.username,
397 password: options.password,
398 service_name: options.service_name,
399 pool_max_size: options.pool_max_size as usize,
400 pool_min_idle: options.pool_min_idle as usize,
401 pool_idle_timeout: parse_duration(&options.pool_idle_timeout.unwrap()),
402 pool_ttl_check_interval: parse_duration(&options.pool_ttl_check_interval.unwrap()),
403 stream_chunk_size: options.stream_chunk_size as usize,
404 })
405 }
406 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
407 ConnectionOptions::Sqlite(SqliteConnectionOptions {
408 path: std::path::Path::new(&options.path).to_path_buf(),
409 stream_chunk_size: options.stream_chunk_size as usize,
410 })
411 }
412 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
413 ConnectionOptions::Dm(DmConnectionOptions {
414 host: options.host,
415 port: options.port as u16,
416 username: options.username,
417 password: options.password,
418 schema: options.schema,
419 stream_chunk_size: options.stream_chunk_size as usize,
420 driver: options.driver,
421 })
422 }
423 _ => panic!("Failed to parse connection options: {options:?}"),
424 }
425}
426
427fn serialize_duration(duration: &Duration) -> protobuf::Duration {
428 protobuf::Duration {
429 secs: duration.as_secs(),
430 nanos: duration.subsec_nanos(),
431 }
432}
433
434fn parse_duration(duration: &protobuf::Duration) -> Duration {
435 Duration::new(duration.secs, duration.nanos)
436}
437
438fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
439 projection.map(|p| protobuf::Projection {
440 projection: p.iter().map(|n| *n as u32).collect(),
441 })
442}
443
444fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
445 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
446}
447
448fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
449 let fields = remote_schema
450 .fields
451 .iter()
452 .map(serialize_remote_field)
453 .collect::<Vec<_>>();
454
455 protobuf::RemoteSchema { fields }
456}
457
458fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
459 protobuf::RemoteField {
460 name: remote_field.name.clone(),
461 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
462 nullable: remote_field.nullable,
463 auto_increment: remote_field.auto_increment,
464 }
465}
466
467fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
468 match remote_type {
469 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
470 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
471 protobuf::Empty {},
472 )),
473 },
474 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
475 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
476 protobuf::Empty {},
477 )),
478 },
479 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
480 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
481 protobuf::Empty {},
482 )),
483 },
484 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
485 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
486 protobuf::Empty {},
487 )),
488 },
489 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
490 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
491 protobuf::Empty {},
492 )),
493 },
494 RemoteType::Postgres(PostgresType::Numeric(precision, scale)) => protobuf::RemoteType {
495 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
496 protobuf::PostgresNumeric {
497 precision: *precision as u32,
498 scale: *scale as i32,
499 },
500 )),
501 },
502 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
503 r#type: Some(protobuf::remote_type::Type::PostgresName(
504 protobuf::Empty {},
505 )),
506 },
507 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
508 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
509 protobuf::Empty {},
510 )),
511 },
512 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
513 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
514 protobuf::Empty {},
515 )),
516 },
517 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
518 r#type: Some(protobuf::remote_type::Type::PostgresText(
519 protobuf::Empty {},
520 )),
521 },
522 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
523 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
524 protobuf::Empty {},
525 )),
526 },
527 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
528 r#type: Some(protobuf::remote_type::Type::PostgresDate(
529 protobuf::Empty {},
530 )),
531 },
532 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
533 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
534 protobuf::Empty {},
535 )),
536 },
537 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
538 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
539 protobuf::Empty {},
540 )),
541 },
542 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
543 r#type: Some(protobuf::remote_type::Type::PostgresTime(
544 protobuf::Empty {},
545 )),
546 },
547 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
548 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
549 protobuf::Empty {},
550 )),
551 },
552 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
553 r#type: Some(protobuf::remote_type::Type::PostgresBool(
554 protobuf::Empty {},
555 )),
556 },
557 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
558 r#type: Some(protobuf::remote_type::Type::PostgresJson(
559 protobuf::Empty {},
560 )),
561 },
562 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
563 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
564 protobuf::Empty {},
565 )),
566 },
567 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
568 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
569 protobuf::Empty {},
570 )),
571 },
572 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
573 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
574 protobuf::Empty {},
575 )),
576 },
577 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
578 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
579 protobuf::Empty {},
580 )),
581 },
582 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
583 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
584 protobuf::Empty {},
585 )),
586 },
587 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
588 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
589 protobuf::Empty {},
590 )),
591 },
592 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
593 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
594 protobuf::Empty {},
595 )),
596 },
597 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
598 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
599 protobuf::Empty {},
600 )),
601 },
602 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
603 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
604 protobuf::Empty {},
605 )),
606 },
607 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
608 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
609 protobuf::Empty {},
610 )),
611 },
612 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
613 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
614 protobuf::Empty {},
615 )),
616 },
617 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
618 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
619 protobuf::Empty {},
620 )),
621 },
622 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
623 r#type: Some(protobuf::remote_type::Type::PostgresOid(protobuf::Empty {})),
624 },
625 RemoteType::Postgres(PostgresType::Xml) => protobuf::RemoteType {
626 r#type: Some(protobuf::remote_type::Type::PostgresXml(protobuf::Empty {})),
627 },
628 RemoteType::Postgres(PostgresType::Uuid) => protobuf::RemoteType {
629 r#type: Some(protobuf::remote_type::Type::PostgresUuid(
630 protobuf::Empty {},
631 )),
632 },
633
634 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
635 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
636 protobuf::Empty {},
637 )),
638 },
639 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
640 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
641 protobuf::Empty {},
642 )),
643 },
644 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
645 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
646 protobuf::Empty {},
647 )),
648 },
649 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
650 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
651 protobuf::Empty {},
652 )),
653 },
654 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
655 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
656 protobuf::Empty {},
657 )),
658 },
659 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
660 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
661 protobuf::Empty {},
662 )),
663 },
664 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
665 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
666 protobuf::Empty {},
667 )),
668 },
669 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
670 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
671 protobuf::Empty {},
672 )),
673 },
674 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
675 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(protobuf::Empty {})),
676 },
677 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
678 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
679 protobuf::Empty {},
680 )),
681 },
682 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
683 r#type: Some(protobuf::remote_type::Type::MysqlFloat(protobuf::Empty {})),
684 },
685 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
686 r#type: Some(protobuf::remote_type::Type::MysqlDouble(protobuf::Empty {})),
687 },
688 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
689 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
690 protobuf::MysqlDecimal {
691 precision: *precision as u32,
692 scale: *scale as u32,
693 },
694 )),
695 },
696 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
697 r#type: Some(protobuf::remote_type::Type::MysqlDate(protobuf::Empty {})),
698 },
699 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
700 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
701 protobuf::Empty {},
702 )),
703 },
704 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
705 r#type: Some(protobuf::remote_type::Type::MysqlTime(protobuf::Empty {})),
706 },
707 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
708 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
709 protobuf::Empty {},
710 )),
711 },
712 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
713 r#type: Some(protobuf::remote_type::Type::MysqlYear(protobuf::Empty {})),
714 },
715 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
716 r#type: Some(protobuf::remote_type::Type::MysqlChar(protobuf::Empty {})),
717 },
718 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
719 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
720 protobuf::Empty {},
721 )),
722 },
723 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
724 r#type: Some(protobuf::remote_type::Type::MysqlBinary(protobuf::Empty {})),
725 },
726 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
727 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
728 protobuf::Empty {},
729 )),
730 },
731 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
732 r#type: Some(protobuf::remote_type::Type::MysqlText(
733 protobuf::MysqlText { length: *len },
734 )),
735 },
736 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
737 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
738 protobuf::MysqlBlob { length: *len },
739 )),
740 },
741 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
742 r#type: Some(protobuf::remote_type::Type::MysqlJson(protobuf::Empty {})),
743 },
744 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
745 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
746 protobuf::Empty {},
747 )),
748 },
749
750 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
751 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
752 protobuf::OracleVarchar2 { length: *len },
753 )),
754 },
755 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
756 r#type: Some(protobuf::remote_type::Type::OracleChar(
757 protobuf::OracleChar { length: *len },
758 )),
759 },
760 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
761 r#type: Some(protobuf::remote_type::Type::OracleNumber(
762 protobuf::OracleNumber {
763 precision: *precision as u32,
764 scale: *scale as i32,
765 },
766 )),
767 },
768 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
769 r#type: Some(protobuf::remote_type::Type::OracleDate(protobuf::Empty {})),
770 },
771 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
772 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
773 protobuf::Empty {},
774 )),
775 },
776 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
777 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
778 protobuf::Empty {},
779 )),
780 },
781 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
782 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
783 protobuf::Empty {},
784 )),
785 },
786 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
787 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
788 protobuf::Empty {},
789 )),
790 },
791 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
792 r#type: Some(protobuf::remote_type::Type::OracleBlob(protobuf::Empty {})),
793 },
794 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
795 r#type: Some(protobuf::remote_type::Type::OracleFloat(
796 protobuf::OracleFloat {
797 precision: *precision as u32,
798 },
799 )),
800 },
801 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
802 r#type: Some(protobuf::remote_type::Type::OracleNchar(
803 protobuf::OracleNChar { length: *len },
804 )),
805 },
806 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
807 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
808 protobuf::OracleNVarchar2 { length: *len },
809 )),
810 },
811 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
812 r#type: Some(protobuf::remote_type::Type::OracleRaw(
813 protobuf::OracleRaw { length: *len },
814 )),
815 },
816 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
817 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
818 protobuf::Empty {},
819 )),
820 },
821 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
822 r#type: Some(protobuf::remote_type::Type::OracleLong(protobuf::Empty {})),
823 },
824 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
825 r#type: Some(protobuf::remote_type::Type::OracleClob(protobuf::Empty {})),
826 },
827 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
828 r#type: Some(protobuf::remote_type::Type::OracleNclob(protobuf::Empty {})),
829 },
830 RemoteType::Oracle(OracleType::SdeGeometry) => protobuf::RemoteType {
831 r#type: Some(protobuf::remote_type::Type::OracleSdeGeometry(
832 protobuf::Empty {},
833 )),
834 },
835 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
836 r#type: Some(protobuf::remote_type::Type::SqliteNull(protobuf::Empty {})),
837 },
838 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
839 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
840 protobuf::Empty {},
841 )),
842 },
843 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
844 r#type: Some(protobuf::remote_type::Type::SqliteReal(protobuf::Empty {})),
845 },
846 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
847 r#type: Some(protobuf::remote_type::Type::SqliteText(protobuf::Empty {})),
848 },
849 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
850 r#type: Some(protobuf::remote_type::Type::SqliteBlob(protobuf::Empty {})),
851 },
852 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
853 r#type: Some(protobuf::remote_type::Type::DmTinyInt(protobuf::Empty {})),
854 },
855 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
856 r#type: Some(protobuf::remote_type::Type::DmSmallInt(protobuf::Empty {})),
857 },
858 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
859 r#type: Some(protobuf::remote_type::Type::DmInteger(protobuf::Empty {})),
860 },
861 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
862 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::Empty {})),
863 },
864 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
865 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::Empty {})),
866 },
867 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
868 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::Empty {})),
869 },
870 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
871 r#type: Some(protobuf::remote_type::Type::DmNumeric(
872 protobuf::DmNumeric {
873 precision: *precision as u32,
874 scale: *scale as i32,
875 },
876 )),
877 },
878 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
879 r#type: Some(protobuf::remote_type::Type::DmDecimal(
880 protobuf::DmDecimal {
881 precision: *precision as u32,
882 scale: *scale as i32,
883 },
884 )),
885 },
886 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
887 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
888 length: len.map(|s| s as u32),
889 })),
890 },
891 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
892 r#type: Some(protobuf::remote_type::Type::DmVarchar(
893 protobuf::DmVarchar {
894 length: len.map(|s| s as u32),
895 },
896 )),
897 },
898 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
899 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::Empty {})),
900 },
901 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
902 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
903 length: *len as u32,
904 })),
905 },
906 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
907 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
908 protobuf::DmVarbinary {
909 length: len.map(|s| s as u32),
910 },
911 )),
912 },
913 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
914 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::Empty {})),
915 },
916 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
917 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::Empty {})),
918 },
919 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
920 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
921 protobuf::DmTimestamp {
922 precision: *precision as u32,
923 },
924 )),
925 },
926 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
927 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
928 precision: *precision as u32,
929 })),
930 },
931 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
932 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::Empty {})),
933 },
934 }
935}
936
937fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
938 let fields = remote_schema
939 .fields
940 .iter()
941 .map(parse_remote_field)
942 .collect::<Vec<_>>();
943
944 RemoteSchema { fields }
945}
946
947fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
948 RemoteField {
949 name: field.name.clone(),
950 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
951 nullable: field.nullable,
952 auto_increment: field.auto_increment,
953 }
954}
955
956fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
957 match remote_type.r#type.as_ref().unwrap() {
958 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
959 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
960 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
961 protobuf::remote_type::Type::PostgresFloat4(_) => {
962 RemoteType::Postgres(PostgresType::Float4)
963 }
964 protobuf::remote_type::Type::PostgresFloat8(_) => {
965 RemoteType::Postgres(PostgresType::Float8)
966 }
967 protobuf::remote_type::Type::PostgresNumeric(numeric) => RemoteType::Postgres(
968 PostgresType::Numeric(numeric.precision as u8, numeric.scale as i8),
969 ),
970 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
971 protobuf::remote_type::Type::PostgresVarchar(_) => {
972 RemoteType::Postgres(PostgresType::Varchar)
973 }
974 protobuf::remote_type::Type::PostgresBpchar(_) => {
975 RemoteType::Postgres(PostgresType::Bpchar)
976 }
977 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
978 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
979 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
980 protobuf::remote_type::Type::PostgresTimestamp(_) => {
981 RemoteType::Postgres(PostgresType::Timestamp)
982 }
983 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
984 RemoteType::Postgres(PostgresType::TimestampTz)
985 }
986 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
987 protobuf::remote_type::Type::PostgresInterval(_) => {
988 RemoteType::Postgres(PostgresType::Interval)
989 }
990 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
991 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
992 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
993 protobuf::remote_type::Type::PostgresInt2Array(_) => {
994 RemoteType::Postgres(PostgresType::Int2Array)
995 }
996 protobuf::remote_type::Type::PostgresInt4Array(_) => {
997 RemoteType::Postgres(PostgresType::Int4Array)
998 }
999 protobuf::remote_type::Type::PostgresInt8Array(_) => {
1000 RemoteType::Postgres(PostgresType::Int8Array)
1001 }
1002 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
1003 RemoteType::Postgres(PostgresType::Float4Array)
1004 }
1005 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
1006 RemoteType::Postgres(PostgresType::Float8Array)
1007 }
1008 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
1009 RemoteType::Postgres(PostgresType::VarcharArray)
1010 }
1011 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
1012 RemoteType::Postgres(PostgresType::BpcharArray)
1013 }
1014 protobuf::remote_type::Type::PostgresTextArray(_) => {
1015 RemoteType::Postgres(PostgresType::TextArray)
1016 }
1017 protobuf::remote_type::Type::PostgresByteaArray(_) => {
1018 RemoteType::Postgres(PostgresType::ByteaArray)
1019 }
1020 protobuf::remote_type::Type::PostgresBoolArray(_) => {
1021 RemoteType::Postgres(PostgresType::BoolArray)
1022 }
1023 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
1024 RemoteType::Postgres(PostgresType::PostGisGeometry)
1025 }
1026 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
1027 protobuf::remote_type::Type::PostgresXml(_) => RemoteType::Postgres(PostgresType::Xml),
1028 protobuf::remote_type::Type::PostgresUuid(_) => RemoteType::Postgres(PostgresType::Uuid),
1029 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
1030 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
1031 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
1032 }
1033 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
1034 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
1035 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
1036 }
1037 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
1038 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
1039 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
1040 }
1041 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
1042 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
1043 RemoteType::Mysql(MysqlType::IntegerUnsigned)
1044 }
1045 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
1046 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
1047 RemoteType::Mysql(MysqlType::BigIntUnsigned)
1048 }
1049 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
1050 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
1051 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
1052 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
1053 ),
1054 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
1055 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
1056 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
1057 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
1058 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
1059 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
1060 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
1061 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
1062 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
1063 protobuf::remote_type::Type::MysqlText(text) => {
1064 RemoteType::Mysql(MysqlType::Text(text.length))
1065 }
1066 protobuf::remote_type::Type::MysqlBlob(blob) => {
1067 RemoteType::Mysql(MysqlType::Blob(blob.length))
1068 }
1069 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
1070 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
1071 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
1072 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
1073 }
1074 protobuf::remote_type::Type::OracleChar(char) => {
1075 RemoteType::Oracle(OracleType::Char(char.length))
1076 }
1077 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
1078 OracleType::Number(number.precision as u8, number.scale as i8),
1079 ),
1080 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
1081 protobuf::remote_type::Type::OracleTimestamp(_) => {
1082 RemoteType::Oracle(OracleType::Timestamp)
1083 }
1084 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
1085 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
1086 RemoteType::Oracle(OracleType::BinaryFloat)
1087 }
1088 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
1089 RemoteType::Oracle(OracleType::BinaryDouble)
1090 }
1091 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
1092 RemoteType::Oracle(OracleType::Float(*precision as u8))
1093 }
1094 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
1095 RemoteType::Oracle(OracleType::NChar(*length))
1096 }
1097 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
1098 RemoteType::Oracle(OracleType::NVarchar2(*length))
1099 }
1100 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
1101 RemoteType::Oracle(OracleType::Raw(*length))
1102 }
1103 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
1104 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
1105 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
1106 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
1107 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
1108 protobuf::remote_type::Type::OracleSdeGeometry(_) => {
1109 RemoteType::Oracle(OracleType::SdeGeometry)
1110 }
1111 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1112 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1113 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1114 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1115 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1116 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1117 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1118 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1119 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1120 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1121 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1122 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1123 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1124 }
1125 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1126 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1127 }
1128 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1129 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1130 }
1131 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1132 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1133 }
1134 protobuf::remote_type::Type::DmText(_) => RemoteType::Dm(DmType::Text),
1135 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1136 RemoteType::Dm(DmType::Binary(*length as u16))
1137 }
1138 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1139 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1140 }
1141 protobuf::remote_type::Type::DmImage(_) => RemoteType::Dm(DmType::Image),
1142 protobuf::remote_type::Type::DmBit(_) => RemoteType::Dm(DmType::Bit),
1143 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1144 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1145 }
1146 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1147 RemoteType::Dm(DmType::Time(*precision as u8))
1148 }
1149 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1150 }
1151}
1152
1153fn serialize_remote_source(source: &RemoteSource) -> protobuf::RemoteSource {
1154 match source {
1155 RemoteSource::Query(query) => protobuf::RemoteSource {
1156 source: Some(protobuf::remote_source::Source::Query(query.clone())),
1157 },
1158 RemoteSource::Table(table_identifiers) => protobuf::RemoteSource {
1159 source: Some(protobuf::remote_source::Source::Table(
1160 protobuf::Identifiers {
1161 idents: table_identifiers.clone(),
1162 },
1163 )),
1164 },
1165 }
1166}
1167
1168fn parse_remote_source(source: &protobuf::RemoteSource) -> DFResult<RemoteSource> {
1169 let source = source.source.as_ref().ok_or(DataFusionError::Internal(
1170 "remote source is not set".to_string(),
1171 ))?;
1172 match source {
1173 protobuf::remote_source::Source::Query(query) => Ok(RemoteSource::Query(query.clone())),
1174 protobuf::remote_source::Source::Table(table_identifiers) => {
1175 Ok(RemoteSource::Table(table_identifiers.idents.clone()))
1176 }
1177 }
1178}