1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{FlakyResult, MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14 TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18 reporter::events::{ReporterEvent, RunStats, StressIndex},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use semver::Version;
31use std::{
32 collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
33 sync::Arc, time::Duration,
34};
35use tokio::{
36 runtime::Runtime,
37 sync::{mpsc::unbounded_channel, oneshot},
38 task::JoinError,
39};
40use tracing::{debug, warn};
41
42#[derive(Clone, Debug)]
44pub struct DebuggerCommand {
45 program: String,
46 args: Vec<String>,
47}
48
49impl DebuggerCommand {
50 pub fn program(&self) -> &str {
52 &self.program
54 }
55
56 pub fn args(&self) -> &[String] {
58 &self.args
59 }
60}
61
62impl FromStr for DebuggerCommand {
63 type Err = DebuggerCommandParseError;
64
65 fn from_str(command: &str) -> Result<Self, Self::Err> {
66 let mut parts =
67 shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
68 if parts.is_empty() {
69 return Err(DebuggerCommandParseError::EmptyCommand);
70 }
71 let program = parts.remove(0);
72 Ok(Self {
73 program,
74 args: parts,
75 })
76 }
77}
78
79#[derive(Clone, Debug)]
81pub struct TracerCommand {
82 program: String,
83 args: Vec<String>,
84}
85
86impl TracerCommand {
87 pub fn program(&self) -> &str {
89 &self.program
90 }
91
92 pub fn args(&self) -> &[String] {
94 &self.args
95 }
96}
97
98impl FromStr for TracerCommand {
99 type Err = TracerCommandParseError;
100
101 fn from_str(command: &str) -> Result<Self, Self::Err> {
102 let mut parts =
103 shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
104 if parts.is_empty() {
105 return Err(TracerCommandParseError::EmptyCommand);
106 }
107 let program = parts.remove(0);
108 Ok(Self {
109 program,
110 args: parts,
111 })
112 }
113}
114
115#[derive(Clone, Debug, Default)]
117pub enum Interceptor {
118 #[default]
120 None,
121
122 Debugger(DebuggerCommand),
124
125 Tracer(TracerCommand),
127}
128
129impl Interceptor {
130 pub fn should_disable_timeouts(&self) -> bool {
134 match self {
135 Interceptor::None => false,
136 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
137 }
138 }
139
140 pub fn should_passthrough_stdin(&self) -> bool {
144 match self {
145 Interceptor::None | Interceptor::Tracer(_) => false,
146 Interceptor::Debugger(_) => true,
147 }
148 }
149
150 pub fn should_create_process_group(&self) -> bool {
155 match self {
156 Interceptor::None | Interceptor::Tracer(_) => true,
157 Interceptor::Debugger(_) => false,
158 }
159 }
160
161 pub fn should_skip_leak_detection(&self) -> bool {
165 match self {
166 Interceptor::None => false,
167 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
168 }
169 }
170
171 pub fn should_show_wrapper_command(&self) -> bool {
175 match self {
176 Interceptor::None => false,
177 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
178 }
179 }
180
181 pub fn should_send_sigtstp(&self) -> bool {
188 match self {
189 Interceptor::None | Interceptor::Tracer(_) => true,
190 Interceptor::Debugger(_) => false,
191 }
192 }
193}
194
195#[derive(Clone, Debug)]
200pub struct VersionEnvVars {
201 pub current_version: Version,
203
204 pub required_version: Option<Version>,
206
207 pub recommended_version: Option<Version>,
209}
210
211impl VersionEnvVars {
212 pub(crate) fn apply_env(&self, cmd: &mut std::process::Command) {
214 cmd.env("NEXTEST_VERSION", self.current_version.to_string());
215 cmd.env(
216 "NEXTEST_REQUIRED_VERSION",
217 match &self.required_version {
218 Some(v) => v.to_string(),
219 None => "none".to_owned(),
220 },
221 );
222 cmd.env(
223 "NEXTEST_RECOMMENDED_VERSION",
224 match &self.recommended_version {
225 Some(v) => v.to_string(),
226 None => "none".to_owned(),
227 },
228 );
229 }
230}
231
232#[derive(Copy, Clone, Debug)]
234pub(super) enum ChildPid {
235 Process(#[cfg_attr(not(unix), expect(unused))] u32),
237
238 #[cfg(unix)]
240 ProcessGroup(u32),
241}
242
243impl ChildPid {
244 #[cfg(unix)]
251 pub(super) fn for_kill(self) -> i32 {
252 match self {
253 ChildPid::Process(pid) => pid as i32,
254 ChildPid::ProcessGroup(pid) => -(pid as i32),
255 }
256 }
257}
258
259#[derive(Debug, Default)]
261pub struct TestRunnerBuilder {
262 capture_strategy: CaptureStrategy,
263 retries: Option<RetryPolicy>,
264 flaky_result: Option<FlakyResult>,
265 max_fail: Option<MaxFail>,
266 test_threads: Option<TestThreads>,
267 stress_condition: Option<StressCondition>,
268 interceptor: Interceptor,
269 expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
270}
271
272impl TestRunnerBuilder {
273 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
284 self.capture_strategy = strategy;
285 self
286 }
287
288 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
290 self.retries = Some(retries);
291 self
292 }
293
294 pub fn set_flaky_result(&mut self, flaky_result: FlakyResult) -> &mut Self {
296 self.flaky_result = Some(flaky_result);
297 self
298 }
299
300 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
302 self.max_fail = Some(max_fail);
303 self
304 }
305
306 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
308 self.test_threads = Some(test_threads);
309 self
310 }
311
312 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
314 self.stress_condition = Some(stress_condition);
315 self
316 }
317
318 pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
320 self.interceptor = interceptor;
321 self
322 }
323
324 pub fn set_expected_outstanding(
330 &mut self,
331 expected: BTreeSet<OwnedTestInstanceId>,
332 ) -> &mut Self {
333 self.expected_outstanding = Some(expected);
334 self
335 }
336
337 #[expect(clippy::too_many_arguments)]
343 pub fn build<'a>(
344 self,
345 run_id: ReportUuid,
346 version_env_vars: VersionEnvVars,
347 test_list: &'a TestList,
348 profile: &'a EvaluatableProfile<'a>,
349 cli_args: Vec<String>,
350 signal_handler: SignalHandlerKind,
351 input_handler: InputHandlerKind,
352 double_spawn: DoubleSpawnInfo,
353 target_runner: TargetRunner,
354 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
355 let test_threads = match self.capture_strategy {
356 CaptureStrategy::None => 1,
357 CaptureStrategy::Combined | CaptureStrategy::Split => self
358 .test_threads
359 .unwrap_or_else(|| profile.test_threads())
360 .compute(),
361 };
362 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
363
364 let runtime = tokio::runtime::Builder::new_multi_thread()
365 .enable_all()
366 .thread_name("nextest-runner-worker")
367 .build()
368 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
369 let _guard = runtime.enter();
370
371 let signal_handler = signal_handler.build()?;
373
374 let input_handler = input_handler.build();
375
376 Ok(TestRunner {
377 inner: TestRunnerInner {
378 run_id,
379 started_at: Local::now(),
380 profile,
381 test_list,
382 test_threads,
383 double_spawn,
384 target_runner,
385 capture_strategy: self.capture_strategy,
386 force_retries: self.retries,
387 force_flaky_result: self.flaky_result,
388 cli_args,
389 max_fail,
390 stress_condition: self.stress_condition,
391 interceptor: self.interceptor,
392 expected_outstanding: self.expected_outstanding,
393 version_env_vars,
394 runtime,
395 },
396 signal_handler,
397 input_handler,
398 })
399 }
400}
401
402#[derive(Clone, Debug)]
404pub enum StressCondition {
405 Count(StressCount),
407
408 Duration(Duration),
410}
411
412#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
414#[serde(tag = "kind", rename_all = "kebab-case")]
415#[cfg_attr(test, derive(test_strategy::Arbitrary))]
416pub enum StressCount {
417 Count {
419 count: NonZero<u32>,
421 },
422
423 Infinite,
425}
426
427impl FromStr for StressCount {
428 type Err = StressCountParseError;
429
430 fn from_str(s: &str) -> Result<Self, Self::Err> {
431 if s == "infinite" {
432 Ok(StressCount::Infinite)
433 } else {
434 match s.parse() {
435 Ok(count) => Ok(StressCount::Count { count }),
436 Err(_) => Err(StressCountParseError::new(s)),
437 }
438 }
439 }
440}
441
442#[derive(Debug)]
446pub struct TestRunner<'a> {
447 inner: TestRunnerInner<'a>,
448 signal_handler: SignalHandler,
449 input_handler: InputHandler,
450}
451
452impl<'a> TestRunner<'a> {
453 pub fn run_id(&self) -> ReportUuid {
455 self.inner.run_id
456 }
457
458 pub fn started_at(&self) -> DateTime<Local> {
460 self.inner.started_at
461 }
462
463 pub fn input_handler_status(&self) -> InputHandlerStatus {
465 self.input_handler.status()
466 }
467
468 pub fn execute<F>(
474 self,
475 mut callback: F,
476 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
477 where
478 F: FnMut(ReporterEvent<'a>) + Send,
479 {
480 self.try_execute::<Infallible, _>(|event| {
481 callback(event);
482 Ok(())
483 })
484 }
485
486 pub fn try_execute<E, F>(
493 mut self,
494 mut callback: F,
495 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
496 where
497 F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
498 E: fmt::Debug + Send,
499 {
500 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
501
502 let mut report_cancel_tx = Some(report_cancel_tx);
506 let mut first_error = None;
507
508 let res = self.inner.execute(
509 &mut self.signal_handler,
510 &mut self.input_handler,
511 report_cancel_rx,
512 |event| {
513 match callback(event) {
514 Ok(()) => {}
515 Err(error) => {
516 if let Some(report_cancel_tx) = report_cancel_tx.take() {
520 let _ = report_cancel_tx.send(());
521 first_error = Some(error);
522 }
523 }
524 }
525 },
526 );
527
528 self.inner.runtime.shutdown_background();
532
533 match (res, first_error) {
534 (Ok(run_stats), None) => Ok(run_stats),
535 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
536 report_error: Some(report_error),
537 join_errors: Vec::new(),
538 }),
539 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
540 report_error,
541 join_errors,
542 }),
543 }
544 }
545}
546
547#[derive(Debug)]
548struct TestRunnerInner<'a> {
549 run_id: ReportUuid,
550 started_at: DateTime<Local>,
551 profile: &'a EvaluatableProfile<'a>,
552 test_list: &'a TestList<'a>,
553 test_threads: usize,
554 double_spawn: DoubleSpawnInfo,
555 target_runner: TargetRunner,
556 capture_strategy: CaptureStrategy,
557 force_retries: Option<RetryPolicy>,
558 force_flaky_result: Option<FlakyResult>,
559 cli_args: Vec<String>,
560 max_fail: MaxFail,
561 stress_condition: Option<StressCondition>,
562 interceptor: Interceptor,
563 expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
564 version_env_vars: VersionEnvVars,
565 runtime: Runtime,
566}
567
568impl<'a> TestRunnerInner<'a> {
569 fn execute<F>(
570 &self,
571 signal_handler: &mut SignalHandler,
572 input_handler: &mut InputHandler,
573 report_cancel_rx: oneshot::Receiver<()>,
574 callback: F,
575 ) -> Result<RunStats, Vec<JoinError>>
576 where
577 F: FnMut(ReporterEvent<'a>) + Send,
578 {
579 let global_timeout = if self.interceptor.should_disable_timeouts() {
583 crate::time::far_future_duration()
584 } else {
585 self.profile.global_timeout(self.test_list.mode()).period
586 };
587
588 let mut dispatcher_cx = DispatcherContext::new(
589 callback,
590 self.run_id,
591 self.profile.name(),
592 self.cli_args.clone(),
593 self.test_list.run_count(),
594 self.max_fail,
595 global_timeout,
596 self.stress_condition.clone(),
597 self.expected_outstanding.clone(),
598 );
599
600 let executor_cx = ExecutorContext::new(
601 self.run_id,
602 self.profile,
603 self.test_list,
604 self.test_threads,
605 self.double_spawn.clone(),
606 self.target_runner.clone(),
607 self.capture_strategy,
608 self.force_retries,
609 self.force_flaky_result,
610 self.interceptor.clone(),
611 self.version_env_vars.clone(),
612 );
613
614 dispatcher_cx.run_started(self.test_list, self.test_threads);
616
617 let _guard = self.runtime.enter();
618
619 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
620
621 if self.stress_condition.is_some() {
622 loop {
623 let progress = dispatcher_cx
624 .stress_progress()
625 .expect("stress_condition is Some => stress progress is Some");
626 if progress.remaining().is_some() {
627 dispatcher_cx.stress_sub_run_started(progress);
628
629 self.do_run(
630 dispatcher_cx.stress_index(),
631 &mut dispatcher_cx,
632 &executor_cx,
633 signal_handler,
634 input_handler,
635 report_cancel_rx.as_mut(),
636 )?;
637
638 dispatcher_cx.stress_sub_run_finished();
639
640 if dispatcher_cx.cancel_reason().is_some() {
641 break;
642 }
643 } else {
644 break;
645 }
646 }
647 } else {
648 self.do_run(
649 None,
650 &mut dispatcher_cx,
651 &executor_cx,
652 signal_handler,
653 input_handler,
654 report_cancel_rx,
655 )?;
656 }
657
658 let run_stats = dispatcher_cx.run_stats();
659 dispatcher_cx.run_finished();
660
661 Ok(run_stats)
662 }
663
664 fn do_run<F>(
665 &self,
666 stress_index: Option<StressIndex>,
667 dispatcher_cx: &mut DispatcherContext<'a, F>,
668 executor_cx: &ExecutorContext<'a>,
669 signal_handler: &mut SignalHandler,
670 input_handler: &mut InputHandler,
671 report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
672 ) -> Result<(), Vec<JoinError>>
673 where
674 F: FnMut(ReporterEvent<'a>) + Send,
675 {
676 let ((), results) = TokioScope::scope_and_block(move |scope| {
677 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
678
679 let dispatcher_fut =
681 dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
682 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
683
684 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
685 let script_resp_tx = resp_tx.clone();
686 let run_scripts_fut = async move {
687 let script_data = executor_cx
690 .run_setup_scripts(stress_index, script_resp_tx)
691 .await;
692 if script_tx.send(script_data).is_err() {
693 debug!("script_tx.send failed, shutting down");
695 }
696 RunnerTaskState::finished_no_children()
697 };
698 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
699
700 let Some(script_data) = script_rx.blocking_recv() else {
701 debug!("no script data received, shutting down");
703 return;
704 };
705
706 let groups = self
708 .profile
709 .test_group_config()
710 .iter()
711 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
712
713 let setup_script_data = Arc::new(script_data);
714
715 let filter_resp_tx = resp_tx.clone();
716
717 let tests = self.test_list.to_priority_queue(self.profile);
718 let run_tests_fut = futures::stream::iter(tests)
719 .filter_map(move |test| {
720 let filter_resp_tx = filter_resp_tx.clone();
728 async move {
729 if let FilterMatch::Mismatch { reason } =
730 test.instance.test_info.filter_match
731 {
732 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
734 stress_index,
735 test_instance: test.instance,
736 reason,
737 });
738 return None;
739 }
740 Some(test)
741 }
742 })
743 .map(move |test: TestInstanceWithSettings<'a>| {
744 let threads_required =
745 test.settings.threads_required().compute(self.test_threads);
746 let test_group = match test.settings.test_group() {
747 TestGroup::Global => None,
748 TestGroup::Custom(name) => Some(name.clone()),
749 };
750 let resp_tx = resp_tx.clone();
751 let setup_script_data = setup_script_data.clone();
752
753 let test_instance = test.instance;
754
755 let f = move |cx: FutureQueueContext| {
756 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
757 async move {
763 let ((), mut ret) = unsafe {
777 TokioScope::scope_and_collect(move |scope| {
778 scope.spawn(executor_cx.run_test_instance(
779 stress_index,
780 test,
781 cx,
782 resp_tx.clone(),
783 setup_script_data,
784 ))
785 })
786 }
787 .await;
788
789 let Some(result) = ret.pop() else {
792 warn!(
793 "no task was started for test instance: {}",
794 test_instance.id()
795 );
796 return None;
797 };
798 result.err()
799 }
800 };
801
802 (threads_required, test_group, f)
803 })
804 .future_queue_grouped(self.test_threads, groups)
807 .filter_map(std::future::ready)
809 .collect::<Vec<_>>()
810 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
815
816 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
817 });
818
819 let mut cancelled_count = 0;
826 let join_errors = results
827 .into_iter()
828 .flat_map(|r| {
829 match r {
830 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
831 Ok(RunnerTaskState::Cancelled) => {
834 cancelled_count += 1;
835 Vec::new()
836 }
837 Err(join_error) => vec![join_error],
838 }
839 })
840 .collect::<Vec<_>>();
841
842 if cancelled_count > 0 {
843 debug!(
844 "{} tasks were cancelled -- this \
845 generally should only happen due to panics",
846 cancelled_count
847 );
848 }
849 if !join_errors.is_empty() {
850 return Err(join_errors);
851 }
852
853 Ok(())
854 }
855}
856
857pub fn configure_handle_inheritance(
870 no_capture: bool,
871) -> Result<(), ConfigureHandleInheritanceError> {
872 super::os::configure_handle_inheritance_impl(no_capture)
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878 use crate::{config::core::NextestConfig, platform::BuildPlatforms};
879
880 #[test]
881 fn no_capture_settings() {
882 let mut builder = TestRunnerBuilder::default();
884 builder
885 .set_capture_strategy(CaptureStrategy::None)
886 .set_test_threads(TestThreads::Count(20));
887 let test_list = TestList::empty();
888 let config = NextestConfig::default_config("/fake/dir");
889 let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
890 let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
891 let signal_handler = SignalHandlerKind::Noop;
892 let input_handler = InputHandlerKind::Noop;
893 let profile = profile.apply_build_platforms(&build_platforms);
894 let version_env_vars = VersionEnvVars {
895 current_version: Version::new(0, 0, 0),
896 required_version: None,
897 recommended_version: None,
898 };
899 let runner = builder
900 .build(
901 crate::helpers::force_or_new_run_id(),
902 version_env_vars,
903 &test_list,
904 &profile,
905 vec![],
906 signal_handler,
907 input_handler,
908 DoubleSpawnInfo::disabled(),
909 TargetRunner::empty(),
910 )
911 .unwrap();
912 assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
913 assert_eq!(runner.inner.test_threads, 1, "tests run serially");
914 }
915
916 #[test]
917 fn test_debugger_command_parsing() {
918 let cmd = DebuggerCommand::from_str("gdb --args").unwrap();
920 assert_eq!(cmd.program(), "gdb");
921 assert_eq!(cmd.args(), &["--args"]);
922
923 let cmd = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
924 assert_eq!(cmd.program(), "rust-gdb");
925 assert_eq!(cmd.args(), &["-ex", "run", "--args"]);
926
927 let cmd = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
929 assert_eq!(cmd.program(), "gdb");
930 assert_eq!(cmd.args(), &["-ex", "set print pretty on", "--args"]);
931
932 let err = DebuggerCommand::from_str("").unwrap_err();
934 assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
935
936 let err = DebuggerCommand::from_str(" ").unwrap_err();
938 assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
939 }
940}