1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::transform::transform_batch;
3use crate::{
4 Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
5 RemoteSchemaRef, RemoteType, Transform,
6};
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bb8_postgres::PostgresConnectionManager;
10use bigdecimal::BigDecimal;
11use byteorder::{BigEndian, ReadBytesExt};
12use chrono::Timelike;
13use datafusion::arrow::array::{
14 make_builder, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
15 Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
16 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
17 Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
18 TimestampNanosecondBuilder,
19};
20use datafusion::arrow::datatypes::{
21 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
22};
23use datafusion::common::project_schema;
24use datafusion::error::DataFusionError;
25use datafusion::execution::SendableRecordBatchStream;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
27use futures::StreamExt;
28use num_bigint::{BigInt, Sign};
29use std::string::ToString;
30use std::sync::Arc;
31use std::time::{SystemTime, UNIX_EPOCH};
32
/// Connection options for a postgres endpoint.
///
/// `derive_with::With` generates `with_*` builder-style setters for each field.
#[derive(Debug, Clone, derive_with::With)]
pub struct PostgresConnectionOptions {
    /// Hostname or IP address of the postgres server.
    pub(crate) host: String,
    /// TCP port of the postgres server.
    pub(crate) port: u16,
    /// Username used to authenticate.
    pub(crate) username: String,
    /// Password used to authenticate.
    pub(crate) password: String,
    /// Database name; the server-side default is used when `None`.
    pub(crate) database: Option<String>,
    /// Rows per streamed record batch; a default (2048) is applied at query
    /// time when `None`.
    pub(crate) chunk_size: Option<usize>,
}
42
43impl PostgresConnectionOptions {
44 pub fn new(
45 host: impl Into<String>,
46 port: u16,
47 username: impl Into<String>,
48 password: impl Into<String>,
49 ) -> Self {
50 Self {
51 host: host.into(),
52 port,
53 username: username.into(),
54 password: password.into(),
55 database: None,
56 chunk_size: None,
57 }
58 }
59}
60
/// A [`Pool`] implementation backed by a bb8 postgres connection pool.
#[derive(Debug)]
pub struct PostgresPool {
    /// Underlying bb8 pool of plaintext (`NoTls`) postgres connections.
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
65
66#[async_trait::async_trait]
67impl Pool for PostgresPool {
68 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
69 let conn = self.pool.get_owned().await.map_err(|e| {
70 DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
71 })?;
72 Ok(Arc::new(PostgresConnection { conn }))
73 }
74}
75
76pub(crate) async fn connect_postgres(
77 options: &PostgresConnectionOptions,
78) -> DFResult<PostgresPool> {
79 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
80 config
81 .host(&options.host)
82 .port(options.port)
83 .user(&options.username)
84 .password(&options.password);
85 if let Some(database) = &options.database {
86 config.dbname(database);
87 }
88 let manager = PostgresConnectionManager::new(config, NoTls);
89 let pool = bb8::Pool::builder()
90 .max_size(5)
91 .build(manager)
92 .await
93 .map_err(|e| {
94 DataFusionError::Execution(format!(
95 "Failed to create postgres connection pool due to {e}",
96 ))
97 })?;
98
99 Ok(PostgresPool { pool })
100}
101
/// A single pooled postgres connection, as handed out by [`PostgresPool`].
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    /// Owned pooled connection; returned to the pool on drop.
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
106
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infers both the remote (postgres) schema and the corresponding Arrow
    /// schema for `sql` by executing the query and inspecting the first row.
    ///
    /// When a `transform` is supplied, a one-row batch is built from that row
    /// and pushed through the transform so the returned Arrow schema reflects
    /// the transform's output; the remote schema is always the raw one.
    ///
    /// Errors if the query fails or returns no rows (the schema cannot be
    /// inferred from an empty result, since NUMERIC scale is read from data).
    async fn infer_schema(
        &self,
        sql: &str,
        transform: Option<Arc<dyn Transform>>,
    ) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
        // Stream results one row per chunk — only the first row is needed.
        let mut stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(1)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        // Each chunk is a Vec<Result<Row, _>>; surface the first row error.
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        let remote_schema = Arc::new(build_remote_schema(first_row)?);
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        if let Some(transform) = transform {
            // Build a one-row batch and run it through the transform purely to
            // discover the transformed output schema.
            let batch = rows_to_batch(std::slice::from_ref(first_row), &arrow_schema, None)?;
            let transformed_batch = transform_batch(
                batch,
                transform.as_ref(),
                &arrow_schema,
                None,
                Some(&remote_schema),
            )?;
            Ok((remote_schema, transformed_batch.schema()))
        } else {
            Ok((remote_schema, arrow_schema))
        }
    }

    /// Executes `sql` and returns a stream of Arrow record batches.
    ///
    /// Rows are pulled in chunks of `conn_options.chunk_size()` (2048 when
    /// unset); each chunk is converted by [`rows_to_batch`], which applies the
    /// optional column `projection`.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        // Own the projection so the stream closure can use it.
        let projection = projection.cloned();
        let chunk_size = conn_options.chunk_size();
        let stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(chunk_size.unwrap_or(2048))
            .boxed();

        // Lazily convert each chunk of rows into a RecordBatch.
        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
201
202fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
203 match pg_type {
204 &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
205 &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
206 &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
207 &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
208 &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
209 &Type::NUMERIC => {
210 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
211 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
212 })?;
213 let scale = match v {
214 Some(v) => v.scale,
215 None => 0,
216 };
217 assert!((scale as u32) <= (i8::MAX as u32));
218 Ok(RemoteType::Postgres(PostgresType::Numeric(
219 scale.try_into().unwrap_or_default(),
220 )))
221 }
222 &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
223 &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
224 &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
225 &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
226 &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
227 &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
228 &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
229 &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
230 &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
231 &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
232 &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
233 &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
234 &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
235 &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
236 &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
237 &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
238 &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
239 &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
240 &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
241 &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
242 &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
243 &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
244 &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
245 other if other.name().eq_ignore_ascii_case("geometry") => {
246 Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
247 }
248 _ => Err(DataFusionError::NotImplemented(format!(
249 "Unsupported postgres type {pg_type:?}",
250 ))),
251 }
252}
253
254fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
255 let mut remote_fields = vec![];
256 for (idx, col) in row.columns().iter().enumerate() {
257 remote_fields.push(RemoteField::new(
258 col.name(),
259 pg_type_to_remote_type(col.type_(), row, idx)?,
260 true,
261 ));
262 }
263 Ok(RemoteSchema::new(remote_fields))
264}
265
/// Downcasts `$builder` to `$builder_ty`, reads column `$index` of `$row` as
/// an `Option<$value_ty>`, converts it with the fallible `$convert` closure,
/// and appends the value (or a null) to the builder.
///
/// Must be expanded in a function returning `DFResult` (uses `?`).
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast builder to ",
                        stringify!($builder_ty),
                        " for {:?} and {:?}"
                    ),
                    $field, $col
                )
            });
        // Fix: the error message used to say "BigDecimal" for every value
        // type (copy-paste); report the actual requested type instead.
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
294
/// Downcasts `$builder` to a `ListBuilder` whose values builder is
/// `$values_builder_ty`, reads column `$index` of `$row` as an
/// `Option<Vec<$primitive_value_ty>>`, and appends the list (or a null list).
///
/// Must be expanded in a function returning `DFResult` (uses `?`).
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?}",
                    $field
                )
            });
        // Fix: the panic messages referenced unbound metavariables
        // ($builder_ty / $value_ty); use the macro's actual parameters.
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?}",
                    stringify!($values_builder_ty),
                    $field
                )
            });
        // Fix: propagate a fetch failure as an error (consistent with
        // handle_primitive_type!) instead of panicking.
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
341
/// A postgres NUMERIC value decoded from the binary wire format.
#[derive(Debug)]
struct BigDecimalFromSql {
    /// The decoded arbitrary-precision decimal value.
    inner: BigDecimal,
    /// Scale (digits after the decimal point) taken from the wire header.
    scale: u16,
}
347
348impl BigDecimalFromSql {
349 fn to_decimal_128(&self) -> Option<i128> {
350 big_decimal_to_i128(&self.inner, Some(self.scale as u32))
351 }
352}
353
354#[allow(clippy::cast_sign_loss)]
355#[allow(clippy::cast_possible_wrap)]
356#[allow(clippy::cast_possible_truncation)]
357impl<'a> FromSql<'a> for BigDecimalFromSql {
358 fn from_sql(
359 _ty: &Type,
360 raw: &'a [u8],
361 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
362 let raw_u16: Vec<u16> = raw
363 .chunks(2)
364 .map(|chunk| {
365 if chunk.len() == 2 {
366 u16::from_be_bytes([chunk[0], chunk[1]])
367 } else {
368 u16::from_be_bytes([chunk[0], 0])
369 }
370 })
371 .collect();
372
373 let base_10_000_digit_count = raw_u16[0];
374 let weight = raw_u16[1] as i16;
375 let sign = raw_u16[2];
376 let scale = raw_u16[3];
377
378 let mut base_10_000_digits = Vec::new();
379 for i in 4..4 + base_10_000_digit_count {
380 base_10_000_digits.push(raw_u16[i as usize]);
381 }
382
383 let mut u8_digits = Vec::new();
384 for &base_10_000_digit in base_10_000_digits.iter().rev() {
385 let mut base_10_000_digit = base_10_000_digit;
386 let mut temp_result = Vec::new();
387 while base_10_000_digit > 0 {
388 temp_result.push((base_10_000_digit % 10) as u8);
389 base_10_000_digit /= 10;
390 }
391 while temp_result.len() < 4 {
392 temp_result.push(0);
393 }
394 u8_digits.extend(temp_result);
395 }
396 u8_digits.reverse();
397
398 let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
399 let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
400 u8_digits.resize(size as usize, 0);
401
402 let sign = match sign {
403 0x4000 => Sign::Minus,
404 0x0000 => Sign::Plus,
405 _ => {
406 return Err(Box::new(DataFusionError::Execution(
407 "Failed to parse big decimal from postgres numeric value".to_string(),
408 )));
409 }
410 };
411
412 let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
413 return Err(Box::new(DataFusionError::Execution(
414 "Failed to parse big decimal from postgres numeric value".to_string(),
415 )));
416 };
417 Ok(BigDecimalFromSql {
418 inner: BigDecimal::new(digits, i64::from(scale)),
419 scale,
420 })
421 }
422
423 fn accepts(ty: &Type) -> bool {
424 matches!(*ty, Type::NUMERIC)
425 }
426}
427
/// A postgres INTERVAL decoded from the 16-byte binary wire format
/// (see `from_sql`: i64 time, then i32 day, then i32 month, big-endian).
#[derive(Debug)]
struct IntervalFromSql {
    /// Sub-day time component; treated as microseconds by `rows_to_batch`,
    /// which multiplies it by 1_000 to get nanoseconds.
    time: i64,
    /// Day component.
    day: i32,
    /// Month component.
    month: i32,
}
436
437impl<'a> FromSql<'a> for IntervalFromSql {
438 fn from_sql(
439 _ty: &Type,
440 raw: &'a [u8],
441 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
442 let mut cursor = std::io::Cursor::new(raw);
443
444 let time = cursor.read_i64::<BigEndian>()?;
445 let day = cursor.read_i32::<BigEndian>()?;
446 let month = cursor.read_i32::<BigEndian>()?;
447
448 Ok(IntervalFromSql { time, day, month })
449 }
450
451 fn accepts(ty: &Type) -> bool {
452 matches!(*ty, Type::INTERVAL)
453 }
454}
455
/// A PostGIS geometry value borrowed as raw WKB bytes from the row buffer.
struct GeometryFromSql<'a> {
    /// Well-known-binary payload, borrowed from the wire buffer.
    wkb: &'a [u8],
}
459
460impl<'a> FromSql<'a> for GeometryFromSql<'a> {
461 fn from_sql(
462 _ty: &Type,
463 raw: &'a [u8],
464 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
465 Ok(GeometryFromSql { wkb: raw })
466 }
467
468 fn accepts(ty: &Type) -> bool {
469 matches!(ty.name(), "geometry")
470 }
471}
472
/// Converts a slice of postgres [`Row`]s into an Arrow [`RecordBatch`] with
/// the given `table_schema`, keeping only the columns selected by
/// `projection` (all columns when `None`).
///
/// Builders are allocated for every field of the full schema, but values are
/// only appended for projected columns; the non-projected builders finish
/// empty and are filtered out before assembling the batch.
///
/// Returns `DataFusionError::NotImplemented` for Arrow types (or list element
/// types) this converter does not handle.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    // One builder per field of the unprojected table schema, pre-sized for
    // the row count.
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            // Skip columns outside the projection.
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // `col` may be None if the row has fewer columns than the schema;
            // it is only used for error messages and pg-type sniffing below.
            let col = row.columns().get(idx);
            // Dispatch on the Arrow type of the target field; the
            // handle_primitive_* macros downcast the builder, fetch the value
            // via Row::try_get, and append the converted value or a null.
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Decimal128(_precision, _scale) => {
                    // NUMERIC: decode via BigDecimalFromSql, then narrow to
                    // i128 at the value's own scale.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128",
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB columns must be fetched as serde_json values
                    // and re-serialized; plain text columns come out as &str.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Binary => {
                    // Three cases: PostGIS geometry (raw WKB bytes), JSON/JSONB
                    // (serialized to UTF-8 bytes), and plain BYTEA.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            |v: GeometryFromSql| { Ok::<_, DataFusionError>(v.wkb.to_vec()) }
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().as_bytes().to_vec())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    // Pre-epoch timestamps fail here: duration_since(UNIX_EPOCH)
                    // errors for SystemTime values before 1970.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    // TIMESTAMPTZ: fetched as a UTC DateTime regardless of the
                    // field's timezone string.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            // Microseconds since midnight.
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            // Nanoseconds since midnight.
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            // `time` is microseconds on the wire; Arrow's
                            // MonthDayNano wants nanoseconds.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                // List columns: dispatch on the element type.
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx);
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(builder, field, Int32Builder, i32, row, idx);
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(builder, field, Int64Builder, i64, row, idx);
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(builder, field, Float32Builder, f32, row, idx);
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(builder, field, Float64Builder, f64, row, idx);
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(builder, field, StringBuilder, &str, row, idx);
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    // Finish only the projected builders, in schema order, and assemble the
    // batch against the projected schema.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}