1use std::fmt;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use adler_core::{
16 CheckOutcome, Client, ExecutorOptions, IdentityCluster, MatchKind, Site, Username,
17 build_identity_clusters, executor,
18};
19use serde::{Deserialize, Serialize};
20use tokio::sync::{Notify, RwLock, broadcast, mpsc};
21
22use crate::persist::{self, PersistedScan, ScanRequestContext};
23
/// Identifier of a single scan.
///
/// A newtype around `String`. Because of `#[serde(transparent)]` it is
/// serialized and deserialized as the bare inner string rather than as a
/// wrapper object.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScanId(String);
35
36impl ScanId {
37 #[must_use]
39 pub fn new() -> Self {
40 const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
41 let mut s = String::with_capacity(12);
42 for _ in 0..12 {
43 let idx = fastrand::usize(..ALPHABET.len());
44 s.push(char::from(ALPHABET[idx]));
45 }
46 Self(s)
47 }
48
49 #[must_use]
51 pub fn as_str(&self) -> &str {
52 &self.0
53 }
54}
55
56impl Default for ScanId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62impl fmt::Display for ScanId {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.write_str(&self.0)
65 }
66}
67
68impl From<String> for ScanId {
69 fn from(s: String) -> Self {
70 Self(s)
71 }
72}
73
/// Final result of a completed scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinishedScan {
    /// Per-verdict tallies of `outcomes`.
    pub summary: Summary,
    /// The per-site outcomes of the scan.
    pub outcomes: Vec<CheckOutcome>,
    /// Clusters produced by `build_identity_clusters` from `outcomes`.
    ///
    /// Defaults to empty when the field is absent on deserialization and is
    /// omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub identity_clusters: Vec<IdentityCluster>,
    /// Scan duration in milliseconds.
    pub elapsed_ms: u64,
}
88
89impl FinishedScan {
90 pub(crate) fn from_outcomes(
91 username: &str,
92 outcomes: Vec<CheckOutcome>,
93 elapsed_ms: u64,
94 ) -> Self {
95 let summary = Summary::from_outcomes(&outcomes);
96 let identity_clusters = build_identity_clusters(username, &outcomes);
97 Self {
98 summary,
99 outcomes,
100 identity_clusters,
101 elapsed_ms,
102 }
103 }
104
105 pub(crate) fn refresh_identity_clusters(&mut self, username: &str) {
106 self.identity_clusters = build_identity_clusters(username, &self.outcomes);
107 }
108}
109
/// Per-verdict counters for a set of outcomes.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct Summary {
    /// Number of outcomes whose kind is `MatchKind::Found`.
    pub found: usize,
    /// Number of outcomes whose kind is `MatchKind::NotFound`.
    pub not_found: usize,
    /// Number of outcomes whose kind is `MatchKind::Uncertain`.
    pub uncertain: usize,
}
120
121impl Summary {
122 #[must_use]
124 pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
125 let mut s = Self::default();
126 for o in outcomes {
127 match o.kind {
128 MatchKind::Found => s.found += 1,
129 MatchKind::NotFound => s.not_found += 1,
130 MatchKind::Uncertain => s.uncertain += 1,
131 }
132 }
133 s
134 }
135
136 #[must_use]
138 pub const fn total(&self) -> usize {
139 self.found + self.not_found + self.uncertain
140 }
141}
142
/// Handle to the state of one scan.
///
/// The state lives behind an `Arc`, so cloning the handle yields another
/// reference to the same scan rather than a copy of its data.
#[derive(Debug, Clone)]
pub struct ScanHandle {
    /// State shared by every clone of this handle.
    inner: Arc<Inner>,
}
151
/// State shared between all clones of a `ScanHandle`.
#[derive(Debug)]
struct Inner {
    /// Username the scan was created for.
    username: String,
    /// Site count supplied when the handle was created.
    site_count: usize,
    /// Monotonic timestamp taken at construction; basis for `elapsed()`.
    started_at: Instant,
    /// Wall-clock creation time in milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Outcomes recorded so far, in arrival order.
    outcomes: RwLock<Vec<CheckOutcome>>,
    /// `None` until a final result is published.
    finished: RwLock<Option<FinishedScan>>,
    /// Broadcasts the index (into `outcomes`) of each newly recorded outcome.
    tx: broadcast::Sender<usize>,
    /// Signalled with `notify_waiters` when the final result is published.
    done: Notify,
}
167
168impl ScanHandle {
169 #[must_use]
178 pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
179 let (tx, _) = broadcast::channel(outcome_buffer.max(1));
180 let created_at_ms = SystemTime::now()
181 .duration_since(UNIX_EPOCH)
182 .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
183 Self {
184 inner: Arc::new(Inner {
185 username: username.into(),
186 site_count,
187 started_at: Instant::now(),
188 created_at_ms,
189 outcomes: RwLock::new(Vec::new()),
190 finished: RwLock::new(None),
191 tx,
192 done: Notify::new(),
193 }),
194 }
195 }
196
197 #[must_use]
199 pub fn username(&self) -> &str {
200 &self.inner.username
201 }
202
203 #[must_use]
205 pub fn site_count(&self) -> usize {
206 self.inner.site_count
207 }
208
209 #[must_use]
211 pub fn elapsed(&self) -> Duration {
212 self.inner.started_at.elapsed()
213 }
214
215 #[must_use]
218 pub fn created_at_ms(&self) -> u64 {
219 self.inner.created_at_ms
220 }
221
222 pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
225 self.inner.outcomes.read().await.clone()
226 }
227
228 pub async fn finished(&self) -> Option<FinishedScan> {
230 self.inner.finished.read().await.clone()
231 }
232
233 #[must_use]
237 pub fn is_finished_now(&self) -> bool {
238 self.inner.finished.try_read().is_ok_and(|g| g.is_some())
239 }
240
241 #[must_use]
244 pub fn subscribe(&self) -> broadcast::Receiver<usize> {
245 self.inner.tx.subscribe()
246 }
247
248 pub async fn wait_done(&self) {
252 if self.inner.finished.read().await.is_some() {
253 return;
254 }
255 self.inner.done.notified().await;
256 }
257
258 fn tx(&self) -> broadcast::Sender<usize> {
259 self.inner.tx.clone()
260 }
261
262 async fn append(&self, outcome: CheckOutcome) {
263 let mut buf = self.inner.outcomes.write().await;
264 let idx = buf.len();
265 buf.push(outcome);
266 drop(buf);
267 let _ = self.inner.tx.send(idx);
270 }
271
272 #[allow(clippy::significant_drop_tightening)]
283 pub(crate) async fn extend_outcomes(&self, carried: Vec<CheckOutcome>) {
284 if carried.is_empty() {
285 return;
286 }
287 let mut buf = self.inner.outcomes.write().await;
288 for outcome in carried {
289 let idx = buf.len();
290 buf.push(outcome);
291 let _ = self.inner.tx.send(idx);
292 }
293 }
294
295 pub(crate) async fn publish(&self, finished: FinishedScan) {
296 *self.inner.finished.write().await = Some(finished);
297 self.inner.done.notify_waiters();
298 }
299
300 #[allow(clippy::significant_drop_tightening)]
309 pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
310 let mut guard = self.inner.finished.write().await;
311 let Some(finished) = guard.as_mut() else {
312 return;
313 };
314 if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
315 *slot = new;
316 } else {
317 finished.outcomes.push(new);
318 }
319 finished.summary = Summary::from_outcomes(&finished.outcomes);
320 finished.refresh_identity_clusters(self.username());
321 }
322}
323
/// Everything `run` needs to persist a finished scan.
#[derive(Debug, Clone)]
pub(crate) struct PersistContext {
    /// Identifier stored with the persisted snapshot.
    pub scan_id: ScanId,
    /// Directory handed to `persist::save` and `persist::prune`.
    pub dir: Arc<PathBuf>,
    /// Request context attached to the persisted snapshot.
    pub request_context: ScanRequestContext,
}
334
335pub(crate) fn spawn(
341 handle: ScanHandle,
342 client: Arc<Client>,
343 sites: Arc<[Site]>,
344 username: Username,
345 options: ExecutorOptions,
346 persist_ctx: Option<PersistContext>,
347) -> tokio::task::JoinHandle<()> {
348 tokio::spawn(async move {
349 run(handle, &client, &sites, &username, options, persist_ctx).await;
350 })
351}
352
/// Drives one scan to completion.
///
/// Runs the executor while concurrently streaming each reported outcome into
/// `handle`, then builds the `FinishedScan`, optionally persists it, and
/// finally publishes it on the handle.
async fn run(
    handle: ScanHandle,
    client: &Client,
    sites: &[Site],
    username: &Username,
    options: ExecutorOptions,
    persist_ctx: Option<PersistContext>,
) {
    // Bridge between the executor's progress callback and the async
    // consumer below.
    let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();

    let tx_for_cb = tx.clone();
    let scan_fut = async move {
        let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
            // A send error only means the receiver is gone; ignore it.
            let _ = tx_for_cb.send(o.clone());
        })
        .await;
        // `rx.recv()` yields `None` only once every sender is gone, so drop
        // this one explicitly. The clone owned by the callback is presumably
        // released when `run_with_progress` completes; verify against the
        // executor.
        drop(tx);
        outcomes
    };

    let handle_ref = handle.clone();
    let consume_fut = async move {
        // Ends when the channel is closed and fully drained.
        while let Some(outcome) = rx.recv().await {
            handle_ref.append(outcome).await;
        }
    };

    // Poll both futures concurrently until both have completed.
    let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);

    let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
    let finished = FinishedScan::from_outcomes(username.as_str(), all_outcomes, elapsed_ms);

    if let Some(ctx) = &persist_ctx {
        // Persistence is attempted before the result is published on the
        // handle.
        let snapshot = PersistedScan::from_finished(
            ctx.scan_id.clone(),
            handle.username().to_owned(),
            handle.site_count(),
            handle.created_at_ms(),
            finished.clone(),
        )
        .with_request_context(ctx.request_context.clone());
        if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
            // Best effort: a failed save is logged and the scan is still
            // published below.
            tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
        } else {
            // Prune only after a successful save.
            let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
            if removed > 0 {
                tracing::debug!(removed, "pruned older persisted scans");
            }
        }
    }

    handle.publish(finished).await;
    // NOTE(review): `tx()` returns a *clone* of the broadcast sender, so this
    // drops only that clone; the sender stored inside the handle stays alive
    // and the broadcast channel is not closed here. This looks like a no-op;
    // confirm the intent.
    drop(handle.tx());
}
415
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::{ProfileEvidence, UncertainReason};

    /// Builds a minimal outcome for site `name` with verdict `kind`.
    /// A reason is attached only for `Uncertain` outcomes.
    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason: matches!(kind, MatchKind::Uncertain)
                .then_some(UncertainReason::Other("test".into())),
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            profile_evidence: Vec::new(),
            confidence: adler_core::ConfidenceScore::default(),
            transport: None,
            escalations: 0,
        }
    }

    /// Builds a `Found` outcome for `site` carrying one "website" profile
    /// evidence entry with the given value.
    fn found_with_website(site: &str, website: &str) -> CheckOutcome {
        let mut outcome = outcome(site, MatchKind::Found);
        outcome
            .profile_evidence
            .push(ProfileEvidence::from_enrichment(
                site,
                &outcome.url,
                "website",
                website,
            ));
        outcome
    }

    /// Each verdict lands in its own counter and `total` sums them.
    #[test]
    fn summary_tallies_by_verdict() {
        let s = Summary::from_outcomes(&[
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ]);
        assert_eq!(s.found, 1);
        assert_eq!(s.not_found, 2);
        assert_eq!(s.uncertain, 1);
        assert_eq!(s.total(), 4);
    }

    /// Ids are 12 lowercase-alphanumeric characters and differ between calls.
    #[test]
    fn scan_id_is_url_safe_and_random() {
        let a = ScanId::new();
        let b = ScanId::new();
        assert_eq!(a.as_str().len(), 12);
        assert!(
            a.as_str()
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
        );
        // Two random ids could collide in principle, but with 36^12
        // possibilities that is negligible.
        assert_ne!(a, b);
    }

    /// Two found profiles sharing the same website end up in one cluster.
    #[test]
    fn finished_scan_includes_identity_clusters() {
        let finished = FinishedScan::from_outcomes(
            "alice",
            vec![
                found_with_website("GitHub", "https://alice.dev"),
                found_with_website("GitLab", "https://alice.dev"),
            ],
            42,
        );

        assert_eq!(finished.summary.found, 2);
        assert_eq!(finished.identity_clusters.len(), 1);
        assert_eq!(finished.identity_clusters[0].members.len(), 2);
    }

    /// `append` both stores the outcome and broadcasts its index.
    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let handle = ScanHandle::new("alice", 2, 16);
        // Subscribe before appending so both announcements are received.
        let mut rx = handle.subscribe();

        handle.append(outcome("GitHub", MatchKind::Found)).await;
        handle.append(outcome("GitLab", MatchKind::NotFound)).await;

        let snap = handle.outcomes_snapshot().await;
        // History keeps arrival order.
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");

        // Subscribers receive the buffer indices in order.
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    /// A task blocked in `wait_done` is released by `publish`.
    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let handle = ScanHandle::new("alice", 1, 4);

        let waiter = {
            let h = handle.clone();
            tokio::spawn(async move { h.wait_done().await })
        };

        // Give the spawned waiter a chance to run before publishing.
        tokio::task::yield_now().await;

        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![outcome("GitHub", MatchKind::Found)],
                42,
            ))
            .await;

        waiter.await.unwrap();
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.elapsed_ms, 42);
        assert_eq!(f.outcomes.len(), 1);
    }

    /// `wait_done` must not block when a result was published beforehand.
    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .publish(FinishedScan::from_outcomes("alice", Vec::new(), 0))
            .await;
        // The timeout turns a hang into a test failure.
        tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }

    /// Replacing an outcome refreshes both the summary and the clusters.
    #[tokio::test]
    async fn replace_outcome_recomputes_identity_clusters() {
        let handle = ScanHandle::new("alice", 2, 4);
        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![
                    found_with_website("GitHub", "https://alice.dev"),
                    outcome("GitLab", MatchKind::NotFound),
                ],
                10,
            ))
            .await;

        // With a single found profile there is nothing to cluster yet.
        let mut finished = handle.finished().await.expect("finished");
        assert!(finished.identity_clusters.is_empty());

        // Swap the GitLab miss for a hit that shares the same website.
        handle
            .replace_outcome(found_with_website("GitLab", "https://alice.dev"))
            .await;

        finished = handle.finished().await.expect("finished");
        assert_eq!(finished.summary.found, 2);
        assert_eq!(finished.identity_clusters.len(), 1);
        assert_eq!(finished.identity_clusters[0].members.len(), 2);
    }
}