datafusion_remote_table/connection/
postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::prelude::Expr;
27use derive_getters::Getters;
28use derive_with::With;
29use futures::StreamExt;
30use num_bigint::{BigInt, Sign};
31use std::string::ToString;
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35#[derive(Debug, Clone, With, Getters)]
36pub struct PostgresConnectionOptions {
37    pub(crate) host: String,
38    pub(crate) port: u16,
39    pub(crate) username: String,
40    pub(crate) password: String,
41    pub(crate) database: Option<String>,
42    pub(crate) pool_max_size: usize,
43    pub(crate) stream_chunk_size: usize,
44}
45
46impl PostgresConnectionOptions {
47    pub fn new(
48        host: impl Into<String>,
49        port: u16,
50        username: impl Into<String>,
51        password: impl Into<String>,
52    ) -> Self {
53        Self {
54            host: host.into(),
55            port,
56            username: username.into(),
57            password: password.into(),
58            database: None,
59            pool_max_size: 10,
60            stream_chunk_size: 2048,
61        }
62    }
63}
64
65#[derive(Debug)]
66pub struct PostgresPool {
67    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
68}
69
70#[async_trait::async_trait]
71impl Pool for PostgresPool {
72    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
73        let conn = self.pool.get_owned().await.map_err(|e| {
74            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
75        })?;
76        Ok(Arc::new(PostgresConnection { conn }))
77    }
78}
79
80pub(crate) async fn connect_postgres(
81    options: &PostgresConnectionOptions,
82) -> DFResult<PostgresPool> {
83    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
84    config
85        .host(&options.host)
86        .port(options.port)
87        .user(&options.username)
88        .password(&options.password);
89    if let Some(database) = &options.database {
90        config.dbname(database);
91    }
92    let manager = PostgresConnectionManager::new(config, NoTls);
93    let pool = bb8::Pool::builder()
94        .max_size(options.pool_max_size as u32)
95        .build(manager)
96        .await
97        .map_err(|e| {
98            DataFusionError::Execution(format!(
99                "Failed to create postgres connection pool due to {e}",
100            ))
101        })?;
102
103    Ok(PostgresPool { pool })
104}
105
106#[derive(Debug)]
107pub(crate) struct PostgresConnection {
108    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
109}
110
111#[async_trait::async_trait]
112impl Connection for PostgresConnection {
113    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
114        let sql = RemoteDbType::Postgres.query_limit_1(sql)?;
115        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
116            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
117        })?;
118        let remote_schema = Arc::new(build_remote_schema(&row)?);
119        Ok(remote_schema)
120    }
121
122    async fn query(
123        &self,
124        conn_options: &ConnectionOptions,
125        sql: &str,
126        table_schema: SchemaRef,
127        projection: Option<&Vec<usize>>,
128        filters: &[Expr],
129        limit: Option<usize>,
130    ) -> DFResult<SendableRecordBatchStream> {
131        let projected_schema = project_schema(&table_schema, projection)?;
132        let sql = RemoteDbType::Postgres.try_rewrite_query(sql, filters, limit)?;
133        let projection = projection.cloned();
134        let chunk_size = conn_options.stream_chunk_size();
135        let stream = self
136            .conn
137            .query_raw(&sql, Vec::<String>::new())
138            .await
139            .map_err(|e| {
140                DataFusionError::Execution(format!(
141                    "Failed to execute query {sql} on postgres: {e}",
142                ))
143            })?
144            .chunks(chunk_size)
145            .boxed();
146
147        let stream = stream.map(move |rows| {
148            let rows: Vec<Row> = rows
149                .into_iter()
150                .collect::<Result<Vec<_>, _>>()
151                .map_err(|e| {
152                    DataFusionError::Execution(format!(
153                        "Failed to collect rows from postgres due to {e}",
154                    ))
155                })?;
156            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
157        });
158
159        Ok(Box::pin(RecordBatchStreamAdapter::new(
160            projected_schema,
161            stream,
162        )))
163    }
164}
165
166fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
167    match pg_type {
168        &Type::INT2 => Ok(PostgresType::Int2),
169        &Type::INT4 => Ok(PostgresType::Int4),
170        &Type::INT8 => Ok(PostgresType::Int8),
171        &Type::FLOAT4 => Ok(PostgresType::Float4),
172        &Type::FLOAT8 => Ok(PostgresType::Float8),
173        &Type::NUMERIC => {
174            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
175                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
176            })?;
177            let scale = match v {
178                Some(v) => v.scale,
179                None => 0,
180            };
181            assert!((scale as u32) <= (i8::MAX as u32));
182            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
183        }
184        &Type::OID => Ok(PostgresType::Oid),
185        &Type::NAME => Ok(PostgresType::Name),
186        &Type::VARCHAR => Ok(PostgresType::Varchar),
187        &Type::BPCHAR => Ok(PostgresType::Bpchar),
188        &Type::TEXT => Ok(PostgresType::Text),
189        &Type::BYTEA => Ok(PostgresType::Bytea),
190        &Type::DATE => Ok(PostgresType::Date),
191        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
192        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
193        &Type::TIME => Ok(PostgresType::Time),
194        &Type::INTERVAL => Ok(PostgresType::Interval),
195        &Type::BOOL => Ok(PostgresType::Bool),
196        &Type::JSON => Ok(PostgresType::Json),
197        &Type::JSONB => Ok(PostgresType::Jsonb),
198        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
199        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
200        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
201        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
202        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
203        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
204        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
205        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
206        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
207        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
208        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
209        _ => Err(DataFusionError::NotImplemented(format!(
210            "Unsupported postgres type {pg_type:?}",
211        ))),
212    }
213}
214
215fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
216    let mut remote_fields = vec![];
217    for (idx, col) in row.columns().iter().enumerate() {
218        remote_fields.push(RemoteField::new(
219            col.name(),
220            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
221            true,
222        ));
223    }
224    Ok(RemoteSchema::new(remote_fields))
225}
226
/// Appends column `$index` of `$row` to `$builder`.
///
/// `$builder` is downcast to `$builder_ty`. A mismatch panics, because it means
/// the builder does not match the schema. The cell is read as
/// `Option<$value_ty>`. NULL appends a null, and any other value is passed
/// through `$convert`, which returns a `Result` whose error is propagated
/// with `?`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };
        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;
        if let Some(present) = cell {
            typed_builder.append_value($convert(present)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
255
/// Appends the array in column `$index` of `$row` to a list builder.
///
/// `$builder` must be a `ListBuilder<Box<dyn ArrayBuilder>>` whose values
/// builder is a `$values_builder_ty`. A mismatch panics, because it means the
/// builder does not match the schema. A NULL array appends a null list entry.
/// Otherwise every element is appended to the values builder and the list
/// entry is closed.
///
/// NOTE(review): elements are decoded as `$primitive_value_ty`, not `Option<_>`,
/// so an array containing NULL elements fails in `try_get` and surfaces as an
/// `Execution` error.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Must name this macro's own metavariable; an unknown `$name`
                    // would be stringified literally.
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                values_builder.extend(v.into_iter().map(Some));
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
298
299#[derive(Debug)]
300struct BigDecimalFromSql {
301    inner: BigDecimal,
302    scale: u16,
303}
304
305impl BigDecimalFromSql {
306    fn to_decimal_128(&self) -> Option<i128> {
307        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
308    }
309}
310
311#[allow(clippy::cast_sign_loss)]
312#[allow(clippy::cast_possible_wrap)]
313#[allow(clippy::cast_possible_truncation)]
314impl<'a> FromSql<'a> for BigDecimalFromSql {
315    fn from_sql(
316        _ty: &Type,
317        raw: &'a [u8],
318    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
319        let raw_u16: Vec<u16> = raw
320            .chunks(2)
321            .map(|chunk| {
322                if chunk.len() == 2 {
323                    u16::from_be_bytes([chunk[0], chunk[1]])
324                } else {
325                    u16::from_be_bytes([chunk[0], 0])
326                }
327            })
328            .collect();
329
330        let base_10_000_digit_count = raw_u16[0];
331        let weight = raw_u16[1] as i16;
332        let sign = raw_u16[2];
333        let scale = raw_u16[3];
334
335        let mut base_10_000_digits = Vec::new();
336        for i in 4..4 + base_10_000_digit_count {
337            base_10_000_digits.push(raw_u16[i as usize]);
338        }
339
340        let mut u8_digits = Vec::new();
341        for &base_10_000_digit in base_10_000_digits.iter().rev() {
342            let mut base_10_000_digit = base_10_000_digit;
343            let mut temp_result = Vec::new();
344            while base_10_000_digit > 0 {
345                temp_result.push((base_10_000_digit % 10) as u8);
346                base_10_000_digit /= 10;
347            }
348            while temp_result.len() < 4 {
349                temp_result.push(0);
350            }
351            u8_digits.extend(temp_result);
352        }
353        u8_digits.reverse();
354
355        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
356        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
357        u8_digits.resize(size as usize, 0);
358
359        let sign = match sign {
360            0x4000 => Sign::Minus,
361            0x0000 => Sign::Plus,
362            _ => {
363                return Err(Box::new(DataFusionError::Execution(
364                    "Failed to parse big decimal from postgres numeric value".to_string(),
365                )));
366            }
367        };
368
369        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
370            return Err(Box::new(DataFusionError::Execution(
371                "Failed to parse big decimal from postgres numeric value".to_string(),
372            )));
373        };
374        Ok(BigDecimalFromSql {
375            inner: BigDecimal::new(digits, i64::from(scale)),
376            scale,
377        })
378    }
379
380    fn accepts(ty: &Type) -> bool {
381        matches!(*ty, Type::NUMERIC)
382    }
383}
384
385// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
386// interval values are internally stored as three integral fields: months, days, and microseconds
387#[derive(Debug)]
388struct IntervalFromSql {
389    time: i64,
390    day: i32,
391    month: i32,
392}
393
394impl<'a> FromSql<'a> for IntervalFromSql {
395    fn from_sql(
396        _ty: &Type,
397        raw: &'a [u8],
398    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
399        let mut cursor = std::io::Cursor::new(raw);
400
401        let time = cursor.read_i64::<BigEndian>()?;
402        let day = cursor.read_i32::<BigEndian>()?;
403        let month = cursor.read_i32::<BigEndian>()?;
404
405        Ok(IntervalFromSql { time, day, month })
406    }
407
408    fn accepts(ty: &Type) -> bool {
409        matches!(*ty, Type::INTERVAL)
410    }
411}
412
413struct GeometryFromSql<'a> {
414    wkb: &'a [u8],
415}
416
417impl<'a> FromSql<'a> for GeometryFromSql<'a> {
418    fn from_sql(
419        _ty: &Type,
420        raw: &'a [u8],
421    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
422        Ok(GeometryFromSql { wkb: raw })
423    }
424
425    fn accepts(ty: &Type) -> bool {
426        matches!(ty.name(), "geometry")
427    }
428}
429
430fn rows_to_batch(
431    rows: &[Row],
432    table_schema: &SchemaRef,
433    projection: Option<&Vec<usize>>,
434) -> DFResult<RecordBatch> {
435    let projected_schema = project_schema(table_schema, projection)?;
436    let mut array_builders = vec![];
437    for field in table_schema.fields() {
438        let builder = make_builder(field.data_type(), rows.len());
439        array_builders.push(builder);
440    }
441
442    for row in rows {
443        for (idx, field) in table_schema.fields.iter().enumerate() {
444            if !projections_contains(projection, idx) {
445                continue;
446            }
447            let builder = &mut array_builders[idx];
448            let col = row.columns().get(idx);
449            match field.data_type() {
450                DataType::Int16 => {
451                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
452                        Ok::<_, DataFusionError>(v)
453                    });
454                }
455                DataType::Int32 => {
456                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
457                        Ok::<_, DataFusionError>(v)
458                    });
459                }
460                DataType::UInt32 => {
461                    handle_primitive_type!(
462                        builder,
463                        field,
464                        col,
465                        UInt32Builder,
466                        u32,
467                        row,
468                        idx,
469                        |v| { Ok::<_, DataFusionError>(v) }
470                    );
471                }
472                DataType::Int64 => {
473                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
474                        Ok::<_, DataFusionError>(v)
475                    });
476                }
477                DataType::Float32 => {
478                    handle_primitive_type!(
479                        builder,
480                        field,
481                        col,
482                        Float32Builder,
483                        f32,
484                        row,
485                        idx,
486                        |v| { Ok::<_, DataFusionError>(v) }
487                    );
488                }
489                DataType::Float64 => {
490                    handle_primitive_type!(
491                        builder,
492                        field,
493                        col,
494                        Float64Builder,
495                        f64,
496                        row,
497                        idx,
498                        |v| { Ok::<_, DataFusionError>(v) }
499                    );
500                }
501                DataType::Decimal128(_precision, _scale) => {
502                    handle_primitive_type!(
503                        builder,
504                        field,
505                        col,
506                        Decimal128Builder,
507                        BigDecimalFromSql,
508                        row,
509                        idx,
510                        |v: BigDecimalFromSql| {
511                            v.to_decimal_128().ok_or_else(|| {
512                                DataFusionError::Execution(format!(
513                                    "Failed to convert BigDecimal {v:?} to i128",
514                                ))
515                            })
516                        }
517                    );
518                }
519                DataType::Utf8 => {
520                    handle_primitive_type!(
521                        builder,
522                        field,
523                        col,
524                        StringBuilder,
525                        &str,
526                        row,
527                        idx,
528                        |v| { Ok::<_, DataFusionError>(v) }
529                    );
530                }
531                DataType::LargeUtf8 => {
532                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
533                        handle_primitive_type!(
534                            builder,
535                            field,
536                            col,
537                            LargeStringBuilder,
538                            serde_json::value::Value,
539                            row,
540                            idx,
541                            |v: serde_json::value::Value| {
542                                Ok::<_, DataFusionError>(v.to_string())
543                            }
544                        );
545                    } else {
546                        handle_primitive_type!(
547                            builder,
548                            field,
549                            col,
550                            LargeStringBuilder,
551                            &str,
552                            row,
553                            idx,
554                            |v| { Ok::<_, DataFusionError>(v) }
555                        );
556                    }
557                }
558                DataType::Binary => {
559                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
560                    {
561                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
562                            |v| Ok(v.wkb);
563                        handle_primitive_type!(
564                            builder,
565                            field,
566                            col,
567                            BinaryBuilder,
568                            GeometryFromSql,
569                            row,
570                            idx,
571                            convert
572                        );
573                    } else if col.is_some()
574                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
575                    {
576                        handle_primitive_type!(
577                            builder,
578                            field,
579                            col,
580                            BinaryBuilder,
581                            serde_json::value::Value,
582                            row,
583                            idx,
584                            |v: serde_json::value::Value| {
585                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
586                            }
587                        );
588                    } else {
589                        handle_primitive_type!(
590                            builder,
591                            field,
592                            col,
593                            BinaryBuilder,
594                            Vec<u8>,
595                            row,
596                            idx,
597                            |v| { Ok::<_, DataFusionError>(v) }
598                        );
599                    }
600                }
601                DataType::Timestamp(TimeUnit::Microsecond, None) => {
602                    handle_primitive_type!(
603                        builder,
604                        field,
605                        col,
606                        TimestampMicrosecondBuilder,
607                        SystemTime,
608                        row,
609                        idx,
610                        |v: SystemTime| {
611                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
612                                let timestamp: i64 = v
613                                    .as_micros()
614                                    .try_into()
615                                    .expect("Failed to convert SystemTime to i64");
616                                Ok(timestamp)
617                            } else {
618                                Err(DataFusionError::Execution(format!(
619                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
620                                )))
621                            }
622                        }
623                    );
624                }
625                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
626                    handle_primitive_type!(
627                        builder,
628                        field,
629                        col,
630                        TimestampNanosecondBuilder,
631                        SystemTime,
632                        row,
633                        idx,
634                        |v: SystemTime| {
635                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
636                                let timestamp: i64 = v
637                                    .as_nanos()
638                                    .try_into()
639                                    .expect("Failed to convert SystemTime to i64");
640                                Ok(timestamp)
641                            } else {
642                                Err(DataFusionError::Execution(format!(
643                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
644                                )))
645                            }
646                        }
647                    );
648                }
649                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
650                    handle_primitive_type!(
651                        builder,
652                        field,
653                        col,
654                        TimestampNanosecondBuilder,
655                        chrono::DateTime<chrono::Utc>,
656                        row,
657                        idx,
658                        |v: chrono::DateTime<chrono::Utc>| {
659                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
660                            Ok::<_, DataFusionError>(timestamp)
661                        }
662                    );
663                }
664                DataType::Time64(TimeUnit::Microsecond) => {
665                    handle_primitive_type!(
666                        builder,
667                        field,
668                        col,
669                        Time64MicrosecondBuilder,
670                        chrono::NaiveTime,
671                        row,
672                        idx,
673                        |v: chrono::NaiveTime| {
674                            let seconds = i64::from(v.num_seconds_from_midnight());
675                            let microseconds = i64::from(v.nanosecond()) / 1000;
676                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
677                        }
678                    );
679                }
680                DataType::Time64(TimeUnit::Nanosecond) => {
681                    handle_primitive_type!(
682                        builder,
683                        field,
684                        col,
685                        Time64NanosecondBuilder,
686                        chrono::NaiveTime,
687                        row,
688                        idx,
689                        |v: chrono::NaiveTime| {
690                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
691                                * 1_000_000_000
692                                + i64::from(v.nanosecond());
693                            Ok::<_, DataFusionError>(timestamp)
694                        }
695                    );
696                }
697                DataType::Date32 => {
698                    handle_primitive_type!(
699                        builder,
700                        field,
701                        col,
702                        Date32Builder,
703                        chrono::NaiveDate,
704                        row,
705                        idx,
706                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
707                    );
708                }
709                DataType::Interval(IntervalUnit::MonthDayNano) => {
710                    handle_primitive_type!(
711                        builder,
712                        field,
713                        col,
714                        IntervalMonthDayNanoBuilder,
715                        IntervalFromSql,
716                        row,
717                        idx,
718                        |v: IntervalFromSql| {
719                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
720                                v.month,
721                                v.day,
722                                v.time * 1_000,
723                            );
724                            Ok::<_, DataFusionError>(interval_month_day_nano)
725                        }
726                    );
727                }
728                DataType::Boolean => {
729                    handle_primitive_type!(
730                        builder,
731                        field,
732                        col,
733                        BooleanBuilder,
734                        bool,
735                        row,
736                        idx,
737                        |v| { Ok::<_, DataFusionError>(v) }
738                    );
739                }
740                DataType::List(inner) => match inner.data_type() {
741                    DataType::Int16 => {
742                        handle_primitive_array_type!(
743                            builder,
744                            field,
745                            col,
746                            Int16Builder,
747                            i16,
748                            row,
749                            idx
750                        );
751                    }
752                    DataType::Int32 => {
753                        handle_primitive_array_type!(
754                            builder,
755                            field,
756                            col,
757                            Int32Builder,
758                            i32,
759                            row,
760                            idx
761                        );
762                    }
763                    DataType::Int64 => {
764                        handle_primitive_array_type!(
765                            builder,
766                            field,
767                            col,
768                            Int64Builder,
769                            i64,
770                            row,
771                            idx
772                        );
773                    }
774                    DataType::Float32 => {
775                        handle_primitive_array_type!(
776                            builder,
777                            field,
778                            col,
779                            Float32Builder,
780                            f32,
781                            row,
782                            idx
783                        );
784                    }
785                    DataType::Float64 => {
786                        handle_primitive_array_type!(
787                            builder,
788                            field,
789                            col,
790                            Float64Builder,
791                            f64,
792                            row,
793                            idx
794                        );
795                    }
796                    DataType::Utf8 => {
797                        handle_primitive_array_type!(
798                            builder,
799                            field,
800                            col,
801                            StringBuilder,
802                            &str,
803                            row,
804                            idx
805                        );
806                    }
807                    DataType::Binary => {
808                        handle_primitive_array_type!(
809                            builder,
810                            field,
811                            col,
812                            BinaryBuilder,
813                            Vec<u8>,
814                            row,
815                            idx
816                        );
817                    }
818                    DataType::Boolean => {
819                        handle_primitive_array_type!(
820                            builder,
821                            field,
822                            col,
823                            BooleanBuilder,
824                            bool,
825                            row,
826                            idx
827                        );
828                    }
829                    _ => {
830                        return Err(DataFusionError::NotImplemented(format!(
831                            "Unsupported list data type {:?} for col: {:?}",
832                            field.data_type(),
833                            col
834                        )));
835                    }
836                },
837                _ => {
838                    return Err(DataFusionError::NotImplemented(format!(
839                        "Unsupported data type {:?} for col: {:?}",
840                        field.data_type(),
841                        col
842                    )));
843                }
844            }
845        }
846    }
847    let projected_columns = array_builders
848        .into_iter()
849        .enumerate()
850        .filter(|(idx, _)| projections_contains(projection, *idx))
851        .map(|(_, mut builder)| builder.finish())
852        .collect::<Vec<ArrayRef>>();
853    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
854}