1#[cfg(feature = "mysql")]
2use crate::MysqlConnectionOptions;
3#[cfg(feature = "oracle")]
4use crate::OracleConnectionOptions;
5#[cfg(feature = "postgres")]
6use crate::PostgresConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9 ConnectionOptions, DFResult, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema,
10 RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
11};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::common::DataFusionError;
14use datafusion::execution::FunctionRegistry;
15use datafusion::physical_plan::ExecutionPlan;
16use datafusion_proto::convert_required;
17use datafusion_proto::physical_plan::PhysicalExtensionCodec;
18use datafusion_proto::protobuf::proto_error;
19use prost::Message;
20use std::fmt::Debug;
21#[cfg(feature = "sqlite")]
22use std::path::Path;
23use std::sync::Arc;
24
25pub trait TransformCodec: Debug + Send + Sync {
26 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
27 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
28}
29
30#[derive(Debug)]
31pub struct RemotePhysicalCodec {
32 transform_codec: Option<Arc<dyn TransformCodec>>,
33}
34
35impl RemotePhysicalCodec {
36 pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
37 Self { transform_codec }
38 }
39}
40
41impl PhysicalExtensionCodec for RemotePhysicalCodec {
42 fn try_decode(
43 &self,
44 buf: &[u8],
45 _inputs: &[Arc<dyn ExecutionPlan>],
46 _registry: &dyn FunctionRegistry,
47 ) -> DFResult<Arc<dyn ExecutionPlan>> {
48 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
49 DataFusionError::Internal(format!(
50 "Failed to decode remote table execution plan: {e:?}"
51 ))
52 })?;
53
54 let transform = if let Some(bytes) = proto.transform {
55 let Some(transform_codec) = self.transform_codec.as_ref() else {
56 return Err(DataFusionError::Execution(
57 "No transform codec found".to_string(),
58 ));
59 };
60 Some(transform_codec.try_decode(&bytes)?)
61 } else {
62 None
63 };
64
65 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
66 let remote_schema = proto
67 .remote_schema
68 .map(|schema| Arc::new(parse_remote_schema(&schema)));
69
70 let projection: Option<Vec<usize>> = proto
71 .projection
72 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
73
74 let conn_options = parse_connection_options(proto.conn_options.unwrap());
75 let conn = tokio::task::block_in_place(|| {
76 tokio::runtime::Handle::current().block_on(async {
77 let pool = connect(&conn_options).await?;
78 let conn = pool.get().await?;
79 Ok::<_, DataFusionError>(conn)
80 })
81 })?;
82
83 Ok(Arc::new(RemoteTableExec::try_new(
84 conn_options,
85 proto.sql,
86 table_schema,
87 remote_schema,
88 projection,
89 transform,
90 conn,
91 )?))
92 }
93
94 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
95 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
96 let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
97 let Some(transform_codec) = self.transform_codec.as_ref() else {
98 return Err(DataFusionError::Execution(
99 "No transform codec found".to_string(),
100 ));
101 };
102 let bytes = transform_codec.try_encode(transform.as_ref())?;
103 Some(bytes)
104 } else {
105 None
106 };
107
108 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
109 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
110
111 let proto = protobuf::RemoteTableExec {
112 conn_options: Some(serialized_connection_options),
113 sql: exec.sql.clone(),
114 table_schema: Some(exec.table_schema.as_ref().try_into()?),
115 remote_schema,
116 projection: exec
117 .projection
118 .as_ref()
119 .map(|p| serialize_projection(p.as_slice())),
120 transform: serialized_transform,
121 };
122
123 proto.encode(buf).map_err(|e| {
124 DataFusionError::Internal(format!(
125 "Failed to encode remote table execution plan: {e:?}"
126 ))
127 })?;
128 Ok(())
129 } else {
130 Err(DataFusionError::Execution(format!(
131 "Failed to encode {}",
132 RemoteTableExec::static_name()
133 )))
134 }
135 }
136}
137
138fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
139 match options {
140 #[cfg(feature = "postgres")]
141 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
142 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
143 protobuf::PostgresConnectionOptions {
144 host: options.host.clone(),
145 port: options.port as u32,
146 username: options.username.clone(),
147 password: options.password.clone(),
148 database: options.database.clone(),
149 pool_max_size: options.pool_max_size as u32,
150 stream_chunk_size: options.stream_chunk_size as u32,
151 },
152 )),
153 },
154 #[cfg(feature = "mysql")]
155 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
156 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
157 protobuf::MysqlConnectionOptions {
158 host: options.host.clone(),
159 port: options.port as u32,
160 username: options.username.clone(),
161 password: options.password.clone(),
162 database: options.database.clone(),
163 pool_max_size: options.pool_max_size as u32,
164 stream_chunk_size: options.stream_chunk_size as u32,
165 },
166 )),
167 },
168 #[cfg(feature = "oracle")]
169 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
170 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
171 protobuf::OracleConnectionOptions {
172 host: options.host.clone(),
173 port: options.port as u32,
174 username: options.username.clone(),
175 password: options.password.clone(),
176 service_name: options.service_name.clone(),
177 pool_max_size: options.pool_max_size as u32,
178 stream_chunk_size: options.stream_chunk_size as u32,
179 },
180 )),
181 },
182 #[cfg(feature = "sqlite")]
183 ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
184 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
185 protobuf::SqliteConnectionOptions {
186 path: path.to_str().unwrap().to_string(),
187 },
188 )),
189 },
190 }
191}
192
193fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
194 match options.connection_options {
195 #[cfg(feature = "postgres")]
196 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
197 ConnectionOptions::Postgres(PostgresConnectionOptions {
198 host: options.host,
199 port: options.port as u16,
200 username: options.username,
201 password: options.password,
202 database: options.database,
203 pool_max_size: options.pool_max_size as usize,
204 stream_chunk_size: options.stream_chunk_size as usize,
205 })
206 }
207 #[cfg(feature = "mysql")]
208 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
209 ConnectionOptions::Mysql(MysqlConnectionOptions {
210 host: options.host,
211 port: options.port as u16,
212 username: options.username,
213 password: options.password,
214 database: options.database,
215 pool_max_size: options.pool_max_size as usize,
216 stream_chunk_size: options.stream_chunk_size as usize,
217 })
218 }
219 #[cfg(feature = "oracle")]
220 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
221 ConnectionOptions::Oracle(OracleConnectionOptions {
222 host: options.host,
223 port: options.port as u16,
224 username: options.username,
225 password: options.password,
226 service_name: options.service_name,
227 pool_max_size: options.pool_max_size as usize,
228 stream_chunk_size: options.stream_chunk_size as usize,
229 })
230 }
231 #[cfg(feature = "sqlite")]
232 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
233 ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
234 }
235 _ => panic!("Failed to parse connection options: {options:?}"),
236 }
237}
238
239fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
240 protobuf::Projection {
241 projection: projection.iter().map(|n| *n as u32).collect(),
242 }
243}
244
245fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
246 let fields = remote_schema
247 .fields
248 .iter()
249 .map(serialize_remote_field)
250 .collect::<Vec<_>>();
251
252 protobuf::RemoteSchema { fields }
253}
254
255fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
256 protobuf::RemoteField {
257 name: remote_field.name.clone(),
258 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
259 nullable: remote_field.nullable,
260 }
261}
262
263fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
264 match remote_type {
265 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
266 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
267 protobuf::PostgresInt2 {},
268 )),
269 },
270 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
271 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
272 protobuf::PostgresInt4 {},
273 )),
274 },
275 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
276 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
277 protobuf::PostgresInt8 {},
278 )),
279 },
280 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
281 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
282 protobuf::PostgresFloat4 {},
283 )),
284 },
285 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
286 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
287 protobuf::PostgresFloat8 {},
288 )),
289 },
290 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
291 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
292 protobuf::PostgresNumeric {
293 scale: *scale as i32,
294 },
295 )),
296 },
297 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
298 r#type: Some(protobuf::remote_type::Type::PostgresName(
299 protobuf::PostgresName {},
300 )),
301 },
302 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
303 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
304 protobuf::PostgresVarchar {},
305 )),
306 },
307 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
308 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
309 protobuf::PostgresBpchar {},
310 )),
311 },
312 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
313 r#type: Some(protobuf::remote_type::Type::PostgresText(
314 protobuf::PostgresText {},
315 )),
316 },
317 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
318 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
319 protobuf::PostgresBytea {},
320 )),
321 },
322 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
323 r#type: Some(protobuf::remote_type::Type::PostgresDate(
324 protobuf::PostgresDate {},
325 )),
326 },
327 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
328 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
329 protobuf::PostgresTimestamp {},
330 )),
331 },
332 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
333 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
334 protobuf::PostgresTimestampTz {},
335 )),
336 },
337 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
338 r#type: Some(protobuf::remote_type::Type::PostgresTime(
339 protobuf::PostgresTime {},
340 )),
341 },
342 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
343 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
344 protobuf::PostgresInterval {},
345 )),
346 },
347 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
348 r#type: Some(protobuf::remote_type::Type::PostgresBool(
349 protobuf::PostgresBool {},
350 )),
351 },
352 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
353 r#type: Some(protobuf::remote_type::Type::PostgresJson(
354 protobuf::PostgresJson {},
355 )),
356 },
357 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
358 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
359 protobuf::PostgresJsonb {},
360 )),
361 },
362 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
363 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
364 protobuf::PostgresInt2Array {},
365 )),
366 },
367 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
368 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
369 protobuf::PostgresInt4Array {},
370 )),
371 },
372 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
373 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
374 protobuf::PostgresInt8Array {},
375 )),
376 },
377 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
378 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
379 protobuf::PostgresFloat4Array {},
380 )),
381 },
382 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
383 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
384 protobuf::PostgresFloat8Array {},
385 )),
386 },
387 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
388 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
389 protobuf::PostgresVarcharArray {},
390 )),
391 },
392 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
393 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
394 protobuf::PostgresBpcharArray {},
395 )),
396 },
397 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
398 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
399 protobuf::PostgresTextArray {},
400 )),
401 },
402 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
403 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
404 protobuf::PostgresByteaArray {},
405 )),
406 },
407 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
408 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
409 protobuf::PostgresBoolArray {},
410 )),
411 },
412 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
413 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
414 protobuf::PostgresPostGisGeometry {},
415 )),
416 },
417 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
418 r#type: Some(protobuf::remote_type::Type::PostgresOid(
419 protobuf::PostgresOid {},
420 )),
421 },
422
423 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
424 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
425 protobuf::MysqlTinyInt {},
426 )),
427 },
428 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
429 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
430 protobuf::MysqlTinyIntUnsigned {},
431 )),
432 },
433 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
434 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
435 protobuf::MysqlSmallInt {},
436 )),
437 },
438 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
439 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
440 protobuf::MysqlSmallIntUnsigned {},
441 )),
442 },
443 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
444 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
445 protobuf::MysqlMediumInt {},
446 )),
447 },
448 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
449 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
450 protobuf::MysqlMediumIntUnsigned {},
451 )),
452 },
453 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
454 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
455 protobuf::MysqlInteger {},
456 )),
457 },
458 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
459 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
460 protobuf::MysqlIntegerUnsigned {},
461 )),
462 },
463 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
464 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
465 protobuf::MysqlBigInt {},
466 )),
467 },
468 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
469 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
470 protobuf::MysqlBigIntUnsigned {},
471 )),
472 },
473 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
474 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
475 protobuf::MysqlFloat {},
476 )),
477 },
478 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
479 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
480 protobuf::MysqlDouble {},
481 )),
482 },
483 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
484 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
485 protobuf::MysqlDecimal {
486 precision: *precision as u32,
487 scale: *scale as u32,
488 },
489 )),
490 },
491 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
492 r#type: Some(protobuf::remote_type::Type::MysqlDate(
493 protobuf::MysqlDate {},
494 )),
495 },
496 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
497 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
498 protobuf::MysqlDateTime {},
499 )),
500 },
501 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
502 r#type: Some(protobuf::remote_type::Type::MysqlTime(
503 protobuf::MysqlTime {},
504 )),
505 },
506 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
507 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
508 protobuf::MysqlTimestamp {},
509 )),
510 },
511 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
512 r#type: Some(protobuf::remote_type::Type::MysqlYear(
513 protobuf::MysqlYear {},
514 )),
515 },
516 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
517 r#type: Some(protobuf::remote_type::Type::MysqlChar(
518 protobuf::MysqlChar {},
519 )),
520 },
521 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
522 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
523 protobuf::MysqlVarchar {},
524 )),
525 },
526 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
527 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
528 protobuf::MysqlBinary {},
529 )),
530 },
531 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
532 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
533 protobuf::MysqlVarbinary {},
534 )),
535 },
536 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
537 r#type: Some(protobuf::remote_type::Type::MysqlText(
538 protobuf::MysqlText { length: *len },
539 )),
540 },
541 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
542 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
543 protobuf::MysqlBlob { length: *len },
544 )),
545 },
546 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
547 r#type: Some(protobuf::remote_type::Type::MysqlJson(
548 protobuf::MysqlJson {},
549 )),
550 },
551 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
552 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
553 protobuf::MysqlGeometry {},
554 )),
555 },
556
557 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
558 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
559 protobuf::OracleVarchar2 { length: *len },
560 )),
561 },
562 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
563 r#type: Some(protobuf::remote_type::Type::OracleChar(
564 protobuf::OracleChar { length: *len },
565 )),
566 },
567 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
568 r#type: Some(protobuf::remote_type::Type::OracleNumber(
569 protobuf::OracleNumber {
570 precision: *precision as u32,
571 scale: *scale as i32,
572 },
573 )),
574 },
575 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
576 r#type: Some(protobuf::remote_type::Type::OracleDate(
577 protobuf::OracleDate {},
578 )),
579 },
580 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
581 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
582 protobuf::OracleTimestamp {},
583 )),
584 },
585 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
586 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
587 protobuf::OracleBoolean {},
588 )),
589 },
590 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
591 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
592 protobuf::OracleBinaryFloat {},
593 )),
594 },
595 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
596 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
597 protobuf::OracleBinaryDouble {},
598 )),
599 },
600 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
601 r#type: Some(protobuf::remote_type::Type::OracleBlob(
602 protobuf::OracleBlob {},
603 )),
604 },
605 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
606 r#type: Some(protobuf::remote_type::Type::OracleFloat(
607 protobuf::OracleFloat {
608 precision: *precision as u32,
609 },
610 )),
611 },
612 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
613 r#type: Some(protobuf::remote_type::Type::OracleNchar(
614 protobuf::OracleNChar { length: *len },
615 )),
616 },
617 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
618 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
619 protobuf::OracleNVarchar2 { length: *len },
620 )),
621 },
622 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
623 r#type: Some(protobuf::remote_type::Type::OracleRaw(
624 protobuf::OracleRaw { length: *len },
625 )),
626 },
627 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
628 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
629 protobuf::OracleLongRaw {},
630 )),
631 },
632 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
633 r#type: Some(protobuf::remote_type::Type::OracleLong(
634 protobuf::OracleLong {},
635 )),
636 },
637 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
638 r#type: Some(protobuf::remote_type::Type::OracleClob(
639 protobuf::OracleClob {},
640 )),
641 },
642 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
643 r#type: Some(protobuf::remote_type::Type::OracleNclob(
644 protobuf::OracleNClob {},
645 )),
646 },
647 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
648 r#type: Some(protobuf::remote_type::Type::SqliteNull(
649 protobuf::SqliteNull {},
650 )),
651 },
652 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
653 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
654 protobuf::SqliteInteger {},
655 )),
656 },
657 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
658 r#type: Some(protobuf::remote_type::Type::SqliteReal(
659 protobuf::SqliteReal {},
660 )),
661 },
662 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
663 r#type: Some(protobuf::remote_type::Type::SqliteText(
664 protobuf::SqliteText {},
665 )),
666 },
667 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
668 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
669 protobuf::SqliteBlob {},
670 )),
671 },
672 }
673}
674
675fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
676 let fields = remote_schema
677 .fields
678 .iter()
679 .map(parse_remote_field)
680 .collect::<Vec<_>>();
681
682 RemoteSchema { fields }
683}
684
685fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
686 RemoteField {
687 name: field.name.clone(),
688 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
689 nullable: field.nullable,
690 }
691}
692
693fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
694 match remote_type.r#type.as_ref().unwrap() {
695 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
696 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
697 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
698 protobuf::remote_type::Type::PostgresFloat4(_) => {
699 RemoteType::Postgres(PostgresType::Float4)
700 }
701 protobuf::remote_type::Type::PostgresFloat8(_) => {
702 RemoteType::Postgres(PostgresType::Float8)
703 }
704 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
705 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
706 }
707 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
708 protobuf::remote_type::Type::PostgresVarchar(_) => {
709 RemoteType::Postgres(PostgresType::Varchar)
710 }
711 protobuf::remote_type::Type::PostgresBpchar(_) => {
712 RemoteType::Postgres(PostgresType::Bpchar)
713 }
714 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
715 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
716 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
717 protobuf::remote_type::Type::PostgresTimestamp(_) => {
718 RemoteType::Postgres(PostgresType::Timestamp)
719 }
720 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
721 RemoteType::Postgres(PostgresType::TimestampTz)
722 }
723 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
724 protobuf::remote_type::Type::PostgresInterval(_) => {
725 RemoteType::Postgres(PostgresType::Interval)
726 }
727 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
728 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
729 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
730 protobuf::remote_type::Type::PostgresInt2Array(_) => {
731 RemoteType::Postgres(PostgresType::Int2Array)
732 }
733 protobuf::remote_type::Type::PostgresInt4Array(_) => {
734 RemoteType::Postgres(PostgresType::Int4Array)
735 }
736 protobuf::remote_type::Type::PostgresInt8Array(_) => {
737 RemoteType::Postgres(PostgresType::Int8Array)
738 }
739 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
740 RemoteType::Postgres(PostgresType::Float4Array)
741 }
742 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
743 RemoteType::Postgres(PostgresType::Float8Array)
744 }
745 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
746 RemoteType::Postgres(PostgresType::VarcharArray)
747 }
748 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
749 RemoteType::Postgres(PostgresType::BpcharArray)
750 }
751 protobuf::remote_type::Type::PostgresTextArray(_) => {
752 RemoteType::Postgres(PostgresType::TextArray)
753 }
754 protobuf::remote_type::Type::PostgresByteaArray(_) => {
755 RemoteType::Postgres(PostgresType::ByteaArray)
756 }
757 protobuf::remote_type::Type::PostgresBoolArray(_) => {
758 RemoteType::Postgres(PostgresType::BoolArray)
759 }
760 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
761 RemoteType::Postgres(PostgresType::PostGisGeometry)
762 }
763 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
764 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
765 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
766 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
767 }
768 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
769 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
770 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
771 }
772 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
773 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
774 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
775 }
776 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
777 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
778 RemoteType::Mysql(MysqlType::IntegerUnsigned)
779 }
780 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
781 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
782 RemoteType::Mysql(MysqlType::BigIntUnsigned)
783 }
784 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
785 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
786 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
787 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
788 ),
789 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
790 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
791 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
792 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
793 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
794 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
795 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
796 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
797 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
798 protobuf::remote_type::Type::MysqlText(text) => {
799 RemoteType::Mysql(MysqlType::Text(text.length))
800 }
801 protobuf::remote_type::Type::MysqlBlob(blob) => {
802 RemoteType::Mysql(MysqlType::Blob(blob.length))
803 }
804 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
805 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
806 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
807 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
808 }
809 protobuf::remote_type::Type::OracleChar(char) => {
810 RemoteType::Oracle(OracleType::Char(char.length))
811 }
812 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
813 OracleType::Number(number.precision as u8, number.scale as i8),
814 ),
815 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
816 protobuf::remote_type::Type::OracleTimestamp(_) => {
817 RemoteType::Oracle(OracleType::Timestamp)
818 }
819 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
820 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
821 RemoteType::Oracle(OracleType::BinaryFloat)
822 }
823 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
824 RemoteType::Oracle(OracleType::BinaryDouble)
825 }
826 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
827 RemoteType::Oracle(OracleType::Float(*precision as u8))
828 }
829 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
830 RemoteType::Oracle(OracleType::NChar(*length))
831 }
832 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
833 RemoteType::Oracle(OracleType::NVarchar2(*length))
834 }
835 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
836 RemoteType::Oracle(OracleType::Raw(*length))
837 }
838 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
839 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
840 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
841 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
842 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
843 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
844 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
845 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
846 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
847 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
848 }
849}