// datafusion_remote_table/connection/postgres.rs

1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::transform::transform_batch;
3use crate::{
4    Connection, DFResult, Pool, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
5    RemoteType, Transform,
6};
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bb8_postgres::PostgresConnectionManager;
10use bigdecimal::BigDecimal;
11use byteorder::{BigEndian, ReadBytesExt};
12use chrono::Timelike;
13use datafusion::arrow::array::{
14    make_builder, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
15    Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
16    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
17    Time64NanosecondBuilder, TimestampNanosecondBuilder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use futures::StreamExt;
27use num_bigint::{BigInt, Sign};
28use std::string::ToString;
29use std::sync::Arc;
30use std::time::{SystemTime, UNIX_EPOCH};
31
/// Connection settings for a postgres server.
///
/// `derive_with::With` presumably generates `with_*` builder-style setters
/// for each field (e.g. for selecting the optional `database`) — TODO confirm
/// against the derive_with crate docs.
#[derive(Debug, Clone, derive_with::With)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    /// Database name; when `None`, no dbname is set on the connection config.
    pub(crate) database: Option<String>,
}
40
41impl PostgresConnectionOptions {
42    pub fn new(
43        host: impl Into<String>,
44        port: u16,
45        username: impl Into<String>,
46        password: impl Into<String>,
47    ) -> Self {
48        Self {
49            host: host.into(),
50            port,
51            username: username.into(),
52            password: password.into(),
53            database: None,
54        }
55    }
56}
57
/// A bb8-backed postgres connection pool (plain TCP, no TLS).
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
62
63#[async_trait::async_trait]
64impl Pool for PostgresPool {
65    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
66        let conn = self.pool.get_owned().await.map_err(|e| {
67            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
68        })?;
69        Ok(Arc::new(PostgresConnection { conn }))
70    }
71}
72
73pub(crate) async fn connect_postgres(
74    options: &PostgresConnectionOptions,
75) -> DFResult<PostgresPool> {
76    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
77    config
78        .host(&options.host)
79        .port(options.port)
80        .user(&options.username)
81        .password(&options.password);
82    if let Some(database) = &options.database {
83        config.dbname(database);
84    }
85    let manager = PostgresConnectionManager::new(config, NoTls);
86    let pool = bb8::Pool::builder()
87        .max_size(5)
88        .build(manager)
89        .await
90        .map_err(|e| {
91            DataFusionError::Execution(format!(
92                "Failed to create postgres connection pool due to {e}",
93            ))
94        })?;
95
96    Ok(PostgresPool { pool })
97}
98
/// A single pooled postgres connection.
///
/// The `'static` pool lifetime reflects that the connection was obtained via
/// `Pool::get_owned`, so it is detached from any borrowed pool handle.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
103
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infer the remote (postgres) schema and the corresponding arrow schema
    /// for `sql` by executing the query and inspecting the first row returned.
    ///
    /// Requires at least one result row: both an empty stream and an empty
    /// first chunk are reported as "No data returned from postgres". When a
    /// `transform` is provided, the arrow schema is taken from a transformed
    /// single-row batch so it reflects the transform's output types.
    async fn infer_schema(
        &self,
        sql: &str,
        transform: Option<Arc<dyn Transform>>,
    ) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
        // Stream the result row-by-row; only the first chunk is consumed.
        let mut stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(1)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        // Column names/types come from the first row's metadata; the numeric
        // scale is probed from the actual value (see pg_type_to_remote_type).
        let remote_schema = Arc::new(build_remote_schema(first_row)?);
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        if let Some(transform) = transform {
            // Round-trip the first row through the transform to discover the
            // post-transform arrow schema.
            let batch = rows_to_batch(std::slice::from_ref(first_row), &arrow_schema, None)?;
            let transformed_batch = transform_batch(
                batch,
                transform.as_ref(),
                &arrow_schema,
                None,
                Some(&remote_schema),
            )?;
            Ok((remote_schema, transformed_batch.schema()))
        } else {
            Ok((remote_schema, arrow_schema))
        }
    }

    /// Execute `sql` and stream the result as arrow record batches.
    ///
    /// Rows are fetched in chunks of 2048 and converted per-chunk via
    /// `rows_to_batch`; `projection` (indices into `table_schema`) selects
    /// which columns are materialized in the output batches.
    async fn query(
        &self,
        sql: String,
        table_schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection.as_ref())?;
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(2048)
            .boxed();

        // Convert each chunk of rows lazily; errors surface through the
        // stream as DataFusionError items.
        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
195
196fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
197    match pg_type {
198        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
199        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
200        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
201        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
202        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
203        &Type::NUMERIC => {
204            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
205                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
206            })?;
207            let scale = match v {
208                Some(v) => v.scale,
209                None => 0,
210            };
211            assert!((scale as u32) <= (i8::MAX as u32));
212            Ok(RemoteType::Postgres(PostgresType::Numeric(
213                scale.try_into().unwrap_or_default(),
214            )))
215        }
216        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
217        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
218        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
219        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
220        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
221        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
222        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
223        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
224        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
225        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
226        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
227        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
228        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
229        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
230        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
231        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
232        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
233        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
234        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
235        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
236        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
237        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
238        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
239        other if other.name().eq_ignore_ascii_case("geometry") => {
240            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
241        }
242        _ => Err(DataFusionError::NotImplemented(format!(
243            "Unsupported postgres type {pg_type:?}",
244        ))),
245    }
246}
247
248fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
249    let mut remote_fields = vec![];
250    for (idx, col) in row.columns().iter().enumerate() {
251        remote_fields.push(RemoteField::new(
252            col.name(),
253            pg_type_to_remote_type(col.type_(), row, idx)?,
254            true,
255        ));
256    }
257    Ok(RemoteSchema::new(remote_fields))
258}
259
/// Append one nullable scalar value from `$row[$index]` onto `$builder`.
///
/// `$builder_ty` is the concrete arrow builder type to downcast to and
/// `$value_ty` is the Rust type requested from `Row::try_get`. Downcast or
/// fetch failures indicate a schema/builder mismatch (a programming error),
/// so they panic rather than returning an error.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast builder to ",
                        stringify!($builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    stringify!($value_ty),
                    " value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        // SQL NULL maps to an arrow null slot.
        match v {
            Some(v) => builder.append_value(v),
            None => builder.append_null(),
        }
    }};
}
292
/// Append one nullable array value from `$row[$index]` onto a
/// `ListBuilder` whose values builder is `$values_builder_ty`.
///
/// `$primitive_value_ty` is the element type requested from `Row::try_get`
/// (as `Option<Vec<_>>`). As in `handle_primitive_type!`, downcast/fetch
/// failures indicate a schema/builder mismatch and panic.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?}",
                    $field
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast values builder to ",
                        // Fixed: previously referenced the undeclared
                        // metavariable `$builder_ty`, which fails to expand.
                        stringify!($values_builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    // Fixed: previously referenced the undeclared
                    // metavariable `$value_ty`.
                    stringify!($primitive_value_ty),
                    " array value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        // SQL NULL maps to a null list; a non-null array extends the values
        // builder with non-null elements and closes the list entry.
        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
339
/// A `BigDecimal` decoded from the postgres `numeric` binary wire format,
/// keeping the display scale reported by the server alongside the value.
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    // Display scale (digits after the decimal point) from the numeric header.
    scale: u16,
}
345
346impl BigDecimalFromSql {
347    fn to_decimal_128(&self) -> Option<i128> {
348        big_decimal_to_i128(&self.inner, Some(self.scale as u32))
349    }
350}
351
// Decodes the postgres `numeric` binary wire format: four big-endian u16
// header words (base-10000 digit count, weight, sign, display scale)
// followed by the base-10000 digits themselves.
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Reinterpret the raw bytes as big-endian u16 words; a trailing odd
        // byte is padded with 0 in the low position.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        // NOTE(review): the indexing below panics on payloads shorter than
        // 8 bytes — presumably never produced by the server, but unverified.
        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10000 digit into exactly four base-10 digits.
        // Digits are processed in reverse and the whole vector reversed at
        // the end so zero-padding lands on the correct side.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Align the digit string with the declared display scale: compute the
        // scale implied by the digits actually present, then pad (or trim)
        // with trailing zeros so the fractional part has exactly `scale`
        // digits.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        // Sign word: 0x0000 = positive, 0x4000 = negative. Anything else
        // (e.g. the NaN marker) is rejected.
        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
425
// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
// interval values are internally stored as three integral fields: months, days, and microseconds
#[derive(Debug)]
struct IntervalFromSql {
    // Microsecond part; converted to nanoseconds when building arrow
    // MonthDayNano intervals.
    time: i64,
    // Day count.
    day: i32,
    // Month count.
    month: i32,
}
434
435impl<'a> FromSql<'a> for IntervalFromSql {
436    fn from_sql(
437        _ty: &Type,
438        raw: &'a [u8],
439    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
440        let mut cursor = std::io::Cursor::new(raw);
441
442        let time = cursor.read_i64::<BigEndian>()?;
443        let day = cursor.read_i32::<BigEndian>()?;
444        let month = cursor.read_i32::<BigEndian>()?;
445
446        Ok(IntervalFromSql { time, day, month })
447    }
448
449    fn accepts(ty: &Type) -> bool {
450        matches!(*ty, Type::INTERVAL)
451    }
452}
453
/// Zero-copy wrapper for a PostGIS geometry value: borrows the raw bytes as
/// sent by the server (the field name suggests WKB — TODO confirm whether
/// PostGIS emits EWKB here).
pub struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
457
458impl<'a> FromSql<'a> for GeometryFromSql<'a> {
459    fn from_sql(
460        _ty: &Type,
461        raw: &'a [u8],
462    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
463        Ok(GeometryFromSql { wkb: raw })
464    }
465
466    fn accepts(ty: &Type) -> bool {
467        matches!(ty.name(), "geometry")
468    }
469}
470
/// Convert a slice of postgres `Row`s into an arrow `RecordBatch` matching
/// `table_schema` restricted to `projection`.
///
/// A builder is allocated for every column of `table_schema` so builder
/// indices line up with row column indices; only projected columns are
/// populated, and the unprojected (empty) builders are filtered out before
/// assembling the batch.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // Postgres column metadata for this index; used to special-case
            // types (json/jsonb, postgis geometry) that share an arrow type.
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, Int16Builder, i16, row, idx);
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, Int32Builder, i32, row, idx);
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, Int64Builder, i64, row, idx);
                }
                DataType::Float32 => {
                    handle_primitive_type!(builder, field, Float32Builder, f32, row, idx);
                }
                DataType::Float64 => {
                    handle_primitive_type!(builder, field, Float64Builder, f64, row, idx);
                }
                DataType::Decimal128(_precision, _scale) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Decimal128Builder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to Decimal128Builder for {field:?}")
                        });
                    let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                        DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
                    })?;

                    // Values that overflow i128 at the value's own scale are
                    // reported as errors rather than silently truncated.
                    match v {
                        Some(v) => {
                            let Some(v) = v.to_decimal_128() else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {:?} to i128",
                                    v
                                )));
                            };
                            builder.append_value(v)
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Utf8 => {
                    handle_primitive_type!(builder, field, StringBuilder, &str, row, idx);
                }
                DataType::LargeUtf8 => {
                    // json/jsonb are fetched as serde_json values and rendered
                    // back to text; any other LargeUtf8 column is read as a
                    // plain string.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        let builder = builder
                            .as_any_mut()
                            .downcast_mut::<LargeStringBuilder>()
                            .unwrap_or_else(|| {
                                panic!("Failed to downcast builder to LargeStringBuilder for {field:?}")
                            });
                        let v: Option<serde_json::value::Value> =
                            row.try_get(idx).unwrap_or_else(|e| {
                                panic!("Failed to get serde_json::value::Value value for {field:?}: {e:?}")
                            });

                        match v {
                            Some(v) => builder.append_value(v.to_string()),
                            None => builder.append_null(),
                        }
                    } else {
                        handle_primitive_type!(builder, field, LargeStringBuilder, &str, row, idx);
                    }
                }
                DataType::Binary => {
                    // PostGIS geometry arrives as raw geometry bytes (see
                    // GeometryFromSql); other Binary columns are plain bytea.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let builder = builder.as_any_mut().downcast_mut::<BinaryBuilder>().expect(
                            "Failed to downcast builder to BinaryBuilder for Type::geometry",
                        );
                        let v: Option<GeometryFromSql> = row.try_get(idx).expect(
                            "Failed to get GeometryFromSql value for column Type::geometry",
                        );

                        match v {
                            Some(v) => builder.append_value(v.wkb),
                            None => builder.append_null(),
                        }
                    } else {
                        handle_primitive_type!(builder, field, BinaryBuilder, Vec<u8>, row, idx);
                    }
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    let builder = builder
                                .as_any_mut()
                                .downcast_mut::<TimestampNanosecondBuilder>()
                                .unwrap_or_else(|| panic!("Failed to downcast builder to TimestampNanosecondBuilder for {field:?}"));
                    let v: Option<SystemTime> = row.try_get(idx).unwrap_or_else(|e| {
                        panic!("Failed to get SystemTime value for {field:?}: {e:?}")
                    });

                    // Nanoseconds since the unix epoch. NOTE(review):
                    // duration_since fails for pre-epoch timestamps, which
                    // are reported as an error rather than a negative value.
                    match v {
                        Some(v) => {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                builder.append_value(timestamp);
                            } else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?}"
                                )));
                            }
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to TimestampTz for {field:?}")
                        });
                    let v: Option<chrono::DateTime<chrono::Utc>> = row.try_get(idx).unwrap_or_else(|e| panic!("Failed to get chrono::DateTime<chrono::Utc> value for {field:?}: {e:?}"));

                    // timestamp_nanos_opt is None when the value overflows the
                    // representable nanosecond range; that case panics here.
                    match v {
                        Some(v) => {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for Type::TIMESTAMP"));
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Time64NanosecondBuilder>()
                        .expect(
                            "Failed to downcast builder to Time64NanosecondBuilder for Type::TIME",
                        );
                    let v: Option<chrono::NaiveTime> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveTime value for column Type::TIME");

                    // Nanoseconds since midnight.
                    match v {
                        Some(v) => {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Date32 => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Date32Builder>()
                        .expect("Failed to downcast builder to Date32Builder for Type::DATE");
                    let v: Option<chrono::NaiveDate> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveDate value for column Type::DATE");

                    match v {
                        Some(v) => builder.append_value(Date32Type::from_naive_date(v)),
                        None => builder.append_null(),
                    }
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    let builder = builder
                                .as_any_mut()
                                .downcast_mut::<IntervalMonthDayNanoBuilder>()
                                .expect("Failed to downcast builder to IntervalMonthDayNanoBuilder for Type::INTERVAL");

                    let v: Option<IntervalFromSql> = row
                        .try_get(idx)
                        .expect("Failed to get IntervalFromSql value for column Type::INTERVAL");

                    // Postgres stores the time part in microseconds; arrow's
                    // MonthDayNano wants nanoseconds, hence the * 1_000.
                    match v {
                        Some(v) => {
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            builder.append_value(interval_month_day_nano);
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Boolean => {
                    handle_primitive_type!(builder, field, BooleanBuilder, bool, row, idx);
                }
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx);
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(builder, field, Int32Builder, i32, row, idx);
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(builder, field, Int64Builder, i64, row, idx);
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(builder, field, Float32Builder, f32, row, idx);
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(builder, field, Float64Builder, f64, row, idx);
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(builder, field, StringBuilder, &str, row, idx);
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
            // match field.remote_type {
            //     RemoteType::Postgres(PostgresType::PostGisGeometry) => {
            //         let builder = builder
            //             .as_any_mut()
            //             .downcast_mut::<BinaryBuilder>()
            //             .expect("Failed to downcast builder to BinaryBuilder for Type::geometry");
            //         let v: Option<GeometryFromSql> = row
            //             .try_get(idx)
            //             .expect("Failed to get GeometryFromSql value for column Type::geometry");
            //
            //         match v {
            //             Some(v) => builder.append_value(v.wkb),
            //             None => builder.append_null(),
            //         }
            //     }
            //     _ => {
            //         return Err(DataFusionError::Execution(format!(
            //             "Unsupported postgres type {field:?}",
            //         )));
            //     }
            // }
        }
    }
    // Finish only the projected builders, in schema order, and assemble the
    // batch against the projected schema.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}