Skip to main content

adler_server/
scan.rs

//! Per-scan state: the live broadcast channel + the final aggregate.
//!
//! A scan is started in the background via [`spawn`]. Outcomes are
//! appended to the handle's buffer in arrival order (readable through
//! [`ScanHandle::outcomes_snapshot`]); each push fans out an index
//! notification to receivers obtained from [`ScanHandle::subscribe`]
//! so SSE subscribers can stream them as they arrive. When the
//! executor finishes, the aggregate is published (readable through
//! [`ScanHandle::finished`]) and waiters parked on
//! [`ScanHandle::wait_done`] are released.
9
10use std::fmt;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use adler_core::{
16    CheckOutcome, Client, ExecutorOptions, IdentityCluster, MatchKind, Site, Username,
17    build_identity_clusters, executor,
18};
19use serde::{Deserialize, Serialize};
20use tokio::sync::{Notify, RwLock, broadcast, mpsc};
21
22use crate::persist::{self, PersistedScan, ScanRequestContext};
23
/// Identifier for a running or finished scan.
///
/// Short alphanumeric token suitable for URLs: 12 characters drawn
/// from a 36-symbol alphabet (`a-z0-9`), i.e. 12 × log2(36) ≈ 62 bits
/// of entropy. Not a cryptographic identifier — it is a *capability*
/// in the sense that knowing the ID lets you read scan results, so it
/// is random enough not to be guessable in a single-process session,
/// but no replacement for proper auth if the server is ever exposed
/// publicly (it isn't, by default — see [`crate::AppConfig`]).
///
/// Serialized transparently as the bare string. IDs built through
/// `From<String>` wrap the given string as-is, without validation.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScanId(String);
35
36impl ScanId {
37    /// Mint a fresh random ID using the workspace `fastrand` PRNG.
38    #[must_use]
39    pub fn new() -> Self {
40        const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
41        let mut s = String::with_capacity(12);
42        for _ in 0..12 {
43            let idx = fastrand::usize(..ALPHABET.len());
44            s.push(char::from(ALPHABET[idx]));
45        }
46        Self(s)
47    }
48
49    /// Borrow the ID as a string slice.
50    #[must_use]
51    pub fn as_str(&self) -> &str {
52        &self.0
53    }
54}
55
/// `Default` mints a fresh random ID — it delegates to [`ScanId::new`],
/// so two default values are not expected to compare equal.
impl Default for ScanId {
    fn default() -> Self {
        Self::new()
    }
}
61
62impl fmt::Display for ScanId {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.write_str(&self.0)
65    }
66}
67
68impl From<String> for ScanId {
69    fn from(s: String) -> Self {
70        Self(s)
71    }
72}
73
/// Aggregate published once a scan finishes.
///
/// Serializable so it can be returned to clients and embedded in the
/// persisted scan snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinishedScan {
    /// Counts by verdict.
    pub summary: Summary,
    /// All outcomes, in completion order (same order as the live stream).
    pub outcomes: Vec<CheckOutcome>,
    /// Deterministic identity candidates derived from found outcomes
    /// with structured profile evidence.
    ///
    /// Omitted from the serialized form when empty, and defaults to an
    /// empty list when the field is absent on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub identity_clusters: Vec<IdentityCluster>,
    /// Wall-clock duration of the whole scan, milliseconds.
    pub elapsed_ms: u64,
}
88
89impl FinishedScan {
90    pub(crate) fn from_outcomes(
91        username: &str,
92        outcomes: Vec<CheckOutcome>,
93        elapsed_ms: u64,
94    ) -> Self {
95        let summary = Summary::from_outcomes(&outcomes);
96        let identity_clusters = build_identity_clusters(username, &outcomes);
97        Self {
98            summary,
99            outcomes,
100            identity_clusters,
101            elapsed_ms,
102        }
103    }
104
105    pub(crate) fn refresh_identity_clusters(&mut self, username: &str) {
106        self.identity_clusters = build_identity_clusters(username, &self.outcomes);
107    }
108}
109
/// Verdict counts for a finished scan.
///
/// One counter per [`MatchKind`] variant; `Default` yields all zeros.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct Summary {
    /// Sites where the account exists.
    pub found: usize,
    /// Sites where the account doesn't exist.
    pub not_found: usize,
    /// Sites with inconclusive verdicts.
    pub uncertain: usize,
}
120
121impl Summary {
122    /// Tally verdicts from a slice of outcomes.
123    #[must_use]
124    pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
125        let mut s = Self::default();
126        for o in outcomes {
127            match o.kind {
128                MatchKind::Found => s.found += 1,
129                MatchKind::NotFound => s.not_found += 1,
130                MatchKind::Uncertain => s.uncertain += 1,
131            }
132        }
133        s
134    }
135
136    /// Total number of probed sites.
137    #[must_use]
138    pub const fn total(&self) -> usize {
139        self.found + self.not_found + self.uncertain
140    }
141}
142
/// Live state of one scan.
///
/// The state lives behind a single `Arc<Inner>` because handles are
/// shared between the background scan task and any number of HTTP
/// request handlers; cloning a handle only clones that `Arc`, so every
/// clone observes the same scan.
#[derive(Debug, Clone)]
pub struct ScanHandle {
    inner: Arc<Inner>,
}
151
// Shared state behind every `ScanHandle` clone.
#[derive(Debug)]
struct Inner {
    // Username being scanned.
    username: String,
    // Size of the site list the scan runs against (progress denominator).
    site_count: usize,
    // Monotonic timestamp taken at construction; basis for `elapsed()`.
    started_at: Instant,
    // Unix epoch milliseconds taken at construction.
    created_at_ms: u64,
    // Outcomes recorded so far; this file only ever pushes to it.
    outcomes: RwLock<Vec<CheckOutcome>>,
    // `None` while the scan runs, `Some` once `publish` has been called.
    finished: RwLock<Option<FinishedScan>>,
    // Broadcast carries the *index* of a newly appended outcome rather
    // than the outcome itself — subscribers re-read from `outcomes` so
    // a slow subscriber that misses a notification can still resync by
    // re-snapshotting on the next event.
    tx: broadcast::Sender<usize>,
    // Wake-up for `wait_done` callers; signalled by `publish`.
    done: Notify,
}
167
168impl ScanHandle {
169    /// Construct an empty handle ready to accept outcomes.
170    ///
171    /// `site_count` is the size of the site list this scan will run
172    /// against — surfaced through [`Self::site_count`] so the UI can
173    /// render `423 / 1890` progress without holding open an SSE
174    /// stream. `outcome_buffer` sizes the broadcast ring buffer; a
175    /// value substantially larger than `site_count` is fine — the cost
176    /// is one `Arc<…>` slot per buffered notification.
177    #[must_use]
178    pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
179        let (tx, _) = broadcast::channel(outcome_buffer.max(1));
180        let created_at_ms = SystemTime::now()
181            .duration_since(UNIX_EPOCH)
182            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
183        Self {
184            inner: Arc::new(Inner {
185                username: username.into(),
186                site_count,
187                started_at: Instant::now(),
188                created_at_ms,
189                outcomes: RwLock::new(Vec::new()),
190                finished: RwLock::new(None),
191                tx,
192                done: Notify::new(),
193            }),
194        }
195    }
196
197    /// Username being scanned (for display / debugging).
198    #[must_use]
199    pub fn username(&self) -> &str {
200        &self.inner.username
201    }
202
203    /// Total number of sites this scan will / did probe.
204    #[must_use]
205    pub fn site_count(&self) -> usize {
206        self.inner.site_count
207    }
208
209    /// Wall-clock time since the handle was created.
210    #[must_use]
211    pub fn elapsed(&self) -> Duration {
212        self.inner.started_at.elapsed()
213    }
214
215    /// Unix epoch milliseconds when this handle was constructed. Used
216    /// by the history endpoint so the UI can render relative times.
217    #[must_use]
218    pub fn created_at_ms(&self) -> u64 {
219        self.inner.created_at_ms
220    }
221
222    /// Snapshot of outcomes recorded so far. Cheap clone — `Vec` deep-clones
223    /// only the small number of strings inside each [`CheckOutcome`].
224    pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
225        self.inner.outcomes.read().await.clone()
226    }
227
228    /// Final aggregate, once the scan has completed. `None` while running.
229    pub async fn finished(&self) -> Option<FinishedScan> {
230        self.inner.finished.read().await.clone()
231    }
232
233    /// Best-effort sync peek used by the eviction policy. Returns
234    /// `false` if the lock is currently held — a momentarily-locked
235    /// `finished` slot is, by construction, still being mutated.
236    #[must_use]
237    pub fn is_finished_now(&self) -> bool {
238        self.inner.finished.try_read().is_ok_and(|g| g.is_some())
239    }
240
241    /// Subscribe to "new outcome appended at index N" notifications.
242    /// Combine with [`Self::outcomes_snapshot`] for "replay then live" semantics.
243    #[must_use]
244    pub fn subscribe(&self) -> broadcast::Receiver<usize> {
245        self.inner.tx.subscribe()
246    }
247
248    /// Wait until the scan finishes. Idempotent — fires for every
249    /// caller registered before *or* after the scan completes (the
250    /// `finished` field is the source of truth; this is just a wake-up).
251    pub async fn wait_done(&self) {
252        if self.inner.finished.read().await.is_some() {
253            return;
254        }
255        self.inner.done.notified().await;
256    }
257
258    fn tx(&self) -> broadcast::Sender<usize> {
259        self.inner.tx.clone()
260    }
261
262    async fn append(&self, outcome: CheckOutcome) {
263        let mut buf = self.inner.outcomes.write().await;
264        let idx = buf.len();
265        buf.push(outcome);
266        drop(buf);
267        // Broadcast send is non-blocking; an `Err` means no live
268        // subscribers, which is fine — `outcomes_snapshot` still works.
269        let _ = self.inner.tx.send(idx);
270    }
271
272    /// Bulk-append outcomes carried over from a previous scan run
273    /// (the overlap subset on a refilter). Used to pre-populate a
274    /// handle before [`spawn`] starts probing the rest, so the SSE
275    /// stream a subscriber attaches to surfaces the carried-over
276    /// outcomes immediately. Acquires one write-lock and emits one
277    /// broadcast per outcome so subscribers see them as ordinary
278    /// `index N appended` events.
279    // The write lock is held for the whole bulk-insert deliberately
280    // so subscribers never see a half-populated buffer; the
281    // "tighten the drop" nursery lint would defeat that.
282    #[allow(clippy::significant_drop_tightening)]
283    pub(crate) async fn extend_outcomes(&self, carried: Vec<CheckOutcome>) {
284        if carried.is_empty() {
285            return;
286        }
287        let mut buf = self.inner.outcomes.write().await;
288        for outcome in carried {
289            let idx = buf.len();
290            buf.push(outcome);
291            let _ = self.inner.tx.send(idx);
292        }
293    }
294
295    pub(crate) async fn publish(&self, finished: FinishedScan) {
296        *self.inner.finished.write().await = Some(finished);
297        self.inner.done.notify_waiters();
298    }
299
300    /// Replace the outcome for `new.site` in the (finished) scan,
301    /// recomputing the summary. No-op if the scan is still running.
302    ///
303    /// Used by the per-site retry endpoint to swap an `Uncertain`
304    /// result with a fresh probe.
305    // The whole function body operates on the write guard; the nursery
306    // lint's "tighten the drop" suggestion would defeat the atomicity
307    // we want between the mutation and the summary recompute.
308    #[allow(clippy::significant_drop_tightening)]
309    pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
310        let mut guard = self.inner.finished.write().await;
311        let Some(finished) = guard.as_mut() else {
312            return;
313        };
314        if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
315            *slot = new;
316        } else {
317            finished.outcomes.push(new);
318        }
319        finished.summary = Summary::from_outcomes(&finished.outcomes);
320        finished.refresh_identity_clusters(self.username());
321    }
322}
323
/// Optional persistence context handed to [`spawn`]: when present, the
/// finished scan is written to `<dir>/<scan_id>.json` before the `done`
/// event fires — so a UI refresh right after completion can reload the
/// scan from disk.
#[derive(Debug, Clone)]
pub(crate) struct PersistContext {
    /// ID recorded in the persisted snapshot and in log messages.
    pub scan_id: ScanId,
    /// Directory the snapshot is saved into (and pruned afterwards).
    pub dir: Arc<PathBuf>,
    /// Request context attached to the persisted snapshot.
    pub request_context: ScanRequestContext,
}
334
335/// Spawn the background task that runs the scan and feeds the handle.
336///
337/// Returns immediately; the caller drives progress via SSE
338/// ([`ScanHandle::subscribe`]) or polls completion via
339/// [`ScanHandle::finished`].
340pub(crate) fn spawn(
341    handle: ScanHandle,
342    client: Arc<Client>,
343    sites: Arc<[Site]>,
344    username: Username,
345    options: ExecutorOptions,
346    persist_ctx: Option<PersistContext>,
347) -> tokio::task::JoinHandle<()> {
348    tokio::spawn(async move {
349        run(handle, &client, &sites, &username, options, persist_ctx).await;
350    })
351}
352
353async fn run(
354    handle: ScanHandle,
355    client: &Client,
356    sites: &[Site],
357    username: &Username,
358    options: ExecutorOptions,
359    persist_ctx: Option<PersistContext>,
360) {
361    let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
362
363    // The executor callback is sync FnMut — bridge to the async append
364    // path via an unbounded mpsc so we never block the executor loop.
365    let tx_for_cb = tx.clone();
366    let scan_fut = async move {
367        let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
368            // Drop is fine: a receive end disappearing means the server
369            // is shutting down, in which case losing notifications is
370            // exactly what we want.
371            let _ = tx_for_cb.send(o.clone());
372        })
373        .await;
374        // Drop the original sender so the consumer loop terminates.
375        drop(tx);
376        outcomes
377    };
378
379    let handle_ref = handle.clone();
380    let consume_fut = async move {
381        while let Some(outcome) = rx.recv().await {
382            handle_ref.append(outcome).await;
383        }
384    };
385
386    let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
387
388    let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
389    let finished = FinishedScan::from_outcomes(username.as_str(), all_outcomes, elapsed_ms);
390
391    // Persist before publishing the `done` event so a UI that refreshes
392    // immediately after seeing `done` still finds the scan on disk.
393    if let Some(ctx) = &persist_ctx {
394        let snapshot = PersistedScan::from_finished(
395            ctx.scan_id.clone(),
396            handle.username().to_owned(),
397            handle.site_count(),
398            handle.created_at_ms(),
399            finished.clone(),
400        )
401        .with_request_context(ctx.request_context.clone());
402        if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
403            tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
404        } else {
405            let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
406            if removed > 0 {
407                tracing::debug!(removed, "pruned older persisted scans");
408            }
409        }
410    }
411
412    handle.publish(finished).await;
413    drop(handle.tx()); // help the broadcast channel close cleanly
414}
415
// Unit tests for the ID, summary, aggregate and handle behaviour above.
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::{ProfileEvidence, UncertainReason};

    // Minimal outcome for site `name` with verdict `kind`; only
    // `Uncertain` outcomes carry a reason.
    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason: matches!(kind, MatchKind::Uncertain)
                .then_some(UncertainReason::Other("test".into())),
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            profile_evidence: Vec::new(),
            confidence: adler_core::ConfidenceScore::default(),
            transport: None,
            escalations: 0,
        }
    }

    // `Found` outcome carrying one piece of "website" profile evidence.
    fn found_with_website(site: &str, website: &str) -> CheckOutcome {
        let mut outcome = outcome(site, MatchKind::Found);
        outcome
            .profile_evidence
            .push(ProfileEvidence::from_enrichment(
                site,
                &outcome.url,
                "website",
                website,
            ));
        outcome
    }

    #[test]
    fn summary_tallies_by_verdict() {
        let s = Summary::from_outcomes(&[
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ]);
        assert_eq!(s.found, 1);
        assert_eq!(s.not_found, 2);
        assert_eq!(s.uncertain, 1);
        assert_eq!(s.total(), 4);
    }

    #[test]
    fn scan_id_is_url_safe_and_random() {
        let a = ScanId::new();
        let b = ScanId::new();
        assert_eq!(a.as_str().len(), 12);
        assert!(
            a.as_str()
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
        );
        // Collision probability for two IDs of ~62 bits (12 chars from a
        // 36-symbol alphabet) is negligible.
        assert_ne!(a, b);
    }

    #[test]
    fn finished_scan_includes_identity_clusters() {
        let finished = FinishedScan::from_outcomes(
            "alice",
            vec![
                found_with_website("GitHub", "https://alice.dev"),
                found_with_website("GitLab", "https://alice.dev"),
            ],
            42,
        );

        assert_eq!(finished.summary.found, 2);
        assert_eq!(finished.identity_clusters.len(), 1);
        assert_eq!(finished.identity_clusters[0].members.len(), 2);
    }

    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let handle = ScanHandle::new("alice", 2, 16);
        let mut rx = handle.subscribe();

        handle.append(outcome("GitHub", MatchKind::Found)).await;
        handle.append(outcome("GitLab", MatchKind::NotFound)).await;

        // History was recorded in order.
        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");

        // Indices were broadcast in order.
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let handle = ScanHandle::new("alice", 1, 4);

        let waiter = {
            let h = handle.clone();
            tokio::spawn(async move { h.wait_done().await })
        };

        // Give the waiter a chance to park.
        tokio::task::yield_now().await;

        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![outcome("GitHub", MatchKind::Found)],
                42,
            ))
            .await;

        waiter.await.unwrap();
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.elapsed_ms, 42);
        assert_eq!(f.outcomes.len(), 1);
    }

    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .publish(FinishedScan::from_outcomes("alice", Vec::new(), 0))
            .await;
        // Should not deadlock — the fast path checks `finished` first.
        tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }

    #[tokio::test]
    async fn replace_outcome_recomputes_identity_clusters() {
        let handle = ScanHandle::new("alice", 2, 4);
        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![
                    found_with_website("GitHub", "https://alice.dev"),
                    outcome("GitLab", MatchKind::NotFound),
                ],
                10,
            ))
            .await;

        // Only one `Found` outcome so far, so nothing to cluster yet.
        let mut finished = handle.finished().await.expect("finished");
        assert!(finished.identity_clusters.is_empty());

        handle
            .replace_outcome(found_with_website("GitLab", "https://alice.dev"))
            .await;

        finished = handle.finished().await.expect("finished");
        assert_eq!(finished.summary.found, 2);
        assert_eq!(finished.identity_clusters.len(), 1);
        assert_eq!(finished.identity_clusters[0].members.len(), 2);
    }
}