datafusion_remote_table/connection/
postgres.rs

1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use num_bigint::{BigInt, Sign};
30use std::string::ToString;
31use std::sync::Arc;
32use std::time::{SystemTime, UNIX_EPOCH};
33
34#[derive(Debug, Clone, With, Getters)]
35pub struct PostgresConnectionOptions {
36    pub(crate) host: String,
37    pub(crate) port: u16,
38    pub(crate) username: String,
39    pub(crate) password: String,
40    pub(crate) database: Option<String>,
41    pub(crate) pool_max_size: usize,
42    pub(crate) stream_chunk_size: usize,
43}
44
45impl PostgresConnectionOptions {
46    pub fn new(
47        host: impl Into<String>,
48        port: u16,
49        username: impl Into<String>,
50        password: impl Into<String>,
51    ) -> Self {
52        Self {
53            host: host.into(),
54            port,
55            username: username.into(),
56            password: password.into(),
57            database: None,
58            pool_max_size: 10,
59            stream_chunk_size: 2048,
60        }
61    }
62}
63
64#[derive(Debug)]
65pub struct PostgresPool {
66    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
67}
68
69#[async_trait::async_trait]
70impl Pool for PostgresPool {
71    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
72        let conn = self.pool.get_owned().await.map_err(|e| {
73            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
74        })?;
75        Ok(Arc::new(PostgresConnection { conn }))
76    }
77}
78
79pub(crate) async fn connect_postgres(
80    options: &PostgresConnectionOptions,
81) -> DFResult<PostgresPool> {
82    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
83    config
84        .host(&options.host)
85        .port(options.port)
86        .user(&options.username)
87        .password(&options.password);
88    if let Some(database) = &options.database {
89        config.dbname(database);
90    }
91    let manager = PostgresConnectionManager::new(config, NoTls);
92    let pool = bb8::Pool::builder()
93        .max_size(options.pool_max_size as u32)
94        .build(manager)
95        .await
96        .map_err(|e| {
97            DataFusionError::Execution(format!(
98                "Failed to create postgres connection pool due to {e}",
99            ))
100        })?;
101
102    Ok(PostgresPool { pool })
103}
104
105#[derive(Debug)]
106pub(crate) struct PostgresConnection {
107    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
108}
109
110#[async_trait::async_trait]
111impl Connection for PostgresConnection {
112    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
113        let sql = try_limit1_query(sql).unwrap_or_else(|| sql.to_string());
114        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
115            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
116        })?;
117        let remote_schema = Arc::new(build_remote_schema(&row)?);
118        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
119        Ok((remote_schema, arrow_schema))
120    }
121
122    async fn query(
123        &self,
124        conn_options: &ConnectionOptions,
125        sql: &str,
126        table_schema: SchemaRef,
127        projection: Option<&Vec<usize>>,
128    ) -> DFResult<SendableRecordBatchStream> {
129        let projected_schema = project_schema(&table_schema, projection)?;
130        let projection = projection.cloned();
131        let chunk_size = conn_options.stream_chunk_size();
132        let stream = self
133            .conn
134            .query_raw(sql, Vec::<String>::new())
135            .await
136            .map_err(|e| {
137                DataFusionError::Execution(format!(
138                    "Failed to execute query {sql} on postgres due to {e}",
139                ))
140            })?
141            .chunks(chunk_size)
142            .boxed();
143
144        let stream = stream.map(move |rows| {
145            let rows: Vec<Row> = rows
146                .into_iter()
147                .collect::<Result<Vec<_>, _>>()
148                .map_err(|e| {
149                    DataFusionError::Execution(format!(
150                        "Failed to collect rows from postgres due to {e}",
151                    ))
152                })?;
153            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
154        });
155
156        Ok(Box::pin(RecordBatchStreamAdapter::new(
157            projected_schema,
158            stream,
159        )))
160    }
161}
162
/// Wraps a `SELECT` statement in a `LIMIT 1` subquery so that at most one row
/// is fetched.
///
/// Returns `None` when `sql`, ignoring surrounding whitespace, does not start
/// with `select` (ASCII case-insensitive). That includes inputs shorter than
/// the keyword. The statement is embedded exactly as given.
fn try_limit1_query(sql: &str) -> Option<String> {
    // `get` instead of slice indexing: indexing panics when the trimmed text
    // is shorter than six bytes or byte six is not a character boundary.
    let starts_with_select = sql
        .trim()
        .get(0..6)
        .is_some_and(|prefix| prefix.eq_ignore_ascii_case("select"));
    starts_with_select.then(|| format!("SELECT * FROM ({sql}) as __subquery LIMIT 1"))
}
170
171fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
172    match pg_type {
173        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
174        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
175        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
176        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
177        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
178        &Type::NUMERIC => {
179            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
180                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
181            })?;
182            let scale = match v {
183                Some(v) => v.scale,
184                None => 0,
185            };
186            assert!((scale as u32) <= (i8::MAX as u32));
187            Ok(RemoteType::Postgres(PostgresType::Numeric(
188                scale.try_into().unwrap_or_default(),
189            )))
190        }
191        &Type::OID => Ok(RemoteType::Postgres(PostgresType::Oid)),
192        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
193        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
194        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
195        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
196        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
197        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
198        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
199        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
200        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
201        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
202        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
203        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
204        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
205        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
206        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
207        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
208        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
209        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
210        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
211        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
212        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
213        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
214        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
215        other if other.name().eq_ignore_ascii_case("geometry") => {
216            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
217        }
218        _ => Err(DataFusionError::NotImplemented(format!(
219            "Unsupported postgres type {pg_type:?}",
220        ))),
221    }
222}
223
224fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
225    let mut remote_fields = vec![];
226    for (idx, col) in row.columns().iter().enumerate() {
227        remote_fields.push(RemoteField::new(
228            col.name(),
229            pg_type_to_remote_type(col.type_(), row, idx)?,
230            true,
231        ));
232    }
233    Ok(RemoteSchema::new(remote_fields))
234}
235
/// Appends the value of one row cell to a typed array builder.
///
/// - `$builder`: dynamically typed builder, downcast to `$builder_ty`.
/// - `$field`, `$col`: only used in diagnostics, formatted with `{:?}`.
/// - `$value_ty`: type the cell at `$index` of `$row` is read as.
/// - `$convert`: fallible conversion from `$value_ty` to the builder's value.
///
/// A `NULL` cell appends a null. Read and conversion failures are propagated
/// with `?`, so the macro must be used inside a function returning a
/// compatible `Result`. Panics when `$builder` is not a `$builder_ty`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            )
        };
        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?)
        } else {
            typed_builder.append_null()
        }
    }};
}
264
/// Appends one array-valued row cell to a list builder.
///
/// - `$builder`: dynamically typed builder, downcast to
///   `ListBuilder<Box<dyn ArrayBuilder>>`.
/// - `$values_builder_ty`: concrete type of the list's values builder.
/// - `$primitive_value_ty`: element type; the cell at `$index` of `$row` is
///   read as `Option<Vec<$primitive_value_ty>>`.
/// - `$field`, `$col`: only used in diagnostics, formatted with `{:?}`.
///
/// A `NULL` cell appends a null list; otherwise every element is appended as
/// a non-null value and the list is closed. Read failures are propagated with
/// `?`. Panics when either builder has an unexpected type.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Must name this macro's own metavariable so the message
                    // shows the real type.
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
307
308#[derive(Debug)]
309struct BigDecimalFromSql {
310    inner: BigDecimal,
311    scale: u16,
312}
313
314impl BigDecimalFromSql {
315    fn to_decimal_128(&self) -> Option<i128> {
316        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
317    }
318}
319
320#[allow(clippy::cast_sign_loss)]
321#[allow(clippy::cast_possible_wrap)]
322#[allow(clippy::cast_possible_truncation)]
323impl<'a> FromSql<'a> for BigDecimalFromSql {
324    fn from_sql(
325        _ty: &Type,
326        raw: &'a [u8],
327    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
328        let raw_u16: Vec<u16> = raw
329            .chunks(2)
330            .map(|chunk| {
331                if chunk.len() == 2 {
332                    u16::from_be_bytes([chunk[0], chunk[1]])
333                } else {
334                    u16::from_be_bytes([chunk[0], 0])
335                }
336            })
337            .collect();
338
339        let base_10_000_digit_count = raw_u16[0];
340        let weight = raw_u16[1] as i16;
341        let sign = raw_u16[2];
342        let scale = raw_u16[3];
343
344        let mut base_10_000_digits = Vec::new();
345        for i in 4..4 + base_10_000_digit_count {
346            base_10_000_digits.push(raw_u16[i as usize]);
347        }
348
349        let mut u8_digits = Vec::new();
350        for &base_10_000_digit in base_10_000_digits.iter().rev() {
351            let mut base_10_000_digit = base_10_000_digit;
352            let mut temp_result = Vec::new();
353            while base_10_000_digit > 0 {
354                temp_result.push((base_10_000_digit % 10) as u8);
355                base_10_000_digit /= 10;
356            }
357            while temp_result.len() < 4 {
358                temp_result.push(0);
359            }
360            u8_digits.extend(temp_result);
361        }
362        u8_digits.reverse();
363
364        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
365        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
366        u8_digits.resize(size as usize, 0);
367
368        let sign = match sign {
369            0x4000 => Sign::Minus,
370            0x0000 => Sign::Plus,
371            _ => {
372                return Err(Box::new(DataFusionError::Execution(
373                    "Failed to parse big decimal from postgres numeric value".to_string(),
374                )));
375            }
376        };
377
378        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
379            return Err(Box::new(DataFusionError::Execution(
380                "Failed to parse big decimal from postgres numeric value".to_string(),
381            )));
382        };
383        Ok(BigDecimalFromSql {
384            inner: BigDecimal::new(digits, i64::from(scale)),
385            scale,
386        })
387    }
388
389    fn accepts(ty: &Type) -> bool {
390        matches!(*ty, Type::NUMERIC)
391    }
392}
393
394// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
395// interval values are internally stored as three integral fields: months, days, and microseconds
396#[derive(Debug)]
397struct IntervalFromSql {
398    time: i64,
399    day: i32,
400    month: i32,
401}
402
403impl<'a> FromSql<'a> for IntervalFromSql {
404    fn from_sql(
405        _ty: &Type,
406        raw: &'a [u8],
407    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
408        let mut cursor = std::io::Cursor::new(raw);
409
410        let time = cursor.read_i64::<BigEndian>()?;
411        let day = cursor.read_i32::<BigEndian>()?;
412        let month = cursor.read_i32::<BigEndian>()?;
413
414        Ok(IntervalFromSql { time, day, month })
415    }
416
417    fn accepts(ty: &Type) -> bool {
418        matches!(*ty, Type::INTERVAL)
419    }
420}
421
422struct GeometryFromSql<'a> {
423    wkb: &'a [u8],
424}
425
426impl<'a> FromSql<'a> for GeometryFromSql<'a> {
427    fn from_sql(
428        _ty: &Type,
429        raw: &'a [u8],
430    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
431        Ok(GeometryFromSql { wkb: raw })
432    }
433
434    fn accepts(ty: &Type) -> bool {
435        matches!(ty.name(), "geometry")
436    }
437}
438
439fn rows_to_batch(
440    rows: &[Row],
441    table_schema: &SchemaRef,
442    projection: Option<&Vec<usize>>,
443) -> DFResult<RecordBatch> {
444    let projected_schema = project_schema(table_schema, projection)?;
445    let mut array_builders = vec![];
446    for field in table_schema.fields() {
447        let builder = make_builder(field.data_type(), rows.len());
448        array_builders.push(builder);
449    }
450
451    for row in rows {
452        for (idx, field) in table_schema.fields.iter().enumerate() {
453            if !projections_contains(projection, idx) {
454                continue;
455            }
456            let builder = &mut array_builders[idx];
457            let col = row.columns().get(idx);
458            match field.data_type() {
459                DataType::Int16 => {
460                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
461                        Ok::<_, DataFusionError>(v)
462                    });
463                }
464                DataType::Int32 => {
465                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
466                        Ok::<_, DataFusionError>(v)
467                    });
468                }
469                DataType::UInt32 => {
470                    handle_primitive_type!(
471                        builder,
472                        field,
473                        col,
474                        UInt32Builder,
475                        u32,
476                        row,
477                        idx,
478                        |v| { Ok::<_, DataFusionError>(v) }
479                    );
480                }
481                DataType::Int64 => {
482                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
483                        Ok::<_, DataFusionError>(v)
484                    });
485                }
486                DataType::Float32 => {
487                    handle_primitive_type!(
488                        builder,
489                        field,
490                        col,
491                        Float32Builder,
492                        f32,
493                        row,
494                        idx,
495                        |v| { Ok::<_, DataFusionError>(v) }
496                    );
497                }
498                DataType::Float64 => {
499                    handle_primitive_type!(
500                        builder,
501                        field,
502                        col,
503                        Float64Builder,
504                        f64,
505                        row,
506                        idx,
507                        |v| { Ok::<_, DataFusionError>(v) }
508                    );
509                }
510                DataType::Decimal128(_precision, _scale) => {
511                    handle_primitive_type!(
512                        builder,
513                        field,
514                        col,
515                        Decimal128Builder,
516                        BigDecimalFromSql,
517                        row,
518                        idx,
519                        |v: BigDecimalFromSql| {
520                            v.to_decimal_128().ok_or_else(|| {
521                                DataFusionError::Execution(format!(
522                                    "Failed to convert BigDecimal {v:?} to i128",
523                                ))
524                            })
525                        }
526                    );
527                }
528                DataType::Utf8 => {
529                    handle_primitive_type!(
530                        builder,
531                        field,
532                        col,
533                        StringBuilder,
534                        &str,
535                        row,
536                        idx,
537                        |v| { Ok::<_, DataFusionError>(v) }
538                    );
539                }
540                DataType::LargeUtf8 => {
541                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
542                        handle_primitive_type!(
543                            builder,
544                            field,
545                            col,
546                            LargeStringBuilder,
547                            serde_json::value::Value,
548                            row,
549                            idx,
550                            |v: serde_json::value::Value| {
551                                Ok::<_, DataFusionError>(v.to_string())
552                            }
553                        );
554                    } else {
555                        handle_primitive_type!(
556                            builder,
557                            field,
558                            col,
559                            LargeStringBuilder,
560                            &str,
561                            row,
562                            idx,
563                            |v| { Ok::<_, DataFusionError>(v) }
564                        );
565                    }
566                }
567                DataType::Binary => {
568                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
569                    {
570                        handle_primitive_type!(
571                            builder,
572                            field,
573                            col,
574                            BinaryBuilder,
575                            GeometryFromSql,
576                            row,
577                            idx,
578                            |v: GeometryFromSql| { Ok::<_, DataFusionError>(v.wkb.to_vec()) }
579                        );
580                    } else if col.is_some()
581                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
582                    {
583                        handle_primitive_type!(
584                            builder,
585                            field,
586                            col,
587                            BinaryBuilder,
588                            serde_json::value::Value,
589                            row,
590                            idx,
591                            |v: serde_json::value::Value| {
592                                Ok::<_, DataFusionError>(v.to_string().as_bytes().to_vec())
593                            }
594                        );
595                    } else {
596                        handle_primitive_type!(
597                            builder,
598                            field,
599                            col,
600                            BinaryBuilder,
601                            Vec<u8>,
602                            row,
603                            idx,
604                            |v| { Ok::<_, DataFusionError>(v) }
605                        );
606                    }
607                }
608                DataType::Timestamp(TimeUnit::Microsecond, None) => {
609                    handle_primitive_type!(
610                        builder,
611                        field,
612                        col,
613                        TimestampMicrosecondBuilder,
614                        SystemTime,
615                        row,
616                        idx,
617                        |v: SystemTime| {
618                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
619                                let timestamp: i64 = v
620                                    .as_micros()
621                                    .try_into()
622                                    .expect("Failed to convert SystemTime to i64");
623                                Ok(timestamp)
624                            } else {
625                                Err(DataFusionError::Execution(format!(
626                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
627                                )))
628                            }
629                        }
630                    );
631                }
632                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
633                    handle_primitive_type!(
634                        builder,
635                        field,
636                        col,
637                        TimestampNanosecondBuilder,
638                        SystemTime,
639                        row,
640                        idx,
641                        |v: SystemTime| {
642                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
643                                let timestamp: i64 = v
644                                    .as_nanos()
645                                    .try_into()
646                                    .expect("Failed to convert SystemTime to i64");
647                                Ok(timestamp)
648                            } else {
649                                Err(DataFusionError::Execution(format!(
650                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
651                                )))
652                            }
653                        }
654                    );
655                }
656                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
657                    handle_primitive_type!(
658                        builder,
659                        field,
660                        col,
661                        TimestampNanosecondBuilder,
662                        chrono::DateTime<chrono::Utc>,
663                        row,
664                        idx,
665                        |v: chrono::DateTime<chrono::Utc>| {
666                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
667                            Ok::<_, DataFusionError>(timestamp)
668                        }
669                    );
670                }
671                DataType::Time64(TimeUnit::Microsecond) => {
672                    handle_primitive_type!(
673                        builder,
674                        field,
675                        col,
676                        Time64MicrosecondBuilder,
677                        chrono::NaiveTime,
678                        row,
679                        idx,
680                        |v: chrono::NaiveTime| {
681                            let seconds = i64::from(v.num_seconds_from_midnight());
682                            let microseconds = i64::from(v.nanosecond()) / 1000;
683                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
684                        }
685                    );
686                }
687                DataType::Time64(TimeUnit::Nanosecond) => {
688                    handle_primitive_type!(
689                        builder,
690                        field,
691                        col,
692                        Time64NanosecondBuilder,
693                        chrono::NaiveTime,
694                        row,
695                        idx,
696                        |v: chrono::NaiveTime| {
697                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
698                                * 1_000_000_000
699                                + i64::from(v.nanosecond());
700                            Ok::<_, DataFusionError>(timestamp)
701                        }
702                    );
703                }
704                DataType::Date32 => {
705                    handle_primitive_type!(
706                        builder,
707                        field,
708                        col,
709                        Date32Builder,
710                        chrono::NaiveDate,
711                        row,
712                        idx,
713                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
714                    );
715                }
716                DataType::Interval(IntervalUnit::MonthDayNano) => {
717                    handle_primitive_type!(
718                        builder,
719                        field,
720                        col,
721                        IntervalMonthDayNanoBuilder,
722                        IntervalFromSql,
723                        row,
724                        idx,
725                        |v: IntervalFromSql| {
726                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
727                                v.month,
728                                v.day,
729                                v.time * 1_000,
730                            );
731                            Ok::<_, DataFusionError>(interval_month_day_nano)
732                        }
733                    );
734                }
735                DataType::Boolean => {
736                    handle_primitive_type!(
737                        builder,
738                        field,
739                        col,
740                        BooleanBuilder,
741                        bool,
742                        row,
743                        idx,
744                        |v| { Ok::<_, DataFusionError>(v) }
745                    );
746                }
747                DataType::List(inner) => match inner.data_type() {
748                    DataType::Int16 => {
749                        handle_primitive_array_type!(
750                            builder,
751                            field,
752                            col,
753                            Int16Builder,
754                            i16,
755                            row,
756                            idx
757                        );
758                    }
759                    DataType::Int32 => {
760                        handle_primitive_array_type!(
761                            builder,
762                            field,
763                            col,
764                            Int32Builder,
765                            i32,
766                            row,
767                            idx
768                        );
769                    }
770                    DataType::Int64 => {
771                        handle_primitive_array_type!(
772                            builder,
773                            field,
774                            col,
775                            Int64Builder,
776                            i64,
777                            row,
778                            idx
779                        );
780                    }
781                    DataType::Float32 => {
782                        handle_primitive_array_type!(
783                            builder,
784                            field,
785                            col,
786                            Float32Builder,
787                            f32,
788                            row,
789                            idx
790                        );
791                    }
792                    DataType::Float64 => {
793                        handle_primitive_array_type!(
794                            builder,
795                            field,
796                            col,
797                            Float64Builder,
798                            f64,
799                            row,
800                            idx
801                        );
802                    }
803                    DataType::Utf8 => {
804                        handle_primitive_array_type!(
805                            builder,
806                            field,
807                            col,
808                            StringBuilder,
809                            &str,
810                            row,
811                            idx
812                        );
813                    }
814                    DataType::Binary => {
815                        handle_primitive_array_type!(
816                            builder,
817                            field,
818                            col,
819                            BinaryBuilder,
820                            Vec<u8>,
821                            row,
822                            idx
823                        );
824                    }
825                    DataType::Boolean => {
826                        handle_primitive_array_type!(
827                            builder,
828                            field,
829                            col,
830                            BooleanBuilder,
831                            bool,
832                            row,
833                            idx
834                        );
835                    }
836                    _ => {
837                        return Err(DataFusionError::NotImplemented(format!(
838                            "Unsupported list data type {:?} for col: {:?}",
839                            field.data_type(),
840                            col
841                        )));
842                    }
843                },
844                _ => {
845                    return Err(DataFusionError::NotImplemented(format!(
846                        "Unsupported data type {:?} for col: {:?}",
847                        field.data_type(),
848                        col
849                    )));
850                }
851            }
852        }
853    }
854    let projected_columns = array_builders
855        .into_iter()
856        .enumerate()
857        .filter(|(idx, _)| projections_contains(projection, *idx))
858        .map(|(_, mut builder)| builder.finish())
859        .collect::<Vec<ArrayRef>>();
860    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
861}