mnem_core/guard.rs
//! Shared commit-budget guard primitive (gap-catalog shared/commit-budget-guard).
//!
//! `CommitBudgetGuard` wraps a commit's variable-cost stages
//! (canonicalization, Leiden recompute, kNN insert batch) with a single
//! wall-clock budget envelope. Stages are `charge()`-d as they complete;
//! once the running elapsed exceeds `budget_ms` the guard returns
//! [`Decision::ShouldDefer`] so callers can push tail work to the next
//! commit deterministically, and once it exceeds `hard_wall_ms` the
//! guard returns [`HardWallExceeded`] so callers abort.
//!
//! # Determinism
//!
//! - Construction captures `commit_cid` so replay is reproducible: a
//!   commit replay re-enters with the same CID and the envelope-stored
//!   `deferred_stages` tells the replayer which stages to skip.
//! - `Decision::ShouldDefer` vs `Decision::Proceed` is timing-dependent
//!   in live mode, but the commit envelope records `deferred_stages`;
//!   replay reads the envelope and skips those stages.
//! - `HardWallExceeded` is an *error*, not a deferral: caller aborts.
//!
//! # Emitted metrics (via `into_report`)
//!
//! - `mnem_commit_budget_elapsed_ms{tag}` histogram.
//! - `mnem_commit_budget_breached_total{tag}` counter (soft breach).
//! - `mnem_commit_hard_wall_hit_total{tag}` counter.
//! - `mnem_commit_deferred_stages_total{tag,stage}` counter.
//!
//! This module is deliberately no-I/O: metrics are exposed through the
//! returned [`CommitBudgetReport`] so the host runtime wires them into
//! its own counter sink (prometheus, OTel, etc.). `mnem-core` stays
//! terminal-free per `lib.rs` invariants.
32
33use std::time::{Duration, Instant};
34
35use crate::id::Cid;
36
37/// Shared wall-clock budget envelope for a commit's variable-cost stages.
38///
39/// See the module docs for wiring semantics and determinism notes.
40#[derive(Debug)]
41pub struct CommitBudgetGuard {
42 /// Caller tag (used as metric label). Pass the stable
43 /// gap-shorthand, e.g. `"gap-04-resolve-or-create"`.
44 pub tag: &'static str,
45 /// Monotonic clock anchor set at construction.
46 pub start: Instant,
47 /// Soft budget in milliseconds. Exceeding it yields
48 /// [`Decision::ShouldDefer`].
49 pub budget_ms: u32,
50 /// Hard wall in milliseconds. Exceeding it yields
51 /// [`HardWallExceeded`] and aborts the commit.
52 pub hard_wall_ms: u32,
53 /// CID of the commit this guard is embedded in, stored in the
54 /// envelope for replay determinism.
55 pub commit_cid: Cid,
56 /// Stages deferred to the next commit (pushed by [`Self::defer`]).
57 pub deferred: Vec<&'static str>,
58 /// Stages charged so far; order-preserving for the envelope.
59 pub charged: Vec<(&'static str, u32)>,
60 /// Sticky flag: true once any `charge()` returns `ShouldDefer`.
61 /// The envelope carries this so replay can short-circuit.
62 pub breached: bool,
63 /// Sticky flag: true once any `charge()` returns `HardWallExceeded`.
64 pub hard_wall_hit: bool,
65}
66
/// What a caller should do after a `charge()` that did not hit the hard wall.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Decision {
    /// The running elapsed is at or below the soft budget: keep going.
    Proceed,
    /// The running elapsed is above the soft budget but not above the
    /// hard wall. The caller should `.defer()` whatever tail stages are
    /// still pending instead of running them.
    ShouldDefer,
}
76
/// Abort signal produced by `charge()` once the hard wall has been passed.
///
/// Unlike a deferral this is structural: the commit MUST unwind. A
/// caller is expected to log it and return the commit's abort outcome;
/// the hard-wall counter is covered by the flag carried in the report.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct HardWallExceeded {
    /// Milliseconds elapsed when the breach was detected.
    pub elapsed_ms: u32,
    /// The hard wall, in milliseconds, that `elapsed_ms` went past.
    pub hard_wall_ms: u32,
}

impl core::fmt::Display for HardWallExceeded {
    /// Renders a one-line message containing both the elapsed time and the wall.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            elapsed_ms,
            hard_wall_ms,
        } = *self;
        f.write_fmt(format_args!(
            "commit-budget hard wall exceeded: elapsed {}ms > wall {}ms",
            elapsed_ms, hard_wall_ms
        ))
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for HardWallExceeded {}
101
/// Immutable summary of a finished guard, built by
/// [`CommitBudgetGuard::into_report`].
///
/// The same value serves two consumers: it is embedded in the commit
/// envelope, and the host feeds it to its metric sink.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CommitBudgetReport {
    /// Caller label, e.g. `"gap-04-resolve-or-create"`.
    pub tag: &'static str,
    /// Wall-clock milliseconds from guard construction to the moment the
    /// report was taken.
    pub elapsed_ms: u32,
    /// Soft budget (ms) the guard was running with.
    pub budget_ms: u32,
    /// Hard wall (ms) the guard was running with.
    pub hard_wall_ms: u32,
    /// True when at least one charge returned `ShouldDefer`. The flag is
    /// sticky, so it can be true together with `hard_wall_hit` if a later
    /// charge went past the wall; a charge that jumps straight past the
    /// wall does not set it.
    pub breached: bool,
    /// True when at least one charge returned `HardWallExceeded`
    /// (the commit was expected to abort).
    pub hard_wall_hit: bool,
    /// Stage names queued for the next commit, in deferral order.
    pub deferred_stages: Vec<&'static str>,
    /// `(stage, elapsed_ms_at_charge)` pairs in the order they were charged.
    pub charged_stages: Vec<(&'static str, u32)>,
}
125
126impl CommitBudgetGuard {
127 /// Start a new guard. `hard_wall_ms >= budget_ms` is enforced by
128 /// clamping hard_wall to at least budget; an explicit violation
129 /// would be a caller-bug but we accept the floor silently.
130 #[must_use]
131 pub fn start(tag: &'static str, budget_ms: u32, hard_wall_ms: u32, commit_cid: Cid) -> Self {
132 Self {
133 tag,
134 start: Instant::now(),
135 budget_ms,
136 hard_wall_ms: hard_wall_ms.max(budget_ms),
137 commit_cid,
138 deferred: Vec::new(),
139 charged: Vec::new(),
140 breached: false,
141 hard_wall_hit: false,
142 }
143 }
144
145 /// Elapsed milliseconds since construction, saturated to `u32::MAX`.
146 #[must_use]
147 pub fn elapsed_ms(&self) -> u32 {
148 u32::try_from(self.start.elapsed().as_millis()).unwrap_or(u32::MAX)
149 }
150
151 /// Charge the running elapsed after a completed stage.
152 ///
153 /// Returns `Err(HardWallExceeded)` if the elapsed exceeds
154 /// `hard_wall_ms` - caller aborts. Returns
155 /// `Ok(Decision::ShouldDefer)` if it exceeds `budget_ms` but not
156 /// the hard wall - caller should stop doing new work and `defer`
157 /// the remaining stages. Returns `Ok(Decision::Proceed)` otherwise.
158 ///
159 /// # Errors
160 ///
161 /// Returns [`HardWallExceeded`] when the running elapsed would
162 /// exceed `hard_wall_ms`.
163 pub fn charge(&mut self, stage: &'static str) -> Result<Decision, HardWallExceeded> {
164 let elapsed = self.elapsed_ms();
165 self.charged.push((stage, elapsed));
166 if elapsed > self.hard_wall_ms {
167 self.hard_wall_hit = true;
168 return Err(HardWallExceeded {
169 elapsed_ms: elapsed,
170 hard_wall_ms: self.hard_wall_ms,
171 });
172 }
173 if elapsed > self.budget_ms {
174 self.breached = true;
175 return Ok(Decision::ShouldDefer);
176 }
177 Ok(Decision::Proceed)
178 }
179
180 /// Charge using an externally-provided elapsed value (used by
181 /// deterministic proptests that need a synthetic clock).
182 ///
183 /// # Errors
184 ///
185 /// Returns [`HardWallExceeded`] when `elapsed_ms > hard_wall_ms`.
186 #[doc(hidden)]
187 pub fn charge_with(
188 &mut self,
189 stage: &'static str,
190 elapsed_ms: u32,
191 ) -> Result<Decision, HardWallExceeded> {
192 self.charged.push((stage, elapsed_ms));
193 if elapsed_ms > self.hard_wall_ms {
194 self.hard_wall_hit = true;
195 return Err(HardWallExceeded {
196 elapsed_ms,
197 hard_wall_ms: self.hard_wall_ms,
198 });
199 }
200 if elapsed_ms > self.budget_ms {
201 self.breached = true;
202 return Ok(Decision::ShouldDefer);
203 }
204 Ok(Decision::Proceed)
205 }
206
207 /// Push a stage onto the deferred queue for the next commit.
208 pub fn defer(&mut self, stage: &'static str) {
209 self.deferred.push(stage);
210 }
211
212 /// Freeze the guard into a report for the commit envelope.
213 #[must_use]
214 pub fn into_report(self) -> CommitBudgetReport {
215 CommitBudgetReport {
216 tag: self.tag,
217 elapsed_ms: u32::try_from(self.start.elapsed().as_millis()).unwrap_or(u32::MAX),
218 budget_ms: self.budget_ms,
219 hard_wall_ms: self.hard_wall_ms,
220 breached: self.breached,
221 hard_wall_hit: self.hard_wall_hit,
222 deferred_stages: self.deferred,
223 charged_stages: self.charged,
224 }
225 }
226}
227
/// Time elapsed between `start` and now.
///
/// Convenience for tests and downstream callers that work with
/// wall-clock delays. It only reads the monotonic clock and never
/// blocks or sleeps.
#[doc(hidden)]
#[must_use]
pub fn since(start: Instant) -> Duration {
    Instant::now().duration_since(start)
}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::Multihash;

    /// Fixed CID over an all-zero 32-byte digest; the guard only stores it.
    fn zero_cid() -> Cid {
        let digest = [0u8; 32];
        let hash = Multihash::wrap(crate::id::HASH_BLAKE3_256, &digest).expect("32-byte digest");
        Cid::new(crate::id::CODEC_RAW, hash)
    }

    /// A charge below the soft budget proceeds and leaves `breached` clear.
    #[test]
    fn charge_under_budget_proceeds() {
        let mut guard = CommitBudgetGuard::start("test", 100, 200, zero_cid());
        assert_eq!(guard.charge_with("stage_a", 50), Ok(Decision::Proceed));
        assert!(!guard.breached);
    }

    /// A charge between budget and wall asks for deferral and sets `breached`.
    #[test]
    fn charge_over_budget_defers() {
        let mut guard = CommitBudgetGuard::start("test", 50, 200, zero_cid());
        assert_eq!(guard.charge_with("stage_a", 75), Ok(Decision::ShouldDefer));
        assert!(guard.breached);
    }

    /// A charge past the wall errors with both figures and sets `hard_wall_hit`.
    #[test]
    fn charge_over_hard_wall_aborts() {
        let mut guard = CommitBudgetGuard::start("test", 50, 100, zero_cid());
        let HardWallExceeded {
            elapsed_ms,
            hard_wall_ms,
        } = guard.charge_with("stage_a", 150).unwrap_err();
        assert_eq!(elapsed_ms, 150);
        assert_eq!(hard_wall_ms, 100);
        assert!(guard.hard_wall_hit);
    }

    /// Caller bug (`hard_wall < budget`): the wall is raised to the budget.
    #[test]
    fn hard_wall_clamped_to_at_least_budget() {
        let guard = CommitBudgetGuard::start("test", 100, 50, zero_cid());
        assert_eq!(guard.hard_wall_ms, 100);
    }

    /// The report keeps charged and deferred stages in insertion order.
    #[test]
    fn report_records_charged_and_deferred() {
        let mut guard = CommitBudgetGuard::start("test", 50, 200, zero_cid());
        let _ = guard.charge_with("a", 10).unwrap();
        let _ = guard.charge_with("b", 80).unwrap(); // over budget: ShouldDefer
        for stage in ["c", "d"] {
            guard.defer(stage);
        }

        let report = guard.into_report();
        assert_eq!(report.tag, "test");
        assert!(report.breached);
        assert!(!report.hard_wall_hit);
        assert_eq!(report.deferred_stages, ["c", "d"]);

        let charged_names: Vec<&str> = report
            .charged_stages
            .iter()
            .map(|entry| entry.0)
            .collect();
        assert_eq!(charged_names, ["a", "b"]);
    }
}