Skip to main content

keyhog_verifier/oob/
session.rs

1//! Engine-scoped OOB session: one interactsh registration shared by every
2//! verification, a background polling loop, and per-finding wait notifications.
3//!
4//! ## Design
5//!
6//! - **One registration per scan.** RSA-2048 keygen + register adds ~150ms
7//!   startup; doing it per finding would burn 859× that. We register once at
8//!   engine boot and mint per-finding URLs from the same correlation id.
9//! - **Single poller.** A background `tokio::task` polls every
10//!   `poll_interval` and fans interactions out to per-id `Notify` waiters.
11//!   Findings that mint a URL but never get hit just time out; the poller
12//!   doesn't care.
//! - **Bounded retention.** Observations are stored in a `DashMap` keyed by
//!   unique-id, first write wins. An entry stays in the map after a finding
//!   observes its callback (so a late `wait_for` on the same id still sees
//!   it); a periodic GC pass in the poller evicts entries older than
//!   `max_observation_age` so a long scan doesn't grow unbounded. Waiter
//!   handles live in a separate map and are removed when `wait_for` returns.
18
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use dashmap::DashMap;
25use parking_lot::Mutex;
26use reqwest::Client;
27use tokio::sync::Notify;
28use tokio::task::JoinHandle;
29use tracing::{debug, info, warn};
30
31use super::client::{Interaction, InteractionProtocol, InteractshClient};
32
/// Tunables for the OOB session. The CLI exposes them as `--verify-oob`,
/// `--oob-server`, and `--oob-timeout`.
#[derive(Debug, Clone)]
pub struct OobConfig {
    /// Interactsh collector to register with. Defaults to `oast.fun`
    /// (projectdiscovery's public collector); self-host for sensitive scans.
    pub server: String,
    /// Per-finding wait used when the detector spec does not supply its own
    /// timeout.
    pub default_timeout: Duration,
    /// Upper bound applied to every per-finding wait, whatever the spec asks
    /// for. This is what bounds total scan time.
    pub max_timeout: Duration,
    /// Pause between two consecutive polls of the collector.
    pub poll_interval: Duration,
    /// Observations older than this are evicted from memory so long-running
    /// scans don't accumulate stale events.
    pub max_observation_age: Duration,
}

impl Default for OobConfig {
    /// Public collector, 30 s default wait capped at 120 s, a poll every 2 s,
    /// and 10 minutes of observation retention.
    fn default() -> Self {
        let server = String::from("oast.fun");
        let default_timeout = Duration::from_secs(30);
        let max_timeout = Duration::from_secs(120);
        let poll_interval = Duration::from_secs(2);
        let max_observation_age = Duration::from_secs(600);
        Self {
            server,
            default_timeout,
            max_timeout,
            poll_interval,
            max_observation_age,
        }
    }
}
62
/// What the verifier sees after waiting on a minted URL. This is the return
/// type of `OobSession::wait_for`.
#[derive(Debug, Clone)]
pub enum OobObservation {
    /// A stored interaction for the awaited id passed the caller's protocol
    /// filter. All four fields are copied from that stored `Interaction`.
    Observed {
        /// Protocol of the stored interaction.
        protocol: InteractionProtocol,
        /// Copy of the interaction's `remote_address` string.
        remote_address: String,
        /// Copy of the interaction's `timestamp` string (kept as text, not
        /// parsed here).
        timestamp: String,
        /// Copy of the interaction's `raw_payload` string.
        raw_payload: String,
    },
    /// Timed out before any matching interaction arrived.
    NotObserved,
    /// OOB session is unavailable (register failed, poller died). The verifier
    /// degrades to HTTP-only success criteria for this finding.
    ///
    /// Within this file the variant is produced by `wait_for` when the
    /// shutdown flag is set; the payload is a human-readable reason.
    Disabled(String),
}
78
/// One entry of the observation map: the interaction as delivered by the
/// collector plus the local time it was stored.
struct StoredInteraction {
    /// The interaction exactly as handed to `store_and_notify`.
    interaction: Interaction,
    /// Local monotonic time of insertion; `gc` compares it against
    /// `max_observation_age` to decide eviction.
    received_at: Instant,
}
83
/// Engine-shared OOB session. Wrap in `Arc` and share across verify tasks.
pub struct OobSession {
    /// Registered interactsh client; used to mint URLs, poll, and deregister.
    client: Arc<InteractshClient>,
    /// Configuration captured at construction time.
    config: OobConfig,
    /// id → first observed interaction. Once observed, future polls for the
    /// same id are no-ops (we keep the entry until GC for late waiters that
    /// haven't called `wait_for` yet).
    observations: Arc<DashMap<String, StoredInteraction>>,
    /// id → notify handle. Populated by `wait_for` before it parks; the
    /// poller signals on match. `Mutex<HashMap>` over a `DashMap` because
    /// we need atomic insert-and-check-existing; contention is bounded
    /// (one entry per in-flight finding, ~max_concurrent_global).
    waiters: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
    /// Handle of the background poller task. `None` until `start` has spawned
    /// it and again after `shutdown` / `abort_poller_for_drop` took it.
    poller_handle: Mutex<Option<JoinHandle<()>>>,
    /// Set once by `shutdown` / `abort_poller_for_drop`; read by `wait_for`
    /// and by the poller loop to stop work.
    shutdown: Arc<AtomicBool>,
}
100
101impl OobSession {
102    /// Boot the session: register with the collector and spawn the poller.
103    /// Errors here are surface-level — caller logs and continues with OOB
104    /// disabled rather than aborting the scan.
105    pub async fn start(
106        http: Client,
107        config: OobConfig,
108    ) -> Result<Arc<Self>, super::InteractshError> {
109        let client = InteractshClient::register(http, &config.server).await?;
110        let client = Arc::new(client);
111        info!(
112            target: "keyhog::oob",
113            correlation_id = %client.correlation_id(),
114            server = %config.server,
115            "OOB verification enabled"
116        );
117        let session = Arc::new(Self {
118            client: Arc::clone(&client),
119            config: config.clone(),
120            observations: Arc::new(DashMap::new()),
121            waiters: Arc::new(Mutex::new(HashMap::new())),
122            poller_handle: Mutex::new(None),
123            shutdown: Arc::new(AtomicBool::new(false)),
124        });
125        let handle = spawn_poller(Arc::clone(&session));
126        *session.poller_handle.lock() = Some(handle);
127        Ok(session)
128    }
129
    /// Mint a URL for a finding-in-flight. Returns the host and full URL the
    /// caller should embed in the verification probe, plus the `unique_id`
    /// to pass to `wait_for`.
    ///
    /// Pure delegation to the client's `mint_url`; no session state is
    /// touched, so a minted id has no waiter until `wait_for` is called.
    pub fn mint(&self) -> super::client::MintedUrl {
        self.client.mint_url()
    }
136
    /// Default per-finding wait timeout. Detector specs override this via
    /// `[detector.verify.oob].timeout_secs`; the value is also clamped to
    /// `max_timeout` inside `wait_for`.
    ///
    /// Returns `config.default_timeout` as stored at construction.
    pub fn config_default_timeout(&self) -> Duration {
        self.config.default_timeout
    }
143
144    /// Park until an interaction arrives for `unique_id`, or `timeout`
145    /// elapses, or shutdown — whichever comes first.
146    pub async fn wait_for(
147        &self,
148        unique_id: &str,
149        accepts: OobAccept,
150        timeout: Duration,
151    ) -> OobObservation {
152        if self.shutdown.load(Ordering::Acquire) {
153            return OobObservation::Disabled("session shut down".into());
154        }
155        let timeout = timeout.min(self.config.max_timeout);
156
157        // Fast path: poller may have observed it before we got here.
158        if let Some(obs) = self.peek_match(unique_id, accepts) {
159            return obs;
160        }
161
162        let notify = {
163            let mut waiters = self.waiters.lock();
164            waiters
165                .entry(unique_id.to_string())
166                .or_insert_with(|| Arc::new(Notify::new()))
167                .clone()
168        };
169
170        // Race we're closing:
171        //
172        //   t0  caller peek_match  →  no match
173        //   t1  poller store_and_notify  →  observation inserted
174        //   t2  poller fires notify_waiters() on the (existing) Notify
175        //   t3  caller calls notify.notified().await
176        //
177        // `notify_waiters()` does NOT store a permit and only wakes
178        // already-polled `Notified` futures. A future created at t3 was
179        // never polled at t2, so it never received the wake. The caller
180        // would then wait up to the full `timeout` window for a callback
181        // that already arrived.
182        //
183        // `Notified::enable()` registers the waiter at the Notify without
184        // polling. Any `notify_waiters()` after `enable()` returns is
185        // guaranteed to wake the future on its next poll. We enable BEFORE
186        // re-peeking observations so the sequence per loop iteration is:
187        //
188        //   1. Build a fresh notified future, enable() it (registers waiter).
189        //   2. Re-peek observations — catches anything stored before step 1.
190        //   3. await the notified future — catches anything stored after
191        //      step 1 (because the waiter is already registered).
192        //
193        // The future is recreated at the top of each iteration so that
194        // post-wakeup loops (e.g. notify fired but the protocol filter
195        // rejected the observation) re-arm against future stores.
196        let deadline = Instant::now() + timeout;
197        loop {
198            // Bail early if the session is shutting down. Without this check
199            // a parked wait_for would sleep the full timeout (default 30 s)
200            // after the engine's Drop fired — the shutdown path wakes
201            // every parked waiter, but they need to re-check shutdown to
202            // exit the loop instead of falling back into the next await.
203            if self.shutdown.load(Ordering::Acquire) {
204                self.waiters.lock().remove(unique_id);
205                return OobObservation::Disabled("session shut down".into());
206            }
207
208            let remaining = deadline.saturating_duration_since(Instant::now());
209            if remaining.is_zero() {
210                self.waiters.lock().remove(unique_id);
211                return OobObservation::NotObserved;
212            }
213
214            let mut notified = std::pin::pin!(notify.notified());
215            notified.as_mut().enable();
216
217            if let Some(obs) = self.peek_match(unique_id, accepts) {
218                self.waiters.lock().remove(unique_id);
219                return obs;
220            }
221
222            let woken = tokio::time::timeout(remaining, notified.as_mut()).await;
223            if let Some(obs) = self.peek_match(unique_id, accepts) {
224                self.waiters.lock().remove(unique_id);
225                return obs;
226            }
227            if woken.is_err() {
228                self.waiters.lock().remove(unique_id);
229                return OobObservation::NotObserved;
230            }
231            // Wakeup but no matching observation (e.g. wrong protocol filter,
232            // or notify_waiters fired without a corresponding store). Loop
233            // with a fresh notified future to re-arm.
234        }
235    }
236
237    /// Best-effort shutdown: stop poller, wake parked waiters, deregister.
238    /// Idempotent.
239    pub async fn shutdown(self: &Arc<Self>) {
240        if self.shutdown.swap(true, Ordering::AcqRel) {
241            return;
242        }
243        self.wake_all_waiters();
244        let handle = self.poller_handle.lock().take();
245        if let Some(h) = handle {
246            h.abort();
247            let _ = h.await;
248        }
249        if let Err(e) = self.client.deregister().await {
250            debug!(target: "keyhog::oob", error = %e, "deregister failed (non-fatal)");
251        }
252    }
253
254    /// Synchronous abort path used from `VerificationEngine::Drop` when the
255    /// caller forgot to `shutdown_oob().await`. We can't await deregister
256    /// from a sync context, so we just stop the poller and wake every
257    /// parked `wait_for` so they observe `shutdown=true` and return
258    /// `Disabled` instead of sleeping the rest of their per-finding
259    /// timeout. The collector prunes inactive sessions on its own
260    /// retention timer.
261    ///
262    /// Idempotent. Once called, subsequent `wait_for` invocations return
263    /// `Disabled("session shut down")`.
264    pub fn abort_poller_for_drop(&self) {
265        if self.shutdown.swap(true, Ordering::AcqRel) {
266            return;
267        }
268        self.wake_all_waiters();
269        if let Some(h) = self.poller_handle.lock().take() {
270            h.abort();
271            // No `.await` — the JoinHandle is dropped; the abort signal is
272            // delivered asynchronously by the runtime.
273        }
274    }
275
276    /// Wake every parked `wait_for` once. Each wakes, sees `shutdown=true`
277    /// at the top of its loop, and returns `Disabled`. Drains the waiter
278    /// table so a future store_and_notify (e.g. a poll-in-flight that
279    /// resolves after shutdown) doesn't try to fire on a dead waiter.
280    fn wake_all_waiters(&self) {
281        let drained: Vec<Arc<Notify>> = {
282            let mut waiters = self.waiters.lock();
283            waiters.drain().map(|(_, n)| n).collect()
284        };
285        for notify in drained {
286            notify.notify_waiters();
287        }
288    }
289
290    fn peek_match(&self, unique_id: &str, accepts: OobAccept) -> Option<OobObservation> {
291        let stored = self.observations.get(unique_id)?;
292        if !accepts.matches(stored.interaction.protocol) {
293            return None;
294        }
295        Some(OobObservation::Observed {
296            protocol: stored.interaction.protocol,
297            remote_address: stored.interaction.remote_address.clone(),
298            timestamp: stored.interaction.timestamp.clone(),
299            raw_payload: stored.interaction.raw_payload.clone(),
300        })
301    }
302
303    fn store_and_notify(&self, interaction: Interaction) {
304        let id = interaction.unique_id.clone();
305        // First-write-wins. Repeat callbacks for the same id (a service that
306        // hits us twice) don't overwrite — the first observation is what the
307        // verifier will see.
308        let inserted = self
309            .observations
310            .entry(id.clone())
311            .or_insert_with(|| StoredInteraction {
312                interaction,
313                received_at: Instant::now(),
314            });
315        let _ = inserted; // hold guard scope
316        if let Some(notify) = self.waiters.lock().get(&id) {
317            notify.notify_waiters();
318        }
319    }
320
321    fn gc(&self) {
322        let cutoff = Instant::now()
323            .checked_sub(self.config.max_observation_age)
324            .unwrap_or_else(Instant::now);
325        self.observations
326            .retain(|_, stored| stored.received_at >= cutoff);
327    }
328
329    /// Test-only constructor that bypasses both the network registration and
330    /// the background poller. Lets unit tests exercise the wait_for /
331    /// store_and_notify race + shutdown logic without standing up an
332    /// interactsh server.
333    #[cfg(test)]
334    pub(crate) fn for_test(client: Arc<InteractshClient>, config: OobConfig) -> Arc<Self> {
335        Arc::new(Self {
336            client,
337            config,
338            observations: Arc::new(DashMap::new()),
339            waiters: Arc::new(Mutex::new(HashMap::new())),
340            poller_handle: Mutex::new(None),
341            shutdown: Arc::new(AtomicBool::new(false)),
342        })
343    }
344
    /// Test-only accessor — the unit tests need to fabricate `Interaction`
    /// values and drive the notify path that the poller would normally
    /// drive. Production code never calls this.
    ///
    /// Forwards straight to the private `store_and_notify`, so first-write-wins
    /// storage and waiter notification behave exactly as they do for the
    /// poller.
    #[cfg(test)]
    pub(crate) fn store_and_notify_for_test(&self, interaction: super::client::Interaction) {
        self.store_and_notify(interaction);
    }
352}
353
/// Filter for which interaction protocols satisfy a wait. Mirrors `OobProtocol`
/// in the spec but lives here to keep the verifier crate's domain clean.
#[derive(Debug, Clone, Copy)]
pub enum OobAccept {
    /// Only a DNS interaction satisfies the wait.
    Dns,
    /// Only an HTTP interaction satisfies the wait.
    Http,
    /// Only an SMTP interaction satisfies the wait.
    Smtp,
    /// Any interaction satisfies the wait, whatever its protocol.
    Any,
}
363
364impl OobAccept {
365    fn matches(self, p: InteractionProtocol) -> bool {
366        matches!(
367            (self, p),
368            (Self::Any, _)
369                | (Self::Dns, InteractionProtocol::Dns)
370                | (Self::Http, InteractionProtocol::Http)
371                | (Self::Smtp, InteractionProtocol::Smtp)
372        )
373    }
374}
375
376impl From<keyhog_core::OobProtocol> for OobAccept {
377    fn from(p: keyhog_core::OobProtocol) -> Self {
378        match p {
379            keyhog_core::OobProtocol::Dns => Self::Dns,
380            keyhog_core::OobProtocol::Http => Self::Http,
381            keyhog_core::OobProtocol::Smtp => Self::Smtp,
382            keyhog_core::OobProtocol::Any => Self::Any,
383        }
384    }
385}
386
387fn spawn_poller(session: Arc<OobSession>) -> JoinHandle<()> {
388    tokio::spawn(async move {
389        let mut consecutive_errors = 0u32;
390        let mut next_gc = Instant::now() + Duration::from_secs(60);
391        loop {
392            if session.shutdown.load(Ordering::Acquire) {
393                break;
394            }
395            match session.client.poll().await {
396                Ok(interactions) => {
397                    consecutive_errors = 0;
398                    for interaction in interactions {
399                        session.store_and_notify(interaction);
400                    }
401                }
402                Err(e) => {
403                    consecutive_errors += 1;
404                    // Backoff progressively, but cap so we don't go silent for
405                    // ages on a flaky collector.
406                    let backoff_secs = (1u64 << consecutive_errors.min(5)).min(30);
407                    warn!(
408                        target: "keyhog::oob",
409                        error = %e,
410                        consecutive_errors,
411                        backoff_secs,
412                        "interactsh poll failed; backing off"
413                    );
414                    tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
415                    continue;
416                }
417            }
418            if Instant::now() >= next_gc {
419                session.gc();
420                next_gc = Instant::now() + Duration::from_secs(60);
421            }
422            tokio::time::sleep(session.config.poll_interval).await;
423        }
424        debug!(target: "keyhog::oob", "poller exiting");
425    })
426}
427
// Unit tests for the protocol filter, the config defaults, and the
// wait_for / store_and_notify / shutdown interplay. The async tests use a
// session built with `OobSession::for_test`, so no poller runs and no
// network is touched; interactions are injected by hand.
#[cfg(test)]
mod tests {
    use super::*;

    /// `Any` accepts every protocol (including `Other`); a specific filter
    /// accepts only its own protocol.
    #[test]
    fn oob_accept_filters_protocols() {
        assert!(OobAccept::Any.matches(InteractionProtocol::Dns));
        assert!(OobAccept::Any.matches(InteractionProtocol::Other));
        assert!(OobAccept::Http.matches(InteractionProtocol::Http));
        assert!(!OobAccept::Http.matches(InteractionProtocol::Dns));
        assert!(OobAccept::Smtp.matches(InteractionProtocol::Smtp));
        assert!(!OobAccept::Smtp.matches(InteractionProtocol::Http));
    }

    /// Sanity-check the relationships between the default durations.
    #[test]
    fn oob_config_defaults_safe() {
        let c = OobConfig::default();
        assert_eq!(c.server, "oast.fun");
        assert!(c.default_timeout <= c.max_timeout);
        assert!(c.poll_interval < c.default_timeout);
    }

    /// Session without a poller and with short timeouts, for the async tests.
    fn test_session() -> Arc<OobSession> {
        let client = Arc::new(super::super::client::InteractshClient::for_test("oast.fun"));
        let config = OobConfig {
            // Tighten timeouts so a misbehaving wait_for fails fast in tests
            // rather than holding the whole suite for the default 30 s.
            default_timeout: Duration::from_secs(2),
            max_timeout: Duration::from_secs(2),
            poll_interval: Duration::from_millis(50),
            max_observation_age: Duration::from_secs(60),
            ..OobConfig::default()
        };
        OobSession::for_test(client, config)
    }

    /// Fabricate an interaction for `unique_id` with the given protocol and
    /// fixed placeholder values for the remaining fields.
    fn fake_interaction(
        unique_id: &str,
        protocol: InteractionProtocol,
    ) -> super::super::client::Interaction {
        super::super::client::Interaction {
            unique_id: unique_id.to_string(),
            protocol,
            remote_address: "203.0.113.7".to_string(),
            timestamp: "2026-05-06T00:00:00Z".to_string(),
            raw_payload: "GET /probe HTTP/1.1".to_string(),
        }
    }

    /// An observation stored while `wait_for` is already parked must resolve
    /// the wait through the notification path, well before the timeout.
    #[tokio::test]
    async fn wait_for_returns_immediately_when_observation_arrives_post_install() {
        let session = test_session();
        let id = "abcdefghijklmnopqrst1234567890123";
        // Spawn wait_for; give it a tick to install its waiter and park.
        let s = Arc::clone(&session);
        let id_clone = id.to_string();
        let task = tokio::spawn(async move {
            s.wait_for(&id_clone, OobAccept::Http, Duration::from_secs(2))
                .await
        });
        tokio::time::sleep(Duration::from_millis(50)).await;
        // Store + notify while the waiter is parked. The elapsed-time
        // assertion below is what distinguishes the wakeup path from the
        // timeout fallback: the wait must finish long before the 2-second
        // timeout.
        session.store_and_notify_for_test(fake_interaction(id, InteractionProtocol::Http));
        let start = Instant::now();
        let obs = task.await.expect("wait_for task panicked");
        assert!(matches!(obs, OobObservation::Observed { .. }));
        assert!(
            start.elapsed() < Duration::from_millis(500),
            "wait_for should resolve via wakeup, not timeout; took {:?}",
            start.elapsed()
        );
    }

    /// Pre-existing observation (stored before wait_for is even called)
    /// is caught by the first peek_match — fastest path.
    #[tokio::test]
    async fn wait_for_fast_path_when_observation_already_present() {
        let session = test_session();
        let id = "preexistingidpreexistingidpreexis";
        session.store_and_notify_for_test(fake_interaction(id, InteractionProtocol::Http));
        let start = Instant::now();
        let obs = session
            .wait_for(id, OobAccept::Http, Duration::from_secs(2))
            .await;
        assert!(matches!(obs, OobObservation::Observed { .. }));
        assert!(start.elapsed() < Duration::from_millis(50));
    }

    /// A stored interaction whose protocol the filter rejects must not
    /// satisfy the wait; with nothing else arriving the wait times out.
    #[tokio::test]
    async fn wait_for_filters_by_protocol() {
        let session = test_session();
        let id = "protofilteridprotofilteridprotofi";
        // Store wrong-protocol first.
        session.store_and_notify_for_test(fake_interaction(id, InteractionProtocol::Dns));
        // Wait for HTTP — the DNS interaction is in the DashMap but
        // doesn't satisfy the OobAccept::Http filter.
        let s = Arc::clone(&session);
        let task = tokio::spawn(async move {
            s.wait_for(id, OobAccept::Http, Duration::from_millis(500))
                .await
        });
        tokio::time::sleep(Duration::from_millis(50)).await;
        // wait_for is parked and nothing further is stored, so the verdict
        // is NotObserved once the 500ms timeout elapses. (Storage is
        // first-write-wins, so even a subsequent Http store for this id
        // would not replace the DNS entry.)
        let obs = task.await.expect("task panicked");
        assert!(matches!(obs, OobObservation::NotObserved));
    }

    /// Shutdown wakes parked waiters instead of leaving them to time out.
    /// Critical robustness property: `VerificationEngine::Drop` must not
    /// leave verification tasks hanging for the per-finding timeout.
    #[tokio::test]
    async fn shutdown_wakes_parked_waiter_promptly() {
        let session = test_session();
        let id = "shutdownidshutdownidshutdownidshu";
        let s = Arc::clone(&session);
        let task = tokio::spawn(async move {
            // The 60 s request is clamped to the test session's 2 s
            // `max_timeout` inside wait_for.
            s.wait_for(id, OobAccept::Http, Duration::from_secs(60))
                .await
        });
        tokio::time::sleep(Duration::from_millis(50)).await;
        let start = Instant::now();
        session.abort_poller_for_drop();
        let obs = task.await.expect("task panicked");
        // Either Disabled (saw shutdown after wakeup) or NotObserved if the
        // shutdown raced with a spurious wakeup-with-empty-DashMap. Both
        // are acceptable; what's NOT acceptable is sleeping out the timeout.
        assert!(
            matches!(
                obs,
                OobObservation::Disabled(_) | OobObservation::NotObserved
            ),
            "expected Disabled or NotObserved post-shutdown; got {obs:?}"
        );
        assert!(
            start.elapsed() < Duration::from_secs(1),
            "shutdown should wake waiters promptly; took {:?}",
            start.elapsed()
        );
    }

    /// Shutdown invoked before wait_for is even called returns Disabled
    /// at the entry check, never installing a waiter.
    #[tokio::test]
    async fn wait_for_after_shutdown_returns_disabled_immediately() {
        let session = test_session();
        session.abort_poller_for_drop();
        let id = "afterdownidafterdownidafterdownid";
        let start = Instant::now();
        let obs = session
            .wait_for(id, OobAccept::Http, Duration::from_secs(60))
            .await;
        assert!(matches!(obs, OobObservation::Disabled(_)));
        assert!(start.elapsed() < Duration::from_millis(50));
    }
}
594}