1use std::collections::{BTreeSet, HashSet};
2
3use kbolt_types::{
4 AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
5 ScheduleAddResponse, ScheduleDefinition, ScheduleInterval, ScheduleIntervalUnit,
6 ScheduleRemoveResponse, ScheduleScope, ScheduleTrigger, ScheduleWeekday,
7};
8
9use crate::lock::LockMode;
10use crate::schedule_backend::{current_schedule_backend, reconcile_schedule_backend};
11use crate::schedule_state_store::ScheduleRunStateStore;
12use crate::schedule_store::ScheduleCatalog;
13use crate::schedule_support::{schedule_id_number, schedule_id_sort_key};
14use crate::Result;
15
16use super::Engine;
17
/// Smallest value accepted for an interval whose unit is minutes; smaller
/// minute intervals are rejected by `normalize_schedule_interval`.
const MIN_SCHEDULE_INTERVAL_MINUTES: u32 = 5;
19
20impl Engine {
21 pub fn add_schedule(&self, req: AddScheduleRequest) -> Result<ScheduleAddResponse> {
22 self.add_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
23 }
24
25 fn add_schedule_with_backend_support_check(
26 &self,
27 req: AddScheduleRequest,
28 ensure_backend_supported: fn() -> Result<()>,
29 ) -> Result<ScheduleAddResponse> {
30 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
31 ensure_backend_supported()?;
32 let trigger = normalize_schedule_trigger(req.trigger)?;
33 let scope = self.normalize_schedule_scope(req.scope, true)?;
34 let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
35
36 if let Some(existing) = catalog
37 .schedules
38 .iter()
39 .find(|schedule| schedule.trigger == trigger && schedule.scope == scope)
40 {
41 return Err(KboltError::InvalidInput(format!(
42 "schedule already exists: {}",
43 existing.id
44 ))
45 .into());
46 }
47
48 let schedule_id = format!("s{}", catalog.next_id);
49 catalog.next_id = catalog.next_id.checked_add(1).ok_or_else(|| {
50 KboltError::InvalidInput("cannot create schedule: schedule ids exhausted".to_string())
51 })?;
52
53 let schedule = ScheduleDefinition {
54 id: schedule_id,
55 trigger,
56 scope,
57 };
58 catalog.schedules.push(schedule.clone());
59 catalog.save(&self.config.config_dir)?;
60 let backend = match reconcile_schedule_backend(
61 &self.config.config_dir,
62 &self.config.cache_dir,
63 &catalog.schedules,
64 ) {
65 Ok(backend) => backend,
66 Err(err) => {
67 return Err(KboltError::Internal(format!(
68 "schedule {} was saved, but backend reconcile failed: {err}",
69 schedule.id
70 ))
71 .into())
72 }
73 };
74
75 Ok(ScheduleAddResponse { schedule, backend })
76 }
77
78 pub fn list_schedules(&self) -> Result<Vec<ScheduleDefinition>> {
79 let mut schedules = ScheduleCatalog::load(&self.config.config_dir)?.schedules;
80 schedules.sort_by_key(|schedule| schedule_id_sort_key(&schedule.id));
81 Ok(schedules)
82 }
83
84 pub fn remove_schedule(&self, req: RemoveScheduleRequest) -> Result<ScheduleRemoveResponse> {
85 self.remove_schedule_with_backend_support_check(req, ensure_schedule_backend_supported)
86 }
87
88 fn remove_schedule_with_backend_support_check(
89 &self,
90 req: RemoveScheduleRequest,
91 ensure_backend_supported: fn() -> Result<()>,
92 ) -> Result<ScheduleRemoveResponse> {
93 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
94 ensure_backend_supported()?;
95 let mut catalog = ScheduleCatalog::load(&self.config.config_dir)?;
96 let removed_ids = self.resolve_removed_schedule_ids(req.selector, &catalog.schedules)?;
97
98 if removed_ids.is_empty() {
99 reconcile_schedule_backend(
100 &self.config.config_dir,
101 &self.config.cache_dir,
102 &catalog.schedules,
103 )?;
104 return Ok(ScheduleRemoveResponse { removed_ids });
105 }
106
107 let removed = removed_ids.iter().cloned().collect::<HashSet<_>>();
108 catalog
109 .schedules
110 .retain(|schedule| !removed.contains(&schedule.id));
111 catalog.save(&self.config.config_dir)?;
112 for schedule_id in &removed_ids {
113 ScheduleRunStateStore::remove(&self.config.cache_dir, schedule_id)?;
114 }
115 if let Err(err) = reconcile_schedule_backend(
116 &self.config.config_dir,
117 &self.config.cache_dir,
118 &catalog.schedules,
119 ) {
120 return Err(KboltError::Internal(format!(
121 "removed schedules {}, but backend reconcile failed: {err}",
122 removed_ids.join(", ")
123 ))
124 .into());
125 }
126
127 Ok(ScheduleRemoveResponse { removed_ids })
128 }
129
130 fn normalize_schedule_scope(
131 &self,
132 scope: ScheduleScope,
133 validate_targets: bool,
134 ) -> Result<ScheduleScope> {
135 match scope {
136 ScheduleScope::All => Ok(ScheduleScope::All),
137 ScheduleScope::Space { space } => {
138 let normalized_space = normalize_scope_name("space", &space)?;
139 if validate_targets {
140 let resolved = self.storage.get_space(&normalized_space)?;
141 return Ok(ScheduleScope::Space {
142 space: resolved.name,
143 });
144 }
145
146 Ok(ScheduleScope::Space {
147 space: normalized_space,
148 })
149 }
150 ScheduleScope::Collections { space, collections } => {
151 let normalized_space = normalize_scope_name("space", &space)?;
152 let normalized_collections = normalize_collection_names(collections)?;
153
154 if validate_targets {
155 let resolved = self.storage.get_space(&normalized_space)?;
156 for collection in &normalized_collections {
157 self.storage.get_collection(resolved.id, collection)?;
158 }
159 return Ok(ScheduleScope::Collections {
160 space: resolved.name,
161 collections: normalized_collections,
162 });
163 }
164
165 Ok(ScheduleScope::Collections {
166 space: normalized_space,
167 collections: normalized_collections,
168 })
169 }
170 }
171 }
172
173 fn resolve_removed_schedule_ids(
174 &self,
175 selector: RemoveScheduleSelector,
176 schedules: &[ScheduleDefinition],
177 ) -> Result<Vec<String>> {
178 match selector {
179 RemoveScheduleSelector::All => Ok(schedules
180 .iter()
181 .map(|schedule| schedule.id.clone())
182 .collect()),
183 RemoveScheduleSelector::Id { id } => {
184 let normalized_id = normalize_schedule_id(&id)?;
185 if schedules
186 .iter()
187 .any(|schedule| schedule.id == normalized_id)
188 {
189 return Ok(vec![normalized_id]);
190 }
191
192 Err(KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into())
193 }
194 RemoveScheduleSelector::Scope { scope } => {
195 let normalized_scope = self.normalize_schedule_scope(scope, false)?;
196 let mut matches = schedules
197 .iter()
198 .filter(|schedule| schedule.scope == normalized_scope)
199 .map(|schedule| schedule.id.clone())
200 .collect::<Vec<_>>();
201 matches.sort_by_key(|id| schedule_id_sort_key(id));
202
203 match matches.len() {
204 0 => Err(KboltError::InvalidInput(
205 "no schedules matched the requested scope".to_string(),
206 )
207 .into()),
208 1 => Ok(matches),
209 _ => Err(KboltError::InvalidInput(format!(
210 "schedule scope matched multiple schedules: {}",
211 matches.join(", ")
212 ))
213 .into()),
214 }
215 }
216 }
217 }
218}
219
220pub(super) fn load_schedule_definition(
221 config_dir: &std::path::Path,
222 id: &str,
223) -> Result<ScheduleDefinition> {
224 let normalized_id = normalize_schedule_id(id)?;
225 let catalog = ScheduleCatalog::load(config_dir)?;
226 catalog
227 .schedules
228 .into_iter()
229 .find(|schedule| schedule.id == normalized_id)
230 .ok_or_else(|| {
231 KboltError::InvalidInput(format!("schedule not found: {normalized_id}")).into()
232 })
233}
234
235fn normalize_schedule_trigger(trigger: ScheduleTrigger) -> Result<ScheduleTrigger> {
236 match trigger {
237 ScheduleTrigger::Every { interval } => Ok(ScheduleTrigger::Every {
238 interval: normalize_schedule_interval(interval)?,
239 }),
240 ScheduleTrigger::Daily { time } => Ok(ScheduleTrigger::Daily {
241 time: normalize_schedule_time(&time)?,
242 }),
243 ScheduleTrigger::Weekly { weekdays, time } => Ok(ScheduleTrigger::Weekly {
244 weekdays: normalize_schedule_weekdays(weekdays)?,
245 time: normalize_schedule_time(&time)?,
246 }),
247 }
248}
249
250fn normalize_schedule_interval(interval: ScheduleInterval) -> Result<ScheduleInterval> {
251 if interval.value == 0 {
252 return Err(KboltError::InvalidInput(
253 "schedule interval must be greater than zero".to_string(),
254 )
255 .into());
256 }
257
258 match interval.unit {
259 ScheduleIntervalUnit::Minutes if interval.value < MIN_SCHEDULE_INTERVAL_MINUTES => {
260 Err(KboltError::InvalidInput(format!(
261 "schedule interval must be at least {MIN_SCHEDULE_INTERVAL_MINUTES} minutes"
262 ))
263 .into())
264 }
265 ScheduleIntervalUnit::Minutes | ScheduleIntervalUnit::Hours => Ok(interval),
266 }
267}
268
269fn normalize_schedule_weekdays(weekdays: Vec<ScheduleWeekday>) -> Result<Vec<ScheduleWeekday>> {
270 let normalized = weekdays.into_iter().collect::<BTreeSet<_>>();
271 if normalized.is_empty() {
272 return Err(KboltError::InvalidInput(
273 "weekly schedules require at least one weekday".to_string(),
274 )
275 .into());
276 }
277
278 Ok(normalized.into_iter().collect())
279}
280
281fn normalize_schedule_time(input: &str) -> Result<String> {
282 let trimmed = input.trim();
283 if trimmed.is_empty() {
284 return Err(KboltError::InvalidInput("schedule time must not be empty".to_string()).into());
285 }
286
287 let collapsed = trimmed
288 .chars()
289 .filter(|ch| !ch.is_whitespace())
290 .collect::<String>()
291 .to_ascii_lowercase();
292
293 let (time_part, meridiem) = if let Some(time) = collapsed.strip_suffix("am") {
294 (time, Some("am"))
295 } else if let Some(time) = collapsed.strip_suffix("pm") {
296 (time, Some("pm"))
297 } else {
298 (collapsed.as_str(), None)
299 };
300
301 if time_part.is_empty() {
302 return Err(invalid_schedule_time(input).into());
303 }
304
305 let (mut hour, minute) = if let Some((hour_part, minute_part)) = time_part.split_once(':') {
306 if minute_part.contains(':') {
307 return Err(invalid_schedule_time(input).into());
308 }
309 (
310 parse_time_component(hour_part, input)?,
311 parse_time_component(minute_part, input)?,
312 )
313 } else {
314 if meridiem.is_none() {
315 return Err(invalid_schedule_time(input).into());
316 }
317 (parse_time_component(time_part, input)?, 0)
318 };
319
320 if minute > 59 {
321 return Err(invalid_schedule_time(input).into());
322 }
323
324 match meridiem {
325 Some("am") => {
326 if hour == 0 || hour > 12 {
327 return Err(invalid_schedule_time(input).into());
328 }
329 if hour == 12 {
330 hour = 0;
331 }
332 }
333 Some("pm") => {
334 if hour == 0 || hour > 12 {
335 return Err(invalid_schedule_time(input).into());
336 }
337 if hour != 12 {
338 hour += 12;
339 }
340 }
341 None => {
342 if hour > 23 {
343 return Err(invalid_schedule_time(input).into());
344 }
345 }
346 Some(_) => unreachable!("only am/pm meridiems are supported"),
347 }
348
349 Ok(format!("{hour:02}:{minute:02}"))
350}
351
352fn parse_time_component(component: &str, input: &str) -> Result<u32> {
353 if component.is_empty() {
354 return Err(invalid_schedule_time(input).into());
355 }
356
357 component
358 .parse::<u32>()
359 .map_err(|_| invalid_schedule_time(input).into())
360}
361
362fn invalid_schedule_time(input: &str) -> KboltError {
363 KboltError::InvalidInput(format!(
364 "invalid schedule time '{input}': use HH:MM, 3pm, or 3:00pm"
365 ))
366}
367
368fn normalize_scope_name(label: &str, name: &str) -> Result<String> {
369 let trimmed = name.trim();
370 if trimmed.is_empty() {
371 return Err(KboltError::InvalidInput(format!("{label} name must not be empty")).into());
372 }
373 Ok(trimmed.to_string())
374}
375
376fn normalize_collection_names(collections: Vec<String>) -> Result<Vec<String>> {
377 let mut normalized = BTreeSet::new();
378 for collection in collections {
379 let trimmed = collection.trim();
380 if trimmed.is_empty() {
381 return Err(
382 KboltError::InvalidInput("collection names must not be empty".to_string()).into(),
383 );
384 }
385 normalized.insert(trimmed.to_string());
386 }
387
388 if normalized.is_empty() {
389 return Err(KboltError::InvalidInput(
390 "collection scope must include at least one collection".to_string(),
391 )
392 .into());
393 }
394
395 Ok(normalized.into_iter().collect())
396}
397
398fn normalize_schedule_id(id: &str) -> Result<String> {
399 let normalized = id.trim().to_ascii_lowercase();
400 if normalized.is_empty() {
401 return Err(KboltError::InvalidInput("schedule id must not be empty".to_string()).into());
402 }
403
404 if schedule_id_number(&normalized).is_none() {
405 return Err(KboltError::InvalidInput(format!("invalid schedule id: {id}")).into());
406 }
407
408 Ok(normalized)
409}
410
411fn ensure_schedule_backend_supported() -> Result<()> {
412 current_schedule_backend().map(|_| ())
413}
414
#[cfg(test)]
mod tests {
    use fs2::FileExt;
    use std::fs::OpenOptions;
    use std::mem;

    use tempfile::tempdir;

    use super::Engine;
    use crate::config::{ChunkingConfig, Config, RankingConfig, ReapingConfig};
    use crate::schedule_state_store::ScheduleRunStateStore;
    use crate::schedule_store::ScheduleCatalog;
    use crate::storage::Storage;
    use kbolt_types::{
        AddScheduleRequest, KboltError, RemoveScheduleRequest, RemoveScheduleSelector,
        ScheduleRunResult, ScheduleRunState, ScheduleScope, ScheduleTrigger,
    };

    /// Message produced by the fake backend probe and expected by the tests.
    const UNSUPPORTED_MESSAGE: &str = "schedule is not supported on this platform";

    // A failing backend probe must abort `add` before anything is written.
    #[test]
    fn add_schedule_does_not_persist_when_backend_support_check_fails() {
        let engine = test_engine();

        let failure = engine
            .add_schedule_with_backend_support_check(
                daily_all_request(),
                unsupported_schedule_backend,
            )
            .expect_err("unsupported backend should fail");
        assert_unsupported_backend_error(failure);

        let catalog =
            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
        assert_eq!(catalog, ScheduleCatalog::default());
    }

    // A failing backend probe must leave both the catalog and the saved run
    // state untouched on `remove`.
    #[test]
    fn remove_schedule_does_not_mutate_when_backend_support_check_fails() {
        let engine = test_engine();
        let schedule = engine
            .add_schedule(daily_all_request())
            .expect("add schedule")
            .schedule;
        let saved_state = ScheduleRunState {
            last_started: Some("2026-03-07T12:00:00Z".to_string()),
            last_finished: Some("2026-03-07T12:00:09Z".to_string()),
            last_result: Some(ScheduleRunResult::Success),
            last_error: None,
        };
        ScheduleRunStateStore::save(&engine.config.cache_dir, &schedule.id, &saved_state)
            .expect("save run state");

        let request = RemoveScheduleRequest {
            selector: RemoveScheduleSelector::Id {
                id: schedule.id.clone(),
            },
        };
        let failure = engine
            .remove_schedule_with_backend_support_check(request, unsupported_schedule_backend)
            .expect_err("unsupported backend should fail");
        assert_unsupported_backend_error(failure);

        let catalog =
            ScheduleCatalog::load(&engine.config.config_dir).expect("load unchanged schedule file");
        assert_eq!(catalog.schedules, vec![schedule.clone()]);
        let loaded_state = ScheduleRunStateStore::load(&engine.config.cache_dir, &schedule.id)
            .expect("load preserved run state");
        assert_eq!(loaded_state, saved_state);
    }

    // Listing must work even while another handle holds an exclusive lock on
    // the lock file in the cache directory.
    #[test]
    fn list_schedules_succeeds_while_global_lock_is_held() {
        let engine = test_engine();
        let schedule = engine
            .add_schedule(daily_all_request())
            .expect("add schedule")
            .schedule;

        std::fs::create_dir_all(&engine.config.cache_dir).expect("create cache dir");
        let lock_path = engine.config.cache_dir.join("kbolt.lock");
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        // `holder` stays alive until the end of the test so the lock is held
        // for the duration of the `list_schedules` call.
        let holder = options.open(&lock_path).expect("open lock file");
        FileExt::try_lock_exclusive(&holder).expect("acquire global lock");

        let listed = engine.list_schedules().expect("list schedules");
        assert_eq!(listed, vec![schedule]);
    }

    /// Request used by every test: a daily 09:00 trigger with the `All` scope.
    fn daily_all_request() -> AddScheduleRequest {
        AddScheduleRequest {
            trigger: ScheduleTrigger::Daily {
                time: "09:00".to_string(),
            },
            scope: ScheduleScope::All,
        }
    }

    /// Asserts that `err` converts to the `InvalidInput` error produced by
    /// `unsupported_schedule_backend`.
    fn assert_unsupported_backend_error<E>(err: E)
    where
        KboltError: From<E>,
    {
        match KboltError::from(err) {
            KboltError::InvalidInput(message) => {
                assert!(
                    message.contains(UNSUPPORTED_MESSAGE),
                    "unexpected message: {message}"
                );
            }
            other => panic!("unexpected error: {other}"),
        }
    }

    /// Backend probe that always reports the platform as unsupported.
    fn unsupported_schedule_backend() -> crate::Result<()> {
        Err(KboltError::InvalidInput(UNSUPPORTED_MESSAGE.to_string()).into())
    }

    /// Builds an engine whose config and cache directories live under a fresh
    /// temporary root.
    fn test_engine() -> Engine {
        let root_path = {
            let root = tempdir().expect("create temp root");
            let path = root.path().to_path_buf();
            // Forgetting the guard skips its destructor, so the directory is
            // not removed when this function returns.
            mem::forget(root);
            path
        };
        let config_dir = root_path.join("config");
        let cache_dir = root_path.join("cache");
        let storage = Storage::new(&cache_dir).expect("create storage");
        let config = Config {
            config_dir,
            cache_dir,
            default_space: None,
            providers: std::collections::HashMap::new(),
            roles: crate::config::RoleBindingsConfig::default(),
            reaping: ReapingConfig { days: 7 },
            chunking: ChunkingConfig::default(),
            ranking: RankingConfig::default(),
        };
        Engine::from_parts(storage, config)
    }
}