use crate::connection::{big_decimal_to_i128, projections_contains};
use crate::{
    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
    RemoteSchemaRef, RemoteType,
};
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::types::{FromSql, Type};
use bb8_postgres::tokio_postgres::{NoTls, Row};
use bigdecimal::BigDecimal;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use datafusion::arrow::array::{
    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
    TimestampNanosecondBuilder, UInt32Builder, make_builder,
};
use datafusion::arrow::datatypes::{
    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
};
use datafusion::common::project_schema;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::limit::LimitStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use derive_getters::Getters;
use derive_with::With;
use futures::StreamExt;
use num_bigint::{BigInt, Sign};
use std::string::ToString;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

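/// Options for connecting to a Postgres server and streaming query results.
///
/// `database` defaults to `None` (no explicit `dbname` is set on the
/// config), `pool_max_size` to 10 pooled connections, and
/// `stream_chunk_size` to 2048 rows per emitted `RecordBatch`.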
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    pub(crate) database: Option<String>,
    pub(crate) pool_max_size: usize,
    pub(crate) stream_chunk_size: usize,
}

impl PostgresConnectionOptions {
    pub fn new(
        host: impl Into<String>,
        port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            username: username.into(),
            password: password.into(),
            database: None,
            pool_max_size: 10,
            stream_chunk_size: 2048,
        }
    }
}

#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Pool for PostgresPool {
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}

pub(crate) async fn connect_postgres(
    options: &PostgresConnectionOptions,
) -> DFResult<PostgresPool> {
    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
    config
        .host(&options.host)
        .port(options.port)
        .user(&options.username)
        .password(&options.password);
    if let Some(database) = &options.database {
        config.dbname(database);
    }
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = bb8::Pool::builder()
        .max_size(options.pool_max_size as u32)
        .build(manager)
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to create postgres connection pool due to {e}"
            ))
        })?;

    Ok(PostgresPool { pool })
}
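
// Usage sketch (hypothetical values; assumes the `With` derive generates
// `with_*` builder methods that take each field's type):
//
//     let options = PostgresConnectionOptions::new("localhost", 5432, "postgres", "secret")
//         .with_database(Some("mydb".to_string()))
//         .with_pool_max_size(4);
//     let pool = connect_postgres(&options).await?;
//     let conn = pool.get().await?;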

#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Connection for PostgresConnection {
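    // Schema inference probes the query with a `LIMIT 1` wrapper. Note that
    // `query_one` returns an error unless exactly one row comes back, so
    // inference fails for queries that produce no rows.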
    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
        let sql = try_limit_query(sql, 1).unwrap_or_else(|| sql.to_string());
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}"))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        Ok((remote_schema, arrow_schema))
    }

    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        let (sql, limit_stream) = match limit {
            Some(limit) => {
                if let Some(limited_sql) = try_limit_query(sql, limit) {
                    (limited_sql, false)
                } else {
                    (sql.to_string(), true)
                }
            }
            None => (sql.to_string(), false),
        };
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}"
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}"
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(projected_schema, stream));

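        // If the LIMIT could not be pushed into the SQL (non-SELECT
        // statement), enforce it locally with DataFusion's `LimitStream`
        // (skip = 0, fetch = `limit`).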
        if limit_stream {
            let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
            Ok(Box::pin(LimitStream::new(
                sendable_stream,
                0,
                limit,
                metrics,
            )))
        } else {
            Ok(sendable_stream)
        }
    }
}

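/// Wraps a `SELECT` statement in a subquery with a `LIMIT` clause, e.g.
/// `SELECT a FROM t` with a limit of 1 becomes
/// `SELECT * FROM (SELECT a FROM t) as __subquery LIMIT 1`.
/// Returns `None` for statements that do not start with `SELECT`, leaving
/// callers to fall back to the original SQL.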
fn try_limit_query(sql: &str, limit: usize) -> Option<String> {
    let trimmed = sql.trim();
    if trimmed
        .get(..6)
        .is_some_and(|prefix| prefix.eq_ignore_ascii_case("select"))
    {
        Some(format!("SELECT * FROM ({sql}) as __subquery LIMIT {limit}"))
    } else {
        None
    }
}

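// Maps a Postgres column type to a `RemoteType`. For `NUMERIC` columns the
// scale is not recoverable from the type OID alone, so it is read from the
// sampled row's value (defaulting to 0 when the value is NULL).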
fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
    match pg_type {
        &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
        &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
        &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
        &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
        &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
        &Type::NUMERIC => {
            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
            })?;
            let scale = match v {
                Some(v) => v.scale,
                None => 0,
            };
            let scale = i8::try_from(scale).map_err(|_| {
                DataFusionError::Execution(format!(
                    "Numeric scale {scale} is out of range for Decimal128"
                ))
            })?;
            Ok(RemoteType::Postgres(PostgresType::Numeric(scale)))
        }
        &Type::OID => Ok(RemoteType::Postgres(PostgresType::Oid)),
        &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
        &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
        &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
        &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
        &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
        &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
        &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
        &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
        &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
        &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
        &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
        &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
        &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
        &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
        &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
        &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
        &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
        &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
        &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
        &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
        &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
        &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
        &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
        other if other.name().eq_ignore_ascii_case("geometry") => {
            Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
        }
        _ => Err(DataFusionError::NotImplemented(format!(
            "Unsupported postgres type {pg_type:?}"
        ))),
    }
}

fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
    let mut remote_fields = vec![];
    for (idx, col) in row.columns().iter().enumerate() {
        remote_fields.push(RemoteField::new(
            col.name(),
            pg_type_to_remote_type(col.type_(), row, idx)?,
            true,
        ));
    }
    Ok(RemoteSchema::new(remote_fields))
}

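// Shared decoding pattern for scalar columns: downcast the type-erased
// `ArrayBuilder` to its concrete builder type, fetch the column with
// `try_get` as `Option<T>`, then append the converted value or a null. The
// companion macro below applies the same pattern to list columns through a
// `ListBuilder`.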
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}

macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}

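// Decodes the Postgres `NUMERIC` binary wire format: a header of four
// big-endian u16s (the number of base-10000 digit groups, the weight of the
// first group, the sign flag, and the declared scale) followed by the digit
// groups themselves.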
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}

impl BigDecimalFromSql {
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}

#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();
        if raw_u16.len() < 4 {
            return Err(Box::new(DataFusionError::Execution(
                "Postgres numeric value is too short to contain a header".to_string(),
            )));
        }

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        if raw_u16.len() < 4 + base_10_000_digit_count as usize {
            return Err(Box::new(DataFusionError::Execution(
                "Postgres numeric value truncates its digit groups".to_string(),
            )));
        }
        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}

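// Decodes the Postgres `INTERVAL` binary wire format: a big-endian i64
// microsecond count, an i32 day count, and an i32 month count.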
#[derive(Debug)]
struct IntervalFromSql {
    time: i64,
    day: i32,
    month: i32,
}

impl<'a> FromSql<'a> for IntervalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}

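// Borrows the raw (E)WKB bytes of a PostGIS `geometry` value without
// copying; `rows_to_batch` copies them into a `BinaryBuilder` below.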
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}

impl<'a> FromSql<'a> for GeometryFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(GeometryFromSql { wkb: raw })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(ty.name(), "geometry")
    }
}

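// Converts a chunk of rows into a `RecordBatch`. Builders are created for
// every column of the table schema so that indices line up with the row
// layout, but values are only appended (and arrays only finished) for
// projected columns.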
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128"
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::LargeUtf8 => {
                    if col.is_some_and(|col| matches!(col.type_(), &Type::JSON | &Type::JSONB)) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Binary => {
                    if col.is_some_and(|col| col.type_().name().eq_ignore_ascii_case("geometry")) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            |v: GeometryFromSql| { Ok::<_, DataFusionError>(v.wkb.to_vec()) }
                        );
                    } else if col
                        .is_some_and(|col| matches!(col.type_(), &Type::JSON | &Type::JSONB))
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().as_bytes().to_vec())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v.as_micros().try_into().map_err(|_| {
                                    DataFusionError::Execution(format!(
                                        "Timestamp overflows i64 for {field:?} and {col:?}"
                                    ))
                                })?;
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v.as_nanos().try_into().map_err(|_| {
                                    DataFusionError::Execution(format!(
                                        "Timestamp overflows i64 for {field:?} and {col:?}"
                                    ))
                                })?;
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            v.timestamp_nanos_opt().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"
                                ))
                            })
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(builder, field, col, Int16Builder, i16, row, idx);
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(builder, field, col, Int32Builder, i32, row, idx);
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(builder, field, col, Int64Builder, i64, row, idx);
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(builder, field, col, Float32Builder, f32, row, idx);
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(builder, field, col, Float64Builder, f64, row, idx);
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(builder, field, col, StringBuilder, &str, row, idx);
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(builder, field, col, BinaryBuilder, Vec<u8>, row, idx);
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(builder, field, col, BooleanBuilder, bool, row, idx);
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}
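
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal sketch exercising the LIMIT rewrite; `try_limit_query` is a
    // pure function, so no database is required.
    #[test]
    fn limit_rewrite_applies_only_to_selects() {
        assert_eq!(
            try_limit_query("SELECT a FROM t", 10).as_deref(),
            Some("SELECT * FROM (SELECT a FROM t) as __subquery LIMIT 10")
        );
        // Non-SELECT statements and inputs shorter than the `SELECT` keyword
        // are passed through unchanged (callers fall back to the raw SQL).
        assert_eq!(try_limit_query("EXPLAIN SELECT 1", 10), None);
        assert_eq!(try_limit_query("do", 10), None);
    }
}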