Skip to main content

meerkat_mobkit/runtime/
scheduling.rs

1//! Scheduling subsystem — schedule dispatch, tick evaluation, and module-backed execution.
2
3use super::module_boundary::{
4    CORE_MODULE_MCP_TIMEOUT, SCHEDULING_DISPATCH_MCP_TOOL, call_module_mcp_tool_json,
5    mcp_required_error, module_uses_mcp,
6};
7use super::*;
8
9pub fn evaluate_schedules_at_tick(
10    schedules: &[ScheduleDefinition],
11    tick_ms: u64,
12) -> Result<ScheduleEvaluation, ScheduleValidationError> {
13    validate_schedule_tick_ms_supported(tick_ms)?;
14    validate_schedules(schedules)?;
15    let mut due_triggers = Vec::new();
16    for schedule in schedules.iter().filter(|s| s.enabled) {
17        let canonical_schedule_id = canonical_schedule_id(&schedule.schedule_id);
18        let interval = parse_schedule_interval(&schedule.interval).ok_or_else(|| {
19            ScheduleValidationError::InvalidInterval {
20                schedule_id: canonical_schedule_id.clone(),
21                interval: schedule.interval.clone(),
22            }
23        })?;
24        let timezone = parse_schedule_timezone(&schedule.timezone).ok_or_else(|| {
25            ScheduleValidationError::InvalidTimezone {
26                schedule_id: canonical_schedule_id.clone(),
27                timezone: schedule.timezone.clone(),
28            }
29        })?;
30        let Some(due_tick_ms) = latest_due_tick_at_or_before(
31            &canonical_schedule_id,
32            &interval,
33            &timezone,
34            schedule.jitter_ms,
35            tick_ms,
36        ) else {
37            continue;
38        };
39        if due_tick_ms != tick_ms {
40            continue;
41        }
42        due_triggers.push(ScheduleTrigger {
43            schedule_id: canonical_schedule_id,
44            interval: schedule.interval.clone(),
45            timezone: schedule.timezone.clone(),
46            due_tick_ms,
47        });
48    }
49
50    due_triggers.sort_by(|left, right| {
51        left.due_tick_ms
52            .cmp(&right.due_tick_ms)
53            .then_with(|| left.schedule_id.cmp(&right.schedule_id))
54            .then_with(|| left.interval.cmp(&right.interval))
55            .then_with(|| left.timezone.cmp(&right.timezone))
56    });
57
58    Ok(ScheduleEvaluation {
59        tick_ms,
60        due_triggers,
61    })
62}
63
64pub(crate) fn validate_schedules(
65    schedules: &[ScheduleDefinition],
66) -> Result<(), ScheduleValidationError> {
67    let mut seen = BTreeSet::new();
68    for schedule in schedules {
69        let canonical_schedule_id = canonical_schedule_id(&schedule.schedule_id);
70        if canonical_schedule_id.is_empty() {
71            return Err(ScheduleValidationError::EmptyScheduleId);
72        }
73        if !seen.insert(canonical_schedule_id.clone()) {
74            return Err(ScheduleValidationError::DuplicateScheduleId(
75                canonical_schedule_id,
76            ));
77        }
78        if parse_schedule_interval(&schedule.interval).is_none() {
79            return Err(ScheduleValidationError::InvalidInterval {
80                schedule_id: canonical_schedule_id,
81                interval: schedule.interval.clone(),
82            });
83        }
84        if parse_schedule_timezone(&schedule.timezone).is_none() {
85            return Err(ScheduleValidationError::InvalidTimezone {
86                schedule_id: canonical_schedule_id,
87                timezone: schedule.timezone.clone(),
88            });
89        }
90    }
91    Ok(())
92}
93
94impl MobkitRuntimeHandle {
95    fn parse_scheduling_runtime_injection_response(
96        response: Value,
97    ) -> Result<Option<(String, String)>, RuntimeBoundaryError> {
98        let Some(injection) = response
99            .as_object()
100            .and_then(|payload| payload.get("runtime_injection"))
101            .and_then(Value::as_object)
102            .cloned()
103        else {
104            return Ok(None);
105        };
106        let member_id = injection
107            .get("member_id")
108            .and_then(Value::as_str)
109            .map(str::trim)
110            .filter(|value| !value.is_empty())
111            .ok_or_else(|| {
112                RuntimeBoundaryError::Mcp(McpBoundaryError::InvalidToolPayload {
113                    module_id: "scheduling".to_string(),
114                    tool: SCHEDULING_DISPATCH_MCP_TOOL.to_string(),
115                    reason: "runtime_injection.member_id must be a non-empty string".to_string(),
116                })
117            })?;
118        let message = injection
119            .get("message")
120            .and_then(Value::as_str)
121            .map(str::trim)
122            .filter(|value| !value.is_empty())
123            .ok_or_else(|| {
124                RuntimeBoundaryError::Mcp(McpBoundaryError::InvalidToolPayload {
125                    module_id: "scheduling".to_string(),
126                    tool: SCHEDULING_DISPATCH_MCP_TOOL.to_string(),
127                    reason: "runtime_injection.message must be a non-empty string".to_string(),
128                })
129            })?;
130        Ok(Some((member_id.to_string(), message.to_string())))
131    }
132
133    fn scheduling_runtime_injection_for_dispatch(
134        &self,
135        schedule_id: &str,
136        interval: &str,
137        timezone: &str,
138        due_tick_ms: u64,
139        tick_ms: u64,
140        claim_key: &str,
141    ) -> Result<Option<(String, String)>, RuntimeBoundaryError> {
142        let Some((scheduling_module, pre_spawn)) = self.module_and_prespawn("scheduling") else {
143            return Ok(None);
144        };
145        if !self.is_module_loaded("scheduling") {
146            return Ok(None);
147        }
148        if !module_uses_mcp(scheduling_module, pre_spawn) {
149            return Err(mcp_required_error(
150                "scheduling",
151                SCHEDULING_DISPATCH_MCP_TOOL,
152            ));
153        }
154        let response = call_module_mcp_tool_json(
155            scheduling_module,
156            pre_spawn,
157            SCHEDULING_DISPATCH_MCP_TOOL,
158            &serde_json::json!({
159                "schedule_id": schedule_id,
160                "interval": interval,
161                "timezone": timezone,
162                "due_tick_ms": due_tick_ms,
163                "tick_ms": tick_ms,
164                "claim_key": claim_key,
165            }),
166            CORE_MODULE_MCP_TIMEOUT,
167        )?;
168        Self::parse_scheduling_runtime_injection_response(response)
169    }
170
171    fn next_scheduling_dispatch_sequence(&mut self) -> u64 {
172        Self::next_sequence(&mut self.scheduling_dispatch_sequence)
173    }
174    pub fn evaluate_schedule_tick(
175        &self,
176        schedules: &[ScheduleDefinition],
177        tick_ms: u64,
178    ) -> Result<ScheduleEvaluation, ScheduleValidationError> {
179        evaluate_schedules_at_tick(schedules, tick_ms)
180    }
181
182    pub fn dispatch_schedule_tick(
183        &mut self,
184        schedules: &[ScheduleDefinition],
185        tick_ms: u64,
186    ) -> Result<ScheduleDispatchReport, ScheduleValidationError> {
187        validate_schedule_tick_ms_supported(tick_ms)?;
188        validate_schedules(schedules)?;
189        self.prune_schedule_claims(tick_ms);
190        self.prune_scheduling_last_due_ticks(tick_ms);
191        let mut due_triggers = Vec::new();
192        for schedule in schedules.iter().filter(|s| s.enabled) {
193            let canonical_schedule_id = canonical_schedule_id(&schedule.schedule_id);
194            let interval = parse_schedule_interval(&schedule.interval).ok_or_else(|| {
195                ScheduleValidationError::InvalidInterval {
196                    schedule_id: canonical_schedule_id.clone(),
197                    interval: schedule.interval.clone(),
198                }
199            })?;
200            let timezone = parse_schedule_timezone(&schedule.timezone).ok_or_else(|| {
201                ScheduleValidationError::InvalidTimezone {
202                    schedule_id: canonical_schedule_id.clone(),
203                    timezone: schedule.timezone.clone(),
204                }
205            })?;
206            let Some(due_tick_ms) = latest_due_tick_at_or_before(
207                &canonical_schedule_id,
208                &interval,
209                &timezone,
210                schedule.jitter_ms,
211                tick_ms,
212            ) else {
213                continue;
214            };
215            let last_due_tick = self
216                .scheduling_last_due_ticks
217                .get(&canonical_schedule_id)
218                .copied();
219            if schedule.catch_up {
220                if last_due_tick.is_some_and(|last| last >= due_tick_ms) {
221                    continue;
222                }
223            } else if last_due_tick
224                .is_some_and(|last| last >= due_tick_ms && due_tick_ms != tick_ms)
225            {
226                continue;
227            }
228            due_triggers.push((schedule, canonical_schedule_id, due_tick_ms));
229        }
230        due_triggers.sort_by(
231            |(left_schedule, left_schedule_id, left_due_tick),
232             (right_schedule, right_schedule_id, right_due_tick)| {
233                left_due_tick
234                    .cmp(right_due_tick)
235                    .then_with(|| left_schedule_id.cmp(right_schedule_id))
236                    .then_with(|| left_schedule.interval.cmp(&right_schedule.interval))
237                    .then_with(|| left_schedule.timezone.cmp(&right_schedule.timezone))
238            },
239        );
240        let mut dispatched = Vec::new();
241        let mut skipped_claims = Vec::new();
242        let scheduling_signal = self.scheduling_supervisor_signal();
243        let mut supervisor_restart_emitted = false;
244
245        for (trigger, canonical_schedule_id, due_tick_ms) in &due_triggers {
246            let claim_key = format!("{canonical_schedule_id}:{due_tick_ms}");
247            if !self.record_schedule_claim(claim_key.clone(), tick_ms) {
248                skipped_claims.push(claim_key);
249                continue;
250            }
251            self.scheduling_last_due_ticks
252                .insert(canonical_schedule_id.clone(), *due_tick_ms);
253            self.prune_scheduling_last_due_ticks(tick_ms);
254
255            let event_sequence = self.next_scheduling_dispatch_sequence();
256            let event_id =
257                format!("evt-schedule-{canonical_schedule_id}-{due_tick_ms}-{event_sequence}");
258            insert_event_sorted(
259                &mut self.merged_events,
260                EventEnvelope {
261                    event_id: event_id.clone(),
262                    source: "module".to_string(),
263                    timestamp_ms: tick_ms,
264                    event: UnifiedEvent::Module(ModuleEvent {
265                        module: "scheduling".to_string(),
266                        event_type: "dispatch".to_string(),
267                        payload: serde_json::json!({
268                            "schedule_id": canonical_schedule_id,
269                            "interval": trigger.interval,
270                            "timezone": trigger.timezone,
271                            "tick_ms": tick_ms,
272                            "due_tick_ms": due_tick_ms,
273                            "claim_key": claim_key,
274                            "supervisor_signal": scheduling_signal,
275                        }),
276                    }),
277                },
278            );
279
280            if let Some(signal) = &scheduling_signal
281                && signal.restart_observed
282                && !supervisor_restart_emitted
283            {
284                insert_event_sorted(
285                    &mut self.merged_events,
286                    EventEnvelope {
287                        event_id: format!("evt-scheduling-supervisor-{tick_ms}-{event_sequence}"),
288                        source: "module".to_string(),
289                        timestamp_ms: tick_ms,
290                        event: UnifiedEvent::Module(ModuleEvent {
291                            module: "scheduling".to_string(),
292                            event_type: "supervisor.restart".to_string(),
293                            payload: serde_json::json!({
294                                "module_id": signal.module_id,
295                                "latest_state": signal.latest_state,
296                                "latest_attempt": signal.latest_attempt,
297                                "restart_observed": signal.restart_observed,
298                            }),
299                        }),
300                    },
301                );
302                supervisor_restart_emitted = true;
303            }
304
305            let mut runtime_injection = None;
306            let mut runtime_injection_error = None;
307            match self.scheduling_runtime_injection_for_dispatch(
308                canonical_schedule_id,
309                &trigger.interval,
310                &trigger.timezone,
311                *due_tick_ms,
312                tick_ms,
313                &claim_key,
314            ) {
315                Ok(Some((member_id, message))) => {
316                    let injection_event_id =
317                        format!("evt-runtime-injection-{tick_ms}-{event_sequence}");
318                    insert_event_sorted(
319                        &mut self.merged_events,
320                        EventEnvelope {
321                            event_id: injection_event_id.clone(),
322                            source: "module".to_string(),
323                            timestamp_ms: tick_ms,
324                            event: UnifiedEvent::Module(ModuleEvent {
325                                module: "runtime".to_string(),
326                                event_type: "injection.dispatch".to_string(),
327                                payload: serde_json::json!({
328                                    "schedule_id": canonical_schedule_id,
329                                    "claim_key": claim_key,
330                                    "member_id": member_id,
331                                    "message": message,
332                                }),
333                            }),
334                        },
335                    );
336                    runtime_injection = Some(ScheduleRuntimeInjection {
337                        member_id,
338                        message,
339                        injection_event_id,
340                    });
341                }
342                Ok(None) => {}
343                Err(error) => {
344                    runtime_injection_error = Some(format!("{error:?}"));
345                    insert_event_sorted(
346                        &mut self.merged_events,
347                        EventEnvelope {
348                            event_id: format!(
349                                "evt-runtime-injection-failed-{tick_ms}-{event_sequence}"
350                            ),
351                            source: "module".to_string(),
352                            timestamp_ms: tick_ms,
353                            event: UnifiedEvent::Module(ModuleEvent {
354                                module: "runtime".to_string(),
355                                event_type: "runtime.injection.failed".to_string(),
356                                payload: serde_json::json!({
357                                    "schedule_id": canonical_schedule_id,
358                                    "claim_key": claim_key,
359                                    "error": format!("{error:?}"),
360                                }),
361                            }),
362                        },
363                    );
364                }
365            }
366
367            dispatched.push(ScheduleDispatch {
368                claim_key,
369                schedule_id: canonical_schedule_id.clone(),
370                interval: trigger.interval.clone(),
371                timezone: trigger.timezone.clone(),
372                due_tick_ms: *due_tick_ms,
373                tick_ms,
374                event_id,
375                supervisor_signal: scheduling_signal.clone(),
376                runtime_injection,
377                runtime_injection_error,
378            });
379        }
380
381        Ok(ScheduleDispatchReport {
382            tick_ms,
383            due_count: due_triggers.len(),
384            dispatched,
385            skipped_claims,
386        })
387    }
388    fn record_schedule_claim(&mut self, claim_key: String, tick_ms: u64) -> bool {
389        if !self.scheduling_claims.insert(claim_key.clone()) {
390            return false;
391        }
392        self.scheduling_claim_ticks
393            .entry(tick_ms)
394            .or_default()
395            .push(claim_key);
396        true
397    }
398
399    fn prune_schedule_claims(&mut self, current_tick_ms: u64) {
400        let cutoff_tick = current_tick_ms.saturating_sub(SCHEDULING_CLAIM_RETENTION_WINDOW_MS);
401        let expired_ticks = self
402            .scheduling_claim_ticks
403            .keys()
404            .copied()
405            .take_while(|tick| *tick < cutoff_tick)
406            .collect::<Vec<_>>();
407        for tick in expired_ticks {
408            if let Some(keys) = self.scheduling_claim_ticks.remove(&tick) {
409                for key in keys {
410                    self.scheduling_claims.remove(&key);
411                }
412            }
413        }
414
415        while self.scheduling_claims.len() > SCHEDULING_CLAIMS_MAX_RETAINED {
416            let Some(oldest_tick) = self.scheduling_claim_ticks.keys().next().copied() else {
417                break;
418            };
419            if let Some(keys) = self.scheduling_claim_ticks.remove(&oldest_tick) {
420                for key in keys {
421                    self.scheduling_claims.remove(&key);
422                }
423            } else {
424                break;
425            }
426        }
427    }
428
429    fn prune_scheduling_last_due_ticks(&mut self, current_tick_ms: u64) {
430        let cutoff_tick = current_tick_ms.saturating_sub(SCHEDULING_CLAIM_RETENTION_WINDOW_MS);
431        self.scheduling_last_due_ticks
432            .retain(|_, due_tick| *due_tick >= cutoff_tick);
433
434        while self.scheduling_last_due_ticks.len() > SCHEDULING_LAST_DUE_MAX_RETAINED {
435            let Some(oldest_schedule_id) = self
436                .scheduling_last_due_ticks
437                .iter()
438                .min_by(|(left_id, left_due), (right_id, right_due)| {
439                    left_due.cmp(right_due).then_with(|| left_id.cmp(right_id))
440                })
441                .map(|(schedule_id, _)| schedule_id.clone())
442            else {
443                break;
444            };
445            self.scheduling_last_due_ticks.remove(&oldest_schedule_id);
446        }
447    }
448    fn scheduling_supervisor_signal(&self) -> Option<SchedulingSupervisorSignal> {
449        let module_transitions = self
450            .supervisor_report
451            .transitions
452            .iter()
453            .filter(|transition| transition.module_id == "scheduling")
454            .collect::<Vec<_>>();
455        let latest = module_transitions.last()?;
456        let restart_observed = module_transitions
457            .iter()
458            .any(|transition| transition.to == ModuleHealthState::Restarting);
459        Some(SchedulingSupervisorSignal {
460            module_id: latest.module_id.clone(),
461            latest_state: latest.to.clone(),
462            latest_attempt: latest.attempt,
463            restart_observed,
464        })
465    }
466}
467
468#[derive(Debug, Clone, PartialEq, Eq)]
469enum ParsedInterval {
470    Marker { interval_ms: u64 },
471    Cron(CronExpression),
472}
473
474impl ParsedInterval {
475    fn jitter_base_interval_ms(&self) -> u64 {
476        match self {
477            Self::Marker { interval_ms } => *interval_ms,
478            // Five-field cron expressions are minute-based.
479            Self::Cron(_) => 60_000,
480        }
481    }
482}
483
/// A schedule timezone after parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ParsedTimezone {
    /// Constant offset from UTC in milliseconds; positive values are east of UTC.
    FixedOffsetMs(i64),
    /// Named zone resolved through `chrono_tz`; its offset is looked up per tick.
    Iana(chrono_tz::Tz),
}
489
/// A parsed five-field cron expression, one allowed-value set per field.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CronExpression {
    // Parsed with range 0..=59.
    minute: CronFieldSet,
    // Parsed with range 0..=23.
    hour: CronFieldSet,
    // Parsed with range 1..=31.
    day_of_month: CronFieldSet,
    // Parsed with range 1..=12.
    month: CronFieldSet,
    // Parsed with range 0..=7, where 7 is folded onto 0.
    day_of_week: CronFieldSet,
}
498
/// The set of values a single cron field accepts.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CronFieldSet {
    // True when every semantically distinct value of the field is allowed (wildcard-equivalent).
    any: bool,
    // Lowest value of the field's range; `allowed` is indexed relative to it.
    min: u32,
    // `allowed[i]` tells whether the value `min + i` is accepted.
    allowed: Vec<bool>,
}
505
506impl CronExpression {
507    fn parse(expression: &str) -> Option<Self> {
508        let fields = expression.split_whitespace().collect::<Vec<_>>();
509        if fields.len() != 5 {
510            return None;
511        }
512        let parsed = Self {
513            minute: parse_cron_field(fields[0], 0, 59, false)?,
514            hour: parse_cron_field(fields[1], 0, 23, false)?,
515            day_of_month: parse_cron_field(fields[2], 1, 31, false)?,
516            month: parse_cron_field(fields[3], 1, 12, false)?,
517            day_of_week: parse_cron_field(fields[4], 0, 7, true)?,
518        };
519
520        // Keep standard DOM/DOW OR semantics. Only reject expressions that can never fire
521        // when day-of-week is wildcard and the selected day-of-month never exists in selected months.
522        if parsed.day_of_week.any
523            && !parsed.day_of_month.any
524            && !parsed.has_possible_day_of_month_for_selected_months()
525        {
526            return None;
527        }
528
529        Some(parsed)
530    }
531
532    fn matches(&self, local: &LocalDateTimeFields) -> bool {
533        if !self.minute.matches(local.minute)
534            || !self.hour.matches(local.hour)
535            || !self.month.matches(local.month)
536        {
537            return false;
538        }
539
540        let dom_match = self.day_of_month.matches(local.day_of_month);
541        let dow_match = self.day_of_week.matches(local.day_of_week);
542
543        if self.day_of_month.any && self.day_of_week.any {
544            true
545        } else if self.day_of_month.any {
546            dow_match
547        } else if self.day_of_week.any {
548            dom_match
549        } else {
550            dom_match || dow_match
551        }
552    }
553
554    fn has_possible_day_of_month_for_selected_months(&self) -> bool {
555        for month in 1..=12 {
556            if !self.month.matches(month) {
557                continue;
558            }
559            let max_day = max_day_for_month_with_feb_29(month);
560            for day in 1..=max_day {
561                if self.day_of_month.matches(day) {
562                    return true;
563                }
564            }
565        }
566        false
567    }
568}
569
570impl CronFieldSet {
571    fn matches(&self, value: u32) -> bool {
572        if value < self.min {
573            return false;
574        }
575        let idx = (value - self.min) as usize;
576        self.allowed.get(idx).copied().unwrap_or(false)
577    }
578}
579
/// Calendar and clock components of an instant, expressed in a schedule's local timezone.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LocalDateTimeFields {
    minute: u32,
    hour: u32,
    day_of_month: u32,
    month: u32,
    // Days since Sunday (Sunday = 0), matching the cron day-of-week numbering used here.
    day_of_week: u32,
    second: u32,
    subsec_nanos: u32,
}
590
591fn parse_cron_field(
592    field: &str,
593    min: u32,
594    max: u32,
595    map_sunday_seven_to_zero: bool,
596) -> Option<CronFieldSet> {
597    let mut allowed = vec![false; (max - min + 1) as usize];
598
599    for raw_token in field.split(',') {
600        let token = raw_token.trim();
601        if token.is_empty() {
602            return None;
603        }
604        let (base, step) = match token.split_once('/') {
605            Some((base, step)) => {
606                let step = step.parse::<u32>().ok()?;
607                if step == 0 {
608                    return None;
609                }
610                (base.trim(), step)
611            }
612            None => (token, 1),
613        };
614
615        if base == "*" {
616            let mut value = min;
617            while value <= max {
618                let mapped = normalize_cron_value(value, map_sunday_seven_to_zero);
619                let idx = (mapped - min) as usize;
620                allowed[idx] = true;
621                match value.checked_add(step) {
622                    Some(next) => value = next,
623                    None => break,
624                }
625            }
626            continue;
627        }
628
629        if let Some((start, end)) = base.split_once('-') {
630            let start = parse_cron_raw_value(start.trim(), min, max)?;
631            let end = parse_cron_raw_value(end.trim(), min, max)?;
632            if start > end {
633                return None;
634            }
635            let mut value = start;
636            while value <= end {
637                let mapped = normalize_cron_value(value, map_sunday_seven_to_zero);
638                let idx = (mapped - min) as usize;
639                allowed[idx] = true;
640                match value.checked_add(step) {
641                    Some(next) => value = next,
642                    None => break,
643                }
644            }
645            continue;
646        }
647
648        let value = parse_cron_value(base, min, max, map_sunday_seven_to_zero)?;
649        let idx = (value - min) as usize;
650        allowed[idx] = true;
651    }
652
653    if allowed.iter().all(|allowed| !allowed) {
654        return None;
655    }
656
657    let any = cron_field_is_semantic_wildcard(min, max, map_sunday_seven_to_zero, &allowed);
658    Some(CronFieldSet { any, min, allowed })
659}
660
661fn cron_field_is_semantic_wildcard(
662    min: u32,
663    max: u32,
664    map_sunday_seven_to_zero: bool,
665    allowed: &[bool],
666) -> bool {
667    let mut covered = vec![false; allowed.len()];
668    for raw in min..=max {
669        let mapped = normalize_cron_value(raw, map_sunday_seven_to_zero);
670        if mapped < min || mapped > max {
671            return false;
672        }
673        let mapped_idx = (mapped - min) as usize;
674        covered[mapped_idx] = true;
675    }
676
677    covered
678        .iter()
679        .enumerate()
680        .filter(|(_, is_semantic_value)| **is_semantic_value)
681        .all(|(idx, _)| allowed.get(idx).copied().unwrap_or(false))
682}
683
684fn parse_cron_value(raw: &str, min: u32, max: u32, map_sunday_seven_to_zero: bool) -> Option<u32> {
685    let value = normalize_cron_value(
686        parse_cron_raw_value(raw, min, max)?,
687        map_sunday_seven_to_zero,
688    );
689    if value < min || value > max {
690        return None;
691    }
692    Some(value)
693}
694
/// Parses `raw` as an unsigned integer and accepts it only when it lies in `min..=max`.
fn parse_cron_raw_value(raw: &str, min: u32, max: u32) -> Option<u32> {
    raw.parse::<u32>()
        .ok()
        .filter(|value| (min..=max).contains(value))
}
702
/// Folds the alternative Sunday value 7 onto 0 when `map_sunday_seven_to_zero` is set;
/// every other value is returned unchanged.
fn normalize_cron_value(value: u32, map_sunday_seven_to_zero: bool) -> u32 {
    match (map_sunday_seven_to_zero, value) {
        (true, 7) => 0,
        (_, other) => other,
    }
}
710
/// Returns the largest day number `month` can ever have, treating February as 29 days.
///
/// Any value that is not February or a 30-day month (including values outside 1..=12)
/// yields 31.
fn max_day_for_month_with_feb_29(month: u32) -> u32 {
    const THIRTY_DAY_MONTHS: [u32; 4] = [4, 6, 9, 11];
    if month == 2 {
        29
    } else if THIRTY_DAY_MONTHS.contains(&month) {
        30
    } else {
        31
    }
}
718
/// Parses an interval marker of the form `*/<count><unit>` into milliseconds.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive. `unit` is
/// one of `s`, `m`, `h`, `d`; `count` must be a positive integer.
///
/// Returns `None` when the prefix is missing, the unit is unknown, the count is missing,
/// zero or not a number, or the resulting duration overflows `u64`.
fn parse_interval_marker_ms(interval: &str) -> Option<u64> {
    let marker = interval.trim().to_ascii_lowercase();
    let marker = marker.strip_prefix("*/")?;

    // Take the unit as a whole character so a trailing multi-byte character is rejected
    // instead of being split in the middle of its encoding (which would panic).
    let unit = marker.chars().next_back()?;
    let unit_ms: u64 = match unit {
        's' => 1_000,
        'm' => 60_000,
        'h' => 3_600_000,
        'd' => 86_400_000,
        _ => return None,
    };
    let count_part = marker.strip_suffix(unit)?;

    let count = count_part.parse::<u64>().ok()?;
    if count == 0 {
        return None;
    }
    count.checked_mul(unit_ms)
}
739
740fn parse_schedule_interval(interval: &str) -> Option<ParsedInterval> {
741    parse_interval_marker_ms(interval)
742        .map(|interval_ms| ParsedInterval::Marker { interval_ms })
743        .or_else(|| CronExpression::parse(interval.trim()).map(ParsedInterval::Cron))
744}
745
/// Derives a stable jitter offset for a schedule from a hash of its id.
///
/// The result is always in `0..=min(jitter_ms, interval_ms - 1)`, and is `0` when
/// `jitter_ms` is zero or `interval_ms` is at most 1. The same inputs always produce the
/// same offset.
fn deterministic_jitter_offset_ms(schedule_id: &str, jitter_ms: u64, interval_ms: u64) -> u64 {
    // Xor-then-multiply byte fold using the 64-bit FNV prime.
    // NOTE(review): the seed appears to differ from the canonical FNV-1a offset basis;
    // changing it would shift every schedule's jitter, so it is kept as-is.
    const HASH_SEED: u64 = 1_469_598_103_934_665_603;
    const HASH_MULTIPLIER: u64 = 1_099_511_628_211;

    if jitter_ms == 0 || interval_ms <= 1 {
        return 0;
    }
    let digest = schedule_id.bytes().fold(HASH_SEED, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(HASH_MULTIPLIER)
    });
    // Jitter must stay strictly below the interval so consecutive firings never collide.
    let ceiling = jitter_ms.min(interval_ms - 1);
    digest % (ceiling + 1)
}
758
759fn parse_schedule_timezone(timezone: &str) -> Option<ParsedTimezone> {
760    let timezone = timezone.trim();
761    if timezone.is_empty() {
762        return None;
763    }
764    parse_timezone_offset_ms(timezone)
765        .map(ParsedTimezone::FixedOffsetMs)
766        .or_else(|| {
767            timezone
768                .parse::<chrono_tz::Tz>()
769                .ok()
770                .map(ParsedTimezone::Iana)
771        })
772}
773
774fn parse_timezone_offset_ms(timezone: &str) -> Option<i64> {
775    let tz = timezone.trim();
776    if tz.is_empty() {
777        return None;
778    }
779    if tz.eq_ignore_ascii_case("utc") || tz == "Z" {
780        return Some(0);
781    }
782    let offset = tz
783        .strip_prefix("UTC")
784        .or_else(|| tz.strip_prefix("utc"))
785        .or_else(|| tz.strip_prefix("GMT"))
786        .or_else(|| tz.strip_prefix("gmt"))
787        .unwrap_or(tz);
788    parse_hhmm_offset(offset)
789}
790
/// Parses a signed `±HH:MM` or `±HHMM` offset into milliseconds.
///
/// An empty string means zero offset. The sign is mandatory. With a colon the hour and
/// minute parts may have any number of digits; without one the body must be exactly four
/// digits. Hours above 23 or minutes above 59 are rejected.
///
/// Returns `None` for anything else, including bodies that contain non-digit characters
/// such as embedded signs or non-ASCII text.
fn parse_hhmm_offset(offset: &str) -> Option<i64> {
    if offset.is_empty() {
        return Some(0);
    }
    let (sign, body) = if let Some(rest) = offset.strip_prefix('+') {
        (1_i64, rest)
    } else if let Some(rest) = offset.strip_prefix('-') {
        (-1_i64, rest)
    } else {
        return None;
    };

    let (hours, minutes) = match body.split_once(':') {
        Some(parts) => parts,
        // The ASCII check guarantees byte index 2 is a character boundary.
        None if body.len() == 4 && body.is_ascii() => body.split_at(2),
        None => return None,
    };

    // Digits only: integer parsing alone would also accept a leading '+' or '-', letting
    // inputs like "+-5:00" through with a flipped sign and no upper-bound check.
    let component = |text: &str| -> Option<i64> {
        if text.is_empty() || !text.bytes().all(|byte| byte.is_ascii_digit()) {
            return None;
        }
        text.parse::<i64>().ok()
    };
    let hours = component(hours)?;
    let minutes = component(minutes)?;
    if hours > 23 || minutes > 59 {
        return None;
    }

    Some(sign * (hours * 60 + minutes) * 60_000)
}
818
819fn utc_datetime_from_tick_ms(tick_ms: u64) -> Option<chrono::DateTime<Utc>> {
820    let tick_ms = i64::try_from(tick_ms).ok()?;
821    chrono::DateTime::<Utc>::from_timestamp_millis(tick_ms)
822}
823
824fn local_fields_at_tick(timezone: &ParsedTimezone, tick_ms: u64) -> Option<LocalDateTimeFields> {
825    let utc = utc_datetime_from_tick_ms(tick_ms)?;
826    let (minute, hour, day_of_month, month, day_of_week, second, subsec_nanos) = match timezone {
827        ParsedTimezone::FixedOffsetMs(offset_ms) => {
828            let offset_seconds = i32::try_from(offset_ms / 1_000).ok()?;
829            let offset = chrono::FixedOffset::east_opt(offset_seconds)?;
830            let local = utc.with_timezone(&offset);
831            (
832                local.minute(),
833                local.hour(),
834                local.day(),
835                local.month(),
836                local.weekday().num_days_from_sunday(),
837                local.second(),
838                local.nanosecond(),
839            )
840        }
841        ParsedTimezone::Iana(timezone) => {
842            let local = utc.with_timezone(timezone);
843            (
844                local.minute(),
845                local.hour(),
846                local.day(),
847                local.month(),
848                local.weekday().num_days_from_sunday(),
849                local.second(),
850                local.nanosecond(),
851            )
852        }
853    };
854    Some(LocalDateTimeFields {
855        minute,
856        hour,
857        day_of_month,
858        month,
859        day_of_week,
860        second,
861        subsec_nanos,
862    })
863}
864
865fn timezone_offset_ms_at_tick(timezone: &ParsedTimezone, tick_ms: u64) -> Option<i64> {
866    match timezone {
867        ParsedTimezone::FixedOffsetMs(offset) => Some(*offset),
868        ParsedTimezone::Iana(tz) => {
869            let utc = utc_datetime_from_tick_ms(tick_ms)?;
870            let local = utc.with_timezone(tz);
871            Some(i64::from(local.offset().fix().local_minus_utc()).saturating_mul(1_000))
872        }
873    }
874}
875
876fn latest_due_marker_tick_at_or_before(
877    interval_ms: u64,
878    timezone: &ParsedTimezone,
879    tick_ms: u64,
880) -> Option<u64> {
881    match timezone {
882        ParsedTimezone::FixedOffsetMs(timezone_offset_ms) => {
883            latest_due_marker_tick_at_or_before_with_offset(
884                interval_ms,
885                *timezone_offset_ms,
886                tick_ms,
887            )
888        }
889        ParsedTimezone::Iana(_) => {
890            let mut timezone_offset_ms = timezone_offset_ms_at_tick(timezone, tick_ms)?;
891            for _ in 0..4 {
892                let due_tick = latest_due_marker_tick_at_or_before_with_offset(
893                    interval_ms,
894                    timezone_offset_ms,
895                    tick_ms,
896                )?;
897                let due_offset_ms = timezone_offset_ms_at_tick(timezone, due_tick)?;
898                if due_offset_ms == timezone_offset_ms {
899                    return Some(due_tick);
900                }
901                timezone_offset_ms = due_offset_ms;
902            }
903            latest_due_marker_tick_at_or_before_with_offset(
904                interval_ms,
905                timezone_offset_ms,
906                tick_ms,
907            )
908        }
909    }
910}
911
/// Finds the most recent interval marker at or before `tick_ms` for a fixed UTC offset.
///
/// The tick is shifted into local time by `timezone_offset_ms`, floored to a multiple of
/// `interval_ms`, and shifted back to UTC.
///
/// Returns `None` when `interval_ms` is zero, when the local tick falls outside the `u64`
/// range, or when the resulting UTC tick would be negative.
fn latest_due_marker_tick_at_or_before_with_offset(
    interval_ms: u64,
    timezone_offset_ms: i64,
    tick_ms: u64,
) -> Option<u64> {
    let offset_ms = i128::from(timezone_offset_ms);
    // i128 holds any `u64 + i64` sum exactly; `try_from` then rejects out-of-range local
    // ticks instead of silently truncating them.
    let local_tick = u64::try_from(i128::from(tick_ms) + offset_ms).ok()?;
    // `checked_rem` yields `None` for a zero interval rather than panicking.
    let elapsed_in_interval = local_tick.checked_rem(interval_ms)?;
    let latest_due_local_tick = local_tick - elapsed_in_interval;
    u64::try_from(i128::from(latest_due_local_tick) - offset_ms).ok()
}
929
/// Returns `schedule_id` with leading and trailing whitespace removed, as an owned string.
fn canonical_schedule_id(schedule_id: &str) -> String {
    String::from(schedule_id.trim())
}
933
934fn validate_schedule_tick_ms_supported(tick_ms: u64) -> Result<(), ScheduleValidationError> {
935    if tick_ms > i64::MAX as u64 {
936        return Err(ScheduleValidationError::InvalidTickMs(tick_ms));
937    }
938    Ok(())
939}
940
941fn latest_due_cron_tick_at_or_before(
942    cron: &CronExpression,
943    timezone: &ParsedTimezone,
944    tick_ms: u64,
945) -> Option<u64> {
946    let mut candidate = tick_ms - (tick_ms % 60_000);
947    for _ in 0..=CRON_LOOKBACK_MINUTES {
948        let fields = local_fields_at_tick(timezone, candidate)?;
949        if fields.second == 0 && fields.subsec_nanos == 0 && cron.matches(&fields) {
950            return Some(candidate);
951        }
952        candidate = candidate.checked_sub(60_000)?;
953    }
954    None
955}
956
957fn latest_due_tick_at_or_before(
958    schedule_id: &str,
959    interval: &ParsedInterval,
960    timezone: &ParsedTimezone,
961    jitter_ms: u64,
962    tick_ms: u64,
963) -> Option<u64> {
964    let jitter_offset_ms =
965        deterministic_jitter_offset_ms(schedule_id, jitter_ms, interval.jitter_base_interval_ms());
966    let tick_without_jitter = tick_ms.checked_sub(jitter_offset_ms)?;
967    let due_without_jitter = match interval {
968        ParsedInterval::Marker { interval_ms } => {
969            latest_due_marker_tick_at_or_before(*interval_ms, timezone, tick_without_jitter)?
970        }
971        ParsedInterval::Cron(cron) => {
972            latest_due_cron_tick_at_or_before(cron, timezone, tick_without_jitter)?
973        }
974    };
975    due_without_jitter.checked_add(jitter_offset_ms)
976}