1use std::fmt;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use adler_core::{CheckOutcome, Client, ExecutorOptions, MatchKind, Site, Username, executor};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{Notify, RwLock, broadcast, mpsc};
18
19use crate::persist::{self, PersistedScan};
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ScanId(String);
32
33impl ScanId {
34 #[must_use]
36 pub fn new() -> Self {
37 const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
38 let mut s = String::with_capacity(12);
39 for _ in 0..12 {
40 let idx = fastrand::usize(..ALPHABET.len());
41 s.push(char::from(ALPHABET[idx]));
42 }
43 Self(s)
44 }
45
46 #[must_use]
48 pub fn as_str(&self) -> &str {
49 &self.0
50 }
51}
52
53impl Default for ScanId {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl fmt::Display for ScanId {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.write_str(&self.0)
62 }
63}
64
65impl From<String> for ScanId {
66 fn from(s: String) -> Self {
67 Self(s)
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct FinishedScan {
74 pub summary: Summary,
76 pub outcomes: Vec<CheckOutcome>,
78 pub elapsed_ms: u64,
80}
81
82#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
84pub struct Summary {
85 pub found: usize,
87 pub not_found: usize,
89 pub uncertain: usize,
91}
92
93impl Summary {
94 #[must_use]
96 pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
97 let mut s = Self::default();
98 for o in outcomes {
99 match o.kind {
100 MatchKind::Found => s.found += 1,
101 MatchKind::NotFound => s.not_found += 1,
102 MatchKind::Uncertain => s.uncertain += 1,
103 }
104 }
105 s
106 }
107
108 #[must_use]
110 pub const fn total(&self) -> usize {
111 self.found + self.not_found + self.uncertain
112 }
113}
114
115#[derive(Debug, Clone)]
120pub struct ScanHandle {
121 inner: Arc<Inner>,
122}
123
124#[derive(Debug)]
125struct Inner {
126 username: String,
127 site_count: usize,
128 started_at: Instant,
129 created_at_ms: u64,
130 outcomes: RwLock<Vec<CheckOutcome>>,
131 finished: RwLock<Option<FinishedScan>>,
132 tx: broadcast::Sender<usize>,
137 done: Notify,
138}
139
140impl ScanHandle {
141 #[must_use]
150 pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
151 let (tx, _) = broadcast::channel(outcome_buffer.max(1));
152 let created_at_ms = SystemTime::now()
153 .duration_since(UNIX_EPOCH)
154 .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
155 Self {
156 inner: Arc::new(Inner {
157 username: username.into(),
158 site_count,
159 started_at: Instant::now(),
160 created_at_ms,
161 outcomes: RwLock::new(Vec::new()),
162 finished: RwLock::new(None),
163 tx,
164 done: Notify::new(),
165 }),
166 }
167 }
168
169 #[must_use]
171 pub fn username(&self) -> &str {
172 &self.inner.username
173 }
174
175 #[must_use]
177 pub fn site_count(&self) -> usize {
178 self.inner.site_count
179 }
180
181 #[must_use]
183 pub fn elapsed(&self) -> Duration {
184 self.inner.started_at.elapsed()
185 }
186
187 #[must_use]
190 pub fn created_at_ms(&self) -> u64 {
191 self.inner.created_at_ms
192 }
193
194 pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
197 self.inner.outcomes.read().await.clone()
198 }
199
200 pub async fn finished(&self) -> Option<FinishedScan> {
202 self.inner.finished.read().await.clone()
203 }
204
205 #[must_use]
209 pub fn is_finished_now(&self) -> bool {
210 self.inner.finished.try_read().is_ok_and(|g| g.is_some())
211 }
212
213 #[must_use]
216 pub fn subscribe(&self) -> broadcast::Receiver<usize> {
217 self.inner.tx.subscribe()
218 }
219
220 pub async fn wait_done(&self) {
224 if self.inner.finished.read().await.is_some() {
225 return;
226 }
227 self.inner.done.notified().await;
228 }
229
230 fn tx(&self) -> broadcast::Sender<usize> {
231 self.inner.tx.clone()
232 }
233
234 async fn append(&self, outcome: CheckOutcome) {
235 let mut buf = self.inner.outcomes.write().await;
236 let idx = buf.len();
237 buf.push(outcome);
238 drop(buf);
239 let _ = self.inner.tx.send(idx);
242 }
243
244 #[allow(clippy::significant_drop_tightening)]
255 pub(crate) async fn extend_outcomes(&self, carried: Vec<CheckOutcome>) {
256 if carried.is_empty() {
257 return;
258 }
259 let mut buf = self.inner.outcomes.write().await;
260 for outcome in carried {
261 let idx = buf.len();
262 buf.push(outcome);
263 let _ = self.inner.tx.send(idx);
264 }
265 }
266
267 pub(crate) async fn publish(&self, finished: FinishedScan) {
268 *self.inner.finished.write().await = Some(finished);
269 self.inner.done.notify_waiters();
270 }
271
272 #[allow(clippy::significant_drop_tightening)]
281 pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
282 let mut guard = self.inner.finished.write().await;
283 let Some(finished) = guard.as_mut() else {
284 return;
285 };
286 if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
287 *slot = new;
288 } else {
289 finished.outcomes.push(new);
290 }
291 finished.summary = Summary::from_outcomes(&finished.outcomes);
292 }
293}
294
295#[derive(Debug, Clone)]
300pub(crate) struct PersistContext {
301 pub scan_id: ScanId,
302 pub dir: Arc<PathBuf>,
303}
304
305pub(crate) fn spawn(
311 handle: ScanHandle,
312 client: Arc<Client>,
313 sites: Arc<[Site]>,
314 username: Username,
315 options: ExecutorOptions,
316 persist_ctx: Option<PersistContext>,
317) -> tokio::task::JoinHandle<()> {
318 tokio::spawn(async move {
319 run(handle, &client, &sites, &username, options, persist_ctx).await;
320 })
321}
322
323async fn run(
324 handle: ScanHandle,
325 client: &Client,
326 sites: &[Site],
327 username: &Username,
328 options: ExecutorOptions,
329 persist_ctx: Option<PersistContext>,
330) {
331 let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
332
333 let tx_for_cb = tx.clone();
336 let scan_fut = async move {
337 let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
338 let _ = tx_for_cb.send(o.clone());
342 })
343 .await;
344 drop(tx);
346 outcomes
347 };
348
349 let handle_ref = handle.clone();
350 let consume_fut = async move {
351 while let Some(outcome) = rx.recv().await {
352 handle_ref.append(outcome).await;
353 }
354 };
355
356 let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
357
358 let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
359 let summary = Summary::from_outcomes(&all_outcomes);
360 let finished = FinishedScan {
361 summary,
362 outcomes: all_outcomes,
363 elapsed_ms,
364 };
365
366 if let Some(ctx) = &persist_ctx {
369 let snapshot = PersistedScan::from_finished(
370 ctx.scan_id.clone(),
371 handle.username().to_owned(),
372 handle.site_count(),
373 handle.created_at_ms(),
374 finished.clone(),
375 );
376 if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
377 tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
378 } else {
379 let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
380 if removed > 0 {
381 tracing::debug!(removed, "pruned older persisted scans");
382 }
383 }
384 }
385
386 handle.publish(finished).await;
387 drop(handle.tx()); }
389
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::UncertainReason;

    /// Builds a minimal outcome for `name`; only `Uncertain` verdicts carry a reason.
    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason: matches!(kind, MatchKind::Uncertain)
                .then_some(UncertainReason::Other("test".into())),
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            transport: None,
            escalations: 0,
        }
    }

    #[test]
    fn summary_tallies_by_verdict() {
        let s = Summary::from_outcomes(&[
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ]);
        assert_eq!(s.found, 1);
        assert_eq!(s.not_found, 2);
        assert_eq!(s.uncertain, 1);
        assert_eq!(s.total(), 4);
    }

    #[test]
    fn scan_id_is_url_safe_and_random() {
        let a = ScanId::new();
        let b = ScanId::new();
        assert_eq!(a.as_str().len(), 12);
        assert!(
            a.as_str()
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
        );
        assert_ne!(a, b);
    }

    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let handle = ScanHandle::new("alice", 2, 16);
        let mut rx = handle.subscribe();

        handle.append(outcome("GitHub", MatchKind::Found)).await;
        handle.append(outcome("GitLab", MatchKind::NotFound)).await;

        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");

        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn extend_outcomes_appends_after_existing_and_broadcasts_each_index() {
        let handle = ScanHandle::new("alice", 3, 16);
        handle.append(outcome("GitHub", MatchKind::Found)).await;

        // Subscribe after the first append: only the batch should be announced.
        let mut rx = handle.subscribe();
        handle
            .extend_outcomes(vec![
                outcome("GitLab", MatchKind::NotFound),
                outcome("Codeberg", MatchKind::Uncertain),
            ])
            .await;

        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 3);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");
        assert_eq!(snap[2].site, "Codeberg");

        assert_eq!(rx.recv().await.unwrap(), 1);
        assert_eq!(rx.recv().await.unwrap(), 2);
    }

    #[tokio::test]
    async fn extend_outcomes_with_empty_batch_is_a_noop() {
        let handle = ScanHandle::new("alice", 1, 4);
        let mut rx = handle.subscribe();

        handle.extend_outcomes(Vec::new()).await;

        assert!(handle.outcomes_snapshot().await.is_empty());
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let handle = ScanHandle::new("alice", 1, 4);

        let waiter = {
            let h = handle.clone();
            tokio::spawn(async move { h.wait_done().await })
        };

        // Give the waiter a chance to start waiting before the result lands.
        tokio::task::yield_now().await;

        handle
            .publish(FinishedScan {
                summary: Summary {
                    found: 1,
                    not_found: 0,
                    uncertain: 0,
                },
                outcomes: vec![outcome("GitHub", MatchKind::Found)],
                elapsed_ms: 42,
            })
            .await;

        waiter.await.unwrap();
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.elapsed_ms, 42);
        assert_eq!(f.outcomes.len(), 1);
    }

    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .publish(FinishedScan {
                summary: Summary::default(),
                outcomes: Vec::new(),
                elapsed_ms: 0,
            })
            .await;
        tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }

    #[tokio::test]
    async fn replace_outcome_is_noop_before_publish() {
        let handle = ScanHandle::new("alice", 1, 4);

        handle.replace_outcome(outcome("GitHub", MatchKind::Found)).await;

        assert!(handle.finished().await.is_none());
        assert!(!handle.is_finished_now());
    }

    #[tokio::test]
    async fn replace_outcome_swaps_or_appends_and_recomputes_summary() {
        let handle = ScanHandle::new("alice", 2, 4);
        handle
            .publish(FinishedScan {
                summary: Summary {
                    found: 0,
                    not_found: 0,
                    uncertain: 1,
                },
                outcomes: vec![outcome("GitHub", MatchKind::Uncertain)],
                elapsed_ms: 5,
            })
            .await;

        // Same site: replaced in place. New site: appended.
        handle.replace_outcome(outcome("GitHub", MatchKind::Found)).await;
        handle.replace_outcome(outcome("GitLab", MatchKind::NotFound)).await;

        let f = handle.finished().await.expect("finished");
        assert_eq!(f.outcomes.len(), 2);
        assert_eq!(f.outcomes[0].site, "GitHub");
        assert!(matches!(f.outcomes[0].kind, MatchKind::Found));
        assert_eq!(f.outcomes[1].site, "GitLab");
        assert!(matches!(f.outcomes[1].kind, MatchKind::NotFound));
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.summary.not_found, 1);
        assert_eq!(f.summary.uncertain, 0);
        assert_eq!(f.elapsed_ms, 5);
    }
}