Skip to main content

hyperi_rustlib/governor/
gate.rs

// Project:   hyperi-rustlib
// File:      src/governor/gate.rs
// Purpose:   Inbound gate: edge-detecting pause/resume over the pressure latch
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! Inbound gate: drives an actuator on each pause/resume transition.
//!
//! The [`UnifiedPressure`] latch tells us *whether* to hold; the
//! [`InboundGate`] turns that latched boolean into EDGE events. Each
//! [`evaluate`](InboundGate::evaluate) samples the latch and, on a
//! transition only, calls the [`GateActuator`] exactly once -- `pause()`
//! on the false->true (rising) edge, `resume()` on the true->false
//! (falling) edge. While the latch stays held, repeated `evaluate()`
//! calls return [`Admit::Hold`] but do NOT re-call `pause()`; likewise a
//! released latch returns [`Admit::Yes`] without re-calling `resume()`.
//!
//! This is deliberately the INBOUND side only: the actuator pauses the
//! recv/ingest of a source (stops pulling new work) so the in-flight
//! buffer drains under pressure. It is never wired to the outbound drain
//! (sink) -- gating the drain would deadlock the pipeline. `send` is
//! never involved here.
//!
//! Additive and default-off (the `governor` feature). NOT wired into any
//! transport, driver, or runtime here; that lands in a later phase.
28
29use std::sync::Arc;
30use std::sync::atomic::{AtomicBool, Ordering};
31
32use super::source::UnifiedPressure;
33
/// Hook through which an [`InboundGate`] pauses and resumes the inbound source.
///
/// An implementation maps each gate edge onto a concrete ingest-side action,
/// for example ceasing to poll a Kafka consumer or ceasing to accept on an
/// HTTP listener. [`InboundGate::evaluate`] invokes a method only when its
/// edge flag actually flips, never on every evaluation, so implementations do
/// not have to be idempotent: toggling a flag or pausing/resuming a stream is
/// fine.
///
/// The `Send + Sync` supertraits allow a boxed actuator to be shared between
/// threads.
pub trait GateActuator: Send + Sync {
    /// Stop pulling inbound work. Invoked once when the gate closes (rising edge).
    fn pause(&self);
    /// Start pulling inbound work again. Invoked once when the gate reopens
    /// (falling edge).
    fn resume(&self);
}
47
48/// An observability decorator over a [`GateActuator`].
49///
50/// Wrap the real actuator (the Kafka pause/resume actuator, a [`NoopActuator`],
51/// etc.) so each pause/resume EDGE emits a metric and a brake-reason log line,
52/// then forwards to the inner actuator. Because the [`InboundGate`] fires each
53/// edge EXACTLY ONCE, the `inbound_paused` gauge and the
54/// `self_regulation_inbound_pauses_total` counter track real transitions, not
55/// per-evaluate noise.
56///
57/// This makes inbound throttling VISIBLE: a `paused` log on the rising edge and
58/// a `resumed` log on the falling edge, plus a gauge dashboards can graph.
59pub struct ObservingActuator {
60    inner: Box<dyn GateActuator>,
61    /// Stable source label for the log line (e.g. `"kafka"`, `"http"`).
62    source: &'static str,
63}
64
65impl ObservingActuator {
66    /// Wrap `inner` so pause/resume edges emit metrics + logs under `source`.
67    #[must_use]
68    pub fn new(source: &'static str, inner: Box<dyn GateActuator>) -> Self {
69        Self { inner, source }
70    }
71}
72
73impl GateActuator for ObservingActuator {
74    fn pause(&self) {
75        #[cfg(feature = "metrics")]
76        {
77            ::metrics::gauge!("inbound_paused").set(1.0);
78            ::metrics::counter!("self_regulation_inbound_pauses_total").increment(1);
79        }
80        tracing::warn!(
81            source = self.source,
82            "self-regulation: inbound PAUSED under pressure (memory/back-pressure brake)"
83        );
84        self.inner.pause();
85    }
86
87    fn resume(&self) {
88        #[cfg(feature = "metrics")]
89        ::metrics::gauge!("inbound_paused").set(0.0);
90        tracing::info!(
91            source = self.source,
92            "self-regulation: inbound RESUMED, pressure cleared"
93        );
94        self.inner.resume();
95    }
96}
97
98/// A no-op actuator for tests and send-only pipelines.
99///
100/// Useful when a stage wants the gate's [`Admit`] decision (to stop
101/// pulling work in its own loop) but has nothing external to pause --
102/// the gate's held/released state is still observable via
103/// [`InboundGate::is_held`].
104#[derive(Debug, Default, Clone, Copy)]
105pub struct NoopActuator;
106
107impl GateActuator for NoopActuator {
108    fn pause(&self) {}
109    fn resume(&self) {}
110}
111
/// Verdict of the gate on whether the next unit of inbound work may enter.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Admit {
    /// The gate is open: take the next unit.
    Yes,
    /// The gate is closed under pressure: do not take the next unit.
    Hold,
}
120
121/// Edge-detecting inbound gate over a [`UnifiedPressure`] latch.
122///
123/// Wraps the recv/ingest side of a stage. Each
124/// [`evaluate`](Self::evaluate) consults the latch and drives the
125/// [`GateActuator`] once per transition. See the
126/// [module docs](crate::governor) for the full contract.
127pub struct InboundGate {
128    pressure: Arc<UnifiedPressure>,
129    actuator: Box<dyn GateActuator>,
130    /// Last edge state we drove the actuator to. Tracked separately from
131    /// the pressure latch so the actuator fires EXACTLY ONCE per
132    /// transition even though `should_hold()` returns `true` repeatedly
133    /// while latched.
134    paused_edge: AtomicBool,
135}
136
137impl InboundGate {
138    /// Build a gate over a shared pressure latch and an actuator.
139    ///
140    /// The gate starts in the released (open) state; the first
141    /// [`evaluate`](Self::evaluate) under pressure will fire `pause()`.
142    #[must_use]
143    pub fn new(pressure: Arc<UnifiedPressure>, actuator: Box<dyn GateActuator>) -> Self {
144        Self {
145            pressure,
146            actuator,
147            paused_edge: AtomicBool::new(false),
148        }
149    }
150
151    /// Sample the latch and drive the actuator on a transition.
152    ///
153    /// Computes [`should_hold`](UnifiedPressure::should_hold) from the
154    /// pressure, then uses a `compare_exchange` on the edge flag so:
155    ///
156    /// - false->true (rising edge): `compare_exchange(false, true)`
157    ///   succeeds exactly once -> call `pause()`.
158    /// - true->false (falling edge): `compare_exchange(true, false)`
159    ///   succeeds exactly once -> call `resume()`.
160    /// - no change: `compare_exchange` fails -> no actuator call.
161    ///
162    /// Returns [`Admit::Hold`] when held, [`Admit::Yes`] otherwise. Never
163    /// touches the outbound side.
164    pub fn evaluate(&self) -> Admit {
165        let hold = self.pressure.should_hold();
166        if hold {
167            // Rising edge: flip false -> true exactly once.
168            if self
169                .paused_edge
170                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
171                .is_ok()
172            {
173                self.actuator.pause();
174            }
175            Admit::Hold
176        } else {
177            // Falling edge: flip true -> false exactly once.
178            if self
179                .paused_edge
180                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
181                .is_ok()
182            {
183                self.actuator.resume();
184            }
185            Admit::Yes
186        }
187    }
188
189    /// Whether the gate last drove the actuator to the held state.
190    ///
191    /// Reflects the edge flag, not a fresh pressure sample -- it is the
192    /// state the actuator has been driven to by the most recent
193    /// [`evaluate`](Self::evaluate).
194    #[must_use]
195    pub fn is_held(&self) -> bool {
196        self.paused_edge.load(Ordering::Acquire)
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::governor::source::{Hysteresis, Pressure, PressureSource};
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    /// Pressure source whose reading the test rewrites at will. The `f64`
    /// reading lives in an `AtomicU64` as its raw bit pattern, which keeps the
    /// type `Sync` with neither `unsafe` nor a lock.
    struct MockSource {
        value: AtomicU64,
        hard: bool,
    }

    impl MockSource {
        fn new(value: f64, hard: bool) -> Self {
            let value = AtomicU64::new(value.to_bits());
            Self { value, hard }
        }

        fn set(&self, value: f64) {
            let bits = value.to_bits();
            self.value.store(bits, Ordering::Relaxed);
        }
    }

    impl PressureSource for MockSource {
        fn name(&self) -> &'static str {
            "mock"
        }

        fn sample(&self) -> Pressure {
            let bits = self.value.load(Ordering::Relaxed);
            Pressure::new(f64::from_bits(bits))
        }

        fn is_hard(&self) -> bool {
            self.hard
        }
    }

    /// Actuator that only tallies how often each edge method was invoked.
    struct CountingActuator {
        pause_calls: AtomicUsize,
        resume_calls: AtomicUsize,
    }

    impl CountingActuator {
        fn new() -> Self {
            let pause_calls = AtomicUsize::new(0);
            let resume_calls = AtomicUsize::new(0);
            Self {
                pause_calls,
                resume_calls,
            }
        }

        fn pauses(&self) -> usize {
            self.pause_calls.load(Ordering::Relaxed)
        }

        fn resumes(&self) -> usize {
            self.resume_calls.load(Ordering::Relaxed)
        }
    }

    impl GateActuator for CountingActuator {
        fn pause(&self) {
            self.pause_calls.fetch_add(1, Ordering::Relaxed);
        }

        fn resume(&self) {
            self.resume_calls.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Forwards to an `Arc<CountingActuator>`. The gate consumes its actuator
    /// as a `Box`, so the test keeps a second handle on the shared counter to
    /// read the tallies afterwards.
    struct SharedActuator(Arc<CountingActuator>);

    impl GateActuator for SharedActuator {
        fn pause(&self) {
            let SharedActuator(counter) = self;
            counter.pause();
        }

        fn resume(&self) {
            let SharedActuator(counter) = self;
            counter.resume();
        }
    }

    /// Build a latch over a single source with the (0.80, 0.65) hysteresis
    /// band used by every test in this module.
    fn governor_with(source: Arc<MockSource>) -> Arc<UnifiedPressure> {
        let band = Hysteresis::new(0.80, 0.65).expect("valid band");
        let sources = vec![source as Arc<dyn PressureSource>];
        Arc::new(UnifiedPressure::new(sources, band))
    }

    /// THE adversarial proving test for the gate.
    ///
    /// One gate is walked through low->high->high->low->low->high, which
    /// establishes that:
    ///   1. every rising edge triggers `pause()` EXACTLY ONCE (not once per
    ///      `evaluate()` while latched);
    ///   2. every falling edge triggers `resume()` EXACTLY ONCE;
    ///   3. `evaluate()` yields `Hold` while latched and `Yes` otherwise;
    ///   4. the gate re-arms cleanly: the second rising edge triggers
    ///      `pause()` again, so no state is sticky.
    #[test]
    fn gate_drives_actuator_exactly_once_per_edge() {
        let reading = Arc::new(MockSource::new(0.10, true));
        let latch = governor_with(Arc::clone(&reading));
        let calls = Arc::new(CountingActuator::new());
        let forwarder = SharedActuator(Arc::clone(&calls));
        let gate = InboundGate::new(Arc::clone(&latch), Box::new(forwarder));

        // Low reading: the gate is open and the actuator is untouched.
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
        assert_eq!(calls.pauses(), 0);
        assert_eq!(calls.resumes(), 0);

        // Rising edge, 0.10 -> 0.90 (above the pause threshold): one pause().
        reading.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());
        assert_eq!(calls.pauses(), 1, "pause once on rising edge");
        assert_eq!(calls.resumes(), 0);

        // Reading stays high: every evaluate() is Hold and none of them
        // re-fires pause(). This is the edge-dedup invariant.
        (0..5).for_each(|_| assert_eq!(gate.evaluate(), Admit::Hold));
        assert_eq!(calls.pauses(), 1, "no re-pause while latched");
        assert_eq!(calls.resumes(), 0);

        // 0.70 is inside the band (above the 0.65 resume threshold): the
        // latch keeps holding and the actuator stays untouched.
        reading.set(0.70);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(calls.pauses(), 1, "band holds, no re-pause");
        assert_eq!(calls.resumes(), 0);

        // Falling edge, 0.70 -> 0.50 (below the resume threshold): one
        // resume().
        reading.set(0.50);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
        assert_eq!(calls.pauses(), 1);
        assert_eq!(calls.resumes(), 1, "resume once on falling edge");

        // Reading stays low: every evaluate() is Yes and none re-fires
        // resume().
        (0..5).for_each(|_| assert_eq!(gate.evaluate(), Admit::Yes));
        assert_eq!(calls.pauses(), 1);
        assert_eq!(calls.resumes(), 1, "no re-resume while released");

        // Second rising edge: the gate re-arms and pause() fires again.
        reading.set(0.95);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());
        assert_eq!(calls.pauses(), 2, "latch re-arms, pause fires again");
        assert_eq!(calls.resumes(), 1);
    }

    /// With nothing external to pause, the gate still reports its held and
    /// released state through `is_held()`.
    #[test]
    fn noop_actuator_gate_still_tracks_held_state() {
        let reading = Arc::new(MockSource::new(0.10, true));
        let latch = governor_with(Arc::clone(&reading));
        let gate = InboundGate::new(Arc::clone(&latch), Box::new(NoopActuator));

        // Initial low reading: open.
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());

        // High reading: held.
        reading.set(0.90);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert!(gate.is_held());

        // Back to low: released again.
        reading.set(0.10);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert!(!gate.is_held());
    }
}