Skip to main content

nextest_runner/runner/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        core::EvaluatableProfile,
8        elements::{FlakyResult, MaxFail, RetryPolicy, TestGroup, TestThreads},
9        scripts::SetupScriptExecuteData,
10    },
11    double_spawn::DoubleSpawnInfo,
12    errors::{
13        ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14        TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15    },
16    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17    list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18    reporter::events::{ReporterEvent, RunStats, StressIndex},
19    runner::ExecutorEvent,
20    signal::{SignalHandler, SignalHandlerKind},
21    target_runner::TargetRunner,
22    test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use semver::Version;
31use std::{
32    collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
33    sync::Arc, time::Duration,
34};
35use tokio::{
36    runtime::Runtime,
37    sync::{mpsc::unbounded_channel, oneshot},
38    task::JoinError,
39};
40use tracing::{debug, warn};
41
/// A debugger invocation, split into an executable and its arguments.
///
/// Values are normally produced by parsing a command line via the `FromStr`
/// implementation for this type.
#[derive(Clone, Debug)]
pub struct DebuggerCommand {
    /// The executable to launch (the first word of the parsed command line).
    program: String,
    /// Every word that followed the program, in order.
    args: Vec<String>,
}

impl DebuggerCommand {
    /// Returns the debugger executable.
    pub fn program(&self) -> &str {
        // Parsing rejects empty command lines, so a program is always present.
        self.program.as_str()
    }

    /// Returns the arguments to pass to the debugger, in order.
    pub fn args(&self) -> &[String] {
        self.args.as_slice()
    }
}
61
62impl FromStr for DebuggerCommand {
63    type Err = DebuggerCommandParseError;
64
65    fn from_str(command: &str) -> Result<Self, Self::Err> {
66        let mut parts =
67            shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
68        if parts.is_empty() {
69            return Err(DebuggerCommandParseError::EmptyCommand);
70        }
71        let program = parts.remove(0);
72        Ok(Self {
73            program,
74            args: parts,
75        })
76    }
77}
78
/// A tracer invocation, split into an executable and its arguments.
///
/// Values are normally produced by parsing a command line via the `FromStr`
/// implementation for this type.
#[derive(Clone, Debug)]
pub struct TracerCommand {
    /// The executable to launch (the first word of the parsed command line).
    program: String,
    /// Every word that followed the program, in order.
    args: Vec<String>,
}

impl TracerCommand {
    /// Returns the tracer executable.
    pub fn program(&self) -> &str {
        self.program.as_str()
    }

    /// Returns the arguments to pass to the tracer, in order.
    pub fn args(&self) -> &[String] {
        self.args.as_slice()
    }
}
97
98impl FromStr for TracerCommand {
99    type Err = TracerCommandParseError;
100
101    fn from_str(command: &str) -> Result<Self, Self::Err> {
102        let mut parts =
103            shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
104        if parts.is_empty() {
105            return Err(TracerCommandParseError::EmptyCommand);
106        }
107        let program = parts.remove(0);
108        Ok(Self {
109            program,
110            args: parts,
111        })
112    }
113}
114
115/// An interceptor wraps test execution with a debugger or tracer.
116#[derive(Clone, Debug, Default)]
117pub enum Interceptor {
118    /// No interceptor - standard test execution.
119    #[default]
120    None,
121
122    /// Run the test under a debugger.
123    Debugger(DebuggerCommand),
124
125    /// Run the test under a syscall tracer.
126    Tracer(TracerCommand),
127}
128
129impl Interceptor {
130    /// Returns true if timeouts should be disabled.
131    ///
132    /// Both debuggers and tracers disable timeouts.
133    pub fn should_disable_timeouts(&self) -> bool {
134        match self {
135            Interceptor::None => false,
136            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
137        }
138    }
139
140    /// Returns true if stdin should be passed through to child test processes.
141    ///
142    /// Only debuggers need stdin passthrough for interactive debugging.
143    pub fn should_passthrough_stdin(&self) -> bool {
144        match self {
145            Interceptor::None | Interceptor::Tracer(_) => false,
146            Interceptor::Debugger(_) => true,
147        }
148    }
149
150    /// Returns true if a process group should be created for the child.
151    ///
152    /// Debuggers need terminal control, so no process group is created. Tracers
153    /// work fine with process groups.
154    pub fn should_create_process_group(&self) -> bool {
155        match self {
156            Interceptor::None | Interceptor::Tracer(_) => true,
157            Interceptor::Debugger(_) => false,
158        }
159    }
160
161    /// Returns true if leak detection should be skipped.
162    ///
163    /// Both debuggers and tracers skip leak detection to avoid interference.
164    pub fn should_skip_leak_detection(&self) -> bool {
165        match self {
166            Interceptor::None => false,
167            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
168        }
169    }
170
171    /// Returns true if the test command should be displayed.
172    ///
173    /// Used to determine if we should print the wrapper command for debugging.
174    pub fn should_show_wrapper_command(&self) -> bool {
175        match self {
176            Interceptor::None => false,
177            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
178        }
179    }
180
181    /// Returns true if, on receiving SIGTSTP, we should send SIGTSTP to the
182    /// child.
183    ///
184    /// Debugger mode has special signal handling where we don't send SIGTSTP to
185    /// the child (it receives it directly from the terminal, since no process
186    /// group is created). Tracers use standard signal handling.
187    pub fn should_send_sigtstp(&self) -> bool {
188        match self {
189            Interceptor::None | Interceptor::Tracer(_) => true,
190            Interceptor::Debugger(_) => false,
191        }
192    }
193}
194
195/// Version-related environment variables set for tests and setup scripts.
196///
197/// These expose the current nextest version and any version constraints from
198/// the repository's configuration.
199#[derive(Clone, Debug)]
200pub struct VersionEnvVars {
201    /// The current nextest version.
202    pub current_version: Version,
203
204    /// The required nextest version from configuration, if any.
205    pub required_version: Option<Version>,
206
207    /// The recommended nextest version from configuration, if any.
208    pub recommended_version: Option<Version>,
209}
210
211impl VersionEnvVars {
212    /// Applies the version environment variables to a command.
213    pub(crate) fn apply_env(&self, cmd: &mut std::process::Command) {
214        cmd.env("NEXTEST_VERSION", self.current_version.to_string());
215        cmd.env(
216            "NEXTEST_REQUIRED_VERSION",
217            match &self.required_version {
218                Some(v) => v.to_string(),
219                None => "none".to_owned(),
220            },
221        );
222        cmd.env(
223            "NEXTEST_RECOMMENDED_VERSION",
224            match &self.recommended_version {
225                Some(v) => v.to_string(),
226                None => "none".to_owned(),
227            },
228        );
229    }
230}
231
232/// A child process identifier: either a single process or a process group.
233#[derive(Copy, Clone, Debug)]
234pub(super) enum ChildPid {
235    /// A single process ID.
236    Process(#[cfg_attr(not(unix), expect(unused))] u32),
237
238    /// A process group ID.
239    #[cfg(unix)]
240    ProcessGroup(u32),
241}
242
243impl ChildPid {
244    /// Returns the PID value to use with `libc::kill`.
245    ///
246    /// - `Process(pid)` returns `pid as i32` (positive, kills single process).
247    /// - `ProcessGroup(pid)` returns `-(pid as i32)` (negative, kills process group).
248    ///
249    /// On Windows, always returns `pid as i32`.
250    #[cfg(unix)]
251    pub(super) fn for_kill(self) -> i32 {
252        match self {
253            ChildPid::Process(pid) => pid as i32,
254            ChildPid::ProcessGroup(pid) => -(pid as i32),
255        }
256    }
257}
258
/// Test runner options.
///
/// Collects optional overrides through the `set_*` methods; `build` then
/// combines them with the profile to produce a `TestRunner`.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured. With `CaptureStrategy::None`, `build`
    /// forces the thread count to 1.
    capture_strategy: CaptureStrategy,
    /// Retry policy override; `None` until `set_retries` is called.
    retries: Option<RetryPolicy>,
    /// Flaky-result override; `None` until `set_flaky_result` is called.
    flaky_result: Option<FlakyResult>,
    /// Max-fail override; when `None`, `build` uses the profile's value.
    max_fail: Option<MaxFail>,
    /// Thread-count override; when `None`, `build` uses the profile's value.
    test_threads: Option<TestThreads>,
    /// Stress-testing condition; `None` means a single regular run.
    stress_condition: Option<StressCondition>,
    /// Debugger or tracer to wrap tests with; defaults to no interceptor.
    interceptor: Interceptor,
    /// Tests expected to still be outstanding, used for rerun tracking.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
}
271
272impl TestRunnerBuilder {
273    /// Sets the capture strategy for the test runner
274    ///
275    /// * [`CaptureStrategy::Split`]
276    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
277    ///   * con: ordering between the streams cannot be guaranteed
278    /// * [`CaptureStrategy::Combined`]
279    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
280    ///   * con: distinction between `stdout` and `stderr` is lost
281    /// * [`CaptureStrategy::None`] -
282    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
283    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
284        self.capture_strategy = strategy;
285        self
286    }
287
288    /// Sets the number of retries for this test runner.
289    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
290        self.retries = Some(retries);
291        self
292    }
293
294    /// Sets the flaky result behavior for this test runner.
295    pub fn set_flaky_result(&mut self, flaky_result: FlakyResult) -> &mut Self {
296        self.flaky_result = Some(flaky_result);
297        self
298    }
299
300    /// Sets the max-fail value for this test runner.
301    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
302        self.max_fail = Some(max_fail);
303        self
304    }
305
306    /// Sets the number of tests to run simultaneously.
307    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
308        self.test_threads = Some(test_threads);
309        self
310    }
311
312    /// Sets the stress testing condition.
313    pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
314        self.stress_condition = Some(stress_condition);
315        self
316    }
317
318    /// Sets the interceptor (debugger or tracer) to use for running tests.
319    pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
320        self.interceptor = interceptor;
321        self
322    }
323
324    /// Sets the expected outstanding tests for rerun tracking.
325    ///
326    /// When set, the dispatcher will track which tests were seen during the run
327    /// and emit a `TestsNotSeen` as part of the `RunFinished` if some expected
328    /// tests were not seen.
329    pub fn set_expected_outstanding(
330        &mut self,
331        expected: BTreeSet<OwnedTestInstanceId>,
332    ) -> &mut Self {
333        self.expected_outstanding = Some(expected);
334        self
335    }
336
337    /// Creates a new test runner.
338    ///
339    /// `run_id` identifies this invocation. The caller should generate it via
340    /// [`force_or_new_run_id`](crate::helpers::force_or_new_run_id) and pass it
341    /// in.
342    #[expect(clippy::too_many_arguments)]
343    pub fn build<'a>(
344        self,
345        run_id: ReportUuid,
346        version_env_vars: VersionEnvVars,
347        test_list: &'a TestList,
348        profile: &'a EvaluatableProfile<'a>,
349        cli_args: Vec<String>,
350        signal_handler: SignalHandlerKind,
351        input_handler: InputHandlerKind,
352        double_spawn: DoubleSpawnInfo,
353        target_runner: TargetRunner,
354    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
355        let test_threads = match self.capture_strategy {
356            CaptureStrategy::None => 1,
357            CaptureStrategy::Combined | CaptureStrategy::Split => self
358                .test_threads
359                .unwrap_or_else(|| profile.test_threads())
360                .compute(),
361        };
362        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
363
364        let runtime = tokio::runtime::Builder::new_multi_thread()
365            .enable_all()
366            .thread_name("nextest-runner-worker")
367            .build()
368            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
369        let _guard = runtime.enter();
370
371        // signal_handler.build() must be called from within the guard.
372        let signal_handler = signal_handler.build()?;
373
374        let input_handler = input_handler.build();
375
376        Ok(TestRunner {
377            inner: TestRunnerInner {
378                run_id,
379                started_at: Local::now(),
380                profile,
381                test_list,
382                test_threads,
383                double_spawn,
384                target_runner,
385                capture_strategy: self.capture_strategy,
386                force_retries: self.retries,
387                force_flaky_result: self.flaky_result,
388                cli_args,
389                max_fail,
390                stress_condition: self.stress_condition,
391                interceptor: self.interceptor,
392                expected_outstanding: self.expected_outstanding,
393                version_env_vars,
394                runtime,
395            },
396            signal_handler,
397            input_handler,
398        })
399    }
400}
401
/// Stress testing condition.
///
/// Determines when a stress run stops repeating the test suite.
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// Repeat according to the given [`StressCount`], which is either a fixed
    /// number of times or indefinitely.
    Count(StressCount),

    /// Run until this duration has elapsed.
    Duration(Duration),
}
411
/// A count for stress testing.
///
/// Serialized as an internally tagged enum: the variant is stored in a `kind`
/// field, with variant names in kebab-case (`count`, `infinite`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub enum StressCount {
    /// Run each test `count` times.
    Count {
        /// The number of times to run each test. Never zero.
        count: NonZero<u32>,
    },

    /// Run indefinitely.
    Infinite,
}
426
427impl FromStr for StressCount {
428    type Err = StressCountParseError;
429
430    fn from_str(s: &str) -> Result<Self, Self::Err> {
431        if s == "infinite" {
432            Ok(StressCount::Infinite)
433        } else {
434            match s.parse() {
435                Ok(count) => Ok(StressCount::Count { count }),
436                Err(_) => Err(StressCountParseError::new(s)),
437            }
438        }
439    }
440}
441
/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Run configuration plus the Tokio runtime the run executes on.
    inner: TestRunnerInner<'a>,
    /// Signal handler built in `TestRunnerBuilder::build`; lent mutably to the
    /// run while it executes.
    signal_handler: SignalHandler,
    /// Input handler built in `TestRunnerBuilder::build`; lent mutably to the
    /// run while it executes.
    input_handler: InputHandler,
}
451
452impl<'a> TestRunner<'a> {
453    /// Returns the unique ID for this test run.
454    pub fn run_id(&self) -> ReportUuid {
455        self.inner.run_id
456    }
457
458    /// Returns the timestamp when this test run was started.
459    pub fn started_at(&self) -> DateTime<Local> {
460        self.inner.started_at
461    }
462
463    /// Returns the status of the input handler.
464    pub fn input_handler_status(&self) -> InputHandlerStatus {
465        self.input_handler.status()
466    }
467
468    /// Executes the listed tests, each one in its own process.
469    ///
470    /// The callback is called with the results of each test.
471    ///
472    /// Returns an error if any of the tasks panicked.
473    pub fn execute<F>(
474        self,
475        mut callback: F,
476    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
477    where
478        F: FnMut(ReporterEvent<'a>) + Send,
479    {
480        self.try_execute::<Infallible, _>(|event| {
481            callback(event);
482            Ok(())
483        })
484    }
485
486    /// Executes the listed tests, each one in its own process.
487    ///
488    /// Accepts a callback that is called with the results of each test. If the callback returns an
489    /// error, the test run terminates and the callback is no longer called.
490    ///
491    /// Returns an error if any of the tasks panicked.
492    pub fn try_execute<E, F>(
493        mut self,
494        mut callback: F,
495    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
496    where
497        F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
498        E: fmt::Debug + Send,
499    {
500        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
501
502        // If report_cancel_tx is None, at least one error has occurred and the
503        // runner has been instructed to shut down. first_error is also set to
504        // Some in that case.
505        let mut report_cancel_tx = Some(report_cancel_tx);
506        let mut first_error = None;
507
508        let res = self.inner.execute(
509            &mut self.signal_handler,
510            &mut self.input_handler,
511            report_cancel_rx,
512            |event| {
513                match callback(event) {
514                    Ok(()) => {}
515                    Err(error) => {
516                        // If the callback fails, we need to let the runner know to start shutting
517                        // down. But we keep reporting results in case the callback starts working
518                        // again.
519                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
520                            let _ = report_cancel_tx.send(());
521                            first_error = Some(error);
522                        }
523                    }
524                }
525            },
526        );
527
528        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
529        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
530        // with leaked resources.
531        self.inner.runtime.shutdown_background();
532
533        match (res, first_error) {
534            (Ok(run_stats), None) => Ok(run_stats),
535            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
536                report_error: Some(report_error),
537                join_errors: Vec::new(),
538            }),
539            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
540                report_error,
541                join_errors,
542            }),
543        }
544    }
545}
546
/// The state behind a `TestRunner`, excluding its signal and input handlers.
///
/// Populated by `TestRunnerBuilder::build`.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Unique identifier for this run, supplied by the caller of `build`.
    run_id: ReportUuid,
    /// Local time captured when the runner was built.
    started_at: DateTime<Local>,
    /// The profile the run is evaluated against.
    profile: &'a EvaluatableProfile<'a>,
    /// The tests to run.
    test_list: &'a TestList<'a>,
    /// Number of tests to run simultaneously; 1 when output is not captured.
    test_threads: usize,
    /// Passed on to the executor context.
    double_spawn: DoubleSpawnInfo,
    /// Passed on to the executor context.
    target_runner: TargetRunner,
    /// How test output is captured.
    capture_strategy: CaptureStrategy,
    /// Retry policy override from the builder, if one was set.
    force_retries: Option<RetryPolicy>,
    /// Flaky-result override from the builder, if one was set.
    force_flaky_result: Option<FlakyResult>,
    /// Command-line arguments, passed on to the dispatcher context.
    cli_args: Vec<String>,
    /// Max-fail value: the builder override, or else the profile's value.
    max_fail: MaxFail,
    /// When `Some`, the run is repeated as a series of stress sub-runs.
    stress_condition: Option<StressCondition>,
    /// Debugger or tracer wrapping; an active one disables the global timeout.
    interceptor: Interceptor,
    /// Expected outstanding tests, passed on to the dispatcher context.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    /// Version environment variables, passed on to the executor context.
    version_env_vars: VersionEnvVars,
    /// Tokio runtime that is entered for the duration of the run.
    runtime: Runtime,
}
567
568impl<'a> TestRunnerInner<'a> {
569    fn execute<F>(
570        &self,
571        signal_handler: &mut SignalHandler,
572        input_handler: &mut InputHandler,
573        report_cancel_rx: oneshot::Receiver<()>,
574        callback: F,
575    ) -> Result<RunStats, Vec<JoinError>>
576    where
577        F: FnMut(ReporterEvent<'a>) + Send,
578    {
579        // TODO: add support for other test-running approaches, measure performance.
580
581        // Disable the global timeout when an interceptor is active.
582        let global_timeout = if self.interceptor.should_disable_timeouts() {
583            crate::time::far_future_duration()
584        } else {
585            self.profile.global_timeout(self.test_list.mode()).period
586        };
587
588        let mut dispatcher_cx = DispatcherContext::new(
589            callback,
590            self.run_id,
591            self.profile.name(),
592            self.cli_args.clone(),
593            self.test_list.run_count(),
594            self.max_fail,
595            global_timeout,
596            self.stress_condition.clone(),
597            self.expected_outstanding.clone(),
598        );
599
600        let executor_cx = ExecutorContext::new(
601            self.run_id,
602            self.profile,
603            self.test_list,
604            self.test_threads,
605            self.double_spawn.clone(),
606            self.target_runner.clone(),
607            self.capture_strategy,
608            self.force_retries,
609            self.force_flaky_result,
610            self.interceptor.clone(),
611            self.version_env_vars.clone(),
612        );
613
614        // Send the initial event.
615        dispatcher_cx.run_started(self.test_list, self.test_threads);
616
617        let _guard = self.runtime.enter();
618
619        let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
620
621        if self.stress_condition.is_some() {
622            loop {
623                let progress = dispatcher_cx
624                    .stress_progress()
625                    .expect("stress_condition is Some => stress progress is Some");
626                if progress.remaining().is_some() {
627                    dispatcher_cx.stress_sub_run_started(progress);
628
629                    self.do_run(
630                        dispatcher_cx.stress_index(),
631                        &mut dispatcher_cx,
632                        &executor_cx,
633                        signal_handler,
634                        input_handler,
635                        report_cancel_rx.as_mut(),
636                    )?;
637
638                    dispatcher_cx.stress_sub_run_finished();
639
640                    if dispatcher_cx.cancel_reason().is_some() {
641                        break;
642                    }
643                } else {
644                    break;
645                }
646            }
647        } else {
648            self.do_run(
649                None,
650                &mut dispatcher_cx,
651                &executor_cx,
652                signal_handler,
653                input_handler,
654                report_cancel_rx,
655            )?;
656        }
657
658        let run_stats = dispatcher_cx.run_stats();
659        dispatcher_cx.run_finished();
660
661        Ok(run_stats)
662    }
663
    /// Performs a single run (or one stress sub-run) and blocks until every
    /// task spawned for it has completed.
    ///
    /// Inside one blocking async scope this spawns the dispatcher, then the
    /// setup scripts, and (once the setup script data has arrived) the tests
    /// themselves. Executor events flow to the dispatcher over an unbounded
    /// channel.
    ///
    /// `stress_index` is `None` for a regular run and is attached to the
    /// events produced by this run.
    ///
    /// # Errors
    ///
    /// Returns the collected join errors if any spawned task, or any per-test
    /// child task, failed to join.
    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Channel carrying executor events to the dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            // Channel used to hand the setup script results back to this
            // (blocking) closure.
            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until the setup scripts have finished; tests are only
            // scheduled afterwards.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared by every test task.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    // The queue needs each test's weight and optional group
                    // alongside the closure that runs it.
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Only a join error is of interest; a successful
                            // join becomes `None`.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        // Cancellations are only logged, never treated as errors.
        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
855}
856
857/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
858///
859/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
860/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
861///
862/// This changes global state on the Win32 side, so the application must manage mutual exclusion
863/// around it. Call this right before [`TestRunner::try_execute`].
864///
865/// This is a no-op on non-Windows platforms.
866///
867/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
868/// discussion.
869pub fn configure_handle_inheritance(
870    no_capture: bool,
871) -> Result<(), ConfigureHandleInheritanceError> {
872    super::os::configure_handle_inheritance_impl(no_capture)
873}
874
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// Output settings must be ignored when no-capture is in effect: even with
    /// 20 test threads requested, the built runner reports a single thread.
    #[test]
    fn no_capture_settings() {
        // Ask for no-capture together with a thread count greater than one.
        let mut runner_builder = TestRunnerBuilder::default();
        runner_builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));

        // Build the minimal set of inputs the runner needs: an empty test
        // list and the default profile evaluated without a target platform.
        let empty_list = TestList::empty();
        let default_config = NextestConfig::default_config("/fake/dir");
        let early_profile = default_config
            .profile(NextestConfig::DEFAULT_PROFILE)
            .unwrap();
        let platforms = BuildPlatforms::new_with_no_target().unwrap();
        let evaluated_profile = early_profile.apply_build_platforms(&platforms);

        let versions = VersionEnvVars {
            current_version: Version::new(0, 0, 0),
            required_version: None,
            recommended_version: None,
        };

        let built = runner_builder
            .build(
                crate::helpers::force_or_new_run_id(),
                versions,
                &empty_list,
                &evaluated_profile,
                Vec::new(),
                SignalHandlerKind::Noop,
                InputHandlerKind::Noop,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();

        assert_eq!(built.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(built.inner.test_threads, 1, "tests run serially");
    }

    /// Checks `DebuggerCommand::from_str` against well-formed command lines
    /// (plain and quoted) and against inputs that contain no command at all.
    #[test]
    fn test_debugger_command_parsing() {
        // Each entry is (input, expected program, expected arguments). The
        // last entry exercises a quoted argument containing spaces.
        let valid_cases: [(&str, &str, &[&str]); 3] = [
            ("gdb --args", "gdb", &["--args"]),
            (
                "rust-gdb -ex run --args",
                "rust-gdb",
                &["-ex", "run", "--args"],
            ),
            (
                r#"gdb -ex "set print pretty on" --args"#,
                "gdb",
                &["-ex", "set print pretty on", "--args"],
            ),
        ];
        for (input, expected_program, expected_args) in valid_cases {
            let parsed = DebuggerCommand::from_str(input).unwrap();
            assert_eq!(parsed.program(), expected_program);
            assert_eq!(parsed.args(), expected_args);
        }

        // An empty string and a whitespace-only string are both rejected as
        // empty commands.
        for blank_input in ["", "   "] {
            let parse_error = DebuggerCommand::from_str(blank_input).unwrap_err();
            assert!(matches!(
                parse_error,
                DebuggerCommandParseError::EmptyCommand
            ));
        }
    }
}