use crate::connection::{big_decimal_to_i128, projections_contains};
use crate::{
    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
    RemoteSchemaRef, RemoteType,
};
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::types::{FromSql, Type};
use bb8_postgres::tokio_postgres::{NoTls, Row};
use bigdecimal::BigDecimal;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use datafusion::arrow::array::{
    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
    TimestampNanosecondBuilder, UInt32Builder, make_builder,
};
use datafusion::arrow::datatypes::{
    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
};
use datafusion::common::project_schema;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use derive_getters::Getters;
use derive_with::With;
use futures::StreamExt;
use num_bigint::{BigInt, Sign};
use std::string::ToString;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

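/// Options for connecting to a Postgres server.
///
/// `pool_max_size` caps the number of pooled connections and
/// `stream_chunk_size` controls how many rows are buffered into each emitted
/// `RecordBatch`; [`PostgresConnectionOptions::new`] defaults them to 10 and
/// 2048. The `With` derive is expected to generate `with_*` setters for
/// overriding individual fields.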
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    pub(crate) database: Option<String>,
    pub(crate) pool_max_size: usize,
    pub(crate) stream_chunk_size: usize,
}

impl PostgresConnectionOptions {
    pub fn new(
        host: impl Into<String>,
        port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            username: username.into(),
            password: password.into(),
            database: None,
            pool_max_size: 10,
            stream_chunk_size: 2048,
        }
    }
}

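/// A Postgres connection pool backed by `bb8` and `tokio_postgres`,
/// currently connecting without TLS (see [`NoTls`]).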
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Pool for PostgresPool {
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}

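/// Builds a [`PostgresPool`] from the given options.
///
/// A minimal usage sketch (host, port, and credentials are placeholders):
///
/// ```ignore
/// let options = PostgresConnectionOptions::new("localhost", 5432, "postgres", "password");
/// let pool = connect_postgres(&options).await?;
/// let conn = pool.get().await?;
/// ```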
pub(crate) async fn connect_postgres(
    options: &PostgresConnectionOptions,
) -> DFResult<PostgresPool> {
    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
    config
        .host(&options.host)
        .port(options.port)
        .user(&options.username)
        .password(&options.password);
    if let Some(database) = &options.database {
        config.dbname(database);
    }
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = bb8::Pool::builder()
        .max_size(options.pool_max_size as u32)
        .build(manager)
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to create postgres connection pool due to {e}",
            ))
        })?;

    Ok(PostgresPool { pool })
}

#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Connection for PostgresConnection {
    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
        let sql = try_limit1_query(sql).unwrap_or_else(|| sql.to_string());
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}"))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        Ok((remote_schema, arrow_schema))
    }

    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}

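/// Wraps a `SELECT` statement in a `LIMIT 1` subquery so schema inference
/// fetches at most one row, e.g. `SELECT a FROM t` becomes
/// `SELECT * FROM (SELECT a FROM t) as __subquery LIMIT 1`. Returns `None`
/// for non-`SELECT` statements, in which case the caller runs the original
/// SQL unchanged.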
fn try_limit1_query(sql: &str) -> Option<String> {
    // `get` avoids the panic that slicing with `[0..6]` would cause on
    // statements shorter than six bytes.
    if sql
        .trim()
        .get(0..6)
        .is_some_and(|prefix| prefix.eq_ignore_ascii_case("select"))
    {
        Some(format!("SELECT * FROM ({sql}) as __subquery LIMIT 1"))
    } else {
        None
    }
}

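/// Maps a Postgres column type to a [`RemoteType`]. `NUMERIC` is special:
/// the column metadata available here does not carry the declared scale, so
/// the scale is sampled from the first row's value (NULL defaults to 0).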
fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
    match pg_type {
        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
        &Type::NUMERIC => {
            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
            })?;
            let scale = match v {
                Some(v) => v.scale,
                None => 0,
            };
            assert!((scale as u32) <= (i8::MAX as u32));
            Ok(RemoteType::Postgres(PostgresType::Numeric(
                scale.try_into().unwrap_or_default(),
            )))
        }
        &Type::OID => Ok(RemoteType::Postgres(PostgresType::Oid)),
        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
        other if other.name().eq_ignore_ascii_case("geometry") => {
            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
        }
        _ => Err(DataFusionError::NotImplemented(format!(
            "Unsupported postgres type {pg_type:?}",
        ))),
    }
}

fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
    let mut remote_fields = vec![];
    for (idx, col) in row.columns().iter().enumerate() {
        remote_fields.push(RemoteField::new(
            col.name(),
            pg_type_to_remote_type(col.type_(), row, idx)?,
            true,
        ));
    }
    Ok(RemoteSchema::new(remote_fields))
}

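/// Downcasts `$builder` to the concrete `$builder_ty`, reads column `$index`
/// of `$row` as an `Option<$value_ty>`, runs the value through `$convert`,
/// and appends the result (or a null) to the builder.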
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}

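/// Like `handle_primitive_type!`, but for one-dimensional Postgres arrays:
/// downcasts `$builder` to a `ListBuilder`, downcasts its values builder to
/// `$values_builder_ty`, reads the cell as `Option<Vec<$primitive_value_ty>>`,
/// and appends the elements (or a null list).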
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}

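/// A [`BigDecimal`] decoded from Postgres's binary `NUMERIC` encoding: four
/// big-endian `u16` header words (number of base-10000 digits, weight of the
/// first digit, sign, declared scale) followed by the base-10000 digits.
/// For example, `123.45` arrives as digits `[123, 4500]` with weight 0,
/// sign `0x0000`, and scale 2.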
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}

impl BigDecimalFromSql {
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}

#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}

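/// An interval decoded from Postgres's binary `INTERVAL` encoding: a
/// big-endian `i64` microsecond count, an `i32` day count, and an `i32`
/// month count. Days and months stay separate because their length in
/// microseconds is not fixed.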
#[derive(Debug)]
struct IntervalFromSql {
    time: i64,
    day: i32,
    month: i32,
}

impl<'a> FromSql<'a> for IntervalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}

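/// A PostGIS `geometry` value captured as its raw binary bytes (well-known
/// binary as sent by PostGIS); the bytes are passed through to Arrow
/// unparsed.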
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}

impl<'a> FromSql<'a> for GeometryFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(GeometryFromSql { wkb: raw })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(ty.name(), "geometry")
    }
}

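/// Converts a chunk of Postgres rows into an Arrow [`RecordBatch`] shaped by
/// `projection` over `table_schema`.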
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    // One builder per table column keeps builder indices aligned with row
    // column indices; builders for unprojected columns stay empty and are
    // filtered out when the batch is assembled below.
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields().iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128",
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::LargeUtf8 => {
                    // JSON and JSONB arrive as `serde_json::Value` and are
                    // serialized back to text; everything else is read as a
                    // plain string.
                    if col.is_some_and(|col| matches!(col.type_(), &Type::JSON | &Type::JSONB)) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Binary => {
                    if col.is_some_and(|col| col.type_().name().eq_ignore_ascii_case("geometry")) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            |v: GeometryFromSql| { Ok::<_, DataFusionError>(v.wkb.to_vec()) }
                        );
                    } else if col
                        .is_some_and(|col| matches!(col.type_(), &Type::JSON | &Type::JSONB))
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().as_bytes().to_vec())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| {
                                panic!(
                                    "Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"
                                )
                            });
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            // `time` is in microseconds on the wire; Arrow wants nanoseconds.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int16Builder,
                            i16,
                            row,
                            idx
                        );
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int32Builder,
                            i32,
                            row,
                            idx
                        );
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int64Builder,
                            i64,
                            row,
                            idx
                        );
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float32Builder,
                            f32,
                            row,
                            idx
                        );
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float64Builder,
                            f64,
                            row,
                            idx
                        );
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx
                        );
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}