datafusion_remote_table/connection/postgres.rs

1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4    RemoteSchemaRef, RemoteType, TableSource, Unparse, unparse_array,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder,
15    Int64Builder, IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch,
16    RecordBatchOptions, StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
17    TimestampMicrosecondBuilder, TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22
23use datafusion::common::project_schema;
24use datafusion::error::DataFusionError;
25use datafusion::execution::SendableRecordBatchStream;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
27use derive_getters::Getters;
28use derive_with::With;
29use futures::StreamExt;
30use log::debug;
31use num_bigint::{BigInt, Sign};
32use std::any::Any;
33use std::string::ToString;
34use std::sync::Arc;
35use uuid::Uuid;
36
37#[derive(Debug, Clone, With, Getters)]
38pub struct PostgresConnectionOptions {
39    pub(crate) host: String,
40    pub(crate) port: u16,
41    pub(crate) username: String,
42    pub(crate) password: String,
43    pub(crate) database: Option<String>,
44    pub(crate) pool_max_size: usize,
45    pub(crate) stream_chunk_size: usize,
46}
47
48impl PostgresConnectionOptions {
49    pub fn new(
50        host: impl Into<String>,
51        port: u16,
52        username: impl Into<String>,
53        password: impl Into<String>,
54    ) -> Self {
55        Self {
56            host: host.into(),
57            port,
58            username: username.into(),
59            password: password.into(),
60            database: None,
61            pool_max_size: 10,
62            stream_chunk_size: 2048,
63        }
64    }
65}
66
67impl From<PostgresConnectionOptions> for ConnectionOptions {
68    fn from(options: PostgresConnectionOptions) -> Self {
69        ConnectionOptions::Postgres(options)
70    }
71}
72
73#[derive(Debug)]
74pub struct PostgresPool {
75    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
76}
77
78#[async_trait::async_trait]
79impl Pool for PostgresPool {
80    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
81        let conn = self.pool.get_owned().await.map_err(|e| {
82            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
83        })?;
84        Ok(Arc::new(PostgresConnection { conn }))
85    }
86}
87
88pub(crate) async fn connect_postgres(
89    options: &PostgresConnectionOptions,
90) -> DFResult<PostgresPool> {
91    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
92    config
93        .host(&options.host)
94        .port(options.port)
95        .user(&options.username)
96        .password(&options.password);
97    if let Some(database) = &options.database {
98        config.dbname(database);
99    }
100    let manager = PostgresConnectionManager::new(config, NoTls);
101    let pool = bb8::Pool::builder()
102        .max_size(options.pool_max_size as u32)
103        .build(manager)
104        .await
105        .map_err(|e| {
106            DataFusionError::Execution(format!(
107                "Failed to create postgres connection pool due to {e}",
108            ))
109        })?;
110
111    Ok(PostgresPool { pool })
112}
113
114#[derive(Debug)]
115pub(crate) struct PostgresConnection {
116    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
117}
118
119#[async_trait::async_trait]
120impl Connection for PostgresConnection {
121    fn as_any(&self) -> &dyn Any {
122        self
123    }
124
125    async fn infer_schema(&self, source: &TableSource) -> DFResult<RemoteSchemaRef> {
126        match source {
127            TableSource::Table(_table) => {
128                // TODO: improve infer schema for table
129                let sql = RemoteDbType::Postgres.limit_1_query_if_possible(source);
130                let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
131                    DataFusionError::Execution(format!(
132                        "Failed to execute query {sql} on postgres: {e:?}",
133                    ))
134                })?;
135                let remote_schema = Arc::new(build_remote_schema(&row)?);
136                Ok(remote_schema)
137            }
138            TableSource::Query(_query) => {
139                let sql = RemoteDbType::Postgres.limit_1_query_if_possible(source);
140                let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
141                    DataFusionError::Execution(format!(
142                        "Failed to execute query {sql} on postgres: {e:?}",
143                    ))
144                })?;
145                let remote_schema = Arc::new(build_remote_schema(&row)?);
146                Ok(remote_schema)
147            }
148        }
149    }
150
151    async fn query(
152        &self,
153        conn_options: &ConnectionOptions,
154        source: &TableSource,
155        table_schema: SchemaRef,
156        projection: Option<&Vec<usize>>,
157        unparsed_filters: &[String],
158        limit: Option<usize>,
159    ) -> DFResult<SendableRecordBatchStream> {
160        let projected_schema = project_schema(&table_schema, projection)?;
161
162        let sql = RemoteDbType::Postgres.rewrite_query(source, unparsed_filters, limit);
163        debug!("[remote-table] executing postgres query: {sql}");
164
165        let projection = projection.cloned();
166        let chunk_size = conn_options.stream_chunk_size();
167        let stream = self
168            .conn
169            .query_raw(&sql, Vec::<String>::new())
170            .await
171            .map_err(|e| {
172                DataFusionError::Execution(format!(
173                    "Failed to execute query {sql} on postgres: {e}",
174                ))
175            })?
176            .chunks(chunk_size)
177            .boxed();
178
179        let stream = stream.map(move |rows| {
180            let rows: Vec<Row> = rows
181                .into_iter()
182                .collect::<Result<Vec<_>, _>>()
183                .map_err(|e| {
184                    DataFusionError::Execution(format!(
185                        "Failed to collect rows from postgres due to {e}",
186                    ))
187                })?;
188            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
189        });
190
191        Ok(Box::pin(RecordBatchStreamAdapter::new(
192            projected_schema,
193            stream,
194        )))
195    }
196
197    async fn insert(
198        &self,
199        _conn_options: &ConnectionOptions,
200        unparser: Arc<dyn Unparse>,
201        table: &[String],
202        remote_schema: RemoteSchemaRef,
203        mut input: SendableRecordBatchStream,
204    ) -> DFResult<usize> {
205        let input_schema = input.schema();
206
207        let mut total_count = 0;
208        while let Some(batch) = input.next().await {
209            let batch = batch?;
210
211            let mut columns = Vec::with_capacity(remote_schema.fields.len());
212            for i in 0..batch.num_columns() {
213                let input_field = input_schema.field(i);
214                let remote_field = &remote_schema.fields[i];
215                if remote_field.auto_increment && input_field.is_nullable() {
216                    continue;
217                }
218
219                let remote_type = remote_schema.fields[i].remote_type.clone();
220                let array = batch.column(i);
221                let column = unparse_array(unparser.as_ref(), array, remote_type)?;
222                columns.push(column);
223            }
224
225            let num_rows = columns[0].len();
226            let num_columns = columns.len();
227
228            let mut values = Vec::with_capacity(num_rows);
229            for i in 0..num_rows {
230                let mut value = Vec::with_capacity(num_columns);
231                for col in columns.iter() {
232                    value.push(col[i].as_str());
233                }
234                values.push(format!("({})", value.join(",")));
235            }
236
237            let mut col_names = Vec::with_capacity(remote_schema.fields.len());
238            for (remote_field, input_field) in
239                remote_schema.fields.iter().zip(input_schema.fields.iter())
240            {
241                if remote_field.auto_increment && input_field.is_nullable() {
242                    continue;
243                }
244                col_names.push(RemoteDbType::Postgres.sql_identifier(&remote_field.name));
245            }
246
247            let sql = format!(
248                "INSERT INTO {} ({}) VALUES {}",
249                RemoteDbType::Postgres.sql_table_name(table),
250                col_names.join(","),
251                values.join(",")
252            );
253
254            let count = self.conn.execute(&sql, &[]).await.map_err(|e| {
255                DataFusionError::Execution(format!(
256                    "Failed to execute insert statement on postgres: {e:?}, sql: {sql}"
257                ))
258            })?;
259            total_count += count as usize;
260        }
261
262        Ok(total_count)
263    }
264}
265
266fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
267    match pg_type {
268        &Type::INT2 => Ok(PostgresType::Int2),
269        &Type::INT4 => Ok(PostgresType::Int4),
270        &Type::INT8 => Ok(PostgresType::Int8),
271        &Type::FLOAT4 => Ok(PostgresType::Float4),
272        &Type::FLOAT8 => Ok(PostgresType::Float8),
273        &Type::NUMERIC => {
274            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
275                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
276            })?;
277            let scale = match v {
278                Some(v) => v.scale,
279                None => 0,
280            };
281            assert!((scale as u32) <= (i8::MAX as u32));
282            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
283        }
284        &Type::OID => Ok(PostgresType::Oid),
285        &Type::NAME => Ok(PostgresType::Name),
286        &Type::VARCHAR => Ok(PostgresType::Varchar),
287        &Type::BPCHAR => Ok(PostgresType::Bpchar),
288        &Type::TEXT => Ok(PostgresType::Text),
289        &Type::BYTEA => Ok(PostgresType::Bytea),
290        &Type::DATE => Ok(PostgresType::Date),
291        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
292        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
293        &Type::TIME => Ok(PostgresType::Time),
294        &Type::INTERVAL => Ok(PostgresType::Interval),
295        &Type::BOOL => Ok(PostgresType::Bool),
296        &Type::JSON => Ok(PostgresType::Json),
297        &Type::JSONB => Ok(PostgresType::Jsonb),
298        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
299        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
300        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
301        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
302        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
303        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
304        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
305        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
306        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
307        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
308        &Type::XML => Ok(PostgresType::Xml),
309        &Type::UUID => Ok(PostgresType::Uuid),
310        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
311        _ => Err(DataFusionError::NotImplemented(format!(
312            "Unsupported postgres type {pg_type:?}",
313        ))),
314    }
315}
316
317fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
318    let mut remote_fields = vec![];
319    for (idx, col) in row.columns().iter().enumerate() {
320        remote_fields.push(RemoteField::new(
321            col.name(),
322            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
323            true,
324        ));
325    }
326    Ok(RemoteSchema::new(remote_fields))
327}
328
/// Appends the cell at `$index` of `$row` to the type-erased Arrow builder `$builder`.
///
/// The builder is downcast to `$builder_ty`, and a mismatch panics. The cell is read as
/// `Option<$value_ty>`. A present value passes through the fallible `$convert` before being
/// appended; NULL appends a null. Read and conversion errors are propagated with `?`, so the
/// macro must be expanded inside a function returning a compatible `Result`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };

        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(cell) = cell {
            typed_builder.append_value($convert(cell)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
357
/// Appends the postgres array cell at `$index` of `$row` to a type-erased `ListBuilder`.
///
/// The outer builder is downcast to `ListBuilder<Box<dyn ArrayBuilder>>` and its values
/// builder to `$values_builder_ty`; either mismatch panics. The cell is read as
/// `Option<Vec<$primitive_value_ty>>`. A present array appends all of its elements followed
/// by a valid list slot; NULL appends a null list slot. Read errors are propagated with `?`
/// from the enclosing function.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Must name this macro's own metavariable: an undeclared `$builder_ty`
                    // is transcribed literally and the message would read "$builder_ty".
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                // Same as above: `$value_ty` is not declared by this macro.
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
400
401#[derive(Debug)]
402struct BigDecimalFromSql {
403    inner: BigDecimal,
404    scale: u16,
405}
406
407impl BigDecimalFromSql {
408    fn to_decimal_128(&self) -> Option<i128> {
409        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
410    }
411}
412
413#[allow(clippy::cast_sign_loss)]
414#[allow(clippy::cast_possible_wrap)]
415#[allow(clippy::cast_possible_truncation)]
416impl<'a> FromSql<'a> for BigDecimalFromSql {
417    fn from_sql(
418        _ty: &Type,
419        raw: &'a [u8],
420    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
421        let raw_u16: Vec<u16> = raw
422            .chunks(2)
423            .map(|chunk| {
424                if chunk.len() == 2 {
425                    u16::from_be_bytes([chunk[0], chunk[1]])
426                } else {
427                    u16::from_be_bytes([chunk[0], 0])
428                }
429            })
430            .collect();
431
432        let base_10_000_digit_count = raw_u16[0];
433        let weight = raw_u16[1] as i16;
434        let sign = raw_u16[2];
435        let scale = raw_u16[3];
436
437        let mut base_10_000_digits = Vec::new();
438        for i in 4..4 + base_10_000_digit_count {
439            base_10_000_digits.push(raw_u16[i as usize]);
440        }
441
442        let mut u8_digits = Vec::new();
443        for &base_10_000_digit in base_10_000_digits.iter().rev() {
444            let mut base_10_000_digit = base_10_000_digit;
445            let mut temp_result = Vec::new();
446            while base_10_000_digit > 0 {
447                temp_result.push((base_10_000_digit % 10) as u8);
448                base_10_000_digit /= 10;
449            }
450            while temp_result.len() < 4 {
451                temp_result.push(0);
452            }
453            u8_digits.extend(temp_result);
454        }
455        u8_digits.reverse();
456
457        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
458        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
459        u8_digits.resize(size as usize, 0);
460
461        let sign = match sign {
462            0x4000 => Sign::Minus,
463            0x0000 => Sign::Plus,
464            _ => {
465                return Err(Box::new(DataFusionError::Execution(
466                    "Failed to parse big decimal from postgres numeric value".to_string(),
467                )));
468            }
469        };
470
471        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
472            return Err(Box::new(DataFusionError::Execution(
473                "Failed to parse big decimal from postgres numeric value".to_string(),
474            )));
475        };
476        Ok(BigDecimalFromSql {
477            inner: BigDecimal::new(digits, i64::from(scale)),
478            scale,
479        })
480    }
481
482    fn accepts(ty: &Type) -> bool {
483        matches!(*ty, Type::NUMERIC)
484    }
485}
486
487// interval_send - Postgres C (https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/timestamp.c#L1032)
488// interval values are internally stored as three integral fields: months, days, and microseconds
489#[derive(Debug)]
490struct IntervalFromSql {
491    time: i64,
492    day: i32,
493    month: i32,
494}
495
496impl<'a> FromSql<'a> for IntervalFromSql {
497    fn from_sql(
498        _ty: &Type,
499        raw: &'a [u8],
500    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
501        let mut cursor = std::io::Cursor::new(raw);
502
503        let time = cursor.read_i64::<BigEndian>()?;
504        let day = cursor.read_i32::<BigEndian>()?;
505        let month = cursor.read_i32::<BigEndian>()?;
506
507        Ok(IntervalFromSql { time, day, month })
508    }
509
510    fn accepts(ty: &Type) -> bool {
511        matches!(*ty, Type::INTERVAL)
512    }
513}
514
515struct GeometryFromSql<'a> {
516    wkb: &'a [u8],
517}
518
519impl<'a> FromSql<'a> for GeometryFromSql<'a> {
520    fn from_sql(
521        _ty: &Type,
522        raw: &'a [u8],
523    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
524        Ok(GeometryFromSql { wkb: raw })
525    }
526
527    fn accepts(ty: &Type) -> bool {
528        matches!(ty.name(), "geometry")
529    }
530}
531
532struct XmlFromSql<'a> {
533    xml: &'a str,
534}
535
536impl<'a> FromSql<'a> for XmlFromSql<'a> {
537    fn from_sql(
538        _ty: &Type,
539        raw: &'a [u8],
540    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
541        let xml = str::from_utf8(raw)?;
542        Ok(XmlFromSql { xml })
543    }
544
545    fn accepts(ty: &Type) -> bool {
546        matches!(*ty, Type::XML)
547    }
548}
549
550fn rows_to_batch(
551    rows: &[Row],
552    table_schema: &SchemaRef,
553    projection: Option<&Vec<usize>>,
554) -> DFResult<RecordBatch> {
555    let projected_schema = project_schema(table_schema, projection)?;
556    let mut array_builders = vec![];
557    for field in table_schema.fields() {
558        let builder = make_builder(field.data_type(), rows.len());
559        array_builders.push(builder);
560    }
561
562    for row in rows {
563        for (idx, field) in table_schema.fields.iter().enumerate() {
564            if !projections_contains(projection, idx) {
565                continue;
566            }
567            let builder = &mut array_builders[idx];
568            let col = row.columns().get(idx);
569            match field.data_type() {
570                DataType::Int16 => {
571                    handle_primitive_type!(
572                        builder,
573                        field,
574                        col,
575                        Int16Builder,
576                        i16,
577                        row,
578                        idx,
579                        just_return
580                    );
581                }
582                DataType::Int32 => {
583                    handle_primitive_type!(
584                        builder,
585                        field,
586                        col,
587                        Int32Builder,
588                        i32,
589                        row,
590                        idx,
591                        just_return
592                    );
593                }
594                DataType::UInt32 => {
595                    handle_primitive_type!(
596                        builder,
597                        field,
598                        col,
599                        UInt32Builder,
600                        u32,
601                        row,
602                        idx,
603                        just_return
604                    );
605                }
606                DataType::Int64 => {
607                    handle_primitive_type!(
608                        builder,
609                        field,
610                        col,
611                        Int64Builder,
612                        i64,
613                        row,
614                        idx,
615                        just_return
616                    );
617                }
618                DataType::Float32 => {
619                    handle_primitive_type!(
620                        builder,
621                        field,
622                        col,
623                        Float32Builder,
624                        f32,
625                        row,
626                        idx,
627                        just_return
628                    );
629                }
630                DataType::Float64 => {
631                    handle_primitive_type!(
632                        builder,
633                        field,
634                        col,
635                        Float64Builder,
636                        f64,
637                        row,
638                        idx,
639                        just_return
640                    );
641                }
642                DataType::Decimal128(_precision, _scale) => {
643                    handle_primitive_type!(
644                        builder,
645                        field,
646                        col,
647                        Decimal128Builder,
648                        BigDecimalFromSql,
649                        row,
650                        idx,
651                        |v: BigDecimalFromSql| {
652                            v.to_decimal_128().ok_or_else(|| {
653                                DataFusionError::Execution(format!(
654                                    "Failed to convert BigDecimal {v:?} to i128",
655                                ))
656                            })
657                        }
658                    );
659                }
660                DataType::Utf8 => {
661                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("xml") {
662                        let convert: for<'a> fn(XmlFromSql<'a>) -> DFResult<&'a str> =
663                            |v| Ok(v.xml);
664                        handle_primitive_type!(
665                            builder,
666                            field,
667                            col,
668                            StringBuilder,
669                            XmlFromSql,
670                            row,
671                            idx,
672                            convert
673                        );
674                    } else {
675                        handle_primitive_type!(
676                            builder,
677                            field,
678                            col,
679                            StringBuilder,
680                            &str,
681                            row,
682                            idx,
683                            just_return
684                        );
685                    }
686                }
687                DataType::LargeUtf8 => {
688                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
689                        handle_primitive_type!(
690                            builder,
691                            field,
692                            col,
693                            LargeStringBuilder,
694                            serde_json::value::Value,
695                            row,
696                            idx,
697                            |v: serde_json::value::Value| {
698                                Ok::<_, DataFusionError>(v.to_string())
699                            }
700                        );
701                    } else {
702                        handle_primitive_type!(
703                            builder,
704                            field,
705                            col,
706                            LargeStringBuilder,
707                            &str,
708                            row,
709                            idx,
710                            just_return
711                        );
712                    }
713                }
714                DataType::Binary => {
715                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
716                    {
717                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
718                            |v| Ok(v.wkb);
719                        handle_primitive_type!(
720                            builder,
721                            field,
722                            col,
723                            BinaryBuilder,
724                            GeometryFromSql,
725                            row,
726                            idx,
727                            convert
728                        );
729                    } else if col.is_some()
730                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
731                    {
732                        handle_primitive_type!(
733                            builder,
734                            field,
735                            col,
736                            BinaryBuilder,
737                            serde_json::value::Value,
738                            row,
739                            idx,
740                            |v: serde_json::value::Value| {
741                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
742                            }
743                        );
744                    } else {
745                        handle_primitive_type!(
746                            builder,
747                            field,
748                            col,
749                            BinaryBuilder,
750                            Vec<u8>,
751                            row,
752                            idx,
753                            just_return
754                        );
755                    }
756                }
757                DataType::FixedSizeBinary(_) => {
758                    let builder = builder
759                        .as_any_mut()
760                        .downcast_mut::<FixedSizeBinaryBuilder>()
761                        .unwrap_or_else(|| {
762                            panic!(
763                                "Failed to downcast builder to FixedSizeBinaryBuilder for {field:?}"
764                            )
765                        });
766                    let v = if col.is_some()
767                        && col.unwrap().type_().name().eq_ignore_ascii_case("uuid")
768                    {
769                        let v: Option<Uuid> = row.try_get(idx).map_err(|e| {
770                            DataFusionError::Execution(format!(
771                                "Failed to get Uuid value for field {:?}: {e:?}",
772                                field
773                            ))
774                        })?;
775                        v.map(|v| v.as_bytes().to_vec())
776                    } else {
777                        let v: Option<Vec<u8>> = row.try_get(idx).map_err(|e| {
778                            DataFusionError::Execution(format!(
779                                "Failed to get FixedSizeBinary value for field {:?}: {e:?}",
780                                field
781                            ))
782                        })?;
783                        v
784                    };
785
786                    match v {
787                        Some(v) => builder.append_value(v)?,
788                        None => builder.append_null(),
789                    }
790                }
791                DataType::Timestamp(TimeUnit::Microsecond, None) => {
792                    handle_primitive_type!(
793                        builder,
794                        field,
795                        col,
796                        TimestampMicrosecondBuilder,
797                        chrono::NaiveDateTime,
798                        row,
799                        idx,
800                        |v: chrono::NaiveDateTime| {
801                            let timestamp: i64 = v.and_utc().timestamp_micros();
802
803                            Ok::<i64, DataFusionError>(timestamp)
804                        }
805                    );
806                }
807                DataType::Timestamp(TimeUnit::Microsecond, Some(_tz)) => {
808                    handle_primitive_type!(
809                        builder,
810                        field,
811                        col,
812                        TimestampMicrosecondBuilder,
813                        chrono::DateTime<chrono::Utc>,
814                        row,
815                        idx,
816                        |v: chrono::DateTime<chrono::Utc>| {
817                            let timestamp: i64 = v.timestamp_micros();
818                            Ok::<_, DataFusionError>(timestamp)
819                        }
820                    );
821                }
822                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
823                    handle_primitive_type!(
824                        builder,
825                        field,
826                        col,
827                        TimestampNanosecondBuilder,
828                        chrono::NaiveDateTime,
829                        row,
830                        idx,
831                        |v: chrono::NaiveDateTime| {
832                            let timestamp: i64 = v.and_utc().timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
833                            Ok::<i64, DataFusionError>(timestamp)
834                        }
835                    );
836                }
837                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
838                    handle_primitive_type!(
839                        builder,
840                        field,
841                        col,
842                        TimestampNanosecondBuilder,
843                        chrono::DateTime<chrono::Utc>,
844                        row,
845                        idx,
846                        |v: chrono::DateTime<chrono::Utc>| {
847                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
848                            Ok::<_, DataFusionError>(timestamp)
849                        }
850                    );
851                }
852                DataType::Time64(TimeUnit::Microsecond) => {
853                    handle_primitive_type!(
854                        builder,
855                        field,
856                        col,
857                        Time64MicrosecondBuilder,
858                        chrono::NaiveTime,
859                        row,
860                        idx,
861                        |v: chrono::NaiveTime| {
862                            let seconds = i64::from(v.num_seconds_from_midnight());
863                            let microseconds = i64::from(v.nanosecond()) / 1000;
864                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
865                        }
866                    );
867                }
868                DataType::Time64(TimeUnit::Nanosecond) => {
869                    handle_primitive_type!(
870                        builder,
871                        field,
872                        col,
873                        Time64NanosecondBuilder,
874                        chrono::NaiveTime,
875                        row,
876                        idx,
877                        |v: chrono::NaiveTime| {
878                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
879                                * 1_000_000_000
880                                + i64::from(v.nanosecond());
881                            Ok::<_, DataFusionError>(timestamp)
882                        }
883                    );
884                }
885                DataType::Date32 => {
886                    handle_primitive_type!(
887                        builder,
888                        field,
889                        col,
890                        Date32Builder,
891                        chrono::NaiveDate,
892                        row,
893                        idx,
894                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
895                    );
896                }
897                DataType::Interval(IntervalUnit::MonthDayNano) => {
898                    handle_primitive_type!(
899                        builder,
900                        field,
901                        col,
902                        IntervalMonthDayNanoBuilder,
903                        IntervalFromSql,
904                        row,
905                        idx,
906                        |v: IntervalFromSql| {
907                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
908                                v.month,
909                                v.day,
910                                v.time * 1_000,
911                            );
912                            Ok::<_, DataFusionError>(interval_month_day_nano)
913                        }
914                    );
915                }
916                DataType::Boolean => {
917                    handle_primitive_type!(
918                        builder,
919                        field,
920                        col,
921                        BooleanBuilder,
922                        bool,
923                        row,
924                        idx,
925                        just_return
926                    );
927                }
928                DataType::List(inner) => match inner.data_type() {
929                    DataType::Int16 => {
930                        handle_primitive_array_type!(
931                            builder,
932                            field,
933                            col,
934                            Int16Builder,
935                            i16,
936                            row,
937                            idx
938                        );
939                    }
940                    DataType::Int32 => {
941                        handle_primitive_array_type!(
942                            builder,
943                            field,
944                            col,
945                            Int32Builder,
946                            i32,
947                            row,
948                            idx
949                        );
950                    }
951                    DataType::Int64 => {
952                        handle_primitive_array_type!(
953                            builder,
954                            field,
955                            col,
956                            Int64Builder,
957                            i64,
958                            row,
959                            idx
960                        );
961                    }
962                    DataType::Float32 => {
963                        handle_primitive_array_type!(
964                            builder,
965                            field,
966                            col,
967                            Float32Builder,
968                            f32,
969                            row,
970                            idx
971                        );
972                    }
973                    DataType::Float64 => {
974                        handle_primitive_array_type!(
975                            builder,
976                            field,
977                            col,
978                            Float64Builder,
979                            f64,
980                            row,
981                            idx
982                        );
983                    }
984                    DataType::Utf8 => {
985                        handle_primitive_array_type!(
986                            builder,
987                            field,
988                            col,
989                            StringBuilder,
990                            &str,
991                            row,
992                            idx
993                        );
994                    }
995                    DataType::Binary => {
996                        handle_primitive_array_type!(
997                            builder,
998                            field,
999                            col,
1000                            BinaryBuilder,
1001                            Vec<u8>,
1002                            row,
1003                            idx
1004                        );
1005                    }
1006                    DataType::Boolean => {
1007                        handle_primitive_array_type!(
1008                            builder,
1009                            field,
1010                            col,
1011                            BooleanBuilder,
1012                            bool,
1013                            row,
1014                            idx
1015                        );
1016                    }
1017                    _ => {
1018                        return Err(DataFusionError::NotImplemented(format!(
1019                            "Unsupported list data type {} for col: {:?}",
1020                            field.data_type(),
1021                            col
1022                        )));
1023                    }
1024                },
1025                _ => {
1026                    return Err(DataFusionError::NotImplemented(format!(
1027                        "Unsupported data type {} for col: {:?}",
1028                        field.data_type(),
1029                        col
1030                    )));
1031                }
1032            }
1033        }
1034    }
1035    let projected_columns = array_builders
1036        .into_iter()
1037        .enumerate()
1038        .filter(|(idx, _)| projections_contains(projection, *idx))
1039        .map(|(_, mut builder)| builder.finish())
1040        .collect::<Vec<ArrayRef>>();
1041    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
1042    Ok(RecordBatch::try_new_with_options(
1043        projected_schema,
1044        projected_columns,
1045        &options,
1046    )?)
1047}