Skip to main content

hyperi_rustlib/governor/
gate.rs

1// Project:   hyperi-rustlib
2// File:      src/governor/gate.rs
3// Purpose:   Inbound gate: edge-detecting pause/resume over the pressure latch
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Inbound gate: drives an actuator on each pause/resume transition.
10//!
11//! The [`UnifiedPressure`] latch tells us *whether* to hold; the
12//! [`InboundGate`] turns that latched boolean into EDGE events. Each
13//! [`evaluate`](InboundGate::evaluate) samples the latch and, on a
14//! transition only, calls the [`GateActuator`] exactly once -- `pause()`
15//! on the false->true (rising) edge, `resume()` on the true->false
16//! (falling) edge. While the latch stays held, repeated `evaluate()`
17//! calls return [`Admit::Hold`] but do NOT re-call `pause()`; likewise a
18//! released latch returns [`Admit::Yes`] without re-calling `resume()`.
19//!
20//! INBOUND side only: the actuator pauses a source's recv/ingest (stops
21//! pulling new work) so the in-flight buffer drains under pressure. NEVER
22//! wired to the outbound drain (sink) -- gating the drain would deadlock the
23//! pipeline. `send` is never involved here.
24//!
25//! Gated behind the `governor` feature; wired into the receive transport by
26//! [`SelfRegulationGovernor`](super::SelfRegulationGovernor) (default-on).
27
28use std::sync::Arc;
29use std::sync::atomic::{AtomicBool, Ordering};
30
31use super::source::UnifiedPressure;
32
/// The ingest-side hook an [`InboundGate`] drives when pressure flips.
///
/// An implementation turns each gate edge into a concrete action on the
/// source -- for example halting a Kafka consumer's polling, or refusing new
/// connections on an HTTP listener. Because the gate invokes each method
/// EXACTLY ONCE per transition, implementations need not be idempotent:
/// toggling a flag or pausing/resuming a stream will never double-pause.
pub trait GateActuator: Send + Sync {
    /// Stop the inbound source; invoked once when the gate closes (rising edge).
    fn pause(&self);

    /// Restart the inbound source; invoked once when the gate opens (falling edge).
    fn resume(&self);
}
46
47/// An observability decorator over a [`GateActuator`].
48///
49/// Wrap the real actuator (Kafka pause/resume, a [`NoopActuator`], etc.) so
50/// each pause/resume EDGE emits a metric + brake-reason log, then forwards to
51/// the inner actuator. Because the [`InboundGate`] fires each edge EXACTLY
52/// ONCE, the `inbound_paused` gauge and `self_regulation_inbound_pauses_total`
53/// counter track real transitions, not per-evaluate noise -- a `paused`/
54/// `resumed` log pair and a gauge dashboards can graph.
55pub struct ObservingActuator {
56    inner: Box<dyn GateActuator>,
57    /// Stable source label for the log line (e.g. `"kafka"`, `"http"`).
58    source: &'static str,
59}
60
61impl ObservingActuator {
62    /// Wrap `inner` so pause/resume edges emit metrics + logs under `source`.
63    #[must_use]
64    pub fn new(source: &'static str, inner: Box<dyn GateActuator>) -> Self {
65        Self { inner, source }
66    }
67}
68
69impl GateActuator for ObservingActuator {
70    fn pause(&self) {
71        // `self_regulation_` domain prefix + a `source` label: a bare
72        // `inbound_paused` collides with nothing today but reads ambiguously
73        // next to `MemoryGuard`/`ScalingPressure` gauges, and the label lets two
74        // governed receivers (e.g. Kafka + HTTP) be told apart -- without it the
75        // single global series shows "unpaused" while one source is still held.
76        #[cfg(feature = "metrics")]
77        {
78            ::metrics::gauge!("self_regulation_inbound_paused", "source" => self.source).set(1.0);
79            ::metrics::counter!("self_regulation_inbound_pauses_total", "source" => self.source)
80                .increment(1);
81        }
82        tracing::warn!(
83            source = self.source,
84            "self-regulation: inbound PAUSED under pressure (memory/back-pressure brake)"
85        );
86        self.inner.pause();
87    }
88
89    fn resume(&self) {
90        #[cfg(feature = "metrics")]
91        ::metrics::gauge!("self_regulation_inbound_paused", "source" => self.source).set(0.0);
92        tracing::info!(
93            source = self.source,
94            "self-regulation: inbound RESUMED, pressure cleared"
95        );
96        self.inner.resume();
97    }
98}
99
100/// A no-op actuator for tests and send-only pipelines.
101///
102/// Useful when a stage wants the gate's [`Admit`] decision (to stop
103/// pulling work in its own loop) but has nothing external to pause --
104/// the gate's held/released state is still observable via
105/// [`InboundGate::is_held`].
106#[derive(Debug, Default, Clone, Copy)]
107pub struct NoopActuator;
108
109impl GateActuator for NoopActuator {
110    fn pause(&self) {}
111    fn resume(&self) {}
112}
113
/// What the gate says about the next unit of inbound work.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Admit {
    /// The gate is open: go ahead and admit the next unit.
    Yes,
    /// The gate is closed under pressure: do not admit the next unit.
    Hold,
}
122
123/// Edge-detecting inbound gate over a [`UnifiedPressure`] latch.
124///
125/// Wraps the recv/ingest side of a stage. Each
126/// [`evaluate`](Self::evaluate) consults the latch and drives the
127/// [`GateActuator`] once per transition. See the
128/// [module docs](crate::governor) for the full contract.
129pub struct InboundGate {
130    pressure: Arc<UnifiedPressure>,
131    actuator: Box<dyn GateActuator>,
132    /// Last edge state we drove the actuator to. Tracked separately from
133    /// the pressure latch so the actuator fires EXACTLY ONCE per
134    /// transition even though `should_hold()` returns `true` repeatedly
135    /// while latched.
136    paused_edge: AtomicBool,
137}
138
139impl InboundGate {
140    /// Build a gate over a shared pressure latch and an actuator.
141    ///
142    /// The gate starts in the released (open) state; the first
143    /// [`evaluate`](Self::evaluate) under pressure will fire `pause()`.
144    #[must_use]
145    pub fn new(pressure: Arc<UnifiedPressure>, actuator: Box<dyn GateActuator>) -> Self {
146        Self {
147            pressure,
148            actuator,
149            paused_edge: AtomicBool::new(false),
150        }
151    }
152
153    /// Sample the latch and drive the actuator on a transition.
154    ///
155    /// Computes [`should_hold`](UnifiedPressure::should_hold) from the
156    /// pressure, then uses a `compare_exchange` on the edge flag so:
157    ///
158    /// - false->true (rising edge): `compare_exchange(false, true)`
159    ///   succeeds exactly once -> call `pause()`.
160    /// - true->false (falling edge): `compare_exchange(true, false)`
161    ///   succeeds exactly once -> call `resume()`.
162    /// - no change: `compare_exchange` fails -> no actuator call.
163    ///
164    /// Returns [`Admit::Hold`] when held, [`Admit::Yes`] otherwise. Never
165    /// touches the outbound side.
166    ///
167    /// # Single-evaluator contract
168    ///
169    /// `evaluate()` MUST be called from a SINGLE task (the one recv loop that
170    /// owns this source). The edge `compare_exchange` and the actuator call are
171    /// not one atomic step: two tasks racing across a pressure transition could
172    /// interleave so the actuator (an async pause/resume on the consumer) runs
173    /// out of order, stranding the source paused while the flag reads open. The
174    /// in-tree wiring satisfies this -- each transport's recv loop calls
175    /// `evaluate()` once per `recv`. Do NOT share one `InboundGate` across
176    /// concurrent evaluators; give each source its own gate.
177    pub fn evaluate(&self) -> Admit {
178        let hold = self.pressure.should_hold();
179        if hold {
180            // Rising edge: flip false -> true exactly once.
181            if self
182                .paused_edge
183                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
184                .is_ok()
185            {
186                self.actuator.pause();
187            }
188            Admit::Hold
189        } else {
190            // Falling edge: flip true -> false exactly once.
191            if self
192                .paused_edge
193                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
194                .is_ok()
195            {
196                self.actuator.resume();
197            }
198            Admit::Yes
199        }
200    }
201
202    /// Whether the gate last drove the actuator to the held state.
203    ///
204    /// Reflects the edge flag, not a fresh pressure sample -- it is the
205    /// state the actuator has been driven to by the most recent
206    /// [`evaluate`](Self::evaluate).
207    #[must_use]
208    pub fn is_held(&self) -> bool {
209        self.paused_edge.load(Ordering::Acquire)
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::governor::source::{Hysteresis, Pressure, PressureSource};
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    /// Scriptable pressure source (mirrors the G1 test double): stores the
    /// reading as a bit-pattern `u64` so it stays `Sync` without `unsafe`
    /// or a lock.
    struct MockSource {
        value: AtomicU64,
        hard: bool,
    }

    impl MockSource {
        fn new(value: f64, hard: bool) -> Self {
            Self {
                value: AtomicU64::new(value.to_bits()),
                hard,
            }
        }
        fn set(&self, value: f64) {
            self.value.store(value.to_bits(), Ordering::Relaxed);
        }
    }

    impl PressureSource for MockSource {
        fn name(&self) -> &'static str {
            "mock"
        }
        fn sample(&self) -> Pressure {
            Pressure::new(f64::from_bits(self.value.load(Ordering::Relaxed)))
        }
        fn is_hard(&self) -> bool {
            self.hard
        }
    }

    /// Counting actuator: records exactly how many times each edge fired.
    struct CountingActuator {
        pause_calls: AtomicUsize,
        resume_calls: AtomicUsize,
    }

    impl CountingActuator {
        fn new() -> Self {
            Self {
                pause_calls: AtomicUsize::new(0),
                resume_calls: AtomicUsize::new(0),
            }
        }
        fn pauses(&self) -> usize {
            self.pause_calls.load(Ordering::Relaxed)
        }
        fn resumes(&self) -> usize {
            self.resume_calls.load(Ordering::Relaxed)
        }
    }

    impl GateActuator for CountingActuator {
        fn pause(&self) {
            self.pause_calls.fetch_add(1, Ordering::Relaxed);
        }
        fn resume(&self) {
            self.resume_calls.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// A `GateActuator` that forwards to a shared `Arc<CountingActuator>`
    /// so the test can both hand the gate an actuator AND inspect the
    /// counts afterwards (the gate takes a `Box`, consuming ownership).
    struct SharedActuator(Arc<CountingActuator>);

    impl GateActuator for SharedActuator {
        fn pause(&self) {
            self.0.pause();
        }
        fn resume(&self) {
            self.0.resume();
        }
    }

    fn governor_with(source: Arc<MockSource>) -> Arc<UnifiedPressure> {
        let hyst = Hysteresis::new(0.80, 0.65).expect("valid band");
        Arc::new(UnifiedPressure::new(
            vec![source as Arc<dyn PressureSource>],
            hyst,
        ))
    }

    /// THE adversarial proving test for the gate.
    ///
    /// Drives one gate through low->high->high->low->low->high and proves:
    ///   1. `pause()` fires EXACTLY ONCE per rising edge (not once per
    ///      `evaluate()` while latched);
    ///   2. `resume()` fires EXACTLY ONCE per falling edge;
    ///   3. `evaluate()` returns `Hold` while latched, `Yes` otherwise;
    ///   4. the latch re-arms cleanly (the second rising edge fires
    ///      `pause()` again -- no sticky state).
    #[test]
    fn gate_drives_actuator_exactly_once_per_edge() {
        let mem = Arc::new(MockSource::new(0.10, true));
        let pressure = governor_with(Arc::clone(&mem));
        let counter = Arc::new(CountingActuator::new());
        let gate = InboundGate::new(
            Arc::clone(&pressure),
            Box::new(SharedActuator(Arc::clone(&counter))),
        );

        // LOW: open, no actuator calls yet.
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
        assert_eq!(counter.pauses(), 0);
        assert_eq!(counter.resumes(), 0);

        // RISING edge: 0.10 -> 0.90 (>= pause_above) -> pause() ONCE.
        mem.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());
        assert_eq!(counter.pauses(), 1, "pause once on rising edge");
        assert_eq!(counter.resumes(), 0);

        // STILL HIGH: latched. Many evaluate()s, still Hold, NO extra
        // pause() -- this is the edge-dedup invariant.
        for _ in 0..5 {
            assert_eq!(gate.evaluate(), Admit::Hold);
        }
        assert_eq!(counter.pauses(), 1, "no re-pause while latched");
        assert_eq!(counter.resumes(), 0);

        // Inside the band (0.70 > resume_below 0.65): latch HOLDS, still
        // no extra calls.
        mem.set(0.70);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(counter.pauses(), 1, "band holds, no re-pause");
        assert_eq!(counter.resumes(), 0);

        // FALLING edge: 0.70 -> 0.50 (<= resume_below) -> resume() ONCE.
        mem.set(0.50);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
        assert_eq!(counter.pauses(), 1);
        assert_eq!(counter.resumes(), 1, "resume once on falling edge");

        // STILL LOW: open. Many evaluate()s, still Yes, NO extra resume().
        for _ in 0..5 {
            assert_eq!(gate.evaluate(), Admit::Yes);
        }
        assert_eq!(counter.pauses(), 1);
        assert_eq!(counter.resumes(), 1, "no re-resume while released");

        // SECOND RISING edge: re-arms cleanly -> pause() AGAIN (count 2).
        mem.set(0.95);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());
        assert_eq!(counter.pauses(), 2, "latch re-arms, pause fires again");
        assert_eq!(counter.resumes(), 1);
    }

    /// A gate whose very first sample is already over `pause_above` must
    /// fire `pause()` exactly once (the gate starts released) and must not
    /// emit a spurious `resume()`.
    #[test]
    fn gate_starting_under_pressure_pauses_once() {
        let pressure = governor_with(Arc::new(MockSource::new(0.90, true)));
        let counter = Arc::new(CountingActuator::new());
        let gate = InboundGate::new(pressure, Box::new(SharedActuator(Arc::clone(&counter))));

        assert!(!gate.is_held(), "a fresh gate starts released");
        for _ in 0..3 {
            assert_eq!(gate.evaluate(), Admit::Hold);
        }
        assert!(gate.is_held());
        assert_eq!(counter.pauses(), 1, "first held sample is a rising edge");
        assert_eq!(counter.resumes(), 0, "no resume without a falling edge");
    }

    /// `ObservingActuator` is a pure decorator: each edge is forwarded to the
    /// inner actuator exactly once, so wrapping does not change edge counts.
    #[test]
    fn observing_actuator_forwards_each_edge_once() {
        let mem = Arc::new(MockSource::new(0.10, true));
        let pressure = governor_with(Arc::clone(&mem));
        let counter = Arc::new(CountingActuator::new());
        let observed = ObservingActuator::new(
            "test",
            Box::new(SharedActuator(Arc::clone(&counter))),
        );
        let gate = InboundGate::new(pressure, Box::new(observed));

        assert_eq!(gate.evaluate(), Admit::Yes);
        assert_eq!(counter.pauses(), 0);
        assert_eq!(counter.resumes(), 0);

        mem.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(counter.pauses(), 1, "decorator forwards pause once");
        assert_eq!(counter.resumes(), 0);

        mem.set(0.10);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert_eq!(counter.pauses(), 1);
        assert_eq!(counter.resumes(), 1, "decorator forwards resume once");
    }

    #[test]
    fn noop_actuator_gate_still_tracks_held_state() {
        let mem = Arc::new(MockSource::new(0.10, true));
        let pressure = governor_with(Arc::clone(&mem));
        let gate = InboundGate::new(Arc::clone(&pressure), Box::new(NoopActuator));

        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());

        mem.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());

        mem.set(0.10);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
    }
}