Skip to main content

adler_core/
executor.rs

1//! Concurrent fan-out runner for site probes.
2//!
//! Spawns one task per site and bounds the maximum in-flight count with a
//! [`Semaphore`]. Tasks are independent: a panic in one site only drops that
//! site's outcome, and a slow or hung site occupies just its own permit —
//! though without a deadline the scan as a whole cannot finish until every
//! probe does. When the global deadline (if any) is reached, sites still
//! queued or in flight surface as [`MatchKind::Uncertain`].
8
9use std::num::NonZeroUsize;
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::Semaphore;
14use tokio::task::JoinSet;
15use tokio::time::{Instant as TokioInstant, timeout_at};
16
17use crate::check::{CheckOutcome, MatchKind};
18use crate::client::Client;
19use crate::site::Site;
20use crate::username::Username;
21
/// Default concurrency for [`run`].
///
/// Most sites live on distinct hosts, so the per-host throttle seldom
/// serialises probes; network round-trips dominate, and 32 concurrent probes
/// keep the pipe busy without hammering any one host.
const DEFAULT_CONCURRENCY: NonZeroUsize = if let Some(n) = NonZeroUsize::new(32) {
    // `NonZeroUsize::new` hands back an `Option`; unpack it by hand so this
    // stays a plain const initializer. 32 is non-zero, so `else` never runs.
    n
} else {
    unreachable!()
};

/// Tunables for [`run`].
///
/// Start from [`Default`] and adjust with the chained setters below.
#[must_use = "ExecutorOptions does nothing until passed to executor::run"]
#[derive(Debug, Clone)]
pub struct ExecutorOptions {
    /// Upper bound on how many site probes may be in flight at once.
    pub concurrency: NonZeroUsize,
    /// Wall-clock budget for the whole scan, measured from the moment the
    /// scan starts. Sites still unfinished when it runs out are reported as
    /// [`MatchKind::Uncertain`]. `None` means the scan is unbounded.
    pub deadline: Option<Duration>,
}

impl Default for ExecutorOptions {
    /// [`DEFAULT_CONCURRENCY`] probes in flight and no overall deadline.
    fn default() -> Self {
        Self {
            deadline: None,
            concurrency: DEFAULT_CONCURRENCY,
        }
    }
}

impl ExecutorOptions {
    /// Replace the in-flight limit ([`Self::concurrency`]).
    pub fn concurrency(self, n: NonZeroUsize) -> Self {
        Self {
            concurrency: n,
            ..self
        }
    }

    /// Bound the entire scan by `d` of wall-clock time.
    pub fn deadline(self, d: Duration) -> Self {
        Self {
            deadline: Some(d),
            ..self
        }
    }
}
65
66/// Run a fan-out scan over `sites`, returning one outcome per site.
67///
68/// Results come back in completion order (not input order) — sort by name
69/// for stable presentation. A panicking site task is logged at `error` and
70/// silently dropped; transient HTTP failures already become
71/// [`MatchKind::Uncertain`] inside `Client::check`.
72pub async fn run(
73    client: &Client,
74    sites: &[Site],
75    username: &Username,
76    options: ExecutorOptions,
77) -> Vec<CheckOutcome> {
78    run_with_progress(client, sites, username, options, |_| {}).await
79}
80
81/// Variant of [`run`] that invokes `on_outcome` for each completed probe.
82///
83/// Useful for driving a live progress indicator or for emitting streaming
84/// output before the full scan finishes. The callback runs on the executor
85/// task between completions; long work inside it will throttle the loop.
86pub async fn run_with_progress<F>(
87    client: &Client,
88    sites: &[Site],
89    username: &Username,
90    options: ExecutorOptions,
91    mut on_outcome: F,
92) -> Vec<CheckOutcome>
93where
94    F: FnMut(&CheckOutcome),
95{
96    let semaphore = Arc::new(Semaphore::new(options.concurrency.get()));
97    let deadline_at = options.deadline.map(|d| TokioInstant::now() + d);
98    let mut set: JoinSet<CheckOutcome> = JoinSet::new();
99
100    for site in sites {
101        let site = site.clone();
102        let username = username.clone();
103        let client = client.clone();
104        let permits = Arc::clone(&semaphore);
105        set.spawn(async move {
106            let permit = match permits.acquire_owned().await {
107                Ok(p) => p,
108                Err(_closed) => {
109                    return CheckOutcome {
110                        site: site.name.clone(),
111                        url: site.url_for(&username),
112                        kind: MatchKind::Uncertain,
113                        reason: Some(crate::check::UncertainReason::SchedulerClosed),
114                        elapsed_ms: 0,
115                        enrichment: std::collections::BTreeMap::new(),
116                        evidence: Vec::new(),
117                        transport: None,
118                        escalations: 0,
119                    };
120                }
121            };
122            let probe = client.check(&site, &username);
123            let outcome = match deadline_at {
124                None => probe.await,
125                Some(at) => match timeout_at(at, probe).await {
126                    Ok(o) => o,
127                    Err(_elapsed) => CheckOutcome {
128                        site: site.name.clone(),
129                        url: site.url_for(&username),
130                        kind: MatchKind::Uncertain,
131                        reason: Some(crate::check::UncertainReason::Deadline),
132                        elapsed_ms: 0,
133                        enrichment: std::collections::BTreeMap::new(),
134                        evidence: Vec::new(),
135                        transport: None,
136                        escalations: 0,
137                    },
138                },
139            };
140            drop(permit);
141            outcome
142        });
143    }
144
145    let mut results = Vec::with_capacity(sites.len());
146    while let Some(joined) = set.join_next().await {
147        match joined {
148            Ok(outcome) => {
149                on_outcome(&outcome);
150                results.push(outcome);
151            }
152            Err(err) if err.is_cancelled() => {
153                tracing::warn!(error = %err, "check task cancelled");
154            }
155            Err(err) => {
156                tracing::error!(error = %err, "check task panicked");
157            }
158        }
159    }
160    results
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::site::{Signal, UrlTemplate};
    use wiremock::matchers::{any, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Build a site probing `<server>/<segment>/{username}`, classified by
    /// status code alone: 200 means Found, 404 means NotFound.
    ///
    /// Test sites are uniformly defined with a Found/NotFound status pair,
    /// matching how production sites.json migrates from Phase 1.
    fn site(server: &MockServer, name: &str, segment: &str) -> Site {
        Site {
            name: name.into(),
            url: UrlTemplate::new(format!("{}/{}/{{username}}", server.uri(), segment)).unwrap(),
            signals: vec![
                Signal::StatusFound { codes: vec![200] },
                Signal::StatusNotFound { codes: vec![404] },
            ],
            known_present: None,
            known_absent: None,
            extract: Vec::new(),
            tags: Vec::new(),
            request_headers: std::collections::BTreeMap::new(),
            regex_check: None,
            engine: None,
            strip_bad_char: None,
            request_method: crate::site::HttpMethod::Get,
            request_body: None,
            protection: Vec::new(),
            disabled: false,
            disabled_reason: None,
            source: None,
            popularity: None,
            access: crate::AccessPolicy::default(),
        }
    }

    fn fast_client() -> Client {
        Client::builder()
            .timeout(Duration::from_secs(5))
            // Tests share host 127.0.0.1 — disable throttling so concurrency
            // assertions actually exercise the executor.
            .min_request_interval(Duration::ZERO)
            .build()
            .unwrap()
    }

    fn opts_with_concurrency(n: usize) -> ExecutorOptions {
        ExecutorOptions::default().concurrency(NonZeroUsize::new(n).unwrap())
    }

    #[tokio::test]
    async fn runs_all_sites_concurrently() {
        let server = MockServer::start().await;

        Mock::given(any())
            .and(path("/a/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/b/alice"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/c/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let sites = vec![
            site(&server, "A", "a"),
            site(&server, "B", "b"),
            site(&server, "C", "c"),
        ];
        let user = Username::new("alice").unwrap();
        let mut out = run(&fast_client(), &sites, &user, opts_with_concurrency(4)).await;
        out.sort_by(|a, b| a.site.cmp(&b.site));

        assert_eq!(out.len(), 3);
        assert_eq!(out[0].kind, MatchKind::Found);
        assert_eq!(out[1].kind, MatchKind::NotFound);
        assert_eq!(out[2].kind, MatchKind::Found);
    }

    #[tokio::test]
    async fn respects_concurrency_limit() {
        let server = MockServer::start().await;
        for i in 0..6 {
            Mock::given(any())
                .and(path(format!("/{i}/alice")))
                .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_millis(50)))
                .mount(&server)
                .await;
        }
        let sites: Vec<Site> = (0..6)
            .map(|i| site(&server, &format!("S{i}"), &i.to_string()))
            .collect();
        let user = Username::new("alice").unwrap();
        let started = std::time::Instant::now();
        let out = run(&fast_client(), &sites, &user, opts_with_concurrency(2)).await;
        let elapsed = started.elapsed();
        assert_eq!(out.len(), 6);
        // 6 sites / 2 concurrent * 50 ms = 150 ms floor.
        assert!(
            elapsed >= Duration::from_millis(120),
            "expected ≥120 ms, got {elapsed:?}",
        );
    }

    #[tokio::test]
    async fn empty_input_returns_empty() {
        let user = Username::new("alice").unwrap();
        let out = run(&fast_client(), &[], &user, opts_with_concurrency(4)).await;
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn run_with_progress_invokes_callback_per_outcome() {
        let server = MockServer::start().await;
        Mock::given(any())
            .and(path("/a/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/b/alice"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;
        let sites = vec![site(&server, "A", "a"), site(&server, "B", "b")];
        let user = Username::new("alice").unwrap();
        // The callback is `FnMut`, so it can record into a local directly —
        // no lock needed.
        let mut seen = Vec::new();
        let outcomes = run_with_progress(
            &fast_client(),
            &sites,
            &user,
            opts_with_concurrency(4),
            |o| seen.push(o.site.clone()),
        )
        .await;
        assert_eq!(outcomes.len(), 2);
        // Every returned outcome was reported to the callback exactly once.
        let mut returned: Vec<_> = outcomes.iter().map(|o| o.site.clone()).collect();
        returned.sort();
        seen.sort();
        assert_eq!(seen, returned);
    }

    #[tokio::test]
    async fn deadline_marks_slow_sites_uncertain() {
        let server = MockServer::start().await;
        Mock::given(any())
            .and(path("/slow/alice"))
            .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2)))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/fast/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        let sites = vec![site(&server, "Slow", "slow"), site(&server, "Fast", "fast")];
        let user = Username::new("alice").unwrap();
        let options = ExecutorOptions::default()
            .concurrency(NonZeroUsize::new(4).unwrap())
            .deadline(Duration::from_millis(200));
        let started = std::time::Instant::now();
        let out = run(&fast_client(), &sites, &user, options).await;
        let elapsed = started.elapsed();

        assert_eq!(out.len(), 2);
        // Fast site completed; slow one hit the deadline.
        let fast = out.iter().find(|o| o.site == "Fast").unwrap();
        let slow = out.iter().find(|o| o.site == "Slow").unwrap();
        assert_eq!(fast.kind, MatchKind::Found);
        assert_eq!(slow.kind, MatchKind::Uncertain);
        assert_eq!(slow.reason, Some(crate::check::UncertainReason::Deadline));
        assert!(
            elapsed < Duration::from_millis(800),
            "scan should abort near the deadline, got {elapsed:?}",
        );
    }
}