Skip to main content

adler_server/
scan.rs

//! Per-scan state: the live broadcast channel + the final aggregate.
//!
//! A scan is started in the background via [`spawn`]. Outcomes are
//! appended to the handle's outcome buffer in arrival order (read it
//! with [`ScanHandle::outcomes_snapshot`]); each push fans out an index
//! notification that SSE subscribers receive through
//! [`ScanHandle::subscribe`], so they can stream outcomes as they
//! arrive. When the executor finishes, the aggregate is published and
//! becomes visible through [`ScanHandle::finished`], and waiters parked
//! in [`ScanHandle::wait_done`] are released.
9
10use std::fmt;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use adler_core::{CheckOutcome, Client, ExecutorOptions, MatchKind, Site, Username, executor};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{Notify, RwLock, broadcast, mpsc};
18
19use crate::persist::{self, PersistedScan};
20
/// Identifier for a running or finished scan.
///
/// Short alphanumeric token suitable for URLs: [`ScanId::new`] mints
/// 12 characters from the 36-symbol alphabet `a-z0-9`, i.e.
/// 12 · log2(36) ≈ 62 bits of entropy. Not a cryptographic identifier —
/// it is a *capability* in the sense that knowing the ID lets you read
/// scan results, so it is random enough not to be guessable in a
/// single-process session, but no replacement for proper auth if the
/// server is ever exposed publicly (it isn't, by default — see
/// [`crate::AppConfig`]).
///
/// Serialized transparently as a plain string.
///
/// NOTE(review): `From<String>` and the transparent `Deserialize` accept
/// arbitrary strings, so only IDs minted by [`ScanId::new`] are
/// guaranteed to have the 12-character `a-z0-9` shape. Confirm that
/// callers turning a `ScanId` into a file name validate externally
/// supplied IDs.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScanId(String);
32
33impl ScanId {
34    /// Mint a fresh random ID using the workspace `fastrand` PRNG.
35    #[must_use]
36    pub fn new() -> Self {
37        const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
38        let mut s = String::with_capacity(12);
39        for _ in 0..12 {
40            let idx = fastrand::usize(..ALPHABET.len());
41            s.push(char::from(ALPHABET[idx]));
42        }
43        Self(s)
44    }
45
46    /// Borrow the ID as a string slice.
47    #[must_use]
48    pub fn as_str(&self) -> &str {
49        &self.0
50    }
51}
52
53impl Default for ScanId {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl fmt::Display for ScanId {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.write_str(&self.0)
62    }
63}
64
65impl From<String> for ScanId {
66    fn from(s: String) -> Self {
67        Self(s)
68    }
69}
70
/// Aggregate published once a scan finishes.
///
/// Built by the background scan task from the executor's returned
/// outcomes and stored on the handle via `ScanHandle::publish`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinishedScan {
    /// Counts by verdict, tallied from `outcomes`.
    pub summary: Summary,
    /// All outcomes as returned by the executor.
    ///
    /// NOTE(review): intended to be in completion order (same order as
    /// the live stream) — confirm against the executor's contract. A
    /// later `ScanHandle::replace_outcome` may overwrite an entry in
    /// place or append a new one.
    pub outcomes: Vec<CheckOutcome>,
    /// Duration of the whole scan in milliseconds, measured from handle
    /// creation; saturates at `u64::MAX`.
    pub elapsed_ms: u64,
}
81
/// Verdict counts for a finished scan.
///
/// One counter per [`MatchKind`] variant; `Default` yields all zeros.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct Summary {
    /// Sites where the account exists (`MatchKind::Found`).
    pub found: usize,
    /// Sites where the account doesn't exist (`MatchKind::NotFound`).
    pub not_found: usize,
    /// Sites with inconclusive verdicts (`MatchKind::Uncertain`).
    pub uncertain: usize,
}
92
93impl Summary {
94    /// Tally verdicts from a slice of outcomes.
95    #[must_use]
96    pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
97        let mut s = Self::default();
98        for o in outcomes {
99            match o.kind {
100                MatchKind::Found => s.found += 1,
101                MatchKind::NotFound => s.not_found += 1,
102                MatchKind::Uncertain => s.uncertain += 1,
103            }
104        }
105        s
106    }
107
108    /// Total number of probed sites.
109    #[must_use]
110    pub const fn total(&self) -> usize {
111        self.found + self.not_found + self.uncertain
112    }
113}
114
/// Live state of one scan.
///
/// The handle is a thin wrapper around a single `Arc<Inner>`: cloning
/// it only bumps a reference count, and every clone observes the same
/// scan state. That is what lets the background scan task and any
/// number of other holders (e.g. HTTP request handlers) share one scan.
#[derive(Debug, Clone)]
pub struct ScanHandle {
    // Shared, reference-counted scan state.
    inner: Arc<Inner>,
}
123
// Shared state behind every clone of a `ScanHandle`.
#[derive(Debug)]
struct Inner {
    // Username being scanned, as passed to `ScanHandle::new`.
    username: String,
    // Size of the site list this scan runs against.
    site_count: usize,
    // Monotonic timestamp taken at handle construction; basis for `elapsed()`.
    started_at: Instant,
    // Unix epoch milliseconds taken at handle construction.
    created_at_ms: u64,
    // Append-only buffer of outcomes recorded so far.
    outcomes: RwLock<Vec<CheckOutcome>>,
    // `None` while the scan runs; `Some` once `publish` has been called.
    finished: RwLock<Option<FinishedScan>>,
    // Broadcast carries the *index* of a newly appended outcome rather
    // than the outcome itself — subscribers re-read from `outcomes` so
    // a slow subscriber that misses a notification can still resync by
    // re-snapshotting on the next event.
    tx: broadcast::Sender<usize>,
    // Wakes `wait_done` callers when `publish` stores the aggregate.
    done: Notify,
}
139
140impl ScanHandle {
141    /// Construct an empty handle ready to accept outcomes.
142    ///
143    /// `site_count` is the size of the site list this scan will run
144    /// against — surfaced through [`Self::site_count`] so the UI can
145    /// render `423 / 1890` progress without holding open an SSE
146    /// stream. `outcome_buffer` sizes the broadcast ring buffer; a
147    /// value substantially larger than `site_count` is fine — the cost
148    /// is one `Arc<…>` slot per buffered notification.
149    #[must_use]
150    pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
151        let (tx, _) = broadcast::channel(outcome_buffer.max(1));
152        let created_at_ms = SystemTime::now()
153            .duration_since(UNIX_EPOCH)
154            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
155        Self {
156            inner: Arc::new(Inner {
157                username: username.into(),
158                site_count,
159                started_at: Instant::now(),
160                created_at_ms,
161                outcomes: RwLock::new(Vec::new()),
162                finished: RwLock::new(None),
163                tx,
164                done: Notify::new(),
165            }),
166        }
167    }
168
169    /// Username being scanned (for display / debugging).
170    #[must_use]
171    pub fn username(&self) -> &str {
172        &self.inner.username
173    }
174
175    /// Total number of sites this scan will / did probe.
176    #[must_use]
177    pub fn site_count(&self) -> usize {
178        self.inner.site_count
179    }
180
181    /// Wall-clock time since the handle was created.
182    #[must_use]
183    pub fn elapsed(&self) -> Duration {
184        self.inner.started_at.elapsed()
185    }
186
187    /// Unix epoch milliseconds when this handle was constructed. Used
188    /// by the history endpoint so the UI can render relative times.
189    #[must_use]
190    pub fn created_at_ms(&self) -> u64 {
191        self.inner.created_at_ms
192    }
193
194    /// Snapshot of outcomes recorded so far. Cheap clone — `Vec` deep-clones
195    /// only the small number of strings inside each [`CheckOutcome`].
196    pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
197        self.inner.outcomes.read().await.clone()
198    }
199
200    /// Final aggregate, once the scan has completed. `None` while running.
201    pub async fn finished(&self) -> Option<FinishedScan> {
202        self.inner.finished.read().await.clone()
203    }
204
205    /// Best-effort sync peek used by the eviction policy. Returns
206    /// `false` if the lock is currently held — a momentarily-locked
207    /// `finished` slot is, by construction, still being mutated.
208    #[must_use]
209    pub fn is_finished_now(&self) -> bool {
210        self.inner.finished.try_read().is_ok_and(|g| g.is_some())
211    }
212
213    /// Subscribe to "new outcome appended at index N" notifications.
214    /// Combine with [`Self::outcomes_snapshot`] for "replay then live" semantics.
215    #[must_use]
216    pub fn subscribe(&self) -> broadcast::Receiver<usize> {
217        self.inner.tx.subscribe()
218    }
219
220    /// Wait until the scan finishes. Idempotent — fires for every
221    /// caller registered before *or* after the scan completes (the
222    /// `finished` field is the source of truth; this is just a wake-up).
223    pub async fn wait_done(&self) {
224        if self.inner.finished.read().await.is_some() {
225            return;
226        }
227        self.inner.done.notified().await;
228    }
229
230    fn tx(&self) -> broadcast::Sender<usize> {
231        self.inner.tx.clone()
232    }
233
234    async fn append(&self, outcome: CheckOutcome) {
235        let mut buf = self.inner.outcomes.write().await;
236        let idx = buf.len();
237        buf.push(outcome);
238        drop(buf);
239        // Broadcast send is non-blocking; an `Err` means no live
240        // subscribers, which is fine — `outcomes_snapshot` still works.
241        let _ = self.inner.tx.send(idx);
242    }
243
244    /// Bulk-append outcomes carried over from a previous scan run
245    /// (the overlap subset on a refilter). Used to pre-populate a
246    /// handle before [`spawn`] starts probing the rest, so the SSE
247    /// stream a subscriber attaches to surfaces the carried-over
248    /// outcomes immediately. Acquires one write-lock and emits one
249    /// broadcast per outcome so subscribers see them as ordinary
250    /// `index N appended` events.
251    // The write lock is held for the whole bulk-insert deliberately
252    // so subscribers never see a half-populated buffer; the
253    // "tighten the drop" nursery lint would defeat that.
254    #[allow(clippy::significant_drop_tightening)]
255    pub(crate) async fn extend_outcomes(&self, carried: Vec<CheckOutcome>) {
256        if carried.is_empty() {
257            return;
258        }
259        let mut buf = self.inner.outcomes.write().await;
260        for outcome in carried {
261            let idx = buf.len();
262            buf.push(outcome);
263            let _ = self.inner.tx.send(idx);
264        }
265    }
266
267    pub(crate) async fn publish(&self, finished: FinishedScan) {
268        *self.inner.finished.write().await = Some(finished);
269        self.inner.done.notify_waiters();
270    }
271
272    /// Replace the outcome for `new.site` in the (finished) scan,
273    /// recomputing the summary. No-op if the scan is still running.
274    ///
275    /// Used by the per-site retry endpoint to swap an `Uncertain`
276    /// result with a fresh probe.
277    // The whole function body operates on the write guard; the nursery
278    // lint's "tighten the drop" suggestion would defeat the atomicity
279    // we want between the mutation and the summary recompute.
280    #[allow(clippy::significant_drop_tightening)]
281    pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
282        let mut guard = self.inner.finished.write().await;
283        let Some(finished) = guard.as_mut() else {
284            return;
285        };
286        if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
287            *slot = new;
288        } else {
289            finished.outcomes.push(new);
290        }
291        finished.summary = Summary::from_outcomes(&finished.outcomes);
292    }
293}
294
/// Optional persistence context handed to [`spawn`]: when present, the
/// finished scan is saved through `persist::save` into `dir` before the
/// aggregate is published on the handle — so a UI refresh right after
/// completion can reload the scan from disk.
///
/// NOTE(review): the file is presumably `<dir>/<scan_id>.json`; the
/// actual naming is decided by `persist::save` — confirm there.
#[derive(Debug, Clone)]
pub(crate) struct PersistContext {
    /// ID recorded in the persisted snapshot and in the warning logged
    /// when saving fails.
    pub scan_id: ScanId,
    /// Directory handed to `persist::save` and `persist::prune`.
    pub dir: Arc<PathBuf>,
}
304
305/// Spawn the background task that runs the scan and feeds the handle.
306///
307/// Returns immediately; the caller drives progress via SSE
308/// ([`ScanHandle::subscribe`]) or polls completion via
309/// [`ScanHandle::finished`].
310pub(crate) fn spawn(
311    handle: ScanHandle,
312    client: Arc<Client>,
313    sites: Arc<[Site]>,
314    username: Username,
315    options: ExecutorOptions,
316    persist_ctx: Option<PersistContext>,
317) -> tokio::task::JoinHandle<()> {
318    tokio::spawn(async move {
319        run(handle, &client, &sites, &username, options, persist_ctx).await;
320    })
321}
322
323async fn run(
324    handle: ScanHandle,
325    client: &Client,
326    sites: &[Site],
327    username: &Username,
328    options: ExecutorOptions,
329    persist_ctx: Option<PersistContext>,
330) {
331    let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
332
333    // The executor callback is sync FnMut — bridge to the async append
334    // path via an unbounded mpsc so we never block the executor loop.
335    let tx_for_cb = tx.clone();
336    let scan_fut = async move {
337        let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
338            // Drop is fine: a receive end disappearing means the server
339            // is shutting down, in which case losing notifications is
340            // exactly what we want.
341            let _ = tx_for_cb.send(o.clone());
342        })
343        .await;
344        // Drop the original sender so the consumer loop terminates.
345        drop(tx);
346        outcomes
347    };
348
349    let handle_ref = handle.clone();
350    let consume_fut = async move {
351        while let Some(outcome) = rx.recv().await {
352            handle_ref.append(outcome).await;
353        }
354    };
355
356    let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
357
358    let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
359    let summary = Summary::from_outcomes(&all_outcomes);
360    let finished = FinishedScan {
361        summary,
362        outcomes: all_outcomes,
363        elapsed_ms,
364    };
365
366    // Persist before publishing the `done` event so a UI that refreshes
367    // immediately after seeing `done` still finds the scan on disk.
368    if let Some(ctx) = &persist_ctx {
369        let snapshot = PersistedScan::from_finished(
370            ctx.scan_id.clone(),
371            handle.username().to_owned(),
372            handle.site_count(),
373            handle.created_at_ms(),
374            finished.clone(),
375        );
376        if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
377            tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
378        } else {
379            let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
380            if removed > 0 {
381                tracing::debug!(removed, "pruned older persisted scans");
382            }
383        }
384    }
385
386    handle.publish(finished).await;
387    drop(handle.tx()); // help the broadcast channel close cleanly
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::UncertainReason;

    /// Build a minimal outcome for `name` with the given verdict.
    /// `Uncertain` outcomes get a dummy reason; all others get none.
    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason: matches!(kind, MatchKind::Uncertain)
                .then_some(UncertainReason::Other("test".into())),
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            transport: None,
            escalations: 0,
        }
    }

    #[test]
    fn summary_tallies_by_verdict() {
        let s = Summary::from_outcomes(&[
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ]);
        assert_eq!(s.found, 1);
        assert_eq!(s.not_found, 2);
        assert_eq!(s.uncertain, 1);
        assert_eq!(s.total(), 4);
    }

    #[test]
    fn scan_id_is_url_safe_and_random() {
        let a = ScanId::new();
        let b = ScanId::new();
        assert_eq!(a.as_str().len(), 12);
        assert!(
            a.as_str()
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
        );
        // 12 chars over a 36-symbol alphabet is ~62 bits; the chance of
        // two freshly minted IDs colliding is negligible.
        assert_ne!(a, b);
    }

    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let handle = ScanHandle::new("alice", 2, 16);
        let mut rx = handle.subscribe();

        handle.append(outcome("GitHub", MatchKind::Found)).await;
        handle.append(outcome("GitLab", MatchKind::NotFound)).await;

        // History was recorded in order.
        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");

        // Indices were broadcast in order.
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn extend_outcomes_records_and_broadcasts_each_carried_outcome() {
        let handle = ScanHandle::new("alice", 2, 16);
        let mut rx = handle.subscribe();

        handle
            .extend_outcomes(vec![
                outcome("GitHub", MatchKind::Found),
                outcome("GitLab", MatchKind::NotFound),
            ])
            .await;

        // Carried-over outcomes land in the buffer in the given order.
        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");

        // One index notification per carried-over outcome.
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let handle = ScanHandle::new("alice", 1, 4);

        let waiter = {
            let h = handle.clone();
            tokio::spawn(async move { h.wait_done().await })
        };

        // Give the waiter a chance to park.
        tokio::task::yield_now().await;

        handle
            .publish(FinishedScan {
                summary: Summary {
                    found: 1,
                    not_found: 0,
                    uncertain: 0,
                },
                outcomes: vec![outcome("GitHub", MatchKind::Found)],
                elapsed_ms: 42,
            })
            .await;

        waiter.await.unwrap();
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.elapsed_ms, 42);
        assert_eq!(f.outcomes.len(), 1);
    }

    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .publish(FinishedScan {
                summary: Summary::default(),
                outcomes: Vec::new(),
                elapsed_ms: 0,
            })
            .await;
        // Should not deadlock — the fast path checks `finished` first.
        tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }

    #[tokio::test]
    async fn replace_outcome_swaps_or_appends_and_recomputes_summary() {
        let handle = ScanHandle::new("alice", 1, 4);

        // While the scan is still running there is nothing to replace.
        handle
            .replace_outcome(outcome("GitHub", MatchKind::Found))
            .await;
        assert!(handle.finished().await.is_none());

        handle
            .publish(FinishedScan {
                summary: Summary {
                    found: 0,
                    not_found: 0,
                    uncertain: 1,
                },
                outcomes: vec![outcome("GitHub", MatchKind::Uncertain)],
                elapsed_ms: 1,
            })
            .await;

        // Same site: the entry is overwritten in place.
        handle
            .replace_outcome(outcome("GitHub", MatchKind::Found))
            .await;
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.outcomes.len(), 1);
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.summary.uncertain, 0);

        // Unknown site: the outcome is appended.
        handle
            .replace_outcome(outcome("GitLab", MatchKind::NotFound))
            .await;
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.outcomes.len(), 2);
        assert_eq!(f.outcomes[1].site, "GitLab");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.summary.not_found, 1);
        assert_eq!(f.summary.total(), 2);
    }
}
503}