1use std::cmp::Reverse;
2use std::collections::HashMap;
3use std::io;
4use std::path::{Path, PathBuf};
5
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Deserializer, Serialize};
8use tokio::fs;
9use tokio::sync::{Mutex, RwLock};
10use uuid::Uuid;
11
12use super::trigger_engine::{default_trigger_engine, TriggerEngine};
13use bamboo_domain::{
14 MisFirePolicy, OverlapPolicy, ScheduleRunConfig, ScheduleRunRecord, ScheduleRunStatus,
15 ScheduleSpec, ScheduleState, ScheduleTrigger, ScheduleWindow,
16};
17
/// Build a generic (`ErrorKind::Other`) I/O error from any string-like message.
fn other_io_error(message: impl Into<String>) -> io::Error {
    let text: String = message.into();
    io::Error::new(io::ErrorKind::Other, text)
}
21
/// Atomically replace `path` with `bytes` by writing to a uniquely-named
/// sibling temp file, fsyncing it, then renaming it over the target.
///
/// The rename is what makes the swap atomic on POSIX filesystems; the temp
/// file's UUID suffix prevents collisions between concurrent writers.
async fn atomic_write_json(path: &Path, bytes: Vec<u8>) -> io::Result<()> {
    let tmp = path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));

    {
        // Scope ensures the file handle is closed before the rename below.
        let mut file = fs::File::create(&tmp).await?;
        tokio::io::AsyncWriteExt::write_all(&mut file, &bytes).await?;
        // Flush file contents to disk before publishing via rename.
        file.sync_all().await?;
    }

    fs::rename(&tmp, path).await?;

    // Best-effort fsync of the parent directory so the rename itself is
    // durable; errors are ignored (e.g. platforms where directories cannot
    // be opened/synced this way).
    if let Some(parent) = path.parent() {
        if let Ok(dir) = fs::File::open(parent).await {
            let _ = dir.sync_all().await;
        }
    }

    Ok(())
}
43
/// Best-effort removal of leftover temp files (from interrupted
/// `atomic_write_json` calls) in `dir` whose names start with `prefix`.
///
/// All errors are swallowed: failing to clean up stale temp files must never
/// block store startup.
async fn cleanup_stale_tmp_files(dir: &Path, prefix: &str) {
    let mut entries = match fs::read_dir(dir).await {
        Ok(e) => e,
        Err(_) => return,
    };
    while let Ok(Some(entry)) = entries.next_entry().await {
        // Non-UTF-8 file names are skipped; our temp names are always UTF-8.
        if let Some(name) = entry.file_name().to_str() {
            if name.starts_with(prefix) {
                tracing::info!("Removing stale temp file: {}", entry.path().display());
                let _ = fs::remove_file(entry.path()).await;
            }
        }
    }
}
59
/// A stored schedule definition plus its runtime state, as persisted in the
/// schedules index. Deserialization goes through [`ScheduleEntryCompat`] (see
/// the manual `Deserialize` impl) to migrate legacy on-disk shapes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ScheduleEntry {
    pub id: String,
    pub name: String,
    pub enabled: bool,
    pub trigger: ScheduleTrigger,
    // IANA timezone name, presumably — only meaningful for non-interval
    // triggers (see `runtime_timezone`). TODO confirm expected format.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
    // Optional window bounds: runs are clamped to start_at and suppressed
    // after end_at (see `current_due_at`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub start_at: Option<DateTime<Utc>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub end_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub misfire_policy: MisFirePolicy,
    #[serde(default)]
    pub overlap_policy: OverlapPolicy,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    // Mutable runtime bookkeeping (next fire time, run counters, …).
    #[serde(default)]
    pub state: ScheduleState,
    #[serde(default)]
    pub run_config: ScheduleRunConfig,
}
83
/// Wire-compatible superset of [`ScheduleEntry`] used only for
/// deserialization. It accepts both the current shape and legacy fields
/// (`interval_seconds`, flat `last_run_at`/`next_run_at`, flat run counters)
/// which `ScheduleEntry::from_compat` folds into the modern trigger/state.
#[derive(Debug, Clone, Deserialize)]
struct ScheduleEntryCompat {
    pub id: String,
    pub name: String,
    #[serde(default)]
    pub enabled: bool,
    // Legacy: interval length before triggers existed; overrides the
    // trigger's `every_seconds` when both are present.
    #[serde(default)]
    pub interval_seconds: Option<u64>,
    // Optional so legacy records without a trigger still parse.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger: Option<ScheduleTrigger>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub start_at: Option<DateTime<Utc>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub end_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub misfire_policy: MisFirePolicy,
    #[serde(default)]
    pub overlap_policy: OverlapPolicy,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    #[serde(default)]
    pub state: Option<ScheduleState>,
    // Legacy flat state fields, used as fallbacks when `state` is absent or
    // missing the corresponding value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_run_at: Option<DateTime<Utc>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_run_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub queued_run_count: u32,
    #[serde(default)]
    pub running_run_count: u32,
    #[serde(default)]
    pub run_config: ScheduleRunConfig,
}
119
impl ScheduleEntry {
    /// Migrate a raw (possibly legacy) on-disk record into the current shape.
    ///
    /// Trigger resolution, in precedence order:
    /// - modern interval trigger: legacy `interval_seconds` (if present)
    ///   overrides `every_seconds`; a missing anchor is backfilled from
    ///   legacy timestamps, falling back to `created_at`;
    /// - any other modern trigger: taken as-is;
    /// - no trigger at all: requires legacy `interval_seconds`, else errors.
    fn from_compat(raw: ScheduleEntryCompat) -> Result<Self, String> {
        let trigger = match raw.trigger.clone() {
            Some(ScheduleTrigger::Interval {
                every_seconds,
                anchor_at,
            }) => {
                let every_seconds = raw.interval_seconds.unwrap_or(every_seconds);
                ScheduleTrigger::Interval {
                    every_seconds,
                    anchor_at: anchor_at
                        .or_else(|| derive_legacy_anchor_at(&raw, every_seconds))
                        .or(Some(raw.created_at)),
                }
            }
            Some(other) => other,
            None => {
                let every_seconds = raw.interval_seconds.ok_or_else(|| {
                    format!("schedule entry {} missing trigger definition", raw.id)
                })?;
                ScheduleTrigger::legacy_interval(
                    every_seconds,
                    derive_legacy_anchor_at(&raw, every_seconds).or(Some(raw.created_at)),
                )
            }
        };

        // Prefer the structured `state`; fall back to the flat legacy fields
        // when it is absent, and patch individual holes afterwards.
        // NOTE(review): when `state` IS present, the legacy queued/running
        // counters are ignored — presumably intentional; confirm.
        let mut state = raw.state.unwrap_or_else(|| ScheduleState {
            next_fire_at: raw.next_run_at,
            last_scheduled_at: raw.last_run_at,
            queued_run_count: raw.queued_run_count,
            running_run_count: raw.running_run_count,
            ..Default::default()
        });
        if state.next_fire_at.is_none() {
            state.next_fire_at = raw.next_run_at;
        }
        if state.last_scheduled_at.is_none() {
            state.last_scheduled_at = raw.last_run_at;
        }

        Ok(Self {
            id: raw.id,
            name: raw.name,
            enabled: raw.enabled,
            trigger,
            timezone: raw.timezone,
            start_at: raw.start_at,
            end_at: raw.end_at,
            misfire_policy: raw.misfire_policy,
            overlap_policy: raw.overlap_policy,
            created_at: raw.created_at,
            updated_at: raw.updated_at,
            state,
            run_config: raw.run_config,
        })
    }

    /// Best-guess anchor for an interval trigger, used to backfill a missing
    /// `anchor_at`. Precedence: explicit trigger anchor → last scheduled time
    /// → next fire time minus one interval → `created_at`.
    /// Returns `None` only for non-interval triggers.
    pub fn derived_anchor_at(&self) -> Option<DateTime<Utc>> {
        let every_seconds = interval_seconds_from_trigger(&self.trigger)?;
        if let ScheduleTrigger::Interval {
            anchor_at: Some(anchor_at),
            ..
        } = &self.trigger
        {
            return Some(*anchor_at);
        }
        self.state
            .last_scheduled_at
            .or_else(|| {
                self.state
                    .next_fire_at
                    .map(|next| next - Duration::seconds(every_seconds as i64))
            })
            .or(Some(self.created_at))
    }

    /// Project this entry onto the immutable-definition view (`ScheduleSpec`),
    /// dropping runtime state.
    pub fn to_schedule_spec(&self) -> ScheduleSpec {
        ScheduleSpec {
            id: self.id.clone(),
            name: self.name.clone(),
            enabled: self.enabled,
            trigger: self.trigger.clone(),
            timezone: self.timezone.clone(),
            start_at: self.start_at,
            end_at: self.end_at,
            misfire_policy: self.misfire_policy,
            overlap_policy: self.overlap_policy,
            run_config: self.run_config.clone(),
            created_at: self.created_at,
            updated_at: self.updated_at,
        }
    }

    /// Clone of the runtime state only.
    pub fn to_schedule_state(&self) -> ScheduleState {
        self.state.clone()
    }
}
218
/// Manual `Deserialize`: parse via the legacy-tolerant [`ScheduleEntryCompat`]
/// shape, then migrate with `from_compat` so old index files keep loading.
impl<'de> Deserialize<'de> for ScheduleEntry {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let raw = ScheduleEntryCompat::deserialize(deserializer)?;
        ScheduleEntry::from_compat(raw).map_err(serde::de::Error::custom)
    }
}
228
229fn derive_legacy_anchor_at(raw: &ScheduleEntryCompat, every_seconds: u64) -> Option<DateTime<Utc>> {
230 if let Some(ScheduleTrigger::Interval {
231 anchor_at: Some(anchor_at),
232 ..
233 }) = raw.trigger.as_ref()
234 {
235 return Some(*anchor_at);
236 }
237
238 raw.state
239 .as_ref()
240 .and_then(|state| state.last_scheduled_at)
241 .or(raw.last_run_at)
242 .or_else(|| {
243 raw.state
244 .as_ref()
245 .and_then(|state| state.next_fire_at)
246 .or(raw.next_run_at)
247 .map(|next| next - Duration::seconds(every_seconds as i64))
248 })
249}
250
251fn interval_seconds_from_trigger(trigger: &ScheduleTrigger) -> Option<u64> {
252 match trigger {
253 ScheduleTrigger::Interval { every_seconds, .. } => Some(*every_seconds),
254 _ => None,
255 }
256}
257
/// On-disk root of the schedules store (serialized as `schedules.json`):
/// all schedule entries plus their run records, keyed by id.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SchedulesIndex {
    // Format version; version 4 is current (see `normalize_loaded_index`).
    version: u32,
    updated_at: DateTime<Utc>,
    schedules: HashMap<String, ScheduleEntry>,
    // Keyed by run_id; `default` keeps pre-run-record files loadable.
    #[serde(default)]
    run_records: HashMap<String, ScheduleRunRecord>,
}
266
267impl SchedulesIndex {
268 fn empty() -> Self {
269 Self {
270 version: 4,
271 updated_at: Utc::now(),
272 schedules: HashMap::new(),
273 run_records: HashMap::new(),
274 }
275 }
276}
277
/// Partial update to a schedule's definition; `None` fields are left
/// untouched by `patch_schedule_with_definition` (and take defaults on
/// create). Note there is no way to *clear* an already-set optional field
/// through this struct.
#[derive(Debug, Clone, Default)]
pub struct ScheduleDefinitionChanges {
    pub trigger: Option<ScheduleTrigger>,
    pub timezone: Option<String>,
    pub start_at: Option<DateTime<Utc>>,
    pub end_at: Option<DateTime<Utc>>,
    pub misfire_policy: Option<MisFirePolicy>,
    pub overlap_policy: Option<OverlapPolicy>,
}
287
/// Trim an optional string and collapse blank/empty results to `None`.
fn normalize_optional_string(value: Option<String>) -> Option<String> {
    match value {
        Some(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_string())
            }
        }
        None => None,
    }
}
293
294fn definition_window(definition: &ScheduleDefinitionChanges) -> ScheduleWindow {
295 ScheduleWindow {
296 start_at: definition.start_at,
297 end_at: definition.end_at,
298 }
299}
300
301fn compute_initial_next_run_at(
302 trigger: &ScheduleTrigger,
303 timezone: Option<&str>,
304 window: &ScheduleWindow,
305 now: DateTime<Utc>,
306) -> io::Result<DateTime<Utc>> {
307 let engine = default_trigger_engine();
308 let runtime_timezone = match trigger {
309 ScheduleTrigger::Interval { .. } => None,
310 _ => timezone,
311 };
312 engine
313 .next_after(trigger, runtime_timezone, now, window)
314 .map_err(|error| other_io_error(format!("failed to compute initial next run: {error}")))?
315 .ok_or_else(|| other_io_error("schedule has no next run within configured window"))
316}
317
318fn normalize_trigger_for_storage(
319 trigger: ScheduleTrigger,
320 anchor_at: DateTime<Utc>,
321) -> ScheduleTrigger {
322 match trigger {
323 ScheduleTrigger::Interval {
324 every_seconds,
325 anchor_at: existing_anchor,
326 } => ScheduleTrigger::Interval {
327 every_seconds,
328 anchor_at: existing_anchor.or(Some(anchor_at)),
329 },
330 other => other,
331 }
332}
333
334fn normalize_loaded_index(mut index: SchedulesIndex) -> (SchedulesIndex, bool) {
335 let mut changed = false;
336 if index.version < 4 {
337 index.version = 4;
338 changed = true;
339 }
340 for entry in index.schedules.values_mut() {
341 changed |= normalize_loaded_schedule_entry(entry);
342 }
343 if changed {
344 index.updated_at = Utc::now();
345 }
346 (index, changed)
347}
348
349fn normalize_loaded_schedule_entry(entry: &mut ScheduleEntry) -> bool {
350 let mut changed = false;
351
352 let normalized_timezone = normalize_optional_string(entry.timezone.clone());
353 if entry.timezone != normalized_timezone {
354 entry.timezone = normalized_timezone;
355 changed = true;
356 }
357
358 let derived_anchor_at = entry.derived_anchor_at();
359 if let ScheduleTrigger::Interval {
360 every_seconds,
361 anchor_at,
362 } = &mut entry.trigger
363 {
364 if anchor_at.is_none() {
365 if let Some(derived_anchor_at) = derived_anchor_at {
366 *anchor_at = Some(derived_anchor_at);
367 changed = true;
368 }
369 }
370 if *every_seconds == 0 {
371 *every_seconds = 1;
372 changed = true;
373 }
374 }
375
376 if entry.state.next_fire_at.is_none() {
377 entry.state.next_fire_at = Some(entry.created_at);
378 changed = true;
379 }
380
381 changed
382}
383
384fn current_due_at(entry: &ScheduleEntry) -> Option<DateTime<Utc>> {
385 let mut due_at = entry.state.next_fire_at?;
386 if let Some(start_at) = entry.start_at {
387 if due_at < start_at {
388 due_at = start_at;
389 }
390 }
391 if let Some(end_at) = entry.end_at {
392 if due_at > end_at {
393 return None;
394 }
395 }
396 Some(due_at)
397}
398
399fn overlap_blocks_dispatch(entry: &ScheduleEntry) -> bool {
400 match entry.overlap_policy {
401 OverlapPolicy::Allow => false,
402 OverlapPolicy::Skip => entry.state.running_run_count > 0,
403 OverlapPolicy::QueueOne => entry.state.queued_run_count > 0,
404 }
405}
406
407fn apply_overlap_dispatch_limit(entry: &ScheduleEntry, dispatch_count: u32) -> u32 {
408 match entry.overlap_policy {
409 OverlapPolicy::Allow | OverlapPolicy::Skip => dispatch_count,
410 OverlapPolicy::QueueOne => dispatch_count.min(1),
411 }
412}
413
414fn compute_misfire_dispatch_count(entry: &ScheduleEntry, now: DateTime<Utc>) -> u32 {
415 let Some(next_fire_at) = entry.state.next_fire_at else {
416 return 0;
417 };
418 let lateness_seconds = now.signed_duration_since(next_fire_at).num_seconds().max(0) as u64;
419 let interval_seconds = interval_seconds_from_trigger(&entry.trigger).unwrap_or(0);
420
421 match entry.misfire_policy {
422 MisFirePolicy::RunOnce => 1,
423 MisFirePolicy::Skip => 0,
424 MisFirePolicy::CatchUpAll => lateness_seconds
425 .checked_div(interval_seconds)
426 .map(|q| q.saturating_add(1) as u32)
427 .unwrap_or(1),
428 MisFirePolicy::CatchUpWindow {
429 max_catch_up_runs,
430 max_lateness_seconds,
431 } => {
432 if lateness_seconds > max_lateness_seconds {
433 0
434 } else {
435 lateness_seconds
436 .checked_div(interval_seconds)
437 .map(|q| (q.saturating_add(1) as u32).min(max_catch_up_runs.max(1)))
438 .unwrap_or(1)
439 }
440 }
441 }
442}
443
444fn runtime_trigger(entry: &ScheduleEntry) -> ScheduleTrigger {
445 match &entry.trigger {
446 ScheduleTrigger::Interval { every_seconds, .. } => {
447 ScheduleTrigger::legacy_interval(*every_seconds, None)
448 }
449 other => other.clone(),
450 }
451}
452
453fn runtime_timezone<'a>(entry: &'a ScheduleEntry, trigger: &ScheduleTrigger) -> Option<&'a str> {
454 match trigger {
455 ScheduleTrigger::Interval { .. } => None,
456 _ => entry.timezone.as_deref(),
457 }
458}
459
460fn compute_next_run_at_with_engine(
461 entry: &ScheduleEntry,
462 now: DateTime<Utc>,
463 engine: &dyn TriggerEngine,
464) -> io::Result<Option<DateTime<Utc>>> {
465 let trigger = runtime_trigger(entry);
466 let window = ScheduleWindow {
467 start_at: entry.start_at,
468 end_at: entry.end_at,
469 };
470 engine
471 .next_after(&trigger, runtime_timezone(entry, &trigger), now, &window)
472 .map_err(|error| {
473 other_io_error(format!(
474 "failed to compute next fire for schedule {}: {}",
475 entry.id, error
476 ))
477 })
478}
479
/// Bump the entry's lifetime missed-occurrence counter (saturating).
fn record_missed_occurrence(entry: &mut ScheduleEntry) {
    entry.state.total_missed_count = entry.state.total_missed_count.saturating_add(1);
}
483
484fn non_negative_duration_ms(from: DateTime<Utc>, to: DateTime<Utc>) -> u64 {
485 to.signed_duration_since(from).num_milliseconds().max(0) as u64
486}
487
/// Build a fresh run record in `Queued` state with a new UUID run id;
/// timing/outcome fields are filled in later by the lifecycle updaters.
fn make_queued_run_record(
    schedule_id: &str,
    scheduled_for: DateTime<Utc>,
    claimed_at: DateTime<Utc>,
    was_catch_up: bool,
) -> ScheduleRunRecord {
    ScheduleRunRecord {
        run_id: Uuid::new_v4().to_string(),
        schedule_id: schedule_id.to_string(),
        scheduled_for,
        claimed_at,
        started_at: None,
        completed_at: None,
        status: ScheduleRunStatus::Queued,
        outcome_reason: None,
        session_id: None,
        dispatch_lag_ms: None,
        execution_duration_ms: None,
        was_catch_up,
    }
}
509
/// Synthesize a run record for a run id we were asked to update but have no
/// record of (e.g. a state transition arriving after a crash/reset). All
/// timestamps collapse to `now`; started/completed are set only when `status`
/// implies the run has started / reached a terminal state.
fn make_fallback_run_record(
    run_id: &str,
    schedule_id: &str,
    now: DateTime<Utc>,
    status: ScheduleRunStatus,
) -> ScheduleRunRecord {
    ScheduleRunRecord {
        run_id: run_id.to_string(),
        schedule_id: schedule_id.to_string(),
        scheduled_for: now,
        claimed_at: now,
        started_at: matches!(status, ScheduleRunStatus::Running).then_some(now),
        completed_at: matches!(
            status,
            ScheduleRunStatus::Success
                | ScheduleRunStatus::Failed
                | ScheduleRunStatus::Skipped
                | ScheduleRunStatus::Missed
                | ScheduleRunStatus::Cancelled
        )
        .then_some(now),
        status,
        outcome_reason: None,
        session_id: None,
        dispatch_lag_ms: None,
        execution_duration_ms: None,
        was_catch_up: false,
    }
}
539
540fn scheduled_for_dispatch(
541 entry: &ScheduleEntry,
542 due_at: DateTime<Utc>,
543 dispatch_index: u32,
544) -> DateTime<Utc> {
545 match interval_seconds_from_trigger(&entry.trigger) {
546 Some(interval_seconds) if dispatch_index > 0 => {
547 due_at + Duration::seconds(interval_seconds as i64 * dispatch_index as i64)
548 }
549 _ => due_at,
550 }
551}
552
/// Transition a run record to `Running`: stamp the start time, record the
/// dispatch lag (scheduled_for -> started_at, clamped to zero), and bind the
/// session id when one is supplied.
fn update_run_record_started(
    record: &mut ScheduleRunRecord,
    started_at: DateTime<Utc>,
    session_id: Option<&str>,
) {
    record.status = ScheduleRunStatus::Running;
    record.started_at = Some(started_at);
    record.dispatch_lag_ms = Some(non_negative_duration_ms(record.scheduled_for, started_at));
    if let Some(session_id) = session_id {
        record.session_id = Some(session_id.to_string());
    }
}
565
/// Finalize a run record with a terminal `status` at `completed_at`.
///
/// Dispatch lag is backfilled from completion time if the run never recorded
/// a start; execution duration is only computed when a start time exists.
/// A blank `outcome_reason` is dropped rather than stored.
fn update_run_record_terminal(
    record: &mut ScheduleRunRecord,
    status: ScheduleRunStatus,
    completed_at: DateTime<Utc>,
    outcome_reason: Option<String>,
    session_id: Option<&str>,
) {
    record.status = status;
    record.completed_at = Some(completed_at);
    if record.dispatch_lag_ms.is_none() {
        record.dispatch_lag_ms = Some(non_negative_duration_ms(record.scheduled_for, completed_at));
    }
    if let Some(started_at) = record.started_at {
        record.execution_duration_ms = Some(non_negative_duration_ms(started_at, completed_at));
    }
    if let Some(outcome_reason) = normalize_optional_string(outcome_reason) {
        record.outcome_reason = Some(outcome_reason);
    }
    if let Some(session_id) = session_id {
        record.session_id = Some(session_id.to_string());
    }
}
588
/// Fold a finished run into the schedule's aggregate counters.
///
/// Always decrements the running count and stamps `last_finished_at`, then
/// updates success/failure tallies by status. `Cancelled` is counted as a
/// failure; `Skipped` only closes the run without touching tallies.
/// Errors on statuses that are not valid terminal outcomes here
/// (`Missed`/`Queued`/`Running`).
fn apply_terminal_run_status(
    entry: &mut ScheduleEntry,
    status: ScheduleRunStatus,
    finished_at: DateTime<Utc>,
) -> io::Result<()> {
    entry.state.running_run_count = entry.state.running_run_count.saturating_sub(1);
    entry.state.last_finished_at = Some(finished_at);

    match status {
        ScheduleRunStatus::Success => {
            entry.state.last_success_at = Some(finished_at);
            entry.state.total_run_count = entry.state.total_run_count.saturating_add(1);
            entry.state.total_success_count = entry.state.total_success_count.saturating_add(1);
            // A success resets the consecutive-failure streak.
            entry.state.consecutive_failures = 0;
            Ok(())
        }
        ScheduleRunStatus::Failed | ScheduleRunStatus::Cancelled => {
            entry.state.last_failure_at = Some(finished_at);
            entry.state.total_run_count = entry.state.total_run_count.saturating_add(1);
            entry.state.total_failure_count = entry.state.total_failure_count.saturating_add(1);
            entry.state.consecutive_failures = entry.state.consecutive_failures.saturating_add(1);
            Ok(())
        }
        ScheduleRunStatus::Skipped => Ok(()),
        ScheduleRunStatus::Missed | ScheduleRunStatus::Queued | ScheduleRunStatus::Running => {
            Err(other_io_error(format!(
                "non-terminal or unsupported run status for lifecycle accounting: {:?}",
                status
            )))
        }
    }
}
621
/// A run handed to the executor by `claim_due_runs`/`create_run_now`: the run
/// identity plus a snapshot of the schedule fields needed to execute it.
#[derive(Debug, Clone)]
pub struct ClaimedScheduleRun {
    pub run_id: String,
    pub schedule_id: String,
    pub schedule_name: String,
    pub run_config: ScheduleRunConfig,
    // Nominal occurrence time; earlier than `claimed_at` for catch-up runs.
    pub scheduled_for: DateTime<Utc>,
    pub claimed_at: DateTime<Utc>,
    pub was_catch_up: bool,
}
632
/// Persistent schedule store backed by a single JSON index file.
#[derive(Debug)]
pub struct ScheduleStore {
    index_path: PathBuf,
    // In-memory copy of the on-disk index; reads take the RwLock read side.
    index: RwLock<SchedulesIndex>,
    // Serializes mutate-and-persist cycles so writers cannot interleave
    // (held in addition to the RwLock write guard; see `update_index`).
    write_lock: Mutex<()>,
}
639
impl ScheduleStore {
    /// Open (or initialize) the store at `<bamboo_home_dir>/schedules.json`.
    ///
    /// Load path: parse + normalize the existing file, rewriting it when
    /// normalization changed anything. A corrupted file is backed up under a
    /// UUID-suffixed name and replaced with a fresh empty index rather than
    /// failing startup. Stale temp files from interrupted writes are swept
    /// last.
    ///
    /// NOTE(review): `index_path.exists()` is a blocking std call inside an
    /// async fn — harmless at startup, but worth confirming.
    pub async fn new(bamboo_home_dir: PathBuf) -> io::Result<Self> {
        let index_path = bamboo_home_dir.join("schedules.json");

        let (index, needs_backfill_write) = if index_path.exists() {
            let raw = fs::read_to_string(&index_path).await?;
            match serde_json::from_str::<SchedulesIndex>(&raw) {
                Ok(parsed) => normalize_loaded_index(parsed),
                Err(e) => {
                    // Corrupted index: preserve it for post-mortem, then reset.
                    let backup_path =
                        index_path.with_extension(format!("json.corrupted.{}", Uuid::new_v4()));
                    tracing::error!(
                        "schedules.json is corrupted ({}). Backing up to {} and resetting.",
                        e,
                        backup_path.display()
                    );
                    if let Err(rename_err) = fs::rename(&index_path, &backup_path).await {
                        tracing::warn!(
                            "Failed to back up corrupted schedules.json: {}",
                            rename_err
                        );
                    }
                    let fresh = SchedulesIndex::empty();
                    atomic_write_json(
                        &index_path,
                        serde_json::to_vec_pretty(&fresh)
                            .map_err(|e| other_io_error(e.to_string()))?,
                    )
                    .await?;
                    (fresh, false)
                }
            }
        } else {
            // First run: persist an empty index immediately.
            let index = SchedulesIndex::empty();
            atomic_write_json(
                &index_path,
                serde_json::to_vec_pretty(&index).map_err(|e| other_io_error(e.to_string()))?,
            )
            .await?;
            (index, false)
        };

        // Persist normalization fix-ups so they survive the next load.
        if needs_backfill_write {
            atomic_write_json(
                &index_path,
                serde_json::to_vec_pretty(&index).map_err(|e| other_io_error(e.to_string()))?,
            )
            .await?;
        }

        cleanup_stale_tmp_files(&bamboo_home_dir, "schedules.json.tmp.").await;

        Ok(Self {
            index_path,
            index: RwLock::new(index),
            write_lock: Mutex::new(()),
        })
    }

    /// Path of the backing JSON index file.
    pub fn index_path(&self) -> &Path {
        &self.index_path
    }

    /// Run `f` against the in-memory index under exclusive access, bump
    /// `updated_at`, and persist the whole index atomically.
    ///
    /// The outer `write_lock` serializes complete mutate+persist cycles;
    /// if `f` errors the index may already be mutated in memory but is not
    /// written to disk.
    async fn update_index<F, T>(&self, f: F) -> io::Result<T>
    where
        F: FnOnce(&mut SchedulesIndex) -> io::Result<T>,
    {
        let _guard = self.write_lock.lock().await;
        let mut index = self.index.write().await;
        let out = f(&mut index)?;
        index.updated_at = Utc::now();
        atomic_write_json(
            &self.index_path,
            serde_json::to_vec_pretty(&*index).map_err(|e| other_io_error(e.to_string()))?,
        )
        .await?;
        Ok(out)
    }

    /// All schedules, most recently updated first.
    pub async fn list_schedules(&self) -> Vec<ScheduleEntry> {
        let index = self.index.read().await;
        let mut items: Vec<_> = index.schedules.values().cloned().collect();
        items.sort_by_key(|e| Reverse(e.updated_at));
        items
    }

    /// Lookup a schedule by id.
    pub async fn get_schedule(&self, id: &str) -> Option<ScheduleEntry> {
        let index = self.index.read().await;
        index.schedules.get(id).cloned()
    }

    /// Lookup a run record by run id.
    pub async fn get_run_record(&self, run_id: &str) -> Option<ScheduleRunRecord> {
        let index = self.index.read().await;
        index.run_records.get(run_id).cloned()
    }

    /// All run records for one schedule, most recently claimed first.
    pub async fn list_run_records_for_schedule(&self, schedule_id: &str) -> Vec<ScheduleRunRecord> {
        let index = self.index.read().await;
        let mut items = index
            .run_records
            .values()
            .filter(|record| record.schedule_id == schedule_id)
            .cloned()
            .collect::<Vec<_>>();
        items.sort_by_key(|r| Reverse(r.claimed_at));
        items
    }

    /// Convenience wrapper over `create_schedule_with_definition` for callers
    /// that only supply a trigger.
    pub async fn create_schedule(
        &self,
        name: String,
        trigger: ScheduleTrigger,
        enabled: bool,
        run_config: ScheduleRunConfig,
    ) -> io::Result<ScheduleEntry> {
        self.create_schedule_with_definition(
            name,
            enabled,
            run_config,
            ScheduleDefinitionChanges {
                trigger: Some(trigger),
                ..Default::default()
            },
        )
        .await
    }

    /// Create and persist a new schedule from a full definition.
    ///
    /// Requires `definition.trigger`; computes the first fire time up front
    /// (erroring if none exists in the window), so a schedule can never be
    /// stored without a valid `next_fire_at`.
    pub async fn create_schedule_with_definition(
        &self,
        name: String,
        enabled: bool,
        run_config: ScheduleRunConfig,
        definition: ScheduleDefinitionChanges,
    ) -> io::Result<ScheduleEntry> {
        let now = Utc::now();
        let id = Uuid::new_v4().to_string();
        let window = definition_window(&definition);
        // Interval triggers get `now` as their anchor when none is provided.
        let trigger = normalize_trigger_for_storage(
            definition
                .trigger
                .ok_or_else(|| other_io_error("schedule trigger is required"))?,
            now,
        );
        let timezone = normalize_optional_string(definition.timezone);
        let next_fire_at =
            compute_initial_next_run_at(&trigger, timezone.as_deref(), &window, now)?;
        let entry = ScheduleEntry {
            id: id.clone(),
            name,
            enabled,
            trigger,
            timezone,
            start_at: definition.start_at,
            end_at: definition.end_at,
            misfire_policy: definition.misfire_policy.unwrap_or_default(),
            overlap_policy: definition.overlap_policy.unwrap_or_default(),
            created_at: now,
            updated_at: now,
            state: ScheduleState {
                next_fire_at: Some(next_fire_at),
                ..Default::default()
            },
            run_config,
        };

        self.update_index(|index| {
            index.schedules.insert(id.clone(), entry.clone());
            Ok(entry.clone())
        })
        .await
    }

    /// Convenience wrapper over `patch_schedule_with_definition` for callers
    /// that only change the trigger.
    pub async fn patch_schedule(
        &self,
        id: &str,
        name: Option<String>,
        enabled: Option<bool>,
        trigger: Option<ScheduleTrigger>,
        run_config: Option<ScheduleRunConfig>,
    ) -> io::Result<Option<ScheduleEntry>> {
        self.patch_schedule_with_definition(
            id,
            name,
            enabled,
            run_config,
            ScheduleDefinitionChanges {
                trigger,
                ..Default::default()
            },
        )
        .await
    }

    /// Apply a partial update to an existing schedule; `None` fields are left
    /// as-is. Returns `Ok(None)` when the id is unknown.
    ///
    /// NOTE(review): `next_fire_at` is recomputed unconditionally, even when
    /// only e.g. the name changed — this resets any pending occurrence;
    /// confirm that is intended.
    pub async fn patch_schedule_with_definition(
        &self,
        id: &str,
        name: Option<String>,
        enabled: Option<bool>,
        run_config: Option<ScheduleRunConfig>,
        definition: ScheduleDefinitionChanges,
    ) -> io::Result<Option<ScheduleEntry>> {
        self.update_index(|index| {
            let Some(existing) = index.schedules.get_mut(id) else {
                return Ok(None);
            };
            let now = Utc::now();
            if let Some(name) = name {
                existing.name = name;
            }
            if let Some(enabled) = enabled {
                existing.enabled = enabled;
            }

            // New (or retained) trigger, with a derived anchor as fallback.
            let trigger = normalize_trigger_for_storage(
                definition
                    .trigger
                    .clone()
                    .unwrap_or_else(|| existing.trigger.clone()),
                existing.derived_anchor_at().unwrap_or(now),
            );
            existing.trigger = trigger.clone();

            if let Some(timezone) = definition.timezone {
                existing.timezone = normalize_optional_string(Some(timezone));
            }
            if let Some(start_at) = definition.start_at {
                existing.start_at = Some(start_at);
            }
            if let Some(end_at) = definition.end_at {
                existing.end_at = Some(end_at);
            }
            if let Some(misfire_policy) = definition.misfire_policy {
                existing.misfire_policy = misfire_policy;
            }
            if let Some(overlap_policy) = definition.overlap_policy {
                existing.overlap_policy = overlap_policy;
            }
            if let Some(run_config) = run_config {
                existing.run_config = run_config;
            }

            // Recompute the next occurrence under the (possibly changed)
            // trigger/timezone/window; errors if none exists in the window.
            let window = ScheduleWindow {
                start_at: existing.start_at,
                end_at: existing.end_at,
            };
            existing.state.next_fire_at = Some(compute_initial_next_run_at(
                &trigger,
                existing.timezone.as_deref(),
                &window,
                now,
            )?);
            existing.updated_at = now;
            Ok(Some(existing.clone()))
        })
        .await
    }

    /// Delete a schedule and all of its run records. Returns whether it existed.
    pub async fn delete_schedule(&self, id: &str) -> io::Result<bool> {
        self.update_index(|index| {
            let deleted = index.schedules.remove(id).is_some();
            if deleted {
                index
                    .run_records
                    .retain(|_, record| record.schedule_id != id);
            }
            Ok(deleted)
        })
        .await
    }

    /// Claim all runs due at `now` using the default trigger engine.
    pub async fn claim_due_runs(&self, now: DateTime<Utc>) -> io::Result<Vec<ClaimedScheduleRun>> {
        let engine = default_trigger_engine();
        self.claim_due_runs_with_engine(now, engine.as_ref()).await
    }

    /// Core dispatch pass: for every enabled schedule due at `now`, apply the
    /// misfire and overlap policies, advance `next_fire_at`, and enqueue run
    /// records for the runs to execute. Persists once for the whole pass.
    pub async fn claim_due_runs_with_engine(
        &self,
        now: DateTime<Utc>,
        engine: &dyn TriggerEngine,
    ) -> io::Result<Vec<ClaimedScheduleRun>> {
        // Cheap read-only pre-check so the common idle case avoids taking the
        // write lock and rewriting the file.
        {
            let index = self.index.read().await;
            let any_due = index.schedules.values().any(|entry| {
                entry.enabled && current_due_at(entry).is_some_and(|due_at| due_at <= now)
            });
            if !any_due {
                return Ok(Vec::new());
            }
        }

        self.update_index(|index| {
            let mut out = Vec::new();
            // Split borrows: entries are mutated while records are inserted.
            let (schedules, run_records) = (&mut index.schedules, &mut index.run_records);
            for entry in schedules.values_mut() {
                if !entry.enabled {
                    continue;
                }
                let Some(due_at) = current_due_at(entry) else {
                    continue;
                };
                if due_at > now {
                    continue;
                }

                // Misfire policy says skip this occurrence entirely: count it
                // as missed and just advance the next fire time.
                let dispatch_count = compute_misfire_dispatch_count(entry, now);
                if dispatch_count == 0 {
                    entry.state.last_scheduled_at = Some(now);
                    record_missed_occurrence(entry);
                    match compute_next_run_at_with_engine(entry, now, engine) {
                        Ok(Some(next_fire_at)) => entry.state.next_fire_at = Some(next_fire_at),
                        Ok(None) => {
                            // No occurrence left in the window: retire it.
                            entry.state.next_fire_at = None;
                            entry.enabled = false;
                        }
                        Err(error) => {
                            tracing::warn!(
                                "failed to compute next scheduled fire for {} after misfire skip: {}. falling back to legacy interval semantics",
                                entry.id,
                                error
                            );
                            let fallback = interval_seconds_from_trigger(&entry.trigger).unwrap_or(60);
                            entry.state.next_fire_at = Some(now + Duration::seconds(fallback as i64));
                        }
                    }
                    entry.updated_at = now;
                    continue;
                }

                // Overlap policy: Skip drops the occurrence (counted missed,
                // next fire advanced); QueueOne defers — the occurrence stays
                // due and will be retried on a later pass.
                let blocked = overlap_blocks_dispatch(entry);
                match entry.overlap_policy {
                    OverlapPolicy::Skip if blocked => {
                        entry.state.last_scheduled_at = Some(now);
                        record_missed_occurrence(entry);
                        match compute_next_run_at_with_engine(entry, now, engine) {
                            Ok(Some(next_fire_at)) => entry.state.next_fire_at = Some(next_fire_at),
                            Ok(None) => {
                                entry.state.next_fire_at = None;
                                entry.enabled = false;
                            }
                            Err(error) => {
                                tracing::warn!(
                                    "failed to compute next scheduled fire for {} after overlap skip: {}. falling back to legacy interval semantics",
                                    entry.id,
                                    error
                                );
                                let fallback = interval_seconds_from_trigger(&entry.trigger).unwrap_or(60);
                                entry.state.next_fire_at = Some(now + Duration::seconds(fallback as i64));
                            }
                        }
                        entry.updated_at = now;
                        continue;
                    }
                    OverlapPolicy::QueueOne if blocked => {
                        continue;
                    }
                    _ => {}
                }

                // Dispatch: advance the schedule, then emit one queued run
                // record per catch-up occurrence (capped by overlap policy).
                let dispatch_count = apply_overlap_dispatch_limit(entry, dispatch_count);
                entry.state.last_scheduled_at = Some(now);
                match compute_next_run_at_with_engine(entry, now, engine) {
                    Ok(Some(next_fire_at)) => {
                        entry.state.next_fire_at = Some(next_fire_at);
                    }
                    Ok(None) => {
                        entry.state.next_fire_at = None;
                        entry.enabled = false;
                    }
                    Err(error) => {
                        tracing::warn!(
                            "failed to compute next scheduled fire for {}: {}. falling back to legacy interval semantics",
                            entry.id,
                            error
                        );
                        let fallback = interval_seconds_from_trigger(&entry.trigger).unwrap_or(60);
                        entry.state.next_fire_at = Some(now + Duration::seconds(fallback as i64));
                    }
                }
                entry.state.queued_run_count = entry.state.queued_run_count.saturating_add(dispatch_count);
                entry.updated_at = now;
                for dispatch_index in 0..dispatch_count {
                    let scheduled_for = scheduled_for_dispatch(entry, due_at, dispatch_index);
                    let was_catch_up = scheduled_for < now;
                    let record = make_queued_run_record(&entry.id, scheduled_for, now, was_catch_up);
                    let run_id = record.run_id.clone();
                    run_records.insert(run_id.clone(), record);
                    out.push(ClaimedScheduleRun {
                        run_id,
                        schedule_id: entry.id.clone(),
                        schedule_name: entry.name.clone(),
                        run_config: entry.run_config.clone(),
                        scheduled_for,
                        claimed_at: now,
                        was_catch_up,
                    });
                }
            }
            Ok(out)
        })
        .await
    }

    /// Record that a claimed run began executing: move one run from queued to
    /// running on the schedule and mark the run record `Running` (creating a
    /// fallback record when the run id is unknown).
    pub async fn mark_run_started(&self, schedule_id: &str, run_id: &str) -> io::Result<()> {
        self.update_index(|index| {
            let now = Utc::now();
            if let Some(entry) = index.schedules.get_mut(schedule_id) {
                entry.state.queued_run_count = entry.state.queued_run_count.saturating_sub(1);
                entry.state.running_run_count = entry.state.running_run_count.saturating_add(1);
                entry.state.last_started_at = Some(now);
                entry.updated_at = now;
            }
            let record = index
                .run_records
                .entry(run_id.to_string())
                .or_insert_with(|| {
                    make_fallback_run_record(run_id, schedule_id, now, ScheduleRunStatus::Queued)
                });
            update_run_record_started(record, now, None);
            Ok(())
        })
        .await
    }

    /// Attach a session id to a run record (creating a fallback `Running`
    /// record when the run id is unknown). Schedule counters are untouched.
    pub async fn bind_run_session(
        &self,
        schedule_id: &str,
        run_id: &str,
        session_id: &str,
    ) -> io::Result<()> {
        self.update_index(|index| {
            let now = Utc::now();
            let record = index
                .run_records
                .entry(run_id.to_string())
                .or_insert_with(|| {
                    make_fallback_run_record(run_id, schedule_id, now, ScheduleRunStatus::Running)
                });
            record.session_id = Some(session_id.to_string());
            Ok(())
        })
        .await
    }

    /// Record a run's terminal outcome: fold it into the schedule's counters
    /// (see `apply_terminal_run_status`) and finalize the run record.
    /// Errors if `status` is not a valid terminal status for accounting.
    pub async fn mark_run_terminal(
        &self,
        schedule_id: &str,
        run_id: &str,
        status: ScheduleRunStatus,
        outcome_reason: Option<String>,
    ) -> io::Result<()> {
        self.update_index(|index| {
            let now = Utc::now();
            if let Some(entry) = index.schedules.get_mut(schedule_id) {
                apply_terminal_run_status(entry, status, now)?;
                entry.updated_at = now;
            }
            let record = index
                .run_records
                .entry(run_id.to_string())
                .or_insert_with(|| make_fallback_run_record(run_id, schedule_id, now, status));
            update_run_record_terminal(record, status, now, outcome_reason, None);
            Ok(())
        })
        .await
    }

    /// Record that a queued run was dropped before ever starting: release its
    /// queue slot, count a missed occurrence, and close the record as `Missed`.
    pub async fn mark_run_dequeued_without_start(
        &self,
        schedule_id: &str,
        run_id: &str,
        outcome_reason: Option<String>,
    ) -> io::Result<()> {
        self.update_index(|index| {
            let now = Utc::now();
            if let Some(entry) = index.schedules.get_mut(schedule_id) {
                entry.state.queued_run_count = entry.state.queued_run_count.saturating_sub(1);
                record_missed_occurrence(entry);
                entry.updated_at = now;
            }
            let record = index
                .run_records
                .entry(run_id.to_string())
                .or_insert_with(|| {
                    make_fallback_run_record(run_id, schedule_id, now, ScheduleRunStatus::Queued)
                });
            update_run_record_terminal(
                record,
                ScheduleRunStatus::Missed,
                now,
                outcome_reason,
                None,
            );
            Ok(())
        })
        .await
    }

    /// Manually enqueue an immediate run for schedule `id`, bypassing trigger,
    /// window, misfire, and overlap policies. Returns `Ok(None)` when the id
    /// is unknown.
    ///
    /// NOTE(review): unlike `claim_due_runs`, this does NOT increment the
    /// schedule's `queued_run_count`, yet `mark_run_started` will decrement it
    /// (saturating) — the counter can end up understating queued work; confirm
    /// whether manual runs should bump the counter too.
    pub async fn create_run_now(&self, id: &str) -> io::Result<Option<ClaimedScheduleRun>> {
        self.update_index(|index| {
            let Some(entry) = index.schedules.get(id).cloned() else {
                return Ok(None);
            };
            let now = Utc::now();
            let record = make_queued_run_record(&entry.id, now, now, false);
            let run_id = record.run_id.clone();
            index.run_records.insert(run_id.clone(), record);
            Ok(Some(ClaimedScheduleRun {
                run_id,
                schedule_id: entry.id,
                schedule_name: entry.name,
                run_config: entry.run_config,
                scheduled_for: now,
                claimed_at: now,
                was_catch_up: false,
            }))
        })
        .await
    }
}
1170
// Integration-style tests for ScheduleStore: each test builds a real store
// in a tempdir and exercises persistence, trigger evaluation, misfire and
// overlap policies, and run-record lifecycle accounting.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // A legacy on-disk entry carries `interval_seconds` instead of a
    // `trigger` field; loading the store should backfill it into an
    // `Interval` trigger with the same period.
    #[tokio::test]
    async fn store_backfills_legacy_interval_trigger_on_load() {
        let dir = tempdir().unwrap();
        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let next_run_at = DateTime::parse_from_rfc3339("2026-04-04T11:00:00Z")
            .unwrap()
            .with_timezone(&Utc);

        // Hand-written legacy JSON (version 1) with no `trigger` key.
        let raw = serde_json::json!({
            "version": 1,
            "updated_at": now,
            "schedules": {
                "legacy-1": {
                    "id": "legacy-1",
                    "name": "legacy",
                    "enabled": true,
                    "interval_seconds": 3600,
                    "created_at": now,
                    "updated_at": now,
                    "last_run_at": null,
                    "next_run_at": next_run_at,
                    "run_config": { "auto_execute": false }
                }
            }
        });
        tokio::fs::write(
            dir.path().join("schedules.json"),
            serde_json::to_vec_pretty(&raw).unwrap(),
        )
        .await
        .unwrap();

        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
        let schedule = store.get_schedule("legacy-1").await.unwrap();
        // The 3600-second legacy interval must survive the migration.
        assert!(matches!(
            schedule.trigger,
            ScheduleTrigger::Interval {
                every_seconds: 3600,
                ..
            }
        ));
    }

    // Creating via the definition-based API should persist the trigger plus
    // the definition metadata (timezone, misfire policy, overlap policy).
    #[tokio::test]
    async fn create_schedule_with_definition_persists_interval_trigger_metadata() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule_with_definition(
                "interval".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 300,
                        anchor_at: None,
                    }),
                    timezone: Some("Asia/Shanghai".to_string()),
                    misfire_policy: Some(MisFirePolicy::RunOnce),
                    overlap_policy: Some(OverlapPolicy::QueueOne),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Every field supplied in the definition round-trips onto the entry.
        assert!(matches!(
            created.trigger,
            ScheduleTrigger::Interval {
                every_seconds: 300,
                ..
            }
        ));
        assert_eq!(created.timezone.as_deref(), Some("Asia/Shanghai"));
        assert_eq!(created.misfire_policy, MisFirePolicy::RunOnce);
        assert_eq!(created.overlap_policy, OverlapPolicy::QueueOne);
    }

    // Patching via the definition-based API should replace the trigger and
    // all supplied metadata on an existing schedule.
    #[tokio::test]
    async fn patch_schedule_with_definition_updates_interval_trigger_metadata() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        // Start from a plain 300-second interval schedule.
        let created = store
            .create_schedule(
                "interval".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 300,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        // Patch trigger period, timezone, and both policies in one call.
        let patched = store
            .patch_schedule_with_definition(
                &created.id,
                None,
                None,
                None,
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 600,
                        anchor_at: None,
                    }),
                    timezone: Some("UTC".to_string()),
                    misfire_policy: Some(MisFirePolicy::CatchUpAll),
                    overlap_policy: Some(OverlapPolicy::Skip),
                    ..Default::default()
                },
            )
            .await
            .unwrap()
            .unwrap();

        assert!(matches!(
            patched.trigger,
            ScheduleTrigger::Interval {
                every_seconds: 600,
                ..
            }
        ));
        assert_eq!(patched.timezone.as_deref(), Some("UTC"));
        assert_eq!(patched.misfire_policy, MisFirePolicy::CatchUpAll);
        assert_eq!(patched.overlap_policy, OverlapPolicy::Skip);
    }

    // With next_fire_at forced to "now", claiming through the trigger engine
    // should yield exactly one run and advance next_fire_at by one period.
    #[tokio::test]
    async fn claim_due_runs_with_engine_uses_runtime_adapter_for_interval_trigger() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);

        let created = store
            .create_schedule_with_definition(
                "interval".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 300,
                        // Anchor one period in the past so "now" is a valid
                        // fire time.
                        anchor_at: Some(now - Duration::seconds(300)),
                    }),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Force the schedule to be due at exactly `now`.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.next_fire_at = Some(now);
                Ok(())
            })
            .await
            .unwrap();

        let engine = default_trigger_engine();
        let claimed = store
            .claim_due_runs_with_engine(now, engine.as_ref())
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);

        // The claim must record the fire time and schedule the next one
        // exactly 300 seconds later.
        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.last_scheduled_at, Some(now));
        assert_eq!(
            updated.state.next_fire_at,
            Some(now + Duration::seconds(300))
        );
    }

    // A newly created Monthly schedule should come back with a next_fire_at
    // strictly after its creation time.
    #[tokio::test]
    async fn create_schedule_with_definition_initializes_monthly_next_run() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule_with_definition(
                "monthly".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Monthly {
                        days: vec![1, 15],
                        hour: 9,
                        minute: 0,
                        second: 0,
                    }),
                    timezone: Some("UTC".to_string()),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Trigger round-trips, including the day list.
        assert!(matches!(
            created.trigger,
            ScheduleTrigger::Monthly {
                days,
                hour: 9,
                minute: 0,
                second: 0
            } if days == vec![1, 15]
        ));
        // The initial fire time is computed eagerly at creation.
        assert!(created
            .state
            .next_fire_at
            .is_some_and(|next| next > created.created_at));
    }

    // Misfire policy Skip: a fire time far in the past (900s for a 300s
    // interval) dispatches nothing but still advances next_fire_at.
    #[tokio::test]
    async fn misfire_skip_does_not_dispatch_run() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);

        let created = store
            .create_schedule_with_definition(
                "skip".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 300,
                        anchor_at: None,
                    }),
                    misfire_policy: Some(MisFirePolicy::Skip),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Backdate the fire time three periods into the past.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.next_fire_at = Some(now - Duration::seconds(900));
                Ok(())
            })
            .await
            .unwrap();

        let engine = default_trigger_engine();
        let claimed = store
            .claim_due_runs_with_engine(now, engine.as_ref())
            .await
            .unwrap();
        assert!(claimed.is_empty());

        // Skipped — but the schedule still moves forward and nothing queues.
        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.last_scheduled_at, Some(now));
        assert!(updated.state.next_fire_at.is_some_and(|next| next > now));
        assert_eq!(updated.state.queued_run_count, 0);
    }

    // Overlap policy Skip: a due schedule with a run already in flight
    // dispatches nothing and queues nothing.
    #[tokio::test]
    async fn overlap_skip_does_not_dispatch_when_running() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);

        let created = store
            .create_schedule_with_definition(
                "skip-overlap".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 300,
                        anchor_at: None,
                    }),
                    overlap_policy: Some(OverlapPolicy::Skip),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Due now, with one run already marked as running.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.next_fire_at = Some(now);
                entry.state.running_run_count = 1;
                Ok(())
            })
            .await
            .unwrap();

        let engine = default_trigger_engine();
        let claimed = store
            .claim_due_runs_with_engine(now, engine.as_ref())
            .await
            .unwrap();
        assert!(claimed.is_empty());

        // The in-flight run is untouched; the fire time still advances.
        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.running_run_count, 1);
        assert!(updated.state.next_fire_at.is_some_and(|next| next > now));
        assert_eq!(updated.state.queued_run_count, 0);
    }

    // Overlap policy QueueOne: with one run already queued, a due fire must
    // not add a second pending run.
    #[tokio::test]
    async fn overlap_queue_one_does_not_add_more_than_one_pending_run() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);

        let created = store
            .create_schedule_with_definition(
                "queue-one".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 300,
                        anchor_at: None,
                    }),
                    overlap_policy: Some(OverlapPolicy::QueueOne),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Due now, but the single queue slot is already occupied.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.next_fire_at = Some(now);
                entry.state.queued_run_count = 1;
                Ok(())
            })
            .await
            .unwrap();

        let engine = default_trigger_engine();
        let claimed = store
            .claim_due_runs_with_engine(now, engine.as_ref())
            .await
            .unwrap();
        assert!(claimed.is_empty());

        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.queued_run_count, 1);
    }

    // CatchUpAll + QueueOne: three missed periods collapse to a single
    // claimed run, leaving exactly one queued run.
    #[tokio::test]
    async fn overlap_queue_one_limits_catch_up_to_single_pending_run() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();
        let now = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
            .unwrap()
            .with_timezone(&Utc);

        let created = store
            .create_schedule_with_definition(
                "queue-one-catchup".to_string(),
                true,
                ScheduleRunConfig::default(),
                ScheduleDefinitionChanges {
                    trigger: Some(ScheduleTrigger::Interval {
                        every_seconds: 300,
                        anchor_at: None,
                    }),
                    misfire_policy: Some(MisFirePolicy::CatchUpAll),
                    overlap_policy: Some(OverlapPolicy::QueueOne),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        // Backdate by three periods (900s / 300s).
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.next_fire_at = Some(now - Duration::seconds(900));
                Ok(())
            })
            .await
            .unwrap();

        let engine = default_trigger_engine();
        let claimed = store
            .claim_due_runs_with_engine(now, engine.as_ref())
            .await
            .unwrap();
        assert_eq!(claimed.len(), 1);

        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.queued_run_count, 1);
        assert!(updated.state.next_fire_at.is_some_and(|next| next > now));
    }

    // Success terminal status: running counter drops to zero, success
    // timestamps/counters update, and the failure streak resets.
    #[tokio::test]
    async fn mark_run_terminal_records_success_accounting() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule(
                "success".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 60,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        // Seed a running run and a pre-existing failure streak.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.running_run_count = 1;
                entry.state.consecutive_failures = 2;
                Ok(())
            })
            .await
            .unwrap();

        store
            .mark_run_terminal(&created.id, "run-success", ScheduleRunStatus::Success, None)
            .await
            .unwrap();

        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.running_run_count, 0);
        assert!(updated.state.last_finished_at.is_some());
        assert!(updated.state.last_success_at.is_some());
        assert_eq!(updated.state.total_run_count, 1);
        assert_eq!(updated.state.total_success_count, 1);
        assert_eq!(updated.state.total_failure_count, 0);
        // A success resets the consecutive-failure streak.
        assert_eq!(updated.state.consecutive_failures, 0);
    }

    // Failed and Cancelled terminal statuses both count as failures: two
    // terminal runs yield failure count 2 and a streak of 2.
    #[tokio::test]
    async fn mark_run_terminal_records_failure_accounting() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule(
                "failure".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 60,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        // First run: seed running=1, then fail it.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.running_run_count = 1;
                Ok(())
            })
            .await
            .unwrap();

        store
            .mark_run_terminal(&created.id, "run-failed", ScheduleRunStatus::Failed, None)
            .await
            .unwrap();
        // Second run: re-seed running=1, then cancel it.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.running_run_count = 1;
                Ok(())
            })
            .await
            .unwrap();
        store
            .mark_run_terminal(
                &created.id,
                "run-cancelled",
                ScheduleRunStatus::Cancelled,
                None,
            )
            .await
            .unwrap();

        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.running_run_count, 0);
        assert!(updated.state.last_finished_at.is_some());
        assert!(updated.state.last_failure_at.is_some());
        assert_eq!(updated.state.total_run_count, 2);
        assert_eq!(updated.state.total_success_count, 0);
        // Cancelled is tallied alongside Failed.
        assert_eq!(updated.state.total_failure_count, 2);
        assert_eq!(updated.state.consecutive_failures, 2);
    }

    // Dequeue-without-start: drains the queued counter, bumps the missed
    // counter, and leaves behind a completed run record in Missed status.
    #[tokio::test]
    async fn mark_run_dequeued_without_start_counts_missed_occurrence() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule(
                "missed".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 60,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        // Seed one queued run.
        store
            .update_index(|index| {
                let entry = index.schedules.get_mut(&created.id).unwrap();
                entry.state.queued_run_count = 1;
                Ok(())
            })
            .await
            .unwrap();

        store
            .mark_run_dequeued_without_start(&created.id, "run-missed", None)
            .await
            .unwrap();

        let updated = store.get_schedule(&created.id).await.unwrap();
        assert_eq!(updated.state.queued_run_count, 0);
        assert_eq!(updated.state.total_missed_count, 1);
        // A fallback record is synthesized even though none pre-existed.
        let record = store
            .get_run_record("run-missed")
            .await
            .expect("run record should be created for missed dequeue");
        assert_eq!(record.status, ScheduleRunStatus::Missed);
        assert!(record.completed_at.is_some());
    }

    // create_run_now persists a Queued record whose timestamps match the
    // returned claim descriptor, and is never flagged as catch-up.
    #[tokio::test]
    async fn create_run_now_persists_queued_run_record() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule(
                "run-now".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 60,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        let claimed = store
            .create_run_now(&created.id)
            .await
            .unwrap()
            .expect("run descriptor should be created");

        let record = store
            .get_run_record(&claimed.run_id)
            .await
            .expect("queued run record should exist");
        assert_eq!(record.schedule_id, created.id);
        assert_eq!(record.status, ScheduleRunStatus::Queued);
        // Record timestamps mirror the claim descriptor exactly.
        assert_eq!(record.claimed_at, claimed.claimed_at);
        assert_eq!(record.scheduled_for, claimed.scheduled_for);
        assert!(!claimed.was_catch_up);
    }

    // Full lifecycle: queued -> started -> session bound -> success. The
    // final record carries timestamps, session id, latency metrics, and the
    // outcome reason.
    #[tokio::test]
    async fn mark_run_started_and_terminal_updates_run_record_fields() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule(
                "run-record-lifecycle".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 60,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        let claimed = store
            .create_run_now(&created.id)
            .await
            .unwrap()
            .expect("run descriptor should be created");

        // Drive the run through its full lifecycle.
        store
            .mark_run_started(&created.id, &claimed.run_id)
            .await
            .unwrap();
        store
            .bind_run_session(&created.id, &claimed.run_id, "session-1")
            .await
            .unwrap();
        store
            .mark_run_terminal(
                &created.id,
                &claimed.run_id,
                ScheduleRunStatus::Success,
                Some("ok".to_string()),
            )
            .await
            .unwrap();

        let record = store
            .get_run_record(&claimed.run_id)
            .await
            .expect("run record should exist");
        assert_eq!(record.status, ScheduleRunStatus::Success);
        assert!(record.started_at.is_some());
        assert!(record.completed_at.is_some());
        assert_eq!(record.session_id.as_deref(), Some("session-1"));
        // Latency metrics are derived once start/finish times are known.
        assert!(record.dispatch_lag_ms.is_some());
        assert!(record.execution_duration_ms.is_some());
        assert_eq!(record.outcome_reason.as_deref(), Some("ok"));
    }

    // Deleting a schedule also garbage-collects its run records.
    #[tokio::test]
    async fn delete_schedule_removes_associated_run_records() {
        let dir = tempdir().unwrap();
        let store = ScheduleStore::new(dir.path().to_path_buf()).await.unwrap();

        let created = store
            .create_schedule(
                "cleanup-history".to_string(),
                ScheduleTrigger::Interval {
                    every_seconds: 60,
                    anchor_at: None,
                },
                true,
                ScheduleRunConfig::default(),
            )
            .await
            .unwrap();

        let claimed = store
            .create_run_now(&created.id)
            .await
            .unwrap()
            .expect("run descriptor should be created");
        assert!(store.get_run_record(&claimed.run_id).await.is_some());

        let deleted = store.delete_schedule(&created.id).await.unwrap();
        assert!(deleted);
        // The run record for the deleted schedule is gone too.
        assert!(store.get_run_record(&claimed.run_id).await.is_none());
    }
}