nextest_runner/runner/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        core::EvaluatableProfile,
8        elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9        scripts::SetupScriptExecuteData,
10    },
11    double_spawn::DoubleSpawnInfo,
12    errors::{
13        ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14        TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15    },
16    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17    list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18    reporter::events::{ReporterEvent, RunStats, StressIndex},
19    runner::ExecutorEvent,
20    signal::{SignalHandler, SignalHandlerKind},
21    target_runner::TargetRunner,
22    test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use std::{
31    collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
32    sync::Arc, time::Duration,
33};
34use tokio::{
35    runtime::Runtime,
36    sync::{mpsc::unbounded_channel, oneshot},
37    task::JoinError,
38};
39use tracing::{debug, warn};
40
/// A debugger invocation, split into the executable and its arguments.
///
/// Values are produced by the [`FromStr`] implementation, which splits the
/// command using shell quoting rules.
#[derive(Debug, Clone)]
pub struct DebuggerCommand {
    /// The debugger executable to launch.
    program: String,
    /// Arguments passed to the debugger, in order.
    args: Vec<String>,
}
47
48impl DebuggerCommand {
49    /// Gets the program.
50    pub fn program(&self) -> &str {
51        // The from_str constructor ensures that there is at least one part.
52        &self.program
53    }
54
55    /// Gets the arguments.
56    pub fn args(&self) -> &[String] {
57        &self.args
58    }
59}
60
61impl FromStr for DebuggerCommand {
62    type Err = DebuggerCommandParseError;
63
64    fn from_str(command: &str) -> Result<Self, Self::Err> {
65        let mut parts =
66            shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
67        if parts.is_empty() {
68            return Err(DebuggerCommandParseError::EmptyCommand);
69        }
70        let program = parts.remove(0);
71        Ok(Self {
72            program,
73            args: parts,
74        })
75    }
76}
77
/// A tracer invocation, split into the executable and its arguments.
///
/// Values are produced by the [`FromStr`] implementation, which splits the
/// command using shell quoting rules.
#[derive(Debug, Clone)]
pub struct TracerCommand {
    /// The tracer executable to launch.
    program: String,
    /// Arguments passed to the tracer, in order.
    args: Vec<String>,
}
84
85impl TracerCommand {
86    /// Gets the program.
87    pub fn program(&self) -> &str {
88        &self.program
89    }
90
91    /// Gets the arguments.
92    pub fn args(&self) -> &[String] {
93        &self.args
94    }
95}
96
97impl FromStr for TracerCommand {
98    type Err = TracerCommandParseError;
99
100    fn from_str(command: &str) -> Result<Self, Self::Err> {
101        let mut parts =
102            shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
103        if parts.is_empty() {
104            return Err(TracerCommandParseError::EmptyCommand);
105        }
106        let program = parts.remove(0);
107        Ok(Self {
108            program,
109            args: parts,
110        })
111    }
112}
113
114/// An interceptor wraps test execution with a debugger or tracer.
115#[derive(Clone, Debug, Default)]
116pub enum Interceptor {
117    /// No interceptor - standard test execution.
118    #[default]
119    None,
120
121    /// Run the test under a debugger.
122    Debugger(DebuggerCommand),
123
124    /// Run the test under a syscall tracer.
125    Tracer(TracerCommand),
126}
127
128impl Interceptor {
129    /// Returns true if timeouts should be disabled.
130    ///
131    /// Both debuggers and tracers disable timeouts.
132    pub fn should_disable_timeouts(&self) -> bool {
133        match self {
134            Interceptor::None => false,
135            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
136        }
137    }
138
139    /// Returns true if stdin should be passed through to child test processes.
140    ///
141    /// Only debuggers need stdin passthrough for interactive debugging.
142    pub fn should_passthrough_stdin(&self) -> bool {
143        match self {
144            Interceptor::None | Interceptor::Tracer(_) => false,
145            Interceptor::Debugger(_) => true,
146        }
147    }
148
149    /// Returns true if a process group should be created for the child.
150    ///
151    /// Debuggers need terminal control, so no process group is created. Tracers
152    /// work fine with process groups.
153    pub fn should_create_process_group(&self) -> bool {
154        match self {
155            Interceptor::None | Interceptor::Tracer(_) => true,
156            Interceptor::Debugger(_) => false,
157        }
158    }
159
160    /// Returns true if leak detection should be skipped.
161    ///
162    /// Both debuggers and tracers skip leak detection to avoid interference.
163    pub fn should_skip_leak_detection(&self) -> bool {
164        match self {
165            Interceptor::None => false,
166            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
167        }
168    }
169
170    /// Returns true if the test command should be displayed.
171    ///
172    /// Used to determine if we should print the wrapper command for debugging.
173    pub fn should_show_wrapper_command(&self) -> bool {
174        match self {
175            Interceptor::None => false,
176            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
177        }
178    }
179
180    /// Returns true if, on receiving SIGTSTP, we should send SIGTSTP to the
181    /// child.
182    ///
183    /// Debugger mode has special signal handling where we don't send SIGTSTP to
184    /// the child (it receives it directly from the terminal, since no process
185    /// group is created). Tracers use standard signal handling.
186    pub fn should_send_sigtstp(&self) -> bool {
187        match self {
188            Interceptor::None | Interceptor::Tracer(_) => true,
189            Interceptor::Debugger(_) => false,
190        }
191    }
192}
193
194/// A child process identifier: either a single process or a process group.
195#[derive(Copy, Clone, Debug)]
196pub(super) enum ChildPid {
197    /// A single process ID.
198    Process(#[cfg_attr(not(unix), expect(unused))] u32),
199
200    /// A process group ID.
201    #[cfg(unix)]
202    ProcessGroup(u32),
203}
204
205impl ChildPid {
206    /// Returns the PID value to use with `libc::kill`.
207    ///
208    /// - `Process(pid)` returns `pid as i32` (positive, kills single process).
209    /// - `ProcessGroup(pid)` returns `-(pid as i32)` (negative, kills process group).
210    ///
211    /// On Windows, always returns `pid as i32`.
212    #[cfg(unix)]
213    pub(super) fn for_kill(self) -> i32 {
214        match self {
215            ChildPid::Process(pid) => pid as i32,
216            ChildPid::ProcessGroup(pid) => -(pid as i32),
217        }
218    }
219}
220
221/// Test runner options.
222#[derive(Debug, Default)]
223pub struct TestRunnerBuilder {
224    capture_strategy: CaptureStrategy,
225    retries: Option<RetryPolicy>,
226    max_fail: Option<MaxFail>,
227    test_threads: Option<TestThreads>,
228    stress_condition: Option<StressCondition>,
229    interceptor: Interceptor,
230    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
231}
232
233impl TestRunnerBuilder {
234    /// Sets the capture strategy for the test runner
235    ///
236    /// * [`CaptureStrategy::Split`]
237    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
238    ///   * con: ordering between the streams cannot be guaranteed
239    /// * [`CaptureStrategy::Combined`]
240    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
241    ///   * con: distinction between `stdout` and `stderr` is lost
242    /// * [`CaptureStrategy::None`] -
243    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
244    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
245        self.capture_strategy = strategy;
246        self
247    }
248
249    /// Sets the number of retries for this test runner.
250    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
251        self.retries = Some(retries);
252        self
253    }
254
255    /// Sets the max-fail value for this test runner.
256    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
257        self.max_fail = Some(max_fail);
258        self
259    }
260
261    /// Sets the number of tests to run simultaneously.
262    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
263        self.test_threads = Some(test_threads);
264        self
265    }
266
267    /// Sets the stress testing condition.
268    pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
269        self.stress_condition = Some(stress_condition);
270        self
271    }
272
273    /// Sets the interceptor (debugger or tracer) to use for running tests.
274    pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
275        self.interceptor = interceptor;
276        self
277    }
278
279    /// Sets the expected outstanding tests for rerun tracking.
280    ///
281    /// When set, the dispatcher will track which tests were seen during the run
282    /// and emit a `TestsNotSeen` as part of the `RunFinished` if some expected
283    /// tests were not seen.
284    pub fn set_expected_outstanding(
285        &mut self,
286        expected: BTreeSet<OwnedTestInstanceId>,
287    ) -> &mut Self {
288        self.expected_outstanding = Some(expected);
289        self
290    }
291
292    /// Creates a new test runner.
293    #[expect(clippy::too_many_arguments)]
294    pub fn build<'a>(
295        self,
296        test_list: &'a TestList,
297        profile: &'a EvaluatableProfile<'a>,
298        cli_args: Vec<String>,
299        signal_handler: SignalHandlerKind,
300        input_handler: InputHandlerKind,
301        double_spawn: DoubleSpawnInfo,
302        target_runner: TargetRunner,
303    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
304        let test_threads = match self.capture_strategy {
305            CaptureStrategy::None => 1,
306            CaptureStrategy::Combined | CaptureStrategy::Split => self
307                .test_threads
308                .unwrap_or_else(|| profile.test_threads())
309                .compute(),
310        };
311        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
312
313        let runtime = tokio::runtime::Builder::new_multi_thread()
314            .enable_all()
315            .thread_name("nextest-runner-worker")
316            .build()
317            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
318        let _guard = runtime.enter();
319
320        // signal_handler.build() must be called from within the guard.
321        let signal_handler = signal_handler.build()?;
322
323        let input_handler = input_handler.build();
324
325        Ok(TestRunner {
326            inner: TestRunnerInner {
327                run_id: force_or_new_run_id(),
328                started_at: Local::now(),
329                profile,
330                test_list,
331                test_threads,
332                double_spawn,
333                target_runner,
334                capture_strategy: self.capture_strategy,
335                force_retries: self.retries,
336                cli_args,
337                max_fail,
338                stress_condition: self.stress_condition,
339                interceptor: self.interceptor,
340                expected_outstanding: self.expected_outstanding,
341                runtime,
342            },
343            signal_handler,
344            input_handler,
345        })
346    }
347}
348
349/// Stress testing condition.
350#[derive(Clone, Debug)]
351pub enum StressCondition {
352    /// Run each test `count` times.
353    Count(StressCount),
354
355    /// Run until this duration has elapsed.
356    Duration(Duration),
357}
358
359/// A count for stress testing.
360#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
361#[serde(tag = "kind", rename_all = "kebab-case")]
362#[cfg_attr(test, derive(test_strategy::Arbitrary))]
363pub enum StressCount {
364    /// Run each test `count` times.
365    Count {
366        /// The number of times to run each test.
367        count: NonZero<u32>,
368    },
369
370    /// Run indefinitely.
371    Infinite,
372}
373
374impl FromStr for StressCount {
375    type Err = StressCountParseError;
376
377    fn from_str(s: &str) -> Result<Self, Self::Err> {
378        if s == "infinite" {
379            Ok(StressCount::Infinite)
380        } else {
381            match s.parse() {
382                Ok(count) => Ok(StressCount::Count { count }),
383                Err(_) => Err(StressCountParseError::new(s)),
384            }
385        }
386    }
387}
388
389/// Context for running tests.
390///
391/// Created using [`TestRunnerBuilder::build`].
392#[derive(Debug)]
393pub struct TestRunner<'a> {
394    inner: TestRunnerInner<'a>,
395    signal_handler: SignalHandler,
396    input_handler: InputHandler,
397}
398
399impl<'a> TestRunner<'a> {
400    /// Returns the unique ID for this test run.
401    pub fn run_id(&self) -> ReportUuid {
402        self.inner.run_id
403    }
404
405    /// Returns the timestamp when this test run was started.
406    pub fn started_at(&self) -> DateTime<Local> {
407        self.inner.started_at
408    }
409
410    /// Returns the status of the input handler.
411    pub fn input_handler_status(&self) -> InputHandlerStatus {
412        self.input_handler.status()
413    }
414
415    /// Executes the listed tests, each one in its own process.
416    ///
417    /// The callback is called with the results of each test.
418    ///
419    /// Returns an error if any of the tasks panicked.
420    pub fn execute<F>(
421        self,
422        mut callback: F,
423    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
424    where
425        F: FnMut(ReporterEvent<'a>) + Send,
426    {
427        self.try_execute::<Infallible, _>(|event| {
428            callback(event);
429            Ok(())
430        })
431    }
432
433    /// Executes the listed tests, each one in its own process.
434    ///
435    /// Accepts a callback that is called with the results of each test. If the callback returns an
436    /// error, the test run terminates and the callback is no longer called.
437    ///
438    /// Returns an error if any of the tasks panicked.
439    pub fn try_execute<E, F>(
440        mut self,
441        mut callback: F,
442    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
443    where
444        F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
445        E: fmt::Debug + Send,
446    {
447        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
448
449        // If report_cancel_tx is None, at least one error has occurred and the
450        // runner has been instructed to shut down. first_error is also set to
451        // Some in that case.
452        let mut report_cancel_tx = Some(report_cancel_tx);
453        let mut first_error = None;
454
455        let res = self.inner.execute(
456            &mut self.signal_handler,
457            &mut self.input_handler,
458            report_cancel_rx,
459            |event| {
460                match callback(event) {
461                    Ok(()) => {}
462                    Err(error) => {
463                        // If the callback fails, we need to let the runner know to start shutting
464                        // down. But we keep reporting results in case the callback starts working
465                        // again.
466                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
467                            let _ = report_cancel_tx.send(());
468                            first_error = Some(error);
469                        }
470                    }
471                }
472            },
473        );
474
475        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
476        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
477        // with leaked resources.
478        self.inner.runtime.shutdown_background();
479
480        match (res, first_error) {
481            (Ok(run_stats), None) => Ok(run_stats),
482            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
483                report_error: Some(report_error),
484                join_errors: Vec::new(),
485            }),
486            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
487                report_error,
488                join_errors,
489            }),
490        }
491    }
492}
493
494#[derive(Debug)]
495struct TestRunnerInner<'a> {
496    run_id: ReportUuid,
497    started_at: DateTime<Local>,
498    profile: &'a EvaluatableProfile<'a>,
499    test_list: &'a TestList<'a>,
500    test_threads: usize,
501    double_spawn: DoubleSpawnInfo,
502    target_runner: TargetRunner,
503    capture_strategy: CaptureStrategy,
504    force_retries: Option<RetryPolicy>,
505    cli_args: Vec<String>,
506    max_fail: MaxFail,
507    stress_condition: Option<StressCondition>,
508    interceptor: Interceptor,
509    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
510    runtime: Runtime,
511}
512
impl<'a> TestRunnerInner<'a> {
    /// Drives a complete test run and returns the final run statistics.
    ///
    /// This builds the dispatcher and executor contexts and emits the
    /// run-started event. It then performs either one pass over the tests or,
    /// when a stress condition is set, repeated passes until stress progress
    /// reports nothing remaining or the dispatcher records a cancel reason.
    /// Finally it emits the run-finished event.
    ///
    /// `report_cancel_rx` fires when the caller's callback fails; it is
    /// forwarded to the dispatcher for each pass.
    ///
    /// Returns `Err` with the collected `JoinError`s if any pass saw task
    /// join failures.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        // Disable the global timeout when an interceptor is active.
        let global_timeout = if self.interceptor.should_disable_timeouts() {
            crate::time::far_future_duration()
        } else {
            self.profile.global_timeout(self.test_list.mode()).period
        };

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            global_timeout,
            self.stress_condition.clone(),
            self.expected_outstanding.clone(),
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
            self.interceptor.clone(),
        );

        // Send the initial event.
        dispatcher_cx.run_started(self.test_list, self.test_threads);

        // Enter the runtime for the rest of the run, so that the scoped tasks
        // spawned in `do_run` have a runtime context.
        let _guard = self.runtime.enter();

        // Fuse and pin the cancellation receiver once, so the same receiver can
        // be reborrowed (`as_mut`) across multiple stress sub-runs.
        let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());

        if self.stress_condition.is_some() {
            // Stress mode: keep running sub-runs while progress reports work
            // remaining. Stop early if the dispatcher recorded a cancel reason.
            loop {
                let progress = dispatcher_cx
                    .stress_progress()
                    .expect("stress_condition is Some => stress progress is Some");
                if progress.remaining().is_some() {
                    dispatcher_cx.stress_sub_run_started(progress);

                    self.do_run(
                        dispatcher_cx.stress_index(),
                        &mut dispatcher_cx,
                        &executor_cx,
                        signal_handler,
                        input_handler,
                        report_cancel_rx.as_mut(),
                    )?;

                    dispatcher_cx.stress_sub_run_finished();

                    if dispatcher_cx.cancel_reason().is_some() {
                        break;
                    }
                } else {
                    break;
                }
            }
        } else {
            // Normal mode: a single pass with no stress index.
            self.do_run(
                None,
                &mut dispatcher_cx,
                &executor_cx,
                signal_handler,
                input_handler,
                report_cancel_rx,
            )?;
        }

        // Snapshot the stats before emitting the final event.
        let run_stats = dispatcher_cx.run_stats();
        dispatcher_cx.run_finished();

        Ok(run_stats)
    }

    /// Performs one pass over the test list.
    ///
    /// Inside a blocking `TokioScope`, this spawns three kinds of task:
    /// 1. the dispatcher, which consumes `ExecutorEvent`s from the channel;
    /// 2. a task that runs setup scripts and hands their data back over
    ///    `script_tx`;
    /// 3. once that data arrives, a task that streams tests from the priority
    ///    queue. Filtered-out tests are reported as skipped; the rest are run
    ///    through `future_queue_grouped`, bounded by `test_threads` and
    ///    per-group limits.
    ///
    /// `stress_index` is `None` outside stress mode.
    ///
    /// Returns `Err` with all `JoinError`s collected from the scope and from
    /// per-test child tasks. Cancelled tasks are only counted and logged.
    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // All executor tasks report to the dispatcher through this channel.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block this thread until the setup-script task hands back its
            // data. Tests are not queued until setup scripts have finished.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared read-only with every per-test task.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    // Each item becomes (weight, optional group, future factory),
                    // the shape `future_queue_grouped` consumes.
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Yield only the join error (if any); successful
                            // results become `None` and are dropped below.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        // Flatten: scope-level join errors plus each finished task's child
        // join errors.
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
}
798
799/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
800///
801/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
802/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
803///
804/// This changes global state on the Win32 side, so the application must manage mutual exclusion
805/// around it. Call this right before [`TestRunner::try_execute`].
806///
807/// This is a no-op on non-Windows platforms.
808///
809/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
810/// discussion.
811pub fn configure_handle_inheritance(
812    no_capture: bool,
813) -> Result<(), ConfigureHandleInheritanceError> {
814    super::os::configure_handle_inheritance_impl(no_capture)
815}
816
817/// Environment variable to force a specific run ID (for testing).
818const FORCE_RUN_ID_ENV: &str = "__NEXTEST_FORCE_RUN_ID";
819
820/// Returns a forced run ID from the environment, or generates a new one.
821fn force_or_new_run_id() -> ReportUuid {
822    if let Ok(id_str) = std::env::var(FORCE_RUN_ID_ENV) {
823        match id_str.parse::<ReportUuid>() {
824            Ok(uuid) => return uuid,
825            Err(err) => {
826                warn!(
827                    "{FORCE_RUN_ID_ENV} is set but invalid (expected UUID): {err}, \
828                     generating random ID"
829                );
830            }
831        }
832    }
833    ReportUuid::new_v4()
834}
835
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    #[test]
    fn no_capture_settings() {
        // Ensure that output settings are ignored with no-capture.
        let mut builder = TestRunnerBuilder::default();
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let signal_handler = SignalHandlerKind::Noop;
        let input_handler = InputHandlerKind::Noop;
        let profile = profile.apply_build_platforms(&build_platforms);
        let runner = builder
            .build(
                &test_list,
                &profile,
                vec![],
                signal_handler,
                input_handler,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();
        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }

    #[test]
    fn test_debugger_command_parsing() {
        // Valid commands
        let cmd = DebuggerCommand::from_str("gdb --args").unwrap();
        assert_eq!(cmd.program(), "gdb");
        assert_eq!(cmd.args(), &["--args"]);

        let cmd = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
        assert_eq!(cmd.program(), "rust-gdb");
        assert_eq!(cmd.args(), &["-ex", "run", "--args"]);

        // With quotes
        let cmd = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
        assert_eq!(cmd.program(), "gdb");
        assert_eq!(cmd.args(), &["-ex", "set print pretty on", "--args"]);

        // Empty command
        let err = DebuggerCommand::from_str("").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));

        // Whitespace only
        let err = DebuggerCommand::from_str("   ").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));

        // Unterminated quote is a shell-words parse error, not an empty command.
        let err = DebuggerCommand::from_str("gdb \"unterminated").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::ShellWordsParse(_)));
    }

    #[test]
    fn test_tracer_command_parsing() {
        // Valid commands
        let cmd = TracerCommand::from_str("strace -f").unwrap();
        assert_eq!(cmd.program(), "strace");
        assert_eq!(cmd.args(), &["-f"]);

        // Program with no arguments
        let cmd = TracerCommand::from_str("strace").unwrap();
        assert_eq!(cmd.program(), "strace");
        assert!(cmd.args().is_empty());

        // With quotes
        let cmd = TracerCommand::from_str(r#"strace -e "trace=open,read""#).unwrap();
        assert_eq!(cmd.program(), "strace");
        assert_eq!(cmd.args(), &["-e", "trace=open,read"]);

        // Empty command
        let err = TracerCommand::from_str("").unwrap_err();
        assert!(matches!(err, TracerCommandParseError::EmptyCommand));

        // Whitespace only
        let err = TracerCommand::from_str("   ").unwrap_err();
        assert!(matches!(err, TracerCommandParseError::EmptyCommand));

        // Unterminated quote
        let err = TracerCommand::from_str("strace \"unterminated").unwrap_err();
        assert!(matches!(err, TracerCommandParseError::ShellWordsParse(_)));
    }

    #[test]
    fn test_stress_count_parsing() {
        assert_eq!(
            "infinite".parse::<StressCount>().unwrap(),
            StressCount::Infinite
        );
        assert_eq!(
            "5".parse::<StressCount>().unwrap(),
            StressCount::Count {
                count: NonZero::new(5).unwrap()
            }
        );

        // Zero, negatives, empty strings and other spellings are rejected.
        assert!("0".parse::<StressCount>().is_err());
        assert!("-1".parse::<StressCount>().is_err());
        assert!("".parse::<StressCount>().is_err());
        assert!("Infinite".parse::<StressCount>().is_err());
    }

    #[test]
    fn test_interceptor_flags() {
        let none = Interceptor::default();
        assert!(!none.should_disable_timeouts());
        assert!(!none.should_passthrough_stdin());
        assert!(none.should_create_process_group());
        assert!(!none.should_skip_leak_detection());
        assert!(!none.should_show_wrapper_command());
        assert!(none.should_send_sigtstp());

        let debugger = Interceptor::Debugger(DebuggerCommand::from_str("gdb --args").unwrap());
        assert!(debugger.should_disable_timeouts());
        assert!(debugger.should_passthrough_stdin());
        assert!(!debugger.should_create_process_group());
        assert!(debugger.should_skip_leak_detection());
        assert!(debugger.should_show_wrapper_command());
        assert!(!debugger.should_send_sigtstp());

        let tracer = Interceptor::Tracer(TracerCommand::from_str("strace -f").unwrap());
        assert!(tracer.should_disable_timeouts());
        assert!(!tracer.should_passthrough_stdin());
        assert!(tracer.should_create_process_group());
        assert!(tracer.should_skip_leak_detection());
        assert!(tracer.should_show_wrapper_command());
        assert!(tracer.should_send_sigtstp());
    }
}