hyperi_rustlib/governor/gate.rs
1// Project: hyperi-rustlib
2// File: src/governor/gate.rs
3// Purpose: Inbound gate: edge-detecting pause/resume over the pressure latch
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Inbound gate: drives an actuator on each pause/resume transition.
10//!
11//! The [`UnifiedPressure`] latch tells us *whether* to hold; the
12//! [`InboundGate`] turns that latched boolean into EDGE events. Each
13//! [`evaluate`](InboundGate::evaluate) samples the latch and, on a
14//! transition only, calls the [`GateActuator`] exactly once -- `pause()`
15//! on the false->true (rising) edge, `resume()` on the true->false
16//! (falling) edge. While the latch stays held, repeated `evaluate()`
17//! calls return [`Admit::Hold`] but do NOT re-call `pause()`; likewise a
18//! released latch returns [`Admit::Yes`] without re-calling `resume()`.
19//!
20//! This is deliberately the INBOUND side only: the actuator pauses the
21//! recv/ingest of a source (stops pulling new work) so the in-flight
22//! buffer drains under pressure. It is never wired to the outbound drain
23//! (sink) -- gating the drain would deadlock the pipeline. `send` is
24//! never involved here.
25//!
26//! Additive and default-off (the `governor` feature). NOT wired into any
27//! transport, driver, or runtime here; that lands in a later phase.
28
29use std::sync::Arc;
30use std::sync::atomic::{AtomicBool, Ordering};
31
32use super::source::UnifiedPressure;
33
34/// Drives the inbound source on pause/resume edges.
35///
36/// Implementors translate a gate edge into a concrete action on the
37/// ingest side -- e.g. stop polling a Kafka consumer, stop accepting on an
38/// HTTP listener. The gate guarantees each method fires EXACTLY ONCE per
39/// transition, so an implementation is free to be non-idempotent (toggle
40/// a flag, pause/resume a stream) without double-pausing.
41pub trait GateActuator: Send + Sync {
42 /// Pause the inbound source. Called once on the rising edge.
43 fn pause(&self);
44 /// Resume the inbound source. Called once on the falling edge.
45 fn resume(&self);
46}
47
48/// An observability decorator over a [`GateActuator`].
49///
50/// Wrap the real actuator (the Kafka pause/resume actuator, a [`NoopActuator`],
51/// etc.) so each pause/resume EDGE emits a metric and a brake-reason log line,
52/// then forwards to the inner actuator. Because the [`InboundGate`] fires each
53/// edge EXACTLY ONCE, the `inbound_paused` gauge and the
54/// `self_regulation_inbound_pauses_total` counter track real transitions, not
55/// per-evaluate noise.
56///
57/// This makes inbound throttling VISIBLE: a `paused` log on the rising edge and
58/// a `resumed` log on the falling edge, plus a gauge dashboards can graph.
59pub struct ObservingActuator {
60 inner: Box<dyn GateActuator>,
61 /// Stable source label for the log line (e.g. `"kafka"`, `"http"`).
62 source: &'static str,
63}
64
65impl ObservingActuator {
66 /// Wrap `inner` so pause/resume edges emit metrics + logs under `source`.
67 #[must_use]
68 pub fn new(source: &'static str, inner: Box<dyn GateActuator>) -> Self {
69 Self { inner, source }
70 }
71}
72
73impl GateActuator for ObservingActuator {
74 fn pause(&self) {
75 #[cfg(feature = "metrics")]
76 {
77 ::metrics::gauge!("inbound_paused").set(1.0);
78 ::metrics::counter!("self_regulation_inbound_pauses_total").increment(1);
79 }
80 tracing::warn!(
81 source = self.source,
82 "self-regulation: inbound PAUSED under pressure (memory/back-pressure brake)"
83 );
84 self.inner.pause();
85 }
86
87 fn resume(&self) {
88 #[cfg(feature = "metrics")]
89 ::metrics::gauge!("inbound_paused").set(0.0);
90 tracing::info!(
91 source = self.source,
92 "self-regulation: inbound RESUMED, pressure cleared"
93 );
94 self.inner.resume();
95 }
96}
97
98/// A no-op actuator for tests and send-only pipelines.
99///
100/// Useful when a stage wants the gate's [`Admit`] decision (to stop
101/// pulling work in its own loop) but has nothing external to pause --
102/// the gate's held/released state is still observable via
103/// [`InboundGate::is_held`].
104#[derive(Debug, Default, Clone, Copy)]
105pub struct NoopActuator;
106
107impl GateActuator for NoopActuator {
108 fn pause(&self) {}
109 fn resume(&self) {}
110}
111
112/// The gate's admission decision for the next unit of inbound work.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum Admit {
115 /// Admit the next unit -- the gate is open.
116 Yes,
117 /// Hold (do not admit) -- the gate is closed under pressure.
118 Hold,
119}
120
121/// Edge-detecting inbound gate over a [`UnifiedPressure`] latch.
122///
123/// Wraps the recv/ingest side of a stage. Each
124/// [`evaluate`](Self::evaluate) consults the latch and drives the
125/// [`GateActuator`] once per transition. See the
126/// [module docs](crate::governor) for the full contract.
127pub struct InboundGate {
128 pressure: Arc<UnifiedPressure>,
129 actuator: Box<dyn GateActuator>,
130 /// Last edge state we drove the actuator to. Tracked separately from
131 /// the pressure latch so the actuator fires EXACTLY ONCE per
132 /// transition even though `should_hold()` returns `true` repeatedly
133 /// while latched.
134 paused_edge: AtomicBool,
135}
136
137impl InboundGate {
138 /// Build a gate over a shared pressure latch and an actuator.
139 ///
140 /// The gate starts in the released (open) state; the first
141 /// [`evaluate`](Self::evaluate) under pressure will fire `pause()`.
142 #[must_use]
143 pub fn new(pressure: Arc<UnifiedPressure>, actuator: Box<dyn GateActuator>) -> Self {
144 Self {
145 pressure,
146 actuator,
147 paused_edge: AtomicBool::new(false),
148 }
149 }
150
151 /// Sample the latch and drive the actuator on a transition.
152 ///
153 /// Computes [`should_hold`](UnifiedPressure::should_hold) from the
154 /// pressure, then uses a `compare_exchange` on the edge flag so:
155 ///
156 /// - false->true (rising edge): `compare_exchange(false, true)`
157 /// succeeds exactly once -> call `pause()`.
158 /// - true->false (falling edge): `compare_exchange(true, false)`
159 /// succeeds exactly once -> call `resume()`.
160 /// - no change: `compare_exchange` fails -> no actuator call.
161 ///
162 /// Returns [`Admit::Hold`] when held, [`Admit::Yes`] otherwise. Never
163 /// touches the outbound side.
164 pub fn evaluate(&self) -> Admit {
165 let hold = self.pressure.should_hold();
166 if hold {
167 // Rising edge: flip false -> true exactly once.
168 if self
169 .paused_edge
170 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
171 .is_ok()
172 {
173 self.actuator.pause();
174 }
175 Admit::Hold
176 } else {
177 // Falling edge: flip true -> false exactly once.
178 if self
179 .paused_edge
180 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
181 .is_ok()
182 {
183 self.actuator.resume();
184 }
185 Admit::Yes
186 }
187 }
188
189 /// Whether the gate last drove the actuator to the held state.
190 ///
191 /// Reflects the edge flag, not a fresh pressure sample -- it is the
192 /// state the actuator has been driven to by the most recent
193 /// [`evaluate`](Self::evaluate).
194 #[must_use]
195 pub fn is_held(&self) -> bool {
196 self.paused_edge.load(Ordering::Acquire)
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use crate::governor::source::{Hysteresis, Pressure, PressureSource};
204 use std::sync::atomic::{AtomicU64, AtomicUsize};
205
206 /// Scriptable pressure source (mirrors the G1 test double): stores the
207 /// reading as a bit-pattern `u64` so it stays `Sync` without `unsafe`
208 /// or a lock.
209 struct MockSource {
210 value: AtomicU64,
211 hard: bool,
212 }
213
214 impl MockSource {
215 fn new(value: f64, hard: bool) -> Self {
216 Self {
217 value: AtomicU64::new(value.to_bits()),
218 hard,
219 }
220 }
221 fn set(&self, value: f64) {
222 self.value.store(value.to_bits(), Ordering::Relaxed);
223 }
224 }
225
226 impl PressureSource for MockSource {
227 fn name(&self) -> &'static str {
228 "mock"
229 }
230 fn sample(&self) -> Pressure {
231 Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
232 }
233 fn is_hard(&self) -> bool {
234 self.hard
235 }
236 }
237
238 /// Counting actuator: records exactly how many times each edge fired.
239 struct CountingActuator {
240 pause_calls: AtomicUsize,
241 resume_calls: AtomicUsize,
242 }
243
244 impl CountingActuator {
245 fn new() -> Self {
246 Self {
247 pause_calls: AtomicUsize::new(0),
248 resume_calls: AtomicUsize::new(0),
249 }
250 }
251 fn pauses(&self) -> usize {
252 self.pause_calls.load(Ordering::Relaxed)
253 }
254 fn resumes(&self) -> usize {
255 self.resume_calls.load(Ordering::Relaxed)
256 }
257 }
258
259 impl GateActuator for CountingActuator {
260 fn pause(&self) {
261 self.pause_calls.fetch_add(1, Ordering::Relaxed);
262 }
263 fn resume(&self) {
264 self.resume_calls.fetch_add(1, Ordering::Relaxed);
265 }
266 }
267
268 /// A `GateActuator` that forwards to a shared `Arc<CountingActuator>`
269 /// so the test can both hand the gate an actuator AND inspect the
270 /// counts afterwards (the gate takes a `Box`, consuming ownership).
271 struct SharedActuator(Arc<CountingActuator>);
272
273 impl GateActuator for SharedActuator {
274 fn pause(&self) {
275 self.0.pause();
276 }
277 fn resume(&self) {
278 self.0.resume();
279 }
280 }
281
282 fn governor_with(source: Arc<MockSource>) -> Arc<UnifiedPressure> {
283 let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
284 Arc::new(UnifiedPressure::new(
285 vec![source as Arc<dyn PressureSource>],
286 hyst,
287 ))
288 }
289
290 /// THE adversarial proving test for the gate.
291 ///
292 /// Drives one gate through low->high->high->low->low->high and proves:
293 /// 1. `pause()` fires EXACTLY ONCE per rising edge (not once per
294 /// `evaluate()` while latched);
295 /// 2. `resume()` fires EXACTLY ONCE per falling edge;
296 /// 3. `evaluate()` returns `Hold` while latched, `Yes` otherwise;
297 /// 4. the latch re-arms cleanly (the second rising edge fires
298 /// `pause()` again -- no sticky state).
299 #[test]
300 fn gate_drives_actuator_exactly_once_per_edge() {
301 let mem = Arc::new(MockSource::new(0.10, true));
302 let pressure = governor_with(Arc::clone(&mem));
303 let counter = Arc::new(CountingActuator::new());
304 let gate = InboundGate::new(
305 Arc::clone(&pressure),
306 Box::new(SharedActuator(Arc::clone(&counter))),
307 );
308
309 // LOW: open, no actuator calls yet.
310 assert_eq!(gate.evaluate(), Admit::Yes);
311 assert!(!gate.is_held());
312 assert_eq!(counter.pauses(), 0);
313 assert_eq!(counter.resumes(), 0);
314
315 // RISING edge: 0.10 -> 0.90 (>= pause_above) -> pause() ONCE.
316 mem.set(0.90);
317 assert_eq!(gate.evaluate(), Admit::Hold);
318 assert!(gate.is_held());
319 assert_eq!(counter.pauses(), 1, "pause once on rising edge");
320 assert_eq!(counter.resumes(), 0);
321
322 // STILL HIGH: latched. Many evaluate()s, still Hold, NO extra
323 // pause() -- this is the edge-dedup invariant.
324 for _ in 0..5 {
325 assert_eq!(gate.evaluate(), Admit::Hold);
326 }
327 assert_eq!(counter.pauses(), 1, "no re-pause while latched");
328 assert_eq!(counter.resumes(), 0);
329
330 // Inside the band (0.70 > resume_below 0.65): latch HOLDS, still
331 // no extra calls.
332 mem.set(0.70);
333 assert_eq!(gate.evaluate(), Admit::Hold);
334 assert_eq!(counter.pauses(), 1, "band holds, no re-pause");
335 assert_eq!(counter.resumes(), 0);
336
337 // FALLING edge: 0.70 -> 0.50 (<= resume_below) -> resume() ONCE.
338 mem.set(0.50);
339 assert_eq!(gate.evaluate(), Admit::Yes);
340 assert!(!gate.is_held());
341 assert_eq!(counter.pauses(), 1);
342 assert_eq!(counter.resumes(), 1, "resume once on falling edge");
343
344 // STILL LOW: open. Many evaluate()s, still Yes, NO extra resume().
345 for _ in 0..5 {
346 assert_eq!(gate.evaluate(), Admit::Yes);
347 }
348 assert_eq!(counter.pauses(), 1);
349 assert_eq!(counter.resumes(), 1, "no re-resume while released");
350
351 // SECOND RISING edge: re-arms cleanly -> pause() AGAIN (count 2).
352 mem.set(0.95);
353 assert_eq!(gate.evaluate(), Admit::Hold);
354 assert!(gate.is_held());
355 assert_eq!(counter.pauses(), 2, "latch re-arms, pause fires again");
356 assert_eq!(counter.resumes(), 1);
357 }
358
359 #[test]
360 fn noop_actuator_gate_still_tracks_held_state() {
361 let mem = Arc::new(MockSource::new(0.10, true));
362 let pressure = governor_with(Arc::clone(&mem));
363 let gate = InboundGate::new(Arc::clone(&pressure), Box::new(NoopActuator));
364
365 assert_eq!(gate.evaluate(), Admit::Yes);
366 assert!(!gate.is_held());
367
368 mem.set(0.90);
369 assert_eq!(gate.evaluate(), Admit::Hold);
370 assert!(gate.is_held());
371
372 mem.set(0.10);
373 assert_eq!(gate.evaluate(), Admit::Yes);
374 assert!(!gate.is_held());
375 }
376}