Skip to main content

vtcode_core/scheduler/
mod.rs

1use anyhow::{Context, Result, anyhow, bail};
2use chrono::{
3    DateTime, Datelike, Duration as ChronoDuration, Local, LocalResult, NaiveDateTime, NaiveTime,
4    TimeZone, Timelike, Utc,
5};
6use humantime::parse_duration as parse_human_duration;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::fs::{self, OpenOptions};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14use std::process::Stdio;
15use std::sync::LazyLock;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use tokio::process::Command;
19
20use crate::config::defaults::{get_config_dir, get_data_dir};
21use crate::notifications::{NotificationEvent, send_global_notification};
22use crate::utils::path::normalize_path;
23
/// Default loop interval in minutes (going by the name; the consumer is not in this part of the file).
pub const DEFAULT_LOOP_INTERVAL_MINUTES: u64 = 10;
/// Maximum number of tasks; enforced separately by `SessionScheduler` and `DurableTaskStore::create`.
pub const MAX_SCHEDULED_TASKS: usize = 50;
/// Hours after creation at which a recurring session task's `expires_at` is set.
pub const SESSION_TASK_EXPIRY_HOURS: i64 = 72;
/// Presumably the environment variable that disables scheduling — TODO confirm at the read site.
pub const DISABLE_CRON_ENV: &str = "VTCODE_DISABLE_CRON";
/// User-facing hint describing when durable tasks fire.
pub const DURABLE_SCHEDULER_RUNTIME_HINT: &str = "Durable tasks fire while VT Code is open, `vtcode schedule serve` is running, or the installed scheduler service is active.";

/// Upper bound, in seconds, on the delay `ScheduleSpec::jittered_fire_at` adds to recurring schedules.
const SESSION_JITTER_CAP_SECS: u64 = 15 * 60;
/// Maximum lead, in seconds, by which a one-shot task at local minute 0 or 30 is fired early.
const ONE_SHOT_TOP_OF_HOUR_JITTER_SECS: u64 = 90;
/// Presumably the age in seconds after which a claim is considered stale — used outside this view.
const CLAIM_STALE_SECS: u64 = 15 * 60;
/// Presumably the name of the installed scheduler service — used outside this view.
const SERVICE_NAME: &str = "vtcode-scheduler";
/// Presumably the launchd label of the scheduler service — used outside this view.
const LAUNCHD_LABEL: &str = "com.vtcode.scheduler";
/// Directory name joined onto both the config and data roots by `SchedulerPaths::new_default`.
const DURABLE_STORE_DIR: &str = "scheduled_tasks";

/// Process-wide counter starting at 1; presumably feeds task-id generation — TODO confirm.
static NEXT_TASK_COUNTER: AtomicU64 = AtomicU64::new(1);
38
/// Test-only, in-process replacement for an environment-variable override.
///
/// The value lives behind a mutex so tests can set and read it without
/// touching the real process environment.
#[cfg(test)]
mod test_env_overrides {
    use std::sync::{LazyLock, Mutex};

    static DISABLE_CRON: LazyLock<Mutex<Option<String>>> = LazyLock::new(|| Mutex::new(None));

    /// Returns a copy of the current override, or `None` when it is unset or
    /// the mutex is poisoned.
    pub(super) fn get() -> Option<String> {
        match DISABLE_CRON.lock() {
            Ok(guard) => guard.clone(),
            Err(_) => None,
        }
    }

    /// Replaces the override; silently does nothing when the mutex is poisoned.
    pub(super) fn set(value: Option<&str>) {
        let Ok(mut guard) = DISABLE_CRON.lock() else {
            return;
        };
        *guard = value.map(str::to_owned);
    }
}
55
56#[cfg(test)]
57mod tests;
58
/// Matches `<count><unit> <prompt>`, e.g. `5m check the build`.
///
/// Case-insensitive, verbose mode. Captures `count` (digits), `unit`
/// (seconds/minutes/hours/days in several spellings) and `prompt` (the rest
/// of the line). Whitespace is required between the unit and the prompt.
static LEADING_INTERVAL_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r"(?ix)
        ^\s*
        (?P<count>\d+)
        \s*
        (?P<unit>s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)
        \s+
        (?P<prompt>.+)
        $",
    )
    .expect("leading interval regex")
});

/// Matches `<prompt> every <count><unit>`, e.g. `check the build every 2 hours`.
///
/// Case-insensitive, verbose mode. The prompt capture is lazy so the final
/// `every <count><unit>` suffix is what binds to `count` and `unit`.
static TRAILING_INTERVAL_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r"(?ix)
        ^\s*
        (?P<prompt>.+?)
        \s+
        every
        \s+
        (?P<count>\d+)
        \s*
        (?P<unit>s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)
        \s*
        $",
    )
    .expect("trailing interval regex")
});

/// Matches `remind me at <when> to <prompt>`, case-insensitively.
///
/// `when` is captured lazily, so it ends at the first ` to ` in the input.
static REMIND_AT_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?ix)^\s*remind\s+me\s+at\s+(?P<when>.+?)\s+to\s+(?P<prompt>.+)\s*$")
        .expect("remind-at regex")
});
94
95static REMIND_IN_RE: LazyLock<Regex> = LazyLock::new(|| {
96    Regex::new(
97        r"(?ix)
98        ^\s*in\s+
99        (?P<count>\d+)
100        \s*
101        (?P<unit>minutes|minute|hours|hour|days|day)
102        \s*,?\s*
103        (?P<prompt>.+)
104        \s*$
105    ",
106    )
107    .expect("remind-in regex")
108});
109
/// Matches a bare clock time such as `7`, `7pm`, `07:30` or `7:30 am`.
///
/// Captures `hour` (one or two digits), optional `minute` (two digits after a
/// colon) and optional `ampm`. Range checking of the numbers is not done here.
static TIME_ONLY_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?ix)^\s*(?P<hour>\d{1,2})(?::(?P<minute>\d{2}))?\s*(?P<ampm>am|pm)?\s*$")
        .expect("time-only regex")
});
114
115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
116#[serde(tag = "kind", rename_all = "snake_case")]
117pub enum ScheduledTaskAction {
118    Prompt { prompt: String },
119    Reminder { message: String },
120}
121
122impl ScheduledTaskAction {
123    #[must_use]
124    pub fn summary(&self) -> &str {
125        match self {
126            Self::Prompt { prompt } => prompt,
127            Self::Reminder { message } => message,
128        }
129    }
130
131    #[must_use]
132    pub fn kind_label(&self) -> &'static str {
133        match self {
134            Self::Prompt { .. } => "prompt",
135            Self::Reminder { .. } => "reminder",
136        }
137    }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
141#[serde(tag = "type", rename_all = "snake_case")]
142pub enum ScheduleSpec {
143    Cron5(Cron5),
144    FixedInterval(FixedInterval),
145    OneShot(OneShot),
146}
147
148impl ScheduleSpec {
149    pub fn cron5(expression: impl Into<String>) -> Result<Self> {
150        Ok(Self::Cron5(Cron5::parse(expression)?))
151    }
152
153    pub fn fixed_interval(duration: Duration) -> Result<Self> {
154        Ok(Self::FixedInterval(FixedInterval::from_duration(duration)?))
155    }
156
157    pub fn one_shot(at: DateTime<Utc>) -> Self {
158        Self::OneShot(OneShot { at })
159    }
160
161    #[must_use]
162    pub fn is_recurring(&self) -> bool {
163        !matches!(self, Self::OneShot(_))
164    }
165
166    #[must_use]
167    pub fn human_description(&self) -> String {
168        match self {
169            Self::Cron5(spec) => format!("cron {}", spec.expression),
170            Self::FixedInterval(spec) => spec.human_description(),
171            Self::OneShot(spec) => format!(
172                "once at {}",
173                spec.at.with_timezone(&Local).format("%Y-%m-%d %H:%M")
174            ),
175        }
176    }
177
178    pub fn first_base_fire_at(&self, created_at: DateTime<Utc>) -> Result<Option<DateTime<Utc>>> {
179        let created_local = created_at.with_timezone(&Local);
180        let base_local = match self {
181            Self::Cron5(spec) => spec.next_after(created_local)?,
182            Self::FixedInterval(spec) => Some(created_local + spec.chrono_duration()?),
183            Self::OneShot(spec) => Some(spec.at.with_timezone(&Local)),
184        };
185        Ok(base_local.map(|value| value.with_timezone(&Utc)))
186    }
187
188    pub fn next_base_fire_after(
189        &self,
190        last_base_fire_at: DateTime<Utc>,
191    ) -> Result<Option<DateTime<Utc>>> {
192        let last_base_local = last_base_fire_at.with_timezone(&Local);
193        let next_local = match self {
194            Self::Cron5(spec) => spec.next_after(last_base_local)?,
195            Self::FixedInterval(spec) => Some(last_base_local + spec.chrono_duration()?),
196            Self::OneShot(_) => None,
197        };
198        Ok(next_local.map(|value| value.with_timezone(&Utc)))
199    }
200
201    fn jittered_fire_at(&self, id: &str, base_fire_at: DateTime<Utc>) -> Result<DateTime<Utc>> {
202        let base_local = base_fire_at.with_timezone(&Local);
203        let hash = stable_hash_u64(id.as_bytes());
204        match self {
205            Self::Cron5(_) | Self::FixedInterval(_) => {
206                let Some(period) = self.nominal_period()? else {
207                    return Ok(base_fire_at);
208                };
209                let period_secs = period.num_seconds().max(0) as u64;
210                let max_delay = ((period_secs as f64) * 0.10).floor() as u64;
211                let max_delay = max_delay.min(SESSION_JITTER_CAP_SECS);
212                let delay_secs = if max_delay == 0 {
213                    0
214                } else {
215                    hash % (max_delay + 1)
216                };
217                Ok(base_fire_at + ChronoDuration::seconds(delay_secs as i64))
218            }
219            Self::OneShot(_) if matches!(base_local.minute(), 0 | 30) => {
220                let lead_secs = hash % (ONE_SHOT_TOP_OF_HOUR_JITTER_SECS + 1);
221                Ok(base_fire_at - ChronoDuration::seconds(lead_secs as i64))
222            }
223            Self::OneShot(_) => Ok(base_fire_at),
224        }
225    }
226
227    fn nominal_period(&self) -> Result<Option<ChronoDuration>> {
228        match self {
229            Self::FixedInterval(spec) => Ok(Some(spec.chrono_duration()?)),
230            Self::Cron5(spec) => spec.approx_period(),
231            Self::OneShot(_) => Ok(None),
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
237pub struct Cron5 {
238    pub expression: String,
239}
240
241impl Cron5 {
242    pub fn parse(expression: impl Into<String>) -> Result<Self> {
243        let expression = expression.into();
244        ParsedCron::parse(&expression)?;
245        Ok(Self { expression })
246    }
247
248    fn parsed(&self) -> Result<ParsedCron> {
249        ParsedCron::parse(&self.expression)
250    }
251
252    fn next_after(&self, after: DateTime<Local>) -> Result<Option<DateTime<Local>>> {
253        self.parsed()?.next_after(after)
254    }
255
256    fn approx_period(&self) -> Result<Option<ChronoDuration>> {
257        let now = Local::now();
258        let Some(first) = self.next_after(now)? else {
259            return Ok(None);
260        };
261        let Some(second) = self.next_after(first)? else {
262            return Ok(None);
263        };
264        Ok(Some(second - first))
265    }
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
269pub struct FixedInterval {
270    pub seconds: u64,
271}
272
273impl FixedInterval {
274    pub fn from_duration(duration: Duration) -> Result<Self> {
275        if duration.as_secs() < 60 {
276            bail!("Fixed intervals must be at least 1 minute");
277        }
278        Ok(Self {
279            seconds: duration.as_secs(),
280        })
281    }
282
283    pub fn chrono_duration(&self) -> Result<ChronoDuration> {
284        let seconds = i64::try_from(self.seconds).context("interval is too large")?;
285        Ok(ChronoDuration::seconds(seconds))
286    }
287
288    #[must_use]
289    pub fn human_description(&self) -> String {
290        humanize_interval(self.seconds)
291    }
292}
293
/// A schedule that fires a single time at the UTC instant `at`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OneShot {
    pub at: DateTime<Utc>,
}
298
299#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
300pub struct ScheduledTaskDefinition {
301    pub id: String,
302    pub name: String,
303    pub schedule: ScheduleSpec,
304    pub action: ScheduledTaskAction,
305    pub workspace: Option<PathBuf>,
306    pub created_at: DateTime<Utc>,
307    pub expires_at: Option<DateTime<Utc>>,
308}
309
310impl ScheduledTaskDefinition {
311    pub fn new(
312        name: Option<String>,
313        schedule: ScheduleSpec,
314        action: ScheduledTaskAction,
315        workspace: Option<PathBuf>,
316        created_at: DateTime<Utc>,
317        expires_at: Option<DateTime<Utc>>,
318    ) -> Result<Self> {
319        let rendered_name = name
320            .map(|value| value.trim().to_string())
321            .filter(|value| !value.is_empty())
322            .unwrap_or_else(|| summarize_task_name(action.summary()));
323        let id = generate_task_id(&rendered_name, action.summary(), created_at);
324        Ok(Self {
325            id,
326            name: rendered_name,
327            schedule,
328            action,
329            workspace,
330            created_at,
331            expires_at,
332        })
333    }
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
337#[serde(tag = "state", rename_all = "snake_case")]
338pub enum TaskRunStatus {
339    Triggered,
340    ReminderSent,
341    Success,
342    Failed { message: String },
343}
344
345impl fmt::Display for TaskRunStatus {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        match self {
348            Self::Triggered => write!(f, "triggered"),
349            Self::ReminderSent => write!(f, "reminder_sent"),
350            Self::Success => write!(f, "success"),
351            Self::Failed { message } => write!(f, "failed: {message}"),
352        }
353    }
354}
355
/// Mutable bookkeeping for a task, stored separately from its definition.
///
/// `next_run_at` is the instant the schedulers compare against "now".
/// `next_base_run_at` is presumably the un-jittered counterpart — confirm in
/// `initialize_runtime_state`, which is outside this view.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ScheduledTaskRuntimeState {
    pub last_run_at: Option<DateTime<Utc>>,
    pub next_base_run_at: Option<DateTime<Utc>>,
    pub next_run_at: Option<DateTime<Utc>>,
    pub last_status: Option<TaskRunStatus>,
    pub last_artifact_dir: Option<PathBuf>,
    pub last_events_file: Option<PathBuf>,
    pub last_message_file: Option<PathBuf>,
}
366
/// A task definition paired with its current runtime state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledTaskRecord {
    pub definition: ScheduledTaskDefinition,
    pub runtime: ScheduledTaskRuntimeState,
}
372
/// Flattened, display-oriented view of a task, built by `ScheduledTaskRecord::summary`.
///
/// `action_kind`, `schedule` and `last_status` are pre-rendered strings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ScheduledTaskSummary {
    pub id: String,
    pub name: String,
    pub action_kind: String,
    pub schedule: String,
    pub workspace: Option<PathBuf>,
    pub recurring: bool,
    pub next_run_at: Option<DateTime<Utc>>,
    pub last_run_at: Option<DateTime<Utc>>,
    pub last_status: Option<String>,
}
385
386impl ScheduledTaskRecord {
387    fn summary(&self) -> ScheduledTaskSummary {
388        ScheduledTaskSummary {
389            id: self.definition.id.clone(),
390            name: self.definition.name.clone(),
391            action_kind: self.definition.action.kind_label().to_string(),
392            schedule: self.definition.schedule.human_description(),
393            workspace: self.definition.workspace.clone(),
394            recurring: self.definition.schedule.is_recurring(),
395            next_run_at: self.runtime.next_run_at,
396            last_run_at: self.runtime.last_run_at,
397            last_status: self.runtime.last_status.as_ref().map(ToString::to_string),
398        }
399    }
400}
401
/// A session prompt that has come due, as returned by
/// `SessionScheduler::collect_due_prompts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DueSessionPrompt {
    pub id: String,
    pub name: String,
    pub prompt: String,
}
408
409#[derive(Debug, Default, Clone)]
410pub struct SessionScheduler {
411    tasks: BTreeMap<String, ScheduledTaskRecord>,
412}
413
414impl SessionScheduler {
415    #[must_use]
416    pub fn new() -> Self {
417        Self {
418            tasks: BTreeMap::new(),
419        }
420    }
421
422    #[must_use]
423    pub fn len(&self) -> usize {
424        self.tasks.len()
425    }
426
427    #[must_use]
428    pub fn is_empty(&self) -> bool {
429        self.tasks.is_empty()
430    }
431
432    pub fn create_prompt_task(
433        &mut self,
434        name: Option<String>,
435        prompt: String,
436        schedule: ScheduleSpec,
437        created_at: DateTime<Utc>,
438    ) -> Result<ScheduledTaskSummary> {
439        self.ensure_capacity()?;
440        let expires_at = schedule
441            .is_recurring()
442            .then(|| created_at + ChronoDuration::hours(SESSION_TASK_EXPIRY_HOURS));
443        let definition = ScheduledTaskDefinition::new(
444            name,
445            schedule,
446            ScheduledTaskAction::Prompt { prompt },
447            None,
448            created_at,
449            expires_at,
450        )?;
451        let runtime = initialize_runtime_state(&definition)?;
452        let record = ScheduledTaskRecord {
453            definition: definition.clone(),
454            runtime,
455        };
456        let summary = record.summary();
457        self.tasks.insert(definition.id.clone(), record);
458        Ok(summary)
459    }
460
461    pub fn list(&self) -> Vec<ScheduledTaskSummary> {
462        self.tasks
463            .values()
464            .map(ScheduledTaskRecord::summary)
465            .collect()
466    }
467
468    pub fn delete(&mut self, query: &str) -> Option<ScheduledTaskSummary> {
469        let query = query.trim();
470        if query.is_empty() {
471            return None;
472        }
473        if let Some(record) = self.tasks.remove(query) {
474            return Some(record.summary());
475        }
476        let key = self.tasks.iter().find_map(|(id, record)| {
477            record
478                .definition
479                .name
480                .eq_ignore_ascii_case(query)
481                .then(|| id.clone())
482        })?;
483        self.tasks.remove(&key).map(|record| record.summary())
484    }
485
486    pub fn collect_due_prompts(&mut self, now: DateTime<Utc>) -> Result<Vec<DueSessionPrompt>> {
487        let mut due = Vec::new();
488        let mut completed = Vec::new();
489        for record in self.tasks.values_mut() {
490            let Some(next_run_at) = record.runtime.next_run_at else {
491                continue;
492            };
493            if now < next_run_at {
494                continue;
495            }
496
497            if let Some(prompt) = due_session_prompt(record) {
498                due.push(prompt);
499            }
500
501            if advance_record_runtime(record, now, TaskRunStatus::Triggered)? {
502                completed.push(record.definition.id.clone());
503            }
504        }
505
506        for id in completed {
507            self.tasks.remove(&id);
508        }
509
510        Ok(due)
511    }
512
513    fn ensure_capacity(&self) -> Result<()> {
514        if self.tasks.len() >= MAX_SCHEDULED_TASKS {
515            bail!(
516                "A session can hold at most {} scheduled tasks",
517                MAX_SCHEDULED_TASKS
518            );
519        }
520        Ok(())
521    }
522}
523
/// A prompt paired with the fixed interval at which it repeats.
///
/// NOTE(review): presumably the parsed form of a loop command, with
/// `normalization_note` describing any adjustment made to the requested
/// interval — the producer is outside this view; confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoopCommand {
    pub prompt: String,
    pub interval: FixedInterval,
    pub normalization_note: Option<String>,
}
530
/// Scheduler request: create a one-shot prompt, list tasks, or cancel a task.
///
/// NOTE(review): presumably produced by parsing natural-language input — the
/// parser is outside this view; confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionLanguageCommand {
    /// Run `prompt` once at `run_at`.
    CreateOneShotPrompt {
        prompt: String,
        run_at: DateTime<Utc>,
    },
    /// List the scheduled tasks.
    ListTasks,
    /// Cancel the task identified by `query`.
    CancelTask {
        query: String,
    },
}
542
543#[derive(Debug, Clone, PartialEq, Eq)]
544pub struct ScheduleCreateInput {
545    pub name: Option<String>,
546    pub prompt: Option<String>,
547    pub reminder: Option<String>,
548    pub every: Option<String>,
549    pub cron: Option<String>,
550    pub at: Option<String>,
551    pub workspace: Option<PathBuf>,
552}
553
554impl ScheduleCreateInput {
555    pub fn build_definition(
556        self,
557        now: DateTime<Local>,
558        default_workspace: Option<PathBuf>,
559    ) -> Result<ScheduledTaskDefinition> {
560        let action = match (self.prompt, self.reminder) {
561            (Some(prompt), None) => ScheduledTaskAction::Prompt { prompt },
562            (None, Some(reminder)) => ScheduledTaskAction::Reminder { message: reminder },
563            (Some(_), Some(_)) => bail!("Choose either --prompt or --reminder"),
564            (None, None) => bail!("One of --prompt or --reminder is required"),
565        };
566
567        let schedule = match (self.every, self.cron, self.at) {
568            (Some(raw), None, None) => {
569                let duration = parse_human_duration(raw.trim())
570                    .with_context(|| format!("Invalid --every duration: {}", raw.trim()))?;
571                ScheduleSpec::fixed_interval(duration)?
572            }
573            (None, Some(expression), None) => ScheduleSpec::cron5(expression)?,
574            (None, None, Some(raw)) => {
575                ScheduleSpec::one_shot(parse_local_datetime(raw.trim(), now)?)
576            }
577            _ => bail!("Choose exactly one of --every, --cron, or --at"),
578        };
579
580        let workspace = match (&action, self.workspace.or(default_workspace)) {
581            (ScheduledTaskAction::Prompt { .. }, Some(path)) => Some(path),
582            (ScheduledTaskAction::Prompt { .. }, None) => {
583                bail!("Prompt tasks require a workspace (pass --workspace or create from chat)")
584            }
585            (ScheduledTaskAction::Reminder { .. }, path) => path,
586        }
587        .map(|path| resolve_scheduled_workspace_path(&path))
588        .transpose()?;
589
590        if matches!(action, ScheduledTaskAction::Prompt { .. })
591            && workspace.as_ref().is_some_and(|path| !path.is_dir())
592        {
593            let workspace = workspace.as_ref().expect("prompt workspace should exist");
594            bail!(
595                "Prompt task workspace does not exist or is not a directory: {}",
596                workspace.display()
597            );
598        }
599
600        ScheduledTaskDefinition::new(
601            self.name,
602            schedule,
603            action,
604            workspace,
605            now.with_timezone(&Utc),
606            None,
607        )
608    }
609}
610
611fn resolve_scheduled_workspace_path(path: &Path) -> Result<PathBuf> {
612    resolve_scheduled_workspace_path_with_home(path, dirs::home_dir().as_deref())
613}
614
615fn resolve_scheduled_workspace_path_with_home(
616    path: &Path,
617    home_dir: Option<&Path>,
618) -> Result<PathBuf> {
619    let expanded = expand_scheduled_workspace_home(path, home_dir);
620    let absolute = if expanded.is_absolute() {
621        expanded
622    } else {
623        std::env::current_dir()
624            .context("Failed to resolve current directory for scheduled task workspace")?
625            .join(expanded)
626    };
627    Ok(normalize_path(&absolute))
628}
629
/// Replaces a leading `~` or `~/` with `home_dir`.
///
/// The path is returned unchanged when it is not valid UTF-8, when no home
/// directory is supplied, or when it does not start with `~` followed by
/// either nothing or `/` (so `~other/x` is left alone).
fn expand_scheduled_workspace_home(path: &Path, home_dir: Option<&Path>) -> PathBuf {
    match (path.to_str(), home_dir) {
        (Some("~"), Some(home)) => home.to_path_buf(),
        (Some(text), Some(home)) => match text.strip_prefix("~/") {
            Some(tail) => home.join(tail),
            None => path.to_path_buf(),
        },
        _ => path.to_path_buf(),
    }
}
649
650#[derive(Debug, Clone)]
651pub struct SchedulerPaths {
652    pub config_root: PathBuf,
653    pub data_root: PathBuf,
654}
655
656impl SchedulerPaths {
657    pub fn new_default() -> Result<Self> {
658        let config_root = get_config_dir()
659            .ok_or_else(|| anyhow!("Failed to resolve VT Code config directory"))?
660            .join(DURABLE_STORE_DIR);
661        let data_root = get_data_dir()
662            .ok_or_else(|| anyhow!("Failed to resolve VT Code data directory"))?
663            .join(DURABLE_STORE_DIR);
664        Ok(Self {
665            config_root,
666            data_root,
667        })
668    }
669
670    #[must_use]
671    pub fn tasks_dir(&self) -> PathBuf {
672        self.config_root.join("tasks")
673    }
674
675    #[must_use]
676    pub fn state_dir(&self) -> PathBuf {
677        self.data_root.join("state")
678    }
679
680    #[must_use]
681    pub fn claims_dir(&self) -> PathBuf {
682        self.data_root.join("claims")
683    }
684
685    #[must_use]
686    pub fn runs_dir(&self) -> PathBuf {
687        self.data_root.join("runs")
688    }
689
690    pub fn ensure_dirs(&self) -> Result<()> {
691        for dir in [
692            &self.config_root,
693            &self.data_root,
694            &self.tasks_dir(),
695            &self.state_dir(),
696            &self.claims_dir(),
697            &self.runs_dir(),
698        ] {
699            fs::create_dir_all(dir).with_context(|| {
700                format!("Failed to create scheduler directory {}", dir.display())
701            })?;
702        }
703        Ok(())
704    }
705
706    #[must_use]
707    pub fn definition_path(&self, id: &str) -> PathBuf {
708        self.tasks_dir().join(format!("{id}.toml"))
709    }
710
711    #[must_use]
712    pub fn runtime_path(&self, id: &str) -> PathBuf {
713        self.state_dir().join(format!("{id}.json"))
714    }
715
716    #[must_use]
717    pub fn claim_path(&self, id: &str) -> PathBuf {
718        self.claims_dir().join(format!("{id}.claim"))
719    }
720
721    #[must_use]
722    pub fn artifact_dir(&self, id: &str, run_at: DateTime<Utc>) -> PathBuf {
723        self.runs_dir()
724            .join(id)
725            .join(run_at.format("%Y%m%dT%H%M%SZ").to_string())
726    }
727}
728
729#[derive(Debug, Clone)]
730pub struct DurableTaskStore {
731    paths: SchedulerPaths,
732}
733
734impl DurableTaskStore {
735    #[must_use]
736    pub fn with_paths(paths: SchedulerPaths) -> Self {
737        Self { paths }
738    }
739
740    pub fn new_default() -> Result<Self> {
741        Ok(Self {
742            paths: SchedulerPaths::new_default()?,
743        })
744    }
745
746    #[must_use]
747    pub fn paths(&self) -> &SchedulerPaths {
748        &self.paths
749    }
750
751    pub fn create(&self, definition: ScheduledTaskDefinition) -> Result<ScheduledTaskSummary> {
752        self.paths.ensure_dirs()?;
753        let current_count = self.definition_paths()?.len();
754        if current_count >= MAX_SCHEDULED_TASKS {
755            bail!(
756                "VT Code supports at most {} durable scheduled tasks",
757                MAX_SCHEDULED_TASKS
758            );
759        }
760
761        let runtime = initialize_runtime_state(&definition)?;
762        self.write_definition(&definition)?;
763        self.write_runtime(&definition.id, &runtime)?;
764        Ok(ScheduledTaskRecord {
765            definition,
766            runtime,
767        }
768        .summary())
769    }
770
771    pub fn create_from_input(
772        &self,
773        input: ScheduleCreateInput,
774        now: DateTime<Local>,
775        default_workspace: Option<PathBuf>,
776    ) -> Result<ScheduledTaskSummary> {
777        let definition = input.build_definition(now, default_workspace)?;
778        self.create(definition)
779    }
780
781    pub fn list(&self) -> Result<Vec<ScheduledTaskSummary>> {
782        let mut records = self.load_records()?;
783        records.sort_by_key(|record| record.runtime.next_run_at);
784        Ok(records.into_iter().map(|record| record.summary()).collect())
785    }
786
787    pub fn delete(&self, id: &str) -> Result<Option<ScheduledTaskSummary>> {
788        let Some(record) = self.load_record(id)? else {
789            return Ok(None);
790        };
791        let _ = fs::remove_file(self.paths.definition_path(id));
792        let _ = fs::remove_file(self.paths.runtime_path(id));
793        let _ = fs::remove_file(self.paths.claim_path(id));
794        Ok(Some(record.summary()))
795    }
796
797    pub fn load_record(&self, id: &str) -> Result<Option<ScheduledTaskRecord>> {
798        let definition_path = self.paths.definition_path(id);
799        if !definition_path.exists() {
800            return Ok(None);
801        }
802        let definition = read_definition(&definition_path)?;
803        let runtime = match self.read_runtime(&definition.id)? {
804            Some(runtime) => runtime,
805            None => initialize_runtime_state(&definition)?,
806        };
807        Ok(Some(ScheduledTaskRecord {
808            definition,
809            runtime,
810        }))
811    }
812
813    pub fn update_runtime(&self, record: &ScheduledTaskRecord) -> Result<()> {
814        self.write_runtime(&record.definition.id, &record.runtime)
815    }
816
817    fn load_records(&self) -> Result<Vec<ScheduledTaskRecord>> {
818        self.paths.ensure_dirs()?;
819        let mut records = Vec::new();
820        for definition_path in self.definition_paths()? {
821            let definition = read_definition(&definition_path)?;
822            let runtime = self
823                .read_runtime(&definition.id)?
824                .unwrap_or(initialize_runtime_state(&definition)?);
825            records.push(ScheduledTaskRecord {
826                definition,
827                runtime,
828            });
829        }
830        Ok(records)
831    }
832
833    fn definition_paths(&self) -> Result<Vec<PathBuf>> {
834        self.paths.ensure_dirs()?;
835        let mut paths = Vec::new();
836        for entry in fs::read_dir(self.paths.tasks_dir())
837            .with_context(|| format!("Failed to read {}", self.paths.tasks_dir().display()))?
838        {
839            let entry = entry?;
840            let path = entry.path();
841            if path.extension().and_then(|value| value.to_str()) == Some("toml") {
842                paths.push(path);
843            }
844        }
845        paths.sort();
846        Ok(paths)
847    }
848
849    fn write_definition(&self, definition: &ScheduledTaskDefinition) -> Result<()> {
850        let serialized =
851            toml::to_string_pretty(definition).context("Failed to serialize task definition")?;
852        atomic_write(
853            &self.paths.definition_path(&definition.id),
854            serialized.as_bytes(),
855        )
856    }
857
858    fn read_runtime(&self, id: &str) -> Result<Option<ScheduledTaskRuntimeState>> {
859        let path = self.paths.runtime_path(id);
860        if !path.exists() {
861            return Ok(None);
862        }
863        let raw = fs::read_to_string(&path)
864            .with_context(|| format!("Failed to read {}", path.display()))?;
865        let runtime = serde_json::from_str(&raw)
866            .with_context(|| format!("Failed to parse {}", path.display()))?;
867        Ok(Some(runtime))
868    }
869
870    fn write_runtime(&self, id: &str, runtime: &ScheduledTaskRuntimeState) -> Result<()> {
871        let serialized =
872            serde_json::to_vec_pretty(runtime).context("Failed to serialize runtime state")?;
873        atomic_write(&self.paths.runtime_path(id), &serialized)
874    }
875}
876
/// Runs due tasks from a [`DurableTaskStore`].
///
/// `executable_path` is the binary spawned with the `exec` subcommand to run
/// prompt tasks.
#[derive(Debug, Clone)]
pub struct SchedulerDaemon {
    store: DurableTaskStore,
    executable_path: PathBuf,
}
882
883impl SchedulerDaemon {
884    #[must_use]
885    pub fn new(store: DurableTaskStore, executable_path: PathBuf) -> Self {
886        Self {
887            store,
888            executable_path,
889        }
890    }
891
892    pub async fn serve_forever(&self) -> Result<()> {
893        loop {
894            self.run_due_tasks_once().await?;
895            tokio::time::sleep(Duration::from_secs(1)).await;
896        }
897    }
898
899    pub async fn run_due_tasks_once(&self) -> Result<usize> {
900        let now = Utc::now();
901        let mut records = self.store.load_records()?;
902        let mut executed = 0usize;
903
904        records.sort_by_key(|record| record.runtime.next_run_at);
905        for mut record in records {
906            let Some(next_run_at) = record.runtime.next_run_at else {
907                continue;
908            };
909            if now < next_run_at {
910                continue;
911            }
912            if !try_acquire_claim(self.store.paths(), &record.definition.id)? {
913                continue;
914            }
915
916            let result = self.execute_record(&record, now).await;
917            let release_result = release_claim(self.store.paths(), &record.definition.id);
918            let run_outcome = result?;
919            release_result?;
920
921            apply_run_outcome(&mut record, run_outcome)?;
922            self.store.update_runtime(&record)?;
923            executed = executed.saturating_add(1);
924        }
925
926        Ok(executed)
927    }
928
    /// Executes one scheduled task and describes the result as a `RunOutcome`.
    ///
    /// Reminder actions send a global notification. Prompt actions spawn
    /// `self.executable_path` with the `exec` subcommand and wait for it to exit.
    /// Failures of the action itself are recorded as `TaskRunStatus::Failed` in the
    /// returned outcome instead of being propagated; every path in this body returns `Ok`.
    ///
    /// `run_at` is stored as the outcome's `ran_at` and is used to derive the artifact
    /// directory for prompt runs.
    async fn execute_record(
        &self,
        record: &ScheduledTaskRecord,
        run_at: DateTime<Utc>,
    ) -> Result<RunOutcome> {
        match &record.definition.action {
            ScheduledTaskAction::Reminder { message } => {
                let notification = send_global_notification(NotificationEvent::IdlePrompt {
                    title: format!("Scheduled reminder: {}", record.definition.name),
                    message: message.clone(),
                })
                .await;
                // A notification failure becomes a failed run status, not an error.
                let status = match notification {
                    Ok(()) => TaskRunStatus::ReminderSent,
                    Err(error) => TaskRunStatus::Failed {
                        message: format!("failed to send reminder notification: {error:#}"),
                    },
                };
                // Reminders produce no artifacts.
                Ok(RunOutcome {
                    ran_at: run_at,
                    status,
                    artifact_dir: None,
                    events_file: None,
                    last_message_file: None,
                })
            }
            ScheduledTaskAction::Prompt { prompt } => {
                let artifact_dir = self
                    .store
                    .paths()
                    .artifact_dir(&record.definition.id, run_at);
                let events_file = artifact_dir.join("events.jsonl");
                let last_message_file = artifact_dir.join("last-message.txt");
                let workspace_label = scheduled_workspace_label(&record.definition);
                // Resolved eagerly; the `Result` is only inspected inside the block below
                // so a resolution failure is reported as a failed run.
                let workspace = resolve_scheduled_task_workspace(&record.definition);
                // Everything fallible runs inside this block so that any error is folded
                // into `run_status` below.
                let execution = async {
                    tokio::fs::create_dir_all(&artifact_dir)
                        .await
                        .with_context(|| {
                            format!(
                                "Failed to create run artifact dir {}",
                                artifact_dir.display()
                            )
                        })?;

                    // The child gets no stdin and its stdout/stderr are discarded.
                    let mut command = Command::new(&self.executable_path);
                    command
                        .arg("exec")
                        .arg("--events")
                        .arg(&events_file)
                        .arg("--last-message-file")
                        .arg(&last_message_file)
                        .arg(prompt)
                        .stdin(Stdio::null())
                        .stdout(Stdio::null())
                        .stderr(Stdio::null());

                    // Without a workspace the child inherits this process's directory.
                    if let Some(workspace) = workspace? {
                        command.current_dir(&workspace);
                    }

                    command.status().await.with_context(|| {
                        format!(
                            "Failed to spawn scheduled VT Code exec for task {} using {} in {}",
                            record.definition.id,
                            self.executable_path.display(),
                            workspace_label
                        )
                    })
                }
                .await;
                let run_status = match execution {
                    Ok(status) if status.success() => TaskRunStatus::Success,
                    Ok(status) => TaskRunStatus::Failed {
                        message: format!("vtcode exec exited with {}", status),
                    },
                    Err(error) => TaskRunStatus::Failed {
                        message: format!("{error:#}"),
                    },
                };
                // Only report artifact paths that exist on disk after the run.
                Ok(RunOutcome {
                    ran_at: run_at,
                    status: run_status,
                    artifact_dir: artifact_dir.is_dir().then_some(artifact_dir),
                    events_file: events_file.is_file().then_some(events_file),
                    last_message_file: last_message_file.is_file().then_some(last_message_file),
                })
            }
        }
    }
1019}
1020
/// Service manager used to host the durable scheduler service.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceManager {
    /// Selected when compiling for macOS.
    Launchd,
    /// Selected when compiling for any other Unix target.
    SystemdUser,
}

impl ServiceManager {
    /// Returns the service manager for the compilation target, or `None` when the
    /// target is neither macOS nor another Unix.
    #[must_use]
    pub fn current() -> Option<Self> {
        // `cfg!` evaluates to a compile-time boolean, so every branch is type-checked
        // on every target and no lint expectation is needed for the fallback arm.
        if cfg!(target_os = "macos") {
            Some(Self::Launchd)
        } else if cfg!(unix) {
            Some(Self::SystemdUser)
        } else {
            None
        }
    }
}
1042
/// Description of a scheduler service file: which manager it targets, where it lives,
/// and what it contains.
#[derive(Debug, Clone)]
pub struct ServiceInstallPlan {
    /// Service manager the file is written for.
    pub manager: ServiceManager,
    /// Location of the service file under the user's home directory.
    pub path: PathBuf,
    /// Rendered launchd plist or systemd unit text.
    pub contents: String,
}
1049
1050pub fn render_service_install_plan(executable_path: &Path) -> Result<ServiceInstallPlan> {
1051    let manager = ServiceManager::current()
1052        .ok_or_else(|| anyhow!("Durable scheduler services are unsupported on this platform"))?;
1053    let path = match manager {
1054        ServiceManager::Launchd => dirs::home_dir()
1055            .ok_or_else(|| anyhow!("Failed to resolve home directory"))?
1056            .join("Library/LaunchAgents")
1057            .join(format!("{LAUNCHD_LABEL}.plist")),
1058        ServiceManager::SystemdUser => dirs::home_dir()
1059            .ok_or_else(|| anyhow!("Failed to resolve home directory"))?
1060            .join(".config/systemd/user")
1061            .join(format!("{SERVICE_NAME}.service")),
1062    };
1063    let contents = match manager {
1064        ServiceManager::Launchd => render_launchd_plist(executable_path),
1065        ServiceManager::SystemdUser => render_systemd_unit(executable_path),
1066    };
1067    Ok(ServiceInstallPlan {
1068        manager,
1069        path,
1070        contents,
1071    })
1072}
1073
1074pub fn install_service_file(executable_path: &Path) -> Result<ServiceInstallPlan> {
1075    let plan = render_service_install_plan(executable_path)?;
1076    if let Some(parent) = plan.path.parent() {
1077        fs::create_dir_all(parent)
1078            .with_context(|| format!("Failed to create {}", parent.display()))?;
1079    }
1080    atomic_write(&plan.path, plan.contents.as_bytes())?;
1081    Ok(plan)
1082}
1083
1084pub fn uninstall_service_file() -> Result<Option<(ServiceManager, PathBuf, bool)>> {
1085    let Some(manager) = ServiceManager::current() else {
1086        return Ok(None);
1087    };
1088    let path = render_service_install_plan(Path::new("/tmp/vtcode"))?.path;
1089    if path.exists() {
1090        fs::remove_file(&path).with_context(|| format!("Failed to remove {}", path.display()))?;
1091        return Ok(Some((manager, path, true)));
1092    }
1093    Ok(Some((manager, path, false)))
1094}
1095
1096#[must_use]
1097pub fn render_launchd_plist(executable_path: &Path) -> String {
1098    format!(
1099        r#"<?xml version="1.0" encoding="UTF-8"?>
1100<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
1101<plist version="1.0">
1102  <dict>
1103    <key>Label</key>
1104    <string>{LAUNCHD_LABEL}</string>
1105    <key>ProgramArguments</key>
1106    <array>
1107      <string>{}</string>
1108      <string>schedule</string>
1109      <string>serve</string>
1110    </array>
1111    <key>RunAtLoad</key>
1112    <true/>
1113    <key>KeepAlive</key>
1114    <true/>
1115  </dict>
1116</plist>
1117"#,
1118        xml_escape(executable_path.display().to_string())
1119    )
1120}
1121
1122#[must_use]
1123pub fn render_systemd_unit(executable_path: &Path) -> String {
1124    format!(
1125        "[Unit]\nDescription=VT Code scheduler\n\n[Service]\nType=simple\nExecStart={} schedule serve\nRestart=always\nRestartSec=5\n\n[Install]\nWantedBy=default.target\n",
1126        shell_words::quote(executable_path.to_string_lossy().as_ref())
1127    )
1128}
1129
1130pub fn scheduled_tasks_enabled(enabled_in_config: bool) -> bool {
1131    #[cfg(test)]
1132    if let Some(value) = test_env_overrides::get() {
1133        let normalized = value.trim().to_ascii_lowercase();
1134        if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
1135            return false;
1136        }
1137    }
1138
1139    if let Ok(value) = std::env::var(DISABLE_CRON_ENV) {
1140        let normalized = value.trim().to_ascii_lowercase();
1141        if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
1142            return false;
1143        }
1144    }
1145    enabled_in_config
1146}
1147
1148#[must_use]
1149pub fn durable_task_is_overdue(
1150    next_run_at: Option<DateTime<Utc>>,
1151    last_run_at: Option<DateTime<Utc>>,
1152    has_last_status: bool,
1153    now: DateTime<Utc>,
1154) -> bool {
1155    !has_last_status && last_run_at.is_none() && next_run_at.is_some_and(|next_run| next_run <= now)
1156}
1157
1158pub fn parse_loop_command(args: &str) -> Result<LoopCommand> {
1159    let trimmed = args.trim();
1160    if trimmed.is_empty() {
1161        bail!("Usage: /loop [interval] <prompt>");
1162    }
1163
1164    let (raw_prompt, count, unit) = if let Some(captures) = LEADING_INTERVAL_RE.captures(trimmed) {
1165        (
1166            captures["prompt"].trim().to_string(),
1167            captures["count"].parse::<u64>()?,
1168            captures["unit"].to_string(),
1169        )
1170    } else if let Some(captures) = TRAILING_INTERVAL_RE.captures(trimmed) {
1171        (
1172            captures["prompt"].trim().to_string(),
1173            captures["count"].parse::<u64>()?,
1174            captures["unit"].to_string(),
1175        )
1176    } else {
1177        (
1178            trimmed.to_string(),
1179            DEFAULT_LOOP_INTERVAL_MINUTES,
1180            "minutes".to_string(),
1181        )
1182    };
1183
1184    if raw_prompt.trim().is_empty() {
1185        bail!("Usage: /loop [interval] <prompt>");
1186    }
1187
1188    let (seconds, note) = normalize_interval_spec(count, &unit)?;
1189    Ok(LoopCommand {
1190        prompt: raw_prompt,
1191        interval: FixedInterval { seconds },
1192        normalization_note: note,
1193    })
1194}
1195
1196pub fn parse_session_language_command(
1197    input: &str,
1198    now: DateTime<Local>,
1199) -> Option<Result<SessionLanguageCommand>> {
1200    let trimmed = input.trim();
1201    if trimmed.is_empty() {
1202        return None;
1203    }
1204
1205    let normalized = trimmed
1206        .trim_end_matches(['?', '.', '!'])
1207        .to_ascii_lowercase();
1208    if normalized == "what scheduled tasks do i have" {
1209        return Some(Ok(SessionLanguageCommand::ListTasks));
1210    }
1211
1212    if let Some(captures) = REMIND_AT_RE.captures(trimmed) {
1213        let when = captures
1214            .name("when")
1215            .map(|value| value.as_str())
1216            .unwrap_or_default();
1217        let prompt = captures
1218            .name("prompt")
1219            .map(|value| value.as_str().trim().to_string())
1220            .unwrap_or_default();
1221        return Some(
1222            parse_local_datetime(when, now)
1223                .map(|run_at| SessionLanguageCommand::CreateOneShotPrompt { prompt, run_at }),
1224        );
1225    }
1226
1227    if let Some(captures) = REMIND_IN_RE.captures(trimmed) {
1228        let count = captures["count"].parse::<i64>().ok()?;
1229        let prompt = captures["prompt"].trim().to_string();
1230        let delta = match captures["unit"].to_ascii_lowercase().as_str() {
1231            "minute" | "minutes" => ChronoDuration::minutes(count),
1232            "hour" | "hours" => ChronoDuration::hours(count),
1233            "day" | "days" => ChronoDuration::days(count),
1234            _ => return None,
1235        };
1236        return Some(Ok(SessionLanguageCommand::CreateOneShotPrompt {
1237            prompt,
1238            run_at: (now + delta).with_timezone(&Utc),
1239        }));
1240    }
1241
1242    if let Some(query) = trimmed.strip_prefix("cancel ") {
1243        return Some(Ok(SessionLanguageCommand::CancelTask {
1244            query: query.trim().to_string(),
1245        }));
1246    }
1247
1248    None
1249}
1250
1251pub fn parse_schedule_create_args(args: &str) -> Result<ScheduleCreateInput> {
1252    let tokens =
1253        shell_words::split(args).with_context(|| format!("Failed to parse arguments: {args}"))?;
1254    parse_schedule_create_tokens(&tokens)
1255}
1256
1257pub fn parse_schedule_create_tokens(tokens: &[String]) -> Result<ScheduleCreateInput> {
1258    let mut name = None;
1259    let mut prompt = None;
1260    let mut reminder = None;
1261    let mut every = None;
1262    let mut cron = None;
1263    let mut at = None;
1264    let mut workspace = None;
1265    let mut index = 0usize;
1266
1267    while index < tokens.len() {
1268        let token = &tokens[index];
1269        let (flag, inline_value) = if let Some((left, right)) = token.split_once('=') {
1270            (left, Some(right.to_string()))
1271        } else {
1272            (token.as_str(), None)
1273        };
1274
1275        let take_value = |idx: &mut usize| -> Result<String> {
1276            if let Some(value) = inline_value.clone() {
1277                return Ok(value);
1278            }
1279            let Some(value) = tokens.get(*idx + 1) else {
1280                bail!("Missing value for {flag}");
1281            };
1282            *idx += 1;
1283            Ok(value.clone())
1284        };
1285
1286        match flag {
1287            "--name" => name = Some(take_value(&mut index)?),
1288            "--prompt" => prompt = Some(take_value(&mut index)?),
1289            "--reminder" => reminder = Some(take_value(&mut index)?),
1290            "--every" => every = Some(take_value(&mut index)?),
1291            "--cron" => cron = Some(take_value(&mut index)?),
1292            "--at" => at = Some(take_value(&mut index)?),
1293            "--workspace" => workspace = Some(PathBuf::from(take_value(&mut index)?)),
1294            "--help" | "help" => {
1295                bail!(
1296                    "Usage: /schedule create --prompt <text>|--reminder <text> --every <dur>|--cron <expr>|--at <time> [--name <label>] [--workspace <path>]"
1297                );
1298            }
1299            _ => bail!("Unknown option: {token}"),
1300        }
1301        index += 1;
1302    }
1303
1304    Ok(ScheduleCreateInput {
1305        name,
1306        prompt,
1307        reminder,
1308        every,
1309        cron,
1310        at,
1311        workspace,
1312    })
1313}
1314
1315fn initialize_runtime_state(
1316    definition: &ScheduledTaskDefinition,
1317) -> Result<ScheduledTaskRuntimeState> {
1318    let next_base_run_at = definition
1319        .schedule
1320        .first_base_fire_at(definition.created_at)?;
1321    let next_run_at = next_base_run_at
1322        .map(|base| definition.schedule.jittered_fire_at(&definition.id, base))
1323        .transpose()?;
1324    Ok(ScheduledTaskRuntimeState {
1325        next_base_run_at,
1326        next_run_at,
1327        ..ScheduledTaskRuntimeState::default()
1328    })
1329}
1330
/// The next occurrence of a task: its schedule-derived base time and the jittered time
/// derived from that base.
#[derive(Debug, Clone, Copy)]
struct NextScheduledRun {
    /// Fire time produced by the schedule, before jitter.
    base_fire_at: DateTime<Utc>,
    /// Fire time after jitter has been applied to `base_fire_at`.
    fire_at: DateTime<Utc>,
}
1336
1337fn due_session_prompt(record: &ScheduledTaskRecord) -> Option<DueSessionPrompt> {
1338    let ScheduledTaskAction::Prompt { prompt } = &record.definition.action else {
1339        return None;
1340    };
1341
1342    Some(DueSessionPrompt {
1343        id: record.definition.id.clone(),
1344        name: record.definition.name.clone(),
1345        prompt: prompt.clone(),
1346    })
1347}
1348
1349fn next_scheduled_run(
1350    definition: &ScheduledTaskDefinition,
1351    last_base_run_at: Option<DateTime<Utc>>,
1352) -> Result<Option<NextScheduledRun>> {
1353    let Some(last_base_run_at) = last_base_run_at else {
1354        return Ok(None);
1355    };
1356
1357    let Some(next_base_run_at) = definition.schedule.next_base_fire_after(last_base_run_at)? else {
1358        return Ok(None);
1359    };
1360
1361    if definition
1362        .expires_at
1363        .is_some_and(|expiry| next_base_run_at > expiry)
1364    {
1365        return Ok(None);
1366    }
1367
1368    Ok(Some(NextScheduledRun {
1369        fire_at: definition
1370            .schedule
1371            .jittered_fire_at(&definition.id, next_base_run_at)?,
1372        base_fire_at: next_base_run_at,
1373    }))
1374}
1375
1376fn advance_record_runtime(
1377    record: &mut ScheduledTaskRecord,
1378    ran_at: DateTime<Utc>,
1379    status: TaskRunStatus,
1380) -> Result<bool> {
1381    record.runtime.last_run_at = Some(ran_at);
1382    record.runtime.last_status = Some(status);
1383
1384    match next_scheduled_run(&record.definition, record.runtime.next_base_run_at)? {
1385        Some(next_run) => {
1386            record.runtime.next_base_run_at = Some(next_run.base_fire_at);
1387            record.runtime.next_run_at = Some(next_run.fire_at);
1388            Ok(false)
1389        }
1390        None => {
1391            record.runtime.next_base_run_at = None;
1392            record.runtime.next_run_at = None;
1393            Ok(true)
1394        }
1395    }
1396}
1397
1398fn apply_run_outcome(record: &mut ScheduledTaskRecord, outcome: RunOutcome) -> Result<()> {
1399    let RunOutcome {
1400        ran_at,
1401        status,
1402        artifact_dir,
1403        events_file,
1404        last_message_file,
1405    } = outcome;
1406
1407    record.runtime.last_artifact_dir = artifact_dir;
1408    record.runtime.last_events_file = events_file;
1409    record.runtime.last_message_file = last_message_file;
1410
1411    advance_record_runtime(record, ran_at, status)?;
1412
1413    Ok(())
1414}
1415
/// Result of executing one scheduled task.
#[derive(Debug, Clone)]
struct RunOutcome {
    /// Time recorded as the task's last run.
    ran_at: DateTime<Utc>,
    /// Final status of the run.
    status: TaskRunStatus,
    /// Directory holding the run's artifacts, when one exists.
    artifact_dir: Option<PathBuf>,
    /// Events file written by the run, when one exists.
    events_file: Option<PathBuf>,
    /// Last-message file written by the run, when one exists.
    last_message_file: Option<PathBuf>,
}
1424
1425fn scheduled_workspace_label(definition: &ScheduledTaskDefinition) -> String {
1426    definition
1427        .workspace
1428        .as_ref()
1429        .map(|path| path.display().to_string())
1430        .unwrap_or_else(|| "<none>".to_string())
1431}
1432
1433fn resolve_scheduled_task_workspace(
1434    definition: &ScheduledTaskDefinition,
1435) -> Result<Option<PathBuf>> {
1436    definition
1437        .workspace
1438        .as_deref()
1439        .map(resolve_scheduled_workspace_path)
1440        .transpose()
1441        .with_context(|| {
1442            format!(
1443                "Failed to resolve scheduled task workspace {} for task {}",
1444                scheduled_workspace_label(definition),
1445                definition.id
1446            )
1447        })
1448}
1449
1450fn read_definition(path: &Path) -> Result<ScheduledTaskDefinition> {
1451    let raw =
1452        fs::read_to_string(path).with_context(|| format!("Failed to read {}", path.display()))?;
1453    toml::from_str(&raw).with_context(|| format!("Failed to parse {}", path.display()))
1454}
1455
1456fn atomic_write(path: &Path, content: &[u8]) -> Result<()> {
1457    if let Some(parent) = path.parent() {
1458        fs::create_dir_all(parent)
1459            .with_context(|| format!("Failed to create {}", parent.display()))?;
1460    }
1461
1462    let temp_name = format!(
1463        ".{}.tmp-{}",
1464        path.file_name()
1465            .and_then(|value| value.to_str())
1466            .unwrap_or("task"),
1467        NEXT_TASK_COUNTER.fetch_add(1, Ordering::Relaxed)
1468    );
1469    let temp_path = path.with_file_name(temp_name);
1470    fs::write(&temp_path, content)
1471        .with_context(|| format!("Failed to write {}", temp_path.display()))?;
1472    fs::rename(&temp_path, path)
1473        .with_context(|| format!("Failed to replace {}", path.display()))?;
1474    Ok(())
1475}
1476
1477fn try_acquire_claim(paths: &SchedulerPaths, id: &str) -> Result<bool> {
1478    paths.ensure_dirs()?;
1479    let path = paths.claim_path(id);
1480    match OpenOptions::new().write(true).create_new(true).open(&path) {
1481        Ok(mut file) => {
1482            let timestamp = Utc::now().to_rfc3339();
1483            file.write_all(timestamp.as_bytes())
1484                .with_context(|| format!("Failed to write {}", path.display()))?;
1485            Ok(true)
1486        }
1487        Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
1488            if claim_is_stale(&path)? {
1489                let _ = fs::remove_file(&path);
1490                return try_acquire_claim(paths, id);
1491            }
1492            Ok(false)
1493        }
1494        Err(error) => Err(error).with_context(|| format!("Failed to create {}", path.display())),
1495    }
1496}
1497
1498fn claim_is_stale(path: &Path) -> Result<bool> {
1499    let metadata =
1500        fs::metadata(path).with_context(|| format!("Failed to stat {}", path.display()))?;
1501    let modified = metadata
1502        .modified()
1503        .with_context(|| format!("Failed to read modification time for {}", path.display()))?;
1504    let elapsed = modified.elapsed().unwrap_or_default();
1505    Ok(elapsed >= Duration::from_secs(CLAIM_STALE_SECS))
1506}
1507
1508fn release_claim(paths: &SchedulerPaths, id: &str) -> Result<()> {
1509    let path = paths.claim_path(id);
1510    if path.exists() {
1511        fs::remove_file(&path).with_context(|| format!("Failed to remove {}", path.display()))?;
1512    }
1513    Ok(())
1514}
1515
1516fn summarize_task_name(summary: &str) -> String {
1517    let trimmed = summary.trim();
1518    if trimmed.is_empty() {
1519        return "Scheduled task".to_string();
1520    }
1521    let compact = vtcode_commons::formatting::collapse_whitespace(trimmed);
1522    let mut output = String::new();
1523    for ch in compact.chars().take(32) {
1524        output.push(ch);
1525    }
1526    output
1527}
1528
1529fn generate_task_id(name: &str, summary: &str, created_at: DateTime<Utc>) -> String {
1530    let counter = NEXT_TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
1531    let seed = format!(
1532        "{name}|{summary}|{}|{}|{}",
1533        created_at.timestamp_nanos_opt().unwrap_or_default(),
1534        std::process::id(),
1535        counter
1536    );
1537    format!("{:08x}", stable_hash_u64(seed.as_bytes()) as u32)
1538}
1539
/// Computes the 64-bit FNV-1a hash of `bytes`.
fn stable_hash_u64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
1548
/// Formats an interval in seconds as `every N <unit>`.
///
/// Whole days are preferred, then whole hours; anything else is expressed in minutes
/// using integer division. The unit is singular only when the amount is exactly 1.
fn humanize_interval(seconds: u64) -> String {
    let (amount, unit) = if seconds % 86_400 == 0 {
        (seconds / 86_400, "day")
    } else if seconds % 3_600 == 0 {
        (seconds / 3_600, "hour")
    } else {
        (seconds / 60, "minute")
    };
    let plural = if amount == 1 { "" } else { "s" };
    format!("every {amount} {unit}{plural}")
}
1577
1578fn normalize_interval_spec(count: u64, unit: &str) -> Result<(u64, Option<String>)> {
1579    if count == 0 {
1580        bail!("Intervals must be greater than zero");
1581    }
1582
1583    let unit = unit.to_ascii_lowercase();
1584    let (raw_minutes, original) = match unit.as_str() {
1585        "s" | "sec" | "secs" | "second" | "seconds" => {
1586            let minutes = count.div_ceil(60);
1587            (minutes, format!("{count}s"))
1588        }
1589        "m" | "min" | "mins" | "minute" | "minutes" => (count, format!("{count}m")),
1590        "h" | "hr" | "hrs" | "hour" | "hours" => (count * 60, format!("{count}h")),
1591        "d" | "day" | "days" => (count * 60 * 24, format!("{count}d")),
1592        _ => bail!("Unsupported interval unit: {unit}"),
1593    };
1594
1595    let normalized_minutes = normalize_clean_minutes(raw_minutes);
1596    let note = if normalized_minutes != raw_minutes {
1597        Some(format!(
1598            "Rounded {original} to {} for scheduler cadence.",
1599            humanize_interval(normalized_minutes * 60)
1600        ))
1601    } else if raw_minutes != count && unit.starts_with('s') {
1602        Some(format!(
1603            "Rounded {original} up to {} because VT Code schedules at minute granularity.",
1604            humanize_interval(normalized_minutes * 60)
1605        ))
1606    } else {
1607        None
1608    };
1609
1610    Ok((normalized_minutes * 60, note))
1611}
1612
/// Snaps `raw_minutes` to the nearest supported cadence.
///
/// A value already in the table is returned unchanged. When two table entries are
/// equally close, the larger one is chosen.
fn normalize_clean_minutes(raw_minutes: u64) -> u64 {
    const CLEAN_MINUTES: [u64; 19] = [
        1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60, 120, 180, 240, 360, 480, 720, 1_440,
    ];

    // Lower rank wins: closest first, then entries not below the request, then value.
    let rank = |candidate: u64| {
        (
            candidate.abs_diff(raw_minutes),
            candidate < raw_minutes,
            candidate,
        )
    };

    let mut chosen = raw_minutes;
    let mut chosen_rank = None;
    for &candidate in &CLEAN_MINUTES {
        let candidate_rank = rank(candidate);
        if chosen_rank.map_or(true, |current| candidate_rank < current) {
            chosen = candidate;
            chosen_rank = Some(candidate_rank);
        }
    }
    chosen
}
1629
1630pub fn parse_local_datetime(raw: &str, now: DateTime<Local>) -> Result<DateTime<Utc>> {
1631    let trimmed = raw.trim();
1632    if trimmed.is_empty() {
1633        bail!("Time value cannot be empty");
1634    }
1635
1636    if let Ok(parsed) = DateTime::parse_from_rfc3339(trimmed) {
1637        return Ok(parsed.with_timezone(&Utc));
1638    }
1639
1640    for format in ["%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M", "%Y-%m-%d %H:%M:%S"] {
1641        if let Ok(naive) = NaiveDateTime::parse_from_str(trimmed, format) {
1642            return localize_naive_datetime(naive).map(|value| value.with_timezone(&Utc));
1643        }
1644    }
1645
1646    if let Some(captures) = TIME_ONLY_RE.captures(trimmed) {
1647        let mut hour = captures["hour"].parse::<u32>()?;
1648        let minute = captures
1649            .name("minute")
1650            .map(|value| value.as_str().parse::<u32>())
1651            .transpose()?
1652            .unwrap_or(0);
1653        if minute >= 60 {
1654            bail!("Invalid minute in time value");
1655        }
1656        if let Some(ampm) = captures
1657            .name("ampm")
1658            .map(|value| value.as_str().to_ascii_lowercase())
1659        {
1660            if hour == 0 || hour > 12 {
1661                bail!("Invalid 12-hour clock value");
1662            }
1663            if ampm == "pm" && hour != 12 {
1664                hour += 12;
1665            }
1666            if ampm == "am" && hour == 12 {
1667                hour = 0;
1668            }
1669        } else if hour >= 24 {
1670            bail!("Invalid 24-hour clock value");
1671        }
1672
1673        let time = NaiveTime::from_hms_opt(hour, minute, 0)
1674            .ok_or_else(|| anyhow!("Invalid time value"))?;
1675        let today = now.date_naive();
1676        let naive = today.and_time(time);
1677        let mut localized = localize_naive_datetime(naive)?;
1678        if localized <= now {
1679            localized = localize_naive_datetime((today + ChronoDuration::days(1)).and_time(time))?;
1680        }
1681        return Ok(localized.with_timezone(&Utc));
1682    }
1683
1684    bail!("Unsupported time format. Use RFC3339, YYYY-MM-DD HH:MM, or a local time like 3pm")
1685}
1686
1687fn localize_naive_datetime(naive: NaiveDateTime) -> Result<DateTime<Local>> {
1688    match Local.from_local_datetime(&naive) {
1689        LocalResult::Single(value) => Ok(value),
1690        LocalResult::Ambiguous(first, _) => Ok(first),
1691        LocalResult::None => bail!("Local time does not exist due to timezone transition"),
1692    }
1693}
1694
/// Replaces the characters `&`, `<`, `>`, `"`, and `'` with their XML entities.
fn xml_escape(value: String) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
1703
1704#[derive(Debug, Clone)]
1705struct ParsedCron {
1706    minute: CronField,
1707    hour: CronField,
1708    day_of_month: CronField,
1709    month: CronField,
1710    day_of_week: CronField,
1711}
1712
1713impl ParsedCron {
1714    fn parse(expression: &str) -> Result<Self> {
1715        let parts = expression.split_whitespace().collect::<Vec<_>>();
1716        if parts.len() != 5 {
1717            bail!("Cron expressions require exactly 5 fields");
1718        }
1719
1720        Ok(Self {
1721            minute: CronField::parse(parts[0], 0, 59, false)?,
1722            hour: CronField::parse(parts[1], 0, 23, false)?,
1723            day_of_month: CronField::parse(parts[2], 1, 31, false)?,
1724            month: CronField::parse(parts[3], 1, 12, false)?,
1725            day_of_week: CronField::parse(parts[4], 0, 7, true)?,
1726        })
1727    }
1728
1729    fn next_after(&self, after: DateTime<Local>) -> Result<Option<DateTime<Local>>> {
1730        let mut candidate = after
1731            .with_second(0)
1732            .and_then(|value| value.with_nanosecond(0))
1733            .ok_or_else(|| anyhow!("Failed to normalize cron timestamp"))?
1734            + ChronoDuration::minutes(1);
1735        let horizon = candidate + ChronoDuration::days(366 * 5);
1736
1737        while candidate <= horizon {
1738            if self.matches(candidate) {
1739                return Ok(Some(candidate));
1740            }
1741            candidate += ChronoDuration::minutes(1);
1742        }
1743
1744        Ok(None)
1745    }
1746
1747    fn matches(&self, value: DateTime<Local>) -> bool {
1748        let month = value.month();
1749        let dom = value.day();
1750        let minute = value.minute();
1751        let hour = value.hour();
1752        let dow = value.weekday().num_days_from_sunday();
1753
1754        if !self.minute.contains(minute) || !self.hour.contains(hour) || !self.month.contains(month)
1755        {
1756            return false;
1757        }
1758
1759        let dom_matches = self.day_of_month.contains(dom);
1760        let dow_matches = self.day_of_week.contains(dow);
1761
1762        if self.day_of_month.is_wildcard && self.day_of_week.is_wildcard {
1763            return true;
1764        }
1765        if self.day_of_month.is_wildcard {
1766            return dow_matches;
1767        }
1768        if self.day_of_week.is_wildcard {
1769            return dom_matches;
1770        }
1771
1772        dom_matches || dow_matches
1773    }
1774}
1775
1776#[derive(Debug, Clone)]
1777struct CronField {
1778    values: BTreeSet<u32>,
1779    is_wildcard: bool,
1780}
1781
1782impl CronField {
1783    fn parse(raw: &str, min: u32, max: u32, is_day_of_week: bool) -> Result<Self> {
1784        if raw.contains(['L', 'W', '?']) {
1785            bail!("Unsupported cron syntax in field '{raw}'");
1786        }
1787        if raw.chars().any(|ch| ch.is_ascii_alphabetic()) {
1788            bail!("Named cron aliases are not supported in '{raw}'");
1789        }
1790
1791        let mut values = BTreeSet::new();
1792        let mut is_wildcard = false;
1793        for segment in raw.split(',') {
1794            let segment = segment.trim();
1795            if segment.is_empty() {
1796                bail!("Cron field contains an empty segment");
1797            }
1798            if segment == "*" {
1799                is_wildcard = true;
1800                values.extend(min..=max);
1801                continue;
1802            }
1803
1804            let (base, step) = if let Some((left, right)) = segment.split_once('/') {
1805                let step = right
1806                    .parse::<u32>()
1807                    .with_context(|| format!("Invalid step value in '{segment}'"))?;
1808                if step == 0 {
1809                    bail!("Step value must be greater than zero");
1810                }
1811                (left, Some(step))
1812            } else {
1813                (segment, None)
1814            };
1815
1816            let mut base_values = if base == "*" {
1817                is_wildcard = true;
1818                (min..=max).collect::<Vec<_>>()
1819            } else if let Some((left, right)) = base.split_once('-') {
1820                let start = parse_cron_number(left, min, max, is_day_of_week)?;
1821                let end = parse_cron_number(right, min, max, is_day_of_week)?;
1822                if start > end {
1823                    bail!("Invalid descending range '{base}'");
1824                }
1825                (start..=end).collect::<Vec<_>>()
1826            } else {
1827                let start = parse_cron_number(base, min, max, is_day_of_week)?;
1828                if let Some(step) = step {
1829                    let mut values = Vec::new();
1830                    let mut value = start;
1831                    while value <= max {
1832                        values.push(value);
1833                        match value.checked_add(step) {
1834                            Some(next) => value = next,
1835                            None => break,
1836                        }
1837                    }
1838                    values
1839                } else {
1840                    vec![start]
1841                }
1842            };
1843
1844            if let Some(step) = step
1845                && (base == "*" || base.contains('-'))
1846            {
1847                let mut stepped = Vec::new();
1848                for (index, value) in base_values.iter().enumerate() {
1849                    if (index as u32).is_multiple_of(step) {
1850                        stepped.push(*value);
1851                    }
1852                }
1853                base_values = stepped;
1854            }
1855
1856            values.extend(base_values);
1857        }
1858
1859        Ok(Self {
1860            values,
1861            is_wildcard,
1862        })
1863    }
1864
1865    fn contains(&self, value: u32) -> bool {
1866        self.values.contains(&value)
1867    }
1868}
1869
1870fn parse_cron_number(raw: &str, min: u32, max: u32, is_day_of_week: bool) -> Result<u32> {
1871    let mut value = raw
1872        .parse::<u32>()
1873        .with_context(|| format!("Invalid cron value '{raw}'"))?;
1874    if is_day_of_week && value == 7 {
1875        value = 0;
1876    }
1877    if !(min..=max).contains(&value) {
1878        bail!("Cron value '{raw}' is out of range");
1879    }
1880    Ok(value)
1881}