1use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use dashmap::DashMap;
25use parking_lot::Mutex;
26use reqwest::Client;
27use tokio::sync::Notify;
28use tokio::task::JoinHandle;
29use tracing::{debug, info, warn};
30
31use super::client::{Interaction, InteractionProtocol, InteractshClient};
32
33#[derive(Debug, Clone)]
36pub struct OobConfig {
37 pub server: String,
40 pub default_timeout: Duration,
42 pub max_timeout: Duration,
44 pub poll_interval: Duration,
46 pub max_observation_age: Duration,
49}
50
51impl Default for OobConfig {
52 fn default() -> Self {
53 Self {
54 server: "oast.fun".to_string(),
55 default_timeout: Duration::from_secs(30),
56 max_timeout: Duration::from_secs(120),
57 poll_interval: Duration::from_secs(2),
58 max_observation_age: Duration::from_secs(600),
59 }
60 }
61}
62
63#[derive(Debug, Clone)]
65pub enum OobObservation {
66 Observed {
67 protocol: InteractionProtocol,
68 remote_address: String,
69 timestamp: String,
70 raw_payload: String,
71 },
72 NotObserved,
74 Disabled(String),
77}
78
79struct StoredInteraction {
80 interaction: Interaction,
81 received_at: Instant,
82}
83
84pub struct OobSession {
86 client: Arc<InteractshClient>,
87 config: OobConfig,
88 observations: Arc<DashMap<String, StoredInteraction>>,
92 waiters: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
97 poller_handle: Mutex<Option<JoinHandle<()>>>,
98 shutdown: Arc<AtomicBool>,
99}
100
101impl OobSession {
102 pub async fn start(
106 http: Client,
107 config: OobConfig,
108 ) -> Result<Arc<Self>, super::InteractshError> {
109 let client = InteractshClient::register(http, &config.server).await?;
110 let client = Arc::new(client);
111 info!(
112 target: "keyhog::oob",
113 correlation_id = %client.correlation_id(),
114 server = %config.server,
115 "OOB verification enabled"
116 );
117 let session = Arc::new(Self {
118 client: Arc::clone(&client),
119 config: config.clone(),
120 observations: Arc::new(DashMap::new()),
121 waiters: Arc::new(Mutex::new(HashMap::new())),
122 poller_handle: Mutex::new(None),
123 shutdown: Arc::new(AtomicBool::new(false)),
124 });
125 let handle = spawn_poller(Arc::clone(&session));
126 *session.poller_handle.lock() = Some(handle);
127 Ok(session)
128 }
129
130 pub fn mint(&self) -> super::client::MintedUrl {
134 self.client.mint_url()
135 }
136
137 pub fn config_default_timeout(&self) -> Duration {
141 self.config.default_timeout
142 }
143
144 pub async fn wait_for(
147 &self,
148 unique_id: &str,
149 accepts: OobAccept,
150 timeout: Duration,
151 ) -> OobObservation {
152 if self.shutdown.load(Ordering::Acquire) {
153 return OobObservation::Disabled("session shut down".into());
154 }
155 let timeout = timeout.min(self.config.max_timeout);
156
157 if let Some(obs) = self.peek_match(unique_id, accepts) {
159 return obs;
160 }
161
162 let notify = {
163 let mut waiters = self.waiters.lock();
164 waiters
165 .entry(unique_id.to_string())
166 .or_insert_with(|| Arc::new(Notify::new()))
167 .clone()
168 };
169
170 let deadline = Instant::now() + timeout;
197 loop {
198 if self.shutdown.load(Ordering::Acquire) {
204 self.waiters.lock().remove(unique_id);
205 return OobObservation::Disabled("session shut down".into());
206 }
207
208 let remaining = deadline.saturating_duration_since(Instant::now());
209 if remaining.is_zero() {
210 self.waiters.lock().remove(unique_id);
211 return OobObservation::NotObserved;
212 }
213
214 let mut notified = std::pin::pin!(notify.notified());
215 notified.as_mut().enable();
216
217 if let Some(obs) = self.peek_match(unique_id, accepts) {
218 self.waiters.lock().remove(unique_id);
219 return obs;
220 }
221
222 let woken = tokio::time::timeout(remaining, notified.as_mut()).await;
223 if let Some(obs) = self.peek_match(unique_id, accepts) {
224 self.waiters.lock().remove(unique_id);
225 return obs;
226 }
227 if woken.is_err() {
228 self.waiters.lock().remove(unique_id);
229 return OobObservation::NotObserved;
230 }
231 }
235 }
236
237 pub async fn shutdown(self: &Arc<Self>) {
240 if self.shutdown.swap(true, Ordering::AcqRel) {
241 return;
242 }
243 self.wake_all_waiters();
244 let handle = self.poller_handle.lock().take();
245 if let Some(h) = handle {
246 h.abort();
247 let _ = h.await;
248 }
249 if let Err(e) = self.client.deregister().await {
250 debug!(target: "keyhog::oob", error = %e, "deregister failed (non-fatal)");
251 }
252 }
253
254 pub fn abort_poller_for_drop(&self) {
265 if self.shutdown.swap(true, Ordering::AcqRel) {
266 return;
267 }
268 self.wake_all_waiters();
269 if let Some(h) = self.poller_handle.lock().take() {
270 h.abort();
271 }
274 }
275
276 fn wake_all_waiters(&self) {
281 let drained: Vec<Arc<Notify>> = {
282 let mut waiters = self.waiters.lock();
283 waiters.drain().map(|(_, n)| n).collect()
284 };
285 for notify in drained {
286 notify.notify_waiters();
287 }
288 }
289
290 fn peek_match(&self, unique_id: &str, accepts: OobAccept) -> Option<OobObservation> {
291 let stored = self.observations.get(unique_id)?;
292 if !accepts.matches(stored.interaction.protocol) {
293 return None;
294 }
295 Some(OobObservation::Observed {
296 protocol: stored.interaction.protocol,
297 remote_address: stored.interaction.remote_address.clone(),
298 timestamp: stored.interaction.timestamp.clone(),
299 raw_payload: stored.interaction.raw_payload.clone(),
300 })
301 }
302
303 fn store_and_notify(&self, interaction: Interaction) {
304 let id = interaction.unique_id.clone();
305 let inserted = self
309 .observations
310 .entry(id.clone())
311 .or_insert_with(|| StoredInteraction {
312 interaction,
313 received_at: Instant::now(),
314 });
315 let _ = inserted; if let Some(notify) = self.waiters.lock().get(&id) {
317 notify.notify_waiters();
318 }
319 }
320
321 fn gc(&self) {
322 let cutoff = Instant::now()
323 .checked_sub(self.config.max_observation_age)
324 .unwrap_or_else(Instant::now);
325 self.observations
326 .retain(|_, stored| stored.received_at >= cutoff);
327 }
328
329 #[cfg(test)]
334 pub(crate) fn for_test(client: Arc<InteractshClient>, config: OobConfig) -> Arc<Self> {
335 Arc::new(Self {
336 client,
337 config,
338 observations: Arc::new(DashMap::new()),
339 waiters: Arc::new(Mutex::new(HashMap::new())),
340 poller_handle: Mutex::new(None),
341 shutdown: Arc::new(AtomicBool::new(false)),
342 })
343 }
344
345 #[cfg(test)]
349 pub(crate) fn store_and_notify_for_test(&self, interaction: super::client::Interaction) {
350 self.store_and_notify(interaction);
351 }
352}
353
354#[derive(Debug, Clone, Copy)]
357pub enum OobAccept {
358 Dns,
359 Http,
360 Smtp,
361 Any,
362}
363
364impl OobAccept {
365 fn matches(self, p: InteractionProtocol) -> bool {
366 matches!(
367 (self, p),
368 (Self::Any, _)
369 | (Self::Dns, InteractionProtocol::Dns)
370 | (Self::Http, InteractionProtocol::Http)
371 | (Self::Smtp, InteractionProtocol::Smtp)
372 )
373 }
374}
375
376impl From<keyhog_core::OobProtocol> for OobAccept {
377 fn from(p: keyhog_core::OobProtocol) -> Self {
378 match p {
379 keyhog_core::OobProtocol::Dns => Self::Dns,
380 keyhog_core::OobProtocol::Http => Self::Http,
381 keyhog_core::OobProtocol::Smtp => Self::Smtp,
382 keyhog_core::OobProtocol::Any => Self::Any,
383 }
384 }
385}
386
387fn spawn_poller(session: Arc<OobSession>) -> JoinHandle<()> {
388 tokio::spawn(async move {
389 let mut consecutive_errors = 0u32;
390 let mut next_gc = Instant::now() + Duration::from_secs(60);
391 loop {
392 if session.shutdown.load(Ordering::Acquire) {
393 break;
394 }
395 match session.client.poll().await {
396 Ok(interactions) => {
397 consecutive_errors = 0;
398 for interaction in interactions {
399 session.store_and_notify(interaction);
400 }
401 }
402 Err(e) => {
403 consecutive_errors += 1;
404 let backoff_secs = (1u64 << consecutive_errors.min(5)).min(30);
407 warn!(
408 target: "keyhog::oob",
409 error = %e,
410 consecutive_errors,
411 backoff_secs,
412 "interactsh poll failed; backing off"
413 );
414 tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
415 continue;
416 }
417 }
418 if Instant::now() >= next_gc {
419 session.gc();
420 next_gc = Instant::now() + Duration::from_secs(60);
421 }
422 tokio::time::sleep(session.config.poll_interval).await;
423 }
424 debug!(target: "keyhog::oob", "poller exiting");
425 })
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431
432 #[test]
433 fn oob_accept_filters_protocols() {
434 assert!(OobAccept::Any.matches(InteractionProtocol::Dns));
435 assert!(OobAccept::Any.matches(InteractionProtocol::Other));
436 assert!(OobAccept::Http.matches(InteractionProtocol::Http));
437 assert!(!OobAccept::Http.matches(InteractionProtocol::Dns));
438 assert!(OobAccept::Smtp.matches(InteractionProtocol::Smtp));
439 assert!(!OobAccept::Smtp.matches(InteractionProtocol::Http));
440 }
441
442 #[test]
443 fn oob_config_defaults_safe() {
444 let c = OobConfig::default();
445 assert_eq!(c.server, "oast.fun");
446 assert!(c.default_timeout <= c.max_timeout);
447 assert!(c.poll_interval < c.default_timeout);
448 }
449
450 fn test_session() -> Arc<OobSession> {
451 let client = Arc::new(super::super::client::InteractshClient::for_test("oast.fun"));
452 let config = OobConfig {
453 default_timeout: Duration::from_secs(2),
456 max_timeout: Duration::from_secs(2),
457 poll_interval: Duration::from_millis(50),
458 max_observation_age: Duration::from_secs(60),
459 ..OobConfig::default()
460 };
461 OobSession::for_test(client, config)
462 }
463
464 fn fake_interaction(
465 unique_id: &str,
466 protocol: InteractionProtocol,
467 ) -> super::super::client::Interaction {
468 super::super::client::Interaction {
469 unique_id: unique_id.to_string(),
470 protocol,
471 remote_address: "203.0.113.7".to_string(),
472 timestamp: "2026-05-06T00:00:00Z".to_string(),
473 raw_payload: "GET /probe HTTP/1.1".to_string(),
474 }
475 }
476
477 #[tokio::test]
481 async fn wait_for_returns_immediately_when_observation_arrives_post_install() {
482 let session = test_session();
483 let id = "abcdefghijklmnopqrst1234567890123";
484 let s = Arc::clone(&session);
486 let id_clone = id.to_string();
487 let task = tokio::spawn(async move {
488 s.wait_for(&id_clone, OobAccept::Http, Duration::from_secs(2))
489 .await
490 });
491 tokio::time::sleep(Duration::from_millis(50)).await;
492 session.store_and_notify_for_test(fake_interaction(id, InteractionProtocol::Http));
498 let start = Instant::now();
499 let obs = task.await.expect("wait_for task panicked");
500 assert!(matches!(obs, OobObservation::Observed { .. }));
501 assert!(
502 start.elapsed() < Duration::from_millis(500),
503 "wait_for should resolve via wakeup, not timeout; took {:?}",
504 start.elapsed()
505 );
506 }
507
508 #[tokio::test]
511 async fn wait_for_fast_path_when_observation_already_present() {
512 let session = test_session();
513 let id = "preexistingidpreexistingidpreexis";
514 session.store_and_notify_for_test(fake_interaction(id, InteractionProtocol::Http));
515 let start = Instant::now();
516 let obs = session
517 .wait_for(id, OobAccept::Http, Duration::from_secs(2))
518 .await;
519 assert!(matches!(obs, OobObservation::Observed { .. }));
520 assert!(start.elapsed() < Duration::from_millis(50));
521 }
522
523 #[tokio::test]
526 async fn wait_for_filters_by_protocol() {
527 let session = test_session();
528 let id = "protofilteridprotofilteridprotofi";
529 session.store_and_notify_for_test(fake_interaction(id, InteractionProtocol::Dns));
531 let s = Arc::clone(&session);
534 let task = tokio::spawn(async move {
535 s.wait_for(id, OobAccept::Http, Duration::from_millis(500))
536 .await
537 });
538 tokio::time::sleep(Duration::from_millis(50)).await;
539 let obs = task.await.expect("task panicked");
544 assert!(matches!(obs, OobObservation::NotObserved));
545 }
546
547 #[tokio::test]
551 async fn shutdown_wakes_parked_waiter_promptly() {
552 let session = test_session();
553 let id = "shutdownidshutdownidshutdownidshu";
554 let s = Arc::clone(&session);
555 let task = tokio::spawn(async move {
556 s.wait_for(id, OobAccept::Http, Duration::from_secs(60))
557 .await
558 });
559 tokio::time::sleep(Duration::from_millis(50)).await;
560 let start = Instant::now();
561 session.abort_poller_for_drop();
562 let obs = task.await.expect("task panicked");
563 assert!(
567 matches!(
568 obs,
569 OobObservation::Disabled(_) | OobObservation::NotObserved
570 ),
571 "expected Disabled or NotObserved post-shutdown; got {obs:?}"
572 );
573 assert!(
574 start.elapsed() < Duration::from_secs(1),
575 "shutdown should wake waiters promptly; took {:?}",
576 start.elapsed()
577 );
578 }
579
580 #[tokio::test]
583 async fn wait_for_after_shutdown_returns_disabled_immediately() {
584 let session = test_session();
585 session.abort_poller_for_drop();
586 let id = "afterdownidafterdownidafterdownid";
587 let start = Instant::now();
588 let obs = session
589 .wait_for(id, OobAccept::Http, Duration::from_secs(60))
590 .await;
591 assert!(matches!(obs, OobObservation::Disabled(_)));
592 assert!(start.elapsed() < Duration::from_millis(50));
593 }
594}