Skip to main content

fluss/row/
datum.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::error::Error::RowConvertError;
19use crate::error::Result;
20use crate::row::Decimal;
21use arrow::array::{
22    ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
23    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder,
24    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
25    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
26    TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
27};
28use arrow::datatypes as arrow_schema;
29use arrow::error::ArrowError;
30use jiff::ToSpan;
31use ordered_float::OrderedFloat;
32use parse_display::Display;
33use serde::Serialize;
34use std::borrow::Cow;
35
/// 946,684,800 seconds expressed in microseconds, i.e. the span from the Unix epoch
/// (1970-01-01T00:00:00Z) to 2000-01-01T00:00:00Z.
// NOTE(review): nothing in the visible part of this file reads this constant (hence the
// `allow(dead_code)`); confirm whether it is still needed.
#[allow(dead_code)]
const THIRTY_YEARS_MICROSECONDS: i64 = 946_684_800_000_000;
38
/// A single, dynamically typed cell value of a row.
///
/// String and blob payloads are held as [`Cow`] values (`Str` / `Blob`), so a datum can
/// either borrow its data for `'a` or own it. The `Display` text of each variant is given
/// by its `#[display(...)]` attribute. Because `PartialOrd`/`Ord` are derived, values of
/// different variants compare by declaration order of the variants.
#[derive(Debug, Clone, Display, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
pub enum Datum<'a> {
    /// Absent value; displayed as `null`.
    #[display("null")]
    Null,
    /// Boolean value.
    #[display("{0}")]
    Bool(bool),
    /// 8-bit signed integer.
    #[display("{0}")]
    Int8(i8),
    /// 16-bit signed integer.
    #[display("{0}")]
    Int16(i16),
    /// 32-bit signed integer.
    #[display("{0}")]
    Int32(i32),
    /// 64-bit signed integer.
    #[display("{0}")]
    Int64(i64),
    /// 32-bit float, wrapped in `OrderedFloat` (see the `F32` alias) so that the enum can
    /// derive `Eq`, `Ord` and `Hash`.
    #[display("{0}")]
    Float32(F32),
    /// 64-bit float, wrapped in `OrderedFloat` (see the `F64` alias).
    #[display("{0}")]
    Float64(F64),
    /// Text value; displayed wrapped in single quotes.
    #[display("'{0}'")]
    String(Str<'a>),
    /// Binary value.
    #[display("{:?}")]
    Blob(Blob<'a>),
    /// Decimal value.
    #[display("{0}")]
    Decimal(Decimal),
    /// Calendar date (a day count relative to 1970-01-01, see [`Date`]).
    #[display("{0}")]
    Date(Date),
    /// Time of day (milliseconds since midnight, see [`Time`]).
    #[display("{0}")]
    Time(Time),
    /// Timestamp held as milliseconds plus nanoseconds-within-millisecond
    /// (see [`TimestampNtz`]; presumably "no time zone" semantics).
    #[display("{0}")]
    TimestampNtz(TimestampNtz),
    /// Timestamp held as epoch milliseconds plus nanoseconds-within-millisecond
    /// (see [`TimestampLtz`]; presumably "local time zone" semantics).
    #[display("{0}")]
    TimestampLtz(TimestampLtz),
}
72
73impl Datum<'_> {
74    pub fn is_null(&self) -> bool {
75        matches!(self, Datum::Null)
76    }
77
78    pub fn as_str(&self) -> &str {
79        match self {
80            Self::String(s) => s,
81            _ => panic!("not a string: {self:?}"),
82        }
83    }
84
85    pub fn as_blob(&self) -> &[u8] {
86        match self {
87            Self::Blob(blob) => blob.as_ref(),
88            _ => panic!("not a blob: {self:?}"),
89        }
90    }
91
92    pub fn as_decimal(&self) -> &Decimal {
93        match self {
94            Self::Decimal(d) => d,
95            _ => panic!("not a decimal: {self:?}"),
96        }
97    }
98
99    pub fn as_date(&self) -> Date {
100        match self {
101            Self::Date(d) => *d,
102            _ => panic!("not a date: {self:?}"),
103        }
104    }
105
106    pub fn as_time(&self) -> Time {
107        match self {
108            Self::Time(t) => *t,
109            _ => panic!("not a time: {self:?}"),
110        }
111    }
112
113    pub fn as_timestamp_ntz(&self) -> TimestampNtz {
114        match self {
115            Self::TimestampNtz(ts) => *ts,
116            _ => panic!("not a timestamp ntz: {self:?}"),
117        }
118    }
119
120    pub fn as_timestamp_ltz(&self) -> TimestampLtz {
121        match self {
122            Self::TimestampLtz(ts) => *ts,
123            _ => panic!("not a timestamp ltz: {self:?}"),
124        }
125    }
126}
127
128// ----------- implement from
129impl<'a> From<i32> for Datum<'a> {
130    #[inline]
131    fn from(i: i32) -> Datum<'a> {
132        Datum::Int32(i)
133    }
134}
135
136impl<'a> From<i64> for Datum<'a> {
137    #[inline]
138    fn from(i: i64) -> Datum<'a> {
139        Datum::Int64(i)
140    }
141}
142
143impl<'a> From<i8> for Datum<'a> {
144    #[inline]
145    fn from(i: i8) -> Datum<'a> {
146        Datum::Int8(i)
147    }
148}
149
150impl<'a> From<i16> for Datum<'a> {
151    #[inline]
152    fn from(i: i16) -> Datum<'a> {
153        Datum::Int16(i)
154    }
155}
156
/// Text payload of [`Datum::String`]: either borrowed for `'a` or owned.
pub type Str<'a> = Cow<'a, str>;
158
159impl<'a> From<String> for Datum<'a> {
160    #[inline]
161    fn from(s: String) -> Self {
162        Datum::String(Cow::Owned(s))
163    }
164}
165
166impl<'a> From<&'a str> for Datum<'a> {
167    #[inline]
168    fn from(s: &'a str) -> Datum<'a> {
169        Datum::String(Cow::Borrowed(s))
170    }
171}
172
173impl From<Option<&()>> for Datum<'_> {
174    fn from(_: Option<&()>) -> Self {
175        Self::Null
176    }
177}
178
179impl<'a> From<f32> for Datum<'a> {
180    #[inline]
181    fn from(f: f32) -> Datum<'a> {
182        Datum::Float32(F32::from(f))
183    }
184}
185
186impl<'a> From<f64> for Datum<'a> {
187    #[inline]
188    fn from(f: f64) -> Datum<'a> {
189        Datum::Float64(F64::from(f))
190    }
191}
192
193impl TryFrom<&Datum<'_>> for i32 {
194    type Error = ();
195
196    #[inline]
197    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
198        match from {
199            Datum::Int32(i) => Ok(*i),
200            _ => Err(()),
201        }
202    }
203}
204
205impl TryFrom<&Datum<'_>> for i16 {
206    type Error = ();
207
208    #[inline]
209    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
210        match from {
211            Datum::Int16(i) => Ok(*i),
212            _ => Err(()),
213        }
214    }
215}
216
217impl TryFrom<&Datum<'_>> for i64 {
218    type Error = ();
219
220    #[inline]
221    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
222        match from {
223            Datum::Int64(i) => Ok(*i),
224            _ => Err(()),
225        }
226    }
227}
228
229impl TryFrom<&Datum<'_>> for f32 {
230    type Error = ();
231
232    #[inline]
233    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
234        match from {
235            Datum::Float32(f) => Ok(f.into_inner()),
236            _ => Err(()),
237        }
238    }
239}
240
241impl TryFrom<&Datum<'_>> for f64 {
242    type Error = ();
243
244    #[inline]
245    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
246        match from {
247            Datum::Float64(f) => Ok(f.into_inner()),
248            _ => Err(()),
249        }
250    }
251}
252
253impl TryFrom<&Datum<'_>> for bool {
254    type Error = ();
255
256    #[inline]
257    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
258        match from {
259            Datum::Bool(b) => Ok(*b),
260            _ => Err(()),
261        }
262    }
263}
264
265impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str {
266    type Error = ();
267
268    #[inline]
269    fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, Self::Error> {
270        match from {
271            Datum::String(s) => Ok(s.as_ref()),
272            _ => Err(()),
273        }
274    }
275}
276
277impl TryFrom<&Datum<'_>> for i8 {
278    type Error = ();
279
280    #[inline]
281    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
282        match from {
283            Datum::Int8(i) => Ok(*i),
284            _ => Err(()),
285        }
286    }
287}
288
289impl TryFrom<&Datum<'_>> for Decimal {
290    type Error = ();
291
292    #[inline]
293    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
294        match from {
295            Datum::Decimal(d) => Ok(d.clone()),
296            _ => Err(()),
297        }
298    }
299}
300
301impl TryFrom<&Datum<'_>> for Date {
302    type Error = ();
303
304    #[inline]
305    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
306        match from {
307            Datum::Date(d) => Ok(*d),
308            _ => Err(()),
309        }
310    }
311}
312
313impl TryFrom<&Datum<'_>> for Time {
314    type Error = ();
315
316    #[inline]
317    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
318        match from {
319            Datum::Time(t) => Ok(*t),
320            _ => Err(()),
321        }
322    }
323}
324
325impl TryFrom<&Datum<'_>> for TimestampNtz {
326    type Error = ();
327
328    #[inline]
329    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
330        match from {
331            Datum::TimestampNtz(ts) => Ok(*ts),
332            _ => Err(()),
333        }
334    }
335}
336
337impl TryFrom<&Datum<'_>> for TimestampLtz {
338    type Error = ();
339
340    #[inline]
341    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
342        match from {
343            Datum::TimestampLtz(ts) => Ok(*ts),
344            _ => Err(()),
345        }
346    }
347}
348
349impl<'a> From<bool> for Datum<'a> {
350    #[inline]
351    fn from(b: bool) -> Datum<'a> {
352        Datum::Bool(b)
353    }
354}
355
356impl<'a> From<Decimal> for Datum<'a> {
357    #[inline]
358    fn from(d: Decimal) -> Datum<'a> {
359        Datum::Decimal(d)
360    }
361}
362
363impl<'a> From<Date> for Datum<'a> {
364    #[inline]
365    fn from(d: Date) -> Datum<'a> {
366        Datum::Date(d)
367    }
368}
369
370impl<'a> From<Time> for Datum<'a> {
371    #[inline]
372    fn from(t: Time) -> Datum<'a> {
373        Datum::Time(t)
374    }
375}
376
377impl<'a> From<TimestampNtz> for Datum<'a> {
378    #[inline]
379    fn from(ts: TimestampNtz) -> Datum<'a> {
380        Datum::TimestampNtz(ts)
381    }
382}
383
384impl<'a> From<TimestampLtz> for Datum<'a> {
385    #[inline]
386    fn from(ts: TimestampLtz) -> Datum<'a> {
387        Datum::TimestampLtz(ts)
388    }
389}
390
/// A value that can append itself to a type-erased Arrow array builder.
///
/// In this file the trait is implemented through `impl_to_arrow!` for a few primitive
/// types and `&str`.
pub trait ToArrow {
    /// Appends `self` to `builder`.
    ///
    /// `builder` is the type-erased Arrow builder to write into and `data_type` is the
    /// Arrow type passed along by the caller (the macro-generated implementations in this
    /// file ignore it).
    ///
    /// # Errors
    /// Implementations return an error when the value cannot be appended, e.g. because
    /// `builder` is not the concrete builder type they expect.
    fn append_to(
        &self,
        builder: &mut dyn ArrayBuilder,
        data_type: &arrow_schema::DataType,
    ) -> Result<()>;
}
398
// Time unit conversion constants
/// Number of milliseconds in one second.
pub(crate) const MILLIS_PER_SECOND: i64 = 1_000;
/// Number of microseconds in one millisecond.
pub(crate) const MICROS_PER_MILLI: i64 = 1_000;
/// Number of nanoseconds in one millisecond.
pub(crate) const NANOS_PER_MILLI: i64 = 1_000_000;
403
404/// Converts milliseconds and nanoseconds-within-millisecond to total microseconds.
405/// Returns an error if the conversion would overflow.
406pub(crate) fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
407    let millis_micros = millis
408        .checked_mul(MICROS_PER_MILLI)
409        .ok_or_else(|| RowConvertError {
410            message: format!(
411                "Timestamp milliseconds {millis} overflows when converting to microseconds"
412            ),
413        })?;
414    let nanos_micros = (nanos as i64) / MICROS_PER_MILLI;
415    millis_micros
416        .checked_add(nanos_micros)
417        .ok_or_else(|| RowConvertError {
418            message: format!(
419                "Timestamp overflow when adding microseconds: {millis_micros} + {nanos_micros}"
420            ),
421        })
422}
423
424/// Converts milliseconds and nanoseconds-within-millisecond to total nanoseconds.
425/// Returns an error if the conversion would overflow.
426pub(crate) fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
427    let millis_nanos = millis
428        .checked_mul(NANOS_PER_MILLI)
429        .ok_or_else(|| RowConvertError {
430            message: format!(
431                "Timestamp milliseconds {millis} overflows when converting to nanoseconds"
432            ),
433        })?;
434    millis_nanos
435        .checked_add(nanos as i64)
436        .ok_or_else(|| RowConvertError {
437            message: format!(
438                "Timestamp overflow when adding nanoseconds: {millis_nanos} + {nanos}"
439            ),
440        })
441}
442
443/// Rescales a [`Decimal`] to the given Arrow target precision/scale and appends
444/// the resulting i128 to the builder.
445pub(crate) fn append_decimal_to_builder(
446    decimal: &Decimal,
447    target_precision: u32,
448    target_scale: i64,
449    builder: &mut Decimal128Builder,
450) -> Result<()> {
451    use bigdecimal::RoundingMode;
452
453    let bd = decimal.to_big_decimal();
454    let rescaled = bd.with_scale_round(target_scale, RoundingMode::HalfUp);
455    let (unscaled, _) = rescaled.as_bigint_and_exponent();
456
457    let actual_precision = Decimal::compute_precision(&unscaled);
458    if actual_precision > target_precision as usize {
459        return Err(RowConvertError {
460            message: format!(
461                "Decimal precision overflow: value has {actual_precision} digits but Arrow expects {target_precision} (value: {rescaled})"
462            ),
463        });
464    }
465
466    let i128_val: i128 = match unscaled.try_into() {
467        Ok(v) => v,
468        Err(_) => {
469            return Err(RowConvertError {
470                message: format!("Decimal value exceeds i128 range: {rescaled}"),
471            });
472        }
473    };
474
475    builder.append_value(i128_val);
476    Ok(())
477}
478
479trait AppendResult {
480    fn into_append_result(self) -> Result<()>;
481}
482
483impl AppendResult for () {
484    fn into_append_result(self) -> Result<()> {
485        Ok(())
486    }
487}
488
489impl AppendResult for std::result::Result<(), ArrowError> {
490    fn into_append_result(self) -> Result<()> {
491        self.map_err(|e| RowConvertError {
492            message: format!("Failed to append value: {e}"),
493        })
494    }
495}
496
497impl Datum<'_> {
498    pub fn append_to(
499        &self,
500        builder: &mut dyn ArrayBuilder,
501        data_type: &arrow_schema::DataType,
502    ) -> Result<()> {
503        macro_rules! append_null_to_arrow {
504            ($builder_type:ty) => {
505                if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() {
506                    b.append_null();
507                    return Ok(());
508                }
509            };
510        }
511
512        macro_rules! append_value_to_arrow {
513            ($builder_type:ty, $value:expr) => {
514                if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() {
515                    b.append_value($value).into_append_result()?;
516                    return Ok(());
517                }
518            };
519        }
520
521        match self {
522            Datum::Null => {
523                append_null_to_arrow!(Int8Builder);
524                append_null_to_arrow!(BooleanBuilder);
525                append_null_to_arrow!(Int16Builder);
526                append_null_to_arrow!(Int32Builder);
527                append_null_to_arrow!(Int64Builder);
528                append_null_to_arrow!(Float32Builder);
529                append_null_to_arrow!(Float64Builder);
530                append_null_to_arrow!(StringBuilder);
531                append_null_to_arrow!(BinaryBuilder);
532                append_null_to_arrow!(FixedSizeBinaryBuilder);
533                append_null_to_arrow!(Decimal128Builder);
534                append_null_to_arrow!(Date32Builder);
535                append_null_to_arrow!(Time32SecondBuilder);
536                append_null_to_arrow!(Time32MillisecondBuilder);
537                append_null_to_arrow!(Time64MicrosecondBuilder);
538                append_null_to_arrow!(Time64NanosecondBuilder);
539                append_null_to_arrow!(TimestampSecondBuilder);
540                append_null_to_arrow!(TimestampMillisecondBuilder);
541                append_null_to_arrow!(TimestampMicrosecondBuilder);
542                append_null_to_arrow!(TimestampNanosecondBuilder);
543            }
544            Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
545            Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
546            Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v),
547            Datum::Int32(v) => append_value_to_arrow!(Int32Builder, *v),
548            Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v),
549            Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()),
550            Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()),
551            Datum::String(v) => append_value_to_arrow!(StringBuilder, v.as_ref()),
552            Datum::Blob(v) => match data_type {
553                arrow_schema::DataType::Binary => {
554                    append_value_to_arrow!(BinaryBuilder, v.as_ref());
555                }
556                arrow_schema::DataType::FixedSizeBinary(_) => {
557                    append_value_to_arrow!(FixedSizeBinaryBuilder, v.as_ref());
558                }
559                _ => {
560                    return Err(RowConvertError {
561                        message: format!(
562                            "Expected Binary or FixedSizeBinary Arrow type, got: {data_type:?}"
563                        ),
564                    });
565                }
566            },
567            Datum::Decimal(decimal) => {
568                // Extract target precision and scale from Arrow schema
569                let (p, s) = match data_type {
570                    arrow_schema::DataType::Decimal128(p, s) => (*p, *s),
571                    _ => {
572                        return Err(RowConvertError {
573                            message: format!("Expected Decimal128 Arrow type, got: {data_type:?}"),
574                        });
575                    }
576                };
577
578                if s < 0 {
579                    return Err(RowConvertError {
580                        message: format!("Negative decimal scale {s} is not supported"),
581                    });
582                }
583
584                if let Some(b) = builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
585                    append_decimal_to_builder(decimal, p as u32, s as i64, b)?;
586                    return Ok(());
587                }
588
589                return Err(RowConvertError {
590                    message: "Builder type mismatch for Decimal128".to_string(),
591                });
592            }
593            Datum::Date(date) => {
594                append_value_to_arrow!(Date32Builder, date.get_inner());
595            }
596            Datum::Time(time) => {
597                // Time is stored as milliseconds since midnight in Fluss
598                // Convert to Arrow's time unit based on schema
599                let millis = time.get_inner();
600
601                match data_type {
602                    arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
603                        if let Some(b) = builder.as_any_mut().downcast_mut::<Time32SecondBuilder>()
604                        {
605                            // Validate no sub-second precision is lost
606                            if millis % MILLIS_PER_SECOND as i32 != 0 {
607                                return Err(RowConvertError {
608                                    message: format!(
609                                        "Time value {millis} ms has sub-second precision but schema expects seconds only"
610                                    ),
611                                });
612                            }
613                            b.append_value(millis / MILLIS_PER_SECOND as i32);
614                            return Ok(());
615                        }
616                    }
617                    arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
618                        if let Some(b) = builder
619                            .as_any_mut()
620                            .downcast_mut::<Time32MillisecondBuilder>()
621                        {
622                            b.append_value(millis);
623                            return Ok(());
624                        }
625                    }
626                    arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
627                        if let Some(b) = builder
628                            .as_any_mut()
629                            .downcast_mut::<Time64MicrosecondBuilder>()
630                        {
631                            let micros = (millis as i64)
632                                .checked_mul(MICROS_PER_MILLI)
633                                .ok_or_else(|| RowConvertError {
634                                    message: format!(
635                                        "Time value {millis} ms overflows when converting to microseconds"
636                                    ),
637                                })?;
638                            b.append_value(micros);
639                            return Ok(());
640                        }
641                    }
642                    arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
643                        if let Some(b) = builder
644                            .as_any_mut()
645                            .downcast_mut::<Time64NanosecondBuilder>()
646                        {
647                            let nanos = (millis as i64).checked_mul(NANOS_PER_MILLI).ok_or_else(
648                                || RowConvertError {
649                                    message: format!(
650                                        "Time value {millis} ms overflows when converting to nanoseconds"
651                                    ),
652                                },
653                            )?;
654                            b.append_value(nanos);
655                            return Ok(());
656                        }
657                    }
658                    _ => {
659                        return Err(RowConvertError {
660                            message: format!(
661                                "Expected Time32/Time64 Arrow type, got: {data_type:?}"
662                            ),
663                        });
664                    }
665                }
666
667                return Err(RowConvertError {
668                    message: "Builder type mismatch for Time".to_string(),
669                });
670            }
671            Datum::TimestampNtz(ts) => {
672                let millis = ts.get_millisecond();
673                let nanos = ts.get_nano_of_millisecond();
674
675                if let Some(b) = builder
676                    .as_any_mut()
677                    .downcast_mut::<TimestampSecondBuilder>()
678                {
679                    b.append_value(millis / MILLIS_PER_SECOND);
680                    return Ok(());
681                }
682                if let Some(b) = builder
683                    .as_any_mut()
684                    .downcast_mut::<TimestampMillisecondBuilder>()
685                {
686                    b.append_value(millis);
687                    return Ok(());
688                }
689                if let Some(b) = builder
690                    .as_any_mut()
691                    .downcast_mut::<TimestampMicrosecondBuilder>()
692                {
693                    b.append_value(millis_nanos_to_micros(millis, nanos)?);
694                    return Ok(());
695                }
696                if let Some(b) = builder
697                    .as_any_mut()
698                    .downcast_mut::<TimestampNanosecondBuilder>()
699                {
700                    b.append_value(millis_nanos_to_nanos(millis, nanos)?);
701                    return Ok(());
702                }
703
704                return Err(RowConvertError {
705                    message: "Builder type mismatch for TimestampNtz".to_string(),
706                });
707            }
708            Datum::TimestampLtz(ts) => {
709                let millis = ts.get_epoch_millisecond();
710                let nanos = ts.get_nano_of_millisecond();
711
712                if let Some(b) = builder
713                    .as_any_mut()
714                    .downcast_mut::<TimestampSecondBuilder>()
715                {
716                    b.append_value(millis / MILLIS_PER_SECOND);
717                    return Ok(());
718                }
719                if let Some(b) = builder
720                    .as_any_mut()
721                    .downcast_mut::<TimestampMillisecondBuilder>()
722                {
723                    b.append_value(millis);
724                    return Ok(());
725                }
726                if let Some(b) = builder
727                    .as_any_mut()
728                    .downcast_mut::<TimestampMicrosecondBuilder>()
729                {
730                    b.append_value(millis_nanos_to_micros(millis, nanos)?);
731                    return Ok(());
732                }
733                if let Some(b) = builder
734                    .as_any_mut()
735                    .downcast_mut::<TimestampNanosecondBuilder>()
736                {
737                    b.append_value(millis_nanos_to_nanos(millis, nanos)?);
738                    return Ok(());
739                }
740
741                return Err(RowConvertError {
742                    message: "Builder type mismatch for TimestampLtz".to_string(),
743                });
744            }
745        }
746
747        Err(RowConvertError {
748            message: format!(
749                "Cannot append {:?} to builder of type {}",
750                self,
751                std::any::type_name_of_val(builder)
752            ),
753        })
754    }
755}
756
757macro_rules! impl_to_arrow {
758    ($ty:ty, $variant:ident) => {
759        impl ToArrow for $ty {
760            fn append_to(
761                &self,
762                builder: &mut dyn ArrayBuilder,
763                _data_type: &arrow_schema::DataType,
764            ) -> Result<()> {
765                if let Some(b) = builder.as_any_mut().downcast_mut::<$variant>() {
766                    b.append_value(*self);
767                    Ok(())
768                } else {
769                    Err(RowConvertError {
770                        message: format!(
771                            "Cannot cast {} to {} builder",
772                            stringify!($ty),
773                            stringify!($variant)
774                        ),
775                    })
776                }
777            }
778        }
779    };
780}
781
782impl_to_arrow!(i8, Int8Builder);
783impl_to_arrow!(i16, Int16Builder);
784impl_to_arrow!(i32, Int32Builder);
785impl_to_arrow!(f32, Float32Builder);
786impl_to_arrow!(f64, Float64Builder);
787impl_to_arrow!(&str, StringBuilder);
788
/// `f32` wrapped in `OrderedFloat`; payload type of [`Datum::Float32`].
pub type F32 = OrderedFloat<f32>;
/// `f64` wrapped in `OrderedFloat`; payload type of [`Datum::Float64`].
pub type F64 = OrderedFloat<f64>;
/// Calendar date stored as a signed day count; the `year`/`month`/`day` accessors
/// interpret it as an offset in days from 1970-01-01.
#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct Date(i32);
793
794#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
795pub struct Time(i32);
796
797impl Time {
798    pub const fn new(inner: i32) -> Self {
799        Time(inner)
800    }
801
802    /// Get the inner value of time type (milliseconds since midnight)
803    pub fn get_inner(&self) -> i32 {
804        self.0
805    }
806}
807
/// Maximum timestamp precision that can be stored compactly (milliseconds only).
/// Values with precision > MAX_COMPACT_TIMESTAMP_PRECISION require additional nanosecond storage.
/// This is the threshold used by `TimestampNtz::is_compact` and `TimestampLtz::is_compact`.
pub const MAX_COMPACT_TIMESTAMP_PRECISION: u32 = 3;
811
/// Maximum valid value for nanoseconds within a millisecond (0 to 999,999 inclusive).
/// A millisecond contains 1,000,000 nanoseconds, so the fractional part ranges from 0 to 999,999.
/// The `from_millis_nanos` constructors reject values outside `[0, MAX_NANO_OF_MILLISECOND]`.
pub const MAX_NANO_OF_MILLISECOND: i32 = 999_999;
815
816#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
817#[display("{millisecond}")]
818pub struct TimestampNtz {
819    millisecond: i64,
820    nano_of_millisecond: i32,
821}
822
823impl TimestampNtz {
824    pub const fn new(millisecond: i64) -> Self {
825        TimestampNtz {
826            millisecond,
827            nano_of_millisecond: 0,
828        }
829    }
830
831    pub fn from_millis_nanos(millisecond: i64, nano_of_millisecond: i32) -> Result<Self> {
832        if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
833            return Err(crate::error::Error::IllegalArgument {
834                message: format!(
835                    "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}"
836                ),
837            });
838        }
839        Ok(TimestampNtz {
840            millisecond,
841            nano_of_millisecond,
842        })
843    }
844
845    pub fn get_millisecond(&self) -> i64 {
846        self.millisecond
847    }
848
849    pub fn get_nano_of_millisecond(&self) -> i32 {
850        self.nano_of_millisecond
851    }
852
853    /// Check if the timestamp is compact based on precision.
854    /// Precision <= MAX_COMPACT_TIMESTAMP_PRECISION means millisecond precision, no need for nanos.
855    pub fn is_compact(precision: u32) -> bool {
856        precision <= MAX_COMPACT_TIMESTAMP_PRECISION
857    }
858}
859
860#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
861#[display("{epoch_millisecond}")]
862pub struct TimestampLtz {
863    epoch_millisecond: i64,
864    nano_of_millisecond: i32,
865}
866
867impl TimestampLtz {
868    pub const fn new(epoch_millisecond: i64) -> Self {
869        TimestampLtz {
870            epoch_millisecond,
871            nano_of_millisecond: 0,
872        }
873    }
874
875    pub fn from_millis_nanos(epoch_millisecond: i64, nano_of_millisecond: i32) -> Result<Self> {
876        if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
877            return Err(crate::error::Error::IllegalArgument {
878                message: format!(
879                    "nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}], got: {nano_of_millisecond}"
880                ),
881            });
882        }
883        Ok(TimestampLtz {
884            epoch_millisecond,
885            nano_of_millisecond,
886        })
887    }
888
889    pub fn get_epoch_millisecond(&self) -> i64 {
890        self.epoch_millisecond
891    }
892
893    pub fn get_nano_of_millisecond(&self) -> i32 {
894        self.nano_of_millisecond
895    }
896
897    /// Check if the timestamp is compact based on precision.
898    /// Precision <= MAX_COMPACT_TIMESTAMP_PRECISION means millisecond precision, no need for nanos.
899    pub fn is_compact(precision: u32) -> bool {
900        precision <= MAX_COMPACT_TIMESTAMP_PRECISION
901    }
902}
903
/// Binary payload of [`Datum::Blob`]: either borrowed for `'a` or owned.
pub type Blob<'a> = Cow<'a, [u8]>;
905
906impl<'a> From<Vec<u8>> for Datum<'a> {
907    fn from(vec: Vec<u8>) -> Self {
908        Datum::Blob(Blob::from(vec))
909    }
910}
911
912impl<'a> From<&'a [u8]> for Datum<'a> {
913    fn from(bytes: &'a [u8]) -> Datum<'a> {
914        Datum::Blob(Blob::from(bytes))
915    }
916}
917
/// 1970-01-01 as a civil date; the day that `Date`'s stored day count is added to.
const UNIX_EPOCH_DAY: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
919
920impl Date {
921    pub const fn new(inner: i32) -> Self {
922        Date(inner)
923    }
924
925    /// Get the inner value of date type
926    pub fn get_inner(&self) -> i32 {
927        self.0
928    }
929
930    pub fn year(&self) -> i16 {
931        let date = UNIX_EPOCH_DAY + self.0.days();
932        date.year()
933    }
934    pub fn month(&self) -> i8 {
935        let date = UNIX_EPOCH_DAY + self.0.days();
936        date.month()
937    }
938
939    pub fn day(&self) -> i8 {
940        let date = UNIX_EPOCH_DAY + self.0.days();
941        date.day()
942    }
943}
944
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int32Builder, StringBuilder};

    /// Exercises the typed accessors of `Datum` and its conversions to and
    /// from native values.
    #[test]
    fn datum_accessors_and_conversions() {
        // String datum: not null, readable as `&str`.
        let text = Datum::String("value".into());
        assert!(!text.is_null());
        assert_eq!(text.as_str(), "value");

        // Blob datum built from an owned byte vector.
        let bytes = Datum::Blob(Blob::from(vec![1, 2, 3]));
        assert_eq!(bytes.as_blob(), &[1, 2, 3]);

        // The null variant reports itself as null.
        assert!(Datum::Null.is_null());

        // An Int32 datum converts to `i32`, while conversion to `i16` fails.
        let number = Datum::Int32(42);
        let as_i32: i32 = (&number).try_into().unwrap();
        assert_eq!(as_i32, 42);
        let as_i16: std::result::Result<i16, _> = (&number).try_into();
        assert!(as_i16.is_err());

        // Decimal: accessor and fallible extraction both return the original value.
        let decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
        let decimal_datum: Datum = decimal.clone().into();
        assert_eq!(decimal_datum.as_decimal(), &decimal);
        let round_tripped: Decimal = (&decimal_datum).try_into().unwrap();
        assert_eq!(round_tripped, decimal);

        // Temporal types: date.
        let date = Date::new(19000);
        let date_datum: Datum = date.into();
        assert_eq!(date_datum.as_date(), date);

        // Temporal types: timestamp with local time zone.
        let ts_ltz = TimestampLtz::new(1672531200000);
        let ts_datum: Datum = ts_ltz.into();
        assert_eq!(ts_datum.as_timestamp_ltz(), ts_ltz);
    }

    /// Appending to a matching Arrow builder succeeds; appending to a
    /// mismatched builder yields a `RowConvertError`.
    #[test]
    fn datum_append_to_builder() {
        let int32_type = arrow_schema::DataType::Int32;
        let mut ints = Int32Builder::new();
        for datum in [Datum::Null, Datum::Int32(5)] {
            datum.append_to(&mut ints, &int32_type).unwrap();
        }
        let array = ints.finish();
        assert!(array.is_null(0));
        assert_eq!(array.value(1), 5);

        // An Int32 datum cannot be appended to a string builder.
        let utf8_type = arrow_schema::DataType::Utf8;
        let mut strings = StringBuilder::new();
        let err = Datum::Int32(1)
            .append_to(&mut strings, &utf8_type)
            .unwrap_err();
        assert!(matches!(err, crate::error::Error::RowConvertError { .. }));
    }

    /// `as_str` is expected to panic when the datum is not a string.
    #[test]
    #[should_panic]
    fn datum_as_str_panics_on_non_string() {
        let not_a_string = Datum::Int32(1);
        let _ = not_a_string.as_str();
    }

    /// `as_blob` is expected to panic when the datum is not a blob.
    #[test]
    #[should_panic]
    fn datum_as_blob_panics_on_non_blob() {
        let not_a_blob = Datum::Int16(1);
        let _ = not_a_blob.as_blob();
    }

    /// A `Date` with inner value 0 decomposes to 1970-01-01.
    #[test]
    fn date_components() {
        let epoch = Date::new(0);
        assert_eq!(epoch.get_inner(), 0);
        assert_eq!(epoch.year(), 1970);
        assert_eq!(epoch.month(), 1);
        assert_eq!(epoch.day(), 1);
    }
}
1025
#[cfg(test)]
mod timestamp_tests {
    use super::*;

    /// Nanosecond components within `[0, MAX_NANO_OF_MILLISECOND]` are accepted
    /// by both `TimestampNtz` and `TimestampLtz` and are read back unchanged.
    #[test]
    fn test_timestamp_valid_nanos() {
        // Lower bound, upper bound, and a value in between.
        for nanos in [0, MAX_NANO_OF_MILLISECOND, 500_000] {
            let ntz = TimestampNtz::from_millis_nanos(1000, nanos).unwrap();
            assert_eq!(ntz.get_nano_of_millisecond(), nanos);
        }

        // Lower and upper bound.
        for nanos in [0, MAX_NANO_OF_MILLISECOND] {
            let ltz = TimestampLtz::from_millis_nanos(1000, nanos).unwrap();
            assert_eq!(ltz.get_nano_of_millisecond(), nanos);
        }
    }

    /// Nanosecond components outside the valid range are rejected by both
    /// timestamp types with an error message that names the allowed range.
    #[test]
    fn test_timestamp_nanos_out_of_range() {
        let expected_msg =
            format!("nanoOfMillisecond must be in range [0, {MAX_NANO_OF_MILLISECOND}]");

        // First one past the upper bound, then a negative value.
        for bad_nanos in [MAX_NANO_OF_MILLISECOND + 1, -1] {
            let ntz = TimestampNtz::from_millis_nanos(1000, bad_nanos);
            assert!(ntz.is_err());
            assert!(ntz.unwrap_err().to_string().contains(&expected_msg));

            let ltz = TimestampLtz::from_millis_nanos(1000, bad_nanos);
            assert!(ltz.is_err());
            assert!(ltz.unwrap_err().to_string().contains(&expected_msg));
        }
    }
}