1use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
2use crate::{
3 Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
4 RemoteSchemaRef, RemoteType,
5};
6use bb8_postgres::PostgresConnectionManager;
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bigdecimal::BigDecimal;
10use byteorder::{BigEndian, ReadBytesExt};
11use chrono::Timelike;
12use datafusion::arrow::array::{
13 ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
14 Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
15 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, RecordBatchOptions,
16 StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
17 TimestampNanosecondBuilder, UInt32Builder, make_builder,
18};
19use datafusion::arrow::datatypes::{
20 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use derive_getters::Getters;
27use derive_with::With;
28use futures::StreamExt;
29use log::debug;
30use num_bigint::{BigInt, Sign};
31use std::any::Any;
32use std::string::ToString;
33use std::sync::Arc;
34use std::time::{SystemTime, UNIX_EPOCH};
35
/// Settings used to open a connection pool to a Postgres server.
///
/// `With` and `Getters` are derive macros from the `derive_with` and
/// `derive_getters` crates.
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    /// Server host handed to the Postgres client configuration.
    pub(crate) host: String,
    /// Server port handed to the Postgres client configuration.
    pub(crate) port: u16,
    /// User name used to authenticate.
    pub(crate) username: String,
    /// Password used to authenticate.
    pub(crate) password: String,
    /// Database name; when `None`, no database name is set on the client configuration.
    pub(crate) database: Option<String>,
    /// Maximum size passed to the bb8 pool builder.
    pub(crate) pool_max_size: usize,
    // NOTE(review): presumably the number of rows grouped into each record batch
    // while streaming query results; confirm against `ConnectionOptions::stream_chunk_size`.
    pub(crate) stream_chunk_size: usize,
}
46
47impl PostgresConnectionOptions {
48 pub fn new(
49 host: impl Into<String>,
50 port: u16,
51 username: impl Into<String>,
52 password: impl Into<String>,
53 ) -> Self {
54 Self {
55 host: host.into(),
56 port,
57 username: username.into(),
58 password: password.into(),
59 database: None,
60 pool_max_size: 10,
61 stream_chunk_size: 2048,
62 }
63 }
64}
65
66impl From<PostgresConnectionOptions> for ConnectionOptions {
67 fn from(options: PostgresConnectionOptions) -> Self {
68 ConnectionOptions::Postgres(options)
69 }
70}
71
/// Connection pool for Postgres, backed by bb8 and a `NoTls` connection manager.
#[derive(Debug)]
pub struct PostgresPool {
    /// The underlying bb8 pool that connections are checked out from.
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
76
77#[async_trait::async_trait]
78impl Pool for PostgresPool {
79 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
80 let conn = self.pool.get_owned().await.map_err(|e| {
81 DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
82 })?;
83 Ok(Arc::new(PostgresConnection { conn }))
84 }
85}
86
87pub(crate) async fn connect_postgres(
88 options: &PostgresConnectionOptions,
89) -> DFResult<PostgresPool> {
90 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
91 config
92 .host(&options.host)
93 .port(options.port)
94 .user(&options.username)
95 .password(&options.password);
96 if let Some(database) = &options.database {
97 config.dbname(database);
98 }
99 let manager = PostgresConnectionManager::new(config, NoTls);
100 let pool = bb8::Pool::builder()
101 .max_size(options.pool_max_size as u32)
102 .build(manager)
103 .await
104 .map_err(|e| {
105 DataFusionError::Execution(format!(
106 "Failed to create postgres connection pool due to {e}",
107 ))
108 })?;
109
110 Ok(PostgresPool { pool })
111}
112
/// A single Postgres connection checked out of the bb8 pool.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    /// Owned pooled connection (obtained through `get_owned` on the pool).
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
117
118#[async_trait::async_trait]
119impl Connection for PostgresConnection {
120 fn as_any(&self) -> &dyn Any {
121 self
122 }
123
124 async fn infer_schema(&self, sql: &str) -> DFResult<RemoteSchemaRef> {
125 let sql = RemoteDbType::Postgres.query_limit_1(sql);
126 let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
127 DataFusionError::Execution(format!("Failed to execute query {sql} on postgres: {e:?}",))
128 })?;
129 let remote_schema = Arc::new(build_remote_schema(&row)?);
130 Ok(remote_schema)
131 }
132
133 async fn query(
134 &self,
135 conn_options: &ConnectionOptions,
136 sql: &str,
137 table_schema: SchemaRef,
138 projection: Option<&Vec<usize>>,
139 unparsed_filters: &[String],
140 limit: Option<usize>,
141 ) -> DFResult<SendableRecordBatchStream> {
142 let projected_schema = project_schema(&table_schema, projection)?;
143
144 let sql = RemoteDbType::Postgres.rewrite_query(sql, unparsed_filters, limit);
145 debug!("[remote-table] executing postgres query: {sql}");
146
147 let projection = projection.cloned();
148 let chunk_size = conn_options.stream_chunk_size();
149 let stream = self
150 .conn
151 .query_raw(&sql, Vec::<String>::new())
152 .await
153 .map_err(|e| {
154 DataFusionError::Execution(format!(
155 "Failed to execute query {sql} on postgres: {e}",
156 ))
157 })?
158 .chunks(chunk_size)
159 .boxed();
160
161 let stream = stream.map(move |rows| {
162 let rows: Vec<Row> = rows
163 .into_iter()
164 .collect::<Result<Vec<_>, _>>()
165 .map_err(|e| {
166 DataFusionError::Execution(format!(
167 "Failed to collect rows from postgres due to {e}",
168 ))
169 })?;
170 rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
171 });
172
173 Ok(Box::pin(RecordBatchStreamAdapter::new(
174 projected_schema,
175 stream,
176 )))
177 }
178}
179
180fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
181 match pg_type {
182 &Type::INT2 => Ok(PostgresType::Int2),
183 &Type::INT4 => Ok(PostgresType::Int4),
184 &Type::INT8 => Ok(PostgresType::Int8),
185 &Type::FLOAT4 => Ok(PostgresType::Float4),
186 &Type::FLOAT8 => Ok(PostgresType::Float8),
187 &Type::NUMERIC => {
188 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
189 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
190 })?;
191 let scale = match v {
192 Some(v) => v.scale,
193 None => 0,
194 };
195 assert!((scale as u32) <= (i8::MAX as u32));
196 Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
197 }
198 &Type::OID => Ok(PostgresType::Oid),
199 &Type::NAME => Ok(PostgresType::Name),
200 &Type::VARCHAR => Ok(PostgresType::Varchar),
201 &Type::BPCHAR => Ok(PostgresType::Bpchar),
202 &Type::TEXT => Ok(PostgresType::Text),
203 &Type::BYTEA => Ok(PostgresType::Bytea),
204 &Type::DATE => Ok(PostgresType::Date),
205 &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
206 &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
207 &Type::TIME => Ok(PostgresType::Time),
208 &Type::INTERVAL => Ok(PostgresType::Interval),
209 &Type::BOOL => Ok(PostgresType::Bool),
210 &Type::JSON => Ok(PostgresType::Json),
211 &Type::JSONB => Ok(PostgresType::Jsonb),
212 &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
213 &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
214 &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
215 &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
216 &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
217 &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
218 &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
219 &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
220 &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
221 &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
222 other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
223 _ => Err(DataFusionError::NotImplemented(format!(
224 "Unsupported postgres type {pg_type:?}",
225 ))),
226 }
227}
228
229fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
230 let mut remote_fields = vec![];
231 for (idx, col) in row.columns().iter().enumerate() {
232 remote_fields.push(RemoteField::new(
233 col.name(),
234 RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
235 true,
236 ));
237 }
238 Ok(RemoteSchema::new(remote_fields))
239}
240
/// Appends one cell of a Postgres row to a typed Arrow builder.
///
/// Arguments, in order: the dynamic builder, the field and column (used only
/// in diagnostics), the concrete builder type to downcast to, the Rust type
/// to decode the cell as, the row, the column index, and a conversion
/// callable returning a `Result` whose `Ok` value is appended.
///
/// A NULL cell appends a null. Decode and conversion errors are propagated
/// with `?`, so the macro must be used inside a function returning a
/// compatible `Result`. A builder of the wrong type is treated as a
/// programming error and panics.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            )
        };
        let cell: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
269
/// Appends one array-typed cell of a Postgres row to an Arrow list builder.
///
/// Arguments, in order: the dynamic builder (expected to be a
/// `ListBuilder<Box<dyn ArrayBuilder>>`), the field and column (used only in
/// diagnostics), the concrete type of the list's values builder, the Rust
/// type of one array element, the row and the column index.
///
/// A NULL cell appends a null list. Decode errors are propagated with `?`;
/// a builder of the wrong type is treated as a programming error and panics.
///
/// NOTE(review): elements are decoded as non-optional values, so an array
/// that contains NULL elements presumably fails in `try_get`; confirm.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    // Name this macro's own metavariable so the message
                    // shows the real builder type.
                    stringify!($values_builder_ty),
                    $field,
                    $col,
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col,
            ))
        })?;

        match v {
            Some(v) => {
                // Push the elements into the child builder, then close the
                // list entry for this row.
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
312
/// A Postgres NUMERIC value decoded from its binary wire representation.
#[derive(Debug)]
struct BigDecimalFromSql {
    /// The decoded value, built with `scale` fractional digits.
    inner: BigDecimal,
    /// Scale word read from the NUMERIC header (the fourth 16-bit word).
    scale: u16,
}
318
319impl BigDecimalFromSql {
320 fn to_decimal_128(&self) -> Option<i128> {
321 big_decimal_to_i128(&self.inner, Some(self.scale as i32))
322 }
323}
324
325#[allow(clippy::cast_sign_loss)]
326#[allow(clippy::cast_possible_wrap)]
327#[allow(clippy::cast_possible_truncation)]
328impl<'a> FromSql<'a> for BigDecimalFromSql {
329 fn from_sql(
330 _ty: &Type,
331 raw: &'a [u8],
332 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
333 let raw_u16: Vec<u16> = raw
334 .chunks(2)
335 .map(|chunk| {
336 if chunk.len() == 2 {
337 u16::from_be_bytes([chunk[0], chunk[1]])
338 } else {
339 u16::from_be_bytes([chunk[0], 0])
340 }
341 })
342 .collect();
343
344 let base_10_000_digit_count = raw_u16[0];
345 let weight = raw_u16[1] as i16;
346 let sign = raw_u16[2];
347 let scale = raw_u16[3];
348
349 let mut base_10_000_digits = Vec::new();
350 for i in 4..4 + base_10_000_digit_count {
351 base_10_000_digits.push(raw_u16[i as usize]);
352 }
353
354 let mut u8_digits = Vec::new();
355 for &base_10_000_digit in base_10_000_digits.iter().rev() {
356 let mut base_10_000_digit = base_10_000_digit;
357 let mut temp_result = Vec::new();
358 while base_10_000_digit > 0 {
359 temp_result.push((base_10_000_digit % 10) as u8);
360 base_10_000_digit /= 10;
361 }
362 while temp_result.len() < 4 {
363 temp_result.push(0);
364 }
365 u8_digits.extend(temp_result);
366 }
367 u8_digits.reverse();
368
369 let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
370 let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
371 u8_digits.resize(size as usize, 0);
372
373 let sign = match sign {
374 0x4000 => Sign::Minus,
375 0x0000 => Sign::Plus,
376 _ => {
377 return Err(Box::new(DataFusionError::Execution(
378 "Failed to parse big decimal from postgres numeric value".to_string(),
379 )));
380 }
381 };
382
383 let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
384 return Err(Box::new(DataFusionError::Execution(
385 "Failed to parse big decimal from postgres numeric value".to_string(),
386 )));
387 };
388 Ok(BigDecimalFromSql {
389 inner: BigDecimal::new(digits, i64::from(scale)),
390 scale,
391 })
392 }
393
394 fn accepts(ty: &Type) -> bool {
395 matches!(*ty, Type::NUMERIC)
396 }
397}
398
/// A Postgres INTERVAL value as read from its binary wire representation.
#[derive(Debug)]
struct IntervalFromSql {
    // NOTE(review): presumably microseconds; `rows_to_batch` multiplies this by
    // 1_000 before passing it as the nanosecond part of the Arrow interval.
    time: i64,
    /// Day component, read as a big-endian `i32` after `time`.
    day: i32,
    /// Month component, read as a big-endian `i32` after `day`.
    month: i32,
}
407
408impl<'a> FromSql<'a> for IntervalFromSql {
409 fn from_sql(
410 _ty: &Type,
411 raw: &'a [u8],
412 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
413 let mut cursor = std::io::Cursor::new(raw);
414
415 let time = cursor.read_i64::<BigEndian>()?;
416 let day = cursor.read_i32::<BigEndian>()?;
417 let month = cursor.read_i32::<BigEndian>()?;
418
419 Ok(IntervalFromSql { time, day, month })
420 }
421
422 fn accepts(ty: &Type) -> bool {
423 matches!(*ty, Type::INTERVAL)
424 }
425}
426
/// Borrowed raw bytes of a `geometry` value, passed through without decoding.
struct GeometryFromSql<'a> {
    // NOTE(review): the name suggests (E)WKB as sent by PostGIS; the bytes are
    // not inspected here, so confirm the exact format against the server.
    wkb: &'a [u8],
}
430
431impl<'a> FromSql<'a> for GeometryFromSql<'a> {
432 fn from_sql(
433 _ty: &Type,
434 raw: &'a [u8],
435 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
436 Ok(GeometryFromSql { wkb: raw })
437 }
438
439 fn accepts(ty: &Type) -> bool {
440 matches!(ty.name(), "geometry")
441 }
442}
443
444fn rows_to_batch(
445 rows: &[Row],
446 table_schema: &SchemaRef,
447 projection: Option<&Vec<usize>>,
448) -> DFResult<RecordBatch> {
449 let projected_schema = project_schema(table_schema, projection)?;
450 let mut array_builders = vec![];
451 for field in table_schema.fields() {
452 let builder = make_builder(field.data_type(), rows.len());
453 array_builders.push(builder);
454 }
455
456 for row in rows {
457 for (idx, field) in table_schema.fields.iter().enumerate() {
458 if !projections_contains(projection, idx) {
459 continue;
460 }
461 let builder = &mut array_builders[idx];
462 let col = row.columns().get(idx);
463 match field.data_type() {
464 DataType::Int16 => {
465 handle_primitive_type!(
466 builder,
467 field,
468 col,
469 Int16Builder,
470 i16,
471 row,
472 idx,
473 just_return
474 );
475 }
476 DataType::Int32 => {
477 handle_primitive_type!(
478 builder,
479 field,
480 col,
481 Int32Builder,
482 i32,
483 row,
484 idx,
485 just_return
486 );
487 }
488 DataType::UInt32 => {
489 handle_primitive_type!(
490 builder,
491 field,
492 col,
493 UInt32Builder,
494 u32,
495 row,
496 idx,
497 just_return
498 );
499 }
500 DataType::Int64 => {
501 handle_primitive_type!(
502 builder,
503 field,
504 col,
505 Int64Builder,
506 i64,
507 row,
508 idx,
509 just_return
510 );
511 }
512 DataType::Float32 => {
513 handle_primitive_type!(
514 builder,
515 field,
516 col,
517 Float32Builder,
518 f32,
519 row,
520 idx,
521 just_return
522 );
523 }
524 DataType::Float64 => {
525 handle_primitive_type!(
526 builder,
527 field,
528 col,
529 Float64Builder,
530 f64,
531 row,
532 idx,
533 just_return
534 );
535 }
536 DataType::Decimal128(_precision, _scale) => {
537 handle_primitive_type!(
538 builder,
539 field,
540 col,
541 Decimal128Builder,
542 BigDecimalFromSql,
543 row,
544 idx,
545 |v: BigDecimalFromSql| {
546 v.to_decimal_128().ok_or_else(|| {
547 DataFusionError::Execution(format!(
548 "Failed to convert BigDecimal {v:?} to i128",
549 ))
550 })
551 }
552 );
553 }
554 DataType::Utf8 => {
555 handle_primitive_type!(
556 builder,
557 field,
558 col,
559 StringBuilder,
560 &str,
561 row,
562 idx,
563 just_return
564 );
565 }
566 DataType::LargeUtf8 => {
567 if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
568 handle_primitive_type!(
569 builder,
570 field,
571 col,
572 LargeStringBuilder,
573 serde_json::value::Value,
574 row,
575 idx,
576 |v: serde_json::value::Value| {
577 Ok::<_, DataFusionError>(v.to_string())
578 }
579 );
580 } else {
581 handle_primitive_type!(
582 builder,
583 field,
584 col,
585 LargeStringBuilder,
586 &str,
587 row,
588 idx,
589 just_return
590 );
591 }
592 }
593 DataType::Binary => {
594 if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
595 {
596 let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> =
597 |v| Ok(v.wkb);
598 handle_primitive_type!(
599 builder,
600 field,
601 col,
602 BinaryBuilder,
603 GeometryFromSql,
604 row,
605 idx,
606 convert
607 );
608 } else if col.is_some()
609 && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB)
610 {
611 handle_primitive_type!(
612 builder,
613 field,
614 col,
615 BinaryBuilder,
616 serde_json::value::Value,
617 row,
618 idx,
619 |v: serde_json::value::Value| {
620 Ok::<_, DataFusionError>(v.to_string().into_bytes())
621 }
622 );
623 } else {
624 handle_primitive_type!(
625 builder,
626 field,
627 col,
628 BinaryBuilder,
629 Vec<u8>,
630 row,
631 idx,
632 just_return
633 );
634 }
635 }
636 DataType::Timestamp(TimeUnit::Microsecond, None) => {
637 handle_primitive_type!(
638 builder,
639 field,
640 col,
641 TimestampMicrosecondBuilder,
642 SystemTime,
643 row,
644 idx,
645 |v: SystemTime| {
646 if let Ok(v) = v.duration_since(UNIX_EPOCH) {
647 let timestamp: i64 = v
648 .as_micros()
649 .try_into()
650 .expect("Failed to convert SystemTime to i64");
651 Ok(timestamp)
652 } else {
653 Err(DataFusionError::Execution(format!(
654 "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
655 )))
656 }
657 }
658 );
659 }
660 DataType::Timestamp(TimeUnit::Nanosecond, None) => {
661 handle_primitive_type!(
662 builder,
663 field,
664 col,
665 TimestampNanosecondBuilder,
666 SystemTime,
667 row,
668 idx,
669 |v: SystemTime| {
670 if let Ok(v) = v.duration_since(UNIX_EPOCH) {
671 let timestamp: i64 = v
672 .as_nanos()
673 .try_into()
674 .expect("Failed to convert SystemTime to i64");
675 Ok(timestamp)
676 } else {
677 Err(DataFusionError::Execution(format!(
678 "Failed to convert SystemTime {v:?} to i64 for {field:?} and {col:?}"
679 )))
680 }
681 }
682 );
683 }
684 DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
685 handle_primitive_type!(
686 builder,
687 field,
688 col,
689 TimestampNanosecondBuilder,
690 chrono::DateTime<chrono::Utc>,
691 row,
692 idx,
693 |v: chrono::DateTime<chrono::Utc>| {
694 let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}"));
695 Ok::<_, DataFusionError>(timestamp)
696 }
697 );
698 }
699 DataType::Time64(TimeUnit::Microsecond) => {
700 handle_primitive_type!(
701 builder,
702 field,
703 col,
704 Time64MicrosecondBuilder,
705 chrono::NaiveTime,
706 row,
707 idx,
708 |v: chrono::NaiveTime| {
709 let seconds = i64::from(v.num_seconds_from_midnight());
710 let microseconds = i64::from(v.nanosecond()) / 1000;
711 Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
712 }
713 );
714 }
715 DataType::Time64(TimeUnit::Nanosecond) => {
716 handle_primitive_type!(
717 builder,
718 field,
719 col,
720 Time64NanosecondBuilder,
721 chrono::NaiveTime,
722 row,
723 idx,
724 |v: chrono::NaiveTime| {
725 let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
726 * 1_000_000_000
727 + i64::from(v.nanosecond());
728 Ok::<_, DataFusionError>(timestamp)
729 }
730 );
731 }
732 DataType::Date32 => {
733 handle_primitive_type!(
734 builder,
735 field,
736 col,
737 Date32Builder,
738 chrono::NaiveDate,
739 row,
740 idx,
741 |v| { Ok::<_, DataFusionError>(Date32Type::from_naive_date(v)) }
742 );
743 }
744 DataType::Interval(IntervalUnit::MonthDayNano) => {
745 handle_primitive_type!(
746 builder,
747 field,
748 col,
749 IntervalMonthDayNanoBuilder,
750 IntervalFromSql,
751 row,
752 idx,
753 |v: IntervalFromSql| {
754 let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
755 v.month,
756 v.day,
757 v.time * 1_000,
758 );
759 Ok::<_, DataFusionError>(interval_month_day_nano)
760 }
761 );
762 }
763 DataType::Boolean => {
764 handle_primitive_type!(
765 builder,
766 field,
767 col,
768 BooleanBuilder,
769 bool,
770 row,
771 idx,
772 just_return
773 );
774 }
775 DataType::List(inner) => match inner.data_type() {
776 DataType::Int16 => {
777 handle_primitive_array_type!(
778 builder,
779 field,
780 col,
781 Int16Builder,
782 i16,
783 row,
784 idx
785 );
786 }
787 DataType::Int32 => {
788 handle_primitive_array_type!(
789 builder,
790 field,
791 col,
792 Int32Builder,
793 i32,
794 row,
795 idx
796 );
797 }
798 DataType::Int64 => {
799 handle_primitive_array_type!(
800 builder,
801 field,
802 col,
803 Int64Builder,
804 i64,
805 row,
806 idx
807 );
808 }
809 DataType::Float32 => {
810 handle_primitive_array_type!(
811 builder,
812 field,
813 col,
814 Float32Builder,
815 f32,
816 row,
817 idx
818 );
819 }
820 DataType::Float64 => {
821 handle_primitive_array_type!(
822 builder,
823 field,
824 col,
825 Float64Builder,
826 f64,
827 row,
828 idx
829 );
830 }
831 DataType::Utf8 => {
832 handle_primitive_array_type!(
833 builder,
834 field,
835 col,
836 StringBuilder,
837 &str,
838 row,
839 idx
840 );
841 }
842 DataType::Binary => {
843 handle_primitive_array_type!(
844 builder,
845 field,
846 col,
847 BinaryBuilder,
848 Vec<u8>,
849 row,
850 idx
851 );
852 }
853 DataType::Boolean => {
854 handle_primitive_array_type!(
855 builder,
856 field,
857 col,
858 BooleanBuilder,
859 bool,
860 row,
861 idx
862 );
863 }
864 _ => {
865 return Err(DataFusionError::NotImplemented(format!(
866 "Unsupported list data type {:?} for col: {:?}",
867 field.data_type(),
868 col
869 )));
870 }
871 },
872 _ => {
873 return Err(DataFusionError::NotImplemented(format!(
874 "Unsupported data type {:?} for col: {:?}",
875 field.data_type(),
876 col
877 )));
878 }
879 }
880 }
881 }
882 let projected_columns = array_builders
883 .into_iter()
884 .enumerate()
885 .filter(|(idx, _)| projections_contains(projection, *idx))
886 .map(|(_, mut builder)| builder.finish())
887 .collect::<Vec<ArrayRef>>();
888 let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
889 Ok(RecordBatch::try_new_with_options(
890 projected_schema,
891 projected_columns,
892 &options,
893 )?)
894}