1use crate::connection::{RemoteDbType, big_decimal_to_i128, projections_contains};
2use crate::{
3 Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4 RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13 ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14 Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16 Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17 TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::prelude::Expr;
27use derive_getters::Getters;
28use derive_with::With;
29use futures::StreamExt;
30use num_bigint::{BigInt, Sign};
31use std::string::ToString;
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
/// Connection settings for a remote Postgres server.
///
/// `derive_with::With` and `derive_getters::Getters` generate builder-style
/// `with_*` setters and read-only accessors for these fields.
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    // Target database name; when `None` the server-side default applies.
    pub(crate) database: Option<String>,
    // Maximum number of connections held by the bb8 pool.
    pub(crate) pool_max_size: usize,
    // Number of rows gathered into each streamed RecordBatch.
    pub(crate) stream_chunk_size: usize,
}
45
46impl PostgresConnectionOptions {
47 pub fn new(
48 host: impl Into<String>,
49 port: u16,
50 username: impl Into<String>,
51 password: impl Into<String>,
52 ) -> Self {
53 Self {
54 host: host.into(),
55 port,
56 username: username.into(),
57 password: password.into(),
58 database: None,
59 pool_max_size: 10,
60 stream_chunk_size: 2048,
61 }
62 }
63}
64
/// A bb8-backed pool of plaintext (`NoTls`) Postgres connections.
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
69
#[async_trait::async_trait]
impl Pool for PostgresPool {
    /// Check a connection out of the pool, mapping pool failures to
    /// `DataFusionError::Execution`.
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        // `get_owned` returns a 'static pooled connection, so it can be
        // moved into the returned trait object.
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}
79
80pub(crate) async fn connect_postgres(
81 options: &PostgresConnectionOptions,
82) -> DFResult<PostgresPool> {
83 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
84 config
85 .host(&options.host)
86 .port(options.port)
87 .user(&options.username)
88 .password(&options.password);
89 if let Some(database) = &options.database {
90 config.dbname(database);
91 }
92 let manager = PostgresConnectionManager::new(config, NoTls);
93 let pool = bb8::Pool::builder()
94 .max_size(options.pool_max_size as u32)
95 .build(manager)
96 .await
97 .map_err(|e| {
98 DataFusionError::Execution(format!(
99 "Failed to create postgres connection pool due to {e}",
100 ))
101 })?;
102
103 Ok(PostgresPool { pool })
104}
105
/// A single checked-out Postgres connection implementing [`Connection`].
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
110
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infer both the remote (Postgres) schema and the matching Arrow schema
    /// by executing the query and inspecting a single result row.
    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
        // Try to rewrite the query with LIMIT 1; fall back to the original
        // SQL if the rewrite is not applicable.
        let sql = RemoteDbType::Postgres
            .try_rewrite_query(sql, &[], Some(1))
            .unwrap_or_else(|| sql.to_string());
        // NOTE(review): `query_one` errors when the query yields zero rows —
        // presumably inference requires a non-empty result; confirm.
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        Ok((remote_schema, arrow_schema))
    }

    /// Execute `sql` and stream the result as Arrow record batches.
    ///
    /// Filters and limit are pushed down by rewriting the query; rows are
    /// pulled in chunks of `stream_chunk_size` and converted to batches with
    /// `rows_to_batch`, projected to `projection`.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        let sql = RemoteDbType::Postgres
            .try_rewrite_query(sql, filters, limit)
            .unwrap_or_else(|| sql.to_string());
        // Own the projection so it can move into the stream closure below.
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        // `query_raw` streams rows rather than buffering the full result set.
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}",
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        // Convert each chunk of rows into one RecordBatch, surfacing any
        // per-row fetch error as a DataFusionError.
        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
170
171fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
172 match pg_type {
173 &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
174 &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
175 &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
176 &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
177 &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
178 &Type::NUMERIC => {
179 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
180 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
181 })?;
182 let scale = match v {
183 Some(v) => v.scale,
184 None => 0,
185 };
186 assert!((scale as u32) <= (i8::MAX as u32));
187 Ok(RemoteType::Postgres(PostgresType::Numeric(
188 scale.try_into().unwrap_or_default(),
189 )))
190 }
191 &Type::OID => Ok(RemoteType::Postgres(PostgresType::Oid)),
192 &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
193 &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
194 &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
195 &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
196 &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
197 &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
198 &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
199 &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
200 &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
201 &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
202 &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
203 &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
204 &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
205 &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
206 &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
207 &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
208 &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
209 &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
210 &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
211 &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
212 &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
213 &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
214 &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
215 other if other.name().eq_ignore_ascii_case("geometry") => {
216 Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
217 }
218 _ => Err(DataFusionError::NotImplemented(format!(
219 "Unsupported postgres type {pg_type:?}",
220 ))),
221 }
222}
223
224fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
225 let mut remote_fields = vec![];
226 for (idx, col) in row.columns().iter().enumerate() {
227 remote_fields.push(RemoteField::new(
228 col.name(),
229 pg_type_to_remote_type(col.type_(), row, idx)?,
230 true,
231 ));
232 }
233 Ok(RemoteSchema::new(remote_fields))
234}
235
/// Append one scalar value from `$row[$index]` onto `$builder`.
///
/// Downcasts `$builder` to `$builder_ty`, reads an `Option<$value_ty>` from
/// the row, applies the fallible `$convert` closure to a present value, and
/// appends the result (or a null). Panics if the builder does not match the
/// Arrow field; propagates a `DataFusionError` if the value cannot be read.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        // NULL maps to an Arrow null; present values go through `$convert`.
        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
264
/// Append one array value from `$row[$index]` onto a list `$builder`.
///
/// Downcasts `$builder` to a `ListBuilder` and its values builder to
/// `$values_builder_ty`, reads an `Option<Vec<$primitive_value_ty>>` from the
/// row, and appends the elements (or a null list). Panics on builder type
/// mismatch; propagates a `DataFusionError` if the value cannot be read.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Fixed: was `stringify!($builder_ty)`, which is not a
                    // declared metavariable of this macro.
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                // Fixed: was `stringify!($value_ty)`, also undeclared here.
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                // Every element is wrapped in Some: element-level NULLs are
                // not represented by Vec<$primitive_value_ty>.
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
307
/// A Postgres `NUMERIC` value decoded from the binary wire format,
/// retaining the on-the-wire scale alongside the decimal value.
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}
313
impl BigDecimalFromSql {
    /// Convert to the `i128` representation used by Arrow `Decimal128`
    /// arrays at `self.scale`; `None` when the value does not fit.
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}
319
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    /// Decode the Postgres `NUMERIC` binary format: a header of four
    /// big-endian u16 words (digit count, weight, sign, scale) followed by
    /// that many base-10000 digits.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Reinterpret the raw bytes as big-endian u16 words; a trailing odd
        // byte is zero-padded.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        // Header: digit count, weight (base-10000 exponent of the first
        // digit), sign flag, and decimal scale.
        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10000 digit into four base-10 digits. Digits are
        // emitted least-significant-first per group while groups are walked
        // in reverse, then the whole sequence is reversed into big-endian
        // digit order.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Pad with trailing zeros so exactly `scale` digits sit after the
        // decimal point.
        // NOTE(review): a negative `size` would wrap in the `as usize` cast
        // and abort on resize — presumably unreachable for well-formed wire
        // values; confirm against the numeric send format.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        // 0x4000 marks negative, 0x0000 positive; anything else (e.g. NaN)
        // is rejected.
        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    /// Only `NUMERIC` columns are decoded with this type.
    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
393
/// A Postgres `INTERVAL` decoded from its 16-byte binary representation,
/// kept as the three separate wire components.
#[derive(Debug)]
struct IntervalFromSql {
    // Time-of-day component; consumers below scale it by 1_000 into
    // nanoseconds, so it is carried here in microseconds.
    time: i64,
    day: i32,
    month: i32,
}
402
impl<'a> FromSql<'a> for IntervalFromSql {
    /// Decode the binary `INTERVAL` layout: an `i64` time part followed by
    /// an `i32` day count and an `i32` month count, all big-endian.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        // Read order matters: time, then day, then month.
        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    /// Only `INTERVAL` columns are decoded with this type.
    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}
421
/// A PostGIS `geometry` value borrowed as raw WKB bytes from the row buffer.
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
425
426impl<'a> FromSql<'a> for GeometryFromSql<'a> {
427 fn from_sql(
428 _ty: &Type,
429 raw: &'a [u8],
430 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
431 Ok(GeometryFromSql { wkb: raw })
432 }
433
434 fn accepts(ty: &Type) -> bool {
435 matches!(ty.name(), "geometry")
436 }
437}
438
/// Convert a chunk of Postgres rows into one Arrow `RecordBatch` shaped by
/// `projection` over `table_schema`.
///
/// A builder is allocated for every field, but only projected columns are
/// decoded and emitted; unprojected builders stay empty and are dropped at
/// the end. Unsupported Arrow types yield `NotImplemented`.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            // Skip columns excluded by the projection.
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            let col = row.columns().get(idx);
            // Dispatch on the Arrow type of the field; each macro invocation
            // downcasts the builder and decodes the Postgres value.
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Decimal128(_precision, _scale) => {
                    // NUMERIC is decoded via BigDecimalFromSql then rescaled
                    // into the Decimal128 i128 representation.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128",
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB must be fetched as serde_json values and
                    // re-serialized; plain text columns are fetched as &str.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Binary => {
                    // Three source shapes map to Binary: PostGIS geometry
                    // (raw WKB bytes), JSON/JSONB (serialized to bytes), and
                    // plain BYTEA.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
                            |v| Ok(v.wkb);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    // NOTE(review): pre-epoch timestamps fail
                    // `duration_since(UNIX_EPOCH)` and become an error here;
                    // confirm that is the intended behavior.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    // TIMESTAMPTZ arrives as a UTC DateTime; the stored value
                    // is nanoseconds since epoch regardless of `_tz`.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            // Microseconds since midnight.
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            // Nanoseconds since midnight.
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            // Wire time component is microseconds; Arrow
                            // MonthDayNano wants nanoseconds.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                // Postgres array columns: dispatch on the list element type.
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int16Builder,
                            i16,
                            row,
                            idx
                        );
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int32Builder,
                            i32,
                            row,
                            idx
                        );
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int64Builder,
                            i64,
                            row,
                            idx
                        );
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float32Builder,
                            f32,
                            row,
                            idx
                        );
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float64Builder,
                            f64,
                            row,
                            idx
                        );
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx
                        );
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    // Finish only the projected builders, preserving schema order.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}