1use std::collections::{BTreeSet, HashSet};
2
3use kbolt_types::{
4 AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
5 ScheduleAddResponse, ScheduleDefinition, ScheduleInterval, ScheduleIntervalUnit,
6 ScheduleRemoveResponse, ScheduleScope, ScheduleTrigger, ScheduleWeekday,
7};
8
9use crate::lock::LockMode;
10use crate::schedule_backend::{current_schedule_backend, reconcile_schedule_backend};
11use crate::schedule_state_store::ScheduleRunStateStore;
12use crate::schedule_store::ScheduleCatalog;
13use crate::schedule_support::{schedule_id_number, schedule_id_sort_key};
14use crate::Result;
15
16use super::Engine;
17
/// Smallest value accepted for an interval expressed in minutes; shorter minute-based
/// intervals are rejected by `normalize_schedule_interval`.
const MIN_SCHEDULE_INTERVAL_MINUTES: u32 = 5;
19
20impl Engine {
21 pub fn add_schedule(&self, req: AddScheduleRequest) -> Result<ScheduleAddResponse> {
22 self.add_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
23 }
24
25 fn add_schedule_with_backend_support_check(
26 &self,
27 req: AddScheduleRequest,
28 ensure_backend_supported: fn() -> Result<()>,
29 ) -> Result<ScheduleAddResponse> {
30 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
31 ensure_backend_supported()?;
32 let trigger = normalize_schedule_trigger(req.trigger)?;
33 let scope = self.normalize_schedule_scope(req.scope, true)?;
34 let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
35
36 if let Some(existing) = catalog
37 .schedules
38 .iter()
39 .find(|schedule| schedule.trigger == trigger && schedule.scope == scope)
40 {
41 return Err(KboltError::InvalidInput(format!(
42 "schedule already exists: {}",
43 existing.id
44 ))
45 .into());
46 }
47
48 let schedule_id = format!("s{}", catalog.next_id);
49 catalog.next_id = catalog.next_id.checked_add(1).ok_or_else(|| {
50 KboltError::InvalidInput("cannot create schedule: schedule ids exhausted".to_string())
51 })?;
52
53 let schedule = ScheduleDefinition {
54 id: schedule_id,
55 trigger,
56 scope,
57 };
58 catalog.schedules.push(schedule.clone());
59 catalog.save(&self.config.config_dir)?;
60 let backend = match reconcile_schedule_backend(
61 &self.config.config_dir,
62 &self.config.cache_dir,
63 &catalog.schedules,
64 ) {
65 Ok(backend) => backend,
66 Err(err) => {
67 return Err(KboltError::Internal(format!(
68 "schedule {} was saved, but backend reconcile failed: {err}",
69 schedule.id
70 ))
71 .into())
72 }
73 };
74
75 Ok(ScheduleAddResponse { schedule, backend })
76 }
77
78 pub fn list_schedules(&self) -> Result<Vec<ScheduleDefinition>> {
79 let _lock = self.acquire_operation_lock(LockMode::Shared)?;
80 let mut schedules = ScheduleCatalog::load(&self.config.config_dir)?.schedules;
81 schedules.sort_by_key(|schedule| schedule_id_sort_key(&schedule.id));
82 Ok(schedules)
83 }
84
85 pub fn remove_schedule(&self, req: RemoveScheduleRequest) -> Result<ScheduleRemoveResponse> {
86 self.remove_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
87 }
88
89 fn remove_schedule_with_backend_support_check(
90 &self,
91 req: RemoveScheduleRequest,
92 ensure_backend_supported: fn() -> Result<()>,
93 ) -> Result<ScheduleRemoveResponse> {
94 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
95 ensure_backend_supported()?;
96 let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
97 let removed_ids = self.resolve_removed_schedule_ids(req.selector, &catalog.schedules)?;
98
99 if removed_ids.is_empty() {
100 reconcile_schedule_backend(
101 &self.config.config_dir,
102 &self.config.cache_dir,
103 &catalog.schedules,
104 )?;
105 return Ok(ScheduleRemoveResponse { removed_ids });
106 }
107
108 let removed = removed_ids.iter().cloned().collect::<HashSet<_>>();
109 catalog
110 .schedules
111 .retain(|schedule| !removed.contains(&schedule.id));
112 catalog.save(&self.config.config_dir)?;
113 for schedule_id in &removed_ids {
114 ScheduleRunStateStore::remove(&self.config.cache_dir, schedule_id)?;
115 }
116 if let Err(err) = reconcile_schedule_backend(
117 &self.config.config_dir,
118 &self.config.cache_dir,
119 &catalog.schedules,
120 ) {
121 return Err(KboltError::Internal(format!(
122 "removed schedules {}, but backend reconcile failed: {err}",
123 removed_ids.join(", ")
124 ))
125 .into());
126 }
127
128 Ok(ScheduleRemoveResponse { removed_ids })
129 }
130
131 fn normalize_schedule_scope(
132 &self,
133 scope: ScheduleScope,
134 validate_targets: bool,
135 ) -> Result<ScheduleScope> {
136 match scope {
137 ScheduleScope::All => Ok(ScheduleScope::All),
138 ScheduleScope::Space { space } => {
139 let normalized_space = normalize_scope_name("space", &space)?;
140 if validate_targets {
141 let resolved = self.storage.get_space(&normalized_space)?;
142 return Ok(ScheduleScope::Space {
143 space: resolved.name,
144 });
145 }
146
147 Ok(ScheduleScope::Space {
148 space: normalized_space,
149 })
150 }
151 ScheduleScope::Collections { space, collections } => {
152 let normalized_space = normalize_scope_name("space", &space)?;
153 let normalized_collections = normalize_collection_names(collections)?;
154
155 if validate_targets {
156 let resolved = self.storage.get_space(&normalized_space)?;
157 for collection in &normalized_collections {
158 self.storage.get_collection(resolved.id, collection)?;
159 }
160 return Ok(ScheduleScope::Collections {
161 space: resolved.name,
162 collections: normalized_collections,
163 });
164 }
165
166 Ok(ScheduleScope::Collections {
167 space: normalized_space,
168 collections: normalized_collections,
169 })
170 }
171 }
172 }
173
174 fn resolve_removed_schedule_ids(
175 &self,
176 selector: RemoveScheduleSelector,
177 schedules: &[ScheduleDefinition],
178 ) -> Result<Vec<String>> {
179 match selector {
180 RemoveScheduleSelector::All => Ok(schedules
181 .iter()
182 .map(|schedule| schedule.id.clone())
183 .collect()),
184 RemoveScheduleSelector::Id { id } => {
185 let normalized_id = normalize_schedule_id(&id)?;
186 if schedules
187 .iter()
188 .any(|schedule| schedule.id == normalized_id)
189 {
190 return Ok(vec![normalized_id]);
191 }
192
193 Err(KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into())
194 }
195 RemoveScheduleSelector::Scope { scope } => {
196 let normalized_scope = self.normalize_schedule_scope(scope, false)?;
197 let mut matches = schedules
198 .iter()
199 .filter(|schedule| schedule.scope == normalized_scope)
200 .map(|schedule| schedule.id.clone())
201 .collect::<Vec<_>>();
202 matches.sort_by_key(|id| schedule_id_sort_key(id));
203
204 match matches.len() {
205 0 => Err(KboltError::InvalidInput(
206 "no schedules matched the requested scope".to_string(),
207 )
208 .into()),
209 1 => Ok(matches),
210 _ => Err(KboltError::InvalidInput(format!(
211 "schedule scope matched multiple schedules: {}",
212 matches.join(", ")
213 ))
214 .into()),
215 }
216 }
217 }
218 }
219}
220
221pub(super) fn load_schedule_definition(
222 config_dir: &std::path::Path,
223 id: &str,
224) -> Result<ScheduleDefinition> {
225 let normalized_id = normalize_schedule_id(id)?;
226 let catalog = ScheduleCatalog::load(config_dir)?;
227 catalog
228 .schedules
229 .into_iter()
230 .find(|schedule| schedule.id == normalized_id)
231 .ok_or_else(|| {
232 KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into()
233 })
234}
235
236fn normalize_schedule_trigger(trigger: ScheduleTrigger) -> Result<ScheduleTrigger> {
237 match trigger {
238 ScheduleTrigger::Every { interval } => Ok(ScheduleTrigger::Every {
239 interval: normalize_schedule_interval(interval)?,
240 }),
241 ScheduleTrigger::Daily { time } => Ok(ScheduleTrigger::Daily {
242 time: normalize_schedule_time(&time)?,
243 }),
244 ScheduleTrigger::Weekly { weekdays, time } => Ok(ScheduleTrigger::Weekly {
245 weekdays: normalize_schedule_weekdays(weekdays)?,
246 time: normalize_schedule_time(&time)?,
247 }),
248 }
249}
250
251fn normalize_schedule_interval(interval: ScheduleInterval) -> Result<ScheduleInterval> {
252 if interval.value == 0 {
253 return Err(KboltError::InvalidInput(
254 "schedule interval must be greater than zero".to_string(),
255 )
256 .into());
257 }
258
259 match interval.unit {
260 ScheduleIntervalUnit::Minutes if interval.value < MIN_SCHEDULE_INTERVAL_MINUTES => {
261 Err(KboltError::InvalidInput(format!(
262 "schedule interval must be at least {MIN_SCHEDULE_INTERVAL_MINUTES} minutes"
263 ))
264 .into())
265 }
266 ScheduleIntervalUnit::Minutes | ScheduleIntervalUnit::Hours => Ok(interval),
267 }
268}
269
270fn normalize_schedule_weekdays(weekdays: Vec<ScheduleWeekday>) -> Result<Vec<ScheduleWeekday>> {
271 let normalized = weekdays.into_iter().collect::<BTreeSet<_>>();
272 if normalized.is_empty() {
273 return Err(KboltError::InvalidInput(
274 "weekly schedules require at least one weekday".to_string(),
275 )
276 .into());
277 }
278
279 Ok(normalized.into_iter().collect())
280}
281
282fn normalize_schedule_time(input: &str) -> Result<String> {
283 let trimmed = input.trim();
284 if trimmed.is_empty() {
285 return Err(KboltError::InvalidInput("schedule time must not be empty".to_string()).into());
286 }
287
288 let collapsed = trimmed
289 .chars()
290 .filter(|ch| !ch.is_whitespace())
291 .collect::<String>()
292 .to_ascii_lowercase();
293
294 let (time_part, meridiem) = if let Some(time) = collapsed.strip_suffix("am") {
295 (time, Some("am"))
296 } else if let Some(time) = collapsed.strip_suffix("pm") {
297 (time, Some("pm"))
298 } else {
299 (collapsed.as_str(), None)
300 };
301
302 if time_part.is_empty() {
303 return Err(invalid_schedule_time(input).into());
304 }
305
306 let (mut hour, minute) = if let Some((hour_part, minute_part)) = time_part.split_once(':') {
307 if minute_part.contains(':') {
308 return Err(invalid_schedule_time(input).into());
309 }
310 (
311 parse_time_component(hour_part, input)?,
312 parse_time_component(minute_part, input)?,
313 )
314 } else {
315 if meridiem.is_none() {
316 return Err(invalid_schedule_time(input).into());
317 }
318 (parse_time_component(time_part, input)?, 0)
319 };
320
321 if minute > 59 {
322 return Err(invalid_schedule_time(input).into());
323 }
324
325 match meridiem {
326 Some("am") => {
327 if hour == 0 || hour > 12 {
328 return Err(invalid_schedule_time(input).into());
329 }
330 if hour == 12 {
331 hour = 0;
332 }
333 }
334 Some("pm") => {
335 if hour == 0 || hour > 12 {
336 return Err(invalid_schedule_time(input).into());
337 }
338 if hour != 12 {
339 hour += 12;
340 }
341 }
342 None => {
343 if hour > 23 {
344 return Err(invalid_schedule_time(input).into());
345 }
346 }
347 Some(_) => unreachable!("only am/pm meridiems are supported"),
348 }
349
350 Ok(format!("{hour:02}:{minute:02}"))
351}
352
353fn parse_time_component(component: &str, input: &str) -> Result<u32> {
354 if component.is_empty() {
355 return Err(invalid_schedule_time(input).into());
356 }
357
358 component
359 .parse::<u32>()
360 .map_err(|_| invalid_schedule_time(input).into())
361}
362
363fn invalid_schedule_time(input: &str) -> KboltError {
364 KboltError::InvalidInput(format!(
365 "invalid schedule time '{input}': use HH:MM, 3pm, or 3:00pm"
366 ))
367}
368
369fn normalize_scope_name(label: &str, name: &str) -> Result<String> {
370 let trimmed = name.trim();
371 if trimmed.is_empty() {
372 return Err(KboltError::InvalidInput(format!("{label} name must not be empty")).into());
373 }
374 Ok(trimmed.to_string())
375}
376
377fn normalize_collection_names(collections: Vec<String>) -> Result<Vec<String>> {
378 let mut normalized = BTreeSet::new();
379 for collection in collections {
380 let trimmed = collection.trim();
381 if trimmed.is_empty() {
382 return Err(
383 KboltError::InvalidInput("collection names must not be empty".to_string()).into(),
384 );
385 }
386 normalized.insert(trimmed.to_string());
387 }
388
389 if normalized.is_empty() {
390 return Err(KboltError::InvalidInput(
391 "collection scope must include at least one collection".to_string(),
392 )
393 .into());
394 }
395
396 Ok(normalized.into_iter().collect())
397}
398
399fn normalize_schedule_id(id: &str) -> Result<String> {
400 let normalized = id.trim().to_ascii_lowercase();
401 if normalized.is_empty() {
402 return Err(KboltError::InvalidInput("schedule id must not be empty".to_string()).into());
403 }
404
405 if schedule_id_number(&normalized).is_none() {
406 return Err(KboltError::InvalidInput(format!("invalid schedule id: {id}")).into());
407 }
408
409 Ok(normalized)
410}
411
412fn ensure_schedule_backend_supported() -> Result<()> {
413 current_schedule_backend().map(|_| ())
414}
415
#[cfg(test)]
mod tests {
    use std::mem;

    use tempfile::tempdir;

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::schedule_state_store::ScheduleRunStateStore;
    use crate::schedule_store::ScheduleCatalog;
    use crate::storage::Storage;
    use kbolt_types::{
        AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
        ScheduleRunResult, ScheduleRunState, ScheduleScope, ScheduleTrigger,
    };

    /// Message produced by the fake probe and expected in the resulting errors.
    const UNSUPPORTED_MESSAGE: &str = "schedule is not supported on this platform";

    /// A failed support probe must abort `add` before anything is written to the catalog.
    #[test]
    fn add_schedule_does_not_persist_when_backend_support_check_fails() {
        let engine = test_engine();

        let outcome = engine.add_schedule_with_backend_support_check(
            daily_nine_for_all(),
            unsupported_schedule_backend,
        );
        assert_unsupported_backend(outcome.expect_err("unsupported backend should fail"));

        let catalog =
            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
        assert_eq!(catalog, ScheduleCatalog::default());
    }

    /// A failed support probe must abort `remove` before the catalog or the stored run state
    /// is touched.
    #[test]
    fn remove_schedule_does_not_mutate_when_backend_support_check_fails() {
        let engine = test_engine();
        let added = engine
            .add_schedule(daily_nine_for_all())
            .expect("add schedule");
        let schedule_id = added.schedule.id.clone();

        let saved_state = ScheduleRunState {
            last_started: Some("2026-03-07T12:00:00Z".to_string()),
            last_finished: Some("2026-03-07T12:00:09Z".to_string()),
            last_result: Some(ScheduleRunResult::Success),
            last_error: None,
        };
        ScheduleRunStateStore::save(&engine.config.cache_dir, &schedule_id, &saved_state)
            .expect("save run state");

        let request = RemoveScheduleRequest {
            selector: RemoveScheduleSelector::Id {
                id: schedule_id.clone(),
            },
        };
        let outcome = engine
            .remove_schedule_with_backend_support_check(request, unsupported_schedule_backend);
        assert_unsupported_backend(outcome.expect_err("unsupported backend should fail"));

        let catalog =
            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
        assert_eq!(catalog.schedules, vec![added.schedule.clone()]);
        let loaded_state = ScheduleRunStateStore::load(&engine.config.cache_dir, &schedule_id)
            .expect("load preserved run state");
        assert_eq!(loaded_state, saved_state);
    }

    /// The request both tests use: a daily 09:00 schedule covering everything.
    fn daily_nine_for_all() -> AddScheduleRequest {
        AddScheduleRequest {
            trigger: ScheduleTrigger::Daily {
                time: "09:00".to_string(),
            },
            scope: ScheduleScope::All,
        }
    }

    /// Asserts that `err` converts to the `InvalidInput` raised by the fake probe.
    fn assert_unsupported_backend<E>(err: E)
    where
        KboltError: From<E>,
    {
        match KboltError::from(err) {
            KboltError::InvalidInput(message) => assert!(
                message.contains(UNSUPPORTED_MESSAGE),
                "unexpected message: {message}"
            ),
            other => panic!("unexpected error: {other}"),
        }
    }

    /// Support probe stand-in that always reports the platform as unsupported.
    fn unsupported_schedule_backend() -> crate::Result<()> {
        let err = KboltError::InvalidInput(UNSUPPORTED_MESSAGE.to_string());
        Err(err.into())
    }

    /// Builds an engine whose config and cache directories live under a fresh temp directory.
    fn test_engine() -> Engine {
        let root = tempdir().expect("create temp root");
        let root_path = root.path().to_path_buf();
        // Forgetting the guard skips its destructor, so the directory is not cleaned up
        // while the returned engine is still using it.
        mem::forget(root);

        let config_dir = root_path.join("config");
        let cache_dir = root_path.join("cache");
        let storage = Storage::new(&cache_dir).expect("create storage");

        Engine::from_parts(
            storage,
            Config {
                config_dir,
                cache_dir,
                default_space: None,
                providers: std::collections::HashMap::new(),
                roles: crate::config::RoleBindingsConfig::default(),
                reaping: ReapingConfig { days: 7 },
                chunking: ChunkingConfig::default(),
                ranking: RankingConfig::default(),
            },
        )
    }
}