vortex_array/extension/datetime/
timestamp.rs1use std::fmt;
7use std::sync::Arc;
8
9use jiff::Span;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_bail;
13use vortex_error::vortex_ensure;
14use vortex_error::vortex_err;
15use vortex_error::vortex_panic;
16
17use crate::dtype::DType;
18use crate::dtype::Nullability;
19use crate::dtype::PType;
20use crate::dtype::extension::ExtDType;
21use crate::dtype::extension::ExtId;
22use crate::dtype::extension::ExtVTable;
23use crate::extension::datetime::TimeUnit;
24use crate::scalar::ScalarValue;
25
/// Marker type for the timestamp extension dtype.
///
/// Its [`ExtVTable`] implementation registers it under the `vortex.timestamp`
/// extension id, with [`TimestampOptions`] as metadata.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct Timestamp;
29
30impl Timestamp {
31 pub fn new(time_unit: TimeUnit, nullability: Nullability) -> ExtDType<Self> {
33 Self::new_with_tz(time_unit, None, nullability)
34 }
35
36 pub fn new_with_tz(
38 time_unit: TimeUnit,
39 timezone: Option<Arc<str>>,
40 nullability: Nullability,
41 ) -> ExtDType<Self> {
42 ExtDType::try_new(
43 TimestampOptions {
44 unit: time_unit,
45 tz: timezone,
46 },
47 DType::Primitive(PType::I64, nullability),
48 )
49 .vortex_expect("failed to create timestamp dtype")
50 }
51
52 pub fn new_with_options(options: TimestampOptions, nullability: Nullability) -> ExtDType<Self> {
54 ExtDType::try_new(options, DType::Primitive(PType::I64, nullability))
55 .vortex_expect("failed to create timestamp dtype")
56 }
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Hash)]
61pub struct TimestampOptions {
62 pub unit: TimeUnit,
64 pub tz: Option<Arc<str>>,
66}
67
68impl fmt::Display for TimestampOptions {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 match &self.tz {
71 Some(tz) => write!(f, "{}, tz={}", self.unit, tz),
72 None => write!(f, "{}", self.unit),
73 }
74 }
75}
76
/// A single timestamp value, tagged with the unit its count is expressed in.
///
/// Each variant holds the signed count of units relative to the Unix epoch and
/// an optional borrowed time zone name.
///
/// Both fields of every variant are `Copy`, so the enum is cheap to copy.
#[derive(Debug, Clone, Copy)]
pub enum TimestampValue<'a> {
    /// Count of seconds relative to the Unix epoch, with optional time zone.
    Seconds(i64, Option<&'a Arc<str>>),
    /// Count of milliseconds relative to the Unix epoch, with optional time zone.
    Milliseconds(i64, Option<&'a Arc<str>>),
    /// Count of microseconds relative to the Unix epoch, with optional time zone.
    Microseconds(i64, Option<&'a Arc<str>>),
    /// Count of nanoseconds relative to the Unix epoch, with optional time zone.
    Nanoseconds(i64, Option<&'a Arc<str>>),
}
90
91impl fmt::Display for TimestampValue<'_> {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 let (span, tz) = match self {
94 TimestampValue::Seconds(v, tz) => (Span::new().seconds(*v), *tz),
95 TimestampValue::Milliseconds(v, tz) => (Span::new().milliseconds(*v), *tz),
96 TimestampValue::Microseconds(v, tz) => (Span::new().microseconds(*v), *tz),
97 TimestampValue::Nanoseconds(v, tz) => (Span::new().nanoseconds(*v), *tz),
98 };
99 let ts = jiff::Timestamp::UNIX_EPOCH + span;
100
101 match tz {
102 None => write!(f, "{ts}"),
103 Some(tz) => {
104 let adjusted_ts = ts.in_tz(tz.as_ref()).vortex_expect("unknown timezone");
105 write!(f, "{adjusted_ts}",)
106 }
107 }
108 }
109}
110
111impl ExtVTable for Timestamp {
112 type Metadata = TimestampOptions;
113
114 type NativeValue<'a> = TimestampValue<'a>;
115
116 fn id(&self) -> ExtId {
117 ExtId::new_ref("vortex.timestamp")
118 }
119
120 fn serialize_metadata(&self, metadata: &Self::Metadata) -> VortexResult<Vec<u8>> {
123 let mut bytes = Vec::with_capacity(4);
124 let unit_tag: u8 = metadata.unit.into();
125
126 bytes.push(unit_tag);
127
128 match &metadata.tz {
130 None => bytes.extend_from_slice(0u16.to_le_bytes().as_slice()),
131 Some(tz) => {
132 let tz_bytes = tz.as_bytes();
133 let tz_len = u16::try_from(tz_bytes.len())
134 .unwrap_or_else(|err| vortex_panic!("tz did not fit in u16: {}", err));
135 bytes.extend_from_slice(tz_len.to_le_bytes().as_slice());
136 bytes.extend_from_slice(tz_bytes);
137 }
138 }
139
140 Ok(bytes)
141 }
142
143 fn deserialize_metadata(&self, data: &[u8]) -> VortexResult<Self::Metadata> {
144 vortex_ensure!(data.len() >= 3);
145
146 let tag = data[0];
147 let time_unit = TimeUnit::try_from(tag)?;
148 let tz_len_bytes: [u8; 2] = data[1..3]
149 .try_into()
150 .ok()
151 .vortex_expect("Verified to have two bytes");
152 let tz_len = u16::from_le_bytes(tz_len_bytes) as usize;
153 if tz_len == 0 {
154 return Ok(TimestampOptions {
155 unit: time_unit,
156 tz: None,
157 });
158 }
159
160 let tz_bytes = &data[3..][..tz_len];
162 let tz: Arc<str> = str::from_utf8(tz_bytes)
163 .map_err(|e| vortex_err!("timezone is not valid utf8 string: {e}"))?
164 .to_string()
165 .into();
166 Ok(TimestampOptions {
167 unit: time_unit,
168 tz: Some(tz),
169 })
170 }
171
172 fn validate_dtype(
173 &self,
174 _metadata: &Self::Metadata,
175 storage_dtype: &DType,
176 ) -> VortexResult<()> {
177 vortex_ensure!(
178 matches!(storage_dtype, DType::Primitive(PType::I64, _)),
179 "Timestamp storage dtype must be i64"
180 );
181 Ok(())
182 }
183
184 fn unpack_native<'a>(
185 &self,
186 metadata: &'a Self::Metadata,
187 _storage_dtype: &'a DType,
188 storage_value: &'a ScalarValue,
189 ) -> VortexResult<Self::NativeValue<'a>> {
190 let ts_value = storage_value.as_primitive().cast::<i64>()?;
191 let tz = metadata.tz.as_ref();
192
193 let (span, value) = match metadata.unit {
194 TimeUnit::Nanoseconds => (
195 Span::new().nanoseconds(ts_value),
196 TimestampValue::Nanoseconds(ts_value, tz),
197 ),
198 TimeUnit::Microseconds => (
199 Span::new().microseconds(ts_value),
200 TimestampValue::Microseconds(ts_value, tz),
201 ),
202 TimeUnit::Milliseconds => (
203 Span::new().milliseconds(ts_value),
204 TimestampValue::Milliseconds(ts_value, tz),
205 ),
206 TimeUnit::Seconds => (
207 Span::new().seconds(ts_value),
208 TimestampValue::Seconds(ts_value, tz),
209 ),
210 TimeUnit::Days => vortex_bail!("Timestamp does not support Days time unit"),
211 };
212
213 let ts = jiff::Timestamp::UNIX_EPOCH
215 .checked_add(span)
216 .map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
217
218 if let Some(tz) = tz {
219 ts.in_tz(tz.as_ref())
220 .map_err(|e| vortex_err!("Invalid timezone for timestamp scalar: {}", e))?;
221 }
222
223 Ok(value)
224 }
225}