mako_engine/deadline.rs
1//! Deadline tracking for regulatory process timers.
2//!
3//! Every MaKo process is subject to hard regulatory deadlines defined in the
4//! BDEW Application Handbooks. Deadline semantics vary by process family:
5//!
6//! | Process family | Deadline unit | Helper |
7//! |---|---|---|
8//! | GPKE Lieferantenwechsel (BK6-22-024) | 24 wall-clock hours | [`fristen::add_hours`] |
9//! | WiM / GeLi Gas / MABIS | Werktage | [`fristen::add_werktage`] |
10//!
11//! Use the helpers in [`crate::fristen`] to compute the correct `due_at`
12//! timestamp before constructing a [`Deadline`].
13//!
14//! The `DeadlineStore` persists these timers per process stream. A background
15//! scheduler polls [`DeadlineStore::due_now`] and dispatches a
16//! `TimeoutDeadline` command to the owning process when a deadline lapses.
17//! The process workflow then handles the command — e.g. by escalating the
18//! case or switching to a failure path.
19//!
20//! # Usage
21//!
22//! ```rust,ignore
23//! use mako_engine::fristen;
24//!
25//! // GPKE 24h Lieferantenwechsel (BK6-22-024):
26//! let due = fristen::add_hours(OffsetDateTime::now_utc(), 24);
27//! // WiM 5-Werktage confirmation window:
28//! let due = fristen::add_werktage(OffsetDateTime::now_utc().date(), 5,
29//! fristen::HolidayCalendar::BdewMaKo).midnight().assume_utc();
30//!
31//! let deadline = Deadline::new(
32//! process.stream_id().clone(),
33//! process.process_id(),
34//! process.tenant_id(),
35//! process.workflow_id().clone(),
36//! "aperak-response-window",
37//! due,
38//! );
39//! deadline_store.register(&deadline).await?;
40//!
41//! // When the counterparty responds in time, cancel the deadline:
42//! deadline_store.cancel(deadline.deadline_id()).await?;
43//!
44//! // Background scheduler (runs every N minutes):
45//! let result = deadline_store.due_now(100).await?;
46//! for d in result.deadlines {
47//! process_handle.execute(TimeoutDeadline { deadline_id: d.deadline_id() }).await?;
48//! deadline_store.cancel(d.deadline_id()).await?;
49//! }
50//! ```
51//!
52//! [`fristen::add_hours`]: crate::fristen::add_hours
53//! [`fristen::add_werktage`]: crate::fristen::add_werktage
54
55use std::sync::Arc;
56
57#[cfg(any(test, feature = "testing"))]
58use std::collections::HashMap;
59#[cfg(any(test, feature = "testing"))]
60use tokio::sync::RwLock;
61
62use time::OffsetDateTime;
63
64use crate::{
65 error::EngineError,
66 ids::{DeadlineId, ProcessId, StreamId, TenantId},
67 version::WorkflowId,
68};
69
70// ── Deadline ──────────────────────────────────────────────────────────────────
71
72/// A registered regulatory deadline for a single process stream.
73///
74/// Create with [`Deadline::new`], persist via [`DeadlineStore::register`], and
75/// cancel via [`DeadlineStore::cancel`] when the process advances past the
76/// deadline before it fires.
77///
78/// The `label` field identifies the deadline type (e.g.
79/// `"aperak-response-window"`) and is used by the scheduler to dispatch the
80/// correct timeout command.
81#[allow(clippy::struct_field_names)]
82#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
83pub struct Deadline {
84 /// Unique identifier for this deadline entry.
85 deadline_id: DeadlineId,
86
87 /// The process stream this deadline belongs to.
88 stream_id: StreamId,
89
90 /// The process instance this deadline belongs to.
91 process_id: ProcessId,
92
93 /// The tenant that owns this process.
94 tenant_id: TenantId,
95
96 /// The workflow that owns this process (name + format version).
97 ///
98 /// Stored so the deadline scheduler can reconstruct a [`ProcessIdentity`]
99 /// and route the `TimeoutExpired` command to the correct workflow type
100 /// without a separate registry lookup.
101 ///
102 /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
103 workflow_id: WorkflowId,
104
105 /// Human-readable label identifying the deadline type.
106 label: Box<str>,
107
108 /// When this deadline expires.
109 due_at: OffsetDateTime,
110
111 /// When this deadline was registered.
112 created_at: OffsetDateTime,
113}
114
115impl Deadline {
116 /// Construct a new deadline.
117 ///
118 /// `deadline_id` and `created_at` are generated automatically.
119 ///
120 /// `workflow_id` must match the [`WorkflowId`] under which the owning
121 /// process was started (i.e. `process.workflow_id().clone()`). The
122 /// deadline scheduler uses it to reconstruct a [`ProcessIdentity`] and
123 /// route the `TimeoutExpired` command to the correct workflow type.
124 ///
125 /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
126 #[must_use]
127 pub fn new(
128 stream_id: StreamId,
129 process_id: ProcessId,
130 tenant_id: TenantId,
131 workflow_id: WorkflowId,
132 label: impl Into<Box<str>>,
133 due_at: OffsetDateTime,
134 ) -> Self {
135 Self {
136 deadline_id: DeadlineId::new(),
137 stream_id,
138 process_id,
139 tenant_id,
140 workflow_id,
141 label: label.into(),
142 due_at,
143 created_at: OffsetDateTime::now_utc(),
144 }
145 }
146
147 /// Return `true` when this deadline has passed relative to `now`.
148 ///
149 /// ```rust
150 /// use mako_engine::deadline::Deadline;
151 /// use mako_engine::ids::{ProcessId, StreamId, TenantId};
152 /// use mako_engine::version::WorkflowId;
153 /// use time::{Duration, OffsetDateTime};
154 ///
155 /// let past = Deadline::new(
156 /// StreamId::new("process/x"),
157 /// ProcessId::new(),
158 /// TenantId::new(),
159 /// WorkflowId::new("gpke-supplier-change", "FV2025-10-01"),
160 /// "aperak-response-window",
161 /// OffsetDateTime::now_utc() - Duration::seconds(1),
162 /// );
163 /// assert!(past.is_due(OffsetDateTime::now_utc()));
164 /// ```
165 #[must_use]
166 pub fn is_due(&self, now: OffsetDateTime) -> bool {
167 self.due_at <= now
168 }
169
170 /// The unique identifier of this deadline.
171 #[must_use]
172 pub fn deadline_id(&self) -> DeadlineId {
173 self.deadline_id
174 }
175
176 /// The stream this deadline belongs to.
177 #[must_use]
178 pub fn stream_id(&self) -> &StreamId {
179 &self.stream_id
180 }
181
182 /// The process instance this deadline belongs to.
183 #[must_use]
184 pub fn process_id(&self) -> ProcessId {
185 self.process_id
186 }
187
188 /// The tenant that owns this process.
189 #[must_use]
190 pub fn tenant_id(&self) -> TenantId {
191 self.tenant_id
192 }
193
194 /// The workflow that owns this process.
195 ///
196 /// Used by the deadline scheduler to reconstruct a [`ProcessIdentity`]
197 /// and route the `TimeoutExpired` command to the correct workflow type.
198 ///
199 /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
200 #[must_use]
201 pub fn workflow_id(&self) -> &WorkflowId {
202 &self.workflow_id
203 }
204
205 /// The human-readable label identifying the deadline type (e.g.
206 /// `"aperak-response-window"`).
207 #[must_use]
208 pub fn label(&self) -> &str {
209 &self.label
210 }
211
212 /// When this deadline expires.
213 #[must_use]
214 pub fn due_at(&self) -> OffsetDateTime {
215 self.due_at
216 }
217
218 /// When this deadline was registered.
219 #[must_use]
220 pub fn created_at(&self) -> OffsetDateTime {
221 self.created_at
222 }
223}
224
225// ── DueNowResult ──────────────────────────────────────────────────────────────
226
227/// Result of a [`DeadlineStore::due_now`] poll.
228///
229/// When `has_more` is `true`, the store has additional expired deadlines beyond
230/// the returned `deadlines`. The scheduler should drain in a loop until
231/// `has_more` is `false` to avoid leaving unfired deadlines in the store.
232///
233/// ```rust
234/// # tokio_test::block_on(async {
235/// # use mako_engine::deadline::{InMemoryDeadlineStore, DeadlineStore, Deadline};
236/// # use mako_engine::ids::{ProcessId, StreamId, TenantId};
237/// # use time::OffsetDateTime;
238/// let store = InMemoryDeadlineStore::new();
239/// loop {
240/// let result = store.due_now(50).await.unwrap();
241/// for deadline in result.deadlines {
242/// // dispatch TimeoutDeadline command …
243/// store.cancel(deadline.deadline_id()).await.unwrap();
244/// }
245/// if !result.has_more { break; }
246/// }
247/// # });
248/// ```
249#[derive(Debug, Clone)]
250pub struct DueNowResult {
251 /// Expired deadlines, ordered soonest-first.
252 pub deadlines: Vec<Deadline>,
253 /// `true` when the store contains more expired deadlines beyond `deadlines`.
254 pub has_more: bool,
255}
256
257// ── DeadlineStore ─────────────────────────────────────────────────────────────
258
259/// Storage contract for process deadlines.
260///
261/// ## Scheduler contract
262///
263/// A background timer task should poll this store periodically:
264///
265/// 1. Call [`DeadlineStore::due_now`] to retrieve expired deadlines.
266/// 2. Dispatch a `TimeoutDeadline` command to each owning process.
267/// 3. Call [`DeadlineStore::cancel`] to remove the fired deadline.
268///
269/// Cancelling a deadline before the scheduler fires it prevents a spurious
270/// `TimeoutDeadline` command from being dispatched to the process. Always
271/// cancel deadlines when the process advances past them naturally (e.g. when
272/// the expected counterparty response arrives in time).
273///
274/// ## Blanket `Arc` implementation
275///
276/// `Arc<S>` implements `DeadlineStore` whenever `S: DeadlineStore`, enabling
277/// shared access from both the scheduler and command handlers.
278#[allow(async_fn_in_trait)]
279pub trait DeadlineStore: Send + Sync {
280 /// Register a new deadline.
281 ///
282 /// Upserts by `deadline_id`: if a deadline with the same ID already
283 /// exists it is replaced.
284 ///
285 /// # Errors
286 ///
287 /// Returns [`EngineError::Deadline`] on storage failure.
288 async fn register(&self, deadline: &Deadline) -> Result<(), EngineError>;
289
290 /// Cancel a registered deadline by ID.
291 ///
292 /// No-op when the deadline does not exist.
293 ///
294 /// # Errors
295 ///
296 /// Returns [`EngineError::Deadline`] on storage failure.
297 async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError>;
298
299 /// Return up to `limit` deadlines whose `due_at <= now_utc()`, ordered
300 /// soonest-first.
301 ///
302 /// When the store contains more expired deadlines than `limit`, the
303 /// returned [`DueNowResult::has_more`] is `true`. Callers should drain
304 /// in a loop until `has_more` is `false`.
305 ///
306 /// # Errors
307 ///
308 /// Returns [`EngineError::Deadline`] on storage failure.
309 async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError>;
310
311 /// Return all active deadlines for `stream_id`, in registration order.
312 ///
313 /// # Errors
314 ///
315 /// Returns [`EngineError::Deadline`] on storage failure.
316 async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError>;
317
318 /// Total number of registered deadlines.
319 ///
320 /// # Errors
321 ///
322 /// Returns [`EngineError::Deadline`] on storage failure.
323 async fn len(&self) -> Result<usize, EngineError>;
324
325 /// Return `true` when no deadlines are registered.
326 ///
327 /// # Errors
328 ///
329 /// Returns [`EngineError::Deadline`] on storage failure.
330 async fn is_empty(&self) -> Result<bool, EngineError> {
331 Ok(self.len().await? == 0)
332 }
333
334 /// Count deadlines whose `due_at ≤ now` that have not yet been cancelled.
335 ///
336 /// Indicates scheduler lag: a non-zero value means `TimeoutExpired` commands
337 /// are not being dispatched in time, which is a compliance violation.
338 ///
339 /// The default implementation delegates to [`due_now`] with a limit of
340 /// 10 000; if there are more overdue deadlines, returns 10 000 (capped).
341 /// Implementations can override for a more efficient point-count query.
342 ///
343 /// # Errors
344 ///
345 /// Returns [`EngineError::Deadline`] on storage failure.
346 ///
347 /// [`due_now`]: DeadlineStore::due_now
348 async fn overdue_count(&self) -> Result<usize, EngineError> {
349 const LIMIT: usize = 10_000;
350 let result = self.due_now(LIMIT).await?;
351 Ok(if result.has_more {
352 LIMIT
353 } else {
354 result.deadlines.len()
355 })
356 }
357}
358
359// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
360
361impl<S: DeadlineStore> DeadlineStore for Arc<S> {
362 async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
363 self.as_ref().register(deadline).await
364 }
365
366 async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
367 self.as_ref().cancel(id).await
368 }
369
370 async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
371 self.as_ref().due_now(limit).await
372 }
373
374 async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
375 self.as_ref().for_stream(stream_id).await
376 }
377
378 async fn len(&self) -> Result<usize, EngineError> {
379 self.as_ref().len().await
380 }
381
382 async fn overdue_count(&self) -> Result<usize, EngineError> {
383 self.as_ref().overdue_count().await
384 }
385}
386
387// ── NoopDeadlineStore ─────────────────────────────────────────────────────────
388
/// A [`DeadlineStore`] that stores nothing.
///
/// `register` reports success without keeping the deadline, and `due_now`
/// always yields an empty list. It is meant as the default for setups that
/// do not need deadline tracking.
///
/// # ⚠️ Silent deadline loss
///
/// Every registration passed to `NoopDeadlineStore` is **dropped without any
/// error or warning**, so no scheduler timeout can ever fire. Missed
/// deadlines are a compliance violation under BNetzA monitoring. Do not use
/// in production.
///
/// The type itself exists in every build configuration so that it can be
/// used as a default type parameter of [`EngineBuilder`]. The constructor
/// that wires it in as the default, [`EngineBuilder::new`], is only compiled
/// with the `testing` feature or under `cfg(test)`; production binaries have
/// to go through [`EngineBuilder::with_stores`].
///
/// [`EngineBuilder`]: crate::builder::EngineBuilder
/// [`EngineBuilder::new`]: crate::builder::EngineBuilder::new
/// [`EngineBuilder::with_stores`]: crate::builder::EngineBuilder::with_stores
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopDeadlineStore discards all deadlines silently — use a persistent DeadlineStore in production"]
pub struct NoopDeadlineStore;
412
/// Store implementation that accepts every call and retains nothing.
///
/// Only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
impl DeadlineStore for NoopDeadlineStore {
    /// Accepts the deadline and drops it.
    async fn register(&self, _deadline: &Deadline) -> Result<(), EngineError> {
        Ok(())
    }

    /// Nothing is ever stored, so there is nothing to cancel.
    async fn cancel(&self, _id: DeadlineId) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always reports an empty page with no further results.
    async fn due_now(&self, _limit: usize) -> Result<DueNowResult, EngineError> {
        let nothing_due = DueNowResult {
            deadlines: vec![],
            has_more: false,
        };
        Ok(nothing_due)
    }

    /// Always reports no deadlines for any stream.
    async fn for_stream(&self, _stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
        Ok(vec![])
    }

    /// Always zero.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(0)
    }
}
438
439// ── InMemoryDeadlineStore ─────────────────────────────────────────────────────
440
/// A [`DeadlineStore`] that keeps all deadlines in process memory.
///
/// The deadlines live in a `HashMap` keyed by [`DeadlineId`], guarded by an
/// async `RwLock` and wrapped in an `Arc`. Cloning the store therefore
/// clones the handle only: every clone reads and writes the same deadlines.
///
/// **Not production-safe.** Intended for:
/// - unit and integration tests,
/// - examples and local development,
/// - exercising the scheduler loop without an external timer service.
///
/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemoryDeadlineStore {
    /// Shared map of all registered deadlines.
    inner: Arc<RwLock<HashMap<DeadlineId, Deadline>>>,
}
457
#[cfg(any(test, feature = "testing"))]
impl InMemoryDeadlineStore {
    /// Create a store that holds no deadlines yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Whether the store currently holds no deadlines.
    ///
    /// Infallible counterpart of [`DeadlineStore::is_empty`]; being an
    /// inherent method, it is the one selected by a plain
    /// `store.is_empty()` call on this type.
    pub async fn is_empty(&self) -> bool {
        let deadlines = self.inner.read().await;
        deadlines.is_empty()
    }
}
471
/// In-memory implementation of the [`DeadlineStore`] contract.
///
/// None of the methods can fail; they always return `Ok`.
#[cfg(any(test, feature = "testing"))]
impl DeadlineStore for InMemoryDeadlineStore {
    /// Insert `deadline`, replacing any entry with the same `deadline_id`.
    async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
        let mut deadlines = self.inner.write().await;
        deadlines.insert(deadline.deadline_id(), deadline.clone());
        Ok(())
    }

    /// Remove the deadline with the given ID; unknown IDs are ignored.
    async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
        self.inner.write().await.remove(&id);
        Ok(())
    }

    /// Return up to `limit` deadlines that are due at the current UTC time,
    /// soonest first, and flag whether more overdue deadlines remain.
    async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
        let now = OffsetDateTime::now_utc();
        let deadlines = self.inner.read().await;

        // Work on references so that only the deadlines actually returned
        // are cloned, not every overdue entry in the store.
        let mut due: Vec<&Deadline> = deadlines.values().filter(|d| d.is_due(now)).collect();
        // Soonest-first: the scheduler handles the most urgent deadlines first.
        due.sort_by_key(|d| d.due_at);

        // All overdue entries are known here, so truncation is detected by
        // comparing their number with the requested page size.
        let has_more = due.len() > limit;
        let page = due.into_iter().take(limit).cloned().collect();
        Ok(DueNowResult {
            deadlines: page,
            has_more,
        })
    }

    /// Return every deadline of `stream_id`, in registration order.
    async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
        let deadlines = self.inner.read().await;
        let mut matching: Vec<Deadline> = deadlines
            .values()
            .filter(|d| d.stream_id() == stream_id)
            .cloned()
            .collect();
        // `HashMap` iteration order is arbitrary; order by `created_at` to
        // honour the "registration order" promised by the trait.
        matching.sort_by_key(|d| d.created_at);
        Ok(matching)
    }

    /// Number of deadlines currently held in memory.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(self.inner.read().await.len())
    }
}
515
#[cfg(test)]
mod tests {
    use super::*;
    use time::Duration;

    /// Label carried by every deadline built through [`make_deadline`].
    const LABEL: &str = "aperak-response-window";

    /// A point in time `secs` seconds before now (UTC).
    fn seconds_ago(secs: i64) -> OffsetDateTime {
        OffsetDateTime::now_utc() - Duration::seconds(secs)
    }

    /// A point in time `days` days after now (UTC).
    fn days_ahead(days: i64) -> OffsetDateTime {
        OffsetDateTime::now_utc() + Duration::days(days)
    }

    /// Build a deadline on the fixed test stream that expires at `due_at`.
    fn make_deadline(due_at: OffsetDateTime) -> Deadline {
        Deadline::new(
            StreamId::new("process/test"),
            ProcessId::new(),
            TenantId::new(),
            WorkflowId::new("test-workflow", "FV2025-10-01"),
            LABEL,
            due_at,
        )
    }

    #[tokio::test]
    async fn register_and_cancel() {
        let store = InMemoryDeadlineStore::new();
        let deadline = make_deadline(days_ahead(5));
        let id = deadline.deadline_id();

        store.register(&deadline).await.unwrap();
        assert_eq!(store.len().await.unwrap(), 1);

        store.cancel(id).await.unwrap();
        assert!(store.is_empty().await);
    }

    #[tokio::test]
    async fn due_now_only_returns_overdue() {
        let store = InMemoryDeadlineStore::new();
        let overdue = make_deadline(seconds_ago(1));
        let pending = make_deadline(days_ahead(5));

        store.register(&overdue).await.unwrap();
        store.register(&pending).await.unwrap();

        let DueNowResult {
            deadlines,
            has_more,
        } = store.due_now(100).await.unwrap();
        assert_eq!(deadlines.len(), 1);
        assert_eq!(deadlines[0].label(), "aperak-response-window");
        assert!(!has_more);
    }

    #[tokio::test]
    async fn due_now_ordered_soonest_first() {
        let store = InMemoryDeadlineStore::new();
        let oldest = seconds_ago(60);
        let middle = seconds_ago(10);
        let newest = seconds_ago(1);

        // Deliberately registered out of order so the store has to sort.
        for due_at in [newest, oldest, middle] {
            store.register(&make_deadline(due_at)).await.unwrap();
        }

        let page = store.due_now(10).await.unwrap();
        assert_eq!(page.deadlines.len(), 3);
        assert!(page.deadlines[0].due_at() <= page.deadlines[1].due_at());
        assert!(page.deadlines[1].due_at() <= page.deadlines[2].due_at());
        assert!(!page.has_more);
    }

    #[tokio::test]
    async fn for_stream_filters_by_stream() {
        let store = InMemoryDeadlineStore::new();
        let stream1 = StreamId::new("process/aaa");
        let stream2 = StreamId::new("process/bbb");

        // Same shape of deadline for both streams; only the stream differs.
        let deadline_on = |stream: &StreamId| {
            Deadline::new(
                stream.clone(),
                ProcessId::new(),
                TenantId::new(),
                WorkflowId::new("test-workflow", "FV2025-10-01"),
                "label",
                days_ahead(1),
            )
        };
        let d1 = deadline_on(&stream1);
        let d2 = deadline_on(&stream2);

        store.register(&d1).await.unwrap();
        store.register(&d2).await.unwrap();

        let for1 = store.for_stream(&stream1).await.unwrap();
        assert_eq!(for1.len(), 1);
        assert_eq!(for1[0].stream_id, stream1);
    }

    #[tokio::test]
    async fn register_upserts_on_same_id() {
        let store = InMemoryDeadlineStore::new();
        let mut deadline = make_deadline(days_ahead(5));
        store.register(&deadline).await.unwrap();

        // Re-register the same ID with a later expiry.
        let new_due = days_ahead(10);
        deadline.due_at = new_due;
        store.register(&deadline).await.unwrap();

        assert_eq!(
            store.len().await.unwrap(),
            1,
            "upsert must not create a duplicate"
        );
        let found = store.for_stream(deadline.stream_id()).await.unwrap();
        assert_eq!(found[0].due_at(), new_due);
    }

    #[tokio::test]
    async fn noop_store_succeeds_silently() {
        let store = NoopDeadlineStore;
        let overdue = make_deadline(seconds_ago(1));
        store.register(&overdue).await.unwrap();

        let page = store.due_now(10).await.unwrap();
        assert!(page.deadlines.is_empty());
        assert!(store.is_empty().await.unwrap());
    }

    #[tokio::test]
    async fn clone_shares_state() {
        let original = InMemoryDeadlineStore::new();
        let handle = original.clone();

        original
            .register(&make_deadline(days_ahead(1)))
            .await
            .unwrap();
        assert_eq!(handle.len().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn due_now_has_more_signals_truncation() {
        let store = InMemoryDeadlineStore::new();
        let past = seconds_ago(1);
        for _ in 0..5 {
            store.register(&make_deadline(past)).await.unwrap();
        }

        // Page smaller than the backlog: truncation must be signalled.
        let partial = store.due_now(3).await.unwrap();
        assert_eq!(partial.deadlines.len(), 3);
        assert!(partial.has_more);

        // Page larger than the backlog: everything fits, nothing remains.
        let full = store.due_now(10).await.unwrap();
        assert_eq!(full.deadlines.len(), 5);
        assert!(!full.has_more);
    }
}