1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12 apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13 DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14 RegisteredTestSuiteProperty,
15};
16
/// A dependency's qualified id paired with the bytes its codec produced for it.
pub type DepWireBytes = (String, Vec<u8>);

/// Type-erased, shareable value; the collectors store the constructed value of
/// each `Hosted`-scope dependency under this type.
pub type HostedOwner = Arc<dyn Any + Send + Sync>;

/// Boxed future returned by the recursive async collection helper. It resolves
/// to the name → value map of the dependencies visible at the visited node.
#[cfg(feature = "tokio")]
type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
>;

/// Test-only pair of hosted descriptor bytes and the owner values they were
/// produced from.
#[cfg(test)]
pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
/// Everything gathered by one pass of the `collect_parent_shared_dependencies_*`
/// methods over the suite tree.
pub struct ParentSharedDependencies {
    /// `(qualified id, bytes)` for every `Cloneable` dependency, encoded with its
    /// `cloneable_codec`.
    pub cloneable_wire_bytes: Vec<DepWireBytes>,
    /// `(qualified id, bytes)` for every `Hosted` dependency, encoded with its
    /// `hosted_codec`.
    pub hosted_descriptor_bytes: Vec<DepWireBytes>,
    /// The constructed values of the `Hosted` dependencies, in the same order as
    /// `hosted_descriptor_bytes`.
    pub hosted_owners: Vec<HostedOwner>,
    /// `(qualified id, cell)` for every `HostedRpc` dependency, produced by its
    /// `rpc_factory`.
    pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
}
46
47impl ParentSharedDependencies {
48 fn new() -> Self {
49 Self {
50 cloneable_wire_bytes: Vec::new(),
51 hosted_descriptor_bytes: Vec::new(),
52 hosted_owners: Vec::new(),
53 hosted_rpc_owner_cells: Vec::new(),
54 }
55 }
56}
57
/// One node of the suite tree. Each node corresponds to a crate/module path and
/// owns the tests, dependencies and properties registered at exactly that path,
/// plus child nodes for deeper paths.
pub(crate) struct TestSuiteExecution {
    /// Crate/module path of this node; empty for the root.
    crate_and_module: String,
    /// Dependencies registered at this node's path.
    dependencies: Vec<RegisteredDependency>,
    /// Tests registered at this node's path; they are handed out with `Vec::pop`.
    tests: Vec<RegisteredTest>,
    /// Suite properties registered at this node's path.
    props: Vec<RegisteredTestSuiteProperty>,
    /// Child nodes for deeper module paths.
    inner: Vec<TestSuiteExecution>,
    /// Already constructed (or externally provided) dependency values, keyed by
    /// the dependency's local name.
    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
    /// Lock taken while a test of a sequential node runs; sequential parents
    /// share their lock with all descendants (see `propagate_sequential`).
    sequential_lock: SequentialExecutionLock,
    /// Number of tests in this subtree that have not been picked yet.
    remaining_count: usize,
    /// Index assigned to the next test returned by `pick_next`/`pick_next_sync`.
    idx: usize,
    /// Whether tests of this node must hold `sequential_lock` while running.
    is_sequential: bool,
    /// When set, the node always reports itself as materialized, so no
    /// dependency constructor is run for it.
    skip_creating_dependencies: bool,
    /// Count of tests handed out from this node whose `TestExecution` has not
    /// been dropped yet.
    in_progress: Arc<AtomicUsize>,
}
72
73impl TestSuiteExecution {
74 pub fn construct(
75 arguments: &Arguments,
76 dependencies: &[RegisteredDependency],
77 tests: &[RegisteredTest],
78 props: &[RegisteredTestSuiteProperty],
79 ) -> (Self, Vec<RegisteredTest>) {
80 let tests_with_props = apply_suite_props_to_tests(tests, props);
81 let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
82 Self::shuffle(arguments, &mut filtered_tests);
83 filtered_tests.reverse();
84
85 if filtered_tests.is_empty() {
86 (
87 Self::root(
88 dependencies
89 .iter()
90 .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
91 .cloned()
92 .collect::<Vec<_>>(),
93 Vec::new(),
94 props
95 .iter()
96 .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
97 .cloned()
98 .collect::<Vec<_>>(),
99 ),
100 Vec::new(),
101 )
102 } else {
103 let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
104
105 for prop in props {
106 root.add_prop(prop.clone());
107 }
108
109 for dep in dependencies {
110 root.add_dependency(dep.clone());
111 }
112
113 for test in filtered_tests.clone() {
114 root.add_test(test.clone());
115 }
116
117 root.propagate_sequential(None);
118 root.prune_unused_deps();
119
120 (root, filtered_tests)
121 }
122 }
123
124 fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
125 if let Some(seed) = arguments.shuffle_seed {
126 let mut rng = StdRng::seed_from_u64(seed);
127 tests.shuffle(&mut rng);
128 }
129 }
130
131 pub fn skip_creating_dependencies(&mut self) {
134 self.skip_creating_dependencies = true;
135 for inner in &mut self.inner {
136 inner.skip_creating_dependencies();
137 }
138 }
139
    /// Number of tests in this subtree that have not been picked yet.
    pub fn remaining(&self) -> usize {
        self.remaining_count
    }
143
    /// `true` when this node holds no tests of its own and has no child nodes.
    pub fn is_empty(&self) -> bool {
        self.tests.is_empty() && self.inner.is_empty()
    }
147
    /// `true` once every test counted in this subtree has been picked.
    pub fn is_done(&self) -> bool {
        self.remaining_count == 0
    }
151
152 #[allow(dead_code)]
154 pub fn has_dependencies(&self) -> bool {
155 !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
156 }
157
158 pub fn has_shared_dependencies(&self) -> bool {
162 self.dependencies
163 .iter()
164 .any(|d| d.scope == DepScope::Shared)
165 || self
166 .inner
167 .iter()
168 .any(|inner| inner.has_shared_dependencies())
169 }
170
171 #[allow(dead_code)]
173 pub fn has_cloneable_dependencies(&self) -> bool {
174 self.dependencies
175 .iter()
176 .any(|d| d.scope == DepScope::Cloneable)
177 || self
178 .inner
179 .iter()
180 .any(|inner| inner.has_cloneable_dependencies())
181 }
182
183 #[allow(dead_code)]
187 pub fn has_hosted_dependencies(&self) -> bool {
188 self.dependencies
189 .iter()
190 .any(|d| d.scope == DepScope::Hosted)
191 || self
192 .inner
193 .iter()
194 .any(|inner| inner.has_hosted_dependencies())
195 }
196
197 #[allow(dead_code)]
201 pub fn has_hosted_rpc_dependencies(&self) -> bool {
202 self.dependencies
203 .iter()
204 .any(|d| d.scope == DepScope::HostedRpc)
205 || self
206 .inner
207 .iter()
208 .any(|inner| inner.has_hosted_rpc_dependencies())
209 }
210
211 #[allow(dead_code)]
213 pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
214 let mut out = Vec::new();
215 self.collect_cloneable_dependencies_into(&mut out);
216 out
217 }
218
219 #[allow(dead_code)]
220 fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
221 for dep in &self.dependencies {
222 if dep.scope == DepScope::Cloneable {
223 out.push(dep.clone());
224 }
225 }
226 for inner in &self.inner {
227 inner.collect_cloneable_dependencies_into(out);
228 }
229 }
230
231 pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
238 let mut out = ParentSharedDependencies::new();
239 let parent_map = HashMap::new();
240 self.collect_parent_shared_dependencies_into_sync(&parent_map, &mut out);
241 out
242 }
243
244 fn collect_parent_shared_dependencies_into_sync(
245 &self,
246 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
247 out: &mut ParentSharedDependencies,
248 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
249 let mut dependency_map = parent_map.clone();
250 let sorted_dependencies = self.sorted_dependencies();
251
252 for dep in sorted_dependencies {
253 if dependency_map.contains_key(&dep.name) {
254 continue;
255 }
256
257 let value = Self::construct_dependency_sync(dep, &dependency_map);
258 match dep.scope {
259 DepScope::Cloneable => {
260 let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
261 panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
262 });
263 out.cloneable_wire_bytes
264 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
265 }
266 DepScope::Hosted => {
267 let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
268 panic!("Hosted dep '{}' missing hosted codec", dep.name)
269 });
270 out.hosted_descriptor_bytes
271 .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
272 out.hosted_owners.push(value.clone());
273 }
274 DepScope::HostedRpc => {
275 let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
276 panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
277 });
278 let cell = (factory.owner_into_cell)(value.clone());
279 out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
280 }
281 DepScope::Shared | DepScope::PerWorker => {}
282 }
283
284 dependency_map.insert(dep.name.clone(), value);
285 }
286
287 for inner in &self.inner {
288 inner.collect_parent_shared_dependencies_into_sync(&dependency_map, out);
289 }
290
291 dependency_map
292 }
293
294 fn construct_dependency_sync(
295 dep: &RegisteredDependency,
296 dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
297 ) -> Arc<dyn Any + Send + Sync> {
298 match &dep.constructor {
299 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
300 DependencyConstructor::Async(cons) => {
301 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
302 }
303 }
304 }
305
306 #[cfg(test)]
311 pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
312 self.collect_parent_shared_dependencies_sync()
313 .cloneable_wire_bytes
314 }
315
316 #[cfg(test)]
324 pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
325 let collected = self.collect_parent_shared_dependencies_sync();
326 (collected.hosted_descriptor_bytes, collected.hosted_owners)
327 }
328
329 #[cfg(test)]
334 pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
335 self.collect_parent_shared_dependencies_sync()
336 .hosted_rpc_owner_cells
337 }
338
339 #[cfg(feature = "tokio")]
343 pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
344 let mut out = ParentSharedDependencies::new();
345 let parent_map = HashMap::new();
346 self.collect_parent_shared_dependencies_into_async(&parent_map, &mut out)
347 .await;
348 out
349 }
350
    /// Recursive worker behind `collect_parent_shared_dependencies_async`.
    ///
    /// Starting from a copy of `parent_map`, constructs every local dependency
    /// (in the order given by `sorted_dependencies`) whose name is not already
    /// present, records scope-specific artefacts in `out`, then visits each child
    /// with the extended map. The future resolves to that extended map.
    ///
    /// The future is boxed because the function calls itself recursively.
    ///
    /// # Panics
    ///
    /// Panics when a `Cloneable`, `Hosted` or `HostedRpc` dependency lacks the
    /// codec/factory its scope requires.
    #[cfg(feature = "tokio")]
    fn collect_parent_shared_dependencies_into_async<'a>(
        &'a self,
        parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
        out: &'a mut ParentSharedDependencies,
    ) -> ParentSharedDependenciesFuture<'a> {
        Box::pin(async move {
            let mut dependency_map = parent_map.clone();
            let sorted_dependencies = self.sorted_dependencies();

            for dep in sorted_dependencies {
                // A value with the same local name is already visible; skip it.
                if dependency_map.contains_key(&dep.name) {
                    continue;
                }

                // Each constructor receives a snapshot of what is visible so far.
                let value = match &dep.constructor {
                    DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
                    DependencyConstructor::Async(cons) => {
                        cons(Arc::new(dependency_map.clone())).await
                    }
                };
                match dep.scope {
                    DepScope::Cloneable => {
                        let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
                            panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
                        });
                        out.cloneable_wire_bytes
                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
                    }
                    DepScope::Hosted => {
                        let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
                            panic!("Hosted dep '{}' missing hosted codec", dep.name)
                        });
                        out.hosted_descriptor_bytes
                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
                        // Keep the constructed value alongside its descriptor.
                        out.hosted_owners.push(value.clone());
                    }
                    DepScope::HostedRpc => {
                        let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
                            panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
                        });
                        let cell = (factory.owner_into_cell)(value.clone());
                        out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
                    }
                    // Nothing is recorded for these scopes; the value is still
                    // inserted into the map below.
                    DepScope::Shared | DepScope::PerWorker => {}
                }

                dependency_map.insert(dep.name.clone(), value);
            }

            // The map returned by each child is not used by the parent.
            for inner in &self.inner {
                inner
                    .collect_parent_shared_dependencies_into_async(&dependency_map, out)
                    .await;
            }
            dependency_map
        })
    }
409
410 #[cfg(feature = "tokio")]
412 #[cfg(test)]
413 pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
414 let collected = self.collect_parent_shared_dependencies_async().await;
415 (collected.hosted_descriptor_bytes, collected.hosted_owners)
416 }
417
418 #[cfg(feature = "tokio")]
428 #[cfg(test)]
429 pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
430 self.collect_parent_shared_dependencies_async()
431 .await
432 .cloneable_wire_bytes
433 }
434
435 pub fn provide_cloneable_value(
444 &mut self,
445 dep_id: &str,
446 value: Arc<dyn Any + Send + Sync>,
447 ) -> bool {
448 let applied = self.provide_cloneable_value_internal(dep_id, value);
449 if applied {
450 self.prune_unused_deps();
451 }
452 applied
453 }
454
455 fn provide_cloneable_value_internal(
456 &mut self,
457 dep_id: &str,
458 value: Arc<dyn Any + Send + Sync>,
459 ) -> bool {
460 let mut applied = false;
461 if let Some((local_name, dep_idx)) = self
462 .dependencies
463 .iter()
464 .enumerate()
465 .find(|(_, d)| d.qualified_id() == dep_id)
466 .map(|(idx, d)| (d.name.clone(), idx))
467 {
468 self.dependencies[dep_idx].dependencies.clear();
473 self.materialized_dependencies
474 .insert(local_name, value.clone());
475 applied = true;
476 }
477 for inner in &mut self.inner {
478 applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
479 }
480 applied
481 }
482
483 pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
486 self.tests.iter().any(|test| {
487 test.props
488 .capture_control
489 .requires_capturing(capture_by_default)
490 }) || self
491 .inner
492 .iter()
493 .any(|inner| inner.requires_capturing(capture_by_default))
494 }
495
496 #[cfg(feature = "tokio")]
497 pub async fn pick_next(&mut self) -> Option<TestExecution> {
498 if self.is_empty() {
499 None
500 } else {
501 match self
502 .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
503 .await
504 {
505 Some((test, deps, seq_lock, in_progress_counter)) => {
506 let index = self.idx;
507 self.idx += 1;
508 Some(TestExecution {
509 test: test.clone(),
510 deps: Arc::new(deps),
511 index,
512 _seq_lock: seq_lock,
513 in_progress_counter,
514 })
515 }
516 None => None,
517 }
518 }
519 }
520
521 pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
522 match self.pick_next_internal_sync(&HashMap::new()) {
523 Some((test, deps, seq_lock, in_progress_counter)) => {
524 let index = self.idx;
525 self.idx += 1;
526 Some(TestExecution {
527 test: test.clone(),
528 deps: Arc::new(deps),
529 index,
530 _seq_lock: seq_lock,
531 in_progress_counter,
532 })
533 }
534 None => None,
535 }
536 }
537
    /// Recursive worker behind `pick_next`.
    ///
    /// Materializes this node's dependencies on first use, then either hands out
    /// one of this node's own tests or, when it has none or its sequential lock
    /// is currently held, asks the children in order. On success returns the
    /// test, the dependency map visible to it, the sequential guard (possibly
    /// `None`) and the in-progress counter of the node the test came from.
    #[cfg(feature = "tokio")]
    #[allow(clippy::type_complexity)]
    async fn pick_next_internal(
        &mut self,
        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
    ) -> Option<(
        RegisteredTest,
        HashMap<String, Arc<dyn Any + Send + Sync>>,
        SequentialExecutionLockGuard,
        Arc<AtomicUsize>,
    )> {
        if self.is_empty() {
            None
        } else {
            let dependency_map = if !self.is_materialized() {
                self.materialize_deps(materialized_parent_deps).await
            } else {
                self.create_dependency_map(materialized_parent_deps)
            };

            // Probe the lock without waiting for it.
            let locked = self.sequential_lock.is_locked().await;
            let result = if self.tests.is_empty() || locked {
                let current = self.inner.iter_mut();
                let mut result = None;
                for inner in current {
                    // Boxed because the async fn recurses into itself.
                    if let Some((test, deps, seq_lock, in_progress_counter)) =
                        Box::pin(inner.pick_next_internal(&dependency_map)).await
                    {
                        result = Some((test, deps, seq_lock, in_progress_counter));
                        break;
                    }
                }
                // Remove children that have nothing left to offer.
                self.inner.retain(|inner| !inner.is_empty());

                result
            } else {
                // `lock` only acquires the mutex when `is_sequential` is true.
                let guard = self.sequential_lock.lock(self.is_sequential).await;
                // Matched by the decrement in `TestExecution::drop`.
                self.in_progress.fetch_add(1, Ordering::Release);
                self.tests
                    .pop()
                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
            };
            // Release the materialized values only when nothing was picked, the
            // node has no tests or children left, it was not locked, and none of
            // its own tests is still in progress.
            if result.is_none()
                && self.is_empty()
                && self.is_materialized()
                && !locked
                && self.in_progress.load(Ordering::Acquire) == 0
            {
                self.drop_deps();
            }
            if result.is_some() {
                self.remaining_count -= 1;
            }
            result
        }
    }
594
595 #[allow(clippy::type_complexity)]
596 fn pick_next_internal_sync(
597 &mut self,
598 materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
599 ) -> Option<(
600 RegisteredTest,
601 HashMap<String, Arc<dyn Any + Send + Sync>>,
602 SequentialExecutionLockGuard,
603 Arc<AtomicUsize>,
604 )> {
605 if self.is_empty() {
606 None
607 } else {
608 let dependency_map = if !self.is_materialized() {
609 self.materialize_deps_sync(materialized_parent_deps)
610 } else {
611 self.create_dependency_map(materialized_parent_deps)
612 };
613
614 let locked = self.sequential_lock.is_locked_sync();
615 let result = if self.tests.is_empty() || locked {
616 let current = self.inner.iter_mut();
617 let mut result = None;
618 for inner in current {
619 if let Some((test, deps, seq_lock, in_progress_counter)) =
620 inner.pick_next_internal_sync(&dependency_map)
621 {
622 result = Some((test, deps, seq_lock, in_progress_counter));
623 break;
624 }
625 }
626
627 self.inner.retain(|inner| !inner.is_empty());
628 result
629 } else {
630 let guard = self.sequential_lock.lock_sync(self.is_sequential);
631 self.in_progress.fetch_add(1, Ordering::Release);
632 self.tests
633 .pop()
634 .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
635 };
636 if result.is_none()
637 && self.is_materialized()
638 && !locked
639 && self.in_progress.load(Ordering::Acquire) == 0
640 {
641 self.drop_deps();
642 }
643 if result.is_some() {
644 self.remaining_count -= 1;
645 }
646 result
647 }
648 }
649
650 fn create_dependency_map(
651 &self,
652 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
653 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
654 let mut result = parent_map.clone();
655 for (key, dep) in &self.materialized_dependencies {
656 result.insert(key.clone(), dep.clone());
657 }
658 result
659 }
660
661 fn root(
662 deps: Vec<RegisteredDependency>,
663 tests: Vec<RegisteredTest>,
664 props: Vec<RegisteredTestSuiteProperty>,
665 ) -> Self {
666 let total_count = tests.len();
667 let is_sequential = props
668 .iter()
669 .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
670 || tests.iter().any(|test| test.run.is_bench());
671 Self {
672 crate_and_module: String::new(),
673 dependencies: deps,
674 tests,
675 props,
676 inner: Vec::new(),
677 materialized_dependencies: HashMap::new(),
678 remaining_count: total_count,
679 idx: 0,
680 sequential_lock: SequentialExecutionLock::new(),
681 is_sequential,
682 skip_creating_dependencies: false,
683 in_progress: Arc::new(AtomicUsize::new(0)),
684 }
685 }
686
687 fn add_dependency(&mut self, dep: RegisteredDependency) {
688 let crate_and_module = dep.crate_and_module();
689 if self.crate_and_module == crate_and_module {
690 self.dependencies.push(dep);
691 } else {
692 let mut found = false;
693 for inner in &mut self.inner {
694 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
695 inner.add_dependency(dep.clone());
696 found = true;
697 break;
698 }
699 }
700 if !found {
701 let mut inner = Self {
702 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
703 dependencies: vec![],
704 tests: vec![],
705 inner: vec![],
706 props: vec![],
707 materialized_dependencies: HashMap::new(),
708 remaining_count: 0,
709 idx: 0,
710 is_sequential: false,
711 sequential_lock: SequentialExecutionLock::new(),
712 skip_creating_dependencies: false,
713 in_progress: Arc::new(AtomicUsize::new(0)),
714 };
715 inner.add_dependency(dep);
716 self.inner.push(inner);
717 }
718 }
719 }
720
721 fn add_test(&mut self, test: RegisteredTest) {
722 let crate_and_module = test.crate_and_module();
723 if self.crate_and_module == crate_and_module {
724 self.tests.push(test.clone());
725
726 if test.run.is_bench() {
727 self.is_sequential = true;
728 }
729 } else {
730 let mut found = false;
731 for inner in &mut self.inner {
732 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
733 inner.add_test(test.clone());
734 found = true;
735 break;
736 }
737 }
738 if !found {
739 let mut inner = Self {
740 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
741 dependencies: vec![],
742 tests: vec![],
743 inner: vec![],
744 props: vec![],
745 materialized_dependencies: HashMap::new(),
746 remaining_count: 0,
747 idx: 0,
748 is_sequential: false,
749 sequential_lock: SequentialExecutionLock::new(),
750 skip_creating_dependencies: false,
751 in_progress: Arc::new(AtomicUsize::new(0)),
752 };
753 inner.add_test(test);
754 self.inner.push(inner);
755 }
756 }
757 self.remaining_count += 1;
758 }
759
760 fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
761 let crate_and_module = prop.crate_and_module();
762 if self.crate_and_module == crate_and_module {
763 if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
764 self.is_sequential = true;
765 }
766 self.props.push(prop);
767 } else {
768 let mut found = false;
769 for inner in &mut self.inner {
770 if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
771 inner.add_prop(prop.clone());
772 found = true;
773 break;
774 }
775 }
776 if !found {
777 let mut inner = Self {
778 crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
779 dependencies: vec![],
780 tests: vec![],
781 inner: vec![],
782 props: vec![],
783 materialized_dependencies: HashMap::new(),
784 remaining_count: 0,
785 idx: 0,
786 is_sequential: false,
787 sequential_lock: SequentialExecutionLock::new(),
788 skip_creating_dependencies: false,
789 in_progress: Arc::new(AtomicUsize::new(0)),
790 };
791 inner.add_prop(prop);
792 self.inner.push(inner);
793 }
794 }
795 }
796
    /// `true` when dependency creation is skipped for this node, or when the
    /// number of materialized values equals the number of declared dependencies.
    ///
    /// The check compares counts only; it does not compare names.
    fn is_materialized(&self) -> bool {
        self.skip_creating_dependencies
            || self.materialized_dependencies.len() == self.dependencies.len()
    }
801
802 #[cfg(feature = "tokio")]
803 async fn materialize_deps(
804 &mut self,
805 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
806 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
807 let mut deps = self.materialized_dependencies.clone();
810 let mut dependency_map = parent_map.clone();
811 for (k, v) in &deps {
812 dependency_map.insert(k.clone(), v.clone());
813 }
814
815 let sorted_dependencies = self.sorted_dependencies();
816 for dep in &sorted_dependencies {
817 if deps.contains_key(&dep.name) {
818 continue;
819 }
820 let materialized_dep = match &dep.constructor {
821 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
822 DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
823 };
824 deps.insert(dep.name.clone(), materialized_dep.clone());
825 dependency_map.insert(dep.name.clone(), materialized_dep);
826 }
827 self.materialized_dependencies = deps;
828 dependency_map
829 }
830
831 fn materialize_deps_sync(
832 &mut self,
833 parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
834 ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
835 let mut deps = self.materialized_dependencies.clone();
838 let mut dependency_map = parent_map.clone();
839 for (k, v) in &deps {
840 dependency_map.insert(k.clone(), v.clone());
841 }
842
843 let sorted_dependencies = self.sorted_dependencies();
844 for dep in &sorted_dependencies {
845 if deps.contains_key(&dep.name) {
846 continue;
847 }
848 let materialized_dep = match &dep.constructor {
849 DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
850 DependencyConstructor::Async(cons) => {
851 futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
852 }
853 };
854 deps.insert(dep.name.clone(), materialized_dep.clone());
855 dependency_map.insert(dep.name.clone(), materialized_dep);
856 }
857 self.materialized_dependencies = deps;
858 dependency_map
859 }
860
861 fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
862 let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
863 for dep in &self.dependencies {
864 let mut added = false;
865 for dep_dep_name in &dep.dependencies {
866 if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
867 ts.add_dependency(dep_dep, dep);
868 added = true;
869 } else {
870 }
872 }
873 if !added {
874 ts.insert(dep);
875 }
876 }
877 let mut result = Vec::with_capacity(self.dependencies.len());
878 loop {
879 let chunk = ts.pop_all();
880 if chunk.is_empty() {
881 break;
882 }
883 result.extend(chunk);
884 }
885 result
886 }
887
    /// Clears this node's map of materialized dependency values, releasing the
    /// node's references to them.
    fn drop_deps(&mut self) {
        self.materialized_dependencies.clear();
    }
891
892 fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
896 let mut needed: Option<HashSet<String>> = Some(HashSet::new());
898 for test in &self.tests {
899 match &test.dependencies {
900 None => {
901 needed = None;
902 break;
903 }
904 Some(deps) => {
905 if let Some(ref mut set) = needed {
906 set.extend(deps.iter().cloned());
907 }
908 }
909 }
910 }
911
912 for inner in &mut self.inner {
914 let child_needs = inner.prune_unused_deps();
915 needed = match (needed, child_needs) {
916 (None, _) | (_, None) => None,
917 (Some(mut a), Some(b)) => {
918 a.extend(b);
919 Some(a)
920 }
921 };
922 }
923
924 let needed = needed?;
926
927 let local_names: HashSet<String> =
929 self.dependencies.iter().map(|d| d.name.clone()).collect();
930 let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
931
932 let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
934 let mut needed_from_parent: HashSet<String> =
935 needed.difference(&local_names).cloned().collect();
936
937 while let Some(dep_name) = queue.pop_front() {
938 if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
939 for transitive in &dep.dependencies {
940 if local_names.contains(transitive) {
941 if keep_local.insert(transitive.clone()) {
942 queue.push_back(transitive.clone());
943 }
944 } else {
945 needed_from_parent.insert(transitive.clone());
946 }
947 }
948 for companion in &dep.companions {
957 if local_names.contains(companion) {
958 if keep_local.insert(companion.clone()) {
959 queue.push_back(companion.clone());
960 }
961 } else {
962 needed_from_parent.insert(companion.clone());
963 }
964 }
965 }
966 }
967
968 self.dependencies.retain(|d| keep_local.contains(&d.name));
970
971 Some(needed_from_parent)
972 }
973
974 fn is_prefix_of(this: &str, that: &str) -> bool {
975 this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
976 }
977
978 fn next_level(from: &str, to: &str) -> String {
979 assert!(Self::is_prefix_of(from, to));
980 let remaining = if from.is_empty() {
981 to
982 } else {
983 &to[from.len() + 2..]
984 };
985
986 let result = if let Some((next, _tail)) = remaining.split_once("::") {
987 format!("{from}::{next}")
988 } else {
989 format!("{from}::{remaining}")
990 };
991 result.trim_start_matches("::").to_string()
992 }
993
994 fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
995 if let Some(parent_lock) = inherited_lock {
996 self.is_sequential = true;
997 self.sequential_lock = parent_lock.clone();
998 }
999
1000 let lock_for_children = if self.is_sequential {
1001 Some(self.sequential_lock.clone())
1002 } else {
1003 None
1004 };
1005
1006 for child in &mut self.inner {
1007 child.propagate_sequential(lock_for_children.as_ref());
1008 }
1009 }
1010}
1011
/// Multi-line tree dump: a header line with the node's path, its properties
/// and `S`/`P` for sequential/parallel, followed by its dependency names, its
/// tests, and the prefixed dumps of all child nodes.
impl Debug for TestSuiteExecution {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "'{}' {} [{}]",
            self.crate_and_module,
            self.props
                .iter()
                .map(|x| format!("{x:?}"))
                .collect::<Vec<_>>()
                .join(", "),
            if self.is_sequential { "S" } else { "P" }
        )?;
        writeln!(f, " deps:")?;
        for dep in &self.dependencies {
            writeln!(f, " '{}'", dep.name)?;
        }
        writeln!(f, " tests:")?;
        for test in &self.tests {
            writeln!(f, " '{}' [{:?}]", test.name, test.props.test_type)?;
        }
        for inner in &self.inner {
            // Render the child first, then re-emit it line by line with a prefix.
            let inner_str = format!("{inner:?}");
            for inner_line in inner_str.lines() {
                writeln!(f, " {inner_line}")?;
            }
        }
        Ok(())
    }
}
1042
1043impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1044 fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1045 self.get(name).cloned()
1046 }
1047}
1048
/// A test that has been picked for execution, together with what it needs to
/// run.
pub struct TestExecution {
    /// The test to run.
    pub test: RegisteredTest,
    /// Dependencies visible to the test.
    pub deps: Arc<dyn DependencyView + Send + Sync>,
    /// Position of this test in pick order, taken from the suite's running index.
    pub index: usize,
    /// Guard of the sequential lock (if any); kept only so that it is released
    /// when this value is dropped.
    _seq_lock: SequentialExecutionLockGuard,
    /// In-progress counter of the node the test came from; decremented on drop.
    in_progress_counter: Arc<AtomicUsize>,
}
1056
impl Drop for TestExecution {
    /// Decrements the owning node's in-progress counter, balancing the increment
    /// made when the test was picked.
    fn drop(&mut self) {
        self.in_progress_counter.fetch_sub(1, Ordering::Release);
    }
}
1062
/// Guard returned by `SequentialExecutionLock::lock`/`lock_sync`. The payloads
/// are never read; they exist to keep the mutex held until the guard is dropped.
#[allow(dead_code)]
enum SequentialExecutionLockGuard {
    /// No lock was taken (the node is not sequential).
    None,
    /// Owned guard of the tokio mutex.
    #[cfg(feature = "tokio")]
    Async(tokio::sync::OwnedMutexGuard<()>),
    /// Owned guard of the parking_lot mutex.
    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
}
1070
/// Lock used to serialize tests of sequential suites. Cloning shares the
/// underlying mutexes (they are behind `Arc`). The async and the sync paths use
/// separate mutexes.
#[derive(Clone)]
struct SequentialExecutionLock {
    /// Mutex used by the async `is_locked`/`lock` methods.
    #[cfg(feature = "tokio")]
    async_mutex: Arc<tokio::sync::Mutex<()>>,
    /// Mutex used by the `is_locked_sync`/`lock_sync` methods.
    sync_mutex: Arc<parking_lot::Mutex<()>>,
}
1077
1078impl SequentialExecutionLock {
1079 pub fn new() -> Self {
1080 Self {
1081 #[cfg(feature = "tokio")]
1082 async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1083 sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1084 }
1085 }
1086
1087 #[cfg(feature = "tokio")]
1088 pub async fn is_locked(&self) -> bool {
1089 self.async_mutex.try_lock().is_err()
1090 }
1091
1092 pub fn is_locked_sync(&self) -> bool {
1093 self.sync_mutex.try_lock().is_none()
1094 }
1095
1096 #[cfg(feature = "tokio")]
1097 pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1098 if is_sequential {
1099 let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1100 SequentialExecutionLockGuard::Async(permit)
1101 } else {
1102 SequentialExecutionLockGuard::None
1103 }
1104 }
1105
1106 pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1107 if is_sequential {
1108 let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1109 SequentialExecutionLockGuard::Sync(permit)
1110 } else {
1111 SequentialExecutionLockGuard::None
1112 }
1113 }
1114}
1115
1116#[cfg(test)]
1117mod cloneable_tests {
1118 use super::*;
1119 use crate::internal::{
1120 CloneableCodec, DependencyConstructor, RegisteredDependency, RegisteredTest, TestFunction,
1121 TestProperties,
1122 };
1123 use std::sync::atomic::{AtomicUsize, Ordering};
1124
    /// Builds a test registration named `name` with an empty module path that
    /// declares `deps` as its dependencies.
    fn registered_test(name: &str, deps: Vec<String>) -> RegisteredTest {
        registered_test_in_module(name, "", deps)
    }
1128
    /// Builds a synchronous no-op test registration in crate `tcrate` at
    /// `module_path`, with default properties and `deps` as its declared
    /// dependencies.
    fn registered_test_in_module(
        name: &str,
        module_path: &str,
        deps: Vec<String>,
    ) -> RegisteredTest {
        RegisteredTest {
            name: name.to_string(),
            crate_name: "tcrate".to_string(),
            module_path: module_path.to_string(),
            run: TestFunction::Sync(Arc::new(|_| Box::new(()))),
            props: TestProperties::default(),
            dependencies: Some(deps),
        }
    }
1143
    /// Builds a `Cloneable` dependency with an empty module path whose
    /// constructor yields `0xdead_beef` and bumps `counter` on every run.
    fn registered_cloneable_dep(name: &str, counter: Arc<AtomicUsize>) -> RegisteredDependency {
        registered_cloneable_dep_in(name, "", 0xdead_beef, counter)
    }
1149
    /// Builds a `Cloneable` dependency in crate `tcrate` at `module_path`.
    ///
    /// The synchronous constructor increments `counter` and returns
    /// `constructor_value` as a `u64`. The codec encodes that `u64` as eight
    /// little-endian bytes and decodes it back. The dependency has no
    /// dependencies or companions of its own.
    fn registered_cloneable_dep_in(
        name: &str,
        module_path: &str,
        constructor_value: u64,
        counter: Arc<AtomicUsize>,
    ) -> RegisteredDependency {
        let constructor_counter = counter.clone();
        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
            // Lets the tests observe how many times the constructor ran.
            constructor_counter.fetch_add(1, Ordering::SeqCst);
            Arc::new(constructor_value) as Arc<dyn Any + Send + Sync>
        }));
        let codec = CloneableCodec {
            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
                let value: Arc<u64> = any.downcast::<u64>().unwrap();
                (*value).to_le_bytes().to_vec()
            }),
            from_wire_bytes: Arc::new(|bytes: &[u8]| {
                let arr: [u8; 8] = bytes.try_into().unwrap();
                let value = u64::from_le_bytes(arr);
                Arc::new(value) as Arc<dyn Any + Send + Sync>
            }),
        };
        RegisteredDependency {
            name: name.to_string(),
            crate_name: "tcrate".to_string(),
            module_path: module_path.to_string(),
            constructor,
            dependencies: Vec::new(),
            scope: DepScope::Cloneable,
            // The worker-side reconstructor hands the wire payload through as is.
            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
                |wire_payload, _deps| wire_payload,
            ))),
            cloneable_codec: Some(codec),
            hosted_codec: None,
            rpc_factory: None,
            companions: Vec::new(),
        }
    }
1192
    /// Collecting the cloneable wire bytes of a one-test, one-dependency suite
    /// must yield exactly one entry, keyed by the qualified id and holding the
    /// codec-encoded value, and must run the constructor exactly once.
    #[test]
    fn cloneable_wire_collection_runs_constructor_once_and_encodes_value() {
        let counter = Arc::new(AtomicUsize::new(0));
        let dep = registered_cloneable_dep("clone_dep", counter.clone());
        let test = registered_test("t1", vec!["clone_dep".to_string()]);

        let (execution, _filtered) =
            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

        let collected = execution.collect_cloneable_wire_bytes_sync();
        assert_eq!(collected.len(), 1, "exactly one cloneable dep expected");
        let (dep_id, wire_bytes) = &collected[0];
        assert_eq!(
            dep_id, "tcrate::clone_dep",
            "wire bytes must be keyed by the fully-qualified id, not the local name"
        );
        assert_eq!(
            wire_bytes.as_slice(),
            &0xdead_beef_u64.to_le_bytes(),
            "expected the codec-encoded value to round-trip via to_wire"
        );
        assert_eq!(
            counter.load(Ordering::SeqCst),
            1,
            "constructor must have run exactly once when collecting"
        );
    }
1220
/// Pruning of unreferenced cloneable deps must respect companion links.
///
/// Three scenarios are checked, each with deps `clone_a` and `clone_b` and a
/// single test that references only one of them:
/// 1. linked companions, test uses `clone_a` – both are kept;
/// 2. linked companions, test uses `clone_b` – both are kept;
/// 3. no companion link, test uses `clone_a` – `clone_b` is pruned.
#[test]
fn prune_unused_deps_retains_companion_when_only_one_half_is_referenced() {
    // Registers `clone_a` / `clone_b` (mutually linked as companions when
    // `link_companions` is set), builds an execution whose only test depends
    // on `used_dep`, and returns the names of the surviving cloneable deps.
    fn kept_cloneable_deps(
        link_companions: bool,
        test_name: &'static str,
        used_dep: &'static str,
    ) -> Vec<String> {
        // The constructor counters are never inspected by this test.
        let mut dep_a = registered_cloneable_dep("clone_a", Arc::new(AtomicUsize::new(0)));
        let mut dep_b = registered_cloneable_dep("clone_b", Arc::new(AtomicUsize::new(0)));
        if link_companions {
            dep_a.companions = vec!["clone_b".to_string()];
            dep_b.companions = vec!["clone_a".to_string()];
        }

        let test = registered_test(test_name, vec![used_dep.to_string()]);
        let (execution, _filtered) =
            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test], &[]);

        execution
            .collect_cloneable_dependencies()
            .into_iter()
            .map(|d| d.name)
            .collect()
    }

    // Scenario 1: `clone_a` is referenced, `clone_b` rides along as its companion.
    let kept = kept_cloneable_deps(true, "t_uses_a", "clone_a");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        kept.contains(&"clone_b".to_string()),
        "companion of a retained dep must also be retained (the planner-only \
         sibling link used by `worker = both(...)`), kept = {kept:?}"
    );

    // Scenario 2: the mirror image – only `clone_b` is referenced.
    let kept = kept_cloneable_deps(true, "t_uses_b", "clone_b");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "companion of a stub-referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        kept.contains(&"clone_b".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );

    // Scenario 3: without a companion link the unreferenced dep is dropped.
    let kept = kept_cloneable_deps(false, "t_uses_a", "clone_a");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        !kept.contains(&"clone_b".to_string()),
        "without a companion link, an unreferenced dep must be pruned; \
         kept = {kept:?}"
    );
}
1322
/// A value supplied through `provide_cloneable_value` is what the test sees,
/// and the dependency's own constructor is never invoked.
#[test]
fn provide_cloneable_value_short_circuits_constructor() {
    let ctor_runs = Arc::new(AtomicUsize::new(0));

    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[registered_cloneable_dep("clone_dep", ctor_runs.clone())],
        &[registered_test("t1", vec!["clone_dep".to_string()])],
        &[],
    );

    // Pre-populate the dep under its fully-qualified id.
    let supplied: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
    assert!(
        execution.provide_cloneable_value("tcrate::clone_dep", supplied),
        "pre-populated value should match the dep's qualified id"
    );

    let picked = execution.pick_next_sync().expect("test should be picked");
    assert_eq!(picked.test.name, "t1");

    let seen: Arc<u64> = picked
        .deps
        .get("clone_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 99);

    let runs = ctor_runs.load(Ordering::SeqCst);
    assert_eq!(
        runs, 0,
        "constructor must not run when a pre-populated value is supplied"
    );
}
1354
/// Once a value has been provided for a dep, neither that dep's constructor
/// nor the constructors of the deps it declares are run when a test is picked.
#[test]
fn provided_shared_value_is_a_worker_side_leaf() {
    let provided_counter = Arc::new(AtomicUsize::new(0));
    let parent_only_counter = Arc::new(AtomicUsize::new(0));

    // `clone_dep` declares a dependency on `parent_only_dep`.
    let provided_dep = {
        let mut dep = registered_cloneable_dep("clone_dep", provided_counter.clone());
        dep.dependencies = vec!["parent_only_dep".to_string()];
        dep
    };
    let parent_only_dep = registered_cloneable_dep("parent_only_dep", parent_only_counter.clone());

    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[provided_dep, parent_only_dep],
        &[registered_test("t1", vec!["clone_dep".to_string()])],
        &[],
    );

    let supplied: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
    assert!(execution.provide_cloneable_value("tcrate::clone_dep", supplied));

    let picked = execution.pick_next_sync().expect("test should be picked");
    let seen: Arc<u64> = picked
        .deps
        .get("clone_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 99);

    assert_eq!(
        provided_counter.load(Ordering::SeqCst),
        0,
        "worker-side provided values must not run their original constructor"
    );
    assert_eq!(
        parent_only_counter.load(Ordering::SeqCst),
        0,
        "constructor dependencies are parent-only once a value arrives from wire bytes or an RPC stub"
    );
}
1391
/// The async wire-byte collection awaits an `Async` constructor (which yields
/// once before producing its value) and encodes the result via the codec.
#[cfg(feature = "tokio")]
#[test]
fn async_cloneable_wire_collection_awaits_async_constructor() {
    use std::pin::Pin;

    type ValueFuture = Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>;

    let counter = Arc::new(AtomicUsize::new(0));

    let constructor = {
        let shared_counter = counter.clone();
        DependencyConstructor::Async(Arc::new(move |_view| {
            // Each invocation gets its own handle to the shared counter.
            let counter = shared_counter.clone();
            Box::pin(async move {
                tokio::task::yield_now().await;
                counter.fetch_add(1, Ordering::SeqCst);
                let value: u64 = 0xdead_beef;
                Arc::new(value) as Arc<dyn Any + Send + Sync>
            }) as ValueFuture
        }))
    };

    // u64 <-> 8 little-endian bytes.
    let codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let v: Arc<u64> = any.downcast::<u64>().unwrap();
            (*v).to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            let arr: [u8; 8] = bytes.try_into().unwrap();
            Arc::new(u64::from_le_bytes(arr)) as Arc<dyn Any + Send + Sync>
        }),
    };

    let dep = RegisteredDependency {
        name: "clone_dep".to_string(),
        crate_name: "tcrate".to_string(),
        module_path: String::new(),
        constructor,
        dependencies: Vec::new(),
        scope: DepScope::Cloneable,
        worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
            |wire_payload, _| wire_payload,
        ))),
        cloneable_codec: Some(codec),
        hosted_codec: None,
        rpc_factory: None,
        companions: Vec::new(),
    };
    let test = registered_test("t1", vec!["clone_dep".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let collected = runtime.block_on(execution.collect_cloneable_wire_bytes_async());

    assert_eq!(collected.len(), 1);
    let (dep_id, wire_bytes) = &collected[0];
    assert_eq!(*dep_id, "tcrate::clone_dep");
    assert_eq!(wire_bytes.as_slice(), &0xdead_beef_u64.to_le_bytes());
    assert_eq!(
        counter.load(Ordering::SeqCst),
        1,
        "async constructor must have run exactly once"
    );
}
1461
/// Two cloneable deps share the local name `clone_dep` but live in different
/// modules. Wire bytes and pre-populated values must be routed by qualified
/// id, so each module's test sees its own module's value.
#[test]
fn cloneable_value_routing_uses_qualified_id_across_modules() {
    let counter_a = Arc::new(AtomicUsize::new(0));
    let counter_b = Arc::new(AtomicUsize::new(0));

    let deps = [
        registered_cloneable_dep_in("clone_dep", "mod_a", 11, counter_a.clone()),
        registered_cloneable_dep_in("clone_dep", "mod_b", 22, counter_b.clone()),
    ];
    let tests = [
        registered_test_in_module("t_a", "mod_a", vec!["clone_dep".to_string()]),
        registered_test_in_module("t_b", "mod_b", vec!["clone_dep".to_string()]),
    ];

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    // Parent side: both constructors run and are keyed by qualified id.
    // Sorting by id makes the positional assertions order-independent.
    let mut collected = execution.collect_cloneable_wire_bytes_sync();
    collected.sort_by(|l, r| l.0.cmp(&r.0));
    assert_eq!(collected.len(), 2);
    assert_eq!(collected[0].0, "tcrate::mod_a::clone_dep");
    assert_eq!(collected[1].0, "tcrate::mod_b::clone_dep");
    assert_eq!(collected[0].1.as_slice(), &11_u64.to_le_bytes());
    assert_eq!(collected[1].1.as_slice(), &22_u64.to_le_bytes());

    // Worker side: supply distinct values per qualified id.
    let mut execution = execution;
    assert!(
        execution.provide_cloneable_value("tcrate::mod_a::clone_dep", Arc::new(111_u64)),
        "mod_a dep must be reachable by qualified id"
    );
    assert!(
        execution.provide_cloneable_value("tcrate::mod_b::clone_dep", Arc::new(222_u64)),
        "mod_b dep must be reachable by qualified id"
    );
    let applied_unknown =
        execution.provide_cloneable_value("tcrate::mod_c::clone_dep", Arc::new(333_u64));
    assert!(
        !applied_unknown,
        "unknown qualified id must not be applied anywhere"
    );

    let first = execution.pick_next_sync().expect("first test");
    let second = execution.pick_next_sync().expect("second test");

    // (test name, value the test observed for `clone_dep`)
    let mut pairs: Vec<(String, u64)> = Vec::new();
    for picked in [first, second] {
        let v: Arc<u64> = picked
            .deps
            .get("clone_dep")
            .expect("dep available")
            .clone()
            .downcast()
            .unwrap();
        pairs.push((picked.test.name.clone(), *v));
    }

    let value_seen_by = |test_name: &str, missing: &str| -> u64 {
        pairs
            .iter()
            .find(|(n, _)| n == test_name)
            .expect(missing)
            .1
    };
    let val_a = value_seen_by("t_a", "t_a picked");
    let val_b = value_seen_by("t_b", "t_b picked");
    assert_eq!(
        val_a, 111,
        "mod_a test must see mod_a's pre-populated value"
    );
    assert_eq!(
        val_b, 222,
        "mod_b test must see mod_b's pre-populated value"
    );

    // The only constructor runs are the ones from the wire collection above.
    assert_eq!(
        counter_a.load(Ordering::SeqCst),
        1,
        "mod_a constructor must have run exactly once (during wire collection)"
    );
    assert_eq!(
        counter_b.load(Ordering::SeqCst),
        1,
        "mod_b constructor must have run exactly once (during wire collection)"
    );
}
1565
1566 fn registered_hosted_dep(
1575 name: &str,
1576 payload: u64,
1577 owner_counter: Arc<AtomicUsize>,
1578 ) -> RegisteredDependency {
1579 registered_hosted_dep_in(name, "", payload, owner_counter)
1580 }
1581
1582 fn registered_hosted_dep_in(
1587 name: &str,
1588 module_path: &str,
1589 payload: u64,
1590 owner_counter: Arc<AtomicUsize>,
1591 ) -> RegisteredDependency {
1592 let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1593 owner_counter.fetch_add(1, Ordering::SeqCst);
1594 Arc::new(payload) as Arc<dyn Any + Send + Sync>
1595 }));
1596 let codec = CloneableCodec {
1597 to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1599 let v: Arc<u64> = any.downcast::<u64>().unwrap();
1600 (*v).to_le_bytes().to_vec()
1601 }),
1602 from_wire_bytes: Arc::new(|bytes: &[u8]| {
1605 let boxed: Vec<u8> = bytes.to_vec();
1606 Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1607 }),
1608 };
1609 let worker_fn =
1610 crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
1611 let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1612 let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1613 let value: u64 = u64::from_le_bytes(arr);
1614 Arc::new(value) as Arc<dyn Any + Send + Sync>
1615 }));
1616 RegisteredDependency {
1617 name: name.to_string(),
1618 crate_name: "tcrate".to_string(),
1619 module_path: module_path.to_string(),
1620 constructor,
1621 dependencies: Vec::new(),
1622 scope: DepScope::Hosted,
1623 worker_fn: Some(worker_fn),
1624 cloneable_codec: None,
1625 hosted_codec: Some(codec),
1626 rpc_factory: None,
1627 companions: Vec::new(),
1628 }
1629 }
1630
/// Collecting hosted descriptors runs the owner constructor once, returns the
/// encoded descriptor under the qualified id, and hands back the owner value
/// itself alongside it.
#[test]
fn hosted_descriptor_collection_runs_owner_once_and_keeps_it_alive() {
    const PAYLOAD: u64 = 0xcafe_babe_dead_beef;

    let owner_runs = Arc::new(AtomicUsize::new(0));
    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[registered_hosted_dep("hosted_dep", PAYLOAD, owner_runs.clone())],
        &[registered_test("t1", vec!["hosted_dep".to_string()])],
        &[],
    );

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 1, "exactly one hosted dep expected");
    assert_eq!(owners.len(), 1, "exactly one hosted owner kept alive");

    let (dep_id, descriptor_bytes) = &descriptors[0];
    assert_eq!(
        dep_id, "tcrate::hosted_dep",
        "descriptor must be keyed by the fully-qualified id"
    );
    assert_eq!(
        descriptor_bytes.as_slice(),
        &0xcafe_babe_dead_beef_u64.to_le_bytes(),
        "expected descriptor bytes to match codec.to_wire of payload"
    );

    let runs = owner_runs.load(Ordering::SeqCst);
    assert_eq!(runs, 1, "owner constructor must have run exactly once");

    // The returned owner is the very value the constructor produced.
    let held: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
    assert_eq!(*held, PAYLOAD);
}
1665
/// A `Hosted` dep accepts a pre-populated value through
/// `provide_cloneable_value`; the test then sees that value and the owner
/// constructor is never run.
#[test]
fn hosted_descriptor_roundtrips_to_worker_value_via_provide_cloneable_value() {
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[registered_hosted_dep(
            "hosted_dep",
            0x1234_5678_u64,
            owner_runs.clone(),
        )],
        &[registered_test("t1", vec!["hosted_dep".to_string()])],
        &[],
    );

    let supplied: Arc<dyn Any + Send + Sync> = Arc::new(0x1234_5678_u64);
    assert!(
        execution.provide_cloneable_value("tcrate::hosted_dep", supplied),
        "Hosted dep must accept pre-populated values via the same path as Cloneable"
    );

    let picked = execution.pick_next_sync().expect("test should be picked");
    let seen: Arc<u64> = picked
        .deps
        .get("hosted_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 0x1234_5678);

    let runs = owner_runs.load(Ordering::SeqCst);
    assert_eq!(
        runs, 0,
        "Hosted owner constructor must not run on the worker side"
    );
}
1697
/// With a single `Hosted` dep registered, only `has_hosted_dependencies`
/// reports true; the shared and cloneable queries report false.
#[test]
fn has_hosted_dependencies_reports_correctly() {
    let unused_counter = Arc::new(AtomicUsize::new(0));
    let hosted = registered_hosted_dep("h", 0, unused_counter);
    let uses_hosted = registered_test("t1", vec!["h".to_string()]);

    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[hosted],
        &[uses_hosted],
        &[],
    );

    assert!(execution.has_hosted_dependencies());
    assert!(!execution.has_shared_dependencies());
    assert!(!execution.has_cloneable_dependencies());
}
1708
/// With two hosted deps used by one test, a descriptor collection yields one
/// descriptor and one owner per dep, and each owner constructor runs once.
///
/// NOTE(review): despite the test name, the collection is performed only once
/// here — confirm whether a repeated collection was meant to be covered.
#[test]
fn hosted_owner_runs_exactly_once_even_when_collecting_multiple_times() {
    let counter_a = Arc::new(AtomicUsize::new(0));
    let counter_b = Arc::new(AtomicUsize::new(0));

    // `registered_hosted_dep` already stores the given name, so the deps need
    // neither a `mut` binding nor a follow-up `name` assignment.
    let dep_a = registered_hosted_dep("hosted_a", 1, counter_a.clone());
    let dep_b = registered_hosted_dep("hosted_b", 2, counter_b.clone());
    let test = registered_test("t1", vec!["hosted_a".to_string(), "hosted_b".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 2);
    assert_eq!(owners.len(), 2);
    assert_eq!(counter_a.load(Ordering::SeqCst), 1);
    assert_eq!(counter_b.load(Ordering::SeqCst), 1);
}
1735
/// Two hosted deps share the local name `hosted_dep` but live in different
/// modules. Descriptors and pre-populated values must be routed by qualified
/// id, so each module's test sees its own module's value.
#[test]
fn hosted_descriptor_routing_uses_qualified_id_across_modules() {
    let counter_a = Arc::new(AtomicUsize::new(0));
    let counter_b = Arc::new(AtomicUsize::new(0));

    let deps = [
        registered_hosted_dep_in("hosted_dep", "mod_a", 11, counter_a.clone()),
        registered_hosted_dep_in("hosted_dep", "mod_b", 22, counter_b.clone()),
    ];
    let tests = [
        registered_test_in_module("t_a", "mod_a", vec!["hosted_dep".to_string()]),
        registered_test_in_module("t_b", "mod_b", vec!["hosted_dep".to_string()]),
    ];

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &deps, &tests, &[]);

    // Parent side: one descriptor per module, keyed by qualified id. Sorting
    // by id makes the positional assertions order-independent.
    let (mut descriptors, _owners) = execution.collect_hosted_descriptor_bytes_sync();
    descriptors.sort_by(|l, r| l.0.cmp(&r.0));
    assert_eq!(descriptors.len(), 2);
    assert_eq!(descriptors[0].0, "tcrate::mod_a::hosted_dep");
    assert_eq!(descriptors[1].0, "tcrate::mod_b::hosted_dep");
    assert_eq!(descriptors[0].1.as_slice(), &11_u64.to_le_bytes());
    assert_eq!(descriptors[1].1.as_slice(), &22_u64.to_le_bytes());

    // Worker side: supply distinct values per qualified id.
    let mut execution = execution;
    assert!(
        execution.provide_cloneable_value("tcrate::mod_a::hosted_dep", Arc::new(111_u64)),
        "mod_a hosted dep must be reachable by qualified id"
    );
    assert!(
        execution.provide_cloneable_value("tcrate::mod_b::hosted_dep", Arc::new(222_u64)),
        "mod_b hosted dep must be reachable by qualified id"
    );
    let applied_unknown =
        execution.provide_cloneable_value("tcrate::mod_c::hosted_dep", Arc::new(333_u64));
    assert!(
        !applied_unknown,
        "unknown qualified id must not be applied to any dep"
    );

    let first = execution.pick_next_sync().expect("first test");
    let second = execution.pick_next_sync().expect("second test");

    // (test name, value the test observed for `hosted_dep`)
    let mut pairs: Vec<(String, u64)> = Vec::new();
    for picked in [first, second] {
        let v: Arc<u64> = picked
            .deps
            .get("hosted_dep")
            .expect("dep available")
            .clone()
            .downcast()
            .unwrap();
        pairs.push((picked.test.name.clone(), *v));
    }

    let value_seen_by = |test_name: &str, missing: &str| -> u64 {
        pairs
            .iter()
            .find(|(n, _)| n == test_name)
            .expect(missing)
            .1
    };
    assert_eq!(value_seen_by("t_a", "t_a picked"), 111);
    assert_eq!(value_seen_by("t_b", "t_b picked"), 222);

    // The only owner constructions are the ones from descriptor collection.
    assert_eq!(counter_a.load(Ordering::SeqCst), 1);
    assert_eq!(counter_b.load(Ordering::SeqCst), 1);
}
1830
/// Replays the hosted descriptor pipeline by hand — collect descriptor bytes,
/// decode them with the codec, run the worker reconstructor, provide the
/// result — and checks the test sees the reconstructed handle rather than the
/// owner value. The reconstructor bit-inverts the value so the two are
/// distinguishable.
#[test]
fn hosted_no_spawn_workers_uses_worker_side_handle() {
    use crate::internal::WorkerReconstructor;

    let owner_counter = Arc::new(AtomicUsize::new(0));
    let owner_value: u64 = 0xAAAA_AAAA_AAAA_AAAA_u64;

    let constructor = {
        let runs = owner_counter.clone();
        DependencyConstructor::Sync(Arc::new(move |_view| {
            runs.fetch_add(1, Ordering::SeqCst);
            Arc::new(owner_value) as Arc<dyn Any + Send + Sync>
        }))
    };

    // Owner u64 -> 8 LE bytes; bytes -> opaque `Vec<u8>` payload.
    let codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let owner: Arc<u64> = any.downcast::<u64>().unwrap();
            (*owner).to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            let owned_bytes: Vec<u8> = bytes.to_vec();
            Arc::new(owned_bytes) as Arc<dyn Any + Send + Sync>
        }),
    };

    // Worker-side handle is the bitwise complement of the owner value.
    let worker_fn = WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
        let raw_bytes: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
        let le_bytes: [u8; 8] = (*raw_bytes).as_slice().try_into().unwrap();
        let raw: u64 = u64::from_le_bytes(le_bytes);
        let handle_value: u64 = !raw;
        Arc::new(handle_value) as Arc<dyn Any + Send + Sync>
    }));

    let dep = RegisteredDependency {
        name: "hosted_dep".to_string(),
        crate_name: "tcrate".to_string(),
        module_path: String::new(),
        constructor,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(worker_fn.clone()),
        cloneable_codec: None,
        hosted_codec: Some(codec.clone()),
        rpc_factory: None,
        companions: Vec::new(),
    };
    let test = registered_test("t1", vec!["hosted_dep".to_string()]);

    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 1);
    assert_eq!(owners.len(), 1);
    let (dep_id, wire_bytes) = &descriptors[0];

    // Decode and reconstruct exactly as a worker would, with no other deps.
    let wire_payload = (codec.from_wire_bytes)(wire_bytes.as_slice());
    let empty_deps: Arc<dyn crate::internal::DependencyView + Send + Sync> =
        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
    let reconstructed = match &worker_fn {
        WorkerReconstructor::Sync(rebuild) => rebuild(wire_payload, empty_deps),
        WorkerReconstructor::Async(_) => unreachable!(),
    };
    assert!(execution.provide_cloneable_value(dep_id, reconstructed));

    let picked = execution.pick_next_sync().expect("test picked");
    let seen: Arc<u64> = picked
        .deps
        .get("hosted_dep")
        .expect("dep available")
        .clone()
        .downcast::<u64>()
        .unwrap();
    assert_eq!(
        *seen,
        !owner_value,
        "Hosted dep must expose the worker-side handle (from_descriptor) even in the no-spawn-workers path"
    );
    assert_eq!(
        owner_counter.load(Ordering::SeqCst),
        1,
        "owner constructor must have run exactly once during descriptor collection"
    );
}
1930
/// A hosted dep that declares a dependency on another dep: collecting the
/// parent shared dependencies yields one hosted descriptor and runs both the
/// hosted owner's constructor and its dependency's constructor once.
#[test]
fn hosted_dep_with_owner_dependencies_constructs_in_parent_context() {
    let dep_counter = Arc::new(AtomicUsize::new(0));
    let owner_counter = Arc::new(AtomicUsize::new(0));

    let plain_dep = registered_cloneable_dep("some_other_dep", dep_counter.clone());
    let hosted = {
        let mut h = registered_hosted_dep("h_with_deps", 0, owner_counter.clone());
        h.dependencies = vec!["some_other_dep".to_string()];
        h
    };

    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[plain_dep, hosted],
        &[registered_test("t1", vec!["h_with_deps".to_string()])],
        &[],
    );

    let collected = execution.collect_parent_shared_dependencies_sync();
    assert_eq!(collected.hosted_descriptor_bytes.len(), 1);

    let dep_runs = dep_counter.load(Ordering::SeqCst);
    assert_eq!(dep_runs, 1);
    let owner_runs = owner_counter.load(Ordering::SeqCst);
    assert_eq!(owner_runs, 1);
}
1950
/// The async hosted-descriptor collection awaits an `Async` owner constructor
/// (which yields once before producing `42`), encodes the descriptor, and
/// returns the owner value alongside it.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_descriptor_collection_awaits_async_constructor() {
    use std::pin::Pin;

    type ValueFuture = Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>;

    let counter = Arc::new(AtomicUsize::new(0));

    let constructor = {
        let shared_counter = counter.clone();
        DependencyConstructor::Async(Arc::new(move |_view| {
            // Each invocation gets its own handle to the shared counter.
            let counter = shared_counter.clone();
            Box::pin(async move {
                tokio::task::yield_now().await;
                counter.fetch_add(1, Ordering::SeqCst);
                let value: u64 = 42;
                Arc::new(value) as Arc<dyn Any + Send + Sync>
            }) as ValueFuture
        }))
    };

    // Owner u64 -> 8 LE bytes; bytes -> opaque `Vec<u8>` payload.
    let codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let owner: Arc<u64> = any.downcast::<u64>().unwrap();
            (*owner).to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            let owned_bytes: Vec<u8> = bytes.to_vec();
            Arc::new(owned_bytes) as Arc<dyn Any + Send + Sync>
        }),
    };

    // Worker side: decode the byte payload back into a u64.
    let worker_fn = crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _| {
        let raw_bytes: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
        let le_bytes: [u8; 8] = (*raw_bytes).as_slice().try_into().unwrap();
        let value: u64 = u64::from_le_bytes(le_bytes);
        Arc::new(value) as Arc<dyn Any + Send + Sync>
    }));

    let dep = RegisteredDependency {
        name: "hosted_async".to_string(),
        crate_name: "tcrate".to_string(),
        module_path: String::new(),
        constructor,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(worker_fn),
        cloneable_codec: None,
        hosted_codec: Some(codec),
        rpc_factory: None,
        companions: Vec::new(),
    };
    let test = registered_test("t1", vec!["hosted_async".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let (descriptors, owners) =
        runtime.block_on(execution.collect_hosted_descriptor_bytes_async());

    assert_eq!(descriptors.len(), 1);
    assert_eq!(owners.len(), 1);

    let (dep_id, descriptor_bytes) = &descriptors[0];
    assert_eq!(*dep_id, "tcrate::hosted_async");
    assert_eq!(descriptor_bytes.as_slice(), &42_u64.to_le_bytes());
    assert_eq!(counter.load(Ordering::SeqCst), 1);

    let held: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
    assert_eq!(*held, 42);
}
2021
2022 use crate::internal::{
2027 HostedRpcChannel, HostedRpcDep, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
2028 InProcessHostedRpcTransport, RpcFactory,
2029 };
2030
2031 struct RpcCounter {
2035 n: u64,
2036 }
2037
2038 impl HostedRpcDep for RpcCounter {
2039 type Stub = RpcCounterStub;
2040 fn dispatch(&mut self, method_idx: u32, args: &[u8]) -> Result<Vec<u8>, String> {
2041 match method_idx {
2042 1 => {
2043 self.n += 1;
2044 Ok(self.n.to_be_bytes().to_vec())
2045 }
2046 2 => {
2051 let arr: [u8; 4] = args
2052 .try_into()
2053 .map_err(|_| "method_idx=2 requires exactly 4 bytes (size)".to_string())?;
2054 let size = u32::from_be_bytes(arr) as usize;
2055 let mut out = vec![0u8; size];
2056 for (i, b) in out.iter_mut().enumerate() {
2057 *b = (i % 251) as u8;
2058 }
2059 Ok(out)
2060 }
2061 other => Err(format!("RpcCounter: unknown method_idx {other}")),
2062 }
2063 }
2064 fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2065 RpcCounterStub { channel }
2066 }
2067 }
2068
2069 struct RpcCounterStub {
2071 channel: HostedRpcChannel,
2072 }
2073
2074 impl RpcCounterStub {
2075 fn next(&self) -> u64 {
2076 let bytes = self.channel.call(1, Vec::new()).expect("rpc call");
2077 let arr: [u8; 8] = bytes.as_slice().try_into().unwrap();
2078 u64::from_be_bytes(arr)
2079 }
2080
2081 fn echo(&self, size: u32) -> Vec<u8> {
2083 self.channel
2084 .call(2, size.to_be_bytes().to_vec())
2085 .expect("echo rpc call")
2086 }
2087 }
2088
2089 fn registered_hosted_rpc_dep(
2094 name: &str,
2095 module_path: &str,
2096 owner_counter: Arc<AtomicUsize>,
2097 ) -> RegisteredDependency {
2098 let ctor_counter = owner_counter.clone();
2099 let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
2100 ctor_counter.fetch_add(1, Ordering::SeqCst);
2101 let cell = HostedRpcOwnerCell::from_owner(RpcCounter { n: 0 });
2102 Arc::new(cell) as Arc<dyn Any + Send + Sync>
2103 }));
2104 let factory = RpcFactory {
2105 owner_into_cell: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
2106 any.downcast::<HostedRpcOwnerCell>()
2107 .expect("HostedRpc owner downcast")
2108 }),
2109 build_stub: Arc::new(|channel: HostedRpcChannel| {
2110 let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2111 Arc::new(stub) as Arc<dyn Any + Send + Sync>
2112 }),
2113 };
2114 RegisteredDependency {
2115 name: name.to_string(),
2116 crate_name: "tcrate".to_string(),
2117 module_path: module_path.to_string(),
2118 constructor,
2119 dependencies: Vec::new(),
2120 scope: DepScope::HostedRpc,
2121 worker_fn: None,
2122 cloneable_codec: None,
2123 hosted_codec: None,
2124 rpc_factory: Some(factory),
2125 companions: Vec::new(),
2126 }
2127 }
2128
/// Collecting hosted-RPC owner cells yields one cell keyed by the qualified
/// dep id and runs the owner constructor once per collection call — a second
/// call constructs the owner again.
#[test]
fn hosted_rpc_owner_cells_collected_once_and_keyed_by_qualified_id() {
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[registered_hosted_rpc_dep("rpc_dep", "", owner_runs.clone())],
        &[registered_test("t1", vec!["rpc_dep".to_string()])],
        &[],
    );

    assert!(execution.has_hosted_rpc_dependencies());

    // First collection: one cell, one constructor run.
    let cells = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(cells.len(), 1, "exactly one hosted rpc dep expected");
    let (dep_id, _cell) = &cells[0];
    assert_eq!(dep_id, "tcrate::rpc_dep");
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must run exactly once on the parent"
    );

    // Second collection: the constructor runs again.
    let cells_b = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(cells_b.len(), 1);
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        2,
        "collect_hosted_rpc_owner_cells_sync runs the constructor on every call; \
         callers (the runner) are responsible for only calling it once per suite"
    );
}
2163
/// A hosted-RPC dep that declares a dependency on another dep: collecting the
/// owner cells runs both the RPC owner's constructor and its dependency's
/// constructor exactly once.
#[test]
fn hosted_rpc_owner_dependencies_construct_in_parent_context() {
    let parent_only_counter = Arc::new(AtomicUsize::new(0));
    let owner_counter = Arc::new(AtomicUsize::new(0));

    let rpc_dep = {
        let mut dep = registered_hosted_rpc_dep("rpc_dep", "", owner_counter.clone());
        dep.dependencies = vec!["parent_only_dep".to_string()];
        dep
    };
    let parent_only_dep = registered_cloneable_dep("parent_only_dep", parent_only_counter.clone());

    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[parent_only_dep, rpc_dep],
        &[registered_test("t1", vec!["rpc_dep".to_string()])],
        &[],
    );

    let cells = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(cells.len(), 1);

    let parent_only_runs = parent_only_counter.load(Ordering::SeqCst);
    assert_eq!(parent_only_runs, 1);
    let owner_runs = owner_counter.load(Ordering::SeqCst);
    assert_eq!(owner_runs, 1);
}
2186
/// Calls made through an in-process transport reach the collected owner cell:
/// three consecutive `next()` calls on the stub observe the counter values
/// 1, 2 and 3.
#[test]
fn hosted_rpc_in_process_transport_routes_to_owner_cell() {
    let counter = Arc::new(AtomicUsize::new(0));
    let dep = registered_hosted_rpc_dep("rpc_dep", "", counter.clone());
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    // Qualified dep id -> owner cell.
    let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();

    // Neither the map nor the transport handle is used again afterwards, so
    // both are moved rather than cloned.
    let transport: Arc<dyn HostedRpcTransport> = Arc::new(InProcessHostedRpcTransport::new(cells));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);
    let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);

    assert_eq!(stub.next(), 1);
    assert_eq!(stub.next(), 2);
    assert_eq!(stub.next(), 3);
}
2210
/// Calling a method index the owner does not know comes back as
/// `HostedRpcError::Dispatch` carrying the owner's error message, not as a
/// transport failure.
#[test]
fn hosted_rpc_in_process_transport_returns_dispatch_error_on_unknown_method() {
    let counter = Arc::new(AtomicUsize::new(0));
    let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);

    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();

    // Neither the map nor the transport handle is used again afterwards, so
    // both are moved rather than cloned.
    let transport: Arc<dyn HostedRpcTransport> = Arc::new(InProcessHostedRpcTransport::new(cells));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);

    // `RpcCounter` only serves methods 1 and 2.
    let err = channel.call(999, Vec::new()).unwrap_err();
    match err {
        HostedRpcError::Dispatch(msg) => {
            assert!(
                msg.contains("unknown method_idx 999"),
                "expected dispatch error to mention method_idx, got '{msg}'"
            );
        }
        HostedRpcError::Transport(msg) => {
            panic!("expected Dispatch error, got Transport({msg})");
        }
    }
}
2243
/// A call addressed to a dep id that has no owner cell must fail with
/// `HostedRpcError::Transport`, and the message must contain
/// "unknown dep id 'tcrate::missing_dep'". A `Dispatch` error is treated as a
/// test failure.
#[test]
fn hosted_rpc_in_process_transport_returns_transport_error_on_unknown_dep_id() {
    // An empty cell map means the lookup for any dep id misses.
    let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = HashMap::new();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(cells));
    // `transport` has no other user, so it is moved into the channel.
    let channel = HostedRpcChannel::new("tcrate::missing_dep".to_string(), transport);

    let err = channel.call(1, Vec::new()).unwrap_err();
    match err {
        HostedRpcError::Transport(msg) => {
            assert!(
                msg.contains("unknown dep id 'tcrate::missing_dep'"),
                "expected transport error to mention dep id, got '{msg}'"
            );
        }
        HostedRpcError::Dispatch(msg) => {
            panic!("expected Transport error, got Dispatch({msg})");
        }
    }
}
2263
2264 struct PanickingRpcOwner;
2274
2275 impl HostedRpcDep for PanickingRpcOwner {
2276 type Stub = RpcCounterStub;
2277 fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2278 panic!("owner_panic_for_test");
2279 }
2280 fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2281 RpcCounterStub { channel }
2282 }
2283 }
2284
/// Pins the two-step failure behaviour of an owner cell whose owner panics:
/// the first `dispatch` returns an `Err` containing the panic payload, and
/// the next `dispatch` returns exactly "hosted rpc owner poisoned".
#[test]
fn hosted_rpc_owner_panic_surfaces_then_poisons() {
    let cell = HostedRpcOwnerCell::from_owner(PanickingRpcOwner);

    // First call: the owner panics; the cell must report that as an error
    // value rather than letting the panic escape into the test.
    let first_outcome = cell.dispatch(1, &[]);
    let err1 = first_outcome.expect_err("first call must surface the panic as Err");
    let wraps_payload = err1.contains("hosted rpc owner panicked: owner_panic_for_test");
    assert!(
        wraps_payload,
        "expected first-call error to wrap the panic payload, got '{err1}'"
    );

    // Second call: same arguments, but a different, fixed error is expected.
    let second_outcome = cell.dispatch(1, &[]);
    let err2 = second_outcome.expect_err("second call must short-circuit on the poisoned cell");
    assert_eq!(
        err2, "hosted rpc owner poisoned",
        "expected poisoned-cell error on the second call, got '{err2}'"
    );
}
2316
/// Requests a 256 KiB payload through the in-process transport and checks
/// that it arrives complete: the length must equal the requested size and
/// byte `i` must equal `i % 251`.
#[test]
fn hosted_rpc_in_process_transport_round_trips_large_payload_exceeding_64_kib() {
    // Requested payload size in bytes (256 KiB).
    const SIZE: u32 = 256 * 1024;

    let dep = registered_hosted_rpc_dep("rpc_dep", "", Arc::new(AtomicUsize::new(0)));
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let owner_cells = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect::<HashMap<String, Arc<HostedRpcOwnerCell>>>();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells));
    let stub = <RpcCounter as HostedRpcDep>::build_stub(HostedRpcChannel::new(
        "tcrate::rpc_dep".to_string(),
        transport,
    ));

    let bytes = stub.echo(SIZE);

    // Length first: a short or long reply makes the per-byte check pointless.
    assert_eq!(
        bytes.len(),
        SIZE as usize,
        "framing dropped/truncated bytes"
    );
    // Then every byte against the expected `i % 251` pattern.
    bytes.iter().enumerate().for_each(|(i, b)| {
        assert_eq!(
            *b,
            (i % 251) as u8,
            "framing corrupted byte at index {i}: expected {}, got {b}",
            (i % 251) as u8
        );
    });
}
2356
/// Drives one shared in-process transport from several threads at once.
/// Each of `N` threads builds its own channel and stub and calls `next()`
/// `M` times; the combined ids must be non-zero, pairwise distinct, and
/// exactly `N * M` in number.
#[test]
fn hosted_rpc_in_process_transport_multiplexes_concurrent_calls_from_threads() {
    use std::thread;

    // Number of worker threads, and calls issued by each worker.
    const N: usize = 4;
    const M: usize = 32;

    let dep = registered_hosted_rpc_dep("rpc_dep", "", Arc::new(AtomicUsize::new(0)));
    let test = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);

    let owner_cells = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect::<HashMap<String, Arc<HostedRpcOwnerCell>>>();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells));

    // Spawn every worker before joining any, so the calls can overlap.
    let workers: Vec<_> = (0..N)
        .map(|_| {
            let transport = transport.clone();
            thread::spawn(move || {
                let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);
                let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
                (0..M).map(|_| stub.next()).collect::<Vec<_>>()
            })
        })
        .collect();

    // Gather the ids from all workers, in spawn order.
    let mut all: Vec<_> = workers
        .into_iter()
        .flat_map(|worker| worker.join().expect("thread panicked"))
        .collect();

    // After sorting, each id must be strictly greater than its predecessor.
    // Starting from 0 also rejects an id of 0.
    all.sort_unstable();
    let _largest = all.iter().fold(0u64, |prev, id| {
        assert!(
            *id > prev,
            "duplicate or non-monotonic id {id} after {prev}"
        );
        *id
    });

    assert_eq!(
        all.len(),
        N * M,
        "expected exactly {} ids in total, got {}",
        N * M,
        all.len()
    );
}
2417}