// datafusion_remote_table/connection/postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, RecordBatchOptions,
16    StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use log::debug;
30use num_bigint::{BigInt, Sign};
31use std::any::Any;
32use std::string::ToString;
33use std::sync::Arc;
34use std::time::{SystemTime, UNIX_EPOCH};
35
/// Settings used to connect to a Postgres server and to stream query results from it.
///
/// Create with [`PostgresConnectionOptions::new`]; the remaining fields keep the defaults set
/// there unless overridden through the `#[derive(With)]` builder methods.
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    /// Server host name or address, passed to the driver config in `connect_postgres`.
    pub(crate) host: String,
    /// Server TCP port.
    pub(crate) port: u16,
    /// Login user name.
    pub(crate) username: String,
    /// Login password.
    pub(crate) password: String,
    /// Database to connect to; when `None`, no `dbname` is set on the driver config.
    pub(crate) database: Option<String>,
    /// Maximum number of connections kept by the bb8 pool (defaults to 10).
    pub(crate) pool_max_size: usize,
    /// Number of rows per streamed chunk (defaults to 2048). Presumably this is what
    /// `ConnectionOptions::stream_chunk_size` returns for Postgres; confirm.
    pub(crate) stream_chunk_size: usize,
}
46
47impl PostgresConnectionOptions {
48    pub fn new(
49        host: impl Into<String>,
50        port: u16,
51        username: impl Into<String>,
52        password: impl Into<String>,
53    ) -> Self {
54        Self {
55            host: host.into(),
56            port,
57            username: username.into(),
58            password: password.into(),
59            database: None,
60            pool_max_size: 10,
61            stream_chunk_size: 2048,
62        }
63    }
64}
65
66impl From<PostgresConnectionOptions> for ConnectionOptions {
67    fn from(options: PostgresConnectionOptions) -> Self {
68        ConnectionOptions::Postgres(options)
69    }
70}
71
/// A bb8 pool of plaintext (`NoTls`) Postgres connections.
///
/// Hands out [`PostgresConnection`]s through its [`Pool`] implementation.
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
76
77#[async_trait::async_trait]
78impl Pool for PostgresPool {
79    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
80        let conn = self.pool.get_owned().await.map_err(|e| {
81            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
82        })?;
83        Ok(Arc::new(PostgresConnection { conn }))
84    }
85}
86
87pub(crate) async fn connect_postgres(
88    options: &PostgresConnectionOptions,
89) -> DFResult<PostgresPool> {
90    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
91    config
92        .host(&options.host)
93        .port(options.port)
94        .user(&options.username)
95        .password(&options.password);
96    if let Some(database) = &options.database {
97        config.dbname(database);
98    }
99    let manager = PostgresConnectionManager::new(config, NoTls);
100    let pool = bb8::Pool::builder()
101        .max_size(options.pool_max_size as u32)
102        .build(manager)
103        .await
104        .map_err(|e| {
105            DataFusionError::Execution(format!(
106                "Failed to create postgres connection pool due to {e}",
107            ))
108        })?;
109
110    Ok(PostgresPool { pool })
111}
112
/// A single Postgres connection checked out of a [`PostgresPool`].
///
/// The connection is obtained with `get_owned`, so it carries a `'static` lifetime and can be
/// stored behind an `Arc<dyn Connection>`; dropping it hands it back to the bb8 pool.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
117
118#[async_trait::async_trait]
119impl Connection for PostgresConnection {
120    fn as_any(&self) -> &dyn Any {
121        self
122    }
123
124    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
125        let sql = RemoteDbType::Postgres.query_limit_1(sql);
126        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
127            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
128        })?;
129        let remote_schema = Arc::new(build_remote_schema(&row)?);
130        Ok(remote_schema)
131    }
132
133    async fn query(
134        &self,
135        conn_options: &ConnectionOptions,
136        sql: &str,
137        table_schema: SchemaRef,
138        projection: Option<&Vec<usize>>,
139        unparsed_filters: &[String],
140        limit: Option<usize>,
141    ) -> DFResult<SendableRecordBatchStream> {
142        let projected_schema = project_schema(&table_schema, projection)?;
143
144        let sql = RemoteDbType::Postgres.rewrite_query(sql, unparsed_filters, limit);
145        debug!("[remote-table] executing postgres query: {sql}");
146
147        let projection = projection.cloned();
148        let chunk_size = conn_options.stream_chunk_size();
149        let stream = self
150            .conn
151            .query_raw(&sql, Vec::<String>::new())
152            .await
153            .map_err(|e| {
154                DataFusionError::Execution(format!(
155                    "Failed to execute query {sql} on postgres: {e}",
156                ))
157            })?
158            .chunks(chunk_size)
159            .boxed();
160
161        let stream = stream.map(move |rows| {
162            let rows: Vec<Row> = rows
163                .into_iter()
164                .collect::<Result<Vec<_>, _>>()
165                .map_err(|e| {
166                    DataFusionError::Execution(format!(
167                        "Failed to collect rows from postgres due to {e}",
168                    ))
169                })?;
170            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
171        });
172
173        Ok(Box::pin(RecordBatchStreamAdapter::new(
174            projected_schema,
175            stream,
176        )))
177    }
178}
179
180fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
181    match pg_type {
182        &Type::INT2 => Ok(PostgresType::Int2),
183        &Type::INT4 => Ok(PostgresType::Int4),
184        &Type::INT8 => Ok(PostgresType::Int8),
185        &Type::FLOAT4 => Ok(PostgresType::Float4),
186        &Type::FLOAT8 => Ok(PostgresType::Float8),
187        &Type::NUMERIC => {
188            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
189                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
190            })?;
191            let scale = match v {
192                Some(v) => v.scale,
193                None => 0,
194            };
195            assert!((scale as u32) <= (i8::MAX as u32));
196            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
197        }
198        &Type::OID => Ok(PostgresType::Oid),
199        &Type::NAME => Ok(PostgresType::Name),
200        &Type::VARCHAR => Ok(PostgresType::Varchar),
201        &Type::BPCHAR => Ok(PostgresType::Bpchar),
202        &Type::TEXT => Ok(PostgresType::Text),
203        &Type::BYTEA => Ok(PostgresType::Bytea),
204        &Type::DATE => Ok(PostgresType::Date),
205        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
206        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
207        &Type::TIME => Ok(PostgresType::Time),
208        &Type::INTERVAL => Ok(PostgresType::Interval),
209        &Type::BOOL => Ok(PostgresType::Bool),
210        &Type::JSON => Ok(PostgresType::Json),
211        &Type::JSONB => Ok(PostgresType::Jsonb),
212        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
213        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
214        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
215        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
216        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
217        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
218        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
219        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
220        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
221        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
222        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
223        _ => Err(DataFusionError::NotImplemented(format!(
224            "Unsupported postgres type {pg_type:?}",
225        ))),
226    }
227}
228
229fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
230    let mut remote_fields = vec![];
231    for (idx, col) in row.columns().iter().enumerate() {
232        remote_fields.push(RemoteField::new(
233            col.name(),
234            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
235            true,
236        ));
237    }
238    Ok(RemoteSchema::new(remote_fields))
239}
240
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to `$builder`.
///
/// The builder is downcast to `$builder_ty`, which panics on mismatch. `NULL` appends a null;
/// any other value goes through `$convert`, a callable returning a `Result` whose error is
/// propagated with `?`. A failed `try_get` returns `DataFusionError::Execution`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            )
        };
        let fetched: Result<Option<$value_ty>, _> = $row.try_get($index);
        let maybe_value = fetched.map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = maybe_value {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
269
/// Reads column `$index` of `$row` as a one-dimensional Postgres array of
/// `$primitive_value_ty` and appends it to a `ListBuilder<Box<dyn ArrayBuilder>>`.
///
/// The list builder's values builder must be a `$values_builder_ty`; a mismatch in either
/// downcast panics. `NULL` appends a null list. A failed `try_get` returns
/// `DataFusionError::Execution`.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Must name this macro's own metavariable: an unbound `$builder_ty` is
                    // passed through literally and would print "$builder_ty".
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                values_builder.extend(v.into_iter().map(Some));
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
312
/// A Postgres `NUMERIC` value decoded from its binary wire format (see its `FromSql` impl).
#[derive(Debug)]
struct BigDecimalFromSql {
    /// The decoded value, built with `scale` as its decimal scale.
    inner: BigDecimal,
    /// Scale read from the numeric header: the number of decimal digits after the point.
    scale: u16,
}
318
319impl BigDecimalFromSql {
320    fn to_decimal_128(&self) -> Option<i128> {
321        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
322    }
323}
324
325#[allow(clippy::cast_sign_loss)]
326#[allow(clippy::cast_possible_wrap)]
327#[allow(clippy::cast_possible_truncation)]
328impl<'a> FromSql<'a> for BigDecimalFromSql {
329    fn from_sql(
330        _ty: &Type,
331        raw: &'a [u8],
332    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
333        let raw_u16: Vec<u16> = raw
334            .chunks(2)
335            .map(|chunk| {
336                if chunk.len() == 2 {
337                    u16::from_be_bytes([chunk[0], chunk[1]])
338                } else {
339                    u16::from_be_bytes([chunk[0], 0])
340                }
341            })
342            .collect();
343
344        let base_10_000_digit_count = raw_u16[0];
345        let weight = raw_u16[1] as i16;
346        let sign = raw_u16[2];
347        let scale = raw_u16[3];
348
349        let mut base_10_000_digits = Vec::new();
350        for i in 4..4 + base_10_000_digit_count {
351            base_10_000_digits.push(raw_u16[i as usize]);
352        }
353
354        let mut u8_digits = Vec::new();
355        for &base_10_000_digit in base_10_000_digits.iter().rev() {
356            let mut base_10_000_digit = base_10_000_digit;
357            let mut temp_result = Vec::new();
358            while base_10_000_digit > 0 {
359                temp_result.push((base_10_000_digit % 10) as u8);
360                base_10_000_digit /= 10;
361            }
362            while temp_result.len() < 4 {
363                temp_result.push(0);
364            }
365            u8_digits.extend(temp_result);
366        }
367        u8_digits.reverse();
368
369        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
370        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
371        u8_digits.resize(size as usize, 0);
372
373        let sign = match sign {
374            0x4000 => Sign::Minus,
375            0x0000 => Sign::Plus,
376            _ => {
377                return Err(Box::new(DataFusionError::Execution(
378                    "Failed to parse big decimal from postgres numeric value".to_string(),
379                )));
380            }
381        };
382
383        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
384            return Err(Box::new(DataFusionError::Execution(
385                "Failed to parse big decimal from postgres numeric value".to_string(),
386            )));
387        };
388        Ok(BigDecimalFromSql {
389            inner: BigDecimal::new(digits, i64::from(scale)),
390            scale,
391        })
392    }
393
394    fn accepts(ty: &Type) -> bool {
395        matches!(*ty, Type::NUMERIC)
396    }
397}
398
// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
// interval values are internally stored as three integral fields: months, days, and microseconds
// NOTE(review): the line anchor above points at `master` and may drift over time.
/// A Postgres `INTERVAL` decoded from its binary wire format.
///
/// The fields are kept separate, as Postgres sends them, rather than being normalized into a
/// single duration.
#[derive(Debug)]
struct IntervalFromSql {
    /// Time-of-day part in microseconds.
    time: i64,
    /// Number of days.
    day: i32,
    /// Number of months.
    month: i32,
}
407
408impl<'a> FromSql<'a> for IntervalFromSql {
409    fn from_sql(
410        _ty: &Type,
411        raw: &'a [u8],
412    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
413        let mut cursor = std::io::Cursor::new(raw);
414
415        let time = cursor.read_i64::<BigEndian>()?;
416        let day = cursor.read_i32::<BigEndian>()?;
417        let month = cursor.read_i32::<BigEndian>()?;
418
419        Ok(IntervalFromSql { time, day, month })
420    }
421
422    fn accepts(ty: &Type) -> bool {
423        matches!(*ty, Type::INTERVAL)
424    }
425}
426
/// Raw bytes of a PostGIS `geometry` value, borrowed from the row without decoding.
struct GeometryFromSql<'a> {
    /// The value's binary payload as received from the server. Assumed to be (E)WKB, as
    /// emitted by PostGIS's binary output; confirm.
    wkb: &'a [u8],
}
430
431impl<'a> FromSql<'a> for GeometryFromSql<'a> {
432    fn from_sql(
433        _ty: &Type,
434        raw: &'a [u8],
435    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
436        Ok(GeometryFromSql { wkb: raw })
437    }
438
439    fn accepts(ty: &Type) -> bool {
440        matches!(ty.name(), "geometry")
441    }
442}
443
444fn rows_to_batch(
445    rows: &[Row],
446    table_schema: &SchemaRef,
447    projection: Option<&Vec<usize>>,
448) -> DFResult<RecordBatch> {
449    let projected_schema = project_schema(table_schema, projection)?;
450    let mut array_builders = vec![];
451    for field in table_schema.fields() {
452        let builder = make_builder(field.data_type(), rows.len());
453        array_builders.push(builder);
454    }
455
456    for row in rows {
457        for (idx, field) in table_schema.fields.iter().enumerate() {
458            if !projections_contains(projection, idx) {
459                continue;
460            }
461            let builder = &mut array_builders[idx];
462            let col = row.columns().get(idx);
463            match field.data_type() {
464                DataType::Int16 => {
465                    handle_primitive_type!(
466                        builder,
467                        field,
468                        col,
469                        Int16Builder,
470                        i16,
471                        row,
472                        idx,
473                        just_return
474                    );
475                }
476                DataType::Int32 => {
477                    handle_primitive_type!(
478                        builder,
479                        field,
480                        col,
481                        Int32Builder,
482                        i32,
483                        row,
484                        idx,
485                        just_return
486                    );
487                }
488                DataType::UInt32 => {
489                    handle_primitive_type!(
490                        builder,
491                        field,
492                        col,
493                        UInt32Builder,
494                        u32,
495                        row,
496                        idx,
497                        just_return
498                    );
499                }
500                DataType::Int64 => {
501                    handle_primitive_type!(
502                        builder,
503                        field,
504                        col,
505                        Int64Builder,
506                        i64,
507                        row,
508                        idx,
509                        just_return
510                    );
511                }
512                DataType::Float32 => {
513                    handle_primitive_type!(
514                        builder,
515                        field,
516                        col,
517                        Float32Builder,
518                        f32,
519                        row,
520                        idx,
521                        just_return
522                    );
523                }
524                DataType::Float64 => {
525                    handle_primitive_type!(
526                        builder,
527                        field,
528                        col,
529                        Float64Builder,
530                        f64,
531                        row,
532                        idx,
533                        just_return
534                    );
535                }
536                DataType::Decimal128(_precision, _scale) => {
537                    handle_primitive_type!(
538                        builder,
539                        field,
540                        col,
541                        Decimal128Builder,
542                        BigDecimalFromSql,
543                        row,
544                        idx,
545                        |v: BigDecimalFromSql| {
546                            v.to_decimal_128().ok_or_else(|| {
547                                DataFusionError::Execution(format!(
548                                    "Failed to convert BigDecimal {v:?} to i128",
549                                ))
550                            })
551                        }
552                    );
553                }
554                DataType::Utf8 => {
555                    handle_primitive_type!(
556                        builder,
557                        field,
558                        col,
559                        StringBuilder,
560                        &str,
561                        row,
562                        idx,
563                        just_return
564                    );
565                }
566                DataType::LargeUtf8 => {
567                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
568                        handle_primitive_type!(
569                            builder,
570                            field,
571                            col,
572                            LargeStringBuilder,
573                            serde_json::value::Value,
574                            row,
575                            idx,
576                            |v: serde_json::value::Value| {
577                                Ok::<_, DataFusionError>(v.to_string())
578                            }
579                        );
580                    } else {
581                        handle_primitive_type!(
582                            builder,
583                            field,
584                            col,
585                            LargeStringBuilder,
586                            &str,
587                            row,
588                            idx,
589                            just_return
590                        );
591                    }
592                }
593                DataType::Binary => {
594                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
595                    {
596                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
597                            |v| Ok(v.wkb);
598                        handle_primitive_type!(
599                            builder,
600                            field,
601                            col,
602                            BinaryBuilder,
603                            GeometryFromSql,
604                            row,
605                            idx,
606                            convert
607                        );
608                    } else if col.is_some()
609                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
610                    {
611                        handle_primitive_type!(
612                            builder,
613                            field,
614                            col,
615                            BinaryBuilder,
616                            serde_json::value::Value,
617                            row,
618                            idx,
619                            |v: serde_json::value::Value| {
620                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
621                            }
622                        );
623                    } else {
624                        handle_primitive_type!(
625                            builder,
626                            field,
627                            col,
628                            BinaryBuilder,
629                            Vec<u8>,
630                            row,
631                            idx,
632                            just_return
633                        );
634                    }
635                }
636                DataType::Timestamp(TimeUnit::Microsecond, None) => {
637                    handle_primitive_type!(
638                        builder,
639                        field,
640                        col,
641                        TimestampMicrosecondBuilder,
642                        SystemTime,
643                        row,
644                        idx,
645                        |v: SystemTime| {
646                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
647                                let timestamp: i64 = v
648                                    .as_micros()
649                                    .try_into()
650                                    .expect("Failed to convert SystemTime to i64");
651                                Ok(timestamp)
652                            } else {
653                                Err(DataFusionError::Execution(format!(
654                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
655                                )))
656                            }
657                        }
658                    );
659                }
660                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
661                    handle_primitive_type!(
662                        builder,
663                        field,
664                        col,
665                        TimestampNanosecondBuilder,
666                        SystemTime,
667                        row,
668                        idx,
669                        |v: SystemTime| {
670                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
671                                let timestamp: i64 = v
672                                    .as_nanos()
673                                    .try_into()
674                                    .expect("Failed to convert SystemTime to i64");
675                                Ok(timestamp)
676                            } else {
677                                Err(DataFusionError::Execution(format!(
678                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
679                                )))
680                            }
681                        }
682                    );
683                }
684                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
685                    handle_primitive_type!(
686                        builder,
687                        field,
688                        col,
689                        TimestampNanosecondBuilder,
690                        chrono::DateTime<chrono::Utc>,
691                        row,
692                        idx,
693                        |v: chrono::DateTime<chrono::Utc>| {
694                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
695                            Ok::<_, DataFusionError>(timestamp)
696                        }
697                    );
698                }
699                DataType::Time64(TimeUnit::Microsecond) => {
700                    handle_primitive_type!(
701                        builder,
702                        field,
703                        col,
704                        Time64MicrosecondBuilder,
705                        chrono::NaiveTime,
706                        row,
707                        idx,
708                        |v: chrono::NaiveTime| {
709                            let seconds = i64::from(v.num_seconds_from_midnight());
710                            let microseconds = i64::from(v.nanosecond()) / 1000;
711                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
712                        }
713                    );
714                }
715                DataType::Time64(TimeUnit::Nanosecond) => {
716                    handle_primitive_type!(
717                        builder,
718                        field,
719                        col,
720                        Time64NanosecondBuilder,
721                        chrono::NaiveTime,
722                        row,
723                        idx,
724                        |v: chrono::NaiveTime| {
725                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
726                                * 1_000_000_000
727                                + i64::from(v.nanosecond());
728                            Ok::<_, DataFusionError>(timestamp)
729                        }
730                    );
731                }
732                DataType::Date32 => {
733                    handle_primitive_type!(
734                        builder,
735                        field,
736                        col,
737                        Date32Builder,
738                        chrono::NaiveDate,
739                        row,
740                        idx,
741                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
742                    );
743                }
744                DataType::Interval(IntervalUnit::MonthDayNano) => {
745                    handle_primitive_type!(
746                        builder,
747                        field,
748                        col,
749                        IntervalMonthDayNanoBuilder,
750                        IntervalFromSql,
751                        row,
752                        idx,
753                        |v: IntervalFromSql| {
754                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
755                                v.month,
756                                v.day,
757                                v.time * 1_000,
758                            );
759                            Ok::<_, DataFusionError>(interval_month_day_nano)
760                        }
761                    );
762                }
763                DataType::Boolean => {
764                    handle_primitive_type!(
765                        builder,
766                        field,
767                        col,
768                        BooleanBuilder,
769                        bool,
770                        row,
771                        idx,
772                        just_return
773                    );
774                }
775                DataType::List(inner) => match inner.data_type() {
776                    DataType::Int16 => {
777                        handle_primitive_array_type!(
778                            builder,
779                            field,
780                            col,
781                            Int16Builder,
782                            i16,
783                            row,
784                            idx
785                        );
786                    }
787                    DataType::Int32 => {
788                        handle_primitive_array_type!(
789                            builder,
790                            field,
791                            col,
792                            Int32Builder,
793                            i32,
794                            row,
795                            idx
796                        );
797                    }
798                    DataType::Int64 => {
799                        handle_primitive_array_type!(
800                            builder,
801                            field,
802                            col,
803                            Int64Builder,
804                            i64,
805                            row,
806                            idx
807                        );
808                    }
809                    DataType::Float32 => {
810                        handle_primitive_array_type!(
811                            builder,
812                            field,
813                            col,
814                            Float32Builder,
815                            f32,
816                            row,
817                            idx
818                        );
819                    }
820                    DataType::Float64 => {
821                        handle_primitive_array_type!(
822                            builder,
823                            field,
824                            col,
825                            Float64Builder,
826                            f64,
827                            row,
828                            idx
829                        );
830                    }
831                    DataType::Utf8 => {
832                        handle_primitive_array_type!(
833                            builder,
834                            field,
835                            col,
836                            StringBuilder,
837                            &str,
838                            row,
839                            idx
840                        );
841                    }
842                    DataType::Binary => {
843                        handle_primitive_array_type!(
844                            builder,
845                            field,
846                            col,
847                            BinaryBuilder,
848                            Vec<u8>,
849                            row,
850                            idx
851                        );
852                    }
853                    DataType::Boolean => {
854                        handle_primitive_array_type!(
855                            builder,
856                            field,
857                            col,
858                            BooleanBuilder,
859                            bool,
860                            row,
861                            idx
862                        );
863                    }
864                    _ => {
865                        return Err(DataFusionError::NotImplemented(format!(
866                            "Unsupported list data type {:?} for col: {:?}",
867                            field.data_type(),
868                            col
869                        )));
870                    }
871                },
872                _ => {
873                    return Err(DataFusionError::NotImplemented(format!(
874                        "Unsupported data type {:?} for col: {:?}",
875                        field.data_type(),
876                        col
877                    )));
878                }
879            }
880        }
881    }
882    let projected_columns = array_builders
883        .into_iter()
884        .enumerate()
885        .filter(|(idx, _)| projections_contains(projection, *idx))
886        .map(|(_, mut builder)| builder.finish())
887        .collect::<Vec<ArrayRef>>();
888    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
889    Ok(RecordBatch::try_new_with_options(
890        projected_schema,
891        projected_columns,
892        &options,
893    )?)
894}