1mod auth;
12mod aws;
13mod credential;
14mod multi_step;
15mod request;
16mod response;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Duration;
21
22use dashmap::DashMap;
23use keyhog_core::{VerificationResult, VerifiedFinding};
24use reqwest::Client;
25use tokio::sync::{Notify, Semaphore};
26use tokio::task::JoinSet;
27
28use crate::cache;
29use crate::{into_finding, DedupedMatch, VerificationEngine, VerifyConfig, VerifyError};
30
31pub(crate) use aws::build_aws_probe;
32pub use aws::format_sigv4_timestamps;
33pub(crate) use credential::{verify_with_retry, VerificationAttempt};
34pub(crate) use request::{
35 build_request_for_step, execute_request, resolved_client_for_url, RequestBuildResult,
36};
37pub(crate) use response::{
38 body_indicates_error, evaluate_success, extract_metadata, read_response_body,
39};
40
/// Permit count for the fallback semaphore that `verify_group_task` creates
/// when a group's service has no entry in the engine's per-service table.
const DEFAULT_SERVICE_CONCURRENCY: usize = 5;
42
/// Bundle of engine state handed to each spawned `verify_group_task`.
///
/// `verify_all` builds one instance from the engine's fields and clones it for
/// every task it spawns; the `Arc` fields make those clones share the same
/// semaphores, detector table, cache and in-flight table.
#[derive(Clone)]
struct VerifyTaskShared {
    /// Acquired once per task before any verification work starts.
    global_semaphore: Arc<Semaphore>,
    /// Per-service semaphores, looked up by the group's `service` name.
    service_semaphores: Arc<HashMap<Arc<str>, Arc<Semaphore>>>,
    /// HTTP client passed to `verify_with_retry`.
    client: Client,
    /// Detector specs, looked up by the group's `detector_id`.
    detectors: Arc<HashMap<Arc<str>, keyhog_core::DetectorSpec>>,
    /// Timeout value passed to `verify_with_retry`.
    timeout: Duration,
    /// Result cache consulted before verifying and filled afterwards.
    cache: Arc<cache::VerificationCache>,
    /// Keys currently being verified, stored as `(detector_id, credential)`,
    /// each with the `Notify` that the owning task's guard signals on drop.
    inflight: Arc<DashMap<(Arc<str>, Arc<str>), Arc<Notify>>>,
    /// Once `inflight` holds this many keys, tasks skip de-duplication and
    /// verify directly instead of registering a new key.
    max_inflight_keys: usize,
    /// Forwarded unchanged to `verify_with_retry`.
    /// NOTE(review): presumably relaxes a private-address check — confirm in `credential`.
    danger_allow_private_ips: bool,
    /// Forwarded unchanged to `verify_with_retry`.
    /// NOTE(review): presumably permits plain-HTTP endpoints — confirm in `credential`.
    danger_allow_http: bool,
    /// Forwarded to `verify_with_retry`; the engine constructor also feeds the
    /// same config value to `danger_accept_invalid_certs` on the client.
    insecure_tls: bool,
    /// Result of `crate::proxy_is_active` for the configured proxy, forwarded
    /// to `verify_with_retry`.
    proxy_in_use: bool,
    /// Optional out-of-band session, forwarded to `verify_with_retry`.
    oob_session: Option<Arc<crate::oob::OobSession>>,
}
71
/// Ownership marker for one in-flight `(detector_id, credential)` key.
///
/// Created by the task that inserted `key` into `inflight`; dropping it
/// removes the key again and signals `notify`.
struct InflightGuard {
    /// The `(detector_id, credential)` pair this guard registered.
    key: (Arc<str>, Arc<str>),
    /// The shared in-flight table the key is removed from on drop.
    inflight: Arc<DashMap<(Arc<str>, Arc<str>), Arc<Notify>>>,
    /// The same `Notify` that was stored under `key`; waiters hold clones of it.
    notify: Arc<Notify>,
}
77
impl Drop for InflightGuard {
    /// Releases the in-flight slot and wakes tasks waiting on it.
    fn drop(&mut self) {
        // Remove the key first: a woken waiter re-enters the de-duplication
        // loop and must find the slot vacant rather than this stale entry.
        self.inflight.remove(&self.key);
        // `notify_waiters` stores no permit, so only `Notified` futures that
        // already exist at this point are woken.
        self.notify.notify_waiters();
    }
}
88
89async fn verify_group_task(shared: VerifyTaskShared, group: DedupedMatch) -> VerifiedFinding {
90 let global = shared.global_semaphore;
91 let service_sem = shared
92 .service_semaphores
93 .get(&*group.service)
94 .cloned()
95 .unwrap_or_else(|| Arc::new(Semaphore::new(DEFAULT_SERVICE_CONCURRENCY)));
96 let client = shared.client;
97 let detector = shared.detectors.get(&*group.detector_id).cloned();
98 let timeout = shared.timeout;
99
100 let cache = shared.cache;
101 let inflight = shared.inflight;
102 let max_inflight_keys = shared.max_inflight_keys;
103
104 let Ok(_global_permit) = global.acquire().await else {
105 return into_finding(
106 group,
107 VerificationResult::Error("semaphore closed".into()),
108 HashMap::new(),
109 );
110 };
111 let Ok(_service_permit) = service_sem.acquire().await else {
112 return into_finding(
113 group,
114 VerificationResult::Error("service semaphore closed".into()),
115 HashMap::new(),
116 );
117 };
118
119 if let Some((cached_result, cached_meta)) = cache.get(&group.credential, &group.detector_id) {
120 return into_finding(group, cached_result, cached_meta);
121 }
122
123 let _inflight_guard = loop {
124 let notify_to_await: Option<Arc<Notify>> = {
125 if inflight.len() >= max_inflight_keys {
130 break None;
131 }
132
133 let key = (group.detector_id.clone(), group.credential.clone());
134 if let Some((cached_result, cached_meta)) =
135 cache.get(&group.credential, &group.detector_id)
136 {
137 return into_finding(group, cached_result, cached_meta);
138 }
139
140 match inflight.entry(key.clone()) {
141 dashmap::mapref::entry::Entry::Occupied(entry) => Some(entry.get().clone()),
142 dashmap::mapref::entry::Entry::Vacant(entry) => {
143 let notify = Arc::new(Notify::new());
144 entry.insert(notify.clone());
145 break Some(InflightGuard {
146 key,
147 inflight: inflight.clone(),
148 notify,
149 });
150 }
151 }
152 };
153
154 if let Some(notify) = notify_to_await {
155 notify.notified().await;
156 } else {
157 break None;
158 }
159 };
160
161 let (verification, metadata) = if let Some(custom_verifier) =
162 keyhog_core::registry::get_verifier_registry().get(&group.detector_id)
163 {
164 custom_verifier.verify(&group).await
165 } else {
166 match &detector {
167 Some(det) => match &det.verify {
168 Some(verify_spec) => {
169 verify_with_retry(
170 &client,
171 verify_spec,
172 &group.credential,
173 &group.companions,
174 timeout,
175 shared.danger_allow_private_ips,
176 shared.danger_allow_http,
177 shared.proxy_in_use,
178 shared.insecure_tls,
179 shared.oob_session.as_ref(),
180 )
181 .await
182 }
183 None => (VerificationResult::Unverifiable, HashMap::new()),
184 },
185 None => (VerificationResult::Unverifiable, HashMap::new()),
186 }
187 };
188
189 cache.put(
190 &group.credential,
191 &group.detector_id,
192 verification.clone(),
193 metadata.clone(),
194 );
195
196 into_finding(group, verification, metadata)
197}
198
199impl VerificationEngine {
200 pub fn new(
202 detectors: &[keyhog_core::DetectorSpec],
203 config: VerifyConfig,
204 ) -> Result<Self, VerifyError> {
205 let mut builder = Client::builder()
206 .timeout(config.timeout)
207 .danger_accept_invalid_certs(config.insecure_tls)
211 .redirect(reqwest::redirect::Policy::none());
212 builder = crate::apply_proxy_config(builder, config.proxy.as_deref())
213 .map_err(VerifyError::ProxyConfig)?;
214 let client = builder.build().map_err(VerifyError::ClientBuild)?;
215
216 let detector_map: HashMap<Arc<str>, keyhog_core::DetectorSpec> = detectors
217 .iter()
218 .cloned()
219 .map(|d| (d.id.clone().into(), d))
220 .collect();
221
222 let mut service_semaphores = HashMap::new();
223 for d in detectors {
224 service_semaphores
225 .entry(d.service.clone().into())
226 .or_insert_with(|| {
227 Arc::new(Semaphore::new(config.max_concurrent_per_service.max(1)))
228 });
229 }
230
231 Ok(Self {
232 client,
233 detectors: Arc::new(detector_map),
234 service_semaphores: Arc::new(service_semaphores),
235 global_semaphore: Arc::new(Semaphore::new(config.max_concurrent_global.max(1))),
236 timeout: config.timeout,
237 cache: Arc::new(cache::VerificationCache::default_ttl()),
238 inflight: Arc::new(DashMap::new()),
239 max_inflight_keys: config.max_inflight_keys.max(1),
240 danger_allow_private_ips: config.danger_allow_private_ips,
241 danger_allow_http: config.danger_allow_http,
242 insecure_tls: config.insecure_tls,
243 proxy_in_use: crate::proxy_is_active(config.proxy.as_deref()),
259 oob_session: None,
260 })
261 }
262
263 pub async fn verify_all(&self, groups: Vec<DedupedMatch>) -> Vec<VerifiedFinding> {
265 let max_active = self.global_semaphore.available_permits().max(1);
266 let total = groups.len();
267 let shared = VerifyTaskShared {
268 global_semaphore: self.global_semaphore.clone(),
269 service_semaphores: self.service_semaphores.clone(),
270 client: self.client.clone(),
271 detectors: self.detectors.clone(),
272 timeout: self.timeout,
273 cache: self.cache.clone(),
274 inflight: self.inflight.clone(),
275 max_inflight_keys: self.max_inflight_keys,
276 danger_allow_private_ips: self.danger_allow_private_ips,
277 danger_allow_http: self.danger_allow_http,
278 insecure_tls: self.insecure_tls,
279 proxy_in_use: self.proxy_in_use,
280 oob_session: self.oob_session.clone(),
281 };
282 let mut pending = groups.into_iter();
283 let mut join_set = JoinSet::new();
284
285 while join_set.len() < max_active {
286 let Some(group) = pending.next() else {
287 break;
288 };
289 join_set.spawn(verify_group_task(shared.clone(), group));
290 }
291
292 let mut findings = Vec::with_capacity(total);
293 while let Some(result) = join_set.join_next().await {
294 match result {
295 Ok(finding) => findings.push(finding),
296 Err(e) => tracing::error!("verification task panicked: {}", e),
297 }
298
299 if let Some(group) = pending.next() {
300 join_set.spawn(verify_group_task(shared.clone(), group));
301 }
302 }
303 findings
304 }
305
306 pub async fn enable_oob(
316 &mut self,
317 config: crate::oob::OobConfig,
318 ) -> Result<(), crate::oob::InteractshError> {
319 if let Some(old) = self.oob_session.take() {
320 old.shutdown().await;
321 }
322 let session = crate::oob::OobSession::start(self.client.clone(), config).await?;
323 self.oob_session = Some(session);
324 Ok(())
325 }
326
327 pub async fn shutdown_oob(&mut self) {
330 if let Some(session) = self.oob_session.take() {
331 session.shutdown().await;
332 }
333 }
334}
335
336impl Drop for VerificationEngine {
337 fn drop(&mut self) {
338 if let Some(session) = self.oob_session.take() {
349 session.abort_poller_for_drop();
350 }
351 }
352}