Skip to main content

test_r_core/
execution.rs

1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12    apply_suite_tags, apply_suite_timeouts, filter_registered_tests, DependencyConstructor,
13    DependencyView, RegisteredDependency, RegisteredTest, RegisteredTestSuiteProperty,
14};
15
16pub(crate) struct TestSuiteExecution {
17    crate_and_module: String,
18    dependencies: Vec<RegisteredDependency>,
19    tests: Vec<RegisteredTest>,
20    props: Vec<RegisteredTestSuiteProperty>,
21    inner: Vec<TestSuiteExecution>,
22    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
23    sequential_lock: SequentialExecutionLock,
24    remaining_count: usize,
25    idx: usize,
26    is_sequential: bool,
27    skip_creating_dependencies: bool,
28    in_progress: Arc<AtomicUsize>,
29}
30
31impl TestSuiteExecution {
32    pub fn construct(
33        arguments: &Arguments,
34        dependencies: &[RegisteredDependency],
35        tests: &[RegisteredTest],
36        props: &[RegisteredTestSuiteProperty],
37    ) -> (Self, Vec<RegisteredTest>) {
38        let tagged_tests = apply_suite_tags(tests, props);
39        let timed_tests = apply_suite_timeouts(&tagged_tests, props);
40        let mut filtered_tests = filter_registered_tests(arguments, &timed_tests);
41        Self::shuffle(arguments, &mut filtered_tests);
42        filtered_tests.reverse();
43
44        if filtered_tests.is_empty() {
45            (
46                Self::root(
47                    dependencies
48                        .iter()
49                        .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
50                        .cloned()
51                        .collect::<Vec<_>>(),
52                    Vec::new(),
53                    props
54                        .iter()
55                        .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
56                        .cloned()
57                        .collect::<Vec<_>>(),
58                ),
59                Vec::new(),
60            )
61        } else {
62            let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
63
64            for prop in props {
65                root.add_prop(prop.clone());
66            }
67
68            for dep in dependencies {
69                root.add_dependency(dep.clone());
70            }
71
72            for test in filtered_tests.clone() {
73                root.add_test(test.clone());
74            }
75
76            root.propagate_sequential(None);
77            root.prune_unused_deps();
78
79            (root, filtered_tests)
80        }
81    }
82
83    fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
84        if let Some(seed) = arguments.shuffle_seed {
85            let mut rng = StdRng::seed_from_u64(seed);
86            tests.shuffle(&mut rng);
87        }
88    }
89
90    /// Disables creating dependencies when picking the next test. This is useful when the execution plan
91    /// is only used to drive spawned workers instead of actually running the tests.
92    pub fn skip_creating_dependencies(&mut self) {
93        self.skip_creating_dependencies = true;
94        for inner in &mut self.inner {
95            inner.skip_creating_dependencies();
96        }
97    }
98
99    pub fn remaining(&self) -> usize {
100        self.remaining_count
101    }
102
103    pub fn is_empty(&self) -> bool {
104        self.tests.is_empty() && self.inner.is_empty()
105    }
106
107    pub fn is_done(&self) -> bool {
108        self.remaining_count == 0
109    }
110
111    /// Returns true if either this level, or any of the inner levels have dependencies
112    pub fn has_dependencies(&self) -> bool {
113        !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
114    }
115
116    /// Returns true if there are any tests that require capturing, based on the given default setting
117    /// and the per-test CaptureControl overrides.
118    pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
119        self.tests.iter().any(|test| {
120            test.props
121                .capture_control
122                .requires_capturing(capture_by_default)
123        }) || self
124            .inner
125            .iter()
126            .any(|inner| inner.requires_capturing(capture_by_default))
127    }
128
129    #[cfg(feature = "tokio")]
130    pub async fn pick_next(&mut self) -> Option<TestExecution> {
131        if self.is_empty() {
132            None
133        } else {
134            match self
135                .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
136                .await
137            {
138                Some((test, deps, seq_lock, in_progress_counter)) => {
139                    let index = self.idx;
140                    self.idx += 1;
141                    Some(TestExecution {
142                        test: test.clone(),
143                        deps: Arc::new(deps),
144                        index,
145                        _seq_lock: seq_lock,
146                        in_progress_counter,
147                    })
148                }
149                None => None,
150            }
151        }
152    }
153
154    pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
155        match self.pick_next_internal_sync(&HashMap::new()) {
156            Some((test, deps, seq_lock, in_progress_counter)) => {
157                let index = self.idx;
158                self.idx += 1;
159                Some(TestExecution {
160                    test: test.clone(),
161                    deps: Arc::new(deps),
162                    index,
163                    _seq_lock: seq_lock,
164                    in_progress_counter,
165                })
166            }
167            None => None,
168        }
169    }
170
171    #[cfg(feature = "tokio")]
172    #[allow(clippy::type_complexity)]
173    async fn pick_next_internal(
174        &mut self,
175        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
176    ) -> Option<(
177        RegisteredTest,
178        HashMap<String, Arc<dyn Any + Send + Sync>>,
179        SequentialExecutionLockGuard,
180        Arc<AtomicUsize>,
181    )> {
182        if self.is_empty() {
183            None
184        } else {
185            let dependency_map = if !self.is_materialized() {
186                self.materialize_deps(materialized_parent_deps).await
187            } else {
188                self.create_dependency_map(materialized_parent_deps)
189            };
190
191            let locked = self.sequential_lock.is_locked().await;
192            let result = if self.tests.is_empty() || locked {
193                let current = self.inner.iter_mut();
194                let mut result = None;
195                for inner in current {
196                    if let Some((test, deps, seq_lock, in_progress_counter)) =
197                        Box::pin(inner.pick_next_internal(&dependency_map)).await
198                    {
199                        result = Some((test, deps, seq_lock, in_progress_counter));
200                        break;
201                    }
202                }
203                self.inner.retain(|inner| !inner.is_empty());
204
205                result
206            } else {
207                let guard = self.sequential_lock.lock(self.is_sequential).await;
208                self.in_progress.fetch_add(1, Ordering::Release);
209                self.tests
210                    .pop()
211                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
212            };
213            if result.is_none()
214                && self.is_empty()
215                && self.is_materialized()
216                && !locked
217                && self.in_progress.load(Ordering::Acquire) == 0
218            {
219                self.drop_deps();
220            }
221            if result.is_some() {
222                self.remaining_count -= 1;
223            }
224            result
225        }
226    }
227
228    #[allow(clippy::type_complexity)]
229    fn pick_next_internal_sync(
230        &mut self,
231        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
232    ) -> Option<(
233        RegisteredTest,
234        HashMap<String, Arc<dyn Any + Send + Sync>>,
235        SequentialExecutionLockGuard,
236        Arc<AtomicUsize>,
237    )> {
238        if self.is_empty() {
239            None
240        } else {
241            let dependency_map = if !self.is_materialized() {
242                self.materialize_deps_sync(materialized_parent_deps)
243            } else {
244                self.create_dependency_map(materialized_parent_deps)
245            };
246
247            let locked = self.sequential_lock.is_locked_sync();
248            let result = if self.tests.is_empty() || locked {
249                let current = self.inner.iter_mut();
250                let mut result = None;
251                for inner in current {
252                    if let Some((test, deps, seq_lock, in_progress_counter)) =
253                        inner.pick_next_internal_sync(&dependency_map)
254                    {
255                        result = Some((test, deps, seq_lock, in_progress_counter));
256                        break;
257                    }
258                }
259
260                self.inner.retain(|inner| !inner.is_empty());
261                result
262            } else {
263                let guard = self.sequential_lock.lock_sync(self.is_sequential);
264                self.in_progress.fetch_add(1, Ordering::Release);
265                self.tests
266                    .pop()
267                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
268            };
269            if result.is_none()
270                && self.is_materialized()
271                && !locked
272                && self.in_progress.load(Ordering::Acquire) == 0
273            {
274                self.drop_deps();
275            }
276            if result.is_some() {
277                self.remaining_count -= 1;
278            }
279            result
280        }
281    }
282
283    fn create_dependency_map(
284        &self,
285        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
286    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
287        let mut result = parent_map.clone();
288        for (key, dep) in &self.materialized_dependencies {
289            result.insert(key.clone(), dep.clone());
290        }
291        result
292    }
293
294    fn root(
295        deps: Vec<RegisteredDependency>,
296        tests: Vec<RegisteredTest>,
297        props: Vec<RegisteredTestSuiteProperty>,
298    ) -> Self {
299        let total_count = tests.len();
300        let is_sequential = props
301            .iter()
302            .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
303            || tests.iter().any(|test| test.run.is_bench());
304        Self {
305            crate_and_module: String::new(),
306            dependencies: deps,
307            tests,
308            props,
309            inner: Vec::new(),
310            materialized_dependencies: HashMap::new(),
311            remaining_count: total_count,
312            idx: 0,
313            sequential_lock: SequentialExecutionLock::new(),
314            is_sequential,
315            skip_creating_dependencies: false,
316            in_progress: Arc::new(AtomicUsize::new(0)),
317        }
318    }
319
320    fn add_dependency(&mut self, dep: RegisteredDependency) {
321        let crate_and_module = dep.crate_and_module();
322        if self.crate_and_module == crate_and_module {
323            self.dependencies.push(dep);
324        } else {
325            let mut found = false;
326            for inner in &mut self.inner {
327                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
328                    inner.add_dependency(dep.clone());
329                    found = true;
330                    break;
331                }
332            }
333            if !found {
334                let mut inner = Self {
335                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
336                    dependencies: vec![],
337                    tests: vec![],
338                    inner: vec![],
339                    props: vec![],
340                    materialized_dependencies: HashMap::new(),
341                    remaining_count: 0,
342                    idx: 0,
343                    is_sequential: false,
344                    sequential_lock: SequentialExecutionLock::new(),
345                    skip_creating_dependencies: false,
346                    in_progress: Arc::new(AtomicUsize::new(0)),
347                };
348                inner.add_dependency(dep);
349                self.inner.push(inner);
350            }
351        }
352    }
353
354    fn add_test(&mut self, test: RegisteredTest) {
355        let crate_and_module = test.crate_and_module();
356        if self.crate_and_module == crate_and_module {
357            self.tests.push(test.clone());
358
359            if test.run.is_bench() {
360                self.is_sequential = true;
361            }
362        } else {
363            let mut found = false;
364            for inner in &mut self.inner {
365                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
366                    inner.add_test(test.clone());
367                    found = true;
368                    break;
369                }
370            }
371            if !found {
372                let mut inner = Self {
373                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
374                    dependencies: vec![],
375                    tests: vec![],
376                    inner: vec![],
377                    props: vec![],
378                    materialized_dependencies: HashMap::new(),
379                    remaining_count: 0,
380                    idx: 0,
381                    is_sequential: false,
382                    sequential_lock: SequentialExecutionLock::new(),
383                    skip_creating_dependencies: false,
384                    in_progress: Arc::new(AtomicUsize::new(0)),
385                };
386                inner.add_test(test);
387                self.inner.push(inner);
388            }
389        }
390        self.remaining_count += 1;
391    }
392
393    fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
394        let crate_and_module = prop.crate_and_module();
395        if self.crate_and_module == crate_and_module {
396            if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
397                self.is_sequential = true;
398            }
399            self.props.push(prop);
400        } else {
401            let mut found = false;
402            for inner in &mut self.inner {
403                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
404                    inner.add_prop(prop.clone());
405                    found = true;
406                    break;
407                }
408            }
409            if !found {
410                let mut inner = Self {
411                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
412                    dependencies: vec![],
413                    tests: vec![],
414                    inner: vec![],
415                    props: vec![],
416                    materialized_dependencies: HashMap::new(),
417                    remaining_count: 0,
418                    idx: 0,
419                    is_sequential: false,
420                    sequential_lock: SequentialExecutionLock::new(),
421                    skip_creating_dependencies: false,
422                    in_progress: Arc::new(AtomicUsize::new(0)),
423                };
424                inner.add_prop(prop);
425                self.inner.push(inner);
426            }
427        }
428    }
429
430    fn is_materialized(&self) -> bool {
431        self.skip_creating_dependencies
432            || self.materialized_dependencies.len() == self.dependencies.len()
433    }
434
435    #[cfg(feature = "tokio")]
436    async fn materialize_deps(
437        &mut self,
438        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
439    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
440        let mut deps = HashMap::with_capacity(self.dependencies.len());
441        let mut dependency_map = parent_map.clone();
442
443        let sorted_dependencies = self.sorted_dependencies();
444        for dep in &sorted_dependencies {
445            let materialized_dep = match &dep.constructor {
446                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
447                DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
448            };
449            deps.insert(dep.name.clone(), materialized_dep.clone());
450            dependency_map.insert(dep.name.clone(), materialized_dep);
451        }
452        self.materialized_dependencies = deps;
453        dependency_map
454    }
455
456    fn materialize_deps_sync(
457        &mut self,
458        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
459    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
460        let mut deps = HashMap::with_capacity(self.dependencies.len());
461        let mut dependency_map = parent_map.clone();
462
463        let sorted_dependencies = self.sorted_dependencies();
464        for dep in &sorted_dependencies {
465            let materialized_dep = match &dep.constructor {
466                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
467                DependencyConstructor::Async(_cons) => {
468                    panic!("Async dependencies are not supported in sync mode")
469                }
470            };
471            deps.insert(dep.name.clone(), materialized_dep.clone());
472            dependency_map.insert(dep.name.clone(), materialized_dep);
473        }
474        self.materialized_dependencies = deps;
475        dependency_map
476    }
477
478    fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
479        let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
480        for dep in &self.dependencies {
481            let mut added = false;
482            for dep_dep_name in &dep.dependencies {
483                if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
484                    ts.add_dependency(dep_dep, dep);
485                    added = true;
486                } else {
487                    // otherwise it is expected to come from the parent level
488                }
489            }
490            if !added {
491                ts.insert(dep);
492            }
493        }
494        let mut result = Vec::with_capacity(self.dependencies.len());
495        loop {
496            let chunk = ts.pop_all();
497            if chunk.is_empty() {
498                break;
499            }
500            result.extend(chunk);
501        }
502        result
503    }
504
505    fn drop_deps(&mut self) {
506        self.materialized_dependencies.clear();
507    }
508
509    /// Prunes dependencies that are not needed by any test in this subtree.
510    /// Returns `Some(needed_from_parent)` with dep names needed from ancestor levels,
511    /// or `None` if pruning is disabled for this subtree (unknown deps).
512    fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
513        // Collect dep names needed by tests at this level
514        let mut needed: Option<HashSet<String>> = Some(HashSet::new());
515        for test in &self.tests {
516            match &test.dependencies {
517                None => {
518                    needed = None;
519                    break;
520                }
521                Some(deps) => {
522                    if let Some(ref mut set) = needed {
523                        set.extend(deps.iter().cloned());
524                    }
525                }
526            }
527        }
528
529        // Merge children's needs
530        for inner in &mut self.inner {
531            let child_needs = inner.prune_unused_deps();
532            needed = match (needed, child_needs) {
533                (None, _) | (_, None) => None,
534                (Some(mut a), Some(b)) => {
535                    a.extend(b);
536                    Some(a)
537                }
538            };
539        }
540
541        // If any test has unknown deps, keep everything
542        let needed = needed?;
543
544        // Determine which local deps to keep
545        let local_names: HashSet<String> =
546            self.dependencies.iter().map(|d| d.name.clone()).collect();
547        let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
548
549        // Expand transitive closure for local deps only (fixpoint)
550        let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
551        let mut needed_from_parent: HashSet<String> =
552            needed.difference(&local_names).cloned().collect();
553
554        while let Some(dep_name) = queue.pop_front() {
555            if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
556                for transitive in &dep.dependencies {
557                    if local_names.contains(transitive) {
558                        if keep_local.insert(transitive.clone()) {
559                            queue.push_back(transitive.clone());
560                        }
561                    } else {
562                        needed_from_parent.insert(transitive.clone());
563                    }
564                }
565            }
566        }
567
568        // Prune
569        self.dependencies.retain(|d| keep_local.contains(&d.name));
570
571        Some(needed_from_parent)
572    }
573
574    fn is_prefix_of(this: &str, that: &str) -> bool {
575        this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
576    }
577
578    fn next_level(from: &str, to: &str) -> String {
579        assert!(Self::is_prefix_of(from, to));
580        let remaining = if from.is_empty() {
581            to
582        } else {
583            &to[from.len() + 2..]
584        };
585
586        let result = if let Some((next, _tail)) = remaining.split_once("::") {
587            format!("{from}::{next}")
588        } else {
589            format!("{from}::{remaining}")
590        };
591        result.trim_start_matches("::").to_string()
592    }
593
594    fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
595        if let Some(parent_lock) = inherited_lock {
596            self.is_sequential = true;
597            self.sequential_lock = parent_lock.clone();
598        }
599
600        let lock_for_children = if self.is_sequential {
601            Some(self.sequential_lock.clone())
602        } else {
603            None
604        };
605
606        for child in &mut self.inner {
607            child.propagate_sequential(lock_for_children.as_ref());
608        }
609    }
610}
611
612impl Debug for TestSuiteExecution {
613    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
614        writeln!(
615            f,
616            "'{}' {} [{}]",
617            self.crate_and_module,
618            self.props
619                .iter()
620                .map(|x| format!("{x:?}"))
621                .collect::<Vec<_>>()
622                .join(", "),
623            if self.is_sequential { "S" } else { "P" }
624        )?;
625        writeln!(f, "  deps:")?;
626        for dep in &self.dependencies {
627            writeln!(f, "    '{}'", dep.name)?;
628        }
629        writeln!(f, "  tests:")?;
630        for test in &self.tests {
631            writeln!(f, "    '{}' [{:?}]", test.name, test.props.test_type)?;
632        }
633        for inner in &self.inner {
634            let inner_str = format!("{inner:?}");
635            for inner_line in inner_str.lines() {
636                writeln!(f, "  {inner_line}")?;
637            }
638        }
639        Ok(())
640    }
641}
642
643impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
644    fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
645        self.get(name).cloned()
646    }
647}
648
649pub struct TestExecution {
650    pub test: RegisteredTest,
651    pub deps: Arc<dyn DependencyView + Send + Sync>,
652    pub index: usize,
653    _seq_lock: SequentialExecutionLockGuard,
654    in_progress_counter: Arc<AtomicUsize>,
655}
656
657impl Drop for TestExecution {
658    fn drop(&mut self) {
659        self.in_progress_counter.fetch_sub(1, Ordering::Release);
660    }
661}
662
663#[allow(dead_code)]
664enum SequentialExecutionLockGuard {
665    None,
666    #[cfg(feature = "tokio")]
667    Async(tokio::sync::OwnedMutexGuard<()>),
668    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
669}
670
671#[derive(Clone)]
672struct SequentialExecutionLock {
673    #[cfg(feature = "tokio")]
674    async_mutex: Arc<tokio::sync::Mutex<()>>,
675    sync_mutex: Arc<parking_lot::Mutex<()>>,
676}
677
678impl SequentialExecutionLock {
679    pub fn new() -> Self {
680        Self {
681            #[cfg(feature = "tokio")]
682            async_mutex: Arc::new(tokio::sync::Mutex::new(())),
683            sync_mutex: Arc::new(parking_lot::Mutex::new(())),
684        }
685    }
686
687    #[cfg(feature = "tokio")]
688    pub async fn is_locked(&self) -> bool {
689        self.async_mutex.try_lock().is_err()
690    }
691
692    pub fn is_locked_sync(&self) -> bool {
693        self.sync_mutex.try_lock().is_none()
694    }
695
696    #[cfg(feature = "tokio")]
697    pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
698        if is_sequential {
699            let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
700            SequentialExecutionLockGuard::Async(permit)
701        } else {
702            SequentialExecutionLockGuard::None
703        }
704    }
705
706    pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
707        if is_sequential {
708            let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
709            SequentialExecutionLockGuard::Sync(permit)
710        } else {
711            SequentialExecutionLockGuard::None
712        }
713    }
714}