kbolt_core/engine/
schedule_run_ops.rs1use crate::lock::LockMode;
2use crate::schedule_state_store::ScheduleRunStateStore;
3use crate::Result;
4use kbolt_types::{KboltError, ScheduleRunResult, ScheduleRunState, ScheduleScope, UpdateOptions};
5use time::format_description::well_known::Rfc3339;
6use time::OffsetDateTime;
7
8use super::{schedule_ops::load_schedule_definition, Engine};
9
10impl Engine {
11 pub fn run_schedule(&self, id: &str) -> Result<ScheduleRunState> {
12 let schedule = load_schedule_definition(&self.config.config_dir, id)?;
13 let started_at = utc_now_string()?;
14 let mut state = ScheduleRunState {
15 last_started: Some(started_at),
16 last_finished: None,
17 last_result: None,
18 last_error: None,
19 };
20 ScheduleRunStateStore::save(&self.config.cache_dir, &schedule.id, &state)?;
21
22 let lock = match self.acquire_operation_lock(LockMode::Exclusive) {
23 Ok(lock) => lock,
24 Err(err) if is_lock_contention_error(&err) => {
25 state.last_finished = Some(utc_now_string()?);
26 state.last_result = Some(ScheduleRunResult::SkippedLock);
27 state.last_error = None;
28 ScheduleRunStateStore::save(&self.config.cache_dir, &schedule.id, &state)?;
29 return Ok(state);
30 }
31 Err(err) => {
32 persist_failed_run_state(
33 &self.config.cache_dir,
34 &schedule.id,
35 &mut state,
36 err.to_string(),
37 )?;
38 return Err(err);
39 }
40 };
41
42 let schedule = match load_schedule_definition(&self.config.config_dir, &schedule.id) {
43 Ok(schedule) => schedule,
44 Err(err) => {
45 persist_failed_run_state(
46 &self.config.cache_dir,
47 &schedule.id,
48 &mut state,
49 err.to_string(),
50 )?;
51 return Err(err);
52 }
53 };
54
55 let run_result = self.update_unlocked(update_options_for_schedule(&schedule.scope));
56 drop(lock);
57
58 match run_result {
59 Ok(_) => {
60 state.last_finished = Some(utc_now_string()?);
61 state.last_result = Some(ScheduleRunResult::Success);
62 state.last_error = None;
63 ScheduleRunStateStore::save(&self.config.cache_dir, &schedule.id, &state)?;
64 Ok(state)
65 }
66 Err(err) => {
67 persist_failed_run_state(
68 &self.config.cache_dir,
69 &schedule.id,
70 &mut state,
71 err.to_string(),
72 )?;
73 Err(err)
74 }
75 }
76 }
77
78 pub fn schedule_run_state(&self, id: &str) -> Result<ScheduleRunState> {
79 let schedule = load_schedule_definition(&self.config.config_dir, id)?;
80 ScheduleRunStateStore::load(&self.config.cache_dir, &schedule.id)
81 }
82}
83
84fn update_options_for_schedule(scope: &ScheduleScope) -> UpdateOptions {
85 match scope {
86 ScheduleScope::All => UpdateOptions {
87 space: None,
88 collections: Vec::new(),
89 no_embed: false,
90 dry_run: false,
91 verbose: false,
92 },
93 ScheduleScope::Space { space } => UpdateOptions {
94 space: Some(space.clone()),
95 collections: Vec::new(),
96 no_embed: false,
97 dry_run: false,
98 verbose: false,
99 },
100 ScheduleScope::Collections { space, collections } => UpdateOptions {
101 space: Some(space.clone()),
102 collections: collections.clone(),
103 no_embed: false,
104 dry_run: false,
105 verbose: false,
106 },
107 }
108}
109
110fn is_lock_contention_error(err: &crate::error::CoreError) -> bool {
111 err.to_string().contains("Another kbolt process is active")
112}
113
114fn persist_failed_run_state(
115 cache_dir: &std::path::Path,
116 schedule_id: &str,
117 state: &mut ScheduleRunState,
118 error_message: String,
119) -> Result<()> {
120 state.last_finished = Some(utc_now_string()?);
121 state.last_result = Some(ScheduleRunResult::Failed);
122 state.last_error = Some(error_message);
123
124 if let Err(save_err) = ScheduleRunStateStore::save(cache_dir, schedule_id, &state) {
125 let run_error = state.last_error.as_deref().unwrap_or("schedule run failed");
126 return Err(KboltError::Internal(format!(
127 "schedule run failed: {run_error}; additionally failed to save run state: {save_err}"
128 ))
129 .into());
130 }
131
132 Ok(())
133}
134
135fn utc_now_string() -> Result<String> {
136 OffsetDateTime::now_utc().format(&Rfc3339).map_err(|err| {
137 KboltError::Internal(format!("failed to format utc timestamp: {err}")).into()
138 })
139}
140
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::persist_failed_run_state;
    use kbolt_types::{KboltError, ScheduleRunResult, ScheduleRunState};

    /// When saving the failed state also fails, the returned error must mention both the
    /// original run error and the save failure.
    #[test]
    fn persist_failed_run_state_preserves_original_error_when_save_also_fails() {
        let cache = tempdir().expect("create tempdir");
        // Put a regular file at `schedules` inside the cache dir; the test expects the save
        // to fail as a result.
        let blocker = cache.path().join("schedules");
        std::fs::write(blocker, "conflict").expect("write conflicting file");

        let mut run_state = ScheduleRunState::default();
        let outcome = persist_failed_run_state(
            cache.path(),
            "s1",
            &mut run_state,
            "target update failed".to_string(),
        );
        let err = outcome.expect_err("save failure should be reported");

        let message = match KboltError::from(err) {
            KboltError::Internal(message) => message,
            other => panic!("unexpected error: {other}"),
        };
        assert!(
            message.contains("schedule run failed: target update failed"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("failed to save run state"),
            "unexpected message: {message}"
        );
    }

    /// A successful save leaves the in-memory state marked as failed, with the finish time
    /// and error message filled in and the start time left as it was.
    #[test]
    fn persist_failed_run_state_sets_failed_fields_before_saving() {
        let cache = tempdir().expect("create tempdir");
        let mut run_state = ScheduleRunState::default();

        let outcome = persist_failed_run_state(
            cache.path(),
            "s1",
            &mut run_state,
            "target update failed".to_string(),
        );
        outcome.expect("save failed state");

        assert!(run_state.last_started.is_none());
        assert!(run_state.last_finished.is_some());
        assert_eq!(run_state.last_result, Some(ScheduleRunResult::Failed));
        assert_eq!(
            run_state.last_error.as_deref(),
            Some("target update failed")
        );
    }
}