1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3 Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4 RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13 ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14 FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder,
15 Int64Builder, IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch,
16 RecordBatchOptions, StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
17 TimestampMicrosecondBuilder, TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use log::debug;
30use num_bigint::{BigInt, Sign};
31use std::any::Any;
32use std::string::ToString;
33use std::sync::Arc;
34use std::time::{SystemTime, UNIX_EPOCH};
35use uuid::Uuid;
36
/// Connection options for a postgres data source.
///
/// Constructed via [`PostgresConnectionOptions::new`], then customized with
/// the derived `with_*` setters (from `derive_with::With`); fields are read
/// through the derived getters (from `derive_getters::Getters`).
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    // Server host name or IP address.
    pub(crate) host: String,
    // Server TCP port.
    pub(crate) port: u16,
    // Credentials used when connecting.
    pub(crate) username: String,
    pub(crate) password: String,
    // Database name; when `None`, no dbname is set on the client config
    // (the driver's default applies).
    pub(crate) database: Option<String>,
    // Upper bound on the number of pooled connections (see `connect_postgres`).
    pub(crate) pool_max_size: usize,
    // Number of rows gathered per chunk when streaming query results
    // (see `Connection::query`).
    pub(crate) stream_chunk_size: usize,
}
47
48impl PostgresConnectionOptions {
49 pub fn new(
50 host: impl Into<String>,
51 port: u16,
52 username: impl Into<String>,
53 password: impl Into<String>,
54 ) -> Self {
55 Self {
56 host: host.into(),
57 port,
58 username: username.into(),
59 password: password.into(),
60 database: None,
61 pool_max_size: 10,
62 stream_chunk_size: 2048,
63 }
64 }
65}
66
67impl From<PostgresConnectionOptions> for ConnectionOptions {
68 fn from(options: PostgresConnectionOptions) -> Self {
69 ConnectionOptions::Postgres(options)
70 }
71}
72
/// A bb8-backed pool of postgres connections; implements the crate-level
/// [`Pool`] trait.
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
77
78#[async_trait::async_trait]
79impl Pool for PostgresPool {
80 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
81 let conn = self.pool.get_owned().await.map_err(|e| {
82 DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
83 })?;
84 Ok(Arc::new(PostgresConnection { conn }))
85 }
86}
87
88pub(crate) async fn connect_postgres(
89 options: &PostgresConnectionOptions,
90) -> DFResult<PostgresPool> {
91 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
92 config
93 .host(&options.host)
94 .port(options.port)
95 .user(&options.username)
96 .password(&options.password);
97 if let Some(database) = &options.database {
98 config.dbname(database);
99 }
100 let manager = PostgresConnectionManager::new(config, NoTls);
101 let pool = bb8::Pool::builder()
102 .max_size(options.pool_max_size as u32)
103 .build(manager)
104 .await
105 .map_err(|e| {
106 DataFusionError::Execution(format!(
107 "Failed to create postgres connection pool due to {e}",
108 ))
109 })?;
110
111 Ok(PostgresPool { pool })
112}
113
/// A single pooled postgres connection.
///
/// Holds an *owned* bb8 pooled connection (`'static` guard), so it can be
/// handed out as `Arc<dyn Connection>` independently of a pool borrow.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
118
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Infers the remote schema by executing the query and inspecting the
    /// column metadata of the returned row.
    ///
    /// NOTE(review): `query_one` errors when the query yields zero rows, so
    /// schema inference appears to require at least one row — confirm this
    /// is intended for empty tables.
    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
        // query_limit_1 presumably rewrites the SQL so at most one row is
        // fetched — TODO confirm against RemoteDbType.
        let sql = RemoteDbType::Postgres.query_limit_1(sql);
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        Ok(remote_schema)
    }

    /// Streams query results as Arrow record batches.
    ///
    /// The SQL is rewritten to push down `unparsed_filters` and `limit`,
    /// executed with `query_raw`, and the resulting row stream is chunked by
    /// the configured `stream_chunk_size`; each chunk is converted into a
    /// `RecordBatch` via `rows_to_batch`.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        unparsed_filters: &[String],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        // The adapter reports the projected schema; rows_to_batch performs
        // the actual projection per chunk.
        let projected_schema = project_schema(&table_schema, projection)?;

        let sql = RemoteDbType::Postgres.rewrite_query(sql, unparsed_filters, limit);
        debug!("[remote-table] executing postgres query: {sql}");

        // Clone the projection so the 'static stream closure below can own it.
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}",
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        let stream = stream.map(move |rows| {
            // Each chunk is a Vec<Result<Row, _>>; fail the whole batch on
            // the first row-level error.
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
180
181fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
182 match pg_type {
183 &Type::INT2 => Ok(PostgresType::Int2),
184 &Type::INT4 => Ok(PostgresType::Int4),
185 &Type::INT8 => Ok(PostgresType::Int8),
186 &Type::FLOAT4 => Ok(PostgresType::Float4),
187 &Type::FLOAT8 => Ok(PostgresType::Float8),
188 &Type::NUMERIC => {
189 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
190 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
191 })?;
192 let scale = match v {
193 Some(v) => v.scale,
194 None => 0,
195 };
196 assert!((scale as u32) <= (i8::MAX as u32));
197 Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
198 }
199 &Type::OID => Ok(PostgresType::Oid),
200 &Type::NAME => Ok(PostgresType::Name),
201 &Type::VARCHAR => Ok(PostgresType::Varchar),
202 &Type::BPCHAR => Ok(PostgresType::Bpchar),
203 &Type::TEXT => Ok(PostgresType::Text),
204 &Type::BYTEA => Ok(PostgresType::Bytea),
205 &Type::DATE => Ok(PostgresType::Date),
206 &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
207 &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
208 &Type::TIME => Ok(PostgresType::Time),
209 &Type::INTERVAL => Ok(PostgresType::Interval),
210 &Type::BOOL => Ok(PostgresType::Bool),
211 &Type::JSON => Ok(PostgresType::Json),
212 &Type::JSONB => Ok(PostgresType::Jsonb),
213 &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
214 &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
215 &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
216 &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
217 &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
218 &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
219 &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
220 &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
221 &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
222 &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
223 &Type::XML => Ok(PostgresType::Xml),
224 &Type::UUID => Ok(PostgresType::Uuid),
225 other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
226 _ => Err(DataFusionError::NotImplemented(format!(
227 "Unsupported postgres type {pg_type:?}",
228 ))),
229 }
230}
231
232fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
233 let mut remote_fields = vec![];
234 for (idx, col) in row.columns().iter().enumerate() {
235 remote_fields.push(RemoteField::new(
236 col.name(),
237 RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
238 true,
239 ));
240 }
241 Ok(RemoteSchema::new(remote_fields))
242}
243
/// Appends one scalar column value from `$row[$index]` into `$builder`.
///
/// Downcasts `$builder` to `$builder_ty`, reads the cell as
/// `Option<$value_ty>`, and appends either null or `$convert(value)?`.
/// `$convert` maps the fetched Rust value to the builder's native value and
/// may fail with a `DataFusionError` (propagated with `?`).
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                // A builder/data-type mismatch is a programming error, not a
                // data error, hence panic rather than Err.
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
272
/// Appends one list-typed column value from `$row[$index]` into a
/// `ListBuilder` whose inner values builder is `$values_builder_ty`.
///
/// A SQL NULL array appends a null list entry; otherwise every element is
/// appended to the values builder and the list entry is closed.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                // Fixed: this previously stringified the unbound metavariable
                // `$builder_ty`, so the panic message printed the literal
                // text "$builder_ty" instead of the actual builder type.
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            // Fixed: previously stringified the unbound metavariable
            // `$value_ty` ("$value_ty" appeared verbatim in the message).
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
315
/// A `NUMERIC` value decoded from the postgres binary wire format.
#[derive(Debug)]
struct BigDecimalFromSql {
    // The decoded arbitrary-precision value.
    inner: BigDecimal,
    // The declared scale (digits after the decimal point), as read from the
    // wire header.
    scale: u16,
}
321
322impl BigDecimalFromSql {
323 fn to_decimal_128(&self) -> Option<i128> {
324 big_decimal_to_i128(&self.inner, Some(self.scale as i32))
325 }
326}
327
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    /// Decodes the postgres binary `NUMERIC` wire format.
    ///
    /// The payload is a sequence of big-endian u16 words: a 4-word header
    /// (digit count, weight, sign, scale) followed by `digit count`
    /// base-10000 digits.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Reinterpret the byte stream as big-endian u16 words; a trailing
        // odd byte is padded with zero.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        // Collect the base-10000 digits that follow the 4-word header.
        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10000 digit into exactly four base-10 digits.
        // Groups are processed last-to-first and digits pushed
        // least-significant-first, so the final reverse restores
        // most-significant-first order for from_radix_be below.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Pad (or truncate) the digit string so its implicit scale matches
        // the declared `scale`; `weight` is the base-10000 exponent of the
        // first digit group.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        // Sign word: 0x4000 = negative, 0x0000 = positive; anything else
        // (e.g. NaN) is unsupported.
        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
401
/// An `INTERVAL` value decoded from the postgres binary wire format.
#[derive(Debug)]
struct IntervalFromSql {
    // Time component in microseconds (converted to nanoseconds via `* 1_000`
    // when building Arrow IntervalMonthDayNano values).
    time: i64,
    // Day component.
    day: i32,
    // Month component.
    month: i32,
}
410
411impl<'a> FromSql<'a> for IntervalFromSql {
412 fn from_sql(
413 _ty: &Type,
414 raw: &'a [u8],
415 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
416 let mut cursor = std::io::Cursor::new(raw);
417
418 let time = cursor.read_i64::<BigEndian>()?;
419 let day = cursor.read_i32::<BigEndian>()?;
420 let month = cursor.read_i32::<BigEndian>()?;
421
422 Ok(IntervalFromSql { time, day, month })
423 }
424
425 fn accepts(ty: &Type) -> bool {
426 matches!(*ty, Type::INTERVAL)
427 }
428}
429
/// A PostGIS `geometry` value borrowed as its raw binary representation.
struct GeometryFromSql<'a> {
    // Raw geometry bytes as received on the wire (field name suggests
    // WKB encoding — the bytes are passed through untouched).
    wkb: &'a [u8],
}
433
434impl<'a> FromSql<'a> for GeometryFromSql<'a> {
435 fn from_sql(
436 _ty: &Type,
437 raw: &'a [u8],
438 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
439 Ok(GeometryFromSql { wkb: raw })
440 }
441
442 fn accepts(ty: &Type) -> bool {
443 matches!(ty.name(), "geometry")
444 }
445}
446
/// An `XML` value borrowed as a UTF-8 string slice.
struct XmlFromSql<'a> {
    // The XML text, validated as UTF-8 in `from_sql`.
    xml: &'a str,
}
450
451impl<'a> FromSql<'a> for XmlFromSql<'a> {
452 fn from_sql(
453 _ty: &Type,
454 raw: &'a [u8],
455 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
456 let xml = str::from_utf8(raw)?;
457 Ok(XmlFromSql { xml })
458 }
459
460 fn accepts(ty: &Type) -> bool {
461 matches!(*ty, Type::XML)
462 }
463}
464
/// Converts a slice of postgres [`Row`]s into an Arrow [`RecordBatch`] whose
/// schema is `projection` applied to `table_schema`.
///
/// Builders are allocated for every field of `table_schema`, but values are
/// appended — and arrays finished — only for projected columns. Cell reads go
/// through the `handle_primitive_type!` / `handle_primitive_array_type!`
/// macros, which map SQL NULL to an Arrow null.
///
/// # Errors
/// Fails when a cell cannot be read as the Rust type implied by the field's
/// Arrow data type, or when a field has an unsupported data type.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    // One builder per table-schema field (including non-projected ones, so
    // builder indices line up with field indices).
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // The postgres column metadata for this index; used to pick
            // type-specific decoding (xml, json, geometry, uuid).
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int16Builder,
                        i16,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int32Builder,
                        i32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int64Builder,
                        i64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        just_return
                    );
                }
                // NUMERIC: decode the wire format, then rescale into an i128
                // mantissa; out-of-range values become an execution error.
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128",
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    // XML needs its dedicated FromSql wrapper; everything
                    // else decodes directly as &str.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("xml") {
                        let convert: for<'a> fn(XmlFromSql<'a>) -> DFResult<&'a str> =
                            |v| Ok(v.xml);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            XmlFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB arrives as serde_json::Value and is
                    // re-serialized to its string form.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                DataType::Binary => {
                    // Geometry keeps its raw bytes; JSON/JSONB is serialized
                    // to bytes; everything else decodes as Vec<u8>.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
                            |v| Ok(v.wkb);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                // FixedSizeBinary is handled inline (not via the macro)
                // because append_value is itself fallible here.
                DataType::FixedSizeBinary(_) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<FixedSizeBinaryBuilder>()
                        .unwrap_or_else(|| {
                            panic!(
                                "Failed to downcast builder to FixedSizeBinaryBuilder for {field:?}"
                            )
                        });
                    // UUID columns decode via the uuid crate; other columns
                    // are read as raw bytes.
                    let v = if col.is_some()
                        && col.unwrap().type_().name().eq_ignore_ascii_case("uuid")
                    {
                        let v: Option<Uuid> = row.try_get(idx).map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get Uuid value for field {:?}: {e:?}",
                                field
                            ))
                        })?;
                        v.map(|v| v.as_bytes().to_vec())
                    } else {
                        let v: Option<Vec<u8>> = row.try_get(idx).map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get FixedSizeBinary value for field {:?}: {e:?}",
                                field
                            ))
                        })?;
                        v
                    };

                    match v {
                        Some(v) => builder.append_value(v)?,
                        None => builder.append_null(),
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            // NOTE(review): duration_since(UNIX_EPOCH) fails
                            // for pre-epoch timestamps, which here become an
                            // execution error — confirm intended.
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                // Timezone-aware timestamps decode via chrono; the stored
                // value is nanoseconds since epoch in UTC.
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            // Microseconds since midnight.
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            // Nanoseconds since midnight.
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            // Wire time component is microseconds; Arrow
                            // wants nanoseconds.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        just_return
                    );
                }
                // Array columns: dispatch on the element type.
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int16Builder,
                            i16,
                            row,
                            idx
                        );
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int32Builder,
                            i32,
                            row,
                            idx
                        );
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int64Builder,
                            i64,
                            row,
                            idx
                        );
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float32Builder,
                            f32,
                            row,
                            idx
                        );
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float64Builder,
                            f64,
                            row,
                            idx
                        );
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx
                        );
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    // Finish only projected builders, in schema order.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    // The explicit row count keeps zero-column projections (COUNT(*) style)
    // working.
    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
    Ok(RecordBatch::try_new_with_options(
        projected_schema,
        projected_columns,
        &options,
    )?)
}