Skip to main content

adler_server/
scan.rs

//! Per-scan state: the live broadcast channel + the final aggregate.
//!
//! A scan is started in the background via [`spawn`]. Outcomes are
//! appended to the handle's history in order (readable via
//! [`ScanHandle::outcomes_snapshot`]); each append fans out the new
//! outcome's index to [`ScanHandle::subscribe`] receivers so SSE
//! subscribers can stream outcomes as they arrive. When the executor
//! finishes, the aggregate is published (readable via
//! [`ScanHandle::finished`]) and callers parked in
//! [`ScanHandle::wait_done`] are released.
9
10use std::fmt;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use adler_core::{CheckOutcome, Client, ExecutorOptions, MatchKind, Site, Username, executor};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{Notify, RwLock, broadcast, mpsc};
18
19use crate::persist::{self, PersistedScan};
20
21/// Identifier for a running or finished scan.
22///
23/// Short alphanumeric token (12 chars, ~71 bits of entropy) suitable
24/// for URLs. Not a cryptographic identifier — it is a *capability* in
25/// the sense that knowing the ID lets you read scan results, so it is
26/// random enough not to be guessable in a single-process session, but
27/// no replacement for proper auth if the server is ever exposed
28/// publicly (it isn't, by default — see [`crate::AppConfig`]).
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ScanId(String);
32
33impl ScanId {
34    /// Mint a fresh random ID using the workspace `fastrand` PRNG.
35    #[must_use]
36    pub fn new() -> Self {
37        const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
38        let mut s = String::with_capacity(12);
39        for _ in 0..12 {
40            let idx = fastrand::usize(..ALPHABET.len());
41            s.push(char::from(ALPHABET[idx]));
42        }
43        Self(s)
44    }
45
46    /// Borrow the ID as a string slice.
47    #[must_use]
48    pub fn as_str(&self) -> &str {
49        &self.0
50    }
51}
52
53impl Default for ScanId {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl fmt::Display for ScanId {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.write_str(&self.0)
62    }
63}
64
65impl From<String> for ScanId {
66    fn from(s: String) -> Self {
67        Self(s)
68    }
69}
70
71/// Aggregate published once a scan finishes.
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct FinishedScan {
74    /// Counts by verdict.
75    pub summary: Summary,
76    /// All outcomes, in completion order (same order as the live stream).
77    pub outcomes: Vec<CheckOutcome>,
78    /// Wall-clock duration of the whole scan, milliseconds.
79    pub elapsed_ms: u64,
80}
81
82/// Verdict counts for a finished scan.
83#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
84pub struct Summary {
85    /// Sites where the account exists.
86    pub found: usize,
87    /// Sites where the account doesn't exist.
88    pub not_found: usize,
89    /// Sites with inconclusive verdicts.
90    pub uncertain: usize,
91}
92
93impl Summary {
94    /// Tally verdicts from a slice of outcomes.
95    #[must_use]
96    pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
97        let mut s = Self::default();
98        for o in outcomes {
99            match o.kind {
100                MatchKind::Found => s.found += 1,
101                MatchKind::NotFound => s.not_found += 1,
102                MatchKind::Uncertain => s.uncertain += 1,
103            }
104        }
105        s
106    }
107
108    /// Total number of probed sites.
109    #[must_use]
110    pub const fn total(&self) -> usize {
111        self.found + self.not_found + self.uncertain
112    }
113}
114
115/// Live state of one scan.
116///
117/// All fields are `Arc<…>` because handles are shared between the
118/// background scan task and any number of HTTP request handlers.
119#[derive(Debug, Clone)]
120pub struct ScanHandle {
121    inner: Arc<Inner>,
122}
123
124#[derive(Debug)]
125struct Inner {
126    username: String,
127    site_count: usize,
128    started_at: Instant,
129    created_at_ms: u64,
130    outcomes: RwLock<Vec<CheckOutcome>>,
131    finished: RwLock<Option<FinishedScan>>,
132    // Broadcast carries the *index* of a newly appended outcome rather
133    // than the outcome itself — subscribers re-read from `outcomes` so
134    // a slow subscriber that misses a notification can still resync by
135    // re-snapshotting on the next event.
136    tx: broadcast::Sender<usize>,
137    done: Notify,
138}
139
140impl ScanHandle {
141    /// Construct an empty handle ready to accept outcomes.
142    ///
143    /// `site_count` is the size of the site list this scan will run
144    /// against — surfaced through [`Self::site_count`] so the UI can
145    /// render `423 / 1890` progress without holding open an SSE
146    /// stream. `outcome_buffer` sizes the broadcast ring buffer; a
147    /// value substantially larger than `site_count` is fine — the cost
148    /// is one `Arc<…>` slot per buffered notification.
149    #[must_use]
150    pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
151        let (tx, _) = broadcast::channel(outcome_buffer.max(1));
152        let created_at_ms = SystemTime::now()
153            .duration_since(UNIX_EPOCH)
154            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
155        Self {
156            inner: Arc::new(Inner {
157                username: username.into(),
158                site_count,
159                started_at: Instant::now(),
160                created_at_ms,
161                outcomes: RwLock::new(Vec::new()),
162                finished: RwLock::new(None),
163                tx,
164                done: Notify::new(),
165            }),
166        }
167    }
168
169    /// Username being scanned (for display / debugging).
170    #[must_use]
171    pub fn username(&self) -> &str {
172        &self.inner.username
173    }
174
175    /// Total number of sites this scan will / did probe.
176    #[must_use]
177    pub fn site_count(&self) -> usize {
178        self.inner.site_count
179    }
180
181    /// Wall-clock time since the handle was created.
182    #[must_use]
183    pub fn elapsed(&self) -> Duration {
184        self.inner.started_at.elapsed()
185    }
186
187    /// Unix epoch milliseconds when this handle was constructed. Used
188    /// by the history endpoint so the UI can render relative times.
189    #[must_use]
190    pub fn created_at_ms(&self) -> u64 {
191        self.inner.created_at_ms
192    }
193
194    /// Snapshot of outcomes recorded so far. Cheap clone — `Vec` deep-clones
195    /// only the small number of strings inside each [`CheckOutcome`].
196    pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
197        self.inner.outcomes.read().await.clone()
198    }
199
200    /// Final aggregate, once the scan has completed. `None` while running.
201    pub async fn finished(&self) -> Option<FinishedScan> {
202        self.inner.finished.read().await.clone()
203    }
204
205    /// Best-effort sync peek used by the eviction policy. Returns
206    /// `false` if the lock is currently held — a momentarily-locked
207    /// `finished` slot is, by construction, still being mutated.
208    #[must_use]
209    pub fn is_finished_now(&self) -> bool {
210        self.inner.finished.try_read().is_ok_and(|g| g.is_some())
211    }
212
213    /// Subscribe to "new outcome appended at index N" notifications.
214    /// Combine with [`Self::outcomes_snapshot`] for "replay then live" semantics.
215    #[must_use]
216    pub fn subscribe(&self) -> broadcast::Receiver<usize> {
217        self.inner.tx.subscribe()
218    }
219
220    /// Wait until the scan finishes. Idempotent — fires for every
221    /// caller registered before *or* after the scan completes (the
222    /// `finished` field is the source of truth; this is just a wake-up).
223    pub async fn wait_done(&self) {
224        if self.inner.finished.read().await.is_some() {
225            return;
226        }
227        self.inner.done.notified().await;
228    }
229
230    fn tx(&self) -> broadcast::Sender<usize> {
231        self.inner.tx.clone()
232    }
233
234    async fn append(&self, outcome: CheckOutcome) {
235        let mut buf = self.inner.outcomes.write().await;
236        let idx = buf.len();
237        buf.push(outcome);
238        drop(buf);
239        // Broadcast send is non-blocking; an `Err` means no live
240        // subscribers, which is fine — `outcomes_snapshot` still works.
241        let _ = self.inner.tx.send(idx);
242    }
243
244    pub(crate) async fn publish(&self, finished: FinishedScan) {
245        *self.inner.finished.write().await = Some(finished);
246        self.inner.done.notify_waiters();
247    }
248
249    /// Replace the outcome for `new.site` in the (finished) scan,
250    /// recomputing the summary. No-op if the scan is still running.
251    ///
252    /// Used by the per-site retry endpoint to swap an `Uncertain`
253    /// result with a fresh probe.
254    // The whole function body operates on the write guard; the nursery
255    // lint's "tighten the drop" suggestion would defeat the atomicity
256    // we want between the mutation and the summary recompute.
257    #[allow(clippy::significant_drop_tightening)]
258    pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
259        let mut guard = self.inner.finished.write().await;
260        let Some(finished) = guard.as_mut() else {
261            return;
262        };
263        if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
264            *slot = new;
265        } else {
266            finished.outcomes.push(new);
267        }
268        finished.summary = Summary::from_outcomes(&finished.outcomes);
269    }
270}
271
272/// Optional persistence context handed to [`spawn`]: when present, the
273/// finished scan is written to `<dir>/<scan_id>.json` before the `done`
274/// event fires — so a UI refresh right after completion can reload the
275/// scan from disk.
276#[derive(Debug, Clone)]
277pub(crate) struct PersistContext {
278    pub scan_id: ScanId,
279    pub dir: Arc<PathBuf>,
280}
281
282/// Spawn the background task that runs the scan and feeds the handle.
283///
284/// Returns immediately; the caller drives progress via SSE
285/// ([`ScanHandle::subscribe`]) or polls completion via
286/// [`ScanHandle::finished`].
287pub(crate) fn spawn(
288    handle: ScanHandle,
289    client: Arc<Client>,
290    sites: Arc<[Site]>,
291    username: Username,
292    options: ExecutorOptions,
293    persist_ctx: Option<PersistContext>,
294) {
295    tokio::spawn(async move {
296        run(handle, &client, &sites, &username, options, persist_ctx).await;
297    });
298}
299
300async fn run(
301    handle: ScanHandle,
302    client: &Client,
303    sites: &[Site],
304    username: &Username,
305    options: ExecutorOptions,
306    persist_ctx: Option<PersistContext>,
307) {
308    let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
309
310    // The executor callback is sync FnMut — bridge to the async append
311    // path via an unbounded mpsc so we never block the executor loop.
312    let tx_for_cb = tx.clone();
313    let scan_fut = async move {
314        let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
315            // Drop is fine: a receive end disappearing means the server
316            // is shutting down, in which case losing notifications is
317            // exactly what we want.
318            let _ = tx_for_cb.send(o.clone());
319        })
320        .await;
321        // Drop the original sender so the consumer loop terminates.
322        drop(tx);
323        outcomes
324    };
325
326    let handle_ref = handle.clone();
327    let consume_fut = async move {
328        while let Some(outcome) = rx.recv().await {
329            handle_ref.append(outcome).await;
330        }
331    };
332
333    let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
334
335    let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
336    let summary = Summary::from_outcomes(&all_outcomes);
337    let finished = FinishedScan {
338        summary,
339        outcomes: all_outcomes,
340        elapsed_ms,
341    };
342
343    // Persist before publishing the `done` event so a UI that refreshes
344    // immediately after seeing `done` still finds the scan on disk.
345    if let Some(ctx) = &persist_ctx {
346        let snapshot = PersistedScan::from_finished(
347            ctx.scan_id.clone(),
348            handle.username().to_owned(),
349            handle.site_count(),
350            handle.created_at_ms(),
351            finished.clone(),
352        );
353        if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
354            tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
355        } else {
356            let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
357            if removed > 0 {
358                tracing::debug!(removed, "pruned older persisted scans");
359            }
360        }
361    }
362
363    handle.publish(finished).await;
364    drop(handle.tx()); // help the broadcast channel close cleanly
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::UncertainReason;

    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason: matches!(kind, MatchKind::Uncertain)
                .then_some(UncertainReason::Other("test".into())),
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            transport: None,
            escalations: 0,
        }
    }

    #[test]
    fn summary_tallies_by_verdict() {
        let s = Summary::from_outcomes(&[
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ]);
        assert_eq!(s.found, 1);
        assert_eq!(s.not_found, 2);
        assert_eq!(s.uncertain, 1);
        assert_eq!(s.total(), 4);
    }

    #[test]
    fn scan_id_is_url_safe_and_random() {
        let a = ScanId::new();
        let b = ScanId::new();
        assert_eq!(a.as_str().len(), 12);
        assert!(
            a.as_str()
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
        );
        // 12 chars from a 36-symbol alphabet is ~62 bits, so two
        // independent IDs collide with probability ≈ 2^-62 — negligible.
        assert_ne!(a, b);
    }

    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let handle = ScanHandle::new("alice", 2, 16);
        let mut rx = handle.subscribe();

        handle.append(outcome("GitHub", MatchKind::Found)).await;
        handle.append(outcome("GitLab", MatchKind::NotFound)).await;

        // History was recorded in order.
        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");

        // Indices were broadcast in order.
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let handle = ScanHandle::new("alice", 1, 4);

        let waiter = {
            let h = handle.clone();
            tokio::spawn(async move { h.wait_done().await })
        };

        // Give the waiter a chance to park.
        tokio::task::yield_now().await;

        handle
            .publish(FinishedScan {
                summary: Summary {
                    found: 1,
                    not_found: 0,
                    uncertain: 0,
                },
                outcomes: vec![outcome("GitHub", MatchKind::Found)],
                elapsed_ms: 42,
            })
            .await;

        waiter.await.unwrap();
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.elapsed_ms, 42);
        assert_eq!(f.outcomes.len(), 1);
    }

    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .publish(FinishedScan {
                summary: Summary::default(),
                outcomes: Vec::new(),
                elapsed_ms: 0,
            })
            .await;
        // Should not deadlock — the fast path checks `finished` first.
        tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }
}