Skip to main content

arrow_pg/
encoder.rs

1use std::str::FromStr;
2use std::sync::Arc;
3
4#[cfg(not(feature = "datafusion"))]
5use arrow::{array::*, datatypes::*};
6use chrono::NaiveTime;
7use chrono::{NaiveDate, NaiveDateTime};
8#[cfg(feature = "datafusion")]
9use datafusion::arrow::{array::*, datatypes::*};
10use pg_interval::Interval as PgInterval;
11use pgwire::api::results::{CopyEncoder, DataRowEncoder, FieldInfo};
12use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
13use pgwire::messages::copy::CopyData;
14use pgwire::messages::data::DataRow;
15use pgwire::types::ToSqlText;
16use postgres_types::ToSql;
17use rust_decimal::Decimal;
18use timezone::Tz;
19
20use crate::error::ToSqlError;
21#[cfg(feature = "postgis")]
22use crate::geo_encoder::encode_geo;
23use crate::list_encoder::encode_list;
24use crate::struct_encoder::encode_struct;
25
26pub trait Encoder {
27    type Item;
28
29    fn encode_field<T>(&mut self, value: &T, pg_field: &FieldInfo) -> PgWireResult<()>
30    where
31        T: ToSql + ToSqlText + Sized;
32
33    fn take_row(&mut self) -> Self::Item;
34}
35
36impl Encoder for DataRowEncoder {
37    type Item = DataRow;
38
39    fn encode_field<T>(&mut self, value: &T, pg_field: &FieldInfo) -> PgWireResult<()>
40    where
41        T: ToSql + ToSqlText + Sized,
42    {
43        self.encode_field_with_type_and_format(
44            value,
45            pg_field.datatype(),
46            pg_field.format(),
47            pg_field.format_options(),
48        )
49    }
50
51    fn take_row(&mut self) -> Self::Item {
52        self.take_row()
53    }
54}
55
56impl Encoder for CopyEncoder {
57    type Item = CopyData;
58
59    fn encode_field<T>(&mut self, value: &T, _pg_field: &FieldInfo) -> PgWireResult<()>
60    where
61        T: ToSql + ToSqlText + Sized,
62    {
63        self.encode_field(value)
64    }
65
66    fn take_row(&mut self) -> Self::Item {
67        self.take_copy()
68    }
69}
70
71fn get_bool_value(arr: &Arc<dyn Array>, idx: usize) -> Option<bool> {
72    (!arr.is_null(idx)).then(|| {
73        arr.as_any()
74            .downcast_ref::<BooleanArray>()
75            .unwrap()
76            .value(idx)
77    })
78}
79
80macro_rules! get_primitive_value {
81    ($name:ident, $t:ty, $pt:ty) => {
82        fn $name(arr: &Arc<dyn Array>, idx: usize) -> Option<$pt> {
83            (!arr.is_null(idx)).then(|| {
84                arr.as_any()
85                    .downcast_ref::<PrimitiveArray<$t>>()
86                    .unwrap()
87                    .value(idx)
88            })
89        }
90    };
91}
92
93get_primitive_value!(get_i8_value, Int8Type, i8);
94get_primitive_value!(get_i16_value, Int16Type, i16);
95get_primitive_value!(get_i32_value, Int32Type, i32);
96get_primitive_value!(get_i64_value, Int64Type, i64);
97get_primitive_value!(get_u8_value, UInt8Type, u8);
98get_primitive_value!(get_u16_value, UInt16Type, u16);
99get_primitive_value!(get_u32_value, UInt32Type, u32);
100get_primitive_value!(get_u64_value, UInt64Type, u64);
101
102fn get_u64_as_decimal_value(arr: &Arc<dyn Array>, idx: usize) -> Option<Decimal> {
103    get_u64_value(arr, idx).map(Decimal::from)
104}
105get_primitive_value!(get_f32_value, Float32Type, f32);
106get_primitive_value!(get_f64_value, Float64Type, f64);
107
108fn get_utf8_view_value(arr: &Arc<dyn Array>, idx: usize) -> Option<&str> {
109    (!arr.is_null(idx)).then(|| {
110        arr.as_any()
111            .downcast_ref::<StringViewArray>()
112            .unwrap()
113            .value(idx)
114    })
115}
116
117fn get_binary_view_value(arr: &Arc<dyn Array>, idx: usize) -> Option<&[u8]> {
118    (!arr.is_null(idx)).then(|| {
119        arr.as_any()
120            .downcast_ref::<BinaryViewArray>()
121            .unwrap()
122            .value(idx)
123    })
124}
125
126fn get_utf8_value(arr: &Arc<dyn Array>, idx: usize) -> Option<&str> {
127    (!arr.is_null(idx)).then(|| {
128        arr.as_any()
129            .downcast_ref::<StringArray>()
130            .unwrap()
131            .value(idx)
132    })
133}
134
135fn get_large_utf8_value(arr: &Arc<dyn Array>, idx: usize) -> Option<&str> {
136    (!arr.is_null(idx)).then(|| {
137        arr.as_any()
138            .downcast_ref::<LargeStringArray>()
139            .unwrap()
140            .value(idx)
141    })
142}
143
144fn get_binary_value(arr: &Arc<dyn Array>, idx: usize) -> Option<&[u8]> {
145    (!arr.is_null(idx)).then(|| {
146        arr.as_any()
147            .downcast_ref::<BinaryArray>()
148            .unwrap()
149            .value(idx)
150    })
151}
152
153fn get_large_binary_value(arr: &Arc<dyn Array>, idx: usize) -> Option<&[u8]> {
154    (!arr.is_null(idx)).then(|| {
155        arr.as_any()
156            .downcast_ref::<LargeBinaryArray>()
157            .unwrap()
158            .value(idx)
159    })
160}
161
162fn get_date32_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDate> {
163    if arr.is_null(idx) {
164        return None;
165    }
166    arr.as_any()
167        .downcast_ref::<Date32Array>()
168        .unwrap()
169        .value_as_date(idx)
170}
171
172fn get_date64_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDate> {
173    if arr.is_null(idx) {
174        return None;
175    }
176    arr.as_any()
177        .downcast_ref::<Date64Array>()
178        .unwrap()
179        .value_as_date(idx)
180}
181
182fn get_time32_second_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
183    if arr.is_null(idx) {
184        return None;
185    }
186    arr.as_any()
187        .downcast_ref::<Time32SecondArray>()
188        .unwrap()
189        .value_as_time(idx)
190}
191
192fn get_time32_millisecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
193    if arr.is_null(idx) {
194        return None;
195    }
196    arr.as_any()
197        .downcast_ref::<Time32MillisecondArray>()
198        .unwrap()
199        .value_as_time(idx)
200}
201
202fn get_time64_microsecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
203    if arr.is_null(idx) {
204        return None;
205    }
206    arr.as_any()
207        .downcast_ref::<Time64MicrosecondArray>()
208        .unwrap()
209        .value_as_time(idx)
210}
211fn get_time64_nanosecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
212    if arr.is_null(idx) {
213        return None;
214    }
215    arr.as_any()
216        .downcast_ref::<Time64NanosecondArray>()
217        .unwrap()
218        .value_as_time(idx)
219}
220
221fn get_numeric_128_value(
222    arr: &Arc<dyn Array>,
223    idx: usize,
224    scale: u32,
225) -> PgWireResult<Option<Decimal>> {
226    if arr.is_null(idx) {
227        return Ok(None);
228    }
229
230    let array = arr.as_any().downcast_ref::<Decimal128Array>().unwrap();
231    let value = array.value(idx);
232    Decimal::try_from_i128_with_scale(value, scale)
233        .map_err(|e| {
234            let error_code = match e {
235                rust_decimal::Error::ExceedsMaximumPossibleValue => {
236                    "22003" // numeric_value_out_of_range
237                }
238                rust_decimal::Error::LessThanMinimumPossibleValue => {
239                    "22003" // numeric_value_out_of_range
240                }
241                rust_decimal::Error::ScaleExceedsMaximumPrecision(scale) => {
242                    return PgWireError::UserError(Box::new(ErrorInfo::new(
243                        "ERROR".to_string(),
244                        "22003".to_string(),
245                        format!("Scale {scale} exceeds maximum precision for numeric type"),
246                    )));
247                }
248                _ => "22003", // generic numeric_value_out_of_range
249            };
250            PgWireError::UserError(Box::new(ErrorInfo::new(
251                "ERROR".to_string(),
252                error_code.to_string(),
253                format!("Numeric value conversion failed: {e}"),
254            )))
255        })
256        .map(Some)
257}
258
259pub fn encode_value<T: Encoder>(
260    encoder: &mut T,
261    arr: &Arc<dyn Array>,
262    idx: usize,
263    arrow_field: &Field,
264    pg_field: &FieldInfo,
265) -> PgWireResult<()> {
266    let arrow_type = arrow_field.data_type();
267
268    #[cfg(feature = "postgis")]
269    if let Some(geoarrow_type) = geoarrow_schema::GeoArrowType::from_extension_field(arrow_field)
270        .map_err(|e| PgWireError::ApiError(Box::new(e)))?
271    {
272        let geoarrow_array: Arc<dyn geoarrow::array::GeoArrowArray> =
273            geoarrow::array::from_arrow_array(arr, arrow_field)
274                .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
275
276        return encode_geo(
277            encoder,
278            geoarrow_type,
279            &geoarrow_array,
280            idx,
281            arrow_field,
282            pg_field,
283        );
284    }
285
286    match arrow_type {
287        DataType::Null => encoder.encode_field(&None::<i8>, pg_field)?,
288        DataType::Boolean => encoder.encode_field(&get_bool_value(arr, idx), pg_field)?,
289        DataType::Int8 => encoder.encode_field(&get_i8_value(arr, idx), pg_field)?,
290        DataType::Int16 => encoder.encode_field(&get_i16_value(arr, idx), pg_field)?,
291        DataType::Int32 => encoder.encode_field(&get_i32_value(arr, idx), pg_field)?,
292        DataType::Int64 => encoder.encode_field(&get_i64_value(arr, idx), pg_field)?,
293        DataType::UInt8 => {
294            encoder.encode_field(&(get_u8_value(arr, idx).map(|x| x as i16)), pg_field)?
295        }
296        DataType::UInt16 => {
297            encoder.encode_field(&(get_u16_value(arr, idx).map(|x| x as i32)), pg_field)?
298        }
299        DataType::UInt32 => {
300            encoder.encode_field(&get_u32_value(arr, idx).map(|x| x as i64), pg_field)?
301        }
302        DataType::UInt64 => encoder.encode_field(&get_u64_as_decimal_value(arr, idx), pg_field)?,
303        DataType::Float32 => encoder.encode_field(&get_f32_value(arr, idx), pg_field)?,
304        DataType::Float64 => encoder.encode_field(&get_f64_value(arr, idx), pg_field)?,
305        DataType::Decimal128(_, s) => {
306            encoder.encode_field(&get_numeric_128_value(arr, idx, *s as u32)?, pg_field)?
307        }
308        DataType::Utf8 => encoder.encode_field(&get_utf8_value(arr, idx), pg_field)?,
309        DataType::Utf8View => encoder.encode_field(&get_utf8_view_value(arr, idx), pg_field)?,
310        DataType::BinaryView => encoder.encode_field(&get_binary_view_value(arr, idx), pg_field)?,
311        DataType::LargeUtf8 => encoder.encode_field(&get_large_utf8_value(arr, idx), pg_field)?,
312        DataType::Binary => encoder.encode_field(&get_binary_value(arr, idx), pg_field)?,
313        DataType::LargeBinary => {
314            encoder.encode_field(&get_large_binary_value(arr, idx), pg_field)?
315        }
316        DataType::Date32 => encoder.encode_field(&get_date32_value(arr, idx), pg_field)?,
317        DataType::Date64 => encoder.encode_field(&get_date64_value(arr, idx), pg_field)?,
318        DataType::Time32(unit) => match unit {
319            TimeUnit::Second => {
320                encoder.encode_field(&get_time32_second_value(arr, idx), pg_field)?
321            }
322            TimeUnit::Millisecond => {
323                encoder.encode_field(&get_time32_millisecond_value(arr, idx), pg_field)?
324            }
325            _ => {}
326        },
327        DataType::Time64(unit) => match unit {
328            TimeUnit::Microsecond => {
329                encoder.encode_field(&get_time64_microsecond_value(arr, idx), pg_field)?
330            }
331            TimeUnit::Nanosecond => {
332                encoder.encode_field(&get_time64_nanosecond_value(arr, idx), pg_field)?
333            }
334            _ => {}
335        },
336        DataType::Timestamp(unit, timezone) => match unit {
337            TimeUnit::Second => {
338                if arr.is_null(idx) {
339                    return encoder.encode_field(&None::<NaiveDateTime>, pg_field);
340                }
341                let ts_array = arr.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
342                if let Some(tz) = timezone {
343                    let tz = Tz::from_str(tz.as_ref()).map_err(ToSqlError::from)?;
344                    let value = ts_array
345                        .value_as_datetime_with_tz(idx, tz)
346                        .map(|d| d.fixed_offset());
347
348                    encoder.encode_field(&value, pg_field)?;
349                } else {
350                    let value = ts_array.value_as_datetime(idx);
351                    encoder.encode_field(&value, pg_field)?;
352                }
353            }
354            TimeUnit::Millisecond => {
355                if arr.is_null(idx) {
356                    return encoder.encode_field(&None::<NaiveDateTime>, pg_field);
357                }
358                let ts_array = arr
359                    .as_any()
360                    .downcast_ref::<TimestampMillisecondArray>()
361                    .unwrap();
362                if let Some(tz) = timezone {
363                    let tz = Tz::from_str(tz.as_ref()).map_err(ToSqlError::from)?;
364                    let value = ts_array
365                        .value_as_datetime_with_tz(idx, tz)
366                        .map(|d| d.fixed_offset());
367                    encoder.encode_field(&value, pg_field)?;
368                } else {
369                    let value = ts_array.value_as_datetime(idx);
370                    encoder.encode_field(&value, pg_field)?;
371                }
372            }
373            TimeUnit::Microsecond => {
374                if arr.is_null(idx) {
375                    return encoder.encode_field(&None::<NaiveDateTime>, pg_field);
376                }
377                let ts_array = arr
378                    .as_any()
379                    .downcast_ref::<TimestampMicrosecondArray>()
380                    .unwrap();
381                if let Some(tz) = timezone {
382                    let tz = Tz::from_str(tz.as_ref()).map_err(ToSqlError::from)?;
383                    let value = ts_array
384                        .value_as_datetime_with_tz(idx, tz)
385                        .map(|d| d.fixed_offset());
386                    encoder.encode_field(&value, pg_field)?;
387                } else {
388                    let value = ts_array.value_as_datetime(idx);
389                    encoder.encode_field(&value, pg_field)?;
390                }
391            }
392            TimeUnit::Nanosecond => {
393                if arr.is_null(idx) {
394                    return encoder.encode_field(&None::<NaiveDateTime>, pg_field);
395                }
396                let ts_array = arr
397                    .as_any()
398                    .downcast_ref::<TimestampNanosecondArray>()
399                    .unwrap();
400                if let Some(tz) = timezone {
401                    let tz = Tz::from_str(tz.as_ref()).map_err(ToSqlError::from)?;
402                    let value = ts_array
403                        .value_as_datetime_with_tz(idx, tz)
404                        .map(|d| d.fixed_offset());
405                    encoder.encode_field(&value, pg_field)?;
406                } else {
407                    let value = ts_array.value_as_datetime(idx);
408                    encoder.encode_field(&value, pg_field)?;
409                }
410            }
411        },
412        DataType::Interval(interval_unit) => match interval_unit {
413            IntervalUnit::YearMonth => {
414                let interval_array = arr
415                    .as_any()
416                    .downcast_ref::<IntervalYearMonthArray>()
417                    .unwrap();
418                let months = IntervalYearMonthType::to_months(interval_array.value(idx));
419                encoder.encode_field(&PgInterval::new(months, 0, 0), pg_field)?;
420            }
421            IntervalUnit::DayTime => {
422                let interval_array = arr.as_any().downcast_ref::<IntervalDayTimeArray>().unwrap();
423                let (days, millis) = IntervalDayTimeType::to_parts(interval_array.value(idx));
424                encoder
425                    .encode_field(&PgInterval::new(0, days, millis as i64 * 1000i64), pg_field)?;
426            }
427            IntervalUnit::MonthDayNano => {
428                let interval_array = arr
429                    .as_any()
430                    .downcast_ref::<IntervalMonthDayNanoArray>()
431                    .unwrap();
432                let (months, days, nanoseconds) =
433                    IntervalMonthDayNanoType::to_parts(interval_array.value(idx));
434
435                encoder.encode_field(
436                    &PgInterval::new(months, days, nanoseconds / 1000i64),
437                    pg_field,
438                )?;
439            }
440        },
441        DataType::Duration(unit) => match unit {
442            TimeUnit::Second => {
443                if arr.is_null(idx) {
444                    return encoder.encode_field(&None::<PgInterval>, pg_field);
445                }
446                let duration_array = arr.as_any().downcast_ref::<DurationSecondArray>().unwrap();
447                let microseconds = duration_array.value(idx) * 1_000_000i64;
448                encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
449            }
450            TimeUnit::Millisecond => {
451                if arr.is_null(idx) {
452                    return encoder.encode_field(&None::<PgInterval>, pg_field);
453                }
454                let duration_array = arr
455                    .as_any()
456                    .downcast_ref::<DurationMillisecondArray>()
457                    .unwrap();
458                let microseconds = duration_array.value(idx) * 1_000i64;
459                encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
460            }
461            TimeUnit::Microsecond => {
462                if arr.is_null(idx) {
463                    return encoder.encode_field(&None::<PgInterval>, pg_field);
464                }
465                let duration_array = arr
466                    .as_any()
467                    .downcast_ref::<DurationMicrosecondArray>()
468                    .unwrap();
469                let microseconds = duration_array.value(idx);
470                encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
471            }
472            TimeUnit::Nanosecond => {
473                if arr.is_null(idx) {
474                    return encoder.encode_field(&None::<PgInterval>, pg_field);
475                }
476                let duration_array = arr
477                    .as_any()
478                    .downcast_ref::<DurationNanosecondArray>()
479                    .unwrap();
480                let microseconds = duration_array.value(idx) / 1_000i64;
481                encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
482            }
483        },
484        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
485            if arr.is_null(idx) {
486                return encoder.encode_field(&None::<&[i8]>, pg_field);
487            }
488            let array = arr.as_any().downcast_ref::<ListArray>().unwrap().value(idx);
489            encode_list(encoder, array, pg_field)?
490        }
491        DataType::Struct(arrow_fields) => encode_struct(encoder, arr, idx, arrow_fields, pg_field)?,
492        DataType::Dictionary(_, value_type) => {
493            if arr.is_null(idx) {
494                return encoder.encode_field(&None::<i8>, pg_field);
495            }
496            // Get the dictionary values and the mapped row index
497            macro_rules! get_dict_values_and_index {
498                ($key_type:ty) => {
499                    arr.as_any()
500                        .downcast_ref::<DictionaryArray<$key_type>>()
501                        .map(|dict| (dict.values(), dict.keys().value(idx) as usize))
502                };
503            }
504
505            // Try to extract values using different key types
506            let (values, idx) = get_dict_values_and_index!(Int8Type)
507                .or_else(|| get_dict_values_and_index!(Int16Type))
508                .or_else(|| get_dict_values_and_index!(Int32Type))
509                .or_else(|| get_dict_values_and_index!(Int64Type))
510                .or_else(|| get_dict_values_and_index!(UInt8Type))
511                .or_else(|| get_dict_values_and_index!(UInt16Type))
512                .or_else(|| get_dict_values_and_index!(UInt32Type))
513                .or_else(|| get_dict_values_and_index!(UInt64Type))
514                .ok_or_else(|| {
515                    ToSqlError::from(format!(
516                        "Unsupported dictionary key type for value type {value_type}"
517                    ))
518                })?;
519
520            let inner_arrow_field = Field::new(pg_field.name(), *value_type.clone(), true);
521
522            encode_value(encoder, values, idx, &inner_arrow_field, pg_field)?
523        }
524        _ => {
525            return Err(PgWireError::ApiError(ToSqlError::from(format!(
526                "Unsupported Datatype {} and array {:?}",
527                arr.data_type(),
528                &arr
529            ))));
530        }
531    }
532
533    Ok(())
534}
535
#[cfg(test)]
mod tests {
    use arrow::buffer::NullBuffer;
    use bytes::BytesMut;
    use pgwire::{api::results::FieldFormat, types::format::FormatOptions};
    use postgres_types::Type;

    use super::*;

    /// Builds a text-format `TEXT` column description with the given name.
    fn text_field(name: &str) -> FieldInfo {
        FieldInfo::new(name.to_string(), None, None, Type::TEXT, FieldFormat::Text)
    }

    #[test]
    fn encodes_dictionary_array() {
        /// Encoder double that keeps the text rendering of the last field.
        #[derive(Default)]
        struct MockEncoder {
            encoded_value: String,
        }

        impl Encoder for MockEncoder {
            type Item = String;

            fn encode_field<T>(&mut self, value: &T, pg_field: &FieldInfo) -> PgWireResult<()>
            where
                T: ToSql + ToSqlText + Sized,
            {
                let mut buf = BytesMut::new();
                let _ = value.to_sql_text(pg_field.datatype(), &mut buf, &FormatOptions::default());
                self.encoded_value = String::from_utf8(buf.to_vec()).unwrap();
                Ok(())
            }

            fn take_row(&mut self) -> Self::Item {
                std::mem::take(&mut self.encoded_value)
            }
        }

        // A single dictionary entry referenced by every key.
        let expected = "~!@&$[]()@@!!";
        let dictionary = StringArray::from_iter_values([expected]);
        let keys = Int8Array::from_iter_values([0, 0, 0, 0]);
        let dict_arr: Arc<dyn Array> =
            Arc::new(DictionaryArray::<Int8Type>::try_new(keys, Arc::new(dictionary)).unwrap());

        let arrow_field = Field::new(
            "x",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
        );
        let pg_field = text_field("x");

        let mut encoder = MockEncoder::default();
        let outcome = encode_value(&mut encoder, &dict_arr, 2, &arrow_field, &pg_field);

        assert!(outcome.is_ok());
        assert_eq!(encoder.encoded_value, expected);
    }

    #[test]
    fn encode_struct_null_emits_field() {
        // Regression test: encode_struct must call encoder.encode_field for
        // NULL struct values so a NULL indicator is written to the DataRow.
        // Previously it returned Ok(()) without encoding, corrupting the
        // column count.

        /// Encoder double that only counts how often a field was encoded.
        #[derive(Default)]
        struct CountingEncoder {
            call_count: usize,
        }

        impl Encoder for CountingEncoder {
            type Item = ();

            fn encode_field<T>(&mut self, _value: &T, _pg_field: &FieldInfo) -> PgWireResult<()>
            where
                T: ToSql + ToSqlText + Sized,
            {
                self.call_count += 1;
                Ok(())
            }

            fn take_row(&mut self) -> Self::Item {}
        }

        let fields = vec![
            Arc::new(Field::new("a", DataType::Utf8, true)),
            Arc::new(Field::new("b", DataType::Utf8, true)),
        ];
        let col_a: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some("hello"), Some("x")]));
        let col_b: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some("world"), Some("y")]));

        // Row 0: non-null struct, Row 1: null struct
        let validity = NullBuffer::from(vec![true, false]);
        let struct_arr: Arc<dyn Array> = Arc::new(
            StructArray::try_new(fields.clone().into(), vec![col_a, col_b], Some(validity))
                .unwrap(),
        );

        let arrow_field = Field::new("s", DataType::Struct(fields.into()), true);
        let pg_field = text_field("s");

        // Encode the NULL row (index 1).
        let mut encoder = CountingEncoder::default();
        let outcome = encode_value(&mut encoder, &struct_arr, 1, &arrow_field, &pg_field);
        assert!(outcome.is_ok());
        assert_eq!(
            encoder.call_count, 1,
            "encode_field must be called exactly once for a NULL struct to emit a NULL indicator"
        );
    }

    #[test]
    fn test_get_time32_second_value() {
        // 3723 s == 01:02:03
        let array: Arc<dyn Array> = Arc::new(Time32SecondArray::from_iter_values([3723_i32]));
        assert_eq!(
            get_time32_second_value(&array, 0),
            NaiveTime::from_hms_opt(1, 2, 3)
        );
    }

    #[test]
    fn test_get_time32_millisecond_value() {
        // 3_723_001 ms == 01:02:03.001
        let array: Arc<dyn Array> =
            Arc::new(Time32MillisecondArray::from_iter_values([3723001_i32]));
        assert_eq!(
            get_time32_millisecond_value(&array, 0),
            NaiveTime::from_hms_milli_opt(1, 2, 3, 1)
        );
    }

    #[test]
    fn test_get_time64_microsecond_value() {
        // 3_723_001_001 us == 01:02:03.001001
        let array: Arc<dyn Array> =
            Arc::new(Time64MicrosecondArray::from_iter_values([3723001001_i64]));
        assert_eq!(
            get_time64_microsecond_value(&array, 0),
            NaiveTime::from_hms_micro_opt(1, 2, 3, 1001)
        );
    }

    #[test]
    fn test_get_time64_nanosecond_value() {
        // 3_723_001_001_001 ns == 01:02:03.001001001
        let array: Arc<dyn Array> =
            Arc::new(Time64NanosecondArray::from_iter_values([3723001001001_i64]));
        assert_eq!(
            get_time64_nanosecond_value(&array, 0),
            NaiveTime::from_hms_nano_opt(1, 2, 3, 1001001)
        );
    }
}