Skip to main content

kbolt_core/engine/
schedule_ops.rs

1use std::collections::{BTreeSet, HashSet};
2
3use kbolt_types::{
4    AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
5    ScheduleAddResponse, ScheduleDefinition, ScheduleInterval, ScheduleIntervalUnit,
6    ScheduleRemoveResponse, ScheduleScope, ScheduleTrigger, ScheduleWeekday,
7};
8
9use crate::lock::LockMode;
10use crate::schedule_backend::{current_schedule_backend, reconcile_schedule_backend};
11use crate::schedule_state_store::ScheduleRunStateStore;
12use crate::schedule_store::ScheduleCatalog;
13use crate::schedule_support::{schedule_id_number, schedule_id_sort_key};
14use crate::Result;
15
16use super::Engine;
17
18const MIN_SCHEDULE_INTERVAL_MINUTES: u32 = 5;
19
20impl Engine {
21    pub fn add_schedule(&self, req: AddScheduleRequest) -> Result<ScheduleAddResponse> {
22        self.add_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
23    }
24
25    fn add_schedule_with_backend_support_check(
26        &self,
27        req: AddScheduleRequest,
28        ensure_backend_supported: fn() -> Result<()>,
29    ) -> Result<ScheduleAddResponse> {
30        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
31        ensure_backend_supported()?;
32        let trigger = normalize_schedule_trigger(req.trigger)?;
33        let scope = self.normalize_schedule_scope(req.scope, true)?;
34        let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
35
36        if let Some(existing) = catalog
37            .schedules
38            .iter()
39            .find(|schedule| schedule.trigger == trigger && schedule.scope == scope)
40        {
41            return Err(KboltError::InvalidInput(format!(
42                "schedule already exists: {}",
43                existing.id
44            ))
45            .into());
46        }
47
48        let schedule_id = format!("s{}", catalog.next_id);
49        catalog.next_id = catalog.next_id.checked_add(1).ok_or_else(|| {
50            KboltError::InvalidInput("cannot create schedule: schedule ids exhausted".to_string())
51        })?;
52
53        let schedule = ScheduleDefinition {
54            id: schedule_id,
55            trigger,
56            scope,
57        };
58        catalog.schedules.push(schedule.clone());
59        catalog.save(&self.config.config_dir)?;
60        let backend = match reconcile_schedule_backend(
61            &self.config.config_dir,
62            &self.config.cache_dir,
63            &catalog.schedules,
64        ) {
65            Ok(backend) => backend,
66            Err(err) => {
67                return Err(KboltError::Internal(format!(
68                    "schedule {} was saved, but backend reconcile failed: {err}",
69                    schedule.id
70                ))
71                .into())
72            }
73        };
74
75        Ok(ScheduleAddResponse { schedule, backend })
76    }
77
78    pub fn list_schedules(&self) -> Result<Vec<ScheduleDefinition>> {
79        let _lock = self.acquire_operation_lock(LockMode::Shared)?;
80        let mut schedules = ScheduleCatalog::load(&self.config.config_dir)?.schedules;
81        schedules.sort_by_key(|schedule| schedule_id_sort_key(&schedule.id));
82        Ok(schedules)
83    }
84
85    pub fn remove_schedule(&self, req: RemoveScheduleRequest) -> Result<ScheduleRemoveResponse> {
86        self.remove_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
87    }
88
89    fn remove_schedule_with_backend_support_check(
90        &self,
91        req: RemoveScheduleRequest,
92        ensure_backend_supported: fn() -> Result<()>,
93    ) -> Result<ScheduleRemoveResponse> {
94        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
95        ensure_backend_supported()?;
96        let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
97        let removed_ids = self.resolve_removed_schedule_ids(req.selector, &catalog.schedules)?;
98
99        if removed_ids.is_empty() {
100            reconcile_schedule_backend(
101                &self.config.config_dir,
102                &self.config.cache_dir,
103                &catalog.schedules,
104            )?;
105            return Ok(ScheduleRemoveResponse { removed_ids });
106        }
107
108        let removed = removed_ids.iter().cloned().collect::<HashSet<_>>();
109        catalog
110            .schedules
111            .retain(|schedule| !removed.contains(&schedule.id));
112        catalog.save(&self.config.config_dir)?;
113        for schedule_id in &removed_ids {
114            ScheduleRunStateStore::remove(&self.config.cache_dir, schedule_id)?;
115        }
116        if let Err(err) = reconcile_schedule_backend(
117            &self.config.config_dir,
118            &self.config.cache_dir,
119            &catalog.schedules,
120        ) {
121            return Err(KboltError::Internal(format!(
122                "removed schedules {}, but backend reconcile failed: {err}",
123                removed_ids.join(", ")
124            ))
125            .into());
126        }
127
128        Ok(ScheduleRemoveResponse { removed_ids })
129    }
130
131    fn normalize_schedule_scope(
132        &self,
133        scope: ScheduleScope,
134        validate_targets: bool,
135    ) -> Result<ScheduleScope> {
136        match scope {
137            ScheduleScope::All => Ok(ScheduleScope::All),
138            ScheduleScope::Space { space } => {
139                let normalized_space = normalize_scope_name("space", &space)?;
140                if validate_targets {
141                    let resolved = self.storage.get_space(&normalized_space)?;
142                    return Ok(ScheduleScope::Space {
143                        space: resolved.name,
144                    });
145                }
146
147                Ok(ScheduleScope::Space {
148                    space: normalized_space,
149                })
150            }
151            ScheduleScope::Collections { space, collections } => {
152                let normalized_space = normalize_scope_name("space", &space)?;
153                let normalized_collections = normalize_collection_names(collections)?;
154
155                if validate_targets {
156                    let resolved = self.storage.get_space(&normalized_space)?;
157                    for collection in &normalized_collections {
158                        self.storage.get_collection(resolved.id, collection)?;
159                    }
160                    return Ok(ScheduleScope::Collections {
161                        space: resolved.name,
162                        collections: normalized_collections,
163                    });
164                }
165
166                Ok(ScheduleScope::Collections {
167                    space: normalized_space,
168                    collections: normalized_collections,
169                })
170            }
171        }
172    }
173
174    fn resolve_removed_schedule_ids(
175        &self,
176        selector: RemoveScheduleSelector,
177        schedules: &[ScheduleDefinition],
178    ) -> Result<Vec<String>> {
179        match selector {
180            RemoveScheduleSelector::All => Ok(schedules
181                .iter()
182                .map(|schedule| schedule.id.clone())
183                .collect()),
184            RemoveScheduleSelector::Id { id } => {
185                let normalized_id = normalize_schedule_id(&id)?;
186                if schedules
187                    .iter()
188                    .any(|schedule| schedule.id == normalized_id)
189                {
190                    return Ok(vec![normalized_id]);
191                }
192
193                Err(KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into())
194            }
195            RemoveScheduleSelector::Scope { scope } => {
196                let normalized_scope = self.normalize_schedule_scope(scope, false)?;
197                let mut matches = schedules
198                    .iter()
199                    .filter(|schedule| schedule.scope == normalized_scope)
200                    .map(|schedule| schedule.id.clone())
201                    .collect::<Vec<_>>();
202                matches.sort_by_key(|id| schedule_id_sort_key(id));
203
204                match matches.len() {
205                    0 => Err(KboltError::InvalidInput(
206                        "no schedules matched the requested scope".to_string(),
207                    )
208                    .into()),
209                    1 => Ok(matches),
210                    _ => Err(KboltError::InvalidInput(format!(
211                        "schedule scope matched multiple schedules: {}",
212                        matches.join(", ")
213                    ))
214                    .into()),
215                }
216            }
217        }
218    }
219}
220
221pub(super) fn load_schedule_definition(
222    config_dir: &std::path::Path,
223    id: &str,
224) -> Result<ScheduleDefinition> {
225    let normalized_id = normalize_schedule_id(id)?;
226    let catalog = ScheduleCatalog::load(config_dir)?;
227    catalog
228        .schedules
229        .into_iter()
230        .find(|schedule| schedule.id == normalized_id)
231        .ok_or_else(|| {
232            KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into()
233        })
234}
235
236fn normalize_schedule_trigger(trigger: ScheduleTrigger) -> Result<ScheduleTrigger> {
237    match trigger {
238        ScheduleTrigger::Every { interval } => Ok(ScheduleTrigger::Every {
239            interval: normalize_schedule_interval(interval)?,
240        }),
241        ScheduleTrigger::Daily { time } => Ok(ScheduleTrigger::Daily {
242            time: normalize_schedule_time(&time)?,
243        }),
244        ScheduleTrigger::Weekly { weekdays, time } => Ok(ScheduleTrigger::Weekly {
245            weekdays: normalize_schedule_weekdays(weekdays)?,
246            time: normalize_schedule_time(&time)?,
247        }),
248    }
249}
250
251fn normalize_schedule_interval(interval: ScheduleInterval) -> Result<ScheduleInterval> {
252    if interval.value == 0 {
253        return Err(KboltError::InvalidInput(
254            "schedule interval must be greater than zero".to_string(),
255        )
256        .into());
257    }
258
259    match interval.unit {
260        ScheduleIntervalUnit::Minutes if interval.value < MIN_SCHEDULE_INTERVAL_MINUTES => {
261            Err(KboltError::InvalidInput(format!(
262                "schedule interval must be at least {MIN_SCHEDULE_INTERVAL_MINUTES} minutes"
263            ))
264            .into())
265        }
266        ScheduleIntervalUnit::Minutes | ScheduleIntervalUnit::Hours => Ok(interval),
267    }
268}
269
270fn normalize_schedule_weekdays(weekdays: Vec<ScheduleWeekday>) -> Result<Vec<ScheduleWeekday>> {
271    let normalized = weekdays.into_iter().collect::<BTreeSet<_>>();
272    if normalized.is_empty() {
273        return Err(KboltError::InvalidInput(
274            "weekly schedules require at least one weekday".to_string(),
275        )
276        .into());
277    }
278
279    Ok(normalized.into_iter().collect())
280}
281
282fn normalize_schedule_time(input: &str) -> Result<String> {
283    let trimmed = input.trim();
284    if trimmed.is_empty() {
285        return Err(KboltError::InvalidInput("schedule time must not be empty".to_string()).into());
286    }
287
288    let collapsed = trimmed
289        .chars()
290        .filter(|ch| !ch.is_whitespace())
291        .collect::<String>()
292        .to_ascii_lowercase();
293
294    let (time_part, meridiem) = if let Some(time) = collapsed.strip_suffix("am") {
295        (time, Some("am"))
296    } else if let Some(time) = collapsed.strip_suffix("pm") {
297        (time, Some("pm"))
298    } else {
299        (collapsed.as_str(), None)
300    };
301
302    if time_part.is_empty() {
303        return Err(invalid_schedule_time(input).into());
304    }
305
306    let (mut hour, minute) = if let Some((hour_part, minute_part)) = time_part.split_once(':') {
307        if minute_part.contains(':') {
308            return Err(invalid_schedule_time(input).into());
309        }
310        (
311            parse_time_component(hour_part, input)?,
312            parse_time_component(minute_part, input)?,
313        )
314    } else {
315        if meridiem.is_none() {
316            return Err(invalid_schedule_time(input).into());
317        }
318        (parse_time_component(time_part, input)?, 0)
319    };
320
321    if minute > 59 {
322        return Err(invalid_schedule_time(input).into());
323    }
324
325    match meridiem {
326        Some("am") => {
327            if hour == 0 || hour > 12 {
328                return Err(invalid_schedule_time(input).into());
329            }
330            if hour == 12 {
331                hour = 0;
332            }
333        }
334        Some("pm") => {
335            if hour == 0 || hour > 12 {
336                return Err(invalid_schedule_time(input).into());
337            }
338            if hour != 12 {
339                hour += 12;
340            }
341        }
342        None => {
343            if hour > 23 {
344                return Err(invalid_schedule_time(input).into());
345            }
346        }
347        Some(_) => unreachable!("only am/pm meridiems are supported"),
348    }
349
350    Ok(format!("{hour:02}:{minute:02}"))
351}
352
353fn parse_time_component(component: &str, input: &str) -> Result<u32> {
354    if component.is_empty() {
355        return Err(invalid_schedule_time(input).into());
356    }
357
358    component
359        .parse::<u32>()
360        .map_err(|_| invalid_schedule_time(input).into())
361}
362
363fn invalid_schedule_time(input: &str) -> KboltError {
364    KboltError::InvalidInput(format!(
365        "invalid schedule time '{input}': use HH:MM, 3pm, or 3:00pm"
366    ))
367}
368
369fn normalize_scope_name(label: &str, name: &str) -> Result<String> {
370    let trimmed = name.trim();
371    if trimmed.is_empty() {
372        return Err(KboltError::InvalidInput(format!("{label} name must not be empty")).into());
373    }
374    Ok(trimmed.to_string())
375}
376
377fn normalize_collection_names(collections: Vec<String>) -> Result<Vec<String>> {
378    let mut normalized = BTreeSet::new();
379    for collection in collections {
380        let trimmed = collection.trim();
381        if trimmed.is_empty() {
382            return Err(
383                KboltError::InvalidInput("collection names must not be empty".to_string()).into(),
384            );
385        }
386        normalized.insert(trimmed.to_string());
387    }
388
389    if normalized.is_empty() {
390        return Err(KboltError::InvalidInput(
391            "collection scope must include at least one collection".to_string(),
392        )
393        .into());
394    }
395
396    Ok(normalized.into_iter().collect())
397}
398
399fn normalize_schedule_id(id: &str) -> Result<String> {
400    let normalized = id.trim().to_ascii_lowercase();
401    if normalized.is_empty() {
402        return Err(KboltError::InvalidInput("schedule id must not be empty".to_string()).into());
403    }
404
405    if schedule_id_number(&normalized).is_none() {
406        return Err(KboltError::InvalidInput(format!("invalid schedule id: {id}")).into());
407    }
408
409    Ok(normalized)
410}
411
412fn ensure_schedule_backend_supported() -> Result<()> {
413    current_schedule_backend().map(|_| ())
414}
415
416#[cfg(test)]
417mod tests {
418    use std::mem;
419
420    use tempfile::tempdir;
421
422    use super::Engine;
423    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
424    use crate::schedule_state_store::ScheduleRunStateStore;
425    use crate::schedule_store::ScheduleCatalog;
426    use crate::storage::Storage;
427    use kbolt_types::{
428        AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
429        ScheduleRunResult, ScheduleRunState, ScheduleScope, ScheduleTrigger,
430    };
431
432    #[test]
433    fn add_schedule_does_not_persist_when_backend_support_check_fails() {
434        let engine = test_engine();
435
436        let err = engine
437            .add_schedule_with_backend_support_check(
438                AddScheduleRequest {
439                    trigger: ScheduleTrigger::Daily {
440                        time: "09:00".to_string(),
441                    },
442                    scope: ScheduleScope::All,
443                },
444                unsupported_schedule_backend,
445            )
446            .expect_err("unsupported backend should fail");
447
448        match KboltError::from(err) {
449            KboltError::InvalidInput(message) => {
450                assert!(
451                    message.contains("schedule is not supported on this platform"),
452                    "unexpected message: {message}"
453                );
454            }
455            other => panic!("unexpected error: {other}"),
456        }
457
458        let catalog =
459            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
460        assert_eq!(catalog, ScheduleCatalog::default());
461    }
462
463    #[test]
464    fn remove_schedule_does_not_mutate_when_backend_support_check_fails() {
465        let engine = test_engine();
466        let added = engine
467            .add_schedule(AddScheduleRequest {
468                trigger: ScheduleTrigger::Daily {
469                    time: "09:00".to_string(),
470                },
471                scope: ScheduleScope::All,
472            })
473            .expect("add schedule");
474        let saved_state = ScheduleRunState {
475            last_started: Some("2026-03-07T12:00:00Z".to_string()),
476            last_finished: Some("2026-03-07T12:00:09Z".to_string()),
477            last_result: Some(ScheduleRunResult::Success),
478            last_error: None,
479        };
480        ScheduleRunStateStore::save(&engine.config.cache_dir, &added.schedule.id, &saved_state)
481            .expect("save run state");
482
483        let err = engine
484            .remove_schedule_with_backend_support_check(
485                RemoveScheduleRequest {
486                    selector: RemoveScheduleSelector::Id {
487                        id: added.schedule.id.clone(),
488                    },
489                },
490                unsupported_schedule_backend,
491            )
492            .expect_err("unsupported backend should fail");
493
494        match KboltError::from(err) {
495            KboltError::InvalidInput(message) => {
496                assert!(
497                    message.contains("schedule is not supported on this platform"),
498                    "unexpected message: {message}"
499                );
500            }
501            other => panic!("unexpected error: {other}"),
502        }
503
504        let catalog =
505            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
506        assert_eq!(catalog.schedules, vec![added.schedule.clone()]);
507        let loaded_state =
508            ScheduleRunStateStore::load(&engine.config.cache_dir, &added.schedule.id)
509                .expect("load preserved run state");
510        assert_eq!(loaded_state, saved_state);
511    }
512
513    fn unsupported_schedule_backend() -> crate::Result<()> {
514        Err(
515            KboltError::InvalidInput("schedule is not supported on this platform".to_string())
516                .into(),
517        )
518    }
519
520    fn test_engine() -> Engine {
521        let root = tempdir().expect("create temp root");
522        let root_path = root.path().to_path_buf();
523        mem::forget(root);
524        let config_dir = root_path.join("config");
525        let cache_dir = root_path.join("cache");
526        let storage = Storage::new(&cache_dir).expect("create storage");
527        let config = Config {
528            config_dir,
529            cache_dir,
530            default_space: None,
531            providers: std::collections::HashMap::new(),
532            roles: crate::config::RoleBindingsConfig::default(),
533            reaping: ReapingConfig { days: 7 },
534            chunking: ChunkingConfig::default(),
535            ranking: RankingConfig::default(),
536        };
537        Engine::from_parts(storage, config)
538    }
539}