datafusion_remote_table/connection/
postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17    TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::prelude::Expr;
27use derive_getters::Getters;
28use derive_with::With;
29use futures::StreamExt;
30use num_bigint::{BigInt, Sign};
31use std::string::ToString;
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35#[derive(Debug, Clone, With, Getters)]
36pub struct PostgresConnectionOptions {
37    pub(crate) host: String,
38    pub(crate) port: u16,
39    pub(crate) username: String,
40    pub(crate) password: String,
41    pub(crate) database: Option<String>,
42    pub(crate) pool_max_size: usize,
43    pub(crate) stream_chunk_size: usize,
44}
45
46impl PostgresConnectionOptions {
47    pub fn new(
48        host: impl Into<String>,
49        port: u16,
50        username: impl Into<String>,
51        password: impl Into<String>,
52    ) -> Self {
53        Self {
54            host: host.into(),
55            port,
56            username: username.into(),
57            password: password.into(),
58            database: None,
59            pool_max_size: 10,
60            stream_chunk_size: 2048,
61        }
62    }
63}
64
65#[derive(Debug)]
66pub struct PostgresPool {
67    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
68}
69
70#[async_trait::async_trait]
71impl Pool for PostgresPool {
72    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
73        let conn = self.pool.get_owned().await.map_err(|e| {
74            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
75        })?;
76        Ok(Arc::new(PostgresConnection { conn }))
77    }
78}
79
80pub(crate) async fn connect_postgres(
81    options: &PostgresConnectionOptions,
82) -> DFResult<PostgresPool> {
83    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
84    config
85        .host(&options.host)
86        .port(options.port)
87        .user(&options.username)
88        .password(&options.password);
89    if let Some(database) = &options.database {
90        config.dbname(database);
91    }
92    let manager = PostgresConnectionManager::new(config, NoTls);
93    let pool = bb8::Pool::builder()
94        .max_size(options.pool_max_size as u32)
95        .build(manager)
96        .await
97        .map_err(|e| {
98            DataFusionError::Execution(format!(
99                "Failed to create postgres connection pool due to {e}",
100            ))
101        })?;
102
103    Ok(PostgresPool { pool })
104}
105
106#[derive(Debug)]
107pub(crate) struct PostgresConnection {
108    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
109}
110
111#[async_trait::async_trait]
112impl Connection for PostgresConnection {
113    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
114        let sql = RemoteDbType::Postgres
115            .try_rewrite_query(sql, &[], Some(1))
116            .unwrap_or_else(|| sql.to_string());
117        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
118            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
119        })?;
120        let remote_schema = Arc::new(build_remote_schema(&row)?);
121        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
122        Ok((remote_schema, arrow_schema))
123    }
124
125    async fn query(
126        &self,
127        conn_options: &ConnectionOptions,
128        sql: &str,
129        table_schema: SchemaRef,
130        projection: Option<&Vec<usize>>,
131        filters: &[Expr],
132        limit: Option<usize>,
133    ) -> DFResult<SendableRecordBatchStream> {
134        let projected_schema = project_schema(&table_schema, projection)?;
135        let sql = RemoteDbType::Postgres
136            .try_rewrite_query(sql, filters, limit)
137            .unwrap_or_else(|| sql.to_string());
138        let projection = projection.cloned();
139        let chunk_size = conn_options.stream_chunk_size();
140        let stream = self
141            .conn
142            .query_raw(&sql, Vec::<String>::new())
143            .await
144            .map_err(|e| {
145                DataFusionError::Execution(format!(
146                    "Failed to execute query {sql} on postgres: {e}",
147                ))
148            })?
149            .chunks(chunk_size)
150            .boxed();
151
152        let stream = stream.map(move |rows| {
153            let rows: Vec<Row> = rows
154                .into_iter()
155                .collect::<Result<Vec<_>, _>>()
156                .map_err(|e| {
157                    DataFusionError::Execution(format!(
158                        "Failed to collect rows from postgres due to {e}",
159                    ))
160                })?;
161            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
162        });
163
164        Ok(Box::pin(RecordBatchStreamAdapter::new(
165            projected_schema,
166            stream,
167        )))
168    }
169}
170
171fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
172    match pg_type {
173        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
174        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
175        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
176        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
177        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
178        &Type::NUMERIC => {
179            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
180                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
181            })?;
182            let scale = match v {
183                Some(v) => v.scale,
184                None => 0,
185            };
186            assert!((scale as u32) <= (i8::MAX as u32));
187            Ok(RemoteType::Postgres(PostgresType::Numeric(
188                scale.try_into().unwrap_or_default(),
189            )))
190        }
191        &Type::OID => Ok(RemoteType::Postgres(PostgresType::Oid)),
192        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
193        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
194        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
195        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
196        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
197        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
198        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
199        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
200        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
201        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
202        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
203        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
204        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
205        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
206        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
207        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
208        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
209        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
210        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
211        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
212        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
213        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
214        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
215        other if other.name().eq_ignore_ascii_case("geometry") => {
216            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
217        }
218        _ => Err(DataFusionError::NotImplemented(format!(
219            "Unsupported postgres type {pg_type:?}",
220        ))),
221    }
222}
223
224fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
225    let mut remote_fields = vec![];
226    for (idx, col) in row.columns().iter().enumerate() {
227        remote_fields.push(RemoteField::new(
228            col.name(),
229            pg_type_to_remote_type(col.type_(), row, idx)?,
230            true,
231        ));
232    }
233    Ok(RemoteSchema::new(remote_fields))
234}
235
/// Appends the value of column `$index` in `$row` to `$builder`.
///
/// The builder is downcast to `$builder_ty`, the value is read as `Option<$value_ty>`,
/// and non-NULL values are passed through `$convert` (which returns a `Result` that is
/// propagated with `?`). NULLs are appended as nulls.
///
/// Panics if the downcast fails (the builder type is expected to match the field type).
/// Must be used inside a function returning `DFResult`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };
        let value: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = value {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
264
/// Appends the one-dimensional array in column `$index` of `$row` to a list builder.
///
/// `$builder` must be a `ListBuilder<Box<dyn ArrayBuilder>>` whose values builder is a
/// `$values_builder_ty`. The array is read as `Option<Vec<$primitive_value_ty>>`, so a
/// NULL array becomes a null list entry. A NULL *element* is presumably rejected by the
/// `Vec<T>` decoder and surfaces as an error (TODO confirm).
///
/// Panics if either downcast fails. Must be used inside a function returning `DFResult`.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Must name this macro's own metavariable; `$builder_ty` is not one here.
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
307
308#[derive(Debug)]
309struct BigDecimalFromSql {
310    inner: BigDecimal,
311    scale: u16,
312}
313
314impl BigDecimalFromSql {
315    fn to_decimal_128(&self) -> Option<i128> {
316        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
317    }
318}
319
320#[allow(clippy::cast_sign_loss)]
321#[allow(clippy::cast_possible_wrap)]
322#[allow(clippy::cast_possible_truncation)]
323impl<'a> FromSql<'a> for BigDecimalFromSql {
324    fn from_sql(
325        _ty: &Type,
326        raw: &'a [u8],
327    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
328        let raw_u16: Vec<u16> = raw
329            .chunks(2)
330            .map(|chunk| {
331                if chunk.len() == 2 {
332                    u16::from_be_bytes([chunk[0], chunk[1]])
333                } else {
334                    u16::from_be_bytes([chunk[0], 0])
335                }
336            })
337            .collect();
338
339        let base_10_000_digit_count = raw_u16[0];
340        let weight = raw_u16[1] as i16;
341        let sign = raw_u16[2];
342        let scale = raw_u16[3];
343
344        let mut base_10_000_digits = Vec::new();
345        for i in 4..4 + base_10_000_digit_count {
346            base_10_000_digits.push(raw_u16[i as usize]);
347        }
348
349        let mut u8_digits = Vec::new();
350        for &base_10_000_digit in base_10_000_digits.iter().rev() {
351            let mut base_10_000_digit = base_10_000_digit;
352            let mut temp_result = Vec::new();
353            while base_10_000_digit > 0 {
354                temp_result.push((base_10_000_digit % 10) as u8);
355                base_10_000_digit /= 10;
356            }
357            while temp_result.len() < 4 {
358                temp_result.push(0);
359            }
360            u8_digits.extend(temp_result);
361        }
362        u8_digits.reverse();
363
364        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
365        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
366        u8_digits.resize(size as usize, 0);
367
368        let sign = match sign {
369            0x4000 => Sign::Minus,
370            0x0000 => Sign::Plus,
371            _ => {
372                return Err(Box::new(DataFusionError::Execution(
373                    "Failed to parse big decimal from postgres numeric value".to_string(),
374                )));
375            }
376        };
377
378        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
379            return Err(Box::new(DataFusionError::Execution(
380                "Failed to parse big decimal from postgres numeric value".to_string(),
381            )));
382        };
383        Ok(BigDecimalFromSql {
384            inner: BigDecimal::new(digits, i64::from(scale)),
385            scale,
386        })
387    }
388
389    fn accepts(ty: &Type) -> bool {
390        matches!(*ty, Type::NUMERIC)
391    }
392}
393
/// A postgres `INTERVAL` value in its binary wire form.
///
/// Mirrors `interval_send` in the postgres C sources
/// (<https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032>):
/// an interval is stored as three integral fields, microseconds, days and months.
#[derive(Debug)]
struct IntervalFromSql {
    /// Time component in microseconds.
    time: i64,
    /// Day component.
    day: i32,
    /// Month component.
    month: i32,
}
402
403impl<'a> FromSql<'a> for IntervalFromSql {
404    fn from_sql(
405        _ty: &Type,
406        raw: &'a [u8],
407    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
408        let mut cursor = std::io::Cursor::new(raw);
409
410        let time = cursor.read_i64::<BigEndian>()?;
411        let day = cursor.read_i32::<BigEndian>()?;
412        let month = cursor.read_i32::<BigEndian>()?;
413
414        Ok(IntervalFromSql { time, day, month })
415    }
416
417    fn accepts(ty: &Type) -> bool {
418        matches!(*ty, Type::INTERVAL)
419    }
420}
421
/// Raw bytes of a PostGIS `geometry` value, borrowed from the row buffer.
struct GeometryFromSql<'a> {
    /// The value exactly as sent by the server, with no decoding. Presumably (E)WKB;
    /// confirm against PostGIS' binary output.
    wkb: &'a [u8],
}
425
426impl<'a> FromSql<'a> for GeometryFromSql<'a> {
427    fn from_sql(
428        _ty: &Type,
429        raw: &'a [u8],
430    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
431        Ok(GeometryFromSql { wkb: raw })
432    }
433
434    fn accepts(ty: &Type) -> bool {
435        matches!(ty.name(), "geometry")
436    }
437}
438
439fn rows_to_batch(
440    rows: &[Row],
441    table_schema: &SchemaRef,
442    projection: Option<&Vec<usize>>,
443) -> DFResult<RecordBatch> {
444    let projected_schema = project_schema(table_schema, projection)?;
445    let mut array_builders = vec![];
446    for field in table_schema.fields() {
447        let builder = make_builder(field.data_type(), rows.len());
448        array_builders.push(builder);
449    }
450
451    for row in rows {
452        for (idx, field) in table_schema.fields.iter().enumerate() {
453            if !projections_contains(projection, idx) {
454                continue;
455            }
456            let builder = &mut array_builders[idx];
457            let col = row.columns().get(idx);
458            match field.data_type() {
459                DataType::Int16 => {
460                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
461                        Ok::<_, DataFusionError>(v)
462                    });
463                }
464                DataType::Int32 => {
465                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
466                        Ok::<_, DataFusionError>(v)
467                    });
468                }
469                DataType::UInt32 => {
470                    handle_primitive_type!(
471                        builder,
472                        field,
473                        col,
474                        UInt32Builder,
475                        u32,
476                        row,
477                        idx,
478                        |v| { Ok::<_, DataFusionError>(v) }
479                    );
480                }
481                DataType::Int64 => {
482                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
483                        Ok::<_, DataFusionError>(v)
484                    });
485                }
486                DataType::Float32 => {
487                    handle_primitive_type!(
488                        builder,
489                        field,
490                        col,
491                        Float32Builder,
492                        f32,
493                        row,
494                        idx,
495                        |v| { Ok::<_, DataFusionError>(v) }
496                    );
497                }
498                DataType::Float64 => {
499                    handle_primitive_type!(
500                        builder,
501                        field,
502                        col,
503                        Float64Builder,
504                        f64,
505                        row,
506                        idx,
507                        |v| { Ok::<_, DataFusionError>(v) }
508                    );
509                }
510                DataType::Decimal128(_precision, _scale) => {
511                    handle_primitive_type!(
512                        builder,
513                        field,
514                        col,
515                        Decimal128Builder,
516                        BigDecimalFromSql,
517                        row,
518                        idx,
519                        |v: BigDecimalFromSql| {
520                            v.to_decimal_128().ok_or_else(|| {
521                                DataFusionError::Execution(format!(
522                                    "Failed to convert BigDecimal {v:?} to i128",
523                                ))
524                            })
525                        }
526                    );
527                }
528                DataType::Utf8 => {
529                    handle_primitive_type!(
530                        builder,
531                        field,
532                        col,
533                        StringBuilder,
534                        &str,
535                        row,
536                        idx,
537                        |v| { Ok::<_, DataFusionError>(v) }
538                    );
539                }
540                DataType::LargeUtf8 => {
541                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
542                        handle_primitive_type!(
543                            builder,
544                            field,
545                            col,
546                            LargeStringBuilder,
547                            serde_json::value::Value,
548                            row,
549                            idx,
550                            |v: serde_json::value::Value| {
551                                Ok::<_, DataFusionError>(v.to_string())
552                            }
553                        );
554                    } else {
555                        handle_primitive_type!(
556                            builder,
557                            field,
558                            col,
559                            LargeStringBuilder,
560                            &str,
561                            row,
562                            idx,
563                            |v| { Ok::<_, DataFusionError>(v) }
564                        );
565                    }
566                }
567                DataType::Binary => {
568                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
569                    {
570                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
571                            |v| Ok(v.wkb);
572                        handle_primitive_type!(
573                            builder,
574                            field,
575                            col,
576                            BinaryBuilder,
577                            GeometryFromSql,
578                            row,
579                            idx,
580                            convert
581                        );
582                    } else if col.is_some()
583                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
584                    {
585                        handle_primitive_type!(
586                            builder,
587                            field,
588                            col,
589                            BinaryBuilder,
590                            serde_json::value::Value,
591                            row,
592                            idx,
593                            |v: serde_json::value::Value| {
594                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
595                            }
596                        );
597                    } else {
598                        handle_primitive_type!(
599                            builder,
600                            field,
601                            col,
602                            BinaryBuilder,
603                            Vec<u8>,
604                            row,
605                            idx,
606                            |v| { Ok::<_, DataFusionError>(v) }
607                        );
608                    }
609                }
610                DataType::Timestamp(TimeUnit::Microsecond, None) => {
611                    handle_primitive_type!(
612                        builder,
613                        field,
614                        col,
615                        TimestampMicrosecondBuilder,
616                        SystemTime,
617                        row,
618                        idx,
619                        |v: SystemTime| {
620                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
621                                let timestamp: i64 = v
622                                    .as_micros()
623                                    .try_into()
624                                    .expect("Failed to convert SystemTime to i64");
625                                Ok(timestamp)
626                            } else {
627                                Err(DataFusionError::Execution(format!(
628                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
629                                )))
630                            }
631                        }
632                    );
633                }
634                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
635                    handle_primitive_type!(
636                        builder,
637                        field,
638                        col,
639                        TimestampNanosecondBuilder,
640                        SystemTime,
641                        row,
642                        idx,
643                        |v: SystemTime| {
644                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
645                                let timestamp: i64 = v
646                                    .as_nanos()
647                                    .try_into()
648                                    .expect("Failed to convert SystemTime to i64");
649                                Ok(timestamp)
650                            } else {
651                                Err(DataFusionError::Execution(format!(
652                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
653                                )))
654                            }
655                        }
656                    );
657                }
658                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
659                    handle_primitive_type!(
660                        builder,
661                        field,
662                        col,
663                        TimestampNanosecondBuilder,
664                        chrono::DateTime<chrono::Utc>,
665                        row,
666                        idx,
667                        |v: chrono::DateTime<chrono::Utc>| {
668                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
669                            Ok::<_, DataFusionError>(timestamp)
670                        }
671                    );
672                }
673                DataType::Time64(TimeUnit::Microsecond) => {
674                    handle_primitive_type!(
675                        builder,
676                        field,
677                        col,
678                        Time64MicrosecondBuilder,
679                        chrono::NaiveTime,
680                        row,
681                        idx,
682                        |v: chrono::NaiveTime| {
683                            let seconds = i64::from(v.num_seconds_from_midnight());
684                            let microseconds = i64::from(v.nanosecond()) / 1000;
685                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
686                        }
687                    );
688                }
689                DataType::Time64(TimeUnit::Nanosecond) => {
690                    handle_primitive_type!(
691                        builder,
692                        field,
693                        col,
694                        Time64NanosecondBuilder,
695                        chrono::NaiveTime,
696                        row,
697                        idx,
698                        |v: chrono::NaiveTime| {
699                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
700                                * 1_000_000_000
701                                + i64::from(v.nanosecond());
702                            Ok::<_, DataFusionError>(timestamp)
703                        }
704                    );
705                }
706                DataType::Date32 => {
707                    handle_primitive_type!(
708                        builder,
709                        field,
710                        col,
711                        Date32Builder,
712                        chrono::NaiveDate,
713                        row,
714                        idx,
715                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
716                    );
717                }
718                DataType::Interval(IntervalUnit::MonthDayNano) => {
719                    handle_primitive_type!(
720                        builder,
721                        field,
722                        col,
723                        IntervalMonthDayNanoBuilder,
724                        IntervalFromSql,
725                        row,
726                        idx,
727                        |v: IntervalFromSql| {
728                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
729                                v.month,
730                                v.day,
731                                v.time * 1_000,
732                            );
733                            Ok::<_, DataFusionError>(interval_month_day_nano)
734                        }
735                    );
736                }
737                DataType::Boolean => {
738                    handle_primitive_type!(
739                        builder,
740                        field,
741                        col,
742                        BooleanBuilder,
743                        bool,
744                        row,
745                        idx,
746                        |v| { Ok::<_, DataFusionError>(v) }
747                    );
748                }
749                DataType::List(inner) => match inner.data_type() {
750                    DataType::Int16 => {
751                        handle_primitive_array_type!(
752                            builder,
753                            field,
754                            col,
755                            Int16Builder,
756                            i16,
757                            row,
758                            idx
759                        );
760                    }
761                    DataType::Int32 => {
762                        handle_primitive_array_type!(
763                            builder,
764                            field,
765                            col,
766                            Int32Builder,
767                            i32,
768                            row,
769                            idx
770                        );
771                    }
772                    DataType::Int64 => {
773                        handle_primitive_array_type!(
774                            builder,
775                            field,
776                            col,
777                            Int64Builder,
778                            i64,
779                            row,
780                            idx
781                        );
782                    }
783                    DataType::Float32 => {
784                        handle_primitive_array_type!(
785                            builder,
786                            field,
787                            col,
788                            Float32Builder,
789                            f32,
790                            row,
791                            idx
792                        );
793                    }
794                    DataType::Float64 => {
795                        handle_primitive_array_type!(
796                            builder,
797                            field,
798                            col,
799                            Float64Builder,
800                            f64,
801                            row,
802                            idx
803                        );
804                    }
805                    DataType::Utf8 => {
806                        handle_primitive_array_type!(
807                            builder,
808                            field,
809                            col,
810                            StringBuilder,
811                            &str,
812                            row,
813                            idx
814                        );
815                    }
816                    DataType::Binary => {
817                        handle_primitive_array_type!(
818                            builder,
819                            field,
820                            col,
821                            BinaryBuilder,
822                            Vec<u8>,
823                            row,
824                            idx
825                        );
826                    }
827                    DataType::Boolean => {
828                        handle_primitive_array_type!(
829                            builder,
830                            field,
831                            col,
832                            BooleanBuilder,
833                            bool,
834                            row,
835                            idx
836                        );
837                    }
838                    _ => {
839                        return Err(DataFusionError::NotImplemented(format!(
840                            "Unsupported list data type {:?} for col: {:?}",
841                            field.data_type(),
842                            col
843                        )));
844                    }
845                },
846                _ => {
847                    return Err(DataFusionError::NotImplemented(format!(
848                        "Unsupported data type {:?} for col: {:?}",
849                        field.data_type(),
850                        col
851                    )));
852                }
853            }
854        }
855    }
856    let projected_columns = array_builders
857        .into_iter()
858        .enumerate()
859        .filter(|(idx, _)| projections_contains(projection, *idx))
860        .map(|(_, mut builder)| builder.finish())
861        .collect::<Vec<ArrayRef>>();
862    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
863}