1use std::num::NonZeroUsize;
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::Semaphore;
14use tokio::task::JoinSet;
15use tokio::time::{Instant as TokioInstant, timeout_at};
16
17use crate::check::{CheckOutcome, MatchKind};
18use crate::client::Client;
19use crate::site::Site;
20use crate::username::Username;
21
/// Number of simultaneous checks used by `ExecutorOptions::default()`.
const DEFAULT_CONCURRENCY: NonZeroUsize = {
    // `NonZeroUsize::new` is the safe constructor usable in const context;
    // 32 is non-zero, so the `else` arm can never be taken.
    if let Some(limit) = NonZeroUsize::new(32) {
        limit
    } else {
        unreachable!()
    }
};
31
/// Tuning knobs for a scan executed by `run` / `run_with_progress`.
#[derive(Clone, Debug)]
#[must_use = "ExecutorOptions does nothing until passed to executor::run"]
pub struct ExecutorOptions {
    /// Maximum number of site checks allowed to run at the same time
    /// (used as the size of the executor's semaphore).
    pub concurrency: NonZeroUsize,
    /// Optional time budget for the whole scan, measured from the moment the
    /// executor starts; `None` means no deadline is applied.
    pub deadline: Option<Duration>,
}
42
43impl Default for ExecutorOptions {
44 fn default() -> Self {
45 Self {
46 concurrency: DEFAULT_CONCURRENCY,
47 deadline: None,
48 }
49 }
50}
51
52impl ExecutorOptions {
53 pub fn concurrency(mut self, n: NonZeroUsize) -> Self {
55 self.concurrency = n;
56 self
57 }
58
59 pub fn deadline(mut self, d: Duration) -> Self {
61 self.deadline = Some(d);
62 self
63 }
64}
65
66pub async fn run(
73 client: &Client,
74 sites: &[Site],
75 username: &Username,
76 options: ExecutorOptions,
77) -> Vec<CheckOutcome> {
78 run_with_progress(client, sites, username, options, |_| {}).await
79}
80
81pub async fn run_with_progress<F>(
87 client: &Client,
88 sites: &[Site],
89 username: &Username,
90 options: ExecutorOptions,
91 mut on_outcome: F,
92) -> Vec<CheckOutcome>
93where
94 F: FnMut(&CheckOutcome),
95{
96 let semaphore = Arc::new(Semaphore::new(options.concurrency.get()));
97 let deadline_at = options.deadline.map(|d| TokioInstant::now() + d);
98 let mut set: JoinSet<CheckOutcome> = JoinSet::new();
99
100 for site in sites {
101 let site = site.clone();
102 let username = username.clone();
103 let client = client.clone();
104 let permits = Arc::clone(&semaphore);
105 set.spawn(async move {
106 let permit = match permits.acquire_owned().await {
107 Ok(p) => p,
108 Err(_closed) => {
109 return CheckOutcome {
110 site: site.name.clone(),
111 url: site.url_for(&username),
112 kind: MatchKind::Uncertain,
113 reason: Some(crate::check::UncertainReason::SchedulerClosed),
114 elapsed_ms: 0,
115 enrichment: std::collections::BTreeMap::new(),
116 evidence: Vec::new(),
117 transport: None,
118 escalations: 0,
119 };
120 }
121 };
122 let probe = client.check(&site, &username);
123 let outcome = match deadline_at {
124 None => probe.await,
125 Some(at) => match timeout_at(at, probe).await {
126 Ok(o) => o,
127 Err(_elapsed) => CheckOutcome {
128 site: site.name.clone(),
129 url: site.url_for(&username),
130 kind: MatchKind::Uncertain,
131 reason: Some(crate::check::UncertainReason::Deadline),
132 elapsed_ms: 0,
133 enrichment: std::collections::BTreeMap::new(),
134 evidence: Vec::new(),
135 transport: None,
136 escalations: 0,
137 },
138 },
139 };
140 drop(permit);
141 outcome
142 });
143 }
144
145 let mut results = Vec::with_capacity(sites.len());
146 while let Some(joined) = set.join_next().await {
147 match joined {
148 Ok(outcome) => {
149 on_outcome(&outcome);
150 results.push(outcome);
151 }
152 Err(err) if err.is_cancelled() => {
153 tracing::warn!(error = %err, "check task cancelled");
154 }
155 Err(err) => {
156 tracing::error!(error = %err, "check task panicked");
157 }
158 }
159 }
160 results
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::site::{Signal, UrlTemplate};
    use wiremock::matchers::{any, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds a status-code-driven `Site` named `name` whose URL template is
    /// `<server>/<segment>/{username}`: 200 means found, 404 means not found.
    fn site(server: &MockServer, name: &str, segment: &str) -> Site {
        let template = format!("{}/{}/{{username}}", server.uri(), segment);
        let signals = vec![
            Signal::StatusFound { codes: vec![200] },
            Signal::StatusNotFound { codes: vec![404] },
        ];
        Site {
            name: name.into(),
            url: UrlTemplate::new(template).unwrap(),
            signals,
            known_present: None,
            known_absent: None,
            extract: Vec::new(),
            tags: Vec::new(),
            request_headers: std::collections::BTreeMap::new(),
            regex_check: None,
            engine: None,
            strip_bad_char: None,
            request_method: crate::site::HttpMethod::Get,
            request_body: None,
            protection: Vec::new(),
            disabled: false,
            disabled_reason: None,
            source: None,
            popularity: None,
            access: crate::AccessPolicy::default(),
        }
    }

    /// Registers `response` on `server` for any request method hitting exactly `route`.
    async fn mount(server: &MockServer, route: &str, response: ResponseTemplate) {
        Mock::given(any())
            .and(path(route))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Client with a 5 s request timeout and a zero minimum request interval.
    fn fast_client() -> Client {
        let builder = Client::builder()
            .timeout(Duration::from_secs(5))
            .min_request_interval(Duration::ZERO);
        builder.build().unwrap()
    }

    /// Default options with the concurrency limit overridden to `n` (panics on 0).
    fn opts_with_concurrency(n: usize) -> ExecutorOptions {
        let limit = NonZeroUsize::new(n).unwrap();
        ExecutorOptions::default().concurrency(limit)
    }

    #[tokio::test]
    async fn runs_all_sites_concurrently() {
        let server = MockServer::start().await;
        for (route, status) in [("/a/alice", 200_u16), ("/b/alice", 404), ("/c/alice", 200)] {
            mount(&server, route, ResponseTemplate::new(status)).await;
        }

        let sites: Vec<Site> = [("A", "a"), ("B", "b"), ("C", "c")]
            .iter()
            .map(|(name, segment)| site(&server, name, segment))
            .collect();
        let user = Username::new("alice").unwrap();
        let client = fast_client();

        let mut out = run(&client, &sites, &user, opts_with_concurrency(4)).await;
        out.sort_by(|lhs, rhs| lhs.site.cmp(&rhs.site));

        assert_eq!(out.len(), 3);
        let expected = [MatchKind::Found, MatchKind::NotFound, MatchKind::Found];
        for (outcome, want) in out.iter().zip(expected.iter()) {
            assert_eq!(outcome.kind, *want);
        }
    }

    #[tokio::test]
    async fn respects_concurrency_limit() {
        let server = MockServer::start().await;
        let mut sites: Vec<Site> = Vec::with_capacity(6);
        for i in 0..6 {
            let delayed = ResponseTemplate::new(200).set_delay(Duration::from_millis(50));
            mount(&server, &format!("/{i}/alice"), delayed).await;
            sites.push(site(&server, &format!("S{i}"), &i.to_string()));
        }
        let user = Username::new("alice").unwrap();
        let client = fast_client();

        let started = std::time::Instant::now();
        let out = run(&client, &sites, &user, opts_with_concurrency(2)).await;
        let elapsed = started.elapsed();

        assert_eq!(out.len(), 6);
        // Six 50 ms responses, two at a time, cannot finish in under three rounds.
        assert!(
            elapsed >= Duration::from_millis(120),
            "expected ≥120 ms, got {elapsed:?}",
        );
    }

    #[tokio::test]
    async fn empty_input_returns_empty() {
        let no_sites: Vec<Site> = Vec::new();
        let user = Username::new("alice").unwrap();
        let client = fast_client();
        let out = run(&client, &no_sites, &user, opts_with_concurrency(4)).await;
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn run_with_progress_invokes_callback_per_outcome() {
        let server = MockServer::start().await;
        mount(&server, "/a/alice", ResponseTemplate::new(200)).await;
        mount(&server, "/b/alice", ResponseTemplate::new(404)).await;

        let sites = vec![site(&server, "A", "a"), site(&server, "B", "b")];
        let user = Username::new("alice").unwrap();
        let client = fast_client();

        // The callback is `FnMut`, so a plain counter borrowed mutably suffices.
        let mut calls = 0;
        let outcomes = run_with_progress(&client, &sites, &user, opts_with_concurrency(4), |_| {
            calls += 1
        })
        .await;

        assert_eq!(outcomes.len(), 2);
        assert_eq!(calls, 2);
    }

    #[tokio::test]
    async fn deadline_marks_slow_sites_uncertain() {
        let server = MockServer::start().await;
        let two_second_reply = ResponseTemplate::new(200).set_delay(Duration::from_secs(2));
        mount(&server, "/slow/alice", two_second_reply).await;
        mount(&server, "/fast/alice", ResponseTemplate::new(200)).await;

        let sites = vec![site(&server, "Slow", "slow"), site(&server, "Fast", "fast")];
        let user = Username::new("alice").unwrap();
        let client = fast_client();
        let options = ExecutorOptions::default()
            .concurrency(NonZeroUsize::new(4).unwrap())
            .deadline(Duration::from_millis(200));

        let started = std::time::Instant::now();
        let mut out = run(&client, &sites, &user, options).await;
        let elapsed = started.elapsed();
        out.sort_by(|lhs, rhs| lhs.site.cmp(&rhs.site));

        assert_eq!(out.len(), 2);
        let fast = out.iter().find(|outcome| outcome.site == "Fast").unwrap();
        let slow = out.iter().find(|outcome| outcome.site == "Slow").unwrap();
        assert_eq!(fast.kind, MatchKind::Found);
        assert_eq!(slow.kind, MatchKind::Uncertain);
        assert_eq!(slow.reason, Some(crate::check::UncertainReason::Deadline));
        assert!(
            elapsed < Duration::from_millis(800),
            "scan should abort near the deadline, got {elapsed:?}",
        );
    }
}