use serde::{Deserialize, Serialize};
use crate::error::{Error, Result};
/// Policy value describing how fires that were missed should be treated.
///
/// This file only defines the policy and its serde representation; the code
/// that acts on each variant lives elsewhere.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum MissedFiresPolicy {
/// The default policy (selected by `#[default]`).
/// NOTE(review): presumably missed fires are dropped — confirm against the scheduler.
#[default]
Skip,
/// NOTE(review): presumably a single catch-up fire is emitted — confirm against the scheduler.
FireOnce,
/// NOTE(review): presumably missed fires are replayed, bounded by `max_catchup` — confirm.
FireAll { max_catchup: u32 },
}
/// Recurrence rule for a repeatable job.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RepeatPattern {
/// Cron-driven schedule.
Cron {
/// Cron expression text; `next_cron_after` parses it with an optional seconds field.
expression: String,
/// Timezone spec handed to `parse_tz`; `None` is treated as UTC.
tz: Option<String>,
},
/// Fixed-interval schedule in milliseconds; `next_fire_after` yields no next
/// fire when `interval_ms` is zero.
Every { interval_ms: u64 },
}
impl RepeatPattern {
pub fn signature(&self) -> String {
match self {
RepeatPattern::Cron { expression, tz } => {
let tz_str = tz.as_deref().unwrap_or("UTC");
format!("cron:{expression}:{tz_str}")
}
RepeatPattern::Every { interval_ms } => format!("every:{interval_ms}"),
}
}
}
/// Caller-facing description of a repeatable job carrying a payload of type `T`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RepeatableSpec<T> {
/// Explicit key; when empty, `resolved_key` derives one from `job_name` and
/// the pattern signature.
pub key: String,
/// Job name; also the prefix of the derived key.
pub job_name: String,
/// Recurrence rule.
pub pattern: RepeatPattern,
/// Caller-defined payload.
pub payload: T,
/// NOTE(review): presumably the maximum number of fires — enforced outside this file; confirm.
pub limit: Option<u64>,
/// Lower bound in milliseconds; `next_fire_after` never computes from a time
/// earlier than this.
pub start_after_ms: Option<u64>,
/// NOTE(review): presumably an upper bound in milliseconds after which the
/// job stops firing — enforced outside this file; confirm.
pub end_before_ms: Option<u64>,
/// Missed-fires policy; omitted from the serialized form when it is `Skip`
/// and defaulted when absent on deserialization.
#[serde(default, skip_serializing_if = "is_default_missed_fires_policy")]
pub missed_fires: MissedFiresPolicy,
}
/// Serde `skip_serializing_if` predicate: true when `p` is the default policy
/// (`MissedFiresPolicy::Skip`), so the field can be left off the wire.
fn is_default_missed_fires_policy(p: &MissedFiresPolicy) -> bool {
    *p == MissedFiresPolicy::default()
}
impl<T> RepeatableSpec<T> {
    /// Creates a spec with the given job name, pattern and payload.
    ///
    /// The key starts out empty (see [`resolved_key`](Self::resolved_key)),
    /// every optional bound is unset, and the missed-fires policy is the
    /// default one.
    pub fn new(job_name: impl Into<String>, pattern: RepeatPattern, payload: T) -> Self {
        let job_name = job_name.into();
        Self {
            key: String::new(),
            job_name,
            pattern,
            payload,
            limit: None,
            start_after_ms: None,
            end_before_ms: None,
            missed_fires: MissedFiresPolicy::default(),
        }
    }

    /// Replaces the key and returns the updated spec.
    pub fn with_key(self, key: impl Into<String>) -> Self {
        Self {
            key: key.into(),
            ..self
        }
    }

    /// Sets `limit` and returns the updated spec.
    pub fn with_limit(self, limit: u64) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Sets `start_after_ms` and returns the updated spec.
    pub fn with_start_after_ms(self, ms: u64) -> Self {
        Self {
            start_after_ms: Some(ms),
            ..self
        }
    }

    /// Sets `end_before_ms` and returns the updated spec.
    pub fn with_end_before_ms(self, ms: u64) -> Self {
        Self {
            end_before_ms: Some(ms),
            ..self
        }
    }

    /// Sets the missed-fires policy and returns the updated spec.
    pub fn with_missed_fires(self, policy: MissedFiresPolicy) -> Self {
        Self {
            missed_fires: policy,
            ..self
        }
    }

    /// Returns the explicit key when one was supplied; otherwise derives
    /// `<job_name>::<pattern signature>`.
    pub fn resolved_key(&self) -> String {
        match self.key.as_str() {
            "" => format!("{}::{}", self.job_name, self.pattern.signature()),
            explicit => explicit.to_owned(),
        }
    }
}
/// Payload-free view of a repeatable job: the `RepeatableSpec` scheduling
/// fields plus the next fire time.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RepeatableMeta {
/// Key identifying the repeatable job.
pub key: String,
/// Job name.
pub job_name: String,
/// Recurrence rule.
pub pattern: RepeatPattern,
/// Next fire time.
/// NOTE(review): presumably Unix epoch milliseconds, like the other `_ms` values here — confirm.
pub next_fire_ms: u64,
/// NOTE(review): presumably the maximum number of fires — confirm.
pub limit: Option<u64>,
/// Optional lower time bound in milliseconds.
pub start_after_ms: Option<u64>,
/// Optional upper time bound in milliseconds.
pub end_before_ms: Option<u64>,
/// Missed-fires policy; omitted from the serialized form when it is `Skip`
/// and defaulted when absent on deserialization.
#[serde(default, skip_serializing_if = "is_default_missed_fires_policy")]
pub missed_fires: MissedFiresPolicy,
}
/// Crate-internal persisted form of a repeatable spec, with the payload held
/// as raw bytes.
///
/// The field order and the trailing optional fields are relied on by the
/// legacy-compatibility tests in this file; do not reorder fields.
#[derive(Serialize, Deserialize)]
pub(crate) struct StoredSpec {
/// Key identifying the repeatable job.
pub key: String,
/// Job name.
pub job_name: String,
/// Recurrence rule.
pub pattern: RepeatPattern,
/// Already-encoded payload, written as a byte string via the local
/// `serde_bytes` module rather than as a sequence of integers.
#[serde(with = "serde_bytes")]
pub payload: Vec<u8>,
/// NOTE(review): presumably the maximum number of fires — confirm.
pub limit: Option<u64>,
/// Optional lower time bound in milliseconds.
pub start_after_ms: Option<u64>,
/// Optional upper time bound in milliseconds.
pub end_before_ms: Option<u64>,
/// Defaults to 0 when absent from the stored data.
/// NOTE(review): presumably the number of fires so far — confirm.
#[serde(default)]
pub fired: u64,
/// Missed-fires policy; omitted when it is `Skip` so default-policy specs
/// encode like the older shape without this field.
#[serde(default, skip_serializing_if = "is_default_missed_fires_policy")]
pub missed_fires: MissedFiresPolicy,
}
/// Local `#[serde(with = "serde_bytes")]` adapter that writes a `Vec<u8>` as a
/// byte string and reads it back from either a byte string or a sequence of
/// `u8` values.
mod serde_bytes {
    use serde::{Deserialize, Deserializer, Serializer};

    /// Serializes `bytes` through the serializer's byte-string entry point.
    pub fn serialize<S: Serializer>(bytes: &[u8], s: S) -> Result<S::Ok, S::Error> {
        s.serialize_bytes(bytes)
    }

    /// Deserializes an owned byte vector via the helper [`ByteBuf`](serde_bytes_helper::ByteBuf).
    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
        serde_bytes_helper::ByteBuf::deserialize(d).map(|buf| buf.0)
    }

    mod serde_bytes_helper {
        use serde::de::{Deserialize, Deserializer, SeqAccess, Visitor};
        use std::fmt;

        /// Newtype whose `Deserialize` impl accepts borrowed bytes, owned
        /// bytes, or a sequence of `u8`.
        pub struct ByteBuf(pub Vec<u8>);

        /// Visitor that builds a [`ByteBuf`] from any of the accepted shapes.
        struct ByteBufVisitor;

        impl<'de> Visitor<'de> for ByteBufVisitor {
            type Value = ByteBuf;

            fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("byte array")
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> {
                Ok(ByteBuf(v.to_vec()))
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E> {
                Ok(ByteBuf(v))
            }

            fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
                // Drain the sequence one element at a time; the first
                // element error aborts the whole read.
                let mut collected: Vec<u8> = Vec::new();
                loop {
                    match seq.next_element::<u8>()? {
                        Some(byte) => collected.push(byte),
                        None => break Ok(ByteBuf(collected)),
                    }
                }
            }
        }

        impl<'de> Deserialize<'de> for ByteBuf {
            fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
                d.deserialize_byte_buf(ByteBufVisitor)
            }
        }
    }
}
/// Computes the next fire time for `pattern`, in milliseconds.
///
/// The calculation starts from the later of `now_ms` and `start_after_ms`
/// (when given).
///
/// * `Cron` patterns delegate to `next_cron_after`, passing the expression and
///   optional timezone through.
/// * `Every` patterns return the starting point plus the interval (saturating
///   at `u64::MAX`), or `Ok(None)` when the interval is zero.
///
/// # Errors
///
/// Propagates any error returned by `next_cron_after`.
pub(crate) fn next_fire_after(
    pattern: &RepeatPattern,
    now_ms: u64,
    start_after_ms: Option<u64>,
) -> Result<Option<u64>> {
    let from_ms = start_after_ms.map_or(now_ms, |floor| now_ms.max(floor));
    match pattern {
        RepeatPattern::Cron { expression, tz } => {
            next_cron_after(expression, tz.as_deref(), from_ms)
        }
        // A zero interval can never advance, so there is no next fire.
        RepeatPattern::Every { interval_ms: 0 } => Ok(None),
        RepeatPattern::Every { interval_ms } => Ok(Some(from_ms.saturating_add(*interval_ms))),
    }
}
/// Maximum number of pattern steps `first_future_fire` takes before giving up
/// with `AdvanceOutcome::CapReached`.
const FIRST_FUTURE_FIRE_ITERATION_CAP: u32 = 100_000;
/// Result of advancing a pattern in `first_future_fire`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AdvanceOutcome {
/// A fire time (milliseconds) at which the search stopped.
Future(u64),
/// The pattern produced no further fire (`next_fire_after` returned `None`).
Exhausted,
/// The iteration cap was reached before a fire time past `now_ms` was found.
CapReached,
}
/// Steps `pattern` forward from `fire_at_ms` until it yields a fire time
/// strictly after `now_ms`.
///
/// Each step asks `next_fire_after` for the fire following the current
/// cursor (honouring `start_after_ms`). The outcome is:
///
/// * `Future(t)` — the first computed fire with `t > now_ms`. A step that
///   fails to move past the cursor also stops the search and is reported as
///   `Future`.
/// * `Exhausted` — `next_fire_after` returned `None`.
/// * `CapReached` — `FIRST_FUTURE_FIRE_ITERATION_CAP` steps were taken
///   without passing `now_ms`.
///
/// NOTE(review): in the non-advancing case the returned value is not
/// later than `now_ms`; confirm callers are fine with that.
///
/// # Errors
///
/// Propagates any error from `next_fire_after`.
pub(crate) fn first_future_fire(
    pattern: &RepeatPattern,
    now_ms: u64,
    fire_at_ms: u64,
    start_after_ms: Option<u64>,
) -> Result<AdvanceOutcome> {
    let mut cursor = fire_at_ms;
    let mut steps_left = FIRST_FUTURE_FIRE_ITERATION_CAP;
    while steps_left > 0 {
        steps_left -= 1;
        let candidate = match next_fire_after(pattern, cursor, start_after_ms)? {
            Some(ms) => ms,
            None => return Ok(AdvanceOutcome::Exhausted),
        };
        // Stop either on a genuinely future fire or when the pattern stalls
        // (no forward progress), so the loop can never spin in place.
        if candidate > now_ms || candidate <= cursor {
            return Ok(AdvanceOutcome::Future(candidate));
        }
        cursor = candidate;
    }
    Ok(AdvanceOutcome::CapReached)
}
/// Timezone resolved by `parse_tz`: either a fixed UTC offset or a named zone.
#[derive(Clone, Copy)]
pub(crate) enum TzKind {
/// Fixed offset from UTC; also used for UTC itself (offset 0).
Fixed(chrono::FixedOffset),
/// Named zone parsed by `chrono_tz` (e.g. an IANA name such as "America/New_York").
Named(chrono_tz::Tz),
}
fn next_cron_after(expression: &str, tz: Option<&str>, from_ms: u64) -> Result<Option<u64>> {
use chrono::{TimeZone, Utc};
use croner::parser::{CronParser, Seconds};
let cron = CronParser::builder()
.seconds(Seconds::Optional)
.build()
.parse(expression)
.map_err(|e| Error::Config(format!("invalid cron expression {expression:?}: {e}")))?;
let from_secs = (from_ms / 1000) as i64;
let from_nsec = ((from_ms % 1000) * 1_000_000) as u32;
let utc = Utc
.timestamp_opt(from_secs, from_nsec)
.single()
.ok_or_else(|| Error::Config("invalid from-time".into()))?;
let next_ms = match parse_tz(tz)? {
TzKind::Fixed(off) => next_in_zone(&cron, utc, off)?,
TzKind::Named(zone) => next_in_zone(&cron, utc, zone)?,
};
Ok(Some(next_ms))
}
fn next_in_zone<Tz: chrono::TimeZone>(
cron: &croner::Cron,
from_utc: chrono::DateTime<chrono::Utc>,
tz: Tz,
) -> Result<u64> {
use chrono::Utc;
let from_local = from_utc.with_timezone(&tz);
let next_local = cron
.find_next_occurrence(&from_local, false)
.map_err(|e| Error::Config(format!("cron scheduling failed: {e}")))?;
Ok(next_local.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
fn parse_tz(tz: Option<&str>) -> Result<TzKind> {
use chrono::FixedOffset;
let raw = match tz {
None => return Ok(TzKind::Fixed(FixedOffset::east_opt(0).expect("UTC"))),
Some(s) => s,
};
let trimmed = raw.trim();
if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("UTC") || trimmed == "Z" {
return Ok(TzKind::Fixed(FixedOffset::east_opt(0).expect("UTC")));
}
if trimmed.starts_with('+') || trimmed.starts_with('-') {
return parse_fixed_offset(trimmed, raw).map(TzKind::Fixed);
}
trimmed
.parse::<chrono_tz::Tz>()
.map(TzKind::Named)
.map_err(|_| {
Error::Config(format!(
"unsupported timezone {raw:?}: expected UTC, +HH:MM, or any IANA timezone name (e.g. \"America/New_York\")"
))
})
}
fn parse_fixed_offset(trimmed: &str, raw: &str) -> Result<chrono::FixedOffset> {
use chrono::FixedOffset;
let (sign, rest) = match trimmed.as_bytes().first() {
Some(b'+') => (1, &trimmed[1..]),
Some(b'-') => (-1, &trimmed[1..]),
_ => unreachable!("parse_fixed_offset called without leading sign"),
};
let (hh, mm) = if let Some((h, m)) = rest.split_once(':') {
(h, m)
} else if rest.len() == 4 {
(&rest[..2], &rest[2..])
} else {
return Err(Error::Config(format!(
"malformed timezone offset {raw:?}: expected +HH:MM or +HHMM"
)));
};
let h: i32 = hh
.parse()
.map_err(|_| Error::Config(format!("malformed timezone offset {raw:?}")))?;
let m: i32 = mm
.parse()
.map_err(|_| Error::Config(format!("malformed timezone offset {raw:?}")))?;
let total = sign * (h * 3600 + m * 60);
FixedOffset::east_opt(total)
.ok_or_else(|| Error::Config(format!("timezone offset {raw:?} out of range")))
}
// Unit tests for pattern signatures, key resolution, next-fire computation
// (fixed interval and cron, including timezone handling), the spec builder,
// and the stored-spec wire format.
#[cfg(test)]
mod tests {
use super::*;
// Cron and fixed-interval patterns must never share a signature.
#[test]
fn signature_distinguishes_cron_and_every() {
let a = RepeatPattern::Cron {
expression: "* * * * *".into(),
tz: None,
};
let b = RepeatPattern::Every { interval_ms: 1000 };
assert_ne!(a.signature(), b.signature());
}
// The same expression in different timezones yields different signatures.
#[test]
fn signature_includes_tz() {
let a = RepeatPattern::Cron {
expression: "0 0 * * *".into(),
tz: None,
};
let b = RepeatPattern::Cron {
expression: "0 0 * * *".into(),
tz: Some("America/New_York".into()),
};
assert_ne!(a.signature(), b.signature());
}
// A non-empty explicit key is returned unchanged.
#[test]
fn resolved_key_uses_supplied_when_present() {
let s = RepeatableSpec::<u32> {
key: "explicit".into(),
job_name: "my-job".into(),
pattern: RepeatPattern::Every { interval_ms: 1000 },
payload: 0,
limit: None,
start_after_ms: None,
end_before_ms: None,
missed_fires: MissedFiresPolicy::Skip,
};
assert_eq!(s.resolved_key(), "explicit");
}
// An empty key falls back to "<job_name>::<signature>".
#[test]
fn resolved_key_falls_back_to_signature() {
let s = RepeatableSpec::<u32> {
key: String::new(),
job_name: "my-job".into(),
pattern: RepeatPattern::Every { interval_ms: 1000 },
payload: 0,
limit: None,
start_after_ms: None,
end_before_ms: None,
missed_fires: MissedFiresPolicy::Skip,
};
assert_eq!(s.resolved_key(), "my-job::every:1000");
}
// Fixed interval: next fire is now + interval.
#[test]
fn next_fire_after_every() {
let pat = RepeatPattern::Every { interval_ms: 500 };
let next = next_fire_after(&pat, 1000, None).expect("ok");
assert_eq!(next, Some(1500));
}
// A zero interval has no next fire.
#[test]
fn next_fire_after_every_zero_returns_none() {
let pat = RepeatPattern::Every { interval_ms: 0 };
assert_eq!(next_fire_after(&pat, 1000, None).unwrap(), None);
}
// When start_after is later than now, the interval is added to start_after.
#[test]
fn next_fire_after_every_respects_start_after() {
let pat = RepeatPattern::Every { interval_ms: 100 };
let next = next_fire_after(&pat, 1000, Some(5_000)).unwrap();
assert_eq!(next, Some(5_100));
}
// Every-minute cron from 2030-01-01T00:00:30Z fires at 00:01:00Z.
#[test]
fn next_fire_after_cron_minute_resolution() {
let pat = RepeatPattern::Cron {
expression: "* * * * *".into(),
tz: None,
};
let from_ms: u64 = 1_893_456_030_000;
let next = next_fire_after(&pat, from_ms, None)
.expect("ok")
.expect("some");
assert_eq!(next, 1_893_456_060_000_u64);
}
// An unparseable cron expression surfaces as an error.
#[test]
fn next_fire_after_cron_invalid_expression_errors() {
let pat = RepeatPattern::Cron {
expression: "not a cron".into(),
tz: None,
};
let res = next_fire_after(&pat, 0, None);
assert!(res.is_err(), "got {res:?}");
}
// Fixed offset +05:30: from 2026-01-15T00:00:00Z the next 02:00 local is
// 2026-01-15T20:30:00Z.
#[test]
fn next_fire_after_cron_fixed_offset_still_works() {
let pat = RepeatPattern::Cron {
expression: "0 2 * * *".into(),
tz: Some("+05:30".into()),
};
let from_ms: u64 = 1_768_435_200_000; let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
assert_eq!(next, 1_768_509_000_000_u64);
}
// Named zone: from 2026-01-15T00:00:00Z the next 02:00 New York time is
// 2026-01-15T07:00:00Z.
#[test]
fn next_fire_after_cron_named_iana_zone() {
let pat = RepeatPattern::Cron {
expression: "0 2 * * *".into(),
tz: Some("America/New_York".into()),
};
let from_ms: u64 = 1_768_435_200_000; let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
assert_eq!(next, 1_768_460_400_000_u64); }
// Starting at 2026-03-09T00:00:00Z; the expected 02:00 New York fire is
// 2026-03-09T06:00:00Z (a four-hour UTC offset).
#[test]
fn next_fire_after_cron_iana_zone_dst_transition() {
let pat = RepeatPattern::Cron {
expression: "0 2 * * *".into(),
tz: Some("America/New_York".into()),
};
let from_ms: u64 = 1_773_014_400_000; let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
assert_eq!(next, 1_773_036_000_000_u64); }
// The "US/Eastern" alias gives the same result as "America/New_York".
#[test]
fn next_fire_after_cron_iana_alias() {
let pat = RepeatPattern::Cron {
expression: "0 2 * * *".into(),
tz: Some("US/Eastern".into()),
};
let from_ms: u64 = 1_768_435_200_000; let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
assert_eq!(next, 1_768_460_400_000_u64); }
// An unknown zone name is an error whose message mentions the accepted forms.
#[test]
fn next_fire_after_cron_invalid_iana_zone_errors() {
let pat = RepeatPattern::Cron {
expression: "0 2 * * *".into(),
tz: Some("Mars/Olympus_Mons".into()),
};
let res = next_fire_after(&pat, 0, None);
assert!(res.is_err(), "got {res:?}");
let msg = format!("{}", res.unwrap_err());
assert!(msg.contains("UTC"), "msg={msg}");
assert!(msg.contains("IANA"), "msg={msg}");
}
// Europe/London in June: from 2026-06-15T00:00:00Z the next 09:00 local is
// 2026-06-15T08:00:00Z.
#[test]
fn next_fire_after_cron_europe_london_bst() {
let pat = RepeatPattern::Cron {
expression: "0 9 * * *".into(),
tz: Some("Europe/London".into()),
};
let from_ms: u64 = 1_781_481_600_000; let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
assert_eq!(next, 1_781_510_400_000_u64); }
// 01:30 local occurs twice in the window after 2026-11-01T04:00:00Z; the
// earlier of the two candidate instants must be chosen.
#[test]
fn next_fire_after_cron_iana_dst_overlap_picks_consistent_instant() {
let pat = RepeatPattern::Cron {
expression: "30 1 * * *".into(),
tz: Some("America/New_York".into()),
};
let from_ms: u64 = 1_793_505_600_000;
let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
let first_edt: u64 = 1_793_511_000_000; let second_est: u64 = 1_793_514_600_000;
assert!(
next == first_edt || next == second_est,
"expected {first_edt} or {second_est}, got {next}",
);
assert_eq!(next, first_edt);
}
// The result must be after the start and must not be the local time
// 2026-03-08 02:30 in New York.
#[test]
fn next_fire_after_cron_iana_dst_gap_skips_to_valid_local_time() {
use chrono::{TimeZone, Timelike};
let pat = RepeatPattern::Cron {
expression: "30 2 * * *".into(),
tz: Some("America/New_York".into()),
};
let from_ms: u64 = 1_772_949_600_000;
let next = next_fire_after(&pat, from_ms, None).unwrap().unwrap();
assert!(next > from_ms, "next={next} should be after from={from_ms}");
let zone: chrono_tz::Tz = "America/New_York".parse().unwrap();
let local = zone
.timestamp_millis_opt(next as i64)
.single()
.expect("local time exists");
let bad_date = chrono::NaiveDate::from_ymd_opt(2026, 3, 8).unwrap();
assert!(
!(local.date_naive() == bad_date
&& local.time().hour() == 2
&& local.time().minute() == 30),
"got nonexistent local time {local}",
);
}
// `new` leaves every optional field unset; each `with_*` overrides one field.
#[test]
fn builder_sets_defaults_and_overrides() {
let s: RepeatableSpec<u32> =
RepeatableSpec::new("my-job", RepeatPattern::Every { interval_ms: 1000 }, 7);
assert!(s.key.is_empty());
assert_eq!(s.job_name, "my-job");
assert_eq!(s.payload, 7);
assert!(s.limit.is_none());
assert!(s.start_after_ms.is_none());
assert!(s.end_before_ms.is_none());
assert_eq!(s.missed_fires, MissedFiresPolicy::Skip);
let s2: RepeatableSpec<u32> =
RepeatableSpec::new("another", RepeatPattern::Every { interval_ms: 250 }, 42)
.with_key("explicit-key")
.with_limit(3)
.with_start_after_ms(1_000)
.with_end_before_ms(2_000)
.with_missed_fires(MissedFiresPolicy::FireAll { max_catchup: 5 });
assert_eq!(s2.key, "explicit-key");
assert_eq!(s2.limit, Some(3));
assert_eq!(s2.start_after_ms, Some(1_000));
assert_eq!(s2.end_before_ms, Some(2_000));
assert_eq!(
s2.missed_fires,
MissedFiresPolicy::FireAll { max_catchup: 5 }
);
}
// The derived `Default` is `Skip`.
#[test]
fn missed_fires_policy_default_is_skip() {
let p: MissedFiresPolicy = Default::default();
assert_eq!(p, MissedFiresPolicy::Skip);
}
// Every policy variant survives a MessagePack encode/decode round trip.
#[test]
fn missed_fires_policy_msgpack_round_trip() {
for p in [
MissedFiresPolicy::Skip,
MissedFiresPolicy::FireOnce,
MissedFiresPolicy::FireAll { max_catchup: 7 },
] {
let bytes = rmp_serde::to_vec(&p).expect("encode");
let decoded: MissedFiresPolicy = rmp_serde::from_slice(&bytes).expect("decode");
assert_eq!(decoded, p);
}
}
// Stepping 60s at a time from 1_000_000 lands on the first value past now.
#[test]
fn first_future_fire_every_skips_to_strictly_after_now() {
let pat = RepeatPattern::Every {
interval_ms: 60_000,
};
let next = first_future_fire(&pat, 1_300_000, 1_000_000, None).unwrap();
assert_eq!(next, AdvanceOutcome::Future(1_360_000));
}
// A zero interval can never advance, so the pattern is reported exhausted.
#[test]
fn first_future_fire_every_zero_returns_exhausted() {
let pat = RepeatPattern::Every { interval_ms: 0 };
assert_eq!(
first_future_fire(&pat, 1_000, 0, None).unwrap(),
AdvanceOutcome::Exhausted,
);
}
// Walks `next_fire_after` from the anchor until it passes `now_ms`,
// collecting every visited instant (anchor included), and expects exactly 25.
#[test]
fn catchup_replays_all_fires_across_nyc_fall_back_dst() {
let pat = RepeatPattern::Cron {
expression: "0 * * * *".into(),
tz: Some("America/New_York".into()),
};
let anchor_ms: u64 = 1_793_500_800_000;
let now_ms: u64 = 1_793_590_800_000;
let mut at = anchor_ms;
let mut fires: Vec<u64> = Vec::new();
for _ in 0..1_000 {
if at > now_ms {
break;
}
fires.push(at);
match next_fire_after(&pat, at, None).expect("pattern advance") {
Some(n) if n > at => at = n,
_ => break,
}
}
assert_eq!(
fires.len(),
25,
"expected 25 missed top-of-hour fires across the NYC fall-back day, got {} ({:?})",
fires.len(),
fires,
);
assert_eq!(fires[0], anchor_ms);
assert!(*fires.last().unwrap() <= now_ms);
}
// When the anchor is already past now, the very first step is returned.
#[test]
fn first_future_fire_when_already_future_returns_first_step() {
let pat = RepeatPattern::Every {
interval_ms: 60_000,
};
let next = first_future_fire(&pat, 100, 1_000, None).unwrap();
assert_eq!(next, AdvanceOutcome::Future(61_000)); }
// Older eight-field shape of `StoredSpec` (no `missed_fires`), used to check
// wire compatibility.
#[derive(Serialize)]
struct LegacyStoredSpec {
key: String,
job_name: String,
pattern: RepeatPattern,
#[serde(with = "serde_bytes")]
payload: Vec<u8>,
limit: Option<u64>,
start_after_ms: Option<u64>,
end_before_ms: Option<u64>,
fired: u64,
}
// Bytes written in the legacy shape decode into `StoredSpec` with the default policy.
#[test]
fn legacy_storedspec_decodes_with_default_policy() {
let legacy = LegacyStoredSpec {
key: "legacy-key".into(),
job_name: "legacy-job".into(),
pattern: RepeatPattern::Every { interval_ms: 1_000 },
payload: vec![0xC0], limit: None,
start_after_ms: None,
end_before_ms: Some(9_999_999_999),
fired: 42,
};
let bytes = rmp_serde::to_vec(&legacy).expect("encode legacy");
let decoded: StoredSpec = rmp_serde::from_slice(&bytes).expect("decode into new shape");
assert_eq!(decoded.key, "legacy-key");
assert_eq!(decoded.job_name, "legacy-job");
assert_eq!(decoded.fired, 42);
assert_eq!(decoded.missed_fires, MissedFiresPolicy::Skip);
}
// A default-policy `StoredSpec` must encode byte-for-byte like the legacy shape.
#[test]
fn new_storedspec_with_default_policy_omits_field_on_wire() {
let new_default = StoredSpec {
key: "k".into(),
job_name: "j".into(),
pattern: RepeatPattern::Every { interval_ms: 1_000 },
payload: vec![0xC0],
limit: None,
start_after_ms: None,
end_before_ms: None,
fired: 0,
missed_fires: MissedFiresPolicy::Skip,
};
let legacy_eq = LegacyStoredSpec {
key: "k".into(),
job_name: "j".into(),
pattern: RepeatPattern::Every { interval_ms: 1_000 },
payload: vec![0xC0],
limit: None,
start_after_ms: None,
end_before_ms: None,
fired: 0,
};
let new_bytes = rmp_serde::to_vec(&new_default).expect("encode new");
let legacy_bytes = rmp_serde::to_vec(&legacy_eq).expect("encode legacy");
assert_eq!(
new_bytes, legacy_bytes,
"default-policy StoredSpec must encode identically to the legacy 8-field shape",
);
}
// A non-default policy is written to the wire and read back intact.
#[test]
fn new_storedspec_with_non_default_policy_round_trips() {
let s = StoredSpec {
key: "k".into(),
job_name: "j".into(),
pattern: RepeatPattern::Every { interval_ms: 1_000 },
payload: vec![0xC0],
limit: None,
start_after_ms: None,
end_before_ms: None,
fired: 0,
missed_fires: MissedFiresPolicy::FireAll { max_catchup: 12 },
};
let bytes = rmp_serde::to_vec(&s).expect("encode");
let decoded: StoredSpec = rmp_serde::from_slice(&bytes).expect("decode");
assert_eq!(
decoded.missed_fires,
MissedFiresPolicy::FireAll { max_catchup: 12 }
);
}
}