Skip to main content

mako_engine/
deadline.rs

1//! Deadline tracking for regulatory process timers.
2//!
3//! Every MaKo process is subject to hard regulatory deadlines defined in the
4//! BDEW Application Handbooks. Deadline semantics vary by process family:
5//!
6//! | Process family | Deadline unit | Helper |
7//! |---|---|---|
8//! | GPKE Lieferantenwechsel (BK6-22-024) | 24 wall-clock hours | [`fristen::add_hours`] |
9//! | WiM / GeLi Gas / MABIS | Werktage | [`fristen::add_werktage`] |
10//!
11//! Use the helpers in [`crate::fristen`] to compute the correct `due_at`
12//! timestamp before constructing a [`Deadline`].
13//!
14//! The `DeadlineStore` persists these timers per process stream. A background
15//! scheduler polls [`DeadlineStore::due_now`] and dispatches a
16//! `TimeoutDeadline` command to the owning process when a deadline lapses.
17//! The process workflow then handles the command — e.g. by escalating the
18//! case or switching to a failure path.
19//!
20//! # Usage
21//!
22//! ```rust,ignore
23//! use mako_engine::fristen;
24//!
25//! // GPKE 24h Lieferantenwechsel (BK6-22-024):
26//! let due = fristen::add_hours(OffsetDateTime::now_utc(), 24);
27//! // WiM 5-Werktage confirmation window:
28//! let due = fristen::add_werktage(OffsetDateTime::now_utc().date(), 5,
29//!     fristen::HolidayCalendar::BdewMaKo).midnight().assume_utc();
30//!
31//! let deadline = Deadline::new(
32//!     process.stream_id().clone(),
33//!     process.process_id(),
34//!     process.tenant_id(),
35//!     process.workflow_id().clone(),
36//!     "aperak-response-window",
37//!     due,
38//! );
39//! deadline_store.register(&deadline).await?;
40//!
41//! // When the counterparty responds in time, cancel the deadline:
42//! deadline_store.cancel(deadline.deadline_id()).await?;
43//!
44//! // Background scheduler (runs every N minutes):
45//! let result = deadline_store.due_now(100).await?;
46//! for d in result.deadlines {
47//!     process_handle.execute(TimeoutDeadline { deadline_id: d.deadline_id() }).await?;
48//!     deadline_store.cancel(d.deadline_id()).await?;
49//! }
50//! ```
51//!
52//! [`fristen::add_hours`]: crate::fristen::add_hours
53//! [`fristen::add_werktage`]: crate::fristen::add_werktage
54
55use std::sync::Arc;
56
57#[cfg(any(test, feature = "testing"))]
58use std::collections::HashMap;
59#[cfg(any(test, feature = "testing"))]
60use tokio::sync::RwLock;
61
62use time::OffsetDateTime;
63
64use crate::{
65    error::EngineError,
66    ids::{DeadlineId, ProcessId, StreamId, TenantId},
67    version::WorkflowId,
68};
69
70// ── Deadline ──────────────────────────────────────────────────────────────────
71
72/// A registered regulatory deadline for a single process stream.
73///
74/// Create with [`Deadline::new`], persist via [`DeadlineStore::register`], and
75/// cancel via [`DeadlineStore::cancel`] when the process advances past the
76/// deadline before it fires.
77///
78/// The `label` field identifies the deadline type (e.g.
79/// `"aperak-response-window"`) and is used by the scheduler to dispatch the
80/// correct timeout command.
81#[allow(clippy::struct_field_names)]
82#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
83pub struct Deadline {
84    /// Unique identifier for this deadline entry.
85    deadline_id: DeadlineId,
86
87    /// The process stream this deadline belongs to.
88    stream_id: StreamId,
89
90    /// The process instance this deadline belongs to.
91    process_id: ProcessId,
92
93    /// The tenant that owns this process.
94    tenant_id: TenantId,
95
96    /// The workflow that owns this process (name + format version).
97    ///
98    /// Stored so the deadline scheduler can reconstruct a [`ProcessIdentity`]
99    /// and route the `TimeoutExpired` command to the correct workflow type
100    /// without a separate registry lookup.
101    ///
102    /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
103    workflow_id: WorkflowId,
104
105    /// Human-readable label identifying the deadline type.
106    label: Box<str>,
107
108    /// When this deadline expires.
109    due_at: OffsetDateTime,
110
111    /// When this deadline was registered.
112    created_at: OffsetDateTime,
113}
114
115impl Deadline {
116    /// Construct a new deadline.
117    ///
118    /// `deadline_id` and `created_at` are generated automatically.
119    ///
120    /// `workflow_id` must match the [`WorkflowId`] under which the owning
121    /// process was started (i.e. `process.workflow_id().clone()`). The
122    /// deadline scheduler uses it to reconstruct a [`ProcessIdentity`] and
123    /// route the `TimeoutExpired` command to the correct workflow type.
124    ///
125    /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
126    #[must_use]
127    pub fn new(
128        stream_id: StreamId,
129        process_id: ProcessId,
130        tenant_id: TenantId,
131        workflow_id: WorkflowId,
132        label: impl Into<Box<str>>,
133        due_at: OffsetDateTime,
134    ) -> Self {
135        Self {
136            deadline_id: DeadlineId::new(),
137            stream_id,
138            process_id,
139            tenant_id,
140            workflow_id,
141            label: label.into(),
142            due_at,
143            created_at: OffsetDateTime::now_utc(),
144        }
145    }
146
147    /// Return `true` when this deadline has passed relative to `now`.
148    ///
149    /// ```rust
150    /// use mako_engine::deadline::Deadline;
151    /// use mako_engine::ids::{ProcessId, StreamId, TenantId};
152    /// use mako_engine::version::WorkflowId;
153    /// use time::{Duration, OffsetDateTime};
154    ///
155    /// let past = Deadline::new(
156    ///     StreamId::new("process/x"),
157    ///     ProcessId::new(),
158    ///     TenantId::new(),
159    ///     WorkflowId::new("gpke-supplier-change", "FV2025-10-01"),
160    ///     "aperak-response-window",
161    ///     OffsetDateTime::now_utc() - Duration::seconds(1),
162    /// );
163    /// assert!(past.is_due(OffsetDateTime::now_utc()));
164    /// ```
165    #[must_use]
166    pub fn is_due(&self, now: OffsetDateTime) -> bool {
167        self.due_at <= now
168    }
169
170    /// The unique identifier of this deadline.
171    #[must_use]
172    pub fn deadline_id(&self) -> DeadlineId {
173        self.deadline_id
174    }
175
176    /// The stream this deadline belongs to.
177    #[must_use]
178    pub fn stream_id(&self) -> &StreamId {
179        &self.stream_id
180    }
181
182    /// The process instance this deadline belongs to.
183    #[must_use]
184    pub fn process_id(&self) -> ProcessId {
185        self.process_id
186    }
187
188    /// The tenant that owns this process.
189    #[must_use]
190    pub fn tenant_id(&self) -> TenantId {
191        self.tenant_id
192    }
193
194    /// The workflow that owns this process.
195    ///
196    /// Used by the deadline scheduler to reconstruct a [`ProcessIdentity`]
197    /// and route the `TimeoutExpired` command to the correct workflow type.
198    ///
199    /// [`ProcessIdentity`]: crate::ids::ProcessIdentity
200    #[must_use]
201    pub fn workflow_id(&self) -> &WorkflowId {
202        &self.workflow_id
203    }
204
205    /// The human-readable label identifying the deadline type (e.g.
206    /// `"aperak-response-window"`).
207    #[must_use]
208    pub fn label(&self) -> &str {
209        &self.label
210    }
211
212    /// When this deadline expires.
213    #[must_use]
214    pub fn due_at(&self) -> OffsetDateTime {
215        self.due_at
216    }
217
218    /// When this deadline was registered.
219    #[must_use]
220    pub fn created_at(&self) -> OffsetDateTime {
221        self.created_at
222    }
223}
224
225// ── DueNowResult ──────────────────────────────────────────────────────────────
226
227/// Result of a [`DeadlineStore::due_now`] poll.
228///
229/// When `has_more` is `true`, the store has additional expired deadlines beyond
230/// the returned `deadlines`. The scheduler should drain in a loop until
231/// `has_more` is `false` to avoid leaving unfired deadlines in the store.
232///
233/// ```rust
234/// # tokio_test::block_on(async {
235/// # use mako_engine::deadline::{InMemoryDeadlineStore, DeadlineStore, Deadline};
236/// # use mako_engine::ids::{ProcessId, StreamId, TenantId};
237/// # use time::OffsetDateTime;
238/// let store = InMemoryDeadlineStore::new();
239/// loop {
240///     let result = store.due_now(50).await.unwrap();
241///     for deadline in result.deadlines {
242///         // dispatch TimeoutDeadline command …
243///         store.cancel(deadline.deadline_id()).await.unwrap();
244///     }
245///     if !result.has_more { break; }
246/// }
247/// # });
248/// ```
249#[derive(Debug, Clone)]
250pub struct DueNowResult {
251    /// Expired deadlines, ordered soonest-first.
252    pub deadlines: Vec<Deadline>,
253    /// `true` when the store contains more expired deadlines beyond `deadlines`.
254    pub has_more: bool,
255}
256
257// ── DeadlineStore ─────────────────────────────────────────────────────────────
258
259/// Storage contract for process deadlines.
260///
261/// ## Scheduler contract
262///
263/// A background timer task should poll this store periodically:
264///
265/// 1. Call [`DeadlineStore::due_now`] to retrieve expired deadlines.
266/// 2. Dispatch a `TimeoutDeadline` command to each owning process.
267/// 3. Call [`DeadlineStore::cancel`] to remove the fired deadline.
268///
269/// Cancelling a deadline before the scheduler fires it prevents a spurious
270/// `TimeoutDeadline` command from being dispatched to the process. Always
271/// cancel deadlines when the process advances past them naturally (e.g. when
272/// the expected counterparty response arrives in time).
273///
274/// ## Blanket `Arc` implementation
275///
276/// `Arc<S>` implements `DeadlineStore` whenever `S: DeadlineStore`, enabling
277/// shared access from both the scheduler and command handlers.
278#[allow(async_fn_in_trait)]
279pub trait DeadlineStore: Send + Sync {
280    /// Register a new deadline.
281    ///
282    /// Upserts by `deadline_id`: if a deadline with the same ID already
283    /// exists it is replaced.
284    ///
285    /// # Errors
286    ///
287    /// Returns [`EngineError::Deadline`] on storage failure.
288    async fn register(&self, deadline: &Deadline) -> Result<(), EngineError>;
289
290    /// Cancel a registered deadline by ID.
291    ///
292    /// No-op when the deadline does not exist.
293    ///
294    /// # Errors
295    ///
296    /// Returns [`EngineError::Deadline`] on storage failure.
297    async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError>;
298
299    /// Return up to `limit` deadlines whose `due_at <= now_utc()`, ordered
300    /// soonest-first.
301    ///
302    /// When the store contains more expired deadlines than `limit`, the
303    /// returned [`DueNowResult::has_more`] is `true`. Callers should drain
304    /// in a loop until `has_more` is `false`.
305    ///
306    /// # Errors
307    ///
308    /// Returns [`EngineError::Deadline`] on storage failure.
309    async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError>;
310
311    /// Return all active deadlines for `stream_id`, in registration order.
312    ///
313    /// # Errors
314    ///
315    /// Returns [`EngineError::Deadline`] on storage failure.
316    async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError>;
317
318    /// Total number of registered deadlines.
319    ///
320    /// # Errors
321    ///
322    /// Returns [`EngineError::Deadline`] on storage failure.
323    async fn len(&self) -> Result<usize, EngineError>;
324
325    /// Return `true` when no deadlines are registered.
326    ///
327    /// # Errors
328    ///
329    /// Returns [`EngineError::Deadline`] on storage failure.
330    async fn is_empty(&self) -> Result<bool, EngineError> {
331        Ok(self.len().await? == 0)
332    }
333
334    /// Count deadlines whose `due_at ≤ now` that have not yet been cancelled.
335    ///
336    /// Indicates scheduler lag: a non-zero value means `TimeoutExpired` commands
337    /// are not being dispatched in time, which is a compliance violation.
338    ///
339    /// The default implementation delegates to [`due_now`] with a limit of
340    /// 10 000; if there are more overdue deadlines, returns 10 000 (capped).
341    /// Implementations can override for a more efficient point-count query.
342    ///
343    /// # Errors
344    ///
345    /// Returns [`EngineError::Deadline`] on storage failure.
346    ///
347    /// [`due_now`]: DeadlineStore::due_now
348    async fn overdue_count(&self) -> Result<usize, EngineError> {
349        const LIMIT: usize = 10_000;
350        let result = self.due_now(LIMIT).await?;
351        Ok(if result.has_more {
352            LIMIT
353        } else {
354            result.deadlines.len()
355        })
356    }
357}
358
359// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
360
361impl<S: DeadlineStore> DeadlineStore for Arc<S> {
362    async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
363        self.as_ref().register(deadline).await
364    }
365
366    async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
367        self.as_ref().cancel(id).await
368    }
369
370    async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
371        self.as_ref().due_now(limit).await
372    }
373
374    async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
375        self.as_ref().for_stream(stream_id).await
376    }
377
378    async fn len(&self) -> Result<usize, EngineError> {
379        self.as_ref().len().await
380    }
381
382    async fn overdue_count(&self) -> Result<usize, EngineError> {
383        self.as_ref().overdue_count().await
384    }
385}
386
387// ── NoopDeadlineStore ─────────────────────────────────────────────────────────
388
389/// A [`DeadlineStore`] that never persists anything.
390///
391/// `register` succeeds silently; `due_now` always returns an empty list.
392/// Use this as the default when deadline tracking is not needed.
393///
394/// # ⚠️ Silent deadline loss
395///
396/// `NoopDeadlineStore` **discards every deadline registration silently**. No
397/// scheduler timeout will ever fire. Missed deadlines are a compliance
398/// violation under BNetzA monitoring. Do not use in production.
399///
400/// This type is available in all build configurations so it can serve as a
401/// default type parameter in [`EngineBuilder`]. However, [`EngineBuilder::new`]
402/// (which wires this as the default) is only available with the `testing`
403/// feature or in `cfg(test)`. Production binaries must call
404/// [`EngineBuilder::with_stores`] instead.
405///
406/// [`EngineBuilder`]: crate::builder::EngineBuilder
407/// [`EngineBuilder::new`]: crate::builder::EngineBuilder::new
408/// [`EngineBuilder::with_stores`]: crate::builder::EngineBuilder::with_stores
409#[derive(Debug, Clone, Copy, Default)]
410#[must_use = "NoopDeadlineStore discards all deadlines silently — use a persistent DeadlineStore in production"]
411pub struct NoopDeadlineStore;
412
413#[cfg(any(test, feature = "testing"))]
414impl DeadlineStore for NoopDeadlineStore {
415    async fn register(&self, _deadline: &Deadline) -> Result<(), EngineError> {
416        Ok(())
417    }
418
419    async fn cancel(&self, _id: DeadlineId) -> Result<(), EngineError> {
420        Ok(())
421    }
422
423    async fn due_now(&self, _limit: usize) -> Result<DueNowResult, EngineError> {
424        Ok(DueNowResult {
425            deadlines: Vec::new(),
426            has_more: false,
427        })
428    }
429
430    async fn for_stream(&self, _stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
431        Ok(Vec::new())
432    }
433
434    async fn len(&self) -> Result<usize, EngineError> {
435        Ok(0)
436    }
437}
438
439// ── InMemoryDeadlineStore ─────────────────────────────────────────────────────
440
441/// An in-memory [`DeadlineStore`] for tests and development.
442///
443/// Backed by a `HashMap` protected by a `Mutex`. Cloning shares the
444/// underlying data via `Arc` — all clones see the same deadlines.
445///
446/// **Not production-safe.** Use this for:
447/// - Unit and integration tests
448/// - Examples and local development
449/// - Verifying the scheduler loop without an external timer service
450///
451/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
452#[cfg(any(test, feature = "testing"))]
453#[derive(Debug, Default, Clone)]
454pub struct InMemoryDeadlineStore {
455    inner: Arc<RwLock<HashMap<DeadlineId, Deadline>>>,
456}
457
458#[cfg(any(test, feature = "testing"))]
459impl InMemoryDeadlineStore {
460    /// Create an empty deadline store.
461    #[must_use]
462    pub fn new() -> Self {
463        Self::default()
464    }
465
466    /// Return `true` when no deadlines are registered.
467    pub async fn is_empty(&self) -> bool {
468        self.inner.read().await.is_empty()
469    }
470}
471
472#[cfg(any(test, feature = "testing"))]
473impl DeadlineStore for InMemoryDeadlineStore {
474    async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
475        self.inner
476            .write()
477            .await
478            .insert(deadline.deadline_id, deadline.clone());
479        Ok(())
480    }
481
482    async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
483        self.inner.write().await.remove(&id);
484        Ok(())
485    }
486
487    async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
488        let now = OffsetDateTime::now_utc();
489        let map = self.inner.read().await;
490        let mut due: Vec<_> = map.values().filter(|d| d.is_due(now)).cloned().collect();
491        // Soonest-first: the scheduler processes the most urgent deadlines first.
492        due.sort_by_key(|d| d.due_at);
493        // Probe one extra to detect whether more remain.
494        let has_more = due.len() > limit;
495        due.truncate(limit);
496        Ok(DueNowResult {
497            deadlines: due,
498            has_more,
499        })
500    }
501
502    async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
503        let map = self.inner.read().await;
504        Ok(map
505            .values()
506            .filter(|d| &d.stream_id == stream_id)
507            .cloned()
508            .collect())
509    }
510
511    async fn len(&self) -> Result<usize, EngineError> {
512        Ok(self.inner.read().await.len())
513    }
514}
515
516#[cfg(test)]
517mod tests {
518    use super::*;
519    use time::Duration;
520
521    fn make_deadline(due_at: OffsetDateTime) -> Deadline {
522        Deadline::new(
523            StreamId::new("process/test"),
524            ProcessId::new(),
525            TenantId::new(),
526            WorkflowId::new("test-workflow", "FV2025-10-01"),
527            "aperak-response-window",
528            due_at,
529        )
530    }
531
532    #[tokio::test]
533    async fn register_and_cancel() {
534        let store = InMemoryDeadlineStore::new();
535        let d = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
536        let id = d.deadline_id;
537
538        store.register(&d).await.unwrap();
539        assert_eq!(store.len().await.unwrap(), 1);
540
541        store.cancel(id).await.unwrap();
542        assert!(store.is_empty().await);
543    }
544
545    #[tokio::test]
546    async fn due_now_only_returns_overdue() {
547        let store = InMemoryDeadlineStore::new();
548        let past = make_deadline(OffsetDateTime::now_utc() - Duration::seconds(1));
549        let future = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
550
551        store.register(&past).await.unwrap();
552        store.register(&future).await.unwrap();
553
554        let due = store.due_now(100).await.unwrap();
555        assert_eq!(due.deadlines.len(), 1);
556        assert_eq!(due.deadlines[0].label.as_ref(), "aperak-response-window");
557        assert!(!due.has_more);
558    }
559
560    #[tokio::test]
561    async fn due_now_ordered_soonest_first() {
562        let store = InMemoryDeadlineStore::new();
563        let t1 = OffsetDateTime::now_utc() - Duration::seconds(60);
564        let t2 = OffsetDateTime::now_utc() - Duration::seconds(10);
565        let t3 = OffsetDateTime::now_utc() - Duration::seconds(1);
566
567        // Register out of order to verify sorting.
568        store.register(&make_deadline(t3)).await.unwrap();
569        store.register(&make_deadline(t1)).await.unwrap();
570        store.register(&make_deadline(t2)).await.unwrap();
571
572        let due = store.due_now(10).await.unwrap();
573        assert_eq!(due.deadlines.len(), 3);
574        assert!(due.deadlines[0].due_at <= due.deadlines[1].due_at);
575        assert!(due.deadlines[1].due_at <= due.deadlines[2].due_at);
576        assert!(!due.has_more);
577    }
578
579    #[tokio::test]
580    async fn for_stream_filters_by_stream() {
581        let store = InMemoryDeadlineStore::new();
582        let stream1 = StreamId::new("process/aaa");
583        let stream2 = StreamId::new("process/bbb");
584        let d1 = Deadline::new(
585            stream1.clone(),
586            ProcessId::new(),
587            TenantId::new(),
588            WorkflowId::new("test-workflow", "FV2025-10-01"),
589            "label",
590            OffsetDateTime::now_utc() + Duration::days(1),
591        );
592        let d2 = Deadline::new(
593            stream2.clone(),
594            ProcessId::new(),
595            TenantId::new(),
596            WorkflowId::new("test-workflow", "FV2025-10-01"),
597            "label",
598            OffsetDateTime::now_utc() + Duration::days(1),
599        );
600
601        store.register(&d1).await.unwrap();
602        store.register(&d2).await.unwrap();
603
604        let for1 = store.for_stream(&stream1).await.unwrap();
605        assert_eq!(for1.len(), 1);
606        assert_eq!(for1[0].stream_id, stream1);
607    }
608
609    #[tokio::test]
610    async fn register_upserts_on_same_id() {
611        let store = InMemoryDeadlineStore::new();
612        let mut d = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
613        store.register(&d).await.unwrap();
614
615        let new_due = OffsetDateTime::now_utc() + Duration::days(10);
616        d.due_at = new_due;
617        store.register(&d).await.unwrap();
618
619        assert_eq!(
620            store.len().await.unwrap(),
621            1,
622            "upsert must not create a duplicate"
623        );
624        let found = store.for_stream(&d.stream_id).await.unwrap();
625        assert_eq!(found[0].due_at, new_due);
626    }
627
628    #[tokio::test]
629    async fn noop_store_succeeds_silently() {
630        let store = NoopDeadlineStore;
631        let d = make_deadline(OffsetDateTime::now_utc() - Duration::seconds(1));
632        store.register(&d).await.unwrap();
633        assert!(store.due_now(10).await.unwrap().deadlines.is_empty());
634        assert!(store.is_empty().await.unwrap());
635    }
636
637    #[tokio::test]
638    async fn clone_shares_state() {
639        let store1 = InMemoryDeadlineStore::new();
640        let store2 = store1.clone();
641        let d = make_deadline(OffsetDateTime::now_utc() + Duration::days(1));
642        store1.register(&d).await.unwrap();
643        assert_eq!(store2.len().await.unwrap(), 1);
644    }
645
646    #[tokio::test]
647    async fn due_now_has_more_signals_truncation() {
648        let store = InMemoryDeadlineStore::new();
649        let past = OffsetDateTime::now_utc() - Duration::seconds(1);
650        for _ in 0..5 {
651            store.register(&make_deadline(past)).await.unwrap();
652        }
653
654        // Request fewer than available — has_more must be true.
655        let r = store.due_now(3).await.unwrap();
656        assert_eq!(r.deadlines.len(), 3);
657        assert!(r.has_more);
658
659        // Request all — has_more must be false.
660        let r2 = store.due_now(10).await.unwrap();
661        assert_eq!(r2.deadlines.len(), 5);
662        assert!(!r2.has_more);
663    }
664}