Skip to main content

bamboo_server/schedule_app/
store.rs

1use std::cmp::Reverse;
2use std::collections::HashMap;
3use std::io;
4use std::path::{Path, PathBuf};
5
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Deserializer, Serialize};
8use tokio::fs;
9use tokio::sync::{Mutex, RwLock};
10use uuid::Uuid;
11
12use super::trigger_engine::{default_trigger_engine, TriggerEngine};
13use bamboo_domain::{
14    MisFirePolicy, OverlapPolicy, ScheduleRunConfig, ScheduleRunRecord, ScheduleRunStatus,
15    ScheduleSpec, ScheduleState, ScheduleTrigger, ScheduleWindow,
16};
17
/// Build an `io::Error` of kind `Other` from any string-like message.
fn other_io_error(message: impl Into<String>) -> io::Error {
    let text: String = message.into();
    io::Error::new(io::ErrorKind::Other, text)
}
21
/// Atomically replace `path` with `bytes` using the classic
/// write-temp + fsync + rename pattern, so readers never observe a
/// partially written file.
///
/// The temp file name embeds a fresh UUID so concurrent or interrupted
/// writers never collide; leftovers are swept by `cleanup_stale_tmp_files`.
async fn atomic_write_json(path: &Path, bytes: Vec<u8>) -> io::Result<()> {
    let tmp = path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));

    // Write + fsync to ensure data is on disk before rename.
    {
        let mut file = fs::File::create(&tmp).await?;
        tokio::io::AsyncWriteExt::write_all(&mut file, &bytes).await?;
        file.sync_all().await?;
    }

    fs::rename(&tmp, path).await?;

    // fsync parent directory to persist the rename metadata.
    // Best-effort: errors are ignored — the file contents themselves are
    // already durable at this point.
    if let Some(parent) = path.parent() {
        if let Ok(dir) = fs::File::open(parent).await {
            let _ = dir.sync_all().await;
        }
    }

    Ok(())
}
43
44/// Remove leftover `.tmp.*` files from a previous interrupted atomic write.
45async fn cleanup_stale_tmp_files(dir: &Path, prefix: &str) {
46    let mut entries = match fs::read_dir(dir).await {
47        Ok(e) => e,
48        Err(_) => return,
49    };
50    while let Ok(Some(entry)) = entries.next_entry().await {
51        if let Some(name) = entry.file_name().to_str() {
52            if name.starts_with(prefix) {
53                tracing::info!("Removing stale temp file: {}", entry.path().display());
54                let _ = fs::remove_file(entry.path()).await;
55            }
56        }
57    }
58}
59
/// A schedule as persisted in `schedules.json`: its definition fields plus
/// the mutable runtime [`ScheduleState`] and per-run [`ScheduleRunConfig`].
///
/// Serialization uses the derived `Serialize`; deserialization goes through
/// the hand-written impl below via [`ScheduleEntryCompat`] so older on-disk
/// layouts (e.g. top-level `interval_seconds`) still load.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ScheduleEntry {
    pub id: String,
    pub name: String,
    pub enabled: bool,
    pub trigger: ScheduleTrigger,
    /// Optional timezone name; blank/whitespace values are normalized to
    /// `None` at load time (see `normalize_loaded_schedule_entry`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
    /// Lower bound of the firing window; due times are clamped forward to it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub start_at: Option<DateTime<Utc>>,
    /// Inclusive upper bound of the firing window; due times past it yield
    /// no dispatch (see `current_due_at`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub end_at: Option<DateTime<Utc>>,
    /// What to do about occurrences that were missed while late.
    #[serde(default)]
    pub misfire_policy: MisFirePolicy,
    /// Whether/how runs may overlap an already queued or running one.
    #[serde(default)]
    pub overlap_policy: OverlapPolicy,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Mutable runtime bookkeeping (next fire time, counters, ...).
    #[serde(default)]
    pub state: ScheduleState,
    #[serde(default)]
    pub run_config: ScheduleRunConfig,
}
83
/// Deserialization-only shape that accepts both the current and legacy
/// on-disk layouts of a schedule entry.
///
/// Legacy files stored `interval_seconds`, `last_run_at`, `next_run_at` and
/// run counters at the top level instead of inside `trigger` and `state`;
/// `ScheduleEntry::from_compat` migrates those into the modern fields.
#[derive(Debug, Clone, Deserialize)]
struct ScheduleEntryCompat {
    pub id: String,
    pub name: String,
    #[serde(default)]
    pub enabled: bool,
    /// Legacy top-level interval; overrides the trigger's value when present.
    #[serde(default)]
    pub interval_seconds: Option<u64>,
    /// Missing in very old files; reconstructed from `interval_seconds`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger: Option<ScheduleTrigger>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub start_at: Option<DateTime<Utc>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub end_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub misfire_policy: MisFirePolicy,
    #[serde(default)]
    pub overlap_policy: OverlapPolicy,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Missing in legacy files; rebuilt from the loose fields below.
    #[serde(default)]
    pub state: Option<ScheduleState>,
    /// Legacy fallback for `state.last_scheduled_at`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_run_at: Option<DateTime<Utc>>,
    /// Legacy fallback for `state.next_fire_at`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_run_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub queued_run_count: u32,
    #[serde(default)]
    pub running_run_count: u32,
    #[serde(default)]
    pub run_config: ScheduleRunConfig,
}
119
impl ScheduleEntry {
    /// Upgrade a deserialized [`ScheduleEntryCompat`] into the current
    /// entry shape, migrating legacy top-level fields into `trigger` and
    /// `state`.
    ///
    /// Errors with a plain message (surfaced as a serde error by the
    /// `Deserialize` impl) when neither a trigger nor a legacy
    /// `interval_seconds` is present.
    fn from_compat(raw: ScheduleEntryCompat) -> Result<Self, String> {
        let trigger = match raw.trigger.clone() {
            Some(ScheduleTrigger::Interval {
                every_seconds,
                anchor_at,
            }) => {
                // Legacy top-level `interval_seconds` overrides the trigger's
                // own value when both are present.
                let every_seconds = raw.interval_seconds.unwrap_or(every_seconds);
                ScheduleTrigger::Interval {
                    every_seconds,
                    // Backfill a missing anchor from legacy run timestamps,
                    // falling back to the creation time.
                    anchor_at: anchor_at
                        .or_else(|| derive_legacy_anchor_at(&raw, every_seconds))
                        .or(Some(raw.created_at)),
                }
            }
            Some(other) => other,
            None => {
                // Very old layout: no trigger object at all, only
                // `interval_seconds`.
                let every_seconds = raw.interval_seconds.ok_or_else(|| {
                    format!("schedule entry {} missing trigger definition", raw.id)
                })?;
                ScheduleTrigger::legacy_interval(
                    every_seconds,
                    derive_legacy_anchor_at(&raw, every_seconds).or(Some(raw.created_at)),
                )
            }
        };

        // Prefer the modern `state` object; otherwise rebuild one from the
        // loose legacy fields. Individual gaps are backfilled either way.
        let mut state = raw.state.unwrap_or_else(|| ScheduleState {
            next_fire_at: raw.next_run_at,
            last_scheduled_at: raw.last_run_at,
            queued_run_count: raw.queued_run_count,
            running_run_count: raw.running_run_count,
            ..Default::default()
        });
        if state.next_fire_at.is_none() {
            state.next_fire_at = raw.next_run_at;
        }
        if state.last_scheduled_at.is_none() {
            state.last_scheduled_at = raw.last_run_at;
        }

        Ok(Self {
            id: raw.id,
            name: raw.name,
            enabled: raw.enabled,
            trigger,
            timezone: raw.timezone,
            start_at: raw.start_at,
            end_at: raw.end_at,
            misfire_policy: raw.misfire_policy,
            overlap_policy: raw.overlap_policy,
            created_at: raw.created_at,
            updated_at: raw.updated_at,
            state,
            run_config: raw.run_config,
        })
    }

    /// Best-effort anchor for an interval trigger: the explicit anchor,
    /// else the last scheduled time, else one interval before the next
    /// fire, else the creation time. `None` for non-interval triggers.
    pub fn derived_anchor_at(&self) -> Option<DateTime<Utc>> {
        let every_seconds = interval_seconds_from_trigger(&self.trigger)?;
        if let ScheduleTrigger::Interval {
            anchor_at: Some(anchor_at),
            ..
        } = &self.trigger
        {
            return Some(*anchor_at);
        }
        self.state
            .last_scheduled_at
            .or_else(|| {
                self.state
                    .next_fire_at
                    .map(|next| next - Duration::seconds(every_seconds as i64))
            })
            .or(Some(self.created_at))
    }

    /// The immutable definition half of this entry, as a domain spec.
    pub fn to_schedule_spec(&self) -> ScheduleSpec {
        ScheduleSpec {
            id: self.id.clone(),
            name: self.name.clone(),
            enabled: self.enabled,
            trigger: self.trigger.clone(),
            timezone: self.timezone.clone(),
            start_at: self.start_at,
            end_at: self.end_at,
            misfire_policy: self.misfire_policy,
            overlap_policy: self.overlap_policy,
            run_config: self.run_config.clone(),
            created_at: self.created_at,
            updated_at: self.updated_at,
        }
    }

    /// A snapshot of the mutable runtime state.
    pub fn to_schedule_state(&self) -> ScheduleState {
        self.state.clone()
    }
}
218
219impl<'de> Deserialize<'de> for ScheduleEntry {
220    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
221    where
222        D: Deserializer<'de>,
223    {
224        let raw = ScheduleEntryCompat::deserialize(deserializer)?;
225        ScheduleEntry::from_compat(raw).map_err(serde::de::Error::custom)
226    }
227}
228
229fn derive_legacy_anchor_at(raw: &ScheduleEntryCompat, every_seconds: u64) -> Option<DateTime<Utc>> {
230    if let Some(ScheduleTrigger::Interval {
231        anchor_at: Some(anchor_at),
232        ..
233    }) = raw.trigger.as_ref()
234    {
235        return Some(*anchor_at);
236    }
237
238    raw.state
239        .as_ref()
240        .and_then(|state| state.last_scheduled_at)
241        .or(raw.last_run_at)
242        .or_else(|| {
243            raw.state
244                .as_ref()
245                .and_then(|state| state.next_fire_at)
246                .or(raw.next_run_at)
247                .map(|next| next - Duration::seconds(every_seconds as i64))
248        })
249}
250
251fn interval_seconds_from_trigger(trigger: &ScheduleTrigger) -> Option<u64> {
252    match trigger {
253        ScheduleTrigger::Interval { every_seconds, .. } => Some(*every_seconds),
254        _ => None,
255    }
256}
257
/// On-disk layout of `schedules.json`: all schedules and their run records,
/// plus a schema `version` used by `normalize_loaded_index` for upgrades.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SchedulesIndex {
    version: u32,
    /// Bumped on every persisted mutation (see `ScheduleStore::update_index`).
    updated_at: DateTime<Utc>,
    schedules: HashMap<String, ScheduleEntry>,
    /// Keyed by run id. Absent in older files, hence the serde default.
    #[serde(default)]
    run_records: HashMap<String, ScheduleRunRecord>,
}
266
267impl SchedulesIndex {
268    fn empty() -> Self {
269        Self {
270            version: 4,
271            updated_at: Utc::now(),
272            schedules: HashMap::new(),
273            run_records: HashMap::new(),
274        }
275    }
276}
277
/// Partial change-set for the definition half of a schedule.
///
/// `None` always means "leave this field as it is" — with this shape a
/// caller cannot reset an already-set optional field (e.g. `end_at`) back
/// to `None`; see `patch_schedule_with_definition`.
#[derive(Debug, Clone, Default)]
pub struct ScheduleDefinitionChanges {
    pub trigger: Option<ScheduleTrigger>,
    pub timezone: Option<String>,
    pub start_at: Option<DateTime<Utc>>,
    pub end_at: Option<DateTime<Utc>>,
    pub misfire_policy: Option<MisFirePolicy>,
    pub overlap_policy: Option<OverlapPolicy>,
}
287
/// Trim an optional string, mapping blank or whitespace-only values to `None`.
fn normalize_optional_string(value: Option<String>) -> Option<String> {
    match value {
        Some(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_string())
            }
        }
        None => None,
    }
}
293
294fn definition_window(definition: &ScheduleDefinitionChanges) -> ScheduleWindow {
295    ScheduleWindow {
296        start_at: definition.start_at,
297        end_at: definition.end_at,
298    }
299}
300
301fn compute_initial_next_run_at(
302    trigger: &ScheduleTrigger,
303    timezone: Option<&str>,
304    window: &ScheduleWindow,
305    now: DateTime<Utc>,
306) -> io::Result<DateTime<Utc>> {
307    let engine = default_trigger_engine();
308    let runtime_timezone = match trigger {
309        ScheduleTrigger::Interval { .. } => None,
310        _ => timezone,
311    };
312    engine
313        .next_after(trigger, runtime_timezone, now, window)
314        .map_err(|error| other_io_error(format!("failed to compute initial next run: {error}")))?
315        .ok_or_else(|| other_io_error("schedule has no next run within configured window"))
316}
317
318fn normalize_trigger_for_storage(
319    trigger: ScheduleTrigger,
320    anchor_at: DateTime<Utc>,
321) -> ScheduleTrigger {
322    match trigger {
323        ScheduleTrigger::Interval {
324            every_seconds,
325            anchor_at: existing_anchor,
326        } => ScheduleTrigger::Interval {
327            every_seconds,
328            anchor_at: existing_anchor.or(Some(anchor_at)),
329        },
330        other => other,
331    }
332}
333
334fn normalize_loaded_index(mut index: SchedulesIndex) -> (SchedulesIndex, bool) {
335    let mut changed = false;
336    if index.version < 4 {
337        index.version = 4;
338        changed = true;
339    }
340    for entry in index.schedules.values_mut() {
341        changed |= normalize_loaded_schedule_entry(entry);
342    }
343    if changed {
344        index.updated_at = Utc::now();
345    }
346    (index, changed)
347}
348
/// Repair one entry loaded from disk; returns `true` when anything changed.
///
/// Fixes applied: blank timezone -> `None`; a missing interval anchor is
/// backfilled from `derived_anchor_at`; a zero interval is coerced to one
/// second; a missing `next_fire_at` is seeded with the creation time.
fn normalize_loaded_schedule_entry(entry: &mut ScheduleEntry) -> bool {
    let mut changed = false;

    let normalized_timezone = normalize_optional_string(entry.timezone.clone());
    if entry.timezone != normalized_timezone {
        entry.timezone = normalized_timezone;
        changed = true;
    }

    // Computed before the mutable borrow of `entry.trigger` below.
    let derived_anchor_at = entry.derived_anchor_at();
    if let ScheduleTrigger::Interval {
        every_seconds,
        anchor_at,
    } = &mut entry.trigger
    {
        if anchor_at.is_none() {
            if let Some(derived_anchor_at) = derived_anchor_at {
                *anchor_at = Some(derived_anchor_at);
                changed = true;
            }
        }
        if *every_seconds == 0 {
            *every_seconds = 1;
            changed = true;
        }
    }

    if entry.state.next_fire_at.is_none() {
        entry.state.next_fire_at = Some(entry.created_at);
        changed = true;
    }

    changed
}
383
384fn current_due_at(entry: &ScheduleEntry) -> Option<DateTime<Utc>> {
385    let mut due_at = entry.state.next_fire_at?;
386    if let Some(start_at) = entry.start_at {
387        if due_at < start_at {
388            due_at = start_at;
389        }
390    }
391    if let Some(end_at) = entry.end_at {
392        if due_at > end_at {
393            return None;
394        }
395    }
396    Some(due_at)
397}
398
399fn overlap_blocks_dispatch(entry: &ScheduleEntry) -> bool {
400    match entry.overlap_policy {
401        OverlapPolicy::Allow => false,
402        OverlapPolicy::Skip => entry.state.running_run_count > 0,
403        OverlapPolicy::QueueOne => entry.state.queued_run_count > 0,
404    }
405}
406
407fn apply_overlap_dispatch_limit(entry: &ScheduleEntry, dispatch_count: u32) -> u32 {
408    match entry.overlap_policy {
409        OverlapPolicy::Allow | OverlapPolicy::Skip => dispatch_count,
410        OverlapPolicy::QueueOne => dispatch_count.min(1),
411    }
412}
413
414fn compute_misfire_dispatch_count(entry: &ScheduleEntry, now: DateTime<Utc>) -> u32 {
415    let Some(next_fire_at) = entry.state.next_fire_at else {
416        return 0;
417    };
418    let lateness_seconds = now.signed_duration_since(next_fire_at).num_seconds().max(0) as u64;
419    let interval_seconds = interval_seconds_from_trigger(&entry.trigger).unwrap_or(0);
420
421    match entry.misfire_policy {
422        MisFirePolicy::RunOnce => 1,
423        MisFirePolicy::Skip => 0,
424        MisFirePolicy::CatchUpAll => lateness_seconds
425            .checked_div(interval_seconds)
426            .map(|q| q.saturating_add(1) as u32)
427            .unwrap_or(1),
428        MisFirePolicy::CatchUpWindow {
429            max_catch_up_runs,
430            max_lateness_seconds,
431        } => {
432            if lateness_seconds > max_lateness_seconds {
433                0
434            } else {
435                lateness_seconds
436                    .checked_div(interval_seconds)
437                    .map(|q| (q.saturating_add(1) as u32).min(max_catch_up_runs.max(1)))
438                    .unwrap_or(1)
439            }
440        }
441    }
442}
443
444fn runtime_trigger(entry: &ScheduleEntry) -> ScheduleTrigger {
445    match &entry.trigger {
446        ScheduleTrigger::Interval { every_seconds, .. } => {
447            ScheduleTrigger::legacy_interval(*every_seconds, None)
448        }
449        other => other.clone(),
450    }
451}
452
453fn runtime_timezone<'a>(entry: &'a ScheduleEntry, trigger: &ScheduleTrigger) -> Option<&'a str> {
454    match trigger {
455        ScheduleTrigger::Interval { .. } => None,
456        _ => entry.timezone.as_deref(),
457    }
458}
459
460fn compute_next_run_at_with_engine(
461    entry: &ScheduleEntry,
462    now: DateTime<Utc>,
463    engine: &dyn TriggerEngine,
464) -> io::Result<Option<DateTime<Utc>>> {
465    let trigger = runtime_trigger(entry);
466    let window = ScheduleWindow {
467        start_at: entry.start_at,
468        end_at: entry.end_at,
469    };
470    engine
471        .next_after(&trigger, runtime_timezone(entry, &trigger), now, &window)
472        .map_err(|error| {
473            other_io_error(format!(
474                "failed to compute next fire for schedule {}: {}",
475                entry.id, error
476            ))
477        })
478}
479
480fn record_missed_occurrence(entry: &mut ScheduleEntry) {
481    entry.state.total_missed_count = entry.state.total_missed_count.saturating_add(1);
482}
483
484fn non_negative_duration_ms(from: DateTime<Utc>, to: DateTime<Utc>) -> u64 {
485    to.signed_duration_since(from).num_milliseconds().max(0) as u64
486}
487
488fn make_queued_run_record(
489    schedule_id: &str,
490    scheduled_for: DateTime<Utc>,
491    claimed_at: DateTime<Utc>,
492    was_catch_up: bool,
493) -> ScheduleRunRecord {
494    ScheduleRunRecord {
495        run_id: Uuid::new_v4().to_string(),
496        schedule_id: schedule_id.to_string(),
497        scheduled_for,
498        claimed_at,
499        started_at: None,
500        completed_at: None,
501        status: ScheduleRunStatus::Queued,
502        outcome_reason: None,
503        session_id: None,
504        dispatch_lag_ms: None,
505        execution_duration_ms: None,
506        was_catch_up,
507    }
508}
509
510fn make_fallback_run_record(
511    run_id: &str,
512    schedule_id: &str,
513    now: DateTime<Utc>,
514    status: ScheduleRunStatus,
515) -> ScheduleRunRecord {
516    ScheduleRunRecord {
517        run_id: run_id.to_string(),
518        schedule_id: schedule_id.to_string(),
519        scheduled_for: now,
520        claimed_at: now,
521        started_at: matches!(status, ScheduleRunStatus::Running).then_some(now),
522        completed_at: matches!(
523            status,
524            ScheduleRunStatus::Success
525                | ScheduleRunStatus::Failed
526                | ScheduleRunStatus::Skipped
527                | ScheduleRunStatus::Missed
528                | ScheduleRunStatus::Cancelled
529        )
530        .then_some(now),
531        status,
532        outcome_reason: None,
533        session_id: None,
534        dispatch_lag_ms: None,
535        execution_duration_ms: None,
536        was_catch_up: false,
537    }
538}
539
540fn scheduled_for_dispatch(
541    entry: &ScheduleEntry,
542    due_at: DateTime<Utc>,
543    dispatch_index: u32,
544) -> DateTime<Utc> {
545    match interval_seconds_from_trigger(&entry.trigger) {
546        Some(interval_seconds) if dispatch_index > 0 => {
547            due_at + Duration::seconds(interval_seconds as i64 * dispatch_index as i64)
548        }
549        _ => due_at,
550    }
551}
552
553fn update_run_record_started(
554    record: &mut ScheduleRunRecord,
555    started_at: DateTime<Utc>,
556    session_id: Option<&str>,
557) {
558    record.status = ScheduleRunStatus::Running;
559    record.started_at = Some(started_at);
560    record.dispatch_lag_ms = Some(non_negative_duration_ms(record.scheduled_for, started_at));
561    if let Some(session_id) = session_id {
562        record.session_id = Some(session_id.to_string());
563    }
564}
565
566fn update_run_record_terminal(
567    record: &mut ScheduleRunRecord,
568    status: ScheduleRunStatus,
569    completed_at: DateTime<Utc>,
570    outcome_reason: Option<String>,
571    session_id: Option<&str>,
572) {
573    record.status = status;
574    record.completed_at = Some(completed_at);
575    if record.dispatch_lag_ms.is_none() {
576        record.dispatch_lag_ms = Some(non_negative_duration_ms(record.scheduled_for, completed_at));
577    }
578    if let Some(started_at) = record.started_at {
579        record.execution_duration_ms = Some(non_negative_duration_ms(started_at, completed_at));
580    }
581    if let Some(outcome_reason) = normalize_optional_string(outcome_reason) {
582        record.outcome_reason = Some(outcome_reason);
583    }
584    if let Some(session_id) = session_id {
585        record.session_id = Some(session_id.to_string());
586    }
587}
588
/// Fold a terminal run status into the schedule's lifecycle counters.
///
/// Always decrements the running counter and stamps `last_finished_at`.
/// `Cancelled` is accounted as a failure; `Skipped` adjusts no counters
/// beyond the common bookkeeping. Passing `Queued`/`Running`/`Missed` is a
/// caller bug and yields an error.
fn apply_terminal_run_status(
    entry: &mut ScheduleEntry,
    status: ScheduleRunStatus,
    finished_at: DateTime<Utc>,
) -> io::Result<()> {
    // Common bookkeeping for every terminal transition.
    entry.state.running_run_count = entry.state.running_run_count.saturating_sub(1);
    entry.state.last_finished_at = Some(finished_at);

    match status {
        ScheduleRunStatus::Success => {
            entry.state.last_success_at = Some(finished_at);
            entry.state.total_run_count = entry.state.total_run_count.saturating_add(1);
            entry.state.total_success_count = entry.state.total_success_count.saturating_add(1);
            entry.state.consecutive_failures = 0;
            Ok(())
        }
        ScheduleRunStatus::Failed | ScheduleRunStatus::Cancelled => {
            entry.state.last_failure_at = Some(finished_at);
            entry.state.total_run_count = entry.state.total_run_count.saturating_add(1);
            entry.state.total_failure_count = entry.state.total_failure_count.saturating_add(1);
            entry.state.consecutive_failures = entry.state.consecutive_failures.saturating_add(1);
            Ok(())
        }
        ScheduleRunStatus::Skipped => Ok(()),
        ScheduleRunStatus::Missed | ScheduleRunStatus::Queued | ScheduleRunStatus::Running => {
            Err(other_io_error(format!(
                "non-terminal or unsupported run status for lifecycle accounting: {:?}",
                status
            )))
        }
    }
}
621
/// Everything the executor needs to run one claimed schedule occurrence,
/// detached from the store's locks.
#[derive(Debug, Clone)]
pub struct ClaimedScheduleRun {
    pub run_id: String,
    pub schedule_id: String,
    pub schedule_name: String,
    pub run_config: ScheduleRunConfig,
    /// Nominal occurrence time; lies in the past for catch-up runs.
    pub scheduled_for: DateTime<Utc>,
    pub claimed_at: DateTime<Utc>,
    /// True when this run was produced by a catch-up misfire policy.
    pub was_catch_up: bool,
}
632
/// JSON-file-backed store for schedules and their run records.
///
/// Reads go through the `RwLock`; mutations are persisted with an atomic
/// write before the write guard is released (see `update_index`).
#[derive(Debug)]
pub struct ScheduleStore {
    index_path: PathBuf,
    index: RwLock<SchedulesIndex>,
    // Serializes writers. NOTE(review): `update_index` already holds the
    // `RwLock` write guard across the disk write, so this extra mutex looks
    // redundant unless another code path takes it independently — confirm.
    write_lock: Mutex<()>,
}
639
640impl ScheduleStore {
    /// Open (or initialize) the store at `<bamboo_home_dir>/schedules.json`.
    ///
    /// Recovery behavior:
    /// - a parseable file is normalized/upgraded and re-written if that
    ///   changed anything;
    /// - an unparseable file (e.g. torn by a crash) is backed up under a
    ///   `.json.corrupted.<uuid>` name and replaced with an empty index so
    ///   the app can still boot;
    /// - a missing file is created as an empty index.
    ///
    /// Finally, stale `schedules.json.tmp.*` leftovers from interrupted
    /// atomic writes are swept; fresh temp names are UUID-suffixed and
    /// already renamed away by this point, so the sweep cannot race them.
    pub async fn new(bamboo_home_dir: PathBuf) -> io::Result<Self> {
        let index_path = bamboo_home_dir.join("schedules.json");

        let (index, needs_backfill_write) = if index_path.exists() {
            let raw = fs::read_to_string(&index_path).await?;
            match serde_json::from_str::<SchedulesIndex>(&raw) {
                Ok(parsed) => normalize_loaded_index(parsed),
                Err(e) => {
                    // Corrupted file (e.g. partial write before crash).
                    // Back up the broken file and start fresh so the app can boot.
                    let backup_path =
                        index_path.with_extension(format!("json.corrupted.{}", Uuid::new_v4()));
                    tracing::error!(
                        "schedules.json is corrupted ({}). Backing up to {} and resetting.",
                        e,
                        backup_path.display()
                    );
                    if let Err(rename_err) = fs::rename(&index_path, &backup_path).await {
                        tracing::warn!(
                            "Failed to back up corrupted schedules.json: {}",
                            rename_err
                        );
                    }
                    let fresh = SchedulesIndex::empty();
                    atomic_write_json(
                        &index_path,
                        serde_json::to_vec_pretty(&fresh)
                            .map_err(|e| other_io_error(e.to_string()))?,
                    )
                    .await?;
                    (fresh, false)
                }
            }
        } else {
            let index = SchedulesIndex::empty();
            atomic_write_json(
                &index_path,
                serde_json::to_vec_pretty(&index).map_err(|e| other_io_error(e.to_string()))?,
            )
            .await?;
            (index, false)
        };

        // Persist any normalization/upgrade performed at load time.
        if needs_backfill_write {
            atomic_write_json(
                &index_path,
                serde_json::to_vec_pretty(&index).map_err(|e| other_io_error(e.to_string()))?,
            )
            .await?;
        }

        // Clean up stale temp files left behind by interrupted atomic writes.
        cleanup_stale_tmp_files(&bamboo_home_dir, "schedules.json.tmp.").await;

        Ok(Self {
            index_path,
            index: RwLock::new(index),
            write_lock: Mutex::new(()),
        })
    }
701
702    pub fn index_path(&self) -> &Path {
703        &self.index_path
704    }
705
    /// Run `f` against the in-memory index, then persist the whole index
    /// atomically to `index_path`.
    ///
    /// `write_lock` serializes writers; the `RwLock` write guard is held
    /// across the disk write so readers never observe un-persisted state.
    ///
    /// NOTE(review): if `f` mutates the index and then returns `Err`, or if
    /// the disk write itself fails, memory keeps the mutation while the
    /// file does not — closures should mutate only on their success path.
    async fn update_index<F, T>(&self, f: F) -> io::Result<T>
    where
        F: FnOnce(&mut SchedulesIndex) -> io::Result<T>,
    {
        let _guard = self.write_lock.lock().await;
        let mut index = self.index.write().await;
        let out = f(&mut index)?;
        index.updated_at = Utc::now();
        atomic_write_json(
            &self.index_path,
            serde_json::to_vec_pretty(&*index).map_err(|e| other_io_error(e.to_string()))?,
        )
        .await?;
        Ok(out)
    }
721
722    pub async fn list_schedules(&self) -> Vec<ScheduleEntry> {
723        let index = self.index.read().await;
724        let mut items: Vec<_> = index.schedules.values().cloned().collect();
725        items.sort_by_key(|e| Reverse(e.updated_at));
726        items
727    }
728
729    pub async fn get_schedule(&self, id: &str) -> Option<ScheduleEntry> {
730        let index = self.index.read().await;
731        index.schedules.get(id).cloned()
732    }
733
734    pub async fn get_run_record(&self, run_id: &str) -> Option<ScheduleRunRecord> {
735        let index = self.index.read().await;
736        index.run_records.get(run_id).cloned()
737    }
738
739    pub async fn list_run_records_for_schedule(&self, schedule_id: &str) -> Vec<ScheduleRunRecord> {
740        let index = self.index.read().await;
741        let mut items = index
742            .run_records
743            .values()
744            .filter(|record| record.schedule_id == schedule_id)
745            .cloned()
746            .collect::<Vec<_>>();
747        items.sort_by_key(|r| Reverse(r.claimed_at));
748        items
749    }
750
751    pub async fn create_schedule(
752        &self,
753        name: String,
754        trigger: ScheduleTrigger,
755        enabled: bool,
756        run_config: ScheduleRunConfig,
757    ) -> io::Result<ScheduleEntry> {
758        self.create_schedule_with_definition(
759            name,
760            enabled,
761            run_config,
762            ScheduleDefinitionChanges {
763                trigger: Some(trigger),
764                ..Default::default()
765            },
766        )
767        .await
768    }
769
    /// Create and persist a schedule from a full definition change-set.
    ///
    /// A trigger is required; interval triggers receive `now` as their
    /// anchor when none was supplied. The first `next_fire_at` is computed
    /// eagerly, so creation fails if the trigger never fires inside the
    /// configured start/end window.
    pub async fn create_schedule_with_definition(
        &self,
        name: String,
        enabled: bool,
        run_config: ScheduleRunConfig,
        definition: ScheduleDefinitionChanges,
    ) -> io::Result<ScheduleEntry> {
        let now = Utc::now();
        let id = Uuid::new_v4().to_string();
        let window = definition_window(&definition);
        let trigger = normalize_trigger_for_storage(
            definition
                .trigger
                .ok_or_else(|| other_io_error("schedule trigger is required"))?,
            now,
        );
        let timezone = normalize_optional_string(definition.timezone);
        let next_fire_at =
            compute_initial_next_run_at(&trigger, timezone.as_deref(), &window, now)?;
        let entry = ScheduleEntry {
            id: id.clone(),
            name,
            enabled,
            trigger,
            timezone,
            start_at: definition.start_at,
            end_at: definition.end_at,
            misfire_policy: definition.misfire_policy.unwrap_or_default(),
            overlap_policy: definition.overlap_policy.unwrap_or_default(),
            created_at: now,
            updated_at: now,
            state: ScheduleState {
                next_fire_at: Some(next_fire_at),
                ..Default::default()
            },
            run_config,
        };

        self.update_index(|index| {
            index.schedules.insert(id.clone(), entry.clone());
            Ok(entry.clone())
        })
        .await
    }
814
815    pub async fn patch_schedule(
816        &self,
817        id: &str,
818        name: Option<String>,
819        enabled: Option<bool>,
820        trigger: Option<ScheduleTrigger>,
821        run_config: Option<ScheduleRunConfig>,
822    ) -> io::Result<Option<ScheduleEntry>> {
823        self.patch_schedule_with_definition(
824            id,
825            name,
826            enabled,
827            run_config,
828            ScheduleDefinitionChanges {
829                trigger,
830                ..Default::default()
831            },
832        )
833        .await
834    }
835
836    pub async fn patch_schedule_with_definition(
837        &self,
838        id: &str,
839        name: Option<String>,
840        enabled: Option<bool>,
841        run_config: Option<ScheduleRunConfig>,
842        definition: ScheduleDefinitionChanges,
843    ) -> io::Result<Option<ScheduleEntry>> {
844        self.update_index(|index| {
845            let Some(existing) = index.schedules.get_mut(id) else {
846                return Ok(None);
847            };
848            let now = Utc::now();
849            if let Some(name) = name {
850                existing.name = name;
851            }
852            if let Some(enabled) = enabled {
853                existing.enabled = enabled;
854            }
855
856            let trigger = normalize_trigger_for_storage(
857                definition
858                    .trigger
859                    .clone()
860                    .unwrap_or_else(|| existing.trigger.clone()),
861                existing.derived_anchor_at().unwrap_or(now),
862            );
863            existing.trigger = trigger.clone();
864
865            if let Some(timezone) = definition.timezone {
866                existing.timezone = normalize_optional_string(Some(timezone));
867            }
868            if let Some(start_at) = definition.start_at {
869                existing.start_at = Some(start_at);
870            }
871            if let Some(end_at) = definition.end_at {
872                existing.end_at = Some(end_at);
873            }
874            if let Some(misfire_policy) = definition.misfire_policy {
875                existing.misfire_policy = misfire_policy;
876            }
877            if let Some(overlap_policy) = definition.overlap_policy {
878                existing.overlap_policy = overlap_policy;
879            }
880            if let Some(run_config) = run_config {
881                existing.run_config = run_config;
882            }
883
884            let window = ScheduleWindow {
885                start_at: existing.start_at,
886                end_at: existing.end_at,
887            };
888            existing.state.next_fire_at = Some(compute_initial_next_run_at(
889                &trigger,
890                existing.timezone.as_deref(),
891                &window,
892                now,
893            )?);
894            existing.updated_at = now;
895            Ok(Some(existing.clone()))
896        })
897        .await
898    }
899
900    pub async fn delete_schedule(&self, id: &str) -> io::Result<bool> {
901        self.update_index(|index| {
902            let deleted = index.schedules.remove(id).is_some();
903            if deleted {
904                index
905                    .run_records
906                    .retain(|_, record| record.schedule_id != id);
907            }
908            Ok(deleted)
909        })
910        .await
911    }
912
913    /// Claim all due schedules and advance their `next_run_at`.
914    ///
915    /// Returns a list of run descriptors to execute out-of-band.
916    ///
917    /// **Important**: only writes to disk when at least one schedule is actually
918    /// due.  The ticker calls this every few seconds, so avoiding unnecessary
919    /// writes is critical for disk health and crash-safety.
920    pub async fn claim_due_runs(&self, now: DateTime<Utc>) -> io::Result<Vec<ClaimedScheduleRun>> {
921        let engine = default_trigger_engine();
922        self.claim_due_runs_with_engine(now, engine.as_ref()).await
923    }
924
    /// Engine-parameterized variant of `claim_due_runs`: claims every due,
    /// enabled schedule, applies its misfire and overlap policies, advances
    /// `next_fire_at`, and returns the run descriptors to dispatch.
    pub async fn claim_due_runs_with_engine(
        &self,
        now: DateTime<Utc>,
        engine: &dyn TriggerEngine,
    ) -> io::Result<Vec<ClaimedScheduleRun>> {
        // Fast path: under the read lock only, bail out without any disk write
        // when nothing is due — this method runs on every ticker tick.
        {
            let index = self.index.read().await;
            let any_due = index.schedules.values().any(|entry| {
                entry.enabled && current_due_at(entry).is_some_and(|due_at| due_at <= now)
            });
            if !any_due {
                return Ok(Vec::new());
            }
        }

        self.update_index(|index| {
            let mut out = Vec::new();
            // Split-borrow the index so schedules can be mutated while run
            // records are inserted within the same pass.
            let (schedules, run_records) = (&mut index.schedules, &mut index.run_records);
            for entry in schedules.values_mut() {
                if !entry.enabled {
                    continue;
                }
                let Some(due_at) = current_due_at(entry) else {
                    continue;
                };
                if due_at > now {
                    continue;
                }

                // Misfire policy decides how many occurrences to dispatch for
                // this due moment; zero means "skip": record the miss and
                // advance the cadence without emitting any run.
                let dispatch_count = compute_misfire_dispatch_count(entry, now);
                if dispatch_count == 0 {
                    entry.state.last_scheduled_at = Some(now);
                    record_missed_occurrence(entry);
                    match compute_next_run_at_with_engine(entry, now, engine) {
                        Ok(Some(next_fire_at)) => entry.state.next_fire_at = Some(next_fire_at),
                        Ok(None) => {
                            // No further occurrences exist: disable the schedule.
                            entry.state.next_fire_at = None;
                            entry.enabled = false;
                        }
                        Err(error) => {
                            tracing::warn!(
                                "failed to compute next scheduled fire for {} after misfire skip: {}. falling back to legacy interval semantics",
                                entry.id,
                                error
                            );
                            // Fall back to a fixed interval so the schedule keeps ticking.
                            let fallback = interval_seconds_from_trigger(&entry.trigger).unwrap_or(60);
                            entry.state.next_fire_at = Some(now + Duration::seconds(fallback as i64));
                        }
                    }
                    entry.updated_at = now;
                    continue;
                }

                // Overlap policy may suppress or defer dispatch while other
                // runs of the same schedule are still queued or running.
                let blocked = overlap_blocks_dispatch(entry);
                match entry.overlap_policy {
                    OverlapPolicy::Skip if blocked => {
                        // Drop this occurrence entirely, but still advance the
                        // cadence so the schedule keeps moving forward.
                        entry.state.last_scheduled_at = Some(now);
                        record_missed_occurrence(entry);
                        match compute_next_run_at_with_engine(entry, now, engine) {
                            Ok(Some(next_fire_at)) => entry.state.next_fire_at = Some(next_fire_at),
                            Ok(None) => {
                                entry.state.next_fire_at = None;
                                entry.enabled = false;
                            }
                            Err(error) => {
                                tracing::warn!(
                                    "failed to compute next scheduled fire for {} after overlap skip: {}. falling back to legacy interval semantics",
                                    entry.id,
                                    error
                                );
                                let fallback = interval_seconds_from_trigger(&entry.trigger).unwrap_or(60);
                                entry.state.next_fire_at = Some(now + Duration::seconds(fallback as i64));
                            }
                        }
                        entry.updated_at = now;
                        continue;
                    }
                    OverlapPolicy::QueueOne if blocked => {
                        // Leave all state untouched: the occurrence stays due
                        // and can be claimed on a later tick once unblocked.
                        continue;
                    }
                    _ => {}
                }

                // Dispatch is allowed: cap the catch-up count per overlap
                // rules, then advance the cadence.
                let dispatch_count = apply_overlap_dispatch_limit(entry, dispatch_count);
                entry.state.last_scheduled_at = Some(now);
                match compute_next_run_at_with_engine(entry, now, engine) {
                    Ok(Some(next_fire_at)) => {
                        entry.state.next_fire_at = Some(next_fire_at);
                    }
                    Ok(None) => {
                        entry.state.next_fire_at = None;
                        entry.enabled = false;
                    }
                    Err(error) => {
                        tracing::warn!(
                            "failed to compute next scheduled fire for {}: {}. falling back to legacy interval semantics",
                            entry.id,
                            error
                        );
                        let fallback = interval_seconds_from_trigger(&entry.trigger).unwrap_or(60);
                        entry.state.next_fire_at = Some(now + Duration::seconds(fallback as i64));
                    }
                }
                entry.state.queued_run_count = entry.state.queued_run_count.saturating_add(dispatch_count);
                entry.updated_at = now;
                // Emit one persisted queued record plus one claim descriptor
                // per dispatched occurrence.
                for dispatch_index in 0..dispatch_count {
                    let scheduled_for = scheduled_for_dispatch(entry, due_at, dispatch_index);
                    // Occurrences scheduled before `now` are late catch-ups.
                    let was_catch_up = scheduled_for < now;
                    let record = make_queued_run_record(&entry.id, scheduled_for, now, was_catch_up);
                    let run_id = record.run_id.clone();
                    run_records.insert(run_id.clone(), record);
                    out.push(ClaimedScheduleRun {
                        run_id,
                        schedule_id: entry.id.clone(),
                        schedule_name: entry.name.clone(),
                        run_config: entry.run_config.clone(),
                        scheduled_for,
                        claimed_at: now,
                        was_catch_up,
                    });
                }
            }
            Ok(out)
        })
        .await
    }
1051
1052    pub async fn mark_run_started(&self, schedule_id: &str, run_id: &str) -> io::Result<()> {
1053        self.update_index(|index| {
1054            let now = Utc::now();
1055            if let Some(entry) = index.schedules.get_mut(schedule_id) {
1056                entry.state.queued_run_count = entry.state.queued_run_count.saturating_sub(1);
1057                entry.state.running_run_count = entry.state.running_run_count.saturating_add(1);
1058                entry.state.last_started_at = Some(now);
1059                entry.updated_at = now;
1060            }
1061            let record = index
1062                .run_records
1063                .entry(run_id.to_string())
1064                .or_insert_with(|| {
1065                    make_fallback_run_record(run_id, schedule_id, now, ScheduleRunStatus::Queued)
1066                });
1067            update_run_record_started(record, now, None);
1068            Ok(())
1069        })
1070        .await
1071    }
1072
1073    pub async fn bind_run_session(
1074        &self,
1075        schedule_id: &str,
1076        run_id: &str,
1077        session_id: &str,
1078    ) -> io::Result<()> {
1079        self.update_index(|index| {
1080            let now = Utc::now();
1081            let record = index
1082                .run_records
1083                .entry(run_id.to_string())
1084                .or_insert_with(|| {
1085                    make_fallback_run_record(run_id, schedule_id, now, ScheduleRunStatus::Running)
1086                });
1087            record.session_id = Some(session_id.to_string());
1088            Ok(())
1089        })
1090        .await
1091    }
1092
1093    pub async fn mark_run_terminal(
1094        &self,
1095        schedule_id: &str,
1096        run_id: &str,
1097        status: ScheduleRunStatus,
1098        outcome_reason: Option<String>,
1099    ) -> io::Result<()> {
1100        self.update_index(|index| {
1101            let now = Utc::now();
1102            if let Some(entry) = index.schedules.get_mut(schedule_id) {
1103                apply_terminal_run_status(entry, status, now)?;
1104                entry.updated_at = now;
1105            }
1106            let record = index
1107                .run_records
1108                .entry(run_id.to_string())
1109                .or_insert_with(|| make_fallback_run_record(run_id, schedule_id, now, status));
1110            update_run_record_terminal(record, status, now, outcome_reason, None);
1111            Ok(())
1112        })
1113        .await
1114    }
1115
1116    pub async fn mark_run_dequeued_without_start(
1117        &self,
1118        schedule_id: &str,
1119        run_id: &str,
1120        outcome_reason: Option<String>,
1121    ) -> io::Result<()> {
1122        self.update_index(|index| {
1123            let now = Utc::now();
1124            if let Some(entry) = index.schedules.get_mut(schedule_id) {
1125                entry.state.queued_run_count = entry.state.queued_run_count.saturating_sub(1);
1126                record_missed_occurrence(entry);
1127                entry.updated_at = now;
1128            }
1129            let record = index
1130                .run_records
1131                .entry(run_id.to_string())
1132                .or_insert_with(|| {
1133                    make_fallback_run_record(run_id, schedule_id, now, ScheduleRunStatus::Queued)
1134                });
1135            update_run_record_terminal(
1136                record,
1137                ScheduleRunStatus::Missed,
1138                now,
1139                outcome_reason,
1140                None,
1141            );
1142            Ok(())
1143        })
1144        .await
1145    }
1146
1147    /// Create a run descriptor immediately (does not change the schedule cadence).
1148    pub async fn create_run_now(&self, id: &str) -> io::Result<Option<ClaimedScheduleRun>> {
1149        self.update_index(|index| {
1150            let Some(entry) = index.schedules.get(id).cloned() else {
1151                return Ok(None);
1152            };
1153            let now = Utc::now();
1154            let record = make_queued_run_record(&entry.id, now, now, false);
1155            let run_id = record.run_id.clone();
1156            index.run_records.insert(run_id.clone(), record);
1157            Ok(Some(ClaimedScheduleRun {
1158                run_id,
1159                schedule_id: entry.id,
1160                schedule_name: entry.name,
1161                run_config: entry.run_config,
1162                scheduled_for: now,
1163                claimed_at: now,
1164                was_catch_up: false,
1165            }))
1166        })
1167        .await
1168    }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173    use super::*;
1174    use tempfile::tempdir;
1175
1176    #[tokio::test]
1177    async fn store_backfills_legacy_interval_trigger_on_load() {
1178        let dir = tempdir().unwrap();
1179        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
1180            .unwrap()
1181            .with_timezone(&Utc);
1182        let next_run_at = DateTime::parse_from_rfc3339("2026-04-04T11:00:00Z")
1183            .unwrap()
1184            .with_timezone(&Utc);
1185
1186        let raw = serde_json::json!({
1187            "version": 1,
1188            "updated_at": now,
1189            "schedules": {
1190                "legacy-1": {
1191                    "id": "legacy-1",
1192                    "name": "legacy",
1193                    "enabled": true,
1194                    "interval_seconds": 3600,
1195                    "created_at": now,
1196                    "updated_at": now,
1197                    "last_run_at": null,
1198                    "next_run_at": next_run_at,
1199                    "run_config": { "auto_execute": false }
1200                }
1201            }
1202        });
1203        tokio::fs::write(
1204            dir.path().join("schedules.json"),
1205            serde_json::to_vec_pretty(&raw).unwrap(),
1206        )
1207        .await
1208        .unwrap();
1209
1210        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1211        let schedule = store.get_schedule("legacy-1").await.unwrap();
1212        assert!(matches!(
1213            schedule.trigger,
1214            ScheduleTrigger::Interval {
1215                every_seconds: 3600,
1216                ..
1217            }
1218        ));
1219    }
1220
1221    #[tokio::test]
1222    async fn create_schedule_with_definition_persists_interval_trigger_metadata() {
1223        let dir = tempdir().unwrap();
1224        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1225
1226        let created = store
1227            .create_schedule_with_definition(
1228                "interval".to_string(),
1229                true,
1230                ScheduleRunConfig::default(),
1231                ScheduleDefinitionChanges {
1232                    trigger: Some(ScheduleTrigger::Interval {
1233                        every_seconds: 300,
1234                        anchor_at: None,
1235                    }),
1236                    timezone: Some("Asia/Shanghai".to_string()),
1237                    misfire_policy: Some(MisFirePolicy::RunOnce),
1238                    overlap_policy: Some(OverlapPolicy::QueueOne),
1239                    ..Default::default()
1240                },
1241            )
1242            .await
1243            .unwrap();
1244
1245        assert!(matches!(
1246            created.trigger,
1247            ScheduleTrigger::Interval {
1248                every_seconds: 300,
1249                ..
1250            }
1251        ));
1252        assert_eq!(created.timezone.as_deref(), Some("Asia/Shanghai"));
1253        assert_eq!(created.misfire_policy, MisFirePolicy::RunOnce);
1254        assert_eq!(created.overlap_policy, OverlapPolicy::QueueOne);
1255    }
1256
1257    #[tokio::test]
1258    async fn patch_schedule_with_definition_updates_interval_trigger_metadata() {
1259        let dir = tempdir().unwrap();
1260        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1261
1262        let created = store
1263            .create_schedule(
1264                "interval".to_string(),
1265                ScheduleTrigger::Interval {
1266                    every_seconds: 300,
1267                    anchor_at: None,
1268                },
1269                true,
1270                ScheduleRunConfig::default(),
1271            )
1272            .await
1273            .unwrap();
1274
1275        let patched = store
1276            .patch_schedule_with_definition(
1277                &created.id,
1278                None,
1279                None,
1280                None,
1281                ScheduleDefinitionChanges {
1282                    trigger: Some(ScheduleTrigger::Interval {
1283                        every_seconds: 600,
1284                        anchor_at: None,
1285                    }),
1286                    timezone: Some("UTC".to_string()),
1287                    misfire_policy: Some(MisFirePolicy::CatchUpAll),
1288                    overlap_policy: Some(OverlapPolicy::Skip),
1289                    ..Default::default()
1290                },
1291            )
1292            .await
1293            .unwrap()
1294            .unwrap();
1295
1296        assert!(matches!(
1297            patched.trigger,
1298            ScheduleTrigger::Interval {
1299                every_seconds: 600,
1300                ..
1301            }
1302        ));
1303        assert_eq!(patched.timezone.as_deref(), Some("UTC"));
1304        assert_eq!(patched.misfire_policy, MisFirePolicy::CatchUpAll);
1305        assert_eq!(patched.overlap_policy, OverlapPolicy::Skip);
1306    }
1307
1308    #[tokio::test]
1309    async fn claim_due_runs_with_engine_uses_runtime_adapter_for_interval_trigger() {
1310        let dir = tempdir().unwrap();
1311        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1312
1313        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
1314            .unwrap()
1315            .with_timezone(&Utc);
1316
1317        let created = store
1318            .create_schedule_with_definition(
1319                "interval".to_string(),
1320                true,
1321                ScheduleRunConfig::default(),
1322                ScheduleDefinitionChanges {
1323                    trigger: Some(ScheduleTrigger::Interval {
1324                        every_seconds: 300,
1325                        anchor_at: Some(now - Duration::seconds(300)),
1326                    }),
1327                    ..Default::default()
1328                },
1329            )
1330            .await
1331            .unwrap();
1332
1333        store
1334            .update_index(|index| {
1335                let entry = index.schedules.get_mut(&created.id).unwrap();
1336                entry.state.next_fire_at = Some(now);
1337                Ok(())
1338            })
1339            .await
1340            .unwrap();
1341
1342        let engine = default_trigger_engine();
1343        let claimed = store
1344            .claim_due_runs_with_engine(now, engine.as_ref())
1345            .await
1346            .unwrap();
1347        assert_eq!(claimed.len(), 1);
1348
1349        let updated = store.get_schedule(&created.id).await.unwrap();
1350        assert_eq!(updated.state.last_scheduled_at, Some(now));
1351        assert_eq!(
1352            updated.state.next_fire_at,
1353            Some(now + Duration::seconds(300))
1354        );
1355    }
1356
1357    #[tokio::test]
1358    async fn create_schedule_with_definition_initializes_monthly_next_run() {
1359        let dir = tempdir().unwrap();
1360        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1361
1362        let created = store
1363            .create_schedule_with_definition(
1364                "monthly".to_string(),
1365                true,
1366                ScheduleRunConfig::default(),
1367                ScheduleDefinitionChanges {
1368                    trigger: Some(ScheduleTrigger::Monthly {
1369                        days: vec![1, 15],
1370                        hour: 9,
1371                        minute: 0,
1372                        second: 0,
1373                    }),
1374                    timezone: Some("UTC".to_string()),
1375                    ..Default::default()
1376                },
1377            )
1378            .await
1379            .unwrap();
1380
1381        assert!(matches!(
1382            created.trigger,
1383            ScheduleTrigger::Monthly {
1384                days,
1385                hour: 9,
1386                minute: 0,
1387                second: 0
1388            } if days == vec![1, 15]
1389        ));
1390        assert!(created
1391            .state
1392            .next_fire_at
1393            .is_some_and(|next| next > created.created_at));
1394    }
1395
1396    #[tokio::test]
1397    async fn misfire_skip_does_not_dispatch_run() {
1398        let dir = tempdir().unwrap();
1399        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1400        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
1401            .unwrap()
1402            .with_timezone(&Utc);
1403
1404        let created = store
1405            .create_schedule_with_definition(
1406                "skip".to_string(),
1407                true,
1408                ScheduleRunConfig::default(),
1409                ScheduleDefinitionChanges {
1410                    trigger: Some(ScheduleTrigger::Interval {
1411                        every_seconds: 300,
1412                        anchor_at: None,
1413                    }),
1414                    misfire_policy: Some(MisFirePolicy::Skip),
1415                    ..Default::default()
1416                },
1417            )
1418            .await
1419            .unwrap();
1420
1421        store
1422            .update_index(|index| {
1423                let entry = index.schedules.get_mut(&created.id).unwrap();
1424                entry.state.next_fire_at = Some(now - Duration::seconds(900));
1425                Ok(())
1426            })
1427            .await
1428            .unwrap();
1429
1430        let engine = default_trigger_engine();
1431        let claimed = store
1432            .claim_due_runs_with_engine(now, engine.as_ref())
1433            .await
1434            .unwrap();
1435        assert!(claimed.is_empty());
1436
1437        let updated = store.get_schedule(&created.id).await.unwrap();
1438        assert_eq!(updated.state.last_scheduled_at, Some(now));
1439        assert!(updated.state.next_fire_at.is_some_and(|next| next > now));
1440        assert_eq!(updated.state.queued_run_count, 0);
1441    }
1442
1443    #[tokio::test]
1444    async fn overlap_skip_does_not_dispatch_when_running() {
1445        let dir = tempdir().unwrap();
1446        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1447        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
1448            .unwrap()
1449            .with_timezone(&Utc);
1450
1451        let created = store
1452            .create_schedule_with_definition(
1453                "skip-overlap".to_string(),
1454                true,
1455                ScheduleRunConfig::default(),
1456                ScheduleDefinitionChanges {
1457                    trigger: Some(ScheduleTrigger::Interval {
1458                        every_seconds: 300,
1459                        anchor_at: None,
1460                    }),
1461                    overlap_policy: Some(OverlapPolicy::Skip),
1462                    ..Default::default()
1463                },
1464            )
1465            .await
1466            .unwrap();
1467
1468        store
1469            .update_index(|index| {
1470                let entry = index.schedules.get_mut(&created.id).unwrap();
1471                entry.state.next_fire_at = Some(now);
1472                entry.state.running_run_count = 1;
1473                Ok(())
1474            })
1475            .await
1476            .unwrap();
1477
1478        let engine = default_trigger_engine();
1479        let claimed = store
1480            .claim_due_runs_with_engine(now, engine.as_ref())
1481            .await
1482            .unwrap();
1483        assert!(claimed.is_empty());
1484
1485        let updated = store.get_schedule(&created.id).await.unwrap();
1486        assert_eq!(updated.state.running_run_count, 1);
1487        assert!(updated.state.next_fire_at.is_some_and(|next| next > now));
1488        assert_eq!(updated.state.queued_run_count, 0);
1489    }
1490
1491    #[tokio::test]
1492    async fn overlap_queue_one_does_not_add_more_than_one_pending_run() {
1493        let dir = tempdir().unwrap();
1494        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1495        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
1496            .unwrap()
1497            .with_timezone(&Utc);
1498
1499        let created = store
1500            .create_schedule_with_definition(
1501                "queue-one".to_string(),
1502                true,
1503                ScheduleRunConfig::default(),
1504                ScheduleDefinitionChanges {
1505                    trigger: Some(ScheduleTrigger::Interval {
1506                        every_seconds: 300,
1507                        anchor_at: None,
1508                    }),
1509                    overlap_policy: Some(OverlapPolicy::QueueOne),
1510                    ..Default::default()
1511                },
1512            )
1513            .await
1514            .unwrap();
1515
1516        store
1517            .update_index(|index| {
1518                let entry = index.schedules.get_mut(&created.id).unwrap();
1519                entry.state.next_fire_at = Some(now);
1520                entry.state.queued_run_count = 1;
1521                Ok(())
1522            })
1523            .await
1524            .unwrap();
1525
1526        let engine = default_trigger_engine();
1527        let claimed = store
1528            .claim_due_runs_with_engine(now, engine.as_ref())
1529            .await
1530            .unwrap();
1531        assert!(claimed.is_empty());
1532
1533        let updated = store.get_schedule(&created.id).await.unwrap();
1534        assert_eq!(updated.state.queued_run_count, 1);
1535    }
1536
1537    #[tokio::test]
1538    async fn overlap_queue_one_limits_catch_up_to_single_pending_run() {
1539        let dir = tempdir().unwrap();
1540        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1541        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
1542            .unwrap()
1543            .with_timezone(&Utc);
1544
1545        let created = store
1546            .create_schedule_with_definition(
1547                "queue-one-catchup".to_string(),
1548                true,
1549                ScheduleRunConfig::default(),
1550                ScheduleDefinitionChanges {
1551                    trigger: Some(ScheduleTrigger::Interval {
1552                        every_seconds: 300,
1553                        anchor_at: None,
1554                    }),
1555                    misfire_policy: Some(MisFirePolicy::CatchUpAll),
1556                    overlap_policy: Some(OverlapPolicy::QueueOne),
1557                    ..Default::default()
1558                },
1559            )
1560            .await
1561            .unwrap();
1562
1563        store
1564            .update_index(|index| {
1565                let entry = index.schedules.get_mut(&created.id).unwrap();
1566                entry.state.next_fire_at = Some(now - Duration::seconds(900));
1567                Ok(())
1568            })
1569            .await
1570            .unwrap();
1571
1572        let engine = default_trigger_engine();
1573        let claimed = store
1574            .claim_due_runs_with_engine(now, engine.as_ref())
1575            .await
1576            .unwrap();
1577        assert_eq!(claimed.len(), 1);
1578
1579        let updated = store.get_schedule(&created.id).await.unwrap();
1580        assert_eq!(updated.state.queued_run_count, 1);
1581        assert!(updated.state.next_fire_at.is_some_and(|next| next > now));
1582    }
1583
1584    #[tokio::test]
1585    async fn mark_run_terminal_records_success_accounting() {
1586        let dir = tempdir().unwrap();
1587        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1588
1589        let created = store
1590            .create_schedule(
1591                "success".to_string(),
1592                ScheduleTrigger::Interval {
1593                    every_seconds: 60,
1594                    anchor_at: None,
1595                },
1596                true,
1597                ScheduleRunConfig::default(),
1598            )
1599            .await
1600            .unwrap();
1601
1602        store
1603            .update_index(|index| {
1604                let entry = index.schedules.get_mut(&created.id).unwrap();
1605                entry.state.running_run_count = 1;
1606                entry.state.consecutive_failures = 2;
1607                Ok(())
1608            })
1609            .await
1610            .unwrap();
1611
1612        store
1613            .mark_run_terminal(&created.id, "run-success", ScheduleRunStatus::Success, None)
1614            .await
1615            .unwrap();
1616
1617        let updated = store.get_schedule(&created.id).await.unwrap();
1618        assert_eq!(updated.state.running_run_count, 0);
1619        assert!(updated.state.last_finished_at.is_some());
1620        assert!(updated.state.last_success_at.is_some());
1621        assert_eq!(updated.state.total_run_count, 1);
1622        assert_eq!(updated.state.total_success_count, 1);
1623        assert_eq!(updated.state.total_failure_count, 0);
1624        assert_eq!(updated.state.consecutive_failures, 0);
1625    }
1626
1627    #[tokio::test]
1628    async fn mark_run_terminal_records_failure_accounting() {
1629        let dir = tempdir().unwrap();
1630        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1631
1632        let created = store
1633            .create_schedule(
1634                "failure".to_string(),
1635                ScheduleTrigger::Interval {
1636                    every_seconds: 60,
1637                    anchor_at: None,
1638                },
1639                true,
1640                ScheduleRunConfig::default(),
1641            )
1642            .await
1643            .unwrap();
1644
1645        store
1646            .update_index(|index| {
1647                let entry = index.schedules.get_mut(&created.id).unwrap();
1648                entry.state.running_run_count = 1;
1649                Ok(())
1650            })
1651            .await
1652            .unwrap();
1653
1654        store
1655            .mark_run_terminal(&created.id, "run-failed", ScheduleRunStatus::Failed, None)
1656            .await
1657            .unwrap();
1658        store
1659            .update_index(|index| {
1660                let entry = index.schedules.get_mut(&created.id).unwrap();
1661                entry.state.running_run_count = 1;
1662                Ok(())
1663            })
1664            .await
1665            .unwrap();
1666        store
1667            .mark_run_terminal(
1668                &created.id,
1669                "run-cancelled",
1670                ScheduleRunStatus::Cancelled,
1671                None,
1672            )
1673            .await
1674            .unwrap();
1675
1676        let updated = store.get_schedule(&created.id).await.unwrap();
1677        assert_eq!(updated.state.running_run_count, 0);
1678        assert!(updated.state.last_finished_at.is_some());
1679        assert!(updated.state.last_failure_at.is_some());
1680        assert_eq!(updated.state.total_run_count, 2);
1681        assert_eq!(updated.state.total_success_count, 0);
1682        assert_eq!(updated.state.total_failure_count, 2);
1683        assert_eq!(updated.state.consecutive_failures, 2);
1684    }
1685
1686    #[tokio::test]
1687    async fn mark_run_dequeued_without_start_counts_missed_occurrence() {
1688        let dir = tempdir().unwrap();
1689        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1690
1691        let created = store
1692            .create_schedule(
1693                "missed".to_string(),
1694                ScheduleTrigger::Interval {
1695                    every_seconds: 60,
1696                    anchor_at: None,
1697                },
1698                true,
1699                ScheduleRunConfig::default(),
1700            )
1701            .await
1702            .unwrap();
1703
1704        store
1705            .update_index(|index| {
1706                let entry = index.schedules.get_mut(&created.id).unwrap();
1707                entry.state.queued_run_count = 1;
1708                Ok(())
1709            })
1710            .await
1711            .unwrap();
1712
1713        store
1714            .mark_run_dequeued_without_start(&created.id, "run-missed", None)
1715            .await
1716            .unwrap();
1717
1718        let updated = store.get_schedule(&created.id).await.unwrap();
1719        assert_eq!(updated.state.queued_run_count, 0);
1720        assert_eq!(updated.state.total_missed_count, 1);
1721        let record = store
1722            .get_run_record("run-missed")
1723            .await
1724            .expect("run record should be created for missed dequeue");
1725        assert_eq!(record.status, ScheduleRunStatus::Missed);
1726        assert!(record.completed_at.is_some());
1727    }
1728
1729    #[tokio::test]
1730    async fn create_run_now_persists_queued_run_record() {
1731        let dir = tempdir().unwrap();
1732        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1733
1734        let created = store
1735            .create_schedule(
1736                "run-now".to_string(),
1737                ScheduleTrigger::Interval {
1738                    every_seconds: 60,
1739                    anchor_at: None,
1740                },
1741                true,
1742                ScheduleRunConfig::default(),
1743            )
1744            .await
1745            .unwrap();
1746
1747        let claimed = store
1748            .create_run_now(&created.id)
1749            .await
1750            .unwrap()
1751            .expect("run descriptor should be created");
1752
1753        let record = store
1754            .get_run_record(&claimed.run_id)
1755            .await
1756            .expect("queued run record should exist");
1757        assert_eq!(record.schedule_id, created.id);
1758        assert_eq!(record.status, ScheduleRunStatus::Queued);
1759        assert_eq!(record.claimed_at, claimed.claimed_at);
1760        assert_eq!(record.scheduled_for, claimed.scheduled_for);
1761        assert!(!claimed.was_catch_up);
1762    }
1763
1764    #[tokio::test]
1765    async fn mark_run_started_and_terminal_updates_run_record_fields() {
1766        let dir = tempdir().unwrap();
1767        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1768
1769        let created = store
1770            .create_schedule(
1771                "run-record-lifecycle".to_string(),
1772                ScheduleTrigger::Interval {
1773                    every_seconds: 60,
1774                    anchor_at: None,
1775                },
1776                true,
1777                ScheduleRunConfig::default(),
1778            )
1779            .await
1780            .unwrap();
1781
1782        let claimed = store
1783            .create_run_now(&created.id)
1784            .await
1785            .unwrap()
1786            .expect("run descriptor should be created");
1787
1788        store
1789            .mark_run_started(&created.id, &claimed.run_id)
1790            .await
1791            .unwrap();
1792        store
1793            .bind_run_session(&created.id, &claimed.run_id, "session-1")
1794            .await
1795            .unwrap();
1796        store
1797            .mark_run_terminal(
1798                &created.id,
1799                &claimed.run_id,
1800                ScheduleRunStatus::Success,
1801                Some("ok".to_string()),
1802            )
1803            .await
1804            .unwrap();
1805
1806        let record = store
1807            .get_run_record(&claimed.run_id)
1808            .await
1809            .expect("run record should exist");
1810        assert_eq!(record.status, ScheduleRunStatus::Success);
1811        assert!(record.started_at.is_some());
1812        assert!(record.completed_at.is_some());
1813        assert_eq!(record.session_id.as_deref(), Some("session-1"));
1814        assert!(record.dispatch_lag_ms.is_some());
1815        assert!(record.execution_duration_ms.is_some());
1816        assert_eq!(record.outcome_reason.as_deref(), Some("ok"));
1817    }
1818
1819    #[tokio::test]
1820    async fn delete_schedule_removes_associated_run_records() {
1821        let dir = tempdir().unwrap();
1822        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
1823
1824        let created = store
1825            .create_schedule(
1826                "cleanup-history".to_string(),
1827                ScheduleTrigger::Interval {
1828                    every_seconds: 60,
1829                    anchor_at: None,
1830                },
1831                true,
1832                ScheduleRunConfig::default(),
1833            )
1834            .await
1835            .unwrap();
1836
1837        let claimed = store
1838            .create_run_now(&created.id)
1839            .await
1840            .unwrap()
1841            .expect("run descriptor should be created");
1842        assert!(store.get_run_record(&claimed.run_id).await.is_some());
1843
1844        let deleted = store.delete_schedule(&created.id).await.unwrap();
1845        assert!(deleted);
1846        assert!(store.get_run_record(&claimed.run_id).await.is_none());
1847    }
1848}