use crate::connection::{RemoteDbType, big_decimal_to_i128, projections_contains};
use crate::{
    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
    RemoteSchemaRef, RemoteType,
};
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::types::{FromSql, Type};
use bb8_postgres::tokio_postgres::{NoTls, Row};
use bigdecimal::BigDecimal;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use datafusion::arrow::array::{
    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
    IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
    TimestampNanosecondBuilder, UInt32Builder, make_builder,
};
use datafusion::arrow::datatypes::{
    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
};
use datafusion::common::project_schema;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::prelude::Expr;
use derive_getters::Getters;
use derive_with::With;
use futures::StreamExt;
use num_bigint::{BigInt, Sign};
use std::string::ToString;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

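/// Options for connecting to a remote Postgres server.
///
/// [`PostgresConnectionOptions::new`] defaults to no explicit database (the
/// server picks the default for the user), a pool of at most 10 connections,
/// and a stream chunk size of 2048 rows per emitted record batch. Individual
/// fields can be overridden through the setters generated by the `With` derive.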
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    pub(crate) database: Option<String>,
    pub(crate) pool_max_size: usize,
    pub(crate) stream_chunk_size: usize,
}

impl PostgresConnectionOptions {
    pub fn new(
        host: impl Into<String>,
        port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            username: username.into(),
            password: password.into(),
            database: None,
            pool_max_size: 10,
            stream_chunk_size: 2048,
        }
    }
}

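/// A bb8-backed connection pool implementing the generic [`Pool`] trait.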
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Pool for PostgresPool {
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}

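/// Builds a [`PostgresPool`] from the given options.
///
/// A minimal usage sketch (hypothetical values, error handling elided):
///
/// ```ignore
/// let options = PostgresConnectionOptions::new("localhost", 5432, "user", "password");
/// let pool = connect_postgres(&options).await?;
/// let conn = pool.get().await?;
/// ```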
pub(crate) async fn connect_postgres(
    options: &PostgresConnectionOptions,
) -> DFResult<PostgresPool> {
    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
    config
        .host(&options.host)
        .port(options.port)
        .user(&options.username)
        .password(&options.password);
    if let Some(database) = &options.database {
        config.dbname(database);
    }
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = bb8::Pool::builder()
        .max_size(options.pool_max_size as u32)
        .build(manager)
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to create postgres connection pool due to {e}"
            ))
        })?;

    Ok(PostgresPool { pool })
}

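/// A single pooled connection, implementing [`Connection`] for schema
/// inference and streaming queries.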
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Connection for PostgresConnection {
    async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
        let sql = RemoteDbType::Postgres.query_limit_1(sql)?;
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}"))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        Ok(remote_schema)
    }

    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        sql: &str,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;
        let sql = RemoteDbType::Postgres.try_rewrite_query(sql, filters, limit)?;
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}"
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}"
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}

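/// Maps a Postgres column type to a [`PostgresType`].
///
/// `NUMERIC` columns carry no scale on the type itself here, so the value at
/// `idx` in the sampled row is decoded to recover a scale (0 when NULL).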
fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
    match pg_type {
        &Type::INT2 => Ok(PostgresType::Int2),
        &Type::INT4 => Ok(PostgresType::Int4),
        &Type::INT8 => Ok(PostgresType::Int8),
        &Type::FLOAT4 => Ok(PostgresType::Float4),
        &Type::FLOAT8 => Ok(PostgresType::Float8),
        &Type::NUMERIC => {
            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
            })?;
            let scale = match v {
                Some(v) => v.scale,
                None => 0,
            };
            assert!((scale as u32) <= (i8::MAX as u32));
            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
        }
        &Type::OID => Ok(PostgresType::Oid),
        &Type::NAME => Ok(PostgresType::Name),
        &Type::VARCHAR => Ok(PostgresType::Varchar),
        &Type::BPCHAR => Ok(PostgresType::Bpchar),
        &Type::TEXT => Ok(PostgresType::Text),
        &Type::BYTEA => Ok(PostgresType::Bytea),
        &Type::DATE => Ok(PostgresType::Date),
        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
        &Type::TIME => Ok(PostgresType::Time),
        &Type::INTERVAL => Ok(PostgresType::Interval),
        &Type::BOOL => Ok(PostgresType::Bool),
        &Type::JSON => Ok(PostgresType::Json),
        &Type::JSONB => Ok(PostgresType::Jsonb),
        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
        _ => Err(DataFusionError::NotImplemented(format!(
            "Unsupported postgres type {pg_type:?}"
        ))),
    }
}

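/// Builds a [`RemoteSchema`] from the columns of a sampled row. Every field is
/// marked nullable, since a single row cannot prove a column is non-null.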
fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
    let mut remote_fields = vec![];
    for (idx, col) in row.columns().iter().enumerate() {
        remote_fields.push(RemoteField::new(
            col.name(),
            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
            true,
        ));
    }
    Ok(RemoteSchema::new(remote_fields))
}

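/// Downcasts `$builder` to `$builder_ty`, reads column `$index` of `$row` as an
/// `Option<$value_ty>`, converts it with `$convert`, and appends the value (or
/// a null) to the builder.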
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}

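/// List-column variant of `handle_primitive_type!`: downcasts to a
/// `ListBuilder`, then downcasts its values builder to `$values_builder_ty`,
/// reads the column as `Option<Vec<$primitive_value_ty>>`, and appends one
/// list entry (or a null).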
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}

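/// A `NUMERIC` value decoded from the Postgres binary wire format, retaining
/// the declared scale so it can be converted to an Arrow `Decimal128` value.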
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}

impl BigDecimalFromSql {
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}

#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // The binary representation is a sequence of big-endian u16s:
        // a header of [ndigits, weight, sign, dscale] followed by the digits,
        // each of which is in base 10000.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10000 digit into four base-10 digits; the whole
        // string ends up most-significant-first after the reverse below.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Pad or truncate the digit string so that exactly `scale` digits sit
        // after the decimal point, given the weight of the leading digit.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}

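/// An `INTERVAL` decoded from the binary wire format: a microsecond time part,
/// a day count, and a month count.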
#[derive(Debug)]
struct IntervalFromSql {
    time: i64,
    day: i32,
    month: i32,
}

impl<'a> FromSql<'a> for IntervalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        // 8 bytes of microseconds, then 4 bytes of days, then 4 bytes of months.
        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}

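/// A PostGIS `geometry` value, captured as the raw bytes from the wire.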
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}

impl<'a> FromSql<'a> for GeometryFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(GeometryFromSql { wkb: raw })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(ty.name(), "geometry")
    }
}

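/// Converts a chunk of rows into a [`RecordBatch`] by appending every
/// projected value to a per-column Arrow builder.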
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    // One builder per column of the full table schema; non-projected columns
    // are skipped while appending and filtered out before the batch is built.
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::UInt32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        UInt32Builder,
                        u32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, |v| {
                        Ok::<_, DataFusionError>(v)
                    });
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        BigDecimalFromSql,
                        row,
                        idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128"
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        &str,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB values are decoded via serde_json and rendered
                    // back to text; everything else is read as a plain string.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            LargeStringBuilder,
                            &str,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Binary => {
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
                            |v| Ok(v.wkb);
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            GeometryFromSql,
                            row,
                            idx,
                            convert
                        );
                    } else if col.is_some()
                        && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
                    {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            serde_json::value::Value,
                            row,
                            idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(
                            builder,
                            field,
                            col,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx,
                            |v| { Ok::<_, DataFusionError>(v) }
                        );
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampMicrosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_micros()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        SystemTime,
                        row,
                        idx,
                        |v: SystemTime| {
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                Ok(timestamp)
                            } else {
                                Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
                                )))
                            }
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::DateTime<chrono::Utc>,
                        row,
                        idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| {
                                panic!(
                                    "Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"
                                )
                            });
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64MicrosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Time64NanosecondBuilder,
                        chrono::NaiveTime,
                        row,
                        idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date32Builder,
                        chrono::NaiveDate,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        IntervalMonthDayNanoBuilder,
                        IntervalFromSql,
                        row,
                        idx,
                        |v: IntervalFromSql| {
                            // The wire format carries microseconds; Arrow wants nanoseconds.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        |v| { Ok::<_, DataFusionError>(v) }
                    );
                }
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(builder, field, col, Int16Builder, i16, row, idx);
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(builder, field, col, Int32Builder, i32, row, idx);
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(builder, field, col, Int64Builder, i64, row, idx);
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(builder, field, col, Float32Builder, f32, row, idx);
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(builder, field, col, Float64Builder, f64, row, idx);
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(builder, field, col, StringBuilder, &str, row, idx);
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(builder, field, col, BinaryBuilder, Vec<u8>, row, idx);
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(builder, field, col, BooleanBuilder, bool, row, idx);
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}