1#[cfg(feature = "dm")]
2use crate::DmConnectionOptions;
3#[cfg(feature = "mysql")]
4use crate::MysqlConnectionOptions;
5#[cfg(feature = "oracle")]
6use crate::OracleConnectionOptions;
7#[cfg(feature = "postgres")]
8use crate::PostgresConnectionOptions;
9#[cfg(feature = "sqlite")]
10use crate::SqliteConnectionOptions;
11use crate::generated::prost as protobuf;
12use crate::{
13 ConnectionOptions, DFResult, DefaultTransform, DmType, MysqlType, OracleType, PostgresType,
14 RemoteField, RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
15 connect,
16};
17use datafusion::arrow::datatypes::SchemaRef;
18use datafusion::common::DataFusionError;
19use datafusion::execution::FunctionRegistry;
20use datafusion::physical_plan::ExecutionPlan;
21use datafusion_proto::convert_required;
22use datafusion_proto::physical_plan::PhysicalExtensionCodec;
23use datafusion_proto::protobuf::proto_error;
24use derive_with::With;
25use prost::Message;
26use std::fmt::Debug;
27#[cfg(feature = "sqlite")]
28use std::path::Path;
29use std::sync::Arc;
30
31pub trait TransformCodec: Debug + Send + Sync {
32 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
33 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
34}
35
36#[derive(Debug)]
37pub struct DefaultTransformCodec {}
38
39const DEFAULT_TRANSFORM_ID: &str = "__default";
40
41impl TransformCodec for DefaultTransformCodec {
42 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>> {
43 if value.as_any().is::<DefaultTransform>() {
44 Ok(DEFAULT_TRANSFORM_ID.as_bytes().to_vec())
45 } else {
46 Err(DataFusionError::Execution(format!(
47 "DefaultTransformCodec does not support transform: {value:?}, please implement a custom TransformCodec."
48 )))
49 }
50 }
51
52 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>> {
53 if value == DEFAULT_TRANSFORM_ID.as_bytes() {
54 Ok(Arc::new(DefaultTransform {}))
55 } else {
56 Err(DataFusionError::Execution(
57 "DefaultTransformCodec only supports DefaultTransform".to_string(),
58 ))
59 }
60 }
61}
62
63#[derive(Debug, With)]
64pub struct RemotePhysicalCodec {
65 transform_codec: Arc<dyn TransformCodec>,
66}
67
68impl RemotePhysicalCodec {
69 pub fn new() -> Self {
70 Self {
71 transform_codec: Arc::new(DefaultTransformCodec {}),
72 }
73 }
74}
75
76impl Default for RemotePhysicalCodec {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl PhysicalExtensionCodec for RemotePhysicalCodec {
83 fn try_decode(
84 &self,
85 buf: &[u8],
86 _inputs: &[Arc<dyn ExecutionPlan>],
87 _registry: &dyn FunctionRegistry,
88 ) -> DFResult<Arc<dyn ExecutionPlan>> {
89 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
90 DataFusionError::Internal(format!(
91 "Failed to decode remote table execution plan: {e:?}"
92 ))
93 })?;
94
95 let transform = self.transform_codec.try_decode(&proto.transform)?;
96
97 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
98 let remote_schema = proto
99 .remote_schema
100 .map(|schema| Arc::new(parse_remote_schema(&schema)));
101
102 let projection: Option<Vec<usize>> = proto
103 .projection
104 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
105
106 let limit = proto.limit.map(|l| l as usize);
107
108 let conn_options = parse_connection_options(proto.conn_options.unwrap());
109 let conn = tokio::task::block_in_place(|| {
110 tokio::runtime::Handle::current().block_on(async {
111 let pool = connect(&conn_options).await?;
112 let conn = pool.get().await?;
113 Ok::<_, DataFusionError>(conn)
114 })
115 })?;
116
117 Ok(Arc::new(RemoteTableExec::try_new(
118 conn_options,
119 proto.sql,
120 table_schema,
121 remote_schema,
122 projection,
123 proto.unparsed_filters,
124 limit,
125 transform,
126 conn,
127 )?))
128 }
129
130 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
131 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
132 let serialized_transform = self.transform_codec.try_encode(exec.transform.as_ref())?;
133
134 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
135 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
136
137 let proto = protobuf::RemoteTableExec {
138 conn_options: Some(serialized_connection_options),
139 sql: exec.sql.clone(),
140 table_schema: Some(exec.table_schema.as_ref().try_into()?),
141 remote_schema,
142 projection: exec
143 .projection
144 .as_ref()
145 .map(|p| serialize_projection(p.as_slice())),
146 unparsed_filters: exec.unparsed_filters.clone(),
147 limit: exec.limit.map(|l| l as u32),
148 transform: serialized_transform,
149 };
150
151 proto.encode(buf).map_err(|e| {
152 DataFusionError::Internal(format!(
153 "Failed to encode remote table execution plan: {e:?}"
154 ))
155 })?;
156 Ok(())
157 } else {
158 Err(DataFusionError::Execution(format!(
159 "Failed to encode {}",
160 RemoteTableExec::static_name()
161 )))
162 }
163 }
164}
165
166fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
167 match options {
168 #[cfg(feature = "postgres")]
169 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
170 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
171 protobuf::PostgresConnectionOptions {
172 host: options.host.clone(),
173 port: options.port as u32,
174 username: options.username.clone(),
175 password: options.password.clone(),
176 database: options.database.clone(),
177 pool_max_size: options.pool_max_size as u32,
178 stream_chunk_size: options.stream_chunk_size as u32,
179 },
180 )),
181 },
182 #[cfg(feature = "mysql")]
183 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
184 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
185 protobuf::MysqlConnectionOptions {
186 host: options.host.clone(),
187 port: options.port as u32,
188 username: options.username.clone(),
189 password: options.password.clone(),
190 database: options.database.clone(),
191 pool_max_size: options.pool_max_size as u32,
192 stream_chunk_size: options.stream_chunk_size as u32,
193 },
194 )),
195 },
196 #[cfg(feature = "oracle")]
197 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
198 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
199 protobuf::OracleConnectionOptions {
200 host: options.host.clone(),
201 port: options.port as u32,
202 username: options.username.clone(),
203 password: options.password.clone(),
204 service_name: options.service_name.clone(),
205 pool_max_size: options.pool_max_size as u32,
206 stream_chunk_size: options.stream_chunk_size as u32,
207 },
208 )),
209 },
210 #[cfg(feature = "sqlite")]
211 ConnectionOptions::Sqlite(options) => protobuf::ConnectionOptions {
212 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
213 protobuf::SqliteConnectionOptions {
214 path: options.path.to_str().unwrap().to_string(),
215 stream_chunk_size: options.stream_chunk_size as u32,
216 },
217 )),
218 },
219 #[cfg(feature = "dm")]
220 ConnectionOptions::Dm(options) => protobuf::ConnectionOptions {
221 connection_options: Some(protobuf::connection_options::ConnectionOptions::Dm(
222 protobuf::DmConnectionOptions {
223 host: options.host.clone(),
224 port: options.port as u32,
225 username: options.username.clone(),
226 password: options.password.clone(),
227 schema: options.schema.clone(),
228 stream_chunk_size: options.stream_chunk_size as u32,
229 driver: options.driver.clone(),
230 },
231 )),
232 },
233 }
234}
235
236fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
237 match options.connection_options {
238 #[cfg(feature = "postgres")]
239 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
240 ConnectionOptions::Postgres(PostgresConnectionOptions {
241 host: options.host,
242 port: options.port as u16,
243 username: options.username,
244 password: options.password,
245 database: options.database,
246 pool_max_size: options.pool_max_size as usize,
247 stream_chunk_size: options.stream_chunk_size as usize,
248 })
249 }
250 #[cfg(feature = "mysql")]
251 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
252 ConnectionOptions::Mysql(MysqlConnectionOptions {
253 host: options.host,
254 port: options.port as u16,
255 username: options.username,
256 password: options.password,
257 database: options.database,
258 pool_max_size: options.pool_max_size as usize,
259 stream_chunk_size: options.stream_chunk_size as usize,
260 })
261 }
262 #[cfg(feature = "oracle")]
263 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
264 ConnectionOptions::Oracle(OracleConnectionOptions {
265 host: options.host,
266 port: options.port as u16,
267 username: options.username,
268 password: options.password,
269 service_name: options.service_name,
270 pool_max_size: options.pool_max_size as usize,
271 stream_chunk_size: options.stream_chunk_size as usize,
272 })
273 }
274 #[cfg(feature = "sqlite")]
275 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
276 ConnectionOptions::Sqlite(SqliteConnectionOptions {
277 path: Path::new(&options.path).to_path_buf(),
278 stream_chunk_size: options.stream_chunk_size as usize,
279 })
280 }
281 #[cfg(feature = "dm")]
282 Some(protobuf::connection_options::ConnectionOptions::Dm(options)) => {
283 ConnectionOptions::Dm(DmConnectionOptions {
284 host: options.host,
285 port: options.port as u16,
286 username: options.username,
287 password: options.password,
288 schema: options.schema,
289 stream_chunk_size: options.stream_chunk_size as usize,
290 driver: options.driver,
291 })
292 }
293 _ => panic!("Failed to parse connection options: {options:?}"),
294 }
295}
296
297fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
298 protobuf::Projection {
299 projection: projection.iter().map(|n| *n as u32).collect(),
300 }
301}
302
303fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
304 let fields = remote_schema
305 .fields
306 .iter()
307 .map(serialize_remote_field)
308 .collect::<Vec<_>>();
309
310 protobuf::RemoteSchema { fields }
311}
312
313fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
314 protobuf::RemoteField {
315 name: remote_field.name.clone(),
316 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
317 nullable: remote_field.nullable,
318 }
319}
320
321fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
322 match remote_type {
323 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
324 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
325 protobuf::PostgresInt2 {},
326 )),
327 },
328 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
329 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
330 protobuf::PostgresInt4 {},
331 )),
332 },
333 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
334 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
335 protobuf::PostgresInt8 {},
336 )),
337 },
338 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
339 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
340 protobuf::PostgresFloat4 {},
341 )),
342 },
343 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
344 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
345 protobuf::PostgresFloat8 {},
346 )),
347 },
348 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
349 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
350 protobuf::PostgresNumeric {
351 scale: *scale as i32,
352 },
353 )),
354 },
355 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
356 r#type: Some(protobuf::remote_type::Type::PostgresName(
357 protobuf::PostgresName {},
358 )),
359 },
360 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
361 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
362 protobuf::PostgresVarchar {},
363 )),
364 },
365 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
366 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
367 protobuf::PostgresBpchar {},
368 )),
369 },
370 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
371 r#type: Some(protobuf::remote_type::Type::PostgresText(
372 protobuf::PostgresText {},
373 )),
374 },
375 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
376 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
377 protobuf::PostgresBytea {},
378 )),
379 },
380 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
381 r#type: Some(protobuf::remote_type::Type::PostgresDate(
382 protobuf::PostgresDate {},
383 )),
384 },
385 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
386 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
387 protobuf::PostgresTimestamp {},
388 )),
389 },
390 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
391 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
392 protobuf::PostgresTimestampTz {},
393 )),
394 },
395 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
396 r#type: Some(protobuf::remote_type::Type::PostgresTime(
397 protobuf::PostgresTime {},
398 )),
399 },
400 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
401 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
402 protobuf::PostgresInterval {},
403 )),
404 },
405 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
406 r#type: Some(protobuf::remote_type::Type::PostgresBool(
407 protobuf::PostgresBool {},
408 )),
409 },
410 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
411 r#type: Some(protobuf::remote_type::Type::PostgresJson(
412 protobuf::PostgresJson {},
413 )),
414 },
415 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
416 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
417 protobuf::PostgresJsonb {},
418 )),
419 },
420 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
421 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
422 protobuf::PostgresInt2Array {},
423 )),
424 },
425 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
426 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
427 protobuf::PostgresInt4Array {},
428 )),
429 },
430 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
431 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
432 protobuf::PostgresInt8Array {},
433 )),
434 },
435 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
436 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
437 protobuf::PostgresFloat4Array {},
438 )),
439 },
440 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
441 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
442 protobuf::PostgresFloat8Array {},
443 )),
444 },
445 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
446 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
447 protobuf::PostgresVarcharArray {},
448 )),
449 },
450 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
451 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
452 protobuf::PostgresBpcharArray {},
453 )),
454 },
455 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
456 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
457 protobuf::PostgresTextArray {},
458 )),
459 },
460 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
461 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
462 protobuf::PostgresByteaArray {},
463 )),
464 },
465 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
466 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
467 protobuf::PostgresBoolArray {},
468 )),
469 },
470 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
471 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
472 protobuf::PostgresPostGisGeometry {},
473 )),
474 },
475 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
476 r#type: Some(protobuf::remote_type::Type::PostgresOid(
477 protobuf::PostgresOid {},
478 )),
479 },
480
481 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
482 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
483 protobuf::MysqlTinyInt {},
484 )),
485 },
486 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
487 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
488 protobuf::MysqlTinyIntUnsigned {},
489 )),
490 },
491 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
492 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
493 protobuf::MysqlSmallInt {},
494 )),
495 },
496 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
497 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
498 protobuf::MysqlSmallIntUnsigned {},
499 )),
500 },
501 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
502 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
503 protobuf::MysqlMediumInt {},
504 )),
505 },
506 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
507 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
508 protobuf::MysqlMediumIntUnsigned {},
509 )),
510 },
511 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
512 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
513 protobuf::MysqlInteger {},
514 )),
515 },
516 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
517 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
518 protobuf::MysqlIntegerUnsigned {},
519 )),
520 },
521 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
522 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
523 protobuf::MysqlBigInt {},
524 )),
525 },
526 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
527 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
528 protobuf::MysqlBigIntUnsigned {},
529 )),
530 },
531 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
532 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
533 protobuf::MysqlFloat {},
534 )),
535 },
536 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
537 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
538 protobuf::MysqlDouble {},
539 )),
540 },
541 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
542 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
543 protobuf::MysqlDecimal {
544 precision: *precision as u32,
545 scale: *scale as u32,
546 },
547 )),
548 },
549 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
550 r#type: Some(protobuf::remote_type::Type::MysqlDate(
551 protobuf::MysqlDate {},
552 )),
553 },
554 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
555 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
556 protobuf::MysqlDateTime {},
557 )),
558 },
559 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
560 r#type: Some(protobuf::remote_type::Type::MysqlTime(
561 protobuf::MysqlTime {},
562 )),
563 },
564 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
565 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
566 protobuf::MysqlTimestamp {},
567 )),
568 },
569 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
570 r#type: Some(protobuf::remote_type::Type::MysqlYear(
571 protobuf::MysqlYear {},
572 )),
573 },
574 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
575 r#type: Some(protobuf::remote_type::Type::MysqlChar(
576 protobuf::MysqlChar {},
577 )),
578 },
579 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
580 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
581 protobuf::MysqlVarchar {},
582 )),
583 },
584 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
585 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
586 protobuf::MysqlBinary {},
587 )),
588 },
589 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
590 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
591 protobuf::MysqlVarbinary {},
592 )),
593 },
594 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
595 r#type: Some(protobuf::remote_type::Type::MysqlText(
596 protobuf::MysqlText { length: *len },
597 )),
598 },
599 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
600 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
601 protobuf::MysqlBlob { length: *len },
602 )),
603 },
604 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
605 r#type: Some(protobuf::remote_type::Type::MysqlJson(
606 protobuf::MysqlJson {},
607 )),
608 },
609 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
610 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
611 protobuf::MysqlGeometry {},
612 )),
613 },
614
615 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
616 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
617 protobuf::OracleVarchar2 { length: *len },
618 )),
619 },
620 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
621 r#type: Some(protobuf::remote_type::Type::OracleChar(
622 protobuf::OracleChar { length: *len },
623 )),
624 },
625 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
626 r#type: Some(protobuf::remote_type::Type::OracleNumber(
627 protobuf::OracleNumber {
628 precision: *precision as u32,
629 scale: *scale as i32,
630 },
631 )),
632 },
633 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
634 r#type: Some(protobuf::remote_type::Type::OracleDate(
635 protobuf::OracleDate {},
636 )),
637 },
638 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
639 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
640 protobuf::OracleTimestamp {},
641 )),
642 },
643 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
644 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
645 protobuf::OracleBoolean {},
646 )),
647 },
648 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
649 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
650 protobuf::OracleBinaryFloat {},
651 )),
652 },
653 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
654 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
655 protobuf::OracleBinaryDouble {},
656 )),
657 },
658 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
659 r#type: Some(protobuf::remote_type::Type::OracleBlob(
660 protobuf::OracleBlob {},
661 )),
662 },
663 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
664 r#type: Some(protobuf::remote_type::Type::OracleFloat(
665 protobuf::OracleFloat {
666 precision: *precision as u32,
667 },
668 )),
669 },
670 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
671 r#type: Some(protobuf::remote_type::Type::OracleNchar(
672 protobuf::OracleNChar { length: *len },
673 )),
674 },
675 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
676 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
677 protobuf::OracleNVarchar2 { length: *len },
678 )),
679 },
680 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
681 r#type: Some(protobuf::remote_type::Type::OracleRaw(
682 protobuf::OracleRaw { length: *len },
683 )),
684 },
685 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
686 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
687 protobuf::OracleLongRaw {},
688 )),
689 },
690 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
691 r#type: Some(protobuf::remote_type::Type::OracleLong(
692 protobuf::OracleLong {},
693 )),
694 },
695 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
696 r#type: Some(protobuf::remote_type::Type::OracleClob(
697 protobuf::OracleClob {},
698 )),
699 },
700 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
701 r#type: Some(protobuf::remote_type::Type::OracleNclob(
702 protobuf::OracleNClob {},
703 )),
704 },
705 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
706 r#type: Some(protobuf::remote_type::Type::SqliteNull(
707 protobuf::SqliteNull {},
708 )),
709 },
710 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
711 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
712 protobuf::SqliteInteger {},
713 )),
714 },
715 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
716 r#type: Some(protobuf::remote_type::Type::SqliteReal(
717 protobuf::SqliteReal {},
718 )),
719 },
720 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
721 r#type: Some(protobuf::remote_type::Type::SqliteText(
722 protobuf::SqliteText {},
723 )),
724 },
725 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
726 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
727 protobuf::SqliteBlob {},
728 )),
729 },
730 RemoteType::Dm(DmType::TinyInt) => protobuf::RemoteType {
731 r#type: Some(protobuf::remote_type::Type::DmTinyInt(
732 protobuf::DmTinyInt {},
733 )),
734 },
735 RemoteType::Dm(DmType::SmallInt) => protobuf::RemoteType {
736 r#type: Some(protobuf::remote_type::Type::DmSmallInt(
737 protobuf::DmSmallInt {},
738 )),
739 },
740 RemoteType::Dm(DmType::Integer) => protobuf::RemoteType {
741 r#type: Some(protobuf::remote_type::Type::DmInteger(
742 protobuf::DmInteger {},
743 )),
744 },
745 RemoteType::Dm(DmType::BigInt) => protobuf::RemoteType {
746 r#type: Some(protobuf::remote_type::Type::DmBigInt(protobuf::DmBigInt {})),
747 },
748 RemoteType::Dm(DmType::Real) => protobuf::RemoteType {
749 r#type: Some(protobuf::remote_type::Type::DmReal(protobuf::DmReal {})),
750 },
751 RemoteType::Dm(DmType::Double) => protobuf::RemoteType {
752 r#type: Some(protobuf::remote_type::Type::DmDouble(protobuf::DmDouble {})),
753 },
754 RemoteType::Dm(DmType::Numeric(precision, scale)) => protobuf::RemoteType {
755 r#type: Some(protobuf::remote_type::Type::DmNumeric(
756 protobuf::DmNumeric {
757 precision: *precision as u32,
758 scale: *scale as i32,
759 },
760 )),
761 },
762 RemoteType::Dm(DmType::Decimal(precision, scale)) => protobuf::RemoteType {
763 r#type: Some(protobuf::remote_type::Type::DmDecimal(
764 protobuf::DmDecimal {
765 precision: *precision as u32,
766 scale: *scale as i32,
767 },
768 )),
769 },
770 RemoteType::Dm(DmType::Char(len)) => protobuf::RemoteType {
771 r#type: Some(protobuf::remote_type::Type::DmChar(protobuf::DmChar {
772 length: len.map(|s| s as u32),
773 })),
774 },
775 RemoteType::Dm(DmType::Varchar(len)) => protobuf::RemoteType {
776 r#type: Some(protobuf::remote_type::Type::DmVarchar(
777 protobuf::DmVarchar {
778 length: len.map(|s| s as u32),
779 },
780 )),
781 },
782 RemoteType::Dm(DmType::Text) => protobuf::RemoteType {
783 r#type: Some(protobuf::remote_type::Type::DmText(protobuf::DmText {})),
784 },
785 RemoteType::Dm(DmType::Binary(len)) => protobuf::RemoteType {
786 r#type: Some(protobuf::remote_type::Type::DmBinary(protobuf::DmBinary {
787 length: *len as u32,
788 })),
789 },
790 RemoteType::Dm(DmType::Varbinary(len)) => protobuf::RemoteType {
791 r#type: Some(protobuf::remote_type::Type::DmVarbinary(
792 protobuf::DmVarbinary {
793 length: len.map(|s| s as u32),
794 },
795 )),
796 },
797 RemoteType::Dm(DmType::Image) => protobuf::RemoteType {
798 r#type: Some(protobuf::remote_type::Type::DmImage(protobuf::DmImage {})),
799 },
800 RemoteType::Dm(DmType::Bit) => protobuf::RemoteType {
801 r#type: Some(protobuf::remote_type::Type::DmBit(protobuf::DmBit {})),
802 },
803 RemoteType::Dm(DmType::Timestamp(precision)) => protobuf::RemoteType {
804 r#type: Some(protobuf::remote_type::Type::DmTimestamp(
805 protobuf::DmTimestamp {
806 precision: *precision as u32,
807 },
808 )),
809 },
810 RemoteType::Dm(DmType::Time(precision)) => protobuf::RemoteType {
811 r#type: Some(protobuf::remote_type::Type::DmTime(protobuf::DmTime {
812 precision: *precision as u32,
813 })),
814 },
815 RemoteType::Dm(DmType::Date) => protobuf::RemoteType {
816 r#type: Some(protobuf::remote_type::Type::DmDate(protobuf::DmDate {})),
817 },
818 }
819}
820
821fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
822 let fields = remote_schema
823 .fields
824 .iter()
825 .map(parse_remote_field)
826 .collect::<Vec<_>>();
827
828 RemoteSchema { fields }
829}
830
831fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
832 RemoteField {
833 name: field.name.clone(),
834 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
835 nullable: field.nullable,
836 }
837}
838
839fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
840 match remote_type.r#type.as_ref().unwrap() {
841 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
842 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
843 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
844 protobuf::remote_type::Type::PostgresFloat4(_) => {
845 RemoteType::Postgres(PostgresType::Float4)
846 }
847 protobuf::remote_type::Type::PostgresFloat8(_) => {
848 RemoteType::Postgres(PostgresType::Float8)
849 }
850 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
851 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
852 }
853 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
854 protobuf::remote_type::Type::PostgresVarchar(_) => {
855 RemoteType::Postgres(PostgresType::Varchar)
856 }
857 protobuf::remote_type::Type::PostgresBpchar(_) => {
858 RemoteType::Postgres(PostgresType::Bpchar)
859 }
860 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
861 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
862 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
863 protobuf::remote_type::Type::PostgresTimestamp(_) => {
864 RemoteType::Postgres(PostgresType::Timestamp)
865 }
866 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
867 RemoteType::Postgres(PostgresType::TimestampTz)
868 }
869 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
870 protobuf::remote_type::Type::PostgresInterval(_) => {
871 RemoteType::Postgres(PostgresType::Interval)
872 }
873 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
874 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
875 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
876 protobuf::remote_type::Type::PostgresInt2Array(_) => {
877 RemoteType::Postgres(PostgresType::Int2Array)
878 }
879 protobuf::remote_type::Type::PostgresInt4Array(_) => {
880 RemoteType::Postgres(PostgresType::Int4Array)
881 }
882 protobuf::remote_type::Type::PostgresInt8Array(_) => {
883 RemoteType::Postgres(PostgresType::Int8Array)
884 }
885 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
886 RemoteType::Postgres(PostgresType::Float4Array)
887 }
888 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
889 RemoteType::Postgres(PostgresType::Float8Array)
890 }
891 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
892 RemoteType::Postgres(PostgresType::VarcharArray)
893 }
894 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
895 RemoteType::Postgres(PostgresType::BpcharArray)
896 }
897 protobuf::remote_type::Type::PostgresTextArray(_) => {
898 RemoteType::Postgres(PostgresType::TextArray)
899 }
900 protobuf::remote_type::Type::PostgresByteaArray(_) => {
901 RemoteType::Postgres(PostgresType::ByteaArray)
902 }
903 protobuf::remote_type::Type::PostgresBoolArray(_) => {
904 RemoteType::Postgres(PostgresType::BoolArray)
905 }
906 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
907 RemoteType::Postgres(PostgresType::PostGisGeometry)
908 }
909 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
910 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
911 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
912 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
913 }
914 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
915 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
916 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
917 }
918 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
919 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
920 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
921 }
922 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
923 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
924 RemoteType::Mysql(MysqlType::IntegerUnsigned)
925 }
926 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
927 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
928 RemoteType::Mysql(MysqlType::BigIntUnsigned)
929 }
930 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
931 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
932 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
933 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
934 ),
935 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
936 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
937 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
938 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
939 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
940 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
941 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
942 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
943 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
944 protobuf::remote_type::Type::MysqlText(text) => {
945 RemoteType::Mysql(MysqlType::Text(text.length))
946 }
947 protobuf::remote_type::Type::MysqlBlob(blob) => {
948 RemoteType::Mysql(MysqlType::Blob(blob.length))
949 }
950 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
951 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
952 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
953 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
954 }
955 protobuf::remote_type::Type::OracleChar(char) => {
956 RemoteType::Oracle(OracleType::Char(char.length))
957 }
958 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
959 OracleType::Number(number.precision as u8, number.scale as i8),
960 ),
961 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
962 protobuf::remote_type::Type::OracleTimestamp(_) => {
963 RemoteType::Oracle(OracleType::Timestamp)
964 }
965 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
966 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
967 RemoteType::Oracle(OracleType::BinaryFloat)
968 }
969 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
970 RemoteType::Oracle(OracleType::BinaryDouble)
971 }
972 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
973 RemoteType::Oracle(OracleType::Float(*precision as u8))
974 }
975 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
976 RemoteType::Oracle(OracleType::NChar(*length))
977 }
978 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
979 RemoteType::Oracle(OracleType::NVarchar2(*length))
980 }
981 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
982 RemoteType::Oracle(OracleType::Raw(*length))
983 }
984 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
985 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
986 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
987 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
988 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
989 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
990 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
991 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
992 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
993 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
994 protobuf::remote_type::Type::DmTinyInt(_) => RemoteType::Dm(DmType::TinyInt),
995 protobuf::remote_type::Type::DmSmallInt(_) => RemoteType::Dm(DmType::SmallInt),
996 protobuf::remote_type::Type::DmInteger(_) => RemoteType::Dm(DmType::Integer),
997 protobuf::remote_type::Type::DmBigInt(_) => RemoteType::Dm(DmType::BigInt),
998 protobuf::remote_type::Type::DmReal(_) => RemoteType::Dm(DmType::Real),
999 protobuf::remote_type::Type::DmDouble(_) => RemoteType::Dm(DmType::Double),
1000 protobuf::remote_type::Type::DmNumeric(protobuf::DmNumeric { precision, scale }) => {
1001 RemoteType::Dm(DmType::Numeric(*precision as u8, *scale as i8))
1002 }
1003 protobuf::remote_type::Type::DmDecimal(protobuf::DmDecimal { precision, scale }) => {
1004 RemoteType::Dm(DmType::Decimal(*precision as u8, *scale as i8))
1005 }
1006 protobuf::remote_type::Type::DmChar(protobuf::DmChar { length }) => {
1007 RemoteType::Dm(DmType::Char(length.map(|s| s as u16)))
1008 }
1009 protobuf::remote_type::Type::DmVarchar(protobuf::DmVarchar { length }) => {
1010 RemoteType::Dm(DmType::Varchar(length.map(|s| s as u16)))
1011 }
1012 protobuf::remote_type::Type::DmText(protobuf::DmText {}) => RemoteType::Dm(DmType::Text),
1013 protobuf::remote_type::Type::DmBinary(protobuf::DmBinary { length }) => {
1014 RemoteType::Dm(DmType::Binary(*length as u16))
1015 }
1016 protobuf::remote_type::Type::DmVarbinary(protobuf::DmVarbinary { length }) => {
1017 RemoteType::Dm(DmType::Varbinary(length.map(|s| s as u16)))
1018 }
1019 protobuf::remote_type::Type::DmImage(protobuf::DmImage {}) => RemoteType::Dm(DmType::Image),
1020 protobuf::remote_type::Type::DmBit(protobuf::DmBit {}) => RemoteType::Dm(DmType::Bit),
1021 protobuf::remote_type::Type::DmTimestamp(protobuf::DmTimestamp { precision }) => {
1022 RemoteType::Dm(DmType::Timestamp(*precision as u8))
1023 }
1024 protobuf::remote_type::Type::DmTime(protobuf::DmTime { precision }) => {
1025 RemoteType::Dm(DmType::Time(*precision as u8))
1026 }
1027 protobuf::remote_type::Type::DmDate(_) => RemoteType::Dm(DmType::Date),
1028 }
1029}