Skip to main content

adler_server/
api.rs

1//! Axum router and request handlers.
2//!
3//! Routes:
4//!
5//! - `GET  /api/health`           — liveness probe (returns `{ "ok": true }`).
6//! - `GET  /api/sites`            — site catalogue available to scans.
7//! - `POST /api/scan`             — start a scan; returns a [`ScanId`].
8//! - `GET  /api/scan/:id`         — scan snapshot: final aggregate, or partial results with `status: "running"` (404 if unknown).
9//! - `GET  /api/scan/:id/stream`  — Server-Sent Events stream of outcomes.
10//!
11//! All endpoints emit JSON. Errors carry a stable `{ "error": "<code>",
12//! "message": "<human>" }` shape so the `SolidJS` frontend can branch on
13//! `error` without parsing free-text.
14
15use std::collections::HashMap;
16use std::convert::Infallible;
17use std::sync::Arc;
18use std::time::Duration;
19
20use adler_core::{CheckOutcome, ExecutorOptions, Site, Username};
21use async_stream::stream;
22use axum::Json;
23use axum::Router;
24use axum::extract::{Path as AxumPath, State};
25use axum::http::StatusCode;
26use axum::response::sse::{Event, KeepAlive, KeepAliveStream, Sse};
27use axum::response::{IntoResponse, Response};
28use axum::routing::{get, post};
29use futures::Stream;
30use serde::{Deserialize, Serialize};
31use tower_http::cors::{Any, CorsLayer};
32use tower_http::trace::TraceLayer;
33
34use crate::scan::{FinishedScan, ScanHandle, ScanId};
35use crate::state::AppState;
36
37/// Build the axum router. Public so test harnesses can drive it
38/// directly without going through [`crate::serve`].
39pub fn router(state: AppState) -> Router {
40    Router::new()
41        .route("/api/health", get(health))
42        .route("/api/sites", get(list_sites))
43        .route("/api/access", get(list_access))
44        .route("/api/scans", get(list_scans))
45        .route("/api/scan", post(start_scan))
46        .route("/api/scan/{id}", get(get_scan))
47        .route("/api/scan/{id}/stream", get(stream_scan))
48        .route("/api/scan/{id}/retry", post(retry_site))
49        .route("/api/scan/{id}/refilter", post(refilter_scan))
50        .layer(
51            CorsLayer::new()
52                .allow_origin(Any)
53                .allow_methods(Any)
54                .allow_headers(Any),
55        )
56        .layer(TraceLayer::new_for_http())
57        .with_state(state)
58}
59
60#[derive(Serialize)]
61struct Health {
62    ok: bool,
63    version: &'static str,
64}
65
66async fn health() -> Json<Health> {
67    Json(Health {
68        ok: true,
69        version: env!("CARGO_PKG_VERSION"),
70    })
71}
72
73/// Site summary returned by `GET /api/sites`. Strictly smaller than the
74/// internal [`Site`] — we don't leak detection signals, just what a UI
75/// needs to render a filter list.
76#[derive(Serialize)]
77struct SiteSummary {
78    name: String,
79    url: String,
80    tags: Vec<String>,
81    #[serde(skip_serializing_if = "Option::is_none")]
82    popularity: Option<u32>,
83}
84
85impl From<&Site> for SiteSummary {
86    fn from(s: &Site) -> Self {
87        Self {
88            name: s.name.clone(),
89            url: s.url.as_str().to_owned(),
90            tags: s.tags.clone(),
91            popularity: s.popularity,
92        }
93    }
94}
95
96async fn list_sites(State(state): State<AppState>) -> Json<Vec<SiteSummary>> {
97    Json(state.sites.iter().map(SiteSummary::from).collect())
98}
99
100/// Read-only view of the access engine's runtime config — what's
101/// configured via `--proxy-pool` and `--sessions`, *without* leaking
102/// any secrets the operator supplied:
103///   - egress entries surface only `(country, kind)` — proxy URLs
104///     typically embed credentials (`socks5://user:pass@host:1080`),
105///     so we never put them in the response;
106///   - sessions surface only their *names* — session header values
107///     are cookies / auth tokens that have no business reaching a
108///     browser over this HTTP API.
109///
110/// Editing happens out-of-band: the operator updates the pool / session
111/// TOML files and restarts the server. The SPA exposes this view as a
112/// read-only panel so an operator can confirm what's loaded without
113/// shell access to the server.
114#[derive(Serialize)]
115struct AccessSummary {
116    egress: Vec<adler_core::EgressSummary>,
117    sessions: Vec<SessionName>,
118}
119
120#[derive(Serialize)]
121struct SessionName {
122    name: String,
123}
124
125async fn list_access(State(state): State<AppState>) -> Json<AccessSummary> {
126    let egress = state.client.egress_summary();
127    let sessions = state
128        .client
129        .session_names()
130        .into_iter()
131        .map(|name| SessionName { name })
132        .collect();
133    Json(AccessSummary { egress, sessions })
134}
135
136/// One row in `GET /api/scans`.
137#[derive(Serialize)]
138struct ScanListEntry {
139    scan_id: ScanId,
140    username: String,
141    site_count: usize,
142    /// Unix epoch milliseconds when the scan was started.
143    started_at_ms: u64,
144    elapsed_ms: u64,
145    /// `"running"` or `"finished"`.
146    status: &'static str,
147    /// Counts present only when `status == "finished"`.
148    #[serde(skip_serializing_if = "Option::is_none")]
149    summary: Option<crate::scan::Summary>,
150}
151
152async fn list_scans(State(state): State<AppState>) -> Json<Vec<ScanListEntry>> {
153    // Snapshot the in-memory handles out from under the lock so the
154    // per-handle `.finished()` awaits don't serialise on the outer
155    // map's read guard.
156    let handles: Vec<(ScanId, ScanHandle)> = {
157        let scans = state.scans.read().await;
158        scans
159            .iter()
160            .map(|(id, h)| (id.clone(), h.clone()))
161            .collect()
162    };
163    let mut by_id: HashMap<ScanId, ScanListEntry> = HashMap::with_capacity(handles.len());
164    for (id, handle) in handles {
165        let finished = handle.finished().await;
166        by_id.insert(
167            id.clone(),
168            ScanListEntry {
169                scan_id: id,
170                username: handle.username().to_owned(),
171                site_count: handle.site_count(),
172                started_at_ms: handle.created_at_ms(),
173                elapsed_ms: u64::try_from(handle.elapsed().as_millis()).unwrap_or(u64::MAX),
174                status: if finished.is_some() {
175                    "finished"
176                } else {
177                    "running"
178                },
179                summary: finished.map(|f| f.summary),
180            },
181        );
182    }
183    // Layer in on-disk archive (older scans evicted from memory).
184    // In-memory entries always win — they may still be running.
185    if let Some(dir) = &state.scans_dir {
186        for ps in crate::persist::load_all(dir).await {
187            by_id.entry(ps.scan_id.clone()).or_insert(ScanListEntry {
188                scan_id: ps.scan_id,
189                username: ps.username,
190                site_count: ps.site_count,
191                started_at_ms: ps.created_at_ms,
192                elapsed_ms: ps.elapsed_ms,
193                status: "finished",
194                summary: Some(ps.summary),
195            });
196        }
197    }
198    let mut entries: Vec<ScanListEntry> = by_id.into_values().collect();
199    // Newest first — convenient for a history sidebar.
200    entries.sort_by_key(|e| std::cmp::Reverse(e.started_at_ms));
201    Json(entries)
202}
203
204/// Request body for `POST /api/scan`.
205///
206/// Filter fields mirror the CLI flags one-for-one (`--only`,
207/// `--exclude`, `--tag`, `--exclude-tag`, `--top`, `--nsfw`). All are
208/// optional; omitting them runs the full catalog the server was
209/// launched with.
210#[derive(Debug, Deserialize, Default)]
211struct StartScanRequest {
212    username: String,
213    /// Only sites whose name contains one of these substrings
214    /// (case-insensitive). Empty = no name include filter.
215    #[serde(default)]
216    only: Vec<String>,
217    /// Skip sites whose name contains any of these substrings.
218    #[serde(default)]
219    exclude: Vec<String>,
220    /// Only sites carrying one of these tags. Empty = no tag filter.
221    /// Sites with no tags are excluded when this is non-empty.
222    #[serde(default)]
223    tag: Vec<String>,
224    /// Skip sites carrying any of these tags.
225    #[serde(default)]
226    exclude_tag: Vec<String>,
227    /// Restrict to ranked sites within the top N most-popular, sorted
228    /// by rank. Sites without a `popularity` rank are dropped.
229    #[serde(default)]
230    top: Option<u32>,
231    /// Include sites tagged `nsfw`. Default false — matches the CLI.
232    #[serde(default)]
233    nsfw: bool,
234    /// Optional per-scan concurrency override. Falls back to the
235    /// executor's default if omitted.
236    #[serde(default)]
237    concurrency: Option<std::num::NonZeroUsize>,
238    /// Optional total scan deadline in seconds.
239    #[serde(default)]
240    deadline_secs: Option<u64>,
241    /// Subset of the configured egress pool to use for this scan,
242    /// selected by `name`. Empty (or omitted) uses the full pool.
243    /// Unknown names → 400 `unknown_egress`. Sites whose access policy
244    /// can't be satisfied by the chosen subset land in
245    /// `Uncertain(geo_unavailable)` — the same honest verdict the engine
246    /// returns when a constrained policy can't be matched at all.
247    #[serde(default)]
248    egress_names: Vec<String>,
249}
250
251#[derive(Serialize)]
252struct StartScanResponse {
253    scan_id: ScanId,
254    username: String,
255    site_count: usize,
256}
257
258/// Apply per-scan name/tag/popularity filters to a catalog slice.
259///
260/// Mirrors [`adler_core::Registry::filter`] semantics but works on a
261/// `&[Site]` so it can compose with the catalog already filtered at
262/// server startup.
263fn filter_catalog(catalog: &[Site], req: &StartScanRequest) -> Vec<Site> {
264    let only_lc: Vec<String> = req.only.iter().map(|s| s.to_lowercase()).collect();
265    let exclude_lc: Vec<String> = req.exclude.iter().map(|s| s.to_lowercase()).collect();
266    let tag_set: std::collections::HashSet<&str> = req.tag.iter().map(String::as_str).collect();
267    let exclude_tag_set: std::collections::HashSet<&str> =
268        req.exclude_tag.iter().map(String::as_str).collect();
269
270    let mut filtered: Vec<Site> = catalog
271        .iter()
272        .filter(|s| {
273            let name_lc = s.name.to_lowercase();
274            if !only_lc.is_empty() && !only_lc.iter().any(|n| name_lc.contains(n)) {
275                return false;
276            }
277            if exclude_lc.iter().any(|n| name_lc.contains(n)) {
278                return false;
279            }
280            if !tag_set.is_empty() {
281                if s.tags.is_empty() {
282                    return false;
283                }
284                if !s.tags.iter().any(|t| tag_set.contains(t.as_str())) {
285                    return false;
286                }
287            }
288            if s.tags.iter().any(|t| exclude_tag_set.contains(t.as_str())) {
289                return false;
290            }
291            if !req.nsfw && s.tags.iter().any(|t| t == "nsfw") {
292                return false;
293            }
294            true
295        })
296        .cloned()
297        .collect();
298
299    if let Some(n) = req.top {
300        filtered.retain(|s| s.popularity.is_some_and(|p| p <= n));
301        filtered.sort_by_key(|s| s.popularity.unwrap_or(u32::MAX));
302    }
303    filtered
304}
305
306async fn start_scan(
307    State(state): State<AppState>,
308    Json(req): Json<StartScanRequest>,
309) -> Result<Json<StartScanResponse>, ApiError> {
310    let username = Username::new(req.username.clone())
311        .map_err(|e| ApiError::bad_request("invalid_username", e.to_string()))?;
312
313    let sites = filter_catalog(&state.sites, &req);
314    if sites.is_empty() {
315        return Err(ApiError::bad_request(
316            "empty_site_filter",
317            "no sites match the requested filter",
318        ));
319    }
320
321    // Validate per-scan egress subset (if any) against the configured
322    // pool. Unknown names are rejected at the boundary rather than
323    // silently dropping to "no egress matched" — that would make a
324    // user-facing typo look like a deeper config problem.
325    if !req.egress_names.is_empty() {
326        let known: std::collections::HashSet<String> =
327            state.client.egress_names().into_iter().collect();
328        let bad: Vec<&String> = req
329            .egress_names
330            .iter()
331            .filter(|n| !known.contains(n.as_str()))
332            .collect();
333        if !bad.is_empty() {
334            let names: Vec<&str> = bad.iter().map(|s| s.as_str()).collect();
335            return Err(ApiError::bad_request(
336                "unknown_egress",
337                format!("egress not in pool: {}", names.join(", ")),
338            ));
339        }
340    }
341
342    let mut options = ExecutorOptions::default();
343    if let Some(c) = req.concurrency {
344        options = options.concurrency(c);
345    }
346    if let Some(d) = req.deadline_secs {
347        options = options.deadline(Duration::from_secs(d));
348    }
349
350    let id = ScanId::new();
351    let site_count = sites.len();
352    let handle = ScanHandle::new(req.username.clone(), site_count, site_count.max(64));
353    state.insert_scan(id.clone(), handle.clone()).await;
354
355    let persist_ctx = state
356        .scans_dir
357        .as_ref()
358        .map(|dir| crate::scan::PersistContext {
359            scan_id: id.clone(),
360            dir: dir.clone(),
361        });
362
363    // Per-scan client: when egress_names is non-empty, swap the pool
364    // for a subset. The new Client shares all other state (throttle,
365    // sessions, budgets) with the parent. When egress_names is empty,
366    // skip the wrap entirely so the shared default client is re-used.
367    let scan_client: Arc<adler_core::Client> = if req.egress_names.is_empty() {
368        state.client.clone()
369    } else {
370        Arc::new(state.client.with_egress_subset(&req.egress_names))
371    };
372
373    let task = crate::scan::spawn(
374        handle,
375        scan_client,
376        Arc::from(sites.into_boxed_slice()),
377        username,
378        options,
379        persist_ctx,
380    );
381    state.register_scan_task(id.clone(), task).await;
382
383    Ok(Json(StartScanResponse {
384        scan_id: id,
385        username: req.username,
386        site_count,
387    }))
388}
389
390/// Body for `POST /api/scan/:id/refilter`.
391///
392/// Mirrors [`StartScanRequest`] minus the `username` (carried over from
393/// the existing scan). The active scan is cancelled and replaced with a
394/// fresh one driven by the new filter; outcomes for sites that appear
395/// in both the old and new site lists carry over unchanged, so the
396/// operator pays only for newly-in-scope sites.
397#[derive(Debug, Deserialize, Default)]
398struct RefilterRequest {
399    #[serde(default)]
400    only: Vec<String>,
401    #[serde(default)]
402    exclude: Vec<String>,
403    #[serde(default)]
404    tag: Vec<String>,
405    #[serde(default)]
406    exclude_tag: Vec<String>,
407    #[serde(default)]
408    top: Option<u32>,
409    #[serde(default)]
410    nsfw: bool,
411    #[serde(default)]
412    concurrency: Option<std::num::NonZeroUsize>,
413    #[serde(default)]
414    deadline_secs: Option<u64>,
415    #[serde(default)]
416    egress_names: Vec<String>,
417}
418
419impl From<&RefilterRequest> for StartScanRequest {
420    fn from(r: &RefilterRequest) -> Self {
421        Self {
422            username: String::new(), // filled in by caller; refilter reuses username from existing scan
423            only: r.only.clone(),
424            exclude: r.exclude.clone(),
425            tag: r.tag.clone(),
426            exclude_tag: r.exclude_tag.clone(),
427            top: r.top,
428            nsfw: r.nsfw,
429            concurrency: r.concurrency,
430            deadline_secs: r.deadline_secs,
431            egress_names: r.egress_names.clone(),
432        }
433    }
434}
435
436#[derive(Serialize)]
437struct RefilterResponse {
438    /// Fresh scan id. The SPA switches its SSE stream over to this id.
439    scan_id: ScanId,
440    /// Predecessor whose outcomes were carried into the new scan.
441    derived_from: ScanId,
442    /// Number of outcomes pre-populated from the predecessor (the
443    /// "overlap"). Zero when the new filter shares no completed sites
444    /// with the old.
445    carried_outcomes: usize,
446    /// Total site count for the new scan (`carried_outcomes` already
447    /// recorded + sites still to probe).
448    site_count: usize,
449}
450
451/// Cancel an in-flight scan and replace it with a successor driven by
452/// a new filter, carrying over outcomes for sites the two filters share.
453///
454/// Outcomes already on disk for the old scan stay there; nothing about
455/// the historic record is rewritten. The new scan is a fresh entry in
456/// `state.scans` with its own id. A finished scan can't be refiltered —
457/// just call `POST /api/scan` to start a fresh one instead.
458async fn refilter_scan(
459    State(state): State<AppState>,
460    AxumPath(id): AxumPath<String>,
461    Json(req): Json<RefilterRequest>,
462) -> Result<Json<RefilterResponse>, ApiError> {
463    let prev_id = ScanId::from(id);
464    let prev_handle = state
465        .get_scan(&prev_id)
466        .await
467        .ok_or_else(|| ApiError::not_found("scan_not_found", "no scan with that ID"))?;
468
469    if prev_handle.is_finished_now() {
470        return Err(ApiError::bad_request(
471            "scan_finished",
472            "scan has already finished; start a new one with POST /api/scan",
473        ));
474    }
475
476    // Pre-validate egress subset against the live pool. Same boundary
477    // check as `start_scan`; rejecting a typo before cancelling the
478    // running scan avoids "operator clicks Apply, scan dies, then sees
479    // the error" surprise.
480    if !req.egress_names.is_empty() {
481        let known: std::collections::HashSet<String> =
482            state.client.egress_names().into_iter().collect();
483        let bad: Vec<&String> = req
484            .egress_names
485            .iter()
486            .filter(|n| !known.contains(n.as_str()))
487            .collect();
488        if !bad.is_empty() {
489            let names: Vec<&str> = bad.iter().map(|s| s.as_str()).collect();
490            return Err(ApiError::bad_request(
491                "unknown_egress",
492                format!("egress not in pool: {}", names.join(", ")),
493            ));
494        }
495    }
496
497    // Resolve new filter against the catalog.
498    let start_shape = StartScanRequest::from(&req);
499    let new_sites = filter_catalog(&state.sites, &start_shape);
500    if new_sites.is_empty() {
501        return Err(ApiError::bad_request(
502            "empty_site_filter",
503            "no sites match the requested filter",
504        ));
505    }
506
507    // Snapshot the predecessor's outcomes; partition by whether the
508    // site is still in the new filter. Sites in both → carried over;
509    // sites in the new filter but not yet probed → spawn task probes
510    // them; sites only in the old filter → dropped (the operator
511    // narrowed scope deliberately).
512    let prev_outcomes = prev_handle.outcomes_snapshot().await;
513    let new_site_names: std::collections::HashSet<String> =
514        new_sites.iter().map(|s| s.name.clone()).collect();
515    let carried: Vec<adler_core::CheckOutcome> = prev_outcomes
516        .into_iter()
517        .filter(|o| new_site_names.contains(&o.site))
518        .collect();
519    let carried_names: std::collections::HashSet<String> =
520        carried.iter().map(|o| o.site.clone()).collect();
521    let sites_to_probe: Vec<Site> = new_sites
522        .iter()
523        .filter(|s| !carried_names.contains(&s.name))
524        .cloned()
525        .collect();
526
527    // Abort the predecessor *after* the snapshot so a probe that
528    // finishes between snapshot and abort can't sneak into the new
529    // scan as a duplicate.
530    state.abort_scan(&prev_id).await;
531
532    // Apply per-scan executor knobs.
533    let mut options = ExecutorOptions::default();
534    if let Some(c) = req.concurrency {
535        options = options.concurrency(c);
536    }
537    if let Some(d) = req.deadline_secs {
538        options = options.deadline(Duration::from_secs(d));
539    }
540
541    let username_str = prev_handle.username().to_owned();
542    let username = Username::new(username_str.clone())
543        .map_err(|e| ApiError::bad_request("invalid_username", e.to_string()))?;
544
545    let id = ScanId::new();
546    let site_count = new_sites.len();
547    let handle = ScanHandle::new(username_str.clone(), site_count, site_count.max(64));
548    state.insert_scan(id.clone(), handle.clone()).await;
549
550    // Pre-populate the new handle with the carried-over outcomes so a
551    // subscriber that connects after the refilter sees them
552    // immediately via the same `index N appended` events the
553    // executor produces.
554    handle.extend_outcomes(carried.clone()).await;
555
556    let persist_ctx = state
557        .scans_dir
558        .as_ref()
559        .map(|dir| crate::scan::PersistContext {
560            scan_id: id.clone(),
561            dir: dir.clone(),
562        });
563
564    let scan_client: Arc<adler_core::Client> = if req.egress_names.is_empty() {
565        state.client.clone()
566    } else {
567        Arc::new(state.client.with_egress_subset(&req.egress_names))
568    };
569
570    let task = crate::scan::spawn(
571        handle,
572        scan_client,
573        Arc::from(sites_to_probe.into_boxed_slice()),
574        username,
575        options,
576        persist_ctx,
577    );
578    state.register_scan_task(id.clone(), task).await;
579
580    Ok(Json(RefilterResponse {
581        scan_id: id,
582        derived_from: prev_id,
583        carried_outcomes: carried.len(),
584        site_count,
585    }))
586}
587
588/// Snapshot returned by `GET /api/scan/:id`.
589///
590/// Both variants carry `username` and `site_count` so the UI can
591/// render progress and breadcrumbs without cross-referencing the
592/// history endpoint.
593#[derive(Serialize)]
594#[serde(tag = "status", rename_all = "snake_case")]
595enum ScanSnapshot {
596    /// Scan is still running. Outcomes recorded so far are included so
597    /// a poller can render progress without holding an SSE stream open.
598    Running {
599        username: String,
600        site_count: usize,
601        elapsed_ms: u64,
602        partial: Vec<adler_core::CheckOutcome>,
603    },
604    /// Scan has completed; full aggregate.
605    Finished {
606        username: String,
607        site_count: usize,
608        #[serde(flatten)]
609        finished: FinishedScan,
610    },
611}
612
613async fn get_scan(
614    State(state): State<AppState>,
615    AxumPath(id): AxumPath<String>,
616) -> Result<Json<ScanSnapshot>, ApiError> {
617    let scan_id = ScanId::from(id);
618    if let Some(scan) = state.get_scan(&scan_id).await {
619        return Ok(match scan.finished().await {
620            Some(finished) => Json(ScanSnapshot::Finished {
621                username: scan.username().to_owned(),
622                site_count: scan.site_count(),
623                finished,
624            }),
625            None => Json(ScanSnapshot::Running {
626                username: scan.username().to_owned(),
627                site_count: scan.site_count(),
628                elapsed_ms: u64::try_from(scan.elapsed().as_millis()).unwrap_or(u64::MAX),
629                partial: scan.outcomes_snapshot().await,
630            }),
631        });
632    }
633    // Fall back to on-disk archive.
634    if let Some(dir) = &state.scans_dir {
635        if let Some(ps) = crate::persist::load(dir, &scan_id).await {
636            return Ok(Json(ScanSnapshot::Finished {
637                username: ps.username,
638                site_count: ps.site_count,
639                finished: crate::scan::FinishedScan {
640                    summary: ps.summary,
641                    outcomes: ps.outcomes,
642                    elapsed_ms: ps.elapsed_ms,
643                },
644            }));
645        }
646    }
647    Err(ApiError::not_found(
648        "scan_not_found",
649        "no scan with that ID",
650    ))
651}
652
653/// Boxed alias used by [`stream_scan`] to unify two same-Item streams
654/// (live broadcast vs. on-disk replay) under a single return type.
655type SseStream = std::pin::Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>;
656
657/// `POST /api/scan/:id/retry` — re-probe a single site from a
658/// finished scan and replace its outcome.
659#[derive(Debug, Deserialize)]
660struct RetryRequest {
661    /// Name of the site to re-probe (must match `Site::name`).
662    site: String,
663}
664
665#[derive(Serialize)]
666struct RetryResponse {
667    outcome: CheckOutcome,
668}
669
670async fn retry_site(
671    State(state): State<AppState>,
672    AxumPath(id): AxumPath<String>,
673    Json(req): Json<RetryRequest>,
674) -> Result<Json<RetryResponse>, ApiError> {
675    let scan_id = ScanId::from(id);
676
677    // Locate the username for this scan — in-memory first, then disk.
678    let username_raw: String = if let Some(handle) = state.get_scan(&scan_id).await {
679        handle.username().to_owned()
680    } else if let Some(dir) = &state.scans_dir {
681        if let Some(ps) = crate::persist::load(dir, &scan_id).await {
682            ps.username
683        } else {
684            return Err(ApiError::not_found(
685                "scan_not_found",
686                "no scan with that ID",
687            ));
688        }
689    } else {
690        return Err(ApiError::not_found(
691            "scan_not_found",
692            "no scan with that ID",
693        ));
694    };
695
696    let site = state
697        .sites
698        .iter()
699        .find(|s| s.name.eq_ignore_ascii_case(&req.site))
700        .cloned()
701        .ok_or_else(|| {
702            ApiError::bad_request("site_not_in_catalog", "site not in current catalog")
703        })?;
704
705    let username = Username::new(username_raw.clone())
706        .map_err(|e| ApiError::bad_request("invalid_username", e.to_string()))?;
707
708    let new_outcome = state.client.check(&site, &username).await;
709
710    // Update in-memory scan handle (if loaded) and re-persist.
711    if let Some(handle) = state.get_scan(&scan_id).await {
712        handle.replace_outcome(new_outcome.clone()).await;
713        if let (Some(finished), Some(dir)) = (handle.finished().await, &state.scans_dir) {
714            let snap = crate::persist::PersistedScan::from_finished(
715                scan_id.clone(),
716                handle.username().to_owned(),
717                handle.site_count(),
718                handle.created_at_ms(),
719                finished,
720            );
721            if let Err(err) = crate::persist::save(dir, &snap).await {
722                tracing::warn!(error = %err, scan_id = %scan_id, "failed to re-persist scan");
723            }
724        }
725    } else if let Some(dir) = &state.scans_dir {
726        // In-memory eviction already happened; patch the on-disk file.
727        if let Some(mut ps) = crate::persist::load(dir, &scan_id).await {
728            if let Some(slot) = ps.outcomes.iter_mut().find(|o| o.site == new_outcome.site) {
729                *slot = new_outcome.clone();
730            } else {
731                ps.outcomes.push(new_outcome.clone());
732            }
733            ps.summary = crate::scan::Summary::from_outcomes(&ps.outcomes);
734            if let Err(err) = crate::persist::save(dir, &ps).await {
735                tracing::warn!(error = %err, scan_id = %scan_id, "failed to patch persisted scan");
736            }
737        }
738    }
739
740    Ok(Json(RetryResponse {
741        outcome: new_outcome,
742    }))
743}
744
745async fn stream_scan(
746    State(state): State<AppState>,
747    AxumPath(id): AxumPath<String>,
748) -> Result<Sse<KeepAliveStream<SseStream>>, ApiError> {
749    let scan_id = ScanId::from(id);
750    if let Some(scan) = state.get_scan(&scan_id).await {
751        let stream: SseStream = Box::pin(scan_event_stream(scan));
752        return Ok(Sse::new(stream).keep_alive(KeepAlive::new()));
753    }
754    if let Some(dir) = &state.scans_dir {
755        if let Some(ps) = crate::persist::load(dir, &scan_id).await {
756            let stream: SseStream = Box::pin(persisted_event_stream(ps));
757            return Ok(Sse::new(stream).keep_alive(KeepAlive::new()));
758        }
759    }
760    Err(ApiError::not_found(
761        "scan_not_found",
762        "no scan with that ID",
763    ))
764}
765
766/// Build an SSE stream that replays a [`PersistedScan`] all-at-once
767/// then terminates. Mirrors [`scan_event_stream`]'s event types so the
768/// client side handles both cases identically.
769fn persisted_event_stream(
770    ps: crate::persist::PersistedScan,
771) -> impl Stream<Item = Result<Event, Infallible>> + Send {
772    let username = ps.username.clone();
773    let outcomes = ps.outcomes.clone();
774    let finished = crate::scan::FinishedScan {
775        summary: ps.summary,
776        outcomes: ps.outcomes,
777        elapsed_ms: ps.elapsed_ms,
778    };
779    stream! {
780        yield Ok(Event::default()
781            .event("start")
782            .json_data(StartEvent { username })
783            .unwrap_or_default());
784        for o in &outcomes {
785            yield Ok(outcome_event(o));
786        }
787        yield Ok(Event::default()
788            .event("done")
789            .json_data(&finished)
790            .unwrap_or_default());
791    }
792}
793
794/// Build the per-subscription SSE event stream.
795///
796/// Order: a `start` event, then every outcome already in history, then
797/// each newly-broadcast outcome live, then a final `done` event with
798/// the summary aggregate. The stream terminates after `done` so the
799/// client's `EventSource` closes cleanly.
800fn scan_event_stream(scan: ScanHandle) -> impl Stream<Item = Result<Event, Infallible>> {
801    stream! {
802        yield Ok(Event::default()
803            .event("start")
804            .json_data(StartEvent { username: scan.username().to_owned() })
805            .unwrap_or_default());
806
807        // Replay every outcome already recorded so a slightly late
808        // subscriber still gets a full picture.
809        let history = scan.outcomes_snapshot().await;
810        let mut last_index = history.len();
811        for outcome in &history {
812            yield Ok(outcome_event(outcome));
813        }
814
815        // If the scan finished before we subscribed, the broadcast
816        // channel is closed — skip the live loop and go straight to
817        // emitting `done`.
818        if scan.finished().await.is_none() {
819            let mut rx = scan.subscribe();
820            loop {
821                tokio::select! {
822                    biased;
823                    () = scan.wait_done() => break,
824                    recv = rx.recv() => match recv {
825                        Ok(idx) => {
826                            // Catch-up: deliver every outcome we haven't
827                            // emitted yet (handles a Lagged broadcast as
828                            // well — we re-snapshot the vec).
829                            let snap = scan.outcomes_snapshot().await;
830                            for outcome in &snap[last_index..=idx.min(snap.len().saturating_sub(1))] {
831                                yield Ok(outcome_event(outcome));
832                            }
833                            last_index = idx + 1;
834                        }
835                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
836                            // Re-snapshot and emit the gap.
837                            let snap = scan.outcomes_snapshot().await;
838                            for outcome in &snap[last_index..] {
839                                yield Ok(outcome_event(outcome));
840                            }
841                            last_index = snap.len();
842                        }
843                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
844                    }
845                }
846            }
847        }
848
849        // Emit any outcomes the live loop missed (shouldn't happen in
850        // practice, but cheap insurance).
851        let final_snap = scan.outcomes_snapshot().await;
852        for outcome in &final_snap[last_index..] {
853            yield Ok(outcome_event(outcome));
854        }
855
856        if let Some(finished) = scan.finished().await {
857            yield Ok(Event::default()
858                .event("done")
859                .json_data(&finished)
860                .unwrap_or_default());
861        }
862    }
863}
864
865fn outcome_event(outcome: &adler_core::CheckOutcome) -> Event {
866    Event::default()
867        .event("outcome")
868        .json_data(outcome)
869        .unwrap_or_default()
870}
871
/// JSON payload of the initial `start` SSE event.
#[derive(Serialize)]
struct StartEvent {
    /// Username of the scan the subscriber attached to.
    username: String,
}
876
/// JSON error envelope returned by failing handlers.
///
/// Serializes to `{ "error": "<code>", "message": "<human>" }`. The
/// `status` field is excluded from the JSON body and is used only as the
/// HTTP status of the response.
#[derive(Debug, Serialize)]
struct ApiError {
    /// HTTP status to respond with; never serialized.
    #[serde(skip)]
    status: StatusCode,
    /// Stable machine-readable error code.
    error: &'static str,
    /// Human-readable description of the failure.
    message: String,
}
885
886impl ApiError {
887    fn bad_request(code: &'static str, msg: impl Into<String>) -> Self {
888        Self {
889            status: StatusCode::BAD_REQUEST,
890            error: code,
891            message: msg.into(),
892        }
893    }
894
895    fn not_found(code: &'static str, msg: impl Into<String>) -> Self {
896        Self {
897            status: StatusCode::NOT_FOUND,
898            error: code,
899            message: msg.into(),
900        }
901    }
902}
903
904impl IntoResponse for ApiError {
905    fn into_response(self) -> Response {
906        let status = self.status;
907        (status, Json(self)).into_response()
908    }
909}
910
911#[cfg(test)]
912mod tests {
913    use super::*;
914    use adler_core::{Client, KnownPresent, Signal, UrlTemplate};
915    use axum::body::{Body, to_bytes};
916    use axum::http::{Request, header};
917    use tower::ServiceExt;
918    use wiremock::matchers::{any, path};
919    use wiremock::{Mock, MockServer, ResponseTemplate};
920
921    fn site(name: &str, base: &str, segment: &str) -> Site {
922        Site {
923            name: name.into(),
924            url: UrlTemplate::new(format!("{base}/{segment}/{{username}}")).unwrap(),
925            signals: vec![
926                Signal::StatusFound { codes: vec![200] },
927                Signal::StatusNotFound { codes: vec![404] },
928            ],
929            known_present: None::<KnownPresent>,
930            known_absent: None,
931            extract: Vec::new(),
932            tags: Vec::new(),
933            request_headers: std::collections::BTreeMap::new(),
934            regex_check: None,
935            engine: None,
936            strip_bad_char: None,
937            request_method: adler_core::HttpMethod::Get,
938            request_body: None,
939            protection: Vec::new(),
940            disabled: false,
941            disabled_reason: None,
942            source: None,
943            popularity: None,
944            access: adler_core::AccessPolicy::default(),
945        }
946    }
947
948    async fn test_app() -> (Router, MockServer) {
949        let mock = MockServer::start().await;
950        Mock::given(any())
951            .and(path("/a/alice"))
952            .respond_with(ResponseTemplate::new(200))
953            .mount(&mock)
954            .await;
955        Mock::given(any())
956            .and(path("/b/alice"))
957            .respond_with(ResponseTemplate::new(404))
958            .mount(&mock)
959            .await;
960        let sites = vec![site("A", &mock.uri(), "a"), site("B", &mock.uri(), "b")];
961        let client = Client::builder()
962            .timeout(Duration::from_secs(2))
963            .min_request_interval(Duration::ZERO)
964            .build()
965            .unwrap();
966        let state = AppState::new(sites, client, 16);
967        (router(state), mock)
968    }
969
970    #[tokio::test]
971    async fn health_returns_ok() {
972        let (app, _mock) = test_app().await;
973        let resp = app
974            .oneshot(
975                Request::builder()
976                    .uri("/api/health")
977                    .body(Body::empty())
978                    .unwrap(),
979            )
980            .await
981            .unwrap();
982        assert_eq!(resp.status(), StatusCode::OK);
983        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
984        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
985        assert_eq!(v["ok"], true);
986    }
987
988    #[tokio::test]
989    async fn list_sites_returns_summary() {
990        let (app, _mock) = test_app().await;
991        let resp = app
992            .oneshot(
993                Request::builder()
994                    .uri("/api/sites")
995                    .body(Body::empty())
996                    .unwrap(),
997            )
998            .await
999            .unwrap();
1000        assert_eq!(resp.status(), StatusCode::OK);
1001        let body = to_bytes(resp.into_body(), 4096).await.unwrap();
1002        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1003        assert_eq!(v.as_array().unwrap().len(), 2);
1004        assert_eq!(v[0]["name"], "A");
1005        assert!(v[0]["url"].as_str().unwrap().contains("{username}"));
1006    }
1007
1008    #[tokio::test]
1009    async fn list_access_empty_when_nothing_configured() {
1010        let (app, _mock) = test_app().await;
1011        let resp = app
1012            .oneshot(
1013                Request::builder()
1014                    .uri("/api/access")
1015                    .body(Body::empty())
1016                    .unwrap(),
1017            )
1018            .await
1019            .unwrap();
1020        assert_eq!(resp.status(), StatusCode::OK);
1021        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1022        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1023        assert_eq!(v["egress"].as_array().unwrap().len(), 0);
1024        assert_eq!(v["sessions"].as_array().unwrap().len(), 0);
1025    }
1026
    /// `GET /api/access` must expose egress names/countries/kinds and
    /// session names, but never proxy URLs, proxy credentials, or session
    /// header values.
    ///
    /// The client is configured with two egress entries (one carrying
    /// credentials in its URL) and one session holding a secret cookie;
    /// the raw response body is then searched for each secret substring.
    #[tokio::test]
    async fn list_access_surfaces_pool_and_sessions_without_secrets() {
        use adler_core::{EgressKind, EgressSpec, Session, SessionStore};
        let mock = MockServer::start().await;
        let sites = vec![site("A", &mock.uri(), "a")];

        let pool = vec![
            EgressSpec {
                url: "http://corp-proxy.invalid:8080".into(),
                country: adler_core::CountryCode::new("de"),
                kind: EgressKind::Datacenter,
                name: Some("corp-de".into()),
            },
            EgressSpec {
                url: "socks5://user:hunter2@residential.invalid:1080".into(),
                country: adler_core::CountryCode::new("us"),
                kind: EgressKind::Residential,
                name: Some("us-residential".into()),
            },
        ];
        let mut sessions = SessionStore::new();
        let mut hdr = std::collections::BTreeMap::new();
        hdr.insert("Cookie".into(), "sessionid=secret-token-do-not-leak".into());
        sessions.insert("instagram", Session::from_headers(hdr));

        let client = Client::builder()
            .timeout(Duration::from_secs(2))
            .min_request_interval(Duration::ZERO)
            .egress_pool(pool)
            .sessions(sessions)
            .build()
            .unwrap();
        let state = AppState::new(sites, client, 16);
        let app = router(state);

        let resp = app
            .oneshot(
                Request::builder()
                    .uri("/api/access")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
        let body = to_bytes(resp.into_body(), 4096).await.unwrap();
        // Keep the raw text around: the leak checks below are plain
        // substring searches over the whole response body.
        let raw = String::from_utf8(body.to_vec()).unwrap();
        // Negative assertions first — a regression here is the whole point
        // of the API design (no URLs, no header values reach the browser).
        assert!(
            !raw.contains("corp-proxy.invalid"),
            "proxy URLs must never leak into /api/access — got body: {raw}"
        );
        assert!(
            !raw.contains("residential.invalid"),
            "proxy URLs must never leak: {raw}"
        );
        assert!(
            !raw.contains("hunter2"),
            "proxy credentials must never leak: {raw}"
        );
        assert!(
            !raw.contains("secret-token-do-not-leak"),
            "session values must never leak: {raw}"
        );

        // Positive assertions: the non-secret metadata is present and in
        // the same order as the configured pool.
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let egress = v["egress"].as_array().unwrap();
        assert_eq!(egress.len(), 2);
        assert_eq!(egress[0]["name"], "corp-de");
        assert_eq!(egress[0]["country"], "de");
        assert_eq!(egress[0]["kind"], "datacenter");
        assert_eq!(egress[1]["name"], "us-residential");
        assert_eq!(egress[1]["country"], "us");
        assert_eq!(egress[1]["kind"], "residential");

        let sessions = v["sessions"].as_array().unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0]["name"], "instagram");
    }
1107
1108    #[tokio::test]
1109    async fn start_scan_rejects_unknown_egress_name() {
1110        use adler_core::{EgressKind, EgressSpec};
1111        let mock = MockServer::start().await;
1112        let sites = vec![site("A", &mock.uri(), "a")];
1113        let pool = vec![EgressSpec {
1114            url: "http://only-one.invalid:8080".into(),
1115            country: adler_core::CountryCode::new("de"),
1116            kind: EgressKind::Datacenter,
1117            name: Some("only-one".into()),
1118        }];
1119        let client = Client::builder()
1120            .timeout(Duration::from_secs(2))
1121            .min_request_interval(Duration::ZERO)
1122            .egress_pool(pool)
1123            .build()
1124            .unwrap();
1125        let app = router(AppState::new(sites, client, 16));
1126
1127        let resp = app
1128            .oneshot(
1129                Request::builder()
1130                    .method("POST")
1131                    .uri("/api/scan")
1132                    .header(header::CONTENT_TYPE, "application/json")
1133                    .body(Body::from(
1134                        r#"{"username":"alice","egress_names":["does-not-exist"]}"#,
1135                    ))
1136                    .unwrap(),
1137            )
1138            .await
1139            .unwrap();
1140        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1141        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1142        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1143        assert_eq!(v["error"], "unknown_egress");
1144        assert!(
1145            v["message"].as_str().unwrap().contains("does-not-exist"),
1146            "message should name the bad egress, got {}",
1147            v["message"]
1148        );
1149    }
1150
1151    #[tokio::test]
1152    async fn start_scan_accepts_known_egress_name() {
1153        use adler_core::{EgressKind, EgressSpec};
1154        let mock = MockServer::start().await;
1155        Mock::given(any())
1156            .and(path("/a/alice"))
1157            .respond_with(ResponseTemplate::new(200))
1158            .mount(&mock)
1159            .await;
1160        let sites = vec![site("A", &mock.uri(), "a")];
1161        let pool = vec![EgressSpec {
1162            url: "http://corp-de.invalid:8080".into(),
1163            country: adler_core::CountryCode::new("de"),
1164            kind: EgressKind::Datacenter,
1165            name: Some("corp-de".into()),
1166        }];
1167        let client = Client::builder()
1168            .timeout(Duration::from_secs(2))
1169            .min_request_interval(Duration::ZERO)
1170            .egress_pool(pool)
1171            .build()
1172            .unwrap();
1173        let app = router(AppState::new(sites, client, 16));
1174
1175        let resp = app
1176            .oneshot(
1177                Request::builder()
1178                    .method("POST")
1179                    .uri("/api/scan")
1180                    .header(header::CONTENT_TYPE, "application/json")
1181                    .body(Body::from(
1182                        r#"{"username":"alice","egress_names":["corp-de"]}"#,
1183                    ))
1184                    .unwrap(),
1185            )
1186            .await
1187            .unwrap();
1188        // Known egress name; the scan is accepted (the actual probe
1189        // outcome happens off-task and is checked elsewhere — this
1190        // assertion just covers the validation boundary).
1191        assert_eq!(resp.status(), StatusCode::OK);
1192        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1193        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1194        assert!(v["scan_id"].is_string());
1195    }
1196
1197    #[tokio::test]
1198    async fn start_scan_rejects_invalid_username() {
1199        let (app, _mock) = test_app().await;
1200        let resp = app
1201            .oneshot(
1202                Request::builder()
1203                    .method("POST")
1204                    .uri("/api/scan")
1205                    .header(header::CONTENT_TYPE, "application/json")
1206                    .body(Body::from(r#"{"username":" bad "}"#))
1207                    .unwrap(),
1208            )
1209            .await
1210            .unwrap();
1211        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1212        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1213        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1214        assert_eq!(v["error"], "invalid_username");
1215    }
1216
    /// Start a scan over the two-site test catalogue, then poll
    /// `GET /api/scan/{id}` until it reports `finished`.
    ///
    /// Expects one `found` (site A, 200) and one `not_found` (site B, 404)
    /// with two recorded outcomes. Panics if the scan has not finished
    /// after 50 polls spaced 100 ms apart.
    #[tokio::test]
    async fn start_then_poll_finishes_with_expected_counts() {
        let (app, _mock) = test_app().await;
        // `oneshot` consumes the router, so clone it for every request.
        let resp = app
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/scan")
                    .header(header::CONTENT_TYPE, "application/json")
                    .body(Body::from(r#"{"username":"alice"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
        let body = to_bytes(resp.into_body(), 4096).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let scan_id = v["scan_id"].as_str().unwrap().to_owned();
        assert_eq!(v["site_count"], 2);

        // Poll until finished, max ~5s.
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            let r = app
                .clone()
                .oneshot(
                    Request::builder()
                        .uri(format!("/api/scan/{scan_id}"))
                        .body(Body::empty())
                        .unwrap(),
                )
                .await
                .unwrap();
            // Every poll must answer 200, whether or not the scan is done.
            assert_eq!(r.status(), StatusCode::OK);
            let body = to_bytes(r.into_body(), 16384).await.unwrap();
            let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
            if v["status"] == "finished" {
                assert_eq!(v["summary"]["found"], 1);
                assert_eq!(v["summary"]["not_found"], 1);
                assert_eq!(v["outcomes"].as_array().unwrap().len(), 2);
                return;
            }
        }
        panic!("scan did not finish within 5s");
    }
1263
1264    #[tokio::test]
1265    async fn get_scan_404s_on_unknown_id() {
1266        let (app, _mock) = test_app().await;
1267        let resp = app
1268            .oneshot(
1269                Request::builder()
1270                    .uri("/api/scan/does-not-exist")
1271                    .body(Body::empty())
1272                    .unwrap(),
1273            )
1274            .await
1275            .unwrap();
1276        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1277        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1278        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1279        assert_eq!(v["error"], "scan_not_found");
1280    }
1281
1282    fn tagged_site(name: &str, base: &str, segment: &str, tags: &[&str]) -> Site {
1283        let mut s = site(name, base, segment);
1284        s.tags = tags.iter().map(|t| (*t).to_owned()).collect();
1285        s
1286    }
1287
1288    #[test]
1289    fn filter_catalog_honours_only_exclude() {
1290        let sites = vec![
1291            site("GitHub", "http://x", "gh"),
1292            site("GitLab", "http://x", "gl"),
1293            site("Bitbucket", "http://x", "bb"),
1294        ];
1295        let only = StartScanRequest {
1296            only: vec!["git".into()],
1297            ..Default::default()
1298        };
1299        let names: Vec<_> = filter_catalog(&sites, &only)
1300            .into_iter()
1301            .map(|s| s.name)
1302            .collect();
1303        assert_eq!(names, vec!["GitHub", "GitLab"]);
1304
1305        let exclude = StartScanRequest {
1306            exclude: vec!["lab".into()],
1307            ..Default::default()
1308        };
1309        let names: Vec<_> = filter_catalog(&sites, &exclude)
1310            .into_iter()
1311            .map(|s| s.name)
1312            .collect();
1313        assert_eq!(names, vec!["GitHub", "Bitbucket"]);
1314    }
1315
1316    #[test]
1317    fn filter_catalog_honours_tags_and_nsfw() {
1318        let sites = vec![
1319            tagged_site("A", "http://x", "a", &["social"]),
1320            tagged_site("B", "http://x", "b", &["dev"]),
1321            tagged_site("C", "http://x", "c", &["social", "nsfw"]),
1322            tagged_site("D", "http://x", "d", &[]),
1323        ];
1324        let only_social = StartScanRequest {
1325            tag: vec!["social".into()],
1326            ..Default::default()
1327        };
1328        // C has `nsfw` so default `nsfw=false` excludes it.
1329        let names: Vec<_> = filter_catalog(&sites, &only_social)
1330            .into_iter()
1331            .map(|s| s.name)
1332            .collect();
1333        assert_eq!(names, vec!["A"]);
1334
1335        let with_nsfw = StartScanRequest {
1336            tag: vec!["social".into()],
1337            nsfw: true,
1338            ..Default::default()
1339        };
1340        let names: Vec<_> = filter_catalog(&sites, &with_nsfw)
1341            .into_iter()
1342            .map(|s| s.name)
1343            .collect();
1344        assert_eq!(names, vec!["A", "C"]);
1345
1346        let exclude_dev = StartScanRequest {
1347            exclude_tag: vec!["dev".into()],
1348            ..Default::default()
1349        };
1350        // dev excluded → A, C (still no nsfw), D remain.
1351        let names: Vec<_> = filter_catalog(&sites, &exclude_dev)
1352            .into_iter()
1353            .map(|s| s.name)
1354            .collect();
1355        assert_eq!(names, vec!["A", "D"]);
1356    }
1357
1358    #[test]
1359    fn filter_catalog_top_sorts_by_popularity() {
1360        let mut a = site("A", "http://x", "a");
1361        a.popularity = Some(3);
1362        let mut b = site("B", "http://x", "b");
1363        b.popularity = Some(1);
1364        let mut c = site("C", "http://x", "c");
1365        c.popularity = Some(2);
1366        let d = site("D", "http://x", "d"); // no rank
1367        let sites = vec![a, b, c, d];
1368        let req = StartScanRequest {
1369            top: Some(2),
1370            ..Default::default()
1371        };
1372        let names: Vec<_> = filter_catalog(&sites, &req)
1373            .into_iter()
1374            .map(|s| s.name)
1375            .collect();
1376        assert_eq!(names, vec!["B", "C"]);
1377    }
1378
1379    #[tokio::test]
1380    async fn start_scan_with_tag_filter_only_runs_matching_sites() {
1381        let mock = MockServer::start().await;
1382        Mock::given(any())
1383            .and(path("/a/alice"))
1384            .respond_with(ResponseTemplate::new(200))
1385            .mount(&mock)
1386            .await;
1387        Mock::given(any())
1388            .and(path("/b/alice"))
1389            .respond_with(ResponseTemplate::new(404))
1390            .mount(&mock)
1391            .await;
1392        let sites = vec![
1393            tagged_site("A", &mock.uri(), "a", &["social"]),
1394            tagged_site("B", &mock.uri(), "b", &["dev"]),
1395        ];
1396        let client = Client::builder()
1397            .timeout(Duration::from_secs(2))
1398            .min_request_interval(Duration::ZERO)
1399            .build()
1400            .unwrap();
1401        let state = AppState::new(sites, client, 16);
1402        let app = router(state);
1403        let resp = app
1404            .clone()
1405            .oneshot(
1406                Request::builder()
1407                    .method("POST")
1408                    .uri("/api/scan")
1409                    .header(header::CONTENT_TYPE, "application/json")
1410                    .body(Body::from(r#"{"username":"alice","tag":["social"]}"#))
1411                    .unwrap(),
1412            )
1413            .await
1414            .unwrap();
1415        assert_eq!(resp.status(), StatusCode::OK);
1416        let body = to_bytes(resp.into_body(), 4096).await.unwrap();
1417        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1418        assert_eq!(v["site_count"], 1);
1419    }
1420
1421    #[tokio::test]
1422    async fn empty_filter_returns_bad_request() {
1423        let (app, _mock) = test_app().await;
1424        let resp = app
1425            .oneshot(
1426                Request::builder()
1427                    .method("POST")
1428                    .uri("/api/scan")
1429                    .header(header::CONTENT_TYPE, "application/json")
1430                    .body(Body::from(
1431                        r#"{"username":"alice","only":["definitely-not-a-site"]}"#,
1432                    ))
1433                    .unwrap(),
1434            )
1435            .await
1436            .unwrap();
1437        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1438        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1439        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1440        assert_eq!(v["error"], "empty_site_filter");
1441    }
1442
    /// Retrying a single site re-probes it and replaces the stored outcome.
    ///
    /// Flow: the first probe of site A gets a 404, so the scan finishes
    /// with `not_found == 1`. `POST /api/scan/{id}/retry` then probes
    /// again, gets a 200, and returns a `found` outcome. A final
    /// `GET /api/scan/{id}` confirms the summary was updated to
    /// `found == 1`, `not_found == 0`.
    #[tokio::test]
    async fn retry_flips_outcome_when_response_changes() {
        // First call returns 404 (one-shot via `up_to_n_times`); second
        // and later calls hit the longer-lived 200 mock that follows it.
        let mock = MockServer::start().await;
        Mock::given(any())
            .and(path("/a/alice"))
            .respond_with(ResponseTemplate::new(404))
            .up_to_n_times(1)
            .mount(&mock)
            .await;
        Mock::given(any())
            .and(path("/a/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&mock)
            .await;

        let sites = vec![site("A", &mock.uri(), "a")];
        let client = Client::builder()
            .timeout(Duration::from_secs(2))
            .min_request_interval(Duration::ZERO)
            .build()
            .unwrap();
        let state = AppState::new(sites, client, 16);
        let app = router(state);

        // Start the scan and remember its id.
        let r = app
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/scan")
                    .header(header::CONTENT_TYPE, "application/json")
                    .body(Body::from(r#"{"username":"alice"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        let body = to_bytes(r.into_body(), 4096).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let scan_id = v["scan_id"].as_str().unwrap().to_owned();

        // Wait for completion with NotFound for site A.
        // Up to 60 polls spaced 60 ms apart.
        let mut finished = false;
        for _ in 0..60 {
            tokio::time::sleep(Duration::from_millis(60)).await;
            let r = app
                .clone()
                .oneshot(
                    Request::builder()
                        .uri(format!("/api/scan/{scan_id}"))
                        .body(Body::empty())
                        .unwrap(),
                )
                .await
                .unwrap();
            let body = to_bytes(r.into_body(), 8192).await.unwrap();
            let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
            if v["status"] == "finished" {
                assert_eq!(v["summary"]["not_found"], 1);
                finished = true;
                break;
            }
        }
        assert!(finished, "scan did not finish");

        // Retry — should now hit the 200 mock and flip to found.
        let r = app
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri(format!("/api/scan/{scan_id}/retry"))
                    .header(header::CONTENT_TYPE, "application/json")
                    .body(Body::from(r#"{"site":"A"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(r.status(), StatusCode::OK);
        let body = to_bytes(r.into_body(), 4096).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert_eq!(v["outcome"]["site"], "A");
        assert_eq!(v["outcome"]["kind"], "found");

        // Persistent scan state reflects the new outcome.
        let r = app
            .oneshot(
                Request::builder()
                    .uri(format!("/api/scan/{scan_id}"))
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        let body = to_bytes(r.into_body(), 16384).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert_eq!(v["summary"]["found"], 1);
        assert_eq!(v["summary"]["not_found"], 0);
    }
1543
    /// Error paths of `POST /api/scan/{id}/retry`.
    ///
    /// - Unknown scan id: 404.
    /// - Known scan, but a site name that is not in the catalogue: 400
    ///   with `site_not_in_catalog`.
    ///
    /// NOTE(review): the test name says "404s" for both cases, but the
    /// unknown-site case asserts 400 — consider renaming.
    #[tokio::test]
    async fn retry_404s_unknown_site_or_scan() {
        let (app, _mock) = test_app().await;
        // Unknown scan.
        let r = app
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/scan/nope/retry")
                    .header(header::CONTENT_TYPE, "application/json")
                    .body(Body::from(r#"{"site":"A"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(r.status(), StatusCode::NOT_FOUND);

        // Start a scan, then ask to retry a site that isn't in the catalog.
        let r = app
            .clone()
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/api/scan")
                    .header(header::CONTENT_TYPE, "application/json")
                    .body(Body::from(r#"{"username":"alice"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        let body = to_bytes(r.into_body(), 4096).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let scan_id = v["scan_id"].as_str().unwrap().to_owned();
        let r = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri(format!("/api/scan/{scan_id}/retry"))
                    .header(header::CONTENT_TYPE, "application/json")
                    .body(Body::from(r#"{"site":"NoSuch"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(r.status(), StatusCode::BAD_REQUEST);
        let body = to_bytes(r.into_body(), 1024).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert_eq!(v["error"], "site_not_in_catalog");
    }
1594
    /// After starting two scans, `GET /api/scans` lists both, with the
    /// first entry's `started_at_ms` not older than the second's.
    #[tokio::test]
    async fn list_scans_returns_newest_first() {
        let (app, _mock) = test_app().await;
        // Kick off two scans.
        for _ in 0..2 {
            let r = app
                .clone()
                .oneshot(
                    Request::builder()
                        .method("POST")
                        .uri("/api/scan")
                        .header(header::CONTENT_TYPE, "application/json")
                        .body(Body::from(r#"{"username":"alice"}"#))
                        .unwrap(),
                )
                .await
                .unwrap();
            assert_eq!(r.status(), StatusCode::OK);
            // Yield so SystemTime moves forward between insertions.
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
        let resp = app
            .oneshot(
                Request::builder()
                    .uri("/api/scans")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
        let body = to_bytes(resp.into_body(), 4096).await.unwrap();
        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let arr = v.as_array().unwrap();
        assert_eq!(arr.len(), 2);
        // NOTE(review): this compares two `Option<u64>` values. If
        // `started_at_ms` were missing or not an unsigned integer, both
        // sides would be `None` and the assertion would pass vacuously —
        // consider unwrapping the values first.
        assert!(
            arr[0]["started_at_ms"].as_u64() >= arr[1]["started_at_ms"].as_u64(),
            "scans must be newest-first",
        );
    }
1635
1636    #[tokio::test]
1637    async fn refilter_404s_unknown_scan() {
1638        let (app, _mock) = test_app().await;
1639        let resp = app
1640            .oneshot(
1641                Request::builder()
1642                    .method("POST")
1643                    .uri("/api/scan/does-not-exist/refilter")
1644                    .header("content-type", "application/json")
1645                    .body(Body::from(r"{}"))
1646                    .unwrap(),
1647            )
1648            .await
1649            .unwrap();
1650        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1651    }
1652
1653    #[tokio::test]
1654    async fn refilter_rejects_finished_scan() {
1655        // Start a scan, wait for it to finish naturally (both sites
1656        // resolve from the mock instantly), then refilter — must
1657        // return 400 scan_finished.
1658        let (app, _mock) = test_app().await;
1659        let id = start_and_wait(&app, "alice").await;
1660        let resp = app
1661            .oneshot(
1662                Request::builder()
1663                    .method("POST")
1664                    .uri(format!("/api/scan/{id}/refilter"))
1665                    .header("content-type", "application/json")
1666                    .body(Body::from(r#"{"only":["A"]}"#))
1667                    .unwrap(),
1668            )
1669            .await
1670            .unwrap();
1671        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1672        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1673        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1674        assert_eq!(v["error"], "scan_finished");
1675    }
1676
1677    #[tokio::test]
1678    async fn refilter_rejects_empty_filter() {
1679        let (app, _mock) = test_app().await;
1680        let id = start_and_wait(&app, "alice").await;
1681        // Even with a finished predecessor, the empty-filter check
1682        // would fire before scan_finished — but here we get
1683        // scan_finished first. Use the live router with a custom
1684        // handle to actually exercise empty_site_filter. We instead
1685        // construct a fake running scan by inserting a never-ending
1686        // handle directly into AppState.
1687        let _ = id;
1688        let mock = MockServer::start().await;
1689        let sites = vec![site("A", &mock.uri(), "a"), site("B", &mock.uri(), "b")];
1690        let client = Client::builder()
1691            .timeout(Duration::from_secs(2))
1692            .min_request_interval(Duration::ZERO)
1693            .build()
1694            .unwrap();
1695        let state = AppState::new(sites, client, 16);
1696        let prev_id = ScanId::new();
1697        let handle = ScanHandle::new("bob", 2, 16);
1698        state.insert_scan(prev_id.clone(), handle).await;
1699        let app = router(state);
1700        let resp = app
1701            .oneshot(
1702                Request::builder()
1703                    .method("POST")
1704                    .uri(format!("/api/scan/{prev_id}/refilter"))
1705                    .header("content-type", "application/json")
1706                    // `only=Z` matches no site in the catalog (`A`, `B`).
1707                    .body(Body::from(r#"{"only":["Z"]}"#))
1708                    .unwrap(),
1709            )
1710            .await
1711            .unwrap();
1712        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1713        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1714        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1715        assert_eq!(v["error"], "empty_site_filter");
1716    }
1717
1718    #[tokio::test]
1719    async fn refilter_carries_overlap_and_returns_fresh_id() {
1720        // Synthesize a "running" predecessor whose handle already has
1721        // an outcome recorded for site A. Refiltering to `only=A`
1722        // means the new scan should carry A over (1 outcome) and have
1723        // 0 sites left to probe.
1724        let mock = MockServer::start().await;
1725        let sites = vec![site("A", &mock.uri(), "a"), site("B", &mock.uri(), "b")];
1726        let client = Client::builder()
1727            .timeout(Duration::from_secs(2))
1728            .min_request_interval(Duration::ZERO)
1729            .build()
1730            .unwrap();
1731        let state = AppState::new(sites, client, 16);
1732
1733        let prev_id = ScanId::new();
1734        let handle = ScanHandle::new("bob", 2, 16);
1735        // Inject a Found outcome for site A so the refilter has
1736        // something concrete to carry over.
1737        handle
1738            .extend_outcomes(vec![adler_core::CheckOutcome {
1739                site: "A".to_owned(),
1740                url: "https://a.test/bob".to_owned(),
1741                kind: adler_core::MatchKind::Found,
1742                reason: None,
1743                elapsed_ms: 12,
1744                evidence: Vec::new(),
1745                enrichment: std::collections::BTreeMap::new(),
1746                transport: None,
1747                escalations: 0,
1748            }])
1749            .await;
1750        state.insert_scan(prev_id.clone(), handle).await;
1751        let app = router(state.clone());
1752
1753        let resp = app
1754            .oneshot(
1755                Request::builder()
1756                    .method("POST")
1757                    .uri(format!("/api/scan/{prev_id}/refilter"))
1758                    .header("content-type", "application/json")
1759                    .body(Body::from(r#"{"only":["A"]}"#))
1760                    .unwrap(),
1761            )
1762            .await
1763            .unwrap();
1764        assert_eq!(resp.status(), StatusCode::OK);
1765        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1766        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1767        assert_eq!(v["carried_outcomes"], 1);
1768        assert_eq!(v["site_count"], 1);
1769        assert_eq!(v["derived_from"].as_str().unwrap(), prev_id.as_str());
1770        let new_id = v["scan_id"].as_str().unwrap();
1771        assert_ne!(new_id, prev_id.as_str(), "new scan must have a fresh id");
1772
1773        // The successor handle should hold the carried-over outcome
1774        // already, even before the spawn task gets a chance to run.
1775        let new_handle = state
1776            .get_scan(&ScanId::from(new_id.to_owned()))
1777            .await
1778            .expect("new handle registered");
1779        let snap = new_handle.outcomes_snapshot().await;
1780        assert_eq!(snap.len(), 1);
1781        assert_eq!(snap[0].site, "A");
1782    }
1783
1784    /// Test helper: start a scan and wait for it to finish. Returns
1785    /// the scan id as a string.
1786    async fn start_and_wait(app: &Router, username: &str) -> String {
1787        let resp = app
1788            .clone()
1789            .oneshot(
1790                Request::builder()
1791                    .method("POST")
1792                    .uri("/api/scan")
1793                    .header("content-type", "application/json")
1794                    .body(Body::from(
1795                        serde_json::json!({"username": username}).to_string(),
1796                    ))
1797                    .unwrap(),
1798            )
1799            .await
1800            .unwrap();
1801        assert_eq!(resp.status(), StatusCode::OK);
1802        let body = to_bytes(resp.into_body(), 1024).await.unwrap();
1803        let v: serde_json::Value = serde_json::from_slice(&body).unwrap();
1804        let id = v["scan_id"].as_str().unwrap().to_owned();
1805        // Poll status until finished (test mocks resolve instantly).
1806        for _ in 0..50 {
1807            let r = app
1808                .clone()
1809                .oneshot(
1810                    Request::builder()
1811                        .uri(format!("/api/scan/{id}"))
1812                        .body(Body::empty())
1813                        .unwrap(),
1814                )
1815                .await
1816                .unwrap();
1817            let b = to_bytes(r.into_body(), 4096).await.unwrap();
1818            let v: serde_json::Value = serde_json::from_slice(&b).unwrap();
1819            if v["status"] == "finished" {
1820                return id;
1821            }
1822            tokio::time::sleep(Duration::from_millis(20)).await;
1823        }
1824        panic!("scan {id} did not finish within ~1s");
1825    }
1826}