1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12 apply_suite_tags, apply_suite_timeouts, filter_registered_tests, DependencyConstructor,
13 DependencyView, RegisteredDependency, RegisteredTest, RegisteredTestSuiteProperty,
14};
15
16pub(crate) struct TestSuiteExecution {
17 crate_and_module: String,
18 dependencies: Vec<RegisteredDependency>,
19 tests: Vec<RegisteredTest>,
20 props: Vec<RegisteredTestSuiteProperty>,
21 inner: Vec<TestSuiteExecution>,
22 materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
23 sequential_lock: SequentialExecutionLock,
24 remaining_count: usize,
25 idx: usize,
26 is_sequential: bool,
27 skip_creating_dependencies: bool,
28 in_progress: Arc<AtomicUsize>,
29}
30
31impl TestSuiteExecution {
32 pub fn construct(
33 arguments: &Arguments,
34 dependencies: &[RegisteredDependency],
35 tests: &[RegisteredTest],
36 props: &[RegisteredTestSuiteProperty],
37 ) -> (Self, Vec<RegisteredTest>) {
38 let tagged_tests = apply_suite_tags(tests, props);
39 let timed_tests = apply_suite_timeouts(&tagged_tests, props);
40 let mut filtered_tests = filter_registered_tests(arguments, &timed_tests);
41 Self::shuffle(arguments, &mut filtered_tests);
42 filtered_tests.reverse();
43
44 if filtered_tests.is_empty() {
45 (
46 Self::root(
47 dependencies
48 .iter()
49 .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
50 .cloned()
51 .collect::<Vec<_>>(),
52 Vec::new(),
53 props
54 .iter()
55 .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
56 .cloned()
57 .collect::<Vec<_>>(),
58 ),
59 Vec::new(),
60 )
61 } else {
62 let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
63
64 for prop in props {
65 root.add_prop(prop.clone());
66 }
67
68 for dep in dependencies {
69 root.add_dependency(dep.clone());
70 }
71
72 for test in filtered_tests.clone() {
73 root.add_test(test.clone());
74 }
75
76 root.propagate_sequential(None);
77 root.prune_unused_deps();
78
79 (root, filtered_tests)
80 }
81 }
82
83 fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
84 if let Some(seed) = arguments.shuffle_seed {
85 let mut rng = StdRng::seed_from_u64(seed);
86 tests.shuffle(&mut rng);
87 }
88 }
89
90 pub fn skip_creating_dependencies(&mut self) {
93 self.skip_creating_dependencies = true;
94 for inner in &mut self.inner {
95 inner.skip_creating_dependencies();
96 }
97 }
98
99 pub fn remaining(&self) -> usize {
100 self.remaining_count
101 }
102
103 pub fn is_empty(&self) -> bool {
104 self.tests.is_empty() && self.inner.is_empty()
105 }
106
107 pub fn is_done(&self) -> bool {
108 self.remaining_count == 0
109 }
110
111 pub fn has_dependencies(&self) -> bool {
113 !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
114 }
115
116 pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
119 self.tests.iter().any(|test| {
120 test.props
121 .capture_control
122 .requires_capturing(capture_by_default)
123 }) || self
124 .inner
125 .iter()
126 .any(|inner| inner.requires_capturing(capture_by_default))
127 }
128
/// Picks the next test to run (async variant), wrapping it into a [`TestExecution`] that
/// carries a running index.
///
/// Returns `None` when the tree is empty or no test can be handed out at the moment.
#[cfg(feature = "tokio")]
pub async fn pick_next(&mut self) -> Option<TestExecution> {
    if self.is_empty() {
        return None;
    }

    let root_deps = self.create_dependency_map(&HashMap::new());
    let (test, deps, seq_lock, in_progress_counter) =
        self.pick_next_internal(&root_deps).await?;

    // The index is only consumed when a test was actually picked.
    let index = self.idx;
    self.idx += 1;

    Some(TestExecution {
        test,
        deps: Arc::new(deps),
        index,
        _seq_lock: seq_lock,
        in_progress_counter,
    })
}
153
154 pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
155 match self.pick_next_internal_sync(&HashMap::new()) {
156 Some((test, deps, seq_lock, in_progress_counter)) => {
157 let index = self.idx;
158 self.idx += 1;
159 Some(TestExecution {
160 test: test.clone(),
161 deps: Arc::new(deps),
162 index,
163 _seq_lock: seq_lock,
164 in_progress_counter,
165 })
166 }
167 None => None,
168 }
169 }
170
/// Picks the next runnable test from this node or one of its descendants (async variant).
///
/// This node's dependencies are constructed on first use and layered over
/// `materialized_parent_deps`. On success the tuple holds the test, the dependency map
/// visible to it, the sequential guard, and the in-progress counter of the node the test
/// was taken from (already incremented for this test).
#[cfg(feature = "tokio")]
#[allow(clippy::type_complexity)]
async fn pick_next_internal(
    &mut self,
    materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
) -> Option<(
    RegisteredTest,
    HashMap<String, Arc<dyn Any + Send + Sync>>,
    SequentialExecutionLockGuard,
    Arc<AtomicUsize>,
)> {
    if self.is_empty() {
        return None;
    }

    let dependency_map = if self.is_materialized() {
        self.create_dependency_map(materialized_parent_deps)
    } else {
        self.materialize_deps(materialized_parent_deps).await
    };

    let locked = self.sequential_lock.is_locked().await;
    let picked = if self.tests.is_empty() || locked {
        // Nothing to take from this level right now: ask the children in order.
        let mut from_child = None;
        for child in self.inner.iter_mut() {
            from_child = Box::pin(child.pick_next_internal(&dependency_map)).await;
            if from_child.is_some() {
                break;
            }
        }
        // Children that ran out of tests are removed from the tree.
        self.inner.retain(|child| !child.is_empty());

        from_child
    } else {
        let guard = self.sequential_lock.lock(self.is_sequential).await;
        self.in_progress.fetch_add(1, Ordering::Release);
        self.tests
            .pop()
            .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
    };

    // Release the constructed dependencies once the node is exhausted and idle.
    if picked.is_none()
        && self.is_empty()
        && self.is_materialized()
        && !locked
        && self.in_progress.load(Ordering::Acquire) == 0
    {
        self.drop_deps();
    }
    if picked.is_some() {
        self.remaining_count -= 1;
    }
    picked
}
227
228 #[allow(clippy::type_complexity)]
229 fn pick_next_internal_sync(
230 &mut self,
231 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
232 ) -> Option<(
233 RegisteredTest,
234 HashMap<String, Arc<dyn Any + Send + Sync>>,
235 SequentialExecutionLockGuard,
236 Arc<AtomicUsize>,
237 )> {
238 if self.is_empty() {
239 None
240 } else {
241 let dependency_map = if !self.is_materialized() {
242 self.materialize_deps_sync(materialized_parent_deps)
243 } else {
244 self.create_dependency_map(materialized_parent_deps)
245 };
246
247 let locked = self.sequential_lock.is_locked_sync();
248 let result = if self.tests.is_empty() || locked {
249 let current = self.inner.iter_mut();
250 let mut result = None;
251 for inner in current {
252 if let Some((test, deps, seq_lock, in_progress_counter)) =
253 inner.pick_next_internal_sync(&dependency_map)
254 {
255 result = Some((test, deps, seq_lock, in_progress_counter));
256 break;
257 }
258 }
259
260 self.inner.retain(|inner| !inner.is_empty());
261 result
262 } else {
263 let guard = self.sequential_lock.lock_sync(self.is_sequential);
264 self.in_progress.fetch_add(1, Ordering::Release);
265 self.tests
266 .pop()
267 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
268 };
269 if result.is_none()
270 && self.is_materialized()
271 && !locked
272 && self.in_progress.load(Ordering::Acquire) == 0
273 {
274 self.drop_deps();
275 }
276 if result.is_some() {
277 self.remaining_count -= 1;
278 }
279 result
280 }
281 }
282
283 fn create_dependency_map(
284 &self,
285 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
286 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
287 let mut result = parent_map.clone();
288 for (key, dep) in &self.materialized_dependencies {
289 result.insert(key.clone(), dep.clone());
290 }
291 result
292 }
293
294 fn root(
295 deps: Vec<RegisteredDependency>,
296 tests: Vec<RegisteredTest>,
297 props: Vec<RegisteredTestSuiteProperty>,
298 ) -> Self {
299 let total_count = tests.len();
300 let is_sequential = props
301 .iter()
302 .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
303 || tests.iter().any(|test| test.run.is_bench());
304 Self {
305 crate_and_module: String::new(),
306 dependencies: deps,
307 tests,
308 props,
309 inner: Vec::new(),
310 materialized_dependencies: HashMap::new(),
311 remaining_count: total_count,
312 idx: 0,
313 sequential_lock: SequentialExecutionLock::new(),
314 is_sequential,
315 skip_creating_dependencies: false,
316 in_progress: Arc::new(AtomicUsize::new(0)),
317 }
318 }
319
320 fn add_dependency(&mut self, dep: RegisteredDependency) {
321 let crate_and_module = dep.crate_and_module();
322 if self.crate_and_module == crate_and_module {
323 self.dependencies.push(dep);
324 } else {
325 let mut found = false;
326 for inner in &mut self.inner {
327 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
328 inner.add_dependency(dep.clone());
329 found = true;
330 break;
331 }
332 }
333 if !found {
334 let mut inner = Self {
335 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
336 dependencies: vec![],
337 tests: vec![],
338 inner: vec![],
339 props: vec![],
340 materialized_dependencies: HashMap::new(),
341 remaining_count: 0,
342 idx: 0,
343 is_sequential: false,
344 sequential_lock: SequentialExecutionLock::new(),
345 skip_creating_dependencies: false,
346 in_progress: Arc::new(AtomicUsize::new(0)),
347 };
348 inner.add_dependency(dep);
349 self.inner.push(inner);
350 }
351 }
352 }
353
354 fn add_test(&mut self, test: RegisteredTest) {
355 let crate_and_module = test.crate_and_module();
356 if self.crate_and_module == crate_and_module {
357 self.tests.push(test.clone());
358
359 if test.run.is_bench() {
360 self.is_sequential = true;
361 }
362 } else {
363 let mut found = false;
364 for inner in &mut self.inner {
365 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
366 inner.add_test(test.clone());
367 found = true;
368 break;
369 }
370 }
371 if !found {
372 let mut inner = Self {
373 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
374 dependencies: vec![],
375 tests: vec![],
376 inner: vec![],
377 props: vec![],
378 materialized_dependencies: HashMap::new(),
379 remaining_count: 0,
380 idx: 0,
381 is_sequential: false,
382 sequential_lock: SequentialExecutionLock::new(),
383 skip_creating_dependencies: false,
384 in_progress: Arc::new(AtomicUsize::new(0)),
385 };
386 inner.add_test(test);
387 self.inner.push(inner);
388 }
389 }
390 self.remaining_count += 1;
391 }
392
393 fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
394 let crate_and_module = prop.crate_and_module();
395 if self.crate_and_module == crate_and_module {
396 if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
397 self.is_sequential = true;
398 }
399 self.props.push(prop);
400 } else {
401 let mut found = false;
402 for inner in &mut self.inner {
403 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
404 inner.add_prop(prop.clone());
405 found = true;
406 break;
407 }
408 }
409 if !found {
410 let mut inner = Self {
411 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
412 dependencies: vec![],
413 tests: vec![],
414 inner: vec![],
415 props: vec![],
416 materialized_dependencies: HashMap::new(),
417 remaining_count: 0,
418 idx: 0,
419 is_sequential: false,
420 sequential_lock: SequentialExecutionLock::new(),
421 skip_creating_dependencies: false,
422 in_progress: Arc::new(AtomicUsize::new(0)),
423 };
424 inner.add_prop(prop);
425 self.inner.push(inner);
426 }
427 }
428 }
429
430 fn is_materialized(&self) -> bool {
431 self.skip_creating_dependencies
432 || self.materialized_dependencies.len() == self.dependencies.len()
433 }
434
/// Constructs this node's dependencies in dependency order (async variant).
///
/// Each constructor receives a snapshot of everything available so far: the entries of
/// `parent_map` plus the dependencies of this level constructed before it. The instances
/// created here are stored on the node; the returned map additionally contains the parent
/// entries.
#[cfg(feature = "tokio")]
async fn materialize_deps(
    &mut self,
    parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
    let mut visible = parent_map.clone();
    let mut created = HashMap::with_capacity(self.dependencies.len());

    for dep in self.sorted_dependencies() {
        let instance = match &dep.constructor {
            DependencyConstructor::Sync(build) => build(Arc::new(visible.clone())),
            DependencyConstructor::Async(build) => build(Arc::new(visible.clone())).await,
        };
        created.insert(dep.name.clone(), instance.clone());
        visible.insert(dep.name.clone(), instance);
    }

    self.materialized_dependencies = created;
    visible
}
455
456 fn materialize_deps_sync(
457 &mut self,
458 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
459 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
460 let mut deps = HashMap::with_capacity(self.dependencies.len());
461 let mut dependency_map = parent_map.clone();
462
463 let sorted_dependencies = self.sorted_dependencies();
464 for dep in &sorted_dependencies {
465 let materialized_dep = match &dep.constructor {
466 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
467 DependencyConstructor::Async(_cons) => {
468 panic!("Async dependencies are not supported in sync mode")
469 }
470 };
471 deps.insert(dep.name.clone(), materialized_dep.clone());
472 dependency_map.insert(dep.name.clone(), materialized_dep);
473 }
474 self.materialized_dependencies = deps;
475 dependency_map
476 }
477
478 fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
479 let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
480 for dep in &self.dependencies {
481 let mut added = false;
482 for dep_dep_name in &dep.dependencies {
483 if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
484 ts.add_dependency(dep_dep, dep);
485 added = true;
486 } else {
487 }
489 }
490 if !added {
491 ts.insert(dep);
492 }
493 }
494 let mut result = Vec::with_capacity(self.dependencies.len());
495 loop {
496 let chunk = ts.pop_all();
497 if chunk.is_empty() {
498 break;
499 }
500 result.extend(chunk);
501 }
502 result
503 }
504
505 fn drop_deps(&mut self) {
506 self.materialized_dependencies.clear();
507 }
508
509 fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
513 let mut needed: Option<HashSet<String>> = Some(HashSet::new());
515 for test in &self.tests {
516 match &test.dependencies {
517 None => {
518 needed = None;
519 break;
520 }
521 Some(deps) => {
522 if let Some(ref mut set) = needed {
523 set.extend(deps.iter().cloned());
524 }
525 }
526 }
527 }
528
529 for inner in &mut self.inner {
531 let child_needs = inner.prune_unused_deps();
532 needed = match (needed, child_needs) {
533 (None, _) | (_, None) => None,
534 (Some(mut a), Some(b)) => {
535 a.extend(b);
536 Some(a)
537 }
538 };
539 }
540
541 let needed = needed?;
543
544 let local_names: HashSet<String> =
546 self.dependencies.iter().map(|d| d.name.clone()).collect();
547 let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
548
549 let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
551 let mut needed_from_parent: HashSet<String> =
552 needed.difference(&local_names).cloned().collect();
553
554 while let Some(dep_name) = queue.pop_front() {
555 if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
556 for transitive in &dep.dependencies {
557 if local_names.contains(transitive) {
558 if keep_local.insert(transitive.clone()) {
559 queue.push_back(transitive.clone());
560 }
561 } else {
562 needed_from_parent.insert(transitive.clone());
563 }
564 }
565 }
566 }
567
568 self.dependencies.retain(|d| keep_local.contains(&d.name));
570
571 Some(needed_from_parent)
572 }
573
/// Returns `true` when the module path `this` is empty, equals `that`, or is an ancestor of
/// `that` on a `::` segment boundary.
fn is_prefix_of(this: &str, that: &str) -> bool {
    if this.is_empty() {
        return true;
    }
    match that.strip_prefix(this) {
        Some(rest) => rest.is_empty() || rest.starts_with("::"),
        None => false,
    }
}
577
578 fn next_level(from: &str, to: &str) -> String {
579 assert!(Self::is_prefix_of(from, to));
580 let remaining = if from.is_empty() {
581 to
582 } else {
583 &to[from.len() + 2..]
584 };
585
586 let result = if let Some((next, _tail)) = remaining.split_once("::") {
587 format!("{from}::{next}")
588 } else {
589 format!("{from}::{remaining}")
590 };
591 result.trim_start_matches("::").to_string()
592 }
593
594 fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
595 if let Some(parent_lock) = inherited_lock {
596 self.is_sequential = true;
597 self.sequential_lock = parent_lock.clone();
598 }
599
600 let lock_for_children = if self.is_sequential {
601 Some(self.sequential_lock.clone())
602 } else {
603 None
604 };
605
606 for child in &mut self.inner {
607 child.propagate_sequential(lock_for_children.as_ref());
608 }
609 }
610}
611
612impl Debug for TestSuiteExecution {
613 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
614 writeln!(
615 f,
616 "'{}' {} [{}]",
617 self.crate_and_module,
618 self.props
619 .iter()
620 .map(|x| format!("{x:?}"))
621 .collect::<Vec<_>>()
622 .join(", "),
623 if self.is_sequential { "S" } else { "P" }
624 )?;
625 writeln!(f, " deps:")?;
626 for dep in &self.dependencies {
627 writeln!(f, " '{}'", dep.name)?;
628 }
629 writeln!(f, " tests:")?;
630 for test in &self.tests {
631 writeln!(f, " '{}' [{:?}]", test.name, test.props.test_type)?;
632 }
633 for inner in &self.inner {
634 let inner_str = format!("{inner:?}");
635 for inner_line in inner_str.lines() {
636 writeln!(f, " {inner_line}")?;
637 }
638 }
639 Ok(())
640 }
641}
642
643impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
644 fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
645 self.get(name).cloned()
646 }
647}
648
649pub struct TestExecution {
650 pub test: RegisteredTest,
651 pub deps: Arc<dyn DependencyView + Send + Sync>,
652 pub index: usize,
653 _seq_lock: SequentialExecutionLockGuard,
654 in_progress_counter: Arc<AtomicUsize>,
655}
656
657impl Drop for TestExecution {
658 fn drop(&mut self) {
659 self.in_progress_counter.fetch_sub(1, Ordering::Release);
660 }
661}
662
663#[allow(dead_code)]
664enum SequentialExecutionLockGuard {
665 None,
666 #[cfg(feature = "tokio")]
667 Async(tokio::sync::OwnedMutexGuard<()>),
668 Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
669}
670
671#[derive(Clone)]
672struct SequentialExecutionLock {
673 #[cfg(feature = "tokio")]
674 async_mutex: Arc<tokio::sync::Mutex<()>>,
675 sync_mutex: Arc<parking_lot::Mutex<()>>,
676}
677
678impl SequentialExecutionLock {
679 pub fn new() -> Self {
680 Self {
681 #[cfg(feature = "tokio")]
682 async_mutex: Arc::new(tokio::sync::Mutex::new(())),
683 sync_mutex: Arc::new(parking_lot::Mutex::new(())),
684 }
685 }
686
687 #[cfg(feature = "tokio")]
688 pub async fn is_locked(&self) -> bool {
689 self.async_mutex.try_lock().is_err()
690 }
691
692 pub fn is_locked_sync(&self) -> bool {
693 self.sync_mutex.try_lock().is_none()
694 }
695
696 #[cfg(feature = "tokio")]
697 pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
698 if is_sequential {
699 let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
700 SequentialExecutionLockGuard::Async(permit)
701 } else {
702 SequentialExecutionLockGuard::None
703 }
704 }
705
706 pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
707 if is_sequential {
708 let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
709 SequentialExecutionLockGuard::Sync(permit)
710 } else {
711 SequentialExecutionLockGuard::None
712 }
713 }
714}