vortex_array/extension/datetime/
timestamp.rs1use std::fmt;
7use std::sync::Arc;
8
9use jiff::Span;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_bail;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15use vortex_error::vortex_panic;
16use vortex_session::registry::CachedId;
17
18use crate::dtype::DType;
19use crate::dtype::Nullability;
20use crate::dtype::PType;
21use crate::dtype::extension::ExtDType;
22use crate::dtype::extension::ExtId;
23use crate::dtype::extension::ExtVTable;
24use crate::extension::datetime::TimeUnit;
25use crate::scalar::ScalarValue;
26
/// Marker type for the timestamp extension dtype.
///
/// The type carries no data of its own; the per-dtype configuration (time unit
/// and optional timezone) is stored as extension metadata.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct Timestamp;
30
31impl Timestamp {
32 pub fn new(time_unit: TimeUnit, nullability: Nullability) -> ExtDType<Self> {
34 Self::new_with_tz(time_unit, None, nullability)
35 }
36
37 pub fn new_with_tz(
39 time_unit: TimeUnit,
40 timezone: Option<Arc<str>>,
41 nullability: Nullability,
42 ) -> ExtDType<Self> {
43 ExtDType::try_new(
44 TimestampOptions {
45 unit: time_unit,
46 tz: timezone,
47 },
48 DType::Primitive(PType::I64, nullability),
49 )
50 .vortex_expect("failed to create timestamp dtype")
51 }
52
53 pub fn new_with_options(options: TimestampOptions, nullability: Nullability) -> ExtDType<Self> {
55 ExtDType::try_new(options, DType::Primitive(PType::I64, nullability))
56 .vortex_expect("failed to create timestamp dtype")
57 }
58}
59
60#[derive(Clone, Debug, PartialEq, Eq, Hash)]
62pub struct TimestampOptions {
63 pub unit: TimeUnit,
65 pub tz: Option<Arc<str>>,
67}
68
69impl fmt::Display for TimestampOptions {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 match &self.tz {
72 Some(tz) => write!(f, "{}, tz={}", self.unit, tz),
73 None => write!(f, "{}", self.unit),
74 }
75 }
76}
77
/// A timestamp value unpacked from its storage scalar.
///
/// Each variant holds the raw `i64` count in the variant's unit, together with
/// a borrowed reference to the timezone from the dtype metadata, if any.
pub enum TimestampValue<'a> {
    /// A count of seconds.
    Seconds(i64, Option<&'a Arc<str>>),
    /// A count of milliseconds.
    Milliseconds(i64, Option<&'a Arc<str>>),
    /// A count of microseconds.
    Microseconds(i64, Option<&'a Arc<str>>),
    /// A count of nanoseconds.
    Nanoseconds(i64, Option<&'a Arc<str>>),
}
91
92impl fmt::Display for TimestampValue<'_> {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 let (span, tz) = match self {
95 TimestampValue::Seconds(v, tz) => (Span::new().seconds(*v), *tz),
96 TimestampValue::Milliseconds(v, tz) => (Span::new().milliseconds(*v), *tz),
97 TimestampValue::Microseconds(v, tz) => (Span::new().microseconds(*v), *tz),
98 TimestampValue::Nanoseconds(v, tz) => (Span::new().nanoseconds(*v), *tz),
99 };
100 let ts = jiff::Timestamp::UNIX_EPOCH + span;
101
102 match tz {
103 None => write!(f, "{ts}"),
104 Some(tz) => {
105 let adjusted_ts = ts.in_tz(tz.as_ref()).vortex_expect("unknown timezone");
106 write!(f, "{adjusted_ts}",)
107 }
108 }
109 }
110}
111
112impl ExtVTable for Timestamp {
113 type Metadata = TimestampOptions;
114
115 type NativeValue<'a> = TimestampValue<'a>;
116
117 fn id(&self) -> ExtId {
118 static ID: CachedId = CachedId::new("vortex.timestamp");
119 *ID
120 }
121
122 fn serialize_metadata(&self, metadata: &Self::Metadata) -> VortexResult<Vec<u8>> {
125 let mut bytes = Vec::with_capacity(4);
126 let unit_tag: u8 = metadata.unit.into();
127
128 bytes.push(unit_tag);
129
130 match &metadata.tz {
132 None => bytes.extend_from_slice(0u16.to_le_bytes().as_slice()),
133 Some(tz) => {
134 let tz_bytes = tz.as_bytes();
135 let tz_len = u16::try_from(tz_bytes.len())
136 .unwrap_or_else(|err| vortex_panic!("tz did not fit in u16: {}", err));
137 bytes.extend_from_slice(tz_len.to_le_bytes().as_slice());
138 bytes.extend_from_slice(tz_bytes);
139 }
140 }
141
142 Ok(bytes)
143 }
144
145 fn deserialize_metadata(&self, data: &[u8]) -> VortexResult<Self::Metadata> {
146 vortex_ensure!(
147 data.len() >= 3,
148 "Timestamp metadata must have at least 3 bytes, got {}",
149 data.len()
150 );
151
152 let tag = data[0];
153 let time_unit = TimeUnit::try_from(tag)?;
154 let tz_len_bytes: [u8; 2] = data[1..3]
155 .try_into()
156 .ok()
157 .vortex_expect("Verified to have two bytes");
158 let tz_len = u16::from_le_bytes(tz_len_bytes) as usize;
159 if tz_len == 0 {
160 return Ok(TimestampOptions {
161 unit: time_unit,
162 tz: None,
163 });
164 }
165
166 vortex_ensure!(
168 data.len() >= 3 + tz_len,
169 "Timestamp metadata is truncated: declared timezone length {} but only {} bytes available",
170 tz_len,
171 data.len() - 3
172 );
173 let tz_bytes = &data[3..3 + tz_len];
174 let tz: Arc<str> = str::from_utf8(tz_bytes)
175 .map_err(|e| vortex_err!("timezone is not valid utf8 string: {e}"))?
176 .to_string()
177 .into();
178 Ok(TimestampOptions {
179 unit: time_unit,
180 tz: Some(tz),
181 })
182 }
183
184 fn can_coerce_from(ext_dtype: &ExtDType<Self>, other: &DType) -> bool {
185 let DType::Extension(other_ext) = other else {
186 return false;
187 };
188 let Some(other_opts) = other_ext.metadata_opt::<Timestamp>() else {
189 return false;
190 };
191 let our_opts = ext_dtype.metadata();
192 our_opts.tz == other_opts.tz
193 && our_opts.unit <= other_opts.unit
194 && (ext_dtype.storage_dtype().is_nullable() || !other.is_nullable())
195 }
196
197 fn least_supertype(ext_dtype: &ExtDType<Self>, other: &DType) -> Option<DType> {
198 let DType::Extension(other_ext) = other else {
199 return None;
200 };
201 let other_opts = other_ext.metadata_opt::<Timestamp>()?;
202 let our_opts = ext_dtype.metadata();
203 if our_opts.tz != other_opts.tz {
204 return None;
205 }
206 let finest = our_opts.unit.min(other_opts.unit);
207 let union_null = ext_dtype.storage_dtype().nullability() | other.nullability();
208 Some(DType::Extension(
209 Timestamp::new_with_tz(finest, our_opts.tz.clone(), union_null).erased(),
210 ))
211 }
212
213 fn validate_dtype(ext_dtype: &ExtDType<Self>) -> VortexResult<()> {
214 vortex_ensure!(
215 matches!(ext_dtype.storage_dtype(), DType::Primitive(PType::I64, _)),
216 "Timestamp storage dtype must be i64"
217 );
218 Ok(())
219 }
220
221 fn unpack_native<'a>(
222 ext_dtype: &'a ExtDType<Self>,
223 storage_value: &'a ScalarValue,
224 ) -> VortexResult<Self::NativeValue<'a>> {
225 let metadata = ext_dtype.metadata();
226 let ts_value = storage_value.as_primitive().cast::<i64>()?;
227 let tz = metadata.tz.as_ref();
228
229 let (span, value) = match metadata.unit {
230 TimeUnit::Nanoseconds => (
231 Span::new().nanoseconds(ts_value),
232 TimestampValue::Nanoseconds(ts_value, tz),
233 ),
234 TimeUnit::Microseconds => (
235 Span::new().microseconds(ts_value),
236 TimestampValue::Microseconds(ts_value, tz),
237 ),
238 TimeUnit::Milliseconds => (
239 Span::new().milliseconds(ts_value),
240 TimestampValue::Milliseconds(ts_value, tz),
241 ),
242 TimeUnit::Seconds => (
243 Span::new().seconds(ts_value),
244 TimestampValue::Seconds(ts_value, tz),
245 ),
246 TimeUnit::Days => vortex_bail!("Timestamp does not support Days time unit"),
247 };
248
249 let ts = jiff::Timestamp::UNIX_EPOCH
251 .checked_add(span)
252 .map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
253
254 if let Some(tz) = tz {
255 ts.in_tz(tz.as_ref())
256 .map_err(|e| vortex_err!("Invalid timezone for timestamp scalar: {}", e))?;
257 }
258
259 Ok(value)
260 }
261}
262
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_error::VortexResult;

    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::Nullability::NonNullable;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::datetime::TimeUnit;
    use crate::extension::datetime::Timestamp;
    use crate::scalar::PValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;

    /// Builds an erased timestamp extension dtype with an optional timezone name.
    fn ts_dtype(unit: TimeUnit, tz: Option<&str>, nullability: Nullability) -> DType {
        let tz = tz.map(Arc::<str>::from);
        DType::Extension(Timestamp::new_with_tz(unit, tz, nullability).erased())
    }

    /// Storage value `0`, i.e. the Unix epoch in any unit.
    fn epoch() -> Option<ScalarValue> {
        Some(ScalarValue::Primitive(PValue::I64(0)))
    }

    #[test]
    fn validate_timestamp_scalar() -> VortexResult<()> {
        Scalar::try_new(ts_dtype(TimeUnit::Seconds, None, Nullable), epoch())?;

        Ok(())
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn reject_timestamp_with_invalid_timezone() {
        let dtype = ts_dtype(TimeUnit::Seconds, Some("Not/A/Timezone"), Nullable);
        assert!(Scalar::try_new(dtype, epoch()).is_err());
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn display_timestamp_scalar() {
        // Without a timezone the epoch is rendered as a UTC instant.
        let scalar = Scalar::new(ts_dtype(TimeUnit::Seconds, None, Nullable), epoch());
        assert_eq!(format!("{}", scalar.as_extension()), "1970-01-01T00:00:00Z");

        // With a timezone the same instant is rendered in that zone.
        let scalar = Scalar::new(
            ts_dtype(TimeUnit::Seconds, Some("America/New_York"), Nullable),
            epoch(),
        );
        assert_eq!(
            format!("{}", scalar.as_extension()),
            "1969-12-31T19:00:00-05:00[America/New_York]"
        );
    }

    #[test]
    fn least_supertype_timestamp_units() {
        let secs = ts_dtype(TimeUnit::Seconds, None, NonNullable);
        let ns = ts_dtype(TimeUnit::Nanoseconds, None, NonNullable);
        let expected = ts_dtype(TimeUnit::Nanoseconds, None, NonNullable);

        // The result is the same regardless of argument order.
        assert_eq!(secs.least_supertype(&ns).unwrap(), expected);
        assert_eq!(ns.least_supertype(&secs).unwrap(), expected);
    }

    #[test]
    fn least_supertype_timestamp_tz_mismatch() {
        let utc = ts_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let none = ts_dtype(TimeUnit::Seconds, None, NonNullable);
        assert!(utc.least_supertype(&none).is_none());
    }

    #[test]
    fn least_supertype_timestamp_same_tz() {
        let utc_s = ts_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let utc_ns = ts_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        let expected = ts_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        assert_eq!(utc_s.least_supertype(&utc_ns).unwrap(), expected);
    }

    #[test]
    fn can_coerce_from_timestamp_tz() {
        let utc = ts_dtype(TimeUnit::Nanoseconds, Some("UTC"), NonNullable);
        let utc_s = ts_dtype(TimeUnit::Seconds, Some("UTC"), NonNullable);
        let none = ts_dtype(TimeUnit::Nanoseconds, None, NonNullable);

        assert!(utc.can_coerce_from(&utc_s));
        assert!(!utc.can_coerce_from(&none));
    }

    #[test]
    fn deserialize_empty_metadata_returns_error() {
        assert!(Timestamp.deserialize_metadata(&[]).is_err());
    }

    #[test]
    fn deserialize_too_short_metadata_returns_error() {
        // Two bytes: one short of the 3-byte minimum (tag + u16 length).
        assert!(Timestamp.deserialize_metadata(&[0x00, 0x01]).is_err());
    }

    #[test]
    fn deserialize_truncated_timezone_returns_error() {
        // Declares a 10-byte timezone (0x000A little-endian) but supplies only 3 bytes.
        let data = [0x00u8, 0x0A, 0x00, b'U', b'T', b'C'];
        assert!(Timestamp.deserialize_metadata(&data).is_err());
    }
}