1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 ConnectionOptions, DFResult, DmType, MysqlType, OracleType, PostgresType, RemoteField,
14 RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
15};
16use datafusion::arrow::datatypes::SchemaRef;
17use datafusion::common::DataFusionError;
18use datafusion::execution::FunctionRegistry;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion_proto::convert_required;
21use datafusion_proto::logical_plan::from_proto::parse_exprs;
22use datafusion_proto::logical_plan::to_proto::serialize_exprs;
23use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
24use datafusion_proto::physical_plan::PhysicalExtensionCodec;
25use datafusion_proto::protobuf::proto_error;
26use derive_with::With;
27use prost::Message;
28use std::fmt::Debug;
29#[cfg(feature = "sqlite")]
30use std::path::Path;
31use std::sync::Arc;
32
33pub trait TransformCodec: Debug + Send + Sync {
34 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
35 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
36}
37
38#[derive(Debug, With)]
39pub struct RemotePhysicalCodec {
40 transform_codec: Option<Arc<dyn TransformCodec>>,
41 logical_extension_codec: Arc<dyn LogicalExtensionCodec>,
42}
43
44impl RemotePhysicalCodec {
45 pub fn new() -> Self {
46 Self {
47 transform_codec: None,
48 logical_extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
49 }
50 }
51}
52
53impl Default for RemotePhysicalCodec {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl PhysicalExtensionCodec for RemotePhysicalCodec {
60 fn try_decode(
61 &self,
62 buf: &[u8],
63 _inputs: &[Arc<dyn ExecutionPlan>],
64 registry: &dyn FunctionRegistry,
65 ) -> DFResult<Arc<dyn ExecutionPlan>> {
66 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
67 DataFusionError::Internal(format!(
68 "Failed to decode remote table execution plan: {e:?}"
69 ))
70 })?;
71
72 let transform = if let Some(bytes) = proto.transform {
73 let Some(transform_codec) = self.transform_codec.as_ref() else {
74 return Err(DataFusionError::Execution(
75 "No transform codec found".to_string(),
76 ));
77 };
78 Some(transform_codec.try_decode(&bytes)?)
79 } else {
80 None
81 };
82
83 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
84 let remote_schema = proto
85 .remote_schema
86 .map(|schema| Arc::new(parse_remote_schema(&schema)));
87
88 let projection: Option<Vec<usize>> = proto
89 .projection
90 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
91
92 let filters = parse_exprs(
93 &proto.filters,
94 registry,
95 self.logical_extension_codec.as_ref(),
96 )?;
97 let limit = proto.limit.map(|l| l as usize);
98
99 let conn_options = parse_connection_options(proto.conn_options.unwrap());
100 let conn = tokio::task::block_in_place(|| {
101 tokio::runtime::Handle::current().block_on(async {
102 let pool = connect(&conn_options).await?;
103 let conn = pool.get().await?;
104 Ok::<_, DataFusionError>(conn)
105 })
106 })?;
107
108 Ok(Arc::new(RemoteTableExec::try_new(
109 conn_options,
110 proto.sql,
111 table_schema,
112 remote_schema,
113 projection,
114 filters,
115 limit,
116 transform,
117 conn,
118 )?))
119 }
120
121 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
122 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
123 let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
124 let Some(transform_codec) = self.transform_codec.as_ref() else {
125 return Err(DataFusionError::Execution(
126 "No transform codec found".to_string(),
127 ));
128 };
129 let bytes = transform_codec.try_encode(transform.as_ref())?;
130 Some(bytes)
131 } else {
132 None
133 };
134
135 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
136 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
137 let serialized_filters =
138 serialize_exprs(&exec.filters, self.logical_extension_codec.as_ref())?;
139
140 let proto = protobuf::RemoteTableExec {
141 conn_options: Some(serialized_connection_options),
142 sql: exec.sql.clone(),
143 table_schema: Some(exec.table_schema.as_ref().try_into()?),
144 remote_schema,
145 projection: exec
146 .projection
147 .as_ref()
148 .map(|p| serialize_projection(p.as_slice())),
149 filters: serialized_filters,
150 limit: exec.limit.map(|l| l as u32),
151 transform: serialized_transform,
152 };
153
154 proto.encode(buf).map_err(|e| {
155 DataFusionError::Internal(format!(
156 "Failed to encode remote table execution plan: {e:?}"
157 ))
158 })?;
159 Ok(())
160 } else {
161 Err(DataFusionError::Execution(format!(
162 "Failed to encode {}",
163 RemoteTableExec::static_name()
164 )))
165 }
166 }
167}
168
169fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
170 match options {
171 #[cfg(feature = "postgres")]
172 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
173 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
174 protobuf::PostgresConnectionOptions {
175 host: options.host.clone(),
176 port: options.port as u32,
177 username: options.username.clone(),
178 password: options.password.clone(),
179 database: options.database.clone(),
180 pool_max_size: options.pool_max_size as u32,
181 stream_chunk_size: options.stream_chunk_size as u32,
182 },
183 )),
184 },
185 #[cfg(feature = "mysql")]
186 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
187 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
188 protobuf::MysqlConnectionOptions {
189 host: options.host.clone(),
190 port: options.port as u32,
191 username: options.username.clone(),
192 password: options.password.clone(),
193 database: options.database.clone(),
194 pool_max_size: options.pool_max_size as u32,
195 stream_chunk_size: options.stream_chunk_size as u32,
196 },
197 )),
198 },
199 #[cfg(feature = "oracle")]
200 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
201 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
202 protobuf::OracleConnectionOptions {
203 host: options.host.clone(),
204 port: options.port as u32,
205 username: options.username.clone(),
206 password: options.password.clone(),
207 service_name: options.service_name.clone(),
208 pool_max_size: options.pool_max_size as u32,
209 stream_chunk_size: options.stream_chunk_size as u32,
210 },
211 )),
212 },
213 #[cfg(feature = "sqlite")]
214 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
215 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
216 protobuf::SqliteConnectionOptions {
217 path: options.path.to_str().unwrap().to_string(),
218 stream_chunk_size: options.stream_chunk_size as u32,
219 },
220 )),
221 },
222 #[cfg(feature = "dm")]
223 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
224 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
225 protobuf::DmConnectionOptions {
226 host: options.host.clone(),
227 port: options.port as u32,
228 username: options.username.clone(),
229 password: options.password.clone(),
230 schema: options.schema.clone(),
231 stream_chunk_size: options.stream_chunk_size as u32,
232 driver: options.driver.clone(),
233 },
234 )),
235 },
236 }
237}
238
239fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
240 match options.connection_options {
241 #[cfg(feature = "postgres")]
242 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
243 ConnectionOptions::Postgres(PostgresConnectionOptions {
244 host: options.host,
245 port: options.port as u16,
246 username: options.username,
247 password: options.password,
248 database: options.database,
249 pool_max_size: options.pool_max_size as usize,
250 stream_chunk_size: options.stream_chunk_size as usize,
251 })
252 }
253 #[cfg(feature = "mysql")]
254 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
255 ConnectionOptions::Mysql(MysqlConnectionOptions {
256 host: options.host,
257 port: options.port as u16,
258 username: options.username,
259 password: options.password,
260 database: options.database,
261 pool_max_size: options.pool_max_size as usize,
262 stream_chunk_size: options.stream_chunk_size as usize,
263 })
264 }
265 #[cfg(feature = "oracle")]
266 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
267 ConnectionOptions::Oracle(OracleConnectionOptions {
268 host: options.host,
269 port: options.port as u16,
270 username: options.username,
271 password: options.password,
272 service_name: options.service_name,
273 pool_max_size: options.pool_max_size as usize,
274 stream_chunk_size: options.stream_chunk_size as usize,
275 })
276 }
277 #[cfg(feature = "sqlite")]
278 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
279 ConnectionOptions::Sqlite(SqliteConnectionOptions {
280 path: Path::new(&options.path).to_path_buf(),
281 stream_chunk_size: options.stream_chunk_size as usize,
282 })
283 }
284 #[cfg(feature = "dm")]
285 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
286 ConnectionOptions::Dm(DmConnectionOptions {
287 host: options.host,
288 port: options.port as u16,
289 username: options.username,
290 password: options.password,
291 schema: options.schema,
292 stream_chunk_size: options.stream_chunk_size as usize,
293 driver: options.driver,
294 })
295 }
296 _ => panic!("Failed to parse connection options: {options:?}"),
297 }
298}
299
300fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
301 protobuf::Projection {
302 projection: projection.iter().map(|n| *n as u32).collect(),
303 }
304}
305
306fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
307 let fields = remote_schema
308 .fields
309 .iter()
310 .map(serialize_remote_field)
311 .collect::<Vec<_>>();
312
313 protobuf::RemoteSchema { fields }
314}
315
316fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
317 protobuf::RemoteField {
318 name: remote_field.name.clone(),
319 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
320 nullable: remote_field.nullable,
321 }
322}
323
324fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
325 match remote_type {
326 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
327 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
328 protobuf::PostgresInt2 {},
329 )),
330 },
331 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
332 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
333 protobuf::PostgresInt4 {},
334 )),
335 },
336 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
337 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
338 protobuf::PostgresInt8 {},
339 )),
340 },
341 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
342 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
343 protobuf::PostgresFloat4 {},
344 )),
345 },
346 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
347 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
348 protobuf::PostgresFloat8 {},
349 )),
350 },
351 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
352 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
353 protobuf::PostgresNumeric {
354 scale: *scale as i32,
355 },
356 )),
357 },
358 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
359 r#type: Some(protobuf::remote_type::Type::PostgresName(
360 protobuf::PostgresName {},
361 )),
362 },
363 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
364 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
365 protobuf::PostgresVarchar {},
366 )),
367 },
368 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
369 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
370 protobuf::PostgresBpchar {},
371 )),
372 },
373 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
374 r#type: Some(protobuf::remote_type::Type::PostgresText(
375 protobuf::PostgresText {},
376 )),
377 },
378 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
379 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
380 protobuf::PostgresBytea {},
381 )),
382 },
383 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
384 r#type: Some(protobuf::remote_type::Type::PostgresDate(
385 protobuf::PostgresDate {},
386 )),
387 },
388 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
389 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
390 protobuf::PostgresTimestamp {},
391 )),
392 },
393 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
394 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
395 protobuf::PostgresTimestampTz {},
396 )),
397 },
398 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
399 r#type: Some(protobuf::remote_type::Type::PostgresTime(
400 protobuf::PostgresTime {},
401 )),
402 },
403 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
404 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
405 protobuf::PostgresInterval {},
406 )),
407 },
408 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
409 r#type: Some(protobuf::remote_type::Type::PostgresBool(
410 protobuf::PostgresBool {},
411 )),
412 },
413 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
414 r#type: Some(protobuf::remote_type::Type::PostgresJson(
415 protobuf::PostgresJson {},
416 )),
417 },
418 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
419 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
420 protobuf::PostgresJsonb {},
421 )),
422 },
423 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
424 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
425 protobuf::PostgresInt2Array {},
426 )),
427 },
428 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
429 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
430 protobuf::PostgresInt4Array {},
431 )),
432 },
433 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
434 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
435 protobuf::PostgresInt8Array {},
436 )),
437 },
438 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
439 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
440 protobuf::PostgresFloat4Array {},
441 )),
442 },
443 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
444 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
445 protobuf::PostgresFloat8Array {},
446 )),
447 },
448 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
449 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
450 protobuf::PostgresVarcharArray {},
451 )),
452 },
453 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
454 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
455 protobuf::PostgresBpcharArray {},
456 )),
457 },
458 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
459 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
460 protobuf::PostgresTextArray {},
461 )),
462 },
463 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
464 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
465 protobuf::PostgresByteaArray {},
466 )),
467 },
468 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
469 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
470 protobuf::PostgresBoolArray {},
471 )),
472 },
473 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
474 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
475 protobuf::PostgresPostGisGeometry {},
476 )),
477 },
478 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
479 r#type: Some(protobuf::remote_type::Type::PostgresOid(
480 protobuf::PostgresOid {},
481 )),
482 },
483
484 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
485 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
486 protobuf::MysqlTinyInt {},
487 )),
488 },
489 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
490 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
491 protobuf::MysqlTinyIntUnsigned {},
492 )),
493 },
494 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
495 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
496 protobuf::MysqlSmallInt {},
497 )),
498 },
499 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
500 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
501 protobuf::MysqlSmallIntUnsigned {},
502 )),
503 },
504 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
505 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
506 protobuf::MysqlMediumInt {},
507 )),
508 },
509 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
510 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
511 protobuf::MysqlMediumIntUnsigned {},
512 )),
513 },
514 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
515 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
516 protobuf::MysqlInteger {},
517 )),
518 },
519 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
520 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
521 protobuf::MysqlIntegerUnsigned {},
522 )),
523 },
524 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
525 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
526 protobuf::MysqlBigInt {},
527 )),
528 },
529 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
530 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
531 protobuf::MysqlBigIntUnsigned {},
532 )),
533 },
534 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
535 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
536 protobuf::MysqlFloat {},
537 )),
538 },
539 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
540 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
541 protobuf::MysqlDouble {},
542 )),
543 },
544 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
545 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
546 protobuf::MysqlDecimal {
547 precision: *precision as u32,
548 scale: *scale as u32,
549 },
550 )),
551 },
552 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
553 r#type: Some(protobuf::remote_type::Type::MysqlDate(
554 protobuf::MysqlDate {},
555 )),
556 },
557 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
558 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
559 protobuf::MysqlDateTime {},
560 )),
561 },
562 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
563 r#type: Some(protobuf::remote_type::Type::MysqlTime(
564 protobuf::MysqlTime {},
565 )),
566 },
567 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
568 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
569 protobuf::MysqlTimestamp {},
570 )),
571 },
572 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
573 r#type: Some(protobuf::remote_type::Type::MysqlYear(
574 protobuf::MysqlYear {},
575 )),
576 },
577 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
578 r#type: Some(protobuf::remote_type::Type::MysqlChar(
579 protobuf::MysqlChar {},
580 )),
581 },
582 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
583 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
584 protobuf::MysqlVarchar {},
585 )),
586 },
587 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
588 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
589 protobuf::MysqlBinary {},
590 )),
591 },
592 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
593 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
594 protobuf::MysqlVarbinary {},
595 )),
596 },
597 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
598 r#type: Some(protobuf::remote_type::Type::MysqlText(
599 protobuf::MysqlText { length: *len },
600 )),
601 },
602 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
603 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
604 protobuf::MysqlBlob { length: *len },
605 )),
606 },
607 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
608 r#type: Some(protobuf::remote_type::Type::MysqlJson(
609 protobuf::MysqlJson {},
610 )),
611 },
612 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
613 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
614 protobuf::MysqlGeometry {},
615 )),
616 },
617
618 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
619 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
620 protobuf::OracleVarchar2 { length: *len },
621 )),
622 },
623 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
624 r#type: Some(protobuf::remote_type::Type::OracleChar(
625 protobuf::OracleChar { length: *len },
626 )),
627 },
628 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
629 r#type: Some(protobuf::remote_type::Type::OracleNumber(
630 protobuf::OracleNumber {
631 precision: *precision as u32,
632 scale: *scale as i32,
633 },
634 )),
635 },
636 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
637 r#type: Some(protobuf::remote_type::Type::OracleDate(
638 protobuf::OracleDate {},
639 )),
640 },
641 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
642 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
643 protobuf::OracleTimestamp {},
644 )),
645 },
646 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
647 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
648 protobuf::OracleBoolean {},
649 )),
650 },
651 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
652 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
653 protobuf::OracleBinaryFloat {},
654 )),
655 },
656 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
657 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
658 protobuf::OracleBinaryDouble {},
659 )),
660 },
661 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
662 r#type: Some(protobuf::remote_type::Type::OracleBlob(
663 protobuf::OracleBlob {},
664 )),
665 },
666 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
667 r#type: Some(protobuf::remote_type::Type::OracleFloat(
668 protobuf::OracleFloat {
669 precision: *precision as u32,
670 },
671 )),
672 },
673 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
674 r#type: Some(protobuf::remote_type::Type::OracleNchar(
675 protobuf::OracleNChar { length: *len },
676 )),
677 },
678 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
679 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
680 protobuf::OracleNVarchar2 { length: *len },
681 )),
682 },
683 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
684 r#type: Some(protobuf::remote_type::Type::OracleRaw(
685 protobuf::OracleRaw { length: *len },
686 )),
687 },
688 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
689 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
690 protobuf::OracleLongRaw {},
691 )),
692 },
693 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
694 r#type: Some(protobuf::remote_type::Type::OracleLong(
695 protobuf::OracleLong {},
696 )),
697 },
698 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
699 r#type: Some(protobuf::remote_type::Type::OracleClob(
700 protobuf::OracleClob {},
701 )),
702 },
703 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
704 r#type: Some(protobuf::remote_type::Type::OracleNclob(
705 protobuf::OracleNClob {},
706 )),
707 },
708 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
709 r#type: Some(protobuf::remote_type::Type::SqliteNull(
710 protobuf::SqliteNull {},
711 )),
712 },
713 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
714 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
715 protobuf::SqliteInteger {},
716 )),
717 },
718 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
719 r#type: Some(protobuf::remote_type::Type::SqliteReal(
720 protobuf::SqliteReal {},
721 )),
722 },
723 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
724 r#type: Some(protobuf::remote_type::Type::SqliteText(
725 protobuf::SqliteText {},
726 )),
727 },
728 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
729 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
730 protobuf::SqliteBlob {},
731 )),
732 },
733 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
734 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
735 protobuf::DmTinyInt {},
736 )),
737 },
738 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
739 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
740 protobuf::DmSmallInt {},
741 )),
742 },
743 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
744 r#type: Some(protobuf::remote_type::Type::DmInteger(
745 protobuf::DmInteger {},
746 )),
747 },
748 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
749 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
750 },
751 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
752 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
753 },
754 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
755 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
756 },
757 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
758 r#type: Some(protobuf::remote_type::Type::DmNumeric(
759 protobuf::DmNumeric {
760 precision: *precision as u32,
761 scale: *scale as i32,
762 },
763 )),
764 },
765 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
766 r#type: Some(protobuf::remote_type::Type::DmDecimal(
767 protobuf::DmDecimal {
768 precision: *precision as u32,
769 scale: *scale as i32,
770 },
771 )),
772 },
773 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
774 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
775 length: len.map(|s| s as u32),
776 })),
777 },
778 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
779 r#type: Some(protobuf::remote_type::Type::DmVarchar(
780 protobuf::DmVarchar {
781 length: len.map(|s| s as u32),
782 },
783 )),
784 },
785 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
786 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
787 },
788 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
789 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
790 length: *len as u32,
791 })),
792 },
793 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
794 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
795 protobuf::DmVarbinary {
796 length: len.map(|s| s as u32),
797 },
798 )),
799 },
800 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
801 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
802 },
803 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
804 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
805 },
806 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
807 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
808 protobuf::DmTimestamp {
809 precision: *precision as u32,
810 },
811 )),
812 },
813 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
814 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
815 precision: *precision as u32,
816 })),
817 },
818 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
819 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
820 },
821 }
822}
823
824fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
825 let fields = remote_schema
826 .fields
827 .iter()
828 .map(parse_remote_field)
829 .collect::<Vec<_>>();
830
831 RemoteSchema { fields }
832}
833
834fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
835 RemoteField {
836 name: field.name.clone(),
837 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
838 nullable: field.nullable,
839 }
840}
841
842fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
843 match remote_type.r#type.as_ref().unwrap() {
844 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
845 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
846 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
847 protobuf::remote_type::Type::PostgresFloat4(_) => {
848 RemoteType::Postgres(PostgresType::Float4)
849 }
850 protobuf::remote_type::Type::PostgresFloat8(_) => {
851 RemoteType::Postgres(PostgresType::Float8)
852 }
853 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
854 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
855 }
856 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
857 protobuf::remote_type::Type::PostgresVarchar(_) => {
858 RemoteType::Postgres(PostgresType::Varchar)
859 }
860 protobuf::remote_type::Type::PostgresBpchar(_) => {
861 RemoteType::Postgres(PostgresType::Bpchar)
862 }
863 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
864 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
865 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
866 protobuf::remote_type::Type::PostgresTimestamp(_) => {
867 RemoteType::Postgres(PostgresType::Timestamp)
868 }
869 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
870 RemoteType::Postgres(PostgresType::TimestampTz)
871 }
872 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
873 protobuf::remote_type::Type::PostgresInterval(_) => {
874 RemoteType::Postgres(PostgresType::Interval)
875 }
876 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
877 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
878 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
879 protobuf::remote_type::Type::PostgresInt2Array(_) => {
880 RemoteType::Postgres(PostgresType::Int2Array)
881 }
882 protobuf::remote_type::Type::PostgresInt4Array(_) => {
883 RemoteType::Postgres(PostgresType::Int4Array)
884 }
885 protobuf::remote_type::Type::PostgresInt8Array(_) => {
886 RemoteType::Postgres(PostgresType::Int8Array)
887 }
888 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
889 RemoteType::Postgres(PostgresType::Float4Array)
890 }
891 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
892 RemoteType::Postgres(PostgresType::Float8Array)
893 }
894 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
895 RemoteType::Postgres(PostgresType::VarcharArray)
896 }
897 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
898 RemoteType::Postgres(PostgresType::BpcharArray)
899 }
900 protobuf::remote_type::Type::PostgresTextArray(_) => {
901 RemoteType::Postgres(PostgresType::TextArray)
902 }
903 protobuf::remote_type::Type::PostgresByteaArray(_) => {
904 RemoteType::Postgres(PostgresType::ByteaArray)
905 }
906 protobuf::remote_type::Type::PostgresBoolArray(_) => {
907 RemoteType::Postgres(PostgresType::BoolArray)
908 }
909 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
910 RemoteType::Postgres(PostgresType::PostGisGeometry)
911 }
912 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
913 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
914 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
915 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
916 }
917 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
918 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
919 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
920 }
921 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
922 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
923 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
924 }
925 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
926 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
927 RemoteType::Mysql(MysqlType::IntegerUnsigned)
928 }
929 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
930 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
931 RemoteType::Mysql(MysqlType::BigIntUnsigned)
932 }
933 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
934 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
935 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
936 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
937 ),
938 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
939 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
940 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
941 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
942 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
943 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
944 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
945 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
946 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
947 protobuf::remote_type::Type::MysqlText(text) => {
948 RemoteType::Mysql(MysqlType::Text(text.length))
949 }
950 protobuf::remote_type::Type::MysqlBlob(blob) => {
951 RemoteType::Mysql(MysqlType::Blob(blob.length))
952 }
953 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
954 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
955 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
956 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
957 }
958 protobuf::remote_type::Type::OracleChar(char) => {
959 RemoteType::Oracle(OracleType::Char(char.length))
960 }
961 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
962 OracleType::Number(number.precision as u8, number.scale as i8),
963 ),
964 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
965 protobuf::remote_type::Type::OracleTimestamp(_) => {
966 RemoteType::Oracle(OracleType::Timestamp)
967 }
968 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
969 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
970 RemoteType::Oracle(OracleType::BinaryFloat)
971 }
972 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
973 RemoteType::Oracle(OracleType::BinaryDouble)
974 }
975 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
976 RemoteType::Oracle(OracleType::Float(*precision as u8))
977 }
978 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
979 RemoteType::Oracle(OracleType::NChar(*length))
980 }
981 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
982 RemoteType::Oracle(OracleType::NVarchar2(*length))
983 }
984 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
985 RemoteType::Oracle(OracleType::Raw(*length))
986 }
987 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
988 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
989 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
990 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
991 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
992 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
993 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
994 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
995 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
996 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
997 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
998 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
999 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1000 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1001 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1002 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1003 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1004 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1005 }
1006 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1007 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1008 }
1009 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1010 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1011 }
1012 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1013 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1014 }
1015 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1016 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1017 RemoteType::Dm(DmType::Binary(*length as u16))
1018 }
1019 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1020 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1021 }
1022 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1023 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1024 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1025 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1026 }
1027 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1028 RemoteType::Dm(DmType::Time(*precision as u8))
1029 }
1030 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1031 }
1032}