Skip to main content

keyhog_verifier/verify/
mod.rs

1//! Verification execution logic.
2//!
3//! Verification is explicitly opt-in via the `--verify` CLI flag.
4//! Security invariants for this module:
5//! - Credentials are never stored permanently. They are only used in-memory for the current run.
//! - HTTPS only by default (`danger_allow_http` is the explicit opt-out). TLS certificate validation stays enabled unless `VerifyConfig.insecure_tls` is set.
7//! - Private IPs and private DNS resolutions are blocked to reduce SSRF risk.
8//! - Redirects are not followed.
9//! - Response bodies are capped at 1 MB.
10
11mod auth;
12mod aws;
13mod credential;
14mod multi_step;
15mod request;
16mod response;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Duration;
21
22use dashmap::DashMap;
23use keyhog_core::{VerificationResult, VerifiedFinding};
24use reqwest::Client;
25use tokio::sync::{Notify, Semaphore};
26use tokio::task::JoinSet;
27
28use crate::cache;
29use crate::{into_finding, DedupedMatch, VerificationEngine, VerifyConfig, VerifyError};
30
31pub(crate) use aws::build_aws_probe;
32pub use aws::format_sigv4_timestamps;
33pub(crate) use credential::{verify_with_retry, VerificationAttempt};
34pub(crate) use request::{
35    build_request_for_step, execute_request, resolved_client_for_url, RequestBuildResult,
36};
37pub(crate) use response::{
38    body_indicates_error, evaluate_success, extract_metadata, read_response_body,
39};
40
/// Permit count for the ad-hoc semaphore that `verify_group_task` creates
/// when a finding's service has no entry in the engine's pre-built
/// per-service semaphore map.
const DEFAULT_SERVICE_CONCURRENCY: usize = 5;
42
/// Bundle of engine state handed to every spawned `verify_group_task`.
///
/// Built once per `verify_all` call from the engine's own fields and cloned
/// for each task that gets spawned.
#[derive(Clone)]
struct VerifyTaskShared {
    /// Semaphore every task acquires first; sized from
    /// `VerifyConfig.max_concurrent_global` when the engine is built.
    global_semaphore: Arc<Semaphore>,
    /// Per-service semaphores, looked up by the finding's `service`.
    service_semaphores: Arc<HashMap<Arc<str>, Arc<Semaphore>>>,
    /// The engine's shared HTTP client, passed on to `verify_with_retry`.
    client: Client,
    /// Detector specs, looked up by the finding's `detector_id`.
    detectors: Arc<HashMap<Arc<str>, keyhog_core::DetectorSpec>>,
    /// Timeout value passed on to `verify_with_retry`.
    timeout: Duration,
    /// Result cache consulted before verifying and populated afterwards.
    cache: Arc<cache::VerificationCache>,
    /// Verifications currently in progress, keyed by
    /// `(detector_id, credential)`. Tasks that find their key already
    /// present wait on the stored `Notify` instead of verifying again.
    inflight: Arc<DashMap<(Arc<str>, Arc<str>), Arc<Notify>>>,
    /// Once `inflight` holds this many keys, tasks skip the in-flight
    /// dedup and verify directly.
    max_inflight_keys: usize,
    /// Forwarded unchanged to `verify_with_retry`.
    danger_allow_private_ips: bool,
    /// Forwarded unchanged to `verify_with_retry`.
    danger_allow_http: bool,
    /// Mirrors `VerifyConfig.insecure_tls`. Threaded into
    /// `resolved_client_for_url` so the DNS-pinned per-request client
    /// rebuild honors the `--insecure` flag the operator set on the
    /// engine. Without this the base client accepts invalid certs but
    /// the rebuild path rejects them - the flag silently does nothing
    /// for direct (non-proxy) connections. 2026-05-26.
    insecure_tls: bool,
    /// `true` when the engine's base client was built with a proxy. The
    /// per-request DNS-pinned client rebuild path in
    /// `resolved_client_for_url` MUST NOT fire when a proxy is in use,
    /// or the proxy config silently gets dropped. We carry the bool
    /// rather than the proxy URL itself because no downstream code
    /// needs the URL - only the "skip the rebuild" signal.
    proxy_in_use: bool,
    /// Optional out-of-band session, forwarded to `verify_with_retry`;
    /// `None` until `VerificationEngine::enable_oob` succeeds.
    oob_session: Option<Arc<crate::oob::OobSession>>,
}
71
/// Marks the holder as the task currently verifying `key`.
///
/// `verify_group_task` creates one right after inserting `key` into the
/// in-flight map; dropping it removes the entry again and wakes every task
/// waiting on `notify`.
struct InflightGuard {
    /// The `(detector_id, credential)` pair this guard registered.
    key: (Arc<str>, Arc<str>),
    /// The shared in-flight map the key is removed from on drop.
    inflight: Arc<DashMap<(Arc<str>, Arc<str>), Arc<Notify>>>,
    /// The same `Notify` that was stored under `key`; waiters park on it.
    notify: Arc<Notify>,
}
77
78impl Drop for InflightGuard {
79    fn drop(&mut self) {
80        // DashMap's per-shard locking means this never blocks a tokio worker
81        // for more than the time to mutate one shard - orders of magnitude
82        // less than the previous global parking_lot::Mutex which was held
83        // across the entire HashMap traversal in the await loop.
84        self.inflight.remove(&self.key);
85        self.notify.notify_waiters();
86    }
87}
88
89async fn verify_group_task(shared: VerifyTaskShared, group: DedupedMatch) -> VerifiedFinding {
90    let global = shared.global_semaphore;
91    let service_sem = shared
92        .service_semaphores
93        .get(&*group.service)
94        .cloned()
95        .unwrap_or_else(|| Arc::new(Semaphore::new(DEFAULT_SERVICE_CONCURRENCY)));
96    let client = shared.client;
97    let detector = shared.detectors.get(&*group.detector_id).cloned();
98    let timeout = shared.timeout;
99
100    let cache = shared.cache;
101    let inflight = shared.inflight;
102    let max_inflight_keys = shared.max_inflight_keys;
103
104    let Ok(_global_permit) = global.acquire().await else {
105        return into_finding(
106            group,
107            VerificationResult::Error("semaphore closed".into()),
108            HashMap::new(),
109        );
110    };
111    let Ok(_service_permit) = service_sem.acquire().await else {
112        return into_finding(
113            group,
114            VerificationResult::Error("service semaphore closed".into()),
115            HashMap::new(),
116        );
117    };
118
119    if let Some((cached_result, cached_meta)) = cache.get(&group.credential, &group.detector_id) {
120        return into_finding(group, cached_result, cached_meta);
121    }
122
123    let _inflight_guard = loop {
124        let notify_to_await: Option<Arc<Notify>> = {
125            // Inflight dedup via DashMap: per-shard locks instead of one
126            // global parking_lot::Mutex held across HashMap operations in an
127            // async context (anti-pattern that stalled the tokio runtime
128            // under high concurrency - see legendary-2026-04-26).
129            if inflight.len() >= max_inflight_keys {
130                break None;
131            }
132
133            let key = (group.detector_id.clone(), group.credential.clone());
134            if let Some((cached_result, cached_meta)) =
135                cache.get(&group.credential, &group.detector_id)
136            {
137                return into_finding(group, cached_result, cached_meta);
138            }
139
140            match inflight.entry(key.clone()) {
141                dashmap::mapref::entry::Entry::Occupied(entry) => Some(entry.get().clone()),
142                dashmap::mapref::entry::Entry::Vacant(entry) => {
143                    let notify = Arc::new(Notify::new());
144                    entry.insert(notify.clone());
145                    break Some(InflightGuard {
146                        key,
147                        inflight: inflight.clone(),
148                        notify,
149                    });
150                }
151            }
152        };
153
154        if let Some(notify) = notify_to_await {
155            notify.notified().await;
156        } else {
157            break None;
158        }
159    };
160
161    let (verification, metadata) = if let Some(custom_verifier) =
162        keyhog_core::registry::get_verifier_registry().get(&group.detector_id)
163    {
164        custom_verifier.verify(&group).await
165    } else {
166        match &detector {
167            Some(det) => match &det.verify {
168                Some(verify_spec) => {
169                    verify_with_retry(
170                        &client,
171                        verify_spec,
172                        &group.credential,
173                        &group.companions,
174                        timeout,
175                        shared.danger_allow_private_ips,
176                        shared.danger_allow_http,
177                        shared.proxy_in_use,
178                        shared.insecure_tls,
179                        shared.oob_session.as_ref(),
180                    )
181                    .await
182                }
183                None => (VerificationResult::Unverifiable, HashMap::new()),
184            },
185            None => (VerificationResult::Unverifiable, HashMap::new()),
186        }
187    };
188
189    cache.put(
190        &group.credential,
191        &group.detector_id,
192        verification.clone(),
193        metadata.clone(),
194    );
195
196    into_finding(group, verification, metadata)
197}
198
199impl VerificationEngine {
200    /// Create a verifier with shared HTTP client, cache, and concurrency controls.
201    pub fn new(
202        detectors: &[keyhog_core::DetectorSpec],
203        config: VerifyConfig,
204    ) -> Result<Self, VerifyError> {
205        let mut builder = Client::builder()
206            .timeout(config.timeout)
207            // Cert validation: ON by default, escape hatch ONLY through the
208            // explicit `VerifyConfig.insecure_tls` knob (or `KEYHOG_INSECURE_TLS`
209            // via CLI). Production paths never flip this.
210            .danger_accept_invalid_certs(config.insecure_tls)
211            .redirect(reqwest::redirect::Policy::none());
212        builder = crate::apply_proxy_config(builder, config.proxy.as_deref())
213            .map_err(VerifyError::ProxyConfig)?;
214        let client = builder.build().map_err(VerifyError::ClientBuild)?;
215
216        let detector_map: HashMap<Arc<str>, keyhog_core::DetectorSpec> = detectors
217            .iter()
218            .cloned()
219            .map(|d| (d.id.clone().into(), d))
220            .collect();
221
222        let mut service_semaphores = HashMap::new();
223        for d in detectors {
224            service_semaphores
225                .entry(d.service.clone().into())
226                .or_insert_with(|| {
227                    Arc::new(Semaphore::new(config.max_concurrent_per_service.max(1)))
228                });
229        }
230
231        Ok(Self {
232            client,
233            detectors: Arc::new(detector_map),
234            service_semaphores: Arc::new(service_semaphores),
235            global_semaphore: Arc::new(Semaphore::new(config.max_concurrent_global.max(1))),
236            timeout: config.timeout,
237            cache: Arc::new(cache::VerificationCache::default_ttl()),
238            inflight: Arc::new(DashMap::new()),
239            max_inflight_keys: config.max_inflight_keys.max(1),
240            danger_allow_private_ips: config.danger_allow_private_ips,
241            danger_allow_http: config.danger_allow_http,
242            insecure_tls: config.insecure_tls,
243            // Issue #2 + #3 fix: don't conflate "configured to set a proxy
244            // policy" with "a proxy is actively routing traffic."
245            // `proxy_is_active` resolves the documented `KEYHOG_PROXY` sentinels
246            // (`off`/`none`/empty) AND accounts for reqwest's standard
247            // `HTTPS_PROXY` / `HTTP_PROXY` / `ALL_PROXY` env vars that the
248            // shared client honors via reqwest defaults. proxy_in_use gates
249            // DNS-pinning rebuild in resolved_client_for_url(); a wrong value
250            // either:
251            //   #2: disables DNS pinning when no proxy is actually active
252            //       (KEYHOG_PROXY=off counted as active → rebuild skipped
253            //       → DNS-rebinding protection silently removed)
254            //   #3: keeps DNS pinning on a path that drops env-proxy +
255            //       insecure_tls config (HTTPS_PROXY invisible to the
256            //       rebuild branch → verifier connects direct → operator
257            //       interception/audit bypassed)
258            proxy_in_use: crate::proxy_is_active(config.proxy.as_deref()),
259            oob_session: None,
260        })
261    }
262
263    /// Verify a batch of deduplicated raw matches in parallel.
264    pub async fn verify_all(&self, groups: Vec<DedupedMatch>) -> Vec<VerifiedFinding> {
265        let max_active = self.global_semaphore.available_permits().max(1);
266        let total = groups.len();
267        let shared = VerifyTaskShared {
268            global_semaphore: self.global_semaphore.clone(),
269            service_semaphores: self.service_semaphores.clone(),
270            client: self.client.clone(),
271            detectors: self.detectors.clone(),
272            timeout: self.timeout,
273            cache: self.cache.clone(),
274            inflight: self.inflight.clone(),
275            max_inflight_keys: self.max_inflight_keys,
276            danger_allow_private_ips: self.danger_allow_private_ips,
277            danger_allow_http: self.danger_allow_http,
278            insecure_tls: self.insecure_tls,
279            proxy_in_use: self.proxy_in_use,
280            oob_session: self.oob_session.clone(),
281        };
282        let mut pending = groups.into_iter();
283        let mut join_set = JoinSet::new();
284
285        while join_set.len() < max_active {
286            let Some(group) = pending.next() else {
287                break;
288            };
289            join_set.spawn(verify_group_task(shared.clone(), group));
290        }
291
292        let mut findings = Vec::with_capacity(total);
293        while let Some(result) = join_set.join_next().await {
294            match result {
295                Ok(finding) => findings.push(finding),
296                Err(e) => tracing::error!("verification task panicked: {}", e),
297            }
298
299            if let Some(group) = pending.next() {
300                join_set.spawn(verify_group_task(shared.clone(), group));
301            }
302        }
303        findings
304    }
305
306    /// Enable out-of-band callback verification for detectors with
307    /// `[detector.verify.oob]`. Registers a fresh interactsh session against
308    /// the configured collector and starts the polling loop. Subsequent
309    /// `verify_all` calls will mint per-finding callback URLs and combine
310    /// HTTP success criteria with OOB observations per the detector's policy.
311    ///
312    /// Idempotent: a second call replaces the previous session (the old one
313    /// is shut down). Errors here do *not* abort the engine - call sites
314    /// log + continue with OOB disabled rather than failing the whole scan.
315    pub async fn enable_oob(
316        &mut self,
317        config: crate::oob::OobConfig,
318    ) -> Result<(), crate::oob::InteractshError> {
319        if let Some(old) = self.oob_session.take() {
320            old.shutdown().await;
321        }
322        let session = crate::oob::OobSession::start(self.client.clone(), config).await?;
323        self.oob_session = Some(session);
324        Ok(())
325    }
326
327    /// Tear down the OOB session if one is active. Idempotent. Call before
328    /// dropping the engine to deregister cleanly with the collector.
329    pub async fn shutdown_oob(&mut self) {
330        if let Some(session) = self.oob_session.take() {
331            session.shutdown().await;
332        }
333    }
334}
335
336impl Drop for VerificationEngine {
337    fn drop(&mut self) {
338        // Best-effort safety net: if the caller forgot to `shutdown_oob().await`
339        // before dropping the engine, we still need to stop the background
340        // poller - otherwise it keeps polling the collector indefinitely
341        // even after the scan that produced it is gone, leaking a tokio
342        // task and a network connection.
343        //
344        // We can't block on async cleanup in `Drop`, so we abort the
345        // poller's join handle synchronously. The deregister POST is
346        // skipped (the collector prunes inactive sessions on its own
347        // retention timer), but the poller stops immediately.
348        if let Some(session) = self.oob_session.take() {
349            session.abort_poller_for_drop();
350        }
351    }
352}