Skip to main content

vortex_array/extension/datetime/
timestamp.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Temporal extension data types.
5
6use std::fmt;
7use std::sync::Arc;
8
9use jiff::Span;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_bail;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15use vortex_error::vortex_panic;
16use vortex_session::registry::CachedId;
17
18use crate::dtype::DType;
19use crate::dtype::Nullability;
20use crate::dtype::PType;
21use crate::dtype::extension::ExtDType;
22use crate::dtype::extension::ExtId;
23use crate::dtype::extension::ExtVTable;
24use crate::extension::datetime::TimeUnit;
25use crate::scalar::ScalarValue;
26
/// Timestamp DType.
///
/// Extension type registered as `vortex.timestamp`. Values are stored as `i64` offsets from the
/// Unix epoch, in the [`TimeUnit`] (and with the optional timezone) recorded in the dtype's
/// [`TimestampOptions`] metadata.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Timestamp;
30
31impl Timestamp {
32    /// Creates a new Timestamp extension =dtype with the given time unit and nullability.
33    pub fn new(time_unit: TimeUnit, nullability: Nullability) -> ExtDType<Self> {
34        Self::new_with_tz(time_unit, None, nullability)
35    }
36
37    /// Creates a new Timestamp extension dtype with the given time unit, timezone, and nullability.
38    pub fn new_with_tz(
39        time_unit: TimeUnit,
40        timezone: Option<Arc<str>>,
41        nullability: Nullability,
42    ) -> ExtDType<Self> {
43        ExtDType::try_new(
44            TimestampOptions {
45                unit: time_unit,
46                tz: timezone,
47            },
48            DType::Primitive(PType::I64, nullability),
49        )
50        .vortex_expect("failed to create timestamp dtype")
51    }
52
53    /// Creates a new `Timestamp` extension dtype with the given options and nullability.
54    pub fn new_with_options(options: TimestampOptions, nullability: Nullability) -> ExtDType<Self> {
55        ExtDType::try_new(options, DType::Primitive(PType::I64, nullability))
56            .vortex_expect("failed to create timestamp dtype")
57    }
58}
59
/// Options for the Timestamp DType.
///
/// This is the extension metadata of [`Timestamp`]. It is (de)serialized with a hand-rolled byte
/// format that is kept for backwards compatibility.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TimestampOptions {
    /// The time unit of the timestamp.
    ///
    /// `TimeUnit::Days` can be stored here, but scalar values of such a dtype are rejected when
    /// they are unpacked.
    pub unit: TimeUnit,
    /// The timezone of the timestamp, if any.
    ///
    /// The name is resolved with jiff (e.g. `UTC`, `America/New_York`). It is not checked when
    /// the dtype is constructed, only when scalar values are unpacked. An empty name is
    /// serialized with length 0 and therefore reads back as `None`.
    pub tz: Option<Arc<str>>,
}
68
69impl fmt::Display for TimestampOptions {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        match &self.tz {
72            Some(tz) => write!(f, "{}, tz={}", self.unit, tz),
73            None => write!(f, "{}", self.unit),
74        }
75    }
76}
77
/// Unpacked value of a [`Timestamp`] extension scalar.
///
/// Each variant carries the raw storage value and an optional timezone. The raw value is an
/// offset from the Unix epoch in the variant's unit. The timezone is borrowed from the dtype's
/// [`TimestampOptions`].
///
/// There is no `Days` variant: timestamps do not support the `Days` time unit.
pub enum TimestampValue<'a> {
    /// Seconds since the Unix epoch.
    Seconds(i64, Option<&'a Arc<str>>),
    /// Milliseconds since the Unix epoch.
    Milliseconds(i64, Option<&'a Arc<str>>),
    /// Microseconds since the Unix epoch.
    Microseconds(i64, Option<&'a Arc<str>>),
    /// Nanoseconds since the Unix epoch.
    Nanoseconds(i64, Option<&'a Arc<str>>),
}
91
92impl fmt::Display for TimestampValue<'_> {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        let (span, tz) = match self {
95            TimestampValue::Seconds(v, tz) => (Span::new().seconds(*v), *tz),
96            TimestampValue::Milliseconds(v, tz) => (Span::new().milliseconds(*v), *tz),
97            TimestampValue::Microseconds(v, tz) => (Span::new().microseconds(*v), *tz),
98            TimestampValue::Nanoseconds(v, tz) => (Span::new().nanoseconds(*v), *tz),
99        };
100        let ts = jiff::Timestamp::UNIX_EPOCH + span;
101
102        match tz {
103            None => write!(f, "{ts}"),
104            Some(tz) => {
105                let adjusted_ts = ts.in_tz(tz.as_ref()).vortex_expect("unknown timezone");
106                write!(f, "{adjusted_ts}",)
107            }
108        }
109    }
110}
111
112impl ExtVTable for Timestamp {
113    type Metadata = TimestampOptions;
114
115    type NativeValue<'a> = TimestampValue<'a>;
116
117    fn id(&self) -> ExtId {
118        static ID: CachedId = CachedId::new("vortex.timestamp");
119        *ID
120    }
121
122    // NOTE(ngates): unfortunately we're stuck with this hand-rolled serialization format for
123    //  backwards compatibility.
124    fn serialize_metadata(&self, metadata: &Self::Metadata) -> VortexResult<Vec<u8>> {
125        let mut bytes = Vec::with_capacity(4);
126        let unit_tag: u8 = metadata.unit.into();
127
128        bytes.push(unit_tag);
129
130        // Encode time_zone as u16 length followed by utf8 bytes.
131        match &metadata.tz {
132            None => bytes.extend_from_slice(0u16.to_le_bytes().as_slice()),
133            Some(tz) => {
134                let tz_bytes = tz.as_bytes();
135                let tz_len = u16::try_from(tz_bytes.len())
136                    .unwrap_or_else(|err| vortex_panic!("tz did not fit in u16: {}", err));
137                bytes.extend_from_slice(tz_len.to_le_bytes().as_slice());
138                bytes.extend_from_slice(tz_bytes);
139            }
140        }
141
142        Ok(bytes)
143    }
144
145    fn deserialize_metadata(&self, data: &[u8]) -> VortexResult<Self::Metadata> {
146        vortex_ensure!(
147            data.len() >= 3,
148            "Timestamp metadata must have at least 3 bytes, got {}",
149            data.len()
150        );
151
152        let tag = data[0];
153        let time_unit = TimeUnit::try_from(tag)?;
154        let tz_len_bytes: [u8; 2] = data[1..3]
155            .try_into()
156            .ok()
157            .vortex_expect("Verified to have two bytes");
158        let tz_len = u16::from_le_bytes(tz_len_bytes) as usize;
159        if tz_len == 0 {
160            return Ok(TimestampOptions {
161                unit: time_unit,
162                tz: None,
163            });
164        }
165
166        // Attempt to load from len-prefixed bytes
167        vortex_ensure!(
168            data.len() >= 3 + tz_len,
169            "Timestamp metadata is truncated: declared timezone length {} but only {} bytes available",
170            tz_len,
171            data.len() - 3
172        );
173        let tz_bytes = &data[3..3 + tz_len];
174        let tz: Arc<str> = str::from_utf8(tz_bytes)
175            .map_err(|e| vortex_err!("timezone is not valid utf8 string: {e}"))?
176            .to_string()
177            .into();
178        Ok(TimestampOptions {
179            unit: time_unit,
180            tz: Some(tz),
181        })
182    }
183
184    fn can_coerce_from(ext_dtype: &ExtDType<Self>, other: &DType) -> bool {
185        let DType::Extension(other_ext) = other else {
186            return false;
187        };
188        let Some(other_opts) = other_ext.metadata_opt::<Timestamp>() else {
189            return false;
190        };
191        let our_opts = ext_dtype.metadata();
192        our_opts.tz == other_opts.tz
193            && our_opts.unit <= other_opts.unit
194            && (ext_dtype.storage_dtype().is_nullable() || !other.is_nullable())
195    }
196
197    fn least_supertype(ext_dtype: &ExtDType<Self>, other: &DType) -> Option<DType> {
198        let DType::Extension(other_ext) = other else {
199            return None;
200        };
201        let other_opts = other_ext.metadata_opt::<Timestamp>()?;
202        let our_opts = ext_dtype.metadata();
203        if our_opts.tz != other_opts.tz {
204            return None;
205        }
206        let finest = our_opts.unit.min(other_opts.unit);
207        let union_null = ext_dtype.storage_dtype().nullability() | other.nullability();
208        Some(DType::Extension(
209            Timestamp::new_with_tz(finest, our_opts.tz.clone(), union_null).erased(),
210        ))
211    }
212
213    fn validate_dtype(ext_dtype: &ExtDType<Self>) -> VortexResult<()> {
214        vortex_ensure!(
215            matches!(ext_dtype.storage_dtype(), DType::Primitive(PType::I64, _)),
216            "Timestamp storage dtype must be i64"
217        );
218        Ok(())
219    }
220
221    fn unpack_native<'a>(
222        ext_dtype: &'a ExtDType<Self>,
223        storage_value: &'a ScalarValue,
224    ) -> VortexResult<Self::NativeValue<'a>> {
225        let metadata = ext_dtype.metadata();
226        let ts_value = storage_value.as_primitive().cast::<i64>()?;
227        let tz = metadata.tz.as_ref();
228
229        let (span, value) = match metadata.unit {
230            TimeUnit::Nanoseconds => (
231                Span::new().nanoseconds(ts_value),
232                TimestampValue::Nanoseconds(ts_value, tz),
233            ),
234            TimeUnit::Microseconds => (
235                Span::new().microseconds(ts_value),
236                TimestampValue::Microseconds(ts_value, tz),
237            ),
238            TimeUnit::Milliseconds => (
239                Span::new().milliseconds(ts_value),
240                TimestampValue::Milliseconds(ts_value, tz),
241            ),
242            TimeUnit::Seconds => (
243                Span::new().seconds(ts_value),
244                TimestampValue::Seconds(ts_value, tz),
245            ),
246            TimeUnit::Days => vortex_bail!("Timestamp does not support Days time unit"),
247        };
248
249        // Validate the storage value is within the valid range for Timestamp.
250        let ts = jiff::Timestamp::UNIX_EPOCH
251            .checked_add(span)
252            .map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
253
254        if let Some(tz) = tz {
255            ts.in_tz(tz.as_ref())
256                .map_err(|e| vortex_err!("Invalid timezone for timestamp scalar: {}", e))?;
257        }
258
259        Ok(value)
260    }
261}
262
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_error::VortexResult;

    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::NonNullable;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::datetime::TimeUnit;
    use crate::extension::datetime::Timestamp;
    use crate::scalar::PValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;

    /// Builds an erased `Timestamp` extension dtype, zoned when `tz` is given.
    fn timestamp_dtype(unit: TimeUnit, tz: Option<&str>, nullability: Nullability) -> DType {
        let ext = Timestamp::new_with_tz(unit, tz.map(Arc::<str>::from), nullability);
        DType::Extension(ext.erased())
    }

    /// Wraps an `i64` epoch offset as a primitive storage value.
    fn epoch_offset(offset: i64) -> Option<ScalarValue> {
        Some(ScalarValue::Primitive(PValue::I64(offset)))
    }

    #[test]
    fn validate_timestamp_scalar() -> VortexResult<()> {
        let dtype = timestamp_dtype(TimeUnit::Seconds, None, Nullable);
        Scalar::try_new(dtype, epoch_offset(0))?;
        Ok(())
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn reject_timestamp_with_invalid_timezone() {
        let dtype = timestamp_dtype(TimeUnit::Seconds, Some("Not/A/Timezone"), Nullable);
        assert!(Scalar::try_new(dtype, epoch_offset(0)).is_err());
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn display_timestamp_scalar() {
        // The epoch, once as a local (UTC) instant and once in a named zone.
        let cases = [
            (None, "1970-01-01T00:00:00Z"),
            (
                Some("America/New_York"),
                "1969-12-31T19:00:00-05:00[America/New_York]",
            ),
        ];
        for (tz, expected) in cases {
            let dtype = timestamp_dtype(TimeUnit::Seconds, tz, Nullable);
            let scalar = Scalar::new(dtype, epoch_offset(0));
            assert_eq!(scalar.as_extension().to_string(), expected);
        }
    }

    #[test]
    fn least_supertype_timestamp_units() {
        let secs = timestamp_dtype(TimeUnit::Seconds, None, NonNullable);
        let nanos = timestamp_dtype(TimeUnit::Nanoseconds, None, NonNullable);
        let expected = timestamp_dtype(TimeUnit::Nanoseconds, None, NonNullable);
        assert_eq!(secs.least_supertype(&nanos).unwrap(), expected);
        assert_eq!(nanos.least_supertype(&secs).unwrap(), expected);
    }

    #[test]
    fn least_supertype_timestamp_tz_mismatch() {
        let utc = timestamp_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let local = timestamp_dtype(TimeUnit::Seconds, None, NonNullable);
        assert!(utc.least_supertype(&local).is_none());
    }

    #[test]
    fn least_supertype_timestamp_same_tz() {
        let utc_secs = timestamp_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let utc_nanos = timestamp_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        let expected = timestamp_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        assert_eq!(utc_secs.least_supertype(&utc_nanos).unwrap(), expected);
    }

    #[test]
    fn can_coerce_from_timestamp_tz() {
        let utc_nanos = timestamp_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        let utc_secs = timestamp_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let local_nanos = timestamp_dtype(TimeUnit::Nanoseconds, None, NonNullable);
        assert!(utc_nanos.can_coerce_from(&utc_secs));
        assert!(!utc_nanos.can_coerce_from(&local_nanos));
    }

    #[test]
    fn deserialize_empty_metadata_returns_error() {
        assert!(Timestamp.deserialize_metadata(&[]).is_err());
    }

    #[test]
    fn deserialize_too_short_metadata_returns_error() {
        // Only 2 bytes - too short for the required 3-byte header.
        assert!(Timestamp.deserialize_metadata(&[0x00, 0x01]).is_err());
    }

    #[test]
    fn deserialize_truncated_timezone_returns_error() {
        // Valid tag (0x00 = Nanoseconds) and tz_len = 10 (little-endian [0x0A, 0x00]), but only
        // 3 bytes of timezone data follow instead of the declared 10.
        let data = [0x00u8, 0x0A, 0x00, b'U', b'T', b'C'];
        assert!(Timestamp.deserialize_metadata(&data).is_err());
    }
}