datafusion_remote_table/connection/postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::prelude::Expr;
27use derive_getters::Getters;
28use derive_with::With;
29use futures::StreamExt;
30use num_bigint::{BigInt, Sign};
31use std::string::ToString;
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35#[derive(Debug, Clone, With, Getters)]
36pub struct PostgresConnectionOptions {
37    pub(crate) host: String,
38    pub(crate) port: u16,
39    pub(crate) username: String,
40    pub(crate) password: String,
41    pub(crate) database: Option<String>,
42    pub(crate) pool_max_size: usize,
43    pub(crate) stream_chunk_size: usize,
44}
45
46impl PostgresConnectionOptions {
47    pub fn new(
48        host: impl Into<String>,
49        port: u16,
50        username: impl Into<String>,
51        password: impl Into<String>,
52    ) -> Self {
53        Self {
54            host: host.into(),
55            port,
56            username: username.into(),
57            password: password.into(),
58            database: None,
59            pool_max_size: 10,
60            stream_chunk_size: 2048,
61        }
62    }
63}
64
/// A bb8 pool of postgres connections, established without TLS (`NoTls`).
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
69
70#[async_trait::async_trait]
71impl Pool for PostgresPool {
72    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
73        let conn = self.pool.get_owned().await.map_err(|e| {
74            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
75        })?;
76        Ok(Arc::new(PostgresConnection { conn }))
77    }
78}
79
80pub(crate) async fn connect_postgres(
81    options: &PostgresConnectionOptions,
82) -> DFResult<PostgresPool> {
83    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
84    config
85        .host(&options.host)
86        .port(options.port)
87        .user(&options.username)
88        .password(&options.password);
89    if let Some(database) = &options.database {
90        config.dbname(database);
91    }
92    let manager = PostgresConnectionManager::new(config, NoTls);
93    let pool = bb8::Pool::builder()
94        .max_size(options.pool_max_size as u32)
95        .build(manager)
96        .await
97        .map_err(|e| {
98            DataFusionError::Execution(format!(
99                "Failed to create postgres connection pool due to {e}",
100            ))
101        })?;
102
103    Ok(PostgresPool { pool })
104}
105
/// A single postgres connection checked out of a `PostgresPool` via `get_owned`.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    // Owned ('static) pooled connection; presumably handed back to the pool on drop
    // per bb8's `PooledConnection` semantics — confirm against the bb8 version in use.
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
110
111#[async_trait::async_trait]
112impl Connection for PostgresConnection {
113    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
114        let sql = RemoteDbType::Postgres.query_limit_1(sql)?;
115        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
116            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
117        })?;
118        let remote_schema = Arc::new(build_remote_schema(&row)?);
119        Ok(remote_schema)
120    }
121
122    async fn query(
123        &self,
124        conn_options: &ConnectionOptions,
125        sql: &str,
126        table_schema: SchemaRef,
127        projection: Option<&Vec<usize>>,
128        filters: &[Expr],
129        limit: Option<usize>,
130    ) -> DFResult<SendableRecordBatchStream> {
131        let projected_schema = project_schema(&table_schema, projection)?;
132        let sql = RemoteDbType::Postgres.try_rewrite_query(sql, filters, limit)?;
133        let projection = projection.cloned();
134        let chunk_size = conn_options.stream_chunk_size();
135        let stream = self
136            .conn
137            .query_raw(&sql, Vec::<String>::new())
138            .await
139            .map_err(|e| {
140                DataFusionError::Execution(format!(
141                    "Failed to execute query {sql} on postgres: {e}",
142                ))
143            })?
144            .chunks(chunk_size)
145            .boxed();
146
147        let stream = stream.map(move |rows| {
148            let rows: Vec<Row> = rows
149                .into_iter()
150                .collect::<Result<Vec<_>, _>>()
151                .map_err(|e| {
152                    DataFusionError::Execution(format!(
153                        "Failed to collect rows from postgres due to {e}",
154                    ))
155                })?;
156            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
157        });
158
159        Ok(Box::pin(RecordBatchStreamAdapter::new(
160            projected_schema,
161            stream,
162        )))
163    }
164}
165
166fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
167    match pg_type {
168        &Type::INT2 => Ok(PostgresType::Int2),
169        &Type::INT4 => Ok(PostgresType::Int4),
170        &Type::INT8 => Ok(PostgresType::Int8),
171        &Type::FLOAT4 => Ok(PostgresType::Float4),
172        &Type::FLOAT8 => Ok(PostgresType::Float8),
173        &Type::NUMERIC => {
174            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
175                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
176            })?;
177            let scale = match v {
178                Some(v) => v.scale,
179                None => 0,
180            };
181            assert!((scale as u32) <= (i8::MAX as u32));
182            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
183        }
184        &Type::OID => Ok(PostgresType::Oid),
185        &Type::NAME => Ok(PostgresType::Name),
186        &Type::VARCHAR => Ok(PostgresType::Varchar),
187        &Type::BPCHAR => Ok(PostgresType::Bpchar),
188        &Type::TEXT => Ok(PostgresType::Text),
189        &Type::BYTEA => Ok(PostgresType::Bytea),
190        &Type::DATE => Ok(PostgresType::Date),
191        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
192        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
193        &Type::TIME => Ok(PostgresType::Time),
194        &Type::INTERVAL => Ok(PostgresType::Interval),
195        &Type::BOOL => Ok(PostgresType::Bool),
196        &Type::JSON => Ok(PostgresType::Json),
197        &Type::JSONB => Ok(PostgresType::Jsonb),
198        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
199        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
200        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
201        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
202        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
203        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
204        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
205        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
206        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
207        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
208        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
209        _ => Err(DataFusionError::NotImplemented(format!(
210            "Unsupported postgres type {pg_type:?}",
211        ))),
212    }
213}
214
215fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
216    let mut remote_fields = vec![];
217    for (idx, col) in row.columns().iter().enumerate() {
218        remote_fields.push(RemoteField::new(
219            col.name(),
220            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
221            true,
222        ));
223    }
224    Ok(RemoteSchema::new(remote_fields))
225}
226
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to `$builder`,
/// downcast to `$builder_ty`.
///
/// `Some` values are passed through `$convert`, whose `Result` is propagated with `?`.
/// `None` appends a null. The expansion uses `?`, so it must be invoked inside a function whose
/// error type accepts `DataFusionError`.
///
/// Panics if `$builder` is not a `$builder_ty`. `$field` and `$col` only feed diagnostics.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };
        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
255
/// Reads column `$index` of `$row` as a one-dimensional postgres array of
/// `$primitive_value_ty` and appends it to the `ListBuilder` behind `$builder`.
///
/// Each element is pushed into the values builder (downcast to `$values_builder_ty`). NULL
/// elements inside the array become null list items, and a NULL array becomes a null list
/// entry. The expansion uses `?`, so it must be invoked inside a function whose error type
/// accepts `DataFusionError`.
///
/// Panics if the builders are not of the expected types. `$field` and `$col` only feed
/// diagnostics.
///
/// NOTE(review): null elements require the list's item field to be nullable. Confirm that the
/// schema mapping declares it so.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        // `Option` per element: postgres arrays may contain NULL elements, which a plain
        // `Vec<T>` cannot decode.
        let v: Option<Vec<Option<$primitive_value_ty>>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
298
/// A postgres `NUMERIC` value decoded from its binary wire representation.
#[derive(Debug)]
struct BigDecimalFromSql {
    /// The decoded value, carrying `scale` fractional digits.
    inner: BigDecimal,
    /// Scale reported in the numeric header (digits after the decimal point).
    scale: u16,
}
304
305impl BigDecimalFromSql {
306    fn to_decimal_128(&self) -> Option<i128> {
307        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
308    }
309}
310
311#[allow(clippy::cast_sign_loss)]
312#[allow(clippy::cast_possible_wrap)]
313#[allow(clippy::cast_possible_truncation)]
314impl<'a> FromSql<'a> for BigDecimalFromSql {
315    fn from_sql(
316        _ty: &Type,
317        raw: &'a [u8],
318    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
319        let raw_u16: Vec<u16> = raw
320            .chunks(2)
321            .map(|chunk| {
322                if chunk.len() == 2 {
323                    u16::from_be_bytes([chunk[0], chunk[1]])
324                } else {
325                    u16::from_be_bytes([chunk[0], 0])
326                }
327            })
328            .collect();
329
330        let base_10_000_digit_count = raw_u16[0];
331        let weight = raw_u16[1] as i16;
332        let sign = raw_u16[2];
333        let scale = raw_u16[3];
334
335        let mut base_10_000_digits = Vec::new();
336        for i in 4..4 + base_10_000_digit_count {
337            base_10_000_digits.push(raw_u16[i as usize]);
338        }
339
340        let mut u8_digits = Vec::new();
341        for &base_10_000_digit in base_10_000_digits.iter().rev() {
342            let mut base_10_000_digit = base_10_000_digit;
343            let mut temp_result = Vec::new();
344            while base_10_000_digit > 0 {
345                temp_result.push((base_10_000_digit % 10) as u8);
346                base_10_000_digit /= 10;
347            }
348            while temp_result.len() < 4 {
349                temp_result.push(0);
350            }
351            u8_digits.extend(temp_result);
352        }
353        u8_digits.reverse();
354
355        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
356        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
357        u8_digits.resize(size as usize, 0);
358
359        let sign = match sign {
360            0x4000 => Sign::Minus,
361            0x0000 => Sign::Plus,
362            _ => {
363                return Err(Box::new(DataFusionError::Execution(
364                    "Failed to parse big decimal from postgres numeric value".to_string(),
365                )));
366            }
367        };
368
369        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
370            return Err(Box::new(DataFusionError::Execution(
371                "Failed to parse big decimal from postgres numeric value".to_string(),
372            )));
373        };
374        Ok(BigDecimalFromSql {
375            inner: BigDecimal::new(digits, i64::from(scale)),
376            scale,
377        })
378    }
379
380    fn accepts(ty: &Type) -> bool {
381        matches!(*ty, Type::NUMERIC)
382    }
383}
384
// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
// Interval values are stored internally as three integral fields: months, days, and microseconds.
/// A postgres `INTERVAL` decoded from its binary wire representation.
#[derive(Debug)]
struct IntervalFromSql {
    /// Time part, in microseconds.
    time: i64,
    /// Day part.
    day: i32,
    /// Month part.
    month: i32,
}
393
394impl<'a> FromSql<'a> for IntervalFromSql {
395    fn from_sql(
396        _ty: &Type,
397        raw: &'a [u8],
398    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
399        let mut cursor = std::io::Cursor::new(raw);
400
401        let time = cursor.read_i64::<BigEndian>()?;
402        let day = cursor.read_i32::<BigEndian>()?;
403        let month = cursor.read_i32::<BigEndian>()?;
404
405        Ok(IntervalFromSql { time, day, month })
406    }
407
408    fn accepts(ty: &Type) -> bool {
409        matches!(*ty, Type::INTERVAL)
410    }
411}
412
/// The raw binary payload of a PostGIS `geometry` value, borrowed from the row buffer.
///
/// NOTE(review): PostGIS' binary output is presumably EWKB rather than strict OGC WKB.
/// Confirm this before treating `wkb` as plain WKB.
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
416
417impl<'a> FromSql<'a> for GeometryFromSql<'a> {
418    fn from_sql(
419        _ty: &Type,
420        raw: &'a [u8],
421    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
422        Ok(GeometryFromSql { wkb: raw })
423    }
424
425    fn accepts(ty: &Type) -> bool {
426        matches!(ty.name(), "geometry")
427    }
428}
429
430fn rows_to_batch(
431    rows: &[Row],
432    table_schema: &SchemaRef,
433    projection: Option<&Vec<usize>>,
434) -> DFResult<RecordBatch> {
435    let projected_schema = project_schema(table_schema, projection)?;
436    let mut array_builders = vec![];
437    for field in table_schema.fields() {
438        let builder = make_builder(field.data_type(), rows.len());
439        array_builders.push(builder);
440    }
441
442    for row in rows {
443        for (idx, field) in table_schema.fields.iter().enumerate() {
444            if !projections_contains(projection, idx) {
445                continue;
446            }
447            let builder = &mut array_builders[idx];
448            let col = row.columns().get(idx);
449            match field.data_type() {
450                DataType::Int16 => {
451                    handle_primitive_type!(
452                        builder,
453                        field,
454                        col,
455                        Int16Builder,
456                        i16,
457                        row,
458                        idx,
459                        just_return
460                    );
461                }
462                DataType::Int32 => {
463                    handle_primitive_type!(
464                        builder,
465                        field,
466                        col,
467                        Int32Builder,
468                        i32,
469                        row,
470                        idx,
471                        just_return
472                    );
473                }
474                DataType::UInt32 => {
475                    handle_primitive_type!(
476                        builder,
477                        field,
478                        col,
479                        UInt32Builder,
480                        u32,
481                        row,
482                        idx,
483                        just_return
484                    );
485                }
486                DataType::Int64 => {
487                    handle_primitive_type!(
488                        builder,
489                        field,
490                        col,
491                        Int64Builder,
492                        i64,
493                        row,
494                        idx,
495                        just_return
496                    );
497                }
498                DataType::Float32 => {
499                    handle_primitive_type!(
500                        builder,
501                        field,
502                        col,
503                        Float32Builder,
504                        f32,
505                        row,
506                        idx,
507                        just_return
508                    );
509                }
510                DataType::Float64 => {
511                    handle_primitive_type!(
512                        builder,
513                        field,
514                        col,
515                        Float64Builder,
516                        f64,
517                        row,
518                        idx,
519                        just_return
520                    );
521                }
522                DataType::Decimal128(_precision, _scale) => {
523                    handle_primitive_type!(
524                        builder,
525                        field,
526                        col,
527                        Decimal128Builder,
528                        BigDecimalFromSql,
529                        row,
530                        idx,
531                        |v: BigDecimalFromSql| {
532                            v.to_decimal_128().ok_or_else(|| {
533                                DataFusionError::Execution(format!(
534                                    "Failed to convert BigDecimal {v:?} to i128",
535                                ))
536                            })
537                        }
538                    );
539                }
540                DataType::Utf8 => {
541                    handle_primitive_type!(
542                        builder,
543                        field,
544                        col,
545                        StringBuilder,
546                        &str,
547                        row,
548                        idx,
549                        just_return
550                    );
551                }
552                DataType::LargeUtf8 => {
553                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
554                        handle_primitive_type!(
555                            builder,
556                            field,
557                            col,
558                            LargeStringBuilder,
559                            serde_json::value::Value,
560                            row,
561                            idx,
562                            |v: serde_json::value::Value| {
563                                Ok::<_, DataFusionError>(v.to_string())
564                            }
565                        );
566                    } else {
567                        handle_primitive_type!(
568                            builder,
569                            field,
570                            col,
571                            LargeStringBuilder,
572                            &str,
573                            row,
574                            idx,
575                            just_return
576                        );
577                    }
578                }
579                DataType::Binary => {
580                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
581                    {
582                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
583                            |v| Ok(v.wkb);
584                        handle_primitive_type!(
585                            builder,
586                            field,
587                            col,
588                            BinaryBuilder,
589                            GeometryFromSql,
590                            row,
591                            idx,
592                            convert
593                        );
594                    } else if col.is_some()
595                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
596                    {
597                        handle_primitive_type!(
598                            builder,
599                            field,
600                            col,
601                            BinaryBuilder,
602                            serde_json::value::Value,
603                            row,
604                            idx,
605                            |v: serde_json::value::Value| {
606                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
607                            }
608                        );
609                    } else {
610                        handle_primitive_type!(
611                            builder,
612                            field,
613                            col,
614                            BinaryBuilder,
615                            Vec<u8>,
616                            row,
617                            idx,
618                            just_return
619                        );
620                    }
621                }
622                DataType::Timestamp(TimeUnit::Microsecond, None) => {
623                    handle_primitive_type!(
624                        builder,
625                        field,
626                        col,
627                        TimestampMicrosecondBuilder,
628                        SystemTime,
629                        row,
630                        idx,
631                        |v: SystemTime| {
632                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
633                                let timestamp: i64 = v
634                                    .as_micros()
635                                    .try_into()
636                                    .expect("Failed to convert SystemTime to i64");
637                                Ok(timestamp)
638                            } else {
639                                Err(DataFusionError::Execution(format!(
640                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
641                                )))
642                            }
643                        }
644                    );
645                }
646                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
647                    handle_primitive_type!(
648                        builder,
649                        field,
650                        col,
651                        TimestampNanosecondBuilder,
652                        SystemTime,
653                        row,
654                        idx,
655                        |v: SystemTime| {
656                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
657                                let timestamp: i64 = v
658                                    .as_nanos()
659                                    .try_into()
660                                    .expect("Failed to convert SystemTime to i64");
661                                Ok(timestamp)
662                            } else {
663                                Err(DataFusionError::Execution(format!(
664                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
665                                )))
666                            }
667                        }
668                    );
669                }
670                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
671                    handle_primitive_type!(
672                        builder,
673                        field,
674                        col,
675                        TimestampNanosecondBuilder,
676                        chrono::DateTime<chrono::Utc>,
677                        row,
678                        idx,
679                        |v: chrono::DateTime<chrono::Utc>| {
680                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
681                            Ok::<_, DataFusionError>(timestamp)
682                        }
683                    );
684                }
685                DataType::Time64(TimeUnit::Microsecond) => {
686                    handle_primitive_type!(
687                        builder,
688                        field,
689                        col,
690                        Time64MicrosecondBuilder,
691                        chrono::NaiveTime,
692                        row,
693                        idx,
694                        |v: chrono::NaiveTime| {
695                            let seconds = i64::from(v.num_seconds_from_midnight());
696                            let microseconds = i64::from(v.nanosecond()) / 1000;
697                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
698                        }
699                    );
700                }
701                DataType::Time64(TimeUnit::Nanosecond) => {
702                    handle_primitive_type!(
703                        builder,
704                        field,
705                        col,
706                        Time64NanosecondBuilder,
707                        chrono::NaiveTime,
708                        row,
709                        idx,
710                        |v: chrono::NaiveTime| {
711                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
712                                * 1_000_000_000
713                                + i64::from(v.nanosecond());
714                            Ok::<_, DataFusionError>(timestamp)
715                        }
716                    );
717                }
718                DataType::Date32 => {
719                    handle_primitive_type!(
720                        builder,
721                        field,
722                        col,
723                        Date32Builder,
724                        chrono::NaiveDate,
725                        row,
726                        idx,
727                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
728                    );
729                }
730                DataType::Interval(IntervalUnit::MonthDayNano) => {
731                    handle_primitive_type!(
732                        builder,
733                        field,
734                        col,
735                        IntervalMonthDayNanoBuilder,
736                        IntervalFromSql,
737                        row,
738                        idx,
739                        |v: IntervalFromSql| {
740                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
741                                v.month,
742                                v.day,
743                                v.time * 1_000,
744                            );
745                            Ok::<_, DataFusionError>(interval_month_day_nano)
746                        }
747                    );
748                }
749                DataType::Boolean => {
750                    handle_primitive_type!(
751                        builder,
752                        field,
753                        col,
754                        BooleanBuilder,
755                        bool,
756                        row,
757                        idx,
758                        just_return
759                    );
760                }
761                DataType::List(inner) => match inner.data_type() {
762                    DataType::Int16 => {
763                        handle_primitive_array_type!(
764                            builder,
765                            field,
766                            col,
767                            Int16Builder,
768                            i16,
769                            row,
770                            idx
771                        );
772                    }
773                    DataType::Int32 => {
774                        handle_primitive_array_type!(
775                            builder,
776                            field,
777                            col,
778                            Int32Builder,
779                            i32,
780                            row,
781                            idx
782                        );
783                    }
784                    DataType::Int64 => {
785                        handle_primitive_array_type!(
786                            builder,
787                            field,
788                            col,
789                            Int64Builder,
790                            i64,
791                            row,
792                            idx
793                        );
794                    }
795                    DataType::Float32 => {
796                        handle_primitive_array_type!(
797                            builder,
798                            field,
799                            col,
800                            Float32Builder,
801                            f32,
802                            row,
803                            idx
804                        );
805                    }
806                    DataType::Float64 => {
807                        handle_primitive_array_type!(
808                            builder,
809                            field,
810                            col,
811                            Float64Builder,
812                            f64,
813                            row,
814                            idx
815                        );
816                    }
817                    DataType::Utf8 => {
818                        handle_primitive_array_type!(
819                            builder,
820                            field,
821                            col,
822                            StringBuilder,
823                            &str,
824                            row,
825                            idx
826                        );
827                    }
828                    DataType::Binary => {
829                        handle_primitive_array_type!(
830                            builder,
831                            field,
832                            col,
833                            BinaryBuilder,
834                            Vec<u8>,
835                            row,
836                            idx
837                        );
838                    }
839                    DataType::Boolean => {
840                        handle_primitive_array_type!(
841                            builder,
842                            field,
843                            col,
844                            BooleanBuilder,
845                            bool,
846                            row,
847                            idx
848                        );
849                    }
850                    _ => {
851                        return Err(DataFusionError::NotImplemented(format!(
852                            "Unsupported list data type {:?} for col: {:?}",
853                            field.data_type(),
854                            col
855                        )));
856                    }
857                },
858                _ => {
859                    return Err(DataFusionError::NotImplemented(format!(
860                        "Unsupported data type {:?} for col: {:?}",
861                        field.data_type(),
862                        col
863                    )));
864                }
865            }
866        }
867    }
868    let projected_columns = array_builders
869        .into_iter()
870        .enumerate()
871        .filter(|(idx, _)| projections_contains(projection, *idx))
872        .map(|(_, mut builder)| builder.finish())
873        .collect::<Vec<ArrayRef>>();
874    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
875}