1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12 apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13 DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14 RegisteredTestSuiteProperty,
15};
16
/// A dependency identifier paired with the bytes a codec produced for that dependency.
pub type DepWireBytes = (String, Vec<u8>);

/// Type-erased, shareable handle to the value constructed for a hosted dependency.
pub type HostedOwner = Arc<dyn Any + Send + Sync>;

/// Boxed future returned by the recursive async collection helper; it resolves to the
/// name -> value dependency map built for one suite level.
#[cfg(feature = "tokio")]
type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
>;

/// Test-only pair of hosted descriptor bytes and the hosted owner values.
#[cfg(test)]
pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
37pub struct ParentSharedDependencies {
41 pub cloneable_wire_bytes: Vec<DepWireBytes>,
42 pub cloneable_local_values: Vec<(String, Arc<dyn Any + Send + Sync>)>,
56 pub hosted_descriptor_bytes: Vec<DepWireBytes>,
57 pub hosted_owners: Vec<HostedOwner>,
58 pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
59}
60
61impl ParentSharedDependencies {
62 fn new() -> Self {
63 Self {
64 cloneable_wire_bytes: Vec::new(),
65 cloneable_local_values: Vec::new(),
66 hosted_descriptor_bytes: Vec::new(),
67 hosted_owners: Vec::new(),
68 hosted_rpc_owner_cells: Vec::new(),
69 }
70 }
71}
72
/// One node of the suite tree: the tests, dependencies and properties registered for a
/// single crate/module path, plus child nodes for nested modules.
pub(crate) struct TestSuiteExecution {
    /// Module path this node stands for; empty for the root node.
    crate_and_module: String,
    /// Dependencies registered at exactly this level.
    dependencies: Vec<RegisteredDependency>,
    /// Tests at this level that were not handed out yet (taken from the end with `pop`).
    tests: Vec<RegisteredTest>,
    /// Suite properties registered at exactly this level.
    props: Vec<RegisteredTestSuiteProperty>,
    /// Child nodes for nested modules.
    inner: Vec<TestSuiteExecution>,
    /// Already constructed (or externally provided) dependency values, keyed by local name.
    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
    /// Lock acquired for each test handed out from this level when `is_sequential` is set.
    sequential_lock: SequentialExecutionLock,
    /// Number of tests in this subtree that have not been picked yet.
    remaining_count: usize,
    /// Index assigned to the next `TestExecution` produced by `pick_next`/`pick_next_sync`.
    idx: usize,
    /// Whether tests of this level must take `sequential_lock` before running.
    is_sequential: bool,
    /// When set, `is_materialized` reports `true` so no constructor is run.
    skip_creating_dependencies: bool,
    /// Number of tests handed out from this level whose `TestExecution` is still alive.
    in_progress: Arc<AtomicUsize>,
}
87
88impl TestSuiteExecution {
89 pub fn construct(
90 arguments: &Arguments,
91 dependencies: &[RegisteredDependency],
92 tests: &[RegisteredTest],
93 props: &[RegisteredTestSuiteProperty],
94 ) -> (Self, Vec<RegisteredTest>) {
95 let tests_with_props = apply_suite_props_to_tests(tests, props);
96 let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
97 Self::shuffle(arguments, &mut filtered_tests);
98 filtered_tests.reverse();
99
100 if filtered_tests.is_empty() {
101 (
102 Self::root(
103 dependencies
104 .iter()
105 .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
106 .cloned()
107 .collect::<Vec<_>>(),
108 Vec::new(),
109 props
110 .iter()
111 .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
112 .cloned()
113 .collect::<Vec<_>>(),
114 ),
115 Vec::new(),
116 )
117 } else {
118 let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
119
120 for prop in props {
121 root.add_prop(prop.clone());
122 }
123
124 for dep in dependencies {
125 root.add_dependency(dep.clone());
126 }
127
128 for test in filtered_tests.clone() {
129 root.add_test(test.clone());
130 }
131
132 root.propagate_sequential(None);
133 root.prune_unused_deps();
134
135 (root, filtered_tests)
136 }
137 }
138
139 fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
140 if let Some(seed) = arguments.shuffle_seed {
141 let mut rng = StdRng::seed_from_u64(seed);
142 tests.shuffle(&mut rng);
143 }
144 }
145
146 pub fn skip_creating_dependencies(&mut self) {
149 self.skip_creating_dependencies = true;
150 for inner in &mut self.inner {
151 inner.skip_creating_dependencies();
152 }
153 }
154
    /// Number of tests in this subtree that have not been picked yet.
    pub fn remaining(&self) -> usize {
        self.remaining_count
    }
158
159 pub fn is_empty(&self) -> bool {
160 self.tests.is_empty() && self.inner.is_empty()
161 }
162
163 pub fn is_done(&self) -> bool {
164 self.remaining_count == 0
165 }
166
167 #[allow(dead_code)]
169 pub fn has_dependencies(&self) -> bool {
170 !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
171 }
172
173 pub fn has_shared_dependencies(&self) -> bool {
177 self.dependencies
178 .iter()
179 .any(|d| d.scope == DepScope::Shared)
180 || self
181 .inner
182 .iter()
183 .any(|inner| inner.has_shared_dependencies())
184 }
185
186 #[allow(dead_code)]
188 pub fn has_cloneable_dependencies(&self) -> bool {
189 self.dependencies
190 .iter()
191 .any(|d| d.scope == DepScope::Cloneable)
192 || self
193 .inner
194 .iter()
195 .any(|inner| inner.has_cloneable_dependencies())
196 }
197
198 #[allow(dead_code)]
202 pub fn has_hosted_dependencies(&self) -> bool {
203 self.dependencies
204 .iter()
205 .any(|d| d.scope == DepScope::Hosted)
206 || self
207 .inner
208 .iter()
209 .any(|inner| inner.has_hosted_dependencies())
210 }
211
212 #[allow(dead_code)]
216 pub fn has_hosted_rpc_dependencies(&self) -> bool {
217 self.dependencies
218 .iter()
219 .any(|d| d.scope == DepScope::HostedRpc)
220 || self
221 .inner
222 .iter()
223 .any(|inner| inner.has_hosted_rpc_dependencies())
224 }
225
226 #[allow(dead_code)]
228 pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
229 let mut out = Vec::new();
230 self.collect_cloneable_dependencies_into(&mut out);
231 out
232 }
233
234 #[allow(dead_code)]
235 fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
236 for dep in &self.dependencies {
237 if dep.scope == DepScope::Cloneable {
238 out.push(dep.clone());
239 }
240 }
241 for inner in &self.inner {
242 inner.collect_cloneable_dependencies_into(out);
243 }
244 }
245
246 pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
253 let mut out = ParentSharedDependencies::new();
254 let parent_map = HashMap::new();
255 self.collect_parent_shared_dependencies_into_sync(&parent_map, &mut out);
256 out
257 }
258
259 fn collect_parent_shared_dependencies_into_sync(
260 &self,
261 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
262 out: &mut ParentSharedDependencies,
263 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
264 let mut dependency_map = parent_map.clone();
265 let sorted_dependencies = self.sorted_dependencies();
266
267 for dep in sorted_dependencies {
268 if dependency_map.contains_key(&dep.name) {
269 continue;
270 }
271
272 let value = Self::construct_dependency_sync(dep, &dependency_map);
273 match dep.scope {
274 DepScope::Cloneable => {
275 let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
276 panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
277 });
278 out.cloneable_wire_bytes
279 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
280 out.cloneable_local_values
285 .push((dep.qualified_id(), value.clone()));
286 }
287 DepScope::Hosted => {
288 let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
289 panic!("Hosted dep '{}' missing hosted codec", dep.name)
290 });
291 out.hosted_descriptor_bytes
292 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
293 out.hosted_owners.push(value.clone());
294 }
295 DepScope::HostedRpc => {
296 let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
297 panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
298 });
299 let cell = (factory.owner_into_cell)(value.clone());
300 out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
301 }
302 DepScope::Shared | DepScope::PerWorker => {}
303 }
304
305 dependency_map.insert(dep.name.clone(), value);
306 }
307
308 for inner in &self.inner {
309 inner.collect_parent_shared_dependencies_into_sync(&dependency_map, out);
310 }
311
312 dependency_map
313 }
314
315 fn construct_dependency_sync(
316 dep: &RegisteredDependency,
317 dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
318 ) -> Arc<dyn Any + Send + Sync> {
319 match &dep.constructor {
320 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
321 DependencyConstructor::Async(cons) => {
322 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
323 }
324 }
325 }
326
327 #[cfg(test)]
332 pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
333 self.collect_parent_shared_dependencies_sync()
334 .cloneable_wire_bytes
335 }
336
337 #[cfg(test)]
345 pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
346 let collected = self.collect_parent_shared_dependencies_sync();
347 (collected.hosted_descriptor_bytes, collected.hosted_owners)
348 }
349
350 #[cfg(test)]
355 pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
356 self.collect_parent_shared_dependencies_sync()
357 .hosted_rpc_owner_cells
358 }
359
360 #[cfg(feature = "tokio")]
364 pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
365 let mut out = ParentSharedDependencies::new();
366 let parent_map = HashMap::new();
367 self.collect_parent_shared_dependencies_into_async(&parent_map, &mut out)
368 .await;
369 out
370 }
371
372 #[cfg(feature = "tokio")]
373 fn collect_parent_shared_dependencies_into_async<'a>(
374 &'a self,
375 parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
376 out: &'a mut ParentSharedDependencies,
377 ) -> ParentSharedDependenciesFuture<'a> {
378 Box::pin(async move {
379 let mut dependency_map = parent_map.clone();
380 let sorted_dependencies = self.sorted_dependencies();
381
382 for dep in sorted_dependencies {
383 if dependency_map.contains_key(&dep.name) {
384 continue;
385 }
386
387 let value = match &dep.constructor {
388 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
389 DependencyConstructor::Async(cons) => {
390 cons(Arc::new(dependency_map.clone())).await
391 }
392 };
393 match dep.scope {
394 DepScope::Cloneable => {
395 let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
396 panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
397 });
398 out.cloneable_wire_bytes
399 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
400 out.cloneable_local_values
406 .push((dep.qualified_id(), value.clone()));
407 }
408 DepScope::Hosted => {
409 let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
410 panic!("Hosted dep '{}' missing hosted codec", dep.name)
411 });
412 out.hosted_descriptor_bytes
413 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
414 out.hosted_owners.push(value.clone());
415 }
416 DepScope::HostedRpc => {
417 let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
418 panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
419 });
420 let cell = (factory.owner_into_cell)(value.clone());
421 out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
422 }
423 DepScope::Shared | DepScope::PerWorker => {}
424 }
425
426 dependency_map.insert(dep.name.clone(), value);
427 }
428
429 for inner in &self.inner {
430 inner
431 .collect_parent_shared_dependencies_into_async(&dependency_map, out)
432 .await;
433 }
434 dependency_map
435 })
436 }
437
438 #[cfg(feature = "tokio")]
440 #[cfg(test)]
441 pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
442 let collected = self.collect_parent_shared_dependencies_async().await;
443 (collected.hosted_descriptor_bytes, collected.hosted_owners)
444 }
445
446 #[cfg(feature = "tokio")]
456 #[cfg(test)]
457 pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
458 self.collect_parent_shared_dependencies_async()
459 .await
460 .cloneable_wire_bytes
461 }
462
463 pub fn provide_cloneable_value(
472 &mut self,
473 dep_id: &str,
474 value: Arc<dyn Any + Send + Sync>,
475 ) -> bool {
476 let applied = self.provide_cloneable_value_internal(dep_id, value);
477 if applied {
478 self.prune_unused_deps();
479 }
480 applied
481 }
482
483 fn provide_cloneable_value_internal(
484 &mut self,
485 dep_id: &str,
486 value: Arc<dyn Any + Send + Sync>,
487 ) -> bool {
488 let mut applied = false;
489 if let Some((local_name, dep_idx)) = self
490 .dependencies
491 .iter()
492 .enumerate()
493 .find(|(_, d)| d.qualified_id() == dep_id)
494 .map(|(idx, d)| (d.name.clone(), idx))
495 {
496 self.dependencies[dep_idx].dependencies.clear();
501 self.materialized_dependencies
502 .insert(local_name, value.clone());
503 applied = true;
504 }
505 for inner in &mut self.inner {
506 applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
507 }
508 applied
509 }
510
511 pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
514 self.tests.iter().any(|test| {
515 test.props
516 .capture_control
517 .requires_capturing(capture_by_default)
518 }) || self
519 .inner
520 .iter()
521 .any(|inner| inner.requires_capturing(capture_by_default))
522 }
523
524 #[cfg(feature = "tokio")]
525 pub async fn pick_next(&mut self) -> Option<TestExecution> {
526 if self.is_empty() {
527 None
528 } else {
529 match self
530 .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
531 .await
532 {
533 Some((test, deps, seq_lock, in_progress_counter)) => {
534 let index = self.idx;
535 self.idx += 1;
536 Some(TestExecution {
537 test: test.clone(),
538 deps: Arc::new(deps),
539 index,
540 _seq_lock: seq_lock,
541 in_progress_counter,
542 })
543 }
544 None => None,
545 }
546 }
547 }
548
549 pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
550 match self.pick_next_internal_sync(&HashMap::new()) {
551 Some((test, deps, seq_lock, in_progress_counter)) => {
552 let index = self.idx;
553 self.idx += 1;
554 Some(TestExecution {
555 test: test.clone(),
556 deps: Arc::new(deps),
557 index,
558 _seq_lock: seq_lock,
559 in_progress_counter,
560 })
561 }
562 None => None,
563 }
564 }
565
566 #[cfg(feature = "tokio")]
567 #[allow(clippy::type_complexity)]
568 async fn pick_next_internal(
569 &mut self,
570 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
571 ) -> Option<(
572 RegisteredTest,
573 HashMap<String, Arc<dyn Any + Send + Sync>>,
574 SequentialExecutionLockGuard,
575 Arc<AtomicUsize>,
576 )> {
577 if self.is_empty() {
578 None
579 } else {
580 let dependency_map = if !self.is_materialized() {
581 self.materialize_deps(materialized_parent_deps).await
582 } else {
583 self.create_dependency_map(materialized_parent_deps)
584 };
585
586 let locked = self.sequential_lock.is_locked().await;
587 let result = if self.tests.is_empty() || locked {
588 let current = self.inner.iter_mut();
589 let mut result = None;
590 for inner in current {
591 if let Some((test, deps, seq_lock, in_progress_counter)) =
592 Box::pin(inner.pick_next_internal(&dependency_map)).await
593 {
594 result = Some((test, deps, seq_lock, in_progress_counter));
595 break;
596 }
597 }
598 self.inner.retain(|inner| !inner.is_empty());
599
600 result
601 } else {
602 let guard = self.sequential_lock.lock(self.is_sequential).await;
603 self.in_progress.fetch_add(1, Ordering::Release);
604 self.tests
605 .pop()
606 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
607 };
608 if result.is_none()
609 && self.is_empty()
610 && self.is_materialized()
611 && !locked
612 && self.in_progress.load(Ordering::Acquire) == 0
613 {
614 self.drop_deps();
615 }
616 if result.is_some() {
617 self.remaining_count -= 1;
618 }
619 result
620 }
621 }
622
623 #[allow(clippy::type_complexity)]
624 fn pick_next_internal_sync(
625 &mut self,
626 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
627 ) -> Option<(
628 RegisteredTest,
629 HashMap<String, Arc<dyn Any + Send + Sync>>,
630 SequentialExecutionLockGuard,
631 Arc<AtomicUsize>,
632 )> {
633 if self.is_empty() {
634 None
635 } else {
636 let dependency_map = if !self.is_materialized() {
637 self.materialize_deps_sync(materialized_parent_deps)
638 } else {
639 self.create_dependency_map(materialized_parent_deps)
640 };
641
642 let locked = self.sequential_lock.is_locked_sync();
643 let result = if self.tests.is_empty() || locked {
644 let current = self.inner.iter_mut();
645 let mut result = None;
646 for inner in current {
647 if let Some((test, deps, seq_lock, in_progress_counter)) =
648 inner.pick_next_internal_sync(&dependency_map)
649 {
650 result = Some((test, deps, seq_lock, in_progress_counter));
651 break;
652 }
653 }
654
655 self.inner.retain(|inner| !inner.is_empty());
656 result
657 } else {
658 let guard = self.sequential_lock.lock_sync(self.is_sequential);
659 self.in_progress.fetch_add(1, Ordering::Release);
660 self.tests
661 .pop()
662 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
663 };
664 if result.is_none()
665 && self.is_materialized()
666 && !locked
667 && self.in_progress.load(Ordering::Acquire) == 0
668 {
669 self.drop_deps();
670 }
671 if result.is_some() {
672 self.remaining_count -= 1;
673 }
674 result
675 }
676 }
677
678 fn create_dependency_map(
679 &self,
680 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
681 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
682 let mut result = parent_map.clone();
683 for (key, dep) in &self.materialized_dependencies {
684 result.insert(key.clone(), dep.clone());
685 }
686 result
687 }
688
689 fn root(
690 deps: Vec<RegisteredDependency>,
691 tests: Vec<RegisteredTest>,
692 props: Vec<RegisteredTestSuiteProperty>,
693 ) -> Self {
694 let total_count = tests.len();
695 let is_sequential = props
696 .iter()
697 .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
698 || tests.iter().any(|test| test.run.is_bench());
699 Self {
700 crate_and_module: String::new(),
701 dependencies: deps,
702 tests,
703 props,
704 inner: Vec::new(),
705 materialized_dependencies: HashMap::new(),
706 remaining_count: total_count,
707 idx: 0,
708 sequential_lock: SequentialExecutionLock::new(),
709 is_sequential,
710 skip_creating_dependencies: false,
711 in_progress: Arc::new(AtomicUsize::new(0)),
712 }
713 }
714
715 fn add_dependency(&mut self, dep: RegisteredDependency) {
716 let crate_and_module = dep.crate_and_module();
717 if self.crate_and_module == crate_and_module {
718 self.dependencies.push(dep);
719 } else {
720 let mut found = false;
721 for inner in &mut self.inner {
722 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
723 inner.add_dependency(dep.clone());
724 found = true;
725 break;
726 }
727 }
728 if !found {
729 let mut inner = Self {
730 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
731 dependencies: vec![],
732 tests: vec![],
733 inner: vec![],
734 props: vec![],
735 materialized_dependencies: HashMap::new(),
736 remaining_count: 0,
737 idx: 0,
738 is_sequential: false,
739 sequential_lock: SequentialExecutionLock::new(),
740 skip_creating_dependencies: false,
741 in_progress: Arc::new(AtomicUsize::new(0)),
742 };
743 inner.add_dependency(dep);
744 self.inner.push(inner);
745 }
746 }
747 }
748
749 fn add_test(&mut self, test: RegisteredTest) {
750 let crate_and_module = test.crate_and_module();
751 if self.crate_and_module == crate_and_module {
752 self.tests.push(test.clone());
753
754 if test.run.is_bench() {
755 self.is_sequential = true;
756 }
757 } else {
758 let mut found = false;
759 for inner in &mut self.inner {
760 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
761 inner.add_test(test.clone());
762 found = true;
763 break;
764 }
765 }
766 if !found {
767 let mut inner = Self {
768 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
769 dependencies: vec![],
770 tests: vec![],
771 inner: vec![],
772 props: vec![],
773 materialized_dependencies: HashMap::new(),
774 remaining_count: 0,
775 idx: 0,
776 is_sequential: false,
777 sequential_lock: SequentialExecutionLock::new(),
778 skip_creating_dependencies: false,
779 in_progress: Arc::new(AtomicUsize::new(0)),
780 };
781 inner.add_test(test);
782 self.inner.push(inner);
783 }
784 }
785 self.remaining_count += 1;
786 }
787
788 fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
789 let crate_and_module = prop.crate_and_module();
790 if self.crate_and_module == crate_and_module {
791 if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
792 self.is_sequential = true;
793 }
794 self.props.push(prop);
795 } else {
796 let mut found = false;
797 for inner in &mut self.inner {
798 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
799 inner.add_prop(prop.clone());
800 found = true;
801 break;
802 }
803 }
804 if !found {
805 let mut inner = Self {
806 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
807 dependencies: vec![],
808 tests: vec![],
809 inner: vec![],
810 props: vec![],
811 materialized_dependencies: HashMap::new(),
812 remaining_count: 0,
813 idx: 0,
814 is_sequential: false,
815 sequential_lock: SequentialExecutionLock::new(),
816 skip_creating_dependencies: false,
817 in_progress: Arc::new(AtomicUsize::new(0)),
818 };
819 inner.add_prop(prop);
820 self.inner.push(inner);
821 }
822 }
823 }
824
825 fn is_materialized(&self) -> bool {
826 self.skip_creating_dependencies
827 || self.materialized_dependencies.len() == self.dependencies.len()
828 }
829
830 #[cfg(feature = "tokio")]
831 async fn materialize_deps(
832 &mut self,
833 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
834 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
835 let mut deps = self.materialized_dependencies.clone();
838 let mut dependency_map = parent_map.clone();
839 for (k, v) in &deps {
840 dependency_map.insert(k.clone(), v.clone());
841 }
842
843 let sorted_dependencies = self.sorted_dependencies();
844 for dep in &sorted_dependencies {
845 if deps.contains_key(&dep.name) {
846 continue;
847 }
848 let materialized_dep = match &dep.constructor {
849 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
850 DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
851 };
852 deps.insert(dep.name.clone(), materialized_dep.clone());
853 dependency_map.insert(dep.name.clone(), materialized_dep);
854 }
855 self.materialized_dependencies = deps;
856 dependency_map
857 }
858
859 fn materialize_deps_sync(
860 &mut self,
861 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
862 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
863 let mut deps = self.materialized_dependencies.clone();
866 let mut dependency_map = parent_map.clone();
867 for (k, v) in &deps {
868 dependency_map.insert(k.clone(), v.clone());
869 }
870
871 let sorted_dependencies = self.sorted_dependencies();
872 for dep in &sorted_dependencies {
873 if deps.contains_key(&dep.name) {
874 continue;
875 }
876 let materialized_dep = match &dep.constructor {
877 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
878 DependencyConstructor::Async(cons) => {
879 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
880 }
881 };
882 deps.insert(dep.name.clone(), materialized_dep.clone());
883 dependency_map.insert(dep.name.clone(), materialized_dep);
884 }
885 self.materialized_dependencies = deps;
886 dependency_map
887 }
888
889 fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
890 let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
891 for dep in &self.dependencies {
892 let mut added = false;
893 for dep_dep_name in &dep.dependencies {
894 if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
895 ts.add_dependency(dep_dep, dep);
896 added = true;
897 } else {
898 }
900 }
901 if !added {
902 ts.insert(dep);
903 }
904 }
905 let mut result = Vec::with_capacity(self.dependencies.len());
906 loop {
907 let chunk = ts.pop_all();
908 if chunk.is_empty() {
909 break;
910 }
911 result.extend(chunk);
912 }
913 result
914 }
915
    /// Forgets every materialized dependency value held by this node.
    fn drop_deps(&mut self) {
        self.materialized_dependencies.clear();
    }
919
920 fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
924 let mut needed: Option<HashSet<String>> = Some(HashSet::new());
926 for test in &self.tests {
927 match &test.dependencies {
928 None => {
929 needed = None;
930 break;
931 }
932 Some(deps) => {
933 if let Some(ref mut set) = needed {
934 set.extend(deps.iter().cloned());
935 }
936 }
937 }
938 }
939
940 for inner in &mut self.inner {
942 let child_needs = inner.prune_unused_deps();
943 needed = match (needed, child_needs) {
944 (None, _) | (_, None) => None,
945 (Some(mut a), Some(b)) => {
946 a.extend(b);
947 Some(a)
948 }
949 };
950 }
951
952 let needed = needed?;
954
955 let local_names: HashSet<String> =
957 self.dependencies.iter().map(|d| d.name.clone()).collect();
958 let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
959
960 let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
962 let mut needed_from_parent: HashSet<String> =
963 needed.difference(&local_names).cloned().collect();
964
965 while let Some(dep_name) = queue.pop_front() {
966 if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
967 for transitive in &dep.dependencies {
968 if local_names.contains(transitive) {
969 if keep_local.insert(transitive.clone()) {
970 queue.push_back(transitive.clone());
971 }
972 } else {
973 needed_from_parent.insert(transitive.clone());
974 }
975 }
976 for companion in &dep.companions {
985 if local_names.contains(companion) {
986 if keep_local.insert(companion.clone()) {
987 queue.push_back(companion.clone());
988 }
989 } else {
990 needed_from_parent.insert(companion.clone());
991 }
992 }
993 }
994 }
995
996 self.dependencies.retain(|d| keep_local.contains(&d.name));
998
999 Some(needed_from_parent)
1000 }
1001
1002 fn is_prefix_of(this: &str, that: &str) -> bool {
1003 this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
1004 }
1005
1006 fn next_level(from: &str, to: &str) -> String {
1007 assert!(Self::is_prefix_of(from, to));
1008 let remaining = if from.is_empty() {
1009 to
1010 } else {
1011 &to[from.len() + 2..]
1012 };
1013
1014 let result = if let Some((next, _tail)) = remaining.split_once("::") {
1015 format!("{from}::{next}")
1016 } else {
1017 format!("{from}::{remaining}")
1018 };
1019 result.trim_start_matches("::").to_string()
1020 }
1021
1022 fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
1023 if let Some(parent_lock) = inherited_lock {
1024 self.is_sequential = true;
1025 self.sequential_lock = parent_lock.clone();
1026 }
1027
1028 let lock_for_children = if self.is_sequential {
1029 Some(self.sequential_lock.clone())
1030 } else {
1031 None
1032 };
1033
1034 for child in &mut self.inner {
1035 child.propagate_sequential(lock_for_children.as_ref());
1036 }
1037 }
1038}
1039
1040impl Debug for TestSuiteExecution {
1041 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1042 writeln!(
1043 f,
1044 "'{}' {} [{}]",
1045 self.crate_and_module,
1046 self.props
1047 .iter()
1048 .map(|x| format!("{x:?}"))
1049 .collect::<Vec<_>>()
1050 .join(", "),
1051 if self.is_sequential { "S" } else { "P" }
1052 )?;
1053 writeln!(f, " deps:")?;
1054 for dep in &self.dependencies {
1055 writeln!(f, " '{}'", dep.name)?;
1056 }
1057 writeln!(f, " tests:")?;
1058 for test in &self.tests {
1059 writeln!(f, " '{}' [{:?}]", test.name, test.props.test_type)?;
1060 }
1061 for inner in &self.inner {
1062 let inner_str = format!("{inner:?}");
1063 for inner_line in inner_str.lines() {
1064 writeln!(f, " {inner_line}")?;
1065 }
1066 }
1067 Ok(())
1068 }
1069}
1070
1071impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1072 fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1073 self.get(name).cloned()
1074 }
1075}
1076
/// A test that was picked for execution, together with everything it needs to run.
pub struct TestExecution {
    /// The test to run.
    pub test: RegisteredTest,
    /// Dependency values visible to the test.
    pub deps: Arc<dyn DependencyView + Send + Sync>,
    /// Running index assigned when the test was picked.
    pub index: usize,
    /// Guard kept for the lifetime of this value (holds the sequential lock, if any).
    _seq_lock: SequentialExecutionLockGuard,
    /// Counter of the originating suite level; decremented when this value is dropped.
    in_progress_counter: Arc<AtomicUsize>,
}
1084
1085impl Drop for TestExecution {
1086 fn drop(&mut self) {
1087 self.in_progress_counter.fetch_sub(1, Ordering::Release);
1088 }
1089}
1090
/// Guard returned by `SequentialExecutionLock::lock`/`lock_sync`.
/// The payloads are never read; they are only held so the lock stays taken.
#[allow(dead_code)]
enum SequentialExecutionLockGuard {
    /// No lock was taken (non-sequential execution).
    None,
    /// Owned guard of the tokio mutex.
    #[cfg(feature = "tokio")]
    Async(tokio::sync::OwnedMutexGuard<()>),
    /// Owned guard of the parking_lot mutex.
    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
}
1098
1099#[derive(Clone)]
1100struct SequentialExecutionLock {
1101 #[cfg(feature = "tokio")]
1102 async_mutex: Arc<tokio::sync::Mutex<()>>,
1103 sync_mutex: Arc<parking_lot::Mutex<()>>,
1104}
1105
1106impl SequentialExecutionLock {
1107 pub fn new() -> Self {
1108 Self {
1109 #[cfg(feature = "tokio")]
1110 async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1111 sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1112 }
1113 }
1114
1115 #[cfg(feature = "tokio")]
1116 pub async fn is_locked(&self) -> bool {
1117 self.async_mutex.try_lock().is_err()
1118 }
1119
1120 pub fn is_locked_sync(&self) -> bool {
1121 self.sync_mutex.try_lock().is_none()
1122 }
1123
1124 #[cfg(feature = "tokio")]
1125 pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1126 if is_sequential {
1127 let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1128 SequentialExecutionLockGuard::Async(permit)
1129 } else {
1130 SequentialExecutionLockGuard::None
1131 }
1132 }
1133
1134 pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1135 if is_sequential {
1136 let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1137 SequentialExecutionLockGuard::Sync(permit)
1138 } else {
1139 SequentialExecutionLockGuard::None
1140 }
1141 }
1142}
1143
1144#[cfg(test)]
1145mod cloneable_tests {
1146 use super::*;
1147 use crate::internal::{
1148 CloneableCodec, DependencyConstructor, RegisteredDependency, RegisteredTest, TestFunction,
1149 TestProperties,
1150 };
1151 use std::sync::atomic::{AtomicUsize, Ordering};
1152
    /// Builds a test fixture named `name` with an empty module path and the given
    /// declared dependencies.
    fn registered_test(name: &str, deps: Vec<String>) -> RegisteredTest {
        registered_test_in_module(name, "", deps)
    }
1156
    /// Builds a no-op synchronous test fixture in crate `tcrate` under `module_path`,
    /// with default properties and `deps` as its declared dependencies.
    fn registered_test_in_module(
        name: &str,
        module_path: &str,
        deps: Vec<String>,
    ) -> RegisteredTest {
        RegisteredTest {
            name: name.to_string(),
            crate_name: "tcrate".to_string(),
            module_path: module_path.to_string(),
            run: TestFunction::Sync(Arc::new(|_| Box::new(()))),
            props: TestProperties::default(),
            dependencies: Some(deps),
        }
    }
1171
    /// Builds a cloneable dependency fixture with an empty module path whose constructor
    /// yields `0xdead_beef` and bumps `counter` on every run.
    fn registered_cloneable_dep(name: &str, counter: Arc<AtomicUsize>) -> RegisteredDependency {
        registered_cloneable_dep_in(name, "", 0xdead_beef, counter)
    }
1177
    /// Builds a `DepScope::Cloneable` dependency fixture in crate `tcrate` under
    /// `module_path`.
    ///
    /// The constructor increments `counter` and returns `constructor_value` as a `u64`;
    /// the codec encodes that `u64` as 8 little-endian bytes and decodes it back.
    fn registered_cloneable_dep_in(
        name: &str,
        module_path: &str,
        constructor_value: u64,
        counter: Arc<AtomicUsize>,
    ) -> RegisteredDependency {
        let constructor_counter = counter.clone();
        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
            // Count every constructor run so tests can assert how often it happened.
            constructor_counter.fetch_add(1, Ordering::SeqCst);
            Arc::new(constructor_value) as Arc<dyn Any + Send + Sync>
        }));
        let codec = CloneableCodec {
            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
                let value: Arc<u64> = any.downcast::<u64>().unwrap();
                (*value).to_le_bytes().to_vec()
            }),
            from_wire_bytes: Arc::new(|bytes: &[u8]| {
                let arr: [u8; 8] = bytes.try_into().unwrap();
                let value = u64::from_le_bytes(arr);
                Arc::new(value) as Arc<dyn Any + Send + Sync>
            }),
        };
        RegisteredDependency {
            name: name.to_string(),
            crate_name: "tcrate".to_string(),
            module_path: module_path.to_string(),
            constructor,
            dependencies: Vec::new(),
            scope: DepScope::Cloneable,
            // The worker side simply passes the decoded payload through.
            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
                |wire_payload, _deps| wire_payload,
            ))),
            cloneable_codec: Some(codec),
            hosted_codec: None,
            rpc_factory: None,
            companions: Vec::new(),
        }
    }
1220
    /// Collecting cloneable wire bytes must run the constructor exactly once and yield
    /// the codec-encoded value keyed by the dependency's fully-qualified id.
    #[test]
    fn cloneable_wire_collection_runs_constructor_once_and_encodes_value() {
        let counter = Arc::new(AtomicUsize::new(0));
        let dep = registered_cloneable_dep("clone_dep", counter.clone());
        let test = registered_test("t1", vec!["clone_dep".to_string()]);

        let (execution, _filtered) =
            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

        let collected = execution.collect_cloneable_wire_bytes_sync();
        assert_eq!(collected.len(), 1, "exactly one cloneable dep expected");
        let (dep_id, wire_bytes) = &collected[0];
        assert_eq!(
            dep_id, "tcrate::clone_dep",
            "wire bytes must be keyed by the fully-qualified id, not the local name"
        );
        assert_eq!(
            wire_bytes.as_slice(),
            &0xdead_beef_u64.to_le_bytes(),
            "expected the codec-encoded value to round-trip via to_wire"
        );
        assert_eq!(
            counter.load(Ordering::SeqCst),
            1,
            "constructor must have run exactly once when collecting"
        );
    }
1248
/// Exercises dependency pruning for a pair of cloneable deps (`clone_a`,
/// `clone_b`) in three scenarios: companions linked with only `clone_a`
/// referenced, companions linked with only `clone_b` referenced, and no
/// companion link with only `clone_a` referenced.
#[test]
fn prune_unused_deps_retains_companion_when_only_one_half_is_referenced() {
    /// Registers `clone_a` and `clone_b` (cross-linked as companions when
    /// `link_companions` is set), builds an execution around one test that
    /// depends on `referenced_dep`, and returns the names of the cloneable
    /// dependencies the execution reports.
    fn surviving_cloneable_deps(
        link_companions: bool,
        test_name: &str,
        referenced_dep: &str,
    ) -> Vec<String> {
        let mut first = registered_cloneable_dep("clone_a", Arc::new(AtomicUsize::new(0)));
        let mut second = registered_cloneable_dep("clone_b", Arc::new(AtomicUsize::new(0)));
        if link_companions {
            first.companions = vec!["clone_b".to_string()];
            second.companions = vec!["clone_a".to_string()];
        }
        let only_test = registered_test(test_name, vec![referenced_dep.to_string()]);

        let (execution, _filtered) = TestSuiteExecution::construct(
            &Arguments::default(),
            &[first, second],
            &[only_test],
            &[],
        );

        let names: Vec<String> = execution
            .collect_cloneable_dependencies()
            .into_iter()
            .map(|d| d.name)
            .collect();
        names
    }

    // Scenario 1: companions linked, only `clone_a` referenced by a test.
    let kept = surviving_cloneable_deps(true, "t_uses_a", "clone_a");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        kept.contains(&"clone_b".to_string()),
        "companion of a retained dep must also be retained (the planner-only \
         sibling link used by `worker = both(...)`), kept = {kept:?}"
    );

    // Scenario 2: companions linked, only `clone_b` referenced by a test.
    let kept = surviving_cloneable_deps(true, "t_uses_b", "clone_b");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "companion of a stub-referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        kept.contains(&"clone_b".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );

    // Scenario 3: no companion link, so the unreferenced dep must disappear.
    let kept = surviving_cloneable_deps(false, "t_uses_a", "clone_a");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        !kept.contains(&"clone_b".to_string()),
        "without a companion link, an unreferenced dep must be pruned; \
         kept = {kept:?}"
    );
}
1350
/// A value supplied up-front via `provide_cloneable_value` must be the one the
/// test sees, and the dependency's own constructor must never be invoked.
#[test]
fn provide_cloneable_value_short_circuits_constructor() {
    let constructor_runs = Arc::new(AtomicUsize::new(0));
    let deps = [registered_cloneable_dep(
        "clone_dep",
        constructor_runs.clone(),
    )];
    let tests = [registered_test("t1", vec!["clone_dep".to_string()])];

    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    // Inject the value under the dependency's fully-qualified id.
    let injected: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
    assert!(
        execution.provide_cloneable_value("tcrate::clone_dep", injected),
        "pre-populated value should match the dep's qualified id"
    );

    let picked = execution.pick_next_sync().expect("test should be picked");
    assert_eq!(picked.test.name, "t1");

    let seen: Arc<u64> = picked
        .deps
        .get("clone_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 99);

    assert_eq!(
        constructor_runs.load(Ordering::SeqCst),
        0,
        "constructor must not run when a pre-populated value is supplied"
    );
}
1382
/// When a value is provided for a dependency that itself declares a
/// dependency, neither the provided dependency's constructor nor the
/// constructor of its declared dependency may run.
#[test]
fn provided_shared_value_is_a_worker_side_leaf() {
    let provided_runs = Arc::new(AtomicUsize::new(0));
    let parent_only_runs = Arc::new(AtomicUsize::new(0));

    // `clone_dep` declares `parent_only_dep` as a constructor dependency.
    let mut provided = registered_cloneable_dep("clone_dep", provided_runs.clone());
    provided.dependencies = vec!["parent_only_dep".to_string()];
    let parent_only = registered_cloneable_dep("parent_only_dep", parent_only_runs.clone());
    let consumer = registered_test("t1", vec!["clone_dep".to_string()]);

    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[provided, parent_only],
        &[consumer],
        &[],
    );

    let injected: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
    assert!(execution.provide_cloneable_value("tcrate::clone_dep", injected));

    let picked = execution.pick_next_sync().expect("test should be picked");
    let seen: Arc<u64> = picked
        .deps
        .get("clone_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 99);

    assert_eq!(
        provided_runs.load(Ordering::SeqCst),
        0,
        "worker-side provided values must not run their original constructor"
    );
    assert_eq!(
        parent_only_runs.load(Ordering::SeqCst),
        0,
        "constructor dependencies are parent-only once a value arrives from wire bytes or an RPC stub"
    );
}
1419
/// The async wire-byte collection path must await an async constructor (one
/// that yields before producing its value) exactly once and encode its result.
#[cfg(feature = "tokio")]
#[test]
fn async_cloneable_wire_collection_awaits_async_constructor() {
    use std::pin::Pin;

    let constructor_runs = Arc::new(AtomicUsize::new(0));
    let runs_for_ctor = constructor_runs.clone();

    // The constructor yields once before counting itself, so a caller that
    // does not actually drive the future would observe zero runs.
    let constructor = DependencyConstructor::Async(Arc::new(move |_view| {
        let runs = runs_for_ctor.clone();
        Box::pin(async move {
            tokio::task::yield_now().await;
            runs.fetch_add(1, Ordering::SeqCst);
            let produced: u64 = 0xdead_beef;
            Arc::new(produced) as Arc<dyn Any + Send + Sync>
        }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
    }));

    // u64 <-> 8 little-endian bytes.
    let codec = CloneableCodec {
        to_wire: Arc::new(|any| {
            let typed: Arc<u64> = any.downcast::<u64>().unwrap();
            (*typed).to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes| {
            let le_bytes: [u8; 8] = bytes.try_into().unwrap();
            Arc::new(u64::from_le_bytes(le_bytes)) as Arc<dyn Any + Send + Sync>
        }),
    };

    let async_dep = RegisteredDependency {
        name: "clone_dep".to_string(),
        crate_name: "tcrate".to_string(),
        module_path: String::new(),
        constructor,
        dependencies: Vec::new(),
        scope: DepScope::Cloneable,
        worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
            |wire_payload, _| wire_payload,
        ))),
        cloneable_codec: Some(codec),
        hosted_codec: None,
        rpc_factory: None,
        companions: Vec::new(),
    };
    let consumer = registered_test("t1", vec!["clone_dep".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[async_dep], &[consumer], &[]);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let collected = runtime.block_on(execution.collect_cloneable_wire_bytes_async());

    assert_eq!(collected.len(), 1);
    let (dep_id, wire_bytes) = &collected[0];
    assert_eq!(*dep_id, "tcrate::clone_dep");
    assert_eq!(wire_bytes.as_slice(), &0xdead_beef_u64.to_le_bytes());
    assert_eq!(
        constructor_runs.load(Ordering::SeqCst),
        1,
        "async constructor must have run exactly once"
    );
}
1489
/// Two cloneable deps share the local name `clone_dep` but live in different
/// modules. Wire bytes must be keyed by qualified id, provided values must be
/// routed by qualified id, and each module's test must see its own value.
#[test]
fn cloneable_value_routing_uses_qualified_id_across_modules() {
    let runs_a = Arc::new(AtomicUsize::new(0));
    let runs_b = Arc::new(AtomicUsize::new(0));

    let deps = [
        registered_cloneable_dep_in("clone_dep", "mod_a", 11, runs_a.clone()),
        registered_cloneable_dep_in("clone_dep", "mod_b", 22, runs_b.clone()),
    ];
    let tests = [
        registered_test_in_module("t_a", "mod_a", vec!["clone_dep".to_string()]),
        registered_test_in_module("t_b", "mod_b", vec!["clone_dep".to_string()]),
    ];

    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    // Parent side: collect wire bytes; sort by id so the order is stable.
    let mut collected = execution.collect_cloneable_wire_bytes_sync();
    collected.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    assert_eq!(collected.len(), 2);
    assert_eq!(collected[0].0, "tcrate::mod_a::clone_dep");
    assert_eq!(collected[1].0, "tcrate::mod_b::clone_dep");
    assert_eq!(collected[0].1.as_slice(), &11_u64.to_le_bytes());
    assert_eq!(collected[1].1.as_slice(), &22_u64.to_le_bytes());

    // Worker side: inject distinct values under each qualified id.
    let applied_a =
        execution.provide_cloneable_value("tcrate::mod_a::clone_dep", Arc::new(111_u64));
    assert!(applied_a, "mod_a dep must be reachable by qualified id");
    let applied_b =
        execution.provide_cloneable_value("tcrate::mod_b::clone_dep", Arc::new(222_u64));
    assert!(applied_b, "mod_b dep must be reachable by qualified id");

    let applied_unknown =
        execution.provide_cloneable_value("tcrate::mod_c::clone_dep", Arc::new(333_u64));
    assert!(
        !applied_unknown,
        "unknown qualified id must not be applied anywhere"
    );

    let first = execution.pick_next_sync().expect("first test");
    let second = execution.pick_next_sync().expect("second test");

    // Record (test name, value seen for `clone_dep`) for both picked tests.
    let observed: Vec<(String, u64)> = [first, second]
        .into_iter()
        .map(|picked| {
            let seen: Arc<u64> = picked
                .deps
                .get("clone_dep")
                .expect("dep available")
                .clone()
                .downcast()
                .unwrap();
            (picked.test.name.clone(), *seen)
        })
        .collect();

    // Pick order is not assumed; look each test up by name.
    let value_seen_by = |test_name: &str, missing_msg: &str| -> u64 {
        observed
            .iter()
            .find(|(name, _)| name == test_name)
            .expect(missing_msg)
            .1
    };
    let val_a = value_seen_by("t_a", "t_a picked");
    let val_b = value_seen_by("t_b", "t_b picked");
    assert_eq!(
        val_a, 111,
        "mod_a test must see mod_a's pre-populated value"
    );
    assert_eq!(
        val_b, 222,
        "mod_b test must see mod_b's pre-populated value"
    );

    assert_eq!(
        runs_a.load(Ordering::SeqCst),
        1,
        "mod_a constructor must have run exactly once (during wire collection)"
    );
    assert_eq!(
        runs_b.load(Ordering::SeqCst),
        1,
        "mod_b constructor must have run exactly once (during wire collection)"
    );
}
1593
1594 fn registered_hosted_dep(
1603 name: &str,
1604 payload: u64,
1605 owner_counter: Arc<AtomicUsize>,
1606 ) -> RegisteredDependency {
1607 registered_hosted_dep_in(name, "", payload, owner_counter)
1608 }
1609
1610 fn registered_hosted_dep_in(
1615 name: &str,
1616 module_path: &str,
1617 payload: u64,
1618 owner_counter: Arc<AtomicUsize>,
1619 ) -> RegisteredDependency {
1620 let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1621 owner_counter.fetch_add(1, Ordering::SeqCst);
1622 Arc::new(payload) as Arc<dyn Any + Send + Sync>
1623 }));
1624 let codec = CloneableCodec {
1625 to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1627 let v: Arc<u64> = any.downcast::<u64>().unwrap();
1628 (*v).to_le_bytes().to_vec()
1629 }),
1630 from_wire_bytes: Arc::new(|bytes: &[u8]| {
1633 let boxed: Vec<u8> = bytes.to_vec();
1634 Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1635 }),
1636 };
1637 let worker_fn =
1638 crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
1639 let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1640 let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1641 let value: u64 = u64::from_le_bytes(arr);
1642 Arc::new(value) as Arc<dyn Any + Send + Sync>
1643 }));
1644 RegisteredDependency {
1645 name: name.to_string(),
1646 crate_name: "tcrate".to_string(),
1647 module_path: module_path.to_string(),
1648 constructor,
1649 dependencies: Vec::new(),
1650 scope: DepScope::Hosted,
1651 worker_fn: Some(worker_fn),
1652 cloneable_codec: None,
1653 hosted_codec: Some(codec),
1654 rpc_factory: None,
1655 companions: Vec::new(),
1656 }
1657 }
1658
/// Collecting hosted descriptors for one hosted dep must run the owner
/// constructor once, emit the encoded descriptor under the qualified id, and
/// hand back the owner value itself.
#[test]
fn hosted_descriptor_collection_runs_owner_once_and_keeps_it_alive() {
    const PAYLOAD: u64 = 0xcafe_babe_dead_beef;

    let owner_runs = Arc::new(AtomicUsize::new(0));
    let deps = [registered_hosted_dep(
        "hosted_dep",
        PAYLOAD,
        owner_runs.clone(),
    )];
    let tests = [registered_test("t1", vec!["hosted_dep".to_string()])];

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 1, "exactly one hosted dep expected");
    assert_eq!(owners.len(), 1, "exactly one hosted owner kept alive");

    let (dep_id, descriptor_bytes) = &descriptors[0];
    assert_eq!(
        dep_id, "tcrate::hosted_dep",
        "descriptor must be keyed by the fully-qualified id"
    );
    assert_eq!(
        descriptor_bytes.as_slice(),
        &0xcafe_babe_dead_beef_u64.to_le_bytes(),
        "expected descriptor bytes to match codec.to_wire of payload"
    );
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must have run exactly once"
    );

    // The returned owner is the constructed value, not the descriptor.
    let held_owner: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
    assert_eq!(*held_owner, PAYLOAD);
}
1693
/// A hosted dependency accepts a pre-populated value through
/// `provide_cloneable_value`; the test then sees that value and the owner
/// constructor is never run.
#[test]
fn hosted_descriptor_roundtrips_to_worker_value_via_provide_cloneable_value() {
    let owner_runs = Arc::new(AtomicUsize::new(0));
    let deps = [registered_hosted_dep(
        "hosted_dep",
        0x1234_5678_u64,
        owner_runs.clone(),
    )];
    let tests = [registered_test("t1", vec!["hosted_dep".to_string()])];

    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    let injected: Arc<dyn Any + Send + Sync> = Arc::new(0x1234_5678_u64);
    assert!(
        execution.provide_cloneable_value("tcrate::hosted_dep", injected),
        "Hosted dep must accept pre-populated values via the same path as Cloneable"
    );

    let picked = execution.pick_next_sync().expect("test should be picked");
    let seen: Arc<u64> = picked
        .deps
        .get("hosted_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 0x1234_5678);

    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        0,
        "Hosted owner constructor must not run on the worker side"
    );
}
1725
/// With a single hosted dependency registered, only the hosted predicate may
/// report `true`; the shared and cloneable predicates must report `false`.
#[test]
fn has_hosted_dependencies_reports_correctly() {
    let hosted = registered_hosted_dep("h", 0, Arc::new(AtomicUsize::new(0)));
    let consumer = registered_test("t1", vec!["h".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    assert!(execution.has_hosted_dependencies());
    assert!(!execution.has_shared_dependencies());
    assert!(!execution.has_cloneable_dependencies());
}
1736
/// Two hosted dependencies consumed by the same test: one descriptor
/// collection pass must yield two descriptors and two owners, with each owner
/// constructor having run exactly once.
///
/// NOTE(review): despite the test name, only a single collection pass is
/// performed here — confirm whether a repeated-collection assertion was
/// intended.
#[test]
fn hosted_owner_runs_exactly_once_even_when_collecting_multiple_times() {
    let counter_a = Arc::new(AtomicUsize::new(0));
    let counter_b = Arc::new(AtomicUsize::new(0));

    // `registered_hosted_dep` already stores the given name on the returned
    // dependency, so no follow-up assignment (or `mut` binding) is needed.
    let dep_a = registered_hosted_dep("hosted_a", 1, counter_a.clone());
    let dep_b = registered_hosted_dep("hosted_b", 2, counter_b.clone());
    let test = registered_test("t1", vec!["hosted_a".to_string(), "hosted_b".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 2);
    assert_eq!(owners.len(), 2);
    assert_eq!(counter_a.load(Ordering::SeqCst), 1);
    assert_eq!(counter_b.load(Ordering::SeqCst), 1);
}
1763
/// Hosted counterpart of the cloneable routing test: two hosted deps share the
/// local name `hosted_dep` across `mod_a` / `mod_b`. Descriptors must be keyed
/// by qualified id and provided values routed to the matching module's test.
#[test]
fn hosted_descriptor_routing_uses_qualified_id_across_modules() {
    let runs_a = Arc::new(AtomicUsize::new(0));
    let runs_b = Arc::new(AtomicUsize::new(0));

    let deps = [
        registered_hosted_dep_in("hosted_dep", "mod_a", 11, runs_a.clone()),
        registered_hosted_dep_in("hosted_dep", "mod_b", 22, runs_b.clone()),
    ];
    let tests = [
        registered_test_in_module("t_a", "mod_a", vec!["hosted_dep".to_string()]),
        registered_test_in_module("t_b", "mod_b", vec!["hosted_dep".to_string()]),
    ];

    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    // Parent side: descriptors sorted by id for a stable comparison order.
    let (mut descriptors, _owners) = execution.collect_hosted_descriptor_bytes_sync();
    descriptors.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    assert_eq!(descriptors.len(), 2);
    assert_eq!(descriptors[0].0, "tcrate::mod_a::hosted_dep");
    assert_eq!(descriptors[1].0, "tcrate::mod_b::hosted_dep");
    assert_eq!(descriptors[0].1.as_slice(), &11_u64.to_le_bytes());
    assert_eq!(descriptors[1].1.as_slice(), &22_u64.to_le_bytes());

    // Worker side: inject distinct values under each qualified id.
    let applied_a =
        execution.provide_cloneable_value("tcrate::mod_a::hosted_dep", Arc::new(111_u64));
    assert!(
        applied_a,
        "mod_a hosted dep must be reachable by qualified id"
    );
    let applied_b =
        execution.provide_cloneable_value("tcrate::mod_b::hosted_dep", Arc::new(222_u64));
    assert!(
        applied_b,
        "mod_b hosted dep must be reachable by qualified id"
    );

    let applied_unknown =
        execution.provide_cloneable_value("tcrate::mod_c::hosted_dep", Arc::new(333_u64));
    assert!(
        !applied_unknown,
        "unknown qualified id must not be applied to any dep"
    );

    let first = execution.pick_next_sync().expect("first test");
    let second = execution.pick_next_sync().expect("second test");

    // Record (test name, value seen for `hosted_dep`) for both picked tests.
    let observed: Vec<(String, u64)> = [first, second]
        .into_iter()
        .map(|picked| {
            let seen: Arc<u64> = picked
                .deps
                .get("hosted_dep")
                .expect("dep available")
                .clone()
                .downcast()
                .unwrap();
            (picked.test.name.clone(), *seen)
        })
        .collect();

    // Pick order is not assumed; look each test up by name.
    let value_seen_by = |test_name: &str, missing_msg: &str| -> u64 {
        observed
            .iter()
            .find(|(name, _)| name == test_name)
            .expect(missing_msg)
            .1
    };
    let val_a = value_seen_by("t_a", "t_a picked");
    let val_b = value_seen_by("t_b", "t_b picked");
    assert_eq!(val_a, 111);
    assert_eq!(val_b, 222);

    // Owners ran once each, during the descriptor collection above.
    assert_eq!(runs_a.load(Ordering::SeqCst), 1);
    assert_eq!(runs_b.load(Ordering::SeqCst), 1);
}
1858
/// Walks the hosted flow by hand within one process: collect the descriptor,
/// decode it with the codec, rebuild the worker-side handle with the worker
/// reconstructor, provide that handle, and check the test sees the handle
/// (the bitwise NOT of the owner value) rather than the owner value itself.
#[test]
fn hosted_no_spawn_workers_uses_worker_side_handle() {
    let owner_runs = Arc::new(AtomicUsize::new(0));
    let runs_for_ctor = owner_runs.clone();
    let owner_value: u64 = 0xAAAA_AAAA_AAAA_AAAA_u64;

    let build_owner = DependencyConstructor::Sync(Arc::new(move |_view| {
        runs_for_ctor.fetch_add(1, Ordering::SeqCst);
        Arc::new(owner_value) as Arc<dyn Any + Send + Sync>
    }));

    // Descriptor = owner value as 8 LE bytes; decoding yields the raw bytes.
    let codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let typed: Arc<u64> = any.downcast::<u64>().unwrap();
            (*typed).to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            let owned_bytes: Vec<u8> = bytes.to_vec();
            Arc::new(owned_bytes) as Arc<dyn Any + Send + Sync>
        }),
    };

    // The handle is deliberately distinguishable from the owner: it is the
    // bitwise complement of the decoded value.
    let worker_fn =
        crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
            let raw_bytes: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
            let le_bytes: [u8; 8] = (*raw_bytes).as_slice().try_into().unwrap();
            let decoded: u64 = u64::from_le_bytes(le_bytes);
            Arc::new(!decoded) as Arc<dyn Any + Send + Sync>
        }));

    let hosted = RegisteredDependency {
        name: "hosted_dep".to_string(),
        crate_name: "tcrate".to_string(),
        module_path: String::new(),
        constructor: build_owner,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(worker_fn.clone()),
        cloneable_codec: None,
        hosted_codec: Some(codec.clone()),
        rpc_factory: None,
        companions: Vec::new(),
    };
    let consumer = registered_test("t1", vec!["hosted_dep".to_string()]);

    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 1);
    assert_eq!(owners.len(), 1);
    let (dep_id, wire_bytes) = &descriptors[0];

    // Decode the descriptor and rebuild the handle with no dependencies.
    let decoded_payload = (codec.from_wire_bytes)(wire_bytes.as_slice());
    let no_deps: Arc<dyn crate::internal::DependencyView + Send + Sync> =
        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
    let handle = match &worker_fn {
        crate::internal::WorkerReconstructor::Sync(rebuild) => rebuild(decoded_payload, no_deps),
        crate::internal::WorkerReconstructor::Async(_) => unreachable!(),
    };
    assert!(execution.provide_cloneable_value(dep_id, handle));

    let picked = execution.pick_next_sync().expect("test picked");
    let dep_value = picked.deps.get("hosted_dep").expect("dep available");
    let seen: Arc<u64> = dep_value.clone().downcast::<u64>().unwrap();
    assert_eq!(
        *seen,
        !owner_value,
        "Hosted dep must expose the worker-side handle (from_descriptor) even in the no-spawn-workers path"
    );
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must have run exactly once during descriptor collection"
    );
}
1958
/// A hosted dependency that declares another dependency: collecting the
/// parent-side shared dependencies must yield one hosted descriptor and run
/// both the declared dependency's constructor and the owner's once.
#[test]
fn hosted_dep_with_owner_dependencies_constructs_in_parent_context() {
    let upstream_runs = Arc::new(AtomicUsize::new(0));
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let upstream = registered_cloneable_dep("some_other_dep", upstream_runs.clone());
    let mut hosted = registered_hosted_dep("h_with_deps", 0, owner_runs.clone());
    hosted.dependencies = vec!["some_other_dep".to_string()];
    let consumer = registered_test("t1", vec!["h_with_deps".to_string()]);

    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[upstream, hosted],
        &[consumer],
        &[],
    );
    let shared = execution.collect_parent_shared_dependencies_sync();

    assert_eq!(shared.hosted_descriptor_bytes.len(), 1);
    assert_eq!(upstream_runs.load(Ordering::SeqCst), 1);
    assert_eq!(owner_runs.load(Ordering::SeqCst), 1);
}
1978
/// The async hosted-descriptor collection path must await an async owner
/// constructor exactly once, encode its value as the descriptor, and return
/// the owner value alongside it.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_descriptor_collection_awaits_async_constructor() {
    use std::pin::Pin;

    let owner_runs = Arc::new(AtomicUsize::new(0));
    let runs_for_ctor = owner_runs.clone();

    // Yields once before counting itself, so the future must really be driven.
    let build_owner = DependencyConstructor::Async(Arc::new(move |_view| {
        let runs = runs_for_ctor.clone();
        Box::pin(async move {
            tokio::task::yield_now().await;
            runs.fetch_add(1, Ordering::SeqCst);
            let produced: u64 = 42;
            Arc::new(produced) as Arc<dyn Any + Send + Sync>
        }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
    }));

    let descriptor_codec = CloneableCodec {
        to_wire: Arc::new(|any| {
            let typed: Arc<u64> = any.downcast::<u64>().unwrap();
            (*typed).to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes| {
            let owned_bytes: Vec<u8> = bytes.to_vec();
            Arc::new(owned_bytes) as Arc<dyn Any + Send + Sync>
        }),
    };

    let hosted = RegisteredDependency {
        name: "hosted_async".to_string(),
        crate_name: "tcrate".to_string(),
        module_path: String::new(),
        constructor: build_owner,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
            |wire_payload, _| {
                let raw_bytes: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
                let le_bytes: [u8; 8] = (*raw_bytes).as_slice().try_into().unwrap();
                let decoded: u64 = u64::from_le_bytes(le_bytes);
                Arc::new(decoded) as Arc<dyn Any + Send + Sync>
            },
        ))),
        cloneable_codec: None,
        hosted_codec: Some(descriptor_codec),
        rpc_factory: None,
        companions: Vec::new(),
    };
    let consumer = registered_test("t1", vec!["hosted_async".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let (descriptors, owners) =
        runtime.block_on(execution.collect_hosted_descriptor_bytes_async());

    assert_eq!(descriptors.len(), 1);
    assert_eq!(owners.len(), 1);
    let (dep_id, descriptor_bytes) = &descriptors[0];
    assert_eq!(*dep_id, "tcrate::hosted_async");
    assert_eq!(descriptor_bytes.as_slice(), &42_u64.to_le_bytes());
    assert_eq!(owner_runs.load(Ordering::SeqCst), 1);

    let held_owner: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
    assert_eq!(*held_owner, 42);
}
2049
2050 use crate::internal::{
2055 HostedRpcChannel, HostedRpcDep, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
2056 InProcessHostedRpcTransport, RpcFactory,
2057 };
2058
/// Test-only RPC owner: a counter whose `HostedRpcDep::dispatch`
/// implementation increments and reports `n` for method index 1.
struct RpcCounter {
    /// Current counter value; tests construct it starting at 0.
    n: u64,
}
2065
2066 impl HostedRpcDep for RpcCounter {
2067 type Stub = RpcCounterStub;
2068 fn dispatch(&mut self, method_idx: u32, args: &[u8]) -> Result<Vec<u8>, String> {
2069 match method_idx {
2070 1 => {
2071 self.n += 1;
2072 Ok(self.n.to_be_bytes().to_vec())
2073 }
2074 2 => {
2079 let arr: [u8; 4] = args
2080 .try_into()
2081 .map_err(|_| "method_idx=2 requires exactly 4 bytes (size)".to_string())?;
2082 let size = u32::from_be_bytes(arr) as usize;
2083 let mut out = vec![0u8; size];
2084 for (i, b) in out.iter_mut().enumerate() {
2085 *b = (i % 251) as u8;
2086 }
2087 Ok(out)
2088 }
2089 other => Err(format!("RpcCounter: unknown method_idx {other}")),
2090 }
2091 }
2092 fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2093 RpcCounterStub { channel }
2094 }
2095 }
2096
/// Client-side stub for `RpcCounter`; every call goes through `channel`.
struct RpcCounterStub {
    /// Channel used to issue RPC calls on behalf of this stub.
    channel: HostedRpcChannel,
}
2101
2102 impl RpcCounterStub {
2103 fn next(&self) -> u64 {
2104 let bytes = self.channel.call(1, Vec::new()).expect("rpc call");
2105 let arr: [u8; 8] = bytes.as_slice().try_into().unwrap();
2106 u64::from_be_bytes(arr)
2107 }
2108
2109 fn echo(&self, size: u32) -> Vec<u8> {
2111 self.channel
2112 .call(2, size.to_be_bytes().to_vec())
2113 .expect("echo rpc call")
2114 }
2115 }
2116
2117 fn registered_hosted_rpc_dep(
2122 name: &str,
2123 module_path: &str,
2124 owner_counter: Arc<AtomicUsize>,
2125 ) -> RegisteredDependency {
2126 let ctor_counter = owner_counter.clone();
2127 let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
2128 ctor_counter.fetch_add(1, Ordering::SeqCst);
2129 let cell = HostedRpcOwnerCell::from_owner(RpcCounter { n: 0 });
2130 Arc::new(cell) as Arc<dyn Any + Send + Sync>
2131 }));
2132 let factory = RpcFactory {
2133 owner_into_cell: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
2134 any.downcast::<HostedRpcOwnerCell>()
2135 .expect("HostedRpc owner downcast")
2136 }),
2137 build_stub: Arc::new(|channel: HostedRpcChannel| {
2138 let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2139 Arc::new(stub) as Arc<dyn Any + Send + Sync>
2140 }),
2141 };
2142 RegisteredDependency {
2143 name: name.to_string(),
2144 crate_name: "tcrate".to_string(),
2145 module_path: module_path.to_string(),
2146 constructor,
2147 dependencies: Vec::new(),
2148 scope: DepScope::HostedRpc,
2149 worker_fn: None,
2150 cloneable_codec: None,
2151 hosted_codec: None,
2152 rpc_factory: Some(factory),
2153 companions: Vec::new(),
2154 }
2155 }
2156
/// Collecting hosted-RPC owner cells yields one cell keyed by the qualified
/// id and runs the owner constructor once per collection call; a second call
/// therefore brings the constructor count to two.
#[test]
fn hosted_rpc_owner_cells_collected_once_and_keyed_by_qualified_id() {
    let owner_runs = Arc::new(AtomicUsize::new(0));
    let deps = [registered_hosted_rpc_dep("rpc_dep", "", owner_runs.clone())];
    let tests = [registered_test("t1", vec!["rpc_dep".to_string()])];

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    assert!(execution.has_hosted_rpc_dependencies());

    // First collection pass.
    let first_pass = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(first_pass.len(), 1, "exactly one hosted rpc dep expected");
    assert_eq!(first_pass[0].0, "tcrate::rpc_dep");
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must run exactly once on the parent"
    );

    // Second collection pass: the constructor runs again.
    let second_pass = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(second_pass.len(), 1);
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        2,
        "collect_hosted_rpc_owner_cells_sync runs the constructor on every call; \
         callers (the runner) are responsible for only calling it once per suite"
    );
}
2191
/// A hosted-RPC dependency that declares another dependency: collecting owner
/// cells must yield one cell and run both the declared dependency's
/// constructor and the owner constructor exactly once.
#[test]
fn hosted_rpc_owner_dependencies_construct_in_parent_context() {
    let upstream_runs = Arc::new(AtomicUsize::new(0));
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let upstream = registered_cloneable_dep("parent_only_dep", upstream_runs.clone());
    let mut rpc_owner = registered_hosted_rpc_dep("rpc_dep", "", owner_runs.clone());
    rpc_owner.dependencies = vec!["parent_only_dep".to_string()];
    let consumer = registered_test("t1", vec!["rpc_dep".to_string()]);

    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[upstream, rpc_owner],
        &[consumer],
        &[],
    );

    let owner_cells = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(owner_cells.len(), 1);
    assert_eq!(upstream_runs.load(Ordering::SeqCst), 1);
    assert_eq!(owner_runs.load(Ordering::SeqCst), 1);
}
2214
/// Calls made through a stub backed by the in-process transport must reach
/// the collected owner cell: three successive `next()` calls observe the
/// counter values 1, 2 and 3.
#[test]
fn hosted_rpc_in_process_transport_routes_to_owner_cell() {
    // The counter is never inspected in this test, so it is handed over
    // without an extra clone.
    let counter = Arc::new(AtomicUsize::new(0));
    let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();

    // `cells` and `transport` are each consumed exactly once, so they are
    // moved rather than cloned.
    let transport: Arc<dyn HostedRpcTransport> = Arc::new(InProcessHostedRpcTransport::new(cells));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);
    let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);

    assert_eq!(stub.next(), 1);
    assert_eq!(stub.next(), 2);
    assert_eq!(stub.next(), 3);
}
2238
/// Calling a method index the owner does not know must surface as
/// `HostedRpcError::Dispatch` carrying the owner's error text, not as a
/// transport failure.
#[test]
fn hosted_rpc_in_process_transport_returns_dispatch_error_on_unknown_method() {
    let counter = Arc::new(AtomicUsize::new(0));
    let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    // `cells` and `transport` are each consumed exactly once, so they are
    // moved rather than cloned.
    let transport: Arc<dyn HostedRpcTransport> = Arc::new(InProcessHostedRpcTransport::new(cells));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);

    // 999 is not a method index `RpcCounter::dispatch` handles.
    let err = channel.call(999, Vec::new()).unwrap_err();
    match err {
        HostedRpcError::Dispatch(msg) => {
            assert!(
                msg.contains("unknown method_idx 999"),
                "expected dispatch error to mention method_idx, got '{msg}'"
            );
        }
        HostedRpcError::Transport(msg) => {
            panic!("expected Dispatch error, got Transport({msg})");
        }
    }
}
2271
/// A call addressed to a dependency id that the in-process transport has no
/// owner cell for (the cell map is empty) must come back as
/// `HostedRpcError::Transport` whose message contains
/// `unknown dep id 'tcrate::missing_dep'`; a `Dispatch` error fails the test.
#[test]
fn hosted_rpc_in_process_transport_returns_transport_error_on_unknown_dep_id() {
    let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = HashMap::new();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(cells));
    // The transport handle is only needed by this one channel: move it.
    let channel = HostedRpcChannel::new("tcrate::missing_dep".to_string(), transport);

    let err = channel.call(1, Vec::new()).unwrap_err();
    match err {
        HostedRpcError::Transport(msg) => {
            assert!(
                msg.contains("unknown dep id 'tcrate::missing_dep'"),
                "expected transport error to mention dep id, got '{msg}'"
            );
        }
        HostedRpcError::Dispatch(msg) => {
            panic!("expected Transport error, got Dispatch({msg})");
        }
    }
}
2291
2292 struct PanickingRpcOwner;
2302
2303 impl HostedRpcDep for PanickingRpcOwner {
2304 type Stub = RpcCounterStub;
2305 fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2306 panic!("owner_panic_for_test");
2307 }
2308 fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2309 RpcCounterStub { channel }
2310 }
2311 }
2312
/// Async hosted-RPC owner holding a single `u64` counter.
#[cfg(feature = "tokio")]
struct AsyncRpcCounter {
    /// Current counter value; incremented by method index 1.
    n: u64,
}

#[cfg(feature = "tokio")]
impl crate::internal::AsyncHostedRpcDep for AsyncRpcCounter {
    type Stub = RpcCounterStub;

    /// Yields to the runtime once, then handles the request: method index 1
    /// increments the counter and replies with its big-endian bytes; every
    /// other index is rejected with an error string.
    async fn dispatch(&mut self, method_idx: u32, _payload: &[u8]) -> Result<Vec<u8>, String> {
        ::tokio::task::yield_now().await;
        match method_idx {
            1 => {
                self.n += 1;
                Ok(self.n.to_be_bytes().to_vec())
            }
            _ => Err(format!("AsyncRpcCounter: unknown method_idx {method_idx}")),
        }
    }

    /// Wraps `channel` in an `RpcCounterStub`.
    fn build_stub(channel: HostedRpcChannel) -> RpcCounterStub {
        RpcCounterStub { channel }
    }
}
2340
/// Async hosted-RPC owner whose `dispatch` always panics after its first
/// await point.
#[cfg(feature = "tokio")]
struct PanickingAsyncRpcOwner;

#[cfg(feature = "tokio")]
impl crate::internal::AsyncHostedRpcDep for PanickingAsyncRpcOwner {
    type Stub = RpcCounterStub;

    /// Yields to the runtime once and then panics with
    /// `async_owner_panic_for_test`.
    async fn dispatch(&mut self, _method: u32, _payload: &[u8]) -> Result<Vec<u8>, String> {
        ::tokio::task::yield_now().await;
        panic!("async_owner_panic_for_test");
    }

    /// Wraps `channel` in an `RpcCounterStub`.
    fn build_stub(channel: HostedRpcChannel) -> RpcCounterStub {
        RpcCounterStub { channel }
    }
}
2358
/// Drives an async owner cell wrapping `AsyncRpcCounter` from a multi-thread
/// tokio runtime: two `dispatch_async(1, ..)` calls in a row must reply with
/// the big-endian bytes of 1 and then 2.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_owner_dispatches_through_async_cell() {
    let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
    let runtime = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // (expected counter value, message used if the dispatch fails)
    let rounds = [
        (1u64, "first async dispatch must succeed"),
        (2u64, "second async dispatch must succeed"),
    ];
    for (expected, failure_msg) in rounds {
        let reply = runtime
            .block_on(cell.dispatch_async(1, &[]))
            .expect(failure_msg);
        assert_eq!(reply, expected.to_be_bytes().to_vec());
    }
}
2381
/// An async owner that panics inside `dispatch` must report that panic as an
/// `Err` on the first `dispatch_async` call, and every later call must get the
/// fixed `hosted rpc owner poisoned` error.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_owner_panic_surfaces_then_poisons() {
    let cell = HostedRpcOwnerCell::from_async_owner(PanickingAsyncRpcOwner);
    let runtime = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // First call: the panic payload is wrapped into the error string.
    let first_outcome = runtime.block_on(cell.dispatch_async(1, &[]));
    let err1 = first_outcome.expect_err("first async dispatch must surface the panic as Err");
    assert!(
        err1.contains("hosted rpc owner panicked: async_owner_panic_for_test"),
        "expected first-call error to wrap the async panic payload, got '{err1}'"
    );

    // Second call: the cell reports the poisoned state.
    let second_outcome = runtime.block_on(cell.dispatch_async(1, &[]));
    let err2 = second_outcome
        .expect_err("second async dispatch must short-circuit on the poisoned cell");
    assert_eq!(
        err2, "hosted rpc owner poisoned",
        "expected poisoned-cell error on the second async call, got '{err2}'"
    );
}
2411
/// Races two dispatches against one async owner cell.
///
/// The first dispatch sleeps 50 ms inside the owner and then panics. The
/// second is spawned 5 ms after the first, so it is intended to be waiting on
/// the cell when that panic happens. The first caller must see the panic as an
/// `Err`, the second must see `hosted rpc owner poisoned`, and the owner's
/// `dispatch` must have been entered exactly once.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_owner_poison_blocks_concurrent_waiter() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // Owner that counts entries into `dispatch`: the first entry panics after
    // a delay, any further entry panics immediately with a distinct message.
    struct OnePanicThenForbidden {
        entries: Arc<AtomicUsize>,
    }

    impl crate::internal::AsyncHostedRpcDep for OnePanicThenForbidden {
        type Stub = RpcCounterStub;

        async fn dispatch(&mut self, _method: u32, _payload: &[u8]) -> Result<Vec<u8>, String> {
            let earlier_entries = self.entries.fetch_add(1, Ordering::SeqCst);
            if earlier_entries != 0 {
                panic!("second_dispatch_unexpectedly_re_entered_after_poison");
            }
            ::tokio::time::sleep(Duration::from_millis(50)).await;
            panic!("first_dispatch_panic_poison_race");
        }

        fn build_stub(channel: HostedRpcChannel) -> RpcCounterStub {
            RpcCounterStub { channel }
        }
    }

    let entries = Arc::new(AtomicUsize::new(0));
    let owner = OnePanicThenForbidden {
        entries: Arc::clone(&entries),
    };
    let cell = Arc::new(HostedRpcOwnerCell::from_async_owner(owner));

    let runtime = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    runtime.block_on(async {
        let leader_cell = Arc::clone(&cell);
        let waiter_cell = Arc::clone(&cell);

        let leader = ::tokio::spawn(async move { leader_cell.dispatch_async(1, &[]).await });

        // Give the leader a head start before the waiter is spawned.
        ::tokio::time::sleep(Duration::from_millis(5)).await;

        let waiter = ::tokio::spawn(async move { waiter_cell.dispatch_async(1, &[]).await });

        let leader_outcome = leader.await.expect("first task must not be cancelled");
        let waiter_outcome = waiter.await.expect("second task must not be cancelled");

        let first_err =
            leader_outcome.expect_err("first dispatch must surface the panic as Err, not Ok");
        assert!(
            first_err.contains("hosted rpc owner panicked: first_dispatch_panic_poison_race"),
            "expected the first call to surface the panic; got '{first_err}'"
        );

        let second_err = waiter_outcome
            .expect_err("second dispatch must short-circuit on the poisoned cell, not Ok");
        assert_eq!(
            second_err, "hosted rpc owner poisoned",
            "expected the second waiter to see the poison flag; got '{second_err}'"
        );
    });

    let dispatch_entries = entries.load(Ordering::SeqCst);
    assert_eq!(
        dispatch_entries, 1,
        "owner dispatcher must run at most once across the poisoned pair"
    );
}
2507
/// Calls `dispatch_blocking` on an async owner cell from a `spawn_blocking`
/// task of a multi-thread tokio runtime and expects the big-endian bytes of 1
/// as the reply.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_dispatch_blocking_drives_async_cell() {
    let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
    let runtime = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // The blocking task has to be spawned from inside the runtime context.
    let join_outcome = runtime.block_on(async move {
        let handle = ::tokio::task::spawn_blocking(move || cell.dispatch_blocking(1, &[]));
        handle.await
    });
    let dispatch_outcome = join_outcome.expect("spawn_blocking joined");
    let bytes = dispatch_outcome
        .expect("dispatch_blocking must succeed against an async cell on multi-thread rt");

    assert_eq!(bytes, 1u64.to_be_bytes().to_vec());
}
2527
/// Calling `dispatch_blocking` on an async owner cell from inside a
/// current-thread tokio runtime must return an `Err` whose text mentions
/// `multi-threaded`.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_dispatch_blocking_rejects_current_thread_runtime() {
    let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
    let runtime = ::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build current-thread tokio runtime");

    // The call is made directly on the runtime's only thread.
    let outcome = runtime.block_on(async { cell.dispatch_blocking(1, &[]) });
    let err = outcome.expect_err("dispatch_blocking must reject current-thread runtimes cleanly");

    assert!(
        err.contains("multi-threaded"),
        "expected the rejection error to mention multi-threaded requirement, got '{err}'"
    );
}
2550
/// Routes two calls through `InProcessHostedRpcTransport` to an async owner
/// cell registered under `in_process_async_owner`.
///
/// Each call is issued from a `spawn_blocking` task on a multi-thread tokio
/// runtime; the replies must be the big-endian bytes of 1 and then 2.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_in_process_transport_routes_to_async_cell() {
    use std::collections::HashMap;
    use std::sync::Arc;

    let dep_id = "in_process_async_owner".to_string();
    let owner_cell = Arc::new(HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter {
        n: 0,
    }));
    let cells = HashMap::from([(dep_id.clone(), owner_cell)]);

    let transport: Arc<dyn crate::internal::HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(cells));

    let runtime = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // One round trip: call method 1 on the dependency from the blocking pool
    // and hand back whatever the transport returned.
    let call_from_blocking_pool = || {
        let transport = transport.clone();
        let dep_id = dep_id.clone();
        runtime.block_on(async move {
            ::tokio::task::spawn_blocking(move || transport.call(&dep_id, 1, vec![]))
                .await
                .expect("spawn_blocking joined")
        })
    };

    let bytes = call_from_blocking_pool().expect("first in-process dispatch must succeed");
    assert_eq!(bytes, 1u64.to_be_bytes().to_vec());

    let bytes2 = call_from_blocking_pool().expect("second in-process dispatch must succeed");
    assert_eq!(bytes2, 2u64.to_be_bytes().to_vec());
}
2602
/// A synchronous owner that panics inside `dispatch` must report that panic as
/// an `Err` on the first call, and every later call must get the fixed
/// `hosted rpc owner poisoned` error.
#[test]
fn hosted_rpc_owner_panic_surfaces_then_poisons() {
    let cell = HostedRpcOwnerCell::from_owner(PanickingRpcOwner);

    // First call: the panic payload is wrapped into the error string.
    let first_outcome = cell.dispatch(1, &[]);
    let err1 = first_outcome.expect_err("first call must surface the panic as Err");
    assert!(
        err1.contains("hosted rpc owner panicked: owner_panic_for_test"),
        "expected first-call error to wrap the panic payload, got '{err1}'"
    );

    // Second call: the cell reports the poisoned state.
    let second_outcome = cell.dispatch(1, &[]);
    let err2 = second_outcome.expect_err("second call must short-circuit on the poisoned cell");
    assert_eq!(
        err2, "hosted rpc owner poisoned",
        "expected poisoned-cell error on the second call, got '{err2}'"
    );
}
2634
/// Requests a 256 KiB reply through the in-process transport and checks that
/// it arrives complete and byte-for-byte intact.
///
/// Byte `i` of the reply is expected to equal `i % 251`.
/// NOTE(review): that pattern is presumably what the owner's `echo` method
/// generates — confirm against `RpcCounter`.
#[test]
fn hosted_rpc_in_process_transport_round_trips_large_payload_exceeding_64_kib() {
    const SIZE: u32 = 256 * 1024;

    let counter = Arc::new(AtomicUsize::new(0));
    let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells));
    let stub = <RpcCounter as HostedRpcDep>::build_stub(HostedRpcChannel::new(
        "tcrate::rpc_dep".to_string(),
        transport,
    ));

    let bytes = stub.echo(SIZE);

    // Length first: a short reply means bytes were dropped or truncated.
    assert_eq!(
        bytes.len(),
        SIZE as usize,
        "framing dropped/truncated bytes"
    );
    // Then the content, position by position.
    for (i, b) in bytes.iter().enumerate() {
        let expected = (i % 251) as u8;
        assert_eq!(
            *b, expected,
            "framing corrupted byte at index {i}: expected {}, got {b}",
            expected
        );
    }
}
2674
/// Hammers one owner cell from `N` OS threads, each issuing `M` `next()` calls
/// through its own channel over a shared in-process transport.
///
/// After all threads are joined, the sorted ids must be strictly increasing
/// starting above 0 (no duplicates) and there must be exactly `N * M` of them.
#[test]
fn hosted_rpc_in_process_transport_multiplexes_concurrent_calls_from_threads() {
    use std::thread;

    const N: usize = 4;
    const M: usize = 32;

    let counter = Arc::new(AtomicUsize::new(0));
    let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells));

    // Spawn every worker before joining any of them, so the calls overlap.
    let workers: Vec<_> = (0..N)
        .map(|_| {
            let dep_id = "tcrate::rpc_dep".to_string();
            let transport = transport.clone();
            thread::spawn(move || {
                let stub = <RpcCounter as HostedRpcDep>::build_stub(HostedRpcChannel::new(
                    dep_id, transport,
                ));
                (0..M).map(|_| stub.next()).collect::<Vec<_>>()
            })
        })
        .collect();

    let mut all = Vec::with_capacity(N * M);
    for worker in workers {
        all.extend(worker.join().expect("thread panicked"));
    }
    all.sort();

    // Walk the sorted ids, carrying the previous one; each must exceed it.
    let _last_id = all.iter().fold(0u64, |prev, &id| {
        assert!(
            id > prev,
            "duplicate or non-monotonic id {id} after {prev}"
        );
        id
    });

    assert_eq!(
        all.len(),
        N * M,
        "expected exactly {} ids in total, got {}",
        N * M,
        all.len()
    );
}
2735}