datafusion_remote_table/connection/postgres.rs

use crate::connection::{big_decimal_to_i128, projections_contains};
use crate::transform::transform_batch;
use crate::{
    project_remote_schema, Connection, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
    RemoteType, Transform,
};
use bb8_postgres::tokio_postgres::types::{FromSql, Type};
use bb8_postgres::tokio_postgres::{NoTls, Row};
use bb8_postgres::PostgresConnectionManager;
use bigdecimal::BigDecimal;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use datafusion::arrow::array::{
    make_builder, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
    Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
    Time64NanosecondBuilder, TimestampNanosecondBuilder,
};
use datafusion::arrow::datatypes::{Date32Type, IntervalMonthDayNanoType, SchemaRef};
use datafusion::common::project_schema;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::StreamExt;
use num_bigint::{BigInt, Sign};
use std::string::ToString;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Debug, Clone, derive_with::With)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    pub(crate) database: Option<String>,
}

impl PostgresConnectionOptions {
    pub fn new(
        host: impl Into<String>,
        port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            username: username.into(),
            password: password.into(),
            database: None,
        }
    }
}
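// Usage sketch for PostgresConnectionOptions (illustrative, not part of the crate's tests).
// `with_database` is assumed to be generated by `derive_with::With`; check the derived
// signature before relying on it, and substitute your own credentials.
//
//     let options = PostgresConnectionOptions::new("localhost", 5432, "postgres", "secret")
//         .with_database(Some("mydb".to_string()));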

#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Pool for PostgresPool {
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}

pub(crate) async fn connect_postgres(
    options: &PostgresConnectionOptions,
) -> DFResult<PostgresPool> {
    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
    config
        .host(&options.host)
        .port(options.port)
        .user(&options.username)
        .password(&options.password);
    if let Some(database) = &options.database {
        config.dbname(database);
    }
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = bb8::Pool::builder()
        .max_size(5)
        .build(manager)
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to create postgres connection pool due to {e}",
            ))
        })?;

    Ok(PostgresPool { pool })
}
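// Usage sketch for connect_postgres and the pool it returns (assumes a Tokio runtime and
// a reachable server; the SQL and table name below are placeholders):
//
//     let pool = connect_postgres(&options).await?;
//     let conn = pool.get().await?;
//     let (stream, remote_schema) = conn.query("SELECT * FROM t1".to_string(), None).await?;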

#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Connection for PostgresConnection {
    async fn infer_schema(
        &self,
        sql: &str,
        transform: Option<Arc<dyn Transform>>,
    ) -> DFResult<(RemoteSchema, SchemaRef)> {
        let mut stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(1)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let remote_schema = build_remote_schema(first_row)?;
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        if let Some(transform) = transform {
            let batch = rows_to_batch(
                std::slice::from_ref(first_row),
                &remote_schema,
                arrow_schema,
                None,
            )?;
            let transformed_batch = transform_batch(batch, transform.as_ref(), &remote_schema)?;
            Ok((remote_schema, transformed_batch.schema()))
        } else {
            Ok((remote_schema, arrow_schema))
        }
    }

    async fn query(
        &self,
        sql: String,
        projection: Option<Vec<usize>>,
    ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)> {
        let mut stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(2048)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let remote_schema = build_remote_schema(first_row)?;
        let projected_remote_schema = project_remote_schema(&remote_schema, projection.as_ref());
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        let first_chunk = rows_to_batch(
            first_chunk.as_slice(),
            &remote_schema,
            arrow_schema.clone(),
            projection.as_ref(),
        )?;
        let schema = first_chunk.schema();

        let mut stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            let batch = rows_to_batch(
                rows.as_slice(),
                &remote_schema,
                arrow_schema.clone(),
                projection.as_ref(),
            )?;
            Ok::<RecordBatch, DataFusionError>(batch)
        });

        let output_stream = async_stream::stream! {
            yield Ok(first_chunk);
            while let Some(batch) = stream.next().await {
                yield batch
            }
        };

        Ok((
            Box::pin(RecordBatchStreamAdapter::new(schema, output_stream)),
            projected_remote_schema,
        ))
    }
}

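// Note for pg_type_to_remote_type below: mapping a column needs the first row as well as
// the declared type, because a NUMERIC column's scale is not carried by the type OID and
// is instead sniffed from the first value (defaulting to 0 when that value is NULL).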
fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
    match pg_type {
        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
        &Type::NUMERIC => {
            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
            })?;
            let scale = match v {
                Some(v) => v.scale,
                None => 0,
            };
            assert!((scale as u32) <= (i8::MAX as u32));
            Ok(RemoteType::Postgres(PostgresType::Numeric(
                scale.try_into().unwrap_or_default(),
            )))
        }
        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
        other if other.name().eq_ignore_ascii_case("geometry") => {
            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
        }
        _ => Err(DataFusionError::NotImplemented(format!(
            "Unsupported postgres type {pg_type:?}",
        ))),
    }
}

fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
    let mut remote_fields = vec![];
    for (idx, col) in row.columns().iter().enumerate() {
        remote_fields.push(RemoteField::new(
            col.name(),
            pg_type_to_remote_type(col.type_(), row, idx)?,
            true,
        ));
    }
    Ok(RemoteSchema::new(remote_fields))
}

macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast builder to ",
                        stringify!($builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    stringify!($value_ty),
                    " value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        match v {
            Some(v) => builder.append_value(v),
            None => builder.append_null(),
        }
    }};
}
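// For example, `handle_primitive_type!(builder, field, Int16Builder, i16, row, idx)`
// downcasts `builder` to `Int16Builder`, reads the column as `Option<i16>` via
// `row.try_get(idx)`, and appends either the value or a null.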

macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?}",
                    $field
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast values builder to ",
                        stringify!($values_builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    stringify!($primitive_value_ty),
                    " array value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
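// The list variant works the same way but goes through the outer
// `ListBuilder<Box<dyn ArrayBuilder>>` first: for example,
// `handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx)` reads the
// column as `Option<Vec<i16>>`, extends the inner `Int16Builder`, and then closes the
// list entry (or appends a null list).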

#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}

impl BigDecimalFromSql {
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as u32))
    }
}

#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
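// Wire-format refresher for the NUMERIC decoding above (see numeric_send in Postgres):
// the header is ndigits (u16), weight (i16), sign (u16: 0x0000 positive, 0x4000 negative,
// 0xC000 NaN), and dscale (u16), followed by ndigits base-10000 digits, one u16 each.
// For example, 123.45 is sent as ndigits = 2, weight = 0, sign = 0x0000, dscale = 2,
// digits = [123, 4500], which the code above reconstructs as BigDecimal::new(12345, 2).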

// See interval_send in Postgres (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032):
// interval values are stored internally as three integral fields: months, days, and microseconds.
#[derive(Debug)]
struct IntervalFromSql {
    time: i64,
    day: i32,
    month: i32,
}

impl<'a> FromSql<'a> for IntervalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}
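// Example: the Postgres value `interval '1 month 2 days 3 hours'` arrives as
// month = 1, day = 2, time = 10_800_000_000 (microseconds). rows_to_batch below turns it
// into an Arrow IntervalMonthDayNano, scaling the microseconds to nanoseconds.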

pub struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}

impl<'a> FromSql<'a> for GeometryFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(GeometryFromSql { wkb: raw })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(ty.name(), "geometry")
    }
}

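// rows_to_batch builds one Arrow RecordBatch from a slice of rows: it allocates a builder
// per column of the full schema, appends values only for projected columns, and finally
// finishes and keeps just the projected builders.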
fn rows_to_batch(
    rows: &[Row],
    remote_schema: &RemoteSchema,
    arrow_schema: SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(&arrow_schema, projection)?;
    let mut array_builders = vec![];
    for field in arrow_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }
    for row in rows {
        for (idx, field) in remote_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            match field.remote_type {
                RemoteType::Postgres(PostgresType::Int2) => {
                    handle_primitive_type!(builder, field, Int16Builder, i16, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int4) => {
                    handle_primitive_type!(builder, field, Int32Builder, i32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int8) => {
                    handle_primitive_type!(builder, field, Int64Builder, i64, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float4) => {
                    handle_primitive_type!(builder, field, Float32Builder, f32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float8) => {
                    handle_primitive_type!(builder, field, Float64Builder, f64, row, idx);
                }
                RemoteType::Postgres(PostgresType::Numeric(_scale)) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Decimal128Builder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to Decimal128Builder for {field:?}")
                        });
                    let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                        DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
                    })?;

                    match v {
                        Some(v) => {
                            let Some(v) = v.to_decimal_128() else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {:?} to i128",
                                    v
                                )));
                            };
                            builder.append_value(v)
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Varchar)
                | RemoteType::Postgres(PostgresType::Text) => {
                    handle_primitive_type!(builder, field, StringBuilder, &str, row, idx);
                }
                RemoteType::Postgres(PostgresType::Bpchar) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<StringBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to StringBuilder for {field:?}")
                        });
                    let v: Option<&str> = row.try_get(idx).unwrap_or_else(|e| {
                        panic!("Failed to get &str value for {field:?}: {e:?}")
                    });

                    match v {
                        Some(v) => builder.append_value(v.trim_end()),
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Bytea) => {
                    handle_primitive_type!(builder, field, BinaryBuilder, Vec<u8>, row, idx);
                }
                RemoteType::Postgres(PostgresType::Timestamp) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| panic!("Failed to downcast builder to TimestampNanosecondBuilder for {field:?}"));
                    let v: Option<SystemTime> = row.try_get(idx).unwrap_or_else(|e| {
                        panic!("Failed to get SystemTime value for {field:?}: {e:?}")
                    });

                    match v {
                        Some(v) => {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                builder.append_value(timestamp);
                            } else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?}"
                                )));
                            }
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::TimestampTz) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to TimestampNanosecondBuilder for {field:?}")
                        });
                    let v: Option<chrono::DateTime<chrono::Utc>> =
                        row.try_get(idx).unwrap_or_else(|e| {
                            panic!(
                                "Failed to get chrono::DateTime<chrono::Utc> value for {field:?}: {e:?}"
                            )
                        });

                    match v {
                        Some(v) => {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| {
                                panic!("Failed to get timestamp in nanoseconds from {v} for Type::TIMESTAMPTZ")
                            });
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Time) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Time64NanosecondBuilder>()
                        .expect(
                            "Failed to downcast builder to Time64NanosecondBuilder for Type::TIME",
                        );
                    let v: Option<chrono::NaiveTime> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveTime value for column Type::TIME");

                    match v {
                        Some(v) => {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Date) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Date32Builder>()
                        .expect("Failed to downcast builder to Date32Builder for Type::DATE");
                    let v: Option<chrono::NaiveDate> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveDate value for column Type::DATE");

                    match v {
                        Some(v) => builder.append_value(Date32Type::from_naive_date(v)),
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Interval) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<IntervalMonthDayNanoBuilder>()
                        .expect("Failed to downcast builder to IntervalMonthDayNanoBuilder for Type::INTERVAL");

                    let v: Option<IntervalFromSql> = row
                        .try_get(idx)
                        .expect("Failed to get IntervalFromSql value for column Type::INTERVAL");

                    match v {
                        Some(v) => {
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            builder.append_value(interval_month_day_nano);
                        }
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Bool) => {
                    handle_primitive_type!(builder, field, BooleanBuilder, bool, row, idx);
                }
                RemoteType::Postgres(PostgresType::Json)
                | RemoteType::Postgres(PostgresType::Jsonb) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<LargeStringBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to LargeStringBuilder for {field:?}")
                        });
                    let v: Option<serde_json::value::Value> =
                        row.try_get(idx).unwrap_or_else(|e| {
                            panic!(
                                "Failed to get serde_json::value::Value value for {field:?}: {e:?}"
                            )
                        });

                    match v {
                        Some(v) => builder.append_value(v.to_string()),
                        None => builder.append_null(),
                    }
                }
                RemoteType::Postgres(PostgresType::Int2Array) => {
                    handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int4Array) => {
                    handle_primitive_array_type!(builder, field, Int32Builder, i32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Int8Array) => {
                    handle_primitive_array_type!(builder, field, Int64Builder, i64, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float4Array) => {
                    handle_primitive_array_type!(builder, field, Float32Builder, f32, row, idx);
                }
                RemoteType::Postgres(PostgresType::Float8Array) => {
                    handle_primitive_array_type!(builder, field, Float64Builder, f64, row, idx);
                }
                RemoteType::Postgres(PostgresType::VarcharArray)
                | RemoteType::Postgres(PostgresType::BpcharArray)
                | RemoteType::Postgres(PostgresType::TextArray) => {
                    handle_primitive_array_type!(builder, field, StringBuilder, &str, row, idx);
                }
                RemoteType::Postgres(PostgresType::ByteaArray) => {
                    handle_primitive_array_type!(builder, field, BinaryBuilder, Vec<u8>, row, idx);
                }
                RemoteType::Postgres(PostgresType::BoolArray) => {
                    handle_primitive_array_type!(builder, field, BooleanBuilder, bool, row, idx);
                }

                RemoteType::Postgres(PostgresType::PostGisGeometry) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<BinaryBuilder>()
                        .expect("Failed to downcast builder to BinaryBuilder for Type::geometry");
                    let v: Option<GeometryFromSql> = row
                        .try_get(idx)
                        .expect("Failed to get GeometryFromSql value for column Type::geometry");

                    match v {
                        Some(v) => builder.append_value(v.wkb),
                        None => builder.append_null(),
                    }
                }
                _ => {
                    return Err(DataFusionError::Execution(format!(
                        "Unsupported postgres type {field:?}",
                    )));
                }
            }
        }
    }
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}