hyperi_rustlib/governor/gate.rs
1// Project: hyperi-rustlib
2// File: src/governor/gate.rs
3// Purpose: Inbound gate: edge-detecting pause/resume over the pressure latch
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Inbound gate: drives an actuator on each pause/resume transition.
10//!
11//! The [`UnifiedPressure`] latch tells us *whether* to hold; the
12//! [`InboundGate`] turns that latched boolean into EDGE events. Each
13//! [`evaluate`](InboundGate::evaluate) samples the latch and, on a
14//! transition only, calls the [`GateActuator`] exactly once -- `pause()`
15//! on the false->true (rising) edge, `resume()` on the true->false
16//! (falling) edge. While the latch stays held, repeated `evaluate()`
17//! calls return [`Admit::Hold`] but do NOT re-call `pause()`; likewise a
18//! released latch returns [`Admit::Yes`] without re-calling `resume()`.
19//!
20//! INBOUND side only: the actuator pauses a source's recv/ingest (stops
21//! pulling new work) so the in-flight buffer drains under pressure. NEVER
22//! wired to the outbound drain (sink) -- gating the drain would deadlock the
23//! pipeline. `send` is never involved here.
24//!
25//! Gated behind the `governor` feature; wired into the receive transport by
26//! [`SelfRegulationGovernor`](super::SelfRegulationGovernor) (default-on).
27
28use std::sync::Arc;
29use std::sync::atomic::{AtomicBool, Ordering};
30
31use super::source::UnifiedPressure;
32
/// Actuator the gate drives at each pause/resume edge of the inbound source.
///
/// An implementation maps a gate edge onto a concrete ingest-side action,
/// such as ceasing to poll a Kafka consumer or ceasing to accept on an HTTP
/// listener. The gate invokes each method EXACTLY ONCE per transition, which
/// leaves implementations free to be non-idempotent (flip a flag,
/// pause/resume a stream) with no risk of a double pause.
pub trait GateActuator: Send + Sync {
    /// Stop the inbound source. Invoked once, on the rising edge.
    fn pause(&self);

    /// Restart the inbound source. Invoked once, on the falling edge.
    fn resume(&self);
}
46
47/// An observability decorator over a [`GateActuator`].
48///
49/// Wrap the real actuator (Kafka pause/resume, a [`NoopActuator`], etc.) so
50/// each pause/resume EDGE emits a metric + brake-reason log, then forwards to
51/// the inner actuator. Because the [`InboundGate`] fires each edge EXACTLY
52/// ONCE, the `inbound_paused` gauge and `self_regulation_inbound_pauses_total`
53/// counter track real transitions, not per-evaluate noise -- a `paused`/
54/// `resumed` log pair and a gauge dashboards can graph.
55pub struct ObservingActuator {
56 inner: Box<dyn GateActuator>,
57 /// Stable source label for the log line (e.g. `"kafka"`, `"http"`).
58 source: &'static str,
59}
60
61impl ObservingActuator {
62 /// Wrap `inner` so pause/resume edges emit metrics + logs under `source`.
63 #[must_use]
64 pub fn new(source: &'static str, inner: Box<dyn GateActuator>) -> Self {
65 Self { inner, source }
66 }
67}
68
69impl GateActuator for ObservingActuator {
70 fn pause(&self) {
71 // `self_regulation_` domain prefix + a `source` label: a bare
72 // `inbound_paused` collides with nothing today but reads ambiguously
73 // next to `MemoryGuard`/`ScalingPressure` gauges, and the label lets two
74 // governed receivers (e.g. Kafka + HTTP) be told apart -- without it the
75 // single global series shows "unpaused" while one source is still held.
76 #[cfg(feature = "metrics")]
77 {
78 ::metrics::gauge!("self_regulation_inbound_paused", "source" => self.source).set(1.0);
79 ::metrics::counter!("self_regulation_inbound_pauses_total", "source" => self.source)
80 .increment(1);
81 }
82 tracing::warn!(
83 source = self.source,
84 "self-regulation: inbound PAUSED under pressure (memory/back-pressure brake)"
85 );
86 self.inner.pause();
87 }
88
89 fn resume(&self) {
90 #[cfg(feature = "metrics")]
91 ::metrics::gauge!("self_regulation_inbound_paused", "source" => self.source).set(0.0);
92 tracing::info!(
93 source = self.source,
94 "self-regulation: inbound RESUMED, pressure cleared"
95 );
96 self.inner.resume();
97 }
98}
99
/// Actuator that does nothing; intended for tests and send-only pipelines.
///
/// Pick this when a stage only needs the gate's [`Admit`] verdict (so it can
/// stop pulling work in its own loop) and has no external source to pause.
/// The gate's held/released state remains observable through
/// [`InboundGate::is_held`].
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopActuator;
108
109impl GateActuator for NoopActuator {
110 fn pause(&self) {}
111 fn resume(&self) {}
112}
113
/// Verdict the gate returns for the next unit of inbound work.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Admit {
    /// The gate is open: admit the next unit.
    Yes,
    /// The gate is closed under pressure: hold, do not admit.
    Hold,
}
122
123/// Edge-detecting inbound gate over a [`UnifiedPressure`] latch.
124///
125/// Wraps the recv/ingest side of a stage. Each
126/// [`evaluate`](Self::evaluate) consults the latch and drives the
127/// [`GateActuator`] once per transition. See the
128/// [module docs](crate::governor) for the full contract.
129pub struct InboundGate {
130 pressure: Arc<UnifiedPressure>,
131 actuator: Box<dyn GateActuator>,
132 /// Last edge state we drove the actuator to. Tracked separately from
133 /// the pressure latch so the actuator fires EXACTLY ONCE per
134 /// transition even though `should_hold()` returns `true` repeatedly
135 /// while latched.
136 paused_edge: AtomicBool,
137}
138
139impl InboundGate {
140 /// Build a gate over a shared pressure latch and an actuator.
141 ///
142 /// The gate starts in the released (open) state; the first
143 /// [`evaluate`](Self::evaluate) under pressure will fire `pause()`.
144 #[must_use]
145 pub fn new(pressure: Arc<UnifiedPressure>, actuator: Box<dyn GateActuator>) -> Self {
146 Self {
147 pressure,
148 actuator,
149 paused_edge: AtomicBool::new(false),
150 }
151 }
152
153 /// Sample the latch and drive the actuator on a transition.
154 ///
155 /// Computes [`should_hold`](UnifiedPressure::should_hold) from the
156 /// pressure, then uses a `compare_exchange` on the edge flag so:
157 ///
158 /// - false->true (rising edge): `compare_exchange(false, true)`
159 /// succeeds exactly once -> call `pause()`.
160 /// - true->false (falling edge): `compare_exchange(true, false)`
161 /// succeeds exactly once -> call `resume()`.
162 /// - no change: `compare_exchange` fails -> no actuator call.
163 ///
164 /// Returns [`Admit::Hold`] when held, [`Admit::Yes`] otherwise. Never
165 /// touches the outbound side.
166 ///
167 /// # Single-evaluator contract
168 ///
169 /// `evaluate()` MUST be called from a SINGLE task (the one recv loop that
170 /// owns this source). The edge `compare_exchange` and the actuator call are
171 /// not one atomic step: two tasks racing across a pressure transition could
172 /// interleave so the actuator (an async pause/resume on the consumer) runs
173 /// out of order, stranding the source paused while the flag reads open. The
174 /// in-tree wiring satisfies this -- each transport's recv loop calls
175 /// `evaluate()` once per `recv`. Do NOT share one `InboundGate` across
176 /// concurrent evaluators; give each source its own gate.
177 pub fn evaluate(&self) -> Admit {
178 let hold = self.pressure.should_hold();
179 if hold {
180 // Rising edge: flip false -> true exactly once.
181 if self
182 .paused_edge
183 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
184 .is_ok()
185 {
186 self.actuator.pause();
187 }
188 Admit::Hold
189 } else {
190 // Falling edge: flip true -> false exactly once.
191 if self
192 .paused_edge
193 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
194 .is_ok()
195 {
196 self.actuator.resume();
197 }
198 Admit::Yes
199 }
200 }
201
202 /// Whether the gate last drove the actuator to the held state.
203 ///
204 /// Reflects the edge flag, not a fresh pressure sample -- it is the
205 /// state the actuator has been driven to by the most recent
206 /// [`evaluate`](Self::evaluate).
207 #[must_use]
208 pub fn is_held(&self) -> bool {
209 self.paused_edge.load(Ordering::Acquire)
210 }
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::governor::source::{Hysteresis, Pressure, PressureSource};
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    /// Scriptable pressure source (mirrors the G1 test double): stores the
    /// reading as a bit-pattern `u64` so it stays `Sync` without `unsafe`
    /// or a lock.
    struct MockSource {
        value: AtomicU64,
        hard: bool,
    }

    impl MockSource {
        fn new(value: f64, hard: bool) -> Self {
            Self {
                value: AtomicU64::new(value.to_bits()),
                hard,
            }
        }
        /// Script the next reading the source will report.
        fn set(&self, value: f64) {
            self.value.store(value.to_bits(), Ordering::Relaxed);
        }
    }

    impl PressureSource for MockSource {
        fn name(&self) -> &'static str {
            "mock"
        }
        fn sample(&self) -> Pressure {
            Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
        }
        fn is_hard(&self) -> bool {
            self.hard
        }
    }

    /// Counting actuator: records exactly how many times each edge fired.
    struct CountingActuator {
        pause_calls: AtomicUsize,
        resume_calls: AtomicUsize,
    }

    impl CountingActuator {
        fn new() -> Self {
            Self {
                pause_calls: AtomicUsize::new(0),
                resume_calls: AtomicUsize::new(0),
            }
        }
        fn pauses(&self) -> usize {
            self.pause_calls.load(Ordering::Relaxed)
        }
        fn resumes(&self) -> usize {
            self.resume_calls.load(Ordering::Relaxed)
        }
    }

    impl GateActuator for CountingActuator {
        fn pause(&self) {
            self.pause_calls.fetch_add(1, Ordering::Relaxed);
        }
        fn resume(&self) {
            self.resume_calls.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// A `GateActuator` that forwards to a shared `Arc<CountingActuator>`
    /// so the test can both hand the gate an actuator AND inspect the
    /// counts afterwards (the gate takes a `Box`, consuming ownership).
    struct SharedActuator(Arc<CountingActuator>);

    impl GateActuator for SharedActuator {
        fn pause(&self) {
            self.0.pause();
        }
        fn resume(&self) {
            self.0.resume();
        }
    }

    /// Build a single-source latch with a 0.80 pause / 0.65 resume band.
    fn governor_with(source: Arc<MockSource>) -> Arc<UnifiedPressure> {
        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
        Arc::new(UnifiedPressure::new(
            vec![source as Arc<dyn PressureSource>],
            hyst,
        ))
    }

    /// THE adversarial proving test for the gate.
    ///
    /// Drives one gate through low->high->high->low->low->high and proves:
    /// 1. `pause()` fires EXACTLY ONCE per rising edge (not once per
    ///    `evaluate()` while latched);
    /// 2. `resume()` fires EXACTLY ONCE per falling edge;
    /// 3. `evaluate()` returns `Hold` while latched, `Yes` otherwise;
    /// 4. the latch re-arms cleanly (the second rising edge fires
    ///    `pause()` again -- no sticky state).
    #[test]
    fn gate_drives_actuator_exactly_once_per_edge() {
        let mem = Arc::new(MockSource::new(0.10, true));
        let pressure = governor_with(Arc::clone(&mem));
        let counter = Arc::new(CountingActuator::new());
        let gate = InboundGate::new(
            Arc::clone(&pressure),
            Box::new(SharedActuator(Arc::clone(&counter))),
        );

        // LOW: open, no actuator calls yet.
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
        assert_eq!(counter.pauses(), 0);
        assert_eq!(counter.resumes(), 0);

        // RISING edge: 0.10 -> 0.90 (>= pause_above) -> pause() ONCE.
        mem.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());
        assert_eq!(counter.pauses(), 1, "pause once on rising edge");
        assert_eq!(counter.resumes(), 0);

        // STILL HIGH: latched. Many evaluate()s, still Hold, NO extra
        // pause() -- this is the edge-dedup invariant.
        for _ in 0..5 {
            assert_eq!(gate.evaluate(), Admit::Hold);
        }
        assert_eq!(counter.pauses(), 1, "no re-pause while latched");
        assert_eq!(counter.resumes(), 0);

        // Inside the band (0.70 > resume_below 0.65): latch HOLDS, still
        // no extra calls.
        mem.set(0.70);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(counter.pauses(), 1, "band holds, no re-pause");
        assert_eq!(counter.resumes(), 0);

        // FALLING edge: 0.70 -> 0.50 (<= resume_below) -> resume() ONCE.
        mem.set(0.50);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
        assert_eq!(counter.pauses(), 1);
        assert_eq!(counter.resumes(), 1, "resume once on falling edge");

        // STILL LOW: open. Many evaluate()s, still Yes, NO extra resume().
        for _ in 0..5 {
            assert_eq!(gate.evaluate(), Admit::Yes);
        }
        assert_eq!(counter.pauses(), 1);
        assert_eq!(counter.resumes(), 1, "no re-resume while released");

        // SECOND RISING edge: re-arms cleanly -> pause() AGAIN (count 2).
        mem.set(0.95);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());
        assert_eq!(counter.pauses(), 2, "latch re-arms, pause fires again");
        assert_eq!(counter.resumes(), 1);
    }

    #[test]
    fn noop_actuator_gate_still_tracks_held_state() {
        let mem = Arc::new(MockSource::new(0.10, true));
        let pressure = governor_with(Arc::clone(&mem));
        let gate = InboundGate::new(Arc::clone(&pressure), Box::new(NoopActuator));

        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());

        mem.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());

        mem.set(0.10);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
    }

    /// The observability decorator must be transparent: every edge the gate
    /// fires reaches the wrapped actuator exactly once, and steady-state
    /// `evaluate()` calls add nothing.
    #[test]
    fn observing_actuator_forwards_each_edge_once() {
        let mem = Arc::new(MockSource::new(0.10, true));
        let pressure = governor_with(Arc::clone(&mem));
        let counter = Arc::new(CountingActuator::new());
        let observed =
            ObservingActuator::new("test", Box::new(SharedActuator(Arc::clone(&counter))));
        let gate = InboundGate::new(Arc::clone(&pressure), Box::new(observed));

        // LOW: nothing forwarded yet.
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert_eq!(counter.pauses(), 0);
        assert_eq!(counter.resumes(), 0);

        // RISING edge, then latched: exactly one pause() reaches the inner.
        mem.set(0.90);
        for _ in 0..3 {
            assert_eq!(gate.evaluate(), Admit::Hold);
        }
        assert_eq!(counter.pauses(), 1, "decorator forwards pause once");
        assert_eq!(counter.resumes(), 0);

        // FALLING edge, then released: exactly one resume() reaches the inner.
        mem.set(0.10);
        for _ in 0..3 {
            assert_eq!(gate.evaluate(), Admit::Yes);
        }
        assert_eq!(counter.pauses(), 1);
        assert_eq!(counter.resumes(), 1, "decorator forwards resume once");
    }
}