vortex_array/extension/datetime/
timestamp.rs1use std::fmt;
7use std::sync::Arc;
8
9use jiff::Span;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_bail;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15use vortex_error::vortex_panic;
16
17use crate::dtype::DType;
18use crate::dtype::Nullability;
19use crate::dtype::PType;
20use crate::dtype::extension::ExtDType;
21use crate::dtype::extension::ExtId;
22use crate::dtype::extension::ExtVTable;
23use crate::extension::datetime::TimeUnit;
24use crate::scalar::ScalarValue;
25
/// Extension type (vtable marker) for timestamp values.
///
/// This is a zero-sized type: the per-dtype configuration (time unit and optional timezone)
/// lives in [`TimestampOptions`], and the storage dtype is a 64-bit signed integer.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct Timestamp;
29
30impl Timestamp {
31 pub fn new(time_unit: TimeUnit, nullability: Nullability) -> ExtDType<Self> {
33 Self::new_with_tz(time_unit, None, nullability)
34 }
35
36 pub fn new_with_tz(
38 time_unit: TimeUnit,
39 timezone: Option<Arc<str>>,
40 nullability: Nullability,
41 ) -> ExtDType<Self> {
42 ExtDType::try_new(
43 TimestampOptions {
44 unit: time_unit,
45 tz: timezone,
46 },
47 DType::Primitive(PType::I64, nullability),
48 )
49 .vortex_expect("failed to create timestamp dtype")
50 }
51
52 pub fn new_with_options(options: TimestampOptions, nullability: Nullability) -> ExtDType<Self> {
54 ExtDType::try_new(options, DType::Primitive(PType::I64, nullability))
55 .vortex_expect("failed to create timestamp dtype")
56 }
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Hash)]
61pub struct TimestampOptions {
62 pub unit: TimeUnit,
64 pub tz: Option<Arc<str>>,
66}
67
68impl fmt::Display for TimestampOptions {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 match &self.tz {
71 Some(tz) => write!(f, "{}, tz={}", self.unit, tz),
72 None => write!(f, "{}", self.unit),
73 }
74 }
75}
76
/// A single timestamp value unpacked from its `i64` storage.
///
/// Each variant carries the raw count of its unit relative to the Unix epoch together with a
/// borrowed reference to the dtype's timezone name, if one is set.
pub enum TimestampValue<'a> {
    /// Whole seconds relative to the Unix epoch.
    Seconds(i64, Option<&'a Arc<str>>),
    /// Milliseconds relative to the Unix epoch.
    Milliseconds(i64, Option<&'a Arc<str>>),
    /// Microseconds relative to the Unix epoch.
    Microseconds(i64, Option<&'a Arc<str>>),
    /// Nanoseconds relative to the Unix epoch.
    Nanoseconds(i64, Option<&'a Arc<str>>),
}
90
91impl fmt::Display for TimestampValue<'_> {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 let (span, tz) = match self {
94 TimestampValue::Seconds(v, tz) => (Span::new().seconds(*v), *tz),
95 TimestampValue::Milliseconds(v, tz) => (Span::new().milliseconds(*v), *tz),
96 TimestampValue::Microseconds(v, tz) => (Span::new().microseconds(*v), *tz),
97 TimestampValue::Nanoseconds(v, tz) => (Span::new().nanoseconds(*v), *tz),
98 };
99 let ts = jiff::Timestamp::UNIX_EPOCH + span;
100
101 match tz {
102 None => write!(f, "{ts}"),
103 Some(tz) => {
104 let adjusted_ts = ts.in_tz(tz.as_ref()).vortex_expect("unknown timezone");
105 write!(f, "{adjusted_ts}",)
106 }
107 }
108 }
109}
110
111impl ExtVTable for Timestamp {
112 type Metadata = TimestampOptions;
113
114 type NativeValue<'a> = TimestampValue<'a>;
115
116 fn id(&self) -> ExtId {
117 ExtId::new("vortex.timestamp")
118 }
119
120 fn serialize_metadata(&self, metadata: &Self::Metadata) -> VortexResult<Vec<u8>> {
123 let mut bytes = Vec::with_capacity(4);
124 let unit_tag: u8 = metadata.unit.into();
125
126 bytes.push(unit_tag);
127
128 match &metadata.tz {
130 None => bytes.extend_from_slice(0u16.to_le_bytes().as_slice()),
131 Some(tz) => {
132 let tz_bytes = tz.as_bytes();
133 let tz_len = u16::try_from(tz_bytes.len())
134 .unwrap_or_else(|err| vortex_panic!("tz did not fit in u16: {}", err));
135 bytes.extend_from_slice(tz_len.to_le_bytes().as_slice());
136 bytes.extend_from_slice(tz_bytes);
137 }
138 }
139
140 Ok(bytes)
141 }
142
143 fn deserialize_metadata(&self, data: &[u8]) -> VortexResult<Self::Metadata> {
144 vortex_ensure!(
145 data.len() >= 3,
146 "Timestamp metadata must have at least 3 bytes, got {}",
147 data.len()
148 );
149
150 let tag = data[0];
151 let time_unit = TimeUnit::try_from(tag)?;
152 let tz_len_bytes: [u8; 2] = data[1..3]
153 .try_into()
154 .ok()
155 .vortex_expect("Verified to have two bytes");
156 let tz_len = u16::from_le_bytes(tz_len_bytes) as usize;
157 if tz_len == 0 {
158 return Ok(TimestampOptions {
159 unit: time_unit,
160 tz: None,
161 });
162 }
163
164 vortex_ensure!(
166 data.len() >= 3 + tz_len,
167 "Timestamp metadata is truncated: declared timezone length {} but only {} bytes available",
168 tz_len,
169 data.len() - 3
170 );
171 let tz_bytes = &data[3..3 + tz_len];
172 let tz: Arc<str> = str::from_utf8(tz_bytes)
173 .map_err(|e| vortex_err!("timezone is not valid utf8 string: {e}"))?
174 .to_string()
175 .into();
176 Ok(TimestampOptions {
177 unit: time_unit,
178 tz: Some(tz),
179 })
180 }
181
182 fn can_coerce_from(ext_dtype: &ExtDType<Self>, other: &DType) -> bool {
183 let DType::Extension(other_ext) = other else {
184 return false;
185 };
186 let Some(other_opts) = other_ext.metadata_opt::<Timestamp>() else {
187 return false;
188 };
189 let our_opts = ext_dtype.metadata();
190 our_opts.tz == other_opts.tz
191 && our_opts.unit <= other_opts.unit
192 && (ext_dtype.storage_dtype().is_nullable() || !other.is_nullable())
193 }
194
195 fn least_supertype(ext_dtype: &ExtDType<Self>, other: &DType) -> Option<DType> {
196 let DType::Extension(other_ext) = other else {
197 return None;
198 };
199 let other_opts = other_ext.metadata_opt::<Timestamp>()?;
200 let our_opts = ext_dtype.metadata();
201 if our_opts.tz != other_opts.tz {
202 return None;
203 }
204 let finest = our_opts.unit.min(other_opts.unit);
205 let union_null = ext_dtype.storage_dtype().nullability() | other.nullability();
206 Some(DType::Extension(
207 Timestamp::new_with_tz(finest, our_opts.tz.clone(), union_null).erased(),
208 ))
209 }
210
211 fn validate_dtype(ext_dtype: &ExtDType<Self>) -> VortexResult<()> {
212 vortex_ensure!(
213 matches!(ext_dtype.storage_dtype(), DType::Primitive(PType::I64, _)),
214 "Timestamp storage dtype must be i64"
215 );
216 Ok(())
217 }
218
219 fn unpack_native<'a>(
220 ext_dtype: &'a ExtDType<Self>,
221 storage_value: &'a ScalarValue,
222 ) -> VortexResult<Self::NativeValue<'a>> {
223 let metadata = ext_dtype.metadata();
224 let ts_value = storage_value.as_primitive().cast::<i64>()?;
225 let tz = metadata.tz.as_ref();
226
227 let (span, value) = match metadata.unit {
228 TimeUnit::Nanoseconds => (
229 Span::new().nanoseconds(ts_value),
230 TimestampValue::Nanoseconds(ts_value, tz),
231 ),
232 TimeUnit::Microseconds => (
233 Span::new().microseconds(ts_value),
234 TimestampValue::Microseconds(ts_value, tz),
235 ),
236 TimeUnit::Milliseconds => (
237 Span::new().milliseconds(ts_value),
238 TimestampValue::Milliseconds(ts_value, tz),
239 ),
240 TimeUnit::Seconds => (
241 Span::new().seconds(ts_value),
242 TimestampValue::Seconds(ts_value, tz),
243 ),
244 TimeUnit::Days => vortex_bail!("Timestamp does not support Days time unit"),
245 };
246
247 let ts = jiff::Timestamp::UNIX_EPOCH
249 .checked_add(span)
250 .map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
251
252 if let Some(tz) = tz {
253 ts.in_tz(tz.as_ref())
254 .map_err(|e| vortex_err!("Invalid timezone for timestamp scalar: {}", e))?;
255 }
256
257 Ok(value)
258 }
259}
260
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_error::VortexResult;

    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::NonNullable;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::datetime::TimeUnit;
    use crate::extension::datetime::Timestamp;
    use crate::scalar::PValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;

    /// Erased timestamp dtype without a timezone.
    fn plain(unit: TimeUnit, nullability: Nullability) -> DType {
        DType::Extension(Timestamp::new(unit, nullability).erased())
    }

    /// Erased timestamp dtype carrying the timezone name `tz`.
    fn zoned(unit: TimeUnit, tz: &str, nullability: Nullability) -> DType {
        DType::Extension(Timestamp::new_with_tz(unit, Some(Arc::from(tz)), nullability).erased())
    }

    /// Storage value `0`, i.e. the Unix epoch in any unit.
    fn epoch() -> Option<ScalarValue> {
        Some(ScalarValue::Primitive(PValue::I64(0)))
    }

    #[test]
    fn validate_timestamp_scalar() -> VortexResult<()> {
        Scalar::try_new(plain(TimeUnit::Seconds, Nullable), epoch())?;

        Ok(())
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn reject_timestamp_with_invalid_timezone() {
        // The dtype itself can be built; the bad zone is only detected when a value is checked.
        let dtype = zoned(TimeUnit::Seconds, "Not/A/Timezone", Nullable);
        let result = Scalar::try_new(dtype, epoch());
        assert!(result.is_err());
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn display_timestamp_scalar() {
        let scalar = Scalar::new(plain(TimeUnit::Seconds, Nullable), epoch());
        assert_eq!(format!("{}", scalar.as_extension()), "1970-01-01T00:00:00Z");

        let scalar = Scalar::new(
            zoned(TimeUnit::Seconds, "America/New_York", Nullable),
            epoch(),
        );
        assert_eq!(
            format!("{}", scalar.as_extension()),
            "1969-12-31T19:00:00-05:00[America/New_York]"
        );
    }

    #[test]
    fn least_supertype_timestamp_units() {
        let secs = plain(TimeUnit::Seconds, NonNullable);
        let ns = plain(TimeUnit::Nanoseconds, NonNullable);
        let expected = plain(TimeUnit::Nanoseconds, NonNullable);
        assert_eq!(secs.least_supertype(&ns).unwrap(), expected);
        assert_eq!(ns.least_supertype(&secs).unwrap(), expected);
    }

    #[test]
    fn least_supertype_timestamp_tz_mismatch() {
        let utc = zoned(TimeUnit::Seconds, "UTC", NonNullable);
        let none = plain(TimeUnit::Seconds, NonNullable);
        assert!(utc.least_supertype(&none).is_none());
    }

    #[test]
    fn least_supertype_timestamp_same_tz() {
        let utc_s = zoned(TimeUnit::Seconds, "UTC", NonNullable);
        let utc_ns = zoned(TimeUnit::Nanoseconds, "UTC", NonNullable);
        let expected = zoned(TimeUnit::Nanoseconds, "UTC", NonNullable);
        assert_eq!(utc_s.least_supertype(&utc_ns).unwrap(), expected);
    }

    #[test]
    fn can_coerce_from_timestamp_tz() {
        let utc = zoned(TimeUnit::Nanoseconds, "UTC", NonNullable);
        let utc_s = zoned(TimeUnit::Seconds, "UTC", NonNullable);
        let none = plain(TimeUnit::Nanoseconds, NonNullable);
        assert!(utc.can_coerce_from(&utc_s));
        assert!(!utc.can_coerce_from(&none));
    }

    #[test]
    fn deserialize_empty_metadata_returns_error() {
        assert!(Timestamp.deserialize_metadata(&[]).is_err());
    }

    #[test]
    fn deserialize_too_short_metadata_returns_error() {
        // Two bytes: one short of the tag plus the two-byte length prefix.
        assert!(Timestamp.deserialize_metadata(&[0x00, 0x01]).is_err());
    }

    #[test]
    fn deserialize_truncated_timezone_returns_error() {
        // Declares a 10-byte timezone (little-endian 0x000A) but supplies only 3 bytes.
        let data = [0x00u8, 0x0A, 0x00, b'U', b'T', b'C'];
        assert!(Timestamp.deserialize_metadata(&data).is_err());
    }
}