1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType, PostgresType,
14 RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
15 connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use prost::Message;
26use std::fmt::Debug;
27#[cfg(feature = "sqlite")]
28use std::path::Path;
29use std::sync::Arc;
30
31pub trait TransformCodec: Debug + Send + Sync {
32 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
33 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
34}
35
36#[derive(Debug)]
37pub struct DefaultTransformCodec {}
38
39const DEFAULT_TRANSFORM_ID: &str = "__default";
40
41impl TransformCodec for DefaultTransformCodec {
42 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
43 if value.as_any().is::<DefaultTransform>() {
44 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
45 } else {
46 Err(DataFusionError::Execution(format!(
47 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
48 )))
49 }
50 }
51
52 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
53 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
54 Ok(Arc::new(DefaultTransform {}))
55 } else {
56 Err(DataFusionError::Execution(
57 "DefaultTransformCodec only supports DefaultTransform".to_string(),
58 ))
59 }
60 }
61}
62
63#[derive(Debug, With)]
64pub struct RemotePhysicalCodec {
65 transform_codec: Arc<dyn TransformCodec>,
66}
67
68impl RemotePhysicalCodec {
69 pub fn new() -> Self {
70 Self {
71 transform_codec: Arc::new(DefaultTransformCodec {}),
72 }
73 }
74}
75
76impl Default for RemotePhysicalCodec {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl PhysicalExtensionCodec for RemotePhysicalCodec {
83 fn try_decode(
84 &self,
85 buf: &[u8],
86 _inputs: &[Arc<dyn ExecutionPlan>],
87 _registry: &dyn FunctionRegistry,
88 ) -> DFResult<Arc<dyn ExecutionPlan>> {
89 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
90 DataFusionError::Internal(format!(
91 "Failed to decode remote table execution plan: {e:?}"
92 ))
93 })?;
94
95 let transform = if proto.transform == DEFAULT_TRANSFORM_ID.as_bytes() {
96 Arc::new(DefaultTransform {})
97 } else {
98 self.transform_codec.try_decode(&proto.transform)?
99 };
100
101 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
102 let remote_schema = proto
103 .remote_schema
104 .map(|schema| Arc::new(parse_remote_schema(&schema)));
105
106 let projection: Option<Vec<usize>> = proto
107 .projection
108 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
109
110 let limit = proto.limit.map(|l| l as usize);
111
112 let conn_options = parse_connection_options(proto.conn_options.unwrap());
113 let conn = tokio::task::block_in_place(|| {
114 tokio::runtime::Handle::current().block_on(async {
115 let pool = connect(&conn_options).await?;
116 let conn = pool.get().await?;
117 Ok::<_, DataFusionError>(conn)
118 })
119 })?;
120
121 Ok(Arc::new(RemoteTableExec::try_new(
122 conn_options,
123 proto.sql,
124 table_schema,
125 remote_schema,
126 projection,
127 proto.unparsed_filters,
128 limit,
129 transform,
130 conn,
131 )?))
132 }
133
134 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
135 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
136 let serialized_transform = if exec.transform.as_any().is::<DefaultTransform>() {
137 DefaultTransformCodec {}.try_encode(exec.transform.as_ref())?
138 } else {
139 let bytes = self.transform_codec.try_encode(exec.transform.as_ref())?;
140 assert_ne!(bytes, DEFAULT_TRANSFORM_ID.as_bytes());
141 bytes
142 };
143
144 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
145 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
146
147 let proto = protobuf::RemoteTableExec {
148 conn_options: Some(serialized_connection_options),
149 sql: exec.sql.clone(),
150 table_schema: Some(exec.table_schema.as_ref().try_into()?),
151 remote_schema,
152 projection: exec
153 .projection
154 .as_ref()
155 .map(|p| serialize_projection(p.as_slice())),
156 unparsed_filters: exec.unparsed_filters.clone(),
157 limit: exec.limit.map(|l| l as u32),
158 transform: serialized_transform,
159 };
160
161 proto.encode(buf).map_err(|e| {
162 DataFusionError::Internal(format!(
163 "Failed to encode remote table execution plan: {e:?}"
164 ))
165 })?;
166 Ok(())
167 } else {
168 Err(DataFusionError::Execution(format!(
169 "Failed to encode {}",
170 RemoteTableExec::static_name()
171 )))
172 }
173 }
174}
175
176fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
177 match options {
178 #[cfg(feature = "postgres")]
179 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
180 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
181 protobuf::PostgresConnectionOptions {
182 host: options.host.clone(),
183 port: options.port as u32,
184 username: options.username.clone(),
185 password: options.password.clone(),
186 database: options.database.clone(),
187 pool_max_size: options.pool_max_size as u32,
188 stream_chunk_size: options.stream_chunk_size as u32,
189 },
190 )),
191 },
192 #[cfg(feature = "mysql")]
193 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
194 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
195 protobuf::MysqlConnectionOptions {
196 host: options.host.clone(),
197 port: options.port as u32,
198 username: options.username.clone(),
199 password: options.password.clone(),
200 database: options.database.clone(),
201 pool_max_size: options.pool_max_size as u32,
202 stream_chunk_size: options.stream_chunk_size as u32,
203 },
204 )),
205 },
206 #[cfg(feature = "oracle")]
207 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
208 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
209 protobuf::OracleConnectionOptions {
210 host: options.host.clone(),
211 port: options.port as u32,
212 username: options.username.clone(),
213 password: options.password.clone(),
214 service_name: options.service_name.clone(),
215 pool_max_size: options.pool_max_size as u32,
216 stream_chunk_size: options.stream_chunk_size as u32,
217 },
218 )),
219 },
220 #[cfg(feature = "sqlite")]
221 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
222 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
223 protobuf::SqliteConnectionOptions {
224 path: options.path.to_str().unwrap().to_string(),
225 stream_chunk_size: options.stream_chunk_size as u32,
226 },
227 )),
228 },
229 #[cfg(feature = "dm")]
230 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
231 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
232 protobuf::DmConnectionOptions {
233 host: options.host.clone(),
234 port: options.port as u32,
235 username: options.username.clone(),
236 password: options.password.clone(),
237 schema: options.schema.clone(),
238 stream_chunk_size: options.stream_chunk_size as u32,
239 driver: options.driver.clone(),
240 },
241 )),
242 },
243 }
244}
245
246fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
247 match options.connection_options {
248 #[cfg(feature = "postgres")]
249 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
250 ConnectionOptions::Postgres(PostgresConnectionOptions {
251 host: options.host,
252 port: options.port as u16,
253 username: options.username,
254 password: options.password,
255 database: options.database,
256 pool_max_size: options.pool_max_size as usize,
257 stream_chunk_size: options.stream_chunk_size as usize,
258 })
259 }
260 #[cfg(feature = "mysql")]
261 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
262 ConnectionOptions::Mysql(MysqlConnectionOptions {
263 host: options.host,
264 port: options.port as u16,
265 username: options.username,
266 password: options.password,
267 database: options.database,
268 pool_max_size: options.pool_max_size as usize,
269 stream_chunk_size: options.stream_chunk_size as usize,
270 })
271 }
272 #[cfg(feature = "oracle")]
273 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
274 ConnectionOptions::Oracle(OracleConnectionOptions {
275 host: options.host,
276 port: options.port as u16,
277 username: options.username,
278 password: options.password,
279 service_name: options.service_name,
280 pool_max_size: options.pool_max_size as usize,
281 stream_chunk_size: options.stream_chunk_size as usize,
282 })
283 }
284 #[cfg(feature = "sqlite")]
285 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
286 ConnectionOptions::Sqlite(SqliteConnectionOptions {
287 path: Path::new(&options.path).to_path_buf(),
288 stream_chunk_size: options.stream_chunk_size as usize,
289 })
290 }
291 #[cfg(feature = "dm")]
292 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
293 ConnectionOptions::Dm(DmConnectionOptions {
294 host: options.host,
295 port: options.port as u16,
296 username: options.username,
297 password: options.password,
298 schema: options.schema,
299 stream_chunk_size: options.stream_chunk_size as usize,
300 driver: options.driver,
301 })
302 }
303 _ => panic!("Failed to parse connection options: {options:?}"),
304 }
305}
306
307fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
308 protobuf::Projection {
309 projection: projection.iter().map(|n| *n as u32).collect(),
310 }
311}
312
313fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
314 let fields = remote_schema
315 .fields
316 .iter()
317 .map(serialize_remote_field)
318 .collect::<Vec<_>>();
319
320 protobuf::RemoteSchema { fields }
321}
322
323fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
324 protobuf::RemoteField {
325 name: remote_field.name.clone(),
326 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
327 nullable: remote_field.nullable,
328 }
329}
330
331fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
332 match remote_type {
333 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
334 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
335 protobuf::PostgresInt2 {},
336 )),
337 },
338 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
339 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
340 protobuf::PostgresInt4 {},
341 )),
342 },
343 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
344 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
345 protobuf::PostgresInt8 {},
346 )),
347 },
348 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
349 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
350 protobuf::PostgresFloat4 {},
351 )),
352 },
353 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
354 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
355 protobuf::PostgresFloat8 {},
356 )),
357 },
358 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
359 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
360 protobuf::PostgresNumeric {
361 scale: *scale as i32,
362 },
363 )),
364 },
365 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
366 r#type: Some(protobuf::remote_type::Type::PostgresName(
367 protobuf::PostgresName {},
368 )),
369 },
370 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
371 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
372 protobuf::PostgresVarchar {},
373 )),
374 },
375 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
376 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
377 protobuf::PostgresBpchar {},
378 )),
379 },
380 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
381 r#type: Some(protobuf::remote_type::Type::PostgresText(
382 protobuf::PostgresText {},
383 )),
384 },
385 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
386 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
387 protobuf::PostgresBytea {},
388 )),
389 },
390 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
391 r#type: Some(protobuf::remote_type::Type::PostgresDate(
392 protobuf::PostgresDate {},
393 )),
394 },
395 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
396 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
397 protobuf::PostgresTimestamp {},
398 )),
399 },
400 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
401 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
402 protobuf::PostgresTimestampTz {},
403 )),
404 },
405 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
406 r#type: Some(protobuf::remote_type::Type::PostgresTime(
407 protobuf::PostgresTime {},
408 )),
409 },
410 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
411 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
412 protobuf::PostgresInterval {},
413 )),
414 },
415 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
416 r#type: Some(protobuf::remote_type::Type::PostgresBool(
417 protobuf::PostgresBool {},
418 )),
419 },
420 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
421 r#type: Some(protobuf::remote_type::Type::PostgresJson(
422 protobuf::PostgresJson {},
423 )),
424 },
425 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
426 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
427 protobuf::PostgresJsonb {},
428 )),
429 },
430 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
431 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
432 protobuf::PostgresInt2Array {},
433 )),
434 },
435 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
436 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
437 protobuf::PostgresInt4Array {},
438 )),
439 },
440 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
441 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
442 protobuf::PostgresInt8Array {},
443 )),
444 },
445 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
446 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
447 protobuf::PostgresFloat4Array {},
448 )),
449 },
450 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
451 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
452 protobuf::PostgresFloat8Array {},
453 )),
454 },
455 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
456 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
457 protobuf::PostgresVarcharArray {},
458 )),
459 },
460 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
461 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
462 protobuf::PostgresBpcharArray {},
463 )),
464 },
465 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
466 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
467 protobuf::PostgresTextArray {},
468 )),
469 },
470 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
471 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
472 protobuf::PostgresByteaArray {},
473 )),
474 },
475 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
476 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
477 protobuf::PostgresBoolArray {},
478 )),
479 },
480 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
481 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
482 protobuf::PostgresPostGisGeometry {},
483 )),
484 },
485 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
486 r#type: Some(protobuf::remote_type::Type::PostgresOid(
487 protobuf::PostgresOid {},
488 )),
489 },
490
491 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
492 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
493 protobuf::MysqlTinyInt {},
494 )),
495 },
496 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
497 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
498 protobuf::MysqlTinyIntUnsigned {},
499 )),
500 },
501 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
502 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
503 protobuf::MysqlSmallInt {},
504 )),
505 },
506 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
507 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
508 protobuf::MysqlSmallIntUnsigned {},
509 )),
510 },
511 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
512 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
513 protobuf::MysqlMediumInt {},
514 )),
515 },
516 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
517 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
518 protobuf::MysqlMediumIntUnsigned {},
519 )),
520 },
521 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
522 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
523 protobuf::MysqlInteger {},
524 )),
525 },
526 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
527 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
528 protobuf::MysqlIntegerUnsigned {},
529 )),
530 },
531 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
532 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
533 protobuf::MysqlBigInt {},
534 )),
535 },
536 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
537 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
538 protobuf::MysqlBigIntUnsigned {},
539 )),
540 },
541 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
542 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
543 protobuf::MysqlFloat {},
544 )),
545 },
546 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
547 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
548 protobuf::MysqlDouble {},
549 )),
550 },
551 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
552 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
553 protobuf::MysqlDecimal {
554 precision: *precision as u32,
555 scale: *scale as u32,
556 },
557 )),
558 },
559 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
560 r#type: Some(protobuf::remote_type::Type::MysqlDate(
561 protobuf::MysqlDate {},
562 )),
563 },
564 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
565 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
566 protobuf::MysqlDateTime {},
567 )),
568 },
569 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
570 r#type: Some(protobuf::remote_type::Type::MysqlTime(
571 protobuf::MysqlTime {},
572 )),
573 },
574 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
575 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
576 protobuf::MysqlTimestamp {},
577 )),
578 },
579 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
580 r#type: Some(protobuf::remote_type::Type::MysqlYear(
581 protobuf::MysqlYear {},
582 )),
583 },
584 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
585 r#type: Some(protobuf::remote_type::Type::MysqlChar(
586 protobuf::MysqlChar {},
587 )),
588 },
589 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
590 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
591 protobuf::MysqlVarchar {},
592 )),
593 },
594 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
595 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
596 protobuf::MysqlBinary {},
597 )),
598 },
599 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
600 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
601 protobuf::MysqlVarbinary {},
602 )),
603 },
604 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
605 r#type: Some(protobuf::remote_type::Type::MysqlText(
606 protobuf::MysqlText { length: *len },
607 )),
608 },
609 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
610 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
611 protobuf::MysqlBlob { length: *len },
612 )),
613 },
614 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
615 r#type: Some(protobuf::remote_type::Type::MysqlJson(
616 protobuf::MysqlJson {},
617 )),
618 },
619 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
620 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
621 protobuf::MysqlGeometry {},
622 )),
623 },
624
625 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
626 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
627 protobuf::OracleVarchar2 { length: *len },
628 )),
629 },
630 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
631 r#type: Some(protobuf::remote_type::Type::OracleChar(
632 protobuf::OracleChar { length: *len },
633 )),
634 },
635 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
636 r#type: Some(protobuf::remote_type::Type::OracleNumber(
637 protobuf::OracleNumber {
638 precision: *precision as u32,
639 scale: *scale as i32,
640 },
641 )),
642 },
643 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
644 r#type: Some(protobuf::remote_type::Type::OracleDate(
645 protobuf::OracleDate {},
646 )),
647 },
648 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
649 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
650 protobuf::OracleTimestamp {},
651 )),
652 },
653 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
654 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
655 protobuf::OracleBoolean {},
656 )),
657 },
658 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
659 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
660 protobuf::OracleBinaryFloat {},
661 )),
662 },
663 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
664 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
665 protobuf::OracleBinaryDouble {},
666 )),
667 },
668 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
669 r#type: Some(protobuf::remote_type::Type::OracleBlob(
670 protobuf::OracleBlob {},
671 )),
672 },
673 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
674 r#type: Some(protobuf::remote_type::Type::OracleFloat(
675 protobuf::OracleFloat {
676 precision: *precision as u32,
677 },
678 )),
679 },
680 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
681 r#type: Some(protobuf::remote_type::Type::OracleNchar(
682 protobuf::OracleNChar { length: *len },
683 )),
684 },
685 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
686 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
687 protobuf::OracleNVarchar2 { length: *len },
688 )),
689 },
690 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
691 r#type: Some(protobuf::remote_type::Type::OracleRaw(
692 protobuf::OracleRaw { length: *len },
693 )),
694 },
695 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
696 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
697 protobuf::OracleLongRaw {},
698 )),
699 },
700 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
701 r#type: Some(protobuf::remote_type::Type::OracleLong(
702 protobuf::OracleLong {},
703 )),
704 },
705 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
706 r#type: Some(protobuf::remote_type::Type::OracleClob(
707 protobuf::OracleClob {},
708 )),
709 },
710 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
711 r#type: Some(protobuf::remote_type::Type::OracleNclob(
712 protobuf::OracleNClob {},
713 )),
714 },
715 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
716 r#type: Some(protobuf::remote_type::Type::SqliteNull(
717 protobuf::SqliteNull {},
718 )),
719 },
720 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
721 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
722 protobuf::SqliteInteger {},
723 )),
724 },
725 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
726 r#type: Some(protobuf::remote_type::Type::SqliteReal(
727 protobuf::SqliteReal {},
728 )),
729 },
730 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
731 r#type: Some(protobuf::remote_type::Type::SqliteText(
732 protobuf::SqliteText {},
733 )),
734 },
735 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
736 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
737 protobuf::SqliteBlob {},
738 )),
739 },
740 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
741 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
742 protobuf::DmTinyInt {},
743 )),
744 },
745 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
746 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
747 protobuf::DmSmallInt {},
748 )),
749 },
750 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
751 r#type: Some(protobuf::remote_type::Type::DmInteger(
752 protobuf::DmInteger {},
753 )),
754 },
755 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
756 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
757 },
758 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
759 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
760 },
761 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
762 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
763 },
764 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
765 r#type: Some(protobuf::remote_type::Type::DmNumeric(
766 protobuf::DmNumeric {
767 precision: *precision as u32,
768 scale: *scale as i32,
769 },
770 )),
771 },
772 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
773 r#type: Some(protobuf::remote_type::Type::DmDecimal(
774 protobuf::DmDecimal {
775 precision: *precision as u32,
776 scale: *scale as i32,
777 },
778 )),
779 },
780 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
781 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
782 length: len.map(|s| s as u32),
783 })),
784 },
785 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
786 r#type: Some(protobuf::remote_type::Type::DmVarchar(
787 protobuf::DmVarchar {
788 length: len.map(|s| s as u32),
789 },
790 )),
791 },
792 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
793 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
794 },
795 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
796 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
797 length: *len as u32,
798 })),
799 },
800 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
801 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
802 protobuf::DmVarbinary {
803 length: len.map(|s| s as u32),
804 },
805 )),
806 },
807 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
808 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
809 },
810 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
811 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
812 },
813 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
814 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
815 protobuf::DmTimestamp {
816 precision: *precision as u32,
817 },
818 )),
819 },
820 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
821 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
822 precision: *precision as u32,
823 })),
824 },
825 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
826 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
827 },
828 }
829}
830
831fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
832 let fields = remote_schema
833 .fields
834 .iter()
835 .map(parse_remote_field)
836 .collect::<Vec<_>>();
837
838 RemoteSchema { fields }
839}
840
841fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
842 RemoteField {
843 name: field.name.clone(),
844 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
845 nullable: field.nullable,
846 }
847}
848
849fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
850 match remote_type.r#type.as_ref().unwrap() {
851 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
852 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
853 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
854 protobuf::remote_type::Type::PostgresFloat4(_) => {
855 RemoteType::Postgres(PostgresType::Float4)
856 }
857 protobuf::remote_type::Type::PostgresFloat8(_) => {
858 RemoteType::Postgres(PostgresType::Float8)
859 }
860 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
861 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
862 }
863 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
864 protobuf::remote_type::Type::PostgresVarchar(_) => {
865 RemoteType::Postgres(PostgresType::Varchar)
866 }
867 protobuf::remote_type::Type::PostgresBpchar(_) => {
868 RemoteType::Postgres(PostgresType::Bpchar)
869 }
870 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
871 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
872 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
873 protobuf::remote_type::Type::PostgresTimestamp(_) => {
874 RemoteType::Postgres(PostgresType::Timestamp)
875 }
876 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
877 RemoteType::Postgres(PostgresType::TimestampTz)
878 }
879 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
880 protobuf::remote_type::Type::PostgresInterval(_) => {
881 RemoteType::Postgres(PostgresType::Interval)
882 }
883 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
884 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
885 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
886 protobuf::remote_type::Type::PostgresInt2Array(_) => {
887 RemoteType::Postgres(PostgresType::Int2Array)
888 }
889 protobuf::remote_type::Type::PostgresInt4Array(_) => {
890 RemoteType::Postgres(PostgresType::Int4Array)
891 }
892 protobuf::remote_type::Type::PostgresInt8Array(_) => {
893 RemoteType::Postgres(PostgresType::Int8Array)
894 }
895 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
896 RemoteType::Postgres(PostgresType::Float4Array)
897 }
898 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
899 RemoteType::Postgres(PostgresType::Float8Array)
900 }
901 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
902 RemoteType::Postgres(PostgresType::VarcharArray)
903 }
904 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
905 RemoteType::Postgres(PostgresType::BpcharArray)
906 }
907 protobuf::remote_type::Type::PostgresTextArray(_) => {
908 RemoteType::Postgres(PostgresType::TextArray)
909 }
910 protobuf::remote_type::Type::PostgresByteaArray(_) => {
911 RemoteType::Postgres(PostgresType::ByteaArray)
912 }
913 protobuf::remote_type::Type::PostgresBoolArray(_) => {
914 RemoteType::Postgres(PostgresType::BoolArray)
915 }
916 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
917 RemoteType::Postgres(PostgresType::PostGisGeometry)
918 }
919 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
920 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
921 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
922 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
923 }
924 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
925 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
926 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
927 }
928 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
929 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
930 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
931 }
932 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
933 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
934 RemoteType::Mysql(MysqlType::IntegerUnsigned)
935 }
936 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
937 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
938 RemoteType::Mysql(MysqlType::BigIntUnsigned)
939 }
940 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
941 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
942 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
943 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
944 ),
945 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
946 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
947 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
948 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
949 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
950 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
951 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
952 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
953 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
954 protobuf::remote_type::Type::MysqlText(text) => {
955 RemoteType::Mysql(MysqlType::Text(text.length))
956 }
957 protobuf::remote_type::Type::MysqlBlob(blob) => {
958 RemoteType::Mysql(MysqlType::Blob(blob.length))
959 }
960 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
961 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
962 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
963 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
964 }
965 protobuf::remote_type::Type::OracleChar(char) => {
966 RemoteType::Oracle(OracleType::Char(char.length))
967 }
968 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
969 OracleType::Number(number.precision as u8, number.scale as i8),
970 ),
971 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
972 protobuf::remote_type::Type::OracleTimestamp(_) => {
973 RemoteType::Oracle(OracleType::Timestamp)
974 }
975 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
976 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
977 RemoteType::Oracle(OracleType::BinaryFloat)
978 }
979 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
980 RemoteType::Oracle(OracleType::BinaryDouble)
981 }
982 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
983 RemoteType::Oracle(OracleType::Float(*precision as u8))
984 }
985 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
986 RemoteType::Oracle(OracleType::NChar(*length))
987 }
988 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
989 RemoteType::Oracle(OracleType::NVarchar2(*length))
990 }
991 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
992 RemoteType::Oracle(OracleType::Raw(*length))
993 }
994 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
995 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
996 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
997 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
998 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
999 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
1000 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
1001 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
1002 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
1003 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
1004 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
1005 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
1006 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
1007 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
1008 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
1009 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1010 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1011 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1012 }
1013 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1014 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1015 }
1016 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1017 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1018 }
1019 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1020 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1021 }
1022 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1023 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1024 RemoteType::Dm(DmType::Binary(*length as u16))
1025 }
1026 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1027 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1028 }
1029 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1030 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1031 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1032 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1033 }
1034 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1035 RemoteType::Dm(DmType::Time(*precision as u8))
1036 }
1037 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1038 }
1039}