datafusion_remote_table/
codec.rs

1use crate::generated::prost as protobuf;
2use crate::{
3    connect, ConnectionOptions, DFResult, MysqlConnectionOptions, MysqlType,
4    OracleConnectionOptions, OracleType, PostgresConnectionOptions, PostgresType, RemoteField,
5    RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
6};
7use datafusion::arrow::datatypes::SchemaRef;
8use datafusion::common::DataFusionError;
9use datafusion::execution::FunctionRegistry;
10use datafusion::physical_plan::ExecutionPlan;
11use datafusion_proto::convert_required;
12use datafusion_proto::physical_plan::PhysicalExtensionCodec;
13use datafusion_proto::protobuf::proto_error;
14use prost::Message;
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18
19pub trait TransformCodec: Debug + Send + Sync {
20    fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
21    fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
22}
23
24#[derive(Debug)]
25pub struct RemotePhysicalCodec {
26    transform_codec: Option<Arc<dyn TransformCodec>>,
27}
28
29impl RemotePhysicalCodec {
30    pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
31        Self { transform_codec }
32    }
33}
34
35impl PhysicalExtensionCodec for RemotePhysicalCodec {
36    fn try_decode(
37        &self,
38        buf: &[u8],
39        _inputs: &[Arc<dyn ExecutionPlan>],
40        _registry: &dyn FunctionRegistry,
41    ) -> DFResult<Arc<dyn ExecutionPlan>> {
42        let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
43            DataFusionError::Internal(format!(
44                "Failed to decode remote table execution plan: {e:?}"
45            ))
46        })?;
47
48        let transform = if let Some(bytes) = proto.transform {
49            let Some(transform_codec) = self.transform_codec.as_ref() else {
50                return Err(DataFusionError::Execution(
51                    "No transform codec found".to_string(),
52                ));
53            };
54            Some(transform_codec.try_decode(&bytes)?)
55        } else {
56            None
57        };
58
59        let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
60        let remote_schema = proto
61            .remote_schema
62            .map(|schema| Arc::new(parse_remote_schema(&schema)));
63
64        let projection: Option<Vec<usize>> = proto
65            .projection
66            .map(|p| p.projection.iter().map(|n| *n as usize).collect());
67
68        let conn_options = parse_connection_options(proto.conn_options.unwrap());
69        let conn = tokio::task::block_in_place(|| {
70            tokio::runtime::Handle::current().block_on(async {
71                let pool = connect(&conn_options).await?;
72                let conn = pool.get().await?;
73                Ok::<_, DataFusionError>(conn)
74            })
75        })?;
76
77        Ok(Arc::new(RemoteTableExec::try_new(
78            conn_options,
79            proto.sql,
80            table_schema,
81            remote_schema,
82            projection,
83            transform,
84            conn,
85        )?))
86    }
87
88    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
89        if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
90            let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
91                let Some(transform_codec) = self.transform_codec.as_ref() else {
92                    return Err(DataFusionError::Execution(
93                        "No transform codec found".to_string(),
94                    ));
95                };
96                let bytes = transform_codec.try_encode(transform.as_ref())?;
97                Some(bytes)
98            } else {
99                None
100            };
101
102            let serialized_connection_options = serialize_connection_options(&exec.conn_options);
103            let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
104
105            let proto = protobuf::RemoteTableExec {
106                conn_options: Some(serialized_connection_options),
107                sql: exec.sql.clone(),
108                table_schema: Some(exec.table_schema.as_ref().try_into()?),
109                remote_schema,
110                projection: exec
111                    .projection
112                    .as_ref()
113                    .map(|p| serialize_projection(p.as_slice())),
114                transform: serialized_transform,
115            };
116
117            proto.encode(buf).map_err(|e| {
118                DataFusionError::Internal(format!(
119                    "Failed to encode remote table execution plan: {e:?}"
120                ))
121            })?;
122            Ok(())
123        } else {
124            Err(DataFusionError::Execution(format!(
125                "Failed to encode {}",
126                RemoteTableExec::static_name()
127            )))
128        }
129    }
130}
131
132fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
133    match options {
134        ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
135            connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
136                protobuf::PostgresConnectionOptions {
137                    host: options.host.clone(),
138                    port: options.port as u32,
139                    username: options.username.clone(),
140                    password: options.password.clone(),
141                    database: options.database.clone(),
142                    chunk_size: options.chunk_size.map(|sz| sz as u32),
143                },
144            )),
145        },
146        ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
147            connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
148                protobuf::MysqlConnectionOptions {
149                    host: options.host.clone(),
150                    port: options.port as u32,
151                    username: options.username.clone(),
152                    password: options.password.clone(),
153                    database: options.database.clone(),
154                    chunk_size: options.chunk_size.map(|sz| sz as u32),
155                },
156            )),
157        },
158        ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
159            connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
160                protobuf::OracleConnectionOptions {
161                    host: options.host.clone(),
162                    port: options.port as u32,
163                    username: options.username.clone(),
164                    password: options.password.clone(),
165                    service_name: options.service_name.clone(),
166                    chunk_size: options.chunk_size.map(|sz| sz as u32),
167                },
168            )),
169        },
170        ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
171            connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
172                protobuf::SqliteConnectionOptions {
173                    path: path.to_str().unwrap().to_string(),
174                },
175            )),
176        },
177    }
178}
179
180fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
181    match options.connection_options {
182        Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
183            ConnectionOptions::Postgres(PostgresConnectionOptions {
184                host: options.host,
185                port: options.port as u16,
186                username: options.username,
187                password: options.password,
188                database: options.database,
189                chunk_size: options.chunk_size.map(|sz| sz as usize),
190            })
191        }
192        Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
193            ConnectionOptions::Mysql(MysqlConnectionOptions {
194                host: options.host,
195                port: options.port as u16,
196                username: options.username,
197                password: options.password,
198                database: options.database,
199                chunk_size: options.chunk_size.map(|sz| sz as usize),
200            })
201        }
202        Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
203            ConnectionOptions::Oracle(OracleConnectionOptions {
204                host: options.host,
205                port: options.port as u16,
206                username: options.username,
207                password: options.password,
208                service_name: options.service_name,
209                chunk_size: options.chunk_size.map(|sz| sz as usize),
210            })
211        }
212        Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
213            ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
214        }
215        _ => panic!("Failed to parse connection options: {options:?}"),
216    }
217}
218
219fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
220    protobuf::Projection {
221        projection: projection.iter().map(|n| *n as u32).collect(),
222    }
223}
224
225fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
226    let fields = remote_schema
227        .fields
228        .iter()
229        .map(serialize_remote_field)
230        .collect::<Vec<_>>();
231
232    protobuf::RemoteSchema { fields }
233}
234
235fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
236    protobuf::RemoteField {
237        name: remote_field.name.clone(),
238        remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
239        nullable: remote_field.nullable,
240    }
241}
242
243fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
244    match remote_type {
245        RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
246            r#type: Some(protobuf::remote_type::Type::PostgresInt2(
247                protobuf::PostgresInt2 {},
248            )),
249        },
250        RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
251            r#type: Some(protobuf::remote_type::Type::PostgresInt4(
252                protobuf::PostgresInt4 {},
253            )),
254        },
255        RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
256            r#type: Some(protobuf::remote_type::Type::PostgresInt8(
257                protobuf::PostgresInt8 {},
258            )),
259        },
260        RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
261            r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
262                protobuf::PostgresFloat4 {},
263            )),
264        },
265        RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
266            r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
267                protobuf::PostgresFloat8 {},
268            )),
269        },
270        RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
271            r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
272                protobuf::PostgresNumeric {
273                    scale: *scale as i32,
274                },
275            )),
276        },
277        RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
278            r#type: Some(protobuf::remote_type::Type::PostgresName(
279                protobuf::PostgresName {},
280            )),
281        },
282        RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
283            r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
284                protobuf::PostgresVarchar {},
285            )),
286        },
287        RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
288            r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
289                protobuf::PostgresBpchar {},
290            )),
291        },
292        RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
293            r#type: Some(protobuf::remote_type::Type::PostgresText(
294                protobuf::PostgresText {},
295            )),
296        },
297        RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
298            r#type: Some(protobuf::remote_type::Type::PostgresBytea(
299                protobuf::PostgresBytea {},
300            )),
301        },
302        RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
303            r#type: Some(protobuf::remote_type::Type::PostgresDate(
304                protobuf::PostgresDate {},
305            )),
306        },
307        RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
308            r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
309                protobuf::PostgresTimestamp {},
310            )),
311        },
312        RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
313            r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
314                protobuf::PostgresTimestampTz {},
315            )),
316        },
317        RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
318            r#type: Some(protobuf::remote_type::Type::PostgresTime(
319                protobuf::PostgresTime {},
320            )),
321        },
322        RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
323            r#type: Some(protobuf::remote_type::Type::PostgresInterval(
324                protobuf::PostgresInterval {},
325            )),
326        },
327        RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
328            r#type: Some(protobuf::remote_type::Type::PostgresBool(
329                protobuf::PostgresBool {},
330            )),
331        },
332        RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
333            r#type: Some(protobuf::remote_type::Type::PostgresJson(
334                protobuf::PostgresJson {},
335            )),
336        },
337        RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
338            r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
339                protobuf::PostgresJsonb {},
340            )),
341        },
342        RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
343            r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
344                protobuf::PostgresInt2Array {},
345            )),
346        },
347        RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
348            r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
349                protobuf::PostgresInt4Array {},
350            )),
351        },
352        RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
353            r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
354                protobuf::PostgresInt8Array {},
355            )),
356        },
357        RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
358            r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
359                protobuf::PostgresFloat4Array {},
360            )),
361        },
362        RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
363            r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
364                protobuf::PostgresFloat8Array {},
365            )),
366        },
367        RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
368            r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
369                protobuf::PostgresVarcharArray {},
370            )),
371        },
372        RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
373            r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
374                protobuf::PostgresBpcharArray {},
375            )),
376        },
377        RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
378            r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
379                protobuf::PostgresTextArray {},
380            )),
381        },
382        RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
383            r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
384                protobuf::PostgresByteaArray {},
385            )),
386        },
387        RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
388            r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
389                protobuf::PostgresBoolArray {},
390            )),
391        },
392        RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
393            r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
394                protobuf::PostgresPostGisGeometry {},
395            )),
396        },
397
398        RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
399            r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
400                protobuf::MysqlTinyInt {},
401            )),
402        },
403        RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
404            r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
405                protobuf::MysqlTinyIntUnsigned {},
406            )),
407        },
408        RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
409            r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
410                protobuf::MysqlSmallInt {},
411            )),
412        },
413        RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
414            r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
415                protobuf::MysqlSmallIntUnsigned {},
416            )),
417        },
418        RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
419            r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
420                protobuf::MysqlMediumInt {},
421            )),
422        },
423        RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
424            r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
425                protobuf::MysqlMediumIntUnsigned {},
426            )),
427        },
428        RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
429            r#type: Some(protobuf::remote_type::Type::MysqlInteger(
430                protobuf::MysqlInteger {},
431            )),
432        },
433        RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
434            r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
435                protobuf::MysqlIntegerUnsigned {},
436            )),
437        },
438        RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
439            r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
440                protobuf::MysqlBigInt {},
441            )),
442        },
443        RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
444            r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
445                protobuf::MysqlBigIntUnsigned {},
446            )),
447        },
448        RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
449            r#type: Some(protobuf::remote_type::Type::MysqlFloat(
450                protobuf::MysqlFloat {},
451            )),
452        },
453        RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
454            r#type: Some(protobuf::remote_type::Type::MysqlDouble(
455                protobuf::MysqlDouble {},
456            )),
457        },
458        RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
459            r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
460                protobuf::MysqlDecimal {
461                    precision: *precision as u32,
462                    scale: *scale as u32,
463                },
464            )),
465        },
466        RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
467            r#type: Some(protobuf::remote_type::Type::MysqlDate(
468                protobuf::MysqlDate {},
469            )),
470        },
471        RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
472            r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
473                protobuf::MysqlDateTime {},
474            )),
475        },
476        RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
477            r#type: Some(protobuf::remote_type::Type::MysqlTime(
478                protobuf::MysqlTime {},
479            )),
480        },
481        RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
482            r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
483                protobuf::MysqlTimestamp {},
484            )),
485        },
486        RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
487            r#type: Some(protobuf::remote_type::Type::MysqlYear(
488                protobuf::MysqlYear {},
489            )),
490        },
491        RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
492            r#type: Some(protobuf::remote_type::Type::MysqlChar(
493                protobuf::MysqlChar {},
494            )),
495        },
496        RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
497            r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
498                protobuf::MysqlVarchar {},
499            )),
500        },
501        RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
502            r#type: Some(protobuf::remote_type::Type::MysqlBinary(
503                protobuf::MysqlBinary {},
504            )),
505        },
506        RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
507            r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
508                protobuf::MysqlVarbinary {},
509            )),
510        },
511        RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
512            r#type: Some(protobuf::remote_type::Type::MysqlText(
513                protobuf::MysqlText { length: *len },
514            )),
515        },
516        RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
517            r#type: Some(protobuf::remote_type::Type::MysqlBlob(
518                protobuf::MysqlBlob { length: *len },
519            )),
520        },
521        RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
522            r#type: Some(protobuf::remote_type::Type::MysqlJson(
523                protobuf::MysqlJson {},
524            )),
525        },
526        RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
527            r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
528                protobuf::MysqlGeometry {},
529            )),
530        },
531
532        RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
533            r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
534                protobuf::OracleVarchar2 { length: *len },
535            )),
536        },
537        RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
538            r#type: Some(protobuf::remote_type::Type::OracleChar(
539                protobuf::OracleChar { length: *len },
540            )),
541        },
542        RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
543            r#type: Some(protobuf::remote_type::Type::OracleNumber(
544                protobuf::OracleNumber {
545                    precision: *precision as u32,
546                    scale: *scale as i32,
547                },
548            )),
549        },
550        RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
551            r#type: Some(protobuf::remote_type::Type::OracleDate(
552                protobuf::OracleDate {},
553            )),
554        },
555        RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
556            r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
557                protobuf::OracleTimestamp {},
558            )),
559        },
560        RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
561            r#type: Some(protobuf::remote_type::Type::SqliteNull(
562                protobuf::SqliteNull {},
563            )),
564        },
565        RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
566            r#type: Some(protobuf::remote_type::Type::SqliteInteger(
567                protobuf::SqliteInteger {},
568            )),
569        },
570        RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
571            r#type: Some(protobuf::remote_type::Type::SqliteReal(
572                protobuf::SqliteReal {},
573            )),
574        },
575        RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
576            r#type: Some(protobuf::remote_type::Type::SqliteText(
577                protobuf::SqliteText {},
578            )),
579        },
580        RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
581            r#type: Some(protobuf::remote_type::Type::SqliteBlob(
582                protobuf::SqliteBlob {},
583            )),
584        },
585    }
586}
587
588fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
589    let fields = remote_schema
590        .fields
591        .iter()
592        .map(parse_remote_field)
593        .collect::<Vec<_>>();
594
595    RemoteSchema { fields }
596}
597
598fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
599    RemoteField {
600        name: field.name.clone(),
601        remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
602        nullable: field.nullable,
603    }
604}
605
606fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
607    match remote_type.r#type.as_ref().unwrap() {
608        protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
609        protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
610        protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
611        protobuf::remote_type::Type::PostgresFloat4(_) => {
612            RemoteType::Postgres(PostgresType::Float4)
613        }
614        protobuf::remote_type::Type::PostgresFloat8(_) => {
615            RemoteType::Postgres(PostgresType::Float8)
616        }
617        protobuf::remote_type::Type::PostgresNumeric(numeric) => {
618            RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
619        }
620        protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
621        protobuf::remote_type::Type::PostgresVarchar(_) => {
622            RemoteType::Postgres(PostgresType::Varchar)
623        }
624        protobuf::remote_type::Type::PostgresBpchar(_) => {
625            RemoteType::Postgres(PostgresType::Bpchar)
626        }
627        protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
628        protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
629        protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
630        protobuf::remote_type::Type::PostgresTimestamp(_) => {
631            RemoteType::Postgres(PostgresType::Timestamp)
632        }
633        protobuf::remote_type::Type::PostgresTimestampTz(_) => {
634            RemoteType::Postgres(PostgresType::TimestampTz)
635        }
636        protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
637        protobuf::remote_type::Type::PostgresInterval(_) => {
638            RemoteType::Postgres(PostgresType::Interval)
639        }
640        protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
641        protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
642        protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
643        protobuf::remote_type::Type::PostgresInt2Array(_) => {
644            RemoteType::Postgres(PostgresType::Int2Array)
645        }
646        protobuf::remote_type::Type::PostgresInt4Array(_) => {
647            RemoteType::Postgres(PostgresType::Int4Array)
648        }
649        protobuf::remote_type::Type::PostgresInt8Array(_) => {
650            RemoteType::Postgres(PostgresType::Int8Array)
651        }
652        protobuf::remote_type::Type::PostgresFloat4Array(_) => {
653            RemoteType::Postgres(PostgresType::Float4Array)
654        }
655        protobuf::remote_type::Type::PostgresFloat8Array(_) => {
656            RemoteType::Postgres(PostgresType::Float8Array)
657        }
658        protobuf::remote_type::Type::PostgresVarcharArray(_) => {
659            RemoteType::Postgres(PostgresType::VarcharArray)
660        }
661        protobuf::remote_type::Type::PostgresBpcharArray(_) => {
662            RemoteType::Postgres(PostgresType::BpcharArray)
663        }
664        protobuf::remote_type::Type::PostgresTextArray(_) => {
665            RemoteType::Postgres(PostgresType::TextArray)
666        }
667        protobuf::remote_type::Type::PostgresByteaArray(_) => {
668            RemoteType::Postgres(PostgresType::ByteaArray)
669        }
670        protobuf::remote_type::Type::PostgresBoolArray(_) => {
671            RemoteType::Postgres(PostgresType::BoolArray)
672        }
673        protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
674            RemoteType::Postgres(PostgresType::PostGisGeometry)
675        }
676        protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
677        protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
678            RemoteType::Mysql(MysqlType::TinyIntUnsigned)
679        }
680        protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
681        protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
682            RemoteType::Mysql(MysqlType::SmallIntUnsigned)
683        }
684        protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
685        protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
686            RemoteType::Mysql(MysqlType::MediumIntUnsigned)
687        }
688        protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
689        protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
690            RemoteType::Mysql(MysqlType::IntegerUnsigned)
691        }
692        protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
693        protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
694            RemoteType::Mysql(MysqlType::BigIntUnsigned)
695        }
696        protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
697        protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
698        protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
699            MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
700        ),
701        protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
702        protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
703        protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
704        protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
705        protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
706        protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
707        protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
708        protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
709        protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
710        protobuf::remote_type::Type::MysqlText(text) => {
711            RemoteType::Mysql(MysqlType::Text(text.length))
712        }
713        protobuf::remote_type::Type::MysqlBlob(blob) => {
714            RemoteType::Mysql(MysqlType::Blob(blob.length))
715        }
716        protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
717        protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
718        protobuf::remote_type::Type::OracleVarchar2(varchar) => {
719            RemoteType::Oracle(OracleType::Varchar2(varchar.length))
720        }
721        protobuf::remote_type::Type::OracleChar(char) => {
722            RemoteType::Oracle(OracleType::Char(char.length))
723        }
724        protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
725            OracleType::Number(number.precision as u8, number.scale as i8),
726        ),
727        protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
728        protobuf::remote_type::Type::OracleTimestamp(_) => {
729            RemoteType::Oracle(OracleType::Timestamp)
730        }
731        protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
732        protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
733        protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
734        protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
735        protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
736    }
737}