1use anyhow::{Context, Result, anyhow, bail};
2use chrono::{
3 DateTime, Datelike, Duration as ChronoDuration, Local, LocalResult, NaiveDateTime, NaiveTime,
4 TimeZone, Timelike, Utc,
5};
6use humantime::parse_duration as parse_human_duration;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::fs::{self, OpenOptions};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14use std::process::Stdio;
15use std::sync::LazyLock;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use tokio::process::Command;
19
20use crate::config::defaults::{get_config_dir, get_data_dir};
21use crate::notifications::{NotificationEvent, send_global_notification};
22use crate::utils::path::normalize_path;
23
/// Interval, in minutes, that `/loop` falls back to when the command text carries no interval.
pub const DEFAULT_LOOP_INTERVAL_MINUTES: u64 = 10;
/// Maximum number of tasks a session scheduler or the durable store will accept.
pub const MAX_SCHEDULED_TASKS: usize = 50;
/// Hours after creation at which a recurring session task is stamped to expire.
pub const SESSION_TASK_EXPIRY_HOURS: i64 = 72;
/// Environment variable that turns scheduled tasks off when set to `1`, `true`, `yes`, or `on`.
pub const DISABLE_CRON_ENV: &str = "VTCODE_DISABLE_CRON";
/// User-facing hint describing when durable tasks are able to fire.
pub const DURABLE_SCHEDULER_RUNTIME_HINT: &str = "Durable tasks fire while VT Code is open, `vtcode schedule serve` is running, or the installed scheduler service is active.";

// Upper bound, in seconds, on the jitter delay added to recurring tasks.
const SESSION_JITTER_CAP_SECS: u64 = 15 * 60;
// Maximum lead, in seconds, subtracted from one-shot tasks scheduled at minute 0 or 30.
const ONE_SHOT_TOP_OF_HOUR_JITTER_SECS: u64 = 90;
// NOTE(review): presumably the age after which a claim file is treated as abandoned;
// the code that reads this constant is outside this part of the file.
const CLAIM_STALE_SECS: u64 = 15 * 60;
// File stem of the systemd user unit written by the service installer.
const SERVICE_NAME: &str = "vtcode-scheduler";
// launchd job label; also the file stem of the generated plist.
const LAUNCHD_LABEL: &str = "com.vtcode.scheduler";
// Directory name appended to both the config and data roots for durable tasks.
const DURABLE_STORE_DIR: &str = "scheduled_tasks";

// NOTE(review): presumably feeds task-id generation; its consumer is not visible here.
static NEXT_TASK_COUNTER: AtomicU64 = AtomicU64::new(1);
38
/// Test-only stand-in for the disable-cron environment variable, so tests can
/// toggle the setting without mutating the real process environment.
#[cfg(test)]
mod test_env_overrides {
    use std::sync::{LazyLock, Mutex};

    static DISABLE_CRON: LazyLock<Mutex<Option<String>>> = LazyLock::new(|| Mutex::new(None));

    /// Returns the current override, or `None` when unset or when the lock is poisoned.
    pub(super) fn get() -> Option<String> {
        let slot = DISABLE_CRON.lock().ok()?;
        slot.clone()
    }

    /// Stores (or clears, with `None`) the override; a poisoned lock is silently ignored.
    pub(super) fn set(value: Option<&str>) {
        let Ok(mut slot) = DISABLE_CRON.lock() else {
            return;
        };
        *slot = value.map(str::to_string);
    }
}
55
56#[cfg(test)]
57mod tests;
58
/// Matches `<count><unit> <prompt>`: a leading integer, a time unit (seconds through
/// days, short or long spelling), at least one whitespace character, then the prompt.
/// Case-insensitive; whitespace inside the pattern is ignored (`x` flag).
static LEADING_INTERVAL_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r"(?ix)
 ^\s*
 (?P<count>\d+)
 \s*
 (?P<unit>s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)
 \s+
 (?P<prompt>.+)
 $",
    )
    // The pattern is a compile-time literal, so failing to build it is a programming error.
    .expect("leading interval regex")
});
72
/// Matches `<prompt> every <count><unit>`: a lazily captured prompt followed by the
/// word `every`, an integer, and a time unit (seconds through days, short or long
/// spelling). Case-insensitive; whitespace inside the pattern is ignored (`x` flag).
static TRAILING_INTERVAL_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r"(?ix)
 ^\s*
 (?P<prompt>.+?)
 \s+
 every
 \s+
 (?P<count>\d+)
 \s*
 (?P<unit>s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)
 \s*
 $",
    )
    // The pattern is a compile-time literal, so failing to build it is a programming error.
    .expect("trailing interval regex")
});
89
/// Matches `remind me at <when> to <prompt>` (case-insensitive). `when` is captured
/// lazily, so it ends at the first ` to ` that still lets the rest match.
static REMIND_AT_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?ix)^\s*remind\s+me\s+at\s+(?P<when>.+?)\s+to\s+(?P<prompt>.+)\s*$")
        .expect("remind-at regex")
});
94
/// Matches `in <count> <unit>[,] <prompt>` (case-insensitive), where the unit is one of
/// minute(s), hour(s), or day(s) spelled out in full. The comma and the whitespace
/// around it are optional.
static REMIND_IN_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r"(?ix)
 ^\s*in\s+
 (?P<count>\d+)
 \s*
 (?P<unit>minutes|minute|hours|hour|days|day)
 \s*,?\s*
 (?P<prompt>.+)
 \s*$
 ",
    )
    // The pattern is a compile-time literal, so failing to build it is a programming error.
    .expect("remind-in regex")
});
109
/// Matches a bare clock time: a one- or two-digit hour, an optional `:MM` minute part,
/// and an optional `am`/`pm` suffix (case-insensitive). Range checks on the captured
/// numbers are not part of the pattern.
static TIME_ONLY_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?ix)^\s*(?P<hour>\d{1,2})(?::(?P<minute>\d{2}))?\s*(?P<ampm>am|pm)?\s*$")
        .expect("time-only regex")
});
114
115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
116#[serde(tag = "kind", rename_all = "snake_case")]
117pub enum ScheduledTaskAction {
118 Prompt { prompt: String },
119 Reminder { message: String },
120}
121
122impl ScheduledTaskAction {
123 #[must_use]
124 pub fn summary(&self) -> &str {
125 match self {
126 Self::Prompt { prompt } => prompt,
127 Self::Reminder { message } => message,
128 }
129 }
130
131 #[must_use]
132 pub fn kind_label(&self) -> &'static str {
133 match self {
134 Self::Prompt { .. } => "prompt",
135 Self::Reminder { .. } => "reminder",
136 }
137 }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
141#[serde(tag = "type", rename_all = "snake_case")]
142pub enum ScheduleSpec {
143 Cron5(Cron5),
144 FixedInterval(FixedInterval),
145 OneShot(OneShot),
146}
147
148impl ScheduleSpec {
149 pub fn cron5(expression: impl Into<String>) -> Result<Self> {
150 Ok(Self::Cron5(Cron5::parse(expression)?))
151 }
152
153 pub fn fixed_interval(duration: Duration) -> Result<Self> {
154 Ok(Self::FixedInterval(FixedInterval::from_duration(duration)?))
155 }
156
157 pub fn one_shot(at: DateTime<Utc>) -> Self {
158 Self::OneShot(OneShot { at })
159 }
160
161 #[must_use]
162 pub fn is_recurring(&self) -> bool {
163 !matches!(self, Self::OneShot(_))
164 }
165
166 #[must_use]
167 pub fn human_description(&self) -> String {
168 match self {
169 Self::Cron5(spec) => format!("cron {}", spec.expression),
170 Self::FixedInterval(spec) => spec.human_description(),
171 Self::OneShot(spec) => format!(
172 "once at {}",
173 spec.at.with_timezone(&Local).format("%Y-%m-%d %H:%M")
174 ),
175 }
176 }
177
178 pub fn first_base_fire_at(&self, created_at: DateTime<Utc>) -> Result<Option<DateTime<Utc>>> {
179 let created_local = created_at.with_timezone(&Local);
180 let base_local = match self {
181 Self::Cron5(spec) => spec.next_after(created_local)?,
182 Self::FixedInterval(spec) => Some(created_local + spec.chrono_duration()?),
183 Self::OneShot(spec) => Some(spec.at.with_timezone(&Local)),
184 };
185 Ok(base_local.map(|value| value.with_timezone(&Utc)))
186 }
187
188 pub fn next_base_fire_after(
189 &self,
190 last_base_fire_at: DateTime<Utc>,
191 ) -> Result<Option<DateTime<Utc>>> {
192 let last_base_local = last_base_fire_at.with_timezone(&Local);
193 let next_local = match self {
194 Self::Cron5(spec) => spec.next_after(last_base_local)?,
195 Self::FixedInterval(spec) => Some(last_base_local + spec.chrono_duration()?),
196 Self::OneShot(_) => None,
197 };
198 Ok(next_local.map(|value| value.with_timezone(&Utc)))
199 }
200
201 fn jittered_fire_at(&self, id: &str, base_fire_at: DateTime<Utc>) -> Result<DateTime<Utc>> {
202 let base_local = base_fire_at.with_timezone(&Local);
203 let hash = stable_hash_u64(id.as_bytes());
204 match self {
205 Self::Cron5(_) | Self::FixedInterval(_) => {
206 let Some(period) = self.nominal_period()? else {
207 return Ok(base_fire_at);
208 };
209 let period_secs = period.num_seconds().max(0) as u64;
210 let max_delay = ((period_secs as f64) * 0.10).floor() as u64;
211 let max_delay = max_delay.min(SESSION_JITTER_CAP_SECS);
212 let delay_secs = if max_delay == 0 {
213 0
214 } else {
215 hash % (max_delay + 1)
216 };
217 Ok(base_fire_at + ChronoDuration::seconds(delay_secs as i64))
218 }
219 Self::OneShot(_) if matches!(base_local.minute(), 0 | 30) => {
220 let lead_secs = hash % (ONE_SHOT_TOP_OF_HOUR_JITTER_SECS + 1);
221 Ok(base_fire_at - ChronoDuration::seconds(lead_secs as i64))
222 }
223 Self::OneShot(_) => Ok(base_fire_at),
224 }
225 }
226
227 fn nominal_period(&self) -> Result<Option<ChronoDuration>> {
228 match self {
229 Self::FixedInterval(spec) => Ok(Some(spec.chrono_duration()?)),
230 Self::Cron5(spec) => spec.approx_period(),
231 Self::OneShot(_) => Ok(None),
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
237pub struct Cron5 {
238 pub expression: String,
239}
240
241impl Cron5 {
242 pub fn parse(expression: impl Into<String>) -> Result<Self> {
243 let expression = expression.into();
244 ParsedCron::parse(&expression)?;
245 Ok(Self { expression })
246 }
247
248 fn parsed(&self) -> Result<ParsedCron> {
249 ParsedCron::parse(&self.expression)
250 }
251
252 fn next_after(&self, after: DateTime<Local>) -> Result<Option<DateTime<Local>>> {
253 self.parsed()?.next_after(after)
254 }
255
256 fn approx_period(&self) -> Result<Option<ChronoDuration>> {
257 let now = Local::now();
258 let Some(first) = self.next_after(now)? else {
259 return Ok(None);
260 };
261 let Some(second) = self.next_after(first)? else {
262 return Ok(None);
263 };
264 Ok(Some(second - first))
265 }
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
269pub struct FixedInterval {
270 pub seconds: u64,
271}
272
273impl FixedInterval {
274 pub fn from_duration(duration: Duration) -> Result<Self> {
275 if duration.as_secs() < 60 {
276 bail!("Fixed intervals must be at least 1 minute");
277 }
278 Ok(Self {
279 seconds: duration.as_secs(),
280 })
281 }
282
283 pub fn chrono_duration(&self) -> Result<ChronoDuration> {
284 let seconds = i64::try_from(self.seconds).context("interval is too large")?;
285 Ok(ChronoDuration::seconds(seconds))
286 }
287
288 #[must_use]
289 pub fn human_description(&self) -> String {
290 humanize_interval(self.seconds)
291 }
292}
293
/// A schedule that fires a single time.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OneShot {
    /// The instant, in UTC, at which the task should fire.
    pub at: DateTime<Utc>,
}
298
299#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
300pub struct ScheduledTaskDefinition {
301 pub id: String,
302 pub name: String,
303 pub schedule: ScheduleSpec,
304 pub action: ScheduledTaskAction,
305 pub workspace: Option<PathBuf>,
306 pub created_at: DateTime<Utc>,
307 pub expires_at: Option<DateTime<Utc>>,
308}
309
310impl ScheduledTaskDefinition {
311 pub fn new(
312 name: Option<String>,
313 schedule: ScheduleSpec,
314 action: ScheduledTaskAction,
315 workspace: Option<PathBuf>,
316 created_at: DateTime<Utc>,
317 expires_at: Option<DateTime<Utc>>,
318 ) -> Result<Self> {
319 let rendered_name = name
320 .map(|value| value.trim().to_string())
321 .filter(|value| !value.is_empty())
322 .unwrap_or_else(|| summarize_task_name(action.summary()));
323 let id = generate_task_id(&rendered_name, action.summary(), created_at);
324 Ok(Self {
325 id,
326 name: rendered_name,
327 schedule,
328 action,
329 workspace,
330 created_at,
331 expires_at,
332 })
333 }
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
337#[serde(tag = "state", rename_all = "snake_case")]
338pub enum TaskRunStatus {
339 Triggered,
340 ReminderSent,
341 Success,
342 Failed { message: String },
343}
344
345impl fmt::Display for TaskRunStatus {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 match self {
348 Self::Triggered => write!(f, "triggered"),
349 Self::ReminderSent => write!(f, "reminder_sent"),
350 Self::Success => write!(f, "success"),
351 Self::Failed { message } => write!(f, "failed: {message}"),
352 }
353 }
354}
355
/// Mutable bookkeeping stored alongside a task definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ScheduledTaskRuntimeState {
    /// Time of the most recent run, if the task has run.
    pub last_run_at: Option<DateTime<Utc>>,
    /// NOTE(review): presumably the next fire time before jitter is applied — confirm
    /// against the code that fills this state in.
    pub next_base_run_at: Option<DateTime<Utc>>,
    /// Time at or after which the task is considered due; `None` means it is never due.
    pub next_run_at: Option<DateTime<Utc>>,
    /// Status of the most recent run.
    pub last_status: Option<TaskRunStatus>,
    /// Artifact directory of the most recent run, when one was recorded.
    pub last_artifact_dir: Option<PathBuf>,
    /// Events file of the most recent run, when one was recorded.
    pub last_events_file: Option<PathBuf>,
    /// Last-message file of the most recent run, when one was recorded.
    pub last_message_file: Option<PathBuf>,
}
366
/// A task definition paired with its current runtime state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledTaskRecord {
    /// The immutable description of the task.
    pub definition: ScheduledTaskDefinition,
    /// The mutable run bookkeeping for the task.
    pub runtime: ScheduledTaskRuntimeState,
}
372
/// Flattened, display-oriented view of a task record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ScheduledTaskSummary {
    /// Task identifier.
    pub id: String,
    /// Task display name.
    pub name: String,
    /// Label of the action variant (`prompt` or `reminder`).
    pub action_kind: String,
    /// Human-readable rendering of the schedule.
    pub schedule: String,
    /// Directory associated with the task, if any.
    pub workspace: Option<PathBuf>,
    /// Whether the schedule repeats.
    pub recurring: bool,
    /// Next time the task is due, if any.
    pub next_run_at: Option<DateTime<Utc>>,
    /// Time of the most recent run, if any.
    pub last_run_at: Option<DateTime<Utc>>,
    /// Display text of the most recent run status, if any.
    pub last_status: Option<String>,
}
385
386impl ScheduledTaskRecord {
387 fn summary(&self) -> ScheduledTaskSummary {
388 ScheduledTaskSummary {
389 id: self.definition.id.clone(),
390 name: self.definition.name.clone(),
391 action_kind: self.definition.action.kind_label().to_string(),
392 schedule: self.definition.schedule.human_description(),
393 workspace: self.definition.workspace.clone(),
394 recurring: self.definition.schedule.is_recurring(),
395 next_run_at: self.runtime.next_run_at,
396 last_run_at: self.runtime.last_run_at,
397 last_status: self.runtime.last_status.as_ref().map(ToString::to_string),
398 }
399 }
400}
401
/// A prompt that has come due in a session, as returned by
/// `SessionScheduler::collect_due_prompts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DueSessionPrompt {
    /// Identifier of the task that came due.
    pub id: String,
    /// Display name of that task.
    pub name: String,
    /// Prompt text to run.
    pub prompt: String,
}
408
409#[derive(Debug, Default, Clone)]
410pub struct SessionScheduler {
411 tasks: BTreeMap<String, ScheduledTaskRecord>,
412}
413
414impl SessionScheduler {
415 #[must_use]
416 pub fn new() -> Self {
417 Self {
418 tasks: BTreeMap::new(),
419 }
420 }
421
422 #[must_use]
423 pub fn len(&self) -> usize {
424 self.tasks.len()
425 }
426
427 #[must_use]
428 pub fn is_empty(&self) -> bool {
429 self.tasks.is_empty()
430 }
431
432 pub fn create_prompt_task(
433 &mut self,
434 name: Option<String>,
435 prompt: String,
436 schedule: ScheduleSpec,
437 created_at: DateTime<Utc>,
438 ) -> Result<ScheduledTaskSummary> {
439 self.ensure_capacity()?;
440 let expires_at = schedule
441 .is_recurring()
442 .then(|| created_at + ChronoDuration::hours(SESSION_TASK_EXPIRY_HOURS));
443 let definition = ScheduledTaskDefinition::new(
444 name,
445 schedule,
446 ScheduledTaskAction::Prompt { prompt },
447 None,
448 created_at,
449 expires_at,
450 )?;
451 let runtime = initialize_runtime_state(&definition)?;
452 let record = ScheduledTaskRecord {
453 definition: definition.clone(),
454 runtime,
455 };
456 let summary = record.summary();
457 self.tasks.insert(definition.id.clone(), record);
458 Ok(summary)
459 }
460
461 pub fn list(&self) -> Vec<ScheduledTaskSummary> {
462 self.tasks
463 .values()
464 .map(ScheduledTaskRecord::summary)
465 .collect()
466 }
467
468 pub fn delete(&mut self, query: &str) -> Option<ScheduledTaskSummary> {
469 let query = query.trim();
470 if query.is_empty() {
471 return None;
472 }
473 if let Some(record) = self.tasks.remove(query) {
474 return Some(record.summary());
475 }
476 let key = self.tasks.iter().find_map(|(id, record)| {
477 record
478 .definition
479 .name
480 .eq_ignore_ascii_case(query)
481 .then(|| id.clone())
482 })?;
483 self.tasks.remove(&key).map(|record| record.summary())
484 }
485
486 pub fn collect_due_prompts(&mut self, now: DateTime<Utc>) -> Result<Vec<DueSessionPrompt>> {
487 let mut due = Vec::new();
488 let mut completed = Vec::new();
489 for record in self.tasks.values_mut() {
490 let Some(next_run_at) = record.runtime.next_run_at else {
491 continue;
492 };
493 if now < next_run_at {
494 continue;
495 }
496
497 if let Some(prompt) = due_session_prompt(record) {
498 due.push(prompt);
499 }
500
501 if advance_record_runtime(record, now, TaskRunStatus::Triggered)? {
502 completed.push(record.definition.id.clone());
503 }
504 }
505
506 for id in completed {
507 self.tasks.remove(&id);
508 }
509
510 Ok(due)
511 }
512
513 fn ensure_capacity(&self) -> Result<()> {
514 if self.tasks.len() >= MAX_SCHEDULED_TASKS {
515 bail!(
516 "A session can hold at most {} scheduled tasks",
517 MAX_SCHEDULED_TASKS
518 );
519 }
520 Ok(())
521 }
522}
523
/// Parsed form of a `/loop` command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoopCommand {
    /// Prompt text to repeat.
    pub prompt: String,
    /// Interval between repetitions.
    pub interval: FixedInterval,
    /// NOTE(review): presumably a message explaining how the requested interval was
    /// adjusted — the code that sets it is outside this part of the file.
    pub normalization_note: Option<String>,
}
530
/// A scheduling request expressed as one of three session-level commands.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionLanguageCommand {
    /// Schedule `prompt` to run once at `run_at`.
    CreateOneShotPrompt {
        prompt: String,
        run_at: DateTime<Utc>,
    },
    /// List the scheduled tasks.
    ListTasks,
    /// Cancel the task identified by `query`.
    CancelTask {
        query: String,
    },
}
542
543#[derive(Debug, Clone, PartialEq, Eq)]
544pub struct ScheduleCreateInput {
545 pub name: Option<String>,
546 pub prompt: Option<String>,
547 pub reminder: Option<String>,
548 pub every: Option<String>,
549 pub cron: Option<String>,
550 pub at: Option<String>,
551 pub workspace: Option<PathBuf>,
552}
553
554impl ScheduleCreateInput {
555 pub fn build_definition(
556 self,
557 now: DateTime<Local>,
558 default_workspace: Option<PathBuf>,
559 ) -> Result<ScheduledTaskDefinition> {
560 let action = match (self.prompt, self.reminder) {
561 (Some(prompt), None) => ScheduledTaskAction::Prompt { prompt },
562 (None, Some(reminder)) => ScheduledTaskAction::Reminder { message: reminder },
563 (Some(_), Some(_)) => bail!("Choose either --prompt or --reminder"),
564 (None, None) => bail!("One of --prompt or --reminder is required"),
565 };
566
567 let schedule = match (self.every, self.cron, self.at) {
568 (Some(raw), None, None) => {
569 let duration = parse_human_duration(raw.trim())
570 .with_context(|| format!("Invalid --every duration: {}", raw.trim()))?;
571 ScheduleSpec::fixed_interval(duration)?
572 }
573 (None, Some(expression), None) => ScheduleSpec::cron5(expression)?,
574 (None, None, Some(raw)) => {
575 ScheduleSpec::one_shot(parse_local_datetime(raw.trim(), now)?)
576 }
577 _ => bail!("Choose exactly one of --every, --cron, or --at"),
578 };
579
580 let workspace = match (&action, self.workspace.or(default_workspace)) {
581 (ScheduledTaskAction::Prompt { .. }, Some(path)) => Some(path),
582 (ScheduledTaskAction::Prompt { .. }, None) => {
583 bail!("Prompt tasks require a workspace (pass --workspace or create from chat)")
584 }
585 (ScheduledTaskAction::Reminder { .. }, path) => path,
586 }
587 .map(|path| resolve_scheduled_workspace_path(&path))
588 .transpose()?;
589
590 if matches!(action, ScheduledTaskAction::Prompt { .. })
591 && workspace.as_ref().is_some_and(|path| !path.is_dir())
592 {
593 let workspace = workspace.as_ref().expect("prompt workspace should exist");
594 bail!(
595 "Prompt task workspace does not exist or is not a directory: {}",
596 workspace.display()
597 );
598 }
599
600 ScheduledTaskDefinition::new(
601 self.name,
602 schedule,
603 action,
604 workspace,
605 now.with_timezone(&Utc),
606 None,
607 )
608 }
609}
610
611fn resolve_scheduled_workspace_path(path: &Path) -> Result<PathBuf> {
612 resolve_scheduled_workspace_path_with_home(path, dirs::home_dir().as_deref())
613}
614
615fn resolve_scheduled_workspace_path_with_home(
616 path: &Path,
617 home_dir: Option<&Path>,
618) -> Result<PathBuf> {
619 let expanded = expand_scheduled_workspace_home(path, home_dir);
620 let absolute = if expanded.is_absolute() {
621 expanded
622 } else {
623 std::env::current_dir()
624 .context("Failed to resolve current directory for scheduled task workspace")?
625 .join(expanded)
626 };
627 Ok(normalize_path(&absolute))
628}
629
/// Replaces a leading `~` or `~/` with `home_dir`.
///
/// The path is returned unchanged when it is not valid UTF-8, when no home directory
/// is supplied, or when it does not start with exactly `~` or `~/` (for example
/// `~user/...`).
fn expand_scheduled_workspace_home(path: &Path, home_dir: Option<&Path>) -> PathBuf {
    let expanded = match (path.to_str(), home_dir) {
        (Some("~"), Some(home)) => Some(home.to_path_buf()),
        (Some(raw), Some(home)) => raw.strip_prefix("~/").map(|rest| home.join(rest)),
        _ => None,
    };
    expanded.unwrap_or_else(|| path.to_path_buf())
}
649
650#[derive(Debug, Clone)]
651pub struct SchedulerPaths {
652 pub config_root: PathBuf,
653 pub data_root: PathBuf,
654}
655
656impl SchedulerPaths {
657 pub fn new_default() -> Result<Self> {
658 let config_root = get_config_dir()
659 .ok_or_else(|| anyhow!("Failed to resolve VT Code config directory"))?
660 .join(DURABLE_STORE_DIR);
661 let data_root = get_data_dir()
662 .ok_or_else(|| anyhow!("Failed to resolve VT Code data directory"))?
663 .join(DURABLE_STORE_DIR);
664 Ok(Self {
665 config_root,
666 data_root,
667 })
668 }
669
670 #[must_use]
671 pub fn tasks_dir(&self) -> PathBuf {
672 self.config_root.join("tasks")
673 }
674
675 #[must_use]
676 pub fn state_dir(&self) -> PathBuf {
677 self.data_root.join("state")
678 }
679
680 #[must_use]
681 pub fn claims_dir(&self) -> PathBuf {
682 self.data_root.join("claims")
683 }
684
685 #[must_use]
686 pub fn runs_dir(&self) -> PathBuf {
687 self.data_root.join("runs")
688 }
689
690 pub fn ensure_dirs(&self) -> Result<()> {
691 for dir in [
692 &self.config_root,
693 &self.data_root,
694 &self.tasks_dir(),
695 &self.state_dir(),
696 &self.claims_dir(),
697 &self.runs_dir(),
698 ] {
699 fs::create_dir_all(dir).with_context(|| {
700 format!("Failed to create scheduler directory {}", dir.display())
701 })?;
702 }
703 Ok(())
704 }
705
706 #[must_use]
707 pub fn definition_path(&self, id: &str) -> PathBuf {
708 self.tasks_dir().join(format!("{id}.toml"))
709 }
710
711 #[must_use]
712 pub fn runtime_path(&self, id: &str) -> PathBuf {
713 self.state_dir().join(format!("{id}.json"))
714 }
715
716 #[must_use]
717 pub fn claim_path(&self, id: &str) -> PathBuf {
718 self.claims_dir().join(format!("{id}.claim"))
719 }
720
721 #[must_use]
722 pub fn artifact_dir(&self, id: &str, run_at: DateTime<Utc>) -> PathBuf {
723 self.runs_dir()
724 .join(id)
725 .join(run_at.format("%Y%m%dT%H%M%SZ").to_string())
726 }
727}
728
729#[derive(Debug, Clone)]
730pub struct DurableTaskStore {
731 paths: SchedulerPaths,
732}
733
734impl DurableTaskStore {
735 #[must_use]
736 pub fn with_paths(paths: SchedulerPaths) -> Self {
737 Self { paths }
738 }
739
740 pub fn new_default() -> Result<Self> {
741 Ok(Self {
742 paths: SchedulerPaths::new_default()?,
743 })
744 }
745
746 #[must_use]
747 pub fn paths(&self) -> &SchedulerPaths {
748 &self.paths
749 }
750
751 pub fn create(&self, definition: ScheduledTaskDefinition) -> Result<ScheduledTaskSummary> {
752 self.paths.ensure_dirs()?;
753 let current_count = self.definition_paths()?.len();
754 if current_count >= MAX_SCHEDULED_TASKS {
755 bail!(
756 "VT Code supports at most {} durable scheduled tasks",
757 MAX_SCHEDULED_TASKS
758 );
759 }
760
761 let runtime = initialize_runtime_state(&definition)?;
762 self.write_definition(&definition)?;
763 self.write_runtime(&definition.id, &runtime)?;
764 Ok(ScheduledTaskRecord {
765 definition,
766 runtime,
767 }
768 .summary())
769 }
770
771 pub fn create_from_input(
772 &self,
773 input: ScheduleCreateInput,
774 now: DateTime<Local>,
775 default_workspace: Option<PathBuf>,
776 ) -> Result<ScheduledTaskSummary> {
777 let definition = input.build_definition(now, default_workspace)?;
778 self.create(definition)
779 }
780
781 pub fn list(&self) -> Result<Vec<ScheduledTaskSummary>> {
782 let mut records = self.load_records()?;
783 records.sort_by_key(|record| record.runtime.next_run_at);
784 Ok(records.into_iter().map(|record| record.summary()).collect())
785 }
786
787 pub fn delete(&self, id: &str) -> Result<Option<ScheduledTaskSummary>> {
788 let Some(record) = self.load_record(id)? else {
789 return Ok(None);
790 };
791 let _ = fs::remove_file(self.paths.definition_path(id));
792 let _ = fs::remove_file(self.paths.runtime_path(id));
793 let _ = fs::remove_file(self.paths.claim_path(id));
794 Ok(Some(record.summary()))
795 }
796
797 pub fn load_record(&self, id: &str) -> Result<Option<ScheduledTaskRecord>> {
798 let definition_path = self.paths.definition_path(id);
799 if !definition_path.exists() {
800 return Ok(None);
801 }
802 let definition = read_definition(&definition_path)?;
803 let runtime = match self.read_runtime(&definition.id)? {
804 Some(runtime) => runtime,
805 None => initialize_runtime_state(&definition)?,
806 };
807 Ok(Some(ScheduledTaskRecord {
808 definition,
809 runtime,
810 }))
811 }
812
813 pub fn update_runtime(&self, record: &ScheduledTaskRecord) -> Result<()> {
814 self.write_runtime(&record.definition.id, &record.runtime)
815 }
816
817 fn load_records(&self) -> Result<Vec<ScheduledTaskRecord>> {
818 self.paths.ensure_dirs()?;
819 let mut records = Vec::new();
820 for definition_path in self.definition_paths()? {
821 let definition = read_definition(&definition_path)?;
822 let runtime = self
823 .read_runtime(&definition.id)?
824 .unwrap_or(initialize_runtime_state(&definition)?);
825 records.push(ScheduledTaskRecord {
826 definition,
827 runtime,
828 });
829 }
830 Ok(records)
831 }
832
833 fn definition_paths(&self) -> Result<Vec<PathBuf>> {
834 self.paths.ensure_dirs()?;
835 let mut paths = Vec::new();
836 for entry in fs::read_dir(self.paths.tasks_dir())
837 .with_context(|| format!("Failed to read {}", self.paths.tasks_dir().display()))?
838 {
839 let entry = entry?;
840 let path = entry.path();
841 if path.extension().and_then(|value| value.to_str()) == Some("toml") {
842 paths.push(path);
843 }
844 }
845 paths.sort();
846 Ok(paths)
847 }
848
849 fn write_definition(&self, definition: &ScheduledTaskDefinition) -> Result<()> {
850 let serialized =
851 toml::to_string_pretty(definition).context("Failed to serialize task definition")?;
852 atomic_write(
853 &self.paths.definition_path(&definition.id),
854 serialized.as_bytes(),
855 )
856 }
857
858 fn read_runtime(&self, id: &str) -> Result<Option<ScheduledTaskRuntimeState>> {
859 let path = self.paths.runtime_path(id);
860 if !path.exists() {
861 return Ok(None);
862 }
863 let raw = fs::read_to_string(&path)
864 .with_context(|| format!("Failed to read {}", path.display()))?;
865 let runtime = serde_json::from_str(&raw)
866 .with_context(|| format!("Failed to parse {}", path.display()))?;
867 Ok(Some(runtime))
868 }
869
870 fn write_runtime(&self, id: &str, runtime: &ScheduledTaskRuntimeState) -> Result<()> {
871 let serialized =
872 serde_json::to_vec_pretty(runtime).context("Failed to serialize runtime state")?;
873 atomic_write(&self.paths.runtime_path(id), &serialized)
874 }
875}
876
/// Runs durable tasks from a [`DurableTaskStore`] as they come due.
#[derive(Debug, Clone)]
pub struct SchedulerDaemon {
    store: DurableTaskStore,
    /// Executable launched (with the `exec` subcommand) to run prompt tasks.
    executable_path: PathBuf,
}

impl SchedulerDaemon {
    /// Creates a daemon over `store` that launches `executable_path` for prompt tasks.
    #[must_use]
    pub fn new(store: DurableTaskStore, executable_path: PathBuf) -> Self {
        Self {
            store,
            executable_path,
        }
    }

    /// Polls for due tasks once per second until a poll returns an error.
    ///
    /// # Errors
    /// Returns the first error produced by [`SchedulerDaemon::run_due_tasks_once`].
    pub async fn serve_forever(&self) -> Result<()> {
        loop {
            self.run_due_tasks_once().await?;
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }

    /// Executes every task that is due now and whose claim can be acquired, then
    /// returns how many were executed.
    ///
    /// # Errors
    /// Returns an error when records cannot be loaded, a claim cannot be acquired or
    /// released, or the updated runtime state cannot be applied or written. An error
    /// stops the pass; later due tasks are left for the next call.
    pub async fn run_due_tasks_once(&self) -> Result<usize> {
        // One timestamp is used for the whole pass, both as "now" and as each run's time.
        let now = Utc::now();
        let mut records = self.store.load_records()?;
        let mut executed = 0usize;

        records.sort_by_key(|record| record.runtime.next_run_at);
        for mut record in records {
            let Some(next_run_at) = record.runtime.next_run_at else {
                continue;
            };
            if now < next_run_at {
                continue;
            }
            // Skip tasks whose claim could not be acquired.
            if !try_acquire_claim(self.store.paths(), &record.definition.id)? {
                continue;
            }

            let result = self.execute_record(&record, now).await;
            // Release the claim before inspecting the result so an execution error
            // cannot leave the claim held.
            let release_result = release_claim(self.store.paths(), &record.definition.id);
            let run_outcome = result?;
            release_result?;

            apply_run_outcome(&mut record, run_outcome)?;
            self.store.update_runtime(&record)?;
            executed = executed.saturating_add(1);
        }

        Ok(executed)
    }

    /// Performs the record's action and reports what happened.
    ///
    /// Reminders send a notification. Prompts launch the executable's `exec`
    /// subcommand with per-run events and last-message files. Failures of either are
    /// captured as `TaskRunStatus::Failed` in the outcome rather than returned as errors.
    async fn execute_record(
        &self,
        record: &ScheduledTaskRecord,
        run_at: DateTime<Utc>,
    ) -> Result<RunOutcome> {
        match &record.definition.action {
            ScheduledTaskAction::Reminder { message } => {
                let notification = send_global_notification(NotificationEvent::IdlePrompt {
                    title: format!("Scheduled reminder: {}", record.definition.name),
                    message: message.clone(),
                })
                .await;
                let status = match notification {
                    Ok(()) => TaskRunStatus::ReminderSent,
                    Err(error) => TaskRunStatus::Failed {
                        message: format!("failed to send reminder notification: {error:#}"),
                    },
                };
                Ok(RunOutcome {
                    ran_at: run_at,
                    status,
                    artifact_dir: None,
                    events_file: None,
                    last_message_file: None,
                })
            }
            ScheduledTaskAction::Prompt { prompt } => {
                let artifact_dir = self
                    .store
                    .paths()
                    .artifact_dir(&record.definition.id, run_at);
                let events_file = artifact_dir.join("events.jsonl");
                let last_message_file = artifact_dir.join("last-message.txt");
                let workspace_label = scheduled_workspace_label(&record.definition);
                // Resolved up front; any error is surfaced inside the block below so it
                // becomes a failed run status instead of aborting this function.
                let workspace = resolve_scheduled_task_workspace(&record.definition);
                let execution = async {
                    tokio::fs::create_dir_all(&artifact_dir)
                        .await
                        .with_context(|| {
                            format!(
                                "Failed to create run artifact dir {}",
                                artifact_dir.display()
                            )
                        })?;

                    let mut command = Command::new(&self.executable_path);
                    // The child runs detached from this process's standard streams.
                    command
                        .arg("exec")
                        .arg("--events")
                        .arg(&events_file)
                        .arg("--last-message-file")
                        .arg(&last_message_file)
                        .arg(prompt)
                        .stdin(Stdio::null())
                        .stdout(Stdio::null())
                        .stderr(Stdio::null());

                    if let Some(workspace) = workspace? {
                        command.current_dir(&workspace);
                    }

                    command.status().await.with_context(|| {
                        format!(
                            "Failed to spawn scheduled VT Code exec for task {} using {} in {}",
                            record.definition.id,
                            self.executable_path.display(),
                            workspace_label
                        )
                    })
                }
                .await;
                let run_status = match execution {
                    Ok(status) if status.success() => TaskRunStatus::Success,
                    Ok(status) => TaskRunStatus::Failed {
                        message: format!("vtcode exec exited with {}", status),
                    },
                    Err(error) => TaskRunStatus::Failed {
                        message: format!("{error:#}"),
                    },
                };
                // Artifact paths are reported only when they exist on disk after the run.
                Ok(RunOutcome {
                    ran_at: run_at,
                    status: run_status,
                    artifact_dir: artifact_dir.is_dir().then_some(artifact_dir),
                    events_file: events_file.is_file().then_some(events_file),
                    last_message_file: last_message_file.is_file().then_some(last_message_file),
                })
            }
        }
    }
}
1020
/// Service manager used to keep the scheduler running in the background.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceManager {
    /// macOS launchd.
    Launchd,
    /// systemd user units (non-macOS Unix).
    SystemdUser,
}

impl ServiceManager {
    /// Returns the service manager for the compilation target, or `None` on targets
    /// that are neither macOS nor another Unix.
    #[must_use]
    pub fn current() -> Option<Self> {
        // `cfg!` keeps every branch visible on every target, so no lint expectation
        // is needed and the `None` arm stays warning-free on non-Unix targets.
        if cfg!(target_os = "macos") {
            Some(Self::Launchd)
        } else if cfg!(unix) {
            Some(Self::SystemdUser)
        } else {
            None
        }
    }
}
1042
/// Describes the service file that installing the scheduler service writes.
#[derive(Debug, Clone)]
pub struct ServiceInstallPlan {
    /// Service manager the file targets.
    pub manager: ServiceManager,
    /// Location the file is written to.
    pub path: PathBuf,
    /// Full text of the file.
    pub contents: String,
}
1049
1050pub fn render_service_install_plan(executable_path: &Path) -> Result<ServiceInstallPlan> {
1051 let manager = ServiceManager::current()
1052 .ok_or_else(|| anyhow!("Durable scheduler services are unsupported on this platform"))?;
1053 let path = match manager {
1054 ServiceManager::Launchd => dirs::home_dir()
1055 .ok_or_else(|| anyhow!("Failed to resolve home directory"))?
1056 .join("Library/LaunchAgents")
1057 .join(format!("{LAUNCHD_LABEL}.plist")),
1058 ServiceManager::SystemdUser => dirs::home_dir()
1059 .ok_or_else(|| anyhow!("Failed to resolve home directory"))?
1060 .join(".config/systemd/user")
1061 .join(format!("{SERVICE_NAME}.service")),
1062 };
1063 let contents = match manager {
1064 ServiceManager::Launchd => render_launchd_plist(executable_path),
1065 ServiceManager::SystemdUser => render_systemd_unit(executable_path),
1066 };
1067 Ok(ServiceInstallPlan {
1068 manager,
1069 path,
1070 contents,
1071 })
1072}
1073
1074pub fn install_service_file(executable_path: &Path) -> Result<ServiceInstallPlan> {
1075 let plan = render_service_install_plan(executable_path)?;
1076 if let Some(parent) = plan.path.parent() {
1077 fs::create_dir_all(parent)
1078 .with_context(|| format!("Failed to create {}", parent.display()))?;
1079 }
1080 atomic_write(&plan.path, plan.contents.as_bytes())?;
1081 Ok(plan)
1082}
1083
1084pub fn uninstall_service_file() -> Result<Option<(ServiceManager, PathBuf, bool)>> {
1085 let Some(manager) = ServiceManager::current() else {
1086 return Ok(None);
1087 };
1088 let path = render_service_install_plan(Path::new("/tmp/vtcode"))?.path;
1089 if path.exists() {
1090 fs::remove_file(&path).with_context(|| format!("Failed to remove {}", path.display()))?;
1091 return Ok(Some((manager, path, true)));
1092 }
1093 Ok(Some((manager, path, false)))
1094}
1095
/// Renders the launchd property list that runs `<executable> schedule serve` under
/// the `LAUNCHD_LABEL` label, starting at load and kept alive.
///
/// The executable path is passed through `xml_escape` before being embedded.
#[must_use]
pub fn render_launchd_plist(executable_path: &Path) -> String {
    format!(
        r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
 <dict>
 <key>Label</key>
 <string>{LAUNCHD_LABEL}</string>
 <key>ProgramArguments</key>
 <array>
 <string>{}</string>
 <string>schedule</string>
 <string>serve</string>
 </array>
 <key>RunAtLoad</key>
 <true/>
 <key>KeepAlive</key>
 <true/>
 </dict>
</plist>
"#,
        xml_escape(executable_path.display().to_string())
    )
}
1121
1122#[must_use]
1123pub fn render_systemd_unit(executable_path: &Path) -> String {
1124 format!(
1125 "[Unit]\nDescription=VT Code scheduler\n\n[Service]\nType=simple\nExecStart={} schedule serve\nRestart=always\nRestartSec=5\n\n[Install]\nWantedBy=default.target\n",
1126 shell_words::quote(executable_path.to_string_lossy().as_ref())
1127 )
1128}
1129
1130pub fn scheduled_tasks_enabled(enabled_in_config: bool) -> bool {
1131 #[cfg(test)]
1132 if let Some(value) = test_env_overrides::get() {
1133 let normalized = value.trim().to_ascii_lowercase();
1134 if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
1135 return false;
1136 }
1137 }
1138
1139 if let Ok(value) = std::env::var(DISABLE_CRON_ENV) {
1140 let normalized = value.trim().to_ascii_lowercase();
1141 if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
1142 return false;
1143 }
1144 }
1145 enabled_in_config
1146}
1147
1148#[must_use]
1149pub fn durable_task_is_overdue(
1150 next_run_at: Option<DateTime<Utc>>,
1151 last_run_at: Option<DateTime<Utc>>,
1152 has_last_status: bool,
1153 now: DateTime<Utc>,
1154) -> bool {
1155 !has_last_status && last_run_at.is_none() && next_run_at.is_some_and(|next_run| next_run <= now)
1156}
1157
1158pub fn parse_loop_command(args: &str) -> Result<LoopCommand> {
1159 let trimmed = args.trim();
1160 if trimmed.is_empty() {
1161 bail!("Usage: /loop [interval] <prompt>");
1162 }
1163
1164 let (raw_prompt, count, unit) = if let Some(captures) = LEADING_INTERVAL_RE.captures(trimmed) {
1165 (
1166 captures["prompt"].trim().to_string(),
1167 captures["count"].parse::<u64>()?,
1168 captures["unit"].to_string(),
1169 )
1170 } else if let Some(captures) = TRAILING_INTERVAL_RE.captures(trimmed) {
1171 (
1172 captures["prompt"].trim().to_string(),
1173 captures["count"].parse::<u64>()?,
1174 captures["unit"].to_string(),
1175 )
1176 } else {
1177 (
1178 trimmed.to_string(),
1179 DEFAULT_LOOP_INTERVAL_MINUTES,
1180 "minutes".to_string(),
1181 )
1182 };
1183
1184 if raw_prompt.trim().is_empty() {
1185 bail!("Usage: /loop [interval] <prompt>");
1186 }
1187
1188 let (seconds, note) = normalize_interval_spec(count, &unit)?;
1189 Ok(LoopCommand {
1190 prompt: raw_prompt,
1191 interval: FixedInterval { seconds },
1192 normalization_note: note,
1193 })
1194}
1195
1196pub fn parse_session_language_command(
1197 input: &str,
1198 now: DateTime<Local>,
1199) -> Option<Result<SessionLanguageCommand>> {
1200 let trimmed = input.trim();
1201 if trimmed.is_empty() {
1202 return None;
1203 }
1204
1205 let normalized = trimmed
1206 .trim_end_matches(['?', '.', '!'])
1207 .to_ascii_lowercase();
1208 if normalized == "what scheduled tasks do i have" {
1209 return Some(Ok(SessionLanguageCommand::ListTasks));
1210 }
1211
1212 if let Some(captures) = REMIND_AT_RE.captures(trimmed) {
1213 let when = captures
1214 .name("when")
1215 .map(|value| value.as_str())
1216 .unwrap_or_default();
1217 let prompt = captures
1218 .name("prompt")
1219 .map(|value| value.as_str().trim().to_string())
1220 .unwrap_or_default();
1221 return Some(
1222 parse_local_datetime(when, now)
1223 .map(|run_at| SessionLanguageCommand::CreateOneShotPrompt { prompt, run_at }),
1224 );
1225 }
1226
1227 if let Some(captures) = REMIND_IN_RE.captures(trimmed) {
1228 let count = captures["count"].parse::<i64>().ok()?;
1229 let prompt = captures["prompt"].trim().to_string();
1230 let delta = match captures["unit"].to_ascii_lowercase().as_str() {
1231 "minute" | "minutes" => ChronoDuration::minutes(count),
1232 "hour" | "hours" => ChronoDuration::hours(count),
1233 "day" | "days" => ChronoDuration::days(count),
1234 _ => return None,
1235 };
1236 return Some(Ok(SessionLanguageCommand::CreateOneShotPrompt {
1237 prompt,
1238 run_at: (now + delta).with_timezone(&Utc),
1239 }));
1240 }
1241
1242 if let Some(query) = trimmed.strip_prefix("cancel ") {
1243 return Some(Ok(SessionLanguageCommand::CancelTask {
1244 query: query.trim().to_string(),
1245 }));
1246 }
1247
1248 None
1249}
1250
1251pub fn parse_schedule_create_args(args: &str) -> Result<ScheduleCreateInput> {
1252 let tokens =
1253 shell_words::split(args).with_context(|| format!("Failed to parse arguments: {args}"))?;
1254 parse_schedule_create_tokens(&tokens)
1255}
1256
1257pub fn parse_schedule_create_tokens(tokens: &[String]) -> Result<ScheduleCreateInput> {
1258 let mut name = None;
1259 let mut prompt = None;
1260 let mut reminder = None;
1261 let mut every = None;
1262 let mut cron = None;
1263 let mut at = None;
1264 let mut workspace = None;
1265 let mut index = 0usize;
1266
1267 while index < tokens.len() {
1268 let token = &tokens[index];
1269 let (flag, inline_value) = if let Some((left, right)) = token.split_once('=') {
1270 (left, Some(right.to_string()))
1271 } else {
1272 (token.as_str(), None)
1273 };
1274
1275 let take_value = |idx: &mut usize| -> Result<String> {
1276 if let Some(value) = inline_value.clone() {
1277 return Ok(value);
1278 }
1279 let Some(value) = tokens.get(*idx + 1) else {
1280 bail!("Missing value for {flag}");
1281 };
1282 *idx += 1;
1283 Ok(value.clone())
1284 };
1285
1286 match flag {
1287 "--name" => name = Some(take_value(&mut index)?),
1288 "--prompt" => prompt = Some(take_value(&mut index)?),
1289 "--reminder" => reminder = Some(take_value(&mut index)?),
1290 "--every" => every = Some(take_value(&mut index)?),
1291 "--cron" => cron = Some(take_value(&mut index)?),
1292 "--at" => at = Some(take_value(&mut index)?),
1293 "--workspace" => workspace = Some(PathBuf::from(take_value(&mut index)?)),
1294 "--help" | "help" => {
1295 bail!(
1296 "Usage: /schedule create --prompt <text>|--reminder <text> --every <dur>|--cron <expr>|--at <time> [--name <label>] [--workspace <path>]"
1297 );
1298 }
1299 _ => bail!("Unknown option: {token}"),
1300 }
1301 index += 1;
1302 }
1303
1304 Ok(ScheduleCreateInput {
1305 name,
1306 prompt,
1307 reminder,
1308 every,
1309 cron,
1310 at,
1311 workspace,
1312 })
1313}
1314
1315fn initialize_runtime_state(
1316 definition: &ScheduledTaskDefinition,
1317) -> Result<ScheduledTaskRuntimeState> {
1318 let next_base_run_at = definition
1319 .schedule
1320 .first_base_fire_at(definition.created_at)?;
1321 let next_run_at = next_base_run_at
1322 .map(|base| definition.schedule.jittered_fire_at(&definition.id, base))
1323 .transpose()?;
1324 Ok(ScheduledTaskRuntimeState {
1325 next_base_run_at,
1326 next_run_at,
1327 ..ScheduledTaskRuntimeState::default()
1328 })
1329}
1330
/// The next occurrence computed for a scheduled task.
#[derive(Debug, Clone, Copy)]
struct NextScheduledRun {
    /// Fire time produced by the schedule itself, before jitter.
    base_fire_at: DateTime<Utc>,
    /// Fire time after the schedule's jitter has been applied to the base time.
    fire_at: DateTime<Utc>,
}
1336
1337fn due_session_prompt(record: &ScheduledTaskRecord) -> Option<DueSessionPrompt> {
1338 let ScheduledTaskAction::Prompt { prompt } = &record.definition.action else {
1339 return None;
1340 };
1341
1342 Some(DueSessionPrompt {
1343 id: record.definition.id.clone(),
1344 name: record.definition.name.clone(),
1345 prompt: prompt.clone(),
1346 })
1347}
1348
1349fn next_scheduled_run(
1350 definition: &ScheduledTaskDefinition,
1351 last_base_run_at: Option<DateTime<Utc>>,
1352) -> Result<Option<NextScheduledRun>> {
1353 let Some(last_base_run_at) = last_base_run_at else {
1354 return Ok(None);
1355 };
1356
1357 let Some(next_base_run_at) = definition.schedule.next_base_fire_after(last_base_run_at)? else {
1358 return Ok(None);
1359 };
1360
1361 if definition
1362 .expires_at
1363 .is_some_and(|expiry| next_base_run_at > expiry)
1364 {
1365 return Ok(None);
1366 }
1367
1368 Ok(Some(NextScheduledRun {
1369 fire_at: definition
1370 .schedule
1371 .jittered_fire_at(&definition.id, next_base_run_at)?,
1372 base_fire_at: next_base_run_at,
1373 }))
1374}
1375
1376fn advance_record_runtime(
1377 record: &mut ScheduledTaskRecord,
1378 ran_at: DateTime<Utc>,
1379 status: TaskRunStatus,
1380) -> Result<bool> {
1381 record.runtime.last_run_at = Some(ran_at);
1382 record.runtime.last_status = Some(status);
1383
1384 match next_scheduled_run(&record.definition, record.runtime.next_base_run_at)? {
1385 Some(next_run) => {
1386 record.runtime.next_base_run_at = Some(next_run.base_fire_at);
1387 record.runtime.next_run_at = Some(next_run.fire_at);
1388 Ok(false)
1389 }
1390 None => {
1391 record.runtime.next_base_run_at = None;
1392 record.runtime.next_run_at = None;
1393 Ok(true)
1394 }
1395 }
1396}
1397
1398fn apply_run_outcome(record: &mut ScheduledTaskRecord, outcome: RunOutcome) -> Result<()> {
1399 let RunOutcome {
1400 ran_at,
1401 status,
1402 artifact_dir,
1403 events_file,
1404 last_message_file,
1405 } = outcome;
1406
1407 record.runtime.last_artifact_dir = artifact_dir;
1408 record.runtime.last_events_file = events_file;
1409 record.runtime.last_message_file = last_message_file;
1410
1411 advance_record_runtime(record, ran_at, status)?;
1412
1413 Ok(())
1414}
1415
/// Result of executing a scheduled task once, as applied to a record by
/// `apply_run_outcome`.
#[derive(Debug, Clone)]
struct RunOutcome {
    /// When the run happened; becomes the record's last run time.
    ran_at: DateTime<Utc>,
    /// Status of the run; becomes the record's last status.
    status: TaskRunStatus,
    /// Stored as the record's last artifact directory, if any.
    artifact_dir: Option<PathBuf>,
    /// Stored as the record's last events file, if any.
    events_file: Option<PathBuf>,
    /// Stored as the record's last message file, if any.
    last_message_file: Option<PathBuf>,
}
1424
1425fn scheduled_workspace_label(definition: &ScheduledTaskDefinition) -> String {
1426 definition
1427 .workspace
1428 .as_ref()
1429 .map(|path| path.display().to_string())
1430 .unwrap_or_else(|| "<none>".to_string())
1431}
1432
1433fn resolve_scheduled_task_workspace(
1434 definition: &ScheduledTaskDefinition,
1435) -> Result<Option<PathBuf>> {
1436 definition
1437 .workspace
1438 .as_deref()
1439 .map(resolve_scheduled_workspace_path)
1440 .transpose()
1441 .with_context(|| {
1442 format!(
1443 "Failed to resolve scheduled task workspace {} for task {}",
1444 scheduled_workspace_label(definition),
1445 definition.id
1446 )
1447 })
1448}
1449
1450fn read_definition(path: &Path) -> Result<ScheduledTaskDefinition> {
1451 let raw =
1452 fs::read_to_string(path).with_context(|| format!("Failed to read {}", path.display()))?;
1453 toml::from_str(&raw).with_context(|| format!("Failed to parse {}", path.display()))
1454}
1455
1456fn atomic_write(path: &Path, content: &[u8]) -> Result<()> {
1457 if let Some(parent) = path.parent() {
1458 fs::create_dir_all(parent)
1459 .with_context(|| format!("Failed to create {}", parent.display()))?;
1460 }
1461
1462 let temp_name = format!(
1463 ".{}.tmp-{}",
1464 path.file_name()
1465 .and_then(|value| value.to_str())
1466 .unwrap_or("task"),
1467 NEXT_TASK_COUNTER.fetch_add(1, Ordering::Relaxed)
1468 );
1469 let temp_path = path.with_file_name(temp_name);
1470 fs::write(&temp_path, content)
1471 .with_context(|| format!("Failed to write {}", temp_path.display()))?;
1472 fs::rename(&temp_path, path)
1473 .with_context(|| format!("Failed to replace {}", path.display()))?;
1474 Ok(())
1475}
1476
1477fn try_acquire_claim(paths: &SchedulerPaths, id: &str) -> Result<bool> {
1478 paths.ensure_dirs()?;
1479 let path = paths.claim_path(id);
1480 match OpenOptions::new().write(true).create_new(true).open(&path) {
1481 Ok(mut file) => {
1482 let timestamp = Utc::now().to_rfc3339();
1483 file.write_all(timestamp.as_bytes())
1484 .with_context(|| format!("Failed to write {}", path.display()))?;
1485 Ok(true)
1486 }
1487 Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
1488 if claim_is_stale(&path)? {
1489 let _ = fs::remove_file(&path);
1490 return try_acquire_claim(paths, id);
1491 }
1492 Ok(false)
1493 }
1494 Err(error) => Err(error).with_context(|| format!("Failed to create {}", path.display())),
1495 }
1496}
1497
1498fn claim_is_stale(path: &Path) -> Result<bool> {
1499 let metadata =
1500 fs::metadata(path).with_context(|| format!("Failed to stat {}", path.display()))?;
1501 let modified = metadata
1502 .modified()
1503 .with_context(|| format!("Failed to read modification time for {}", path.display()))?;
1504 let elapsed = modified.elapsed().unwrap_or_default();
1505 Ok(elapsed >= Duration::from_secs(CLAIM_STALE_SECS))
1506}
1507
1508fn release_claim(paths: &SchedulerPaths, id: &str) -> Result<()> {
1509 let path = paths.claim_path(id);
1510 if path.exists() {
1511 fs::remove_file(&path).with_context(|| format!("Failed to remove {}", path.display()))?;
1512 }
1513 Ok(())
1514}
1515
1516fn summarize_task_name(summary: &str) -> String {
1517 let trimmed = summary.trim();
1518 if trimmed.is_empty() {
1519 return "Scheduled task".to_string();
1520 }
1521 let compact = vtcode_commons::formatting::collapse_whitespace(trimmed);
1522 let mut output = String::new();
1523 for ch in compact.chars().take(32) {
1524 output.push(ch);
1525 }
1526 output
1527}
1528
1529fn generate_task_id(name: &str, summary: &str, created_at: DateTime<Utc>) -> String {
1530 let counter = NEXT_TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
1531 let seed = format!(
1532 "{name}|{summary}|{}|{}|{}",
1533 created_at.timestamp_nanos_opt().unwrap_or_default(),
1534 std::process::id(),
1535 counter
1536 );
1537 format!("{:08x}", stable_hash_u64(seed.as_bytes()) as u32)
1538}
1539
/// Hashes `bytes` with 64-bit FNV-1a: each byte is XORed into the state,
/// which is then multiplied by the FNV prime with wrapping arithmetic.
///
/// The result depends only on the input bytes.
fn stable_hash_u64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
1548
/// Formats an interval in seconds as `every N day(s)/hour(s)/minute(s)`.
///
/// The largest unit that divides the interval evenly is used. Anything that
/// is not a whole number of hours is shown in minutes, rounded down.
fn humanize_interval(seconds: u64) -> String {
    let (amount, unit) = if seconds % 86_400 == 0 {
        (seconds / 86_400, "day")
    } else if seconds % 3_600 == 0 {
        (seconds / 3_600, "hour")
    } else {
        (seconds / 60, "minute")
    };

    let plural = if amount == 1 { "" } else { "s" };
    format!("every {amount} {unit}{plural}")
}
1577
1578fn normalize_interval_spec(count: u64, unit: &str) -> Result<(u64, Option<String>)> {
1579 if count == 0 {
1580 bail!("Intervals must be greater than zero");
1581 }
1582
1583 let unit = unit.to_ascii_lowercase();
1584 let (raw_minutes, original) = match unit.as_str() {
1585 "s" | "sec" | "secs" | "second" | "seconds" => {
1586 let minutes = count.div_ceil(60);
1587 (minutes, format!("{count}s"))
1588 }
1589 "m" | "min" | "mins" | "minute" | "minutes" => (count, format!("{count}m")),
1590 "h" | "hr" | "hrs" | "hour" | "hours" => (count * 60, format!("{count}h")),
1591 "d" | "day" | "days" => (count * 60 * 24, format!("{count}d")),
1592 _ => bail!("Unsupported interval unit: {unit}"),
1593 };
1594
1595 let normalized_minutes = normalize_clean_minutes(raw_minutes);
1596 let note = if normalized_minutes != raw_minutes {
1597 Some(format!(
1598 "Rounded {original} to {} for scheduler cadence.",
1599 humanize_interval(normalized_minutes * 60)
1600 ))
1601 } else if raw_minutes != count && unit.starts_with('s') {
1602 Some(format!(
1603 "Rounded {original} up to {} because VT Code schedules at minute granularity.",
1604 humanize_interval(normalized_minutes * 60)
1605 ))
1606 } else {
1607 None
1608 };
1609
1610 Ok((normalized_minutes * 60, note))
1611}
1612
1613fn normalize_clean_minutes(raw_minutes: u64) -> u64 {
1614 const CLEAN_MINUTES: &[u64] = &[
1615 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60, 120, 180, 240, 360, 480, 720, 1_440,
1616 ];
1617 if CLEAN_MINUTES.contains(&raw_minutes) {
1618 return raw_minutes;
1619 }
1620 CLEAN_MINUTES
1621 .iter()
1622 .copied()
1623 .min_by_key(|candidate| {
1624 let distance = candidate.abs_diff(raw_minutes);
1625 (distance, *candidate < raw_minutes, *candidate)
1626 })
1627 .unwrap_or(raw_minutes)
1628}
1629
1630pub fn parse_local_datetime(raw: &str, now: DateTime<Local>) -> Result<DateTime<Utc>> {
1631 let trimmed = raw.trim();
1632 if trimmed.is_empty() {
1633 bail!("Time value cannot be empty");
1634 }
1635
1636 if let Ok(parsed) = DateTime::parse_from_rfc3339(trimmed) {
1637 return Ok(parsed.with_timezone(&Utc));
1638 }
1639
1640 for format in ["%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M", "%Y-%m-%d %H:%M:%S"] {
1641 if let Ok(naive) = NaiveDateTime::parse_from_str(trimmed, format) {
1642 return localize_naive_datetime(naive).map(|value| value.with_timezone(&Utc));
1643 }
1644 }
1645
1646 if let Some(captures) = TIME_ONLY_RE.captures(trimmed) {
1647 let mut hour = captures["hour"].parse::<u32>()?;
1648 let minute = captures
1649 .name("minute")
1650 .map(|value| value.as_str().parse::<u32>())
1651 .transpose()?
1652 .unwrap_or(0);
1653 if minute >= 60 {
1654 bail!("Invalid minute in time value");
1655 }
1656 if let Some(ampm) = captures
1657 .name("ampm")
1658 .map(|value| value.as_str().to_ascii_lowercase())
1659 {
1660 if hour == 0 || hour > 12 {
1661 bail!("Invalid 12-hour clock value");
1662 }
1663 if ampm == "pm" && hour != 12 {
1664 hour += 12;
1665 }
1666 if ampm == "am" && hour == 12 {
1667 hour = 0;
1668 }
1669 } else if hour >= 24 {
1670 bail!("Invalid 24-hour clock value");
1671 }
1672
1673 let time = NaiveTime::from_hms_opt(hour, minute, 0)
1674 .ok_or_else(|| anyhow!("Invalid time value"))?;
1675 let today = now.date_naive();
1676 let naive = today.and_time(time);
1677 let mut localized = localize_naive_datetime(naive)?;
1678 if localized <= now {
1679 localized = localize_naive_datetime((today + ChronoDuration::days(1)).and_time(time))?;
1680 }
1681 return Ok(localized.with_timezone(&Utc));
1682 }
1683
1684 bail!("Unsupported time format. Use RFC3339, YYYY-MM-DD HH:MM, or a local time like 3pm")
1685}
1686
1687fn localize_naive_datetime(naive: NaiveDateTime) -> Result<DateTime<Local>> {
1688 match Local.from_local_datetime(&naive) {
1689 LocalResult::Single(value) => Ok(value),
1690 LocalResult::Ambiguous(first, _) => Ok(first),
1691 LocalResult::None => bail!("Local time does not exist due to timezone transition"),
1692 }
1693}
1694
/// Escapes the five XML special characters (`&`, `<`, `>`, `"`, `'`) so the
/// value can be embedded in XML text or attribute content.
///
/// The input is processed in a single pass, so the `&` introduced by one
/// replacement is never escaped a second time.
fn xml_escape(value: String) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
1703
/// A parsed five-field cron expression.
#[derive(Debug, Clone)]
struct ParsedCron {
    /// Minute field, parsed with the range 0-59.
    minute: CronField,
    /// Hour field, parsed with the range 0-23.
    hour: CronField,
    /// Day-of-month field, parsed with the range 1-31.
    day_of_month: CronField,
    /// Month field, parsed with the range 1-12.
    month: CronField,
    /// Day-of-week field, parsed with the range 0-7 in day-of-week mode.
    day_of_week: CronField,
}
1712
1713impl ParsedCron {
1714 fn parse(expression: &str) -> Result<Self> {
1715 let parts = expression.split_whitespace().collect::<Vec<_>>();
1716 if parts.len() != 5 {
1717 bail!("Cron expressions require exactly 5 fields");
1718 }
1719
1720 Ok(Self {
1721 minute: CronField::parse(parts[0], 0, 59, false)?,
1722 hour: CronField::parse(parts[1], 0, 23, false)?,
1723 day_of_month: CronField::parse(parts[2], 1, 31, false)?,
1724 month: CronField::parse(parts[3], 1, 12, false)?,
1725 day_of_week: CronField::parse(parts[4], 0, 7, true)?,
1726 })
1727 }
1728
1729 fn next_after(&self, after: DateTime<Local>) -> Result<Option<DateTime<Local>>> {
1730 let mut candidate = after
1731 .with_second(0)
1732 .and_then(|value| value.with_nanosecond(0))
1733 .ok_or_else(|| anyhow!("Failed to normalize cron timestamp"))?
1734 + ChronoDuration::minutes(1);
1735 let horizon = candidate + ChronoDuration::days(366 * 5);
1736
1737 while candidate <= horizon {
1738 if self.matches(candidate) {
1739 return Ok(Some(candidate));
1740 }
1741 candidate += ChronoDuration::minutes(1);
1742 }
1743
1744 Ok(None)
1745 }
1746
1747 fn matches(&self, value: DateTime<Local>) -> bool {
1748 let month = value.month();
1749 let dom = value.day();
1750 let minute = value.minute();
1751 let hour = value.hour();
1752 let dow = value.weekday().num_days_from_sunday();
1753
1754 if !self.minute.contains(minute) || !self.hour.contains(hour) || !self.month.contains(month)
1755 {
1756 return false;
1757 }
1758
1759 let dom_matches = self.day_of_month.contains(dom);
1760 let dow_matches = self.day_of_week.contains(dow);
1761
1762 if self.day_of_month.is_wildcard && self.day_of_week.is_wildcard {
1763 return true;
1764 }
1765 if self.day_of_month.is_wildcard {
1766 return dow_matches;
1767 }
1768 if self.day_of_week.is_wildcard {
1769 return dom_matches;
1770 }
1771
1772 dom_matches || dow_matches
1773 }
1774}
1775
/// One field of a cron expression, expanded to the set of values it allows.
#[derive(Debug, Clone)]
struct CronField {
    /// Every value the field matches.
    values: BTreeSet<u32>,
    /// Set when any segment of the field is `*` or `*/step`.
    is_wildcard: bool,
}
1781
1782impl CronField {
1783 fn parse(raw: &str, min: u32, max: u32, is_day_of_week: bool) -> Result<Self> {
1784 if raw.contains(['L', 'W', '?']) {
1785 bail!("Unsupported cron syntax in field '{raw}'");
1786 }
1787 if raw.chars().any(|ch| ch.is_ascii_alphabetic()) {
1788 bail!("Named cron aliases are not supported in '{raw}'");
1789 }
1790
1791 let mut values = BTreeSet::new();
1792 let mut is_wildcard = false;
1793 for segment in raw.split(',') {
1794 let segment = segment.trim();
1795 if segment.is_empty() {
1796 bail!("Cron field contains an empty segment");
1797 }
1798 if segment == "*" {
1799 is_wildcard = true;
1800 values.extend(min..=max);
1801 continue;
1802 }
1803
1804 let (base, step) = if let Some((left, right)) = segment.split_once('/') {
1805 let step = right
1806 .parse::<u32>()
1807 .with_context(|| format!("Invalid step value in '{segment}'"))?;
1808 if step == 0 {
1809 bail!("Step value must be greater than zero");
1810 }
1811 (left, Some(step))
1812 } else {
1813 (segment, None)
1814 };
1815
1816 let mut base_values = if base == "*" {
1817 is_wildcard = true;
1818 (min..=max).collect::<Vec<_>>()
1819 } else if let Some((left, right)) = base.split_once('-') {
1820 let start = parse_cron_number(left, min, max, is_day_of_week)?;
1821 let end = parse_cron_number(right, min, max, is_day_of_week)?;
1822 if start > end {
1823 bail!("Invalid descending range '{base}'");
1824 }
1825 (start..=end).collect::<Vec<_>>()
1826 } else {
1827 let start = parse_cron_number(base, min, max, is_day_of_week)?;
1828 if let Some(step) = step {
1829 let mut values = Vec::new();
1830 let mut value = start;
1831 while value <= max {
1832 values.push(value);
1833 match value.checked_add(step) {
1834 Some(next) => value = next,
1835 None => break,
1836 }
1837 }
1838 values
1839 } else {
1840 vec![start]
1841 }
1842 };
1843
1844 if let Some(step) = step
1845 && (base == "*" || base.contains('-'))
1846 {
1847 let mut stepped = Vec::new();
1848 for (index, value) in base_values.iter().enumerate() {
1849 if (index as u32).is_multiple_of(step) {
1850 stepped.push(*value);
1851 }
1852 }
1853 base_values = stepped;
1854 }
1855
1856 values.extend(base_values);
1857 }
1858
1859 Ok(Self {
1860 values,
1861 is_wildcard,
1862 })
1863 }
1864
1865 fn contains(&self, value: u32) -> bool {
1866 self.values.contains(&value)
1867 }
1868}
1869
1870fn parse_cron_number(raw: &str, min: u32, max: u32, is_day_of_week: bool) -> Result<u32> {
1871 let mut value = raw
1872 .parse::<u32>()
1873 .with_context(|| format!("Invalid cron value '{raw}'"))?;
1874 if is_day_of_week && value == 7 {
1875 value = 0;
1876 }
1877 if !(min..=max).contains(&value) {
1878 bail!("Cron value '{raw}' is out of range");
1879 }
1880 Ok(value)
1881}