// wafrift_proxy/intercept.rs

//! Operator-driven intercept queue: pause every forward, surface it
//! in the TUI, and let the operator release or kill it before the
//! upstream sees anything.
//!
//! Closes blocker #119. The queue is process-scoped: a single
//! [`InterceptStore`] (state held behind an `Arc<Mutex<_>>`) is shared
//! so the proxy request handler and the TUI render and keymap layers
//! see the same state.
//!
//! Locking discipline:
//! - Both register and release/kill take the store's mutex only
//!   briefly, never across an `.await` that performs I/O.
//! - The waiting future does NOT hold the lock — it parks on a
//!   per-request `tokio::sync::oneshot` instead.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
use std::time::{Duration, Instant};

use tokio::sync::oneshot;

/// Global on/off switch for intercept mode, flipped from the TUI keymap.
///
/// Starts disabled. While it is on, proxy forwards park on the shared
/// [`InterceptStore`] until an operator releases or kills them (or the
/// request-side timeout gives up and releases them).
static INTERCEPT_MODE: AtomicBool = AtomicBool::new(false);

28/// Process-wide pending-intercept queue. Lazily initialised so the
29/// proxy and TUI see the same state.
30static INTERCEPT_STORE: OnceLock<InterceptStore> = OnceLock::new();
31
32/// Read intercept-mode atomically. Cheap.
33///
34/// Uses `Acquire` to pair with the `Release` store in
35/// `toggle_intercept_mode` / `set_intercept_mode`. Pre-R60 this was
36/// `Relaxed` on both sides — the mutex around the WRITE serialised
37/// writers against each other but did NOT establish a happens-before
38/// edge from writer-with-mutex to reader-without-mutex. On
39/// weakly-ordered hardware (ARM/aarch64) a request handler thread
40/// could observe `false` indefinitely after a TUI keypress flipped
41/// intercept ON, silently bypassing the intercept queue.
42/// R60 pass-21 §15 audit-hunts (concurrent-state ordering).
43#[must_use]
44pub fn intercept_mode_enabled() -> bool {
45    INTERCEPT_MODE.load(Ordering::Acquire)
46}
47
/// Guard making each mode transition (flip the flag, then maybe drain
/// the queue) one indivisible step with respect to other transitions.
///
/// Without it, two racing toggles could interleave so that one thread's
/// drain runs after another thread has already switched intercept back
/// ON, spuriously releasing requests. The audit's
/// `test_concurrent_toggle_race` reproduced exactly that.
static MODE_TRANSITION: Mutex<()> = Mutex::new(());

55/// Toggle intercept-mode and return the new value. When toggling
56/// OFF, drains every pending intercept with `Release` so existing
57/// requests don't wedge.
58pub fn toggle_intercept_mode() -> bool {
59    // Hold MODE_TRANSITION across the entire (read-modify-drain)
60    // sequence — the atomic alone isn't enough because the drain is
61    // a separate observation of the store. Closes the TOCTOU window
62    // identified by the 2026-05-10 audit.
63    let _guard = MODE_TRANSITION
64        .lock()
65        .unwrap_or_else(std::sync::PoisonError::into_inner);
66    // R60 pass-21 §15: Release pairs with the Acquire load in
67    // `intercept_mode_enabled()` — establishes the happens-before edge
68    // required for the reader to observe the new value on ARM/aarch64.
69    let prev = INTERCEPT_MODE.fetch_xor(true, Ordering::Release);
70    let now_on = !prev;
71    if !now_on {
72        let _ = global_store().drain_release();
73    }
74    now_on
75}
76
77/// Force intercept-mode to a specific value (test / programmatic
78/// override). Drains pending on transition to OFF.
79pub fn set_intercept_mode(on: bool) {
80    let _guard = MODE_TRANSITION
81        .lock()
82        .unwrap_or_else(std::sync::PoisonError::into_inner);
83    // R60 pass-21 §15: Release pairs with Acquire in
84    // `intercept_mode_enabled()`. See toggle_intercept_mode comment.
85    let prev = INTERCEPT_MODE.swap(on, Ordering::Release);
86    if prev && !on {
87        let _ = global_store().drain_release();
88    }
89}
90
91/// Get the process-wide intercept store, initialising it on first
92/// access.
93pub fn global_store() -> &'static InterceptStore {
94    INTERCEPT_STORE.get_or_init(InterceptStore::new)
95}
96
/// Outcome handed back to a parked request, either by the operator or by
/// the timeout fallback.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InterceptDecision {
    /// Let the request through to upstream exactly as it arrived.
    Release,
    /// Answer the client with a synthetic 403; upstream never sees it.
    Kill,
}

/// Display record for one parked request: what the TUI lists and what
/// the operator acts on.
#[derive(Clone, Debug)]
pub struct PendingIntercept {
    /// Store-assigned id (never 0); pass it back to resolve/cancel.
    pub id: u64,
    /// Target host of the intercepted request.
    pub host: String,
    /// HTTP method, as given at registration.
    pub method: String,
    /// Request path, as given at registration.
    pub path: String,
    /// Moment the request was registered.
    pub since: Instant,
}

118/// Shared per-process intercept store.
119#[derive(Debug, Default, Clone)]
120pub struct InterceptStore {
121    inner: Arc<Mutex<InterceptInner>>,
122}
123
124#[derive(Debug, Default)]
125struct InterceptInner {
126    /// Per-request rendezvous sender. Removed when the operator
127    /// resolves the intercept (release/kill) or when a timeout fires.
128    senders: BTreeMap<u64, oneshot::Sender<InterceptDecision>>,
129    /// Snapshot of the same set the TUI iterates for display.
130    pending: BTreeMap<u64, PendingIntercept>,
131    /// Monotonic ID generator. Starts at 0 so the first `register`
132    /// call's `wrapping_add(1)` yields id=1 — id=0 is RESERVED as
133    /// an "invalid intercept" sentinel. `resolve(0, ...)` and
134    /// `cancel(0, ...)` silently return false, so callers must
135    /// never pass 0 expecting it to map to a real intercept.
136    next_id: u64,
137}
138
/// How long a parked request waits for an operator decision before it
/// defaults to `Release`. The proxy therefore never wedges when the
/// operator walks away. The store does not apply this timeout itself;
/// presumably the request path races it against the rendezvous.
pub const INTERCEPT_TIMEOUT: Duration = Duration::from_millis(30_000);

144impl InterceptStore {
145    pub fn new() -> Self {
146        Self::default()
147    }
148
149    /// Register a fresh intercept and return the receiver the request
150    /// handler should await on, plus the assigned ID.
151    ///
152    /// Each call also opportunistically GCs any senders whose receiver
153    /// has been dropped — this catches the client-disconnect path where
154    /// neither `resolve` nor the timeout's `cancel` fires (the request
155    /// future is cancelled before either arm of `tokio::select!` runs).
156    /// Without the GC the entries leak forever in `senders` + `pending`.
157    pub fn register(
158        &self,
159        host: impl Into<String>,
160        method: impl Into<String>,
161        path: impl Into<String>,
162    ) -> (u64, oneshot::Receiver<InterceptDecision>) {
163        let (tx, rx) = oneshot::channel();
164        let mut inner = self
165            .inner
166            .lock()
167            .unwrap_or_else(std::sync::PoisonError::into_inner);
168        // GC closed senders (client-disconnect leak).
169        let dead: Vec<u64> = inner
170            .senders
171            .iter()
172            .filter(|(_, tx)| tx.is_closed())
173            .map(|(id, _)| *id)
174            .collect();
175        for id in dead {
176            inner.senders.remove(&id);
177            inner.pending.remove(&id);
178        }
179        // Advance the counter and skip 0 (reserved sentinel). After
180        // u64::MAX registrations, wrapping_add gives 0; wrapping_add
181        // once more gives 1, resuming the sequence. IDs are not
182        // guaranteed unique if the map still holds the re-issued ID
183        // at wraparound, but that requires ~1.8×10^19 concurrent
184        // pending intercepts — practically impossible.
185        inner.next_id = inner.next_id.wrapping_add(1);
186        if inner.next_id == 0 {
187            inner.next_id = 1;
188        }
189        let id = inner.next_id;
190        inner.senders.insert(id, tx);
191        inner.pending.insert(
192            id,
193            PendingIntercept {
194                id,
195                host: host.into(),
196                method: method.into(),
197                path: path.into(),
198                since: Instant::now(),
199            },
200        );
201        (id, rx)
202    }
203
204    /// Drop entries whose oneshot rx has been dropped. Exposed for
205    /// tests + the TUI render loop, which can call this periodically
206    /// even when no new intercepts are arriving.
207    pub fn gc_dead_senders(&self) -> usize {
208        let mut inner = self
209            .inner
210            .lock()
211            .unwrap_or_else(std::sync::PoisonError::into_inner);
212        let dead: Vec<u64> = inner
213            .senders
214            .iter()
215            .filter(|(_, tx)| tx.is_closed())
216            .map(|(id, _)| *id)
217            .collect();
218        let n = dead.len();
219        for id in dead {
220            inner.senders.remove(&id);
221            inner.pending.remove(&id);
222        }
223        n
224    }
225
226    /// Resolve a pending intercept with a decision. Idempotent — a
227    /// second resolve for the same id is a no-op.
228    pub fn resolve(&self, id: u64, decision: InterceptDecision) -> bool {
229        let mut inner = self
230            .inner
231            .lock()
232            .unwrap_or_else(std::sync::PoisonError::into_inner);
233        inner.pending.remove(&id);
234        if let Some(tx) = inner.senders.remove(&id) {
235            let _ = tx.send(decision);
236            true
237        } else {
238            false
239        }
240    }
241
242    /// Cancel a pending intercept WITHOUT sending a decision. The
243    /// proxy calls this when the receiver is dropped (client
244    /// disconnected, request timed out before the operator decided)
245    /// so the sender + pending entry don't leak forever in the maps.
246    /// Idempotent.
247    ///
248    /// Returns true if an entry was removed, false if no such id.
249    pub fn cancel(&self, id: u64) -> bool {
250        let mut inner = self
251            .inner
252            .lock()
253            .unwrap_or_else(std::sync::PoisonError::into_inner);
254        let removed_pending = inner.pending.remove(&id).is_some();
255        let removed_sender = inner.senders.remove(&id).is_some();
256        removed_pending || removed_sender
257    }
258
259    /// Release every pending intercept with `Release`. Used when the
260    /// operator toggles intercept-mode OFF — don't strand existing
261    /// requests.
262    pub fn drain_release(&self) -> usize {
263        let mut inner = self
264            .inner
265            .lock()
266            .unwrap_or_else(std::sync::PoisonError::into_inner);
267        let ids: Vec<u64> = inner.senders.keys().copied().collect();
268        let mut released = 0;
269        for id in ids {
270            if let Some(tx) = inner.senders.remove(&id) {
271                inner.pending.remove(&id);
272                let _ = tx.send(InterceptDecision::Release);
273                released += 1;
274            }
275        }
276        released
277    }
278
279    /// Snapshot of the pending list for the TUI.
280    pub fn snapshot(&self) -> Vec<PendingIntercept> {
281        let inner = self
282            .inner
283            .lock()
284            .unwrap_or_else(std::sync::PoisonError::into_inner);
285        inner.pending.values().cloned().collect()
286    }
287
288    /// How many requests are currently parked in the rendezvous.
289    pub fn pending_count(&self) -> usize {
290        let inner = self
291            .inner
292            .lock()
293            .unwrap_or_else(std::sync::PoisonError::into_inner);
294        inner.pending.len()
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    fn store() -> InterceptStore {
        InterceptStore::new()
    }

    #[tokio::test]
    async fn register_then_release_unblocks_with_release() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        let s2 = s.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            s2.resolve(id, InterceptDecision::Release);
        });
        let decision = rx.await.expect("rx");
        assert_eq!(decision, InterceptDecision::Release);
        assert_eq!(s.pending_count(), 0, "pending must drain after resolve");
    }

    #[tokio::test]
    async fn register_then_kill_unblocks_with_kill() {
        let s = store();
        let (id, rx) = s.register("h", "POST", "/admin");
        let s2 = s.clone();
        tokio::spawn(async move {
            s2.resolve(id, InterceptDecision::Kill);
        });
        assert_eq!(rx.await.unwrap(), InterceptDecision::Kill);
        assert_eq!(s.pending_count(), 0, "pending must drain after kill");
    }

    #[tokio::test]
    async fn snapshot_shows_pending_until_resolved() {
        let s = store();
        let (id1, _r1) = s.register("a.com", "GET", "/x");
        let (id2, _r2) = s.register("b.com", "POST", "/y");
        let snap = s.snapshot();
        assert_eq!(snap.len(), 2);
        assert!(snap.iter().any(|p| p.id == id1 && p.host == "a.com"));
        assert!(snap.iter().any(|p| p.id == id2 && p.host == "b.com"));
        // "Until resolved": resolving id1 must remove it from the snapshot
        // while id2 stays visible.
        assert!(s.resolve(id1, InterceptDecision::Release));
        let snap = s.snapshot();
        assert_eq!(snap.len(), 1);
        assert!(snap.iter().all(|p| p.id != id1));
        assert!(snap.iter().any(|p| p.id == id2));
    }

    #[tokio::test]
    async fn drain_release_unblocks_every_pending() {
        let s = store();
        let (_, rx1) = s.register("a", "GET", "/");
        let (_, rx2) = s.register("b", "GET", "/");
        let n = s.drain_release();
        assert_eq!(n, 2);
        assert_eq!(rx1.await.unwrap(), InterceptDecision::Release);
        assert_eq!(rx2.await.unwrap(), InterceptDecision::Release);
        assert_eq!(s.pending_count(), 0);
        assert_eq!(s.drain_release(), 0, "second drain has nothing left");
    }

    #[tokio::test]
    async fn resolve_unknown_id_is_idempotent_no_op() {
        let s = store();
        let acted = s.resolve(999, InterceptDecision::Release);
        assert!(!acted, "resolve of unknown id must report it didn't fire");
    }

    #[tokio::test]
    async fn resolve_twice_only_fires_once() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        assert!(s.resolve(id, InterceptDecision::Release));
        assert!(
            !s.resolve(id, InterceptDecision::Kill),
            "second resolve must no-op"
        );
        assert_eq!(rx.await.unwrap(), InterceptDecision::Release);
    }

    #[tokio::test]
    async fn timeout_default_release_via_select() {
        // The store itself never times out: the receiver stays pending
        // until someone resolves it. The proxy's
        // `tokio::select! { _ = rx => …, _ = sleep(TIMEOUT) => Release }`
        // is what supplies the Release default.
        let s = store();
        let (_id, rx) = s.register("h", "GET", "/");
        let result = tokio::time::timeout(Duration::from_millis(50), rx).await;
        assert!(result.is_err(), "rx must NOT complete on its own");
    }

    #[tokio::test]
    async fn ids_are_monotonic_per_register() {
        let s = store();
        let (id1, _) = s.register("a", "GET", "/");
        let (id2, _) = s.register("a", "GET", "/");
        let (id3, _) = s.register("a", "GET", "/");
        assert_eq!(id2, id1 + 1);
        assert_eq!(id3, id2 + 1);
    }

    #[test]
    fn id_zero_is_reserved_and_resolve_cancel_return_false() {
        // Contract regression: id=0 is a reserved sentinel — no
        // register() call can ever assign it (the first call hands
        // back id=1). Caller mistakes that pass 0 must not match
        // any real intercept; both resolve and cancel return false.
        let s = store();
        // No intercepts registered yet.
        assert!(!s.resolve(0, InterceptDecision::Release));
        assert!(!s.cancel(0));
        // Register one — id is 1, never 0.
        let (id, _rx) = s.register("h", "GET", "/");
        assert_eq!(id, 1, "first id must be 1 (0 is reserved)");
        // 0 still returns false even with real intercepts present.
        assert!(!s.resolve(0, InterceptDecision::Release));
        assert!(!s.cancel(0));
    }

    #[test]
    fn id_wraparound_skips_zero() {
        // Regression: after u64::MAX registrations, wrapping_add(1) == 0,
        // violating the "0 is reserved" contract. The fix skips 0 and
        // assigns 1. Simulate by forcing next_id to u64::MAX - 1.
        let s = store();
        {
            let mut inner = s.inner.lock().unwrap();
            inner.next_id = u64::MAX - 1;
        }
        // This register gets id = u64::MAX.
        let (id1, _rx1) = s.register("h", "GET", "/");
        assert_eq!(id1, u64::MAX, "pre-wraparound id must be u64::MAX");
        // Next register wraps: 0 is skipped, lands at 1.
        let (id2, _rx2) = s.register("h", "GET", "/");
        assert_eq!(id2, 1, "post-wraparound id must skip 0 and return 1");
        assert_ne!(id2, 0, "id=0 must never be issued");
    }

    #[test]
    fn cancel_removes_from_both_maps() {
        let s = store();
        let (id, _rx) = s.register("h", "GET", "/path");
        assert_eq!(s.pending_count(), 1);
        let removed = s.cancel(id);
        assert!(removed, "cancel must return true for a valid id");
        assert_eq!(s.pending_count(), 0, "cancel must drain from pending map");
        // Second cancel is idempotent.
        assert!(!s.cancel(id), "second cancel returns false (already gone)");
    }

    #[test]
    fn gc_dead_senders_removes_disconnected_rx() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        // Drop the receiver — simulates client disconnect.
        drop(rx);
        // gc_dead_senders must clean up the orphaned sender + pending entry.
        let removed = s.gc_dead_senders();
        assert_eq!(removed, 1, "exactly one dead sender must be GCd");
        assert_eq!(s.pending_count(), 0);
        // The id must now be truly gone.
        assert!(!s.cancel(id));
    }

    #[test]
    fn register_gcs_entries_whose_receiver_was_dropped() {
        let s = store();
        let (_dead, rx) = s.register("h", "GET", "/gone");
        drop(rx);
        assert_eq!(s.pending_count(), 1, "nothing GCs until the next call");
        let (live, _rx) = s.register("h", "GET", "/live");
        let snap = s.snapshot();
        assert_eq!(snap.len(), 1, "register must GC the dead entry");
        assert_eq!(snap[0].id, live);
    }

    #[test]
    fn resolve_after_receiver_dropped_still_removes_entry() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        drop(rx);
        assert!(
            s.resolve(id, InterceptDecision::Kill),
            "a registered id resolves even if its client went away"
        );
        assert_eq!(s.pending_count(), 0);
    }

    #[test]
    fn resolve_zero_id_never_matches_real_intercept() {
        // After wraparound the first real id re-issued is 1, never 0.
        // An outstanding resolve(0) must not accidentally hit the real id=1.
        let s = store();
        {
            let mut inner = s.inner.lock().unwrap();
            inner.next_id = u64::MAX;
        }
        // id=1 after wraparound.
        let (id, _rx) = s.register("h", "GET", "/");
        assert_eq!(id, 1);
        // resolve(0) must not affect the real id=1 entry.
        assert!(!s.resolve(0, InterceptDecision::Kill));
        assert_eq!(
            s.pending_count(),
            1,
            "id=1 must still be pending after resolve(0)"
        );
    }
}