deepseek/agent/scheduler/
mod.rs1pub mod cron;
12pub mod jitter;
13pub mod maintenance;
14pub mod store;
15
16use std::collections::BTreeMap;
17
18use chrono::{DateTime, Duration, Utc};
19use serde::{Deserialize, Serialize};
20use uuid::Uuid;
21
22pub use cron::CronExpr;
23pub use maintenance::{resolve_prompt, BUILT_IN_MAINTENANCE_PROMPT};
24
/// Task capacity used by [`Scheduler::new`].
pub const DEFAULT_MAX_TASKS: usize = 50;

/// Lifetime given to a recurring task: `create` sets its `expires_at` to the
/// creation time plus this duration.
pub const RECURRING_EXPIRY: Duration = Duration::days(7);

/// Environment variable that disables the scheduler when set to `1` or
/// `true` (ASCII case-insensitive).
pub const CLAUDE_DISABLE_VAR: &str = "CLAUDE_CODE_DISABLE_CRON";
/// Second environment variable checked with the same effect as
/// [`CLAUDE_DISABLE_VAR`].
pub const ALIAS_DISABLE_VAR: &str = "DEEPSEEK_LOOP_DISABLE_CRON";
35
/// Identifier of a scheduled task, stored as a plain string.
///
/// Ids produced by [`TaskId::new`] are 8 characters long; ids built with
/// [`TaskId::from_raw`] are whatever string the caller supplied.
/// The derived `Ord` is what orders tasks inside the scheduler's `BTreeMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct TaskId(String);
39
40impl TaskId {
41 pub fn new() -> Self {
44 let bytes = Uuid::new_v4().into_bytes();
45 Self(crockford32(&bytes[..5]))
46 }
47
48 pub fn from_raw(s: &str) -> Self {
51 Self(s.to_string())
52 }
53
54 pub fn as_str(&self) -> &str {
55 &self.0
56 }
57}
58
59impl Default for TaskId {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl std::fmt::Display for TaskId {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.write_str(&self.0)
68 }
69}
70
/// Encodes `bytes` using the Crockford base32 alphabet.
///
/// The input is treated as one big-endian bit string. When its length is not
/// a multiple of five bits, zero bits are prepended (so the *first* symbol
/// carries the padding). Five bytes therefore map to exactly eight symbols.
///
/// The encoder streams through the input with a small accumulator, so it
/// works for inputs of any length. An empty slice yields an empty string.
fn crockford32(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";

    let total_bits = bytes.len() * 8;
    let needed = total_bits.div_ceil(5);
    // Number of leading zero bits required to reach a multiple of five.
    let pad = needed * 5 - total_bits;

    let mut out = String::with_capacity(needed);
    // `acc` holds at most 4 leftover bits plus one fresh byte (12 bits).
    let mut acc: u32 = 0;
    let mut acc_bits = pad;

    for &b in bytes {
        acc = (acc << 8) | u32::from(b);
        acc_bits += 8;
        while acc_bits >= 5 {
            acc_bits -= 5;
            let idx = ((acc >> acc_bits) & 0x1f) as usize;
            out.push(char::from(ALPHABET[idx]));
        }
        // Drop the bits that were just emitted, keeping only the remainder.
        acc &= (1u32 << acc_bits) - 1;
    }
    out
}
86
/// When a task should fire.
///
/// Serialized with an internal `kind` tag whose values are the snake_case
/// variant names.
///
/// NOTE(review): serde's internally tagged representation only supports
/// newtype variants whose payload serializes as a struct or map.
/// `Once(DateTime<Utc>)` serializes as a string, so serializing that variant
/// is expected to fail at runtime — confirm, and check how `CronExpr`
/// serializes too. Callers in this file discard `store::save` errors, which
/// would hide such a failure.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Schedule {
    /// Fire at the times produced by a cron expression.
    Cron(Box<CronExpr>),
    /// Fire at one fixed instant; such tasks are never treated as recurring.
    Once(DateTime<Utc>),
    /// No fixed timetable: the scheduler places the next fire 60 seconds
    /// after the moment it (re)schedules the task.
    Dynamic,
}
99
/// A scheduled prompt together with its timing state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
    /// Identifier; also the key under which the scheduler stores the task.
    pub id: TaskId,
    /// Rule that determines the fire times.
    pub schedule: Schedule,
    /// Text handed back in each [`Fire`] for this task.
    pub prompt: String,
    /// Whether the task is rescheduled after firing; `false` means it is
    /// removed after its first fire.
    pub recurring: bool,
    /// Wall-clock time at which the task was created.
    pub created_at: DateTime<Utc>,
    /// The task is due once the `now` passed to `tick` reaches this instant.
    pub next_fire: DateTime<Utc>,
    /// Expiry deadline, if any; set at creation for recurring tasks only.
    pub expires_at: Option<DateTime<Utc>>,
}
111
/// Errors reported by the scheduler.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
    /// The scheduler was disabled through the environment; the payload is the
    /// variable name shown in the message.
    #[error("scheduler is disabled via env var (set {0}=0 to enable)")]
    Disabled(&'static str),
    /// The task map already holds the maximum number of tasks (the payload).
    #[error("task capacity reached ({0}); delete a task first")]
    Capacity(usize),
    /// The cron expression cannot be used, e.g. it has no future fire time.
    #[error("invalid cron expression: {0}")]
    BadCron(String),
    /// The schedule is invalid for another reason (not constructed in this
    /// file's visible code).
    #[error("invalid schedule: {0}")]
    BadSchedule(String),
}
123
/// In-memory task table for one session, saved through the `store` module
/// after each change.
#[derive(Debug)]
pub struct Scheduler {
    /// Session key passed to `store::load` / `store::save`.
    session_id: String,
    /// Live tasks keyed by id; iteration order is therefore id order.
    tasks: BTreeMap<TaskId, Task>,
    /// Maximum number of tasks `create` will admit.
    cap: usize,
    /// Sampled from the environment at construction; when `true`, `create`
    /// fails and `tick` fires nothing.
    disabled: bool,
}
132
133impl Scheduler {
134 pub fn new(session_id: impl Into<String>) -> Self {
137 Self::with_cap(session_id, DEFAULT_MAX_TASKS)
138 }
139
140 pub fn with_cap(session_id: impl Into<String>, cap: usize) -> Self {
141 Self {
142 session_id: session_id.into(),
143 tasks: BTreeMap::new(),
144 cap,
145 disabled: is_disabled(),
146 }
147 }
148
149 pub fn restore(session_id: impl Into<String>) -> Self {
152 let mut s = Self::new(session_id);
153 if let Ok(loaded) = store::load(&s.session_id) {
154 let now = Utc::now();
155 for t in loaded {
156 if t.is_expired(now) {
157 continue;
158 }
159 s.tasks.insert(t.id.clone(), t);
160 }
161 }
162 s
163 }
164
165 pub fn session_id(&self) -> &str {
166 &self.session_id
167 }
168
169 pub fn is_disabled(&self) -> bool {
170 self.disabled
171 }
172
173 pub fn cap(&self) -> usize {
174 self.cap
175 }
176
177 pub fn len(&self) -> usize {
178 self.tasks.len()
179 }
180
181 pub fn is_empty(&self) -> bool {
182 self.tasks.is_empty()
183 }
184
185 pub fn create(
192 &mut self,
193 schedule: Schedule,
194 prompt: impl Into<String>,
195 recurring: bool,
196 ) -> Result<TaskId, SchedulerError> {
197 if self.disabled {
198 return Err(SchedulerError::Disabled(CLAUDE_DISABLE_VAR));
199 }
200 if self.tasks.len() >= self.cap {
201 return Err(SchedulerError::Capacity(self.cap));
202 }
203
204 let now = Utc::now();
205 let id = TaskId::new();
206
207 let nominal = match &schedule {
208 Schedule::Cron(c) => c
209 .clone()
210 .next_after(now)
211 .ok_or_else(|| SchedulerError::BadCron("no future fire time".into()))?,
212 Schedule::Once(t) => *t,
213 Schedule::Dynamic => now + Duration::seconds(60),
214 };
215 let interval = match &schedule {
216 Schedule::Cron(c) => Some(c.clone().approx_interval_seconds()),
217 _ => None,
218 };
219 let recurring_eff = match &schedule {
220 Schedule::Once(_) => false,
221 _ => recurring,
222 };
223 let next_fire = jitter::apply(id.as_str(), nominal, interval, recurring_eff);
224
225 let expires_at = if recurring_eff {
226 Some(now + RECURRING_EXPIRY)
227 } else {
228 None
229 };
230
231 let task = Task {
232 id: id.clone(),
233 schedule,
234 prompt: prompt.into(),
235 recurring: recurring_eff,
236 created_at: now,
237 next_fire,
238 expires_at,
239 };
240 self.tasks.insert(id.clone(), task);
241 let _ = store::save(&self.session_id, &self.snapshot());
242 Ok(id)
243 }
244
245 pub fn list(&self) -> Vec<&Task> {
246 self.tasks.values().collect()
247 }
248
249 pub fn delete(&mut self, id: &TaskId) -> bool {
250 let removed = self.tasks.remove(id).is_some();
251 if removed {
252 let _ = store::save(&self.session_id, &self.snapshot());
253 }
254 removed
255 }
256
257 pub fn tick(&mut self, now: DateTime<Utc>) -> Vec<Fire> {
261 if self.disabled {
262 return Vec::new();
263 }
264
265 let due_ids: Vec<TaskId> = self
266 .tasks
267 .iter()
268 .filter(|(_, t)| t.next_fire <= now)
269 .map(|(id, _)| id.clone())
270 .collect();
271
272 let mut fires = Vec::with_capacity(due_ids.len());
273 let mut mutated = false;
274
275 for id in due_ids {
276 let snapshot = match self.tasks.get(&id) {
278 Some(t) => t.clone(),
279 None => continue,
280 };
281
282 fires.push(Fire {
283 task_id: id.clone(),
284 prompt: snapshot.prompt.clone(),
285 fired_at: snapshot.next_fire,
286 final_fire: snapshot.is_expired(now) || !snapshot.recurring,
287 });
288
289 if !snapshot.recurring || snapshot.is_expired(now) {
291 self.tasks.remove(&id);
292 mutated = true;
293 continue;
294 }
295
296 let interval = match &snapshot.schedule {
298 Schedule::Cron(c) => Some(c.clone().approx_interval_seconds()),
299 _ => None,
300 };
301 let nominal = match &snapshot.schedule {
302 Schedule::Cron(c) => c.clone().next_after(now),
303 Schedule::Dynamic => Some(now + Duration::seconds(60)),
304 Schedule::Once(_) => None, };
306 if let Some(nominal) = nominal {
307 let nf = jitter::apply(id.as_str(), nominal, interval, true);
308 if let Some(t) = self.tasks.get_mut(&id) {
309 t.next_fire = nf;
310 mutated = true;
311 }
312 } else {
313 self.tasks.remove(&id);
314 mutated = true;
315 }
316 }
317
318 if mutated {
319 let _ = store::save(&self.session_id, &self.snapshot());
320 }
321 fires
322 }
323
324 fn snapshot(&self) -> Vec<Task> {
325 self.tasks.values().cloned().collect()
326 }
327}
328
329impl Task {
330 pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
331 self.expires_at.is_some_and(|t| now >= t)
332 }
333}
334
/// One firing of a task, as returned by `Scheduler::tick`.
#[derive(Debug, Clone)]
pub struct Fire {
    /// Id of the task that fired.
    pub task_id: TaskId,
    /// The task's prompt at the time it fired.
    pub prompt: String,
    /// The task's scheduled `next_fire` value, not the `now` given to `tick`.
    pub fired_at: DateTime<Utc>,
    /// Marks the task's last fire; set for non-recurring tasks and for tasks
    /// whose expiry deadline has passed.
    pub final_fire: bool,
}
345
346fn is_disabled() -> bool {
347 fn flag(name: &str) -> bool {
348 std::env::var(name).map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false)
349 }
350 flag(CLAUDE_DISABLE_VAR) || flag(ALIAS_DISABLE_VAR)
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a scheduler after clearing both disable variables, so the
    /// `disabled` flag sampled at construction is `false`.
    fn fresh_scheduler() -> Scheduler {
        std::env::remove_var(CLAUDE_DISABLE_VAR);
        std::env::remove_var(ALIAS_DISABLE_VAR);
        Scheduler::new("test-session")
    }

    // Five random bytes (40 bits) encode to eight base32 symbols.
    #[test]
    fn task_id_is_8_chars() {
        let id = TaskId::new();
        assert_eq!(id.as_str().len(), 8);
    }

    #[test]
    fn create_then_list_then_delete() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let id = s
            .create(Schedule::Cron(Box::new(cron)), "do work", true)
            .unwrap();
        assert_eq!(s.list().len(), 1);
        assert!(s.delete(&id));
        assert_eq!(s.list().len(), 0);
    }

    #[test]
    fn cap_blocks_create() {
        let mut s = Scheduler::with_cap("test-cap", 2);
        // Force-enable directly instead of touching the environment.
        s.disabled = false;
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        s.create(Schedule::Cron(Box::new(cron.clone())), "a", true).unwrap();
        s.create(Schedule::Cron(Box::new(cron.clone())), "b", true).unwrap();
        let err = s
            .create(Schedule::Cron(Box::new(cron)), "c", true)
            .unwrap_err();
        assert!(matches!(err, SchedulerError::Capacity(2)));
    }

    #[test]
    fn tick_fires_due_recurring_and_advances() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/1 * * * *").unwrap();
        let id = s.create(Schedule::Cron(Box::new(cron)), "tick", true).unwrap();
        // Backdate the fire time so the task is due immediately.
        s.tasks.get_mut(&id).unwrap().next_fire = Utc::now() - Duration::seconds(5);

        let fires = s.tick(Utc::now());
        assert_eq!(fires.len(), 1);
        assert_eq!(fires[0].task_id, id);
        assert!(!fires[0].final_fire);
        let t = s.tasks.get(&id).unwrap();
        // The task must have been rescheduled into the future.
        assert!(t.next_fire > Utc::now());
    }

    #[test]
    fn one_shot_is_removed_after_fire() {
        let mut s = fresh_scheduler();
        // A `Once` time in the past is accepted and is due on the next tick.
        let when = Utc::now() - Duration::seconds(1);
        let id = s
            .create(Schedule::Once(when), "one-shot", false)
            .unwrap();
        let fires = s.tick(Utc::now());
        assert_eq!(fires.len(), 1);
        assert!(fires[0].final_fire);
        assert!(!s.tasks.contains_key(&id));
    }

    #[test]
    fn expired_recurring_fires_once_then_removed() {
        let mut s = fresh_scheduler();
        let cron = CronExpr::parse("*/1 * * * *").unwrap();
        let id = s.create(Schedule::Cron(Box::new(cron)), "old", true).unwrap();
        let t = s.tasks.get_mut(&id).unwrap();
        // Rewrite the timestamps so the task is both due and past its expiry.
        t.created_at = Utc::now() - Duration::days(8);
        t.expires_at = Some(Utc::now() - Duration::hours(1));
        t.next_fire = Utc::now() - Duration::seconds(5);

        let fires = s.tick(Utc::now());
        assert_eq!(fires.len(), 1);
        assert!(fires[0].final_fire);
        assert!(!s.tasks.contains_key(&id));
    }

    #[test]
    fn disabled_blocks_create_and_tick() {
        let mut s = Scheduler::new("disabled");
        // Set the flag directly instead of touching the environment.
        s.disabled = true;
        let cron = CronExpr::parse("*/5 * * * *").unwrap();
        let err = s.create(Schedule::Cron(Box::new(cron)), "x", true).unwrap_err();
        assert!(matches!(err, SchedulerError::Disabled(_)));
        assert!(s.tick(Utc::now()).is_empty());
    }

    // NOTE(review): this only round-trips a throwaway variable through
    // `std::env`; it never calls `is_disabled()`, so the real disable
    // variables are not exercised here.
    #[test]
    fn env_var_triggers_disable() {
        let key = "DEEPSEEK_LOOP_DISABLE_CRON_TEST_ONLY";
        std::env::set_var(key, "1");
        let observed = std::env::var(key).map(|v| v == "1").unwrap_or(false);
        std::env::remove_var(key);
        assert!(observed, "env-var sanity check failed");
    }
}