wafrift-proxy 0.3.1

HTTP forward proxy with automatic WAF evasion and optional TLS interception support.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
//! Operator-driven intercept queue: pause every forward, surface it
//! in the TUI, let the operator release / kill before upstream sees
//! anything.
//!
//! Closes blocker #119. The queue is process-scoped via an
//! [`InterceptStore`] held behind an `Arc<Mutex<>>` so the proxy
//! request handler and the TUI render+keymap layers see the same
//! state.
//!
//! Locking discipline:
//! - Both register and release/kill take the write lock briefly,
//!   never across an `await` that performs I/O.
//! - The waiting future does NOT hold the lock — it parks on a
//!   per-request `tokio::sync::oneshot` instead.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};

use tokio::sync::oneshot;

/// Process-wide intercept-mode flag. Toggleable from the TUI keymap.
/// When `true`, every proxy forward parks on the global
/// [`InterceptStore`] until an operator action.
static INTERCEPT_MODE: AtomicBool = AtomicBool::new(false);

/// Process-wide pending-intercept queue. Lazily initialised so the
/// proxy and TUI see the same state.
static INTERCEPT_STORE: OnceLock<InterceptStore> = OnceLock::new();

/// Read intercept-mode atomically. Cheap.
///
/// Uses `Acquire` to pair with the `Release` store in
/// `toggle_intercept_mode` / `set_intercept_mode`. Pre-R60 this was
/// `Relaxed` on both sides — the mutex around the WRITE serialised
/// writers against each other but did NOT establish a happens-before
/// edge from writer-with-mutex to reader-without-mutex. On
/// weakly-ordered hardware (ARM/aarch64) a request handler thread
/// could observe `false` indefinitely after a TUI keypress flipped
/// intercept ON, silently bypassing the intercept queue.
/// R60 pass-21 §15 audit-hunts (concurrent-state ordering).
#[must_use]
pub fn intercept_mode_enabled() -> bool {
    INTERCEPT_MODE.load(Ordering::Acquire)
}

/// Serializes the (flip, drain) pair in toggle/set so two concurrent
/// toggles can't interleave such that the drain runs while another
/// thread has already flipped intercept back ON. Without this guard
/// the audit's `test_concurrent_toggle_race` reproduced the
/// spurious-release bug.
static MODE_TRANSITION: std::sync::Mutex<()> = std::sync::Mutex::new(());

/// Toggle intercept-mode and return the new value. When toggling
/// OFF, drains every pending intercept with `Release` so existing
/// requests don't wedge.
pub fn toggle_intercept_mode() -> bool {
    // Hold MODE_TRANSITION across the entire (read-modify-drain)
    // sequence — the atomic alone isn't enough because the drain is
    // a separate observation of the store. Closes the TOCTOU window
    // identified by the 2026-05-10 audit.
    let _guard = MODE_TRANSITION
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // R60 pass-21 §15: Release pairs with the Acquire load in
    // `intercept_mode_enabled()` — establishes the happens-before edge
    // required for the reader to observe the new value on ARM/aarch64.
    let prev = INTERCEPT_MODE.fetch_xor(true, Ordering::Release);
    let now_on = !prev;
    if !now_on {
        let _ = global_store().drain_release();
    }
    now_on
}

/// Force intercept-mode to a specific value (test / programmatic
/// override). Drains pending on transition to OFF.
pub fn set_intercept_mode(on: bool) {
    let _guard = MODE_TRANSITION
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // R60 pass-21 §15: Release pairs with Acquire in
    // `intercept_mode_enabled()`. See toggle_intercept_mode comment.
    let prev = INTERCEPT_MODE.swap(on, Ordering::Release);
    if prev && !on {
        let _ = global_store().drain_release();
    }
}

/// Get the process-wide intercept store, initialising it on first
/// access.
pub fn global_store() -> &'static InterceptStore {
    INTERCEPT_STORE.get_or_init(InterceptStore::new)
}

/// Decision the operator (or the timeout fallback) returns to the
/// blocked request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterceptDecision {
    /// Forward the request unmodified.
    Release,
    /// Return a synthetic 403 to the client; never hits upstream.
    Kill,
}

/// One pending intercept the TUI shows + the operator acts on.
#[derive(Debug, Clone)]
pub struct PendingIntercept {
    pub id: u64,
    pub host: String,
    pub method: String,
    pub path: String,
    /// When the request was registered.
    pub since: Instant,
}

/// Shared per-process intercept store.
#[derive(Debug, Default, Clone)]
pub struct InterceptStore {
    inner: Arc<Mutex<InterceptInner>>,
}

#[derive(Debug, Default)]
struct InterceptInner {
    /// Per-request rendezvous sender. Removed when the operator
    /// resolves the intercept (release/kill) or when a timeout fires.
    senders: BTreeMap<u64, oneshot::Sender<InterceptDecision>>,
    /// Snapshot of the same set the TUI iterates for display.
    pending: BTreeMap<u64, PendingIntercept>,
    /// Monotonic ID generator. Starts at 0 so the first `register`
    /// call's `wrapping_add(1)` yields id=1 — id=0 is RESERVED as
    /// an "invalid intercept" sentinel. `resolve(0, ...)` and
    /// `cancel(0, ...)` silently return false, so callers must
    /// never pass 0 expecting it to map to a real intercept.
    next_id: u64,
}

/// Default intercept timeout — after which the request defaults
/// to `Release` so the proxy never wedges if the operator walks
/// away.
pub const INTERCEPT_TIMEOUT: Duration = Duration::from_secs(30);

impl InterceptStore {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a fresh intercept and return the receiver the request
    /// handler should await on, plus the assigned ID.
    ///
    /// Each call also opportunistically GCs any senders whose receiver
    /// has been dropped — this catches the client-disconnect path where
    /// neither `resolve` nor the timeout's `cancel` fires (the request
    /// future is cancelled before either arm of `tokio::select!` runs).
    /// Without the GC the entries leak forever in `senders` + `pending`.
    pub fn register(
        &self,
        host: impl Into<String>,
        method: impl Into<String>,
        path: impl Into<String>,
    ) -> (u64, oneshot::Receiver<InterceptDecision>) {
        let (tx, rx) = oneshot::channel();
        let mut inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // GC closed senders (client-disconnect leak).
        let dead: Vec<u64> = inner
            .senders
            .iter()
            .filter(|(_, tx)| tx.is_closed())
            .map(|(id, _)| *id)
            .collect();
        for id in dead {
            inner.senders.remove(&id);
            inner.pending.remove(&id);
        }
        // Advance the counter and skip 0 (reserved sentinel). After
        // u64::MAX registrations, wrapping_add gives 0; wrapping_add
        // once more gives 1, resuming the sequence. IDs are not
        // guaranteed unique if the map still holds the re-issued ID
        // at wraparound, but that requires ~1.8×10^19 concurrent
        // pending intercepts — practically impossible.
        inner.next_id = inner.next_id.wrapping_add(1);
        if inner.next_id == 0 {
            inner.next_id = 1;
        }
        let id = inner.next_id;
        inner.senders.insert(id, tx);
        inner.pending.insert(
            id,
            PendingIntercept {
                id,
                host: host.into(),
                method: method.into(),
                path: path.into(),
                since: Instant::now(),
            },
        );
        (id, rx)
    }

    /// Drop entries whose oneshot rx has been dropped. Exposed for
    /// tests + the TUI render loop, which can call this periodically
    /// even when no new intercepts are arriving.
    pub fn gc_dead_senders(&self) -> usize {
        let mut inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let dead: Vec<u64> = inner
            .senders
            .iter()
            .filter(|(_, tx)| tx.is_closed())
            .map(|(id, _)| *id)
            .collect();
        let n = dead.len();
        for id in dead {
            inner.senders.remove(&id);
            inner.pending.remove(&id);
        }
        n
    }

    /// Resolve a pending intercept with a decision. Idempotent — a
    /// second resolve for the same id is a no-op.
    pub fn resolve(&self, id: u64, decision: InterceptDecision) -> bool {
        let mut inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        inner.pending.remove(&id);
        if let Some(tx) = inner.senders.remove(&id) {
            let _ = tx.send(decision);
            true
        } else {
            false
        }
    }

    /// Cancel a pending intercept WITHOUT sending a decision. The
    /// proxy calls this when the receiver is dropped (client
    /// disconnected, request timed out before the operator decided)
    /// so the sender + pending entry don't leak forever in the maps.
    /// Idempotent.
    ///
    /// Returns true if an entry was removed, false if no such id.
    pub fn cancel(&self, id: u64) -> bool {
        let mut inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let removed_pending = inner.pending.remove(&id).is_some();
        let removed_sender = inner.senders.remove(&id).is_some();
        removed_pending || removed_sender
    }

    /// Release every pending intercept with `Release`. Used when the
    /// operator toggles intercept-mode OFF — don't strand existing
    /// requests.
    pub fn drain_release(&self) -> usize {
        let mut inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let ids: Vec<u64> = inner.senders.keys().copied().collect();
        let mut released = 0;
        for id in ids {
            if let Some(tx) = inner.senders.remove(&id) {
                inner.pending.remove(&id);
                let _ = tx.send(InterceptDecision::Release);
                released += 1;
            }
        }
        released
    }

    /// Snapshot of the pending list for the TUI.
    pub fn snapshot(&self) -> Vec<PendingIntercept> {
        let inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        inner.pending.values().cloned().collect()
    }

    /// How many requests are currently parked in the rendezvous.
    pub fn pending_count(&self) -> usize {
        let inner = self
            .inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        inner.pending.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn store() -> InterceptStore {
        InterceptStore::new()
    }

    #[tokio::test]
    async fn register_then_release_unblocks_with_release() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        let s2 = s.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            s2.resolve(id, InterceptDecision::Release);
        });
        let decision = rx.await.expect("rx");
        assert_eq!(decision, InterceptDecision::Release);
        assert_eq!(s.pending_count(), 0, "pending must drain after resolve");
    }

    #[tokio::test]
    async fn register_then_kill_unblocks_with_kill() {
        let s = store();
        let (id, rx) = s.register("h", "POST", "/admin");
        let s2 = s.clone();
        tokio::spawn(async move {
            s2.resolve(id, InterceptDecision::Kill);
        });
        assert_eq!(rx.await.unwrap(), InterceptDecision::Kill);
    }

    #[tokio::test]
    async fn snapshot_shows_pending_until_resolved() {
        let s = store();
        let (id1, _r1) = s.register("a.com", "GET", "/x");
        let (id2, _r2) = s.register("b.com", "POST", "/y");
        let snap = s.snapshot();
        assert_eq!(snap.len(), 2);
        assert!(snap.iter().any(|p| p.id == id1 && p.host == "a.com"));
        assert!(snap.iter().any(|p| p.id == id2 && p.host == "b.com"));
    }

    #[tokio::test]
    async fn drain_release_unblocks_every_pending() {
        let s = store();
        let (_, rx1) = s.register("a", "GET", "/");
        let (_, rx2) = s.register("b", "GET", "/");
        let n = s.drain_release();
        assert_eq!(n, 2);
        assert_eq!(rx1.await.unwrap(), InterceptDecision::Release);
        assert_eq!(rx2.await.unwrap(), InterceptDecision::Release);
        assert_eq!(s.pending_count(), 0);
    }

    #[tokio::test]
    async fn resolve_unknown_id_is_idempotent_no_op() {
        let s = store();
        let acted = s.resolve(999, InterceptDecision::Release);
        assert!(!acted, "resolve of unknown id must report it didn't fire");
    }

    #[tokio::test]
    async fn resolve_twice_only_fires_once() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        assert!(s.resolve(id, InterceptDecision::Release));
        assert!(
            !s.resolve(id, InterceptDecision::Kill),
            "second resolve must no-op"
        );
        assert_eq!(rx.await.unwrap(), InterceptDecision::Release);
    }

    #[tokio::test]
    async fn timeout_default_release_via_select() {
        // The proxy uses tokio::select! { _ = rx => …, _ = sleep(TIMEOUT) => Release }.
        // Verifies the receiver actually waits forever when no resolve fires.
        let s = store();
        let (_id, rx) = s.register("h", "GET", "/");
        let result = tokio::time::timeout(Duration::from_millis(50), rx).await;
        assert!(result.is_err(), "rx must NOT complete on its own");
    }

    #[tokio::test]
    async fn ids_are_monotonic_per_register() {
        let s = store();
        let (id1, _) = s.register("a", "GET", "/");
        let (id2, _) = s.register("a", "GET", "/");
        let (id3, _) = s.register("a", "GET", "/");
        assert_eq!(id2, id1 + 1);
        assert_eq!(id3, id2 + 1);
    }

    #[test]
    fn id_zero_is_reserved_and_resolve_cancel_return_false() {
        // Contract regression: id=0 is a reserved sentinel — no
        // register() call can ever assign it (the first call hands
        // back id=1). Caller mistakes that pass 0 must not match
        // any real intercept; both resolve and cancel return false.
        let s = store();
        // No intercepts registered yet.
        assert!(!s.resolve(0, InterceptDecision::Release));
        assert!(!s.cancel(0));
        // Register one — id is 1, never 0.
        let (id, _rx) = s.register("h", "GET", "/");
        assert_eq!(id, 1, "first id must be 1 (0 is reserved)");
        // 0 still returns false even with real intercepts present.
        assert!(!s.resolve(0, InterceptDecision::Release));
        assert!(!s.cancel(0));
    }

    #[test]
    fn id_wraparound_skips_zero() {
        // Regression: after u64::MAX registrations, wrapping_add(1) == 0,
        // violating the "0 is reserved" contract. The fix skips 0 and
        // assigns 1. Simulate by forcing next_id to u64::MAX - 1.
        let s = store();
        {
            let mut inner = s.inner.lock().unwrap();
            inner.next_id = u64::MAX - 1;
        }
        // This register gets id = u64::MAX.
        let (id1, _rx1) = s.register("h", "GET", "/");
        assert_eq!(id1, u64::MAX, "pre-wraparound id must be u64::MAX");
        // Next register wraps: 0 is skipped, lands at 1.
        let (id2, _rx2) = s.register("h", "GET", "/");
        assert_eq!(id2, 1, "post-wraparound id must skip 0 and return 1");
        assert_ne!(id2, 0, "id=0 must never be issued");
    }

    #[test]
    fn cancel_removes_from_both_maps() {
        let s = store();
        let (id, _rx) = s.register("h", "GET", "/path");
        assert_eq!(s.pending_count(), 1);
        let removed = s.cancel(id);
        assert!(removed, "cancel must return true for a valid id");
        assert_eq!(s.pending_count(), 0, "cancel must drain from pending map");
        // Second cancel is idempotent.
        assert!(!s.cancel(id), "second cancel returns false (already gone)");
    }

    #[test]
    fn gc_dead_senders_removes_disconnected_rx() {
        let s = store();
        let (id, rx) = s.register("h", "GET", "/");
        // Drop the receiver — simulates client disconnect.
        drop(rx);
        // gc_dead_senders must clean up the orphaned sender + pending entry.
        let removed = s.gc_dead_senders();
        assert_eq!(removed, 1, "exactly one dead sender must be GCd");
        assert_eq!(s.pending_count(), 0);
        // The id must now be truly gone.
        assert!(!s.cancel(id));
    }

    #[test]
    fn resolve_zero_id_never_matches_real_intercept() {
        // After wraparound the first real id re-issued is 1, never 0.
        // An outstanding resolve(0) must not accidentally hit the real id=1.
        let s = store();
        {
            let mut inner = s.inner.lock().unwrap();
            inner.next_id = u64::MAX;
        }
        // id=1 after wraparound.
        let (id, _rx) = s.register("h", "GET", "/");
        assert_eq!(id, 1);
        // resolve(0) must not affect the real id=1 entry.
        assert!(!s.resolve(0, InterceptDecision::Kill));
        assert_eq!(
            s.pending_count(),
            1,
            "id=1 must still be pending after resolve(0)"
        );
    }
}