datafusion_remote_table/connection/postgres.rs

1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::transform::transform_batch;
3use crate::{
4    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
5    RemoteSchemaRef, RemoteType, Transform,
6};
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bb8_postgres::PostgresConnectionManager;
10use bigdecimal::BigDecimal;
11use byteorder::{BigEndian, ReadBytesExt};
12use chrono::Timelike;
13use datafusion::arrow::array::{
14    make_builder, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
15    Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
16    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
17    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
18    TimestampNanosecondBuilder,
19};
20use datafusion::arrow::datatypes::{
21    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
22};
23use datafusion::common::project_schema;
24use datafusion::error::DataFusionError;
25use datafusion::execution::SendableRecordBatchStream;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
27use futures::StreamExt;
28use num_bigint::{BigInt, Sign};
29use std::string::ToString;
30use std::sync::Arc;
31use std::time::{SystemTime, UNIX_EPOCH};
32
33#[derive(Debug, Clone, derive_with::With)]
34pub struct PostgresConnectionOptions {
35    pub(crate) host: String,
36    pub(crate) port: u16,
37    pub(crate) username: String,
38    pub(crate) password: String,
39    pub(crate) database: Option<String>,
40    pub(crate) chunk_size: Option<usize>,
41}
42
43impl PostgresConnectionOptions {
44    pub fn new(
45        host: impl Into<String>,
46        port: u16,
47        username: impl Into<String>,
48        password: impl Into<String>,
49    ) -> Self {
50        Self {
51            host: host.into(),
52            port,
53            username: username.into(),
54            password: password.into(),
55            database: None,
56            chunk_size: None,
57        }
58    }
59}
60
61#[derive(Debug)]
62pub struct PostgresPool {
63    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
64}
65
66#[async_trait::async_trait]
67impl Pool for PostgresPool {
68    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
69        let conn = self.pool.get_owned().await.map_err(|e| {
70            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
71        })?;
72        Ok(Arc::new(PostgresConnection { conn }))
73    }
74}
75
76pub(crate) async fn connect_postgres(
77    options: &PostgresConnectionOptions,
78) -> DFResult<PostgresPool> {
79    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
80    config
81        .host(&options.host)
82        .port(options.port)
83        .user(&options.username)
84        .password(&options.password);
85    if let Some(database) = &options.database {
86        config.dbname(database);
87    }
88    let manager = PostgresConnectionManager::new(config, NoTls);
89    let pool = bb8::Pool::builder()
90        .max_size(5)
91        .build(manager)
92        .await
93        .map_err(|e| {
94            DataFusionError::Execution(format!(
95                "Failed to create postgres connection pool due to {e}",
96            ))
97        })?;
98
99    Ok(PostgresPool { pool })
100}
101
102#[derive(Debug)]
103pub(crate) struct PostgresConnection {
104    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
105}
106
107#[async_trait::async_trait]
108impl Connection for PostgresConnection {
109    async fn infer_schema(
110        &self,
111        sql: &str,
112        transform: Option<Arc<dyn Transform>>,
113    ) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
114        let mut stream = self
115            .conn
116            .query_raw(sql, Vec::<String>::new())
117            .await
118            .map_err(|e| {
119                DataFusionError::Execution(format!(
120                    "Failed to execute query {sql} on postgres due to {e}",
121                ))
122            })?
123            .chunks(1)
124            .boxed();
125
126        let Some(first_chunk) = stream.next().await else {
127            return Err(DataFusionError::Execution(
128                "No data returned from postgres".to_string(),
129            ));
130        };
131        let first_chunk: Vec<Row> = first_chunk
132            .into_iter()
133            .collect::<Result<Vec<_>, _>>()
134            .map_err(|e| {
135                DataFusionError::Execution(format!(
136                    "Failed to collect rows from postgres due to {e}",
137                ))
138            })?;
139        let Some(first_row) = first_chunk.first() else {
140            return Err(DataFusionError::Execution(
141                "No data returned from postgres".to_string(),
142            ));
143        };
144        let remote_schema = Arc::new(build_remote_schema(first_row)?);
145        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
146        if let Some(transform) = transform {
147            let batch = rows_to_batch(std::slice::from_ref(first_row), &arrow_schema, None)?;
148            let transformed_batch = transform_batch(
149                batch,
150                transform.as_ref(),
151                &arrow_schema,
152                None,
153                Some(&remote_schema),
154            )?;
155            Ok((remote_schema, transformed_batch.schema()))
156        } else {
157            Ok((remote_schema, arrow_schema))
158        }
159    }
160
161    async fn query(
162        &self,
163        conn_options: &ConnectionOptions,
164        sql: &str,
165        table_schema: SchemaRef,
166        projection: Option<&Vec<usize>>,
167    ) -> DFResult<SendableRecordBatchStream> {
168        let projected_schema = project_schema(&table_schema, projection)?;
169        let projection = projection.cloned();
170        let chunk_size = conn_options.chunk_size();
171        let stream = self
172            .conn
173            .query_raw(sql, Vec::<String>::new())
174            .await
175            .map_err(|e| {
176                DataFusionError::Execution(format!(
177                    "Failed to execute query {sql} on postgres due to {e}",
178                ))
179            })?
180            .chunks(chunk_size.unwrap_or(2048))
181            .boxed();
182
183        let stream = stream.map(move |rows| {
184            let rows: Vec<Row> = rows
185                .into_iter()
186                .collect::<Result<Vec<_>, _>>()
187                .map_err(|e| {
188                    DataFusionError::Execution(format!(
189                        "Failed to collect rows from postgres due to {e}",
190                    ))
191                })?;
192            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
193        });
194
195        Ok(Box::pin(RecordBatchStreamAdapter::new(
196            projected_schema,
197            stream,
198        )))
199    }
200}
201
202fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
203    match pg_type {
204        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
205        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
206        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
207        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
208        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
209        &Type::NUMERIC => {
210            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
211                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
212            })?;
213            let scale = match v {
214                Some(v) => v.scale,
215                None => 0,
216            };
217            assert!((scale as u32) <= (i8::MAX as u32));
218            Ok(RemoteType::Postgres(PostgresType::Numeric(
219                scale.try_into().unwrap_or_default(),
220            )))
221        }
222        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
223        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
224        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
225        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
226        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
227        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
228        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
229        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
230        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
231        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
232        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
233        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
234        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
235        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
236        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
237        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
238        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
239        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
240        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
241        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
242        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
243        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
244        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
245        other if other.name().eq_ignore_ascii_case("geometry") => {
246            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
247        }
248        _ => Err(DataFusionError::NotImplemented(format!(
249            "Unsupported postgres type {pg_type:?}",
250        ))),
251    }
252}
253
254fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
255    let mut remote_fields = vec![];
256    for (idx, col) in row.columns().iter().enumerate() {
257        remote_fields.push(RemoteField::new(
258            col.name(),
259            pg_type_to_remote_type(col.type_(), row, idx)?,
260            true,
261        ));
262    }
263    Ok(RemoteSchema::new(remote_fields))
264}
265
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to
/// `$builder`, which must downcast to `$builder_ty`.
///
/// `Some(v)` is passed through `$convert`, which must return a `Result`. Its
/// error, like a failed `try_get`, is propagated with `?`, so the expansion site
/// must be in a function returning a compatible `Result`. `None` appends a null.
/// Panics if `$builder` is not a `$builder_ty`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast builder to ",
                        stringify!($builder_ty),
                        " for {:?} and {:?}"
                    ),
                    $field, $col
                )
            });
        // Name the requested type in the error instead of always claiming "BigDecimal".
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                concat!(
                    "Failed to get ",
                    stringify!($value_ty),
                    " value for {:?} and {:?}: {:?}"
                ),
                $field, $col, e
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
294
/// Reads column `$index` of `$row` as `Option<Vec<$primitive_value_ty>>` and
/// appends it to `$builder`.
///
/// `$builder` must be a `ListBuilder<Box<dyn ArrayBuilder>>` whose values builder
/// is a `$values_builder_ty`; the macro panics otherwise. A NULL array appends a
/// null list entry. A read failure (including a NULL element, which cannot be
/// decoded into `$primitive_value_ty`) is returned as `DataFusionError::Execution`
/// via `?`, so the expansion site must return a compatible `Result`.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?}",
                    $field
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast values builder to ",
                        stringify!($values_builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                concat!(
                    "Failed to get ",
                    stringify!($primitive_value_ty),
                    " array value for {:?}: {:?}"
                ),
                $field, e
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
341
342#[derive(Debug)]
343struct BigDecimalFromSql {
344    inner: BigDecimal,
345    scale: u16,
346}
347
348impl BigDecimalFromSql {
349    fn to_decimal_128(&self) -> Option<i128> {
350        big_decimal_to_i128(&self.inner, Some(self.scale as u32))
351    }
352}
353
354#[allow(clippy::cast_sign_loss)]
355#[allow(clippy::cast_possible_wrap)]
356#[allow(clippy::cast_possible_truncation)]
357impl<'a> FromSql<'a> for BigDecimalFromSql {
358    fn from_sql(
359        _ty: &Type,
360        raw: &'a [u8],
361    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
362        let raw_u16: Vec<u16> = raw
363            .chunks(2)
364            .map(|chunk| {
365                if chunk.len() == 2 {
366                    u16::from_be_bytes([chunk[0], chunk[1]])
367                } else {
368                    u16::from_be_bytes([chunk[0], 0])
369                }
370            })
371            .collect();
372
373        let base_10_000_digit_count = raw_u16[0];
374        let weight = raw_u16[1] as i16;
375        let sign = raw_u16[2];
376        let scale = raw_u16[3];
377
378        let mut base_10_000_digits = Vec::new();
379        for i in 4..4 + base_10_000_digit_count {
380            base_10_000_digits.push(raw_u16[i as usize]);
381        }
382
383        let mut u8_digits = Vec::new();
384        for &base_10_000_digit in base_10_000_digits.iter().rev() {
385            let mut base_10_000_digit = base_10_000_digit;
386            let mut temp_result = Vec::new();
387            while base_10_000_digit > 0 {
388                temp_result.push((base_10_000_digit % 10) as u8);
389                base_10_000_digit /= 10;
390            }
391            while temp_result.len() < 4 {
392                temp_result.push(0);
393            }
394            u8_digits.extend(temp_result);
395        }
396        u8_digits.reverse();
397
398        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
399        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
400        u8_digits.resize(size as usize, 0);
401
402        let sign = match sign {
403            0x4000 => Sign::Minus,
404            0x0000 => Sign::Plus,
405            _ => {
406                return Err(Box::new(DataFusionError::Execution(
407                    "Failed to parse big decimal from postgres numeric value".to_string(),
408                )));
409            }
410        };
411
412        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
413            return Err(Box::new(DataFusionError::Execution(
414                "Failed to parse big decimal from postgres numeric value".to_string(),
415            )));
416        };
417        Ok(BigDecimalFromSql {
418            inner: BigDecimal::new(digits, i64::from(scale)),
419            scale,
420        })
421    }
422
423    fn accepts(ty: &Type) -> bool {
424        matches!(*ty, Type::NUMERIC)
425    }
426}
427
428// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
429// interval values are internally stored as three integral fields: months, days, and microseconds
430#[derive(Debug)]
431struct IntervalFromSql {
432    time: i64,
433    day: i32,
434    month: i32,
435}
436
437impl<'a> FromSql<'a> for IntervalFromSql {
438    fn from_sql(
439        _ty: &Type,
440        raw: &'a [u8],
441    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
442        let mut cursor = std::io::Cursor::new(raw);
443
444        let time = cursor.read_i64::<BigEndian>()?;
445        let day = cursor.read_i32::<BigEndian>()?;
446        let month = cursor.read_i32::<BigEndian>()?;
447
448        Ok(IntervalFromSql { time, day, month })
449    }
450
451    fn accepts(ty: &Type) -> bool {
452        matches!(*ty, Type::INTERVAL)
453    }
454}
455
456struct GeometryFromSql<'a> {
457    wkb: &'a [u8],
458}
459
460impl<'a> FromSql<'a> for GeometryFromSql<'a> {
461    fn from_sql(
462        _ty: &Type,
463        raw: &'a [u8],
464    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
465        Ok(GeometryFromSql { wkb: raw })
466    }
467
468    fn accepts(ty: &Type) -> bool {
469        matches!(ty.name(), "geometry")
470    }
471}
472
473fn rows_to_batch(
474    rows: &[Row],
475    table_schema: &SchemaRef,
476    projection: Option<&Vec<usize>>,
477) -> DFResult<RecordBatch> {
478    let projected_schema = project_schema(table_schema, projection)?;
479    let mut array_builders = vec![];
480    for field in table_schema.fields() {
481        let builder = make_builder(field.data_type(), rows.len());
482        array_builders.push(builder);
483    }
484
485    for row in rows {
486        for (idx, field) in table_schema.fields.iter().enumerate() {
487            if !projections_contains(projection, idx) {
488                continue;
489            }
490            let builder = &mut array_builders[idx];
491            let col = row.columns().get(idx);
492            match field.data_type() {
493                DataType::Int16 => {
494                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
495                        Ok::<_, DataFusionError>(v)
496                    });
497                }
498                DataType::Int32 => {
499                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
500                        Ok::<_, DataFusionError>(v)
501                    });
502                }
503                DataType::Int64 => {
504                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
505                        Ok::<_, DataFusionError>(v)
506                    });
507                }
508                DataType::Float32 => {
509                    handle_primitive_type!(
510                        builder,
511                        field,
512                        col,
513                        Float32Builder,
514                        f32,
515                        row,
516                        idx,
517                        |v| { Ok::<_, DataFusionError>(v) }
518                    );
519                }
520                DataType::Float64 => {
521                    handle_primitive_type!(
522                        builder,
523                        field,
524                        col,
525                        Float64Builder,
526                        f64,
527                        row,
528                        idx,
529                        |v| { Ok::<_, DataFusionError>(v) }
530                    );
531                }
532                DataType::Decimal128(_precision, _scale) => {
533                    handle_primitive_type!(
534                        builder,
535                        field,
536                        col,
537                        Decimal128Builder,
538                        BigDecimalFromSql,
539                        row,
540                        idx,
541                        |v: BigDecimalFromSql| {
542                            v.to_decimal_128().ok_or_else(|| {
543                                DataFusionError::Execution(format!(
544                                    "Failed to convert BigDecimal {v:?} to i128",
545                                ))
546                            })
547                        }
548                    );
549                }
550                DataType::Utf8 => {
551                    handle_primitive_type!(
552                        builder,
553                        field,
554                        col,
555                        StringBuilder,
556                        &str,
557                        row,
558                        idx,
559                        |v| { Ok::<_, DataFusionError>(v) }
560                    );
561                }
562                DataType::LargeUtf8 => {
563                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
564                        handle_primitive_type!(
565                            builder,
566                            field,
567                            col,
568                            LargeStringBuilder,
569                            serde_json::value::Value,
570                            row,
571                            idx,
572                            |v: serde_json::value::Value| {
573                                Ok::<_, DataFusionError>(v.to_string())
574                            }
575                        );
576                    } else {
577                        handle_primitive_type!(
578                            builder,
579                            field,
580                            col,
581                            LargeStringBuilder,
582                            &str,
583                            row,
584                            idx,
585                            |v| { Ok::<_, DataFusionError>(v) }
586                        );
587                    }
588                }
589                DataType::Binary => {
590                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
591                    {
592                        handle_primitive_type!(
593                            builder,
594                            field,
595                            col,
596                            BinaryBuilder,
597                            GeometryFromSql,
598                            row,
599                            idx,
600                            |v: GeometryFromSql| { Ok::<_, DataFusionError>(v.wkb.to_vec()) }
601                        );
602                    } else if col.is_some()
603                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
604                    {
605                        handle_primitive_type!(
606                            builder,
607                            field,
608                            col,
609                            BinaryBuilder,
610                            serde_json::value::Value,
611                            row,
612                            idx,
613                            |v: serde_json::value::Value| {
614                                Ok::<_, DataFusionError>(v.to_string().as_bytes().to_vec())
615                            }
616                        );
617                    } else {
618                        handle_primitive_type!(
619                            builder,
620                            field,
621                            col,
622                            BinaryBuilder,
623                            Vec<u8>,
624                            row,
625                            idx,
626                            |v| { Ok::<_, DataFusionError>(v) }
627                        );
628                    }
629                }
630                DataType::Timestamp(TimeUnit::Microsecond, None) => {
631                    handle_primitive_type!(
632                        builder,
633                        field,
634                        col,
635                        TimestampMicrosecondBuilder,
636                        SystemTime,
637                        row,
638                        idx,
639                        |v: SystemTime| {
640                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
641                                let timestamp: i64 = v
642                                    .as_micros()
643                                    .try_into()
644                                    .expect("Failed to convert SystemTime to i64");
645                                Ok(timestamp)
646                            } else {
647                                Err(DataFusionError::Execution(format!(
648                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
649                                )))
650                            }
651                        }
652                    );
653                }
654                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
655                    handle_primitive_type!(
656                        builder,
657                        field,
658                        col,
659                        TimestampNanosecondBuilder,
660                        SystemTime,
661                        row,
662                        idx,
663                        |v: SystemTime| {
664                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
665                                let timestamp: i64 = v
666                                    .as_nanos()
667                                    .try_into()
668                                    .expect("Failed to convert SystemTime to i64");
669                                Ok(timestamp)
670                            } else {
671                                Err(DataFusionError::Execution(format!(
672                                "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
673                            )))
674                            }
675                        }
676                    );
677                }
678                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
679                    handle_primitive_type!(
680                        builder,
681                        field,
682                        col,
683                        TimestampNanosecondBuilder,
684                        chrono::DateTime<chrono::Utc>,
685                        row,
686                        idx,
687                        |v: chrono::DateTime<chrono::Utc>| {
688                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
689                            Ok::<_, DataFusionError>(timestamp)
690                        }
691                    );
692                }
693                DataType::Time64(TimeUnit::Microsecond) => {
694                    handle_primitive_type!(
695                        builder,
696                        field,
697                        col,
698                        Time64MicrosecondBuilder,
699                        chrono::NaiveTime,
700                        row,
701                        idx,
702                        |v: chrono::NaiveTime| {
703                            let seconds = i64::from(v.num_seconds_from_midnight());
704                            let microseconds = i64::from(v.nanosecond()) / 1000;
705                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
706                        }
707                    );
708                }
709                DataType::Time64(TimeUnit::Nanosecond) => {
710                    handle_primitive_type!(
711                        builder,
712                        field,
713                        col,
714                        Time64NanosecondBuilder,
715                        chrono::NaiveTime,
716                        row,
717                        idx,
718                        |v: chrono::NaiveTime| {
719                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
720                                * 1_000_000_000
721                                + i64::from(v.nanosecond());
722                            Ok::<_, DataFusionError>(timestamp)
723                        }
724                    );
725                }
726                DataType::Date32 => {
727                    handle_primitive_type!(
728                        builder,
729                        field,
730                        col,
731                        Date32Builder,
732                        chrono::NaiveDate,
733                        row,
734                        idx,
735                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
736                    );
737                }
738                DataType::Interval(IntervalUnit::MonthDayNano) => {
739                    handle_primitive_type!(
740                        builder,
741                        field,
742                        col,
743                        IntervalMonthDayNanoBuilder,
744                        IntervalFromSql,
745                        row,
746                        idx,
747                        |v: IntervalFromSql| {
748                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
749                                v.month,
750                                v.day,
751                                v.time * 1_000,
752                            );
753                            Ok::<_, DataFusionError>(interval_month_day_nano)
754                        }
755                    );
756                }
757                DataType::Boolean => {
758                    handle_primitive_type!(
759                        builder,
760                        field,
761                        col,
762                        BooleanBuilder,
763                        bool,
764                        row,
765                        idx,
766                        |v| { Ok::<_, DataFusionError>(v) }
767                    );
768                }
769                DataType::List(inner) => match inner.data_type() {
770                    DataType::Int16 => {
771                        handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx);
772                    }
773                    DataType::Int32 => {
774                        handle_primitive_array_type!(builder, field, Int32Builder, i32, row, idx);
775                    }
776                    DataType::Int64 => {
777                        handle_primitive_array_type!(builder, field, Int64Builder, i64, row, idx);
778                    }
779                    DataType::Float32 => {
780                        handle_primitive_array_type!(builder, field, Float32Builder, f32, row, idx);
781                    }
782                    DataType::Float64 => {
783                        handle_primitive_array_type!(builder, field, Float64Builder, f64, row, idx);
784                    }
785                    DataType::Utf8 => {
786                        handle_primitive_array_type!(builder, field, StringBuilder, &str, row, idx);
787                    }
788                    DataType::Binary => {
789                        handle_primitive_array_type!(
790                            builder,
791                            field,
792                            BinaryBuilder,
793                            Vec<u8>,
794                            row,
795                            idx
796                        );
797                    }
798                    DataType::Boolean => {
799                        handle_primitive_array_type!(
800                            builder,
801                            field,
802                            BooleanBuilder,
803                            bool,
804                            row,
805                            idx
806                        );
807                    }
808                    _ => {
809                        return Err(DataFusionError::NotImplemented(format!(
810                            "Unsupported list data type {:?} for col: {:?}",
811                            field.data_type(),
812                            col
813                        )));
814                    }
815                },
816                _ => {
817                    return Err(DataFusionError::NotImplemented(format!(
818                        "Unsupported data type {:?} for col: {:?}",
819                        field.data_type(),
820                        col
821                    )));
822                }
823            }
824        }
825    }
826    let projected_columns = array_builders
827        .into_iter()
828        .enumerate()
829        .filter(|(idx, _)| projections_contains(projection, *idx))
830        .map(|(_, mut builder)| builder.finish())
831        .collect::<Vec<ArrayRef>>();
832    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
833}