// datafusion_remote_table/connection/postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use num_bigint::{BigInt, Sign};
30use std::string::ToString;
31use std::sync::Arc;
32use std::time::{SystemTime, UNIX_EPOCH};
33
34#[derive(Debug, Clone, With, Getters)]
35pub struct PostgresConnectionOptions {
36    pub(crate) host: String,
37    pub(crate) port: u16,
38    pub(crate) username: String,
39    pub(crate) password: String,
40    pub(crate) database: Option<String>,
41    pub(crate) pool_max_size: usize,
42    pub(crate) stream_chunk_size: usize,
43}
44
45impl PostgresConnectionOptions {
46    pub fn new(
47        host: impl Into<String>,
48        port: u16,
49        username: impl Into<String>,
50        password: impl Into<String>,
51    ) -> Self {
52        Self {
53            host: host.into(),
54            port,
55            username: username.into(),
56            password: password.into(),
57            database: None,
58            pool_max_size: 10,
59            stream_chunk_size: 2048,
60        }
61    }
62}
63
64#[derive(Debug)]
65pub struct PostgresPool {
66    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
67}
68
69#[async_trait::async_trait]
70impl Pool for PostgresPool {
71    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
72        let conn = self.pool.get_owned().await.map_err(|e| {
73            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
74        })?;
75        Ok(Arc::new(PostgresConnection { conn }))
76    }
77}
78
79pub(crate) async fn connect_postgres(
80    options: &PostgresConnectionOptions,
81) -> DFResult<PostgresPool> {
82    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
83    config
84        .host(&options.host)
85        .port(options.port)
86        .user(&options.username)
87        .password(&options.password);
88    if let Some(database) = &options.database {
89        config.dbname(database);
90    }
91    let manager = PostgresConnectionManager::new(config, NoTls);
92    let pool = bb8::Pool::builder()
93        .max_size(options.pool_max_size as u32)
94        .build(manager)
95        .await
96        .map_err(|e| {
97            DataFusionError::Execution(format!(
98                "Failed to create postgres connection pool due to {e}",
99            ))
100        })?;
101
102    Ok(PostgresPool { pool })
103}
104
105#[derive(Debug)]
106pub(crate) struct PostgresConnection {
107    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
108}
109
110#[async_trait::async_trait]
111impl Connection for PostgresConnection {
112    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
113        let sql = RemoteDbType::Postgres.query_limit_1(sql)?;
114        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
115            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
116        })?;
117        let remote_schema = Arc::new(build_remote_schema(&row)?);
118        Ok(remote_schema)
119    }
120
121    async fn query(
122        &self,
123        conn_options: &ConnectionOptions,
124        sql: &str,
125        table_schema: SchemaRef,
126        projection: Option<&Vec<usize>>,
127        unparsed_filters: &[String],
128        limit: Option<usize>,
129    ) -> DFResult<SendableRecordBatchStream> {
130        let projected_schema = project_schema(&table_schema, projection)?;
131        let sql = RemoteDbType::Postgres.try_rewrite_query(sql, unparsed_filters, limit)?;
132        let projection = projection.cloned();
133        let chunk_size = conn_options.stream_chunk_size();
134        let stream = self
135            .conn
136            .query_raw(&sql, Vec::<String>::new())
137            .await
138            .map_err(|e| {
139                DataFusionError::Execution(format!(
140                    "Failed to execute query {sql} on postgres: {e}",
141                ))
142            })?
143            .chunks(chunk_size)
144            .boxed();
145
146        let stream = stream.map(move |rows| {
147            let rows: Vec<Row> = rows
148                .into_iter()
149                .collect::<Result<Vec<_>, _>>()
150                .map_err(|e| {
151                    DataFusionError::Execution(format!(
152                        "Failed to collect rows from postgres due to {e}",
153                    ))
154                })?;
155            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
156        });
157
158        Ok(Box::pin(RecordBatchStreamAdapter::new(
159            projected_schema,
160            stream,
161        )))
162    }
163}
164
165fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
166    match pg_type {
167        &Type::INT2 => Ok(PostgresType::Int2),
168        &Type::INT4 => Ok(PostgresType::Int4),
169        &Type::INT8 => Ok(PostgresType::Int8),
170        &Type::FLOAT4 => Ok(PostgresType::Float4),
171        &Type::FLOAT8 => Ok(PostgresType::Float8),
172        &Type::NUMERIC => {
173            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
174                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
175            })?;
176            let scale = match v {
177                Some(v) => v.scale,
178                None => 0,
179            };
180            assert!((scale as u32) <= (i8::MAX as u32));
181            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
182        }
183        &Type::OID => Ok(PostgresType::Oid),
184        &Type::NAME => Ok(PostgresType::Name),
185        &Type::VARCHAR => Ok(PostgresType::Varchar),
186        &Type::BPCHAR => Ok(PostgresType::Bpchar),
187        &Type::TEXT => Ok(PostgresType::Text),
188        &Type::BYTEA => Ok(PostgresType::Bytea),
189        &Type::DATE => Ok(PostgresType::Date),
190        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
191        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
192        &Type::TIME => Ok(PostgresType::Time),
193        &Type::INTERVAL => Ok(PostgresType::Interval),
194        &Type::BOOL => Ok(PostgresType::Bool),
195        &Type::JSON => Ok(PostgresType::Json),
196        &Type::JSONB => Ok(PostgresType::Jsonb),
197        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
198        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
199        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
200        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
201        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
202        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
203        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
204        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
205        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
206        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
207        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
208        _ => Err(DataFusionError::NotImplemented(format!(
209            "Unsupported postgres type {pg_type:?}",
210        ))),
211    }
212}
213
214fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
215    let mut remote_fields = vec![];
216    for (idx, col) in row.columns().iter().enumerate() {
217        remote_fields.push(RemoteField::new(
218            col.name(),
219            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
220            true,
221        ));
222    }
223    Ok(RemoteSchema::new(remote_fields))
224}
225
/// Reads column `$index` of `$row` as an `Option<$value_ty>` and appends it to `$builder`.
///
/// `$builder` is downcast to the concrete `$builder_ty`. A failed downcast panics, because it
/// means the builder does not match the field's data type. A non-null value is passed
/// through the fallible `$convert` before being appended, and a null cell appends a null.
/// Errors from `try_get` and from `$convert` are propagated with `?`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };
        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
254
/// Appends the postgres array in column `$index` of `$row` to a list builder.
///
/// `$builder` is downcast to `ListBuilder<Box<dyn ArrayBuilder>>` and its values builder to
/// `$values_builder_ty`. Both downcasts panic on mismatch, which means the builders do not
/// match the schema. The cell is read as `Option<Vec<$primitive_value_ty>>`. For a non-null
/// array, all elements are appended followed by a valid list entry; a null cell appends a
/// null list entry. Read errors are propagated with `?`.
///
/// NOTE(review): elements are read as non-optional values, so an array containing NULL
/// elements presumably fails `try_get` with an error — confirm this is intended.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        // Only metavariables declared in the matcher can be stringified; an unknown `$name`
        // would be emitted literally into the message.
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
297
298#[derive(Debug)]
299struct BigDecimalFromSql {
300    inner: BigDecimal,
301    scale: u16,
302}
303
304impl BigDecimalFromSql {
305    fn to_decimal_128(&self) -> Option<i128> {
306        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
307    }
308}
309
310#[allow(clippy::cast_sign_loss)]
311#[allow(clippy::cast_possible_wrap)]
312#[allow(clippy::cast_possible_truncation)]
313impl<'a> FromSql<'a> for BigDecimalFromSql {
314    fn from_sql(
315        _ty: &Type,
316        raw: &'a [u8],
317    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
318        let raw_u16: Vec<u16> = raw
319            .chunks(2)
320            .map(|chunk| {
321                if chunk.len() == 2 {
322                    u16::from_be_bytes([chunk[0], chunk[1]])
323                } else {
324                    u16::from_be_bytes([chunk[0], 0])
325                }
326            })
327            .collect();
328
329        let base_10_000_digit_count = raw_u16[0];
330        let weight = raw_u16[1] as i16;
331        let sign = raw_u16[2];
332        let scale = raw_u16[3];
333
334        let mut base_10_000_digits = Vec::new();
335        for i in 4..4 + base_10_000_digit_count {
336            base_10_000_digits.push(raw_u16[i as usize]);
337        }
338
339        let mut u8_digits = Vec::new();
340        for &base_10_000_digit in base_10_000_digits.iter().rev() {
341            let mut base_10_000_digit = base_10_000_digit;
342            let mut temp_result = Vec::new();
343            while base_10_000_digit > 0 {
344                temp_result.push((base_10_000_digit % 10) as u8);
345                base_10_000_digit /= 10;
346            }
347            while temp_result.len() < 4 {
348                temp_result.push(0);
349            }
350            u8_digits.extend(temp_result);
351        }
352        u8_digits.reverse();
353
354        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
355        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
356        u8_digits.resize(size as usize, 0);
357
358        let sign = match sign {
359            0x4000 => Sign::Minus,
360            0x0000 => Sign::Plus,
361            _ => {
362                return Err(Box::new(DataFusionError::Execution(
363                    "Failed to parse big decimal from postgres numeric value".to_string(),
364                )));
365            }
366        };
367
368        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
369            return Err(Box::new(DataFusionError::Execution(
370                "Failed to parse big decimal from postgres numeric value".to_string(),
371            )));
372        };
373        Ok(BigDecimalFromSql {
374            inner: BigDecimal::new(digits, i64::from(scale)),
375            scale,
376        })
377    }
378
379    fn accepts(ty: &Type) -> bool {
380        matches!(*ty, Type::NUMERIC)
381    }
382}
383
384// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
385// interval values are internally stored as three integral fields: months, days, and microseconds
386#[derive(Debug)]
387struct IntervalFromSql {
388    time: i64,
389    day: i32,
390    month: i32,
391}
392
393impl<'a> FromSql<'a> for IntervalFromSql {
394    fn from_sql(
395        _ty: &Type,
396        raw: &'a [u8],
397    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
398        let mut cursor = std::io::Cursor::new(raw);
399
400        let time = cursor.read_i64::<BigEndian>()?;
401        let day = cursor.read_i32::<BigEndian>()?;
402        let month = cursor.read_i32::<BigEndian>()?;
403
404        Ok(IntervalFromSql { time, day, month })
405    }
406
407    fn accepts(ty: &Type) -> bool {
408        matches!(*ty, Type::INTERVAL)
409    }
410}
411
412struct GeometryFromSql<'a> {
413    wkb: &'a [u8],
414}
415
416impl<'a> FromSql<'a> for GeometryFromSql<'a> {
417    fn from_sql(
418        _ty: &Type,
419        raw: &'a [u8],
420    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
421        Ok(GeometryFromSql { wkb: raw })
422    }
423
424    fn accepts(ty: &Type) -> bool {
425        matches!(ty.name(), "geometry")
426    }
427}
428
429fn rows_to_batch(
430    rows: &[Row],
431    table_schema: &SchemaRef,
432    projection: Option<&Vec<usize>>,
433) -> DFResult<RecordBatch> {
434    let projected_schema = project_schema(table_schema, projection)?;
435    let mut array_builders = vec![];
436    for field in table_schema.fields() {
437        let builder = make_builder(field.data_type(), rows.len());
438        array_builders.push(builder);
439    }
440
441    for row in rows {
442        for (idx, field) in table_schema.fields.iter().enumerate() {
443            if !projections_contains(projection, idx) {
444                continue;
445            }
446            let builder = &mut array_builders[idx];
447            let col = row.columns().get(idx);
448            match field.data_type() {
449                DataType::Int16 => {
450                    handle_primitive_type!(
451                        builder,
452                        field,
453                        col,
454                        Int16Builder,
455                        i16,
456                        row,
457                        idx,
458                        just_return
459                    );
460                }
461                DataType::Int32 => {
462                    handle_primitive_type!(
463                        builder,
464                        field,
465                        col,
466                        Int32Builder,
467                        i32,
468                        row,
469                        idx,
470                        just_return
471                    );
472                }
473                DataType::UInt32 => {
474                    handle_primitive_type!(
475                        builder,
476                        field,
477                        col,
478                        UInt32Builder,
479                        u32,
480                        row,
481                        idx,
482                        just_return
483                    );
484                }
485                DataType::Int64 => {
486                    handle_primitive_type!(
487                        builder,
488                        field,
489                        col,
490                        Int64Builder,
491                        i64,
492                        row,
493                        idx,
494                        just_return
495                    );
496                }
497                DataType::Float32 => {
498                    handle_primitive_type!(
499                        builder,
500                        field,
501                        col,
502                        Float32Builder,
503                        f32,
504                        row,
505                        idx,
506                        just_return
507                    );
508                }
509                DataType::Float64 => {
510                    handle_primitive_type!(
511                        builder,
512                        field,
513                        col,
514                        Float64Builder,
515                        f64,
516                        row,
517                        idx,
518                        just_return
519                    );
520                }
521                DataType::Decimal128(_precision, _scale) => {
522                    handle_primitive_type!(
523                        builder,
524                        field,
525                        col,
526                        Decimal128Builder,
527                        BigDecimalFromSql,
528                        row,
529                        idx,
530                        |v: BigDecimalFromSql| {
531                            v.to_decimal_128().ok_or_else(|| {
532                                DataFusionError::Execution(format!(
533                                    "Failed to convert BigDecimal {v:?} to i128",
534                                ))
535                            })
536                        }
537                    );
538                }
539                DataType::Utf8 => {
540                    handle_primitive_type!(
541                        builder,
542                        field,
543                        col,
544                        StringBuilder,
545                        &str,
546                        row,
547                        idx,
548                        just_return
549                    );
550                }
551                DataType::LargeUtf8 => {
552                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
553                        handle_primitive_type!(
554                            builder,
555                            field,
556                            col,
557                            LargeStringBuilder,
558                            serde_json::value::Value,
559                            row,
560                            idx,
561                            |v: serde_json::value::Value| {
562                                Ok::<_, DataFusionError>(v.to_string())
563                            }
564                        );
565                    } else {
566                        handle_primitive_type!(
567                            builder,
568                            field,
569                            col,
570                            LargeStringBuilder,
571                            &str,
572                            row,
573                            idx,
574                            just_return
575                        );
576                    }
577                }
578                DataType::Binary => {
579                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
580                    {
581                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
582                            |v| Ok(v.wkb);
583                        handle_primitive_type!(
584                            builder,
585                            field,
586                            col,
587                            BinaryBuilder,
588                            GeometryFromSql,
589                            row,
590                            idx,
591                            convert
592                        );
593                    } else if col.is_some()
594                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
595                    {
596                        handle_primitive_type!(
597                            builder,
598                            field,
599                            col,
600                            BinaryBuilder,
601                            serde_json::value::Value,
602                            row,
603                            idx,
604                            |v: serde_json::value::Value| {
605                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
606                            }
607                        );
608                    } else {
609                        handle_primitive_type!(
610                            builder,
611                            field,
612                            col,
613                            BinaryBuilder,
614                            Vec<u8>,
615                            row,
616                            idx,
617                            just_return
618                        );
619                    }
620                }
621                DataType::Timestamp(TimeUnit::Microsecond, None) => {
622                    handle_primitive_type!(
623                        builder,
624                        field,
625                        col,
626                        TimestampMicrosecondBuilder,
627                        SystemTime,
628                        row,
629                        idx,
630                        |v: SystemTime| {
631                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
632                                let timestamp: i64 = v
633                                    .as_micros()
634                                    .try_into()
635                                    .expect("Failed to convert SystemTime to i64");
636                                Ok(timestamp)
637                            } else {
638                                Err(DataFusionError::Execution(format!(
639                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
640                                )))
641                            }
642                        }
643                    );
644                }
645                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
646                    handle_primitive_type!(
647                        builder,
648                        field,
649                        col,
650                        TimestampNanosecondBuilder,
651                        SystemTime,
652                        row,
653                        idx,
654                        |v: SystemTime| {
655                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
656                                let timestamp: i64 = v
657                                    .as_nanos()
658                                    .try_into()
659                                    .expect("Failed to convert SystemTime to i64");
660                                Ok(timestamp)
661                            } else {
662                                Err(DataFusionError::Execution(format!(
663                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
664                                )))
665                            }
666                        }
667                    );
668                }
669                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
670                    handle_primitive_type!(
671                        builder,
672                        field,
673                        col,
674                        TimestampNanosecondBuilder,
675                        chrono::DateTime<chrono::Utc>,
676                        row,
677                        idx,
678                        |v: chrono::DateTime<chrono::Utc>| {
679                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
680                            Ok::<_, DataFusionError>(timestamp)
681                        }
682                    );
683                }
684                DataType::Time64(TimeUnit::Microsecond) => {
685                    handle_primitive_type!(
686                        builder,
687                        field,
688                        col,
689                        Time64MicrosecondBuilder,
690                        chrono::NaiveTime,
691                        row,
692                        idx,
693                        |v: chrono::NaiveTime| {
694                            let seconds = i64::from(v.num_seconds_from_midnight());
695                            let microseconds = i64::from(v.nanosecond()) / 1000;
696                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
697                        }
698                    );
699                }
700                DataType::Time64(TimeUnit::Nanosecond) => {
701                    handle_primitive_type!(
702                        builder,
703                        field,
704                        col,
705                        Time64NanosecondBuilder,
706                        chrono::NaiveTime,
707                        row,
708                        idx,
709                        |v: chrono::NaiveTime| {
710                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
711                                * 1_000_000_000
712                                + i64::from(v.nanosecond());
713                            Ok::<_, DataFusionError>(timestamp)
714                        }
715                    );
716                }
717                DataType::Date32 => {
718                    handle_primitive_type!(
719                        builder,
720                        field,
721                        col,
722                        Date32Builder,
723                        chrono::NaiveDate,
724                        row,
725                        idx,
726                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
727                    );
728                }
729                DataType::Interval(IntervalUnit::MonthDayNano) => {
730                    handle_primitive_type!(
731                        builder,
732                        field,
733                        col,
734                        IntervalMonthDayNanoBuilder,
735                        IntervalFromSql,
736                        row,
737                        idx,
738                        |v: IntervalFromSql| {
739                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
740                                v.month,
741                                v.day,
742                                v.time * 1_000,
743                            );
744                            Ok::<_, DataFusionError>(interval_month_day_nano)
745                        }
746                    );
747                }
748                DataType::Boolean => {
749                    handle_primitive_type!(
750                        builder,
751                        field,
752                        col,
753                        BooleanBuilder,
754                        bool,
755                        row,
756                        idx,
757                        just_return
758                    );
759                }
760                DataType::List(inner) => match inner.data_type() {
761                    DataType::Int16 => {
762                        handle_primitive_array_type!(
763                            builder,
764                            field,
765                            col,
766                            Int16Builder,
767                            i16,
768                            row,
769                            idx
770                        );
771                    }
772                    DataType::Int32 => {
773                        handle_primitive_array_type!(
774                            builder,
775                            field,
776                            col,
777                            Int32Builder,
778                            i32,
779                            row,
780                            idx
781                        );
782                    }
783                    DataType::Int64 => {
784                        handle_primitive_array_type!(
785                            builder,
786                            field,
787                            col,
788                            Int64Builder,
789                            i64,
790                            row,
791                            idx
792                        );
793                    }
794                    DataType::Float32 => {
795                        handle_primitive_array_type!(
796                            builder,
797                            field,
798                            col,
799                            Float32Builder,
800                            f32,
801                            row,
802                            idx
803                        );
804                    }
805                    DataType::Float64 => {
806                        handle_primitive_array_type!(
807                            builder,
808                            field,
809                            col,
810                            Float64Builder,
811                            f64,
812                            row,
813                            idx
814                        );
815                    }
816                    DataType::Utf8 => {
817                        handle_primitive_array_type!(
818                            builder,
819                            field,
820                            col,
821                            StringBuilder,
822                            &str,
823                            row,
824                            idx
825                        );
826                    }
827                    DataType::Binary => {
828                        handle_primitive_array_type!(
829                            builder,
830                            field,
831                            col,
832                            BinaryBuilder,
833                            Vec<u8>,
834                            row,
835                            idx
836                        );
837                    }
838                    DataType::Boolean => {
839                        handle_primitive_array_type!(
840                            builder,
841                            field,
842                            col,
843                            BooleanBuilder,
844                            bool,
845                            row,
846                            idx
847                        );
848                    }
849                    _ => {
850                        return Err(DataFusionError::NotImplemented(format!(
851                            "Unsupported list data type {:?} for col: {:?}",
852                            field.data_type(),
853                            col
854                        )));
855                    }
856                },
857                _ => {
858                    return Err(DataFusionError::NotImplemented(format!(
859                        "Unsupported data type {:?} for col: {:?}",
860                        field.data_type(),
861                        col
862                    )));
863                }
864            }
865        }
866    }
867    let projected_columns = array_builders
868        .into_iter()
869        .enumerate()
870        .filter(|(idx, _)| projections_contains(projection, *idx))
871        .map(|(_, mut builder)| builder.finish())
872        .collect::<Vec<ArrayRef>>();
873    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
874}