Skip to main content

adler_core/
executor.rs

//! Concurrent fan-out runner for site probes.
//!
//! Spawns one task per site and bounds the number of in-flight probes with a
//! [`Semaphore`]. Tasks are independent: a task that panics is logged and
//! contributes no outcome, and a slow site ties up only its own permit rather
//! than stalling the whole scan. When a global deadline is set, tasks give up
//! once it is reached and the affected sites are reported as
//! [`MatchKind::Uncertain`].
8
9use std::num::NonZeroUsize;
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::Semaphore;
14use tokio::task::JoinSet;
15use tokio::time::{Instant as TokioInstant, timeout_at};
16
17use crate::check::{CheckOutcome, MatchKind};
18use crate::client::Client;
19use crate::confidence::ConfidenceScore;
20use crate::site::Site;
21use crate::username::Username;
22
/// Number of probes [`run`] keeps in flight when the caller does not pick one.
///
/// Most sites live on distinct hosts, so the per-host throttle rarely forces
/// probes to queue; the limiting factor is network round-trip time. Thirty-two
/// concurrent probes keep the pipe busy without hammering any single host.
const DEFAULT_CONCURRENCY: NonZeroUsize = if let Some(n) = NonZeroUsize::new(32) {
    n
} else {
    // 32 is a non-zero literal, so this branch can never be taken.
    unreachable!()
};

/// Tunables for [`run`] and [`run_with_progress`].
///
/// Start from `ExecutorOptions::default()` and refine it with the chained
/// setters [`ExecutorOptions::concurrency`] and [`ExecutorOptions::deadline`].
#[derive(Clone, Debug)]
#[must_use = "ExecutorOptions does nothing until passed to executor::run"]
pub struct ExecutorOptions {
    /// Upper bound on the number of site probes running at the same time.
    pub concurrency: NonZeroUsize,
    /// Optional wall-clock budget for the whole scan, measured from the start
    /// of the executor call. Sites cut off by it are reported as
    /// [`MatchKind::Uncertain`]. `None` means the scan is unbounded.
    pub deadline: Option<Duration>,
}

impl Default for ExecutorOptions {
    /// `DEFAULT_CONCURRENCY` in-flight probes and no overall deadline.
    fn default() -> Self {
        Self {
            deadline: None,
            concurrency: DEFAULT_CONCURRENCY,
        }
    }
}

impl ExecutorOptions {
    /// Consume these options and return them with [`Self::concurrency`] set to `n`.
    pub fn concurrency(self, n: NonZeroUsize) -> Self {
        Self {
            concurrency: n,
            ..self
        }
    }

    /// Consume these options and return them with a total scan deadline of `d`.
    pub fn deadline(self, d: Duration) -> Self {
        Self {
            deadline: Some(d),
            ..self
        }
    }
}
66
67/// Run a fan-out scan over `sites`, returning one outcome per site.
68///
69/// Results come back in completion order (not input order) — sort by name
70/// for stable presentation. A panicking site task is logged at `error` and
71/// silently dropped; transient HTTP failures already become
72/// [`MatchKind::Uncertain`] inside `Client::check`.
73pub async fn run(
74    client: &Client,
75    sites: &[Site],
76    username: &Username,
77    options: ExecutorOptions,
78) -> Vec<CheckOutcome> {
79    run_with_progress(client, sites, username, options, |_| {}).await
80}
81
82/// Variant of [`run`] that invokes `on_outcome` for each completed probe.
83///
84/// Useful for driving a live progress indicator or for emitting streaming
85/// output before the full scan finishes. The callback runs on the executor
86/// task between completions; long work inside it will throttle the loop.
87pub async fn run_with_progress<F>(
88    client: &Client,
89    sites: &[Site],
90    username: &Username,
91    options: ExecutorOptions,
92    mut on_outcome: F,
93) -> Vec<CheckOutcome>
94where
95    F: FnMut(&CheckOutcome),
96{
97    let semaphore = Arc::new(Semaphore::new(options.concurrency.get()));
98    let deadline_at = options.deadline.map(|d| TokioInstant::now() + d);
99    let mut set: JoinSet<CheckOutcome> = JoinSet::new();
100
101    for site in sites {
102        let site = site.clone();
103        let username = username.clone();
104        let client = client.clone();
105        let permits = Arc::clone(&semaphore);
106        set.spawn(async move {
107            let permit = match permits.acquire_owned().await {
108                Ok(p) => p,
109                Err(_closed) => {
110                    let reason = crate::check::UncertainReason::SchedulerClosed;
111                    return CheckOutcome {
112                        site: site.name.clone(),
113                        url: site.url_for(&username),
114                        kind: MatchKind::Uncertain,
115                        reason: Some(reason.clone()),
116                        elapsed_ms: 0,
117                        enrichment: std::collections::BTreeMap::new(),
118                        evidence: Vec::new(),
119                        profile_evidence: Vec::new(),
120                        confidence: ConfidenceScore::from_parts(
121                            MatchKind::Uncertain,
122                            Some(&reason),
123                            0,
124                            0,
125                        ),
126                        transport: None,
127                        escalations: 0,
128                    };
129                }
130            };
131            let probe = client.check(&site, &username);
132            let outcome = match deadline_at {
133                None => probe.await,
134                Some(at) => match timeout_at(at, probe).await {
135                    Ok(o) => o,
136                    Err(_elapsed) => {
137                        let reason = crate::check::UncertainReason::Deadline;
138                        CheckOutcome {
139                            site: site.name.clone(),
140                            url: site.url_for(&username),
141                            kind: MatchKind::Uncertain,
142                            reason: Some(reason.clone()),
143                            elapsed_ms: 0,
144                            enrichment: std::collections::BTreeMap::new(),
145                            evidence: Vec::new(),
146                            profile_evidence: Vec::new(),
147                            confidence: ConfidenceScore::from_parts(
148                                MatchKind::Uncertain,
149                                Some(&reason),
150                                0,
151                                0,
152                            ),
153                            transport: None,
154                            escalations: 0,
155                        }
156                    }
157                },
158            };
159            drop(permit);
160            outcome
161        });
162    }
163
164    let mut results = Vec::with_capacity(sites.len());
165    while let Some(joined) = set.join_next().await {
166        match joined {
167            Ok(outcome) => {
168                on_outcome(&outcome);
169                results.push(outcome);
170            }
171            Err(err) if err.is_cancelled() => {
172                tracing::warn!(error = %err, "check task cancelled");
173            }
174            Err(err) => {
175                tracing::error!(error = %err, "check task panicked");
176            }
177        }
178    }
179    results
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::site::Signal;
    use crate::test_fixtures::{default_site, test_client_builder};
    use wiremock::matchers::{any, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Test sites are uniformly defined with a Found/NotFound status pair,
    /// matching how production sites.json migrates from Phase 1.
    fn site(server: &MockServer, name: &str, segment: &str) -> Site {
        let mut s = default_site(name, &format!("{}/{}/{{username}}", server.uri(), segment));
        s.signals = vec![
            Signal::StatusFound { codes: vec![200] },
            Signal::StatusNotFound { codes: vec![404] },
        ];
        s
    }

    /// Wider timeout than the default test client — executor tests fan out
    /// several concurrent mock calls and the 2s default is too tight on a
    /// loaded CI runner.
    fn fast_client() -> Client {
        test_client_builder()
            .timeout(Duration::from_secs(5))
            .build()
            .expect("fast_client builds")
    }

    fn opts_with_concurrency(n: usize) -> ExecutorOptions {
        ExecutorOptions::default().concurrency(NonZeroUsize::new(n).unwrap())
    }

    #[tokio::test]
    async fn runs_all_sites_concurrently() {
        let server = MockServer::start().await;

        Mock::given(any())
            .and(path("/a/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/b/alice"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/c/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let sites = vec![
            site(&server, "A", "a"),
            site(&server, "B", "b"),
            site(&server, "C", "c"),
        ];
        let user = Username::new("alice").unwrap();
        let mut out = run(&fast_client(), &sites, &user, opts_with_concurrency(4)).await;
        out.sort_by(|a, b| a.site.cmp(&b.site));

        assert_eq!(out.len(), 3);
        assert_eq!(out[0].kind, MatchKind::Found);
        assert_eq!(out[1].kind, MatchKind::NotFound);
        assert_eq!(out[2].kind, MatchKind::Found);
    }

    #[tokio::test]
    async fn respects_concurrency_limit() {
        let server = MockServer::start().await;
        for i in 0..6 {
            Mock::given(any())
                .and(path(format!("/{i}/alice")))
                .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_millis(50)))
                .mount(&server)
                .await;
        }
        let sites: Vec<Site> = (0..6)
            .map(|i| site(&server, &format!("S{i}"), &i.to_string()))
            .collect();
        let user = Username::new("alice").unwrap();
        let started = std::time::Instant::now();
        let out = run(&fast_client(), &sites, &user, opts_with_concurrency(2)).await;
        let elapsed = started.elapsed();
        assert_eq!(out.len(), 6);
        // 6 sites / 2 concurrent * 50 ms = 150 ms floor; assert a looser
        // 120 ms so timer granularity on CI cannot make the test flaky.
        assert!(
            elapsed >= Duration::from_millis(120),
            "expected ≥120 ms, got {elapsed:?}",
        );
    }

    #[tokio::test]
    async fn empty_input_returns_empty() {
        let user = Username::new("alice").unwrap();
        let out = run(&fast_client(), &[], &user, opts_with_concurrency(4)).await;
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn run_with_progress_invokes_callback_per_outcome() {
        let server = MockServer::start().await;
        Mock::given(any())
            .and(path("/a/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/b/alice"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;
        let sites = vec![site(&server, "A", "a"), site(&server, "B", "b")];
        let user = Username::new("alice").unwrap();
        // The callback is `FnMut`, so it can mutate captured state directly —
        // no lock is needed.
        let mut seen = Vec::new();
        let outcomes = run_with_progress(
            &fast_client(),
            &sites,
            &user,
            opts_with_concurrency(4),
            |o| seen.push(o.site.clone()),
        )
        .await;
        assert_eq!(outcomes.len(), 2);
        assert_eq!(seen.len(), 2);

        // The callback must have seen exactly the outcomes that were returned.
        let mut returned: Vec<_> = outcomes.iter().map(|o| o.site.clone()).collect();
        seen.sort();
        returned.sort();
        assert_eq!(seen, returned);
    }

    #[tokio::test]
    async fn deadline_marks_slow_sites_uncertain() {
        let server = MockServer::start().await;
        Mock::given(any())
            .and(path("/slow/alice"))
            .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2)))
            .mount(&server)
            .await;
        Mock::given(any())
            .and(path("/fast/alice"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;
        let sites = vec![site(&server, "Slow", "slow"), site(&server, "Fast", "fast")];
        let user = Username::new("alice").unwrap();
        let options = ExecutorOptions::default()
            .concurrency(NonZeroUsize::new(4).unwrap())
            .deadline(Duration::from_millis(200));
        let started = std::time::Instant::now();
        let out = run(&fast_client(), &sites, &user, options).await;
        let elapsed = started.elapsed();

        assert_eq!(out.len(), 2);
        // Fast site completed; slow one hit the deadline.
        let fast = out.iter().find(|o| o.site == "Fast").unwrap();
        let slow = out.iter().find(|o| o.site == "Slow").unwrap();
        assert_eq!(fast.kind, MatchKind::Found);
        assert_eq!(slow.kind, MatchKind::Uncertain);
        assert_eq!(slow.reason, Some(crate::check::UncertainReason::Deadline));
        assert!(
            elapsed < Duration::from_millis(800),
            "scan should abort near the deadline, got {elapsed:?}",
        );
    }
}