1use crate::generated::prost as protobuf;
2use crate::{
3 connect, ConnectionOptions, DFResult, MysqlConnectionOptions, MysqlType,
4 OracleConnectionOptions, OracleType, PostgresConnectionOptions, PostgresType, RemoteField,
5 RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
6};
7use datafusion::arrow::datatypes::SchemaRef;
8use datafusion::common::DataFusionError;
9use datafusion::execution::FunctionRegistry;
10use datafusion::physical_plan::ExecutionPlan;
11use datafusion_proto::convert_required;
12use datafusion_proto::physical_plan::PhysicalExtensionCodec;
13use datafusion_proto::protobuf::proto_error;
14use prost::Message;
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18
/// Serialization hook for user-supplied [`Transform`] implementations.
///
/// [`RemotePhysicalCodec`] delegates to an implementation of this trait
/// whenever a plan it encodes or decodes carries a transform.
pub trait TransformCodec: Debug + Send + Sync {
    /// Encodes `value` into an opaque byte buffer.
    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;

    /// Rebuilds a transform from bytes previously produced by
    /// [`TransformCodec::try_encode`].
    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
}
23
/// Physical extension codec that converts [`RemoteTableExec`] plan nodes to
/// and from their protobuf representation.
#[derive(Debug)]
pub struct RemotePhysicalCodec {
    /// Codec used for the optional transform attached to a plan node; when it
    /// is `None`, plans that carry a transform cannot be encoded or decoded.
    transform_codec: Option<Arc<dyn TransformCodec>>,
}
28
impl RemotePhysicalCodec {
    /// Creates a codec.
    ///
    /// Pass `Some(codec)` to support plans that carry a transform, or `None`
    /// when no transform ever needs to be serialized.
    pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
        Self { transform_codec }
    }
}
34
impl PhysicalExtensionCodec for RemotePhysicalCodec {
    /// Rebuilds a [`RemoteTableExec`] from its protobuf encoding.
    ///
    /// Besides decoding the message, this connects to the remote database
    /// described by the encoded connection options, because `RemoteTableExec`
    /// is constructed with a live connection. The connection is established
    /// by blocking the current thread on the ambient tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * `buf` is not a valid `RemoteTableExec` message,
    /// * the message carries a transform but no transform codec is configured,
    /// * the table schema or the connection options are missing,
    /// * a remote schema field has no remote type,
    /// * no tokio runtime is available, or
    /// * connecting to the remote database fails.
    ///
    /// # Panics
    ///
    /// `tokio::task::block_in_place` panics when called from a
    /// current-thread runtime (see the tokio documentation).
    fn try_decode(
        &self,
        buf: &[u8],
        _inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
            DataFusionError::Internal(format!(
                "Failed to decode remote table execution plan: {e:?}"
            ))
        })?;

        let transform = if let Some(bytes) = proto.transform {
            let Some(transform_codec) = self.transform_codec.as_ref() else {
                return Err(DataFusionError::Execution(
                    "No transform codec found".to_string(),
                ));
            };
            Some(transform_codec.try_decode(&bytes)?)
        } else {
            None
        };

        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);

        let remote_schema = match proto.remote_schema {
            Some(schema) => {
                // `parse_remote_schema` unwraps each field's type, so reject
                // incomplete input here instead of panicking on it.
                let fully_typed = schema.fields.iter().all(|field| {
                    field
                        .remote_type
                        .as_ref()
                        .and_then(|ty| ty.r#type.as_ref())
                        .is_some()
                });
                if !fully_typed {
                    return Err(DataFusionError::Internal(
                        "Failed to decode remote table execution plan: \
                         remote schema contains a field without a remote type"
                            .to_string(),
                    ));
                }
                Some(Arc::new(parse_remote_schema(&schema)))
            }
            None => None,
        };

        let projection: Option<Vec<usize>> = proto
            .projection
            .map(|p| p.projection.iter().map(|n| *n as usize).collect());

        // `parse_connection_options` panics on an empty oneof, so both the
        // outer message and the inner variant are checked before calling it.
        let conn_options = match proto.conn_options {
            Some(options) if options.connection_options.is_some() => {
                parse_connection_options(options)
            }
            _ => {
                return Err(DataFusionError::Internal(
                    "Failed to decode remote table execution plan: \
                     missing connection options"
                        .to_string(),
                ));
            }
        };

        // Resolve the runtime handle up front so that a missing runtime is
        // reported as an error rather than a panic.
        let runtime = tokio::runtime::Handle::try_current().map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to decode remote table execution plan: \
                 no tokio runtime available: {e}"
            ))
        })?;
        let conn = tokio::task::block_in_place(|| {
            runtime.block_on(async {
                let pool = connect(&conn_options).await?;
                let conn = pool.get().await?;
                Ok::<_, DataFusionError>(conn)
            })
        })?;

        Ok(Arc::new(RemoteTableExec::try_new(
            conn_options,
            proto.sql,
            table_schema,
            remote_schema,
            projection,
            transform,
            conn,
        )?))
    }

    /// Appends the protobuf encoding of `node` to `buf`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * `node` is not a [`RemoteTableExec`],
    /// * the plan carries a transform but no transform codec is configured,
    /// * the plan uses a SQLite path that is not valid UTF-8, or
    /// * schema conversion or protobuf encoding fails.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
        let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() else {
            return Err(DataFusionError::Execution(format!(
                "Failed to encode {}",
                RemoteTableExec::static_name()
            )));
        };

        let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
            let Some(transform_codec) = self.transform_codec.as_ref() else {
                return Err(DataFusionError::Execution(
                    "No transform codec found".to_string(),
                ));
            };
            Some(transform_codec.try_encode(transform.as_ref())?)
        } else {
            None
        };

        // `serialize_connection_options` unwraps the path-to-string
        // conversion, so a non-UTF-8 path is reported here as an error.
        if let ConnectionOptions::Sqlite(path) = &exec.conn_options {
            if path.to_str().is_none() {
                return Err(DataFusionError::Execution(format!(
                    "Failed to encode {}: sqlite path {} is not valid UTF-8",
                    RemoteTableExec::static_name(),
                    path.display()
                )));
            }
        }

        let serialized_connection_options = serialize_connection_options(&exec.conn_options);
        let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);

        let proto = protobuf::RemoteTableExec {
            conn_options: Some(serialized_connection_options),
            sql: exec.sql.clone(),
            table_schema: Some(exec.table_schema.as_ref().try_into()?),
            remote_schema,
            projection: exec
                .projection
                .as_ref()
                .map(|p| serialize_projection(p.as_slice())),
            transform: serialized_transform,
        };

        proto.encode(buf).map_err(|e| {
            DataFusionError::Internal(format!(
                "Failed to encode remote table execution plan: {e:?}"
            ))
        })?;
        Ok(())
    }
}
131
/// Converts [`ConnectionOptions`] into its protobuf message.
///
/// # Panics
///
/// Panics when a SQLite path is not valid UTF-8, because the protobuf field
/// stores the path as a string.
fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
    type ProtoVariant = protobuf::connection_options::ConnectionOptions;

    // Build the oneof payload first, then wrap it in the outer message once.
    let variant = match options {
        ConnectionOptions::Postgres(pg) => {
            ProtoVariant::Postgres(protobuf::PostgresConnectionOptions {
                host: pg.host.clone(),
                port: pg.port as u32,
                username: pg.username.clone(),
                password: pg.password.clone(),
                database: pg.database.clone(),
                chunk_size: pg.chunk_size.map(|sz| sz as u32),
            })
        }
        ConnectionOptions::Mysql(mysql) => {
            ProtoVariant::Mysql(protobuf::MysqlConnectionOptions {
                host: mysql.host.clone(),
                port: mysql.port as u32,
                username: mysql.username.clone(),
                password: mysql.password.clone(),
                database: mysql.database.clone(),
                chunk_size: mysql.chunk_size.map(|sz| sz as u32),
            })
        }
        ConnectionOptions::Oracle(oracle) => {
            ProtoVariant::Oracle(protobuf::OracleConnectionOptions {
                host: oracle.host.clone(),
                port: oracle.port as u32,
                username: oracle.username.clone(),
                password: oracle.password.clone(),
                service_name: oracle.service_name.clone(),
                chunk_size: oracle.chunk_size.map(|sz| sz as u32),
            })
        }
        ConnectionOptions::Sqlite(db_path) => {
            ProtoVariant::Sqlite(protobuf::SqliteConnectionOptions {
                path: db_path.to_str().unwrap().to_string(),
            })
        }
    };

    protobuf::ConnectionOptions {
        connection_options: Some(variant),
    }
}
179
/// Converts a protobuf connection-options message back into
/// [`ConnectionOptions`].
///
/// Ports are narrowed to `u16` and chunk sizes widened to `usize` with `as`
/// casts.
///
/// # Panics
///
/// Panics when the message does not carry one of the Postgres, MySQL, Oracle
/// or SQLite variants.
fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
    type ProtoVariant = protobuf::connection_options::ConnectionOptions;

    match options.connection_options {
        Some(ProtoVariant::Postgres(pg)) => {
            let parsed = PostgresConnectionOptions {
                host: pg.host,
                port: pg.port as u16,
                username: pg.username,
                password: pg.password,
                database: pg.database,
                chunk_size: pg.chunk_size.map(|sz| sz as usize),
            };
            ConnectionOptions::Postgres(parsed)
        }
        Some(ProtoVariant::Mysql(mysql)) => {
            let parsed = MysqlConnectionOptions {
                host: mysql.host,
                port: mysql.port as u16,
                username: mysql.username,
                password: mysql.password,
                database: mysql.database,
                chunk_size: mysql.chunk_size.map(|sz| sz as usize),
            };
            ConnectionOptions::Mysql(parsed)
        }
        Some(ProtoVariant::Oracle(oracle)) => {
            let parsed = OracleConnectionOptions {
                host: oracle.host,
                port: oracle.port as u16,
                username: oracle.username,
                password: oracle.password,
                service_name: oracle.service_name,
                chunk_size: oracle.chunk_size.map(|sz| sz as usize),
            };
            ConnectionOptions::Oracle(parsed)
        }
        Some(ProtoVariant::Sqlite(sqlite)) => {
            ConnectionOptions::Sqlite(Path::new(&sqlite.path).to_path_buf())
        }
        _ => panic!("Failed to parse connection options: {options:?}"),
    }
}
218
/// Wraps column indices in a protobuf `Projection`, narrowing each index to
/// `u32` with an `as` cast.
fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
    let indices = projection.iter().map(|&index| index as u32).collect();
    protobuf::Projection {
        projection: indices,
    }
}
224
/// Converts a remote schema into its protobuf message, field by field and in
/// the original order.
fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
    protobuf::RemoteSchema {
        fields: remote_schema
            .fields
            .iter()
            .map(serialize_remote_field)
            .collect(),
    }
}
234
/// Converts a single remote field (name, remote type, nullability) into its
/// protobuf message.
fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
    let name = remote_field.name.clone();
    let remote_type = serialize_remote_type(&remote_field.remote_type);

    protobuf::RemoteField {
        name,
        remote_type: Some(remote_type),
        nullable: remote_field.nullable,
    }
}
242
/// Converts a [`RemoteType`] into its protobuf message.
///
/// Each remote type maps to exactly one variant of the protobuf `Type` oneof.
/// Types that carry parameters (numeric scale/precision, text/blob lengths)
/// copy them into the corresponding message; all others use an empty message.
fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
    type Kind = protobuf::remote_type::Type;

    // Select the oneof payload per backend; it is wrapped once at the end.
    let kind = match remote_type {
        RemoteType::Postgres(pg) => match pg {
            PostgresType::Int2 => Kind::PostgresInt2(protobuf::PostgresInt2 {}),
            PostgresType::Int4 => Kind::PostgresInt4(protobuf::PostgresInt4 {}),
            PostgresType::Int8 => Kind::PostgresInt8(protobuf::PostgresInt8 {}),
            PostgresType::Float4 => Kind::PostgresFloat4(protobuf::PostgresFloat4 {}),
            PostgresType::Float8 => Kind::PostgresFloat8(protobuf::PostgresFloat8 {}),
            PostgresType::Numeric(scale) => Kind::PostgresNumeric(protobuf::PostgresNumeric {
                scale: *scale as i32,
            }),
            PostgresType::Name => Kind::PostgresName(protobuf::PostgresName {}),
            PostgresType::Varchar => Kind::PostgresVarchar(protobuf::PostgresVarchar {}),
            PostgresType::Bpchar => Kind::PostgresBpchar(protobuf::PostgresBpchar {}),
            PostgresType::Text => Kind::PostgresText(protobuf::PostgresText {}),
            PostgresType::Bytea => Kind::PostgresBytea(protobuf::PostgresBytea {}),
            PostgresType::Date => Kind::PostgresDate(protobuf::PostgresDate {}),
            PostgresType::Timestamp => Kind::PostgresTimestamp(protobuf::PostgresTimestamp {}),
            PostgresType::TimestampTz => {
                Kind::PostgresTimestampTz(protobuf::PostgresTimestampTz {})
            }
            PostgresType::Time => Kind::PostgresTime(protobuf::PostgresTime {}),
            PostgresType::Interval => Kind::PostgresInterval(protobuf::PostgresInterval {}),
            PostgresType::Bool => Kind::PostgresBool(protobuf::PostgresBool {}),
            PostgresType::Json => Kind::PostgresJson(protobuf::PostgresJson {}),
            PostgresType::Jsonb => Kind::PostgresJsonb(protobuf::PostgresJsonb {}),
            PostgresType::Int2Array => Kind::PostgresInt2Array(protobuf::PostgresInt2Array {}),
            PostgresType::Int4Array => Kind::PostgresInt4Array(protobuf::PostgresInt4Array {}),
            PostgresType::Int8Array => Kind::PostgresInt8Array(protobuf::PostgresInt8Array {}),
            PostgresType::Float4Array => {
                Kind::PostgresFloat4Array(protobuf::PostgresFloat4Array {})
            }
            PostgresType::Float8Array => {
                Kind::PostgresFloat8Array(protobuf::PostgresFloat8Array {})
            }
            PostgresType::VarcharArray => {
                Kind::PostgresVarcharArray(protobuf::PostgresVarcharArray {})
            }
            PostgresType::BpcharArray => {
                Kind::PostgresBpcharArray(protobuf::PostgresBpcharArray {})
            }
            PostgresType::TextArray => Kind::PostgresTextArray(protobuf::PostgresTextArray {}),
            PostgresType::ByteaArray => {
                Kind::PostgresByteaArray(protobuf::PostgresByteaArray {})
            }
            PostgresType::BoolArray => Kind::PostgresBoolArray(protobuf::PostgresBoolArray {}),
            PostgresType::PostGisGeometry => {
                Kind::PostgresPostgisGeometry(protobuf::PostgresPostGisGeometry {})
            }
        },

        RemoteType::Mysql(mysql) => match mysql {
            MysqlType::TinyInt => Kind::MysqlTinyInt(protobuf::MysqlTinyInt {}),
            MysqlType::TinyIntUnsigned => {
                Kind::MysqlTinyIntUnsigned(protobuf::MysqlTinyIntUnsigned {})
            }
            MysqlType::SmallInt => Kind::MysqlSmallInt(protobuf::MysqlSmallInt {}),
            MysqlType::SmallIntUnsigned => {
                Kind::MysqlSmallIntUnsigned(protobuf::MysqlSmallIntUnsigned {})
            }
            MysqlType::MediumInt => Kind::MysqlMediumInt(protobuf::MysqlMediumInt {}),
            MysqlType::MediumIntUnsigned => {
                Kind::MysqlMediumIntUnsigned(protobuf::MysqlMediumIntUnsigned {})
            }
            MysqlType::Integer => Kind::MysqlInteger(protobuf::MysqlInteger {}),
            MysqlType::IntegerUnsigned => {
                Kind::MysqlIntegerUnsigned(protobuf::MysqlIntegerUnsigned {})
            }
            MysqlType::BigInt => Kind::MysqlBigInt(protobuf::MysqlBigInt {}),
            MysqlType::BigIntUnsigned => {
                Kind::MysqlBigIntUnsigned(protobuf::MysqlBigIntUnsigned {})
            }
            MysqlType::Float => Kind::MysqlFloat(protobuf::MysqlFloat {}),
            MysqlType::Double => Kind::MysqlDouble(protobuf::MysqlDouble {}),
            MysqlType::Decimal(precision, scale) => Kind::MysqlDecimal(protobuf::MysqlDecimal {
                precision: *precision as u32,
                scale: *scale as u32,
            }),
            MysqlType::Date => Kind::MysqlDate(protobuf::MysqlDate {}),
            MysqlType::Datetime => Kind::MysqlDateTime(protobuf::MysqlDateTime {}),
            MysqlType::Time => Kind::MysqlTime(protobuf::MysqlTime {}),
            MysqlType::Timestamp => Kind::MysqlTimestamp(protobuf::MysqlTimestamp {}),
            MysqlType::Year => Kind::MysqlYear(protobuf::MysqlYear {}),
            MysqlType::Char => Kind::MysqlChar(protobuf::MysqlChar {}),
            MysqlType::Varchar => Kind::MysqlVarchar(protobuf::MysqlVarchar {}),
            MysqlType::Binary => Kind::MysqlBinary(protobuf::MysqlBinary {}),
            MysqlType::Varbinary => Kind::MysqlVarbinary(protobuf::MysqlVarbinary {}),
            MysqlType::Text(len) => Kind::MysqlText(protobuf::MysqlText { length: *len }),
            MysqlType::Blob(len) => Kind::MysqlBlob(protobuf::MysqlBlob { length: *len }),
            MysqlType::Json => Kind::MysqlJson(protobuf::MysqlJson {}),
            MysqlType::Geometry => Kind::MysqlGeometry(protobuf::MysqlGeometry {}),
        },

        RemoteType::Oracle(oracle) => match oracle {
            OracleType::Varchar2(len) => {
                Kind::OracleVarchar2(protobuf::OracleVarchar2 { length: *len })
            }
            OracleType::Char(len) => Kind::OracleChar(protobuf::OracleChar { length: *len }),
            OracleType::Number(precision, scale) => Kind::OracleNumber(protobuf::OracleNumber {
                precision: *precision as u32,
                scale: *scale as i32,
            }),
            OracleType::Date => Kind::OracleDate(protobuf::OracleDate {}),
            OracleType::Timestamp => Kind::OracleTimestamp(protobuf::OracleTimestamp {}),
        },

        RemoteType::Sqlite(sqlite) => match sqlite {
            SqliteType::Null => Kind::SqliteNull(protobuf::SqliteNull {}),
            SqliteType::Integer => Kind::SqliteInteger(protobuf::SqliteInteger {}),
            SqliteType::Real => Kind::SqliteReal(protobuf::SqliteReal {}),
            SqliteType::Text => Kind::SqliteText(protobuf::SqliteText {}),
            SqliteType::Blob => Kind::SqliteBlob(protobuf::SqliteBlob {}),
        },
    };

    protobuf::RemoteType { r#type: Some(kind) }
}
587
588fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
589 let fields = remote_schema
590 .fields
591 .iter()
592 .map(parse_remote_field)
593 .collect::<Vec<_>>();
594
595 RemoteSchema { fields }
596}
597
598fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
599 RemoteField {
600 name: field.name.clone(),
601 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
602 nullable: field.nullable,
603 }
604}
605
/// Converts a protobuf remote type back into a [`RemoteType`].
///
/// Parameters carried by the message (scale, precision, length) are copied
/// into the resulting type; scale and precision are narrowed with `as` casts.
///
/// # Panics
///
/// Panics when the message's `type` oneof is not set.
fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
    type Kind = protobuf::remote_type::Type;

    let kind = remote_type.r#type.as_ref().unwrap();
    match kind {
        // Postgres
        Kind::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
        Kind::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
        Kind::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
        Kind::PostgresFloat4(_) => RemoteType::Postgres(PostgresType::Float4),
        Kind::PostgresFloat8(_) => RemoteType::Postgres(PostgresType::Float8),
        Kind::PostgresNumeric(numeric) => {
            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
        }
        Kind::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
        Kind::PostgresVarchar(_) => RemoteType::Postgres(PostgresType::Varchar),
        Kind::PostgresBpchar(_) => RemoteType::Postgres(PostgresType::Bpchar),
        Kind::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
        Kind::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
        Kind::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
        Kind::PostgresTimestamp(_) => RemoteType::Postgres(PostgresType::Timestamp),
        Kind::PostgresTimestampTz(_) => RemoteType::Postgres(PostgresType::TimestampTz),
        Kind::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
        Kind::PostgresInterval(_) => RemoteType::Postgres(PostgresType::Interval),
        Kind::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
        Kind::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
        Kind::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
        Kind::PostgresInt2Array(_) => RemoteType::Postgres(PostgresType::Int2Array),
        Kind::PostgresInt4Array(_) => RemoteType::Postgres(PostgresType::Int4Array),
        Kind::PostgresInt8Array(_) => RemoteType::Postgres(PostgresType::Int8Array),
        Kind::PostgresFloat4Array(_) => RemoteType::Postgres(PostgresType::Float4Array),
        Kind::PostgresFloat8Array(_) => RemoteType::Postgres(PostgresType::Float8Array),
        Kind::PostgresVarcharArray(_) => RemoteType::Postgres(PostgresType::VarcharArray),
        Kind::PostgresBpcharArray(_) => RemoteType::Postgres(PostgresType::BpcharArray),
        Kind::PostgresTextArray(_) => RemoteType::Postgres(PostgresType::TextArray),
        Kind::PostgresByteaArray(_) => RemoteType::Postgres(PostgresType::ByteaArray),
        Kind::PostgresBoolArray(_) => RemoteType::Postgres(PostgresType::BoolArray),
        Kind::PostgresPostgisGeometry(_) => RemoteType::Postgres(PostgresType::PostGisGeometry),

        // MySQL
        Kind::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
        Kind::MysqlTinyIntUnsigned(_) => RemoteType::Mysql(MysqlType::TinyIntUnsigned),
        Kind::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
        Kind::MysqlSmallIntUnsigned(_) => RemoteType::Mysql(MysqlType::SmallIntUnsigned),
        Kind::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
        Kind::MysqlMediumIntUnsigned(_) => RemoteType::Mysql(MysqlType::MediumIntUnsigned),
        Kind::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
        Kind::MysqlIntegerUnsigned(_) => RemoteType::Mysql(MysqlType::IntegerUnsigned),
        Kind::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
        Kind::MysqlBigIntUnsigned(_) => RemoteType::Mysql(MysqlType::BigIntUnsigned),
        Kind::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
        Kind::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
        Kind::MysqlDecimal(decimal) => RemoteType::Mysql(MysqlType::Decimal(
            decimal.precision as u8,
            decimal.scale as u8,
        )),
        Kind::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
        Kind::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
        Kind::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
        Kind::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
        Kind::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
        Kind::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
        Kind::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
        Kind::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
        Kind::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
        Kind::MysqlText(text) => RemoteType::Mysql(MysqlType::Text(text.length)),
        Kind::MysqlBlob(blob) => RemoteType::Mysql(MysqlType::Blob(blob.length)),
        Kind::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
        Kind::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),

        // Oracle
        Kind::OracleVarchar2(varchar) => RemoteType::Oracle(OracleType::Varchar2(varchar.length)),
        Kind::OracleChar(char) => RemoteType::Oracle(OracleType::Char(char.length)),
        Kind::OracleNumber(number) => RemoteType::Oracle(OracleType::Number(
            number.precision as u8,
            number.scale as i8,
        )),
        Kind::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
        Kind::OracleTimestamp(_) => RemoteType::Oracle(OracleType::Timestamp),

        // SQLite
        Kind::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
        Kind::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
        Kind::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
        Kind::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
        Kind::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
    }
}