Skip to main content

test_r_core/
execution.rs

1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12    apply_suite_props_to_tests, filter_registered_tests, DependencyConstructor, DependencyView,
13    RegisteredDependency, RegisteredTest, RegisteredTestSuiteProperty,
14};
15
16pub(crate) struct TestSuiteExecution {
17    crate_and_module: String,
18    dependencies: Vec<RegisteredDependency>,
19    tests: Vec<RegisteredTest>,
20    props: Vec<RegisteredTestSuiteProperty>,
21    inner: Vec<TestSuiteExecution>,
22    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
23    sequential_lock: SequentialExecutionLock,
24    remaining_count: usize,
25    idx: usize,
26    is_sequential: bool,
27    skip_creating_dependencies: bool,
28    in_progress: Arc<AtomicUsize>,
29}
30
31impl TestSuiteExecution {
32    pub fn construct(
33        arguments: &Arguments,
34        dependencies: &[RegisteredDependency],
35        tests: &[RegisteredTest],
36        props: &[RegisteredTestSuiteProperty],
37    ) -> (Self, Vec<RegisteredTest>) {
38        let tests_with_props = apply_suite_props_to_tests(tests, props);
39        let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
40        Self::shuffle(arguments, &mut filtered_tests);
41        filtered_tests.reverse();
42
43        if filtered_tests.is_empty() {
44            (
45                Self::root(
46                    dependencies
47                        .iter()
48                        .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
49                        .cloned()
50                        .collect::<Vec<_>>(),
51                    Vec::new(),
52                    props
53                        .iter()
54                        .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
55                        .cloned()
56                        .collect::<Vec<_>>(),
57                ),
58                Vec::new(),
59            )
60        } else {
61            let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
62
63            for prop in props {
64                root.add_prop(prop.clone());
65            }
66
67            for dep in dependencies {
68                root.add_dependency(dep.clone());
69            }
70
71            for test in filtered_tests.clone() {
72                root.add_test(test.clone());
73            }
74
75            root.propagate_sequential(None);
76            root.prune_unused_deps();
77
78            (root, filtered_tests)
79        }
80    }
81
82    fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
83        if let Some(seed) = arguments.shuffle_seed {
84            let mut rng = StdRng::seed_from_u64(seed);
85            tests.shuffle(&mut rng);
86        }
87    }
88
89    /// Disables creating dependencies when picking the next test. This is useful when the execution plan
90    /// is only used to drive spawned workers instead of actually running the tests.
91    pub fn skip_creating_dependencies(&mut self) {
92        self.skip_creating_dependencies = true;
93        for inner in &mut self.inner {
94            inner.skip_creating_dependencies();
95        }
96    }
97
98    pub fn remaining(&self) -> usize {
99        self.remaining_count
100    }
101
102    pub fn is_empty(&self) -> bool {
103        self.tests.is_empty() && self.inner.is_empty()
104    }
105
106    pub fn is_done(&self) -> bool {
107        self.remaining_count == 0
108    }
109
110    /// Returns true if either this level, or any of the inner levels have dependencies
111    pub fn has_dependencies(&self) -> bool {
112        !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
113    }
114
115    /// Returns true if there are any tests that require capturing, based on the given default setting
116    /// and the per-test CaptureControl overrides.
117    pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
118        self.tests.iter().any(|test| {
119            test.props
120                .capture_control
121                .requires_capturing(capture_by_default)
122        }) || self
123            .inner
124            .iter()
125            .any(|inner| inner.requires_capturing(capture_by_default))
126    }
127
128    #[cfg(feature = "tokio")]
129    pub async fn pick_next(&mut self) -> Option<TestExecution> {
130        if self.is_empty() {
131            None
132        } else {
133            match self
134                .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
135                .await
136            {
137                Some((test, deps, seq_lock, in_progress_counter)) => {
138                    let index = self.idx;
139                    self.idx += 1;
140                    Some(TestExecution {
141                        test: test.clone(),
142                        deps: Arc::new(deps),
143                        index,
144                        _seq_lock: seq_lock,
145                        in_progress_counter,
146                    })
147                }
148                None => None,
149            }
150        }
151    }
152
153    pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
154        match self.pick_next_internal_sync(&HashMap::new()) {
155            Some((test, deps, seq_lock, in_progress_counter)) => {
156                let index = self.idx;
157                self.idx += 1;
158                Some(TestExecution {
159                    test: test.clone(),
160                    deps: Arc::new(deps),
161                    index,
162                    _seq_lock: seq_lock,
163                    in_progress_counter,
164                })
165            }
166            None => None,
167        }
168    }
169
170    #[cfg(feature = "tokio")]
171    #[allow(clippy::type_complexity)]
172    async fn pick_next_internal(
173        &mut self,
174        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
175    ) -> Option<(
176        RegisteredTest,
177        HashMap<String, Arc<dyn Any + Send + Sync>>,
178        SequentialExecutionLockGuard,
179        Arc<AtomicUsize>,
180    )> {
181        if self.is_empty() {
182            None
183        } else {
184            let dependency_map = if !self.is_materialized() {
185                self.materialize_deps(materialized_parent_deps).await
186            } else {
187                self.create_dependency_map(materialized_parent_deps)
188            };
189
190            let locked = self.sequential_lock.is_locked().await;
191            let result = if self.tests.is_empty() || locked {
192                let current = self.inner.iter_mut();
193                let mut result = None;
194                for inner in current {
195                    if let Some((test, deps, seq_lock, in_progress_counter)) =
196                        Box::pin(inner.pick_next_internal(&dependency_map)).await
197                    {
198                        result = Some((test, deps, seq_lock, in_progress_counter));
199                        break;
200                    }
201                }
202                self.inner.retain(|inner| !inner.is_empty());
203
204                result
205            } else {
206                let guard = self.sequential_lock.lock(self.is_sequential).await;
207                self.in_progress.fetch_add(1, Ordering::Release);
208                self.tests
209                    .pop()
210                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
211            };
212            if result.is_none()
213                && self.is_empty()
214                && self.is_materialized()
215                && !locked
216                && self.in_progress.load(Ordering::Acquire) == 0
217            {
218                self.drop_deps();
219            }
220            if result.is_some() {
221                self.remaining_count -= 1;
222            }
223            result
224        }
225    }
226
227    #[allow(clippy::type_complexity)]
228    fn pick_next_internal_sync(
229        &mut self,
230        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
231    ) -> Option<(
232        RegisteredTest,
233        HashMap<String, Arc<dyn Any + Send + Sync>>,
234        SequentialExecutionLockGuard,
235        Arc<AtomicUsize>,
236    )> {
237        if self.is_empty() {
238            None
239        } else {
240            let dependency_map = if !self.is_materialized() {
241                self.materialize_deps_sync(materialized_parent_deps)
242            } else {
243                self.create_dependency_map(materialized_parent_deps)
244            };
245
246            let locked = self.sequential_lock.is_locked_sync();
247            let result = if self.tests.is_empty() || locked {
248                let current = self.inner.iter_mut();
249                let mut result = None;
250                for inner in current {
251                    if let Some((test, deps, seq_lock, in_progress_counter)) =
252                        inner.pick_next_internal_sync(&dependency_map)
253                    {
254                        result = Some((test, deps, seq_lock, in_progress_counter));
255                        break;
256                    }
257                }
258
259                self.inner.retain(|inner| !inner.is_empty());
260                result
261            } else {
262                let guard = self.sequential_lock.lock_sync(self.is_sequential);
263                self.in_progress.fetch_add(1, Ordering::Release);
264                self.tests
265                    .pop()
266                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
267            };
268            if result.is_none()
269                && self.is_materialized()
270                && !locked
271                && self.in_progress.load(Ordering::Acquire) == 0
272            {
273                self.drop_deps();
274            }
275            if result.is_some() {
276                self.remaining_count -= 1;
277            }
278            result
279        }
280    }
281
282    fn create_dependency_map(
283        &self,
284        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
285    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
286        let mut result = parent_map.clone();
287        for (key, dep) in &self.materialized_dependencies {
288            result.insert(key.clone(), dep.clone());
289        }
290        result
291    }
292
293    fn root(
294        deps: Vec<RegisteredDependency>,
295        tests: Vec<RegisteredTest>,
296        props: Vec<RegisteredTestSuiteProperty>,
297    ) -> Self {
298        let total_count = tests.len();
299        let is_sequential = props
300            .iter()
301            .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
302            || tests.iter().any(|test| test.run.is_bench());
303        Self {
304            crate_and_module: String::new(),
305            dependencies: deps,
306            tests,
307            props,
308            inner: Vec::new(),
309            materialized_dependencies: HashMap::new(),
310            remaining_count: total_count,
311            idx: 0,
312            sequential_lock: SequentialExecutionLock::new(),
313            is_sequential,
314            skip_creating_dependencies: false,
315            in_progress: Arc::new(AtomicUsize::new(0)),
316        }
317    }
318
319    fn add_dependency(&mut self, dep: RegisteredDependency) {
320        let crate_and_module = dep.crate_and_module();
321        if self.crate_and_module == crate_and_module {
322            self.dependencies.push(dep);
323        } else {
324            let mut found = false;
325            for inner in &mut self.inner {
326                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
327                    inner.add_dependency(dep.clone());
328                    found = true;
329                    break;
330                }
331            }
332            if !found {
333                let mut inner = Self {
334                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
335                    dependencies: vec![],
336                    tests: vec![],
337                    inner: vec![],
338                    props: vec![],
339                    materialized_dependencies: HashMap::new(),
340                    remaining_count: 0,
341                    idx: 0,
342                    is_sequential: false,
343                    sequential_lock: SequentialExecutionLock::new(),
344                    skip_creating_dependencies: false,
345                    in_progress: Arc::new(AtomicUsize::new(0)),
346                };
347                inner.add_dependency(dep);
348                self.inner.push(inner);
349            }
350        }
351    }
352
353    fn add_test(&mut self, test: RegisteredTest) {
354        let crate_and_module = test.crate_and_module();
355        if self.crate_and_module == crate_and_module {
356            self.tests.push(test.clone());
357
358            if test.run.is_bench() {
359                self.is_sequential = true;
360            }
361        } else {
362            let mut found = false;
363            for inner in &mut self.inner {
364                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
365                    inner.add_test(test.clone());
366                    found = true;
367                    break;
368                }
369            }
370            if !found {
371                let mut inner = Self {
372                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
373                    dependencies: vec![],
374                    tests: vec![],
375                    inner: vec![],
376                    props: vec![],
377                    materialized_dependencies: HashMap::new(),
378                    remaining_count: 0,
379                    idx: 0,
380                    is_sequential: false,
381                    sequential_lock: SequentialExecutionLock::new(),
382                    skip_creating_dependencies: false,
383                    in_progress: Arc::new(AtomicUsize::new(0)),
384                };
385                inner.add_test(test);
386                self.inner.push(inner);
387            }
388        }
389        self.remaining_count += 1;
390    }
391
392    fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
393        let crate_and_module = prop.crate_and_module();
394        if self.crate_and_module == crate_and_module {
395            if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
396                self.is_sequential = true;
397            }
398            self.props.push(prop);
399        } else {
400            let mut found = false;
401            for inner in &mut self.inner {
402                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
403                    inner.add_prop(prop.clone());
404                    found = true;
405                    break;
406                }
407            }
408            if !found {
409                let mut inner = Self {
410                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
411                    dependencies: vec![],
412                    tests: vec![],
413                    inner: vec![],
414                    props: vec![],
415                    materialized_dependencies: HashMap::new(),
416                    remaining_count: 0,
417                    idx: 0,
418                    is_sequential: false,
419                    sequential_lock: SequentialExecutionLock::new(),
420                    skip_creating_dependencies: false,
421                    in_progress: Arc::new(AtomicUsize::new(0)),
422                };
423                inner.add_prop(prop);
424                self.inner.push(inner);
425            }
426        }
427    }
428
429    fn is_materialized(&self) -> bool {
430        self.skip_creating_dependencies
431            || self.materialized_dependencies.len() == self.dependencies.len()
432    }
433
434    #[cfg(feature = "tokio")]
435    async fn materialize_deps(
436        &mut self,
437        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
438    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
439        let mut deps = HashMap::with_capacity(self.dependencies.len());
440        let mut dependency_map = parent_map.clone();
441
442        let sorted_dependencies = self.sorted_dependencies();
443        for dep in &sorted_dependencies {
444            let materialized_dep = match &dep.constructor {
445                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
446                DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
447            };
448            deps.insert(dep.name.clone(), materialized_dep.clone());
449            dependency_map.insert(dep.name.clone(), materialized_dep);
450        }
451        self.materialized_dependencies = deps;
452        dependency_map
453    }
454
455    fn materialize_deps_sync(
456        &mut self,
457        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
458    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
459        let mut deps = HashMap::with_capacity(self.dependencies.len());
460        let mut dependency_map = parent_map.clone();
461
462        let sorted_dependencies = self.sorted_dependencies();
463        for dep in &sorted_dependencies {
464            let materialized_dep = match &dep.constructor {
465                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
466                DependencyConstructor::Async(_cons) => {
467                    panic!("Async dependencies are not supported in sync mode")
468                }
469            };
470            deps.insert(dep.name.clone(), materialized_dep.clone());
471            dependency_map.insert(dep.name.clone(), materialized_dep);
472        }
473        self.materialized_dependencies = deps;
474        dependency_map
475    }
476
477    fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
478        let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
479        for dep in &self.dependencies {
480            let mut added = false;
481            for dep_dep_name in &dep.dependencies {
482                if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
483                    ts.add_dependency(dep_dep, dep);
484                    added = true;
485                } else {
486                    // otherwise it is expected to come from the parent level
487                }
488            }
489            if !added {
490                ts.insert(dep);
491            }
492        }
493        let mut result = Vec::with_capacity(self.dependencies.len());
494        loop {
495            let chunk = ts.pop_all();
496            if chunk.is_empty() {
497                break;
498            }
499            result.extend(chunk);
500        }
501        result
502    }
503
504    fn drop_deps(&mut self) {
505        self.materialized_dependencies.clear();
506    }
507
508    /// Prunes dependencies that are not needed by any test in this subtree.
509    /// Returns `Some(needed_from_parent)` with dep names needed from ancestor levels,
510    /// or `None` if pruning is disabled for this subtree (unknown deps).
511    fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
512        // Collect dep names needed by tests at this level
513        let mut needed: Option<HashSet<String>> = Some(HashSet::new());
514        for test in &self.tests {
515            match &test.dependencies {
516                None => {
517                    needed = None;
518                    break;
519                }
520                Some(deps) => {
521                    if let Some(ref mut set) = needed {
522                        set.extend(deps.iter().cloned());
523                    }
524                }
525            }
526        }
527
528        // Merge children's needs
529        for inner in &mut self.inner {
530            let child_needs = inner.prune_unused_deps();
531            needed = match (needed, child_needs) {
532                (None, _) | (_, None) => None,
533                (Some(mut a), Some(b)) => {
534                    a.extend(b);
535                    Some(a)
536                }
537            };
538        }
539
540        // If any test has unknown deps, keep everything
541        let needed = needed?;
542
543        // Determine which local deps to keep
544        let local_names: HashSet<String> =
545            self.dependencies.iter().map(|d| d.name.clone()).collect();
546        let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
547
548        // Expand transitive closure for local deps only (fixpoint)
549        let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
550        let mut needed_from_parent: HashSet<String> =
551            needed.difference(&local_names).cloned().collect();
552
553        while let Some(dep_name) = queue.pop_front() {
554            if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
555                for transitive in &dep.dependencies {
556                    if local_names.contains(transitive) {
557                        if keep_local.insert(transitive.clone()) {
558                            queue.push_back(transitive.clone());
559                        }
560                    } else {
561                        needed_from_parent.insert(transitive.clone());
562                    }
563                }
564            }
565        }
566
567        // Prune
568        self.dependencies.retain(|d| keep_local.contains(&d.name));
569
570        Some(needed_from_parent)
571    }
572
573    fn is_prefix_of(this: &str, that: &str) -> bool {
574        this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
575    }
576
577    fn next_level(from: &str, to: &str) -> String {
578        assert!(Self::is_prefix_of(from, to));
579        let remaining = if from.is_empty() {
580            to
581        } else {
582            &to[from.len() + 2..]
583        };
584
585        let result = if let Some((next, _tail)) = remaining.split_once("::") {
586            format!("{from}::{next}")
587        } else {
588            format!("{from}::{remaining}")
589        };
590        result.trim_start_matches("::").to_string()
591    }
592
593    fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
594        if let Some(parent_lock) = inherited_lock {
595            self.is_sequential = true;
596            self.sequential_lock = parent_lock.clone();
597        }
598
599        let lock_for_children = if self.is_sequential {
600            Some(self.sequential_lock.clone())
601        } else {
602            None
603        };
604
605        for child in &mut self.inner {
606            child.propagate_sequential(lock_for_children.as_ref());
607        }
608    }
609}
610
611impl Debug for TestSuiteExecution {
612    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
613        writeln!(
614            f,
615            "'{}' {} [{}]",
616            self.crate_and_module,
617            self.props
618                .iter()
619                .map(|x| format!("{x:?}"))
620                .collect::<Vec<_>>()
621                .join(", "),
622            if self.is_sequential { "S" } else { "P" }
623        )?;
624        writeln!(f, "  deps:")?;
625        for dep in &self.dependencies {
626            writeln!(f, "    '{}'", dep.name)?;
627        }
628        writeln!(f, "  tests:")?;
629        for test in &self.tests {
630            writeln!(f, "    '{}' [{:?}]", test.name, test.props.test_type)?;
631        }
632        for inner in &self.inner {
633            let inner_str = format!("{inner:?}");
634            for inner_line in inner_str.lines() {
635                writeln!(f, "  {inner_line}")?;
636            }
637        }
638        Ok(())
639    }
640}
641
642impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
643    fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
644        self.get(name).cloned()
645    }
646}
647
648pub struct TestExecution {
649    pub test: RegisteredTest,
650    pub deps: Arc<dyn DependencyView + Send + Sync>,
651    pub index: usize,
652    _seq_lock: SequentialExecutionLockGuard,
653    in_progress_counter: Arc<AtomicUsize>,
654}
655
656impl Drop for TestExecution {
657    fn drop(&mut self) {
658        self.in_progress_counter.fetch_sub(1, Ordering::Release);
659    }
660}
661
662#[allow(dead_code)]
663enum SequentialExecutionLockGuard {
664    None,
665    #[cfg(feature = "tokio")]
666    Async(tokio::sync::OwnedMutexGuard<()>),
667    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
668}
669
670#[derive(Clone)]
671struct SequentialExecutionLock {
672    #[cfg(feature = "tokio")]
673    async_mutex: Arc<tokio::sync::Mutex<()>>,
674    sync_mutex: Arc<parking_lot::Mutex<()>>,
675}
676
677impl SequentialExecutionLock {
678    pub fn new() -> Self {
679        Self {
680            #[cfg(feature = "tokio")]
681            async_mutex: Arc::new(tokio::sync::Mutex::new(())),
682            sync_mutex: Arc::new(parking_lot::Mutex::new(())),
683        }
684    }
685
686    #[cfg(feature = "tokio")]
687    pub async fn is_locked(&self) -> bool {
688        self.async_mutex.try_lock().is_err()
689    }
690
691    pub fn is_locked_sync(&self) -> bool {
692        self.sync_mutex.try_lock().is_none()
693    }
694
695    #[cfg(feature = "tokio")]
696    pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
697        if is_sequential {
698            let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
699            SequentialExecutionLockGuard::Async(permit)
700        } else {
701            SequentialExecutionLockGuard::None
702        }
703    }
704
705    pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
706        if is_sequential {
707            let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
708            SequentialExecutionLockGuard::Sync(permit)
709        } else {
710            SequentialExecutionLockGuard::None
711        }
712    }
713}