1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14 TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{TestInstanceWithSettings, TestList},
18 reporter::events::{ReporterEvent, RunStats, StressIndex},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use future_queue::{FutureQueueContext, StreamExt};
26use futures::{future::Fuse, prelude::*};
27use nextest_metadata::FilterMatch;
28use quick_junit::ReportUuid;
29use std::{
30 convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr, sync::Arc, time::Duration,
31};
32use tokio::{
33 runtime::Runtime,
34 sync::{mpsc::unbounded_channel, oneshot},
35 task::JoinError,
36};
37use tracing::{debug, warn};
38
/// A debugger invocation, split into the program to run and its arguments.
///
/// Values are produced by parsing a shell-style command line through the
/// [`FromStr`] implementation. Both fields are private and are exposed
/// read-only through accessor methods.
#[derive(Clone, Debug)]
pub struct DebuggerCommand {
    /// The first word of the parsed command line.
    program: String,
    /// The remaining words of the parsed command line, in their original order.
    args: Vec<String>,
}
45
46impl DebuggerCommand {
47 pub fn program(&self) -> &str {
49 &self.program
51 }
52
53 pub fn args(&self) -> &[String] {
55 &self.args
56 }
57}
58
59impl FromStr for DebuggerCommand {
60 type Err = DebuggerCommandParseError;
61
62 fn from_str(command: &str) -> Result<Self, Self::Err> {
63 let mut parts =
64 shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
65 if parts.is_empty() {
66 return Err(DebuggerCommandParseError::EmptyCommand);
67 }
68 let program = parts.remove(0);
69 Ok(Self {
70 program,
71 args: parts,
72 })
73 }
74}
75
/// A tracer invocation, split into the program to run and its arguments.
///
/// Values are produced by parsing a shell-style command line through the
/// [`FromStr`] implementation. Both fields are private and are exposed
/// read-only through accessor methods.
#[derive(Clone, Debug)]
pub struct TracerCommand {
    /// The first word of the parsed command line.
    program: String,
    /// The remaining words of the parsed command line, in their original order.
    args: Vec<String>,
}
82
83impl TracerCommand {
84 pub fn program(&self) -> &str {
86 &self.program
87 }
88
89 pub fn args(&self) -> &[String] {
91 &self.args
92 }
93}
94
95impl FromStr for TracerCommand {
96 type Err = TracerCommandParseError;
97
98 fn from_str(command: &str) -> Result<Self, Self::Err> {
99 let mut parts =
100 shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
101 if parts.is_empty() {
102 return Err(TracerCommandParseError::EmptyCommand);
103 }
104 let program = parts.remove(0);
105 Ok(Self {
106 program,
107 args: parts,
108 })
109 }
110}
111
/// Selects an optional external command (debugger or tracer) associated with
/// test execution.
///
/// The `should_*` methods on this type translate the selected variant into
/// individual yes/no decisions for the runner.
// NOTE(review): presumably the command wraps each test process -- confirm
// against the executor, which is not visible in this file.
#[derive(Clone, Debug, Default)]
pub enum Interceptor {
    #[default]
    /// No interceptor is configured. This is the default.
    None,

    /// A debugger command is configured.
    Debugger(DebuggerCommand),

    /// A tracer command is configured.
    Tracer(TracerCommand),
}
125
126impl Interceptor {
127 pub fn should_disable_timeouts(&self) -> bool {
131 match self {
132 Interceptor::None => false,
133 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
134 }
135 }
136
137 pub fn should_passthrough_stdin(&self) -> bool {
141 match self {
142 Interceptor::None | Interceptor::Tracer(_) => false,
143 Interceptor::Debugger(_) => true,
144 }
145 }
146
147 pub fn should_create_process_group(&self) -> bool {
152 match self {
153 Interceptor::None | Interceptor::Tracer(_) => true,
154 Interceptor::Debugger(_) => false,
155 }
156 }
157
158 pub fn should_skip_leak_detection(&self) -> bool {
162 match self {
163 Interceptor::None => false,
164 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
165 }
166 }
167
168 pub fn should_show_wrapper_command(&self) -> bool {
172 match self {
173 Interceptor::None => false,
174 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
175 }
176 }
177
178 pub fn should_send_sigtstp(&self) -> bool {
185 match self {
186 Interceptor::None | Interceptor::Tracer(_) => true,
187 Interceptor::Debugger(_) => false,
188 }
189 }
190}
191
/// Identifies a child either as a single process or, on Unix, as a process
/// group.
#[derive(Copy, Clone, Debug)]
pub(super) enum ChildPid {
    /// The ID of a single process. The payload is expected to be unused on
    /// non-Unix targets, hence the conditional `expect(unused)`.
    Process(#[cfg_attr(not(unix), expect(unused))] u32),

    #[cfg(unix)]
    /// The ID of a process group (Unix only).
    ProcessGroup(u32),
}
202
203impl ChildPid {
204 #[cfg(unix)]
211 pub(super) fn for_kill(self) -> i32 {
212 match self {
213 ChildPid::Process(pid) => pid as i32,
214 ChildPid::ProcessGroup(pid) => -(pid as i32),
215 }
216 }
217}
218
/// Collects optional settings and then builds a [`TestRunner`].
///
/// Start from `TestRunnerBuilder::default()`, adjust settings with the
/// `set_*` methods, and finish with `build`.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured. `CaptureStrategy::None` forces a single
    /// test thread at build time.
    capture_strategy: CaptureStrategy,
    /// Retry policy override; stored on the built runner as `force_retries`.
    retries: Option<RetryPolicy>,
    /// Max-fail override; when unset, `build` uses the profile's value.
    max_fail: Option<MaxFail>,
    /// Test-thread override; when unset, `build` uses the profile's value.
    test_threads: Option<TestThreads>,
    /// Optional stress-testing condition.
    stress_condition: Option<StressCondition>,
    /// Optional debugger or tracer; defaults to `Interceptor::None`.
    interceptor: Interceptor,
}
229
230impl TestRunnerBuilder {
231 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
242 self.capture_strategy = strategy;
243 self
244 }
245
246 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
248 self.retries = Some(retries);
249 self
250 }
251
252 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
254 self.max_fail = Some(max_fail);
255 self
256 }
257
258 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
260 self.test_threads = Some(test_threads);
261 self
262 }
263
264 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
266 self.stress_condition = Some(stress_condition);
267 self
268 }
269
270 pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
272 self.interceptor = interceptor;
273 self
274 }
275
276 #[expect(clippy::too_many_arguments)]
278 pub fn build<'a>(
279 self,
280 test_list: &'a TestList,
281 profile: &'a EvaluatableProfile<'a>,
282 cli_args: Vec<String>,
283 signal_handler: SignalHandlerKind,
284 input_handler: InputHandlerKind,
285 double_spawn: DoubleSpawnInfo,
286 target_runner: TargetRunner,
287 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
288 let test_threads = match self.capture_strategy {
289 CaptureStrategy::None => 1,
290 CaptureStrategy::Combined | CaptureStrategy::Split => self
291 .test_threads
292 .unwrap_or_else(|| profile.test_threads())
293 .compute(),
294 };
295 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
296
297 let runtime = tokio::runtime::Builder::new_multi_thread()
298 .enable_all()
299 .thread_name("nextest-runner-worker")
300 .build()
301 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
302 let _guard = runtime.enter();
303
304 let signal_handler = signal_handler.build()?;
306
307 let input_handler = input_handler.build();
308
309 Ok(TestRunner {
310 inner: TestRunnerInner {
311 run_id: ReportUuid::new_v4(),
312 profile,
313 test_list,
314 test_threads,
315 double_spawn,
316 target_runner,
317 capture_strategy: self.capture_strategy,
318 force_retries: self.retries,
319 cli_args,
320 max_fail,
321 stress_condition: self.stress_condition,
322 interceptor: self.interceptor,
323 runtime,
324 },
325 signal_handler,
326 input_handler,
327 })
328 }
329}
330
/// The condition that bounds a stress-testing run: either a count or a
/// duration.
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// Bound the stress run by a [`StressCount`].
    Count(StressCount),

    /// Bound the stress run by a [`Duration`].
    // NOTE(review): presumably the total wall-clock time for the stress run --
    // confirm against the dispatcher.
    Duration(Duration),
}
340
/// A stress-run count: either a finite, non-zero number or infinite.
///
/// Parsed from a string by the [`FromStr`] implementation, which accepts the
/// literal `infinite` or a non-zero unsigned integer.
#[derive(Clone, Copy, Debug)]
pub enum StressCount {
    /// A finite count.
    Count {
        /// The count; guaranteed non-zero by its type.
        count: NonZero<u32>,
    },

    /// No upper bound on the count.
    Infinite,
}
353
354impl FromStr for StressCount {
355 type Err = StressCountParseError;
356
357 fn from_str(s: &str) -> Result<Self, Self::Err> {
358 if s == "infinite" {
359 Ok(StressCount::Infinite)
360 } else {
361 match s.parse() {
362 Ok(count) => Ok(StressCount::Count { count }),
363 Err(_) => Err(StressCountParseError::new(s)),
364 }
365 }
366 }
367}
368
/// Runs tests: pairs the run state with the signal and input handlers used
/// while executing.
///
/// Created by `TestRunnerBuilder::build`.
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Configuration and runtime for the run.
    inner: TestRunnerInner<'a>,
    /// Signal handler, lent mutably to the run while it executes.
    signal_handler: SignalHandler,
    /// Input handler, lent mutably to the run while it executes.
    input_handler: InputHandler,
}
378
379impl<'a> TestRunner<'a> {
380 pub fn input_handler_status(&self) -> InputHandlerStatus {
382 self.input_handler.status()
383 }
384
385 pub fn execute<F>(
391 self,
392 mut callback: F,
393 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
394 where
395 F: FnMut(ReporterEvent<'a>) + Send,
396 {
397 self.try_execute::<Infallible, _>(|event| {
398 callback(event);
399 Ok(())
400 })
401 }
402
403 pub fn try_execute<E, F>(
410 mut self,
411 mut callback: F,
412 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
413 where
414 F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
415 E: fmt::Debug + Send,
416 {
417 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
418
419 let mut report_cancel_tx = Some(report_cancel_tx);
423 let mut first_error = None;
424
425 let res = self.inner.execute(
426 &mut self.signal_handler,
427 &mut self.input_handler,
428 report_cancel_rx,
429 |event| {
430 match callback(event) {
431 Ok(()) => {}
432 Err(error) => {
433 if let Some(report_cancel_tx) = report_cancel_tx.take() {
437 let _ = report_cancel_tx.send(());
438 first_error = Some(error);
439 }
440 }
441 }
442 },
443 );
444
445 self.inner.runtime.shutdown_background();
449
450 match (res, first_error) {
451 (Ok(run_stats), None) => Ok(run_stats),
452 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
453 report_error: Some(report_error),
454 join_errors: Vec::new(),
455 }),
456 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
457 report_error,
458 join_errors,
459 }),
460 }
461 }
462}
463
/// The state of a test run, kept separate from the signal and input handlers
/// so those can be borrowed mutably while this is borrowed immutably.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Identifier for this run, generated with `ReportUuid::new_v4()` at build
    /// time and passed to both the dispatcher and executor contexts.
    run_id: ReportUuid,
    /// The profile that supplies settings such as the global timeout and test
    /// group configuration.
    profile: &'a EvaluatableProfile<'a>,
    /// The list of tests to run.
    test_list: &'a TestList<'a>,
    /// The resolved number of test threads (1 when output is not captured).
    test_threads: usize,
    /// Cloned into the executor context.
    double_spawn: DoubleSpawnInfo,
    /// Cloned into the executor context.
    target_runner: TargetRunner,
    /// How test output is captured.
    capture_strategy: CaptureStrategy,
    /// Retry policy set on the builder, if any; passed to the executor context.
    force_retries: Option<RetryPolicy>,
    /// Cloned into the dispatcher context.
    cli_args: Vec<String>,
    /// The resolved max-fail setting (builder override or the profile's value).
    max_fail: MaxFail,
    /// Stress-testing condition; when set, the run is repeated as sub-runs.
    stress_condition: Option<StressCondition>,
    /// Debugger or tracer configuration; also decides whether the global
    /// timeout is disabled.
    interceptor: Interceptor,
    /// The Tokio runtime that is entered while executing the run.
    runtime: Runtime,
}
480
impl<'a> TestRunnerInner<'a> {
    /// Runs the whole test run and returns its statistics.
    ///
    /// Builds the dispatcher and executor contexts, announces the start of the
    /// run, then performs either a single run or, when a stress condition is
    /// set, repeated sub-runs. `callback` receives reporter events through the
    /// dispatcher context. `report_cancel_rx` is handed to every run so the
    /// caller can request cancellation.
    ///
    /// # Errors
    ///
    /// Returns the join errors collected by `do_run` as soon as a run reports
    /// any.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        // When the interceptor asks for timeouts to be disabled, substitute a
        // far-future duration for the profile's global timeout.
        let global_timeout = if self.interceptor.should_disable_timeouts() {
            crate::time::far_future_duration()
        } else {
            self.profile.global_timeout(self.test_list.mode()).period
        };

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            global_timeout,
            self.stress_condition.clone(),
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
            self.interceptor.clone(),
        );

        // Announce the start of the run before any work is scheduled.
        dispatcher_cx.run_started(self.test_list, self.test_threads);

        // Stay inside the runtime context for the remainder of this function.
        let _guard = self.runtime.enter();

        // Fuse and pin the receiver so the same receiver can be lent to several
        // consecutive sub-runs via `as_mut()`.
        let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());

        if self.stress_condition.is_some() {
            // Stress mode: keep starting sub-runs while the dispatcher reports
            // remaining progress and no cancellation has been recorded.
            loop {
                let progress = dispatcher_cx
                    .stress_progress()
                    .expect("stress_condition is Some => stress progress is Some");
                if progress.remaining().is_some() {
                    dispatcher_cx.stress_sub_run_started(progress);

                    self.do_run(
                        dispatcher_cx.stress_index(),
                        &mut dispatcher_cx,
                        &executor_cx,
                        signal_handler,
                        input_handler,
                        report_cancel_rx.as_mut(),
                    )?;

                    dispatcher_cx.stress_sub_run_finished();

                    if dispatcher_cx.cancel_reason().is_some() {
                        break;
                    }
                } else {
                    break;
                }
            }
        } else {
            // Regular mode: a single run without a stress index.
            self.do_run(
                None,
                &mut dispatcher_cx,
                &executor_cx,
                signal_handler,
                input_handler,
                report_cancel_rx,
            )?;
        }

        // Capture the statistics before announcing that the run has finished.
        let run_stats = dispatcher_cx.run_stats();
        dispatcher_cx.run_finished();

        Ok(run_stats)
    }

    /// Performs one run (or one stress sub-run) of the test list.
    ///
    /// Inside a blocking `TokioScope`, three tasks are spawned: the
    /// dispatcher, the setup-script runner, and the stream that runs the
    /// tests. Executor events travel to the dispatcher over an unbounded
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns every join error collected from the spawned tasks and from the
    /// per-test child tasks, if there are any.
    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Channel carrying executor events to the dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Task 1: the dispatcher, which consumes executor events along with
            // signals, input and the report-cancel notification.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            // Task 2: run the setup scripts and hand their data back over
            // `script_tx`.
            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The receiving side is gone; nothing more to do here.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until the setup-script data arrives. `None` means the
            // sender was dropped without sending, so no tests are scheduled.
            let Some(script_data) = script_rx.blocking_recv() else {
                debug!("no script data received, shutting down");
                return;
            };

            // (group name, computed max threads) pairs for every configured
            // test group, passed to `future_queue_grouped` below.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared by reference count across all test futures.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            // Task 3: the test stream.
            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        // Tests that do not match the filter are reported as
                        // skipped and dropped from the stream.
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // A failed send is ignored.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    // Kept separately for log messages; `test` itself is moved
                    // into the spawned task below.
                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        async move {
                            // Each test runs as its own task inside a nested
                            // scope.
                            //
                            // SAFETY: the future returned by
                            // `scope_and_collect` is awaited to completion
                            // right here and is never forgotten.
                            // NOTE(review): this relies on async-scoped's
                            // documented contract for `scope_and_collect` --
                            // confirm against the crate documentation.
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // Exactly one task is spawned above, so at most one
                            // result is expected.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Only a join error is of interest here.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                .future_queue_grouped(self.test_threads, groups)
                // Each test future yields `Option<JoinError>`; keep only the
                // errors.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Flatten task results into one list of join errors, counting
        // cancelled tasks along the way.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    Ok(RunnerTaskState::Cancelled) => {
                        // Cancelled tasks are only counted for the log message
                        // below; they do not produce errors.
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
}
765
/// Configures handle inheritance by delegating to the platform-specific
/// `configure_handle_inheritance_impl` in the sibling `os` module.
///
/// `no_capture` is forwarded unchanged.
///
/// # Errors
///
/// Returns whatever [`ConfigureHandleInheritanceError`] the platform
/// implementation reports.
// NOTE(review): the platform behavior (and whether this is a no-op on some
// targets) lives in `super::os`, which is not visible here -- confirm there.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    super::os::configure_handle_inheritance_impl(no_capture)
}
783
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// With `CaptureStrategy::None`, the built runner uses a single test
    /// thread even when more threads were requested on the builder.
    #[test]
    fn no_capture_settings() {
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let early_profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let profile = early_profile.apply_build_platforms(&build_platforms);

        let mut builder = TestRunnerBuilder::default();
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));

        let runner = builder
            .build(
                &test_list,
                &profile,
                Vec::new(),
                SignalHandlerKind::Noop,
                InputHandlerKind::Noop,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();

        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }

    /// Debugger command lines are split shell-style into a program and its
    /// arguments; blank input is rejected as an empty command.
    #[test]
    fn test_debugger_command_parsing() {
        // (command line, expected program, expected arguments)
        let valid: [(&str, &str, &[&str]); 3] = [
            ("gdb --args", "gdb", &["--args"]),
            (
                "rust-gdb -ex run --args",
                "rust-gdb",
                &["-ex", "run", "--args"],
            ),
            (
                r#"gdb -ex "set print pretty on" --args"#,
                "gdb",
                &["-ex", "set print pretty on", "--args"],
            ),
        ];
        for (input, program, args) in valid {
            let cmd = input.parse::<DebuggerCommand>().unwrap();
            assert_eq!(cmd.program(), program);
            assert_eq!(cmd.args(), args);
        }

        for blank in ["", " "] {
            let err = blank.parse::<DebuggerCommand>().unwrap_err();
            assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
        }
    }
}