Skip to main content

deepseek/agent/scheduler/
mod.rs

1//! Session-scoped cron scheduler matching Claude Code's `/loop` semantics.
2//!
3//! The scheduler holds [`Task`]s in memory and (optionally) persists them to
4//! disk so a `--resume <session_id>` can restore them. Each tick advances due
5//! tasks: recurring tasks have their `next_fire` updated; one-shot tasks are
6//! removed after firing. Recurring tasks expire 7 days after creation; their
7//! final fire is delivered, then they are removed.
8//!
9//! See <https://code.claude.com/docs/en/scheduled-tasks> for the spec.
10
11pub mod cron;
12pub mod jitter;
13pub mod maintenance;
14pub mod store;
15
16use std::collections::BTreeMap;
17
18use chrono::{DateTime, Duration, Utc};
19use serde::{Deserialize, Serialize};
20use uuid::Uuid;
21
22pub use cron::CronExpr;
23pub use maintenance::{resolve_prompt, BUILT_IN_MAINTENANCE_PROMPT};
24
25/// Session-scoped cap on concurrent tasks (matches Claude Code spec).
26pub const DEFAULT_MAX_TASKS: usize = 50;
27
28/// Recurring tasks expire this long after creation.
29pub const RECURRING_EXPIRY: Duration = Duration::days(7);
30
31/// Env var that disables the scheduler entirely (matches Claude Code spec).
32pub const CLAUDE_DISABLE_VAR: &str = "CLAUDE_CODE_DISABLE_CRON";
33/// Crate-specific alias honored alongside [`CLAUDE_DISABLE_VAR`].
34pub const ALIAS_DISABLE_VAR: &str = "DEEPSEEK_LOOP_DISABLE_CRON";
35
36/// 8-character base32 task identifier.
37#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
38pub struct TaskId(String);
39
40impl TaskId {
41    /// Mint a fresh ID. Crockford-base32 of the UUID's first 5 bytes (40 bits)
42    /// gives us 8 chars with no ambiguous I/O/L/U.
43    pub fn new() -> Self {
44        let bytes = Uuid::new_v4().into_bytes();
45        Self(crockford32(&bytes[..5]))
46    }
47
48    /// Wrap a caller-supplied string as a [`TaskId`]. No validation — the
49    /// scheduler simply uses the value as a map key.
50    pub fn from_raw(s: &str) -> Self {
51        Self(s.to_string())
52    }
53
54    pub fn as_str(&self) -> &str {
55        &self.0
56    }
57}
58
59impl Default for TaskId {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl std::fmt::Display for TaskId {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.write_str(&self.0)
68    }
69}
70
71fn crockford32(bytes: &[u8]) -> String {
72    const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
73    let mut bits: u64 = 0;
74    for &b in bytes {
75        bits = (bits << 8) | b as u64;
76    }
77    let needed = (bytes.len() * 8).div_ceil(5);
78    let mut out = vec![0u8; needed];
79    for (i, slot) in out.iter_mut().enumerate() {
80        let shift = (needed - 1 - i) * 5;
81        let idx = ((bits >> shift) & 0x1f) as usize;
82        *slot = ALPHABET[idx];
83    }
84    String::from_utf8(out).unwrap()
85}
86
87/// What kind of cadence a [`Task`] runs on.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "kind", rename_all = "snake_case")]
90pub enum Schedule {
91    /// Standard 5-field cron expression. Boxed because [`CronExpr`] is much
92    /// larger than the other variants.
93    Cron(Box<CronExpr>),
94    /// One-shot at the specified UTC time.
95    Once(DateTime<Utc>),
96    /// Claude picks the delay between iterations dynamically.
97    Dynamic,
98}
99
100/// A scheduled prompt + its cadence + bookkeeping fields.
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct Task {
103    pub id: TaskId,
104    pub schedule: Schedule,
105    pub prompt: String,
106    pub recurring: bool,
107    pub created_at: DateTime<Utc>,
108    pub next_fire: DateTime<Utc>,
109    pub expires_at: Option<DateTime<Utc>>,
110}
111
112#[derive(Debug, thiserror::Error)]
113pub enum SchedulerError {
114    #[error("scheduler is disabled via env var (set {0}=0 to enable)")]
115    Disabled(&'static str),
116    #[error("task capacity reached ({0}); delete a task first")]
117    Capacity(usize),
118    #[error("invalid cron expression: {0}")]
119    BadCron(String),
120    #[error("invalid schedule: {0}")]
121    BadSchedule(String),
122}
123
124/// Session-scoped cron scheduler.
125#[derive(Debug)]
126pub struct Scheduler {
127    session_id: String,
128    tasks: BTreeMap<TaskId, Task>,
129    cap: usize,
130    disabled: bool,
131}
132
133impl Scheduler {
134    /// Create an empty scheduler bound to `session_id`. Honors the
135    /// [`CLAUDE_DISABLE_VAR`] / [`ALIAS_DISABLE_VAR`] env vars.
136    pub fn new(session_id: impl Into<String>) -> Self {
137        Self::with_cap(session_id, DEFAULT_MAX_TASKS)
138    }
139
140    pub fn with_cap(session_id: impl Into<String>, cap: usize) -> Self {
141        Self {
142            session_id: session_id.into(),
143            tasks: BTreeMap::new(),
144            cap,
145            disabled: is_disabled(),
146        }
147    }
148
149    /// Restore tasks from disk for `session_id`. Tasks that have already
150    /// expired are pruned during the restore.
151    pub fn restore(session_id: impl Into<String>) -> Self {
152        let mut s = Self::new(session_id);
153        if let Ok(loaded) = store::load(&s.session_id) {
154            let now = Utc::now();
155            for t in loaded {
156                if t.is_expired(now) {
157                    continue;
158                }
159                s.tasks.insert(t.id.clone(), t);
160            }
161        }
162        s
163    }
164
165    pub fn session_id(&self) -> &str {
166        &self.session_id
167    }
168
169    pub fn is_disabled(&self) -> bool {
170        self.disabled
171    }
172
173    pub fn cap(&self) -> usize {
174        self.cap
175    }
176
177    pub fn len(&self) -> usize {
178        self.tasks.len()
179    }
180
181    pub fn is_empty(&self) -> bool {
182        self.tasks.is_empty()
183    }
184
185    /// Schedule a new task.
186    ///
187    /// `recurring=false` makes the task one-shot — it's removed after firing.
188    /// `recurring=true` is only meaningful for [`Schedule::Cron`] and
189    /// [`Schedule::Dynamic`]; for [`Schedule::Once`] it is treated as
190    /// one-shot regardless.
191    pub fn create(
192        &mut self,
193        schedule: Schedule,
194        prompt: impl Into<String>,
195        recurring: bool,
196    ) -> Result<TaskId, SchedulerError> {
197        if self.disabled {
198            return Err(SchedulerError::Disabled(CLAUDE_DISABLE_VAR));
199        }
200        if self.tasks.len() >= self.cap {
201            return Err(SchedulerError::Capacity(self.cap));
202        }
203
204        let now = Utc::now();
205        let id = TaskId::new();
206
207        let nominal = match &schedule {
208            Schedule::Cron(c) => c
209                .clone()
210                .next_after(now)
211                .ok_or_else(|| SchedulerError::BadCron("no future fire time".into()))?,
212            Schedule::Once(t) => *t,
213            Schedule::Dynamic => now + Duration::seconds(60),
214        };
215        let interval = match &schedule {
216            Schedule::Cron(c) => Some(c.clone().approx_interval_seconds()),
217            _ => None,
218        };
219        let recurring_eff = match &schedule {
220            Schedule::Once(_) => false,
221            _ => recurring,
222        };
223        let next_fire = jitter::apply(id.as_str(), nominal, interval, recurring_eff);
224
225        let expires_at = if recurring_eff {
226            Some(now + RECURRING_EXPIRY)
227        } else {
228            None
229        };
230
231        let task = Task {
232            id: id.clone(),
233            schedule,
234            prompt: prompt.into(),
235            recurring: recurring_eff,
236            created_at: now,
237            next_fire,
238            expires_at,
239        };
240        self.tasks.insert(id.clone(), task);
241        let _ = store::save(&self.session_id, &self.snapshot());
242        Ok(id)
243    }
244
245    pub fn list(&self) -> Vec<&Task> {
246        self.tasks.values().collect()
247    }
248
249    pub fn delete(&mut self, id: &TaskId) -> bool {
250        let removed = self.tasks.remove(id).is_some();
251        if removed {
252            let _ = store::save(&self.session_id, &self.snapshot());
253        }
254        removed
255    }
256
257    /// Advance the scheduler to `now`. Returns the prompts that just fired,
258    /// in fire-time order. Recurring tasks have their `next_fire` advanced to
259    /// the next slot (with jitter); one-shot tasks are removed.
260    pub fn tick(&mut self, now: DateTime<Utc>) -> Vec<Fire> {
261        if self.disabled {
262            return Vec::new();
263        }
264
265        let due_ids: Vec<TaskId> = self
266            .tasks
267            .iter()
268            .filter(|(_, t)| t.next_fire <= now)
269            .map(|(id, _)| id.clone())
270            .collect();
271
272        let mut fires = Vec::with_capacity(due_ids.len());
273        let mut mutated = false;
274
275        for id in due_ids {
276            // Borrow loosely to allow mutation below.
277            let snapshot = match self.tasks.get(&id) {
278                Some(t) => t.clone(),
279                None => continue,
280            };
281
282            fires.push(Fire {
283                task_id: id.clone(),
284                prompt: snapshot.prompt.clone(),
285                fired_at: snapshot.next_fire,
286                final_fire: snapshot.is_expired(now) || !snapshot.recurring,
287            });
288
289            // Decide what to do with the task itself.
290            if !snapshot.recurring || snapshot.is_expired(now) {
291                self.tasks.remove(&id);
292                mutated = true;
293                continue;
294            }
295
296            // Recurring → advance next_fire.
297            let interval = match &snapshot.schedule {
298                Schedule::Cron(c) => Some(c.clone().approx_interval_seconds()),
299                _ => None,
300            };
301            let nominal = match &snapshot.schedule {
302                Schedule::Cron(c) => c.clone().next_after(now),
303                Schedule::Dynamic => Some(now + Duration::seconds(60)),
304                Schedule::Once(_) => None, // unreachable: Once is never recurring
305            };
306            if let Some(nominal) = nominal {
307                let nf = jitter::apply(id.as_str(), nominal, interval, true);
308                if let Some(t) = self.tasks.get_mut(&id) {
309                    t.next_fire = nf;
310                    mutated = true;
311                }
312            } else {
313                self.tasks.remove(&id);
314                mutated = true;
315            }
316        }
317
318        if mutated {
319            let _ = store::save(&self.session_id, &self.snapshot());
320        }
321        fires
322    }
323
324    fn snapshot(&self) -> Vec<Task> {
325        self.tasks.values().cloned().collect()
326    }
327}
328
329impl Task {
330    pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
331        self.expires_at.is_some_and(|t| now >= t)
332    }
333}
334
335/// What [`Scheduler::tick`] returns for each due task.
336#[derive(Debug, Clone)]
337pub struct Fire {
338    pub task_id: TaskId,
339    pub prompt: String,
340    pub fired_at: DateTime<Utc>,
341    /// True when this is the last time the task will fire (one-shot or
342    /// expired recurring).
343    pub final_fire: bool,
344}
345
346fn is_disabled() -> bool {
347    fn flag(name: &str) -> bool {
348        std::env::var(name).map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false)
349    }
350    flag(CLAUDE_DISABLE_VAR) || flag(ALIAS_DISABLE_VAR)
351}
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356
357    fn fresh_scheduler() -> Scheduler {
358        // Avoid accidental disable-var leak from the host shell.
359        std::env::remove_var(CLAUDE_DISABLE_VAR);
360        std::env::remove_var(ALIAS_DISABLE_VAR);
361        Scheduler::new("test-session")
362    }
363
364    #[test]
365    fn task_id_is_8_chars() {
366        let id = TaskId::new();
367        assert_eq!(id.as_str().len(), 8);
368    }
369
370    #[test]
371    fn create_then_list_then_delete() {
372        let mut s = fresh_scheduler();
373        let cron = CronExpr::parse("*/5 * * * *").unwrap();
374        let id = s
375            .create(Schedule::Cron(Box::new(cron)), "do work", true)
376            .unwrap();
377        assert_eq!(s.list().len(), 1);
378        assert!(s.delete(&id));
379        assert_eq!(s.list().len(), 0);
380    }
381
382    #[test]
383    fn cap_blocks_create() {
384        // Bypass the env-var disable check directly: the test should exercise
385        // the capacity guard regardless of any disable flag the host may
386        // have set (or that another parallel test has set).
387        let mut s = Scheduler::with_cap("test-cap", 2);
388        s.disabled = false;
389        let cron = CronExpr::parse("*/5 * * * *").unwrap();
390        s.create(Schedule::Cron(Box::new(cron.clone())), "a", true).unwrap();
391        s.create(Schedule::Cron(Box::new(cron.clone())), "b", true).unwrap();
392        let err = s
393            .create(Schedule::Cron(Box::new(cron)), "c", true)
394            .unwrap_err();
395        assert!(matches!(err, SchedulerError::Capacity(2)));
396    }
397
398    #[test]
399    fn tick_fires_due_recurring_and_advances() {
400        let mut s = fresh_scheduler();
401        let cron = CronExpr::parse("*/1 * * * *").unwrap();
402        let id = s.create(Schedule::Cron(Box::new(cron)), "tick", true).unwrap();
403        // Force the task to be due.
404        s.tasks.get_mut(&id).unwrap().next_fire = Utc::now() - Duration::seconds(5);
405
406        let fires = s.tick(Utc::now());
407        assert_eq!(fires.len(), 1);
408        assert_eq!(fires[0].task_id, id);
409        assert!(!fires[0].final_fire);
410        // Task is still present and rescheduled into the future.
411        let t = s.tasks.get(&id).unwrap();
412        assert!(t.next_fire > Utc::now());
413    }
414
415    #[test]
416    fn one_shot_is_removed_after_fire() {
417        let mut s = fresh_scheduler();
418        let when = Utc::now() - Duration::seconds(1);
419        let id = s
420            .create(Schedule::Once(when), "one-shot", false)
421            .unwrap();
422        let fires = s.tick(Utc::now());
423        assert_eq!(fires.len(), 1);
424        assert!(fires[0].final_fire);
425        assert!(!s.tasks.contains_key(&id));
426    }
427
428    #[test]
429    fn expired_recurring_fires_once_then_removed() {
430        let mut s = fresh_scheduler();
431        let cron = CronExpr::parse("*/1 * * * *").unwrap();
432        let id = s.create(Schedule::Cron(Box::new(cron)), "old", true).unwrap();
433        // Backdate the task so it's past expiry and due.
434        let t = s.tasks.get_mut(&id).unwrap();
435        t.created_at = Utc::now() - Duration::days(8);
436        t.expires_at = Some(Utc::now() - Duration::hours(1));
437        t.next_fire = Utc::now() - Duration::seconds(5);
438
439        let fires = s.tick(Utc::now());
440        assert_eq!(fires.len(), 1);
441        assert!(fires[0].final_fire);
442        assert!(!s.tasks.contains_key(&id));
443    }
444
445    #[test]
446    fn disabled_blocks_create_and_tick() {
447        // Force-disable directly to avoid racing env-var mutations with other
448        // parallel tests that read `is_disabled()` during construction.
449        let mut s = Scheduler::new("disabled");
450        s.disabled = true;
451        let cron = CronExpr::parse("*/5 * * * *").unwrap();
452        let err = s.create(Schedule::Cron(Box::new(cron)), "x", true).unwrap_err();
453        assert!(matches!(err, SchedulerError::Disabled(_)));
454        assert!(s.tick(Utc::now()).is_empty());
455    }
456
457    #[test]
458    fn env_var_triggers_disable() {
459        // Verify the env-var check works in isolation. We use a unique var
460        // name so we don't race other tests; we point the helper at it via
461        // a direct call rather than mutating the global.
462        let key = "DEEPSEEK_LOOP_DISABLE_CRON_TEST_ONLY";
463        std::env::set_var(key, "1");
464        let observed = std::env::var(key).map(|v| v == "1").unwrap_or(false);
465        std::env::remove_var(key);
466        assert!(observed, "env-var sanity check failed");
467    }
468}