1use std::fmt;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use adler_core::{CheckOutcome, Client, ExecutorOptions, MatchKind, Site, Username, executor};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{Notify, RwLock, broadcast, mpsc};
18
19use crate::persist::{self, PersistedScan};
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ScanId(String);
32
33impl ScanId {
34 #[must_use]
36 pub fn new() -> Self {
37 const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
38 let mut s = String::with_capacity(12);
39 for _ in 0..12 {
40 let idx = fastrand::usize(..ALPHABET.len());
41 s.push(char::from(ALPHABET[idx]));
42 }
43 Self(s)
44 }
45
46 #[must_use]
48 pub fn as_str(&self) -> &str {
49 &self.0
50 }
51}
52
53impl Default for ScanId {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl fmt::Display for ScanId {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.write_str(&self.0)
62 }
63}
64
65impl From<String> for ScanId {
66 fn from(s: String) -> Self {
67 Self(s)
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct FinishedScan {
74 pub summary: Summary,
76 pub outcomes: Vec<CheckOutcome>,
78 pub elapsed_ms: u64,
80}
81
82#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
84pub struct Summary {
85 pub found: usize,
87 pub not_found: usize,
89 pub uncertain: usize,
91}
92
93impl Summary {
94 #[must_use]
96 pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
97 let mut s = Self::default();
98 for o in outcomes {
99 match o.kind {
100 MatchKind::Found => s.found += 1,
101 MatchKind::NotFound => s.not_found += 1,
102 MatchKind::Uncertain => s.uncertain += 1,
103 }
104 }
105 s
106 }
107
108 #[must_use]
110 pub const fn total(&self) -> usize {
111 self.found + self.not_found + self.uncertain
112 }
113}
114
115#[derive(Debug, Clone)]
120pub struct ScanHandle {
121 inner: Arc<Inner>,
122}
123
124#[derive(Debug)]
125struct Inner {
126 username: String,
127 site_count: usize,
128 started_at: Instant,
129 created_at_ms: u64,
130 outcomes: RwLock<Vec<CheckOutcome>>,
131 finished: RwLock<Option<FinishedScan>>,
132 tx: broadcast::Sender<usize>,
137 done: Notify,
138}
139
140impl ScanHandle {
141 #[must_use]
150 pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
151 let (tx, _) = broadcast::channel(outcome_buffer.max(1));
152 let created_at_ms = SystemTime::now()
153 .duration_since(UNIX_EPOCH)
154 .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
155 Self {
156 inner: Arc::new(Inner {
157 username: username.into(),
158 site_count,
159 started_at: Instant::now(),
160 created_at_ms,
161 outcomes: RwLock::new(Vec::new()),
162 finished: RwLock::new(None),
163 tx,
164 done: Notify::new(),
165 }),
166 }
167 }
168
169 #[must_use]
171 pub fn username(&self) -> &str {
172 &self.inner.username
173 }
174
175 #[must_use]
177 pub fn site_count(&self) -> usize {
178 self.inner.site_count
179 }
180
181 #[must_use]
183 pub fn elapsed(&self) -> Duration {
184 self.inner.started_at.elapsed()
185 }
186
187 #[must_use]
190 pub fn created_at_ms(&self) -> u64 {
191 self.inner.created_at_ms
192 }
193
194 pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
197 self.inner.outcomes.read().await.clone()
198 }
199
200 pub async fn finished(&self) -> Option<FinishedScan> {
202 self.inner.finished.read().await.clone()
203 }
204
205 #[must_use]
209 pub fn is_finished_now(&self) -> bool {
210 self.inner.finished.try_read().is_ok_and(|g| g.is_some())
211 }
212
213 #[must_use]
216 pub fn subscribe(&self) -> broadcast::Receiver<usize> {
217 self.inner.tx.subscribe()
218 }
219
220 pub async fn wait_done(&self) {
224 if self.inner.finished.read().await.is_some() {
225 return;
226 }
227 self.inner.done.notified().await;
228 }
229
230 fn tx(&self) -> broadcast::Sender<usize> {
231 self.inner.tx.clone()
232 }
233
234 async fn append(&self, outcome: CheckOutcome) {
235 let mut buf = self.inner.outcomes.write().await;
236 let idx = buf.len();
237 buf.push(outcome);
238 drop(buf);
239 let _ = self.inner.tx.send(idx);
242 }
243
244 pub(crate) async fn publish(&self, finished: FinishedScan) {
245 *self.inner.finished.write().await = Some(finished);
246 self.inner.done.notify_waiters();
247 }
248
249 #[allow(clippy::significant_drop_tightening)]
258 pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
259 let mut guard = self.inner.finished.write().await;
260 let Some(finished) = guard.as_mut() else {
261 return;
262 };
263 if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
264 *slot = new;
265 } else {
266 finished.outcomes.push(new);
267 }
268 finished.summary = Summary::from_outcomes(&finished.outcomes);
269 }
270}
271
272#[derive(Debug, Clone)]
277pub(crate) struct PersistContext {
278 pub scan_id: ScanId,
279 pub dir: Arc<PathBuf>,
280}
281
282pub(crate) fn spawn(
288 handle: ScanHandle,
289 client: Arc<Client>,
290 sites: Arc<[Site]>,
291 username: Username,
292 options: ExecutorOptions,
293 persist_ctx: Option<PersistContext>,
294) {
295 tokio::spawn(async move {
296 run(handle, &client, &sites, &username, options, persist_ctx).await;
297 });
298}
299
300async fn run(
301 handle: ScanHandle,
302 client: &Client,
303 sites: &[Site],
304 username: &Username,
305 options: ExecutorOptions,
306 persist_ctx: Option<PersistContext>,
307) {
308 let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
309
310 let tx_for_cb = tx.clone();
313 let scan_fut = async move {
314 let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
315 let _ = tx_for_cb.send(o.clone());
319 })
320 .await;
321 drop(tx);
323 outcomes
324 };
325
326 let handle_ref = handle.clone();
327 let consume_fut = async move {
328 while let Some(outcome) = rx.recv().await {
329 handle_ref.append(outcome).await;
330 }
331 };
332
333 let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
334
335 let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
336 let summary = Summary::from_outcomes(&all_outcomes);
337 let finished = FinishedScan {
338 summary,
339 outcomes: all_outcomes,
340 elapsed_ms,
341 };
342
343 if let Some(ctx) = &persist_ctx {
346 let snapshot = PersistedScan::from_finished(
347 ctx.scan_id.clone(),
348 handle.username().to_owned(),
349 handle.site_count(),
350 handle.created_at_ms(),
351 finished.clone(),
352 );
353 if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
354 tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
355 } else {
356 let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
357 if removed > 0 {
358 tracing::debug!(removed, "pruned older persisted scans");
359 }
360 }
361 }
362
363 handle.publish(finished).await;
364 drop(handle.tx()); }
366
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::UncertainReason;

    /// Builds a minimal `CheckOutcome` for site `name` with verdict `kind`.
    /// `Uncertain` verdicts carry a placeholder reason; all others carry none.
    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        let reason = if matches!(kind, MatchKind::Uncertain) {
            Some(UncertainReason::Other("test".into()))
        } else {
            None
        };
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason,
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            transport: None,
            escalations: 0,
        }
    }

    /// One `Found`, two `NotFound`, one `Uncertain` must be tallied as such.
    #[test]
    fn summary_tallies_by_verdict() {
        let outcomes = [
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ];
        let tally = Summary::from_outcomes(&outcomes);
        assert_eq!((tally.found, tally.not_found, tally.uncertain), (1, 2, 1));
        assert_eq!(tally.total(), 4);
    }

    /// Ids are twelve lowercase-alphanumeric characters and differ between calls.
    #[test]
    fn scan_id_is_url_safe_and_random() {
        let first = ScanId::new();
        let second = ScanId::new();
        let text = first.as_str();
        assert_eq!(text.len(), 12);
        assert!(
            text.bytes()
                .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit()),
        );
        assert_ne!(first, second);
    }

    /// `append` must both extend the history and broadcast each new index.
    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let scan = ScanHandle::new("alice", 2, 16);
        let mut updates = scan.subscribe();

        scan.append(outcome("GitHub", MatchKind::Found)).await;
        scan.append(outcome("GitLab", MatchKind::NotFound)).await;

        // History keeps both outcomes in append order.
        let history = scan.outcomes_snapshot().await;
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].site, "GitHub");
        assert_eq!(history[1].site, "GitLab");

        // The subscriber sees the matching indices, in order.
        assert_eq!(updates.recv().await.unwrap(), 0);
        assert_eq!(updates.recv().await.unwrap(), 1);
    }

    /// A task parked in `wait_done` is released by `publish`, after which the
    /// published result is readable through `finished`.
    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let scan = ScanHandle::new("alice", 1, 4);

        let waiter = tokio::spawn({
            let parked = scan.clone();
            async move { parked.wait_done().await }
        });

        // Give the waiter task a chance to run before publishing.
        tokio::task::yield_now().await;

        let result = FinishedScan {
            summary: Summary {
                found: 1,
                not_found: 0,
                uncertain: 0,
            },
            outcomes: vec![outcome("GitHub", MatchKind::Found)],
            elapsed_ms: 42,
        };
        scan.publish(result).await;

        waiter.await.unwrap();
        let stored = scan.finished().await.expect("finished");
        assert_eq!(stored.summary.found, 1);
        assert_eq!(stored.elapsed_ms, 42);
        assert_eq!(stored.outcomes.len(), 1);
    }

    /// `wait_done` must not block when the result was published beforehand.
    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let scan = ScanHandle::new("alice", 1, 4);
        let empty = FinishedScan {
            summary: Summary::default(),
            outcomes: Vec::new(),
            elapsed_ms: 0,
        };
        scan.publish(empty).await;

        let budget = Duration::from_millis(100);
        tokio::time::timeout(budget, scan.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }
}