// datafusion_remote_table/connection/postgres.rs

1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::limit::LimitStream;
26use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
27use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
28use derive_getters::Getters;
29use derive_with::With;
30use futures::StreamExt;
31use num_bigint::{BigInt, Sign};
32use std::string::ToString;
33use std::sync::Arc;
34use std::time::{SystemTime, UNIX_EPOCH};
35
36#[derive(Debug, Clone, With, Getters)]
37pub struct PostgresConnectionOptions {
38    pub(crate) host: String,
39    pub(crate) port: u16,
40    pub(crate) username: String,
41    pub(crate) password: String,
42    pub(crate) database: Option<String>,
43    pub(crate) pool_max_size: usize,
44    pub(crate) stream_chunk_size: usize,
45}
46
47impl PostgresConnectionOptions {
48    pub fn new(
49        host: impl Into<String>,
50        port: u16,
51        username: impl Into<String>,
52        password: impl Into<String>,
53    ) -> Self {
54        Self {
55            host: host.into(),
56            port,
57            username: username.into(),
58            password: password.into(),
59            database: None,
60            pool_max_size: 10,
61            stream_chunk_size: 2048,
62        }
63    }
64}
65
66#[derive(Debug)]
67pub struct PostgresPool {
68    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
69}
70
71#[async_trait::async_trait]
72impl Pool for PostgresPool {
73    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
74        let conn = self.pool.get_owned().await.map_err(|e| {
75            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
76        })?;
77        Ok(Arc::new(PostgresConnection { conn }))
78    }
79}
80
81pub(crate) async fn connect_postgres(
82    options: &PostgresConnectionOptions,
83) -> DFResult<PostgresPool> {
84    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
85    config
86        .host(&options.host)
87        .port(options.port)
88        .user(&options.username)
89        .password(&options.password);
90    if let Some(database) = &options.database {
91        config.dbname(database);
92    }
93    let manager = PostgresConnectionManager::new(config, NoTls);
94    let pool = bb8::Pool::builder()
95        .max_size(options.pool_max_size as u32)
96        .build(manager)
97        .await
98        .map_err(|e| {
99            DataFusionError::Execution(format!(
100                "Failed to create postgres connection pool due to {e}",
101            ))
102        })?;
103
104    Ok(PostgresPool { pool })
105}
106
107#[derive(Debug)]
108pub(crate) struct PostgresConnection {
109    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
110}
111
112#[async_trait::async_trait]
113impl Connection for PostgresConnection {
114    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
115        let sql = try_limit_query(sql, 1).unwrap_or_else(|| sql.to_string());
116        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
117            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
118        })?;
119        let remote_schema = Arc::new(build_remote_schema(&row)?);
120        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
121        Ok((remote_schema, arrow_schema))
122    }
123
124    async fn query(
125        &self,
126        conn_options: &ConnectionOptions,
127        sql: &str,
128        table_schema: SchemaRef,
129        projection: Option<&Vec<usize>>,
130        limit: Option<usize>,
131    ) -> DFResult<SendableRecordBatchStream> {
132        let projected_schema = project_schema(&table_schema, projection)?;
133        let (sql, limit_stream) = match limit {
134            Some(limit) => {
135                if let Some(limited_sql) = try_limit_query(sql, limit) {
136                    (limited_sql, false)
137                } else {
138                    (sql.to_string(), true)
139                }
140            }
141            None => (sql.to_string(), false),
142        };
143        let projection = projection.cloned();
144        let chunk_size = conn_options.stream_chunk_size();
145        let stream = self
146            .conn
147            .query_raw(&sql, Vec::<String>::new())
148            .await
149            .map_err(|e| {
150                DataFusionError::Execution(format!(
151                    "Failed to execute query {sql} on postgres due to {e}",
152                ))
153            })?
154            .chunks(chunk_size)
155            .boxed();
156
157        let stream = stream.map(move |rows| {
158            let rows: Vec<Row> = rows
159                .into_iter()
160                .collect::<Result<Vec<_>, _>>()
161                .map_err(|e| {
162                    DataFusionError::Execution(format!(
163                        "Failed to collect rows from postgres due to {e}",
164                    ))
165                })?;
166            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
167        });
168
169        let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(projected_schema, stream));
170
171        if limit_stream {
172            let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
173            Ok(Box::pin(LimitStream::new(
174                sendable_stream,
175                0,
176                limit,
177                metrics,
178            )))
179        } else {
180            Ok(sendable_stream)
181        }
182    }
183}
184
/// Wraps a SELECT statement so that it returns at most `limit` rows.
///
/// Returns `None` when `sql` (ignoring surrounding whitespace) does not start with the
/// keyword `select` (case-insensitive). Callers then run the statement unchanged. Trailing
/// semicolons are dropped before wrapping, because `(... ;)` is not valid inside a subquery.
/// Inputs shorter than six bytes, or whose sixth byte falls inside a multi-byte character,
/// return `None` instead of panicking.
fn try_limit_query(sql: &str, limit: usize) -> Option<String> {
    let trimmed = sql.trim();
    let is_select = trimmed
        .get(..6)
        .is_some_and(|keyword| keyword.eq_ignore_ascii_case("select"));
    if !is_select {
        return None;
    }
    let body = if trimmed.ends_with(';') {
        trimmed.trim_end_matches(|c: char| c == ';' || c.is_whitespace())
    } else {
        sql
    };
    Some(format!("SELECT * FROM ({body}) as __subquery LIMIT {limit}"))
}
192
193fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
194    match pg_type {
195        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
196        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
197        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
198        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
199        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
200        &Type::NUMERIC => {
201            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
202                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
203            })?;
204            let scale = match v {
205                Some(v) => v.scale,
206                None => 0,
207            };
208            assert!((scale as u32) <= (i8::MAX as u32));
209            Ok(RemoteType::Postgres(PostgresType::Numeric(
210                scale.try_into().unwrap_or_default(),
211            )))
212        }
213        &Type::OID => Ok(RemoteType::Postgres(PostgresType::Oid)),
214        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
215        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
216        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
217        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
218        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
219        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
220        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
221        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
222        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
223        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
224        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
225        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
226        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
227        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
228        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
229        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
230        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
231        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
232        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
233        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
234        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
235        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
236        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
237        other if other.name().eq_ignore_ascii_case("geometry") => {
238            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
239        }
240        _ => Err(DataFusionError::NotImplemented(format!(
241            "Unsupported postgres type {pg_type:?}",
242        ))),
243    }
244}
245
246fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
247    let mut remote_fields = vec![];
248    for (idx, col) in row.columns().iter().enumerate() {
249        remote_fields.push(RemoteField::new(
250            col.name(),
251            pg_type_to_remote_type(col.type_(), row, idx)?,
252            true,
253        ));
254    }
255    Ok(RemoteSchema::new(remote_fields))
256}
257
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to `$builder`.
///
/// `$builder` is downcast to `$builder_ty`; a mismatch panics because it means the builder
/// does not match the schema. A `Some(v)` value is appended as `$convert(v)?`, and `None`
/// is appended as null. Decode and conversion errors are returned from the enclosing
/// function via `?`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };

        let fetched: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = fetched {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
286
/// Reads column `$index` of `$row` as a Postgres array and appends it to the list builder
/// `$builder`, whose child builder must be a `$values_builder_ty`.
///
/// Elements are decoded as `Option<$primitive_value_ty>`, so arrays containing NULL elements
/// are accepted; decoding them as a plain `Vec<T>` fails on the first NULL. A NULL array
/// becomes a null list entry. Builder type mismatches panic (schema/builder bug); decode
/// errors are returned from the enclosing function via `?`.
///
/// NOTE(review): NULL elements require the list's item field to be nullable, otherwise
/// Arrow rejects the finished array — confirm `RemoteSchema::to_arrow_schema` declares
/// nullable list items.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<Option<$primitive_value_ty>>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
329
330#[derive(Debug)]
331struct BigDecimalFromSql {
332    inner: BigDecimal,
333    scale: u16,
334}
335
336impl BigDecimalFromSql {
337    fn to_decimal_128(&self) -> Option<i128> {
338        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
339    }
340}
341
342#[allow(clippy::cast_sign_loss)]
343#[allow(clippy::cast_possible_wrap)]
344#[allow(clippy::cast_possible_truncation)]
345impl<'a> FromSql<'a> for BigDecimalFromSql {
346    fn from_sql(
347        _ty: &Type,
348        raw: &'a [u8],
349    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
350        let raw_u16: Vec<u16> = raw
351            .chunks(2)
352            .map(|chunk| {
353                if chunk.len() == 2 {
354                    u16::from_be_bytes([chunk[0], chunk[1]])
355                } else {
356                    u16::from_be_bytes([chunk[0], 0])
357                }
358            })
359            .collect();
360
361        let base_10_000_digit_count = raw_u16[0];
362        let weight = raw_u16[1] as i16;
363        let sign = raw_u16[2];
364        let scale = raw_u16[3];
365
366        let mut base_10_000_digits = Vec::new();
367        for i in 4..4 + base_10_000_digit_count {
368            base_10_000_digits.push(raw_u16[i as usize]);
369        }
370
371        let mut u8_digits = Vec::new();
372        for &base_10_000_digit in base_10_000_digits.iter().rev() {
373            let mut base_10_000_digit = base_10_000_digit;
374            let mut temp_result = Vec::new();
375            while base_10_000_digit > 0 {
376                temp_result.push((base_10_000_digit % 10) as u8);
377                base_10_000_digit /= 10;
378            }
379            while temp_result.len() < 4 {
380                temp_result.push(0);
381            }
382            u8_digits.extend(temp_result);
383        }
384        u8_digits.reverse();
385
386        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
387        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
388        u8_digits.resize(size as usize, 0);
389
390        let sign = match sign {
391            0x4000 => Sign::Minus,
392            0x0000 => Sign::Plus,
393            _ => {
394                return Err(Box::new(DataFusionError::Execution(
395                    "Failed to parse big decimal from postgres numeric value".to_string(),
396                )));
397            }
398        };
399
400        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
401            return Err(Box::new(DataFusionError::Execution(
402                "Failed to parse big decimal from postgres numeric value".to_string(),
403            )));
404        };
405        Ok(BigDecimalFromSql {
406            inner: BigDecimal::new(digits, i64::from(scale)),
407            scale,
408        })
409    }
410
411    fn accepts(ty: &Type) -> bool {
412        matches!(*ty, Type::NUMERIC)
413    }
414}
415
416// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
417// interval values are internally stored as three integral fields: months, days, and microseconds
418#[derive(Debug)]
419struct IntervalFromSql {
420    time: i64,
421    day: i32,
422    month: i32,
423}
424
425impl<'a> FromSql<'a> for IntervalFromSql {
426    fn from_sql(
427        _ty: &Type,
428        raw: &'a [u8],
429    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
430        let mut cursor = std::io::Cursor::new(raw);
431
432        let time = cursor.read_i64::<BigEndian>()?;
433        let day = cursor.read_i32::<BigEndian>()?;
434        let month = cursor.read_i32::<BigEndian>()?;
435
436        Ok(IntervalFromSql { time, day, month })
437    }
438
439    fn accepts(ty: &Type) -> bool {
440        matches!(*ty, Type::INTERVAL)
441    }
442}
443
444struct GeometryFromSql<'a> {
445    wkb: &'a [u8],
446}
447
448impl<'a> FromSql<'a> for GeometryFromSql<'a> {
449    fn from_sql(
450        _ty: &Type,
451        raw: &'a [u8],
452    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
453        Ok(GeometryFromSql { wkb: raw })
454    }
455
456    fn accepts(ty: &Type) -> bool {
457        matches!(ty.name(), "geometry")
458    }
459}
460
461fn rows_to_batch(
462    rows: &[Row],
463    table_schema: &SchemaRef,
464    projection: Option<&Vec<usize>>,
465) -> DFResult<RecordBatch> {
466    let projected_schema = project_schema(table_schema, projection)?;
467    let mut array_builders = vec![];
468    for field in table_schema.fields() {
469        let builder = make_builder(field.data_type(), rows.len());
470        array_builders.push(builder);
471    }
472
473    for row in rows {
474        for (idx, field) in table_schema.fields.iter().enumerate() {
475            if !projections_contains(projection, idx) {
476                continue;
477            }
478            let builder = &mut array_builders[idx];
479            let col = row.columns().get(idx);
480            match field.data_type() {
481                DataType::Int16 => {
482                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
483                        Ok::<_, DataFusionError>(v)
484                    });
485                }
486                DataType::Int32 => {
487                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
488                        Ok::<_, DataFusionError>(v)
489                    });
490                }
491                DataType::UInt32 => {
492                    handle_primitive_type!(
493                        builder,
494                        field,
495                        col,
496                        UInt32Builder,
497                        u32,
498                        row,
499                        idx,
500                        |v| { Ok::<_, DataFusionError>(v) }
501                    );
502                }
503                DataType::Int64 => {
504                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
505                        Ok::<_, DataFusionError>(v)
506                    });
507                }
508                DataType::Float32 => {
509                    handle_primitive_type!(
510                        builder,
511                        field,
512                        col,
513                        Float32Builder,
514                        f32,
515                        row,
516                        idx,
517                        |v| { Ok::<_, DataFusionError>(v) }
518                    );
519                }
520                DataType::Float64 => {
521                    handle_primitive_type!(
522                        builder,
523                        field,
524                        col,
525                        Float64Builder,
526                        f64,
527                        row,
528                        idx,
529                        |v| { Ok::<_, DataFusionError>(v) }
530                    );
531                }
532                DataType::Decimal128(_precision, _scale) => {
533                    handle_primitive_type!(
534                        builder,
535                        field,
536                        col,
537                        Decimal128Builder,
538                        BigDecimalFromSql,
539                        row,
540                        idx,
541                        |v: BigDecimalFromSql| {
542                            v.to_decimal_128().ok_or_else(|| {
543                                DataFusionError::Execution(format!(
544                                    "Failed to convert BigDecimal {v:?} to i128",
545                                ))
546                            })
547                        }
548                    );
549                }
550                DataType::Utf8 => {
551                    handle_primitive_type!(
552                        builder,
553                        field,
554                        col,
555                        StringBuilder,
556                        &str,
557                        row,
558                        idx,
559                        |v| { Ok::<_, DataFusionError>(v) }
560                    );
561                }
562                DataType::LargeUtf8 => {
563                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
564                        handle_primitive_type!(
565                            builder,
566                            field,
567                            col,
568                            LargeStringBuilder,
569                            serde_json::value::Value,
570                            row,
571                            idx,
572                            |v: serde_json::value::Value| {
573                                Ok::<_, DataFusionError>(v.to_string())
574                            }
575                        );
576                    } else {
577                        handle_primitive_type!(
578                            builder,
579                            field,
580                            col,
581                            LargeStringBuilder,
582                            &str,
583                            row,
584                            idx,
585                            |v| { Ok::<_, DataFusionError>(v) }
586                        );
587                    }
588                }
589                DataType::Binary => {
590                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
591                    {
592                        handle_primitive_type!(
593                            builder,
594                            field,
595                            col,
596                            BinaryBuilder,
597                            GeometryFromSql,
598                            row,
599                            idx,
600                            |v: GeometryFromSql| { Ok::<_, DataFusionError>(v.wkb.to_vec()) }
601                        );
602                    } else if col.is_some()
603                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
604                    {
605                        handle_primitive_type!(
606                            builder,
607                            field,
608                            col,
609                            BinaryBuilder,
610                            serde_json::value::Value,
611                            row,
612                            idx,
613                            |v: serde_json::value::Value| {
614                                Ok::<_, DataFusionError>(v.to_string().as_bytes().to_vec())
615                            }
616                        );
617                    } else {
618                        handle_primitive_type!(
619                            builder,
620                            field,
621                            col,
622                            BinaryBuilder,
623                            Vec<u8>,
624                            row,
625                            idx,
626                            |v| { Ok::<_, DataFusionError>(v) }
627                        );
628                    }
629                }
630                DataType::Timestamp(TimeUnit::Microsecond, None) => {
631                    handle_primitive_type!(
632                        builder,
633                        field,
634                        col,
635                        TimestampMicrosecondBuilder,
636                        SystemTime,
637                        row,
638                        idx,
639                        |v: SystemTime| {
640                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
641                                let timestamp: i64 = v
642                                    .as_micros()
643                                    .try_into()
644                                    .expect("Failed to convert SystemTime to i64");
645                                Ok(timestamp)
646                            } else {
647                                Err(DataFusionError::Execution(format!(
648                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
649                                )))
650                            }
651                        }
652                    );
653                }
654                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
655                    handle_primitive_type!(
656                        builder,
657                        field,
658                        col,
659                        TimestampNanosecondBuilder,
660                        SystemTime,
661                        row,
662                        idx,
663                        |v: SystemTime| {
664                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
665                                let timestamp: i64 = v
666                                    .as_nanos()
667                                    .try_into()
668                                    .expect("Failed to convert SystemTime to i64");
669                                Ok(timestamp)
670                            } else {
671                                Err(DataFusionError::Execution(format!(
672                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
673                                )))
674                            }
675                        }
676                    );
677                }
678                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
679                    handle_primitive_type!(
680                        builder,
681                        field,
682                        col,
683                        TimestampNanosecondBuilder,
684                        chrono::DateTime<chrono::Utc>,
685                        row,
686                        idx,
687                        |v: chrono::DateTime<chrono::Utc>| {
688                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
689                            Ok::<_, DataFusionError>(timestamp)
690                        }
691                    );
692                }
693                DataType::Time64(TimeUnit::Microsecond) => {
694                    handle_primitive_type!(
695                        builder,
696                        field,
697                        col,
698                        Time64MicrosecondBuilder,
699                        chrono::NaiveTime,
700                        row,
701                        idx,
702                        |v: chrono::NaiveTime| {
703                            let seconds = i64::from(v.num_seconds_from_midnight());
704                            let microseconds = i64::from(v.nanosecond()) / 1000;
705                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
706                        }
707                    );
708                }
709                DataType::Time64(TimeUnit::Nanosecond) => {
710                    handle_primitive_type!(
711                        builder,
712                        field,
713                        col,
714                        Time64NanosecondBuilder,
715                        chrono::NaiveTime,
716                        row,
717                        idx,
718                        |v: chrono::NaiveTime| {
719                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
720                                * 1_000_000_000
721                                + i64::from(v.nanosecond());
722                            Ok::<_, DataFusionError>(timestamp)
723                        }
724                    );
725                }
726                DataType::Date32 => {
727                    handle_primitive_type!(
728                        builder,
729                        field,
730                        col,
731                        Date32Builder,
732                        chrono::NaiveDate,
733                        row,
734                        idx,
735                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
736                    );
737                }
738                DataType::Interval(IntervalUnit::MonthDayNano) => {
739                    handle_primitive_type!(
740                        builder,
741                        field,
742                        col,
743                        IntervalMonthDayNanoBuilder,
744                        IntervalFromSql,
745                        row,
746                        idx,
747                        |v: IntervalFromSql| {
748                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
749                                v.month,
750                                v.day,
751                                v.time * 1_000,
752                            );
753                            Ok::<_, DataFusionError>(interval_month_day_nano)
754                        }
755                    );
756                }
757                DataType::Boolean => {
758                    handle_primitive_type!(
759                        builder,
760                        field,
761                        col,
762                        BooleanBuilder,
763                        bool,
764                        row,
765                        idx,
766                        |v| { Ok::<_, DataFusionError>(v) }
767                    );
768                }
769                DataType::List(inner) => match inner.data_type() {
770                    DataType::Int16 => {
771                        handle_primitive_array_type!(
772                            builder,
773                            field,
774                            col,
775                            Int16Builder,
776                            i16,
777                            row,
778                            idx
779                        );
780                    }
781                    DataType::Int32 => {
782                        handle_primitive_array_type!(
783                            builder,
784                            field,
785                            col,
786                            Int32Builder,
787                            i32,
788                            row,
789                            idx
790                        );
791                    }
792                    DataType::Int64 => {
793                        handle_primitive_array_type!(
794                            builder,
795                            field,
796                            col,
797                            Int64Builder,
798                            i64,
799                            row,
800                            idx
801                        );
802                    }
803                    DataType::Float32 => {
804                        handle_primitive_array_type!(
805                            builder,
806                            field,
807                            col,
808                            Float32Builder,
809                            f32,
810                            row,
811                            idx
812                        );
813                    }
814                    DataType::Float64 => {
815                        handle_primitive_array_type!(
816                            builder,
817                            field,
818                            col,
819                            Float64Builder,
820                            f64,
821                            row,
822                            idx
823                        );
824                    }
825                    DataType::Utf8 => {
826                        handle_primitive_array_type!(
827                            builder,
828                            field,
829                            col,
830                            StringBuilder,
831                            &str,
832                            row,
833                            idx
834                        );
835                    }
836                    DataType::Binary => {
837                        handle_primitive_array_type!(
838                            builder,
839                            field,
840                            col,
841                            BinaryBuilder,
842                            Vec<u8>,
843                            row,
844                            idx
845                        );
846                    }
847                    DataType::Boolean => {
848                        handle_primitive_array_type!(
849                            builder,
850                            field,
851                            col,
852                            BooleanBuilder,
853                            bool,
854                            row,
855                            idx
856                        );
857                    }
858                    _ => {
859                        return Err(DataFusionError::NotImplemented(format!(
860                            "Unsupported list data type {:?} for col: {:?}",
861                            field.data_type(),
862                            col
863                        )));
864                    }
865                },
866                _ => {
867                    return Err(DataFusionError::NotImplemented(format!(
868                        "Unsupported data type {:?} for col: {:?}",
869                        field.data_type(),
870                        col
871                    )));
872                }
873            }
874        }
875    }
876    let projected_columns = array_builders
877        .into_iter()
878        .enumerate()
879        .filter(|(idx, _)| projections_contains(projection, *idx))
880        .map(|(_, mut builder)| builder.finish())
881        .collect::<Vec<ArrayRef>>();
882    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
883}