test_r_core/
execution.rs

1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::HashMap;
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12    apply_suite_tags, filter_registered_tests, DependencyConstructor, DependencyView,
13    RegisteredDependency, RegisteredTest, RegisteredTestSuiteProperty,
14};
15
16pub(crate) struct TestSuiteExecution {
17    crate_and_module: String,
18    dependencies: Vec<RegisteredDependency>,
19    tests: Vec<RegisteredTest>,
20    props: Vec<RegisteredTestSuiteProperty>,
21    inner: Vec<TestSuiteExecution>,
22    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
23    sequential_lock: SequentialExecutionLock,
24    remaining_count: usize,
25    idx: usize,
26    is_sequential: bool,
27    skip_creating_dependencies: bool,
28    in_progress: Arc<AtomicUsize>,
29}
30
31impl TestSuiteExecution {
32    pub fn construct(
33        arguments: &Arguments,
34        dependencies: &[RegisteredDependency],
35        tests: &[RegisteredTest],
36        props: &[RegisteredTestSuiteProperty],
37    ) -> (Self, Vec<RegisteredTest>) {
38        let tagged_tests = apply_suite_tags(tests, props);
39        let mut filtered_tests = filter_registered_tests(arguments, &tagged_tests);
40        Self::shuffle(arguments, &mut filtered_tests);
41        filtered_tests.reverse();
42
43        if filtered_tests.is_empty() {
44            (
45                Self::root(
46                    dependencies
47                        .iter()
48                        .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
49                        .cloned()
50                        .collect::<Vec<_>>(),
51                    Vec::new(),
52                    props
53                        .iter()
54                        .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
55                        .cloned()
56                        .collect::<Vec<_>>(),
57                ),
58                Vec::new(),
59            )
60        } else {
61            let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
62
63            for prop in props {
64                root.add_prop(prop.clone());
65            }
66
67            for dep in dependencies {
68                root.add_dependency(dep.clone());
69            }
70
71            for test in filtered_tests.clone() {
72                root.add_test(test.clone());
73            }
74
75            (root, filtered_tests)
76        }
77    }
78
79    fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
80        if let Some(seed) = arguments.shuffle_seed {
81            let mut rng = StdRng::seed_from_u64(seed);
82            tests.shuffle(&mut rng);
83        }
84    }
85
86    /// Disables creating dependencies when picking the next test. This is useful when the execution plan
87    /// is only used to drive spawned workers instead of actually running the tests.
88    pub fn skip_creating_dependencies(&mut self) {
89        self.skip_creating_dependencies = true;
90        for inner in &mut self.inner {
91            inner.skip_creating_dependencies();
92        }
93    }
94
95    pub fn remaining(&self) -> usize {
96        self.remaining_count
97    }
98
99    pub fn is_empty(&self) -> bool {
100        self.tests.is_empty() && self.inner.is_empty()
101    }
102
103    pub fn is_done(&self) -> bool {
104        self.remaining_count == 0
105    }
106
107    /// Returns true if either this level, or any of the inner levels have dependencies
108    pub fn has_dependencies(&self) -> bool {
109        !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
110    }
111
112    /// Returns true if there are any tests that require capturing, based on the given default setting
113    /// and the per-test CaptureControl overrides.
114    pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
115        self.tests.iter().any(|test| {
116            test.props
117                .capture_control
118                .requires_capturing(capture_by_default)
119        }) || self
120            .inner
121            .iter()
122            .any(|inner| inner.requires_capturing(capture_by_default))
123    }
124
125    #[cfg(feature = "tokio")]
126    pub async fn pick_next(&mut self) -> Option<TestExecution> {
127        if self.is_empty() {
128            None
129        } else {
130            match self
131                .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
132                .await
133            {
134                Some((test, deps, seq_lock, in_progress_counter)) => {
135                    let index = self.idx;
136                    self.idx += 1;
137                    Some(TestExecution {
138                        test: test.clone(),
139                        deps: Arc::new(deps),
140                        index,
141                        _seq_lock: seq_lock,
142                        in_progress_counter,
143                    })
144                }
145                None => None,
146            }
147        }
148    }
149
150    pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
151        match self.pick_next_internal_sync(&HashMap::new()) {
152            Some((test, deps, seq_lock, in_progress_counter)) => {
153                let index = self.idx;
154                self.idx += 1;
155                Some(TestExecution {
156                    test: test.clone(),
157                    deps: Arc::new(deps),
158                    index,
159                    _seq_lock: seq_lock,
160                    in_progress_counter,
161                })
162            }
163            None => None,
164        }
165    }
166
167    #[cfg(feature = "tokio")]
168    #[allow(clippy::type_complexity)]
169    async fn pick_next_internal(
170        &mut self,
171        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
172    ) -> Option<(
173        RegisteredTest,
174        HashMap<String, Arc<dyn Any + Send + Sync>>,
175        SequentialExecutionLockGuard,
176        Arc<AtomicUsize>,
177    )> {
178        if self.is_empty() {
179            None
180        } else {
181            let dependency_map = if !self.is_materialized() {
182                self.materialize_deps(materialized_parent_deps).await
183            } else {
184                self.create_dependency_map(materialized_parent_deps)
185            };
186
187            let locked = self.sequential_lock.is_locked().await;
188            let result = if self.tests.is_empty() || locked {
189                let current = self.inner.iter_mut();
190                let mut result = None;
191                for inner in current {
192                    if let Some((test, deps, seq_lock, in_progress_counter)) =
193                        Box::pin(inner.pick_next_internal(&dependency_map)).await
194                    {
195                        result = Some((test, deps, seq_lock, in_progress_counter));
196                        break;
197                    }
198                }
199                self.inner.retain(|inner| !inner.is_empty());
200
201                result
202            } else {
203                let guard = self.sequential_lock.lock(self.is_sequential).await;
204                self.in_progress.fetch_add(1, Ordering::Release);
205                self.tests
206                    .pop()
207                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
208            };
209            if result.is_none()
210                && self.is_empty()
211                && self.is_materialized()
212                && !locked
213                && self.in_progress.load(Ordering::Acquire) == 0
214            {
215                self.drop_deps();
216            }
217            if result.is_some() {
218                self.remaining_count -= 1;
219            }
220            result
221        }
222    }
223
224    #[allow(clippy::type_complexity)]
225    fn pick_next_internal_sync(
226        &mut self,
227        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
228    ) -> Option<(
229        RegisteredTest,
230        HashMap<String, Arc<dyn Any + Send + Sync>>,
231        SequentialExecutionLockGuard,
232        Arc<AtomicUsize>,
233    )> {
234        if self.is_empty() {
235            None
236        } else {
237            let dependency_map = if !self.is_materialized() {
238                self.materialize_deps_sync(materialized_parent_deps)
239            } else {
240                self.create_dependency_map(materialized_parent_deps)
241            };
242
243            let locked = self.sequential_lock.is_locked_sync();
244            let result = if self.tests.is_empty() || locked {
245                let current = self.inner.iter_mut();
246                let mut result = None;
247                for inner in current {
248                    if let Some((test, deps, seq_lock, in_progress_counter)) =
249                        inner.pick_next_internal_sync(&dependency_map)
250                    {
251                        result = Some((test, deps, seq_lock, in_progress_counter));
252                        break;
253                    }
254                }
255
256                self.inner.retain(|inner| !inner.is_empty());
257                result
258            } else {
259                let guard = self.sequential_lock.lock_sync(self.is_sequential);
260                self.in_progress.fetch_add(1, Ordering::Release);
261                self.tests
262                    .pop()
263                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
264            };
265            if result.is_none()
266                && self.is_materialized()
267                && !locked
268                && self.in_progress.load(Ordering::Acquire) == 0
269            {
270                self.drop_deps();
271            }
272            if result.is_some() {
273                self.remaining_count -= 1;
274            }
275            result
276        }
277    }
278
279    fn create_dependency_map(
280        &self,
281        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
282    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
283        let mut result = parent_map.clone();
284        for (key, dep) in &self.materialized_dependencies {
285            result.insert(key.clone(), dep.clone());
286        }
287        result
288    }
289
290    fn root(
291        deps: Vec<RegisteredDependency>,
292        tests: Vec<RegisteredTest>,
293        props: Vec<RegisteredTestSuiteProperty>,
294    ) -> Self {
295        let total_count = tests.len();
296        let is_sequential = props
297            .iter()
298            .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
299            || tests.iter().any(|test| test.run.is_bench());
300        Self {
301            crate_and_module: String::new(),
302            dependencies: deps,
303            tests,
304            props,
305            inner: Vec::new(),
306            materialized_dependencies: HashMap::new(),
307            remaining_count: total_count,
308            idx: 0,
309            sequential_lock: SequentialExecutionLock::new(),
310            is_sequential,
311            skip_creating_dependencies: false,
312            in_progress: Arc::new(AtomicUsize::new(0)),
313        }
314    }
315
316    fn add_dependency(&mut self, dep: RegisteredDependency) {
317        let crate_and_module = dep.crate_and_module();
318        if self.crate_and_module == crate_and_module {
319            self.dependencies.push(dep);
320        } else {
321            let mut found = false;
322            for inner in &mut self.inner {
323                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
324                    inner.add_dependency(dep.clone());
325                    found = true;
326                    break;
327                }
328            }
329            if !found {
330                let mut inner = Self {
331                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
332                    dependencies: vec![],
333                    tests: vec![],
334                    inner: vec![],
335                    props: vec![],
336                    materialized_dependencies: HashMap::new(),
337                    remaining_count: 0,
338                    idx: 0,
339                    is_sequential: false,
340                    sequential_lock: SequentialExecutionLock::new(),
341                    skip_creating_dependencies: false,
342                    in_progress: Arc::new(AtomicUsize::new(0)),
343                };
344                inner.add_dependency(dep);
345                self.inner.push(inner);
346            }
347        }
348    }
349
350    fn add_test(&mut self, test: RegisteredTest) {
351        let crate_and_module = test.crate_and_module();
352        if self.crate_and_module == crate_and_module {
353            self.tests.push(test.clone());
354
355            if test.run.is_bench() {
356                self.is_sequential = true;
357            }
358        } else {
359            let mut found = false;
360            for inner in &mut self.inner {
361                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
362                    inner.add_test(test.clone());
363                    found = true;
364                    break;
365                }
366            }
367            if !found {
368                let mut inner = Self {
369                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
370                    dependencies: vec![],
371                    tests: vec![],
372                    inner: vec![],
373                    props: vec![],
374                    materialized_dependencies: HashMap::new(),
375                    remaining_count: 0,
376                    idx: 0,
377                    is_sequential: false,
378                    sequential_lock: SequentialExecutionLock::new(),
379                    skip_creating_dependencies: false,
380                    in_progress: Arc::new(AtomicUsize::new(0)),
381                };
382                inner.add_test(test);
383                self.inner.push(inner);
384            }
385        }
386        self.remaining_count += 1;
387    }
388
389    fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
390        let crate_and_module = prop.crate_and_module();
391        if self.crate_and_module == crate_and_module {
392            if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
393                self.is_sequential = true;
394            }
395            self.props.push(prop);
396        } else {
397            let mut found = false;
398            for inner in &mut self.inner {
399                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
400                    inner.add_prop(prop.clone());
401                    found = true;
402                    break;
403                }
404            }
405            if !found {
406                let mut inner = Self {
407                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
408                    dependencies: vec![],
409                    tests: vec![],
410                    inner: vec![],
411                    props: vec![],
412                    materialized_dependencies: HashMap::new(),
413                    remaining_count: 0,
414                    idx: 0,
415                    is_sequential: false,
416                    sequential_lock: SequentialExecutionLock::new(),
417                    skip_creating_dependencies: false,
418                    in_progress: Arc::new(AtomicUsize::new(0)),
419                };
420                inner.add_prop(prop);
421                self.inner.push(inner);
422            }
423        }
424    }
425
426    fn is_materialized(&self) -> bool {
427        self.skip_creating_dependencies
428            || self.materialized_dependencies.len() == self.dependencies.len()
429    }
430
431    #[cfg(feature = "tokio")]
432    async fn materialize_deps(
433        &mut self,
434        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
435    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
436        let mut deps = HashMap::with_capacity(self.dependencies.len());
437        let mut dependency_map = parent_map.clone();
438
439        let sorted_dependencies = self.sorted_dependencies();
440        for dep in &sorted_dependencies {
441            let materialized_dep = match &dep.constructor {
442                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
443                DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
444            };
445            deps.insert(dep.name.clone(), materialized_dep.clone());
446            dependency_map.insert(dep.name.clone(), materialized_dep);
447        }
448        self.materialized_dependencies = deps;
449        dependency_map
450    }
451
452    fn materialize_deps_sync(
453        &mut self,
454        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
455    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
456        let mut deps = HashMap::with_capacity(self.dependencies.len());
457        let mut dependency_map = parent_map.clone();
458
459        let sorted_dependencies = self.sorted_dependencies();
460        for dep in &sorted_dependencies {
461            let materialized_dep = match &dep.constructor {
462                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
463                DependencyConstructor::Async(_cons) => {
464                    panic!("Async dependencies are not supported in sync mode")
465                }
466            };
467            deps.insert(dep.name.clone(), materialized_dep.clone());
468            dependency_map.insert(dep.name.clone(), materialized_dep);
469        }
470        self.materialized_dependencies = deps;
471        dependency_map
472    }
473
474    fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
475        let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
476        for dep in &self.dependencies {
477            let mut added = false;
478            for dep_dep_name in &dep.dependencies {
479                if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
480                    ts.add_dependency(dep_dep, dep);
481                    added = true;
482                } else {
483                    // otherwise it is expected to come from the parent level
484                }
485            }
486            if !added {
487                ts.insert(dep);
488            }
489        }
490        let mut result = Vec::with_capacity(self.dependencies.len());
491        loop {
492            let chunk = ts.pop_all();
493            if chunk.is_empty() {
494                break;
495            }
496            result.extend(chunk);
497        }
498        result
499    }
500
501    fn drop_deps(&mut self) {
502        self.materialized_dependencies.clear();
503    }
504
505    fn is_prefix_of(this: &str, that: &str) -> bool {
506        this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
507    }
508
509    fn next_level(from: &str, to: &str) -> String {
510        assert!(Self::is_prefix_of(from, to));
511        let remaining = if from.is_empty() {
512            to
513        } else {
514            &to[from.len() + 2..]
515        };
516
517        let result = if let Some((next, _tail)) = remaining.split_once("::") {
518            format!("{from}::{next}")
519        } else {
520            format!("{from}::{remaining}")
521        };
522        result.trim_start_matches("::").to_string()
523    }
524}
525
526impl Debug for TestSuiteExecution {
527    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
528        writeln!(
529            f,
530            "'{}' {} [{}]",
531            self.crate_and_module,
532            self.props
533                .iter()
534                .map(|x| format!("{x:?}"))
535                .collect::<Vec<_>>()
536                .join(", "),
537            if self.is_sequential { "S" } else { "P" }
538        )?;
539        writeln!(f, "  deps:")?;
540        for dep in &self.dependencies {
541            writeln!(f, "    '{}'", dep.name)?;
542        }
543        writeln!(f, "  tests:")?;
544        for test in &self.tests {
545            writeln!(f, "    '{}' [{:?}]", test.name, test.props.test_type)?;
546        }
547        for inner in &self.inner {
548            let inner_str = format!("{inner:?}");
549            for inner_line in inner_str.lines() {
550                writeln!(f, "  {}", inner_line)?;
551            }
552        }
553        Ok(())
554    }
555}
556
557impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
558    fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
559        self.get(name).cloned()
560    }
561}
562
563pub struct TestExecution {
564    pub test: RegisteredTest,
565    pub deps: Arc<dyn DependencyView + Send + Sync>,
566    pub index: usize,
567    _seq_lock: SequentialExecutionLockGuard,
568    in_progress_counter: Arc<AtomicUsize>,
569}
570
571impl Drop for TestExecution {
572    fn drop(&mut self) {
573        self.in_progress_counter.fetch_sub(1, Ordering::Release);
574    }
575}
576
577#[allow(dead_code)]
578enum SequentialExecutionLockGuard {
579    None,
580    #[cfg(feature = "tokio")]
581    Async(tokio::sync::OwnedMutexGuard<()>),
582    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
583}
584
585struct SequentialExecutionLock {
586    #[cfg(feature = "tokio")]
587    async_mutex: Option<Arc<tokio::sync::Mutex<()>>>,
588    sync_mutex: Option<Arc<parking_lot::Mutex<()>>>,
589}
590
591impl SequentialExecutionLock {
592    pub fn new() -> Self {
593        Self {
594            #[cfg(feature = "tokio")]
595            async_mutex: None,
596            sync_mutex: None,
597        }
598    }
599
600    #[cfg(feature = "tokio")]
601    pub async fn is_locked(&self) -> bool {
602        if let Some(mutex) = &self.async_mutex {
603            mutex.try_lock().is_err()
604        } else {
605            false
606        }
607    }
608
609    pub fn is_locked_sync(&self) -> bool {
610        if let Some(mutex) = &self.sync_mutex {
611            mutex.try_lock().is_none()
612        } else {
613            false
614        }
615    }
616
617    #[cfg(feature = "tokio")]
618    pub async fn lock(&mut self, is_sequential: bool) -> SequentialExecutionLockGuard {
619        if is_sequential {
620            if self.async_mutex.is_none() {
621                self.async_mutex = Some(Arc::new(tokio::sync::Mutex::new(())));
622            }
623
624            let permit =
625                tokio::sync::Mutex::lock_owned(self.async_mutex.as_ref().unwrap().clone()).await;
626            SequentialExecutionLockGuard::Async(permit)
627        } else {
628            SequentialExecutionLockGuard::None
629        }
630    }
631
632    pub fn lock_sync(&mut self, is_sequential: bool) -> SequentialExecutionLockGuard {
633        if is_sequential {
634            if self.sync_mutex.is_none() {
635                self.sync_mutex = Some(Arc::new(parking_lot::Mutex::new(())));
636            }
637
638            let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex.as_ref().unwrap().clone());
639            SequentialExecutionLockGuard::Sync(permit)
640        } else {
641            SequentialExecutionLockGuard::None
642        }
643    }
644}