use std::sync::LazyLock;
use ahash::AHashMap;
use chrono::{DateTime, LocalResult, NaiveTime, TimeZone};
use chrono_tz::{America::New_York, Tz};
use databento::dbn;
use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_DAY};
use ustr::Ustr;
static DEFAULT_CONFIG: LazyLock<DatabentoDecodeConfig> =
LazyLock::new(DatabentoDecodeConfig::default);
fn opra_default_time() -> NaiveTime {
NaiveTime::from_hms_opt(16, 0, 0).expect("16:00:00 is a valid time")
}
#[derive(Clone, Debug)]
pub struct OptionExpirationRule {
pub timezone: Tz,
pub default_time: NaiveTime,
pub overrides: AHashMap<Ustr, NaiveTime>,
}
impl OptionExpirationRule {
#[must_use]
pub fn opra() -> Self {
Self {
timezone: New_York,
default_time: opra_default_time(),
overrides: AHashMap::new(),
}
}
fn time_for(&self, underlying: Ustr) -> NaiveTime {
self.overrides
.get(&underlying)
.copied()
.unwrap_or(self.default_time)
}
}
#[derive(Clone, Debug)]
pub struct DatabentoDecodeConfig {
pub option_expiration: AHashMap<dbn::Dataset, OptionExpirationRule>,
}
impl Default for DatabentoDecodeConfig {
fn default() -> Self {
let mut option_expiration = AHashMap::new();
option_expiration.insert(dbn::Dataset::OpraPillar, OptionExpirationRule::opra());
Self { option_expiration }
}
}
#[must_use]
pub fn corrected_option_expiration(
expiration: UnixNanos,
underlying: Ustr,
dataset: Option<dbn::Dataset>,
config: Option<&DatabentoDecodeConfig>,
) -> UnixNanos {
let Some(dataset) = dataset else {
return expiration;
};
let config = config.unwrap_or(&DEFAULT_CONFIG);
let Some(rule) = config.option_expiration.get(&dataset) else {
return expiration;
};
let raw = expiration.as_u64();
if raw == 0 || !raw.is_multiple_of(NANOSECONDS_IN_DAY) {
return expiration;
}
let Ok(raw) = i64::try_from(raw) else {
return expiration;
};
let date = DateTime::from_timestamp_nanos(raw).date_naive();
let corrected = match rule
.timezone
.from_local_datetime(&date.and_time(rule.time_for(underlying)))
{
LocalResult::Single(dt) => dt,
LocalResult::Ambiguous(dt, _) => dt,
LocalResult::None => return expiration,
};
match corrected.timestamp_nanos_opt() {
Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
_ => expiration,
}
}
#[cfg(test)]
mod tests {
use chrono::NaiveTime;
use databento::dbn;
use nautilus_core::UnixNanos;
use rstest::rstest;
use ustr::Ustr;
use super::{DatabentoDecodeConfig, corrected_option_expiration};
const EDT_MIDNIGHT_UTC: u64 = 1_782_691_200_000_000_000; const EDT_1600_ET: u64 = 1_782_763_200_000_000_000; const EST_MIDNIGHT_UTC: u64 = 1_768_521_600_000_000_000; const EST_1600_ET: u64 = 1_768_597_200_000_000_000; const EDT_0930_ET: u64 = 1_782_739_800_000_000_000; const INTRADAY_UTC: u64 = 1_789_738_200_000_000_000;
fn config_with_opra_override(underlying: &str, time: NaiveTime) -> DatabentoDecodeConfig {
let mut config = DatabentoDecodeConfig::default();
config
.option_expiration
.get_mut(&dbn::Dataset::OpraPillar)
.unwrap()
.overrides
.insert(Ustr::from(underlying), time);
config
}
#[rstest]
fn test_opra_midnight_corrected_to_1600_et_during_edt() {
let result = corrected_option_expiration(
UnixNanos::from(EDT_MIDNIGHT_UTC),
Ustr::from("SPX"),
Some(dbn::Dataset::OpraPillar),
None,
);
assert_eq!(result.as_u64(), EDT_1600_ET);
}
#[rstest]
fn test_opra_midnight_corrected_to_1600_et_during_est() {
let result = corrected_option_expiration(
UnixNanos::from(EST_MIDNIGHT_UTC),
Ustr::from("SPX"),
Some(dbn::Dataset::OpraPillar),
None,
);
assert_eq!(result.as_u64(), EST_1600_ET);
}
#[rstest]
fn test_opra_override_applied_for_matching_underlying() {
let config = config_with_opra_override("XSP", NaiveTime::from_hms_opt(9, 30, 0).unwrap());
let result = corrected_option_expiration(
UnixNanos::from(EDT_MIDNIGHT_UTC),
Ustr::from("XSP"),
Some(dbn::Dataset::OpraPillar),
Some(&config),
);
assert_eq!(result.as_u64(), EDT_0930_ET);
}
#[rstest]
fn test_opra_default_used_when_underlying_not_overridden() {
let config = config_with_opra_override("XSP", NaiveTime::from_hms_opt(9, 30, 0).unwrap());
let result = corrected_option_expiration(
UnixNanos::from(EDT_MIDNIGHT_UTC),
Ustr::from("SPX"),
Some(dbn::Dataset::OpraPillar),
Some(&config),
);
assert_eq!(result.as_u64(), EDT_1600_ET);
}
#[rstest]
fn test_opra_intraday_expiration_passes_through() {
let result = corrected_option_expiration(
UnixNanos::from(INTRADAY_UTC),
Ustr::from("SPX"),
Some(dbn::Dataset::OpraPillar),
None,
);
assert_eq!(result.as_u64(), INTRADAY_UTC);
}
#[rstest]
fn test_non_opra_midnight_passes_through() {
let result = corrected_option_expiration(
UnixNanos::from(EDT_MIDNIGHT_UTC),
Ustr::from("ESU6"),
Some(dbn::Dataset::GlbxMdp3),
None,
);
assert_eq!(result.as_u64(), EDT_MIDNIGHT_UTC);
}
#[rstest]
fn test_unknown_dataset_passes_through() {
let result = corrected_option_expiration(
UnixNanos::from(EDT_MIDNIGHT_UTC),
Ustr::from("SPX"),
None,
None,
);
assert_eq!(result.as_u64(), EDT_MIDNIGHT_UTC);
}
#[rstest]
fn test_dataset_without_rule_passes_through() {
let config = DatabentoDecodeConfig {
option_expiration: ahash::AHashMap::new(),
};
let result = corrected_option_expiration(
UnixNanos::from(EDT_MIDNIGHT_UTC),
Ustr::from("SPX"),
Some(dbn::Dataset::OpraPillar),
Some(&config),
);
assert_eq!(result.as_u64(), EDT_MIDNIGHT_UTC);
}
}