use crate::connection::{RemoteDbType, big_decimal_to_i128, just_return, projections_contains};
use crate::{
    Connection, ConnectionOptions, DFResult, Pool, PostgresType, RemoteField, RemoteSchema,
    RemoteSchemaRef, RemoteType, TableSource, Unparse, unparse_array,
};
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::types::{FromSql, Type};
use bb8_postgres::tokio_postgres::{NoTls, Row};
use bigdecimal::BigDecimal;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use datafusion::arrow::array::{
    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder,
    Int64Builder, IntervalMonthDayNanoBuilder, LargeStringBuilder, ListBuilder, RecordBatch,
    RecordBatchOptions, StringBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
    TimestampMicrosecondBuilder, TimestampNanosecondBuilder, UInt32Builder, make_builder,
};
use datafusion::arrow::datatypes::{
    DataType, Date32Type, IntervalMonthDayNanoType, IntervalUnit, SchemaRef, TimeUnit,
};

use datafusion::common::project_schema;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use derive_getters::Getters;
use derive_with::With;
use futures::StreamExt;
use log::debug;
use num_bigint::{BigInt, Sign};
use std::any::Any;
use std::string::ToString;
use std::sync::Arc;
use uuid::Uuid;

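/// Options for connecting to a Postgres server. `pool_max_size` caps the bb8
/// connection pool, and `stream_chunk_size` sets how many rows are buffered
/// per emitted `RecordBatch` when streaming query results.
///
/// A minimal usage sketch (host and credentials are placeholders):
///
/// ```ignore
/// let options = PostgresConnectionOptions::new("localhost", 5432, "postgres", "secret");
/// ```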
#[derive(Debug, Clone, With, Getters)]
pub struct PostgresConnectionOptions {
    pub(crate) host: String,
    pub(crate) port: u16,
    pub(crate) username: String,
    pub(crate) password: String,
    pub(crate) database: Option<String>,
    pub(crate) pool_max_size: usize,
    pub(crate) stream_chunk_size: usize,
}

impl PostgresConnectionOptions {
    pub fn new(
        host: impl Into<String>,
        port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            username: username.into(),
            password: password.into(),
            database: None,
            pool_max_size: 10,
            stream_chunk_size: 2048,
        }
    }
}

impl From<PostgresConnectionOptions> for ConnectionOptions {
    fn from(options: PostgresConnectionOptions) -> Self {
        ConnectionOptions::Postgres(options)
    }
}

#[derive(Debug)]
pub struct PostgresPool {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Pool for PostgresPool {
    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
        let conn = self.pool.get_owned().await.map_err(|e| {
            DataFusionError::Execution(format!("Failed to get postgres connection due to {e:?}"))
        })?;
        Ok(Arc::new(PostgresConnection { conn }))
    }
}

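/// Builds a bb8 connection pool from the given options. Connections use
/// `NoTls`, so transport security, if needed, must be provided elsewhere.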
pub(crate) async fn connect_postgres(
    options: &PostgresConnectionOptions,
) -> DFResult<PostgresPool> {
    let mut config = bb8_postgres::tokio_postgres::config::Config::new();
    config
        .host(&options.host)
        .port(options.port)
        .user(&options.username)
        .password(&options.password);
    if let Some(database) = &options.database {
        config.dbname(database);
    }
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = bb8::Pool::builder()
        .max_size(options.pool_max_size as u32)
        .build(manager)
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to create postgres connection pool due to {e}"
            ))
        })?;

    Ok(PostgresPool { pool })
}

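/// A pooled Postgres connection implementing the generic `Connection` trait.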
#[derive(Debug)]
pub(crate) struct PostgresConnection {
    conn: bb8::PooledConnection<'static, PostgresConnectionManager<NoTls>>,
}

#[async_trait::async_trait]
impl Connection for PostgresConnection {
    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn infer_schema(&self, source: &TableSource) -> DFResult<RemoteSchemaRef> {
        // Table and query sources are handled identically: run the source with
        // `LIMIT 1` where possible and build the schema from the returned row.
        let sql = RemoteDbType::Postgres.limit_1_query_if_possible(source);
        let row = self.conn.query_one(&sql, &[]).await.map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to execute query {sql} on postgres: {e:?}"
            ))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&row)?);
        Ok(remote_schema)
    }

    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        source: &TableSource,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        unparsed_filters: &[String],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;

        let sql = RemoteDbType::Postgres.rewrite_query(source, unparsed_filters, limit);
        debug!("[remote-table] executing postgres query: {sql}");

        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let stream = self
            .conn
            .query_raw(&sql, Vec::<String>::new())
            .await
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute query {sql} on postgres: {e}"
                ))
            })?
            .chunks(chunk_size)
            .boxed();

        let stream = stream.map(move |rows| {
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from postgres due to {e}"
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }

    async fn insert(
        &self,
        _conn_options: &ConnectionOptions,
        unparser: Arc<dyn Unparse>,
        table: &[String],
        remote_schema: RemoteSchemaRef,
        mut input: SendableRecordBatchStream,
    ) -> DFResult<usize> {
        let input_schema = input.schema();

        let mut total_count = 0;
        while let Some(batch) = input.next().await {
            let batch = batch?;

            let mut columns = Vec::with_capacity(remote_schema.fields.len());
            for i in 0..batch.num_columns() {
                let input_field = input_schema.field(i);
                let remote_field = &remote_schema.fields[i];
                // Skip auto-increment columns when the input allows nulls: the
                // remote database fills those values in itself.
                if remote_field.auto_increment && input_field.is_nullable() {
                    continue;
                }

                let remote_type = remote_schema.fields[i].remote_type.clone();
                let array = batch.column(i);
                let column = unparse_array(unparser.as_ref(), array, remote_type)?;
                columns.push(column);
            }

            let num_rows = columns[0].len();
            let num_columns = columns.len();

            let mut values = Vec::with_capacity(num_rows);
            for i in 0..num_rows {
                let mut value = Vec::with_capacity(num_columns);
                for col in columns.iter() {
                    value.push(col[i].as_str());
                }
                values.push(format!("({})", value.join(",")));
            }

            let mut col_names = Vec::with_capacity(remote_schema.fields.len());
            for (remote_field, input_field) in
                remote_schema.fields.iter().zip(input_schema.fields.iter())
            {
                if remote_field.auto_increment && input_field.is_nullable() {
                    continue;
                }
                col_names.push(RemoteDbType::Postgres.sql_identifier(&remote_field.name));
            }

            let sql = format!(
                "INSERT INTO {} ({}) VALUES {}",
                RemoteDbType::Postgres.sql_table_name(table),
                col_names.join(","),
                values.join(",")
            );

            let count = self.conn.execute(&sql, &[]).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to execute insert statement on postgres: {e:?}, sql: {sql}"
                ))
            })?;
            total_count += count as usize;
        }

        Ok(total_count)
    }
}

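/// Maps a Postgres column type to the crate's `PostgresType`. For NUMERIC the
/// scale is probed from a sample value in `row`, since the wire type alone
/// does not carry it.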
fn pg_type_to_remote_type(pg_type: &Type, row: &Row, idx: usize) -> DFResult<PostgresType> {
    match pg_type {
        &Type::INT2 => Ok(PostgresType::Int2),
        &Type::INT4 => Ok(PostgresType::Int4),
        &Type::INT8 => Ok(PostgresType::Int8),
        &Type::FLOAT4 => Ok(PostgresType::Float4),
        &Type::FLOAT8 => Ok(PostgresType::Float8),
        &Type::NUMERIC => {
            let v: Option<BigDecimalFromSql> = row.try_get(idx).map_err(|e| {
                DataFusionError::Execution(format!("Failed to get BigDecimal value: {e:?}"))
            })?;
            let scale = match v {
                Some(v) => v.scale,
                None => 0,
            };
            assert!((scale as u32) <= (i8::MAX as u32));
            Ok(PostgresType::Numeric(scale.try_into().unwrap_or_default()))
        }
        &Type::OID => Ok(PostgresType::Oid),
        &Type::NAME => Ok(PostgresType::Name),
        &Type::VARCHAR => Ok(PostgresType::Varchar),
        &Type::BPCHAR => Ok(PostgresType::Bpchar),
        &Type::TEXT => Ok(PostgresType::Text),
        &Type::BYTEA => Ok(PostgresType::Bytea),
        &Type::DATE => Ok(PostgresType::Date),
        &Type::TIMESTAMP => Ok(PostgresType::Timestamp),
        &Type::TIMESTAMPTZ => Ok(PostgresType::TimestampTz),
        &Type::TIME => Ok(PostgresType::Time),
        &Type::INTERVAL => Ok(PostgresType::Interval),
        &Type::BOOL => Ok(PostgresType::Bool),
        &Type::JSON => Ok(PostgresType::Json),
        &Type::JSONB => Ok(PostgresType::Jsonb),
        &Type::INT2_ARRAY => Ok(PostgresType::Int2Array),
        &Type::INT4_ARRAY => Ok(PostgresType::Int4Array),
        &Type::INT8_ARRAY => Ok(PostgresType::Int8Array),
        &Type::FLOAT4_ARRAY => Ok(PostgresType::Float4Array),
        &Type::FLOAT8_ARRAY => Ok(PostgresType::Float8Array),
        &Type::VARCHAR_ARRAY => Ok(PostgresType::VarcharArray),
        &Type::BPCHAR_ARRAY => Ok(PostgresType::BpcharArray),
        &Type::TEXT_ARRAY => Ok(PostgresType::TextArray),
        &Type::BYTEA_ARRAY => Ok(PostgresType::ByteaArray),
        &Type::BOOL_ARRAY => Ok(PostgresType::BoolArray),
        &Type::XML => Ok(PostgresType::Xml),
        &Type::UUID => Ok(PostgresType::Uuid),
        // Extension types such as PostGIS geometry have no built-in constant,
        // so they are matched by name.
        other if other.name().eq_ignore_ascii_case("geometry") => Ok(PostgresType::PostGisGeometry),
        _ => Err(DataFusionError::NotImplemented(format!(
            "Unsupported postgres type {pg_type:?}"
        ))),
    }
}

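/// Derives a `RemoteSchema` from a sample row's column metadata. Every field
/// is conservatively marked nullable, since result metadata does not expose
/// nullability.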
fn build_remote_schema(row: &Row) -> DFResult<RemoteSchema> {
    let mut remote_fields = vec![];
    for (idx, col) in row.columns().iter().enumerate() {
        remote_fields.push(RemoteField::new(
            col.name(),
            RemoteType::Postgres(pg_type_to_remote_type(col.type_(), row, idx)?),
            true,
        ));
    }
    Ok(RemoteSchema::new(remote_fields))
}

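/// Appends one scalar value from `$row` at `$index` to `$builder`: downcasts
/// the builder to `$builder_ty`, decodes the value as `Option<$value_ty>`, and
/// passes it through `$convert` before appending. NULLs become builder nulls.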
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<$value_ty> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}

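/// List-column counterpart of `handle_primitive_type!`: downcasts to a
/// `ListBuilder`, decodes the cell as `Option<Vec<$primitive_value_ty>>`, and
/// appends the whole vector as one list element (or a null list).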
macro_rules! handle_primitive_array_type {
    ($builder:expr, $field:expr, $col:expr, $values_builder_ty:ty, $primitive_value_ty:ty, $row:expr, $index:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to ListBuilder<Box<dyn ArrayBuilder>> for {:?} and {:?}",
                    $field, $col
                )
            });
        let values_builder = builder
            .values()
            .as_any_mut()
            .downcast_mut::<$values_builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast values builder to {} for {:?} and {:?}",
                    stringify!($values_builder_ty),
                    $field,
                    $col
                )
            });
        let v: Option<Vec<$primitive_value_ty>> = $row.try_get($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} array value for {:?} and {:?}: {e:?}",
                stringify!($primitive_value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => {
                let v = v.into_iter().map(Some);
                values_builder.extend(v);
                builder.append(true);
            }
            None => builder.append_null(),
        }
    }};
}

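/// A NUMERIC value decoded from Postgres's binary wire format, which encodes
/// a sequence of base-10000 digits plus a weight, sign flag, and scale.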
#[derive(Debug)]
struct BigDecimalFromSql {
    inner: BigDecimal,
    scale: u16,
}

impl BigDecimalFromSql {
    fn to_decimal_128(&self) -> Option<i128> {
        big_decimal_to_i128(&self.inner, Some(self.scale as i32))
    }
}

#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_possible_truncation)]
impl<'a> FromSql<'a> for BigDecimalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let raw_u16: Vec<u16> = raw
            .chunks(2)
            .map(|chunk| {
                if chunk.len() == 2 {
                    u16::from_be_bytes([chunk[0], chunk[1]])
                } else {
                    u16::from_be_bytes([chunk[0], 0])
                }
            })
            .collect();

        // Header: number of base-10000 digits, weight of the first digit,
        // sign flag, and display scale.
        let base_10_000_digit_count = raw_u16[0];
        let weight = raw_u16[1] as i16;
        let sign = raw_u16[2];
        let scale = raw_u16[3];

        let mut base_10_000_digits = Vec::new();
        for i in 4..4 + base_10_000_digit_count {
            base_10_000_digits.push(raw_u16[i as usize]);
        }

        // Expand each base-10000 digit into four decimal digits.
        let mut u8_digits = Vec::new();
        for &base_10_000_digit in base_10_000_digits.iter().rev() {
            let mut base_10_000_digit = base_10_000_digit;
            let mut temp_result = Vec::new();
            while base_10_000_digit > 0 {
                temp_result.push((base_10_000_digit % 10) as u8);
                base_10_000_digit /= 10;
            }
            while temp_result.len() < 4 {
                temp_result.push(0);
            }
            u8_digits.extend(temp_result);
        }
        u8_digits.reverse();

        // Adjust the digit string length so it matches the declared scale.
        let value_scale = 4 * (i64::from(base_10_000_digit_count) - i64::from(weight) - 1);
        let size = i64::try_from(u8_digits.len())? + i64::from(scale) - value_scale;
        u8_digits.resize(size as usize, 0);

        let sign = match sign {
            0x4000 => Sign::Minus,
            0x0000 => Sign::Plus,
            _ => {
                return Err(Box::new(DataFusionError::Execution(
                    "Failed to parse big decimal from postgres numeric value".to_string(),
                )));
            }
        };

        let Some(digits) = BigInt::from_radix_be(sign, u8_digits.as_slice(), 10) else {
            return Err(Box::new(DataFusionError::Execution(
                "Failed to parse big decimal from postgres numeric value".to_string(),
            )));
        };
        Ok(BigDecimalFromSql {
            inner: BigDecimal::new(digits, i64::from(scale)),
            scale,
        })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::NUMERIC)
    }
}

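/// An INTERVAL decoded from the binary wire format: microseconds within the
/// day (i64), then day (i32) and month (i32) counts, all big-endian.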
#[derive(Debug)]
struct IntervalFromSql {
    time: i64,
    day: i32,
    month: i32,
}

impl<'a> FromSql<'a> for IntervalFromSql {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let mut cursor = std::io::Cursor::new(raw);

        let time = cursor.read_i64::<BigEndian>()?;
        let day = cursor.read_i32::<BigEndian>()?;
        let month = cursor.read_i32::<BigEndian>()?;

        Ok(IntervalFromSql { time, day, month })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::INTERVAL)
    }
}

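/// Raw bytes of a PostGIS `geometry` value, kept as WKB without decoding.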
struct GeometryFromSql<'a> {
    wkb: &'a [u8],
}

impl<'a> FromSql<'a> for GeometryFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(GeometryFromSql { wkb: raw })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(ty.name(), "geometry")
    }
}

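/// An XML value borrowed from the row buffer as UTF-8 text.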
struct XmlFromSql<'a> {
    xml: &'a str,
}

impl<'a> FromSql<'a> for XmlFromSql<'a> {
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let xml = std::str::from_utf8(raw)?;
        Ok(XmlFromSql { xml })
    }

    fn accepts(ty: &Type) -> bool {
        matches!(*ty, Type::XML)
    }
}

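/// Converts a chunk of rows into one `RecordBatch`. A builder is created for
/// every column in the table schema, but values are appended only for
/// projected columns; the unused builders are filtered out at the end.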
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            let col = row.columns().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, just_return);
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, just_return);
                }
                DataType::UInt32 => {
                    handle_primitive_type!(builder, field, col, UInt32Builder, u32, row, idx, just_return);
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, just_return);
                }
                DataType::Float32 => {
                    handle_primitive_type!(builder, field, col, Float32Builder, f32, row, idx, just_return);
                }
                DataType::Float64 => {
                    handle_primitive_type!(builder, field, col, Float64Builder, f64, row, idx, just_return);
                }
                DataType::Decimal128(_precision, _scale) => {
                    handle_primitive_type!(
                        builder, field, col, Decimal128Builder, BigDecimalFromSql, row, idx,
                        |v: BigDecimalFromSql| {
                            v.to_decimal_128().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert BigDecimal {v:?} to i128"
                                ))
                            })
                        }
                    );
                }
                DataType::Utf8 => {
                    // XML needs a dedicated FromSql wrapper; everything else
                    // decodes directly as &str.
                    if col.is_some_and(|col| col.type_().name().eq_ignore_ascii_case("xml")) {
                        let convert: for<'a> fn(XmlFromSql<'a>) -> DFResult<&'a str> = |v| Ok(v.xml);
                        handle_primitive_type!(builder, field, col, StringBuilder, XmlFromSql, row, idx, convert);
                    } else {
                        handle_primitive_type!(builder, field, col, StringBuilder, &str, row, idx, just_return);
                    }
                }
                DataType::LargeUtf8 => {
                    // JSON/JSONB decode as serde_json::Value and are re-serialized to text.
                    if col.is_some_and(|col| matches!(col.type_(), &Type::JSON | &Type::JSONB)) {
                        handle_primitive_type!(
                            builder, field, col, LargeStringBuilder, serde_json::value::Value, row, idx,
                            |v: serde_json::value::Value| Ok::<_, DataFusionError>(v.to_string())
                        );
                    } else {
                        handle_primitive_type!(builder, field, col, LargeStringBuilder, &str, row, idx, just_return);
                    }
                }
                DataType::Binary => {
                    if col.is_some_and(|col| col.type_().name().eq_ignore_ascii_case("geometry")) {
                        let convert: for<'a> fn(GeometryFromSql<'a>) -> DFResult<&'a [u8]> = |v| Ok(v.wkb);
                        handle_primitive_type!(builder, field, col, BinaryBuilder, GeometryFromSql, row, idx, convert);
                    } else if col.is_some_and(|col| matches!(col.type_(), &Type::JSON | &Type::JSONB)) {
                        handle_primitive_type!(
                            builder, field, col, BinaryBuilder, serde_json::value::Value, row, idx,
                            |v: serde_json::value::Value| {
                                Ok::<_, DataFusionError>(v.to_string().into_bytes())
                            }
                        );
                    } else {
                        handle_primitive_type!(builder, field, col, BinaryBuilder, Vec<u8>, row, idx, just_return);
                    }
                }
                DataType::FixedSizeBinary(_) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<FixedSizeBinaryBuilder>()
                        .unwrap_or_else(|| {
                            panic!(
                                "Failed to downcast builder to FixedSizeBinaryBuilder for {field:?}"
                            )
                        });
                    let v = if col.is_some_and(|col| col.type_().name().eq_ignore_ascii_case("uuid")) {
                        let v: Option<Uuid> = row.try_get(idx).map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get Uuid value for field {field:?}: {e:?}"
                            ))
                        })?;
                        v.map(|v| v.as_bytes().to_vec())
                    } else {
                        let v: Option<Vec<u8>> = row.try_get(idx).map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get FixedSizeBinary value for field {field:?}: {e:?}"
                            ))
                        })?;
                        v
                    };

                    match v {
                        Some(v) => builder.append_value(v)?,
                        None => builder.append_null(),
                    }
                }
                DataType::Timestamp(TimeUnit::Microsecond, None) => {
                    handle_primitive_type!(
                        builder, field, col, TimestampMicrosecondBuilder, chrono::NaiveDateTime, row, idx,
                        |v: chrono::NaiveDateTime| {
                            Ok::<i64, DataFusionError>(v.and_utc().timestamp_micros())
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Microsecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder, field, col, TimestampMicrosecondBuilder, chrono::DateTime<chrono::Utc>, row, idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            Ok::<_, DataFusionError>(v.timestamp_micros())
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder, field, col, TimestampNanosecondBuilder, chrono::NaiveDateTime, row, idx,
                        |v: chrono::NaiveDateTime| {
                            // timestamp_nanos_opt is None when the value overflows i64 nanoseconds.
                            let timestamp: i64 = v.and_utc().timestamp_nanos_opt().unwrap_or_else(|| {
                                panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}")
                            });
                            Ok::<i64, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, Some(_tz)) => {
                    handle_primitive_type!(
                        builder, field, col, TimestampNanosecondBuilder, chrono::DateTime<chrono::Utc>, row, idx,
                        |v: chrono::DateTime<chrono::Utc>| {
                            let timestamp: i64 = v.timestamp_nanos_opt().unwrap_or_else(|| {
                                panic!("Failed to get timestamp in nanoseconds from {v} for {field:?} and {col:?}")
                            });
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    handle_primitive_type!(
                        builder, field, col, Time64MicrosecondBuilder, chrono::NaiveTime, row, idx,
                        |v: chrono::NaiveTime| {
                            let seconds = i64::from(v.num_seconds_from_midnight());
                            let microseconds = i64::from(v.nanosecond()) / 1000;
                            Ok::<_, DataFusionError>(seconds * 1_000_000 + microseconds)
                        }
                    );
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    handle_primitive_type!(
                        builder, field, col, Time64NanosecondBuilder, chrono::NaiveTime, row, idx,
                        |v: chrono::NaiveTime| {
                            let timestamp: i64 = i64::from(v.num_seconds_from_midnight()) * 1_000_000_000
                                + i64::from(v.nanosecond());
                            Ok::<_, DataFusionError>(timestamp)
                        }
                    );
                }
                DataType::Date32 => {
                    handle_primitive_type!(
                        builder, field, col, Date32Builder, chrono::NaiveDate, row, idx,
                        |v| Ok::<_, DataFusionError>(Date32Type::from_naive_date(v))
                    );
                }
                DataType::Interval(IntervalUnit::MonthDayNano) => {
                    handle_primitive_type!(
                        builder, field, col, IntervalMonthDayNanoBuilder, IntervalFromSql, row, idx,
                        |v: IntervalFromSql| {
                            // The wire format carries microseconds; Arrow stores nanoseconds.
                            let interval_month_day_nano =
                                IntervalMonthDayNanoType::make_value(v.month, v.day, v.time * 1_000);
                            Ok::<_, DataFusionError>(interval_month_day_nano)
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(builder, field, col, BooleanBuilder, bool, row, idx, just_return);
                }
                DataType::List(inner) => match inner.data_type() {
                    DataType::Int16 => {
                        handle_primitive_array_type!(builder, field, col, Int16Builder, i16, row, idx);
                    }
                    DataType::Int32 => {
                        handle_primitive_array_type!(builder, field, col, Int32Builder, i32, row, idx);
                    }
                    DataType::Int64 => {
                        handle_primitive_array_type!(builder, field, col, Int64Builder, i64, row, idx);
                    }
                    DataType::Float32 => {
                        handle_primitive_array_type!(builder, field, col, Float32Builder, f32, row, idx);
                    }
                    DataType::Float64 => {
                        handle_primitive_array_type!(builder, field, col, Float64Builder, f64, row, idx);
                    }
                    DataType::Utf8 => {
                        handle_primitive_array_type!(builder, field, col, StringBuilder, &str, row, idx);
                    }
                    DataType::Binary => {
                        handle_primitive_array_type!(builder, field, col, BinaryBuilder, Vec<u8>, row, idx);
                    }
                    DataType::Boolean => {
                        handle_primitive_array_type!(builder, field, col, BooleanBuilder, bool, row, idx);
                    }
                    _ => {
                        return Err(DataFusionError::NotImplemented(format!(
                            "Unsupported list data type {} for col: {:?}",
                            field.data_type(),
                            col
                        )));
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }

    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
    Ok(RecordBatch::try_new_with_options(
        projected_schema,
        projected_columns,
        &options,
    )?)
}