Skip to main content

kbolt_core/engine/
schedule_run_ops.rs

1use crate::lock::LockMode;
2use crate::schedule_state_store::ScheduleRunStateStore;
3use crate::Result;
4use kbolt_types::{KboltError, ScheduleRunResult, ScheduleRunState, ScheduleScope, UpdateOptions};
5use time::format_description::well_known::Rfc3339;
6use time::OffsetDateTime;
7
8use super::{schedule_ops::load_schedule_definition, Engine};
9
10impl Engine {
11    pub fn run_schedule(&self, id: &str) -> Result<ScheduleRunState> {
12        let schedule = load_schedule_definition(&self.config.config_dir, id)?;
13        let started_at = utc_now_string()?;
14        let mut state = ScheduleRunState {
15            last_started: Some(started_at),
16            last_finished: None,
17            last_result: None,
18            last_error: None,
19        };
20        ScheduleRunStateStore::save(&self.config.cache_dir, &schedule.id, &state)?;
21
22        let lock = match self.acquire_operation_lock(LockMode::Exclusive) {
23            Ok(lock) => lock,
24            Err(err) if is_lock_contention_error(&err) => {
25                state.last_finished = Some(utc_now_string()?);
26                state.last_result = Some(ScheduleRunResult::SkippedLock);
27                state.last_error = None;
28                ScheduleRunStateStore::save(&self.config.cache_dir, &schedule.id, &state)?;
29                return Ok(state);
30            }
31            Err(err) => {
32                persist_failed_run_state(
33                    &self.config.cache_dir,
34                    &schedule.id,
35                    &mut state,
36                    err.to_string(),
37                )?;
38                return Err(err);
39            }
40        };
41
42        let schedule = match load_schedule_definition(&self.config.config_dir, &schedule.id) {
43            Ok(schedule) => schedule,
44            Err(err) => {
45                persist_failed_run_state(
46                    &self.config.cache_dir,
47                    &schedule.id,
48                    &mut state,
49                    err.to_string(),
50                )?;
51                return Err(err);
52            }
53        };
54
55        let run_result = self.update_unlocked(update_options_for_schedule(&schedule.scope));
56        drop(lock);
57
58        match run_result {
59            Ok(_) => {
60                state.last_finished = Some(utc_now_string()?);
61                state.last_result = Some(ScheduleRunResult::Success);
62                state.last_error = None;
63                ScheduleRunStateStore::save(&self.config.cache_dir, &schedule.id, &state)?;
64                Ok(state)
65            }
66            Err(err) => {
67                persist_failed_run_state(
68                    &self.config.cache_dir,
69                    &schedule.id,
70                    &mut state,
71                    err.to_string(),
72                )?;
73                Err(err)
74            }
75        }
76    }
77
78    pub fn schedule_run_state(&self, id: &str) -> Result<ScheduleRunState> {
79        let schedule = load_schedule_definition(&self.config.config_dir, id)?;
80        ScheduleRunStateStore::load(&self.config.cache_dir, &schedule.id)
81    }
82}
83
84fn update_options_for_schedule(scope: &ScheduleScope) -> UpdateOptions {
85    match scope {
86        ScheduleScope::All => UpdateOptions {
87            space: None,
88            collections: Vec::new(),
89            no_embed: false,
90            dry_run: false,
91            verbose: false,
92        },
93        ScheduleScope::Space { space } => UpdateOptions {
94            space: Some(space.clone()),
95            collections: Vec::new(),
96            no_embed: false,
97            dry_run: false,
98            verbose: false,
99        },
100        ScheduleScope::Collections { space, collections } => UpdateOptions {
101            space: Some(space.clone()),
102            collections: collections.clone(),
103            no_embed: false,
104            dry_run: false,
105            verbose: false,
106        },
107    }
108}
109
110fn is_lock_contention_error(err: &crate::error::CoreError) -> bool {
111    err.to_string().contains("Another kbolt process is active")
112}
113
114fn persist_failed_run_state(
115    cache_dir: &std::path::Path,
116    schedule_id: &str,
117    state: &mut ScheduleRunState,
118    error_message: String,
119) -> Result<()> {
120    state.last_finished = Some(utc_now_string()?);
121    state.last_result = Some(ScheduleRunResult::Failed);
122    state.last_error = Some(error_message);
123
124    if let Err(save_err) = ScheduleRunStateStore::save(cache_dir, schedule_id, &state) {
125        let run_error = state.last_error.as_deref().unwrap_or("schedule run failed");
126        return Err(KboltError::Internal(format!(
127            "schedule run failed: {run_error}; additionally failed to save run state: {save_err}"
128        ))
129        .into());
130    }
131
132    Ok(())
133}
134
135fn utc_now_string() -> Result<String> {
136    OffsetDateTime::now_utc().format(&Rfc3339).map_err(|err| {
137        KboltError::Internal(format!("failed to format utc timestamp: {err}")).into()
138    })
139}
140
141#[cfg(test)]
142mod tests {
143    use tempfile::tempdir;
144
145    use super::persist_failed_run_state;
146    use kbolt_types::{KboltError, ScheduleRunResult, ScheduleRunState};
147
148    #[test]
149    fn persist_failed_run_state_preserves_original_error_when_save_also_fails() {
150        let tmp = tempdir().expect("create tempdir");
151        std::fs::write(tmp.path().join("schedules"), "conflict").expect("write conflicting file");
152
153        let err = persist_failed_run_state(
154            tmp.path(),
155            "s1",
156            &mut ScheduleRunState::default(),
157            "target update failed".to_string(),
158        )
159        .expect_err("save failure should be reported");
160
161        match KboltError::from(err) {
162            KboltError::Internal(message) => {
163                assert!(
164                    message.contains("schedule run failed: target update failed"),
165                    "unexpected message: {message}"
166                );
167                assert!(
168                    message.contains("failed to save run state"),
169                    "unexpected message: {message}"
170                );
171            }
172            other => panic!("unexpected error: {other}"),
173        }
174    }
175
176    #[test]
177    fn persist_failed_run_state_sets_failed_fields_before_saving() {
178        let tmp = tempdir().expect("create tempdir");
179        let mut state = ScheduleRunState::default();
180
181        persist_failed_run_state(
182            tmp.path(),
183            "s1",
184            &mut state,
185            "target update failed".to_string(),
186        )
187        .expect("save failed state");
188
189        assert!(state.last_started.is_none());
190        assert!(state.last_finished.is_some());
191        assert_eq!(state.last_result, Some(ScheduleRunResult::Failed));
192        assert_eq!(state.last_error.as_deref(), Some("target update failed"));
193    }
194}