use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use adler_core::{
CheckOutcome, Client, ExecutorOptions, IdentityCluster, MatchKind, Site, Username,
build_identity_clusters, executor,
};
use serde::{Deserialize, Serialize};
use tokio::sync::{Notify, RwLock, broadcast, mpsc};
use crate::persist::{self, PersistedScan, ScanRequestContext};
/// Short identifier for a scan, serialized transparently as a bare string.
///
/// Ids produced by [`ScanId::new`] are 12 characters drawn from `[a-z0-9]`,
/// so they can be embedded in URLs without escaping.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScanId(String);

impl ScanId {
    /// Generates a fresh random id of 12 lowercase ASCII letters and digits.
    ///
    /// NOTE(review): `fastrand` is not a cryptographically secure generator.
    /// If a scan id is ever treated as an unguessable access token, confirm
    /// that is acceptable.
    #[must_use]
    pub fn new() -> Self {
        const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
        const ID_LEN: usize = 12;
        let id = (0..ID_LEN)
            .map(|_| char::from(ALPHABET[fastrand::usize(..ALPHABET.len())]))
            .collect::<String>();
        Self(id)
    }

    /// Borrows the id as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl Default for ScanId {
    /// Same as [`ScanId::new`]: every default id is freshly randomized.
    fn default() -> Self {
        Self::new()
    }
}

impl fmt::Display for ScanId {
    /// Writes the raw id with no decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl From<String> for ScanId {
    /// Wraps an existing string as an id without validating its contents.
    ///
    /// NOTE(review): no alphabet or length check happens here (nor on
    /// deserialization). If ids from clients end up in file paths, confirm
    /// they are validated elsewhere.
    fn from(raw: String) -> Self {
        Self(raw)
    }
}
/// Result of a completed scan: every outcome plus values derived from them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinishedScan {
    /// Per-verdict counts derived from `outcomes`.
    pub summary: Summary,
    /// Every check outcome the scan produced.
    pub outcomes: Vec<CheckOutcome>,
    /// Identity clusters built from `outcomes`. Omitted when serializing if
    /// empty, and defaulted to empty when absent on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub identity_clusters: Vec<IdentityCluster>,
    /// Scan duration in milliseconds, as supplied by the caller.
    pub elapsed_ms: u64,
}

impl FinishedScan {
    /// Assembles a finished scan from raw outcomes.
    ///
    /// The summary is tallied from `outcomes`. Identity clusters are built for
    /// `username` via `build_identity_clusters`.
    pub(crate) fn from_outcomes(
        username: &str,
        outcomes: Vec<CheckOutcome>,
        elapsed_ms: u64,
    ) -> Self {
        let mut scan = Self {
            summary: Summary::from_outcomes(&outcomes),
            outcomes,
            identity_clusters: Vec::new(),
            elapsed_ms,
        };
        scan.refresh_identity_clusters(username);
        scan
    }

    /// Rebuilds `identity_clusters` from the current `outcomes`.
    ///
    /// Call this after mutating `outcomes` so the clusters stay in sync.
    pub(crate) fn refresh_identity_clusters(&mut self, username: &str) {
        let clusters = build_identity_clusters(username, &self.outcomes);
        self.identity_clusters = clusters;
    }
}
/// Tally of a scan's outcomes by verdict.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct Summary {
    /// Number of outcomes whose verdict is [`MatchKind::Found`].
    pub found: usize,
    /// Number of outcomes whose verdict is [`MatchKind::NotFound`].
    pub not_found: usize,
    /// Number of outcomes whose verdict is [`MatchKind::Uncertain`].
    pub uncertain: usize,
}

impl Summary {
    /// Counts `outcomes` by their `kind`.
    #[must_use]
    pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
        outcomes.iter().fold(Self::default(), |mut tally, outcome| {
            // Pick the counter for this verdict, then bump it once.
            let counter = match outcome.kind {
                MatchKind::Found => &mut tally.found,
                MatchKind::NotFound => &mut tally.not_found,
                MatchKind::Uncertain => &mut tally.uncertain,
            };
            *counter += 1;
            tally
        })
    }

    /// Total number of outcomes tallied across all verdicts.
    #[must_use]
    pub const fn total(&self) -> usize {
        self.found + self.not_found + self.uncertain
    }
}
/// Cheaply cloneable handle to an in-progress or finished scan.
///
/// Every clone shares the same state through an `Arc`.
#[derive(Debug, Clone)]
pub struct ScanHandle {
    inner: Arc<Inner>,
}
/// Shared state behind every clone of a [`ScanHandle`].
#[derive(Debug)]
struct Inner {
    /// Username being scanned.
    username: String,
    /// Number of sites the scan was created for.
    site_count: usize,
    /// Monotonic start time; the basis for `ScanHandle::elapsed`.
    started_at: Instant,
    /// Wall-clock creation time in milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Live outcomes, in the order they were appended.
    outcomes: RwLock<Vec<CheckOutcome>>,
    /// Final result; `None` until `ScanHandle::publish` stores it.
    finished: RwLock<Option<FinishedScan>>,
    /// Broadcasts the index (into `outcomes`) of each newly appended outcome.
    /// This sender lives as long as `Inner`, so the channel cannot close
    /// while `Inner` is alive.
    tx: broadcast::Sender<usize>,
    /// Notified by `ScanHandle::publish` to release `wait_done` callers.
    done: Notify,
}
impl ScanHandle {
    /// Creates a handle for a new scan of `username` over `site_count` sites.
    ///
    /// `outcome_buffer` is the capacity of the broadcast channel that announces
    /// new outcome indices. It is clamped to at least 1 because
    /// `broadcast::channel` panics on a zero capacity.
    #[must_use]
    pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
        let (tx, _) = broadcast::channel(outcome_buffer.max(1));
        // A clock set before the epoch yields 0; an out-of-range value saturates.
        let created_at_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
        Self {
            inner: Arc::new(Inner {
                username: username.into(),
                site_count,
                started_at: Instant::now(),
                created_at_ms,
                outcomes: RwLock::new(Vec::new()),
                finished: RwLock::new(None),
                tx,
                done: Notify::new(),
            }),
        }
    }

    /// Username this scan is for.
    #[must_use]
    pub fn username(&self) -> &str {
        &self.inner.username
    }

    /// Number of sites the scan was created for.
    #[must_use]
    pub fn site_count(&self) -> usize {
        self.inner.site_count
    }

    /// Time elapsed since the handle was created (monotonic clock).
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.inner.started_at.elapsed()
    }

    /// Wall-clock creation time in milliseconds since the Unix epoch.
    #[must_use]
    pub fn created_at_ms(&self) -> u64 {
        self.inner.created_at_ms
    }

    /// Clones the live outcome buffer as it stands right now.
    pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
        self.inner.outcomes.read().await.clone()
    }

    /// Clones the final result, or returns `None` if the scan has not been
    /// published yet.
    pub async fn finished(&self) -> Option<FinishedScan> {
        self.inner.finished.read().await.clone()
    }

    /// Non-blocking check for completion.
    ///
    /// This is best-effort. It returns `false` whenever the lock cannot be
    /// taken immediately (for example while `publish` holds the write lock),
    /// so it may briefly under-report a finished scan.
    #[must_use]
    pub fn is_finished_now(&self) -> bool {
        self.inner.finished.try_read().is_ok_and(|g| g.is_some())
    }

    /// Subscribes to indices of outcomes appended from now on.
    ///
    /// Each received value indexes into [`ScanHandle::outcomes_snapshot`].
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<usize> {
        self.inner.tx.subscribe()
    }

    /// Resolves once the scan has been published; returns immediately if it
    /// already has.
    pub async fn wait_done(&self) {
        // Register for the wakeup *before* inspecting the state.
        // `notify_waiters` only wakes `Notified` futures that already exist.
        // Checking first would leave a window where `publish` stores the
        // result and notifies between our check and the registration, and
        // this call would then hang forever.
        let notified = self.inner.done.notified();
        if self.inner.finished.read().await.is_some() {
            return;
        }
        notified.await;
    }

    /// Returns a clone of the broadcast sender.
    ///
    /// Dropping the clone does not close the channel: `Inner` keeps its own
    /// sender for as long as it is alive.
    fn tx(&self) -> broadcast::Sender<usize> {
        self.inner.tx.clone()
    }

    /// Appends one outcome and broadcasts its index.
    ///
    /// The write lock is released before broadcasting, so a subscriber that
    /// reacts to the index can read the buffer immediately.
    async fn append(&self, outcome: CheckOutcome) {
        let mut buf = self.inner.outcomes.write().await;
        let idx = buf.len();
        buf.push(outcome);
        drop(buf);
        // No subscribers is not an error; the outcome is still recorded.
        let _ = self.inner.tx.send(idx);
    }

    /// Appends a batch of outcomes, broadcasting each one's index.
    // The write guard is intentionally held for the whole batch. A subscriber
    // that reacts to an early index and then reads the buffer blocks until
    // the batch is complete, so it never observes a partial batch.
    #[allow(clippy::significant_drop_tightening)]
    pub(crate) async fn extend_outcomes(&self, carried: Vec<CheckOutcome>) {
        if carried.is_empty() {
            return;
        }
        let mut buf = self.inner.outcomes.write().await;
        buf.reserve(carried.len());
        for outcome in carried {
            let idx = buf.len();
            buf.push(outcome);
            let _ = self.inner.tx.send(idx);
        }
    }

    /// Stores the final result and releases every `wait_done` caller.
    pub(crate) async fn publish(&self, finished: FinishedScan) {
        // The write guard is a temporary, so it is released before notifying.
        *self.inner.finished.write().await = Some(finished);
        self.inner.done.notify_waiters();
    }

    /// Replaces the outcome for `new.site` in the published result, or appends
    /// it if that site is absent. Summary and identity clusters are then
    /// recomputed.
    ///
    /// Does nothing if the scan has not been published yet.
    ///
    /// NOTE(review): only the published `FinishedScan` is updated. The live
    /// buffer behind `outcomes_snapshot` and broadcast subscribers are left
    /// untouched. Confirm that is intended.
    // The guard must outlive `finished`, which borrows from it, so it cannot
    // be dropped earlier.
    #[allow(clippy::significant_drop_tightening)]
    pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
        let mut guard = self.inner.finished.write().await;
        let Some(finished) = guard.as_mut() else {
            return;
        };
        if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
            *slot = new;
        } else {
            finished.outcomes.push(new);
        }
        finished.summary = Summary::from_outcomes(&finished.outcomes);
        finished.refresh_identity_clusters(self.username());
    }
}
/// Information needed to persist a scan once it finishes.
#[derive(Debug, Clone)]
pub(crate) struct PersistContext {
    /// Id recorded in the persisted snapshot (passed to `PersistedScan::from_finished`).
    pub scan_id: ScanId,
    /// Directory handed to `persist::save` and `persist::prune`.
    pub dir: Arc<PathBuf>,
    /// Request metadata attached to the snapshot via `with_request_context`.
    pub request_context: ScanRequestContext,
}
/// Spawns a scan onto the Tokio runtime and returns the task's join handle.
///
/// The task takes ownership of the shared client, site list and username, and
/// lends them to `run` for the duration of the scan.
pub(crate) fn spawn(
    handle: ScanHandle,
    client: Arc<Client>,
    sites: Arc<[Site]>,
    username: Username,
    options: ExecutorOptions,
    persist_ctx: Option<PersistContext>,
) -> tokio::task::JoinHandle<()> {
    let task = async move {
        run(handle, &client, &sites, &username, options, persist_ctx).await;
    };
    tokio::spawn(task)
}
async fn run(
handle: ScanHandle,
client: &Client,
sites: &[Site],
username: &Username,
options: ExecutorOptions,
persist_ctx: Option<PersistContext>,
) {
let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
let tx_for_cb = tx.clone();
let scan_fut = async move {
let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
let _ = tx_for_cb.send(o.clone());
})
.await;
drop(tx);
outcomes
};
let handle_ref = handle.clone();
let consume_fut = async move {
while let Some(outcome) = rx.recv().await {
handle_ref.append(outcome).await;
}
};
let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
let finished = FinishedScan::from_outcomes(username.as_str(), all_outcomes, elapsed_ms);
if let Some(ctx) = &persist_ctx {
let snapshot = PersistedScan::from_finished(
ctx.scan_id.clone(),
handle.username().to_owned(),
handle.site_count(),
handle.created_at_ms(),
finished.clone(),
)
.with_request_context(ctx.request_context.clone());
if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
} else {
let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
if removed > 0 {
tracing::debug!(removed, "pruned older persisted scans");
}
}
}
handle.publish(finished).await;
drop(handle.tx()); }
#[cfg(test)]
mod tests {
    use super::*;
    use adler_core::{ProfileEvidence, UncertainReason};

    /// Minimal outcome for `name` with the given verdict.
    fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
        CheckOutcome {
            site: name.into(),
            url: format!("https://{name}.example/u"),
            kind,
            reason: matches!(kind, MatchKind::Uncertain)
                .then_some(UncertainReason::Other("test".into())),
            elapsed_ms: 1,
            enrichment: std::collections::BTreeMap::new(),
            evidence: Vec::new(),
            profile_evidence: Vec::new(),
            confidence: adler_core::ConfidenceScore::default(),
            transport: None,
            escalations: 0,
        }
    }

    /// Found outcome carrying a `website` profile field, used to form clusters.
    fn found_with_website(site: &str, website: &str) -> CheckOutcome {
        let mut outcome = outcome(site, MatchKind::Found);
        outcome
            .profile_evidence
            .push(ProfileEvidence::from_enrichment(
                site,
                &outcome.url,
                "website",
                website,
            ));
        outcome
    }

    #[test]
    fn summary_tallies_by_verdict() {
        let s = Summary::from_outcomes(&[
            outcome("a", MatchKind::Found),
            outcome("b", MatchKind::NotFound),
            outcome("c", MatchKind::NotFound),
            outcome("d", MatchKind::Uncertain),
        ]);
        assert_eq!(s.found, 1);
        assert_eq!(s.not_found, 2);
        assert_eq!(s.uncertain, 1);
        assert_eq!(s.total(), 4);
    }

    #[test]
    fn scan_id_is_url_safe_and_random() {
        let a = ScanId::new();
        let b = ScanId::new();
        assert_eq!(a.as_str().len(), 12);
        assert!(
            a.as_str()
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
        );
        assert_ne!(a, b);
    }

    #[test]
    fn finished_scan_includes_identity_clusters() {
        let finished = FinishedScan::from_outcomes(
            "alice",
            vec![
                found_with_website("GitHub", "https://alice.dev"),
                found_with_website("GitLab", "https://alice.dev"),
            ],
            42,
        );
        assert_eq!(finished.summary.found, 2);
        assert_eq!(finished.identity_clusters.len(), 1);
        assert_eq!(finished.identity_clusters[0].members.len(), 2);
    }

    #[tokio::test]
    async fn append_publishes_to_subscribers_and_history() {
        let handle = ScanHandle::new("alice", 2, 16);
        let mut rx = handle.subscribe();
        handle.append(outcome("GitHub", MatchKind::Found)).await;
        handle.append(outcome("GitLab", MatchKind::NotFound)).await;
        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");
        assert_eq!(rx.recv().await.unwrap(), 0);
        assert_eq!(rx.recv().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn extend_outcomes_appends_after_existing_and_broadcasts_indices() {
        let handle = ScanHandle::new("alice", 3, 16);
        handle.append(outcome("GitHub", MatchKind::Found)).await;
        let mut rx = handle.subscribe();
        handle
            .extend_outcomes(vec![
                outcome("GitLab", MatchKind::NotFound),
                outcome("Codeberg", MatchKind::Uncertain),
            ])
            .await;
        let snap = handle.outcomes_snapshot().await;
        assert_eq!(snap.len(), 3);
        assert_eq!(snap[0].site, "GitHub");
        assert_eq!(snap[1].site, "GitLab");
        assert_eq!(snap[2].site, "Codeberg");
        // Indices continue from the existing buffer length.
        assert_eq!(rx.recv().await.unwrap(), 1);
        assert_eq!(rx.recv().await.unwrap(), 2);
    }

    #[tokio::test]
    async fn extend_outcomes_with_empty_batch_is_a_noop() {
        let handle = ScanHandle::new("alice", 1, 4);
        let mut rx = handle.subscribe();
        handle.extend_outcomes(Vec::new()).await;
        assert!(handle.outcomes_snapshot().await.is_empty());
        assert!(matches!(
            rx.try_recv(),
            Err(broadcast::error::TryRecvError::Empty)
        ));
    }

    #[tokio::test]
    async fn publish_releases_wait_done_and_exposes_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        let waiter = {
            let h = handle.clone();
            tokio::spawn(async move { h.wait_done().await })
        };
        tokio::task::yield_now().await;
        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![outcome("GitHub", MatchKind::Found)],
                42,
            ))
            .await;
        waiter.await.unwrap();
        let f = handle.finished().await.expect("finished");
        assert_eq!(f.summary.found, 1);
        assert_eq!(f.elapsed_ms, 42);
        assert_eq!(f.outcomes.len(), 1);
    }

    #[tokio::test]
    async fn wait_done_returns_immediately_if_already_finished() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .publish(FinishedScan::from_outcomes("alice", Vec::new(), 0))
            .await;
        tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
            .await
            .expect("wait_done must return immediately when already finished");
    }

    #[tokio::test]
    async fn replace_outcome_before_publish_is_ignored() {
        let handle = ScanHandle::new("alice", 1, 4);
        handle
            .replace_outcome(outcome("GitHub", MatchKind::Found))
            .await;
        assert!(handle.finished().await.is_none());
        assert!(!handle.is_finished_now());
    }

    #[tokio::test]
    async fn replace_outcome_appends_unknown_site_and_updates_summary() {
        let handle = ScanHandle::new("alice", 2, 4);
        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![outcome("GitHub", MatchKind::NotFound)],
                5,
            ))
            .await;
        assert!(handle.is_finished_now());
        handle
            .replace_outcome(outcome("GitLab", MatchKind::Found))
            .await;
        let finished = handle.finished().await.expect("finished");
        assert_eq!(finished.outcomes.len(), 2);
        assert_eq!(finished.summary.found, 1);
        assert_eq!(finished.summary.not_found, 1);
        assert_eq!(finished.elapsed_ms, 5);
    }

    #[tokio::test]
    async fn replace_outcome_recomputes_identity_clusters() {
        let handle = ScanHandle::new("alice", 2, 4);
        handle
            .publish(FinishedScan::from_outcomes(
                "alice",
                vec![
                    found_with_website("GitHub", "https://alice.dev"),
                    outcome("GitLab", MatchKind::NotFound),
                ],
                10,
            ))
            .await;
        let mut finished = handle.finished().await.expect("finished");
        assert!(finished.identity_clusters.is_empty());
        handle
            .replace_outcome(found_with_website("GitLab", "https://alice.dev"))
            .await;
        finished = handle.finished().await.expect("finished");
        assert_eq!(finished.summary.found, 2);
        assert_eq!(finished.identity_clusters.len(), 1);
        assert_eq!(finished.identity_clusters[0].members.len(), 2);
    }
}