1#[cfg(feature = "mysql")]
2use crate::MysqlConnectionOptions;
3#[cfg(feature = "oracle")]
4use crate::OracleConnectionOptions;
5#[cfg(feature = "postgres")]
6use crate::PostgresConnectionOptions;
7use crate::generated::prost as protobuf;
8use crate::{
9 ConnectionOptions, DFResult, MysqlType, OracleType, PostgresType, RemoteField, RemoteSchema,
10 RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform, connect,
11};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::common::DataFusionError;
14use datafusion::execution::FunctionRegistry;
15use datafusion::physical_plan::ExecutionPlan;
16use datafusion_proto::convert_required;
17use datafusion_proto::physical_plan::PhysicalExtensionCodec;
18use datafusion_proto::protobuf::proto_error;
19use prost::Message;
20use std::fmt::Debug;
21#[cfg(feature = "sqlite")]
22use std::path::Path;
23use std::sync::Arc;
24
25pub trait TransformCodec: Debug + Send + Sync {
26 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
27 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
28}
29
30#[derive(Debug)]
31pub struct RemotePhysicalCodec {
32 transform_codec: Option<Arc<dyn TransformCodec>>,
33}
34
35impl RemotePhysicalCodec {
36 pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
37 Self { transform_codec }
38 }
39}
40
41impl PhysicalExtensionCodec for RemotePhysicalCodec {
42 fn try_decode(
43 &self,
44 buf: &[u8],
45 _inputs: &[Arc<dyn ExecutionPlan>],
46 _registry: &dyn FunctionRegistry,
47 ) -> DFResult<Arc<dyn ExecutionPlan>> {
48 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
49 DataFusionError::Internal(format!(
50 "Failed to decode remote table execution plan: {e:?}"
51 ))
52 })?;
53
54 let transform = if let Some(bytes) = proto.transform {
55 let Some(transform_codec) = self.transform_codec.as_ref() else {
56 return Err(DataFusionError::Execution(
57 "No transform codec found".to_string(),
58 ));
59 };
60 Some(transform_codec.try_decode(&bytes)?)
61 } else {
62 None
63 };
64
65 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
66 let remote_schema = proto
67 .remote_schema
68 .map(|schema| Arc::new(parse_remote_schema(&schema)));
69
70 let projection: Option<Vec<usize>> = proto
71 .projection
72 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
73
74 let limit = proto.limit.map(|l| l as usize);
75
76 let conn_options = parse_connection_options(proto.conn_options.unwrap());
77 let conn = tokio::task::block_in_place(|| {
78 tokio::runtime::Handle::current().block_on(async {
79 let pool = connect(&conn_options).await?;
80 let conn = pool.get().await?;
81 Ok::<_, DataFusionError>(conn)
82 })
83 })?;
84
85 Ok(Arc::new(RemoteTableExec::try_new(
86 conn_options,
87 proto.sql,
88 table_schema,
89 remote_schema,
90 projection,
91 limit,
92 transform,
93 conn,
94 )?))
95 }
96
97 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
98 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
99 let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
100 let Some(transform_codec) = self.transform_codec.as_ref() else {
101 return Err(DataFusionError::Execution(
102 "No transform codec found".to_string(),
103 ));
104 };
105 let bytes = transform_codec.try_encode(transform.as_ref())?;
106 Some(bytes)
107 } else {
108 None
109 };
110
111 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
112 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
113
114 let proto = protobuf::RemoteTableExec {
115 conn_options: Some(serialized_connection_options),
116 sql: exec.sql.clone(),
117 table_schema: Some(exec.table_schema.as_ref().try_into()?),
118 remote_schema,
119 projection: exec
120 .projection
121 .as_ref()
122 .map(|p| serialize_projection(p.as_slice())),
123 limit: exec.limit.map(|l| l as u32),
124 transform: serialized_transform,
125 };
126
127 proto.encode(buf).map_err(|e| {
128 DataFusionError::Internal(format!(
129 "Failed to encode remote table execution plan: {e:?}"
130 ))
131 })?;
132 Ok(())
133 } else {
134 Err(DataFusionError::Execution(format!(
135 "Failed to encode {}",
136 RemoteTableExec::static_name()
137 )))
138 }
139 }
140}
141
142fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
143 match options {
144 #[cfg(feature = "postgres")]
145 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
146 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
147 protobuf::PostgresConnectionOptions {
148 host: options.host.clone(),
149 port: options.port as u32,
150 username: options.username.clone(),
151 password: options.password.clone(),
152 database: options.database.clone(),
153 pool_max_size: options.pool_max_size as u32,
154 stream_chunk_size: options.stream_chunk_size as u32,
155 },
156 )),
157 },
158 #[cfg(feature = "mysql")]
159 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
160 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
161 protobuf::MysqlConnectionOptions {
162 host: options.host.clone(),
163 port: options.port as u32,
164 username: options.username.clone(),
165 password: options.password.clone(),
166 database: options.database.clone(),
167 pool_max_size: options.pool_max_size as u32,
168 stream_chunk_size: options.stream_chunk_size as u32,
169 },
170 )),
171 },
172 #[cfg(feature = "oracle")]
173 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
174 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
175 protobuf::OracleConnectionOptions {
176 host: options.host.clone(),
177 port: options.port as u32,
178 username: options.username.clone(),
179 password: options.password.clone(),
180 service_name: options.service_name.clone(),
181 pool_max_size: options.pool_max_size as u32,
182 stream_chunk_size: options.stream_chunk_size as u32,
183 },
184 )),
185 },
186 #[cfg(feature = "sqlite")]
187 ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
188 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
189 protobuf::SqliteConnectionOptions {
190 path: path.to_str().unwrap().to_string(),
191 },
192 )),
193 },
194 }
195}
196
197fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
198 match options.connection_options {
199 #[cfg(feature = "postgres")]
200 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
201 ConnectionOptions::Postgres(PostgresConnectionOptions {
202 host: options.host,
203 port: options.port as u16,
204 username: options.username,
205 password: options.password,
206 database: options.database,
207 pool_max_size: options.pool_max_size as usize,
208 stream_chunk_size: options.stream_chunk_size as usize,
209 })
210 }
211 #[cfg(feature = "mysql")]
212 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
213 ConnectionOptions::Mysql(MysqlConnectionOptions {
214 host: options.host,
215 port: options.port as u16,
216 username: options.username,
217 password: options.password,
218 database: options.database,
219 pool_max_size: options.pool_max_size as usize,
220 stream_chunk_size: options.stream_chunk_size as usize,
221 })
222 }
223 #[cfg(feature = "oracle")]
224 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
225 ConnectionOptions::Oracle(OracleConnectionOptions {
226 host: options.host,
227 port: options.port as u16,
228 username: options.username,
229 password: options.password,
230 service_name: options.service_name,
231 pool_max_size: options.pool_max_size as usize,
232 stream_chunk_size: options.stream_chunk_size as usize,
233 })
234 }
235 #[cfg(feature = "sqlite")]
236 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
237 ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
238 }
239 _ => panic!("Failed to parse connection options: {options:?}"),
240 }
241}
242
243fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
244 protobuf::Projection {
245 projection: projection.iter().map(|n| *n as u32).collect(),
246 }
247}
248
249fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
250 let fields = remote_schema
251 .fields
252 .iter()
253 .map(serialize_remote_field)
254 .collect::<Vec<_>>();
255
256 protobuf::RemoteSchema { fields }
257}
258
259fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
260 protobuf::RemoteField {
261 name: remote_field.name.clone(),
262 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
263 nullable: remote_field.nullable,
264 }
265}
266
267fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
268 match remote_type {
269 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
270 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
271 protobuf::PostgresInt2 {},
272 )),
273 },
274 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
275 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
276 protobuf::PostgresInt4 {},
277 )),
278 },
279 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
280 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
281 protobuf::PostgresInt8 {},
282 )),
283 },
284 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
285 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
286 protobuf::PostgresFloat4 {},
287 )),
288 },
289 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
290 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
291 protobuf::PostgresFloat8 {},
292 )),
293 },
294 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
295 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
296 protobuf::PostgresNumeric {
297 scale: *scale as i32,
298 },
299 )),
300 },
301 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
302 r#type: Some(protobuf::remote_type::Type::PostgresName(
303 protobuf::PostgresName {},
304 )),
305 },
306 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
307 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
308 protobuf::PostgresVarchar {},
309 )),
310 },
311 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
312 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
313 protobuf::PostgresBpchar {},
314 )),
315 },
316 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
317 r#type: Some(protobuf::remote_type::Type::PostgresText(
318 protobuf::PostgresText {},
319 )),
320 },
321 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
322 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
323 protobuf::PostgresBytea {},
324 )),
325 },
326 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
327 r#type: Some(protobuf::remote_type::Type::PostgresDate(
328 protobuf::PostgresDate {},
329 )),
330 },
331 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
332 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
333 protobuf::PostgresTimestamp {},
334 )),
335 },
336 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
337 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
338 protobuf::PostgresTimestampTz {},
339 )),
340 },
341 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
342 r#type: Some(protobuf::remote_type::Type::PostgresTime(
343 protobuf::PostgresTime {},
344 )),
345 },
346 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
347 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
348 protobuf::PostgresInterval {},
349 )),
350 },
351 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
352 r#type: Some(protobuf::remote_type::Type::PostgresBool(
353 protobuf::PostgresBool {},
354 )),
355 },
356 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
357 r#type: Some(protobuf::remote_type::Type::PostgresJson(
358 protobuf::PostgresJson {},
359 )),
360 },
361 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
362 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
363 protobuf::PostgresJsonb {},
364 )),
365 },
366 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
367 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
368 protobuf::PostgresInt2Array {},
369 )),
370 },
371 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
372 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
373 protobuf::PostgresInt4Array {},
374 )),
375 },
376 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
377 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
378 protobuf::PostgresInt8Array {},
379 )),
380 },
381 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
382 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
383 protobuf::PostgresFloat4Array {},
384 )),
385 },
386 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
387 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
388 protobuf::PostgresFloat8Array {},
389 )),
390 },
391 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
392 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
393 protobuf::PostgresVarcharArray {},
394 )),
395 },
396 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
397 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
398 protobuf::PostgresBpcharArray {},
399 )),
400 },
401 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
402 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
403 protobuf::PostgresTextArray {},
404 )),
405 },
406 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
407 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
408 protobuf::PostgresByteaArray {},
409 )),
410 },
411 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
412 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
413 protobuf::PostgresBoolArray {},
414 )),
415 },
416 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
417 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
418 protobuf::PostgresPostGisGeometry {},
419 )),
420 },
421 RemoteType::Postgres(PostgresType::Oid) => protobuf::RemoteType {
422 r#type: Some(protobuf::remote_type::Type::PostgresOid(
423 protobuf::PostgresOid {},
424 )),
425 },
426
427 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
428 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
429 protobuf::MysqlTinyInt {},
430 )),
431 },
432 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
433 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
434 protobuf::MysqlTinyIntUnsigned {},
435 )),
436 },
437 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
438 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
439 protobuf::MysqlSmallInt {},
440 )),
441 },
442 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
443 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
444 protobuf::MysqlSmallIntUnsigned {},
445 )),
446 },
447 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
448 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
449 protobuf::MysqlMediumInt {},
450 )),
451 },
452 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
453 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
454 protobuf::MysqlMediumIntUnsigned {},
455 )),
456 },
457 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
458 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
459 protobuf::MysqlInteger {},
460 )),
461 },
462 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
463 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
464 protobuf::MysqlIntegerUnsigned {},
465 )),
466 },
467 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
468 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
469 protobuf::MysqlBigInt {},
470 )),
471 },
472 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
473 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
474 protobuf::MysqlBigIntUnsigned {},
475 )),
476 },
477 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
478 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
479 protobuf::MysqlFloat {},
480 )),
481 },
482 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
483 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
484 protobuf::MysqlDouble {},
485 )),
486 },
487 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
488 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
489 protobuf::MysqlDecimal {
490 precision: *precision as u32,
491 scale: *scale as u32,
492 },
493 )),
494 },
495 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
496 r#type: Some(protobuf::remote_type::Type::MysqlDate(
497 protobuf::MysqlDate {},
498 )),
499 },
500 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
501 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
502 protobuf::MysqlDateTime {},
503 )),
504 },
505 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
506 r#type: Some(protobuf::remote_type::Type::MysqlTime(
507 protobuf::MysqlTime {},
508 )),
509 },
510 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
511 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
512 protobuf::MysqlTimestamp {},
513 )),
514 },
515 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
516 r#type: Some(protobuf::remote_type::Type::MysqlYear(
517 protobuf::MysqlYear {},
518 )),
519 },
520 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
521 r#type: Some(protobuf::remote_type::Type::MysqlChar(
522 protobuf::MysqlChar {},
523 )),
524 },
525 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
526 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
527 protobuf::MysqlVarchar {},
528 )),
529 },
530 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
531 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
532 protobuf::MysqlBinary {},
533 )),
534 },
535 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
536 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
537 protobuf::MysqlVarbinary {},
538 )),
539 },
540 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
541 r#type: Some(protobuf::remote_type::Type::MysqlText(
542 protobuf::MysqlText { length: *len },
543 )),
544 },
545 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
546 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
547 protobuf::MysqlBlob { length: *len },
548 )),
549 },
550 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
551 r#type: Some(protobuf::remote_type::Type::MysqlJson(
552 protobuf::MysqlJson {},
553 )),
554 },
555 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
556 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
557 protobuf::MysqlGeometry {},
558 )),
559 },
560
561 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
562 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
563 protobuf::OracleVarchar2 { length: *len },
564 )),
565 },
566 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
567 r#type: Some(protobuf::remote_type::Type::OracleChar(
568 protobuf::OracleChar { length: *len },
569 )),
570 },
571 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
572 r#type: Some(protobuf::remote_type::Type::OracleNumber(
573 protobuf::OracleNumber {
574 precision: *precision as u32,
575 scale: *scale as i32,
576 },
577 )),
578 },
579 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
580 r#type: Some(protobuf::remote_type::Type::OracleDate(
581 protobuf::OracleDate {},
582 )),
583 },
584 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
585 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
586 protobuf::OracleTimestamp {},
587 )),
588 },
589 RemoteType::Oracle(OracleType::Boolean) => protobuf::RemoteType {
590 r#type: Some(protobuf::remote_type::Type::OracleBoolean(
591 protobuf::OracleBoolean {},
592 )),
593 },
594 RemoteType::Oracle(OracleType::BinaryFloat) => protobuf::RemoteType {
595 r#type: Some(protobuf::remote_type::Type::OracleBinaryFloat(
596 protobuf::OracleBinaryFloat {},
597 )),
598 },
599 RemoteType::Oracle(OracleType::BinaryDouble) => protobuf::RemoteType {
600 r#type: Some(protobuf::remote_type::Type::OracleBinaryDouble(
601 protobuf::OracleBinaryDouble {},
602 )),
603 },
604 RemoteType::Oracle(OracleType::Blob) => protobuf::RemoteType {
605 r#type: Some(protobuf::remote_type::Type::OracleBlob(
606 protobuf::OracleBlob {},
607 )),
608 },
609 RemoteType::Oracle(OracleType::Float(precision)) => protobuf::RemoteType {
610 r#type: Some(protobuf::remote_type::Type::OracleFloat(
611 protobuf::OracleFloat {
612 precision: *precision as u32,
613 },
614 )),
615 },
616 RemoteType::Oracle(OracleType::NChar(len)) => protobuf::RemoteType {
617 r#type: Some(protobuf::remote_type::Type::OracleNchar(
618 protobuf::OracleNChar { length: *len },
619 )),
620 },
621 RemoteType::Oracle(OracleType::NVarchar2(len)) => protobuf::RemoteType {
622 r#type: Some(protobuf::remote_type::Type::OracleNvarchar2(
623 protobuf::OracleNVarchar2 { length: *len },
624 )),
625 },
626 RemoteType::Oracle(OracleType::Raw(len)) => protobuf::RemoteType {
627 r#type: Some(protobuf::remote_type::Type::OracleRaw(
628 protobuf::OracleRaw { length: *len },
629 )),
630 },
631 RemoteType::Oracle(OracleType::LongRaw) => protobuf::RemoteType {
632 r#type: Some(protobuf::remote_type::Type::OracleLongRaw(
633 protobuf::OracleLongRaw {},
634 )),
635 },
636 RemoteType::Oracle(OracleType::Long) => protobuf::RemoteType {
637 r#type: Some(protobuf::remote_type::Type::OracleLong(
638 protobuf::OracleLong {},
639 )),
640 },
641 RemoteType::Oracle(OracleType::Clob) => protobuf::RemoteType {
642 r#type: Some(protobuf::remote_type::Type::OracleClob(
643 protobuf::OracleClob {},
644 )),
645 },
646 RemoteType::Oracle(OracleType::NClob) => protobuf::RemoteType {
647 r#type: Some(protobuf::remote_type::Type::OracleNclob(
648 protobuf::OracleNClob {},
649 )),
650 },
651 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
652 r#type: Some(protobuf::remote_type::Type::SqliteNull(
653 protobuf::SqliteNull {},
654 )),
655 },
656 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
657 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
658 protobuf::SqliteInteger {},
659 )),
660 },
661 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
662 r#type: Some(protobuf::remote_type::Type::SqliteReal(
663 protobuf::SqliteReal {},
664 )),
665 },
666 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
667 r#type: Some(protobuf::remote_type::Type::SqliteText(
668 protobuf::SqliteText {},
669 )),
670 },
671 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
672 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
673 protobuf::SqliteBlob {},
674 )),
675 },
676 }
677}
678
679fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
680 let fields = remote_schema
681 .fields
682 .iter()
683 .map(parse_remote_field)
684 .collect::<Vec<_>>();
685
686 RemoteSchema { fields }
687}
688
689fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
690 RemoteField {
691 name: field.name.clone(),
692 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
693 nullable: field.nullable,
694 }
695}
696
697fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
698 match remote_type.r#type.as_ref().unwrap() {
699 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
700 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
701 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
702 protobuf::remote_type::Type::PostgresFloat4(_) => {
703 RemoteType::Postgres(PostgresType::Float4)
704 }
705 protobuf::remote_type::Type::PostgresFloat8(_) => {
706 RemoteType::Postgres(PostgresType::Float8)
707 }
708 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
709 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
710 }
711 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
712 protobuf::remote_type::Type::PostgresVarchar(_) => {
713 RemoteType::Postgres(PostgresType::Varchar)
714 }
715 protobuf::remote_type::Type::PostgresBpchar(_) => {
716 RemoteType::Postgres(PostgresType::Bpchar)
717 }
718 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
719 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
720 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
721 protobuf::remote_type::Type::PostgresTimestamp(_) => {
722 RemoteType::Postgres(PostgresType::Timestamp)
723 }
724 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
725 RemoteType::Postgres(PostgresType::TimestampTz)
726 }
727 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
728 protobuf::remote_type::Type::PostgresInterval(_) => {
729 RemoteType::Postgres(PostgresType::Interval)
730 }
731 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
732 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
733 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
734 protobuf::remote_type::Type::PostgresInt2Array(_) => {
735 RemoteType::Postgres(PostgresType::Int2Array)
736 }
737 protobuf::remote_type::Type::PostgresInt4Array(_) => {
738 RemoteType::Postgres(PostgresType::Int4Array)
739 }
740 protobuf::remote_type::Type::PostgresInt8Array(_) => {
741 RemoteType::Postgres(PostgresType::Int8Array)
742 }
743 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
744 RemoteType::Postgres(PostgresType::Float4Array)
745 }
746 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
747 RemoteType::Postgres(PostgresType::Float8Array)
748 }
749 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
750 RemoteType::Postgres(PostgresType::VarcharArray)
751 }
752 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
753 RemoteType::Postgres(PostgresType::BpcharArray)
754 }
755 protobuf::remote_type::Type::PostgresTextArray(_) => {
756 RemoteType::Postgres(PostgresType::TextArray)
757 }
758 protobuf::remote_type::Type::PostgresByteaArray(_) => {
759 RemoteType::Postgres(PostgresType::ByteaArray)
760 }
761 protobuf::remote_type::Type::PostgresBoolArray(_) => {
762 RemoteType::Postgres(PostgresType::BoolArray)
763 }
764 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
765 RemoteType::Postgres(PostgresType::PostGisGeometry)
766 }
767 protobuf::remote_type::Type::PostgresOid(_) => RemoteType::Postgres(PostgresType::Oid),
768 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
769 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
770 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
771 }
772 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
773 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
774 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
775 }
776 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
777 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
778 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
779 }
780 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
781 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
782 RemoteType::Mysql(MysqlType::IntegerUnsigned)
783 }
784 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
785 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
786 RemoteType::Mysql(MysqlType::BigIntUnsigned)
787 }
788 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
789 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
790 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
791 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
792 ),
793 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
794 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
795 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
796 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
797 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
798 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
799 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
800 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
801 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
802 protobuf::remote_type::Type::MysqlText(text) => {
803 RemoteType::Mysql(MysqlType::Text(text.length))
804 }
805 protobuf::remote_type::Type::MysqlBlob(blob) => {
806 RemoteType::Mysql(MysqlType::Blob(blob.length))
807 }
808 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
809 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
810 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
811 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
812 }
813 protobuf::remote_type::Type::OracleChar(char) => {
814 RemoteType::Oracle(OracleType::Char(char.length))
815 }
816 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
817 OracleType::Number(number.precision as u8, number.scale as i8),
818 ),
819 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
820 protobuf::remote_type::Type::OracleTimestamp(_) => {
821 RemoteType::Oracle(OracleType::Timestamp)
822 }
823 protobuf::remote_type::Type::OracleBoolean(_) => RemoteType::Oracle(OracleType::Boolean),
824 protobuf::remote_type::Type::OracleBinaryFloat(_) => {
825 RemoteType::Oracle(OracleType::BinaryFloat)
826 }
827 protobuf::remote_type::Type::OracleBinaryDouble(_) => {
828 RemoteType::Oracle(OracleType::BinaryDouble)
829 }
830 protobuf::remote_type::Type::OracleFloat(protobuf::OracleFloat { precision }) => {
831 RemoteType::Oracle(OracleType::Float(*precision as u8))
832 }
833 protobuf::remote_type::Type::OracleNchar(protobuf::OracleNChar { length }) => {
834 RemoteType::Oracle(OracleType::NChar(*length))
835 }
836 protobuf::remote_type::Type::OracleNvarchar2(protobuf::OracleNVarchar2 { length }) => {
837 RemoteType::Oracle(OracleType::NVarchar2(*length))
838 }
839 protobuf::remote_type::Type::OracleRaw(protobuf::OracleRaw { length }) => {
840 RemoteType::Oracle(OracleType::Raw(*length))
841 }
842 protobuf::remote_type::Type::OracleLongRaw(_) => RemoteType::Oracle(OracleType::LongRaw),
843 protobuf::remote_type::Type::OracleBlob(_) => RemoteType::Oracle(OracleType::Blob),
844 protobuf::remote_type::Type::OracleLong(_) => RemoteType::Oracle(OracleType::Long),
845 protobuf::remote_type::Type::OracleClob(_) => RemoteType::Oracle(OracleType::Clob),
846 protobuf::remote_type::Type::OracleNclob(_) => RemoteType::Oracle(OracleType::NClob),
847 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
848 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
849 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
850 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
851 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
852 }
853}