datafusion_remote_table/connection/postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder,
15    Int64Builder, IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch,
16    RecordBatchOptions, StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
17    TimestampMicrosecondBuilder, TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use log::debug;
30use num_bigint::{BigInt, Sign};
31use std::any::Any;
32use std::string::ToString;
33use std::sync::Arc;
34use std::time::{SystemTime, UNIX_EPOCH};
35use uuid::Uuid;
36
37#[derive(Debug, Clone, With, Getters)]
38pub struct PostgresConnectionOptions {
39    pub(crate) host: String,
40    pub(crate) port: u16,
41    pub(crate) username: String,
42    pub(crate) password: String,
43    pub(crate) database: Option<String>,
44    pub(crate) pool_max_size: usize,
45    pub(crate) stream_chunk_size: usize,
46}
47
48impl PostgresConnectionOptions {
49    pub fn new(
50        host: impl Into<String>,
51        port: u16,
52        username: impl Into<String>,
53        password: impl Into<String>,
54    ) -> Self {
55        Self {
56            host: host.into(),
57            port,
58            username: username.into(),
59            password: password.into(),
60            database: None,
61            pool_max_size: 10,
62            stream_chunk_size: 2048,
63        }
64    }
65}
66
67impl From<PostgresConnectionOptions> for ConnectionOptions {
68    fn from(options: PostgresConnectionOptions) -> Self {
69        ConnectionOptions::Postgres(options)
70    }
71}
72
73#[derive(Debug)]
74pub struct PostgresPool {
75    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
76}
77
78#[async_trait::async_trait]
79impl Pool for PostgresPool {
80    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
81        let conn = self.pool.get_owned().await.map_err(|e| {
82            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
83        })?;
84        Ok(Arc::new(PostgresConnection { conn }))
85    }
86}
87
88pub(crate) async fn connect_postgres(
89    options: &PostgresConnectionOptions,
90) -> DFResult<PostgresPool> {
91    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
92    config
93        .host(&options.host)
94        .port(options.port)
95        .user(&options.username)
96        .password(&options.password);
97    if let Some(database) = &options.database {
98        config.dbname(database);
99    }
100    let manager = PostgresConnectionManager::new(config, NoTls);
101    let pool = bb8::Pool::builder()
102        .max_size(options.pool_max_size as u32)
103        .build(manager)
104        .await
105        .map_err(|e| {
106            DataFusionError::Execution(format!(
107                "Failed to create postgres connection pool due to {e}",
108            ))
109        })?;
110
111    Ok(PostgresPool { pool })
112}
113
114#[derive(Debug)]
115pub(crate) struct PostgresConnection {
116    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
117}
118
119#[async_trait::async_trait]
120impl Connection for PostgresConnection {
121    fn as_any(&self) -> &dyn Any {
122        self
123    }
124
125    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
126        let sql = RemoteDbType::Postgres.query_limit_1(sql);
127        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
128            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
129        })?;
130        let remote_schema = Arc::new(build_remote_schema(&row)?);
131        Ok(remote_schema)
132    }
133
134    async fn query(
135        &self,
136        conn_options: &ConnectionOptions,
137        sql: &str,
138        table_schema: SchemaRef,
139        projection: Option<&Vec<usize>>,
140        unparsed_filters: &[String],
141        limit: Option<usize>,
142    ) -> DFResult<SendableRecordBatchStream> {
143        let projected_schema = project_schema(&table_schema, projection)?;
144
145        let sql = RemoteDbType::Postgres.rewrite_query(sql, unparsed_filters, limit);
146        debug!("[remote-table] executing postgres query: {sql}");
147
148        let projection = projection.cloned();
149        let chunk_size = conn_options.stream_chunk_size();
150        let stream = self
151            .conn
152            .query_raw(&sql, Vec::<String>::new())
153            .await
154            .map_err(|e| {
155                DataFusionError::Execution(format!(
156                    "Failed to execute query {sql} on postgres: {e}",
157                ))
158            })?
159            .chunks(chunk_size)
160            .boxed();
161
162        let stream = stream.map(move |rows| {
163            let rows: Vec<Row> = rows
164                .into_iter()
165                .collect::<Result<Vec<_>, _>>()
166                .map_err(|e| {
167                    DataFusionError::Execution(format!(
168                        "Failed to collect rows from postgres due to {e}",
169                    ))
170                })?;
171            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
172        });
173
174        Ok(Box::pin(RecordBatchStreamAdapter::new(
175            projected_schema,
176            stream,
177        )))
178    }
179}
180
181fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
182    match pg_type {
183        &Type::INT2 => Ok(PostgresType::Int2),
184        &Type::INT4 => Ok(PostgresType::Int4),
185        &Type::INT8 => Ok(PostgresType::Int8),
186        &Type::FLOAT4 => Ok(PostgresType::Float4),
187        &Type::FLOAT8 => Ok(PostgresType::Float8),
188        &Type::NUMERIC => {
189            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
190                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
191            })?;
192            let scale = match v {
193                Some(v) => v.scale,
194                None => 0,
195            };
196            assert!((scale as u32) <= (i8::MAX as u32));
197            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
198        }
199        &Type::OID => Ok(PostgresType::Oid),
200        &Type::NAME => Ok(PostgresType::Name),
201        &Type::VARCHAR => Ok(PostgresType::Varchar),
202        &Type::BPCHAR => Ok(PostgresType::Bpchar),
203        &Type::TEXT => Ok(PostgresType::Text),
204        &Type::BYTEA => Ok(PostgresType::Bytea),
205        &Type::DATE => Ok(PostgresType::Date),
206        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
207        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
208        &Type::TIME => Ok(PostgresType::Time),
209        &Type::INTERVAL => Ok(PostgresType::Interval),
210        &Type::BOOL => Ok(PostgresType::Bool),
211        &Type::JSON => Ok(PostgresType::Json),
212        &Type::JSONB => Ok(PostgresType::Jsonb),
213        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
214        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
215        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
216        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
217        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
218        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
219        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
220        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
221        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
222        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
223        &Type::XML => Ok(PostgresType::Xml),
224        &Type::UUID => Ok(PostgresType::Uuid),
225        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
226        _ => Err(DataFusionError::NotImplemented(format!(
227            "Unsupported postgres type {pg_type:?}",
228        ))),
229    }
230}
231
232fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
233    let mut remote_fields = vec![];
234    for (idx, col) in row.columns().iter().enumerate() {
235        remote_fields.push(RemoteField::new(
236            col.name(),
237            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
238            true,
239        ));
240    }
241    Ok(RemoteSchema::new(remote_fields))
242}
243
/// Appends the scalar at column `$index` of `$row` to `$builder`.
///
/// `$builder` is downcast to `$builder_ty`. The downcast panics if the builder is of
/// another type, which indicates a builder/column mismatch in the caller. The cell is
/// read as `Option<$value_ty>`, and a read failure returns early with
/// `DataFusionError::Execution`. A NULL cell appends a null. Otherwise the value is
/// passed through `$convert`, whose `Result` is propagated with `?`, and then appended.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };

        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
272
/// Appends the array value at column `$index` of `$row` to a list-typed `$builder`.
///
/// `$builder` must be a `ListBuilder<Box<dyn ArrayBuilder>>` whose values builder is a
/// `$values_builder_ty`; either downcast failing panics. The cell is read as
/// `Option<Vec<$primitive_value_ty>>`. A NULL cell appends a null list. Otherwise every
/// element is appended as a non-null value and the list entry is closed as valid. A read
/// failure returns early with `DataFusionError::Execution`.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Only this macro's own metavariables expand here; an unbound name
                    // would be emitted verbatim and print e.g. "$builder_ty".
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                values_builder.extend(v.into_iter().map(Some));
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
315
316#[derive(Debug)]
317struct BigDecimalFromSql {
318    inner: BigDecimal,
319    scale: u16,
320}
321
322impl BigDecimalFromSql {
323    fn to_decimal_128(&self) -> Option<i128> {
324        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
325    }
326}
327
328#[allow(clippy::cast_sign_loss)]
329#[allow(clippy::cast_possible_wrap)]
330#[allow(clippy::cast_possible_truncation)]
331impl<'a> FromSql<'a> for BigDecimalFromSql {
332    fn from_sql(
333        _ty: &Type,
334        raw: &'a [u8],
335    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
336        let raw_u16: Vec<u16> = raw
337            .chunks(2)
338            .map(|chunk| {
339                if chunk.len() == 2 {
340                    u16::from_be_bytes([chunk[0], chunk[1]])
341                } else {
342                    u16::from_be_bytes([chunk[0], 0])
343                }
344            })
345            .collect();
346
347        let base_10_000_digit_count = raw_u16[0];
348        let weight = raw_u16[1] as i16;
349        let sign = raw_u16[2];
350        let scale = raw_u16[3];
351
352        let mut base_10_000_digits = Vec::new();
353        for i in 4..4 + base_10_000_digit_count {
354            base_10_000_digits.push(raw_u16[i as usize]);
355        }
356
357        let mut u8_digits = Vec::new();
358        for &base_10_000_digit in base_10_000_digits.iter().rev() {
359            let mut base_10_000_digit = base_10_000_digit;
360            let mut temp_result = Vec::new();
361            while base_10_000_digit > 0 {
362                temp_result.push((base_10_000_digit % 10) as u8);
363                base_10_000_digit /= 10;
364            }
365            while temp_result.len() < 4 {
366                temp_result.push(0);
367            }
368            u8_digits.extend(temp_result);
369        }
370        u8_digits.reverse();
371
372        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
373        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
374        u8_digits.resize(size as usize, 0);
375
376        let sign = match sign {
377            0x4000 => Sign::Minus,
378            0x0000 => Sign::Plus,
379            _ => {
380                return Err(Box::new(DataFusionError::Execution(
381                    "Failed to parse big decimal from postgres numeric value".to_string(),
382                )));
383            }
384        };
385
386        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
387            return Err(Box::new(DataFusionError::Execution(
388                "Failed to parse big decimal from postgres numeric value".to_string(),
389            )));
390        };
391        Ok(BigDecimalFromSql {
392            inner: BigDecimal::new(digits, i64::from(scale)),
393            scale,
394        })
395    }
396
397    fn accepts(ty: &Type) -> bool {
398        matches!(*ty, Type::NUMERIC)
399    }
400}
401
402// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
403// interval values are internally stored as three integral fields: months, days, and microseconds
404#[derive(Debug)]
405struct IntervalFromSql {
406    time: i64,
407    day: i32,
408    month: i32,
409}
410
411impl<'a> FromSql<'a> for IntervalFromSql {
412    fn from_sql(
413        _ty: &Type,
414        raw: &'a [u8],
415    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
416        let mut cursor = std::io::Cursor::new(raw);
417
418        let time = cursor.read_i64::<BigEndian>()?;
419        let day = cursor.read_i32::<BigEndian>()?;
420        let month = cursor.read_i32::<BigEndian>()?;
421
422        Ok(IntervalFromSql { time, day, month })
423    }
424
425    fn accepts(ty: &Type) -> bool {
426        matches!(*ty, Type::INTERVAL)
427    }
428}
429
430struct GeometryFromSql<'a> {
431    wkb: &'a [u8],
432}
433
434impl<'a> FromSql<'a> for GeometryFromSql<'a> {
435    fn from_sql(
436        _ty: &Type,
437        raw: &'a [u8],
438    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
439        Ok(GeometryFromSql { wkb: raw })
440    }
441
442    fn accepts(ty: &Type) -> bool {
443        matches!(ty.name(), "geometry")
444    }
445}
446
447struct XmlFromSql<'a> {
448    xml: &'a str,
449}
450
451impl<'a> FromSql<'a> for XmlFromSql<'a> {
452    fn from_sql(
453        _ty: &Type,
454        raw: &'a [u8],
455    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
456        let xml = str::from_utf8(raw)?;
457        Ok(XmlFromSql { xml })
458    }
459
460    fn accepts(ty: &Type) -> bool {
461        matches!(*ty, Type::XML)
462    }
463}
464
465fn rows_to_batch(
466    rows: &[Row],
467    table_schema: &SchemaRef,
468    projection: Option<&Vec<usize>>,
469) -> DFResult<RecordBatch> {
470    let projected_schema = project_schema(table_schema, projection)?;
471    let mut array_builders = vec![];
472    for field in table_schema.fields() {
473        let builder = make_builder(field.data_type(), rows.len());
474        array_builders.push(builder);
475    }
476
477    for row in rows {
478        for (idx, field) in table_schema.fields.iter().enumerate() {
479            if !projections_contains(projection, idx) {
480                continue;
481            }
482            let builder = &mut array_builders[idx];
483            let col = row.columns().get(idx);
484            match field.data_type() {
485                DataType::Int16 => {
486                    handle_primitive_type!(
487                        builder,
488                        field,
489                        col,
490                        Int16Builder,
491                        i16,
492                        row,
493                        idx,
494                        just_return
495                    );
496                }
497                DataType::Int32 => {
498                    handle_primitive_type!(
499                        builder,
500                        field,
501                        col,
502                        Int32Builder,
503                        i32,
504                        row,
505                        idx,
506                        just_return
507                    );
508                }
509                DataType::UInt32 => {
510                    handle_primitive_type!(
511                        builder,
512                        field,
513                        col,
514                        UInt32Builder,
515                        u32,
516                        row,
517                        idx,
518                        just_return
519                    );
520                }
521                DataType::Int64 => {
522                    handle_primitive_type!(
523                        builder,
524                        field,
525                        col,
526                        Int64Builder,
527                        i64,
528                        row,
529                        idx,
530                        just_return
531                    );
532                }
533                DataType::Float32 => {
534                    handle_primitive_type!(
535                        builder,
536                        field,
537                        col,
538                        Float32Builder,
539                        f32,
540                        row,
541                        idx,
542                        just_return
543                    );
544                }
545                DataType::Float64 => {
546                    handle_primitive_type!(
547                        builder,
548                        field,
549                        col,
550                        Float64Builder,
551                        f64,
552                        row,
553                        idx,
554                        just_return
555                    );
556                }
557                DataType::Decimal128(_precision, _scale) => {
558                    handle_primitive_type!(
559                        builder,
560                        field,
561                        col,
562                        Decimal128Builder,
563                        BigDecimalFromSql,
564                        row,
565                        idx,
566                        |v: BigDecimalFromSql| {
567                            v.to_decimal_128().ok_or_else(|| {
568                                DataFusionError::Execution(format!(
569                                    "Failed to convert BigDecimal {v:?} to i128",
570                                ))
571                            })
572                        }
573                    );
574                }
575                DataType::Utf8 => {
576                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("xml") {
577                        let convert: for<'a> fn(XmlFromSql<'a>) -> DFResult<&'a str> =
578                            |v| Ok(v.xml);
579                        handle_primitive_type!(
580                            builder,
581                            field,
582                            col,
583                            StringBuilder,
584                            XmlFromSql,
585                            row,
586                            idx,
587                            convert
588                        );
589                    } else {
590                        handle_primitive_type!(
591                            builder,
592                            field,
593                            col,
594                            StringBuilder,
595                            &str,
596                            row,
597                            idx,
598                            just_return
599                        );
600                    }
601                }
602                DataType::LargeUtf8 => {
603                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
604                        handle_primitive_type!(
605                            builder,
606                            field,
607                            col,
608                            LargeStringBuilder,
609                            serde_json::value::Value,
610                            row,
611                            idx,
612                            |v: serde_json::value::Value| {
613                                Ok::<_, DataFusionError>(v.to_string())
614                            }
615                        );
616                    } else {
617                        handle_primitive_type!(
618                            builder,
619                            field,
620                            col,
621                            LargeStringBuilder,
622                            &str,
623                            row,
624                            idx,
625                            just_return
626                        );
627                    }
628                }
629                DataType::Binary => {
630                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
631                    {
632                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
633                            |v| Ok(v.wkb);
634                        handle_primitive_type!(
635                            builder,
636                            field,
637                            col,
638                            BinaryBuilder,
639                            GeometryFromSql,
640                            row,
641                            idx,
642                            convert
643                        );
644                    } else if col.is_some()
645                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
646                    {
647                        handle_primitive_type!(
648                            builder,
649                            field,
650                            col,
651                            BinaryBuilder,
652                            serde_json::value::Value,
653                            row,
654                            idx,
655                            |v: serde_json::value::Value| {
656                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
657                            }
658                        );
659                    } else {
660                        handle_primitive_type!(
661                            builder,
662                            field,
663                            col,
664                            BinaryBuilder,
665                            Vec<u8>,
666                            row,
667                            idx,
668                            just_return
669                        );
670                    }
671                }
672                DataType::FixedSizeBinary(_) => {
673                    let builder = builder
674                        .as_any_mut()
675                        .downcast_mut::<FixedSizeBinaryBuilder>()
676                        .unwrap_or_else(|| {
677                            panic!(
678                                "Failed to downcast builder to FixedSizeBinaryBuilder for {field:?}"
679                            )
680                        });
681                    let v = if col.is_some()
682                        && col.unwrap().type_().name().eq_ignore_ascii_case("uuid")
683                    {
684                        let v: Option<Uuid> = row.try_get(idx).map_err(|e| {
685                            DataFusionError::Execution(format!(
686                                "Failed to get Uuid value for field {:?}: {e:?}",
687                                field
688                            ))
689                        })?;
690                        v.map(|v| v.as_bytes().to_vec())
691                    } else {
692                        let v: Option<Vec<u8>> = row.try_get(idx).map_err(|e| {
693                            DataFusionError::Execution(format!(
694                                "Failed to get FixedSizeBinary value for field {:?}: {e:?}",
695                                field
696                            ))
697                        })?;
698                        v
699                    };
700
701                    match v {
702                        Some(v) => builder.append_value(v)?,
703                        None => builder.append_null(),
704                    }
705                }
706                DataType::Timestamp(TimeUnit::Microsecond, None) => {
707                    handle_primitive_type!(
708                        builder,
709                        field,
710                        col,
711                        TimestampMicrosecondBuilder,
712                        SystemTime,
713                        row,
714                        idx,
715                        |v: SystemTime| {
716                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
717                                let timestamp: i64 = v
718                                    .as_micros()
719                                    .try_into()
720                                    .expect("Failed to convert SystemTime to i64");
721                                Ok(timestamp)
722                            } else {
723                                Err(DataFusionError::Execution(format!(
724                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
725                                )))
726                            }
727                        }
728                    );
729                }
730                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
731                    handle_primitive_type!(
732                        builder,
733                        field,
734                        col,
735                        TimestampNanosecondBuilder,
736                        SystemTime,
737                        row,
738                        idx,
739                        |v: SystemTime| {
740                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
741                                let timestamp: i64 = v
742                                    .as_nanos()
743                                    .try_into()
744                                    .expect("Failed to convert SystemTime to i64");
745                                Ok(timestamp)
746                            } else {
747                                Err(DataFusionError::Execution(format!(
748                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
749                                )))
750                            }
751                        }
752                    );
753                }
754                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
755                    handle_primitive_type!(
756                        builder,
757                        field,
758                        col,
759                        TimestampNanosecondBuilder,
760                        chrono::DateTime<chrono::Utc>,
761                        row,
762                        idx,
763                        |v: chrono::DateTime<chrono::Utc>| {
764                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
765                            Ok::<_, DataFusionError>(timestamp)
766                        }
767                    );
768                }
769                DataType::Time64(TimeUnit::Microsecond) => {
770                    handle_primitive_type!(
771                        builder,
772                        field,
773                        col,
774                        Time64MicrosecondBuilder,
775                        chrono::NaiveTime,
776                        row,
777                        idx,
778                        |v: chrono::NaiveTime| {
779                            let seconds = i64::from(v.num_seconds_from_midnight());
780                            let microseconds = i64::from(v.nanosecond()) / 1000;
781                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
782                        }
783                    );
784                }
785                DataType::Time64(TimeUnit::Nanosecond) => {
786                    handle_primitive_type!(
787                        builder,
788                        field,
789                        col,
790                        Time64NanosecondBuilder,
791                        chrono::NaiveTime,
792                        row,
793                        idx,
794                        |v: chrono::NaiveTime| {
795                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
796                                * 1_000_000_000
797                                + i64::from(v.nanosecond());
798                            Ok::<_, DataFusionError>(timestamp)
799                        }
800                    );
801                }
802                DataType::Date32 => {
803                    handle_primitive_type!(
804                        builder,
805                        field,
806                        col,
807                        Date32Builder,
808                        chrono::NaiveDate,
809                        row,
810                        idx,
811                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
812                    );
813                }
814                DataType::Interval(IntervalUnit::MonthDayNano) => {
815                    handle_primitive_type!(
816                        builder,
817                        field,
818                        col,
819                        IntervalMonthDayNanoBuilder,
820                        IntervalFromSql,
821                        row,
822                        idx,
823                        |v: IntervalFromSql| {
824                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
825                                v.month,
826                                v.day,
827                                v.time * 1_000,
828                            );
829                            Ok::<_, DataFusionError>(interval_month_day_nano)
830                        }
831                    );
832                }
833                DataType::Boolean => {
834                    handle_primitive_type!(
835                        builder,
836                        field,
837                        col,
838                        BooleanBuilder,
839                        bool,
840                        row,
841                        idx,
842                        just_return
843                    );
844                }
845                DataType::List(inner) => match inner.data_type() {
846                    DataType::Int16 => {
847                        handle_primitive_array_type!(
848                            builder,
849                            field,
850                            col,
851                            Int16Builder,
852                            i16,
853                            row,
854                            idx
855                        );
856                    }
857                    DataType::Int32 => {
858                        handle_primitive_array_type!(
859                            builder,
860                            field,
861                            col,
862                            Int32Builder,
863                            i32,
864                            row,
865                            idx
866                        );
867                    }
868                    DataType::Int64 => {
869                        handle_primitive_array_type!(
870                            builder,
871                            field,
872                            col,
873                            Int64Builder,
874                            i64,
875                            row,
876                            idx
877                        );
878                    }
879                    DataType::Float32 => {
880                        handle_primitive_array_type!(
881                            builder,
882                            field,
883                            col,
884                            Float32Builder,
885                            f32,
886                            row,
887                            idx
888                        );
889                    }
890                    DataType::Float64 => {
891                        handle_primitive_array_type!(
892                            builder,
893                            field,
894                            col,
895                            Float64Builder,
896                            f64,
897                            row,
898                            idx
899                        );
900                    }
901                    DataType::Utf8 => {
902                        handle_primitive_array_type!(
903                            builder,
904                            field,
905                            col,
906                            StringBuilder,
907                            &str,
908                            row,
909                            idx
910                        );
911                    }
912                    DataType::Binary => {
913                        handle_primitive_array_type!(
914                            builder,
915                            field,
916                            col,
917                            BinaryBuilder,
918                            Vec<u8>,
919                            row,
920                            idx
921                        );
922                    }
923                    DataType::Boolean => {
924                        handle_primitive_array_type!(
925                            builder,
926                            field,
927                            col,
928                            BooleanBuilder,
929                            bool,
930                            row,
931                            idx
932                        );
933                    }
934                    _ => {
935                        return Err(DataFusionError::NotImplemented(format!(
936                            "Unsupported list data type {} for col: {:?}",
937                            field.data_type(),
938                            col
939                        )));
940                    }
941                },
942                _ => {
943                    return Err(DataFusionError::NotImplemented(format!(
944                        "Unsupported data type {} for col: {:?}",
945                        field.data_type(),
946                        col
947                    )));
948                }
949            }
950        }
951    }
952    let projected_columns = array_builders
953        .into_iter()
954        .enumerate()
955        .filter(|(idx, _)| projections_contains(projection, *idx))
956        .map(|(_, mut builder)| builder.finish())
957        .collect::<Vec<ArrayRef>>();
958    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
959    Ok(RecordBatch::try_new_with_options(
960        projected_schema,
961        projected_columns,
962        &options,
963    )?)
964}