1pub mod cron;
12pub mod jitter;
13pub mod maintenance;
14pub mod store;
15
16use std::collections::BTreeMap;
17
18use chrono::{DateTime, Duration, Utc};
19use serde::{Deserialize, Serialize};
20use uuid::Uuid;
21
22pub use cron::CronExpr;
23pub use maintenance::{resolve_prompt, BUILT_IN_MAINTENANCE_PROMPT};
24
/// Task capacity used by [`Scheduler::new`] when the caller does not pick one.
pub const DEFAULT_MAX_TASKS: usize = 50;

/// Lifetime of a recurring task: `Scheduler::create` sets `expires_at` to the
/// creation time plus this duration.
pub const RECURRING_EXPIRY: Duration = Duration::days(7);

/// Environment variable that disables the scheduler when set to `1` or `true`.
/// This is also the name reported in `SchedulerError::Disabled`.
pub const CLAUDE_DISABLE_VAR: &str = "CLAUDE_CODE_DISABLE_CRON";
/// Second environment variable checked alongside [`CLAUDE_DISABLE_VAR`]; either
/// one being set to `1` or `true` disables the scheduler.
pub const ALIAS_DISABLE_VAR: &str = "DEEPSEEK_LOOP_DISABLE_CRON";
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
38pub struct TaskId(String);
39
40impl TaskId {
41 pub fn new() -> Self {
44 let bytes = Uuid::new_v4().into_bytes();
45 Self(crockford32(&bytes[..5]))
46 }
47
48 pub fn from_raw(s: &str) -> Self {
51 Self(s.to_string())
52 }
53
54 pub fn as_str(&self) -> &str {
55 &self.0
56 }
57}
58
59impl Default for TaskId {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl std::fmt::Display for TaskId {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.write_str(&self.0)
68 }
69}
70
/// Encodes `bytes` as Crockford base32 text.
///
/// The input is read as one big-endian unsigned integer and rendered with
/// exactly `ceil(8 * len / 5)` symbols, most significant first, so short
/// values are left-padded with `'0'`. An empty input yields an empty string.
///
/// Bits are streamed through a small accumulator, so inputs of any length
/// are supported. For inputs of up to 8 bytes the output is identical to a
/// single-`u64` encoding.
fn crockford32(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";

    let total_bits = bytes.len() * 8;
    let symbols = total_bits.div_ceil(5);
    let mut out = String::with_capacity(symbols);

    // Start with the 0..=4 implicit zero bits that pad the value on the left
    // up to a whole number of 5-bit symbols.
    let mut pending = symbols * 5 - total_bits;
    let mut acc: u32 = 0;

    for &byte in bytes {
        // At most 4 leftover bits + 8 new bits live in `acc` at any time.
        acc = (acc << 8) | u32::from(byte);
        pending += 8;
        while pending >= 5 {
            pending -= 5;
            let idx = ((acc >> pending) & 0x1f) as usize;
            out.push(char::from(ALPHABET[idx]));
        }
        // Drop the bits that were already emitted.
        acc &= (1 << pending) - 1;
    }
    out
}
86
/// When a task is due.
///
/// Serialized with an internal `kind` tag whose values are the snake_case
/// variant names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Schedule {
    /// Fire times come from a (boxed) cron expression.
    Cron(Box<CronExpr>),
    /// Fire at the fixed instant `at`. The scheduler always stores such tasks
    /// as one-shot, whatever `recurring` flag was requested.
    Once { at: DateTime<Utc> },
    /// No fixed timetable: the scheduler places the next fire 60 seconds after
    /// the current time, before jitter is applied.
    Dynamic,
}
102
/// A scheduled task as held in memory and written to the store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
    /// Identifier; also the key under which the scheduler stores the task.
    pub id: TaskId,
    /// Timetable the fire times are derived from.
    pub schedule: Schedule,
    /// Text copied into each `Fire` produced for this task.
    pub prompt: String,
    /// Whether the task is re-armed after firing. Always `false` for
    /// `Schedule::Once` tasks created through `Scheduler::create`.
    pub recurring: bool,
    /// Time at which `Scheduler::create` built the task.
    pub created_at: DateTime<Utc>,
    /// Instant from which `Scheduler::tick` treats the task as due
    /// (the jittered fire time).
    pub next_fire: DateTime<Utc>,
    /// Expiry instant; `Scheduler::create` sets it to `created_at +
    /// RECURRING_EXPIRY` for recurring tasks and to `None` otherwise.
    pub expires_at: Option<DateTime<Utc>>,
}
114
/// Errors reported by the scheduler.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
    /// The scheduler is disabled; the payload is the name of the environment
    /// variable quoted in the message.
    #[error("scheduler is disabled via env var (set {0}=0 to enable)")]
    Disabled(&'static str),
    /// The task map already holds the configured maximum (the payload).
    #[error("task capacity reached ({0}); delete a task first")]
    Capacity(usize),
    /// A cron schedule could not be used, e.g. it has no future fire time.
    #[error("invalid cron expression: {0}")]
    BadCron(String),
    /// A schedule was rejected for the reason given in the payload.
    #[error("invalid schedule: {0}")]
    BadSchedule(String),
}
126
/// In-memory task table for one session, saved to `store` after changes.
#[derive(Debug)]
pub struct Scheduler {
    // Session the tasks belong to; passed to `store::load` / `store::save`.
    session_id: String,
    // Live tasks keyed (and therefore iterated) by id.
    tasks: BTreeMap<TaskId, Task>,
    // Maximum number of tasks `create` will allow.
    cap: usize,
    // Sampled from the environment when the scheduler is constructed.
    disabled: bool,
}
135
136impl Scheduler {
137 pub fn new(session_id: impl Into<String>) -> Self {
140 Self::with_cap(session_id, DEFAULT_MAX_TASKS)
141 }
142
143 pub fn with_cap(session_id: impl Into<String>, cap: usize) -> Self {
144 Self {
145 session_id: session_id.into(),
146 tasks: BTreeMap::new(),
147 cap,
148 disabled: is_disabled(),
149 }
150 }
151
152 pub fn restore(session_id: impl Into<String>) -> Self {
155 let mut s = Self::new(session_id);
156 if let Ok(loaded) = store::load(&s.session_id) {
157 let now = Utc::now();
158 for t in loaded {
159 if t.is_expired(now) {
160 continue;
161 }
162 s.tasks.insert(t.id.clone(), t);
163 }
164 }
165 s
166 }
167
168 pub fn session_id(&self) -> &str {
169 &self.session_id
170 }
171
172 pub fn is_disabled(&self) -> bool {
173 self.disabled
174 }
175
176 pub fn cap(&self) -> usize {
177 self.cap
178 }
179
180 pub fn len(&self) -> usize {
181 self.tasks.len()
182 }
183
184 pub fn is_empty(&self) -> bool {
185 self.tasks.is_empty()
186 }
187
188 pub fn create(
195 &mut self,
196 schedule: Schedule,
197 prompt: impl Into<String>,
198 recurring: bool,
199 ) -> Result<TaskId, SchedulerError> {
200 if self.disabled {
201 return Err(SchedulerError::Disabled(CLAUDE_DISABLE_VAR));
202 }
203 if self.tasks.len() >= self.cap {
204 return Err(SchedulerError::Capacity(self.cap));
205 }
206
207 let now = Utc::now();
208 let id = TaskId::new();
209
210 let nominal = match &schedule {
211 Schedule::Cron(c) => c
212 .clone()
213 .next_after(now)
214 .ok_or_else(|| SchedulerError::BadCron("no future fire time".into()))?,
215 Schedule::Once { at } => *at,
216 Schedule::Dynamic => now + Duration::seconds(60),
217 };
218 let interval = match &schedule {
219 Schedule::Cron(c) => Some(c.clone().approx_interval_seconds()),
220 _ => None,
221 };
222 let recurring_eff = match &schedule {
223 Schedule::Once { .. } => false,
224 _ => recurring,
225 };
226 let next_fire = jitter::apply(id.as_str(), nominal, interval, recurring_eff);
227
228 let expires_at = if recurring_eff {
229 Some(now + RECURRING_EXPIRY)
230 } else {
231 None
232 };
233
234 let task = Task {
235 id: id.clone(),
236 schedule,
237 prompt: prompt.into(),
238 recurring: recurring_eff,
239 created_at: now,
240 next_fire,
241 expires_at,
242 };
243 self.tasks.insert(id.clone(), task);
244 let _ = store::save(&self.session_id, &self.snapshot());
245 Ok(id)
246 }
247
248 pub fn list(&self) -> Vec<&Task> {
249 self.tasks.values().collect()
250 }
251
252 pub fn delete(&mut self, id: &TaskId) -> bool {
253 let removed = self.tasks.remove(id).is_some();
254 if removed {
255 let _ = store::save(&self.session_id, &self.snapshot());
256 }
257 removed
258 }
259
260 pub fn tick(&mut self, now: DateTime<Utc>) -> Vec<Fire> {
264 if self.disabled {
265 return Vec::new();
266 }
267
268 let due_ids: Vec<TaskId> = self
269 .tasks
270 .iter()
271 .filter(|(_, t)| t.next_fire <= now)
272 .map(|(id, _)| id.clone())
273 .collect();
274
275 let mut fires = Vec::with_capacity(due_ids.len());
276 let mut mutated = false;
277
278 for id in due_ids {
279 let snapshot = match self.tasks.get(&id) {
281 Some(t) => t.clone(),
282 None => continue,
283 };
284
285 fires.push(Fire {
286 task_id: id.clone(),
287 prompt: snapshot.prompt.clone(),
288 fired_at: snapshot.next_fire,
289 final_fire: snapshot.is_expired(now) || !snapshot.recurring,
290 });
291
292 if !snapshot.recurring || snapshot.is_expired(now) {
294 self.tasks.remove(&id);
295 mutated = true;
296 continue;
297 }
298
299 let interval = match &snapshot.schedule {
301 Schedule::Cron(c) => Some(c.clone().approx_interval_seconds()),
302 _ => None,
303 };
304 let nominal = match &snapshot.schedule {
305 Schedule::Cron(c) => c.clone().next_after(now),
306 Schedule::Dynamic => Some(now + Duration::seconds(60)),
307 Schedule::Once { .. } => None, };
309 if let Some(nominal) = nominal {
310 let nf = jitter::apply(id.as_str(), nominal, interval, true);
311 if let Some(t) = self.tasks.get_mut(&id) {
312 t.next_fire = nf;
313 mutated = true;
314 }
315 } else {
316 self.tasks.remove(&id);
317 mutated = true;
318 }
319 }
320
321 if mutated {
322 let _ = store::save(&self.session_id, &self.snapshot());
323 }
324 fires
325 }
326
327 fn snapshot(&self) -> Vec<Task> {
328 self.tasks.values().cloned().collect()
329 }
330}
331
332impl Task {
333 pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
334 self.expires_at.is_some_and(|t| now >= t)
335 }
336}
337
/// One task firing, as returned by `Scheduler::tick`.
#[derive(Debug, Clone)]
pub struct Fire {
    /// Id of the task that fired.
    pub task_id: TaskId,
    /// The task's prompt at the time of the fire.
    pub prompt: String,
    /// The `next_fire` value the task was due at (not the tick time).
    pub fired_at: DateTime<Utc>,
    /// `true` when this is the task's last fire (it is one-shot or its expiry
    /// has passed); the scheduler removes such tasks after firing.
    pub final_fire: bool,
}
348
349fn is_disabled() -> bool {
350 fn flag(name: &str) -> bool {
351 std::env::var(name)
352 .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
353 .unwrap_or(false)
354 }
355 flag(CLAUDE_DISABLE_VAR) || flag(ALIAS_DISABLE_VAR)
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Scheduler that is guaranteed to be enabled.
    ///
    /// The flag is forced on the instance instead of clearing the process-wide
    /// environment variables: tests run in parallel, and clearing them here
    /// would race with `schedule_is_disabled_when_either_var_set`, which sets
    /// and asserts on those same variables.
    fn fresh_scheduler() -> Scheduler {
        let mut s = Scheduler::new("test-session");
        s.disabled = false;
        s
    }

    #[test]
    fn task_id_is_8_chars() {
        let id = TaskId::new();
        assert_eq!(id.as_str().len(), 8);
    }

    #[test]
    fn create_then_list_then_delete() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let id = s
            .create(Schedule::Cron(Box::new(cron)), "do work", true)
            .unwrap();
        assert_eq!(s.list().len(), 1);
        assert!(s.delete(&id));
        assert_eq!(s.list().len(), 0);
    }

    #[test]
    fn cap_blocks_create() {
        let mut s = Scheduler::with_cap("test-cap", 2);
        // Force-enable so the result does not depend on the environment.
        s.disabled = false;
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        s.create(Schedule::Cron(Box::new(cron.clone())), "a", true)
            .unwrap();
        s.create(Schedule::Cron(Box::new(cron.clone())), "b", true)
            .unwrap();
        let err = s
            .create(Schedule::Cron(Box::new(cron)), "c", true)
            .unwrap_err();
        assert!(matches!(err, SchedulerError::Capacity(2)));
    }

    #[test]
    fn tick_fires_due_recurring_and_advances() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/1 * * * *").unwrap();
        let id = s
            .create(Schedule::Cron(Box::new(cron)), "tick", true)
            .unwrap();
        // Use one captured instant throughout: comparing against a second
        // `Utc::now()` can flake when the clock crosses the next fire time
        // between the tick and the assertion.
        let now = Utc::now();
        s.tasks.get_mut(&id).unwrap().next_fire = now - Duration::seconds(5);

        let fires = s.tick(now);
        assert_eq!(fires.len(), 1);
        assert_eq!(fires[0].task_id, id);
        assert!(!fires[0].final_fire);
        let t = s.tasks.get(&id).unwrap();
        assert!(t.next_fire > now);
    }

    #[test]
    fn one_shot_is_removed_after_fire() {
        let mut s = fresh_scheduler();
        let when = Utc::now() - Duration::seconds(1);
        let id = s
            .create(Schedule::Once { at: when }, "one-shot", false)
            .unwrap();
        let fires = s.tick(Utc::now());
        assert_eq!(fires.len(), 1);
        assert!(fires[0].final_fire);
        assert!(!s.tasks.contains_key(&id));
    }

    #[test]
    fn expired_recurring_fires_once_then_removed() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/1 * * * *").unwrap();
        let id = s
            .create(Schedule::Cron(Box::new(cron)), "old", true)
            .unwrap();
        // Age the task past its expiry and make it due.
        let t = s.tasks.get_mut(&id).unwrap();
        t.created_at = Utc::now() - Duration::days(8);
        t.expires_at = Some(Utc::now() - Duration::hours(1));
        t.next_fire = Utc::now() - Duration::seconds(5);

        let fires = s.tick(Utc::now());
        assert_eq!(fires.len(), 1);
        assert!(fires[0].final_fire);
        assert!(!s.tasks.contains_key(&id));
    }

    #[test]
    fn disabled_blocks_create_and_tick() {
        let mut s = Scheduler::new("disabled");
        // Set the flag directly rather than through the environment.
        s.disabled = true;
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let err = s
            .create(Schedule::Cron(Box::new(cron)), "x", true)
            .unwrap_err();
        assert!(matches!(err, SchedulerError::Disabled(_)));
        assert!(s.tick(Utc::now()).is_empty());
    }

    #[test]
    fn env_var_triggers_disable() {
        // Sanity check of the env-var round trip on a key no other test uses;
        // it does not touch the real disable variables.
        let key = "DEEPSEEK_LOOP_DISABLE_CRON_TEST_ONLY";
        std::env::set_var(key, "1");
        let observed = std::env::var(key).map(|v| v == "1").unwrap_or(false);
        std::env::remove_var(key);
        assert!(observed, "env-var sanity check failed");
    }

    #[test]
    fn task_id_uses_safe_alphabet() {
        for _ in 0..256 {
            let id = TaskId::new();
            for c in id.as_str().chars() {
                assert!(c.is_ascii_alphanumeric());
                assert!(
                    !matches!(c, 'I' | 'O' | 'L' | 'U' | 'i' | 'o' | 'l' | 'u'),
                    "ambiguous char {c} in id {id}"
                );
            }
        }
    }

    #[test]
    fn task_id_is_unique_in_practice() {
        let mut seen = std::collections::HashSet::new();
        for _ in 0..1_000 {
            assert!(seen.insert(TaskId::new()), "collision");
        }
    }

    #[test]
    fn cap_zero_blocks_all_creates() {
        let mut s = Scheduler::with_cap("cap-zero", 0);
        s.disabled = false;
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let err = s
            .create(Schedule::Cron(Box::new(cron)), "x", true)
            .unwrap_err();
        assert!(matches!(err, SchedulerError::Capacity(0)));
    }

    #[test]
    fn delete_unknown_returns_false() {
        let mut s = fresh_scheduler();
        assert!(!s.delete(&TaskId::from_raw("DOES-NOT-EXIST")));
    }

    #[test]
    fn tick_on_empty_returns_no_fires() {
        let mut s = fresh_scheduler();
        assert!(s.tick(Utc::now()).is_empty());
    }

    #[test]
    fn dynamic_schedule_starts_60s_out() {
        let mut s = fresh_scheduler();
        let id = s.create(Schedule::Dynamic, "dyn", true).unwrap();
        let task = s.tasks.get(&id).unwrap();
        let delta = (task.next_fire - task.created_at).num_seconds();
        // 60 s nominal delay plus whatever jitter adds; the bound allows up
        // to 30 minutes of it.
        assert!((60..(60 + 1800)).contains(&delta), "delta={delta}");
    }

    #[test]
    fn once_schedule_ignores_recurring_flag() {
        let mut s = fresh_scheduler();
        let when = Utc::now() + Duration::hours(1);
        let id = s
            .create(Schedule::Once { at: when }, "future", true)
            .unwrap();
        let t = s.tasks.get(&id).unwrap();
        assert!(!t.recurring, "Once schedule must always be one-shot");
        assert!(t.expires_at.is_none(), "Once must not set an expiry");
    }

    #[test]
    fn once_in_future_does_not_fire() {
        let mut s = fresh_scheduler();
        let when = Utc::now() + Duration::seconds(60);
        let _id = s
            .create(Schedule::Once { at: when }, "later", false)
            .unwrap();
        let fires = s.tick(Utc::now());
        assert!(fires.is_empty());
    }

    #[test]
    fn recurring_cron_sets_7_day_expiry() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let id = s.create(Schedule::Cron(Box::new(cron)), "x", true).unwrap();
        let t = s.tasks.get(&id).unwrap();
        let expires = t.expires_at.expect("recurring should set expires_at");
        let span = (expires - t.created_at).num_days();
        assert_eq!(span, 7, "recurring expiry should be exactly 7 days");
    }

    #[test]
    fn non_recurring_cron_has_no_expiry() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let id = s
            .create(Schedule::Cron(Box::new(cron)), "x", false)
            .unwrap();
        let t = s.tasks.get(&id).unwrap();
        assert!(t.expires_at.is_none());
    }

    #[test]
    fn tick_returns_due_tasks_in_fire_time_order() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let id_a = s
            .create(Schedule::Cron(Box::new(cron.clone())), "a", true)
            .unwrap();
        let id_b = s.create(Schedule::Cron(Box::new(cron)), "b", true).unwrap();
        let now = Utc::now();
        s.tasks.get_mut(&id_a).unwrap().next_fire = now - Duration::seconds(10);
        s.tasks.get_mut(&id_b).unwrap().next_fire = now - Duration::seconds(5);

        let fires = s.tick(now);
        assert_eq!(fires.len(), 2);
        // `tick` yields fires in task-id order, so sort by fire time before
        // checking which task was due first.
        let mut sorted = fires.clone();
        sorted.sort_by_key(|f| f.fired_at);
        assert_eq!(sorted[0].task_id, id_a);
        assert_eq!(sorted[1].task_id, id_b);
        assert!(sorted[0].fired_at < sorted[1].fired_at);
    }

    #[test]
    fn cap_returns_capacity_with_correct_bound() {
        let mut s = Scheduler::with_cap("cap-three", 3);
        s.disabled = false;
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        for i in 0..3 {
            s.create(
                Schedule::Cron(Box::new(cron.clone())),
                format!("p{i}"),
                true,
            )
            .unwrap();
        }
        let err = s
            .create(Schedule::Cron(Box::new(cron)), "overflow", true)
            .unwrap_err();
        match err {
            SchedulerError::Capacity(n) => assert_eq!(n, 3),
            other => panic!("expected Capacity(3), got {other:?}"),
        }
    }

    #[test]
    fn tick_after_delete_does_not_fire() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/1 * * * *").unwrap();
        let id = s.create(Schedule::Cron(Box::new(cron)), "x", true).unwrap();
        s.tasks.get_mut(&id).unwrap().next_fire = Utc::now() - Duration::seconds(5);
        // Go through the public API so the test covers what its name says.
        assert!(s.delete(&id));
        let fires = s.tick(Utc::now());
        assert!(fires.is_empty());
    }

    #[test]
    fn task_is_expired_only_after_expires_at() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let id = s.create(Schedule::Cron(Box::new(cron)), "x", true).unwrap();
        let t = s.tasks.get(&id).unwrap();
        let exp = t.expires_at.unwrap();
        // The boundary itself counts as expired.
        assert!(!t.is_expired(exp - Duration::seconds(1)));
        assert!(t.is_expired(exp));
        assert!(t.is_expired(exp + Duration::seconds(1)));
    }

    #[test]
    fn restored_scheduler_keeps_tasks_with_persistence_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let now = Utc::now();
        let task = Task {
            id: TaskId::new(),
            schedule: Schedule::Cron(Box::new(cron)),
            prompt: "persist me".into(),
            recurring: true,
            created_at: now,
            next_fire: now + Duration::minutes(5),
            expires_at: Some(now + RECURRING_EXPIRY),
        };
        store::save_to(dir.path(), &[task.clone()]).unwrap();
        let loaded = store::load_from(dir.path()).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, task.id);
        assert_eq!(loaded[0].prompt, task.prompt);
        assert!(loaded[0].recurring);
        match (&loaded[0].schedule, &task.schedule) {
            (Schedule::Cron(a), Schedule::Cron(b)) => assert_eq!(a.as_str(), b.as_str()),
            _ => panic!("schedule kind drifted across round trip"),
        }
    }

    #[test]
    fn schedule_is_disabled_when_either_var_set() {
        use std::sync::Mutex;
        // This is the only test in the module that mutates the real disable
        // variables; the function-local lock just keeps its steps serialized.
        static ENV_LOCK: Mutex<()> = Mutex::new(());
        let _g = ENV_LOCK.lock().unwrap();

        std::env::remove_var(CLAUDE_DISABLE_VAR);
        std::env::remove_var(ALIAS_DISABLE_VAR);
        assert!(!is_disabled());

        std::env::set_var(CLAUDE_DISABLE_VAR, "1");
        assert!(is_disabled());
        std::env::remove_var(CLAUDE_DISABLE_VAR);

        std::env::set_var(ALIAS_DISABLE_VAR, "true");
        assert!(is_disabled(), "alias should also disable");
        std::env::remove_var(ALIAS_DISABLE_VAR);

        std::env::set_var(CLAUDE_DISABLE_VAR, "0");
        assert!(!is_disabled(), "0 must not disable");
        std::env::remove_var(CLAUDE_DISABLE_VAR);
    }
}