1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12 apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13 DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14 RegisteredTestSuiteProperty,
15};
16
/// A dependency's qualified id paired with the bytes produced by its codec's
/// `to_wire` function.
pub type DepWireBytes = (String, Vec<u8>);
20
/// Type-erased, shareable value constructed for a `DepScope::Hosted` dependency.
pub type HostedOwner = Arc<dyn Any + Send + Sync>;
24
/// Boxed future returned by the recursive async collector; it resolves to the
/// name-to-value dependency map visible at the visited node.
#[cfg(feature = "tokio")]
type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
>;
29
/// Test-only pair of the hosted descriptor bytes and the hosted owner values
/// gathered by the parent-side collectors.
#[cfg(test)]
pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
/// Everything the parent-side collectors produce while walking the execution
/// tree, grouped by the scope of the dependency that was constructed.
pub struct ParentSharedDependencies {
    /// For each `Cloneable` dependency: its qualified id and the bytes
    /// returned by its cloneable codec's `to_wire`.
    pub cloneable_wire_bytes: Vec<DepWireBytes>,
    /// For each `Cloneable` dependency: its qualified id and the constructed value.
    pub cloneable_local_values: Vec<(String, Arc<dyn Any + Send + Sync>)>,
    /// For each `Hosted` dependency: its qualified id and the bytes returned
    /// by its hosted codec's `to_wire`.
    pub hosted_descriptor_bytes: Vec<DepWireBytes>,
    /// The constructed values of the `Hosted` dependencies.
    pub hosted_owners: Vec<HostedOwner>,
    /// For each `HostedRpc` dependency: its qualified id and the cell returned
    /// by its RPC factory's `owner_into_cell`.
    pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
    /// For each `Shared` or `PerWorker` dependency that the collectors
    /// constructed: its qualified id and the constructed value.
    pub parent_constructed_shared_values: Vec<(String, Arc<dyn Any + Send + Sync>)>,
}
76
77impl ParentSharedDependencies {
78 fn new() -> Self {
79 Self {
80 cloneable_wire_bytes: Vec::new(),
81 cloneable_local_values: Vec::new(),
82 hosted_descriptor_bytes: Vec::new(),
83 hosted_owners: Vec::new(),
84 hosted_rpc_owner_cells: Vec::new(),
85 parent_constructed_shared_values: Vec::new(),
86 }
87 }
88}
89
/// Per-node construction plan mirroring the shape of the execution tree.
struct ParentConstructionPlan {
    /// Names of this node's own dependencies that the collectors construct;
    /// dependencies not listed here are skipped.
    needed_here: HashSet<String>,
    /// One plan per inner suite, in the same order as the node's `inner` list.
    children: Vec<ParentConstructionPlan>,
}
107
/// One node of the execution tree: the tests, dependencies and properties
/// registered for a crate/module path, plus the nodes nested below it.
pub(crate) struct TestSuiteExecution {
    /// Path this node represents; the root uses the empty string.
    crate_and_module: String,
    /// Dependencies registered at exactly this path.
    dependencies: Vec<RegisteredDependency>,
    /// Tests registered at exactly this path; picked by popping from the end.
    tests: Vec<RegisteredTest>,
    /// Suite properties registered at exactly this path.
    props: Vec<RegisteredTestSuiteProperty>,
    /// Child nodes for nested paths.
    inner: Vec<TestSuiteExecution>,
    /// Constructed (or externally provided) dependency values, keyed by name.
    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
    /// Lock taken while a test of a sequential node runs; sequential
    /// ancestors hand a clone of their lock down to their descendants.
    sequential_lock: SequentialExecutionLock,
    /// Number of tests in this subtree that have not been picked yet.
    remaining_count: usize,
    /// Running index handed to each `TestExecution` by `pick_next*`.
    idx: usize,
    /// Whether tests of this node must hold `sequential_lock` while running.
    is_sequential: bool,
    /// When set, the node always reports itself as materialized, so no
    /// dependency constructors are invoked while picking tests.
    skip_creating_dependencies: bool,
    /// Count of picked tests from this node whose `TestExecution` is still alive.
    in_progress: Arc<AtomicUsize>,
}
122
123impl TestSuiteExecution {
124 pub fn construct(
125 arguments: &Arguments,
126 dependencies: &[RegisteredDependency],
127 tests: &[RegisteredTest],
128 props: &[RegisteredTestSuiteProperty],
129 ) -> (Self, Vec<RegisteredTest>) {
130 let tests_with_props = apply_suite_props_to_tests(tests, props);
131 let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
132 Self::shuffle(arguments, &mut filtered_tests);
133 filtered_tests.reverse();
134
135 if filtered_tests.is_empty() {
136 (
137 Self::root(
138 dependencies
139 .iter()
140 .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
141 .cloned()
142 .collect::<Vec<_>>(),
143 Vec::new(),
144 props
145 .iter()
146 .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
147 .cloned()
148 .collect::<Vec<_>>(),
149 ),
150 Vec::new(),
151 )
152 } else {
153 let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
154
155 for prop in props {
156 root.add_prop(prop.clone());
157 }
158
159 for dep in dependencies {
160 root.add_dependency(dep.clone());
161 }
162
163 for test in filtered_tests.clone() {
164 root.add_test(test.clone());
165 }
166
167 root.propagate_sequential(None);
168 root.prune_unused_deps();
169
170 (root, filtered_tests)
171 }
172 }
173
174 fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
175 if let Some(seed) = arguments.shuffle_seed {
176 let mut rng = StdRng::seed_from_u64(seed);
177 tests.shuffle(&mut rng);
178 }
179 }
180
181 pub fn skip_creating_dependencies(&mut self) {
184 self.skip_creating_dependencies = true;
185 for inner in &mut self.inner {
186 inner.skip_creating_dependencies();
187 }
188 }
189
/// Returns the number of tests in this subtree that have not been picked yet.
pub fn remaining(&self) -> usize {
    self.remaining_count
}
193
/// Returns `true` when this node holds no tests of its own and no child nodes.
pub fn is_empty(&self) -> bool {
    self.tests.is_empty() && self.inner.is_empty()
}
197
/// Returns `true` once the remaining-test counter of this subtree reaches zero.
pub fn is_done(&self) -> bool {
    self.remaining_count == 0
}
201
202 #[allow(dead_code)]
204 pub fn has_dependencies(&self) -> bool {
205 !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
206 }
207
208 pub fn has_shared_dependencies(&self) -> bool {
212 self.dependencies
213 .iter()
214 .any(|d| d.scope == DepScope::Shared)
215 || self
216 .inner
217 .iter()
218 .any(|inner| inner.has_shared_dependencies())
219 }
220
221 #[allow(dead_code)]
223 pub fn has_cloneable_dependencies(&self) -> bool {
224 self.dependencies
225 .iter()
226 .any(|d| d.scope == DepScope::Cloneable)
227 || self
228 .inner
229 .iter()
230 .any(|inner| inner.has_cloneable_dependencies())
231 }
232
233 #[allow(dead_code)]
237 pub fn has_hosted_dependencies(&self) -> bool {
238 self.dependencies
239 .iter()
240 .any(|d| d.scope == DepScope::Hosted)
241 || self
242 .inner
243 .iter()
244 .any(|inner| inner.has_hosted_dependencies())
245 }
246
247 #[allow(dead_code)]
251 pub fn has_hosted_rpc_dependencies(&self) -> bool {
252 self.dependencies
253 .iter()
254 .any(|d| d.scope == DepScope::HostedRpc)
255 || self
256 .inner
257 .iter()
258 .any(|inner| inner.has_hosted_rpc_dependencies())
259 }
260
261 #[allow(dead_code)]
263 pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
264 let mut out = Vec::new();
265 self.collect_cloneable_dependencies_into(&mut out);
266 out
267 }
268
269 #[allow(dead_code)]
270 fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
271 for dep in &self.dependencies {
272 if dep.scope == DepScope::Cloneable {
273 out.push(dep.clone());
274 }
275 }
276 for inner in &self.inner {
277 inner.collect_cloneable_dependencies_into(out);
278 }
279 }
280
281 pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
288 let mut out = ParentSharedDependencies::new();
289 let parent_map = HashMap::new();
290 let (plan, _needed_from_parent) = self.compute_parent_construction_plan();
291 self.collect_parent_shared_dependencies_into_sync(&parent_map, &plan, &mut out);
292 out
293 }
294
295 fn collect_parent_shared_dependencies_into_sync(
296 &self,
297 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
298 plan: &ParentConstructionPlan,
299 out: &mut ParentSharedDependencies,
300 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
301 let mut dependency_map = parent_map.clone();
302 let sorted_dependencies = self.sorted_dependencies();
303 let mut constructed_here: HashSet<String> = HashSet::new();
309
310 for dep in sorted_dependencies {
311 if !plan.needed_here.contains(&dep.name) {
316 continue;
317 }
318 if !constructed_here.insert(dep.name.clone()) {
322 continue;
323 }
324
325 let value = Self::construct_dependency_sync(dep, &dependency_map);
326 match dep.scope {
327 DepScope::Cloneable => {
328 let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
329 panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
330 });
331 out.cloneable_wire_bytes
332 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
333 out.cloneable_local_values
338 .push((dep.qualified_id(), value.clone()));
339 }
340 DepScope::Hosted => {
341 let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
342 panic!("Hosted dep '{}' missing hosted codec", dep.name)
343 });
344 out.hosted_descriptor_bytes
345 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
346 out.hosted_owners.push(value.clone());
347 }
348 DepScope::HostedRpc => {
349 let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
350 panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
351 });
352 let cell = (factory.owner_into_cell)(value.clone());
353 out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
354 }
355 DepScope::Shared | DepScope::PerWorker => {
356 out.parent_constructed_shared_values
362 .push((dep.qualified_id(), value.clone()));
363 }
364 }
365
366 dependency_map.insert(dep.name.clone(), value);
367 }
368
369 debug_assert_eq!(
370 self.inner.len(),
371 plan.children.len(),
372 "ParentConstructionPlan child count must mirror execution tree shape"
373 );
374 for (inner, child_plan) in self.inner.iter().zip(plan.children.iter()) {
375 inner.collect_parent_shared_dependencies_into_sync(&dependency_map, child_plan, out);
376 }
377
378 dependency_map
379 }
380
381 fn construct_dependency_sync(
382 dep: &RegisteredDependency,
383 dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
384 ) -> Arc<dyn Any + Send + Sync> {
385 match &dep.constructor {
386 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
387 DependencyConstructor::Async(cons) => {
388 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
389 }
390 }
391 }
392
/// Test helper: runs the synchronous collector and returns only the
/// cloneable wire bytes.
#[cfg(test)]
pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
    let ParentSharedDependencies {
        cloneable_wire_bytes,
        ..
    } = self.collect_parent_shared_dependencies_sync();
    cloneable_wire_bytes
}
402
/// Test helper: runs the synchronous collector and returns the hosted
/// descriptor bytes together with the hosted owner values.
#[cfg(test)]
pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
    let ParentSharedDependencies {
        hosted_descriptor_bytes,
        hosted_owners,
        ..
    } = self.collect_parent_shared_dependencies_sync();
    (hosted_descriptor_bytes, hosted_owners)
}
415
/// Test helper: runs the synchronous collector and returns only the
/// hosted-RPC owner cells.
#[cfg(test)]
pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
    let ParentSharedDependencies {
        hosted_rpc_owner_cells,
        ..
    } = self.collect_parent_shared_dependencies_sync();
    hosted_rpc_owner_cells
}
425
/// Async counterpart of `collect_parent_shared_dependencies_sync`: computes
/// the construction plan and constructs every planned dependency, awaiting
/// async constructors, starting from an empty parent map.
#[cfg(feature = "tokio")]
pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
    let (plan, _needed_from_parent) = self.compute_parent_construction_plan();
    let mut collected = ParentSharedDependencies::new();
    let no_parent_values = HashMap::new();
    self.collect_parent_shared_dependencies_into_async(&no_parent_values, &plan, &mut collected)
        .await;
    collected
}
438
/// Async variant of the recursive collector: constructs this node's planned
/// dependencies in topological order (awaiting async constructors), records
/// each one in `out` according to its scope, then recurses into the nested
/// nodes one after another.
///
/// The future is boxed because the function calls itself recursively. It
/// resolves to `parent_map` extended with the values constructed here.
///
/// # Panics
///
/// Panics when a `Cloneable`, `Hosted` or `HostedRpc` dependency lacks its
/// codec or factory.
#[cfg(feature = "tokio")]
fn collect_parent_shared_dependencies_into_async<'a>(
    &'a self,
    parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
    plan: &'a ParentConstructionPlan,
    out: &'a mut ParentSharedDependencies,
) -> ParentSharedDependenciesFuture<'a> {
    Box::pin(async move {
        let mut dependency_map = parent_map.clone();
        let sorted_dependencies = self.sorted_dependencies();
        let mut constructed_here: HashSet<String> = HashSet::new();

        for dep in sorted_dependencies {
            // Only dependencies listed in the plan are constructed at this node.
            if !plan.needed_here.contains(&dep.name) {
                continue;
            }
            // Construct each dependency name at most once per node.
            if !constructed_here.insert(dep.name.clone()) {
                continue;
            }

            // Each constructor receives a snapshot of the values visible so far.
            let value = match &dep.constructor {
                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
                DependencyConstructor::Async(cons) => {
                    cons(Arc::new(dependency_map.clone())).await
                }
            };
            match dep.scope {
                DepScope::Cloneable => {
                    let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
                        panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
                    });
                    out.cloneable_wire_bytes
                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
                    out.cloneable_local_values
                        .push((dep.qualified_id(), value.clone()));
                }
                DepScope::Hosted => {
                    let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
                        panic!("Hosted dep '{}' missing hosted codec", dep.name)
                    });
                    out.hosted_descriptor_bytes
                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
                    out.hosted_owners.push(value.clone());
                }
                DepScope::HostedRpc => {
                    let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
                        panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
                    });
                    let cell = (factory.owner_into_cell)(value.clone());
                    out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
                }
                DepScope::Shared | DepScope::PerWorker => {
                    out.parent_constructed_shared_values
                        .push((dep.qualified_id(), value.clone()));
                }
            }

            // Make the value visible to later dependencies and to the children.
            dependency_map.insert(dep.name.clone(), value);
        }

        debug_assert_eq!(
            self.inner.len(),
            plan.children.len(),
            "ParentConstructionPlan child count must mirror execution tree shape"
        );
        // Children are visited sequentially; the map each child returns is discarded.
        for (inner, child_plan) in self.inner.iter().zip(plan.children.iter()) {
            inner
                .collect_parent_shared_dependencies_into_async(&dependency_map, child_plan, out)
                .await;
        }
        dependency_map
    })
}
526
/// Test helper: runs the async collector and returns the hosted descriptor
/// bytes together with the hosted owner values.
#[cfg(feature = "tokio")]
#[cfg(test)]
pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
    let ParentSharedDependencies {
        hosted_descriptor_bytes,
        hosted_owners,
        ..
    } = self.collect_parent_shared_dependencies_async().await;
    (hosted_descriptor_bytes, hosted_owners)
}
534
/// Test helper: runs the async collector and returns only the cloneable
/// wire bytes.
#[cfg(feature = "tokio")]
#[cfg(test)]
pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
    let ParentSharedDependencies {
        cloneable_wire_bytes,
        ..
    } = self.collect_parent_shared_dependencies_async().await;
    cloneable_wire_bytes
}
551
552 pub fn provide_cloneable_value(
561 &mut self,
562 dep_id: &str,
563 value: Arc<dyn Any + Send + Sync>,
564 ) -> bool {
565 let applied = self.provide_cloneable_value_internal(dep_id, value);
566 if applied {
567 self.prune_unused_deps();
568 }
569 applied
570 }
571
572 fn provide_cloneable_value_internal(
573 &mut self,
574 dep_id: &str,
575 value: Arc<dyn Any + Send + Sync>,
576 ) -> bool {
577 let mut applied = false;
578 if let Some((local_name, dep_idx)) = self
579 .dependencies
580 .iter()
581 .enumerate()
582 .find(|(_, d)| d.qualified_id() == dep_id)
583 .map(|(idx, d)| (d.name.clone(), idx))
584 {
585 self.dependencies[dep_idx].dependencies.clear();
590 self.materialized_dependencies
591 .insert(local_name, value.clone());
592 applied = true;
593 }
594 for inner in &mut self.inner {
595 applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
596 }
597 applied
598 }
599
600 pub fn provide_materialized_shared_value(
615 &mut self,
616 dep_id: &str,
617 value: Arc<dyn Any + Send + Sync>,
618 ) -> bool {
619 let mut applied = false;
620 if let Some(local_name) = self
621 .dependencies
622 .iter()
623 .find(|d| d.qualified_id() == dep_id)
624 .map(|d| d.name.clone())
625 {
626 self.materialized_dependencies
627 .insert(local_name, value.clone());
628 applied = true;
629 }
630 for inner in &mut self.inner {
631 applied |= inner.provide_materialized_shared_value(dep_id, value.clone());
632 }
633 applied
634 }
635
636 pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
639 self.tests.iter().any(|test| {
640 test.props
641 .capture_control
642 .requires_capturing(capture_by_default)
643 }) || self
644 .inner
645 .iter()
646 .any(|inner| inner.requires_capturing(capture_by_default))
647 }
648
/// Picks the next runnable test (async variant), materializing dependencies
/// on demand.
///
/// Returns `None` when no test can be handed out right now, either because
/// the tree is empty or because the remaining tests are blocked by a held
/// sequential lock. Each returned execution receives the next running index.
///
/// The root's own materialized values are merged into the dependency map by
/// `pick_next_internal`, so an empty parent map is passed here, exactly as
/// `pick_next_sync` does.
#[cfg(feature = "tokio")]
pub async fn pick_next(&mut self) -> Option<TestExecution> {
    let (test, deps, seq_lock, in_progress_counter) =
        self.pick_next_internal(&HashMap::new()).await?;
    let index = self.idx;
    self.idx += 1;
    Some(TestExecution {
        test,
        deps: Arc::new(deps),
        index,
        _seq_lock: seq_lock,
        in_progress_counter,
    })
}
673
674 pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
675 match self.pick_next_internal_sync(&HashMap::new()) {
676 Some((test, deps, seq_lock, in_progress_counter)) => {
677 let index = self.idx;
678 self.idx += 1;
679 Some(TestExecution {
680 test: test.clone(),
681 deps: Arc::new(deps),
682 index,
683 _seq_lock: seq_lock,
684 in_progress_counter,
685 })
686 }
687 None => None,
688 }
689 }
690
/// Recursive async worker behind `pick_next`.
///
/// Materializes this node's dependencies if needed, then either pops one of
/// its own tests (taking the sequential lock when the node is sequential) or,
/// when it has no tests left or the lock is currently held, asks each child
/// in turn. Children that became empty are removed afterwards.
///
/// Returns the picked test, the dependency map it should see, the lock
/// guard, and the in-progress counter of the node the test came from.
#[cfg(feature = "tokio")]
#[allow(clippy::type_complexity)]
async fn pick_next_internal(
    &mut self,
    materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
) -> Option<(
    RegisteredTest,
    HashMap<String, Arc<dyn Any + Send + Sync>>,
    SequentialExecutionLockGuard,
    Arc<AtomicUsize>,
)> {
    if self.is_empty() {
        return None;
    }

    let dependency_map = if self.is_materialized() {
        self.create_dependency_map(materialized_parent_deps)
    } else {
        self.materialize_deps(materialized_parent_deps).await
    };

    let locked = self.sequential_lock.is_locked().await;
    let picked = if locked || self.tests.is_empty() {
        let mut from_child = None;
        for child in self.inner.iter_mut() {
            // Boxed because the async function recurses into itself.
            from_child = Box::pin(child.pick_next_internal(&dependency_map)).await;
            if from_child.is_some() {
                break;
            }
        }
        self.inner.retain(|child| !child.is_empty());

        from_child
    } else {
        let guard = self.sequential_lock.lock(self.is_sequential).await;
        self.in_progress.fetch_add(1, Ordering::Release);
        let counter = Arc::clone(&self.in_progress);
        self.tests
            .pop()
            .map(|test| (test, dependency_map, guard, counter))
    };

    if picked.is_some() {
        self.remaining_count -= 1;
    } else if self.is_empty()
        && self.is_materialized()
        && !locked
        && self.in_progress.load(Ordering::Acquire) == 0
    {
        // Nothing left to run and nothing still running from this node.
        self.drop_deps();
    }
    picked
}
747
748 #[allow(clippy::type_complexity)]
749 fn pick_next_internal_sync(
750 &mut self,
751 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
752 ) -> Option<(
753 RegisteredTest,
754 HashMap<String, Arc<dyn Any + Send + Sync>>,
755 SequentialExecutionLockGuard,
756 Arc<AtomicUsize>,
757 )> {
758 if self.is_empty() {
759 None
760 } else {
761 let dependency_map = if !self.is_materialized() {
762 self.materialize_deps_sync(materialized_parent_deps)
763 } else {
764 self.create_dependency_map(materialized_parent_deps)
765 };
766
767 let locked = self.sequential_lock.is_locked_sync();
768 let result = if self.tests.is_empty() || locked {
769 let current = self.inner.iter_mut();
770 let mut result = None;
771 for inner in current {
772 if let Some((test, deps, seq_lock, in_progress_counter)) =
773 inner.pick_next_internal_sync(&dependency_map)
774 {
775 result = Some((test, deps, seq_lock, in_progress_counter));
776 break;
777 }
778 }
779
780 self.inner.retain(|inner| !inner.is_empty());
781 result
782 } else {
783 let guard = self.sequential_lock.lock_sync(self.is_sequential);
784 self.in_progress.fetch_add(1, Ordering::Release);
785 self.tests
786 .pop()
787 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
788 };
789 if result.is_none()
793 && self.is_empty()
794 && self.is_materialized()
795 && !locked
796 && self.in_progress.load(Ordering::Acquire) == 0
797 {
798 self.drop_deps();
799 }
800 if result.is_some() {
801 self.remaining_count -= 1;
802 }
803 result
804 }
805 }
806
807 fn create_dependency_map(
808 &self,
809 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
810 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
811 let mut result = parent_map.clone();
812 for (key, dep) in &self.materialized_dependencies {
813 result.insert(key.clone(), dep.clone());
814 }
815 result
816 }
817
818 fn root(
819 deps: Vec<RegisteredDependency>,
820 tests: Vec<RegisteredTest>,
821 props: Vec<RegisteredTestSuiteProperty>,
822 ) -> Self {
823 let total_count = tests.len();
824 let is_sequential = props
825 .iter()
826 .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
827 || tests.iter().any(|test| test.run.is_bench());
828 Self {
829 crate_and_module: String::new(),
830 dependencies: deps,
831 tests,
832 props,
833 inner: Vec::new(),
834 materialized_dependencies: HashMap::new(),
835 remaining_count: total_count,
836 idx: 0,
837 sequential_lock: SequentialExecutionLock::new(),
838 is_sequential,
839 skip_creating_dependencies: false,
840 in_progress: Arc::new(AtomicUsize::new(0)),
841 }
842 }
843
844 fn add_dependency(&mut self, dep: RegisteredDependency) {
845 let crate_and_module = dep.crate_and_module();
846 if self.crate_and_module == crate_and_module {
847 self.dependencies.push(dep);
848 } else {
849 let mut found = false;
850 for inner in &mut self.inner {
851 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
852 inner.add_dependency(dep.clone());
853 found = true;
854 break;
855 }
856 }
857 if !found {
858 let mut inner = Self {
859 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
860 dependencies: vec![],
861 tests: vec![],
862 inner: vec![],
863 props: vec![],
864 materialized_dependencies: HashMap::new(),
865 remaining_count: 0,
866 idx: 0,
867 is_sequential: false,
868 sequential_lock: SequentialExecutionLock::new(),
869 skip_creating_dependencies: false,
870 in_progress: Arc::new(AtomicUsize::new(0)),
871 };
872 inner.add_dependency(dep);
873 self.inner.push(inner);
874 }
875 }
876 }
877
878 fn add_test(&mut self, test: RegisteredTest) {
879 let crate_and_module = test.crate_and_module();
880 if self.crate_and_module == crate_and_module {
881 self.tests.push(test.clone());
882
883 if test.run.is_bench() {
884 self.is_sequential = true;
885 }
886 } else {
887 let mut found = false;
888 for inner in &mut self.inner {
889 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
890 inner.add_test(test.clone());
891 found = true;
892 break;
893 }
894 }
895 if !found {
896 let mut inner = Self {
897 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
898 dependencies: vec![],
899 tests: vec![],
900 inner: vec![],
901 props: vec![],
902 materialized_dependencies: HashMap::new(),
903 remaining_count: 0,
904 idx: 0,
905 is_sequential: false,
906 sequential_lock: SequentialExecutionLock::new(),
907 skip_creating_dependencies: false,
908 in_progress: Arc::new(AtomicUsize::new(0)),
909 };
910 inner.add_test(test);
911 self.inner.push(inner);
912 }
913 }
914 self.remaining_count += 1;
915 }
916
917 fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
918 let crate_and_module = prop.crate_and_module();
919 if self.crate_and_module == crate_and_module {
920 if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
921 self.is_sequential = true;
922 }
923 self.props.push(prop);
924 } else {
925 let mut found = false;
926 for inner in &mut self.inner {
927 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
928 inner.add_prop(prop.clone());
929 found = true;
930 break;
931 }
932 }
933 if !found {
934 let mut inner = Self {
935 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
936 dependencies: vec![],
937 tests: vec![],
938 inner: vec![],
939 props: vec![],
940 materialized_dependencies: HashMap::new(),
941 remaining_count: 0,
942 idx: 0,
943 is_sequential: false,
944 sequential_lock: SequentialExecutionLock::new(),
945 skip_creating_dependencies: false,
946 in_progress: Arc::new(AtomicUsize::new(0)),
947 };
948 inner.add_prop(prop);
949 self.inner.push(inner);
950 }
951 }
952 }
953
954 fn is_materialized(&self) -> bool {
955 self.skip_creating_dependencies
956 || self.materialized_dependencies.len() == self.dependencies.len()
957 }
958
/// Constructs every dependency of this node that has no materialized value
/// yet (async variant), in topological order, and stores the results.
///
/// Values that are already materialized are kept as they are. Each
/// constructor receives a snapshot of the parent values plus everything
/// materialized at this node so far.
///
/// Returns the parent map overlaid with all of this node's values.
#[cfg(feature = "tokio")]
async fn materialize_deps(
    &mut self,
    parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
    let mut local_values = self.materialized_dependencies.clone();
    let mut dependency_map = parent_map.clone();
    dependency_map.extend(
        local_values
            .iter()
            .map(|(name, value)| (name.clone(), value.clone())),
    );

    for dep in self.sorted_dependencies() {
        if local_values.contains_key(&dep.name) {
            continue;
        }
        let snapshot = dependency_map.clone();
        let built = match &dep.constructor {
            DependencyConstructor::Sync(build) => build(Arc::new(snapshot)),
            DependencyConstructor::Async(build) => build(Arc::new(snapshot)).await,
        };
        local_values.insert(dep.name.clone(), built.clone());
        dependency_map.insert(dep.name.clone(), built);
    }

    self.materialized_dependencies = local_values;
    dependency_map
}
987
988 fn materialize_deps_sync(
989 &mut self,
990 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
991 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
992 let mut deps = self.materialized_dependencies.clone();
995 let mut dependency_map = parent_map.clone();
996 for (k, v) in &deps {
997 dependency_map.insert(k.clone(), v.clone());
998 }
999
1000 let sorted_dependencies = self.sorted_dependencies();
1001 for dep in &sorted_dependencies {
1002 if deps.contains_key(&dep.name) {
1003 continue;
1004 }
1005 let materialized_dep = match &dep.constructor {
1006 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
1007 DependencyConstructor::Async(cons) => {
1008 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
1009 }
1010 };
1011 deps.insert(dep.name.clone(), materialized_dep.clone());
1012 dependency_map.insert(dep.name.clone(), materialized_dep);
1013 }
1014 self.materialized_dependencies = deps;
1015 dependency_map
1016 }
1017
1018 fn compute_parent_construction_plan(&self) -> (ParentConstructionPlan, HashSet<String>) {
1029 let mut child_plans = Vec::with_capacity(self.inner.len());
1031 let mut seeds: HashSet<String> = HashSet::new();
1032 for inner in &self.inner {
1033 let (child_plan, child_needs_from_parent) = inner.compute_parent_construction_plan();
1034 for name in child_needs_from_parent {
1035 seeds.insert(name);
1036 }
1037 child_plans.push(child_plan);
1038 }
1039
1040 for dep in &self.dependencies {
1042 if matches!(
1043 dep.scope,
1044 DepScope::Cloneable | DepScope::Hosted | DepScope::HostedRpc
1045 ) {
1046 seeds.insert(dep.name.clone());
1047 }
1048 }
1049
1050 let mut visited: HashSet<String> = HashSet::new();
1054 let mut needed_from_parent: HashSet<String> = HashSet::new();
1055 let mut stack: Vec<String> = seeds.into_iter().collect();
1056 while let Some(name) = stack.pop() {
1057 if !visited.insert(name.clone()) {
1058 continue;
1059 }
1060 if let Some(dep) = self.dependencies.iter().find(|d| d.name == name) {
1061 for dep_dep_name in &dep.dependencies {
1062 if !visited.contains(dep_dep_name) {
1063 stack.push(dep_dep_name.clone());
1064 }
1065 }
1066 } else {
1067 needed_from_parent.insert(name);
1068 }
1069 }
1070
1071 let own_names: HashSet<String> = self.dependencies.iter().map(|d| d.name.clone()).collect();
1072 let needed_here: HashSet<String> = visited.intersection(&own_names).cloned().collect();
1073
1074 (
1075 ParentConstructionPlan {
1076 needed_here,
1077 children: child_plans,
1078 },
1079 needed_from_parent,
1080 )
1081 }
1082
1083 fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
1084 let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
1085 for dep in &self.dependencies {
1086 let mut added = false;
1087 for dep_dep_name in &dep.dependencies {
1088 if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
1089 ts.add_dependency(dep_dep, dep);
1090 added = true;
1091 } else {
1092 }
1094 }
1095 if !added {
1096 ts.insert(dep);
1097 }
1098 }
1099 let mut result = Vec::with_capacity(self.dependencies.len());
1100 loop {
1101 let chunk = ts.pop_all();
1102 if chunk.is_empty() {
1103 break;
1104 }
1105 result.extend(chunk);
1106 }
1107 result
1108 }
1109
/// Discards every materialized dependency value held by this node.
fn drop_deps(&mut self) {
    self.materialized_dependencies.clear();
}
1113
1114 fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
1118 let mut needed: Option<HashSet<String>> = Some(HashSet::new());
1120 for test in &self.tests {
1121 match &test.dependencies {
1122 None => {
1123 needed = None;
1124 break;
1125 }
1126 Some(deps) => {
1127 if let Some(ref mut set) = needed {
1128 set.extend(deps.iter().cloned());
1129 }
1130 }
1131 }
1132 }
1133
1134 for inner in &mut self.inner {
1136 let child_needs = inner.prune_unused_deps();
1137 needed = match (needed, child_needs) {
1138 (None, _) | (_, None) => None,
1139 (Some(mut a), Some(b)) => {
1140 a.extend(b);
1141 Some(a)
1142 }
1143 };
1144 }
1145
1146 let needed = needed?;
1148
1149 let local_names: HashSet<String> =
1151 self.dependencies.iter().map(|d| d.name.clone()).collect();
1152 let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
1153
1154 let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
1156 let mut needed_from_parent: HashSet<String> =
1157 needed.difference(&local_names).cloned().collect();
1158
1159 while let Some(dep_name) = queue.pop_front() {
1160 if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
1161 for transitive in &dep.dependencies {
1162 if local_names.contains(transitive) {
1163 if keep_local.insert(transitive.clone()) {
1164 queue.push_back(transitive.clone());
1165 }
1166 } else {
1167 needed_from_parent.insert(transitive.clone());
1168 }
1169 }
1170 for companion in &dep.companions {
1179 if local_names.contains(companion) {
1180 if keep_local.insert(companion.clone()) {
1181 queue.push_back(companion.clone());
1182 }
1183 } else {
1184 needed_from_parent.insert(companion.clone());
1185 }
1186 }
1187 }
1188 }
1189
1190 self.dependencies.retain(|d| keep_local.contains(&d.name));
1192
1193 Some(needed_from_parent)
1194 }
1195
/// Returns `true` when the module path `this` is an ancestor of, or equal
/// to, the module path `that`.
///
/// The empty path is a prefix of everything. Otherwise `that` must either
/// equal `this` or continue with a `::` separator right after it, so
/// `a::b` is a prefix of `a::b::c` but not of `a::bc`.
fn is_prefix_of(this: &str, that: &str) -> bool {
    // `strip_prefix` avoids building a temporary "{this}::" string per call.
    this.is_empty()
        || this == that
        || matches!(that.strip_prefix(this), Some(rest) if rest.starts_with("::"))
}
1199
1200 fn next_level(from: &str, to: &str) -> String {
1201 assert!(Self::is_prefix_of(from, to));
1202 let remaining = if from.is_empty() {
1203 to
1204 } else {
1205 &to[from.len() + 2..]
1206 };
1207
1208 let result = if let Some((next, _tail)) = remaining.split_once("::") {
1209 format!("{from}::{next}")
1210 } else {
1211 format!("{from}::{remaining}")
1212 };
1213 result.trim_start_matches("::").to_string()
1214 }
1215
1216 fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
1217 if let Some(parent_lock) = inherited_lock {
1218 self.is_sequential = true;
1219 self.sequential_lock = parent_lock.clone();
1220 }
1221
1222 let lock_for_children = if self.is_sequential {
1223 Some(self.sequential_lock.clone())
1224 } else {
1225 None
1226 };
1227
1228 for child in &mut self.inner {
1229 child.propagate_sequential(lock_for_children.as_ref());
1230 }
1231 }
1232}
1233
1234impl Debug for TestSuiteExecution {
1235 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1236 writeln!(
1237 f,
1238 "'{}' {} [{}]",
1239 self.crate_and_module,
1240 self.props
1241 .iter()
1242 .map(|x| format!("{x:?}"))
1243 .collect::<Vec<_>>()
1244 .join(", "),
1245 if self.is_sequential { "S" } else { "P" }
1246 )?;
1247 writeln!(f, " deps:")?;
1248 for dep in &self.dependencies {
1249 writeln!(f, " '{}'", dep.name)?;
1250 }
1251 writeln!(f, " tests:")?;
1252 for test in &self.tests {
1253 writeln!(f, " '{}' [{:?}]", test.name, test.props.test_type)?;
1254 }
1255 for inner in &self.inner {
1256 let inner_str = format!("{inner:?}");
1257 for inner_line in inner_str.lines() {
1258 writeln!(f, " {inner_line}")?;
1259 }
1260 }
1261 Ok(())
1262 }
1263}
1264
1265impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1266 fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1267 self.get(name).cloned()
1268 }
1269}
1270
/// A test that has been picked for execution, together with what it needs to run.
pub struct TestExecution {
    /// The test to run.
    pub test: RegisteredTest,
    /// The dependency values visible to the test.
    pub deps: Arc<dyn DependencyView + Send + Sync>,
    /// Running index assigned when the test was picked.
    pub index: usize,
    /// Sequential-lock guard held for as long as this value is alive.
    _seq_lock: SequentialExecutionLockGuard,
    /// In-progress counter of the suite node the test was taken from;
    /// decremented when this value is dropped.
    in_progress_counter: Arc<AtomicUsize>,
}
1278
impl Drop for TestExecution {
    /// Decrements the in-progress counter that was incremented when this
    /// test was picked.
    fn drop(&mut self) {
        self.in_progress_counter.fetch_sub(1, Ordering::Release);
    }
}
1284
/// Guard returned by `SequentialExecutionLock`; the payloads are never read
/// and only exist to keep the underlying mutex locked until the guard is dropped.
#[allow(dead_code)]
enum SequentialExecutionLockGuard {
    /// No lock was taken (the suite is not sequential).
    None,
    /// Owned guard of the tokio mutex.
    #[cfg(feature = "tokio")]
    Async(tokio::sync::OwnedMutexGuard<()>),
    /// Owned guard of the parking_lot mutex.
    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
}
1292
/// Lock used to run the tests of sequential suites one at a time. Clones
/// share the same underlying mutexes.
#[derive(Clone)]
struct SequentialExecutionLock {
    /// Mutex used by the async picking path.
    #[cfg(feature = "tokio")]
    async_mutex: Arc<tokio::sync::Mutex<()>>,
    /// Mutex used by the blocking picking path.
    sync_mutex: Arc<parking_lot::Mutex<()>>,
}
1299
1300impl SequentialExecutionLock {
1301 pub fn new() -> Self {
1302 Self {
1303 #[cfg(feature = "tokio")]
1304 async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1305 sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1306 }
1307 }
1308
1309 #[cfg(feature = "tokio")]
1310 pub async fn is_locked(&self) -> bool {
1311 self.async_mutex.try_lock().is_err()
1312 }
1313
1314 pub fn is_locked_sync(&self) -> bool {
1315 self.sync_mutex.try_lock().is_none()
1316 }
1317
1318 #[cfg(feature = "tokio")]
1319 pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1320 if is_sequential {
1321 let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1322 SequentialExecutionLockGuard::Async(permit)
1323 } else {
1324 SequentialExecutionLockGuard::None
1325 }
1326 }
1327
1328 pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1329 if is_sequential {
1330 let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1331 SequentialExecutionLockGuard::Sync(permit)
1332 } else {
1333 SequentialExecutionLockGuard::None
1334 }
1335 }
1336}
1337
1338#[cfg(test)]
1339mod tests;