1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3 Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4 RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13 ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14 Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16 Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17 TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use num_bigint::{BigInt, Sign};
30use std::string::ToString;
31use std::sync::Arc;
32use std::time::{SystemTime, UNIX_EPOCH};
33
/// Connection settings for a postgres data source.
///
/// `With` derives `with_*` builder-style setters and `Getters` derives
/// read accessors, so fields can stay `pub(crate)`.
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    // When `None`, no dbname is set on the config and the server-side
    // default database for the user is used (see `connect_postgres`).
    pub(crate) database: Option<String>,
    // Upper bound on pooled connections; converted to u32 for bb8.
    pub(crate) pool_max_size: usize,
    // Number of rows gathered into each streamed Arrow RecordBatch.
    pub(crate) stream_chunk_size: usize,
}
44
45impl PostgresConnectionOptions {
46 pub fn new(
47 host: impl Into<String>,
48 port: u16,
49 username: impl Into<String>,
50 password: impl Into<String>,
51 ) -> Self {
52 Self {
53 host: host.into(),
54 port,
55 username: username.into(),
56 password: password.into(),
57 database: None,
58 pool_max_size: 10,
59 stream_chunk_size: 2048,
60 }
61 }
62}
63
/// A bb8-backed postgres connection pool implementing the crate's `Pool` trait.
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
68
#[async_trait::async_trait]
impl Pool for PostgresPool {
    /// Checks out an owned pooled connection and wraps it as a `Connection`.
    ///
    /// bb8 checkout failures (pool exhausted, connect error, timeout) are
    /// surfaced as `DataFusionError::Execution`.
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}
78
79pub(crate) async fn connect_postgres(
80 options: &PostgresConnectionOptions,
81) -> DFResult<PostgresPool> {
82 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
83 config
84 .host(&options.host)
85 .port(options.port)
86 .user(&options.username)
87 .password(&options.password);
88 if let Some(database) = &options.database {
89 config.dbname(database);
90 }
91 let manager = PostgresConnectionManager::new(config, NoTls);
92 let pool = bb8::Pool::builder()
93 .max_size(options.pool_max_size as u32)
94 .build(manager)
95 .await
96 .map_err(|e| {
97 DataFusionError::Execution(format!(
98 "Failed to create postgres connection pool due to {e}",
99 ))
100 })?;
101
102 Ok(PostgresPool { pool })
103}
104
/// A single checked-out postgres connection, held as the owned guard
/// returned by `bb8::Pool::get_owned` (hence the `'static` lifetime).
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
109
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infers the remote schema by running `sql` wrapped with `LIMIT 1` and
    /// inspecting the column types of the returned row.
    ///
    /// NOTE(review): `query_one` errors when the query yields zero rows, so
    /// inference appears to require a non-empty result set — confirm callers.
    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
        let sql = RemoteDbType::Postgres.query_limit_1(sql)?;
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        Ok(remote_schema)
    }

    /// Executes `sql` (with `unparsed_filters` and `limit` pushed down via
    /// `try_rewrite_query`) and streams the result as Arrow record batches of
    /// at most `stream_chunk_size` rows each, projected to `projection`.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        unparsed_filters: &[String],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        let sql = RemoteDbType::Postgres.try_rewrite_query(sql, unparsed_filters, limit)?;
        // Owned copies so they can move into the `map` closure below.
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        // `query_raw` streams rows lazily; chunking groups them for batching.
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}",
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        // Convert each chunk of rows into one RecordBatch, propagating any
        // per-row error from the wire as an execution error.
        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
164
165fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
166 match pg_type {
167 &Type::INT2 => Ok(PostgresType::Int2),
168 &Type::INT4 => Ok(PostgresType::Int4),
169 &Type::INT8 => Ok(PostgresType::Int8),
170 &Type::FLOAT4 => Ok(PostgresType::Float4),
171 &Type::FLOAT8 => Ok(PostgresType::Float8),
172 &Type::NUMERIC => {
173 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
174 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
175 })?;
176 let scale = match v {
177 Some(v) => v.scale,
178 None => 0,
179 };
180 assert!((scale as u32) <= (i8::MAX as u32));
181 Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
182 }
183 &Type::OID => Ok(PostgresType::Oid),
184 &Type::NAME => Ok(PostgresType::Name),
185 &Type::VARCHAR => Ok(PostgresType::Varchar),
186 &Type::BPCHAR => Ok(PostgresType::Bpchar),
187 &Type::TEXT => Ok(PostgresType::Text),
188 &Type::BYTEA => Ok(PostgresType::Bytea),
189 &Type::DATE => Ok(PostgresType::Date),
190 &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
191 &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
192 &Type::TIME => Ok(PostgresType::Time),
193 &Type::INTERVAL => Ok(PostgresType::Interval),
194 &Type::BOOL => Ok(PostgresType::Bool),
195 &Type::JSON => Ok(PostgresType::Json),
196 &Type::JSONB => Ok(PostgresType::Jsonb),
197 &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
198 &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
199 &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
200 &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
201 &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
202 &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
203 &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
204 &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
205 &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
206 &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
207 other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
208 _ => Err(DataFusionError::NotImplemented(format!(
209 "Unsupported postgres type {pg_type:?}",
210 ))),
211 }
212}
213
214fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
215 let mut remote_fields = vec![];
216 for (idx, col) in row.columns().iter().enumerate() {
217 remote_fields.push(RemoteField::new(
218 col.name(),
219 RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
220 true,
221 ));
222 }
223 Ok(RemoteSchema::new(remote_fields))
224}
225
/// Appends one (possibly NULL) scalar cell `$row[$index]` to `$builder`.
///
/// `$builder` is downcast to `$builder_ty`, the cell is read as
/// `Option<$value_ty>`, and non-null values pass through the fallible
/// `$convert` closure before being appended. A failed downcast panics
/// (builders are created from the same schema, so a mismatch is a bug);
/// a failed `try_get` becomes a `DataFusionError::Execution` and is
/// propagated with `?` from the enclosing function.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
254
/// Appends one (possibly NULL) array cell `$row[$index]` to a
/// `ListBuilder`-backed `$builder`.
///
/// The list's values builder is downcast to `$values_builder_ty` and the cell
/// is read as `Option<Vec<$primitive_value_ty>>` (element-level NULLs are not
/// supported by this path). A failed downcast panics; a failed `try_get`
/// becomes a `DataFusionError::Execution` propagated with `?`.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Fixed: was `stringify!($builder_ty)`, a metavariable this
                    // macro's matcher does not bind.
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                // Fixed: was `stringify!($value_ty)`, also unbound here.
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
297
/// A postgres NUMERIC value decoded from the binary wire format, keeping the
/// declared display scale (dscale) alongside the parsed `BigDecimal`.
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    // dscale from the numeric wire header; reused when converting to Decimal128.
    scale: u16,
}
303
impl BigDecimalFromSql {
    /// Converts to an i128 for an Arrow `Decimal128` column at this value's
    /// scale; returns `None` when `big_decimal_to_i128` cannot represent it.
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}
309
310#[allow(clippy::cast_sign_loss)]
311#[allow(clippy::cast_possible_wrap)]
312#[allow(clippy::cast_possible_truncation)]
313impl<'a> FromSql<'a> for BigDecimalFromSql {
314 fn from_sql(
315 _ty: &Type,
316 raw: &'a [u8],
317 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
318 let raw_u16: Vec<u16> = raw
319 .chunks(2)
320 .map(|chunk| {
321 if chunk.len() == 2 {
322 u16::from_be_bytes([chunk[0], chunk[1]])
323 } else {
324 u16::from_be_bytes([chunk[0], 0])
325 }
326 })
327 .collect();
328
329 let base_10_000_digit_count = raw_u16[0];
330 let weight = raw_u16[1] as i16;
331 let sign = raw_u16[2];
332 let scale = raw_u16[3];
333
334 let mut base_10_000_digits = Vec::new();
335 for i in 4..4 + base_10_000_digit_count {
336 base_10_000_digits.push(raw_u16[i as usize]);
337 }
338
339 let mut u8_digits = Vec::new();
340 for &base_10_000_digit in base_10_000_digits.iter().rev() {
341 let mut base_10_000_digit = base_10_000_digit;
342 let mut temp_result = Vec::new();
343 while base_10_000_digit > 0 {
344 temp_result.push((base_10_000_digit % 10) as u8);
345 base_10_000_digit /= 10;
346 }
347 while temp_result.len() < 4 {
348 temp_result.push(0);
349 }
350 u8_digits.extend(temp_result);
351 }
352 u8_digits.reverse();
353
354 let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
355 let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
356 u8_digits.resize(size as usize, 0);
357
358 let sign = match sign {
359 0x4000 => Sign::Minus,
360 0x0000 => Sign::Plus,
361 _ => {
362 return Err(Box::new(DataFusionError::Execution(
363 "Failed to parse big decimal from postgres numeric value".to_string(),
364 )));
365 }
366 };
367
368 let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
369 return Err(Box::new(DataFusionError::Execution(
370 "Failed to parse big decimal from postgres numeric value".to_string(),
371 )));
372 };
373 Ok(BigDecimalFromSql {
374 inner: BigDecimal::new(digits, i64::from(scale)),
375 scale,
376 })
377 }
378
379 fn accepts(ty: &Type) -> bool {
380 matches!(*ty, Type::NUMERIC)
381 }
382}
383
/// Raw decoded fields of a postgres binary INTERVAL value.
#[derive(Debug)]
struct IntervalFromSql {
    // Sub-day time component; `rows_to_batch` multiplies it by 1_000 to get
    // nanoseconds, i.e. it is treated as microseconds.
    time: i64,
    day: i32,
    month: i32,
}
392
impl<'a> FromSql<'a> for IntervalFromSql {
    /// Decodes the 16-byte postgres binary INTERVAL layout: a big-endian i64
    /// time component followed by i32 day and i32 month counts.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        // Cursor reads fail (rather than panic) if `raw` is shorter than 16 bytes.
        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}
411
/// A PostGIS geometry value, borrowed zero-copy from the row buffer as the
/// raw bytes postgres sent (passed through unparsed as WKB).
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
415
416impl<'a> FromSql<'a> for GeometryFromSql<'a> {
417 fn from_sql(
418 _ty: &Type,
419 raw: &'a [u8],
420 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
421 Ok(GeometryFromSql { wkb: raw })
422 }
423
424 fn accepts(ty: &Type) -> bool {
425 matches!(ty.name(), "geometry")
426 }
427}
428
/// Converts a chunk of postgres `Row`s into one Arrow `RecordBatch`.
///
/// A builder is created for every field of `table_schema` (so `idx` indexes
/// fields and builders uniformly), but cells are only read — and columns only
/// emitted — for indices present in `projection`; unprojected builders stay
/// empty and are discarded at the end. Type-specific conversion is dispatched
/// on the Arrow `DataType` via the two `handle_primitive_*` macros above.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // `col` may be None if the row has fewer columns than the schema;
            // it is only used for diagnostics and type sniffing below.
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int16Builder,
                        i16,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int32Builder,
                        i32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int64Builder,
                        i64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        just_return
                    );
                }
                // NUMERIC: decoded via BigDecimalFromSql, then narrowed to i128
                // at the value's own scale; values that do not fit error out.
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128",
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB deserialize as serde_json::Value, so re-serialize
                    // to text; everything else is read as a plain &str.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                DataType::Binary => {
                    // Three sources of binary: PostGIS geometry (raw WKB bytes,
                    // zero-copy), JSON/JSONB (re-serialized to UTF-8 bytes), and
                    // plain BYTEA.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
                            |v| Ok(v.wkb);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                // Timestamps arrive as SystemTime; values before the Unix epoch
                // make duration_since fail and are reported as errors.
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                // TIMESTAMPTZ: read as chrono UTC; the target timezone in the
                // Arrow type is not consulted here (values are stored as UTC nanos).
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                // INTERVAL: the wire `time` component is microseconds, scaled to
                // nanos for Arrow's MonthDayNano representation.
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        just_return
                    );
                }
                // Postgres array columns; dispatched on the list's element type.
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int16Builder,
                            i16,
                            row,
                            idx
                        );
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int32Builder,
                            i32,
                            row,
                            idx
                        );
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int64Builder,
                            i64,
                            row,
                            idx
                        );
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float32Builder,
                            f32,
                            row,
                            idx
                        );
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float64Builder,
                            f64,
                            row,
                            idx
                        );
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx
                        );
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    // Finish only the projected builders, in schema order, to match
    // `projected_schema`.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}