1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12 apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13 DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14 RegisteredTestSuiteProperty,
15};
16
17pub type DepWireBytes = (String, Vec<u8>);
20
21pub type HostedOwner = Arc<dyn Any + Send + Sync>;
24
25#[cfg(feature = "tokio")]
26type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
27 Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
28>;
29
30#[cfg(test)]
31pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
37pub struct ParentSharedDependencies {
41 pub cloneable_wire_bytes: Vec<DepWireBytes>,
42 pub cloneable_local_values: Vec<(String, Arc<dyn Any + Send + Sync>)>,
56 pub hosted_descriptor_bytes: Vec<DepWireBytes>,
57 pub hosted_owners: Vec<HostedOwner>,
58 pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
59}
60
61impl ParentSharedDependencies {
62 fn new() -> Self {
63 Self {
64 cloneable_wire_bytes: Vec::new(),
65 cloneable_local_values: Vec::new(),
66 hosted_descriptor_bytes: Vec::new(),
67 hosted_owners: Vec::new(),
68 hosted_rpc_owner_cells: Vec::new(),
69 }
70 }
71}
72
73pub(crate) struct TestSuiteExecution {
74 crate_and_module: String,
75 dependencies: Vec<RegisteredDependency>,
76 tests: Vec<RegisteredTest>,
77 props: Vec<RegisteredTestSuiteProperty>,
78 inner: Vec<TestSuiteExecution>,
79 materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
80 sequential_lock: SequentialExecutionLock,
81 remaining_count: usize,
82 idx: usize,
83 is_sequential: bool,
84 skip_creating_dependencies: bool,
85 in_progress: Arc<AtomicUsize>,
86}
87
88impl TestSuiteExecution {
89 pub fn construct(
90 arguments: &Arguments,
91 dependencies: &[RegisteredDependency],
92 tests: &[RegisteredTest],
93 props: &[RegisteredTestSuiteProperty],
94 ) -> (Self, Vec<RegisteredTest>) {
95 let tests_with_props = apply_suite_props_to_tests(tests, props);
96 let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
97 Self::shuffle(arguments, &mut filtered_tests);
98 filtered_tests.reverse();
99
100 if filtered_tests.is_empty() {
101 (
102 Self::root(
103 dependencies
104 .iter()
105 .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
106 .cloned()
107 .collect::<Vec<_>>(),
108 Vec::new(),
109 props
110 .iter()
111 .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
112 .cloned()
113 .collect::<Vec<_>>(),
114 ),
115 Vec::new(),
116 )
117 } else {
118 let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
119
120 for prop in props {
121 root.add_prop(prop.clone());
122 }
123
124 for dep in dependencies {
125 root.add_dependency(dep.clone());
126 }
127
128 for test in filtered_tests.clone() {
129 root.add_test(test.clone());
130 }
131
132 root.propagate_sequential(None);
133 root.prune_unused_deps();
134
135 (root, filtered_tests)
136 }
137 }
138
139 fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
140 if let Some(seed) = arguments.shuffle_seed {
141 let mut rng = StdRng::seed_from_u64(seed);
142 tests.shuffle(&mut rng);
143 }
144 }
145
146 pub fn skip_creating_dependencies(&mut self) {
149 self.skip_creating_dependencies = true;
150 for inner in &mut self.inner {
151 inner.skip_creating_dependencies();
152 }
153 }
154
155 pub fn remaining(&self) -> usize {
156 self.remaining_count
157 }
158
159 pub fn is_empty(&self) -> bool {
160 self.tests.is_empty() && self.inner.is_empty()
161 }
162
163 pub fn is_done(&self) -> bool {
164 self.remaining_count == 0
165 }
166
167 #[allow(dead_code)]
169 pub fn has_dependencies(&self) -> bool {
170 !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
171 }
172
173 pub fn has_shared_dependencies(&self) -> bool {
177 self.dependencies
178 .iter()
179 .any(|d| d.scope == DepScope::Shared)
180 || self
181 .inner
182 .iter()
183 .any(|inner| inner.has_shared_dependencies())
184 }
185
186 #[allow(dead_code)]
188 pub fn has_cloneable_dependencies(&self) -> bool {
189 self.dependencies
190 .iter()
191 .any(|d| d.scope == DepScope::Cloneable)
192 || self
193 .inner
194 .iter()
195 .any(|inner| inner.has_cloneable_dependencies())
196 }
197
198 #[allow(dead_code)]
202 pub fn has_hosted_dependencies(&self) -> bool {
203 self.dependencies
204 .iter()
205 .any(|d| d.scope == DepScope::Hosted)
206 || self
207 .inner
208 .iter()
209 .any(|inner| inner.has_hosted_dependencies())
210 }
211
212 #[allow(dead_code)]
216 pub fn has_hosted_rpc_dependencies(&self) -> bool {
217 self.dependencies
218 .iter()
219 .any(|d| d.scope == DepScope::HostedRpc)
220 || self
221 .inner
222 .iter()
223 .any(|inner| inner.has_hosted_rpc_dependencies())
224 }
225
226 #[allow(dead_code)]
228 pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
229 let mut out = Vec::new();
230 self.collect_cloneable_dependencies_into(&mut out);
231 out
232 }
233
234 #[allow(dead_code)]
235 fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
236 for dep in &self.dependencies {
237 if dep.scope == DepScope::Cloneable {
238 out.push(dep.clone());
239 }
240 }
241 for inner in &self.inner {
242 inner.collect_cloneable_dependencies_into(out);
243 }
244 }
245
246 pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
253 let mut out = ParentSharedDependencies::new();
254 let parent_map = HashMap::new();
255 self.collect_parent_shared_dependencies_into_sync(&parent_map, &mut out);
256 out
257 }
258
259 fn collect_parent_shared_dependencies_into_sync(
260 &self,
261 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
262 out: &mut ParentSharedDependencies,
263 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
264 let mut dependency_map = parent_map.clone();
265 let sorted_dependencies = self.sorted_dependencies();
266
267 for dep in sorted_dependencies {
268 if dependency_map.contains_key(&dep.name) {
269 continue;
270 }
271
272 let value = Self::construct_dependency_sync(dep, &dependency_map);
273 match dep.scope {
274 DepScope::Cloneable => {
275 let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
276 panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
277 });
278 out.cloneable_wire_bytes
279 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
280 out.cloneable_local_values
285 .push((dep.qualified_id(), value.clone()));
286 }
287 DepScope::Hosted => {
288 let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
289 panic!("Hosted dep '{}' missing hosted codec", dep.name)
290 });
291 out.hosted_descriptor_bytes
292 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
293 out.hosted_owners.push(value.clone());
294 }
295 DepScope::HostedRpc => {
296 let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
297 panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
298 });
299 let cell = (factory.owner_into_cell)(value.clone());
300 out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
301 }
302 DepScope::Shared | DepScope::PerWorker => {}
303 }
304
305 dependency_map.insert(dep.name.clone(), value);
306 }
307
308 for inner in &self.inner {
309 inner.collect_parent_shared_dependencies_into_sync(&dependency_map, out);
310 }
311
312 dependency_map
313 }
314
315 fn construct_dependency_sync(
316 dep: &RegisteredDependency,
317 dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
318 ) -> Arc<dyn Any + Send + Sync> {
319 match &dep.constructor {
320 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
321 DependencyConstructor::Async(cons) => {
322 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
323 }
324 }
325 }
326
327 #[cfg(test)]
332 pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
333 self.collect_parent_shared_dependencies_sync()
334 .cloneable_wire_bytes
335 }
336
337 #[cfg(test)]
345 pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
346 let collected = self.collect_parent_shared_dependencies_sync();
347 (collected.hosted_descriptor_bytes, collected.hosted_owners)
348 }
349
350 #[cfg(test)]
355 pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
356 self.collect_parent_shared_dependencies_sync()
357 .hosted_rpc_owner_cells
358 }
359
360 #[cfg(feature = "tokio")]
364 pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
365 let mut out = ParentSharedDependencies::new();
366 let parent_map = HashMap::new();
367 self.collect_parent_shared_dependencies_into_async(&parent_map, &mut out)
368 .await;
369 out
370 }
371
372 #[cfg(feature = "tokio")]
373 fn collect_parent_shared_dependencies_into_async<'a>(
374 &'a self,
375 parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
376 out: &'a mut ParentSharedDependencies,
377 ) -> ParentSharedDependenciesFuture<'a> {
378 Box::pin(async move {
379 let mut dependency_map = parent_map.clone();
380 let sorted_dependencies = self.sorted_dependencies();
381
382 for dep in sorted_dependencies {
383 if dependency_map.contains_key(&dep.name) {
384 continue;
385 }
386
387 let value = match &dep.constructor {
388 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
389 DependencyConstructor::Async(cons) => {
390 cons(Arc::new(dependency_map.clone())).await
391 }
392 };
393 match dep.scope {
394 DepScope::Cloneable => {
395 let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
396 panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
397 });
398 out.cloneable_wire_bytes
399 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
400 out.cloneable_local_values
406 .push((dep.qualified_id(), value.clone()));
407 }
408 DepScope::Hosted => {
409 let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
410 panic!("Hosted dep '{}' missing hosted codec", dep.name)
411 });
412 out.hosted_descriptor_bytes
413 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
414 out.hosted_owners.push(value.clone());
415 }
416 DepScope::HostedRpc => {
417 let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
418 panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
419 });
420 let cell = (factory.owner_into_cell)(value.clone());
421 out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
422 }
423 DepScope::Shared | DepScope::PerWorker => {}
424 }
425
426 dependency_map.insert(dep.name.clone(), value);
427 }
428
429 for inner in &self.inner {
430 inner
431 .collect_parent_shared_dependencies_into_async(&dependency_map, out)
432 .await;
433 }
434 dependency_map
435 })
436 }
437
438 #[cfg(feature = "tokio")]
440 #[cfg(test)]
441 pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
442 let collected = self.collect_parent_shared_dependencies_async().await;
443 (collected.hosted_descriptor_bytes, collected.hosted_owners)
444 }
445
446 #[cfg(feature = "tokio")]
456 #[cfg(test)]
457 pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
458 self.collect_parent_shared_dependencies_async()
459 .await
460 .cloneable_wire_bytes
461 }
462
463 pub fn provide_cloneable_value(
472 &mut self,
473 dep_id: &str,
474 value: Arc<dyn Any + Send + Sync>,
475 ) -> bool {
476 let applied = self.provide_cloneable_value_internal(dep_id, value);
477 if applied {
478 self.prune_unused_deps();
479 }
480 applied
481 }
482
483 fn provide_cloneable_value_internal(
484 &mut self,
485 dep_id: &str,
486 value: Arc<dyn Any + Send + Sync>,
487 ) -> bool {
488 let mut applied = false;
489 if let Some((local_name, dep_idx)) = self
490 .dependencies
491 .iter()
492 .enumerate()
493 .find(|(_, d)| d.qualified_id() == dep_id)
494 .map(|(idx, d)| (d.name.clone(), idx))
495 {
496 self.dependencies[dep_idx].dependencies.clear();
501 self.materialized_dependencies
502 .insert(local_name, value.clone());
503 applied = true;
504 }
505 for inner in &mut self.inner {
506 applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
507 }
508 applied
509 }
510
511 pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
514 self.tests.iter().any(|test| {
515 test.props
516 .capture_control
517 .requires_capturing(capture_by_default)
518 }) || self
519 .inner
520 .iter()
521 .any(|inner| inner.requires_capturing(capture_by_default))
522 }
523
524 #[cfg(feature = "tokio")]
525 pub async fn pick_next(&mut self) -> Option<TestExecution> {
526 if self.is_empty() {
527 None
528 } else {
529 match self
530 .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
531 .await
532 {
533 Some((test, deps, seq_lock, in_progress_counter)) => {
534 let index = self.idx;
535 self.idx += 1;
536 Some(TestExecution {
537 test: test.clone(),
538 deps: Arc::new(deps),
539 index,
540 _seq_lock: seq_lock,
541 in_progress_counter,
542 })
543 }
544 None => None,
545 }
546 }
547 }
548
549 pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
550 match self.pick_next_internal_sync(&HashMap::new()) {
551 Some((test, deps, seq_lock, in_progress_counter)) => {
552 let index = self.idx;
553 self.idx += 1;
554 Some(TestExecution {
555 test: test.clone(),
556 deps: Arc::new(deps),
557 index,
558 _seq_lock: seq_lock,
559 in_progress_counter,
560 })
561 }
562 None => None,
563 }
564 }
565
566 #[cfg(feature = "tokio")]
567 #[allow(clippy::type_complexity)]
568 async fn pick_next_internal(
569 &mut self,
570 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
571 ) -> Option<(
572 RegisteredTest,
573 HashMap<String, Arc<dyn Any + Send + Sync>>,
574 SequentialExecutionLockGuard,
575 Arc<AtomicUsize>,
576 )> {
577 if self.is_empty() {
578 None
579 } else {
580 let dependency_map = if !self.is_materialized() {
581 self.materialize_deps(materialized_parent_deps).await
582 } else {
583 self.create_dependency_map(materialized_parent_deps)
584 };
585
586 let locked = self.sequential_lock.is_locked().await;
587 let result = if self.tests.is_empty() || locked {
588 let current = self.inner.iter_mut();
589 let mut result = None;
590 for inner in current {
591 if let Some((test, deps, seq_lock, in_progress_counter)) =
592 Box::pin(inner.pick_next_internal(&dependency_map)).await
593 {
594 result = Some((test, deps, seq_lock, in_progress_counter));
595 break;
596 }
597 }
598 self.inner.retain(|inner| !inner.is_empty());
599
600 result
601 } else {
602 let guard = self.sequential_lock.lock(self.is_sequential).await;
603 self.in_progress.fetch_add(1, Ordering::Release);
604 self.tests
605 .pop()
606 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
607 };
608 if result.is_none()
609 && self.is_empty()
610 && self.is_materialized()
611 && !locked
612 && self.in_progress.load(Ordering::Acquire) == 0
613 {
614 self.drop_deps();
615 }
616 if result.is_some() {
617 self.remaining_count -= 1;
618 }
619 result
620 }
621 }
622
623 #[allow(clippy::type_complexity)]
624 fn pick_next_internal_sync(
625 &mut self,
626 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
627 ) -> Option<(
628 RegisteredTest,
629 HashMap<String, Arc<dyn Any + Send + Sync>>,
630 SequentialExecutionLockGuard,
631 Arc<AtomicUsize>,
632 )> {
633 if self.is_empty() {
634 None
635 } else {
636 let dependency_map = if !self.is_materialized() {
637 self.materialize_deps_sync(materialized_parent_deps)
638 } else {
639 self.create_dependency_map(materialized_parent_deps)
640 };
641
642 let locked = self.sequential_lock.is_locked_sync();
643 let result = if self.tests.is_empty() || locked {
644 let current = self.inner.iter_mut();
645 let mut result = None;
646 for inner in current {
647 if let Some((test, deps, seq_lock, in_progress_counter)) =
648 inner.pick_next_internal_sync(&dependency_map)
649 {
650 result = Some((test, deps, seq_lock, in_progress_counter));
651 break;
652 }
653 }
654
655 self.inner.retain(|inner| !inner.is_empty());
656 result
657 } else {
658 let guard = self.sequential_lock.lock_sync(self.is_sequential);
659 self.in_progress.fetch_add(1, Ordering::Release);
660 self.tests
661 .pop()
662 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
663 };
664 if result.is_none()
668 && self.is_empty()
669 && self.is_materialized()
670 && !locked
671 && self.in_progress.load(Ordering::Acquire) == 0
672 {
673 self.drop_deps();
674 }
675 if result.is_some() {
676 self.remaining_count -= 1;
677 }
678 result
679 }
680 }
681
682 fn create_dependency_map(
683 &self,
684 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
685 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
686 let mut result = parent_map.clone();
687 for (key, dep) in &self.materialized_dependencies {
688 result.insert(key.clone(), dep.clone());
689 }
690 result
691 }
692
693 fn root(
694 deps: Vec<RegisteredDependency>,
695 tests: Vec<RegisteredTest>,
696 props: Vec<RegisteredTestSuiteProperty>,
697 ) -> Self {
698 let total_count = tests.len();
699 let is_sequential = props
700 .iter()
701 .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
702 || tests.iter().any(|test| test.run.is_bench());
703 Self {
704 crate_and_module: String::new(),
705 dependencies: deps,
706 tests,
707 props,
708 inner: Vec::new(),
709 materialized_dependencies: HashMap::new(),
710 remaining_count: total_count,
711 idx: 0,
712 sequential_lock: SequentialExecutionLock::new(),
713 is_sequential,
714 skip_creating_dependencies: false,
715 in_progress: Arc::new(AtomicUsize::new(0)),
716 }
717 }
718
719 fn add_dependency(&mut self, dep: RegisteredDependency) {
720 let crate_and_module = dep.crate_and_module();
721 if self.crate_and_module == crate_and_module {
722 self.dependencies.push(dep);
723 } else {
724 let mut found = false;
725 for inner in &mut self.inner {
726 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
727 inner.add_dependency(dep.clone());
728 found = true;
729 break;
730 }
731 }
732 if !found {
733 let mut inner = Self {
734 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
735 dependencies: vec![],
736 tests: vec![],
737 inner: vec![],
738 props: vec![],
739 materialized_dependencies: HashMap::new(),
740 remaining_count: 0,
741 idx: 0,
742 is_sequential: false,
743 sequential_lock: SequentialExecutionLock::new(),
744 skip_creating_dependencies: false,
745 in_progress: Arc::new(AtomicUsize::new(0)),
746 };
747 inner.add_dependency(dep);
748 self.inner.push(inner);
749 }
750 }
751 }
752
753 fn add_test(&mut self, test: RegisteredTest) {
754 let crate_and_module = test.crate_and_module();
755 if self.crate_and_module == crate_and_module {
756 self.tests.push(test.clone());
757
758 if test.run.is_bench() {
759 self.is_sequential = true;
760 }
761 } else {
762 let mut found = false;
763 for inner in &mut self.inner {
764 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
765 inner.add_test(test.clone());
766 found = true;
767 break;
768 }
769 }
770 if !found {
771 let mut inner = Self {
772 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
773 dependencies: vec![],
774 tests: vec![],
775 inner: vec![],
776 props: vec![],
777 materialized_dependencies: HashMap::new(),
778 remaining_count: 0,
779 idx: 0,
780 is_sequential: false,
781 sequential_lock: SequentialExecutionLock::new(),
782 skip_creating_dependencies: false,
783 in_progress: Arc::new(AtomicUsize::new(0)),
784 };
785 inner.add_test(test);
786 self.inner.push(inner);
787 }
788 }
789 self.remaining_count += 1;
790 }
791
792 fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
793 let crate_and_module = prop.crate_and_module();
794 if self.crate_and_module == crate_and_module {
795 if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
796 self.is_sequential = true;
797 }
798 self.props.push(prop);
799 } else {
800 let mut found = false;
801 for inner in &mut self.inner {
802 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
803 inner.add_prop(prop.clone());
804 found = true;
805 break;
806 }
807 }
808 if !found {
809 let mut inner = Self {
810 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
811 dependencies: vec![],
812 tests: vec![],
813 inner: vec![],
814 props: vec![],
815 materialized_dependencies: HashMap::new(),
816 remaining_count: 0,
817 idx: 0,
818 is_sequential: false,
819 sequential_lock: SequentialExecutionLock::new(),
820 skip_creating_dependencies: false,
821 in_progress: Arc::new(AtomicUsize::new(0)),
822 };
823 inner.add_prop(prop);
824 self.inner.push(inner);
825 }
826 }
827 }
828
829 fn is_materialized(&self) -> bool {
830 self.skip_creating_dependencies
831 || self.materialized_dependencies.len() == self.dependencies.len()
832 }
833
834 #[cfg(feature = "tokio")]
835 async fn materialize_deps(
836 &mut self,
837 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
838 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
839 let mut deps = self.materialized_dependencies.clone();
842 let mut dependency_map = parent_map.clone();
843 for (k, v) in &deps {
844 dependency_map.insert(k.clone(), v.clone());
845 }
846
847 let sorted_dependencies = self.sorted_dependencies();
848 for dep in &sorted_dependencies {
849 if deps.contains_key(&dep.name) {
850 continue;
851 }
852 let materialized_dep = match &dep.constructor {
853 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
854 DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
855 };
856 deps.insert(dep.name.clone(), materialized_dep.clone());
857 dependency_map.insert(dep.name.clone(), materialized_dep);
858 }
859 self.materialized_dependencies = deps;
860 dependency_map
861 }
862
863 fn materialize_deps_sync(
864 &mut self,
865 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
866 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
867 let mut deps = self.materialized_dependencies.clone();
870 let mut dependency_map = parent_map.clone();
871 for (k, v) in &deps {
872 dependency_map.insert(k.clone(), v.clone());
873 }
874
875 let sorted_dependencies = self.sorted_dependencies();
876 for dep in &sorted_dependencies {
877 if deps.contains_key(&dep.name) {
878 continue;
879 }
880 let materialized_dep = match &dep.constructor {
881 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
882 DependencyConstructor::Async(cons) => {
883 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
884 }
885 };
886 deps.insert(dep.name.clone(), materialized_dep.clone());
887 dependency_map.insert(dep.name.clone(), materialized_dep);
888 }
889 self.materialized_dependencies = deps;
890 dependency_map
891 }
892
893 fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
894 let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
895 for dep in &self.dependencies {
896 let mut added = false;
897 for dep_dep_name in &dep.dependencies {
898 if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
899 ts.add_dependency(dep_dep, dep);
900 added = true;
901 } else {
902 }
904 }
905 if !added {
906 ts.insert(dep);
907 }
908 }
909 let mut result = Vec::with_capacity(self.dependencies.len());
910 loop {
911 let chunk = ts.pop_all();
912 if chunk.is_empty() {
913 break;
914 }
915 result.extend(chunk);
916 }
917 result
918 }
919
920 fn drop_deps(&mut self) {
921 self.materialized_dependencies.clear();
922 }
923
924 fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
928 let mut needed: Option<HashSet<String>> = Some(HashSet::new());
930 for test in &self.tests {
931 match &test.dependencies {
932 None => {
933 needed = None;
934 break;
935 }
936 Some(deps) => {
937 if let Some(ref mut set) = needed {
938 set.extend(deps.iter().cloned());
939 }
940 }
941 }
942 }
943
944 for inner in &mut self.inner {
946 let child_needs = inner.prune_unused_deps();
947 needed = match (needed, child_needs) {
948 (None, _) | (_, None) => None,
949 (Some(mut a), Some(b)) => {
950 a.extend(b);
951 Some(a)
952 }
953 };
954 }
955
956 let needed = needed?;
958
959 let local_names: HashSet<String> =
961 self.dependencies.iter().map(|d| d.name.clone()).collect();
962 let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
963
964 let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
966 let mut needed_from_parent: HashSet<String> =
967 needed.difference(&local_names).cloned().collect();
968
969 while let Some(dep_name) = queue.pop_front() {
970 if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
971 for transitive in &dep.dependencies {
972 if local_names.contains(transitive) {
973 if keep_local.insert(transitive.clone()) {
974 queue.push_back(transitive.clone());
975 }
976 } else {
977 needed_from_parent.insert(transitive.clone());
978 }
979 }
980 for companion in &dep.companions {
989 if local_names.contains(companion) {
990 if keep_local.insert(companion.clone()) {
991 queue.push_back(companion.clone());
992 }
993 } else {
994 needed_from_parent.insert(companion.clone());
995 }
996 }
997 }
998 }
999
1000 self.dependencies.retain(|d| keep_local.contains(&d.name));
1002
1003 Some(needed_from_parent)
1004 }
1005
1006 fn is_prefix_of(this: &str, that: &str) -> bool {
1007 this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
1008 }
1009
1010 fn next_level(from: &str, to: &str) -> String {
1011 assert!(Self::is_prefix_of(from, to));
1012 let remaining = if from.is_empty() {
1013 to
1014 } else {
1015 &to[from.len() + 2..]
1016 };
1017
1018 let result = if let Some((next, _tail)) = remaining.split_once("::") {
1019 format!("{from}::{next}")
1020 } else {
1021 format!("{from}::{remaining}")
1022 };
1023 result.trim_start_matches("::").to_string()
1024 }
1025
1026 fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
1027 if let Some(parent_lock) = inherited_lock {
1028 self.is_sequential = true;
1029 self.sequential_lock = parent_lock.clone();
1030 }
1031
1032 let lock_for_children = if self.is_sequential {
1033 Some(self.sequential_lock.clone())
1034 } else {
1035 None
1036 };
1037
1038 for child in &mut self.inner {
1039 child.propagate_sequential(lock_for_children.as_ref());
1040 }
1041 }
1042}
1043
1044impl Debug for TestSuiteExecution {
1045 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1046 writeln!(
1047 f,
1048 "'{}' {} [{}]",
1049 self.crate_and_module,
1050 self.props
1051 .iter()
1052 .map(|x| format!("{x:?}"))
1053 .collect::<Vec<_>>()
1054 .join(", "),
1055 if self.is_sequential { "S" } else { "P" }
1056 )?;
1057 writeln!(f, " deps:")?;
1058 for dep in &self.dependencies {
1059 writeln!(f, " '{}'", dep.name)?;
1060 }
1061 writeln!(f, " tests:")?;
1062 for test in &self.tests {
1063 writeln!(f, " '{}' [{:?}]", test.name, test.props.test_type)?;
1064 }
1065 for inner in &self.inner {
1066 let inner_str = format!("{inner:?}");
1067 for inner_line in inner_str.lines() {
1068 writeln!(f, " {inner_line}")?;
1069 }
1070 }
1071 Ok(())
1072 }
1073}
1074
1075impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1076 fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1077 self.get(name).cloned()
1078 }
1079}
1080
1081pub struct TestExecution {
1082 pub test: RegisteredTest,
1083 pub deps: Arc<dyn DependencyView + Send + Sync>,
1084 pub index: usize,
1085 _seq_lock: SequentialExecutionLockGuard,
1086 in_progress_counter: Arc<AtomicUsize>,
1087}
1088
1089impl Drop for TestExecution {
1090 fn drop(&mut self) {
1091 self.in_progress_counter.fetch_sub(1, Ordering::Release);
1092 }
1093}
1094
1095#[allow(dead_code)]
1096enum SequentialExecutionLockGuard {
1097 None,
1098 #[cfg(feature = "tokio")]
1099 Async(tokio::sync::OwnedMutexGuard<()>),
1100 Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
1101}
1102
1103#[derive(Clone)]
1104struct SequentialExecutionLock {
1105 #[cfg(feature = "tokio")]
1106 async_mutex: Arc<tokio::sync::Mutex<()>>,
1107 sync_mutex: Arc<parking_lot::Mutex<()>>,
1108}
1109
1110impl SequentialExecutionLock {
1111 pub fn new() -> Self {
1112 Self {
1113 #[cfg(feature = "tokio")]
1114 async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1115 sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1116 }
1117 }
1118
1119 #[cfg(feature = "tokio")]
1120 pub async fn is_locked(&self) -> bool {
1121 self.async_mutex.try_lock().is_err()
1122 }
1123
1124 pub fn is_locked_sync(&self) -> bool {
1125 self.sync_mutex.try_lock().is_none()
1126 }
1127
1128 #[cfg(feature = "tokio")]
1129 pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1130 if is_sequential {
1131 let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1132 SequentialExecutionLockGuard::Async(permit)
1133 } else {
1134 SequentialExecutionLockGuard::None
1135 }
1136 }
1137
1138 pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1139 if is_sequential {
1140 let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1141 SequentialExecutionLockGuard::Sync(permit)
1142 } else {
1143 SequentialExecutionLockGuard::None
1144 }
1145 }
1146}
1147
1148#[cfg(test)]
1149mod tests;