use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
use crate::{
    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
    RemoteSchemaRef, RemoteType,
};
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::types::{FromSql, Type};
use bb8_postgres::tokio_postgres::{NoTls, Row};
use bigdecimal::BigDecimal;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use datafusion::arrow::array::{
    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, RecordBatchOptions,
    StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
    TimestampNanosecondBuilder, UInt32Builder, make_builder,
};
use datafusion::arrow::datatypes::{
    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
};
use datafusion::common::project_schema;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use derive_getters::Getters;
use derive_with::With;
use futures::StreamExt;
use log::debug;
use num_bigint::{BigInt, Sign};
use std::string::ToString;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

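/// Connection options for a remote Postgres instance.
///
/// `database` is optional; when `None`, the server-side default database for
/// the user is used. `pool_max_size` caps the bb8 connection pool, and
/// `stream_chunk_size` controls how many rows are buffered into each Arrow
/// `RecordBatch` while streaming query results.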
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    pub(crate) database: Option<String>,
    pub(crate) pool_max_size: usize,
    pub(crate) stream_chunk_size: usize,
}

impl PostgresConnectionOptions {
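    /// Creates options with the defaults used by this crate: no explicit
    /// database, a pool of 10 connections, and 2048-row stream chunks.
    ///
    /// A minimal sketch (host and credentials are placeholders):
    ///
    /// ```ignore
    /// let options = PostgresConnectionOptions::new("localhost", 5432, "postgres", "password");
    /// let pool = connect_postgres(&options).await?;
    /// ```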
    pub fn new(
        host: impl Into<String>,
        port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            username: username.into(),
            password: password.into(),
            database: None,
            pool_max_size: 10,
            stream_chunk_size: 2048,
        }
    }
}

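/// A bb8-backed pool of Postgres connections implementing the crate's
/// [`Pool`] trait.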
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Pool for PostgresPool {
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}

pub(crate) async fn connect_postgres(
    options: &PostgresConnectionOptions,
) -> DFResult<PostgresPool> {
    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
    config
        .host(&options.host)
        .port(options.port)
        .user(&options.username)
        .password(&options.password);
    if let Some(database) = &options.database {
        config.dbname(database);
    }
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = bb8::Pool::builder()
        .max_size(options.pool_max_size as u32)
        .build(manager)
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to create postgres connection pool due to {e}"
            ))
        })?;

    Ok(PostgresPool { pool })
}

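/// A single pooled Postgres connection; wraps a bb8 pooled connection and
/// implements the crate's [`Connection`] trait.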
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Connection for PostgresConnection {
    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
        let sql = RemoteDbType::Postgres.query_limit_1(sql);
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}"))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        Ok(remote_schema)
    }

    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        unparsed_filters: &[String],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;

        let sql = RemoteDbType::Postgres.rewrite_query(sql, unparsed_filters, limit);
        debug!("[remote-table] executing postgres query: {sql}");

        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}"
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}"
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}

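/// Maps a Postgres column type to the crate's [`PostgresType`]. The row and
/// column index are needed because a `NUMERIC` column's type carries no
/// scale here; the scale is probed from the first value instead.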
fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
    match pg_type {
        &Type::INT2 => Ok(PostgresType::Int2),
        &Type::INT4 => Ok(PostgresType::Int4),
        &Type::INT8 => Ok(PostgresType::Int8),
        &Type::FLOAT4 => Ok(PostgresType::Float4),
        &Type::FLOAT8 => Ok(PostgresType::Float8),
        &Type::NUMERIC => {
            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
            })?;
            let scale = match v {
                Some(v) => v.scale,
                None => 0,
            };
            assert!((scale as u32) <= (i8::MAX as u32));
            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
        }
        &Type::OID => Ok(PostgresType::Oid),
        &Type::NAME => Ok(PostgresType::Name),
        &Type::VARCHAR => Ok(PostgresType::Varchar),
        &Type::BPCHAR => Ok(PostgresType::Bpchar),
        &Type::TEXT => Ok(PostgresType::Text),
        &Type::BYTEA => Ok(PostgresType::Bytea),
        &Type::DATE => Ok(PostgresType::Date),
        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
        &Type::TIME => Ok(PostgresType::Time),
        &Type::INTERVAL => Ok(PostgresType::Interval),
        &Type::BOOL => Ok(PostgresType::Bool),
        &Type::JSON => Ok(PostgresType::Json),
        &Type::JSONB => Ok(PostgresType::Jsonb),
        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
        _ => Err(DataFusionError::NotImplemented(format!(
            "Unsupported postgres type {pg_type:?}"
        ))),
    }
}

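/// Builds a [`RemoteSchema`] from a sample row; every field is conservatively
/// marked nullable since the sampled row cannot prove a column is non-null.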
fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
    let mut remote_fields = vec![];
    for (idx, col) in row.columns().iter().enumerate() {
        remote_fields.push(RemoteField::new(
            col.name(),
            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
            true,
        ));
    }
    Ok(RemoteSchema::new(remote_fields))
}

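// Reads one column value as `Option<$value_ty>` from the current row,
// converts it with `$convert`, and appends it (or a null) to the downcast
// `$builder_ty`. A failed downcast indicates a schema/builder mismatch and
// panics.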
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}

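// Same as above, but for one-dimensional Postgres arrays: the value is read
// as `Option<Vec<$primitive_value_ty>>` and appended element-wise to the
// list builder's inner values builder.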
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}

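// Postgres sends `NUMERIC` in a binary form: a count of base-10000 digit
// groups, a weight (position of the first group relative to the decimal
// point), a sign word (0x0000 positive, 0x4000 negative), a display scale,
// and then the digit groups themselves. `BigDecimalFromSql` decodes that
// into a `BigDecimal`, remembering the scale for Decimal128 conversion.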
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}

impl BigDecimalFromSql {
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}

#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}

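// Postgres sends `INTERVAL` in binary as microseconds (i64), days (i32), and
// months (i32), in that order; the three parts are kept separate so the Arrow
// `IntervalMonthDayNano` value can be assembled without lossy normalization.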
#[derive(Debug)]
struct IntervalFromSql {
    time: i64,
    day: i32,
    month: i32,
}

impl<'a> FromSql<'a> for IntervalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}

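// PostGIS `geometry` values are passed through as their raw wire bytes
// (well-known-binary form) into a binary column, without parsing.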
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}

impl<'a> FromSql<'a> for GeometryFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(GeometryFromSql { wkb: raw })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(ty.name(), "geometry")
    }
}

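/// Converts a chunk of Postgres rows into an Arrow [`RecordBatch`] matching
/// the projected schema. Builders are allocated for every column of the table
/// schema, but only projected columns are filled and emitted.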
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

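    // Fill the column builders row by row; columns outside the projection are
    // skipped entirely, and their (empty) builders are discarded at the end.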
    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int16Builder,
                        i16,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int32Builder,
                        i32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int64Builder,
                        i64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128"
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::LargeUtf8 => {
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
                DataType::Binary => {
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
                            |v| Ok(v.wkb);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            just_return
                        );
                    }
                }
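                // Timestamps come back as `SystemTime` (or `DateTime<Utc>` when
                // a time zone is present) and are converted to epoch-based
                // micro/nanosecond counts; pre-epoch values are rejected on the
                // `SystemTime` paths.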
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| {
                                panic!(
                                    "Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"
                                )
                            });
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        just_return
                    );
                }
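                // One-dimensional Postgres arrays map to Arrow lists; the inner
                // element type selects the values builder.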
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int16Builder,
                            i16,
                            row,
                            idx
                        );
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int32Builder,
                            i32,
                            row,
                            idx
                        );
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Int64Builder,
                            i64,
                            row,
                            idx
                        );
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float32Builder,
                            f32,
                            row,
                            idx
                        );
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            Float64Builder,
                            f64,
                            row,
                            idx
                        );
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            StringBuilder,
                            &str,
                            row,
                            idx
                        );
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            col,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
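    // Finish only the projected builders and attach an explicit row count so
    // projections that select no columns still produce correctly sized batches.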
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
    Ok(RecordBatch::try_new_with_options(
        projected_schema,
        projected_columns,
        &options,
    )?)
}