datafusion_remote_table/connection/
postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, RecordBatchOptions,
16    StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use log::debug;
30use num_bigint::{BigInt, Sign};
31use std::string::ToString;
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35#[derive(Debug, Clone, With, Getters)]
36pub struct PostgresConnectionOptions {
37    pub(crate) host: String,
38    pub(crate) port: u16,
39    pub(crate) username: String,
40    pub(crate) password: String,
41    pub(crate) database: Option<String>,
42    pub(crate) pool_max_size: usize,
43    pub(crate) stream_chunk_size: usize,
44}
45
46impl PostgresConnectionOptions {
47    pub fn new(
48        host: impl Into<String>,
49        port: u16,
50        username: impl Into<String>,
51        password: impl Into<String>,
52    ) -> Self {
53        Self {
54            host: host.into(),
55            port,
56            username: username.into(),
57            password: password.into(),
58            database: None,
59            pool_max_size: 10,
60            stream_chunk_size: 2048,
61        }
62    }
63}
64
65#[derive(Debug)]
66pub struct PostgresPool {
67    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
68}
69
70#[async_trait::async_trait]
71impl Pool for PostgresPool {
72    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
73        let conn = self.pool.get_owned().await.map_err(|e| {
74            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
75        })?;
76        Ok(Arc::new(PostgresConnection { conn }))
77    }
78}
79
80pub(crate) async fn connect_postgres(
81    options: &PostgresConnectionOptions,
82) -> DFResult<PostgresPool> {
83    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
84    config
85        .host(&options.host)
86        .port(options.port)
87        .user(&options.username)
88        .password(&options.password);
89    if let Some(database) = &options.database {
90        config.dbname(database);
91    }
92    let manager = PostgresConnectionManager::new(config, NoTls);
93    let pool = bb8::Pool::builder()
94        .max_size(options.pool_max_size as u32)
95        .build(manager)
96        .await
97        .map_err(|e| {
98            DataFusionError::Execution(format!(
99                "Failed to create postgres connection pool due to {e}",
100            ))
101        })?;
102
103    Ok(PostgresPool { pool })
104}
105
106#[derive(Debug)]
107pub(crate) struct PostgresConnection {
108    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
109}
110
111#[async_trait::async_trait]
112impl Connection for PostgresConnection {
113    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
114        let sql = RemoteDbType::Postgres.query_limit_1(sql);
115        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
116            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
117        })?;
118        let remote_schema = Arc::new(build_remote_schema(&row)?);
119        Ok(remote_schema)
120    }
121
122    async fn query(
123        &self,
124        conn_options: &ConnectionOptions,
125        sql: &str,
126        table_schema: SchemaRef,
127        projection: Option<&Vec<usize>>,
128        unparsed_filters: &[String],
129        limit: Option<usize>,
130    ) -> DFResult<SendableRecordBatchStream> {
131        let projected_schema = project_schema(&table_schema, projection)?;
132
133        let sql = RemoteDbType::Postgres.rewrite_query(sql, unparsed_filters, limit);
134        debug!("[remote-table] executing postgres query: {sql}");
135
136        let projection = projection.cloned();
137        let chunk_size = conn_options.stream_chunk_size();
138        let stream = self
139            .conn
140            .query_raw(&sql, Vec::<String>::new())
141            .await
142            .map_err(|e| {
143                DataFusionError::Execution(format!(
144                    "Failed to execute query {sql} on postgres: {e}",
145                ))
146            })?
147            .chunks(chunk_size)
148            .boxed();
149
150        let stream = stream.map(move |rows| {
151            let rows: Vec<Row> = rows
152                .into_iter()
153                .collect::<Result<Vec<_>, _>>()
154                .map_err(|e| {
155                    DataFusionError::Execution(format!(
156                        "Failed to collect rows from postgres due to {e}",
157                    ))
158                })?;
159            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
160        });
161
162        Ok(Box::pin(RecordBatchStreamAdapter::new(
163            projected_schema,
164            stream,
165        )))
166    }
167}
168
169fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
170    match pg_type {
171        &Type::INT2 => Ok(PostgresType::Int2),
172        &Type::INT4 => Ok(PostgresType::Int4),
173        &Type::INT8 => Ok(PostgresType::Int8),
174        &Type::FLOAT4 => Ok(PostgresType::Float4),
175        &Type::FLOAT8 => Ok(PostgresType::Float8),
176        &Type::NUMERIC => {
177            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
178                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
179            })?;
180            let scale = match v {
181                Some(v) => v.scale,
182                None => 0,
183            };
184            assert!((scale as u32) <= (i8::MAX as u32));
185            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
186        }
187        &Type::OID => Ok(PostgresType::Oid),
188        &Type::NAME => Ok(PostgresType::Name),
189        &Type::VARCHAR => Ok(PostgresType::Varchar),
190        &Type::BPCHAR => Ok(PostgresType::Bpchar),
191        &Type::TEXT => Ok(PostgresType::Text),
192        &Type::BYTEA => Ok(PostgresType::Bytea),
193        &Type::DATE => Ok(PostgresType::Date),
194        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
195        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
196        &Type::TIME => Ok(PostgresType::Time),
197        &Type::INTERVAL => Ok(PostgresType::Interval),
198        &Type::BOOL => Ok(PostgresType::Bool),
199        &Type::JSON => Ok(PostgresType::Json),
200        &Type::JSONB => Ok(PostgresType::Jsonb),
201        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
202        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
203        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
204        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
205        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
206        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
207        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
208        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
209        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
210        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
211        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
212        _ => Err(DataFusionError::NotImplemented(format!(
213            "Unsupported postgres type {pg_type:?}",
214        ))),
215    }
216}
217
218fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
219    let mut remote_fields = vec![];
220    for (idx, col) in row.columns().iter().enumerate() {
221        remote_fields.push(RemoteField::new(
222            col.name(),
223            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
224            true,
225        ));
226    }
227    Ok(RemoteSchema::new(remote_fields))
228}
229
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to `$builder`.
///
/// `$builder` is first downcast to `$builder_ty`; a failed downcast panics. SQL NULL appends
/// a null. Otherwise the value goes through `$convert`, whose error must convert into
/// `DataFusionError`. Read and conversion errors are returned from the enclosing function
/// via `?`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };
        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
258
/// Reads column `$index` of `$row` as a postgres array and appends it to a list builder.
///
/// `$builder` must be a `ListBuilder<Box<dyn ArrayBuilder>>` whose values builder is a
/// `$values_builder_ty`; either failed downcast panics. Elements are read as
/// `Option<$primitive_value_ty>`, so arrays containing NULL elements are accepted and the
/// NULLs are preserved. A NULL array appends a null list entry. Read errors are returned
/// from the enclosing function via `?`.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        // Reading elements as `Option<_>` keeps arrays such as `ARRAY[1, NULL]` from failing
        // with an unexpected-null error.
        let v: Option<Vec<Option<$primitive_value_ty>>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
301
302#[derive(Debug)]
303struct BigDecimalFromSql {
304    inner: BigDecimal,
305    scale: u16,
306}
307
308impl BigDecimalFromSql {
309    fn to_decimal_128(&self) -> Option<i128> {
310        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
311    }
312}
313
314#[allow(clippy::cast_sign_loss)]
315#[allow(clippy::cast_possible_wrap)]
316#[allow(clippy::cast_possible_truncation)]
317impl<'a> FromSql<'a> for BigDecimalFromSql {
318    fn from_sql(
319        _ty: &Type,
320        raw: &'a [u8],
321    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
322        let raw_u16: Vec<u16> = raw
323            .chunks(2)
324            .map(|chunk| {
325                if chunk.len() == 2 {
326                    u16::from_be_bytes([chunk[0], chunk[1]])
327                } else {
328                    u16::from_be_bytes([chunk[0], 0])
329                }
330            })
331            .collect();
332
333        let base_10_000_digit_count = raw_u16[0];
334        let weight = raw_u16[1] as i16;
335        let sign = raw_u16[2];
336        let scale = raw_u16[3];
337
338        let mut base_10_000_digits = Vec::new();
339        for i in 4..4 + base_10_000_digit_count {
340            base_10_000_digits.push(raw_u16[i as usize]);
341        }
342
343        let mut u8_digits = Vec::new();
344        for &base_10_000_digit in base_10_000_digits.iter().rev() {
345            let mut base_10_000_digit = base_10_000_digit;
346            let mut temp_result = Vec::new();
347            while base_10_000_digit > 0 {
348                temp_result.push((base_10_000_digit % 10) as u8);
349                base_10_000_digit /= 10;
350            }
351            while temp_result.len() < 4 {
352                temp_result.push(0);
353            }
354            u8_digits.extend(temp_result);
355        }
356        u8_digits.reverse();
357
358        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
359        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
360        u8_digits.resize(size as usize, 0);
361
362        let sign = match sign {
363            0x4000 => Sign::Minus,
364            0x0000 => Sign::Plus,
365            _ => {
366                return Err(Box::new(DataFusionError::Execution(
367                    "Failed to parse big decimal from postgres numeric value".to_string(),
368                )));
369            }
370        };
371
372        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
373            return Err(Box::new(DataFusionError::Execution(
374                "Failed to parse big decimal from postgres numeric value".to_string(),
375            )));
376        };
377        Ok(BigDecimalFromSql {
378            inner: BigDecimal::new(digits, i64::from(scale)),
379            scale,
380        })
381    }
382
383    fn accepts(ty: &Type) -> bool {
384        matches!(*ty, Type::NUMERIC)
385    }
386}
387
/// A postgres `interval` decoded from its binary wire representation.
///
/// Mirrors `interval_send` in the postgres sources
/// (<https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032>):
/// interval values are internally stored as three integral fields: months, days, and
/// microseconds.
#[derive(Debug)]
struct IntervalFromSql {
    /// Time part in microseconds (the first 8 bytes of the payload).
    time: i64,
    /// Day part (the next 4 bytes).
    day: i32,
    /// Month part (the following 4 bytes).
    month: i32,
}
396
397impl<'a> FromSql<'a> for IntervalFromSql {
398    fn from_sql(
399        _ty: &Type,
400        raw: &'a [u8],
401    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
402        let mut cursor = std::io::Cursor::new(raw);
403
404        let time = cursor.read_i64::<BigEndian>()?;
405        let day = cursor.read_i32::<BigEndian>()?;
406        let month = cursor.read_i32::<BigEndian>()?;
407
408        Ok(IntervalFromSql { time, day, month })
409    }
410
411    fn accepts(ty: &Type) -> bool {
412        matches!(*ty, Type::INTERVAL)
413    }
414}
415
/// A postgres `geometry` value kept as the raw bytes received from the server.
struct GeometryFromSql<'a> {
    /// Undecoded binary payload borrowed from the row (presumably PostGIS (E)WKB — confirm).
    wkb: &'a [u8],
}
419
420impl<'a> FromSql<'a> for GeometryFromSql<'a> {
421    fn from_sql(
422        _ty: &Type,
423        raw: &'a [u8],
424    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
425        Ok(GeometryFromSql { wkb: raw })
426    }
427
428    fn accepts(ty: &Type) -> bool {
429        matches!(ty.name(), "geometry")
430    }
431}
432
433fn rows_to_batch(
434    rows: &[Row],
435    table_schema: &SchemaRef,
436    projection: Option<&Vec<usize>>,
437) -> DFResult<RecordBatch> {
438    let projected_schema = project_schema(table_schema, projection)?;
439    let mut array_builders = vec![];
440    for field in table_schema.fields() {
441        let builder = make_builder(field.data_type(), rows.len());
442        array_builders.push(builder);
443    }
444
445    for row in rows {
446        for (idx, field) in table_schema.fields.iter().enumerate() {
447            if !projections_contains(projection, idx) {
448                continue;
449            }
450            let builder = &mut array_builders[idx];
451            let col = row.columns().get(idx);
452            match field.data_type() {
453                DataType::Int16 => {
454                    handle_primitive_type!(
455                        builder,
456                        field,
457                        col,
458                        Int16Builder,
459                        i16,
460                        row,
461                        idx,
462                        just_return
463                    );
464                }
465                DataType::Int32 => {
466                    handle_primitive_type!(
467                        builder,
468                        field,
469                        col,
470                        Int32Builder,
471                        i32,
472                        row,
473                        idx,
474                        just_return
475                    );
476                }
477                DataType::UInt32 => {
478                    handle_primitive_type!(
479                        builder,
480                        field,
481                        col,
482                        UInt32Builder,
483                        u32,
484                        row,
485                        idx,
486                        just_return
487                    );
488                }
489                DataType::Int64 => {
490                    handle_primitive_type!(
491                        builder,
492                        field,
493                        col,
494                        Int64Builder,
495                        i64,
496                        row,
497                        idx,
498                        just_return
499                    );
500                }
501                DataType::Float32 => {
502                    handle_primitive_type!(
503                        builder,
504                        field,
505                        col,
506                        Float32Builder,
507                        f32,
508                        row,
509                        idx,
510                        just_return
511                    );
512                }
513                DataType::Float64 => {
514                    handle_primitive_type!(
515                        builder,
516                        field,
517                        col,
518                        Float64Builder,
519                        f64,
520                        row,
521                        idx,
522                        just_return
523                    );
524                }
525                DataType::Decimal128(_precision, _scale) => {
526                    handle_primitive_type!(
527                        builder,
528                        field,
529                        col,
530                        Decimal128Builder,
531                        BigDecimalFromSql,
532                        row,
533                        idx,
534                        |v: BigDecimalFromSql| {
535                            v.to_decimal_128().ok_or_else(|| {
536                                DataFusionError::Execution(format!(
537                                    "Failed to convert BigDecimal {v:?} to i128",
538                                ))
539                            })
540                        }
541                    );
542                }
543                DataType::Utf8 => {
544                    handle_primitive_type!(
545                        builder,
546                        field,
547                        col,
548                        StringBuilder,
549                        &str,
550                        row,
551                        idx,
552                        just_return
553                    );
554                }
555                DataType::LargeUtf8 => {
556                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
557                        handle_primitive_type!(
558                            builder,
559                            field,
560                            col,
561                            LargeStringBuilder,
562                            serde_json::value::Value,
563                            row,
564                            idx,
565                            |v: serde_json::value::Value| {
566                                Ok::<_, DataFusionError>(v.to_string())
567                            }
568                        );
569                    } else {
570                        handle_primitive_type!(
571                            builder,
572                            field,
573                            col,
574                            LargeStringBuilder,
575                            &str,
576                            row,
577                            idx,
578                            just_return
579                        );
580                    }
581                }
582                DataType::Binary => {
583                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
584                    {
585                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
586                            |v| Ok(v.wkb);
587                        handle_primitive_type!(
588                            builder,
589                            field,
590                            col,
591                            BinaryBuilder,
592                            GeometryFromSql,
593                            row,
594                            idx,
595                            convert
596                        );
597                    } else if col.is_some()
598                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
599                    {
600                        handle_primitive_type!(
601                            builder,
602                            field,
603                            col,
604                            BinaryBuilder,
605                            serde_json::value::Value,
606                            row,
607                            idx,
608                            |v: serde_json::value::Value| {
609                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
610                            }
611                        );
612                    } else {
613                        handle_primitive_type!(
614                            builder,
615                            field,
616                            col,
617                            BinaryBuilder,
618                            Vec<u8>,
619                            row,
620                            idx,
621                            just_return
622                        );
623                    }
624                }
625                DataType::Timestamp(TimeUnit::Microsecond, None) => {
626                    handle_primitive_type!(
627                        builder,
628                        field,
629                        col,
630                        TimestampMicrosecondBuilder,
631                        SystemTime,
632                        row,
633                        idx,
634                        |v: SystemTime| {
635                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
636                                let timestamp: i64 = v
637                                    .as_micros()
638                                    .try_into()
639                                    .expect("Failed to convert SystemTime to i64");
640                                Ok(timestamp)
641                            } else {
642                                Err(DataFusionError::Execution(format!(
643                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
644                                )))
645                            }
646                        }
647                    );
648                }
649                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
650                    handle_primitive_type!(
651                        builder,
652                        field,
653                        col,
654                        TimestampNanosecondBuilder,
655                        SystemTime,
656                        row,
657                        idx,
658                        |v: SystemTime| {
659                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
660                                let timestamp: i64 = v
661                                    .as_nanos()
662                                    .try_into()
663                                    .expect("Failed to convert SystemTime to i64");
664                                Ok(timestamp)
665                            } else {
666                                Err(DataFusionError::Execution(format!(
667                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
668                                )))
669                            }
670                        }
671                    );
672                }
673                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
674                    handle_primitive_type!(
675                        builder,
676                        field,
677                        col,
678                        TimestampNanosecondBuilder,
679                        chrono::DateTime<chrono::Utc>,
680                        row,
681                        idx,
682                        |v: chrono::DateTime<chrono::Utc>| {
683                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
684                            Ok::<_, DataFusionError>(timestamp)
685                        }
686                    );
687                }
688                DataType::Time64(TimeUnit::Microsecond) => {
689                    handle_primitive_type!(
690                        builder,
691                        field,
692                        col,
693                        Time64MicrosecondBuilder,
694                        chrono::NaiveTime,
695                        row,
696                        idx,
697                        |v: chrono::NaiveTime| {
698                            let seconds = i64::from(v.num_seconds_from_midnight());
699                            let microseconds = i64::from(v.nanosecond()) / 1000;
700                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
701                        }
702                    );
703                }
704                DataType::Time64(TimeUnit::Nanosecond) => {
705                    handle_primitive_type!(
706                        builder,
707                        field,
708                        col,
709                        Time64NanosecondBuilder,
710                        chrono::NaiveTime,
711                        row,
712                        idx,
713                        |v: chrono::NaiveTime| {
714                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
715                                * 1_000_000_000
716                                + i64::from(v.nanosecond());
717                            Ok::<_, DataFusionError>(timestamp)
718                        }
719                    );
720                }
721                DataType::Date32 => {
722                    handle_primitive_type!(
723                        builder,
724                        field,
725                        col,
726                        Date32Builder,
727                        chrono::NaiveDate,
728                        row,
729                        idx,
730                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
731                    );
732                }
733                DataType::Interval(IntervalUnit::MonthDayNano) => {
734                    handle_primitive_type!(
735                        builder,
736                        field,
737                        col,
738                        IntervalMonthDayNanoBuilder,
739                        IntervalFromSql,
740                        row,
741                        idx,
742                        |v: IntervalFromSql| {
743                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
744                                v.month,
745                                v.day,
746                                v.time * 1_000,
747                            );
748                            Ok::<_, DataFusionError>(interval_month_day_nano)
749                        }
750                    );
751                }
752                DataType::Boolean => {
753                    handle_primitive_type!(
754                        builder,
755                        field,
756                        col,
757                        BooleanBuilder,
758                        bool,
759                        row,
760                        idx,
761                        just_return
762                    );
763                }
764                DataType::List(inner) => match inner.data_type() {
765                    DataType::Int16 => {
766                        handle_primitive_array_type!(
767                            builder,
768                            field,
769                            col,
770                            Int16Builder,
771                            i16,
772                            row,
773                            idx
774                        );
775                    }
776                    DataType::Int32 => {
777                        handle_primitive_array_type!(
778                            builder,
779                            field,
780                            col,
781                            Int32Builder,
782                            i32,
783                            row,
784                            idx
785                        );
786                    }
787                    DataType::Int64 => {
788                        handle_primitive_array_type!(
789                            builder,
790                            field,
791                            col,
792                            Int64Builder,
793                            i64,
794                            row,
795                            idx
796                        );
797                    }
798                    DataType::Float32 => {
799                        handle_primitive_array_type!(
800                            builder,
801                            field,
802                            col,
803                            Float32Builder,
804                            f32,
805                            row,
806                            idx
807                        );
808                    }
809                    DataType::Float64 => {
810                        handle_primitive_array_type!(
811                            builder,
812                            field,
813                            col,
814                            Float64Builder,
815                            f64,
816                            row,
817                            idx
818                        );
819                    }
820                    DataType::Utf8 => {
821                        handle_primitive_array_type!(
822                            builder,
823                            field,
824                            col,
825                            StringBuilder,
826                            &str,
827                            row,
828                            idx
829                        );
830                    }
831                    DataType::Binary => {
832                        handle_primitive_array_type!(
833                            builder,
834                            field,
835                            col,
836                            BinaryBuilder,
837                            Vec<u8>,
838                            row,
839                            idx
840                        );
841                    }
842                    DataType::Boolean => {
843                        handle_primitive_array_type!(
844                            builder,
845                            field,
846                            col,
847                            BooleanBuilder,
848                            bool,
849                            row,
850                            idx
851                        );
852                    }
853                    _ => {
854                        return Err(DataFusionError::NotImplemented(format!(
855                            "Unsupported list data type {:?} for col: {:?}",
856                            field.data_type(),
857                            col
858                        )));
859                    }
860                },
861                _ => {
862                    return Err(DataFusionError::NotImplemented(format!(
863                        "Unsupported data type {:?} for col: {:?}",
864                        field.data_type(),
865                        col
866                    )));
867                }
868            }
869        }
870    }
871    let projected_columns = array_builders
872        .into_iter()
873        .enumerate()
874        .filter(|(idx, _)| projections_contains(projection, *idx))
875        .map(|(_, mut builder)| builder.finish())
876        .collect::<Vec<ArrayRef>>();
877    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
878    Ok(RecordBatch::try_new_with_options(
879        projected_schema,
880        projected_columns,
881        &options,
882    )?)
883}