1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{ConfigureHandleInheritanceError, TestRunnerBuildError, TestRunnerExecuteErrors},
13 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
14 list::{TestInstanceWithSettings, TestList},
15 reporter::events::{RunStats, TestEvent},
16 runner::ExecutorEvent,
17 signal::{SignalHandler, SignalHandlerKind},
18 target_runner::TargetRunner,
19 test_output::CaptureStrategy,
20};
21use async_scoped::TokioScope;
22use future_queue::{FutureQueueContext, StreamExt};
23use futures::prelude::*;
24use nextest_metadata::FilterMatch;
25use quick_junit::ReportUuid;
26use std::{convert::Infallible, fmt, sync::Arc};
27use tokio::{
28 runtime::Runtime,
29 sync::{mpsc::unbounded_channel, oneshot},
30 task::JoinError,
31};
32use tracing::{debug, warn};
33
/// Builder that collects run options before constructing a [`TestRunner`].
///
/// Options left unset (`None`) are resolved in [`TestRunnerBuilder::build`]:
/// `max_fail` and `test_threads` fall back to the values provided by the
/// profile, while `retries` is passed through as an optional override.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured. `CaptureStrategy::None` forces the
    /// runner to use a single test thread.
    capture_strategy: CaptureStrategy,
    /// Optional retry policy; stored on the runner as `force_retries`.
    retries: Option<RetryPolicy>,
    /// Optional max-fail setting; when unset, the profile's value is used.
    max_fail: Option<MaxFail>,
    /// Optional test thread setting; when unset, the profile's value is used.
    test_threads: Option<TestThreads>,
}
42
43impl TestRunnerBuilder {
44 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
55 self.capture_strategy = strategy;
56 self
57 }
58
59 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
61 self.retries = Some(retries);
62 self
63 }
64
65 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
67 self.max_fail = Some(max_fail);
68 self
69 }
70
71 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
73 self.test_threads = Some(test_threads);
74 self
75 }
76
77 #[expect(clippy::too_many_arguments)]
79 pub fn build<'a>(
80 self,
81 test_list: &'a TestList,
82 profile: &'a EvaluatableProfile<'a>,
83 cli_args: Vec<String>,
84 signal_handler: SignalHandlerKind,
85 input_handler: InputHandlerKind,
86 double_spawn: DoubleSpawnInfo,
87 target_runner: TargetRunner,
88 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
89 let test_threads = match self.capture_strategy {
90 CaptureStrategy::None => 1,
91 CaptureStrategy::Combined | CaptureStrategy::Split => self
92 .test_threads
93 .unwrap_or_else(|| profile.test_threads())
94 .compute(),
95 };
96 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
97
98 let runtime = tokio::runtime::Builder::new_multi_thread()
99 .enable_all()
100 .thread_name("nextest-runner-worker")
101 .build()
102 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
103 let _guard = runtime.enter();
104
105 let signal_handler = signal_handler.build()?;
107
108 let input_handler = input_handler.build();
109
110 Ok(TestRunner {
111 inner: TestRunnerInner {
112 run_id: ReportUuid::new_v4(),
113 profile,
114 test_list,
115 test_threads,
116 double_spawn,
117 target_runner,
118 capture_strategy: self.capture_strategy,
119 force_retries: self.retries,
120 cli_args,
121 max_fail,
122 runtime,
123 },
124 signal_handler,
125 input_handler,
126 })
127 }
128}
129
/// A fully configured test runner, created via [`TestRunnerBuilder::build`].
///
/// Bundles the run state with the signal and input handlers that are handed
/// to the run when it is executed.
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Run configuration and the Tokio runtime used to drive the run.
    inner: TestRunnerInner<'a>,
    /// Signal handler passed to the dispatcher during execution.
    signal_handler: SignalHandler,
    /// Input handler passed to the dispatcher during execution.
    input_handler: InputHandler,
}
139
140impl<'a> TestRunner<'a> {
141 pub fn input_handler_status(&self) -> InputHandlerStatus {
143 self.input_handler.status()
144 }
145
146 pub fn execute<F>(
152 self,
153 mut callback: F,
154 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
155 where
156 F: FnMut(TestEvent<'a>) + Send,
157 {
158 self.try_execute::<Infallible, _>(|test_event| {
159 callback(test_event);
160 Ok(())
161 })
162 }
163
164 pub fn try_execute<E, F>(
171 mut self,
172 mut callback: F,
173 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
174 where
175 F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
176 E: fmt::Debug + Send,
177 {
178 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
179
180 let mut report_cancel_tx = Some(report_cancel_tx);
184 let mut first_error = None;
185
186 let res = self.inner.execute(
187 &mut self.signal_handler,
188 &mut self.input_handler,
189 report_cancel_rx,
190 |event| {
191 match callback(event) {
192 Ok(()) => {}
193 Err(error) => {
194 if let Some(report_cancel_tx) = report_cancel_tx.take() {
198 let _ = report_cancel_tx.send(());
199 first_error = Some(error);
200 }
201 }
202 }
203 },
204 );
205
206 self.inner.runtime.shutdown_background();
210
211 match (res, first_error) {
212 (Ok(run_stats), None) => Ok(run_stats),
213 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
214 report_error: Some(report_error),
215 join_errors: Vec::new(),
216 }),
217 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
218 report_error,
219 join_errors,
220 }),
221 }
222 }
223}
224
/// Run state owned by a [`TestRunner`], kept separate from the signal and
/// input handlers so that it can be borrowed immutably while the handlers
/// are borrowed mutably during execution.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Identifier for this run, generated when the runner is built.
    run_id: ReportUuid,
    /// Profile supplying run configuration (name, timeouts, test groups).
    profile: &'a EvaluatableProfile<'a>,
    /// The list of tests to run.
    test_list: &'a TestList<'a>,
    /// Resolved number of test threads (1 when output is not captured).
    test_threads: usize,
    /// Double-spawn information handed to the executor.
    double_spawn: DoubleSpawnInfo,
    /// Target runner handed to the executor.
    target_runner: TargetRunner,
    /// How test output is captured.
    capture_strategy: CaptureStrategy,
    /// Retry policy set on the builder, if any.
    force_retries: Option<RetryPolicy>,
    /// Command-line arguments, passed on to the dispatcher.
    cli_args: Vec<String>,
    /// Resolved max-fail setting.
    max_fail: MaxFail,
    /// Tokio runtime on which the run's tasks are spawned.
    runtime: Runtime,
}
239
impl<'a> TestRunnerInner<'a> {
    /// Runs setup scripts followed by all tests in the test list, reporting
    /// events through `callback`.
    ///
    /// Three tasks are spawned inside a blocking async scope: the dispatcher
    /// (which receives executor events and owns `callback`), the setup-script
    /// runner, and the test-running stream. This call blocks until the scope
    /// has finished.
    ///
    /// * `signal_handler`, `input_handler`, `report_cancel_rx` are handed to
    ///   the dispatcher's `run` future.
    /// * `callback` is moved into the dispatcher context.
    ///
    /// # Errors
    ///
    /// Returns the collected [`JoinError`]s if any spawned task (or any
    /// per-test child task) failed to join. Otherwise returns the dispatcher's
    /// run statistics.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        // The dispatcher owns the callback and tracks run-wide state.
        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            self.profile.global_timeout().period,
        );

        // The executor runs setup scripts and individual test instances.
        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
        );

        // Report the start of the run before any task is spawned.
        dispatcher_cx.run_started(self.test_list);

        // Take references up front so the `move` closure below captures the
        // references rather than the contexts themselves; `dispatcher_cx` is
        // used again after the scope completes.
        let executor_cx_ref = &executor_cx;
        let dispatcher_cx_mut = &mut dispatcher_cx;

        // Enter the runtime context so the scope below spawns onto it.
        let _guard = self.runtime.enter();

        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Channel carrying executor events to the dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Task 1: the dispatcher.
            let dispatcher_fut =
                dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            // Task 2: run setup scripts and send the resulting data back over
            // a dedicated channel.
            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
                if script_tx.send(script_data).is_err() {
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block this thread until the setup-script task has sent its data.
            // `None` means the sender was dropped without sending; in that
            // case no tests are scheduled.
            let Some(script_data) = script_rx.blocking_recv() else {
                debug!("no script data received, shutting down");
                return;
            };

            // Per-group thread limits, as (group name, max threads) pairs.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared between all test futures.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            // Task 3: the stream that filters, schedules and runs tests.
            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        // Tests that don't match the filter are reported as
                        // skipped and removed from the stream.
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // A send failure is deliberately ignored.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    // Each stream item is (weight, optional group, future
                    // factory), the shape consumed by `future_queue_grouped`.
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        async move {
                            // SAFETY (review note): `scope_and_collect` is
                            // `unsafe` in async-scoped; the usual requirement
                            // is that the returned future is driven to
                            // completion and not forgotten. Here it is awaited
                            // immediately. Confirm against the async-scoped
                            // documentation.
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx_ref.run_test_instance(
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // Exactly one task is spawned above, so one result
                            // is expected.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Only a join error (if any) is propagated.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // Run the futures, limited by the global thread count and the
                // per-group limits.
                .future_queue_grouped(self.test_threads, groups)
                // Drop `None` outputs, keeping only join errors.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // All scoped tasks are done; report the end of the run.
        dispatcher_cx.run_finished();

        // Flatten task results into one list of join errors, counting
        // cancelled tasks along the way.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }
        Ok(dispatcher_cx.run_stats())
    }
}
460
/// Configures handle inheritance by delegating to the platform-specific
/// implementation in `super::os`.
///
/// `no_capture` is forwarded unchanged to that implementation.
/// NOTE(review): presumably it indicates that test output is not being
/// captured -- confirm against `configure_handle_inheritance_impl`.
///
/// # Errors
///
/// Returns [`ConfigureHandleInheritanceError`] if the platform-specific
/// implementation fails.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    super::os::configure_handle_inheritance_impl(no_capture)
}
478
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// With `CaptureStrategy::None`, the runner must use a single test thread
    /// even when a larger thread count was requested explicitly.
    #[test]
    fn no_capture_settings() {
        // Inputs for the builder: an empty test list and the default profile.
        let empty_list = TestList::empty();
        let platforms = BuildPlatforms::new_with_no_target().unwrap();
        let default_config = NextestConfig::default_config("/fake/dir");
        let early_profile = default_config
            .profile(NextestConfig::DEFAULT_PROFILE)
            .unwrap();
        let evaluated_profile = early_profile.apply_build_platforms(&platforms);

        // Request 20 threads, then disable capture.
        let mut runner_builder = TestRunnerBuilder::default();
        runner_builder.set_capture_strategy(CaptureStrategy::None);
        runner_builder.set_test_threads(TestThreads::Count(20));

        let built = runner_builder.build(
            &empty_list,
            &evaluated_profile,
            Vec::new(),
            SignalHandlerKind::Noop,
            InputHandlerKind::Noop,
            DoubleSpawnInfo::disabled(),
            TargetRunner::empty(),
        );
        let runner = built.unwrap();

        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }
}