1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::transform::transform_batch;
3use crate::{
4 project_remote_schema, Connection, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
5 RemoteType, Transform,
6};
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bb8_postgres::PostgresConnectionManager;
10use bigdecimal::BigDecimal;
11use byteorder::{BigEndian, ReadBytesExt};
12use chrono::Timelike;
13use datafusion::arrow::array::{
14 make_builder, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
15 Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
16 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
17 Time64NanosecondBuilder, TimestampNanosecondBuilder,
18};
19use datafusion::arrow::datatypes::{Date32Type, IntervalMonthDayNanoType, SchemaRef};
20use datafusion::common::project_schema;
21use datafusion::error::DataFusionError;
22use datafusion::execution::SendableRecordBatchStream;
23use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
24use futures::StreamExt;
25use num_bigint::{BigInt, Sign};
26use std::string::ToString;
27use std::sync::Arc;
28use std::time::{SystemTime, UNIX_EPOCH};
29
/// Connection options for a Postgres data source.
///
/// Construct with [`PostgresConnectionOptions::new`]; the `with_*` setters
/// generated by `derive_with::With` allow overriding individual fields
/// (e.g. selecting a specific `database`).
#[derive(Debug, Clone, derive_with::With)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    // Target database name; `None` means the server-side default is used.
    pub(crate) database: Option<String>,
}
38
39impl PostgresConnectionOptions {
40 pub fn new(
41 host: impl Into<String>,
42 port: u16,
43 username: impl Into<String>,
44 password: impl Into<String>,
45 ) -> Self {
46 Self {
47 host: host.into(),
48 port,
49 username: username.into(),
50 password: password.into(),
51 database: None,
52 }
53 }
54}
55
/// A bb8-backed pool of Postgres connections (plain TCP, no TLS).
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
60
#[async_trait::async_trait]
impl Pool for PostgresPool {
    /// Checks one connection out of the pool and wraps it in this crate's
    /// [`Connection`] abstraction.
    ///
    /// Pool exhaustion or connect failures surface as
    /// `DataFusionError::Execution`.
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        // `get_owned` detaches the connection from the pool borrow so it can
        // live inside the returned `Arc<dyn Connection>`.
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}
70
71pub(crate) async fn connect_postgres(
72 options: &PostgresConnectionOptions,
73) -> DFResult<PostgresPool> {
74 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
75 config
76 .host(&options.host)
77 .port(options.port)
78 .user(&options.username)
79 .password(&options.password);
80 if let Some(database) = &options.database {
81 config.dbname(database);
82 }
83 let manager = PostgresConnectionManager::new(config, NoTls);
84 let pool = bb8::Pool::builder()
85 .max_size(5)
86 .build(manager)
87 .await
88 .map_err(|e| {
89 DataFusionError::Execution(format!(
90 "Failed to create postgres connection pool due to {e}",
91 ))
92 })?;
93
94 Ok(PostgresPool { pool })
95}
96
/// A single pooled Postgres connection implementing the crate's
/// [`Connection`] trait.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    // Owned (pool-detached) connection handle; returned to the pool on drop.
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
101
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infers both the remote (postgres) schema and the Arrow schema for
    /// `sql` by executing the query and inspecting its first row.
    ///
    /// When `transform` is supplied, a one-row batch is built and run
    /// through the transform so the *transformed* Arrow schema is reported.
    ///
    /// NOTE(review): a query yielding zero rows is reported as an error —
    /// inference here requires at least one row (NUMERIC scale is probed
    /// from actual data).
    async fn infer_schema(
        &self,
        sql: &str,
        transform: Option<Arc<dyn Transform>>,
    ) -> DFResult<(RemoteSchema, SchemaRef)> {
        // Chunk size 1: only the first row is needed for inference.
        let mut stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(1)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        // Each chunk element is a Result<Row, _>; fail on the first row error.
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let remote_schema = build_remote_schema(first_row)?;
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        if let Some(transform) = transform {
            // Materialize the single sampled row and transform it so the
            // schema reported to callers matches the transformed output.
            let batch = rows_to_batch(
                std::slice::from_ref(first_row),
                &remote_schema,
                arrow_schema,
                None,
            )?;
            let transformed_batch = transform_batch(batch, transform.as_ref(), &remote_schema)?;
            Ok((remote_schema, transformed_batch.schema()))
        } else {
            Ok((remote_schema, arrow_schema))
        }
    }

    /// Streams query results as Arrow record batches of up to 2048 rows.
    ///
    /// The first chunk is consumed eagerly to derive the remote schema from
    /// its first row; the remaining chunks are converted lazily as the
    /// returned stream is polled. `projection` lists the column indices to
    /// keep (`None` keeps all).
    ///
    /// NOTE(review): like `infer_schema`, an empty result set surfaces as an
    /// error rather than an empty stream.
    async fn query(
        &self,
        sql: String,
        projection: Option<Vec<usize>>,
    ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)> {
        let mut stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(2048)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let remote_schema = build_remote_schema(first_row)?;
        let projected_remote_schema = project_remote_schema(&remote_schema, projection.as_ref());
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        let first_chunk = rows_to_batch(
            first_chunk.as_slice(),
            &remote_schema,
            arrow_schema.clone(),
            projection.as_ref(),
        )?;
        // Schema of the projected batches (may differ from `arrow_schema`
        // when a projection is applied).
        let schema = first_chunk.schema();

        // Convert each subsequent chunk of rows into a batch on demand; the
        // closure takes ownership of the schemas and projection it needs.
        let mut stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            let batch = rows_to_batch(
                rows.as_slice(),
                &remote_schema,
                arrow_schema.clone(),
                projection.as_ref(),
            )?;
            Ok::<RecordBatch, DataFusionError>(batch)
        });

        // Re-emit the eagerly converted first batch, then forward the rest.
        let output_stream = async_stream::stream! {
            yield Ok(first_chunk);
            while let Some(batch) = stream.next().await {
                yield batch
            }
        };

        Ok((
            Box::pin(RecordBatchStreamAdapter::new(schema, output_stream)),
            projected_remote_schema,
        ))
    }
}
232
233fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
234 match pg_type {
235 &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
236 &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
237 &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
238 &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
239 &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
240 &Type::NUMERIC => {
241 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
242 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
243 })?;
244 let scale = match v {
245 Some(v) => v.scale,
246 None => 0,
247 };
248 assert!((scale as u32) <= (i8::MAX as u32));
249 Ok(RemoteType::Postgres(PostgresType::Numeric(
250 scale.try_into().unwrap_or_default(),
251 )))
252 }
253 &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
254 &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
255 &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
256 &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
257 &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
258 &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
259 &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
260 &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
261 &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
262 &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
263 &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
264 &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
265 &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
266 &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
267 &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
268 &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
269 &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
270 &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
271 &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
272 &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
273 &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
274 &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
275 other if other.name().eq_ignore_ascii_case("geometry") => {
276 Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
277 }
278 _ => Err(DataFusionError::NotImplemented(format!(
279 "Unsupported postgres type {pg_type:?}",
280 ))),
281 }
282}
283
284fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
285 let mut remote_fields = vec![];
286 for (idx, col) in row.columns().iter().enumerate() {
287 remote_fields.push(RemoteField::new(
288 col.name(),
289 pg_type_to_remote_type(col.type_(), row, idx)?,
290 true,
291 ));
292 }
293 Ok(RemoteSchema::new(remote_fields))
294}
295
/// Reads one cell from `$row` at `$index` as `Option<$value_ty>` and appends
/// it (value or null) to the concrete `$builder_ty` behind the type-erased
/// `$builder`.
///
/// Panics on downcast or decode failure — both indicate that the inferred
/// remote schema disagrees with the actual row data (a bug, not a
/// recoverable condition).
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr) => {{
        // Recover the concrete builder type from `Box<dyn ArrayBuilder>`.
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast builder to ",
                        stringify!($builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        // `Option` handles SQL NULL transparently via FromSql.
        let v: Option<$value_ty> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    stringify!($value_ty),
                    " value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        match v {
            Some(v) => builder.append_value(v),
            None => builder.append_null(),
        }
    }};
}
328
/// Reads one array-valued cell from `$row` at `$index` as
/// `Option<Vec<$primitive_value_ty>>` and appends it to the `ListBuilder`
/// behind `$builder`, whose values builder must be a `$values_builder_ty`.
/// A SQL NULL array becomes a null list entry.
///
/// Panics on downcast or decode failure — both indicate a schema-inference
/// bug rather than a recoverable condition.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?}",
                    $field
                )
            });
        // BUG FIX: the panic messages below previously referenced
        // `$builder_ty` / `$value_ty`, which are not metavariables of this
        // macro (they belong to `handle_primitive_type!`); unbound
        // metavariables fail at macro expansion. Use the bound names.
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast values builder to ",
                        stringify!($values_builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    stringify!($primitive_value_ty),
                    " array value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        match v {
            Some(v) => {
                // Append all element values, then close the list entry.
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
375
/// A postgres NUMERIC value decoded from the binary wire format, together
/// with the scale it was transmitted with.
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    // Number of base-10 digits to the right of the decimal point (the wire
    // header's `dscale` field).
    scale: u16,
}
381
382impl BigDecimalFromSql {
383 fn to_decimal_128(&self) -> Option<i128> {
384 big_decimal_to_i128(&self.inner, Some(self.scale as u32))
385 }
386}
387
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    /// Decodes the postgres binary NUMERIC format: a header of four
    /// big-endian u16 words (digit count, weight, sign, scale) followed by
    /// that many base-10000 digit groups.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Reinterpret the payload as big-endian u16 words; a trailing odd
        // byte is zero-padded.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        // Header layout: [ndigits, weight, sign, dscale].
        let base_10_000_digit_count = raw_u16[0];
        // Weight is a signed position of the first digit group relative to
        // the decimal point; reinterpret the bits as i16.
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand every base-10000 group into exactly four base-10 digits.
        // Groups are walked in reverse and the result reversed back so that
        // zero-padding lands on the correct side.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Resize the digit string so exactly `scale` digits sit to the right
        // of the decimal point implied by `weight`.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        // 0x4000 = negative, 0x0000 = positive; any other sign word
        // (e.g. NaN) is rejected.
        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    // Only NUMERIC columns are decoded through this impl.
    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
461
/// A postgres INTERVAL value as laid out on the wire.
#[derive(Debug)]
struct IntervalFromSql {
    // Sub-day time component; consumed as microseconds by `rows_to_batch`
    // (multiplied by 1_000 into nanoseconds) — matches the binary interval
    // wire encoding.
    time: i64,
    day: i32,
    month: i32,
}
470
471impl<'a> FromSql<'a> for IntervalFromSql {
472 fn from_sql(
473 _ty: &Type,
474 raw: &'a [u8],
475 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
476 let mut cursor = std::io::Cursor::new(raw);
477
478 let time = cursor.read_i64::<BigEndian>()?;
479 let day = cursor.read_i32::<BigEndian>()?;
480 let month = cursor.read_i32::<BigEndian>()?;
481
482 Ok(IntervalFromSql { time, day, month })
483 }
484
485 fn accepts(ty: &Type) -> bool {
486 matches!(*ty, Type::INTERVAL)
487 }
488}
489
/// Borrowed payload of a PostGIS `geometry` column; `wkb` is the raw wire
/// bytes, passed through to Arrow as binary.
pub struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
493
494impl<'a> FromSql<'a> for GeometryFromSql<'a> {
495 fn from_sql(
496 _ty: &Type,
497 raw: &'a [u8],
498 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
499 Ok(GeometryFromSql { wkb: raw })
500 }
501
502 fn accepts(ty: &Type) -> bool {
503 matches!(ty.name(), "geometry")
504 }
505}
506
/// Converts a slice of postgres [`Row`]s into a single Arrow [`RecordBatch`].
///
/// Builders are allocated for every field of `arrow_schema`, but only the
/// columns selected by `projection` (a list of indices into `remote_schema`;
/// `None` selects all) are populated and emitted.
///
/// # Errors
/// Returns `DataFusionError::Execution` for NUMERIC values that do not fit
/// in `i128`, pre-epoch timestamps, and remote types with no decoding arm.
///
/// # Panics
/// Panics (via the `handle_primitive_*` macros and the `expect`s below) when
/// a cell cannot be decoded as the Rust type implied by `remote_schema` —
/// that indicates a schema-inference bug, not a recoverable condition.
fn rows_to_batch(
    rows: &[Row],
    remote_schema: &RemoteSchema,
    arrow_schema: SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(&arrow_schema, projection)?;
    // One builder per *unprojected* field so `idx` lines up between the
    // remote schema and the builder list; non-projected builders stay empty
    // and are filtered out at the end.
    let mut array_builders = vec![];
    for field in arrow_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }
    for row in rows {
        for (idx, field) in remote_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            match field.remote_type {
                RemoteType::Postgres(PostgresType::Int2) => {
                    handle_primitive_type!(builder, field, Int16Builder, i16, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int4) => {
                    handle_primitive_type!(builder, field, Int32Builder, i32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int8) => {
                    handle_primitive_type!(builder, field, Int64Builder, i64, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float4) => {
                    handle_primitive_type!(builder, field, Float32Builder, f32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float8) => {
                    handle_primitive_type!(builder, field, Float64Builder, f64, row, idx);
                }
                RemoteType::Postgres(PostgresType::Numeric(_scale)) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Decimal128Builder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to Decimal128Builder for {field:?}")
                        });
                    let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                        DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
                    })?;

                    match v {
                        Some(v) => {
                            // Decimal values wider than i128 are a hard error
                            // rather than a silent truncation.
                            let Some(v) = v.to_decimal_128() else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {:?} to i128",
                                    v
                                )));
                            };
                            builder.append_value(v)
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Varchar)
                | RemoteType::Postgres(PostgresType::Text) => {
                    handle_primitive_type!(builder, field, StringBuilder, &str, row, idx);
                }
                RemoteType::Postgres(PostgresType::Bpchar) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<StringBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to StringBuilder for {field:?}")
                        });
                    let v: Option<&str> = row.try_get(idx).unwrap_or_else(|e| {
                        panic!("Failed to get &str value for {field:?}: {e:?}")
                    });

                    match v {
                        // BPCHAR is blank-padded; strip the padding.
                        Some(v) => builder.append_value(v.trim_end()),
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Bytea) => {
                    handle_primitive_type!(builder, field, BinaryBuilder, Vec<u8>, row, idx);
                }
                RemoteType::Postgres(PostgresType::Timestamp) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to TimestampNanosecondBuilder for {field:?}")
                        });
                    let v: Option<SystemTime> = row.try_get(idx).unwrap_or_else(|e| {
                        panic!("Failed to get SystemTime value for {field:?}: {e:?}")
                    });

                    match v {
                        Some(v) => {
                            // NOTE(review): timestamps before the UNIX epoch
                            // land in the error branch below — confirm that
                            // pre-1970 values are out of scope.
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                builder.append_value(timestamp);
                            } else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?}"
                                )));
                            }
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::TimestampTz) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to TimestampTz for {field:?}")
                        });
                    let v: Option<chrono::DateTime<chrono::Utc>> = row.try_get(idx).unwrap_or_else(|e| panic!("Failed to get chrono::DateTime<chrono::Utc> value for {field:?}: {e:?}"));

                    match v {
                        Some(v) => {
                            // `timestamp_nanos_opt` is None outside the range
                            // representable in i64 nanoseconds.
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for Type::TIMESTAMP"));
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Time) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Time64NanosecondBuilder>()
                        .expect(
                            "Failed to downcast builder to Time64NanosecondBuilder for Type::TIME",
                        );
                    let v: Option<chrono::NaiveTime> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveTime value for column Type::TIME");

                    match v {
                        Some(v) => {
                            // Nanoseconds since midnight.
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Date) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Date32Builder>()
                        .expect("Failed to downcast builder to Date32Builder for Type::DATE");
                    let v: Option<chrono::NaiveDate> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveDate value for column Type::DATE");

                    match v {
                        Some(v) => builder.append_value(Date32Type::from_naive_date(v)),
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Interval) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<IntervalMonthDayNanoBuilder>()
                        .expect("Failed to downcast builder to IntervalMonthDayNanoBuilder for Type::INTERVAL");

                    let v: Option<IntervalFromSql> = row
                        .try_get(idx)
                        .expect("Failed to get IntervalFromSql value for column Type::INTERVAL");

                    match v {
                        Some(v) => {
                            // Wire time component is microseconds; Arrow
                            // wants nanoseconds.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            builder.append_value(interval_month_day_nano);
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Bool) => {
                    handle_primitive_type!(builder, field, BooleanBuilder, bool, row, idx);
                }
                RemoteType::Postgres(PostgresType::Json)
                | RemoteType::Postgres(PostgresType::Jsonb) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<LargeStringBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to LargeStringBuilder for {field:?}")
                        });
                    let v: Option<serde_json::value::Value> =
                        row.try_get(idx).unwrap_or_else(|e| {
                            panic!(
                                "Failed to get serde_json::value::Value value for {field:?}: {e:?}"
                            )
                        });

                    match v {
                        // Stored as the serialized JSON text.
                        Some(v) => builder.append_value(v.to_string()),
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Int2Array) => {
                    handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int4Array) => {
                    handle_primitive_array_type!(builder, field, Int32Builder, i32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int8Array) => {
                    handle_primitive_array_type!(builder, field, Int64Builder, i64, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float4Array) => {
                    handle_primitive_array_type!(builder, field, Float32Builder, f32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float8Array) => {
                    handle_primitive_array_type!(builder, field, Float64Builder, f64, row, idx);
                }
                RemoteType::Postgres(PostgresType::VarcharArray)
                | RemoteType::Postgres(PostgresType::BpcharArray)
                | RemoteType::Postgres(PostgresType::TextArray) => {
                    handle_primitive_array_type!(builder, field, StringBuilder, &str, row, idx);
                }
                RemoteType::Postgres(PostgresType::ByteaArray) => {
                    handle_primitive_array_type!(builder, field, BinaryBuilder, Vec<u8>, row, idx);
                }
                RemoteType::Postgres(PostgresType::BoolArray) => {
                    handle_primitive_array_type!(builder, field, BooleanBuilder, bool, row, idx);
                }

                RemoteType::Postgres(PostgresType::PostGisGeometry) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<BinaryBuilder>()
                        .expect("Failed to downcast builder to BinaryBuilder for Type::geometry");
                    let v: Option<GeometryFromSql> = row
                        .try_get(idx)
                        .expect("Failed to get GeometryFromSql value for column Type::geometry");

                    match v {
                        Some(v) => builder.append_value(v.wkb),
                        None => builder.append_null(),
                    }
                }
                _ => {
                    return Err(DataFusionError::Execution(format!(
                        "Unsupported postgres type {field:?}",
                    )));
                }
            }
        }
    }
    // Keep only the projected builders, in schema order, and assemble the
    // batch against the projected schema.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}