Skip to main content

mnem_core/
guard.rs

1//! Shared commit-budget guard primitive (gap-catalog shared/commit-budget-guard).
2//!
3//! `CommitBudgetGuard` wraps a commit's variable-cost stages
4//! (canonicalization, Leiden recompute, kNN insert batch) with a single
5//! wall-clock budget envelope. Stages are `charge()`-d as they complete;
6//! once the running elapsed exceeds `budget_ms` the guard returns
7//! [`Decision::ShouldDefer`] so callers can push tail work to the next
8//! commit deterministically, and once it exceeds `hard_wall_ms` the
9//! guard returns [`HardWallExceeded`] so callers abort.
10//!
11//! # Determinism
12//!
13//! - Construction captures `commit_cid` so replay is reproducible: a
14//!   commit replay re-enters with the same CID and the envelope-stored
15//!   `deferred_stages` tells the replayer which stages to skip.
16//! - `Decision::ShouldDefer` vs `Decision::Proceed` is timing-dependent
17//!   in live mode, but the commit envelope records `deferred_stages`;
18//!   replay reads the envelope and skips those stages.
19//! - `HardWallExceeded` is an *error*, not a deferral: caller aborts.
20//!
//! # Emitted metrics (via `into_report`)
22//!
23//! - `mnem_commit_budget_elapsed_ms{tag}` histogram.
24//! - `mnem_commit_budget_breached_total{tag}` counter (soft breach).
25//! - `mnem_commit_hard_wall_hit_total{tag}` counter.
26//! - `mnem_commit_deferred_stages_total{tag,stage}` counter.
27//!
28//! This module is deliberately no-I/O: metrics are exposed through the
29//! returned [`CommitBudgetReport`] so the host runtime wires them into
30//! its own counter sink (prometheus, OTel, etc.). `mnem-core` stays
31//! terminal-free per `lib.rs` invariants.
32
33use std::time::{Duration, Instant};
34
35use crate::id::Cid;
36
37/// Shared wall-clock budget envelope for a commit's variable-cost stages.
38///
39/// See the module docs for wiring semantics and determinism notes.
40#[derive(Debug)]
41pub struct CommitBudgetGuard {
42    /// Caller tag (used as metric label). Pass the stable
43    /// gap-shorthand, e.g. `"gap-04-resolve-or-create"`.
44    pub tag: &'static str,
45    /// Monotonic clock anchor set at construction.
46    pub start: Instant,
47    /// Soft budget in milliseconds. Exceeding it yields
48    /// [`Decision::ShouldDefer`].
49    pub budget_ms: u32,
50    /// Hard wall in milliseconds. Exceeding it yields
51    /// [`HardWallExceeded`] and aborts the commit.
52    pub hard_wall_ms: u32,
53    /// CID of the commit this guard is embedded in, stored in the
54    /// envelope for replay determinism.
55    pub commit_cid: Cid,
56    /// Stages deferred to the next commit (pushed by [`Self::defer`]).
57    pub deferred: Vec<&'static str>,
58    /// Stages charged so far; order-preserving for the envelope.
59    pub charged: Vec<(&'static str, u32)>,
60    /// Sticky flag: true once any `charge()` returns `ShouldDefer`.
61    /// The envelope carries this so replay can short-circuit.
62    pub breached: bool,
63    /// Sticky flag: true once any `charge()` returns `HardWallExceeded`.
64    pub hard_wall_hit: bool,
65}
66
/// What a caller should do next after a `charge()` that did not hit
/// the hard wall.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Decision {
    /// The running elapsed has not exceeded the soft budget; keep
    /// going with the next stage.
    Proceed,
    /// The running elapsed is past the soft budget but has not
    /// exceeded the hard wall. The caller should `.defer()` whatever
    /// tail stages remain.
    ShouldDefer,
}
76
/// Abort signal produced by `charge()` once the hard wall is exceeded.
///
/// Unlike a deferral this is structural: the commit MUST unwind.
/// Callers are expected to log it and return the commit's abort
/// outcome; the hard-wall counter is derived from the report, so no
/// manual increment is needed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HardWallExceeded {
    /// Elapsed milliseconds observed by the charge that breached.
    pub elapsed_ms: u32,
    /// The hard wall, in milliseconds, that the elapsed value exceeded.
    pub hard_wall_ms: u32,
}
89
90impl core::fmt::Display for HardWallExceeded {
91    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
92        write!(
93            f,
94            "commit-budget hard wall exceeded: elapsed {}ms > wall {}ms",
95            self.elapsed_ms, self.hard_wall_ms
96        )
97    }
98}
99
100impl std::error::Error for HardWallExceeded {}
101
/// Immutable snapshot of a guard, produced by
/// [`CommitBudgetGuard::into_report`].
///
/// The caller both embeds it in the commit envelope and hands it to
/// the host's metric sink.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CommitBudgetReport {
    /// Metric label the guard was started with, e.g.
    /// `"gap-04-resolve-or-create"`.
    pub tag: &'static str,
    /// Wall-clock milliseconds elapsed between guard construction and
    /// the moment the report was taken.
    pub elapsed_ms: u32,
    /// Soft budget, in milliseconds, the guard was enforcing.
    pub budget_ms: u32,
    /// Hard wall, in milliseconds, the guard was enforcing.
    pub hard_wall_ms: u32,
    /// Copy of the guard's sticky soft-breach flag: true when some
    /// charge returned `ShouldDefer`. It is tracked independently of
    /// `hard_wall_hit`, so both may be true for the same commit.
    pub breached: bool,
    /// Copy of the guard's sticky hard-wall flag: true when some
    /// charge returned `HardWallExceeded` (the commit aborted).
    pub hard_wall_hit: bool,
    /// Stages queued for the next commit, in deferral order.
    pub deferred_stages: Vec<&'static str>,
    /// `(stage, elapsed_ms_at_charge)` pairs, in charge order.
    pub charged_stages: Vec<(&'static str, u32)>,
}
125
126impl CommitBudgetGuard {
127    /// Start a new guard. `hard_wall_ms >= budget_ms` is enforced by
128    /// clamping hard_wall to at least budget; an explicit violation
129    /// would be a caller-bug but we accept the floor silently.
130    #[must_use]
131    pub fn start(tag: &'static str, budget_ms: u32, hard_wall_ms: u32, commit_cid: Cid) -> Self {
132        Self {
133            tag,
134            start: Instant::now(),
135            budget_ms,
136            hard_wall_ms: hard_wall_ms.max(budget_ms),
137            commit_cid,
138            deferred: Vec::new(),
139            charged: Vec::new(),
140            breached: false,
141            hard_wall_hit: false,
142        }
143    }
144
145    /// Elapsed milliseconds since construction, saturated to `u32::MAX`.
146    #[must_use]
147    pub fn elapsed_ms(&self) -> u32 {
148        u32::try_from(self.start.elapsed().as_millis()).unwrap_or(u32::MAX)
149    }
150
151    /// Charge the running elapsed after a completed stage.
152    ///
153    /// Returns `Err(HardWallExceeded)` if the elapsed exceeds
154    /// `hard_wall_ms` - caller aborts. Returns
155    /// `Ok(Decision::ShouldDefer)` if it exceeds `budget_ms` but not
156    /// the hard wall - caller should stop doing new work and `defer`
157    /// the remaining stages. Returns `Ok(Decision::Proceed)` otherwise.
158    ///
159    /// # Errors
160    ///
161    /// Returns [`HardWallExceeded`] when the running elapsed would
162    /// exceed `hard_wall_ms`.
163    pub fn charge(&mut self, stage: &'static str) -> Result<Decision, HardWallExceeded> {
164        let elapsed = self.elapsed_ms();
165        self.charged.push((stage, elapsed));
166        if elapsed > self.hard_wall_ms {
167            self.hard_wall_hit = true;
168            return Err(HardWallExceeded {
169                elapsed_ms: elapsed,
170                hard_wall_ms: self.hard_wall_ms,
171            });
172        }
173        if elapsed > self.budget_ms {
174            self.breached = true;
175            return Ok(Decision::ShouldDefer);
176        }
177        Ok(Decision::Proceed)
178    }
179
180    /// Charge using an externally-provided elapsed value (used by
181    /// deterministic proptests that need a synthetic clock).
182    ///
183    /// # Errors
184    ///
185    /// Returns [`HardWallExceeded`] when `elapsed_ms > hard_wall_ms`.
186    #[doc(hidden)]
187    pub fn charge_with(
188        &mut self,
189        stage: &'static str,
190        elapsed_ms: u32,
191    ) -> Result<Decision, HardWallExceeded> {
192        self.charged.push((stage, elapsed_ms));
193        if elapsed_ms > self.hard_wall_ms {
194            self.hard_wall_hit = true;
195            return Err(HardWallExceeded {
196                elapsed_ms,
197                hard_wall_ms: self.hard_wall_ms,
198            });
199        }
200        if elapsed_ms > self.budget_ms {
201            self.breached = true;
202            return Ok(Decision::ShouldDefer);
203        }
204        Ok(Decision::Proceed)
205    }
206
207    /// Push a stage onto the deferred queue for the next commit.
208    pub fn defer(&mut self, stage: &'static str) {
209        self.deferred.push(stage);
210    }
211
212    /// Freeze the guard into a report for the commit envelope.
213    #[must_use]
214    pub fn into_report(self) -> CommitBudgetReport {
215        CommitBudgetReport {
216            tag: self.tag,
217            elapsed_ms: u32::try_from(self.start.elapsed().as_millis()).unwrap_or(u32::MAX),
218            budget_ms: self.budget_ms,
219            hard_wall_ms: self.hard_wall_ms,
220            breached: self.breached,
221            hard_wall_hit: self.hard_wall_hit,
222            deferred_stages: self.deferred,
223            charged_stages: self.charged,
224        }
225    }
226}
227
/// Wall-clock time elapsed since `start`.
///
/// This only reads the monotonic clock; it never sleeps or blocks.
/// It is kept as a hidden convenience for tests in this module and
/// downstream callers that measure synthesised wall-clock delays.
#[doc(hidden)]
#[must_use]
pub fn since(start: Instant) -> Duration {
    Instant::now().duration_since(start)
}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::Multihash;

    /// Fixed CID over an all-zero 32-byte digest; the guard only
    /// stores it, so any valid CID works for these tests.
    fn zero_cid() -> Cid {
        Cid::new(
            crate::id::CODEC_RAW,
            Multihash::wrap(crate::id::HASH_BLAKE3_256, &[0u8; 32]).expect("32-byte digest"),
        )
    }

    #[test]
    fn charge_under_budget_proceeds() {
        let mut g = CommitBudgetGuard::start("test", 100, 200, zero_cid());
        let d = g.charge_with("stage_a", 50).unwrap();
        assert_eq!(d, Decision::Proceed);
        assert!(!g.breached);
    }

    #[test]
    fn charge_exactly_at_budget_proceeds() {
        // The soft threshold is strict: elapsed == budget is not a breach.
        let mut g = CommitBudgetGuard::start("test", 50, 200, zero_cid());
        assert_eq!(g.charge_with("stage_a", 50), Ok(Decision::Proceed));
        assert!(!g.breached);
        assert!(!g.hard_wall_hit);
    }

    #[test]
    fn charge_over_budget_defers() {
        let mut g = CommitBudgetGuard::start("test", 50, 200, zero_cid());
        let d = g.charge_with("stage_a", 75).unwrap();
        assert_eq!(d, Decision::ShouldDefer);
        assert!(g.breached);
    }

    #[test]
    fn charge_exactly_at_hard_wall_defers() {
        // The hard wall is strict too: elapsed == wall is still a deferral.
        let mut g = CommitBudgetGuard::start("test", 50, 100, zero_cid());
        assert_eq!(g.charge_with("stage_a", 100), Ok(Decision::ShouldDefer));
        assert!(g.breached);
        assert!(!g.hard_wall_hit);
    }

    #[test]
    fn charge_over_hard_wall_aborts() {
        let mut g = CommitBudgetGuard::start("test", 50, 100, zero_cid());
        let err = g.charge_with("stage_a", 150).unwrap_err();
        assert_eq!(err.elapsed_ms, 150);
        assert_eq!(err.hard_wall_ms, 100);
        assert!(g.hard_wall_hit);
    }

    #[test]
    fn hard_wall_after_soft_breach_keeps_both_flags() {
        let mut g = CommitBudgetGuard::start("test", 50, 100, zero_cid());
        assert_eq!(g.charge_with("a", 75), Ok(Decision::ShouldDefer));
        assert!(g.charge_with("b", 250).is_err());
        // Both sticky flags survive into the report, and the failing
        // charge is still recorded.
        let rep = g.into_report();
        assert!(rep.breached);
        assert!(rep.hard_wall_hit);
        assert_eq!(rep.charged_stages, vec![("a", 75), ("b", 250)]);
    }

    #[test]
    fn live_charge_with_unreachable_budget_proceeds() {
        // elapsed_ms() saturates at u32::MAX, so `elapsed > u32::MAX`
        // can never hold and the live path is deterministic here.
        let mut g = CommitBudgetGuard::start("test", u32::MAX, u32::MAX, zero_cid());
        assert_eq!(g.charge("stage_a"), Ok(Decision::Proceed));
        assert_eq!(g.charged.len(), 1);
        assert_eq!(g.charged[0].0, "stage_a");
        assert!(!g.breached);
        assert!(!g.hard_wall_hit);
    }

    #[test]
    fn hard_wall_clamped_to_at_least_budget() {
        // caller bug: hard_wall < budget. We clamp.
        let g = CommitBudgetGuard::start("test", 100, 50, zero_cid());
        assert_eq!(g.hard_wall_ms, 100);
    }

    #[test]
    fn report_records_charged_and_deferred() {
        let mut g = CommitBudgetGuard::start("test", 50, 200, zero_cid());
        let _ = g.charge_with("a", 10).unwrap();
        let _ = g.charge_with("b", 80).unwrap(); // defer
        g.defer("c");
        g.defer("d");
        let rep = g.into_report();
        assert_eq!(rep.tag, "test");
        assert!(rep.breached);
        assert!(!rep.hard_wall_hit);
        assert_eq!(rep.deferred_stages, vec!["c", "d"]);
        assert_eq!(rep.charged_stages.len(), 2);
        assert_eq!(rep.charged_stages[0].0, "a");
        assert_eq!(rep.charged_stages[1].0, "b");
    }
}