1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14 TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18 reporter::events::{ReporterEvent, RunStats, StressIndex},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use std::{
31 collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
32 sync::Arc, time::Duration,
33};
34use tokio::{
35 runtime::Runtime,
36 sync::{mpsc::unbounded_channel, oneshot},
37 task::JoinError,
38};
39use tracing::{debug, warn};
40
41#[derive(Clone, Debug)]
43pub struct DebuggerCommand {
44 program: String,
45 args: Vec<String>,
46}
47
48impl DebuggerCommand {
49 pub fn program(&self) -> &str {
51 &self.program
53 }
54
55 pub fn args(&self) -> &[String] {
57 &self.args
58 }
59}
60
61impl FromStr for DebuggerCommand {
62 type Err = DebuggerCommandParseError;
63
64 fn from_str(command: &str) -> Result<Self, Self::Err> {
65 let mut parts =
66 shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
67 if parts.is_empty() {
68 return Err(DebuggerCommandParseError::EmptyCommand);
69 }
70 let program = parts.remove(0);
71 Ok(Self {
72 program,
73 args: parts,
74 })
75 }
76}
77
78#[derive(Clone, Debug)]
80pub struct TracerCommand {
81 program: String,
82 args: Vec<String>,
83}
84
85impl TracerCommand {
86 pub fn program(&self) -> &str {
88 &self.program
89 }
90
91 pub fn args(&self) -> &[String] {
93 &self.args
94 }
95}
96
97impl FromStr for TracerCommand {
98 type Err = TracerCommandParseError;
99
100 fn from_str(command: &str) -> Result<Self, Self::Err> {
101 let mut parts =
102 shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
103 if parts.is_empty() {
104 return Err(TracerCommandParseError::EmptyCommand);
105 }
106 let program = parts.remove(0);
107 Ok(Self {
108 program,
109 args: parts,
110 })
111 }
112}
113
114#[derive(Clone, Debug, Default)]
116pub enum Interceptor {
117 #[default]
119 None,
120
121 Debugger(DebuggerCommand),
123
124 Tracer(TracerCommand),
126}
127
128impl Interceptor {
129 pub fn should_disable_timeouts(&self) -> bool {
133 match self {
134 Interceptor::None => false,
135 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
136 }
137 }
138
139 pub fn should_passthrough_stdin(&self) -> bool {
143 match self {
144 Interceptor::None | Interceptor::Tracer(_) => false,
145 Interceptor::Debugger(_) => true,
146 }
147 }
148
149 pub fn should_create_process_group(&self) -> bool {
154 match self {
155 Interceptor::None | Interceptor::Tracer(_) => true,
156 Interceptor::Debugger(_) => false,
157 }
158 }
159
160 pub fn should_skip_leak_detection(&self) -> bool {
164 match self {
165 Interceptor::None => false,
166 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
167 }
168 }
169
170 pub fn should_show_wrapper_command(&self) -> bool {
174 match self {
175 Interceptor::None => false,
176 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
177 }
178 }
179
180 pub fn should_send_sigtstp(&self) -> bool {
187 match self {
188 Interceptor::None | Interceptor::Tracer(_) => true,
189 Interceptor::Debugger(_) => false,
190 }
191 }
192}
193
194#[derive(Copy, Clone, Debug)]
196pub(super) enum ChildPid {
197 Process(#[cfg_attr(not(unix), expect(unused))] u32),
199
200 #[cfg(unix)]
202 ProcessGroup(u32),
203}
204
205impl ChildPid {
206 #[cfg(unix)]
213 pub(super) fn for_kill(self) -> i32 {
214 match self {
215 ChildPid::Process(pid) => pid as i32,
216 ChildPid::ProcessGroup(pid) => -(pid as i32),
217 }
218 }
219}
220
221#[derive(Debug, Default)]
223pub struct TestRunnerBuilder {
224 capture_strategy: CaptureStrategy,
225 retries: Option<RetryPolicy>,
226 max_fail: Option<MaxFail>,
227 test_threads: Option<TestThreads>,
228 stress_condition: Option<StressCondition>,
229 interceptor: Interceptor,
230 expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
231}
232
233impl TestRunnerBuilder {
234 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
245 self.capture_strategy = strategy;
246 self
247 }
248
249 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
251 self.retries = Some(retries);
252 self
253 }
254
255 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
257 self.max_fail = Some(max_fail);
258 self
259 }
260
261 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
263 self.test_threads = Some(test_threads);
264 self
265 }
266
267 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
269 self.stress_condition = Some(stress_condition);
270 self
271 }
272
273 pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
275 self.interceptor = interceptor;
276 self
277 }
278
279 pub fn set_expected_outstanding(
285 &mut self,
286 expected: BTreeSet<OwnedTestInstanceId>,
287 ) -> &mut Self {
288 self.expected_outstanding = Some(expected);
289 self
290 }
291
292 #[expect(clippy::too_many_arguments)]
294 pub fn build<'a>(
295 self,
296 test_list: &'a TestList,
297 profile: &'a EvaluatableProfile<'a>,
298 cli_args: Vec<String>,
299 signal_handler: SignalHandlerKind,
300 input_handler: InputHandlerKind,
301 double_spawn: DoubleSpawnInfo,
302 target_runner: TargetRunner,
303 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
304 let test_threads = match self.capture_strategy {
305 CaptureStrategy::None => 1,
306 CaptureStrategy::Combined | CaptureStrategy::Split => self
307 .test_threads
308 .unwrap_or_else(|| profile.test_threads())
309 .compute(),
310 };
311 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
312
313 let runtime = tokio::runtime::Builder::new_multi_thread()
314 .enable_all()
315 .thread_name("nextest-runner-worker")
316 .build()
317 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
318 let _guard = runtime.enter();
319
320 let signal_handler = signal_handler.build()?;
322
323 let input_handler = input_handler.build();
324
325 Ok(TestRunner {
326 inner: TestRunnerInner {
327 run_id: force_or_new_run_id(),
328 started_at: Local::now(),
329 profile,
330 test_list,
331 test_threads,
332 double_spawn,
333 target_runner,
334 capture_strategy: self.capture_strategy,
335 force_retries: self.retries,
336 cli_args,
337 max_fail,
338 stress_condition: self.stress_condition,
339 interceptor: self.interceptor,
340 expected_outstanding: self.expected_outstanding,
341 runtime,
342 },
343 signal_handler,
344 input_handler,
345 })
346 }
347}
348
349#[derive(Clone, Debug)]
351pub enum StressCondition {
352 Count(StressCount),
354
355 Duration(Duration),
357}
358
359#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
361#[serde(tag = "kind", rename_all = "kebab-case")]
362#[cfg_attr(test, derive(test_strategy::Arbitrary))]
363pub enum StressCount {
364 Count {
366 count: NonZero<u32>,
368 },
369
370 Infinite,
372}
373
374impl FromStr for StressCount {
375 type Err = StressCountParseError;
376
377 fn from_str(s: &str) -> Result<Self, Self::Err> {
378 if s == "infinite" {
379 Ok(StressCount::Infinite)
380 } else {
381 match s.parse() {
382 Ok(count) => Ok(StressCount::Count { count }),
383 Err(_) => Err(StressCountParseError::new(s)),
384 }
385 }
386 }
387}
388
389#[derive(Debug)]
393pub struct TestRunner<'a> {
394 inner: TestRunnerInner<'a>,
395 signal_handler: SignalHandler,
396 input_handler: InputHandler,
397}
398
399impl<'a> TestRunner<'a> {
400 pub fn run_id(&self) -> ReportUuid {
402 self.inner.run_id
403 }
404
405 pub fn started_at(&self) -> DateTime<Local> {
407 self.inner.started_at
408 }
409
410 pub fn input_handler_status(&self) -> InputHandlerStatus {
412 self.input_handler.status()
413 }
414
415 pub fn execute<F>(
421 self,
422 mut callback: F,
423 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
424 where
425 F: FnMut(ReporterEvent<'a>) + Send,
426 {
427 self.try_execute::<Infallible, _>(|event| {
428 callback(event);
429 Ok(())
430 })
431 }
432
433 pub fn try_execute<E, F>(
440 mut self,
441 mut callback: F,
442 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
443 where
444 F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
445 E: fmt::Debug + Send,
446 {
447 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
448
449 let mut report_cancel_tx = Some(report_cancel_tx);
453 let mut first_error = None;
454
455 let res = self.inner.execute(
456 &mut self.signal_handler,
457 &mut self.input_handler,
458 report_cancel_rx,
459 |event| {
460 match callback(event) {
461 Ok(()) => {}
462 Err(error) => {
463 if let Some(report_cancel_tx) = report_cancel_tx.take() {
467 let _ = report_cancel_tx.send(());
468 first_error = Some(error);
469 }
470 }
471 }
472 },
473 );
474
475 self.inner.runtime.shutdown_background();
479
480 match (res, first_error) {
481 (Ok(run_stats), None) => Ok(run_stats),
482 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
483 report_error: Some(report_error),
484 join_errors: Vec::new(),
485 }),
486 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
487 report_error,
488 join_errors,
489 }),
490 }
491 }
492}
493
494#[derive(Debug)]
495struct TestRunnerInner<'a> {
496 run_id: ReportUuid,
497 started_at: DateTime<Local>,
498 profile: &'a EvaluatableProfile<'a>,
499 test_list: &'a TestList<'a>,
500 test_threads: usize,
501 double_spawn: DoubleSpawnInfo,
502 target_runner: TargetRunner,
503 capture_strategy: CaptureStrategy,
504 force_retries: Option<RetryPolicy>,
505 cli_args: Vec<String>,
506 max_fail: MaxFail,
507 stress_condition: Option<StressCondition>,
508 interceptor: Interceptor,
509 expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
510 runtime: Runtime,
511}
512
513impl<'a> TestRunnerInner<'a> {
514 fn execute<F>(
515 &self,
516 signal_handler: &mut SignalHandler,
517 input_handler: &mut InputHandler,
518 report_cancel_rx: oneshot::Receiver<()>,
519 callback: F,
520 ) -> Result<RunStats, Vec<JoinError>>
521 where
522 F: FnMut(ReporterEvent<'a>) + Send,
523 {
524 let global_timeout = if self.interceptor.should_disable_timeouts() {
528 crate::time::far_future_duration()
529 } else {
530 self.profile.global_timeout(self.test_list.mode()).period
531 };
532
533 let mut dispatcher_cx = DispatcherContext::new(
534 callback,
535 self.run_id,
536 self.profile.name(),
537 self.cli_args.clone(),
538 self.test_list.run_count(),
539 self.max_fail,
540 global_timeout,
541 self.stress_condition.clone(),
542 self.expected_outstanding.clone(),
543 );
544
545 let executor_cx = ExecutorContext::new(
546 self.run_id,
547 self.profile,
548 self.test_list,
549 self.double_spawn.clone(),
550 self.target_runner.clone(),
551 self.capture_strategy,
552 self.force_retries,
553 self.interceptor.clone(),
554 );
555
556 dispatcher_cx.run_started(self.test_list, self.test_threads);
558
559 let _guard = self.runtime.enter();
560
561 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
562
563 if self.stress_condition.is_some() {
564 loop {
565 let progress = dispatcher_cx
566 .stress_progress()
567 .expect("stress_condition is Some => stress progress is Some");
568 if progress.remaining().is_some() {
569 dispatcher_cx.stress_sub_run_started(progress);
570
571 self.do_run(
572 dispatcher_cx.stress_index(),
573 &mut dispatcher_cx,
574 &executor_cx,
575 signal_handler,
576 input_handler,
577 report_cancel_rx.as_mut(),
578 )?;
579
580 dispatcher_cx.stress_sub_run_finished();
581
582 if dispatcher_cx.cancel_reason().is_some() {
583 break;
584 }
585 } else {
586 break;
587 }
588 }
589 } else {
590 self.do_run(
591 None,
592 &mut dispatcher_cx,
593 &executor_cx,
594 signal_handler,
595 input_handler,
596 report_cancel_rx,
597 )?;
598 }
599
600 let run_stats = dispatcher_cx.run_stats();
601 dispatcher_cx.run_finished();
602
603 Ok(run_stats)
604 }
605
606 fn do_run<F>(
607 &self,
608 stress_index: Option<StressIndex>,
609 dispatcher_cx: &mut DispatcherContext<'a, F>,
610 executor_cx: &ExecutorContext<'a>,
611 signal_handler: &mut SignalHandler,
612 input_handler: &mut InputHandler,
613 report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
614 ) -> Result<(), Vec<JoinError>>
615 where
616 F: FnMut(ReporterEvent<'a>) + Send,
617 {
618 let ((), results) = TokioScope::scope_and_block(move |scope| {
619 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
620
621 let dispatcher_fut =
623 dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
624 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
625
626 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
627 let script_resp_tx = resp_tx.clone();
628 let run_scripts_fut = async move {
629 let script_data = executor_cx
632 .run_setup_scripts(stress_index, script_resp_tx)
633 .await;
634 if script_tx.send(script_data).is_err() {
635 debug!("script_tx.send failed, shutting down");
637 }
638 RunnerTaskState::finished_no_children()
639 };
640 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
641
642 let Some(script_data) = script_rx.blocking_recv() else {
643 debug!("no script data received, shutting down");
645 return;
646 };
647
648 let groups = self
650 .profile
651 .test_group_config()
652 .iter()
653 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
654
655 let setup_script_data = Arc::new(script_data);
656
657 let filter_resp_tx = resp_tx.clone();
658
659 let tests = self.test_list.to_priority_queue(self.profile);
660 let run_tests_fut = futures::stream::iter(tests)
661 .filter_map(move |test| {
662 let filter_resp_tx = filter_resp_tx.clone();
670 async move {
671 if let FilterMatch::Mismatch { reason } =
672 test.instance.test_info.filter_match
673 {
674 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
676 stress_index,
677 test_instance: test.instance,
678 reason,
679 });
680 return None;
681 }
682 Some(test)
683 }
684 })
685 .map(move |test: TestInstanceWithSettings<'a>| {
686 let threads_required =
687 test.settings.threads_required().compute(self.test_threads);
688 let test_group = match test.settings.test_group() {
689 TestGroup::Global => None,
690 TestGroup::Custom(name) => Some(name.clone()),
691 };
692 let resp_tx = resp_tx.clone();
693 let setup_script_data = setup_script_data.clone();
694
695 let test_instance = test.instance;
696
697 let f = move |cx: FutureQueueContext| {
698 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
699 async move {
705 let ((), mut ret) = unsafe {
719 TokioScope::scope_and_collect(move |scope| {
720 scope.spawn(executor_cx.run_test_instance(
721 stress_index,
722 test,
723 cx,
724 resp_tx.clone(),
725 setup_script_data,
726 ))
727 })
728 }
729 .await;
730
731 let Some(result) = ret.pop() else {
734 warn!(
735 "no task was started for test instance: {}",
736 test_instance.id()
737 );
738 return None;
739 };
740 result.err()
741 }
742 };
743
744 (threads_required, test_group, f)
745 })
746 .future_queue_grouped(self.test_threads, groups)
749 .filter_map(std::future::ready)
751 .collect::<Vec<_>>()
752 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
757
758 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
759 });
760
761 let mut cancelled_count = 0;
768 let join_errors = results
769 .into_iter()
770 .flat_map(|r| {
771 match r {
772 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
773 Ok(RunnerTaskState::Cancelled) => {
776 cancelled_count += 1;
777 Vec::new()
778 }
779 Err(join_error) => vec![join_error],
780 }
781 })
782 .collect::<Vec<_>>();
783
784 if cancelled_count > 0 {
785 debug!(
786 "{} tasks were cancelled -- this \
787 generally should only happen due to panics",
788 cancelled_count
789 );
790 }
791 if !join_errors.is_empty() {
792 return Err(join_errors);
793 }
794
795 Ok(())
796 }
797}
798
799pub fn configure_handle_inheritance(
812 no_capture: bool,
813) -> Result<(), ConfigureHandleInheritanceError> {
814 super::os::configure_handle_inheritance_impl(no_capture)
815}
816
817const FORCE_RUN_ID_ENV: &str = "__NEXTEST_FORCE_RUN_ID";
819
820fn force_or_new_run_id() -> ReportUuid {
822 if let Ok(id_str) = std::env::var(FORCE_RUN_ID_ENV) {
823 match id_str.parse::<ReportUuid>() {
824 Ok(uuid) => return uuid,
825 Err(err) => {
826 warn!(
827 "{FORCE_RUN_ID_ENV} is set but invalid (expected UUID): {err}, \
828 generating random ID"
829 );
830 }
831 }
832 }
833 ReportUuid::new_v4()
834}
835
836#[cfg(test)]
837mod tests {
838 use super::*;
839 use crate::{config::core::NextestConfig, platform::BuildPlatforms};
840
841 #[test]
842 fn no_capture_settings() {
843 let mut builder = TestRunnerBuilder::default();
845 builder
846 .set_capture_strategy(CaptureStrategy::None)
847 .set_test_threads(TestThreads::Count(20));
848 let test_list = TestList::empty();
849 let config = NextestConfig::default_config("/fake/dir");
850 let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
851 let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
852 let signal_handler = SignalHandlerKind::Noop;
853 let input_handler = InputHandlerKind::Noop;
854 let profile = profile.apply_build_platforms(&build_platforms);
855 let runner = builder
856 .build(
857 &test_list,
858 &profile,
859 vec![],
860 signal_handler,
861 input_handler,
862 DoubleSpawnInfo::disabled(),
863 TargetRunner::empty(),
864 )
865 .unwrap();
866 assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
867 assert_eq!(runner.inner.test_threads, 1, "tests run serially");
868 }
869
870 #[test]
871 fn test_debugger_command_parsing() {
872 let cmd = DebuggerCommand::from_str("gdb --args").unwrap();
874 assert_eq!(cmd.program(), "gdb");
875 assert_eq!(cmd.args(), &["--args"]);
876
877 let cmd = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
878 assert_eq!(cmd.program(), "rust-gdb");
879 assert_eq!(cmd.args(), &["-ex", "run", "--args"]);
880
881 let cmd = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
883 assert_eq!(cmd.program(), "gdb");
884 assert_eq!(cmd.args(), &["-ex", "set print pretty on", "--args"]);
885
886 let err = DebuggerCommand::from_str("").unwrap_err();
888 assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
889
890 let err = DebuggerCommand::from_str(" ").unwrap_err();
892 assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
893 }
894}