1use crate::connection::{big_decimal_to_i128, projections_contains};
2use crate::transform::transform_batch;
3use crate::{
4 Connection, DFResult, Pool, PostgresType, RemoteField, RemoteSchema, RemoteSchemaRef,
5 RemoteType, Transform,
6};
7use bb8_postgres::tokio_postgres::types::{FromSql, Type};
8use bb8_postgres::tokio_postgres::{NoTls, Row};
9use bb8_postgres::PostgresConnectionManager;
10use bigdecimal::BigDecimal;
11use byteorder::{BigEndian, ReadBytesExt};
12use chrono::Timelike;
13use datafusion::arrow::array::{
14 make_builder, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
15 Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
16 IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch, StringBuilder,
17 Time64NanosecondBuilder, TimestampNanosecondBuilder,
18};
19use datafusion::arrow::datatypes::{
20 DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
21};
22use datafusion::common::project_schema;
23use datafusion::error::DataFusionError;
24use datafusion::execution::SendableRecordBatchStream;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use futures::StreamExt;
27use num_bigint::{BigInt, Sign};
28use std::string::ToString;
29use std::sync::Arc;
30use std::time::{SystemTime, UNIX_EPOCH};
31
/// Options for opening a postgres connection: host, port, credentials and an
/// optional database name.
///
/// `derive_with::With` generates `with_*` builder-style setters for each field
/// (e.g. `with_database`).
#[derive(Debug, Clone, derive_with::With)]
pub struct PostgresConnectionOptions {
    // Hostname or IP address of the postgres server.
    pub(crate) host: String,
    // TCP port the server listens on.
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    // Database to connect to; when `None` the server-side default applies.
    pub(crate) database: Option<String>,
}
40
41impl PostgresConnectionOptions {
42 pub fn new(
43 host: impl Into<String>,
44 port: u16,
45 username: impl Into<String>,
46 password: impl Into<String>,
47 ) -> Self {
48 Self {
49 host: host.into(),
50 port,
51 username: username.into(),
52 password: password.into(),
53 database: None,
54 }
55 }
56}
57
/// A bb8-backed pool of postgres connections (plain TCP, no TLS).
#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
62
#[async_trait::async_trait]
impl Pool for PostgresPool {
    /// Checks a connection out of the pool, wrapping any pool error into a
    /// `DataFusionError::Execution`.
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        // `get_owned` detaches the connection from the pool borrow so it can
        // be stored inside the returned `PostgresConnection`.
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}
72
73pub(crate) async fn connect_postgres(
74 options: &PostgresConnectionOptions,
75) -> DFResult<PostgresPool> {
76 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
77 config
78 .host(&options.host)
79 .port(options.port)
80 .user(&options.username)
81 .password(&options.password);
82 if let Some(database) = &options.database {
83 config.dbname(database);
84 }
85 let manager = PostgresConnectionManager::new(config, NoTls);
86 let pool = bb8::Pool::builder()
87 .max_size(5)
88 .build(manager)
89 .await
90 .map_err(|e| {
91 DataFusionError::Execution(format!(
92 "Failed to create postgres connection pool due to {e}",
93 ))
94 })?;
95
96 Ok(PostgresPool { pool })
97}
98
/// A single pooled postgres connection implementing the crate's `Connection`
/// abstraction.
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    // Owned pooled connection; returned to the pool when dropped.
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}
103
#[async_trait::async_trait]
impl Connection for PostgresConnection {
    /// Infers both the remote (postgres) schema and the arrow schema for
    /// `sql` by executing the query and inspecting the first returned row.
    ///
    /// NOTE(review): this returns an execution error when the query yields no
    /// rows, so schemas cannot be inferred for empty result sets — confirm
    /// this limitation is acceptable to callers.
    async fn infer_schema(
        &self,
        sql: &str,
        transform: Option<Arc<dyn Transform>>,
    ) -> DFResult<(RemoteSchemaRef, SchemaRef)> {
        // Chunk size 1: only the first row is needed for inference.
        let mut stream = self
            .conn
            .query_raw(sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(1)
            .boxed();

        let Some(first_chunk) = stream.next().await else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        // Each chunk element is a Result<Row, _>; surface the first failure.
        let first_chunk: Vec<Row> = first_chunk
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to collect rows from postgres due to {e}",
                ))
            })?;
        let Some(first_row) = first_chunk.first() else {
            return Err(DataFusionError::Execution(
                "No data returned from postgres".to_string(),
            ));
        };
        // The remote schema is derived from the first row's column metadata
        // (and, for NUMERIC, the value itself — see pg_type_to_remote_type).
        let remote_schema = Arc::new(build_remote_schema(first_row)?);
        let arrow_schema = Arc::new(remote_schema.to_arrow_schema());
        if let Some(transform) = transform {
            // Run the transform on a one-row batch so the post-transform
            // arrow schema can be reported instead of the raw one.
            let batch = rows_to_batch(std::slice::from_ref(first_row), &arrow_schema, None)?;
            let transformed_batch = transform_batch(
                batch,
                transform.as_ref(),
                &arrow_schema,
                None,
                Some(&remote_schema),
            )?;
            Ok((remote_schema, transformed_batch.schema()))
        } else {
            Ok((remote_schema, arrow_schema))
        }
    }

    /// Streams query results as arrow `RecordBatch`es of up to 2048 rows,
    /// projected down to `projection` (all columns when `None`).
    async fn query(
        &self,
        sql: String,
        table_schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection.as_ref())?;
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres due to {e}",
                ))
            })?
            .chunks(2048)
            .boxed();

        // Convert each chunk of rows to a RecordBatch lazily as the stream
        // is consumed; row-level errors become DataFusionError.
        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }
}
195
196fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<RemoteType> {
197 match pg_type {
198 &Type::INT2 => Ok(RemoteType::Postgres(PostgresType::Int2)),
199 &Type::INT4 => Ok(RemoteType::Postgres(PostgresType::Int4)),
200 &Type::INT8 => Ok(RemoteType::Postgres(PostgresType::Int8)),
201 &Type::FLOAT4 => Ok(RemoteType::Postgres(PostgresType::Float4)),
202 &Type::FLOAT8 => Ok(RemoteType::Postgres(PostgresType::Float8)),
203 &Type::NUMERIC => {
204 let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
205 DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
206 })?;
207 let scale = match v {
208 Some(v) => v.scale,
209 None => 0,
210 };
211 assert!((scale as u32) <= (i8::MAX as u32));
212 Ok(RemoteType::Postgres(PostgresType::Numeric(
213 scale.try_into().unwrap_or_default(),
214 )))
215 }
216 &Type::NAME => Ok(RemoteType::Postgres(PostgresType::Name)),
217 &Type::VARCHAR => Ok(RemoteType::Postgres(PostgresType::Varchar)),
218 &Type::BPCHAR => Ok(RemoteType::Postgres(PostgresType::Bpchar)),
219 &Type::TEXT => Ok(RemoteType::Postgres(PostgresType::Text)),
220 &Type::BYTEA => Ok(RemoteType::Postgres(PostgresType::Bytea)),
221 &Type::DATE => Ok(RemoteType::Postgres(PostgresType::Date)),
222 &Type::TIMESTAMP => Ok(RemoteType::Postgres(PostgresType::Timestamp)),
223 &Type::TIMESTAMPTZ => Ok(RemoteType::Postgres(PostgresType::TimestampTz)),
224 &Type::TIME => Ok(RemoteType::Postgres(PostgresType::Time)),
225 &Type::INTERVAL => Ok(RemoteType::Postgres(PostgresType::Interval)),
226 &Type::BOOL => Ok(RemoteType::Postgres(PostgresType::Bool)),
227 &Type::JSON => Ok(RemoteType::Postgres(PostgresType::Json)),
228 &Type::JSONB => Ok(RemoteType::Postgres(PostgresType::Jsonb)),
229 &Type::INT2_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int2Array)),
230 &Type::INT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int4Array)),
231 &Type::INT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Int8Array)),
232 &Type::FLOAT4_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float4Array)),
233 &Type::FLOAT8_ARRAY => Ok(RemoteType::Postgres(PostgresType::Float8Array)),
234 &Type::VARCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::VarcharArray)),
235 &Type::BPCHAR_ARRAY => Ok(RemoteType::Postgres(PostgresType::BpcharArray)),
236 &Type::TEXT_ARRAY => Ok(RemoteType::Postgres(PostgresType::TextArray)),
237 &Type::BYTEA_ARRAY => Ok(RemoteType::Postgres(PostgresType::ByteaArray)),
238 &Type::BOOL_ARRAY => Ok(RemoteType::Postgres(PostgresType::BoolArray)),
239 other if other.name().eq_ignore_ascii_case("geometry") => {
240 Ok(RemoteType::Postgres(PostgresType::PostGisGeometry))
241 }
242 _ => Err(DataFusionError::NotImplemented(format!(
243 "Unsupported postgres type {pg_type:?}",
244 ))),
245 }
246}
247
248fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
249 let mut remote_fields = vec![];
250 for (idx, col) in row.columns().iter().enumerate() {
251 remote_fields.push(RemoteField::new(
252 col.name(),
253 pg_type_to_remote_type(col.type_(), row, idx)?,
254 true,
255 ));
256 }
257 Ok(RemoteSchema::new(remote_fields))
258}
259
/// Appends one scalar value from `$row[$index]` into `$builder` (downcast to
/// `$builder_ty`), reading the cell as `Option<$value_ty>`; a NULL cell
/// appends a null slot.
///
/// Panics if the builder downcast fails or the value cannot be read as
/// `$value_ty` — these indicate a schema/builder mismatch, i.e. a bug.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast builder to ",
                        stringify!($builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    stringify!($value_ty),
                    " value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        match v {
            Some(v) => builder.append_value(v),
            None => builder.append_null(),
        }
    }};
}
292
/// Appends one postgres array value from `$row[$index]` into a
/// `ListBuilder<Box<dyn ArrayBuilder>>` whose values builder is
/// `$values_builder_ty`, reading the cell as `Option<Vec<$primitive_value_ty>>`;
/// a NULL cell appends a null list slot.
///
/// Panics if either downcast fails or the value cannot be read — these
/// indicate a schema/builder mismatch, i.e. a bug.
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?}",
                    $field
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    concat!(
                        "Failed to downcast values builder to ",
                        // Fixed: previously referenced the undeclared
                        // metavariable `$builder_ty`.
                        stringify!($values_builder_ty),
                        " for {:?}"
                    ),
                    $field
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).unwrap_or_else(|e| {
            panic!(
                concat!(
                    "Failed to get ",
                    // Fixed: previously referenced the undeclared
                    // metavariable `$value_ty`.
                    stringify!($primitive_value_ty),
                    " array value for {:?}: {:?}"
                ),
                $field, e
            )
        });

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}
339
/// A postgres `NUMERIC` value decoded from the binary wire format, paired
/// with the scale reported for that value.
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    // Digits after the decimal point, as read from the wire header.
    scale: u16,
}
345
impl BigDecimalFromSql {
    /// Converts to an `i128` at this value's scale for use as an arrow
    /// `Decimal128` value; `None` presumably means the value does not fit —
    /// see `big_decimal_to_i128`.
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as u32))
    }
}
351
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    /// Decodes the postgres binary `NUMERIC` wire format: four big-endian
    /// u16 header words (digit-group count, weight, sign, scale) followed by
    /// the digit groups in base 10_000.
    ///
    /// NOTE(review): the `raw_u16[0..=3]` reads below panic if `raw` is
    /// shorter than 8 bytes — this assumes the server always sends a
    /// complete header; confirm against the wire protocol.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Reinterpret the payload as big-endian u16 words; a trailing odd
        // byte is zero-padded.
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10_000 group into exactly four decimal digits
        // (produced least-significant-first, flipped by the final reverse).
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Pad with trailing zeros so the digit string length matches the
        // declared scale relative to the weight of the leading digit group.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        // Sign word: 0x4000 negative, 0x0000 positive; any other value
        // (e.g. the NaN marker) is rejected with an error.
        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    // Only NUMERIC columns are decoded through this type.
    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}
425
/// Decoded postgres `INTERVAL` in its binary wire layout.
#[derive(Debug)]
struct IntervalFromSql {
    // Time-of-day component; `rows_to_batch` multiplies it by 1_000 to get
    // nanoseconds, so this is presumably microseconds — TODO confirm against
    // the postgres wire protocol.
    time: i64,
    day: i32,
    month: i32,
}
434
impl<'a> FromSql<'a> for IntervalFromSql {
    /// Reads the binary `INTERVAL` representation: a big-endian i64 time
    /// component followed by an i32 day count and an i32 month count.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        // Field order is fixed by the wire format; short input yields an Err
        // from the cursor reads rather than a panic.
        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    // Only INTERVAL columns are decoded through this type.
    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}
453
/// Zero-copy wrapper for a PostGIS geometry value: `wkb` borrows the raw
/// bytes straight out of the row buffer.
pub struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}
457
458impl<'a> FromSql<'a> for GeometryFromSql<'a> {
459 fn from_sql(
460 _ty: &Type,
461 raw: &'a [u8],
462 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
463 Ok(GeometryFromSql { wkb: raw })
464 }
465
466 fn accepts(ty: &Type) -> bool {
467 matches!(ty.name(), "geometry")
468 }
469}
470
/// Converts a slice of postgres `Row`s into an arrow `RecordBatch` matching
/// `table_schema` projected down to `projection` (all columns when `None`).
///
/// Builders are created for every field of `table_schema` so that `idx`
/// indexes schema fields and builders alike; only projected columns are
/// populated and emitted — the rest stay empty and are discarded at the end.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            // Skip columns that the projection excludes.
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // Postgres column metadata; used to disambiguate arrow types that
            // map to more than one postgres type (JSON vs text, geometry vs
            // bytea).
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, Int16Builder, i16, row, idx);
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, Int32Builder, i32, row, idx);
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, Int64Builder, i64, row, idx);
                }
                DataType::Float32 => {
                    handle_primitive_type!(builder, field, Float32Builder, f32, row, idx);
                }
                DataType::Float64 => {
                    handle_primitive_type!(builder, field, Float64Builder, f64, row, idx);
                }
                DataType::Decimal128(_precision, _scale) => {
                    // NUMERIC: decode via BigDecimalFromSql, then narrow to
                    // i128 at the value's own scale.
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Decimal128Builder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to Decimal128Builder for {field:?}")
                        });
                    let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                        DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
                    })?;

                    match v {
                        Some(v) => {
                            let Some(v) = v.to_decimal_128() else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {:?} to i128",
                                    v
                                )));
                            };
                            builder.append_value(v)
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Utf8 => {
                    handle_primitive_type!(builder, field, StringBuilder, &str, row, idx);
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB columns are fetched as serde_json values and
                    // re-serialized to text; everything else is read as &str.
                    if col.is_some() && matches!(col.unwrap().type_(), &Type::JSON | &Type::JSONB) {
                        let builder = builder
                            .as_any_mut()
                            .downcast_mut::<LargeStringBuilder>()
                            .unwrap_or_else(|| {
                                panic!("Failed to downcast builder to LargeStringBuilder for {field:?}")
                            });
                        let v: Option<serde_json::value::Value> =
                            row.try_get(idx).unwrap_or_else(|e| {
                                panic!("Failed to get serde_json::value::Value value for {field:?}: {e:?}")
                            });

                        match v {
                            Some(v) => builder.append_value(v.to_string()),
                            None => builder.append_null(),
                        }
                    } else {
                        handle_primitive_type!(builder, field, LargeStringBuilder, &str, row, idx);
                    }
                }
                DataType::Binary => {
                    // PostGIS geometry columns are stored as their raw bytes;
                    // other binary columns are plain bytea.
                    if col.is_some() && col.unwrap().type_().name().eq_ignore_ascii_case("geometry")
                    {
                        let builder = builder.as_any_mut().downcast_mut::<BinaryBuilder>().expect(
                            "Failed to downcast builder to BinaryBuilder for Type::geometry",
                        );
                        let v: Option<GeometryFromSql> = row.try_get(idx).expect(
                            "Failed to get GeometryFromSql value for column Type::geometry",
                        );

                        match v {
                            Some(v) => builder.append_value(v.wkb),
                            None => builder.append_null(),
                        }
                    } else {
                        handle_primitive_type!(builder, field, BinaryBuilder, Vec<u8>, row, idx);
                    }
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| panic!("Failed to downcast builder to TimestampNanosecondBuilder for {field:?}"));
                    let v: Option<SystemTime> = row.try_get(idx).unwrap_or_else(|e| {
                        panic!("Failed to get SystemTime value for {field:?}: {e:?}")
                    });

                    match v {
                        Some(v) => {
                            // duration_since fails for pre-epoch timestamps,
                            // which are surfaced as an execution error.
                            if let Ok(v) = v.duration_since(UNIX_EPOCH) {
                                let timestamp: i64 = v
                                    .as_nanos()
                                    .try_into()
                                    .expect("Failed to convert SystemTime to i64");
                                builder.append_value(timestamp);
                            } else {
                                return Err(DataFusionError::Execution(format!(
                                    "Failed to convert SystemTime {v:?} to i64 for {field:?}"
                                )));
                            }
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<TimestampNanosecondBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to TimestampTz for {field:?}")
                        });
                    let v: Option<chrono::DateTime<chrono::Utc>> = row.try_get(idx).unwrap_or_else(|e| panic!("Failed to get chrono::DateTime<chrono::Utc> value for {field:?}: {e:?}"));

                    match v {
                        Some(v) => {
                            // timestamp_nanos_opt is None outside the i64
                            // nanosecond range (~1677..2262); that panics here.
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| panic!("Failed to get timestamp in nanoseconds from {v} for Type::TIMESTAMP"));
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Time64NanosecondBuilder>()
                        .expect(
                            "Failed to downcast builder to Time64NanosecondBuilder for Type::TIME",
                        );
                    let v: Option<chrono::NaiveTime> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveTime value for column Type::TIME");

                    match v {
                        Some(v) => {
                            // Nanoseconds since midnight.
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight())
                                * 1_000_000_000
                                + i64::from(v.nanosecond());
                            builder.append_value(timestamp);
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Date32 => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<Date32Builder>()
                        .expect("Failed to downcast builder to Date32Builder for Type::DATE");
                    let v: Option<chrono::NaiveDate> = row
                        .try_get(idx)
                        .expect("Failed to get chrono::NaiveDate value for column Type::DATE");

                    match v {
                        Some(v) => builder.append_value(Date32Type::from_naive_date(v)),
                        None => builder.append_null(),
                    }
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<IntervalMonthDayNanoBuilder>()
                        .expect("Failed to downcast builder to IntervalMonthDayNanoBuilder for Type::INTERVAL");

                    let v: Option<IntervalFromSql> = row
                        .try_get(idx)
                        .expect("Failed to get IntervalFromSql value for column Type::INTERVAL");

                    match v {
                        Some(v) => {
                            // The decoded time component is scaled by 1_000
                            // to nanoseconds for the MonthDayNano layout.
                            let interval_month_day_nano = IntervalMonthDayNanoType::make_value(
                                v.month,
                                v.day,
                                v.time * 1_000,
                            );
                            builder.append_value(interval_month_day_nano);
                        }
                        None => builder.append_null(),
                    }
                }
                DataType::Boolean => {
                    handle_primitive_type!(builder, field, BooleanBuilder, bool, row, idx);
                }
                // Postgres arrays map to arrow lists; dispatch on the element
                // type of the list.
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(builder, field, Int16Builder, i16, row, idx);
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(builder, field, Int32Builder, i32, row, idx);
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(builder, field, Int64Builder, i64, row, idx);
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(builder, field, Float32Builder, f32, row, idx);
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(builder, field, Float64Builder, f64, row, idx);
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(builder, field, StringBuilder, &str, row, idx);
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            BinaryBuilder,
                            Vec<u8>,
                            row,
                            idx
                        );
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(
                            builder,
                            field,
                            BooleanBuilder,
                            bool,
                            row,
                            idx
                        );
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {:?} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }
    // Finish only the projected builders, preserving schema order.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    Ok(RecordBatch::try_new(projected_schema, projected_columns)?)
}