1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3 Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4 RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13 ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14 Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
16 Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17 TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::prelude::Expr;
27use derive_getters::Getters;
28use derive_with::With;
29use futures::StreamExt;
30use num_bigint::{BigInt, Sign};
31use std::string::ToString;
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
/// Connection settings for a postgres data source.
///
/// `With` generates `with_*` builder-style setters and `Getters` generates
/// read accessors for the private fields.
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    /// Host name or IP address of the postgres server.
    pub(crate) host: String,
    /// TCP port of the postgres server.
    pub(crate) port: u16,
    /// User to authenticate as.
    pub(crate) username: String,
    /// Password for `username`.
    pub(crate) password: String,
    /// Database name; `None` uses the server-side default for the user.
    pub(crate) database: Option<String>,
    /// Maximum number of pooled connections (see `connect_postgres`).
    pub(crate) pool_max_size: usize,
    /// Number of rows gathered into each streamed `RecordBatch`.
    pub(crate) stream_chunk_size: usize,
}
45
46impl PostgresConnectionOptions {
47 pub fn new(
48 host: impl Into<String>,
49 port: u16,
50 username: impl Into<String>,
51 password: impl Into<String>,
52 ) -> Self {
53 Self {
54 host: host.into(),
55 port,
56 username: username.into(),
57 password: password.into(),
58 database: None,
59 pool_max_size: 10,
60 stream_chunk_size: 2048,
61 }
62 }
63}
64
/// A connection [`Pool`] implementation backed by bb8 over tokio-postgres.
#[derive(Debug)]
pub struct PostgresPool {
    // Underlying bb8 pool managing plaintext (`NoTls`) connections.
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
69
70#[async_trait::async_trait]
71impl Pool for PostgresPool {
72 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
73 let conn = self.pool.get_owned().await.map_err(|e| {
74 DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
75 })?;
76 Ok(Arc::new(PostgresConnection { conn }))
77 }
78}
79
80pub(crate) async fn connect_postgres(
81 options: &PostgresConnectionOptions,
82) -> DFResult<PostgresPool> {
83 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
84 config
85 .host(&options.host)
86 .port(options.port)
87 .user(&options.username)
88 .password(&options.password);
89 if let Some(database) = &options.database {
90 config.dbname(database);
91 }
92 let manager = PostgresConnectionManager::new(config, NoTls);
93 let pool = bb8::Pool::builder()
94 .max_size(options.pool_max_size as u32)
95 .build(manager)
96 .await
97 .map_err(|e| {
98 DataFusionError::Execution(format!(
99 "Failed to create postgres connection pool due to {e}",
100 ))
101 })?;
102
103 Ok(PostgresPool { pool })
104}
105
/// A single pooled postgres connection implementing [`Connection`].
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    // Owned (`'static`) pooled connection; returned to the pool on drop.
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
110
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infers the remote schema of `sql` by executing it wrapped with a
    /// `LIMIT 1` clause and inspecting the returned row's column metadata
    /// (and, for NUMERIC columns, the value itself — see
    /// `pg_type_to_remote_type`).
    ///
    /// NOTE(review): `query_one` errors when the query yields zero rows, so
    /// inference appears to require a non-empty result — confirm intended.
    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
        let sql = RemoteDbType::Postgres.query_limit_1(sql)?;
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        Ok(remote_schema)
    }

    /// Streams query results as Arrow record batches.
    ///
    /// `filters` and `limit` are pushed down by rewriting `sql`; rows are
    /// fetched lazily via `query_raw` (no bind parameters) and converted to
    /// batches of `stream_chunk_size` rows each. The returned stream reports
    /// the projected schema, and `rows_to_batch` applies the same projection
    /// to the data.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        let sql = RemoteDbType::Postgres.try_rewrite_query(sql, filters, limit)?;
        // Owned copy so the closure below can be `move` and outlive `self`.
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}",
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        // Each chunk is a Vec<Result<Row, _>>; surface any per-row error,
        // then convert the rows into a projected RecordBatch.
        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
165
166fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
167 match pg_type {
168 &Type::INT2 => Ok(PostgresType::Int2),
169 &Type::INT4 => Ok(PostgresType::Int4),
170 &Type::INT8 => Ok(PostgresType::Int8),
171 &Type::FLOAT4 => Ok(PostgresType::Float4),
172 &Type::FLOAT8 => Ok(PostgresType::Float8),
173 &Type::NUMERIC => {
174 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
175 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
176 })?;
177 let scale = match v {
178 Some(v) => v.scale,
179 None => 0,
180 };
181 assert!((scale as u32) <= (i8::MAX as u32));
182 Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
183 }
184 &Type::OID => Ok(PostgresType::Oid),
185 &Type::NAME => Ok(PostgresType::Name),
186 &Type::VARCHAR => Ok(PostgresType::Varchar),
187 &Type::BPCHAR => Ok(PostgresType::Bpchar),
188 &Type::TEXT => Ok(PostgresType::Text),
189 &Type::BYTEA => Ok(PostgresType::Bytea),
190 &Type::DATE => Ok(PostgresType::Date),
191 &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
192 &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
193 &Type::TIME => Ok(PostgresType::Time),
194 &Type::INTERVAL => Ok(PostgresType::Interval),
195 &Type::BOOL => Ok(PostgresType::Bool),
196 &Type::JSON => Ok(PostgresType::Json),
197 &Type::JSONB => Ok(PostgresType::Jsonb),
198 &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
199 &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
200 &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
201 &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
202 &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
203 &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
204 &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
205 &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
206 &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
207 &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
208 other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
209 _ => Err(DataFusionError::NotImplemented(format!(
210 "Unsupported postgres type {pg_type:?}",
211 ))),
212 }
213}
214
215fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
216 let mut remote_fields = vec![];
217 for (idx, col) in row.columns().iter().enumerate() {
218 remote_fields.push(RemoteField::new(
219 col.name(),
220 RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
221 true,
222 ));
223 }
224 Ok(RemoteSchema::new(remote_fields))
225}
226
/// Appends one row/column value to a scalar Arrow builder.
///
/// Downcasts `$builder` to `$builder_ty`, reads column `$index` of `$row` as
/// `Option<$value_ty>`, passes a present value through the fallible
/// `$convert` function, and appends the result (or a null for SQL NULL).
///
/// Panics if the builder downcast fails — a schema/builder mismatch is a
/// programming error, not a data error. Evaluates to a `DataFusionError` via
/// `?` when the value cannot be decoded as `$value_ty`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            // `$convert` may fail (e.g. decimal overflow), hence the `?`.
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
255
/// Appends one row/column array value to a `ListBuilder<Box<dyn ArrayBuilder>>`,
/// downcasting its values builder to `$values_builder_ty` and reading the
/// column as `Option<Vec<$primitive_value_ty>>`. A SQL NULL array appends a
/// null list entry.
///
/// Panics if either builder downcast fails (schema/builder mismatch is a
/// programming error); evaluates to a `DataFusionError` via `?` when the
/// value cannot be decoded.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Fixed: was `stringify!($builder_ty)` — an unbound
                    // metavariable that rendered literally as "$builder_ty".
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                // Fixed: was `stringify!($value_ty)` (unbound), which rendered
                // literally as "$value_ty" in the error message.
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
298
/// A postgres NUMERIC value decoded from the binary wire format, together
/// with the scale reported in the wire header.
#[derive(Debug)]
struct BigDecimalFromSql {
    // Parsed arbitrary-precision decimal value.
    inner: BigDecimal,
    // Scale (digits after the decimal point) from the numeric wire header.
    scale: u16,
}
304
impl BigDecimalFromSql {
    /// Converts the value to an `i128` for an Arrow `Decimal128` at this
    /// value's own scale; `None` when it does not fit.
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}
310
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    /// Decodes the postgres NUMERIC binary wire format:
    /// a 4-word big-endian header (digit count, weight, sign, scale) followed
    /// by the digits in base 10000, one digit per 16-bit word.
    ///
    /// NOTE(review): indexes `raw_u16[0..=3]` without a length check — a
    /// malformed/truncated payload would panic rather than error; confirm
    /// whether the driver guarantees at least the 8-byte header.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Reinterpret the payload as big-endian u16 words; an odd trailing
        // byte is zero-padded.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        // Header: number of base-10000 digits, weight of the first digit
        // (in base-10000 positions relative to the decimal point), sign
        // word, and display scale.
        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10000 digit into four decimal digits.
        // Iterating in reverse and reversing at the end keeps the most
        // significant digit first in `u8_digits`.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Pad with trailing zeros so the digit string ends exactly `scale`
        // decimal places after the decimal point.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        // Sign word: 0x0000 positive, 0x4000 negative.
        // NOTE(review): 0xC000 (NaN) is rejected here with a generic parse
        // error — confirm that is the intended handling of NUMERIC NaN.
        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    // Only NUMERIC columns are decoded through this wrapper.
    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
384
/// Raw postgres INTERVAL wire value; the three components are carried
/// separately on the wire and combined into an Arrow month-day-nano interval
/// in `rows_to_batch`.
#[derive(Debug)]
struct IntervalFromSql {
    // Sub-day time component; multiplied by 1_000 to nanoseconds downstream,
    // i.e. treated as microseconds.
    time: i64,
    day: i32,
    month: i32,
}
393
394impl<'a> FromSql<'a> for IntervalFromSql {
395 fn from_sql(
396 _ty: &Type,
397 raw: &'a [u8],
398 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
399 let mut cursor = std::io::Cursor::new(raw);
400
401 let time = cursor.read_i64::<BigEndian>()?;
402 let day = cursor.read_i32::<BigEndian>()?;
403 let month = cursor.read_i32::<BigEndian>()?;
404
405 Ok(IntervalFromSql { time, day, month })
406 }
407
408 fn accepts(ty: &Type) -> bool {
409 matches!(*ty, Type::INTERVAL)
410 }
411}
412
/// Borrowed WKB (well-known binary) payload of a PostGIS geometry value.
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
416
417impl<'a> FromSql<'a> for GeometryFromSql<'a> {
418 fn from_sql(
419 _ty: &Type,
420 raw: &'a [u8],
421 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
422 Ok(GeometryFromSql { wkb: raw })
423 }
424
425 fn accepts(ty: &Type) -> bool {
426 matches!(ty.name(), "geometry")
427 }
428}
429
/// Converts a chunk of postgres rows into one Arrow [`RecordBatch`]
/// containing only the projected columns of `table_schema`.
///
/// A builder is allocated for every field so that builder index == field
/// index, but values are appended — and columns emitted — only for indices
/// present in `projection`; unprojected builders finish empty and are
/// filtered out at the end.
///
/// # Errors
/// Execution errors when a value cannot be decoded; not-implemented errors
/// for unsupported Arrow data types.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // Column metadata may be absent; it is used only for postgres
            // type sniffing (JSON/geometry) and for error messages.
            let col = row.columns().get(idx);
            match field.data_type() {
                // Plain numeric types: decoded value is appended as-is
                // (`just_return` is the identity conversion).
                DataType::Int16 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int16Builder,
                        i16,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int32Builder,
                        i32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int64Builder,
                        i64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        just_return
                    );
                }
                // NUMERIC → Decimal128 at the value's own scale; overflow of
                // i128 becomes an execution error.
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128",
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        just_return
                    );
                }
                // LargeUtf8: JSON/JSONB columns are decoded via serde_json
                // and re-serialized to text; anything else is read as &str.
                DataType::LargeUtf8 => {
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                // Binary: PostGIS geometry keeps its raw WKB bytes, JSON is
                // serialized to bytes, everything else is raw bytea.
                DataType::Binary => {
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        // Explicit higher-ranked fn type so the borrowed
                        // return ties to the row's lifetime.
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
                            |v| Ok(v.wkb);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                // Timestamps without time zone arrive as SystemTime;
                // pre-epoch values cannot be represented and error out.
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                // Timestamps with a time zone arrive as chrono UTC datetimes;
                // nanosecond conversion panics outside chrono's ~1677-2262
                // representable range.
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                // Times of day: seconds-from-midnight plus sub-second part.
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                // INTERVAL: wire time part is scaled from microseconds to
                // nanoseconds for the Arrow month-day-nano representation.
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        just_return
                    );
                }
                // Array columns: dispatch on the list's element type.
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int16Builder,
                            i16,
                            row,
                            idx
                        );
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int32Builder,
                            i32,
                            row,
                            idx
                        );
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int64Builder,
                            i64,
                            row,
                            idx
                        );
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float32Builder,
                            f32,
                            row,
                            idx
                        );
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float64Builder,
                            f64,
                            row,
                            idx
                        );
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx
                        );
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    // Finish only the projected builders, in schema order, to match
    // `projected_schema`.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}