nextest_runner/runner/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        core::EvaluatableProfile,
8        elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9        scripts::SetupScriptExecuteData,
10    },
11    double_spawn::DoubleSpawnInfo,
12    errors::{ConfigureHandleInheritanceError, TestRunnerBuildError, TestRunnerExecuteErrors},
13    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
14    list::{TestInstanceWithSettings, TestList},
15    reporter::events::{RunStats, TestEvent},
16    runner::ExecutorEvent,
17    signal::{SignalHandler, SignalHandlerKind},
18    target_runner::TargetRunner,
19    test_output::CaptureStrategy,
20};
21use async_scoped::TokioScope;
22use future_queue::{FutureQueueContext, StreamExt};
23use futures::prelude::*;
24use nextest_metadata::FilterMatch;
25use quick_junit::ReportUuid;
26use std::{convert::Infallible, fmt, sync::Arc};
27use tokio::{
28    runtime::Runtime,
29    sync::{mpsc::unbounded_channel, oneshot},
30    task::JoinError,
31};
32use tracing::{debug, warn};
33
/// Test runner options.
///
/// Collects optional overrides that [`TestRunnerBuilder::build`] combines with the profile to
/// construct a [`TestRunner`].
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    // How test output is captured. `CaptureStrategy::None` makes `build` use a single test
    // thread regardless of `test_threads`.
    capture_strategy: CaptureStrategy,
    // Retry policy override. Handed to the runner as `force_retries`; `None` means no override.
    retries: Option<RetryPolicy>,
    // Max-fail override. When `None`, `build` falls back to `profile.max_fail()`.
    max_fail: Option<MaxFail>,
    // Test-thread override. When `None`, `build` falls back to `profile.test_threads()`.
    test_threads: Option<TestThreads>,
}
42
43impl TestRunnerBuilder {
44    /// Sets the capture strategy for the test runner
45    ///
46    /// * [`CaptureStrategy::Split`]
47    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
48    ///   * con: ordering between the streams cannot be guaranteed
49    /// * [`CaptureStrategy::Combined`]
50    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
51    ///   * con: distinction between `stdout` and `stderr` is lost
52    /// * [`CaptureStrategy::None`] -
53    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
54    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
55        self.capture_strategy = strategy;
56        self
57    }
58
59    /// Sets the number of retries for this test runner.
60    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
61        self.retries = Some(retries);
62        self
63    }
64
65    /// Sets the max-fail value for this test runner.
66    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
67        self.max_fail = Some(max_fail);
68        self
69    }
70
71    /// Sets the number of tests to run simultaneously.
72    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
73        self.test_threads = Some(test_threads);
74        self
75    }
76
77    /// Creates a new test runner.
78    #[expect(clippy::too_many_arguments)]
79    pub fn build<'a>(
80        self,
81        test_list: &'a TestList,
82        profile: &'a EvaluatableProfile<'a>,
83        cli_args: Vec<String>,
84        signal_handler: SignalHandlerKind,
85        input_handler: InputHandlerKind,
86        double_spawn: DoubleSpawnInfo,
87        target_runner: TargetRunner,
88    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
89        let test_threads = match self.capture_strategy {
90            CaptureStrategy::None => 1,
91            CaptureStrategy::Combined | CaptureStrategy::Split => self
92                .test_threads
93                .unwrap_or_else(|| profile.test_threads())
94                .compute(),
95        };
96        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
97
98        let runtime = tokio::runtime::Builder::new_multi_thread()
99            .enable_all()
100            .thread_name("nextest-runner-worker")
101            .build()
102            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
103        let _guard = runtime.enter();
104
105        // signal_handler.build() must be called from within the guard.
106        let signal_handler = signal_handler.build()?;
107
108        let input_handler = input_handler.build();
109
110        Ok(TestRunner {
111            inner: TestRunnerInner {
112                run_id: ReportUuid::new_v4(),
113                profile,
114                test_list,
115                test_threads,
116                double_spawn,
117                target_runner,
118                capture_strategy: self.capture_strategy,
119                force_retries: self.retries,
120                cli_args,
121                max_fail,
122                runtime,
123            },
124            signal_handler,
125            input_handler,
126        })
127    }
128}
129
/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`]. A run is started with [`TestRunner::execute`] or
/// [`TestRunner::try_execute`], both of which consume the runner.
#[derive(Debug)]
pub struct TestRunner<'a> {
    // Run configuration plus the Tokio runtime that the run executes on.
    inner: TestRunnerInner<'a>,
    // Built from the `SignalHandlerKind` passed to `build`; lent mutably to the run.
    signal_handler: SignalHandler,
    // Built from the `InputHandlerKind` passed to `build`; lent mutably to the run.
    input_handler: InputHandler,
}
139
140impl<'a> TestRunner<'a> {
141    /// Returns the status of the input handler.
142    pub fn input_handler_status(&self) -> InputHandlerStatus {
143        self.input_handler.status()
144    }
145
146    /// Executes the listed tests, each one in its own process.
147    ///
148    /// The callback is called with the results of each test.
149    ///
150    /// Returns an error if any of the tasks panicked.
151    pub fn execute<F>(
152        self,
153        mut callback: F,
154    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
155    where
156        F: FnMut(TestEvent<'a>) + Send,
157    {
158        self.try_execute::<Infallible, _>(|test_event| {
159            callback(test_event);
160            Ok(())
161        })
162    }
163
164    /// Executes the listed tests, each one in its own process.
165    ///
166    /// Accepts a callback that is called with the results of each test. If the callback returns an
167    /// error, the test run terminates and the callback is no longer called.
168    ///
169    /// Returns an error if any of the tasks panicked.
170    pub fn try_execute<E, F>(
171        mut self,
172        mut callback: F,
173    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
174    where
175        F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
176        E: fmt::Debug + Send,
177    {
178        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
179
180        // If report_cancel_tx is None, at least one error has occurred and the
181        // runner has been instructed to shut down. first_error is also set to
182        // Some in that case.
183        let mut report_cancel_tx = Some(report_cancel_tx);
184        let mut first_error = None;
185
186        let res = self.inner.execute(
187            &mut self.signal_handler,
188            &mut self.input_handler,
189            report_cancel_rx,
190            |event| {
191                match callback(event) {
192                    Ok(()) => {}
193                    Err(error) => {
194                        // If the callback fails, we need to let the runner know to start shutting
195                        // down. But we keep reporting results in case the callback starts working
196                        // again.
197                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
198                            let _ = report_cancel_tx.send(());
199                            first_error = Some(error);
200                        }
201                    }
202                }
203            },
204        );
205
206        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
207        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
208        // with leaked resources.
209        self.inner.runtime.shutdown_background();
210
211        match (res, first_error) {
212            (Ok(run_stats), None) => Ok(run_stats),
213            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
214                report_error: Some(report_error),
215                join_errors: Vec::new(),
216            }),
217            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
218                report_error,
219                join_errors,
220            }),
221        }
222    }
223}
224
// State shared by a single test run; constructed in `TestRunnerBuilder::build`.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    // Identifier for this run, generated with `ReportUuid::new_v4()` at build time.
    run_id: ReportUuid,
    // Profile supplying the name, global timeout, test groups and fallback settings.
    profile: &'a EvaluatableProfile<'a>,
    // The tests to run.
    test_list: &'a TestList<'a>,
    // Computed thread count passed to the test queue; always 1 with `CaptureStrategy::None`.
    test_threads: usize,
    double_spawn: DoubleSpawnInfo,
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    // Retry policy override from the builder, if one was set.
    force_retries: Option<RetryPolicy>,
    // CLI arguments as passed to `build`; cloned into the dispatcher context.
    cli_args: Vec<String>,
    // Builder override if set, otherwise the profile's value.
    max_fail: MaxFail,
    // Tokio runtime the run executes on; shut down in the background by `try_execute`.
    runtime: Runtime,
}
239
impl<'a> TestRunnerInner<'a> {
    /// Runs setup scripts and then all tests, blocking until the run is complete.
    ///
    /// Three scoped tasks are spawned on `self.runtime`: the dispatcher (which receives
    /// executor events and is given `callback`, the signal and input handlers, and
    /// `report_cancel_rx`), the setup-script task, and the test-running task.
    ///
    /// Returns the dispatcher's run statistics, or the collected join errors if any task
    /// (including per-test child tasks) failed to join.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            self.profile.global_timeout().period,
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
        );

        // Send the initial event.
        // (Don't need to set the cancelled atomic if this fails because the run hasn't started
        // yet.)
        dispatcher_cx.run_started(self.test_list);

        // Take references up front so the `move` closure below captures the references rather
        // than the contexts themselves; `dispatcher_cx` is used again after the scope ends.
        let executor_cx_ref = &executor_cx;
        let dispatcher_cx_mut = &mut dispatcher_cx;

        // Enter the runtime context for the duration of the scoped block below.
        let _guard = self.runtime.enter();

        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Channel carrying executor events to the dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            // Channel used to hand the setup script results back to this (synchronous) closure.
            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until the setup scripts have finished; tests are only queued afterwards.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared across all test tasks; each one gets a clone of the `Arc`.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    // Each item handed to the queue is (weight, optional group, future factory).
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx_ref.run_test_instance(
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Keep only the join error, if any; a successful join yields `None`.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        dispatcher_cx.run_finished();

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        // Flatten the per-task outcomes into a single list of join errors: child errors from
        // finished tasks, plus the join error of any top-level task that itself failed.
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }
        Ok(dispatcher_cx.run_stats())
    }
}
460
/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
///
/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
///
/// This changes global state on the Win32 side, so the application must manage mutual exclusion
/// around it. Call this right before [`TestRunner::try_execute`].
///
/// This is a no-op on non-Windows platforms.
///
/// The `no_capture` flag is forwarded unchanged to the platform-specific implementation.
///
/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
/// discussion.
///
/// # Errors
///
/// Returns whatever [`ConfigureHandleInheritanceError`] the platform-specific implementation
/// reports.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    // Thin public entry point; all of the work happens in the `os` module.
    super::os::configure_handle_inheritance_impl(no_capture)
}
478
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// With no-capture, an explicitly requested test-threads count must be ignored and the
    /// runner must fall back to a single thread.
    #[test]
    fn no_capture_settings() {
        // Inputs that the runner borrows for its whole lifetime.
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let early_profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let profile = early_profile.apply_build_platforms(&build_platforms);

        // Ask for 20 threads while also disabling output capture.
        let mut builder = TestRunnerBuilder::default();
        builder.set_capture_strategy(CaptureStrategy::None);
        builder.set_test_threads(TestThreads::Count(20));

        let build_result = builder.build(
            &test_list,
            &profile,
            Vec::new(),
            SignalHandlerKind::Noop,
            InputHandlerKind::Noop,
            DoubleSpawnInfo::disabled(),
            TargetRunner::empty(),
        );
        let runner = build_result.unwrap();

        let inner = &runner.inner;
        assert_eq!(inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(inner.test_threads, 1, "tests run serially");
    }
}