Skip to main content

vortex_array/extension/datetime/
timestamp.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Temporal extension data types.
5
6use std::fmt;
7use std::sync::Arc;
8
9use jiff::Span;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_bail;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15use vortex_error::vortex_panic;
16
17use crate::dtype::DType;
18use crate::dtype::Nullability;
19use crate::dtype::PType;
20use crate::dtype::extension::ExtDType;
21use crate::dtype::extension::ExtId;
22use crate::dtype::extension::ExtVTable;
23use crate::extension::datetime::TimeUnit;
24use crate::scalar::ScalarValue;
25
/// Marker type for the timestamp extension dtype.
///
/// The type carries no data of its own: the per-dtype configuration (time unit and optional
/// timezone) lives in [`TimestampOptions`], and the constructors on this type always pair that
/// metadata with an `i64` primitive storage dtype.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct Timestamp;
29
30impl Timestamp {
31    /// Creates a new Timestamp extension =dtype with the given time unit and nullability.
32    pub fn new(time_unit: TimeUnit, nullability: Nullability) -> ExtDType<Self> {
33        Self::new_with_tz(time_unit, None, nullability)
34    }
35
36    /// Creates a new Timestamp extension dtype with the given time unit, timezone, and nullability.
37    pub fn new_with_tz(
38        time_unit: TimeUnit,
39        timezone: Option<Arc<str>>,
40        nullability: Nullability,
41    ) -> ExtDType<Self> {
42        ExtDType::try_new(
43            TimestampOptions {
44                unit: time_unit,
45                tz: timezone,
46            },
47            DType::Primitive(PType::I64, nullability),
48        )
49        .vortex_expect("failed to create timestamp dtype")
50    }
51
52    /// Creates a new `Timestamp` extension dtype with the given options and nullability.
53    pub fn new_with_options(options: TimestampOptions, nullability: Nullability) -> ExtDType<Self> {
54        ExtDType::try_new(options, DType::Primitive(PType::I64, nullability))
55            .vortex_expect("failed to create timestamp dtype")
56    }
57}
58
59/// Options for the Timestamp DType.
60#[derive(Clone, Debug, PartialEq, Eq, Hash)]
61pub struct TimestampOptions {
62    /// The time unit of the timestamp.
63    pub unit: TimeUnit,
64    /// The timezone of the timestamp, if any.
65    pub tz: Option<Arc<str>>,
66}
67
68impl fmt::Display for TimestampOptions {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        match &self.tz {
71            Some(tz) => write!(f, "{}, tz={}", self.unit, tz),
72            None => write!(f, "{}", self.unit),
73        }
74    }
75}
76
/// Unpacked value of a [`Timestamp`] extension scalar.
///
/// Each variant names the unit of the count and carries the raw `i64` storage value together
/// with a borrowed, optional timezone name.
///
/// The derived equality is structural: two values are equal only when the variant, the raw
/// count, and the timezone all match. Values in different units are never equal, even when
/// they would denote the same instant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampValue<'a> {
    /// Seconds since the Unix epoch.
    Seconds(i64, Option<&'a Arc<str>>),
    /// Milliseconds since the Unix epoch.
    Milliseconds(i64, Option<&'a Arc<str>>),
    /// Microseconds since the Unix epoch.
    Microseconds(i64, Option<&'a Arc<str>>),
    /// Nanoseconds since the Unix epoch.
    Nanoseconds(i64, Option<&'a Arc<str>>),
}
90
91impl fmt::Display for TimestampValue<'_> {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        let (span, tz) = match self {
94            TimestampValue::Seconds(v, tz) => (Span::new().seconds(*v), *tz),
95            TimestampValue::Milliseconds(v, tz) => (Span::new().milliseconds(*v), *tz),
96            TimestampValue::Microseconds(v, tz) => (Span::new().microseconds(*v), *tz),
97            TimestampValue::Nanoseconds(v, tz) => (Span::new().nanoseconds(*v), *tz),
98        };
99        let ts = jiff::Timestamp::UNIX_EPOCH + span;
100
101        match tz {
102            None => write!(f, "{ts}"),
103            Some(tz) => {
104                let adjusted_ts = ts.in_tz(tz.as_ref()).vortex_expect("unknown timezone");
105                write!(f, "{adjusted_ts}",)
106            }
107        }
108    }
109}
110
111impl ExtVTable for Timestamp {
112    type Metadata = TimestampOptions;
113
114    type NativeValue<'a> = TimestampValue<'a>;
115
116    fn id(&self) -> ExtId {
117        ExtId::new("vortex.timestamp")
118    }
119
120    // NOTE(ngates): unfortunately we're stuck with this hand-rolled serialization format for
121    //  backwards compatibility.
122    fn serialize_metadata(&self, metadata: &Self::Metadata) -> VortexResult<Vec<u8>> {
123        let mut bytes = Vec::with_capacity(4);
124        let unit_tag: u8 = metadata.unit.into();
125
126        bytes.push(unit_tag);
127
128        // Encode time_zone as u16 length followed by utf8 bytes.
129        match &metadata.tz {
130            None => bytes.extend_from_slice(0u16.to_le_bytes().as_slice()),
131            Some(tz) => {
132                let tz_bytes = tz.as_bytes();
133                let tz_len = u16::try_from(tz_bytes.len())
134                    .unwrap_or_else(|err| vortex_panic!("tz did not fit in u16: {}", err));
135                bytes.extend_from_slice(tz_len.to_le_bytes().as_slice());
136                bytes.extend_from_slice(tz_bytes);
137            }
138        }
139
140        Ok(bytes)
141    }
142
143    fn deserialize_metadata(&self, data: &[u8]) -> VortexResult<Self::Metadata> {
144        vortex_ensure!(
145            data.len() >= 3,
146            "Timestamp metadata must have at least 3 bytes, got {}",
147            data.len()
148        );
149
150        let tag = data[0];
151        let time_unit = TimeUnit::try_from(tag)?;
152        let tz_len_bytes: [u8; 2] = data[1..3]
153            .try_into()
154            .ok()
155            .vortex_expect("Verified to have two bytes");
156        let tz_len = u16::from_le_bytes(tz_len_bytes) as usize;
157        if tz_len == 0 {
158            return Ok(TimestampOptions {
159                unit: time_unit,
160                tz: None,
161            });
162        }
163
164        // Attempt to load from len-prefixed bytes
165        vortex_ensure!(
166            data.len() >= 3 + tz_len,
167            "Timestamp metadata is truncated: declared timezone length {} but only {} bytes available",
168            tz_len,
169            data.len() - 3
170        );
171        let tz_bytes = &data[3..3 + tz_len];
172        let tz: Arc<str> = str::from_utf8(tz_bytes)
173            .map_err(|e| vortex_err!("timezone is not valid utf8 string: {e}"))?
174            .to_string()
175            .into();
176        Ok(TimestampOptions {
177            unit: time_unit,
178            tz: Some(tz),
179        })
180    }
181
182    fn can_coerce_from(ext_dtype: &ExtDType<Self>, other: &DType) -> bool {
183        let DType::Extension(other_ext) = other else {
184            return false;
185        };
186        let Some(other_opts) = other_ext.metadata_opt::<Timestamp>() else {
187            return false;
188        };
189        let our_opts = ext_dtype.metadata();
190        our_opts.tz == other_opts.tz
191            && our_opts.unit <= other_opts.unit
192            && (ext_dtype.storage_dtype().is_nullable() || !other.is_nullable())
193    }
194
195    fn least_supertype(ext_dtype: &ExtDType<Self>, other: &DType) -> Option<DType> {
196        let DType::Extension(other_ext) = other else {
197            return None;
198        };
199        let other_opts = other_ext.metadata_opt::<Timestamp>()?;
200        let our_opts = ext_dtype.metadata();
201        if our_opts.tz != other_opts.tz {
202            return None;
203        }
204        let finest = our_opts.unit.min(other_opts.unit);
205        let union_null = ext_dtype.storage_dtype().nullability() | other.nullability();
206        Some(DType::Extension(
207            Timestamp::new_with_tz(finest, our_opts.tz.clone(), union_null).erased(),
208        ))
209    }
210
211    fn validate_dtype(ext_dtype: &ExtDType<Self>) -> VortexResult<()> {
212        vortex_ensure!(
213            matches!(ext_dtype.storage_dtype(), DType::Primitive(PType::I64, _)),
214            "Timestamp storage dtype must be i64"
215        );
216        Ok(())
217    }
218
219    fn unpack_native<'a>(
220        ext_dtype: &'a ExtDType<Self>,
221        storage_value: &'a ScalarValue,
222    ) -> VortexResult<Self::NativeValue<'a>> {
223        let metadata = ext_dtype.metadata();
224        let ts_value = storage_value.as_primitive().cast::<i64>()?;
225        let tz = metadata.tz.as_ref();
226
227        let (span, value) = match metadata.unit {
228            TimeUnit::Nanoseconds => (
229                Span::new().nanoseconds(ts_value),
230                TimestampValue::Nanoseconds(ts_value, tz),
231            ),
232            TimeUnit::Microseconds => (
233                Span::new().microseconds(ts_value),
234                TimestampValue::Microseconds(ts_value, tz),
235            ),
236            TimeUnit::Milliseconds => (
237                Span::new().milliseconds(ts_value),
238                TimestampValue::Milliseconds(ts_value, tz),
239            ),
240            TimeUnit::Seconds => (
241                Span::new().seconds(ts_value),
242                TimestampValue::Seconds(ts_value, tz),
243            ),
244            TimeUnit::Days => vortex_bail!("Timestamp does not support Days time unit"),
245        };
246
247        // Validate the storage value is within the valid range for Timestamp.
248        let ts = jiff::Timestamp::UNIX_EPOCH
249            .checked_add(span)
250            .map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
251
252        if let Some(tz) = tz {
253            ts.in_tz(tz.as_ref())
254                .map_err(|e| vortex_err!("Invalid timezone for timestamp scalar: {}", e))?;
255        }
256
257        Ok(value)
258    }
259}
260
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_error::VortexResult;

    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::NonNullable;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::datetime::TimeUnit;
    use crate::extension::datetime::Timestamp;
    use crate::scalar::PValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;

    /// Builds an erased timestamp extension dtype; `tz = None` means "no timezone".
    fn timestamp_dtype(unit: TimeUnit, tz: Option<&str>, nullability: Nullability) -> DType {
        DType::Extension(
            Timestamp::new_with_tz(unit, tz.map(Arc::<str>::from), nullability).erased(),
        )
    }

    /// Storage value `0`, i.e. the Unix epoch in any unit.
    fn epoch() -> Option<ScalarValue> {
        Some(ScalarValue::Primitive(PValue::I64(0)))
    }

    #[test]
    fn validate_timestamp_scalar() -> VortexResult<()> {
        Scalar::try_new(timestamp_dtype(TimeUnit::Seconds, None, Nullable), epoch())?;

        Ok(())
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn reject_timestamp_with_invalid_timezone() {
        let dtype = timestamp_dtype(TimeUnit::Seconds, Some("Not/A/Timezone"), Nullable);
        assert!(Scalar::try_new(dtype, epoch()).is_err());
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn display_timestamp_scalar() {
        // Local (no timezone) timestamp.
        let local = Scalar::new(timestamp_dtype(TimeUnit::Seconds, None, Nullable), epoch());
        assert_eq!(format!("{}", local.as_extension()), "1970-01-01T00:00:00Z");

        // Zoned timestamp.
        let zoned = Scalar::new(
            timestamp_dtype(TimeUnit::Seconds, Some("America/New_York"), Nullable),
            epoch(),
        );
        assert_eq!(
            format!("{}", zoned.as_extension()),
            "1969-12-31T19:00:00-05:00[America/New_York]"
        );
    }

    #[test]
    fn least_supertype_timestamp_units() {
        let secs = timestamp_dtype(TimeUnit::Seconds, None, NonNullable);
        let ns = timestamp_dtype(TimeUnit::Nanoseconds, None, NonNullable);
        let expected = timestamp_dtype(TimeUnit::Nanoseconds, None, NonNullable);

        // The result must not depend on which side the finer unit is on.
        assert_eq!(secs.least_supertype(&ns).unwrap(), expected);
        assert_eq!(ns.least_supertype(&secs).unwrap(), expected);
    }

    #[test]
    fn least_supertype_timestamp_tz_mismatch() {
        let utc = timestamp_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let none = timestamp_dtype(TimeUnit::Seconds, None, NonNullable);
        assert!(utc.least_supertype(&none).is_none());
    }

    #[test]
    fn least_supertype_timestamp_same_tz() {
        let utc_s = timestamp_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let utc_ns = timestamp_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        let expected = timestamp_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        assert_eq!(utc_s.least_supertype(&utc_ns).unwrap(), expected);
    }

    #[test]
    fn can_coerce_from_timestamp_tz() {
        let utc = timestamp_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        let utc_s = timestamp_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let none = timestamp_dtype(TimeUnit::Nanoseconds, None, NonNullable);
        assert!(utc.can_coerce_from(&utc_s));
        assert!(!utc.can_coerce_from(&none));
    }

    #[test]
    fn deserialize_empty_metadata_returns_error() {
        assert!(Timestamp.deserialize_metadata(&[]).is_err());
    }

    #[test]
    fn deserialize_too_short_metadata_returns_error() {
        // Only 2 bytes - too short for the required 3-byte header.
        let too_short = [0x00u8, 0x01];
        assert!(Timestamp.deserialize_metadata(&too_short).is_err());
    }

    #[test]
    fn deserialize_truncated_timezone_returns_error() {
        // Valid tag (0x00 = Nanoseconds), tz_len = 10 (little-endian: [0x0A, 0x00]),
        // but only 3 bytes of timezone data instead of the declared 10.
        let truncated = [0x00u8, 0x0A, 0x00, b'U', b'T', b'C'];
        assert!(Timestamp.deserialize_metadata(&truncated).is_err());
    }
}