1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12 apply_suite_props_to_tests, filter_registered_tests, DependencyConstructor, DependencyView,
13 RegisteredDependency, RegisteredTest, RegisteredTestSuiteProperty,
14};
15
16pub(crate) struct TestSuiteExecution {
17 crate_and_module: String,
18 dependencies: Vec<RegisteredDependency>,
19 tests: Vec<RegisteredTest>,
20 props: Vec<RegisteredTestSuiteProperty>,
21 inner: Vec<TestSuiteExecution>,
22 materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
23 sequential_lock: SequentialExecutionLock,
24 remaining_count: usize,
25 idx: usize,
26 is_sequential: bool,
27 skip_creating_dependencies: bool,
28 in_progress: Arc<AtomicUsize>,
29}
30
31impl TestSuiteExecution {
32 pub fn construct(
33 arguments: &Arguments,
34 dependencies: &[RegisteredDependency],
35 tests: &[RegisteredTest],
36 props: &[RegisteredTestSuiteProperty],
37 ) -> (Self, Vec<RegisteredTest>) {
38 let tests_with_props = apply_suite_props_to_tests(tests, props);
39 let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
40 Self::shuffle(arguments, &mut filtered_tests);
41 filtered_tests.reverse();
42
43 if filtered_tests.is_empty() {
44 (
45 Self::root(
46 dependencies
47 .iter()
48 .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
49 .cloned()
50 .collect::<Vec<_>>(),
51 Vec::new(),
52 props
53 .iter()
54 .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
55 .cloned()
56 .collect::<Vec<_>>(),
57 ),
58 Vec::new(),
59 )
60 } else {
61 let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
62
63 for prop in props {
64 root.add_prop(prop.clone());
65 }
66
67 for dep in dependencies {
68 root.add_dependency(dep.clone());
69 }
70
71 for test in filtered_tests.clone() {
72 root.add_test(test.clone());
73 }
74
75 root.propagate_sequential(None);
76 root.prune_unused_deps();
77
78 (root, filtered_tests)
79 }
80 }
81
82 fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
83 if let Some(seed) = arguments.shuffle_seed {
84 let mut rng = StdRng::seed_from_u64(seed);
85 tests.shuffle(&mut rng);
86 }
87 }
88
89 pub fn skip_creating_dependencies(&mut self) {
92 self.skip_creating_dependencies = true;
93 for inner in &mut self.inner {
94 inner.skip_creating_dependencies();
95 }
96 }
97
98 pub fn remaining(&self) -> usize {
99 self.remaining_count
100 }
101
102 pub fn is_empty(&self) -> bool {
103 self.tests.is_empty() && self.inner.is_empty()
104 }
105
106 pub fn is_done(&self) -> bool {
107 self.remaining_count == 0
108 }
109
110 pub fn has_dependencies(&self) -> bool {
112 !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
113 }
114
115 pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
118 self.tests.iter().any(|test| {
119 test.props
120 .capture_control
121 .requires_capturing(capture_by_default)
122 }) || self
123 .inner
124 .iter()
125 .any(|inner| inner.requires_capturing(capture_by_default))
126 }
127
128 #[cfg(feature = "tokio")]
129 pub async fn pick_next(&mut self) -> Option<TestExecution> {
130 if self.is_empty() {
131 None
132 } else {
133 match self
134 .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
135 .await
136 {
137 Some((test, deps, seq_lock, in_progress_counter)) => {
138 let index = self.idx;
139 self.idx += 1;
140 Some(TestExecution {
141 test: test.clone(),
142 deps: Arc::new(deps),
143 index,
144 _seq_lock: seq_lock,
145 in_progress_counter,
146 })
147 }
148 None => None,
149 }
150 }
151 }
152
153 pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
154 match self.pick_next_internal_sync(&HashMap::new()) {
155 Some((test, deps, seq_lock, in_progress_counter)) => {
156 let index = self.idx;
157 self.idx += 1;
158 Some(TestExecution {
159 test: test.clone(),
160 deps: Arc::new(deps),
161 index,
162 _seq_lock: seq_lock,
163 in_progress_counter,
164 })
165 }
166 None => None,
167 }
168 }
169
170 #[cfg(feature = "tokio")]
171 #[allow(clippy::type_complexity)]
172 async fn pick_next_internal(
173 &mut self,
174 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
175 ) -> Option<(
176 RegisteredTest,
177 HashMap<String, Arc<dyn Any + Send + Sync>>,
178 SequentialExecutionLockGuard,
179 Arc<AtomicUsize>,
180 )> {
181 if self.is_empty() {
182 None
183 } else {
184 let dependency_map = if !self.is_materialized() {
185 self.materialize_deps(materialized_parent_deps).await
186 } else {
187 self.create_dependency_map(materialized_parent_deps)
188 };
189
190 let locked = self.sequential_lock.is_locked().await;
191 let result = if self.tests.is_empty() || locked {
192 let current = self.inner.iter_mut();
193 let mut result = None;
194 for inner in current {
195 if let Some((test, deps, seq_lock, in_progress_counter)) =
196 Box::pin(inner.pick_next_internal(&dependency_map)).await
197 {
198 result = Some((test, deps, seq_lock, in_progress_counter));
199 break;
200 }
201 }
202 self.inner.retain(|inner| !inner.is_empty());
203
204 result
205 } else {
206 let guard = self.sequential_lock.lock(self.is_sequential).await;
207 self.in_progress.fetch_add(1, Ordering::Release);
208 self.tests
209 .pop()
210 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
211 };
212 if result.is_none()
213 && self.is_empty()
214 && self.is_materialized()
215 && !locked
216 && self.in_progress.load(Ordering::Acquire) == 0
217 {
218 self.drop_deps();
219 }
220 if result.is_some() {
221 self.remaining_count -= 1;
222 }
223 result
224 }
225 }
226
227 #[allow(clippy::type_complexity)]
228 fn pick_next_internal_sync(
229 &mut self,
230 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
231 ) -> Option<(
232 RegisteredTest,
233 HashMap<String, Arc<dyn Any + Send + Sync>>,
234 SequentialExecutionLockGuard,
235 Arc<AtomicUsize>,
236 )> {
237 if self.is_empty() {
238 None
239 } else {
240 let dependency_map = if !self.is_materialized() {
241 self.materialize_deps_sync(materialized_parent_deps)
242 } else {
243 self.create_dependency_map(materialized_parent_deps)
244 };
245
246 let locked = self.sequential_lock.is_locked_sync();
247 let result = if self.tests.is_empty() || locked {
248 let current = self.inner.iter_mut();
249 let mut result = None;
250 for inner in current {
251 if let Some((test, deps, seq_lock, in_progress_counter)) =
252 inner.pick_next_internal_sync(&dependency_map)
253 {
254 result = Some((test, deps, seq_lock, in_progress_counter));
255 break;
256 }
257 }
258
259 self.inner.retain(|inner| !inner.is_empty());
260 result
261 } else {
262 let guard = self.sequential_lock.lock_sync(self.is_sequential);
263 self.in_progress.fetch_add(1, Ordering::Release);
264 self.tests
265 .pop()
266 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
267 };
268 if result.is_none()
269 && self.is_materialized()
270 && !locked
271 && self.in_progress.load(Ordering::Acquire) == 0
272 {
273 self.drop_deps();
274 }
275 if result.is_some() {
276 self.remaining_count -= 1;
277 }
278 result
279 }
280 }
281
282 fn create_dependency_map(
283 &self,
284 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
285 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
286 let mut result = parent_map.clone();
287 for (key, dep) in &self.materialized_dependencies {
288 result.insert(key.clone(), dep.clone());
289 }
290 result
291 }
292
293 fn root(
294 deps: Vec<RegisteredDependency>,
295 tests: Vec<RegisteredTest>,
296 props: Vec<RegisteredTestSuiteProperty>,
297 ) -> Self {
298 let total_count = tests.len();
299 let is_sequential = props
300 .iter()
301 .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
302 || tests.iter().any(|test| test.run.is_bench());
303 Self {
304 crate_and_module: String::new(),
305 dependencies: deps,
306 tests,
307 props,
308 inner: Vec::new(),
309 materialized_dependencies: HashMap::new(),
310 remaining_count: total_count,
311 idx: 0,
312 sequential_lock: SequentialExecutionLock::new(),
313 is_sequential,
314 skip_creating_dependencies: false,
315 in_progress: Arc::new(AtomicUsize::new(0)),
316 }
317 }
318
319 fn add_dependency(&mut self, dep: RegisteredDependency) {
320 let crate_and_module = dep.crate_and_module();
321 if self.crate_and_module == crate_and_module {
322 self.dependencies.push(dep);
323 } else {
324 let mut found = false;
325 for inner in &mut self.inner {
326 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
327 inner.add_dependency(dep.clone());
328 found = true;
329 break;
330 }
331 }
332 if !found {
333 let mut inner = Self {
334 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
335 dependencies: vec![],
336 tests: vec![],
337 inner: vec![],
338 props: vec![],
339 materialized_dependencies: HashMap::new(),
340 remaining_count: 0,
341 idx: 0,
342 is_sequential: false,
343 sequential_lock: SequentialExecutionLock::new(),
344 skip_creating_dependencies: false,
345 in_progress: Arc::new(AtomicUsize::new(0)),
346 };
347 inner.add_dependency(dep);
348 self.inner.push(inner);
349 }
350 }
351 }
352
353 fn add_test(&mut self, test: RegisteredTest) {
354 let crate_and_module = test.crate_and_module();
355 if self.crate_and_module == crate_and_module {
356 self.tests.push(test.clone());
357
358 if test.run.is_bench() {
359 self.is_sequential = true;
360 }
361 } else {
362 let mut found = false;
363 for inner in &mut self.inner {
364 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
365 inner.add_test(test.clone());
366 found = true;
367 break;
368 }
369 }
370 if !found {
371 let mut inner = Self {
372 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
373 dependencies: vec![],
374 tests: vec![],
375 inner: vec![],
376 props: vec![],
377 materialized_dependencies: HashMap::new(),
378 remaining_count: 0,
379 idx: 0,
380 is_sequential: false,
381 sequential_lock: SequentialExecutionLock::new(),
382 skip_creating_dependencies: false,
383 in_progress: Arc::new(AtomicUsize::new(0)),
384 };
385 inner.add_test(test);
386 self.inner.push(inner);
387 }
388 }
389 self.remaining_count += 1;
390 }
391
392 fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
393 let crate_and_module = prop.crate_and_module();
394 if self.crate_and_module == crate_and_module {
395 if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
396 self.is_sequential = true;
397 }
398 self.props.push(prop);
399 } else {
400 let mut found = false;
401 for inner in &mut self.inner {
402 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
403 inner.add_prop(prop.clone());
404 found = true;
405 break;
406 }
407 }
408 if !found {
409 let mut inner = Self {
410 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
411 dependencies: vec![],
412 tests: vec![],
413 inner: vec![],
414 props: vec![],
415 materialized_dependencies: HashMap::new(),
416 remaining_count: 0,
417 idx: 0,
418 is_sequential: false,
419 sequential_lock: SequentialExecutionLock::new(),
420 skip_creating_dependencies: false,
421 in_progress: Arc::new(AtomicUsize::new(0)),
422 };
423 inner.add_prop(prop);
424 self.inner.push(inner);
425 }
426 }
427 }
428
429 fn is_materialized(&self) -> bool {
430 self.skip_creating_dependencies
431 || self.materialized_dependencies.len() == self.dependencies.len()
432 }
433
434 #[cfg(feature = "tokio")]
435 async fn materialize_deps(
436 &mut self,
437 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
438 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
439 let mut deps = HashMap::with_capacity(self.dependencies.len());
440 let mut dependency_map = parent_map.clone();
441
442 let sorted_dependencies = self.sorted_dependencies();
443 for dep in &sorted_dependencies {
444 let materialized_dep = match &dep.constructor {
445 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
446 DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
447 };
448 deps.insert(dep.name.clone(), materialized_dep.clone());
449 dependency_map.insert(dep.name.clone(), materialized_dep);
450 }
451 self.materialized_dependencies = deps;
452 dependency_map
453 }
454
455 fn materialize_deps_sync(
456 &mut self,
457 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
458 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
459 let mut deps = HashMap::with_capacity(self.dependencies.len());
460 let mut dependency_map = parent_map.clone();
461
462 let sorted_dependencies = self.sorted_dependencies();
463 for dep in &sorted_dependencies {
464 let materialized_dep = match &dep.constructor {
465 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
466 DependencyConstructor::Async(_cons) => {
467 panic!("Async dependencies are not supported in sync mode")
468 }
469 };
470 deps.insert(dep.name.clone(), materialized_dep.clone());
471 dependency_map.insert(dep.name.clone(), materialized_dep);
472 }
473 self.materialized_dependencies = deps;
474 dependency_map
475 }
476
477 fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
478 let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
479 for dep in &self.dependencies {
480 let mut added = false;
481 for dep_dep_name in &dep.dependencies {
482 if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
483 ts.add_dependency(dep_dep, dep);
484 added = true;
485 } else {
486 }
488 }
489 if !added {
490 ts.insert(dep);
491 }
492 }
493 let mut result = Vec::with_capacity(self.dependencies.len());
494 loop {
495 let chunk = ts.pop_all();
496 if chunk.is_empty() {
497 break;
498 }
499 result.extend(chunk);
500 }
501 result
502 }
503
504 fn drop_deps(&mut self) {
505 self.materialized_dependencies.clear();
506 }
507
508 fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
512 let mut needed: Option<HashSet<String>> = Some(HashSet::new());
514 for test in &self.tests {
515 match &test.dependencies {
516 None => {
517 needed = None;
518 break;
519 }
520 Some(deps) => {
521 if let Some(ref mut set) = needed {
522 set.extend(deps.iter().cloned());
523 }
524 }
525 }
526 }
527
528 for inner in &mut self.inner {
530 let child_needs = inner.prune_unused_deps();
531 needed = match (needed, child_needs) {
532 (None, _) | (_, None) => None,
533 (Some(mut a), Some(b)) => {
534 a.extend(b);
535 Some(a)
536 }
537 };
538 }
539
540 let needed = needed?;
542
543 let local_names: HashSet<String> =
545 self.dependencies.iter().map(|d| d.name.clone()).collect();
546 let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
547
548 let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
550 let mut needed_from_parent: HashSet<String> =
551 needed.difference(&local_names).cloned().collect();
552
553 while let Some(dep_name) = queue.pop_front() {
554 if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
555 for transitive in &dep.dependencies {
556 if local_names.contains(transitive) {
557 if keep_local.insert(transitive.clone()) {
558 queue.push_back(transitive.clone());
559 }
560 } else {
561 needed_from_parent.insert(transitive.clone());
562 }
563 }
564 }
565 }
566
567 self.dependencies.retain(|d| keep_local.contains(&d.name));
569
570 Some(needed_from_parent)
571 }
572
573 fn is_prefix_of(this: &str, that: &str) -> bool {
574 this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
575 }
576
577 fn next_level(from: &str, to: &str) -> String {
578 assert!(Self::is_prefix_of(from, to));
579 let remaining = if from.is_empty() {
580 to
581 } else {
582 &to[from.len() + 2..]
583 };
584
585 let result = if let Some((next, _tail)) = remaining.split_once("::") {
586 format!("{from}::{next}")
587 } else {
588 format!("{from}::{remaining}")
589 };
590 result.trim_start_matches("::").to_string()
591 }
592
593 fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
594 if let Some(parent_lock) = inherited_lock {
595 self.is_sequential = true;
596 self.sequential_lock = parent_lock.clone();
597 }
598
599 let lock_for_children = if self.is_sequential {
600 Some(self.sequential_lock.clone())
601 } else {
602 None
603 };
604
605 for child in &mut self.inner {
606 child.propagate_sequential(lock_for_children.as_ref());
607 }
608 }
609}
610
611impl Debug for TestSuiteExecution {
612 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
613 writeln!(
614 f,
615 "'{}' {} [{}]",
616 self.crate_and_module,
617 self.props
618 .iter()
619 .map(|x| format!("{x:?}"))
620 .collect::<Vec<_>>()
621 .join(", "),
622 if self.is_sequential { "S" } else { "P" }
623 )?;
624 writeln!(f, " deps:")?;
625 for dep in &self.dependencies {
626 writeln!(f, " '{}'", dep.name)?;
627 }
628 writeln!(f, " tests:")?;
629 for test in &self.tests {
630 writeln!(f, " '{}' [{:?}]", test.name, test.props.test_type)?;
631 }
632 for inner in &self.inner {
633 let inner_str = format!("{inner:?}");
634 for inner_line in inner_str.lines() {
635 writeln!(f, " {inner_line}")?;
636 }
637 }
638 Ok(())
639 }
640}
641
642impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
643 fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
644 self.get(name).cloned()
645 }
646}
647
648pub struct TestExecution {
649 pub test: RegisteredTest,
650 pub deps: Arc<dyn DependencyView + Send + Sync>,
651 pub index: usize,
652 _seq_lock: SequentialExecutionLockGuard,
653 in_progress_counter: Arc<AtomicUsize>,
654}
655
656impl Drop for TestExecution {
657 fn drop(&mut self) {
658 self.in_progress_counter.fetch_sub(1, Ordering::Release);
659 }
660}
661
662#[allow(dead_code)]
663enum SequentialExecutionLockGuard {
664 None,
665 #[cfg(feature = "tokio")]
666 Async(tokio::sync::OwnedMutexGuard<()>),
667 Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
668}
669
670#[derive(Clone)]
671struct SequentialExecutionLock {
672 #[cfg(feature = "tokio")]
673 async_mutex: Arc<tokio::sync::Mutex<()>>,
674 sync_mutex: Arc<parking_lot::Mutex<()>>,
675}
676
677impl SequentialExecutionLock {
678 pub fn new() -> Self {
679 Self {
680 #[cfg(feature = "tokio")]
681 async_mutex: Arc::new(tokio::sync::Mutex::new(())),
682 sync_mutex: Arc::new(parking_lot::Mutex::new(())),
683 }
684 }
685
686 #[cfg(feature = "tokio")]
687 pub async fn is_locked(&self) -> bool {
688 self.async_mutex.try_lock().is_err()
689 }
690
691 pub fn is_locked_sync(&self) -> bool {
692 self.sync_mutex.try_lock().is_none()
693 }
694
695 #[cfg(feature = "tokio")]
696 pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
697 if is_sequential {
698 let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
699 SequentialExecutionLockGuard::Async(permit)
700 } else {
701 SequentialExecutionLockGuard::None
702 }
703 }
704
705 pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
706 if is_sequential {
707 let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
708 SequentialExecutionLockGuard::Sync(permit)
709 } else {
710 SequentialExecutionLockGuard::None
711 }
712 }
713}