1#[cfg(feature = "mysql")]
2use crate::MysqlConnectionOptions;
3#[cfg(feature = "oracle")]
4use crate::OracleConnectionOptions;
5#[cfg(feature = "postgres")]
6use crate::PostgresConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9 ConnectionOptions, DFResult, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema,
10 RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
11};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::common::DataFusionError;
14use datafusion::execution::FunctionRegistry;
15use datafusion::physical_plan::ExecutionPlan;
16use datafusion_proto::convert_required;
17use datafusion_proto::logical_plan::from_proto::parse_exprs;
18use datafusion_proto::logical_plan::to_proto::serialize_exprs;
19use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
20use datafusion_proto::physical_plan::PhysicalExtensionCodec;
21use datafusion_proto::protobuf::proto_error;
22use derive_with::With;
23use prost::Message;
24use std::fmt::Debug;
25#[cfg(feature = "sqlite")]
26use std::path::Path;
27use std::sync::Arc;
28
/// Codec that converts a [`Transform`] to and from its serialized byte form.
///
/// [`RemotePhysicalCodec`] delegates to an implementation of this trait whenever the
/// plan it encodes or decodes carries a transform.
pub trait TransformCodec: Debug + Send + Sync {
    /// Serializes `value` into a byte buffer.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
    /// Rebuilds a transform from the bytes in `value`.
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
33
/// [`PhysicalExtensionCodec`] implementation that serializes [`RemoteTableExec`] plan
/// nodes as protobuf messages.
#[derive(Debug, With)]
pub struct RemotePhysicalCodec {
    /// Codec for the plan's optional transform. Encoding or decoding a plan that carries
    /// a transform fails when this is `None`.
    transform_codec: Option<Arc<dyn TransformCodec>>,
    /// Codec passed to the expression (de)serializers that handle the plan's filters.
    logical_extension_codec: Arc<dyn LogicalExtensionCodec>,
}
39
40impl RemotePhysicalCodec {
41 pub fn new() -> Self {
42 Self {
43 transform_codec: None,
44 logical_extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
45 }
46 }
47}
48
49impl Default for RemotePhysicalCodec {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl PhysicalExtensionCodec for RemotePhysicalCodec {
56 fn try_decode(
57 &self,
58 buf: &[u8],
59 _inputs: &[Arc<dyn ExecutionPlan>],
60 registry: &dyn FunctionRegistry,
61 ) -> DFResult<Arc<dyn ExecutionPlan>> {
62 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
63 DataFusionError::Internal(format!(
64 "Failed to decode remote table execution plan: {e:?}"
65 ))
66 })?;
67
68 let transform = if let Some(bytes) = proto.transform {
69 let Some(transform_codec) = self.transform_codec.as_ref() else {
70 return Err(DataFusionError::Execution(
71 "No transform codec found".to_string(),
72 ));
73 };
74 Some(transform_codec.try_decode(&bytes)?)
75 } else {
76 None
77 };
78
79 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
80 let remote_schema = proto
81 .remote_schema
82 .map(|schema| Arc::new(parse_remote_schema(&schema)));
83
84 let projection: Option<Vec<usize>> = proto
85 .projection
86 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
87
88 let filters = parse_exprs(
89 &proto.filters,
90 registry,
91 self.logical_extension_codec.as_ref(),
92 )?;
93 let limit = proto.limit.map(|l| l as usize);
94
95 let conn_options = parse_connection_options(proto.conn_options.unwrap());
96 let conn = tokio::task::block_in_place(|| {
97 tokio::runtime::Handle::current().block_on(async {
98 let pool = connect(&conn_options).await?;
99 let conn = pool.get().await?;
100 Ok::<_, DataFusionError>(conn)
101 })
102 })?;
103
104 Ok(Arc::new(RemoteTableExec::try_new(
105 conn_options,
106 proto.sql,
107 table_schema,
108 remote_schema,
109 projection,
110 filters,
111 limit,
112 transform,
113 conn,
114 )?))
115 }
116
117 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
118 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
119 let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
120 let Some(transform_codec) = self.transform_codec.as_ref() else {
121 return Err(DataFusionError::Execution(
122 "No transform codec found".to_string(),
123 ));
124 };
125 let bytes = transform_codec.try_encode(transform.as_ref())?;
126 Some(bytes)
127 } else {
128 None
129 };
130
131 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
132 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
133 let serialized_filters =
134 serialize_exprs(&exec.filters, self.logical_extension_codec.as_ref())?;
135
136 let proto = protobuf::RemoteTableExec {
137 conn_options: Some(serialized_connection_options),
138 sql: exec.sql.clone(),
139 table_schema: Some(exec.table_schema.as_ref().try_into()?),
140 remote_schema,
141 projection: exec
142 .projection
143 .as_ref()
144 .map(|p| serialize_projection(p.as_slice())),
145 filters: serialized_filters,
146 limit: exec.limit.map(|l| l as u32),
147 transform: serialized_transform,
148 };
149
150 proto.encode(buf).map_err(|e| {
151 DataFusionError::Internal(format!(
152 "Failed to encode remote table execution plan: {e:?}"
153 ))
154 })?;
155 Ok(())
156 } else {
157 Err(DataFusionError::Execution(format!(
158 "Failed to encode {}",
159 RemoteTableExec::static_name()
160 )))
161 }
162 }
163}
164
165fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
166 match options {
167 #[cfg(feature = "postgres")]
168 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
169 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
170 protobuf::PostgresConnectionOptions {
171 host: options.host.clone(),
172 port: options.port as u32,
173 username: options.username.clone(),
174 password: options.password.clone(),
175 database: options.database.clone(),
176 pool_max_size: options.pool_max_size as u32,
177 stream_chunk_size: options.stream_chunk_size as u32,
178 },
179 )),
180 },
181 #[cfg(feature = "mysql")]
182 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
183 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
184 protobuf::MysqlConnectionOptions {
185 host: options.host.clone(),
186 port: options.port as u32,
187 username: options.username.clone(),
188 password: options.password.clone(),
189 database: options.database.clone(),
190 pool_max_size: options.pool_max_size as u32,
191 stream_chunk_size: options.stream_chunk_size as u32,
192 },
193 )),
194 },
195 #[cfg(feature = "oracle")]
196 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
197 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
198 protobuf::OracleConnectionOptions {
199 host: options.host.clone(),
200 port: options.port as u32,
201 username: options.username.clone(),
202 password: options.password.clone(),
203 service_name: options.service_name.clone(),
204 pool_max_size: options.pool_max_size as u32,
205 stream_chunk_size: options.stream_chunk_size as u32,
206 },
207 )),
208 },
209 #[cfg(feature = "sqlite")]
210 ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
211 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
212 protobuf::SqliteConnectionOptions {
213 path: path.to_str().unwrap().to_string(),
214 },
215 )),
216 },
217 }
218}
219
220fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
221 match options.connection_options {
222 #[cfg(feature = "postgres")]
223 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
224 ConnectionOptions::Postgres(PostgresConnectionOptions {
225 host: options.host,
226 port: options.port as u16,
227 username: options.username,
228 password: options.password,
229 database: options.database,
230 pool_max_size: options.pool_max_size as usize,
231 stream_chunk_size: options.stream_chunk_size as usize,
232 })
233 }
234 #[cfg(feature = "mysql")]
235 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
236 ConnectionOptions::Mysql(MysqlConnectionOptions {
237 host: options.host,
238 port: options.port as u16,
239 username: options.username,
240 password: options.password,
241 database: options.database,
242 pool_max_size: options.pool_max_size as usize,
243 stream_chunk_size: options.stream_chunk_size as usize,
244 })
245 }
246 #[cfg(feature = "oracle")]
247 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
248 ConnectionOptions::Oracle(OracleConnectionOptions {
249 host: options.host,
250 port: options.port as u16,
251 username: options.username,
252 password: options.password,
253 service_name: options.service_name,
254 pool_max_size: options.pool_max_size as usize,
255 stream_chunk_size: options.stream_chunk_size as usize,
256 })
257 }
258 #[cfg(feature = "sqlite")]
259 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
260 ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
261 }
262 _ => panic!("Failed to parse connection options: {options:?}"),
263 }
264}
265
266fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
267 protobuf::Projection {
268 projection: projection.iter().map(|n| *n as u32).collect(),
269 }
270}
271
272fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
273 let fields = remote_schema
274 .fields
275 .iter()
276 .map(serialize_remote_field)
277 .collect::<Vec<_>>();
278
279 protobuf::RemoteSchema { fields }
280}
281
282fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
283 protobuf::RemoteField {
284 name: remote_field.name.clone(),
285 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
286 nullable: remote_field.nullable,
287 }
288}
289
290fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
291 match remote_type {
292 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
293 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
294 protobuf::PostgresInt2 {},
295 )),
296 },
297 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
298 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
299 protobuf::PostgresInt4 {},
300 )),
301 },
302 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
303 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
304 protobuf::PostgresInt8 {},
305 )),
306 },
307 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
308 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
309 protobuf::PostgresFloat4 {},
310 )),
311 },
312 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
313 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
314 protobuf::PostgresFloat8 {},
315 )),
316 },
317 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
318 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
319 protobuf::PostgresNumeric {
320 scale: *scale as i32,
321 },
322 )),
323 },
324 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
325 r#type: Some(protobuf::remote_type::Type::PostgresName(
326 protobuf::PostgresName {},
327 )),
328 },
329 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
330 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
331 protobuf::PostgresVarchar {},
332 )),
333 },
334 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
335 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
336 protobuf::PostgresBpchar {},
337 )),
338 },
339 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
340 r#type: Some(protobuf::remote_type::Type::PostgresText(
341 protobuf::PostgresText {},
342 )),
343 },
344 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
345 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
346 protobuf::PostgresBytea {},
347 )),
348 },
349 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
350 r#type: Some(protobuf::remote_type::Type::PostgresDate(
351 protobuf::PostgresDate {},
352 )),
353 },
354 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
355 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
356 protobuf::PostgresTimestamp {},
357 )),
358 },
359 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
360 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
361 protobuf::PostgresTimestampTz {},
362 )),
363 },
364 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
365 r#type: Some(protobuf::remote_type::Type::PostgresTime(
366 protobuf::PostgresTime {},
367 )),
368 },
369 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
370 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
371 protobuf::PostgresInterval {},
372 )),
373 },
374 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
375 r#type: Some(protobuf::remote_type::Type::PostgresBool(
376 protobuf::PostgresBool {},
377 )),
378 },
379 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
380 r#type: Some(protobuf::remote_type::Type::PostgresJson(
381 protobuf::PostgresJson {},
382 )),
383 },
384 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
385 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
386 protobuf::PostgresJsonb {},
387 )),
388 },
389 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
390 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
391 protobuf::PostgresInt2Array {},
392 )),
393 },
394 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
395 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
396 protobuf::PostgresInt4Array {},
397 )),
398 },
399 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
400 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
401 protobuf::PostgresInt8Array {},
402 )),
403 },
404 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
405 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
406 protobuf::PostgresFloat4Array {},
407 )),
408 },
409 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
410 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
411 protobuf::PostgresFloat8Array {},
412 )),
413 },
414 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
415 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
416 protobuf::PostgresVarcharArray {},
417 )),
418 },
419 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
420 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
421 protobuf::PostgresBpcharArray {},
422 )),
423 },
424 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
425 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
426 protobuf::PostgresTextArray {},
427 )),
428 },
429 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
430 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
431 protobuf::PostgresByteaArray {},
432 )),
433 },
434 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
435 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
436 protobuf::PostgresBoolArray {},
437 )),
438 },
439 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
440 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
441 protobuf::PostgresPostGisGeometry {},
442 )),
443 },
444 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
445 r#type: Some(protobuf::remote_type::Type::PostgresOid(
446 protobuf::PostgresOid {},
447 )),
448 },
449
450 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
451 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
452 protobuf::MysqlTinyInt {},
453 )),
454 },
455 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
456 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
457 protobuf::MysqlTinyIntUnsigned {},
458 )),
459 },
460 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
461 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
462 protobuf::MysqlSmallInt {},
463 )),
464 },
465 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
466 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
467 protobuf::MysqlSmallIntUnsigned {},
468 )),
469 },
470 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
471 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
472 protobuf::MysqlMediumInt {},
473 )),
474 },
475 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
476 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
477 protobuf::MysqlMediumIntUnsigned {},
478 )),
479 },
480 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
481 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
482 protobuf::MysqlInteger {},
483 )),
484 },
485 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
486 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
487 protobuf::MysqlIntegerUnsigned {},
488 )),
489 },
490 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
491 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
492 protobuf::MysqlBigInt {},
493 )),
494 },
495 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
496 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
497 protobuf::MysqlBigIntUnsigned {},
498 )),
499 },
500 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
501 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
502 protobuf::MysqlFloat {},
503 )),
504 },
505 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
506 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
507 protobuf::MysqlDouble {},
508 )),
509 },
510 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
511 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
512 protobuf::MysqlDecimal {
513 precision: *precision as u32,
514 scale: *scale as u32,
515 },
516 )),
517 },
518 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
519 r#type: Some(protobuf::remote_type::Type::MysqlDate(
520 protobuf::MysqlDate {},
521 )),
522 },
523 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
524 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
525 protobuf::MysqlDateTime {},
526 )),
527 },
528 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
529 r#type: Some(protobuf::remote_type::Type::MysqlTime(
530 protobuf::MysqlTime {},
531 )),
532 },
533 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
534 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
535 protobuf::MysqlTimestamp {},
536 )),
537 },
538 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
539 r#type: Some(protobuf::remote_type::Type::MysqlYear(
540 protobuf::MysqlYear {},
541 )),
542 },
543 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
544 r#type: Some(protobuf::remote_type::Type::MysqlChar(
545 protobuf::MysqlChar {},
546 )),
547 },
548 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
549 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
550 protobuf::MysqlVarchar {},
551 )),
552 },
553 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
554 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
555 protobuf::MysqlBinary {},
556 )),
557 },
558 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
559 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
560 protobuf::MysqlVarbinary {},
561 )),
562 },
563 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
564 r#type: Some(protobuf::remote_type::Type::MysqlText(
565 protobuf::MysqlText { length: *len },
566 )),
567 },
568 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
569 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
570 protobuf::MysqlBlob { length: *len },
571 )),
572 },
573 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
574 r#type: Some(protobuf::remote_type::Type::MysqlJson(
575 protobuf::MysqlJson {},
576 )),
577 },
578 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
579 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
580 protobuf::MysqlGeometry {},
581 )),
582 },
583
584 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
585 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
586 protobuf::OracleVarchar2 { length: *len },
587 )),
588 },
589 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
590 r#type: Some(protobuf::remote_type::Type::OracleChar(
591 protobuf::OracleChar { length: *len },
592 )),
593 },
594 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
595 r#type: Some(protobuf::remote_type::Type::OracleNumber(
596 protobuf::OracleNumber {
597 precision: *precision as u32,
598 scale: *scale as i32,
599 },
600 )),
601 },
602 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
603 r#type: Some(protobuf::remote_type::Type::OracleDate(
604 protobuf::OracleDate {},
605 )),
606 },
607 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
608 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
609 protobuf::OracleTimestamp {},
610 )),
611 },
612 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
613 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
614 protobuf::OracleBoolean {},
615 )),
616 },
617 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
618 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
619 protobuf::OracleBinaryFloat {},
620 )),
621 },
622 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
623 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
624 protobuf::OracleBinaryDouble {},
625 )),
626 },
627 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
628 r#type: Some(protobuf::remote_type::Type::OracleBlob(
629 protobuf::OracleBlob {},
630 )),
631 },
632 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
633 r#type: Some(protobuf::remote_type::Type::OracleFloat(
634 protobuf::OracleFloat {
635 precision: *precision as u32,
636 },
637 )),
638 },
639 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
640 r#type: Some(protobuf::remote_type::Type::OracleNchar(
641 protobuf::OracleNChar { length: *len },
642 )),
643 },
644 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
645 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
646 protobuf::OracleNVarchar2 { length: *len },
647 )),
648 },
649 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
650 r#type: Some(protobuf::remote_type::Type::OracleRaw(
651 protobuf::OracleRaw { length: *len },
652 )),
653 },
654 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
655 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
656 protobuf::OracleLongRaw {},
657 )),
658 },
659 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
660 r#type: Some(protobuf::remote_type::Type::OracleLong(
661 protobuf::OracleLong {},
662 )),
663 },
664 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
665 r#type: Some(protobuf::remote_type::Type::OracleClob(
666 protobuf::OracleClob {},
667 )),
668 },
669 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
670 r#type: Some(protobuf::remote_type::Type::OracleNclob(
671 protobuf::OracleNClob {},
672 )),
673 },
674 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
675 r#type: Some(protobuf::remote_type::Type::SqliteNull(
676 protobuf::SqliteNull {},
677 )),
678 },
679 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
680 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
681 protobuf::SqliteInteger {},
682 )),
683 },
684 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
685 r#type: Some(protobuf::remote_type::Type::SqliteReal(
686 protobuf::SqliteReal {},
687 )),
688 },
689 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
690 r#type: Some(protobuf::remote_type::Type::SqliteText(
691 protobuf::SqliteText {},
692 )),
693 },
694 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
695 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
696 protobuf::SqliteBlob {},
697 )),
698 },
699 }
700}
701
702fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
703 let fields = remote_schema
704 .fields
705 .iter()
706 .map(parse_remote_field)
707 .collect::<Vec<_>>();
708
709 RemoteSchema { fields }
710}
711
712fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
713 RemoteField {
714 name: field.name.clone(),
715 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
716 nullable: field.nullable,
717 }
718}
719
720fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
721 match remote_type.r#type.as_ref().unwrap() {
722 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
723 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
724 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
725 protobuf::remote_type::Type::PostgresFloat4(_) => {
726 RemoteType::Postgres(PostgresType::Float4)
727 }
728 protobuf::remote_type::Type::PostgresFloat8(_) => {
729 RemoteType::Postgres(PostgresType::Float8)
730 }
731 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
732 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
733 }
734 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
735 protobuf::remote_type::Type::PostgresVarchar(_) => {
736 RemoteType::Postgres(PostgresType::Varchar)
737 }
738 protobuf::remote_type::Type::PostgresBpchar(_) => {
739 RemoteType::Postgres(PostgresType::Bpchar)
740 }
741 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
742 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
743 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
744 protobuf::remote_type::Type::PostgresTimestamp(_) => {
745 RemoteType::Postgres(PostgresType::Timestamp)
746 }
747 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
748 RemoteType::Postgres(PostgresType::TimestampTz)
749 }
750 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
751 protobuf::remote_type::Type::PostgresInterval(_) => {
752 RemoteType::Postgres(PostgresType::Interval)
753 }
754 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
755 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
756 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
757 protobuf::remote_type::Type::PostgresInt2Array(_) => {
758 RemoteType::Postgres(PostgresType::Int2Array)
759 }
760 protobuf::remote_type::Type::PostgresInt4Array(_) => {
761 RemoteType::Postgres(PostgresType::Int4Array)
762 }
763 protobuf::remote_type::Type::PostgresInt8Array(_) => {
764 RemoteType::Postgres(PostgresType::Int8Array)
765 }
766 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
767 RemoteType::Postgres(PostgresType::Float4Array)
768 }
769 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
770 RemoteType::Postgres(PostgresType::Float8Array)
771 }
772 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
773 RemoteType::Postgres(PostgresType::VarcharArray)
774 }
775 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
776 RemoteType::Postgres(PostgresType::BpcharArray)
777 }
778 protobuf::remote_type::Type::PostgresTextArray(_) => {
779 RemoteType::Postgres(PostgresType::TextArray)
780 }
781 protobuf::remote_type::Type::PostgresByteaArray(_) => {
782 RemoteType::Postgres(PostgresType::ByteaArray)
783 }
784 protobuf::remote_type::Type::PostgresBoolArray(_) => {
785 RemoteType::Postgres(PostgresType::BoolArray)
786 }
787 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
788 RemoteType::Postgres(PostgresType::PostGisGeometry)
789 }
790 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
791 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
792 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
793 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
794 }
795 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
796 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
797 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
798 }
799 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
800 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
801 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
802 }
803 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
804 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
805 RemoteType::Mysql(MysqlType::IntegerUnsigned)
806 }
807 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
808 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
809 RemoteType::Mysql(MysqlType::BigIntUnsigned)
810 }
811 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
812 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
813 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
814 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
815 ),
816 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
817 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
818 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
819 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
820 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
821 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
822 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
823 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
824 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
825 protobuf::remote_type::Type::MysqlText(text) => {
826 RemoteType::Mysql(MysqlType::Text(text.length))
827 }
828 protobuf::remote_type::Type::MysqlBlob(blob) => {
829 RemoteType::Mysql(MysqlType::Blob(blob.length))
830 }
831 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
832 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
833 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
834 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
835 }
836 protobuf::remote_type::Type::OracleChar(char) => {
837 RemoteType::Oracle(OracleType::Char(char.length))
838 }
839 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
840 OracleType::Number(number.precision as u8, number.scale as i8),
841 ),
842 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
843 protobuf::remote_type::Type::OracleTimestamp(_) => {
844 RemoteType::Oracle(OracleType::Timestamp)
845 }
846 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
847 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
848 RemoteType::Oracle(OracleType::BinaryFloat)
849 }
850 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
851 RemoteType::Oracle(OracleType::BinaryDouble)
852 }
853 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
854 RemoteType::Oracle(OracleType::Float(*precision as u8))
855 }
856 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
857 RemoteType::Oracle(OracleType::NChar(*length))
858 }
859 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
860 RemoteType::Oracle(OracleType::NVarchar2(*length))
861 }
862 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
863 RemoteType::Oracle(OracleType::Raw(*length))
864 }
865 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
866 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
867 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
868 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
869 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
870 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
871 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
872 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
873 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
874 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
875 }
876}