1use std::num::NonZeroUsize;
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::Semaphore;
14use tokio::task::JoinSet;
15use tokio::time::{Instant as TokioInstant, timeout_at};
16
17use crate::check::{CheckOutcome, MatchKind};
18use crate::client::Client;
19use crate::confidence::ConfidenceScore;
20use crate::site::Site;
21use crate::username::Username;
22
/// Concurrency limit used by `ExecutorOptions::default()`: 32 simultaneous checks.
///
/// `NonZeroUsize::new` yields an `Option`, so the value is unwrapped at compile time;
/// the fallback branch can never be taken because the literal is non-zero.
const DEFAULT_CONCURRENCY: NonZeroUsize = if let Some(limit) = NonZeroUsize::new(32) {
    limit
} else {
    unreachable!()
};
32
/// Tuning knobs for a scan executed by [`run`] or [`run_with_progress`].
///
/// Start from [`Default::default`] and either assign the public fields directly or
/// chain the by-value setters of the same names.
#[must_use = "ExecutorOptions does nothing until passed to executor::run"]
#[derive(Clone, Debug)]
pub struct ExecutorOptions {
    /// Maximum number of site checks allowed to be in flight at the same time.
    pub concurrency: NonZeroUsize,
    /// Optional time budget for the whole scan, counted from the moment the executor
    /// starts. `None` means the executor itself never cuts a check short.
    pub deadline: Option<Duration>,
}
43
44impl Default for ExecutorOptions {
45 fn default() -> Self {
46 Self {
47 concurrency: DEFAULT_CONCURRENCY,
48 deadline: None,
49 }
50 }
51}
52
53impl ExecutorOptions {
54 pub fn concurrency(mut self, n: NonZeroUsize) -> Self {
56 self.concurrency = n;
57 self
58 }
59
60 pub fn deadline(mut self, d: Duration) -> Self {
62 self.deadline = Some(d);
63 self
64 }
65}
66
67pub async fn run(
74 client: &Client,
75 sites: &[Site],
76 username: &Username,
77 options: ExecutorOptions,
78) -> Vec<CheckOutcome> {
79 run_with_progress(client, sites, username, options, |_| {}).await
80}
81
82pub async fn run_with_progress<F>(
88 client: &Client,
89 sites: &[Site],
90 username: &Username,
91 options: ExecutorOptions,
92 mut on_outcome: F,
93) -> Vec<CheckOutcome>
94where
95 F: FnMut(&CheckOutcome),
96{
97 let semaphore = Arc::new(Semaphore::new(options.concurrency.get()));
98 let deadline_at = options.deadline.map(|d| TokioInstant::now() + d);
99 let mut set: JoinSet<CheckOutcome> = JoinSet::new();
100
101 for site in sites {
102 let site = site.clone();
103 let username = username.clone();
104 let client = client.clone();
105 let permits = Arc::clone(&semaphore);
106 set.spawn(async move {
107 let permit = match permits.acquire_owned().await {
108 Ok(p) => p,
109 Err(_closed) => {
110 let reason = crate::check::UncertainReason::SchedulerClosed;
111 return CheckOutcome {
112 site: site.name.clone(),
113 url: site.url_for(&username),
114 kind: MatchKind::Uncertain,
115 reason: Some(reason.clone()),
116 elapsed_ms: 0,
117 enrichment: std::collections::BTreeMap::new(),
118 evidence: Vec::new(),
119 profile_evidence: Vec::new(),
120 confidence: ConfidenceScore::from_parts(
121 MatchKind::Uncertain,
122 Some(&reason),
123 0,
124 0,
125 ),
126 transport: None,
127 escalations: 0,
128 };
129 }
130 };
131 let probe = client.check(&site, &username);
132 let outcome = match deadline_at {
133 None => probe.await,
134 Some(at) => match timeout_at(at, probe).await {
135 Ok(o) => o,
136 Err(_elapsed) => {
137 let reason = crate::check::UncertainReason::Deadline;
138 CheckOutcome {
139 site: site.name.clone(),
140 url: site.url_for(&username),
141 kind: MatchKind::Uncertain,
142 reason: Some(reason.clone()),
143 elapsed_ms: 0,
144 enrichment: std::collections::BTreeMap::new(),
145 evidence: Vec::new(),
146 profile_evidence: Vec::new(),
147 confidence: ConfidenceScore::from_parts(
148 MatchKind::Uncertain,
149 Some(&reason),
150 0,
151 0,
152 ),
153 transport: None,
154 escalations: 0,
155 }
156 }
157 },
158 };
159 drop(permit);
160 outcome
161 });
162 }
163
164 let mut results = Vec::with_capacity(sites.len());
165 while let Some(joined) = set.join_next().await {
166 match joined {
167 Ok(outcome) => {
168 on_outcome(&outcome);
169 results.push(outcome);
170 }
171 Err(err) if err.is_cancelled() => {
172 tracing::warn!(error = %err, "check task cancelled");
173 }
174 Err(err) => {
175 tracing::error!(error = %err, "check task panicked");
176 }
177 }
178 }
179 results
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::site::Signal;
    use crate::test_fixtures::{default_site, test_client_builder};
    use wiremock::matchers::{any, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds a site served by `server` at `/<segment>/{username}` whose signals map
    /// HTTP 200 to `StatusFound` and HTTP 404 to `StatusNotFound`.
    fn site(server: &MockServer, name: &str, segment: &str) -> Site {
        let mut s = default_site(name, &format!("{}/{}/{{username}}", server.uri(), segment));
        s.signals = vec![
            Signal::StatusFound { codes: vec![200] },
            Signal::StatusNotFound { codes: vec![404] },
        ];
        s
    }

    /// Client from the shared test builder with a 5-second request timeout.
    fn fast_client() -> Client {
        test_client_builder()
            .timeout(Duration::from_secs(5))
            .build()
            .expect("fast_client builds")
    }

    /// Default options with the concurrency limit set to `n` (`n` must be non-zero).
    fn opts_with_concurrency(n: usize) -> ExecutorOptions {
        ExecutorOptions::default().concurrency(NonZeroUsize::new(n).unwrap())
    }

    /// Registers a mock on `server` that answers any request to exactly `route` with
    /// `response`.
    async fn mount(server: &MockServer, route: &str, response: ResponseTemplate) {
        Mock::given(any())
            .and(path(route))
            .respond_with(response)
            .mount(server)
            .await;
    }

    #[tokio::test]
    async fn runs_all_sites_concurrently() {
        let server = MockServer::start().await;
        mount(&server, "/a/alice", ResponseTemplate::new(200)).await;
        mount(&server, "/b/alice", ResponseTemplate::new(404)).await;
        mount(&server, "/c/alice", ResponseTemplate::new(200)).await;

        let sites = vec![
            site(&server, "A", "a"),
            site(&server, "B", "b"),
            site(&server, "C", "c"),
        ];
        let user = Username::new("alice").unwrap();
        let mut out = run(&fast_client(), &sites, &user, opts_with_concurrency(4)).await;
        // Outcomes arrive in completion order; sort so the positional asserts are stable.
        out.sort_by(|a, b| a.site.cmp(&b.site));

        assert_eq!(out.len(), 3);
        assert_eq!(out[0].kind, MatchKind::Found);
        assert_eq!(out[1].kind, MatchKind::NotFound);
        assert_eq!(out[2].kind, MatchKind::Found);
    }

    #[tokio::test]
    async fn respects_concurrency_limit() {
        let server = MockServer::start().await;
        for i in 0..6 {
            let delayed = ResponseTemplate::new(200).set_delay(Duration::from_millis(50));
            mount(&server, &format!("/{i}/alice"), delayed).await;
        }
        let sites: Vec<Site> = (0..6)
            .map(|i| site(&server, &format!("S{i}"), &i.to_string()))
            .collect();
        let user = Username::new("alice").unwrap();
        let started = std::time::Instant::now();
        let out = run(&fast_client(), &sites, &user, opts_with_concurrency(2)).await;
        let elapsed = started.elapsed();
        assert_eq!(out.len(), 6);
        // Six 50 ms responses taken two at a time need three waves (~150 ms); the
        // 120 ms floor leaves slack while still failing if all six ran at once.
        assert!(
            elapsed >= Duration::from_millis(120),
            "expected ≥120 ms, got {elapsed:?}",
        );
    }

    #[tokio::test]
    async fn empty_input_returns_empty() {
        let user = Username::new("alice").unwrap();
        let out = run(&fast_client(), &[], &user, opts_with_concurrency(4)).await;
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn run_with_progress_invokes_callback_per_outcome() {
        let server = MockServer::start().await;
        mount(&server, "/a/alice", ResponseTemplate::new(200)).await;
        mount(&server, "/b/alice", ResponseTemplate::new(404)).await;

        let sites = vec![site(&server, "A", "a"), site(&server, "B", "b")];
        let user = Username::new("alice").unwrap();
        // The callback is `FnMut`, so it can bump a plain local counter; no lock needed.
        let mut calls = 0_usize;
        let outcomes = run_with_progress(
            &fast_client(),
            &sites,
            &user,
            opts_with_concurrency(4),
            |_| calls += 1,
        )
        .await;
        assert_eq!(outcomes.len(), 2);
        assert_eq!(calls, 2);
    }

    #[tokio::test]
    async fn deadline_marks_slow_sites_uncertain() {
        let server = MockServer::start().await;
        mount(
            &server,
            "/slow/alice",
            ResponseTemplate::new(200).set_delay(Duration::from_secs(2)),
        )
        .await;
        mount(&server, "/fast/alice", ResponseTemplate::new(200)).await;

        let sites = vec![site(&server, "Slow", "slow"), site(&server, "Fast", "fast")];
        let user = Username::new("alice").unwrap();
        let options = ExecutorOptions::default()
            .concurrency(NonZeroUsize::new(4).unwrap())
            .deadline(Duration::from_millis(200));
        let started = std::time::Instant::now();
        let out = run(&fast_client(), &sites, &user, options).await;
        let elapsed = started.elapsed();

        assert_eq!(out.len(), 2);
        // Look outcomes up by name; their order in `out` depends on completion order.
        let fast = out.iter().find(|o| o.site == "Fast").unwrap();
        let slow = out.iter().find(|o| o.site == "Slow").unwrap();
        assert_eq!(fast.kind, MatchKind::Found);
        assert_eq!(slow.kind, MatchKind::Uncertain);
        assert_eq!(slow.reason, Some(crate::check::UncertainReason::Deadline));
        assert!(
            elapsed < Duration::from_millis(800),
            "scan should abort near the deadline, got {elapsed:?}",
        );
    }
}