use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use adler_core::{CheckOutcome, Client, ExecutorOptions, MatchKind, Site, Username, executor};
use serde::{Deserialize, Serialize};
use tokio::sync::{Notify, RwLock, broadcast, mpsc};
use crate::persist::{self, PersistedScan};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScanId(String);
impl ScanId {
#[must_use]
pub fn new() -> Self {
const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
let mut s = String::with_capacity(12);
for _ in 0..12 {
let idx = fastrand::usize(..ALPHABET.len());
s.push(char::from(ALPHABET[idx]));
}
Self(s)
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for ScanId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for ScanId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
impl From<String> for ScanId {
fn from(s: String) -> Self {
Self(s)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinishedScan {
pub summary: Summary,
pub outcomes: Vec<CheckOutcome>,
pub elapsed_ms: u64,
}
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct Summary {
pub found: usize,
pub not_found: usize,
pub uncertain: usize,
}
impl Summary {
#[must_use]
pub fn from_outcomes(outcomes: &[CheckOutcome]) -> Self {
let mut s = Self::default();
for o in outcomes {
match o.kind {
MatchKind::Found => s.found += 1,
MatchKind::NotFound => s.not_found += 1,
MatchKind::Uncertain => s.uncertain += 1,
}
}
s
}
#[must_use]
pub const fn total(&self) -> usize {
self.found + self.not_found + self.uncertain
}
}
#[derive(Debug, Clone)]
pub struct ScanHandle {
inner: Arc<Inner>,
}
#[derive(Debug)]
struct Inner {
username: String,
site_count: usize,
started_at: Instant,
created_at_ms: u64,
outcomes: RwLock<Vec<CheckOutcome>>,
finished: RwLock<Option<FinishedScan>>,
tx: broadcast::Sender<usize>,
done: Notify,
}
impl ScanHandle {
#[must_use]
pub fn new(username: impl Into<String>, site_count: usize, outcome_buffer: usize) -> Self {
let (tx, _) = broadcast::channel(outcome_buffer.max(1));
let created_at_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX));
Self {
inner: Arc::new(Inner {
username: username.into(),
site_count,
started_at: Instant::now(),
created_at_ms,
outcomes: RwLock::new(Vec::new()),
finished: RwLock::new(None),
tx,
done: Notify::new(),
}),
}
}
#[must_use]
pub fn username(&self) -> &str {
&self.inner.username
}
#[must_use]
pub fn site_count(&self) -> usize {
self.inner.site_count
}
#[must_use]
pub fn elapsed(&self) -> Duration {
self.inner.started_at.elapsed()
}
#[must_use]
pub fn created_at_ms(&self) -> u64 {
self.inner.created_at_ms
}
pub async fn outcomes_snapshot(&self) -> Vec<CheckOutcome> {
self.inner.outcomes.read().await.clone()
}
pub async fn finished(&self) -> Option<FinishedScan> {
self.inner.finished.read().await.clone()
}
#[must_use]
pub fn is_finished_now(&self) -> bool {
self.inner.finished.try_read().is_ok_and(|g| g.is_some())
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<usize> {
self.inner.tx.subscribe()
}
pub async fn wait_done(&self) {
if self.inner.finished.read().await.is_some() {
return;
}
self.inner.done.notified().await;
}
fn tx(&self) -> broadcast::Sender<usize> {
self.inner.tx.clone()
}
async fn append(&self, outcome: CheckOutcome) {
let mut buf = self.inner.outcomes.write().await;
let idx = buf.len();
buf.push(outcome);
drop(buf);
let _ = self.inner.tx.send(idx);
}
#[allow(clippy::significant_drop_tightening)]
pub(crate) async fn extend_outcomes(&self, carried: Vec<CheckOutcome>) {
if carried.is_empty() {
return;
}
let mut buf = self.inner.outcomes.write().await;
for outcome in carried {
let idx = buf.len();
buf.push(outcome);
let _ = self.inner.tx.send(idx);
}
}
pub(crate) async fn publish(&self, finished: FinishedScan) {
*self.inner.finished.write().await = Some(finished);
self.inner.done.notify_waiters();
}
#[allow(clippy::significant_drop_tightening)]
pub(crate) async fn replace_outcome(&self, new: CheckOutcome) {
let mut guard = self.inner.finished.write().await;
let Some(finished) = guard.as_mut() else {
return;
};
if let Some(slot) = finished.outcomes.iter_mut().find(|o| o.site == new.site) {
*slot = new;
} else {
finished.outcomes.push(new);
}
finished.summary = Summary::from_outcomes(&finished.outcomes);
}
}
#[derive(Debug, Clone)]
pub(crate) struct PersistContext {
pub scan_id: ScanId,
pub dir: Arc<PathBuf>,
}
pub(crate) fn spawn(
handle: ScanHandle,
client: Arc<Client>,
sites: Arc<[Site]>,
username: Username,
options: ExecutorOptions,
persist_ctx: Option<PersistContext>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
run(handle, &client, &sites, &username, options, persist_ctx).await;
})
}
async fn run(
handle: ScanHandle,
client: &Client,
sites: &[Site],
username: &Username,
options: ExecutorOptions,
persist_ctx: Option<PersistContext>,
) {
let (tx, mut rx) = mpsc::unbounded_channel::<CheckOutcome>();
let tx_for_cb = tx.clone();
let scan_fut = async move {
let outcomes = executor::run_with_progress(client, sites, username, options, move |o| {
let _ = tx_for_cb.send(o.clone());
})
.await;
drop(tx);
outcomes
};
let handle_ref = handle.clone();
let consume_fut = async move {
while let Some(outcome) = rx.recv().await {
handle_ref.append(outcome).await;
}
};
let (all_outcomes, ()) = tokio::join!(scan_fut, consume_fut);
let elapsed_ms = u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX);
let summary = Summary::from_outcomes(&all_outcomes);
let finished = FinishedScan {
summary,
outcomes: all_outcomes,
elapsed_ms,
};
if let Some(ctx) = &persist_ctx {
let snapshot = PersistedScan::from_finished(
ctx.scan_id.clone(),
handle.username().to_owned(),
handle.site_count(),
handle.created_at_ms(),
finished.clone(),
);
if let Err(err) = persist::save(&ctx.dir, &snapshot).await {
tracing::warn!(error = %err, scan_id = %ctx.scan_id, "failed to persist scan");
} else {
let removed = persist::prune(&ctx.dir, persist::MAX_PERSISTED_SCANS).await;
if removed > 0 {
tracing::debug!(removed, "pruned older persisted scans");
}
}
}
handle.publish(finished).await;
drop(handle.tx()); }
#[cfg(test)]
mod tests {
use super::*;
use adler_core::UncertainReason;
fn outcome(name: &str, kind: MatchKind) -> CheckOutcome {
CheckOutcome {
site: name.into(),
url: format!("https://{name}.example/u"),
kind,
reason: matches!(kind, MatchKind::Uncertain)
.then_some(UncertainReason::Other("test".into())),
elapsed_ms: 1,
enrichment: std::collections::BTreeMap::new(),
evidence: Vec::new(),
transport: None,
escalations: 0,
}
}
#[test]
fn summary_tallies_by_verdict() {
let s = Summary::from_outcomes(&[
outcome("a", MatchKind::Found),
outcome("b", MatchKind::NotFound),
outcome("c", MatchKind::NotFound),
outcome("d", MatchKind::Uncertain),
]);
assert_eq!(s.found, 1);
assert_eq!(s.not_found, 2);
assert_eq!(s.uncertain, 1);
assert_eq!(s.total(), 4);
}
#[test]
fn scan_id_is_url_safe_and_random() {
let a = ScanId::new();
let b = ScanId::new();
assert_eq!(a.as_str().len(), 12);
assert!(
a.as_str()
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()),
);
assert_ne!(a, b);
}
#[tokio::test]
async fn append_publishes_to_subscribers_and_history() {
let handle = ScanHandle::new("alice", 2, 16);
let mut rx = handle.subscribe();
handle.append(outcome("GitHub", MatchKind::Found)).await;
handle.append(outcome("GitLab", MatchKind::NotFound)).await;
let snap = handle.outcomes_snapshot().await;
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].site, "GitHub");
assert_eq!(snap[1].site, "GitLab");
assert_eq!(rx.recv().await.unwrap(), 0);
assert_eq!(rx.recv().await.unwrap(), 1);
}
#[tokio::test]
async fn publish_releases_wait_done_and_exposes_finished() {
let handle = ScanHandle::new("alice", 1, 4);
let waiter = {
let h = handle.clone();
tokio::spawn(async move { h.wait_done().await })
};
tokio::task::yield_now().await;
handle
.publish(FinishedScan {
summary: Summary {
found: 1,
not_found: 0,
uncertain: 0,
},
outcomes: vec![outcome("GitHub", MatchKind::Found)],
elapsed_ms: 42,
})
.await;
waiter.await.unwrap();
let f = handle.finished().await.expect("finished");
assert_eq!(f.summary.found, 1);
assert_eq!(f.elapsed_ms, 42);
assert_eq!(f.outcomes.len(), 1);
}
#[tokio::test]
async fn wait_done_returns_immediately_if_already_finished() {
let handle = ScanHandle::new("alice", 1, 4);
handle
.publish(FinishedScan {
summary: Summary::default(),
outcomes: Vec::new(),
elapsed_ms: 0,
})
.await;
tokio::time::timeout(Duration::from_millis(100), handle.wait_done())
.await
.expect("wait_done must return immediately when already finished");
}
}