1use super::module_boundary::{
4 CORE_MODULE_MCP_TIMEOUT, SCHEDULING_DISPATCH_MCP_TOOL, call_module_mcp_tool_json,
5 mcp_required_error, module_uses_mcp,
6};
7use super::*;
8
9pub fn evaluate_schedules_at_tick(
10 schedules: &[ScheduleDefinition],
11 tick_ms: u64,
12) -> Result<ScheduleEvaluation, ScheduleValidationError> {
13 validate_schedule_tick_ms_supported(tick_ms)?;
14 validate_schedules(schedules)?;
15 let mut due_triggers = Vec::new();
16 for schedule in schedules.iter().filter(|s| s.enabled) {
17 let canonical_schedule_id = canonical_schedule_id(&schedule.schedule_id);
18 let interval = parse_schedule_interval(&schedule.interval).ok_or_else(|| {
19 ScheduleValidationError::InvalidInterval {
20 schedule_id: canonical_schedule_id.clone(),
21 interval: schedule.interval.clone(),
22 }
23 })?;
24 let timezone = parse_schedule_timezone(&schedule.timezone).ok_or_else(|| {
25 ScheduleValidationError::InvalidTimezone {
26 schedule_id: canonical_schedule_id.clone(),
27 timezone: schedule.timezone.clone(),
28 }
29 })?;
30 let Some(due_tick_ms) = latest_due_tick_at_or_before(
31 &canonical_schedule_id,
32 &interval,
33 &timezone,
34 schedule.jitter_ms,
35 tick_ms,
36 ) else {
37 continue;
38 };
39 if due_tick_ms != tick_ms {
40 continue;
41 }
42 due_triggers.push(ScheduleTrigger {
43 schedule_id: canonical_schedule_id,
44 interval: schedule.interval.clone(),
45 timezone: schedule.timezone.clone(),
46 due_tick_ms,
47 });
48 }
49
50 due_triggers.sort_by(|left, right| {
51 left.due_tick_ms
52 .cmp(&right.due_tick_ms)
53 .then_with(|| left.schedule_id.cmp(&right.schedule_id))
54 .then_with(|| left.interval.cmp(&right.interval))
55 .then_with(|| left.timezone.cmp(&right.timezone))
56 });
57
58 Ok(ScheduleEvaluation {
59 tick_ms,
60 due_triggers,
61 })
62}
63
64pub(crate) fn validate_schedules(
65 schedules: &[ScheduleDefinition],
66) -> Result<(), ScheduleValidationError> {
67 let mut seen = BTreeSet::new();
68 for schedule in schedules {
69 let canonical_schedule_id = canonical_schedule_id(&schedule.schedule_id);
70 if canonical_schedule_id.is_empty() {
71 return Err(ScheduleValidationError::EmptyScheduleId);
72 }
73 if !seen.insert(canonical_schedule_id.clone()) {
74 return Err(ScheduleValidationError::DuplicateScheduleId(
75 canonical_schedule_id,
76 ));
77 }
78 if parse_schedule_interval(&schedule.interval).is_none() {
79 return Err(ScheduleValidationError::InvalidInterval {
80 schedule_id: canonical_schedule_id,
81 interval: schedule.interval.clone(),
82 });
83 }
84 if parse_schedule_timezone(&schedule.timezone).is_none() {
85 return Err(ScheduleValidationError::InvalidTimezone {
86 schedule_id: canonical_schedule_id,
87 timezone: schedule.timezone.clone(),
88 });
89 }
90 }
91 Ok(())
92}
93
94impl MobkitRuntimeHandle {
95 fn parse_scheduling_runtime_injection_response(
96 response: Value,
97 ) -> Result<Option<(String, String)>, RuntimeBoundaryError> {
98 let Some(injection) = response
99 .as_object()
100 .and_then(|payload| payload.get("runtime_injection"))
101 .and_then(Value::as_object)
102 .cloned()
103 else {
104 return Ok(None);
105 };
106 let member_id = injection
107 .get("member_id")
108 .and_then(Value::as_str)
109 .map(str::trim)
110 .filter(|value| !value.is_empty())
111 .ok_or_else(|| {
112 RuntimeBoundaryError::Mcp(McpBoundaryError::InvalidToolPayload {
113 module_id: "scheduling".to_string(),
114 tool: SCHEDULING_DISPATCH_MCP_TOOL.to_string(),
115 reason: "runtime_injection.member_id must be a non-empty string".to_string(),
116 })
117 })?;
118 let message = injection
119 .get("message")
120 .and_then(Value::as_str)
121 .map(str::trim)
122 .filter(|value| !value.is_empty())
123 .ok_or_else(|| {
124 RuntimeBoundaryError::Mcp(McpBoundaryError::InvalidToolPayload {
125 module_id: "scheduling".to_string(),
126 tool: SCHEDULING_DISPATCH_MCP_TOOL.to_string(),
127 reason: "runtime_injection.message must be a non-empty string".to_string(),
128 })
129 })?;
130 Ok(Some((member_id.to_string(), message.to_string())))
131 }
132
133 fn scheduling_runtime_injection_for_dispatch(
134 &self,
135 schedule_id: &str,
136 interval: &str,
137 timezone: &str,
138 due_tick_ms: u64,
139 tick_ms: u64,
140 claim_key: &str,
141 ) -> Result<Option<(String, String)>, RuntimeBoundaryError> {
142 let Some((scheduling_module, pre_spawn)) = self.module_and_prespawn("scheduling") else {
143 return Ok(None);
144 };
145 if !self.is_module_loaded("scheduling") {
146 return Ok(None);
147 }
148 if !module_uses_mcp(scheduling_module, pre_spawn) {
149 return Err(mcp_required_error(
150 "scheduling",
151 SCHEDULING_DISPATCH_MCP_TOOL,
152 ));
153 }
154 let response = call_module_mcp_tool_json(
155 scheduling_module,
156 pre_spawn,
157 SCHEDULING_DISPATCH_MCP_TOOL,
158 &serde_json::json!({
159 "schedule_id": schedule_id,
160 "interval": interval,
161 "timezone": timezone,
162 "due_tick_ms": due_tick_ms,
163 "tick_ms": tick_ms,
164 "claim_key": claim_key,
165 }),
166 CORE_MODULE_MCP_TIMEOUT,
167 )?;
168 Self::parse_scheduling_runtime_injection_response(response)
169 }
170
171 fn next_scheduling_dispatch_sequence(&mut self) -> u64 {
172 Self::next_sequence(&mut self.scheduling_dispatch_sequence)
173 }
174 pub fn evaluate_schedule_tick(
175 &self,
176 schedules: &[ScheduleDefinition],
177 tick_ms: u64,
178 ) -> Result<ScheduleEvaluation, ScheduleValidationError> {
179 evaluate_schedules_at_tick(schedules, tick_ms)
180 }
181
182 pub fn dispatch_schedule_tick(
183 &mut self,
184 schedules: &[ScheduleDefinition],
185 tick_ms: u64,
186 ) -> Result<ScheduleDispatchReport, ScheduleValidationError> {
187 validate_schedule_tick_ms_supported(tick_ms)?;
188 validate_schedules(schedules)?;
189 self.prune_schedule_claims(tick_ms);
190 self.prune_scheduling_last_due_ticks(tick_ms);
191 let mut due_triggers = Vec::new();
192 for schedule in schedules.iter().filter(|s| s.enabled) {
193 let canonical_schedule_id = canonical_schedule_id(&schedule.schedule_id);
194 let interval = parse_schedule_interval(&schedule.interval).ok_or_else(|| {
195 ScheduleValidationError::InvalidInterval {
196 schedule_id: canonical_schedule_id.clone(),
197 interval: schedule.interval.clone(),
198 }
199 })?;
200 let timezone = parse_schedule_timezone(&schedule.timezone).ok_or_else(|| {
201 ScheduleValidationError::InvalidTimezone {
202 schedule_id: canonical_schedule_id.clone(),
203 timezone: schedule.timezone.clone(),
204 }
205 })?;
206 let Some(due_tick_ms) = latest_due_tick_at_or_before(
207 &canonical_schedule_id,
208 &interval,
209 &timezone,
210 schedule.jitter_ms,
211 tick_ms,
212 ) else {
213 continue;
214 };
215 let last_due_tick = self
216 .scheduling_last_due_ticks
217 .get(&canonical_schedule_id)
218 .copied();
219 if schedule.catch_up {
220 if last_due_tick.is_some_and(|last| last >= due_tick_ms) {
221 continue;
222 }
223 } else if last_due_tick
224 .is_some_and(|last| last >= due_tick_ms && due_tick_ms != tick_ms)
225 {
226 continue;
227 }
228 due_triggers.push((schedule, canonical_schedule_id, due_tick_ms));
229 }
230 due_triggers.sort_by(
231 |(left_schedule, left_schedule_id, left_due_tick),
232 (right_schedule, right_schedule_id, right_due_tick)| {
233 left_due_tick
234 .cmp(right_due_tick)
235 .then_with(|| left_schedule_id.cmp(right_schedule_id))
236 .then_with(|| left_schedule.interval.cmp(&right_schedule.interval))
237 .then_with(|| left_schedule.timezone.cmp(&right_schedule.timezone))
238 },
239 );
240 let mut dispatched = Vec::new();
241 let mut skipped_claims = Vec::new();
242 let scheduling_signal = self.scheduling_supervisor_signal();
243 let mut supervisor_restart_emitted = false;
244
245 for (trigger, canonical_schedule_id, due_tick_ms) in &due_triggers {
246 let claim_key = format!("{canonical_schedule_id}:{due_tick_ms}");
247 if !self.record_schedule_claim(claim_key.clone(), tick_ms) {
248 skipped_claims.push(claim_key);
249 continue;
250 }
251 self.scheduling_last_due_ticks
252 .insert(canonical_schedule_id.clone(), *due_tick_ms);
253 self.prune_scheduling_last_due_ticks(tick_ms);
254
255 let event_sequence = self.next_scheduling_dispatch_sequence();
256 let event_id =
257 format!("evt-schedule-{canonical_schedule_id}-{due_tick_ms}-{event_sequence}");
258 insert_event_sorted(
259 &mut self.merged_events,
260 EventEnvelope {
261 event_id: event_id.clone(),
262 source: "module".to_string(),
263 timestamp_ms: tick_ms,
264 event: UnifiedEvent::Module(ModuleEvent {
265 module: "scheduling".to_string(),
266 event_type: "dispatch".to_string(),
267 payload: serde_json::json!({
268 "schedule_id": canonical_schedule_id,
269 "interval": trigger.interval,
270 "timezone": trigger.timezone,
271 "tick_ms": tick_ms,
272 "due_tick_ms": due_tick_ms,
273 "claim_key": claim_key,
274 "supervisor_signal": scheduling_signal,
275 }),
276 }),
277 },
278 );
279
280 if let Some(signal) = &scheduling_signal
281 && signal.restart_observed
282 && !supervisor_restart_emitted
283 {
284 insert_event_sorted(
285 &mut self.merged_events,
286 EventEnvelope {
287 event_id: format!("evt-scheduling-supervisor-{tick_ms}-{event_sequence}"),
288 source: "module".to_string(),
289 timestamp_ms: tick_ms,
290 event: UnifiedEvent::Module(ModuleEvent {
291 module: "scheduling".to_string(),
292 event_type: "supervisor.restart".to_string(),
293 payload: serde_json::json!({
294 "module_id": signal.module_id,
295 "latest_state": signal.latest_state,
296 "latest_attempt": signal.latest_attempt,
297 "restart_observed": signal.restart_observed,
298 }),
299 }),
300 },
301 );
302 supervisor_restart_emitted = true;
303 }
304
305 let mut runtime_injection = None;
306 let mut runtime_injection_error = None;
307 match self.scheduling_runtime_injection_for_dispatch(
308 canonical_schedule_id,
309 &trigger.interval,
310 &trigger.timezone,
311 *due_tick_ms,
312 tick_ms,
313 &claim_key,
314 ) {
315 Ok(Some((member_id, message))) => {
316 let injection_event_id =
317 format!("evt-runtime-injection-{tick_ms}-{event_sequence}");
318 insert_event_sorted(
319 &mut self.merged_events,
320 EventEnvelope {
321 event_id: injection_event_id.clone(),
322 source: "module".to_string(),
323 timestamp_ms: tick_ms,
324 event: UnifiedEvent::Module(ModuleEvent {
325 module: "runtime".to_string(),
326 event_type: "injection.dispatch".to_string(),
327 payload: serde_json::json!({
328 "schedule_id": canonical_schedule_id,
329 "claim_key": claim_key,
330 "member_id": member_id,
331 "message": message,
332 }),
333 }),
334 },
335 );
336 runtime_injection = Some(ScheduleRuntimeInjection {
337 member_id,
338 message,
339 injection_event_id,
340 });
341 }
342 Ok(None) => {}
343 Err(error) => {
344 runtime_injection_error = Some(format!("{error:?}"));
345 insert_event_sorted(
346 &mut self.merged_events,
347 EventEnvelope {
348 event_id: format!(
349 "evt-runtime-injection-failed-{tick_ms}-{event_sequence}"
350 ),
351 source: "module".to_string(),
352 timestamp_ms: tick_ms,
353 event: UnifiedEvent::Module(ModuleEvent {
354 module: "runtime".to_string(),
355 event_type: "runtime.injection.failed".to_string(),
356 payload: serde_json::json!({
357 "schedule_id": canonical_schedule_id,
358 "claim_key": claim_key,
359 "error": format!("{error:?}"),
360 }),
361 }),
362 },
363 );
364 }
365 }
366
367 dispatched.push(ScheduleDispatch {
368 claim_key,
369 schedule_id: canonical_schedule_id.clone(),
370 interval: trigger.interval.clone(),
371 timezone: trigger.timezone.clone(),
372 due_tick_ms: *due_tick_ms,
373 tick_ms,
374 event_id,
375 supervisor_signal: scheduling_signal.clone(),
376 runtime_injection,
377 runtime_injection_error,
378 });
379 }
380
381 Ok(ScheduleDispatchReport {
382 tick_ms,
383 due_count: due_triggers.len(),
384 dispatched,
385 skipped_claims,
386 })
387 }
388 fn record_schedule_claim(&mut self, claim_key: String, tick_ms: u64) -> bool {
389 if !self.scheduling_claims.insert(claim_key.clone()) {
390 return false;
391 }
392 self.scheduling_claim_ticks
393 .entry(tick_ms)
394 .or_default()
395 .push(claim_key);
396 true
397 }
398
399 fn prune_schedule_claims(&mut self, current_tick_ms: u64) {
400 let cutoff_tick = current_tick_ms.saturating_sub(SCHEDULING_CLAIM_RETENTION_WINDOW_MS);
401 let expired_ticks = self
402 .scheduling_claim_ticks
403 .keys()
404 .copied()
405 .take_while(|tick| *tick < cutoff_tick)
406 .collect::<Vec<_>>();
407 for tick in expired_ticks {
408 if let Some(keys) = self.scheduling_claim_ticks.remove(&tick) {
409 for key in keys {
410 self.scheduling_claims.remove(&key);
411 }
412 }
413 }
414
415 while self.scheduling_claims.len() > SCHEDULING_CLAIMS_MAX_RETAINED {
416 let Some(oldest_tick) = self.scheduling_claim_ticks.keys().next().copied() else {
417 break;
418 };
419 if let Some(keys) = self.scheduling_claim_ticks.remove(&oldest_tick) {
420 for key in keys {
421 self.scheduling_claims.remove(&key);
422 }
423 } else {
424 break;
425 }
426 }
427 }
428
429 fn prune_scheduling_last_due_ticks(&mut self, current_tick_ms: u64) {
430 let cutoff_tick = current_tick_ms.saturating_sub(SCHEDULING_CLAIM_RETENTION_WINDOW_MS);
431 self.scheduling_last_due_ticks
432 .retain(|_, due_tick| *due_tick >= cutoff_tick);
433
434 while self.scheduling_last_due_ticks.len() > SCHEDULING_LAST_DUE_MAX_RETAINED {
435 let Some(oldest_schedule_id) = self
436 .scheduling_last_due_ticks
437 .iter()
438 .min_by(|(left_id, left_due), (right_id, right_due)| {
439 left_due.cmp(right_due).then_with(|| left_id.cmp(right_id))
440 })
441 .map(|(schedule_id, _)| schedule_id.clone())
442 else {
443 break;
444 };
445 self.scheduling_last_due_ticks.remove(&oldest_schedule_id);
446 }
447 }
448 fn scheduling_supervisor_signal(&self) -> Option<SchedulingSupervisorSignal> {
449 let module_transitions = self
450 .supervisor_report
451 .transitions
452 .iter()
453 .filter(|transition| transition.module_id == "scheduling")
454 .collect::<Vec<_>>();
455 let latest = module_transitions.last()?;
456 let restart_observed = module_transitions
457 .iter()
458 .any(|transition| transition.to == ModuleHealthState::Restarting);
459 Some(SchedulingSupervisorSignal {
460 module_id: latest.module_id.clone(),
461 latest_state: latest.to.clone(),
462 latest_attempt: latest.attempt,
463 restart_observed,
464 })
465 }
466}
467
/// A schedule interval after parsing (see `parse_schedule_interval`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum ParsedInterval {
    /// Fixed-period marker of the form `*/<count><unit>`, stored in milliseconds.
    Marker { interval_ms: u64 },
    /// Five-field cron expression.
    Cron(CronExpression),
}
473
474impl ParsedInterval {
475 fn jitter_base_interval_ms(&self) -> u64 {
476 match self {
477 Self::Marker { interval_ms } => *interval_ms,
478 Self::Cron(_) => 60_000,
480 }
481 }
482}
483
/// A schedule timezone after parsing (see `parse_schedule_timezone`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum ParsedTimezone {
    /// Fixed offset from UTC in milliseconds; positive values are east of UTC.
    FixedOffsetMs(i64),
    /// Named timezone resolved through `chrono_tz`.
    Iana(chrono_tz::Tz),
}
489
/// A parsed five-field cron expression; one allowed-value set per field.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CronExpression {
    /// Minutes, parsed over 0-59.
    minute: CronFieldSet,
    /// Hours, parsed over 0-23.
    hour: CronFieldSet,
    /// Days of the month, parsed over 1-31.
    day_of_month: CronFieldSet,
    /// Months, parsed over 1-12.
    month: CronFieldSet,
    /// Days of the week, parsed over 0-7 with 7 folded onto 0.
    day_of_week: CronFieldSet,
}
498
/// The set of values one cron field accepts.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CronFieldSet {
    /// True when the field accepts every value of its range, i.e. it behaves
    /// like `*` (see `cron_field_is_semantic_wildcard`).
    any: bool,
    /// Smallest value of the field's range; `allowed` is indexed by `value - min`.
    min: u32,
    /// `allowed[value - min]` is true when `value` is accepted.
    allowed: Vec<bool>,
}
505
506impl CronExpression {
507 fn parse(expression: &str) -> Option<Self> {
508 let fields = expression.split_whitespace().collect::<Vec<_>>();
509 if fields.len() != 5 {
510 return None;
511 }
512 let parsed = Self {
513 minute: parse_cron_field(fields[0], 0, 59, false)?,
514 hour: parse_cron_field(fields[1], 0, 23, false)?,
515 day_of_month: parse_cron_field(fields[2], 1, 31, false)?,
516 month: parse_cron_field(fields[3], 1, 12, false)?,
517 day_of_week: parse_cron_field(fields[4], 0, 7, true)?,
518 };
519
520 if parsed.day_of_week.any
523 && !parsed.day_of_month.any
524 && !parsed.has_possible_day_of_month_for_selected_months()
525 {
526 return None;
527 }
528
529 Some(parsed)
530 }
531
532 fn matches(&self, local: &LocalDateTimeFields) -> bool {
533 if !self.minute.matches(local.minute)
534 || !self.hour.matches(local.hour)
535 || !self.month.matches(local.month)
536 {
537 return false;
538 }
539
540 let dom_match = self.day_of_month.matches(local.day_of_month);
541 let dow_match = self.day_of_week.matches(local.day_of_week);
542
543 if self.day_of_month.any && self.day_of_week.any {
544 true
545 } else if self.day_of_month.any {
546 dow_match
547 } else if self.day_of_week.any {
548 dom_match
549 } else {
550 dom_match || dow_match
551 }
552 }
553
554 fn has_possible_day_of_month_for_selected_months(&self) -> bool {
555 for month in 1..=12 {
556 if !self.month.matches(month) {
557 continue;
558 }
559 let max_day = max_day_for_month_with_feb_29(month);
560 for day in 1..=max_day {
561 if self.day_of_month.matches(day) {
562 return true;
563 }
564 }
565 }
566 false
567 }
568}
569
570impl CronFieldSet {
571 fn matches(&self, value: u32) -> bool {
572 if value < self.min {
573 return false;
574 }
575 let idx = (value - self.min) as usize;
576 self.allowed.get(idx).copied().unwrap_or(false)
577 }
578}
579
/// Calendar and clock fields of an instant in a schedule's local timezone, as
/// produced by `local_fields_at_tick`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LocalDateTimeFields {
    minute: u32,
    hour: u32,
    day_of_month: u32,
    month: u32,
    /// Days since Sunday (Sunday is 0).
    day_of_week: u32,
    second: u32,
    subsec_nanos: u32,
}
590
591fn parse_cron_field(
592 field: &str,
593 min: u32,
594 max: u32,
595 map_sunday_seven_to_zero: bool,
596) -> Option<CronFieldSet> {
597 let mut allowed = vec![false; (max - min + 1) as usize];
598
599 for raw_token in field.split(',') {
600 let token = raw_token.trim();
601 if token.is_empty() {
602 return None;
603 }
604 let (base, step) = match token.split_once('/') {
605 Some((base, step)) => {
606 let step = step.parse::<u32>().ok()?;
607 if step == 0 {
608 return None;
609 }
610 (base.trim(), step)
611 }
612 None => (token, 1),
613 };
614
615 if base == "*" {
616 let mut value = min;
617 while value <= max {
618 let mapped = normalize_cron_value(value, map_sunday_seven_to_zero);
619 let idx = (mapped - min) as usize;
620 allowed[idx] = true;
621 match value.checked_add(step) {
622 Some(next) => value = next,
623 None => break,
624 }
625 }
626 continue;
627 }
628
629 if let Some((start, end)) = base.split_once('-') {
630 let start = parse_cron_raw_value(start.trim(), min, max)?;
631 let end = parse_cron_raw_value(end.trim(), min, max)?;
632 if start > end {
633 return None;
634 }
635 let mut value = start;
636 while value <= end {
637 let mapped = normalize_cron_value(value, map_sunday_seven_to_zero);
638 let idx = (mapped - min) as usize;
639 allowed[idx] = true;
640 match value.checked_add(step) {
641 Some(next) => value = next,
642 None => break,
643 }
644 }
645 continue;
646 }
647
648 let value = parse_cron_value(base, min, max, map_sunday_seven_to_zero)?;
649 let idx = (value - min) as usize;
650 allowed[idx] = true;
651 }
652
653 if allowed.iter().all(|allowed| !allowed) {
654 return None;
655 }
656
657 let any = cron_field_is_semantic_wildcard(min, max, map_sunday_seven_to_zero, &allowed);
658 Some(CronFieldSet { any, min, allowed })
659}
660
661fn cron_field_is_semantic_wildcard(
662 min: u32,
663 max: u32,
664 map_sunday_seven_to_zero: bool,
665 allowed: &[bool],
666) -> bool {
667 let mut covered = vec![false; allowed.len()];
668 for raw in min..=max {
669 let mapped = normalize_cron_value(raw, map_sunday_seven_to_zero);
670 if mapped < min || mapped > max {
671 return false;
672 }
673 let mapped_idx = (mapped - min) as usize;
674 covered[mapped_idx] = true;
675 }
676
677 covered
678 .iter()
679 .enumerate()
680 .filter(|(_, is_semantic_value)| **is_semantic_value)
681 .all(|(idx, _)| allowed.get(idx).copied().unwrap_or(false))
682}
683
684fn parse_cron_value(raw: &str, min: u32, max: u32, map_sunday_seven_to_zero: bool) -> Option<u32> {
685 let value = normalize_cron_value(
686 parse_cron_raw_value(raw, min, max)?,
687 map_sunday_seven_to_zero,
688 );
689 if value < min || value > max {
690 return None;
691 }
692 Some(value)
693}
694
/// Parses `raw` as an unsigned integer and returns it only if it lies within
/// `min..=max`; no Sunday mapping is applied.
fn parse_cron_raw_value(raw: &str, min: u32, max: u32) -> Option<u32> {
    raw.parse::<u32>()
        .ok()
        .filter(|value| (min..=max).contains(value))
}
702
/// Folds the cron day-of-week value 7 onto 0 when the mapping is enabled;
/// every other value is returned unchanged.
fn normalize_cron_value(value: u32, map_sunday_seven_to_zero: bool) -> u32 {
    match (map_sunday_seven_to_zero, value) {
        (true, 7) => 0,
        _ => value,
    }
}
710
/// Returns the largest day number the given month can have, counting
/// February as 29 days. Month numbers outside 1-12 yield 31.
fn max_day_for_month_with_feb_29(month: u32) -> u32 {
    const MAX_DAYS: [u32; 12] = [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    month
        .checked_sub(1)
        .and_then(|index| MAX_DAYS.get(index as usize))
        .copied()
        .unwrap_or(31)
}
718
/// Parses an interval marker of the form `*/<count><unit>` into milliseconds.
///
/// The input is trimmed and matched case-insensitively. `<unit>` is one of
/// `s`, `m`, `h` or `d`, and `<count>` must be a positive integer.
///
/// Returns `None` for anything else, including a zero count, an unknown unit,
/// or a product that overflows `u64`. Non-ASCII input is rejected rather than
/// sliced, so malformed markers can never panic.
fn parse_interval_marker_ms(interval: &str) -> Option<u64> {
    let marker = interval.trim().to_ascii_lowercase();
    let marker = marker.strip_prefix("*/")?;

    // Take the unit as a whole character so the split below always lands on a
    // char boundary, even when the marker ends in a multi-byte character.
    let unit = marker.chars().next_back()?;
    let unit_ms: u64 = match unit {
        's' => 1_000,
        'm' => 60_000,
        'h' => 3_600_000,
        'd' => 86_400_000,
        _ => return None,
    };

    let count_part = &marker[..marker.len() - unit.len_utf8()];
    let count = count_part.parse::<u64>().ok()?;
    if count == 0 {
        return None;
    }
    count.checked_mul(unit_ms)
}
739
740fn parse_schedule_interval(interval: &str) -> Option<ParsedInterval> {
741 parse_interval_marker_ms(interval)
742 .map(|interval_ms| ParsedInterval::Marker { interval_ms })
743 .or_else(|| CronExpression::parse(interval.trim()).map(ParsedInterval::Cron))
744}
745
/// Derives a stable jitter offset for a schedule from a hash of its id.
///
/// Returns 0 when `jitter_ms` is 0 or `interval_ms` is at most 1. Otherwise
/// the result lies in `0..=min(jitter_ms, interval_ms - 1)` and depends only
/// on the arguments, so the same schedule always receives the same offset.
fn deterministic_jitter_offset_ms(schedule_id: &str, jitter_ms: u64, interval_ms: u64) -> u64 {
    // Seed and multiplier of the xor-then-multiply byte hash; changing either
    // would shift every schedule's jitter.
    const HASH_SEED: u64 = 1_469_598_103_934_665_603;
    const HASH_MULTIPLIER: u64 = 1_099_511_628_211;

    if jitter_ms == 0 || interval_ms <= 1 {
        return 0;
    }

    let hash = schedule_id.bytes().fold(HASH_SEED, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(HASH_MULTIPLIER)
    });
    // `interval_ms` is at least 2 here, so the subtraction cannot underflow.
    let max_jitter = jitter_ms.min(interval_ms - 1);
    hash % (max_jitter + 1)
}
758
759fn parse_schedule_timezone(timezone: &str) -> Option<ParsedTimezone> {
760 let timezone = timezone.trim();
761 if timezone.is_empty() {
762 return None;
763 }
764 parse_timezone_offset_ms(timezone)
765 .map(ParsedTimezone::FixedOffsetMs)
766 .or_else(|| {
767 timezone
768 .parse::<chrono_tz::Tz>()
769 .ok()
770 .map(ParsedTimezone::Iana)
771 })
772}
773
774fn parse_timezone_offset_ms(timezone: &str) -> Option<i64> {
775 let tz = timezone.trim();
776 if tz.is_empty() {
777 return None;
778 }
779 if tz.eq_ignore_ascii_case("utc") || tz == "Z" {
780 return Some(0);
781 }
782 let offset = tz
783 .strip_prefix("UTC")
784 .or_else(|| tz.strip_prefix("utc"))
785 .or_else(|| tz.strip_prefix("GMT"))
786 .or_else(|| tz.strip_prefix("gmt"))
787 .unwrap_or(tz);
788 parse_hhmm_offset(offset)
789}
790
/// Parses a signed `+HH:MM` / `-HH:MM` or `+HHMM` / `-HHMM` offset into
/// milliseconds (positive for `+`).
///
/// An empty string is treated as a zero offset. With a colon, the hour and
/// minute parts may have any number of digits; without one, exactly four
/// digits are required. Hours above 23 or minutes above 59 are rejected.
///
/// Returns `None` for a missing sign, any non-digit character in the hour or
/// minute part (including embedded signs such as `+-5:30`), or an
/// out-of-range value. Non-ASCII input is rejected without panicking.
fn parse_hhmm_offset(offset: &str) -> Option<i64> {
    /// Parses a non-empty run of ASCII digits; anything else is rejected so
    /// that signs accepted by `str::parse` cannot slip through.
    fn digits(part: &str) -> Option<i64> {
        if part.is_empty() || !part.bytes().all(|byte| byte.is_ascii_digit()) {
            return None;
        }
        part.parse::<i64>().ok()
    }

    if offset.is_empty() {
        return Some(0);
    }

    let (sign, body) = if let Some(body) = offset.strip_prefix('+') {
        (1_i64, body)
    } else if let Some(body) = offset.strip_prefix('-') {
        (-1_i64, body)
    } else {
        return None;
    };

    let (hours, minutes) = match body.split_once(':') {
        Some(parts) => parts,
        // Guard the byte-index split so multi-byte characters cannot panic.
        None if body.len() == 4 && body.is_char_boundary(2) => body.split_at(2),
        None => return None,
    };

    let hours = digits(hours)?;
    let minutes = digits(minutes)?;
    if hours > 23 || minutes > 59 {
        return None;
    }

    let total_minutes = hours * 60 + minutes;
    Some(sign * total_minutes * 60_000)
}
818
819fn utc_datetime_from_tick_ms(tick_ms: u64) -> Option<chrono::DateTime<Utc>> {
820 let tick_ms = i64::try_from(tick_ms).ok()?;
821 chrono::DateTime::<Utc>::from_timestamp_millis(tick_ms)
822}
823
824fn local_fields_at_tick(timezone: &ParsedTimezone, tick_ms: u64) -> Option<LocalDateTimeFields> {
825 let utc = utc_datetime_from_tick_ms(tick_ms)?;
826 let (minute, hour, day_of_month, month, day_of_week, second, subsec_nanos) = match timezone {
827 ParsedTimezone::FixedOffsetMs(offset_ms) => {
828 let offset_seconds = i32::try_from(offset_ms / 1_000).ok()?;
829 let offset = chrono::FixedOffset::east_opt(offset_seconds)?;
830 let local = utc.with_timezone(&offset);
831 (
832 local.minute(),
833 local.hour(),
834 local.day(),
835 local.month(),
836 local.weekday().num_days_from_sunday(),
837 local.second(),
838 local.nanosecond(),
839 )
840 }
841 ParsedTimezone::Iana(timezone) => {
842 let local = utc.with_timezone(timezone);
843 (
844 local.minute(),
845 local.hour(),
846 local.day(),
847 local.month(),
848 local.weekday().num_days_from_sunday(),
849 local.second(),
850 local.nanosecond(),
851 )
852 }
853 };
854 Some(LocalDateTimeFields {
855 minute,
856 hour,
857 day_of_month,
858 month,
859 day_of_week,
860 second,
861 subsec_nanos,
862 })
863}
864
865fn timezone_offset_ms_at_tick(timezone: &ParsedTimezone, tick_ms: u64) -> Option<i64> {
866 match timezone {
867 ParsedTimezone::FixedOffsetMs(offset) => Some(*offset),
868 ParsedTimezone::Iana(tz) => {
869 let utc = utc_datetime_from_tick_ms(tick_ms)?;
870 let local = utc.with_timezone(tz);
871 Some(i64::from(local.offset().fix().local_minus_utc()).saturating_mul(1_000))
872 }
873 }
874}
875
876fn latest_due_marker_tick_at_or_before(
877 interval_ms: u64,
878 timezone: &ParsedTimezone,
879 tick_ms: u64,
880) -> Option<u64> {
881 match timezone {
882 ParsedTimezone::FixedOffsetMs(timezone_offset_ms) => {
883 latest_due_marker_tick_at_or_before_with_offset(
884 interval_ms,
885 *timezone_offset_ms,
886 tick_ms,
887 )
888 }
889 ParsedTimezone::Iana(_) => {
890 let mut timezone_offset_ms = timezone_offset_ms_at_tick(timezone, tick_ms)?;
891 for _ in 0..4 {
892 let due_tick = latest_due_marker_tick_at_or_before_with_offset(
893 interval_ms,
894 timezone_offset_ms,
895 tick_ms,
896 )?;
897 let due_offset_ms = timezone_offset_ms_at_tick(timezone, due_tick)?;
898 if due_offset_ms == timezone_offset_ms {
899 return Some(due_tick);
900 }
901 timezone_offset_ms = due_offset_ms;
902 }
903 latest_due_marker_tick_at_or_before_with_offset(
904 interval_ms,
905 timezone_offset_ms,
906 tick_ms,
907 )
908 }
909 }
910}
911
/// Finds the latest tick at or before `tick_ms` whose local time (tick plus
/// `timezone_offset_ms`) is a whole multiple of `interval_ms`.
///
/// Returns `None` when `interval_ms` is zero, or when either the local tick
/// or the resulting due tick cannot be represented as a `u64` (for example a
/// negative local time near the epoch).
fn latest_due_marker_tick_at_or_before_with_offset(
    interval_ms: u64,
    timezone_offset_ms: i64,
    tick_ms: u64,
) -> Option<u64> {
    // A zero interval has no multiples; bail out instead of dividing by zero.
    if interval_ms == 0 {
        return None;
    }

    // Work in i128 so adding a signed offset to a u64 can never overflow, and
    // convert back with checked conversions rather than truncating casts.
    let offset = i128::from(timezone_offset_ms);
    let local_tick = u64::try_from(i128::from(tick_ms) + offset).ok()?;
    let latest_due_local_tick = local_tick - (local_tick % interval_ms);
    u64::try_from(i128::from(latest_due_local_tick) - offset).ok()
}
929
/// Returns the canonical form of a schedule id: the id with surrounding
/// whitespace removed.
fn canonical_schedule_id(schedule_id: &str) -> String {
    String::from(schedule_id.trim())
}
933
934fn validate_schedule_tick_ms_supported(tick_ms: u64) -> Result<(), ScheduleValidationError> {
935 if tick_ms > i64::MAX as u64 {
936 return Err(ScheduleValidationError::InvalidTickMs(tick_ms));
937 }
938 Ok(())
939}
940
941fn latest_due_cron_tick_at_or_before(
942 cron: &CronExpression,
943 timezone: &ParsedTimezone,
944 tick_ms: u64,
945) -> Option<u64> {
946 let mut candidate = tick_ms - (tick_ms % 60_000);
947 for _ in 0..=CRON_LOOKBACK_MINUTES {
948 let fields = local_fields_at_tick(timezone, candidate)?;
949 if fields.second == 0 && fields.subsec_nanos == 0 && cron.matches(&fields) {
950 return Some(candidate);
951 }
952 candidate = candidate.checked_sub(60_000)?;
953 }
954 None
955}
956
957fn latest_due_tick_at_or_before(
958 schedule_id: &str,
959 interval: &ParsedInterval,
960 timezone: &ParsedTimezone,
961 jitter_ms: u64,
962 tick_ms: u64,
963) -> Option<u64> {
964 let jitter_offset_ms =
965 deterministic_jitter_offset_ms(schedule_id, jitter_ms, interval.jitter_base_interval_ms());
966 let tick_without_jitter = tick_ms.checked_sub(jitter_offset_ms)?;
967 let due_without_jitter = match interval {
968 ParsedInterval::Marker { interval_ms } => {
969 latest_due_marker_tick_at_or_before(*interval_ms, timezone, tick_without_jitter)?
970 }
971 ParsedInterval::Cron(cron) => {
972 latest_due_cron_tick_at_or_before(cron, timezone, tick_without_jitter)?
973 }
974 };
975 due_without_jitter.checked_add(jitter_offset_ms)
976}