use std::fmt;
use std::sync::Arc;
use jiff::Span;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::dtype::extension::ExtDType;
use crate::dtype::extension::ExtId;
use crate::dtype::extension::ExtVTable;
use crate::extension::datetime::TimeUnit;
use crate::scalar::ScalarValue;
/// The `vortex.timestamp` extension type.
///
/// Values are stored as a primitive `i64` offset from the Unix epoch, measured in the
/// [`TimeUnit`] recorded in [`TimestampOptions`], optionally tagged with a timezone.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Timestamp;
impl Timestamp {
    /// Builds a timestamp dtype with the given unit and no timezone.
    pub fn new(time_unit: TimeUnit, nullability: Nullability) -> ExtDType<Self> {
        let options = TimestampOptions {
            unit: time_unit,
            tz: None,
        };
        Self::new_with_options(options, nullability)
    }
    /// Builds a timestamp dtype with the given unit and an optional timezone name.
    pub fn new_with_tz(
        time_unit: TimeUnit,
        timezone: Option<Arc<str>>,
        nullability: Nullability,
    ) -> ExtDType<Self> {
        let options = TimestampOptions {
            unit: time_unit,
            tz: timezone,
        };
        Self::new_with_options(options, nullability)
    }
    /// Builds a timestamp dtype from fully specified options, backed by `i64` storage with
    /// the requested nullability.
    ///
    /// # Panics
    ///
    /// Panics if `ExtDType::try_new` rejects the options/storage combination.
    pub fn new_with_options(options: TimestampOptions, nullability: Nullability) -> ExtDType<Self> {
        let storage_dtype = DType::Primitive(PType::I64, nullability);
        ExtDType::try_new(options, storage_dtype).vortex_expect("failed to create timestamp dtype")
    }
}
/// Metadata carried by every [`Timestamp`] dtype.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TimestampOptions {
    /// Unit of the stored `i64` offset from the Unix epoch.
    pub unit: TimeUnit,
    /// Optional timezone name (e.g. `America/New_York`) handed to jiff's `in_tz`.
    pub tz: Option<Arc<str>>,
}
impl fmt::Display for TimestampOptions {
    /// Renders as `<unit>` or, when a timezone is present, `<unit>, tz=<tz>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.unit)?;
        if let Some(zone) = &self.tz {
            write!(f, ", tz={zone}")?;
        }
        Ok(())
    }
}
/// A decoded timestamp: the raw `i64` offset from the Unix epoch in a specific unit, plus the
/// optional timezone borrowed from the dtype's [`TimestampOptions`].
pub enum TimestampValue<'a> {
    /// Whole seconds since the Unix epoch.
    Seconds(i64, Option<&'a Arc<str>>),
    /// Milliseconds since the Unix epoch.
    Milliseconds(i64, Option<&'a Arc<str>>),
    /// Microseconds since the Unix epoch.
    Microseconds(i64, Option<&'a Arc<str>>),
    /// Nanoseconds since the Unix epoch.
    Nanoseconds(i64, Option<&'a Arc<str>>),
}
impl fmt::Display for TimestampValue<'_> {
    /// Renders the instant as an RFC 3339-style UTC timestamp, or as a zoned datetime when a
    /// timezone is attached.
    ///
    /// # Panics
    ///
    /// Panics if the timezone cannot be resolved. The jiff span builders and `Timestamp + Span`
    /// used here also panic (per jiff's documentation) when the value is out of range.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (offset, zone) = match *self {
            TimestampValue::Seconds(raw, zone) => (Span::new().seconds(raw), zone),
            TimestampValue::Milliseconds(raw, zone) => (Span::new().milliseconds(raw), zone),
            TimestampValue::Microseconds(raw, zone) => (Span::new().microseconds(raw), zone),
            TimestampValue::Nanoseconds(raw, zone) => (Span::new().nanoseconds(raw), zone),
        };
        let instant = jiff::Timestamp::UNIX_EPOCH + offset;
        if let Some(zone) = zone {
            let zoned = instant.in_tz(zone.as_ref()).vortex_expect("unknown timezone");
            return write!(f, "{zoned}");
        }
        write!(f, "{instant}")
    }
}
impl ExtVTable for Timestamp {
    type Metadata = TimestampOptions;
    type NativeValue<'a> = TimestampValue<'a>;
    fn id(&self) -> ExtId {
        ExtId::new_ref("vortex.timestamp")
    }
    /// Encodes the options as `[unit_tag: u8][tz_len: u16 LE][tz: tz_len UTF-8 bytes]`.
    ///
    /// A `tz_len` of zero means "no timezone".
    ///
    /// # Errors
    ///
    /// Returns an error if the timezone name is longer than `u16::MAX` bytes and therefore
    /// cannot be length-prefixed.
    fn serialize_metadata(&self, metadata: &Self::Metadata) -> VortexResult<Vec<u8>> {
        let tz_bytes: &[u8] = metadata
            .tz
            .as_deref()
            .map(str::as_bytes)
            .unwrap_or_default();
        // Reject rather than panic: this function already reports failures via `VortexResult`.
        let tz_len = u16::try_from(tz_bytes.len())
            .map_err(|err| vortex_err!("tz did not fit in u16: {}", err))?;
        let unit_tag: u8 = metadata.unit.into();
        let mut bytes = Vec::with_capacity(3 + tz_bytes.len());
        bytes.push(unit_tag);
        bytes.extend_from_slice(&tz_len.to_le_bytes());
        bytes.extend_from_slice(tz_bytes);
        Ok(bytes)
    }
    /// Decodes the layout written by `serialize_metadata`.
    ///
    /// Bytes after the declared timezone are ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if the 3-byte header is missing, if the unit tag is unknown, if the
    /// declared timezone length exceeds the remaining bytes, or if the timezone is not UTF-8.
    fn deserialize_metadata(&self, data: &[u8]) -> VortexResult<Self::Metadata> {
        let [tag, len_lo, len_hi, rest @ ..] = data else {
            vortex_bail!(
                "timestamp metadata must be at least 3 bytes, got {}",
                data.len()
            );
        };
        let time_unit = TimeUnit::try_from(*tag)?;
        let tz_len = usize::from(u16::from_le_bytes([*len_lo, *len_hi]));
        if tz_len == 0 {
            return Ok(TimestampOptions {
                unit: time_unit,
                tz: None,
            });
        }
        // Checked slicing: truncated or corrupt metadata must surface as an error, not a panic.
        let Some(tz_bytes) = rest.get(..tz_len) else {
            vortex_bail!(
                "timestamp metadata declares a {} byte timezone but only {} bytes remain",
                tz_len,
                rest.len()
            );
        };
        let tz = str::from_utf8(tz_bytes)
            .map_err(|e| vortex_err!("timezone is not valid utf8 string: {e}"))?;
        Ok(TimestampOptions {
            unit: time_unit,
            tz: Some(Arc::from(tz)),
        })
    }
    fn validate_dtype(
        &self,
        _metadata: &Self::Metadata,
        storage_dtype: &DType,
    ) -> VortexResult<()> {
        vortex_ensure!(
            matches!(storage_dtype, DType::Primitive(PType::I64, _)),
            "Timestamp storage dtype must be i64"
        );
        Ok(())
    }
    /// Converts a stored `i64` into a [`TimestampValue`], verifying that it maps to an
    /// instant jiff can represent and that any timezone resolves.
    ///
    /// # Errors
    ///
    /// Returns an error for the unsupported `Days` unit, for out-of-range values, and for
    /// unknown timezones.
    fn unpack_native<'a>(
        &self,
        metadata: &'a Self::Metadata,
        _storage_dtype: &'a DType,
        storage_value: &'a ScalarValue,
    ) -> VortexResult<Self::NativeValue<'a>> {
        let ts_value = storage_value.as_primitive().cast::<i64>()?;
        let tz = metadata.tz.as_ref();
        // Use jiff's fallible `try_*` span builders: the infallible ones panic once the value
        // exceeds the per-unit span limit, which would bypass the error path below.
        let (span, value) = match metadata.unit {
            TimeUnit::Nanoseconds => (
                Span::new().try_nanoseconds(ts_value),
                TimestampValue::Nanoseconds(ts_value, tz),
            ),
            TimeUnit::Microseconds => (
                Span::new().try_microseconds(ts_value),
                TimestampValue::Microseconds(ts_value, tz),
            ),
            TimeUnit::Milliseconds => (
                Span::new().try_milliseconds(ts_value),
                TimestampValue::Milliseconds(ts_value, tz),
            ),
            TimeUnit::Seconds => (
                Span::new().try_seconds(ts_value),
                TimestampValue::Seconds(ts_value, tz),
            ),
            TimeUnit::Days => vortex_bail!("Timestamp does not support Days time unit"),
        };
        let span = span.map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
        let ts = jiff::Timestamp::UNIX_EPOCH
            .checked_add(span)
            .map_err(|e| vortex_err!("Invalid timestamp scalar: {}", e))?;
        if let Some(tz) = tz {
            ts.in_tz(tz.as_ref())
                .map_err(|e| vortex_err!("Invalid timezone for timestamp scalar: {}", e))?;
        }
        Ok(value)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use vortex_error::VortexResult;
    use super::TimestampOptions;
    use crate::dtype::DType;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::extension::ExtVTable;
    use crate::extension::datetime::TimeUnit;
    use crate::extension::datetime::Timestamp;
    use crate::scalar::PValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    #[test]
    fn validate_timestamp_scalar() -> VortexResult<()> {
        let dtype = DType::Extension(Timestamp::new(TimeUnit::Seconds, Nullable).erased());
        Scalar::try_new(dtype, Some(ScalarValue::Primitive(PValue::I64(0))))?;
        Ok(())
    }
    /// Metadata written by `serialize_metadata` must decode back to the same options,
    /// both with and without a timezone.
    #[test]
    fn timestamp_metadata_roundtrip() -> VortexResult<()> {
        let cases = [
            TimestampOptions {
                unit: TimeUnit::Seconds,
                tz: None,
            },
            TimestampOptions {
                unit: TimeUnit::Nanoseconds,
                tz: Some(Arc::from("America/New_York")),
            },
        ];
        for options in cases {
            let bytes = Timestamp.serialize_metadata(&options)?;
            assert_eq!(Timestamp.deserialize_metadata(&bytes)?, options);
        }
        Ok(())
    }
    /// Metadata shorter than the 3-byte header is rejected with an error.
    #[test]
    fn reject_short_timestamp_metadata() {
        assert!(Timestamp.deserialize_metadata(&[]).is_err());
        assert!(Timestamp.deserialize_metadata(&[0, 0]).is_err());
    }
    #[cfg_attr(miri, ignore)]
    #[test]
    fn reject_timestamp_with_invalid_timezone() {
        let dtype = DType::Extension(
            Timestamp::new_with_tz(
                TimeUnit::Seconds,
                Some(Arc::from("Not/A/Timezone")),
                Nullable,
            )
            .erased(),
        );
        let result = Scalar::try_new(dtype, Some(ScalarValue::Primitive(PValue::I64(0))));
        assert!(result.is_err());
    }
    #[cfg_attr(miri, ignore)]
    #[test]
    fn display_timestamp_scalar() {
        let local_dtype = DType::Extension(Timestamp::new(TimeUnit::Seconds, Nullable).erased());
        let scalar = Scalar::new(local_dtype, Some(ScalarValue::Primitive(PValue::I64(0))));
        assert_eq!(format!("{}", scalar.as_extension()), "1970-01-01T00:00:00Z");
        let zoned_dtype = DType::Extension(
            Timestamp::new_with_tz(
                TimeUnit::Seconds,
                Some(Arc::from("America/New_York")),
                Nullable,
            )
            .erased(),
        );
        let scalar = Scalar::new(zoned_dtype, Some(ScalarValue::Primitive(PValue::I64(0))));
        assert_eq!(
            format!("{}", scalar.as_extension()),
            "1969-12-31T19:00:00-05:00[America/New_York]"
        );
    }
}