Skip to main content

kbolt_core/engine/
schedule_ops.rs

1use std::collections::{BTreeSet, HashSet};
2
3use kbolt_types::{
4    AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
5    ScheduleAddResponse, ScheduleDefinition, ScheduleInterval, ScheduleIntervalUnit,
6    ScheduleRemoveResponse, ScheduleScope, ScheduleTrigger, ScheduleWeekday,
7};
8
9use crate::lock::LockMode;
10use crate::schedule_backend::{current_schedule_backend, reconcile_schedule_backend};
11use crate::schedule_state_store::ScheduleRunStateStore;
12use crate::schedule_store::ScheduleCatalog;
13use crate::schedule_support::{schedule_id_number, schedule_id_sort_key};
14use crate::Result;
15
16use super::Engine;
17
18const MIN_SCHEDULE_INTERVAL_MINUTES: u32 = 5;
19
20impl Engine {
21    pub fn add_schedule(&self, req: AddScheduleRequest) -> Result<ScheduleAddResponse> {
22        self.add_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
23    }
24
25    fn add_schedule_with_backend_support_check(
26        &self,
27        req: AddScheduleRequest,
28        ensure_backend_supported: fn() -> Result<()>,
29    ) -> Result<ScheduleAddResponse> {
30        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
31        ensure_backend_supported()?;
32        let trigger = normalize_schedule_trigger(req.trigger)?;
33        let scope = self.normalize_schedule_scope(req.scope, true)?;
34        let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
35
36        if let Some(existing) = catalog
37            .schedules
38            .iter()
39            .find(|schedule| schedule.trigger == trigger && schedule.scope == scope)
40        {
41            return Err(KboltError::InvalidInput(format!(
42                "schedule already exists: {}",
43                existing.id
44            ))
45            .into());
46        }
47
48        let schedule_id = format!("s{}", catalog.next_id);
49        catalog.next_id = catalog.next_id.checked_add(1).ok_or_else(|| {
50            KboltError::InvalidInput("cannot create schedule: schedule ids exhausted".to_string())
51        })?;
52
53        let schedule = ScheduleDefinition {
54            id: schedule_id,
55            trigger,
56            scope,
57        };
58        catalog.schedules.push(schedule.clone());
59        catalog.save(&self.config.config_dir)?;
60        let backend = match reconcile_schedule_backend(
61            &self.config.config_dir,
62            &self.config.cache_dir,
63            &catalog.schedules,
64        ) {
65            Ok(backend) => backend,
66            Err(err) => {
67                return Err(KboltError::Internal(format!(
68                    "schedule {} was saved, but backend reconcile failed: {err}",
69                    schedule.id
70                ))
71                .into())
72            }
73        };
74
75        Ok(ScheduleAddResponse { schedule, backend })
76    }
77
78    pub fn list_schedules(&self) -> Result<Vec<ScheduleDefinition>> {
79        let mut schedules = ScheduleCatalog::load(&self.config.config_dir)?.schedules;
80        schedules.sort_by_key(|schedule| schedule_id_sort_key(&schedule.id));
81        Ok(schedules)
82    }
83
84    pub fn remove_schedule(&self, req: RemoveScheduleRequest) -> Result<ScheduleRemoveResponse> {
85        self.remove_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
86    }
87
88    fn remove_schedule_with_backend_support_check(
89        &self,
90        req: RemoveScheduleRequest,
91        ensure_backend_supported: fn() -> Result<()>,
92    ) -> Result<ScheduleRemoveResponse> {
93        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
94        ensure_backend_supported()?;
95        let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
96        let removed_ids = self.resolve_removed_schedule_ids(req.selector, &catalog.schedules)?;
97
98        if removed_ids.is_empty() {
99            reconcile_schedule_backend(
100                &self.config.config_dir,
101                &self.config.cache_dir,
102                &catalog.schedules,
103            )?;
104            return Ok(ScheduleRemoveResponse { removed_ids });
105        }
106
107        let removed = removed_ids.iter().cloned().collect::<HashSet<_>>();
108        catalog
109            .schedules
110            .retain(|schedule| !removed.contains(&schedule.id));
111        catalog.save(&self.config.config_dir)?;
112        for schedule_id in &removed_ids {
113            ScheduleRunStateStore::remove(&self.config.cache_dir, schedule_id)?;
114        }
115        if let Err(err) = reconcile_schedule_backend(
116            &self.config.config_dir,
117            &self.config.cache_dir,
118            &catalog.schedules,
119        ) {
120            return Err(KboltError::Internal(format!(
121                "removed schedules {}, but backend reconcile failed: {err}",
122                removed_ids.join(", ")
123            ))
124            .into());
125        }
126
127        Ok(ScheduleRemoveResponse { removed_ids })
128    }
129
130    fn normalize_schedule_scope(
131        &self,
132        scope: ScheduleScope,
133        validate_targets: bool,
134    ) -> Result<ScheduleScope> {
135        match scope {
136            ScheduleScope::All => Ok(ScheduleScope::All),
137            ScheduleScope::Space { space } => {
138                let normalized_space = normalize_scope_name("space", &space)?;
139                if validate_targets {
140                    let resolved = self.storage.get_space(&normalized_space)?;
141                    return Ok(ScheduleScope::Space {
142                        space: resolved.name,
143                    });
144                }
145
146                Ok(ScheduleScope::Space {
147                    space: normalized_space,
148                })
149            }
150            ScheduleScope::Collections { space, collections } => {
151                let normalized_space = normalize_scope_name("space", &space)?;
152                let normalized_collections = normalize_collection_names(collections)?;
153
154                if validate_targets {
155                    let resolved = self.storage.get_space(&normalized_space)?;
156                    for collection in &normalized_collections {
157                        self.storage.get_collection(resolved.id, collection)?;
158                    }
159                    return Ok(ScheduleScope::Collections {
160                        space: resolved.name,
161                        collections: normalized_collections,
162                    });
163                }
164
165                Ok(ScheduleScope::Collections {
166                    space: normalized_space,
167                    collections: normalized_collections,
168                })
169            }
170        }
171    }
172
173    fn resolve_removed_schedule_ids(
174        &self,
175        selector: RemoveScheduleSelector,
176        schedules: &[ScheduleDefinition],
177    ) -> Result<Vec<String>> {
178        match selector {
179            RemoveScheduleSelector::All => Ok(schedules
180                .iter()
181                .map(|schedule| schedule.id.clone())
182                .collect()),
183            RemoveScheduleSelector::Id { id } => {
184                let normalized_id = normalize_schedule_id(&id)?;
185                if schedules
186                    .iter()
187                    .any(|schedule| schedule.id == normalized_id)
188                {
189                    return Ok(vec![normalized_id]);
190                }
191
192                Err(KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into())
193            }
194            RemoveScheduleSelector::Scope { scope } => {
195                let normalized_scope = self.normalize_schedule_scope(scope, false)?;
196                let mut matches = schedules
197                    .iter()
198                    .filter(|schedule| schedule.scope == normalized_scope)
199                    .map(|schedule| schedule.id.clone())
200                    .collect::<Vec<_>>();
201                matches.sort_by_key(|id| schedule_id_sort_key(id));
202
203                match matches.len() {
204                    0 => Err(KboltError::InvalidInput(
205                        "no schedules matched the requested scope".to_string(),
206                    )
207                    .into()),
208                    1 => Ok(matches),
209                    _ => Err(KboltError::InvalidInput(format!(
210                        "schedule scope matched multiple schedules: {}",
211                        matches.join(", ")
212                    ))
213                    .into()),
214                }
215            }
216        }
217    }
218}
219
220pub(super) fn load_schedule_definition(
221    config_dir: &std::path::Path,
222    id: &str,
223) -> Result<ScheduleDefinition> {
224    let normalized_id = normalize_schedule_id(id)?;
225    let catalog = ScheduleCatalog::load(config_dir)?;
226    catalog
227        .schedules
228        .into_iter()
229        .find(|schedule| schedule.id == normalized_id)
230        .ok_or_else(|| {
231            KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into()
232        })
233}
234
235fn normalize_schedule_trigger(trigger: ScheduleTrigger) -> Result<ScheduleTrigger> {
236    match trigger {
237        ScheduleTrigger::Every { interval } => Ok(ScheduleTrigger::Every {
238            interval: normalize_schedule_interval(interval)?,
239        }),
240        ScheduleTrigger::Daily { time } => Ok(ScheduleTrigger::Daily {
241            time: normalize_schedule_time(&time)?,
242        }),
243        ScheduleTrigger::Weekly { weekdays, time } => Ok(ScheduleTrigger::Weekly {
244            weekdays: normalize_schedule_weekdays(weekdays)?,
245            time: normalize_schedule_time(&time)?,
246        }),
247    }
248}
249
250fn normalize_schedule_interval(interval: ScheduleInterval) -> Result<ScheduleInterval> {
251    if interval.value == 0 {
252        return Err(KboltError::InvalidInput(
253            "schedule interval must be greater than zero".to_string(),
254        )
255        .into());
256    }
257
258    match interval.unit {
259        ScheduleIntervalUnit::Minutes if interval.value < MIN_SCHEDULE_INTERVAL_MINUTES => {
260            Err(KboltError::InvalidInput(format!(
261                "schedule interval must be at least {MIN_SCHEDULE_INTERVAL_MINUTES} minutes"
262            ))
263            .into())
264        }
265        ScheduleIntervalUnit::Minutes | ScheduleIntervalUnit::Hours => Ok(interval),
266    }
267}
268
269fn normalize_schedule_weekdays(weekdays: Vec<ScheduleWeekday>) -> Result<Vec<ScheduleWeekday>> {
270    let normalized = weekdays.into_iter().collect::<BTreeSet<_>>();
271    if normalized.is_empty() {
272        return Err(KboltError::InvalidInput(
273            "weekly schedules require at least one weekday".to_string(),
274        )
275        .into());
276    }
277
278    Ok(normalized.into_iter().collect())
279}
280
281fn normalize_schedule_time(input: &str) -> Result<String> {
282    let trimmed = input.trim();
283    if trimmed.is_empty() {
284        return Err(KboltError::InvalidInput("schedule time must not be empty".to_string()).into());
285    }
286
287    let collapsed = trimmed
288        .chars()
289        .filter(|ch| !ch.is_whitespace())
290        .collect::<String>()
291        .to_ascii_lowercase();
292
293    let (time_part, meridiem) = if let Some(time) = collapsed.strip_suffix("am") {
294        (time, Some("am"))
295    } else if let Some(time) = collapsed.strip_suffix("pm") {
296        (time, Some("pm"))
297    } else {
298        (collapsed.as_str(), None)
299    };
300
301    if time_part.is_empty() {
302        return Err(invalid_schedule_time(input).into());
303    }
304
305    let (mut hour, minute) = if let Some((hour_part, minute_part)) = time_part.split_once(':') {
306        if minute_part.contains(':') {
307            return Err(invalid_schedule_time(input).into());
308        }
309        (
310            parse_time_component(hour_part, input)?,
311            parse_time_component(minute_part, input)?,
312        )
313    } else {
314        if meridiem.is_none() {
315            return Err(invalid_schedule_time(input).into());
316        }
317        (parse_time_component(time_part, input)?, 0)
318    };
319
320    if minute > 59 {
321        return Err(invalid_schedule_time(input).into());
322    }
323
324    match meridiem {
325        Some("am") => {
326            if hour == 0 || hour > 12 {
327                return Err(invalid_schedule_time(input).into());
328            }
329            if hour == 12 {
330                hour = 0;
331            }
332        }
333        Some("pm") => {
334            if hour == 0 || hour > 12 {
335                return Err(invalid_schedule_time(input).into());
336            }
337            if hour != 12 {
338                hour += 12;
339            }
340        }
341        None => {
342            if hour > 23 {
343                return Err(invalid_schedule_time(input).into());
344            }
345        }
346        Some(_) => unreachable!("only am/pm meridiems are supported"),
347    }
348
349    Ok(format!("{hour:02}:{minute:02}"))
350}
351
352fn parse_time_component(component: &str, input: &str) -> Result<u32> {
353    if component.is_empty() {
354        return Err(invalid_schedule_time(input).into());
355    }
356
357    component
358        .parse::<u32>()
359        .map_err(|_| invalid_schedule_time(input).into())
360}
361
362fn invalid_schedule_time(input: &str) -> KboltError {
363    KboltError::InvalidInput(format!(
364        "invalid schedule time '{input}': use HH:MM, 3pm, or 3:00pm"
365    ))
366}
367
368fn normalize_scope_name(label: &str, name: &str) -> Result<String> {
369    let trimmed = name.trim();
370    if trimmed.is_empty() {
371        return Err(KboltError::InvalidInput(format!("{label} name must not be empty")).into());
372    }
373    Ok(trimmed.to_string())
374}
375
376fn normalize_collection_names(collections: Vec<String>) -> Result<Vec<String>> {
377    let mut normalized = BTreeSet::new();
378    for collection in collections {
379        let trimmed = collection.trim();
380        if trimmed.is_empty() {
381            return Err(
382                KboltError::InvalidInput("collection names must not be empty".to_string()).into(),
383            );
384        }
385        normalized.insert(trimmed.to_string());
386    }
387
388    if normalized.is_empty() {
389        return Err(KboltError::InvalidInput(
390            "collection scope must include at least one collection".to_string(),
391        )
392        .into());
393    }
394
395    Ok(normalized.into_iter().collect())
396}
397
398fn normalize_schedule_id(id: &str) -> Result<String> {
399    let normalized = id.trim().to_ascii_lowercase();
400    if normalized.is_empty() {
401        return Err(KboltError::InvalidInput("schedule id must not be empty".to_string()).into());
402    }
403
404    if schedule_id_number(&normalized).is_none() {
405        return Err(KboltError::InvalidInput(format!("invalid schedule id: {id}")).into());
406    }
407
408    Ok(normalized)
409}
410
411fn ensure_schedule_backend_supported() -> Result<()> {
412    current_schedule_backend().map(|_| ())
413}
414
415#[cfg(test)]
416mod tests {
417    use fs2::FileExt;
418    use std::fs::OpenOptions;
419    use std::mem;
420
421    use tempfile::tempdir;
422
423    use super::Engine;
424    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
425    use crate::schedule_state_store::ScheduleRunStateStore;
426    use crate::schedule_store::ScheduleCatalog;
427    use crate::storage::Storage;
428    use kbolt_types::{
429        AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
430        ScheduleRunResult, ScheduleRunState, ScheduleScope, ScheduleTrigger,
431    };
432
433    #[test]
434    fn add_schedule_does_not_persist_when_backend_support_check_fails() {
435        let engine = test_engine();
436
437        let err = engine
438            .add_schedule_with_backend_support_check(
439                AddScheduleRequest {
440                    trigger: ScheduleTrigger::Daily {
441                        time: "09:00".to_string(),
442                    },
443                    scope: ScheduleScope::All,
444                },
445                unsupported_schedule_backend,
446            )
447            .expect_err("unsupported backend should fail");
448
449        match KboltError::from(err) {
450            KboltError::InvalidInput(message) => {
451                assert!(
452                    message.contains("schedule is not supported on this platform"),
453                    "unexpected message: {message}"
454                );
455            }
456            other => panic!("unexpected error: {other}"),
457        }
458
459        let catalog =
460            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
461        assert_eq!(catalog, ScheduleCatalog::default());
462    }
463
464    #[test]
465    fn remove_schedule_does_not_mutate_when_backend_support_check_fails() {
466        let engine = test_engine();
467        let added = engine
468            .add_schedule(AddScheduleRequest {
469                trigger: ScheduleTrigger::Daily {
470                    time: "09:00".to_string(),
471                },
472                scope: ScheduleScope::All,
473            })
474            .expect("add schedule");
475        let saved_state = ScheduleRunState {
476            last_started: Some("2026-03-07T12:00:00Z".to_string()),
477            last_finished: Some("2026-03-07T12:00:09Z".to_string()),
478            last_result: Some(ScheduleRunResult::Success),
479            last_error: None,
480        };
481        ScheduleRunStateStore::save(&engine.config.cache_dir, &added.schedule.id, &saved_state)
482            .expect("save run state");
483
484        let err = engine
485            .remove_schedule_with_backend_support_check(
486                RemoveScheduleRequest {
487                    selector: RemoveScheduleSelector::Id {
488                        id: added.schedule.id.clone(),
489                    },
490                },
491                unsupported_schedule_backend,
492            )
493            .expect_err("unsupported backend should fail");
494
495        match KboltError::from(err) {
496            KboltError::InvalidInput(message) => {
497                assert!(
498                    message.contains("schedule is not supported on this platform"),
499                    "unexpected message: {message}"
500                );
501            }
502            other => panic!("unexpected error: {other}"),
503        }
504
505        let catalog =
506            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
507        assert_eq!(catalog.schedules, vec![added.schedule.clone()]);
508        let loaded_state =
509            ScheduleRunStateStore::load(&engine.config.cache_dir, &added.schedule.id)
510                .expect("load preserved run state");
511        assert_eq!(loaded_state, saved_state);
512    }
513
514    #[test]
515    fn list_schedules_succeeds_while_global_lock_is_held() {
516        let engine = test_engine();
517        let added = engine
518            .add_schedule(AddScheduleRequest {
519                trigger: ScheduleTrigger::Daily {
520                    time: "09:00".to_string(),
521                },
522                scope: ScheduleScope::All,
523            })
524            .expect("add schedule");
525
526        let lock_path = engine.config.cache_dir.join("kbolt.lock");
527        std::fs::create_dir_all(&engine.config.cache_dir).expect("create cache dir");
528        let holder = OpenOptions::new()
529            .read(true)
530            .write(true)
531            .create(true)
532            .truncate(false)
533            .open(&lock_path)
534            .expect("open lock file");
535        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");
536
537        let schedules = engine.list_schedules().expect("list schedules");
538        assert_eq!(schedules, vec![added.schedule]);
539    }
540
541    fn unsupported_schedule_backend() -> crate::Result<()> {
542        Err(
543            KboltError::InvalidInput("schedule is not supported on this platform".to_string())
544                .into(),
545        )
546    }
547
548    fn test_engine() -> Engine {
549        let root = tempdir().expect("create temp root");
550        let root_path = root.path().to_path_buf();
551        mem::forget(root);
552        let config_dir = root_path.join("config");
553        let cache_dir = root_path.join("cache");
554        let storage = Storage::new(&cache_dir).expect("create storage");
555        let config = Config {
556            config_dir,
557            cache_dir,
558            default_space: None,
559            providers: std::collections::HashMap::new(),
560            roles: crate::config::RoleBindingsConfig::default(),
561            reaping: ReapingConfig { days: 7 },
562            chunking: ChunkingConfig::default(),
563            ranking: RankingConfig::default(),
564        };
565        Engine::from_parts(storage, config)
566    }
567}