1use crate::generated::prost as protobuf;
2use crate::{
3 connect, ConnectionOptions, DFResult, MysqlConnectionOptions, MysqlType,
4 OracleConnectionOptions, OracleType, PostgresConnectionOptions, PostgresType, RemoteField,
5 RemoteSchema, RemoteSchemaRef, RemoteTableExec, RemoteType, SqliteType, Transform,
6};
7use datafusion::arrow::datatypes::SchemaRef;
8use datafusion::common::DataFusionError;
9use datafusion::execution::FunctionRegistry;
10use datafusion::physical_plan::ExecutionPlan;
11use datafusion_proto::convert_required;
12use datafusion_proto::physical_plan::PhysicalExtensionCodec;
13use datafusion_proto::protobuf::proto_error;
14use prost::Message;
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18
19pub trait TransformCodec: Debug + Send + Sync {
20 fn try_encode(&self, value: &dyn Transform) -> DFResult<Vec<u8>>;
21 fn try_decode(&self, value: &[u8]) -> DFResult<Arc<dyn Transform>>;
22}
23
24#[derive(Debug)]
25pub struct RemotePhysicalCodec {
26 transform_codec: Option<Arc<dyn TransformCodec>>,
27}
28
29impl RemotePhysicalCodec {
30 pub fn new(transform_codec: Option<Arc<dyn TransformCodec>>) -> Self {
31 Self { transform_codec }
32 }
33}
34
35impl PhysicalExtensionCodec for RemotePhysicalCodec {
36 fn try_decode(
37 &self,
38 buf: &[u8],
39 _inputs: &[Arc<dyn ExecutionPlan>],
40 _registry: &dyn FunctionRegistry,
41 ) -> DFResult<Arc<dyn ExecutionPlan>> {
42 let proto = protobuf::RemoteTableExec::decode(buf).map_err(|e| {
43 DataFusionError::Internal(format!(
44 "Failed to decode remote table execution plan: {e:?}"
45 ))
46 })?;
47
48 let transform = if let Some(bytes) = proto.transform {
49 let Some(transform_codec) = self.transform_codec.as_ref() else {
50 return Err(DataFusionError::Execution(
51 "No transform codec found".to_string(),
52 ));
53 };
54 Some(transform_codec.try_decode(&bytes)?)
55 } else {
56 None
57 };
58
59 let table_schema: SchemaRef = Arc::new(convert_required!(&proto.table_schema)?);
60 let remote_schema = proto
61 .remote_schema
62 .map(|schema| Arc::new(parse_remote_schema(&schema)));
63
64 let projection: Option<Vec<usize>> = proto
65 .projection
66 .map(|p| p.projection.iter().map(|n| *n as usize).collect());
67
68 let conn_options = parse_connection_options(proto.conn_options.unwrap());
69 let conn = tokio::task::block_in_place(|| {
70 tokio::runtime::Handle::current().block_on(async {
71 let pool = connect(&conn_options).await?;
72 let conn = pool.get().await?;
73 Ok::<_, DataFusionError>(conn)
74 })
75 })?;
76
77 Ok(Arc::new(RemoteTableExec::try_new(
78 conn_options,
79 proto.sql,
80 table_schema,
81 remote_schema,
82 projection,
83 transform,
84 conn,
85 )?))
86 }
87
88 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
89 if let Some(exec) = node.as_any().downcast_ref::<RemoteTableExec>() {
90 let serialized_transform = if let Some(transform) = exec.transform.as_ref() {
91 let Some(transform_codec) = self.transform_codec.as_ref() else {
92 return Err(DataFusionError::Execution(
93 "No transform codec found".to_string(),
94 ));
95 };
96 let bytes = transform_codec.try_encode(transform.as_ref())?;
97 Some(bytes)
98 } else {
99 None
100 };
101
102 let serialized_connection_options = serialize_connection_options(&exec.conn_options);
103 let remote_schema = exec.remote_schema.as_ref().map(serialize_remote_schema);
104
105 let proto = protobuf::RemoteTableExec {
106 conn_options: Some(serialized_connection_options),
107 sql: exec.sql.clone(),
108 table_schema: Some(exec.table_schema.as_ref().try_into()?),
109 remote_schema,
110 projection: exec
111 .projection
112 .as_ref()
113 .map(|p| serialize_projection(p.as_slice())),
114 transform: serialized_transform,
115 };
116
117 proto.encode(buf).map_err(|e| {
118 DataFusionError::Internal(format!(
119 "Failed to encode remote table execution plan: {e:?}"
120 ))
121 })?;
122 Ok(())
123 } else {
124 Err(DataFusionError::Execution(format!(
125 "Failed to encode {}",
126 RemoteTableExec::static_name()
127 )))
128 }
129 }
130}
131
132fn serialize_connection_options(options: &ConnectionOptions) -> protobuf::ConnectionOptions {
133 match options {
134 ConnectionOptions::Postgres(options) => protobuf::ConnectionOptions {
135 connection_options: Some(protobuf::connection_options::ConnectionOptions::Postgres(
136 protobuf::PostgresConnectionOptions {
137 host: options.host.clone(),
138 port: options.port as u32,
139 username: options.username.clone(),
140 password: options.password.clone(),
141 database: options.database.clone(),
142 },
143 )),
144 },
145 ConnectionOptions::Mysql(options) => protobuf::ConnectionOptions {
146 connection_options: Some(protobuf::connection_options::ConnectionOptions::Mysql(
147 protobuf::MysqlConnectionOptions {
148 host: options.host.clone(),
149 port: options.port as u32,
150 username: options.username.clone(),
151 password: options.password.clone(),
152 database: options.database.clone(),
153 },
154 )),
155 },
156 ConnectionOptions::Oracle(options) => protobuf::ConnectionOptions {
157 connection_options: Some(protobuf::connection_options::ConnectionOptions::Oracle(
158 protobuf::OracleConnectionOptions {
159 host: options.host.clone(),
160 port: options.port as u32,
161 username: options.username.clone(),
162 password: options.password.clone(),
163 service_name: options.service_name.clone(),
164 },
165 )),
166 },
167 ConnectionOptions::Sqlite(path) => protobuf::ConnectionOptions {
168 connection_options: Some(protobuf::connection_options::ConnectionOptions::Sqlite(
169 protobuf::SqliteConnectionOptions {
170 path: path.to_str().unwrap().to_string(),
171 },
172 )),
173 },
174 }
175}
176
177fn parse_connection_options(options: protobuf::ConnectionOptions) -> ConnectionOptions {
178 match options.connection_options {
179 Some(protobuf::connection_options::ConnectionOptions::Postgres(options)) => {
180 ConnectionOptions::Postgres(PostgresConnectionOptions {
181 host: options.host,
182 port: options.port as u16,
183 username: options.username,
184 password: options.password,
185 database: options.database,
186 })
187 }
188 Some(protobuf::connection_options::ConnectionOptions::Mysql(options)) => {
189 ConnectionOptions::Mysql(MysqlConnectionOptions {
190 host: options.host,
191 port: options.port as u16,
192 username: options.username,
193 password: options.password,
194 database: options.database,
195 })
196 }
197 Some(protobuf::connection_options::ConnectionOptions::Oracle(options)) => {
198 ConnectionOptions::Oracle(OracleConnectionOptions {
199 host: options.host,
200 port: options.port as u16,
201 username: options.username,
202 password: options.password,
203 service_name: options.service_name,
204 })
205 }
206 Some(protobuf::connection_options::ConnectionOptions::Sqlite(options)) => {
207 ConnectionOptions::Sqlite(Path::new(&options.path).to_path_buf())
208 }
209 _ => panic!("Failed to parse connection options: {options:?}"),
210 }
211}
212
213fn serialize_projection(projection: &[usize]) -> protobuf::Projection {
214 protobuf::Projection {
215 projection: projection.iter().map(|n| *n as u32).collect(),
216 }
217}
218
219fn serialize_remote_schema(remote_schema: &RemoteSchemaRef) -> protobuf::RemoteSchema {
220 let fields = remote_schema
221 .fields
222 .iter()
223 .map(serialize_remote_field)
224 .collect::<Vec<_>>();
225
226 protobuf::RemoteSchema { fields }
227}
228
229fn serialize_remote_field(remote_field: &RemoteField) -> protobuf::RemoteField {
230 protobuf::RemoteField {
231 name: remote_field.name.clone(),
232 remote_type: Some(serialize_remote_type(&remote_field.remote_type)),
233 nullable: remote_field.nullable,
234 }
235}
236
237fn serialize_remote_type(remote_type: &RemoteType) -> protobuf::RemoteType {
238 match remote_type {
239 RemoteType::Postgres(PostgresType::Int2) => protobuf::RemoteType {
240 r#type: Some(protobuf::remote_type::Type::PostgresInt2(
241 protobuf::PostgresInt2 {},
242 )),
243 },
244 RemoteType::Postgres(PostgresType::Int4) => protobuf::RemoteType {
245 r#type: Some(protobuf::remote_type::Type::PostgresInt4(
246 protobuf::PostgresInt4 {},
247 )),
248 },
249 RemoteType::Postgres(PostgresType::Int8) => protobuf::RemoteType {
250 r#type: Some(protobuf::remote_type::Type::PostgresInt8(
251 protobuf::PostgresInt8 {},
252 )),
253 },
254 RemoteType::Postgres(PostgresType::Float4) => protobuf::RemoteType {
255 r#type: Some(protobuf::remote_type::Type::PostgresFloat4(
256 protobuf::PostgresFloat4 {},
257 )),
258 },
259 RemoteType::Postgres(PostgresType::Float8) => protobuf::RemoteType {
260 r#type: Some(protobuf::remote_type::Type::PostgresFloat8(
261 protobuf::PostgresFloat8 {},
262 )),
263 },
264 RemoteType::Postgres(PostgresType::Numeric(scale)) => protobuf::RemoteType {
265 r#type: Some(protobuf::remote_type::Type::PostgresNumeric(
266 protobuf::PostgresNumeric {
267 scale: *scale as i32,
268 },
269 )),
270 },
271 RemoteType::Postgres(PostgresType::Name) => protobuf::RemoteType {
272 r#type: Some(protobuf::remote_type::Type::PostgresName(
273 protobuf::PostgresName {},
274 )),
275 },
276 RemoteType::Postgres(PostgresType::Varchar) => protobuf::RemoteType {
277 r#type: Some(protobuf::remote_type::Type::PostgresVarchar(
278 protobuf::PostgresVarchar {},
279 )),
280 },
281 RemoteType::Postgres(PostgresType::Bpchar) => protobuf::RemoteType {
282 r#type: Some(protobuf::remote_type::Type::PostgresBpchar(
283 protobuf::PostgresBpchar {},
284 )),
285 },
286 RemoteType::Postgres(PostgresType::Text) => protobuf::RemoteType {
287 r#type: Some(protobuf::remote_type::Type::PostgresText(
288 protobuf::PostgresText {},
289 )),
290 },
291 RemoteType::Postgres(PostgresType::Bytea) => protobuf::RemoteType {
292 r#type: Some(protobuf::remote_type::Type::PostgresBytea(
293 protobuf::PostgresBytea {},
294 )),
295 },
296 RemoteType::Postgres(PostgresType::Date) => protobuf::RemoteType {
297 r#type: Some(protobuf::remote_type::Type::PostgresDate(
298 protobuf::PostgresDate {},
299 )),
300 },
301 RemoteType::Postgres(PostgresType::Timestamp) => protobuf::RemoteType {
302 r#type: Some(protobuf::remote_type::Type::PostgresTimestamp(
303 protobuf::PostgresTimestamp {},
304 )),
305 },
306 RemoteType::Postgres(PostgresType::TimestampTz) => protobuf::RemoteType {
307 r#type: Some(protobuf::remote_type::Type::PostgresTimestampTz(
308 protobuf::PostgresTimestampTz {},
309 )),
310 },
311 RemoteType::Postgres(PostgresType::Time) => protobuf::RemoteType {
312 r#type: Some(protobuf::remote_type::Type::PostgresTime(
313 protobuf::PostgresTime {},
314 )),
315 },
316 RemoteType::Postgres(PostgresType::Interval) => protobuf::RemoteType {
317 r#type: Some(protobuf::remote_type::Type::PostgresInterval(
318 protobuf::PostgresInterval {},
319 )),
320 },
321 RemoteType::Postgres(PostgresType::Bool) => protobuf::RemoteType {
322 r#type: Some(protobuf::remote_type::Type::PostgresBool(
323 protobuf::PostgresBool {},
324 )),
325 },
326 RemoteType::Postgres(PostgresType::Json) => protobuf::RemoteType {
327 r#type: Some(protobuf::remote_type::Type::PostgresJson(
328 protobuf::PostgresJson {},
329 )),
330 },
331 RemoteType::Postgres(PostgresType::Jsonb) => protobuf::RemoteType {
332 r#type: Some(protobuf::remote_type::Type::PostgresJsonb(
333 protobuf::PostgresJsonb {},
334 )),
335 },
336 RemoteType::Postgres(PostgresType::Int2Array) => protobuf::RemoteType {
337 r#type: Some(protobuf::remote_type::Type::PostgresInt2Array(
338 protobuf::PostgresInt2Array {},
339 )),
340 },
341 RemoteType::Postgres(PostgresType::Int4Array) => protobuf::RemoteType {
342 r#type: Some(protobuf::remote_type::Type::PostgresInt4Array(
343 protobuf::PostgresInt4Array {},
344 )),
345 },
346 RemoteType::Postgres(PostgresType::Int8Array) => protobuf::RemoteType {
347 r#type: Some(protobuf::remote_type::Type::PostgresInt8Array(
348 protobuf::PostgresInt8Array {},
349 )),
350 },
351 RemoteType::Postgres(PostgresType::Float4Array) => protobuf::RemoteType {
352 r#type: Some(protobuf::remote_type::Type::PostgresFloat4Array(
353 protobuf::PostgresFloat4Array {},
354 )),
355 },
356 RemoteType::Postgres(PostgresType::Float8Array) => protobuf::RemoteType {
357 r#type: Some(protobuf::remote_type::Type::PostgresFloat8Array(
358 protobuf::PostgresFloat8Array {},
359 )),
360 },
361 RemoteType::Postgres(PostgresType::VarcharArray) => protobuf::RemoteType {
362 r#type: Some(protobuf::remote_type::Type::PostgresVarcharArray(
363 protobuf::PostgresVarcharArray {},
364 )),
365 },
366 RemoteType::Postgres(PostgresType::BpcharArray) => protobuf::RemoteType {
367 r#type: Some(protobuf::remote_type::Type::PostgresBpcharArray(
368 protobuf::PostgresBpcharArray {},
369 )),
370 },
371 RemoteType::Postgres(PostgresType::TextArray) => protobuf::RemoteType {
372 r#type: Some(protobuf::remote_type::Type::PostgresTextArray(
373 protobuf::PostgresTextArray {},
374 )),
375 },
376 RemoteType::Postgres(PostgresType::ByteaArray) => protobuf::RemoteType {
377 r#type: Some(protobuf::remote_type::Type::PostgresByteaArray(
378 protobuf::PostgresByteaArray {},
379 )),
380 },
381 RemoteType::Postgres(PostgresType::BoolArray) => protobuf::RemoteType {
382 r#type: Some(protobuf::remote_type::Type::PostgresBoolArray(
383 protobuf::PostgresBoolArray {},
384 )),
385 },
386 RemoteType::Postgres(PostgresType::PostGisGeometry) => protobuf::RemoteType {
387 r#type: Some(protobuf::remote_type::Type::PostgresPostgisGeometry(
388 protobuf::PostgresPostGisGeometry {},
389 )),
390 },
391
392 RemoteType::Mysql(MysqlType::TinyInt) => protobuf::RemoteType {
393 r#type: Some(protobuf::remote_type::Type::MysqlTinyInt(
394 protobuf::MysqlTinyInt {},
395 )),
396 },
397 RemoteType::Mysql(MysqlType::TinyIntUnsigned) => protobuf::RemoteType {
398 r#type: Some(protobuf::remote_type::Type::MysqlTinyIntUnsigned(
399 protobuf::MysqlTinyIntUnsigned {},
400 )),
401 },
402 RemoteType::Mysql(MysqlType::SmallInt) => protobuf::RemoteType {
403 r#type: Some(protobuf::remote_type::Type::MysqlSmallInt(
404 protobuf::MysqlSmallInt {},
405 )),
406 },
407 RemoteType::Mysql(MysqlType::SmallIntUnsigned) => protobuf::RemoteType {
408 r#type: Some(protobuf::remote_type::Type::MysqlSmallIntUnsigned(
409 protobuf::MysqlSmallIntUnsigned {},
410 )),
411 },
412 RemoteType::Mysql(MysqlType::MediumInt) => protobuf::RemoteType {
413 r#type: Some(protobuf::remote_type::Type::MysqlMediumInt(
414 protobuf::MysqlMediumInt {},
415 )),
416 },
417 RemoteType::Mysql(MysqlType::MediumIntUnsigned) => protobuf::RemoteType {
418 r#type: Some(protobuf::remote_type::Type::MysqlMediumIntUnsigned(
419 protobuf::MysqlMediumIntUnsigned {},
420 )),
421 },
422 RemoteType::Mysql(MysqlType::Integer) => protobuf::RemoteType {
423 r#type: Some(protobuf::remote_type::Type::MysqlInteger(
424 protobuf::MysqlInteger {},
425 )),
426 },
427 RemoteType::Mysql(MysqlType::IntegerUnsigned) => protobuf::RemoteType {
428 r#type: Some(protobuf::remote_type::Type::MysqlIntegerUnsigned(
429 protobuf::MysqlIntegerUnsigned {},
430 )),
431 },
432 RemoteType::Mysql(MysqlType::BigInt) => protobuf::RemoteType {
433 r#type: Some(protobuf::remote_type::Type::MysqlBigInt(
434 protobuf::MysqlBigInt {},
435 )),
436 },
437 RemoteType::Mysql(MysqlType::BigIntUnsigned) => protobuf::RemoteType {
438 r#type: Some(protobuf::remote_type::Type::MysqlBigIntUnsigned(
439 protobuf::MysqlBigIntUnsigned {},
440 )),
441 },
442 RemoteType::Mysql(MysqlType::Float) => protobuf::RemoteType {
443 r#type: Some(protobuf::remote_type::Type::MysqlFloat(
444 protobuf::MysqlFloat {},
445 )),
446 },
447 RemoteType::Mysql(MysqlType::Double) => protobuf::RemoteType {
448 r#type: Some(protobuf::remote_type::Type::MysqlDouble(
449 protobuf::MysqlDouble {},
450 )),
451 },
452 RemoteType::Mysql(MysqlType::Decimal(precision, scale)) => protobuf::RemoteType {
453 r#type: Some(protobuf::remote_type::Type::MysqlDecimal(
454 protobuf::MysqlDecimal {
455 precision: *precision as u32,
456 scale: *scale as u32,
457 },
458 )),
459 },
460 RemoteType::Mysql(MysqlType::Date) => protobuf::RemoteType {
461 r#type: Some(protobuf::remote_type::Type::MysqlDate(
462 protobuf::MysqlDate {},
463 )),
464 },
465 RemoteType::Mysql(MysqlType::Datetime) => protobuf::RemoteType {
466 r#type: Some(protobuf::remote_type::Type::MysqlDateTime(
467 protobuf::MysqlDateTime {},
468 )),
469 },
470 RemoteType::Mysql(MysqlType::Time) => protobuf::RemoteType {
471 r#type: Some(protobuf::remote_type::Type::MysqlTime(
472 protobuf::MysqlTime {},
473 )),
474 },
475 RemoteType::Mysql(MysqlType::Timestamp) => protobuf::RemoteType {
476 r#type: Some(protobuf::remote_type::Type::MysqlTimestamp(
477 protobuf::MysqlTimestamp {},
478 )),
479 },
480 RemoteType::Mysql(MysqlType::Year) => protobuf::RemoteType {
481 r#type: Some(protobuf::remote_type::Type::MysqlYear(
482 protobuf::MysqlYear {},
483 )),
484 },
485 RemoteType::Mysql(MysqlType::Char) => protobuf::RemoteType {
486 r#type: Some(protobuf::remote_type::Type::MysqlChar(
487 protobuf::MysqlChar {},
488 )),
489 },
490 RemoteType::Mysql(MysqlType::Varchar) => protobuf::RemoteType {
491 r#type: Some(protobuf::remote_type::Type::MysqlVarchar(
492 protobuf::MysqlVarchar {},
493 )),
494 },
495 RemoteType::Mysql(MysqlType::Binary) => protobuf::RemoteType {
496 r#type: Some(protobuf::remote_type::Type::MysqlBinary(
497 protobuf::MysqlBinary {},
498 )),
499 },
500 RemoteType::Mysql(MysqlType::Varbinary) => protobuf::RemoteType {
501 r#type: Some(protobuf::remote_type::Type::MysqlVarbinary(
502 protobuf::MysqlVarbinary {},
503 )),
504 },
505 RemoteType::Mysql(MysqlType::Text(len)) => protobuf::RemoteType {
506 r#type: Some(protobuf::remote_type::Type::MysqlText(
507 protobuf::MysqlText { length: *len },
508 )),
509 },
510 RemoteType::Mysql(MysqlType::Blob(len)) => protobuf::RemoteType {
511 r#type: Some(protobuf::remote_type::Type::MysqlBlob(
512 protobuf::MysqlBlob { length: *len },
513 )),
514 },
515 RemoteType::Mysql(MysqlType::Json) => protobuf::RemoteType {
516 r#type: Some(protobuf::remote_type::Type::MysqlJson(
517 protobuf::MysqlJson {},
518 )),
519 },
520 RemoteType::Mysql(MysqlType::Geometry) => protobuf::RemoteType {
521 r#type: Some(protobuf::remote_type::Type::MysqlGeometry(
522 protobuf::MysqlGeometry {},
523 )),
524 },
525
526 RemoteType::Oracle(OracleType::Varchar2(len)) => protobuf::RemoteType {
527 r#type: Some(protobuf::remote_type::Type::OracleVarchar2(
528 protobuf::OracleVarchar2 { length: *len },
529 )),
530 },
531 RemoteType::Oracle(OracleType::Char(len)) => protobuf::RemoteType {
532 r#type: Some(protobuf::remote_type::Type::OracleChar(
533 protobuf::OracleChar { length: *len },
534 )),
535 },
536 RemoteType::Oracle(OracleType::Number(precision, scale)) => protobuf::RemoteType {
537 r#type: Some(protobuf::remote_type::Type::OracleNumber(
538 protobuf::OracleNumber {
539 precision: *precision as u32,
540 scale: *scale as i32,
541 },
542 )),
543 },
544 RemoteType::Oracle(OracleType::Date) => protobuf::RemoteType {
545 r#type: Some(protobuf::remote_type::Type::OracleDate(
546 protobuf::OracleDate {},
547 )),
548 },
549 RemoteType::Oracle(OracleType::Timestamp) => protobuf::RemoteType {
550 r#type: Some(protobuf::remote_type::Type::OracleTimestamp(
551 protobuf::OracleTimestamp {},
552 )),
553 },
554 RemoteType::Sqlite(SqliteType::Null) => protobuf::RemoteType {
555 r#type: Some(protobuf::remote_type::Type::SqliteNull(
556 protobuf::SqliteNull {},
557 )),
558 },
559 RemoteType::Sqlite(SqliteType::Integer) => protobuf::RemoteType {
560 r#type: Some(protobuf::remote_type::Type::SqliteInteger(
561 protobuf::SqliteInteger {},
562 )),
563 },
564 RemoteType::Sqlite(SqliteType::Real) => protobuf::RemoteType {
565 r#type: Some(protobuf::remote_type::Type::SqliteReal(
566 protobuf::SqliteReal {},
567 )),
568 },
569 RemoteType::Sqlite(SqliteType::Text) => protobuf::RemoteType {
570 r#type: Some(protobuf::remote_type::Type::SqliteText(
571 protobuf::SqliteText {},
572 )),
573 },
574 RemoteType::Sqlite(SqliteType::Blob) => protobuf::RemoteType {
575 r#type: Some(protobuf::remote_type::Type::SqliteBlob(
576 protobuf::SqliteBlob {},
577 )),
578 },
579 }
580}
581
582fn parse_remote_schema(remote_schema: &protobuf::RemoteSchema) -> RemoteSchema {
583 let fields = remote_schema
584 .fields
585 .iter()
586 .map(parse_remote_field)
587 .collect::<Vec<_>>();
588
589 RemoteSchema { fields }
590}
591
592fn parse_remote_field(field: &protobuf::RemoteField) -> RemoteField {
593 RemoteField {
594 name: field.name.clone(),
595 remote_type: parse_remote_type(field.remote_type.as_ref().unwrap()),
596 nullable: field.nullable,
597 }
598}
599
600fn parse_remote_type(remote_type: &protobuf::RemoteType) -> RemoteType {
601 match remote_type.r#type.as_ref().unwrap() {
602 protobuf::remote_type::Type::PostgresInt2(_) => RemoteType::Postgres(PostgresType::Int2),
603 protobuf::remote_type::Type::PostgresInt4(_) => RemoteType::Postgres(PostgresType::Int4),
604 protobuf::remote_type::Type::PostgresInt8(_) => RemoteType::Postgres(PostgresType::Int8),
605 protobuf::remote_type::Type::PostgresFloat4(_) => {
606 RemoteType::Postgres(PostgresType::Float4)
607 }
608 protobuf::remote_type::Type::PostgresFloat8(_) => {
609 RemoteType::Postgres(PostgresType::Float8)
610 }
611 protobuf::remote_type::Type::PostgresNumeric(numeric) => {
612 RemoteType::Postgres(PostgresType::Numeric(numeric.scale as i8))
613 }
614 protobuf::remote_type::Type::PostgresName(_) => RemoteType::Postgres(PostgresType::Name),
615 protobuf::remote_type::Type::PostgresVarchar(_) => {
616 RemoteType::Postgres(PostgresType::Varchar)
617 }
618 protobuf::remote_type::Type::PostgresBpchar(_) => {
619 RemoteType::Postgres(PostgresType::Bpchar)
620 }
621 protobuf::remote_type::Type::PostgresText(_) => RemoteType::Postgres(PostgresType::Text),
622 protobuf::remote_type::Type::PostgresBytea(_) => RemoteType::Postgres(PostgresType::Bytea),
623 protobuf::remote_type::Type::PostgresDate(_) => RemoteType::Postgres(PostgresType::Date),
624 protobuf::remote_type::Type::PostgresTimestamp(_) => {
625 RemoteType::Postgres(PostgresType::Timestamp)
626 }
627 protobuf::remote_type::Type::PostgresTimestampTz(_) => {
628 RemoteType::Postgres(PostgresType::TimestampTz)
629 }
630 protobuf::remote_type::Type::PostgresTime(_) => RemoteType::Postgres(PostgresType::Time),
631 protobuf::remote_type::Type::PostgresInterval(_) => {
632 RemoteType::Postgres(PostgresType::Interval)
633 }
634 protobuf::remote_type::Type::PostgresBool(_) => RemoteType::Postgres(PostgresType::Bool),
635 protobuf::remote_type::Type::PostgresJson(_) => RemoteType::Postgres(PostgresType::Json),
636 protobuf::remote_type::Type::PostgresJsonb(_) => RemoteType::Postgres(PostgresType::Jsonb),
637 protobuf::remote_type::Type::PostgresInt2Array(_) => {
638 RemoteType::Postgres(PostgresType::Int2Array)
639 }
640 protobuf::remote_type::Type::PostgresInt4Array(_) => {
641 RemoteType::Postgres(PostgresType::Int4Array)
642 }
643 protobuf::remote_type::Type::PostgresInt8Array(_) => {
644 RemoteType::Postgres(PostgresType::Int8Array)
645 }
646 protobuf::remote_type::Type::PostgresFloat4Array(_) => {
647 RemoteType::Postgres(PostgresType::Float4Array)
648 }
649 protobuf::remote_type::Type::PostgresFloat8Array(_) => {
650 RemoteType::Postgres(PostgresType::Float8Array)
651 }
652 protobuf::remote_type::Type::PostgresVarcharArray(_) => {
653 RemoteType::Postgres(PostgresType::VarcharArray)
654 }
655 protobuf::remote_type::Type::PostgresBpcharArray(_) => {
656 RemoteType::Postgres(PostgresType::BpcharArray)
657 }
658 protobuf::remote_type::Type::PostgresTextArray(_) => {
659 RemoteType::Postgres(PostgresType::TextArray)
660 }
661 protobuf::remote_type::Type::PostgresByteaArray(_) => {
662 RemoteType::Postgres(PostgresType::ByteaArray)
663 }
664 protobuf::remote_type::Type::PostgresBoolArray(_) => {
665 RemoteType::Postgres(PostgresType::BoolArray)
666 }
667 protobuf::remote_type::Type::PostgresPostgisGeometry(_) => {
668 RemoteType::Postgres(PostgresType::PostGisGeometry)
669 }
670 protobuf::remote_type::Type::MysqlTinyInt(_) => RemoteType::Mysql(MysqlType::TinyInt),
671 protobuf::remote_type::Type::MysqlTinyIntUnsigned(_) => {
672 RemoteType::Mysql(MysqlType::TinyIntUnsigned)
673 }
674 protobuf::remote_type::Type::MysqlSmallInt(_) => RemoteType::Mysql(MysqlType::SmallInt),
675 protobuf::remote_type::Type::MysqlSmallIntUnsigned(_) => {
676 RemoteType::Mysql(MysqlType::SmallIntUnsigned)
677 }
678 protobuf::remote_type::Type::MysqlMediumInt(_) => RemoteType::Mysql(MysqlType::MediumInt),
679 protobuf::remote_type::Type::MysqlMediumIntUnsigned(_) => {
680 RemoteType::Mysql(MysqlType::MediumIntUnsigned)
681 }
682 protobuf::remote_type::Type::MysqlInteger(_) => RemoteType::Mysql(MysqlType::Integer),
683 protobuf::remote_type::Type::MysqlIntegerUnsigned(_) => {
684 RemoteType::Mysql(MysqlType::IntegerUnsigned)
685 }
686 protobuf::remote_type::Type::MysqlBigInt(_) => RemoteType::Mysql(MysqlType::BigInt),
687 protobuf::remote_type::Type::MysqlBigIntUnsigned(_) => {
688 RemoteType::Mysql(MysqlType::BigIntUnsigned)
689 }
690 protobuf::remote_type::Type::MysqlFloat(_) => RemoteType::Mysql(MysqlType::Float),
691 protobuf::remote_type::Type::MysqlDouble(_) => RemoteType::Mysql(MysqlType::Double),
692 protobuf::remote_type::Type::MysqlDecimal(decimal) => RemoteType::Mysql(
693 MysqlType::Decimal(decimal.precision as u8, decimal.scale as u8),
694 ),
695 protobuf::remote_type::Type::MysqlDate(_) => RemoteType::Mysql(MysqlType::Date),
696 protobuf::remote_type::Type::MysqlDateTime(_) => RemoteType::Mysql(MysqlType::Datetime),
697 protobuf::remote_type::Type::MysqlTime(_) => RemoteType::Mysql(MysqlType::Time),
698 protobuf::remote_type::Type::MysqlTimestamp(_) => RemoteType::Mysql(MysqlType::Timestamp),
699 protobuf::remote_type::Type::MysqlYear(_) => RemoteType::Mysql(MysqlType::Year),
700 protobuf::remote_type::Type::MysqlChar(_) => RemoteType::Mysql(MysqlType::Char),
701 protobuf::remote_type::Type::MysqlVarchar(_) => RemoteType::Mysql(MysqlType::Varchar),
702 protobuf::remote_type::Type::MysqlBinary(_) => RemoteType::Mysql(MysqlType::Binary),
703 protobuf::remote_type::Type::MysqlVarbinary(_) => RemoteType::Mysql(MysqlType::Varbinary),
704 protobuf::remote_type::Type::MysqlText(text) => {
705 RemoteType::Mysql(MysqlType::Text(text.length))
706 }
707 protobuf::remote_type::Type::MysqlBlob(blob) => {
708 RemoteType::Mysql(MysqlType::Blob(blob.length))
709 }
710 protobuf::remote_type::Type::MysqlJson(_) => RemoteType::Mysql(MysqlType::Json),
711 protobuf::remote_type::Type::MysqlGeometry(_) => RemoteType::Mysql(MysqlType::Geometry),
712 protobuf::remote_type::Type::OracleVarchar2(varchar) => {
713 RemoteType::Oracle(OracleType::Varchar2(varchar.length))
714 }
715 protobuf::remote_type::Type::OracleChar(char) => {
716 RemoteType::Oracle(OracleType::Char(char.length))
717 }
718 protobuf::remote_type::Type::OracleNumber(number) => RemoteType::Oracle(
719 OracleType::Number(number.precision as u8, number.scale as i8),
720 ),
721 protobuf::remote_type::Type::OracleDate(_) => RemoteType::Oracle(OracleType::Date),
722 protobuf::remote_type::Type::OracleTimestamp(_) => {
723 RemoteType::Oracle(OracleType::Timestamp)
724 }
725 protobuf::remote_type::Type::SqliteNull(_) => RemoteType::Sqlite(SqliteType::Null),
726 protobuf::remote_type::Type::SqliteInteger(_) => RemoteType::Sqlite(SqliteType::Integer),
727 protobuf::remote_type::Type::SqliteReal(_) => RemoteType::Sqlite(SqliteType::Real),
728 protobuf::remote_type::Type::SqliteText(_) => RemoteType::Sqlite(SqliteType::Text),
729 protobuf::remote_type::Type::SqliteBlob(_) => RemoteType::Sqlite(SqliteType::Blob),
730 }
731}