Skip to main content

test_r_core/
execution.rs

1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12    apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13    DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14    RegisteredTestSuiteProperty,
15};
16
/// Wire bytes for a single Cloneable / Hosted dependency, keyed by its
/// fully-qualified id (`{crate}::{module}::{name}`).
///
/// The first element is the qualified id, the second the encoded payload.
pub type DepWireBytes = (String, Vec<u8>);
20
/// Parent-held owner value (used only for `Hosted` deps — the parent keeps
/// the owner alive for the suite's duration).
///
/// Type-erased and reference-counted so it can be stored without knowing the
/// concrete dependency type.
pub type HostedOwner = Arc<dyn Any + Send + Sync>;
24
#[cfg(feature = "tokio")]
/// Boxed, pinned future returned by the recursive async collector
/// `collect_parent_shared_dependencies_into_async`. It resolves to the
/// name-to-value dependency map built for the visited node. The trait object
/// carries no `Send` bound.
type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
>;
29
#[cfg(test)]
/// Result of [`TestSuiteExecution::collect_hosted_descriptor_bytes_sync`] /
/// [`TestSuiteExecution::collect_hosted_descriptor_bytes_async`]: the
/// descriptor bytes that get shipped to workers (first element), plus the
/// parent-held owner values that must outlive every worker (second element).
pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
/// Parent-side materialisation output for dependency scopes whose worker-side
/// value is derived from parent-owned state instead of by rerunning the user
/// constructor in each worker process.
pub struct ParentSharedDependencies {
    /// `(qualified id, bytes)` pairs produced by the `to_wire` function of each
    /// `Cloneable` dependency's codec.
    pub cloneable_wire_bytes: Vec<DepWireBytes>,
    /// `(qualified id, bytes)` pairs produced by the `to_wire` function of each
    /// `Hosted` dependency's codec.
    pub hosted_descriptor_bytes: Vec<DepWireBytes>,
    /// The constructed `Hosted` dependency values themselves, retained so the
    /// caller can keep them alive.
    pub hosted_owners: Vec<HostedOwner>,
    /// `(qualified id, cell)` pairs, where the cell is what the dependency's
    /// RPC factory built from the constructed `HostedRpc` value.
    pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
}
46
47impl ParentSharedDependencies {
48    fn new() -> Self {
49        Self {
50            cloneable_wire_bytes: Vec::new(),
51            hosted_descriptor_bytes: Vec::new(),
52            hosted_owners: Vec::new(),
53            hosted_rpc_owner_cells: Vec::new(),
54        }
55    }
56}
57
/// One node of the execution tree. Nodes are keyed by `crate_and_module`;
/// tests, dependencies and suite properties are routed to the node whose key
/// matches theirs, and child nodes are created on demand.
pub(crate) struct TestSuiteExecution {
    /// Key of this node; the root uses the empty string.
    crate_and_module: String,
    /// Dependencies registered at exactly this level.
    dependencies: Vec<RegisteredDependency>,
    /// Tests of this level that have not been handed out yet; they are taken
    /// from the end of the vector.
    tests: Vec<RegisteredTest>,
    /// Suite properties registered at exactly this level.
    props: Vec<RegisteredTestSuiteProperty>,
    /// Child nodes.
    inner: Vec<TestSuiteExecution>,
    /// Dependency values available at this level, keyed by the dependency's
    /// local name.
    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
    /// Lock consulted before handing out a test of this level.
    sequential_lock: SequentialExecutionLock,
    /// Number of tests in this subtree that have not been handed out yet.
    remaining_count: usize,
    /// Index assigned to the next test returned by `pick_next` / `pick_next_sync`.
    idx: usize,
    /// Set when this level has a `Sequential` suite property or contains a
    /// benchmark; passed to the sequential lock when a test is picked.
    is_sequential: bool,
    /// When set, the node reports itself as materialised without creating
    /// any dependency.
    skip_creating_dependencies: bool,
    /// Counter incremented each time a test of this level is handed out; a
    /// clone of it travels with the picked test.
    in_progress: Arc<AtomicUsize>,
}
72
73impl TestSuiteExecution {
74    pub fn construct(
75        arguments: &Arguments,
76        dependencies: &[RegisteredDependency],
77        tests: &[RegisteredTest],
78        props: &[RegisteredTestSuiteProperty],
79    ) -> (Self, Vec<RegisteredTest>) {
80        let tests_with_props = apply_suite_props_to_tests(tests, props);
81        let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
82        Self::shuffle(arguments, &mut filtered_tests);
83        filtered_tests.reverse();
84
85        if filtered_tests.is_empty() {
86            (
87                Self::root(
88                    dependencies
89                        .iter()
90                        .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
91                        .cloned()
92                        .collect::<Vec<_>>(),
93                    Vec::new(),
94                    props
95                        .iter()
96                        .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
97                        .cloned()
98                        .collect::<Vec<_>>(),
99                ),
100                Vec::new(),
101            )
102        } else {
103            let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
104
105            for prop in props {
106                root.add_prop(prop.clone());
107            }
108
109            for dep in dependencies {
110                root.add_dependency(dep.clone());
111            }
112
113            for test in filtered_tests.clone() {
114                root.add_test(test.clone());
115            }
116
117            root.propagate_sequential(None);
118            root.prune_unused_deps();
119
120            (root, filtered_tests)
121        }
122    }
123
124    fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
125        if let Some(seed) = arguments.shuffle_seed {
126            let mut rng = StdRng::seed_from_u64(seed);
127            tests.shuffle(&mut rng);
128        }
129    }
130
131    /// Disables creating dependencies when picking the next test. This is useful when the execution plan
132    /// is only used to drive spawned workers instead of actually running the tests.
133    pub fn skip_creating_dependencies(&mut self) {
134        self.skip_creating_dependencies = true;
135        for inner in &mut self.inner {
136            inner.skip_creating_dependencies();
137        }
138    }
139
140    pub fn remaining(&self) -> usize {
141        self.remaining_count
142    }
143
144    pub fn is_empty(&self) -> bool {
145        self.tests.is_empty() && self.inner.is_empty()
146    }
147
148    pub fn is_done(&self) -> bool {
149        self.remaining_count == 0
150    }
151
152    /// Returns true if either this level, or any of the inner levels have dependencies
153    #[allow(dead_code)]
154    pub fn has_dependencies(&self) -> bool {
155        !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
156    }
157
158    /// Returns true if any dependency in this subtree uses `DepScope::Shared`
159    /// — those force single-threaded execution when output capture is on,
160    /// because the materialised value cannot cross the parent/worker boundary.
161    pub fn has_shared_dependencies(&self) -> bool {
162        self.dependencies
163            .iter()
164            .any(|d| d.scope == DepScope::Shared)
165            || self
166                .inner
167                .iter()
168                .any(|inner| inner.has_shared_dependencies())
169    }
170
171    /// Returns true if any dependency in this subtree uses `DepScope::Cloneable`.
172    #[allow(dead_code)]
173    pub fn has_cloneable_dependencies(&self) -> bool {
174        self.dependencies
175            .iter()
176            .any(|d| d.scope == DepScope::Cloneable)
177            || self
178                .inner
179                .iter()
180                .any(|inner| inner.has_cloneable_dependencies())
181    }
182
183    /// Returns true if any dependency in this subtree uses `DepScope::Hosted`.
184    /// The parent keeps Hosted owners alive for the duration of the suite
185    /// while shipping descriptors to workers.
186    #[allow(dead_code)]
187    pub fn has_hosted_dependencies(&self) -> bool {
188        self.dependencies
189            .iter()
190            .any(|d| d.scope == DepScope::Hosted)
191            || self
192                .inner
193                .iter()
194                .any(|inner| inner.has_hosted_dependencies())
195    }
196
197    /// Returns true if any dependency in this subtree uses `DepScope::HostedRpc`.
198    /// The parent keeps owner cells alive for the suite and routes
199    /// worker-initiated IPC calls to those cells.
200    #[allow(dead_code)]
201    pub fn has_hosted_rpc_dependencies(&self) -> bool {
202        self.dependencies
203            .iter()
204            .any(|d| d.scope == DepScope::HostedRpc)
205            || self
206                .inner
207                .iter()
208                .any(|inner| inner.has_hosted_rpc_dependencies())
209    }
210
211    /// Collects every Cloneable dependency in this subtree (depth-first).
212    #[allow(dead_code)]
213    pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
214        let mut out = Vec::new();
215        self.collect_cloneable_dependencies_into(&mut out);
216        out
217    }
218
219    #[allow(dead_code)]
220    fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
221        for dep in &self.dependencies {
222            if dep.scope == DepScope::Cloneable {
223                out.push(dep.clone());
224            }
225        }
226        for inner in &self.inner {
227            inner.collect_cloneable_dependencies_into(out);
228        }
229    }
230
231    /// Walks the subtree, materialising dependencies in dependency order and
232    /// collecting the parent-side wire/state needed by Cloneable, Hosted, and
233    /// HostedRpc scopes. Constructor dependencies are resolved in this parent
234    /// context, but workers still receive these shared scopes as dependency-free
235    /// leaves: Cloneable/Hosted values are reconstructed from bytes, and
236    /// HostedRpc values are stubs backed by a channel.
237    pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
238        let mut out = ParentSharedDependencies::new();
239        let parent_map = HashMap::new();
240        self.collect_parent_shared_dependencies_into_sync(&parent_map, &mut out);
241        out
242    }
243
244    fn collect_parent_shared_dependencies_into_sync(
245        &self,
246        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
247        out: &mut ParentSharedDependencies,
248    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
249        let mut dependency_map = parent_map.clone();
250        let sorted_dependencies = self.sorted_dependencies();
251
252        for dep in sorted_dependencies {
253            if dependency_map.contains_key(&dep.name) {
254                continue;
255            }
256
257            let value = Self::construct_dependency_sync(dep, &dependency_map);
258            match dep.scope {
259                DepScope::Cloneable => {
260                    let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
261                        panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
262                    });
263                    out.cloneable_wire_bytes
264                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
265                }
266                DepScope::Hosted => {
267                    let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
268                        panic!("Hosted dep '{}' missing hosted codec", dep.name)
269                    });
270                    out.hosted_descriptor_bytes
271                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
272                    out.hosted_owners.push(value.clone());
273                }
274                DepScope::HostedRpc => {
275                    let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
276                        panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
277                    });
278                    let cell = (factory.owner_into_cell)(value.clone());
279                    out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
280                }
281                DepScope::Shared | DepScope::PerWorker => {}
282            }
283
284            dependency_map.insert(dep.name.clone(), value);
285        }
286
287        for inner in &self.inner {
288            inner.collect_parent_shared_dependencies_into_sync(&dependency_map, out);
289        }
290
291        dependency_map
292    }
293
294    fn construct_dependency_sync(
295        dep: &RegisteredDependency,
296        dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
297    ) -> Arc<dyn Any + Send + Sync> {
298        match &dep.constructor {
299            DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
300            DependencyConstructor::Async(cons) => {
301                futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
302            }
303        }
304    }
305
306    /// Collects only Cloneable wire bytes. The runner uses
307    /// [`Self::collect_parent_shared_dependencies_sync`] to collect all shared
308    /// parent-side values in one pass; this narrower helper remains for unit
309    /// tests and focused callers.
310    #[cfg(test)]
311    pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
312        self.collect_parent_shared_dependencies_sync()
313            .cloneable_wire_bytes
314    }
315
316    /// Parent-side materialisation for `Hosted` dependencies.
317    ///
318    /// The returned descriptor bytes are keyed by fully-qualified dep id and
319    /// the returned owner values must be kept alive for the duration of the
320    /// suite. Unlike Cloneable, Hosted owners may hold resources (TCP
321    /// listeners, Docker containers, gRPC clients, etc.) that workers'
322    /// reconstructed handles depend on.
323    #[cfg(test)]
324    pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
325        let collected = self.collect_parent_shared_dependencies_sync();
326        (collected.hosted_descriptor_bytes, collected.hosted_owners)
327    }
328
329    /// Parent-side materialisation for `HostedRpc` dependencies.
330    ///
331    /// Returns `(qualified_id, cell)` pairs that the runtime keeps alive for
332    /// the suite's lifetime and uses to dispatch worker-initiated RPC calls.
333    #[cfg(test)]
334    pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
335        self.collect_parent_shared_dependencies_sync()
336            .hosted_rpc_owner_cells
337    }
338
339    /// Async counterpart of [`Self::collect_parent_shared_dependencies_sync`].
340    /// Async constructors are awaited on the parent before workers receive
341    /// wire bytes, descriptors, or RPC stubs.
342    #[cfg(feature = "tokio")]
343    pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
344        let mut out = ParentSharedDependencies::new();
345        let parent_map = HashMap::new();
346        self.collect_parent_shared_dependencies_into_async(&parent_map, &mut out)
347            .await;
348        out
349    }
350
351    #[cfg(feature = "tokio")]
352    fn collect_parent_shared_dependencies_into_async<'a>(
353        &'a self,
354        parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
355        out: &'a mut ParentSharedDependencies,
356    ) -> ParentSharedDependenciesFuture<'a> {
357        Box::pin(async move {
358            let mut dependency_map = parent_map.clone();
359            let sorted_dependencies = self.sorted_dependencies();
360
361            for dep in sorted_dependencies {
362                if dependency_map.contains_key(&dep.name) {
363                    continue;
364                }
365
366                let value = match &dep.constructor {
367                    DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
368                    DependencyConstructor::Async(cons) => {
369                        cons(Arc::new(dependency_map.clone())).await
370                    }
371                };
372                match dep.scope {
373                    DepScope::Cloneable => {
374                        let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
375                            panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
376                        });
377                        out.cloneable_wire_bytes
378                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
379                    }
380                    DepScope::Hosted => {
381                        let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
382                            panic!("Hosted dep '{}' missing hosted codec", dep.name)
383                        });
384                        out.hosted_descriptor_bytes
385                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
386                        out.hosted_owners.push(value.clone());
387                    }
388                    DepScope::HostedRpc => {
389                        let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
390                            panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
391                        });
392                        let cell = (factory.owner_into_cell)(value.clone());
393                        out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
394                    }
395                    DepScope::Shared | DepScope::PerWorker => {}
396                }
397
398                dependency_map.insert(dep.name.clone(), value);
399            }
400
401            for inner in &self.inner {
402                inner
403                    .collect_parent_shared_dependencies_into_async(&dependency_map, out)
404                    .await;
405            }
406            dependency_map
407        })
408    }
409
410    /// Async Hosted-only collection helper retained for focused callers.
411    #[cfg(feature = "tokio")]
412    #[cfg(test)]
413    pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
414        let collected = self.collect_parent_shared_dependencies_async().await;
415        (collected.hosted_descriptor_bytes, collected.hosted_owners)
416    }
417
418    /// Async Cloneable-only collection helper retained for focused callers.
419    ///
420    /// **Intentionally `!Send`.** The underlying `DependencyConstructor::Async`
421    /// future is not `Send`, so the returned future from this collector cannot
422    /// be either. Must be awaited on the root runner task (i.e., under
423    /// `Runtime::block_on` or directly inside `test_runner`) — never inside
424    /// `tokio::spawn` / a `JoinSet`. If we ever want to spawn Cloneable
425    /// collection onto a worker, the constructor type would need to require
426    /// `Send` first.
427    #[cfg(feature = "tokio")]
428    #[cfg(test)]
429    pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
430        self.collect_parent_shared_dependencies_async()
431            .await
432            .cloneable_wire_bytes
433    }
434
435    /// Worker-side counterpart to [`Self::collect_cloneable_wire_bytes_sync`]:
436    /// pre-populates the Cloneable dep value at the node where the dep is
437    /// registered, so the upcoming `materialize_deps_sync` call uses the
438    /// provided value instead of running the original constructor. The lookup
439    /// is keyed by the dep's fully-qualified id
440    /// (`{crate}::{module}::{name}`), but the value is stored under the local
441    /// `name` so the rest of the materialisation logic keeps working unchanged.
442    /// Returns `true` if a matching dep was found in any node of the subtree.
443    pub fn provide_cloneable_value(
444        &mut self,
445        dep_id: &str,
446        value: Arc<dyn Any + Send + Sync>,
447    ) -> bool {
448        let applied = self.provide_cloneable_value_internal(dep_id, value);
449        if applied {
450            self.prune_unused_deps();
451        }
452        applied
453    }
454
455    fn provide_cloneable_value_internal(
456        &mut self,
457        dep_id: &str,
458        value: Arc<dyn Any + Send + Sync>,
459    ) -> bool {
460        let mut applied = false;
461        if let Some((local_name, dep_idx)) = self
462            .dependencies
463            .iter()
464            .enumerate()
465            .find(|(_, d)| d.qualified_id() == dep_id)
466            .map(|(idx, d)| (d.name.clone(), idx))
467        {
468            // From the worker execution tree's perspective this dependency is
469            // now a leaf: its value came from wire bytes or a HostedRpc channel,
470            // so the worker must not instantiate constructor-only dependencies
471            // that were needed solely in the parent collection context.
472            self.dependencies[dep_idx].dependencies.clear();
473            self.materialized_dependencies
474                .insert(local_name, value.clone());
475            applied = true;
476        }
477        for inner in &mut self.inner {
478            applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
479        }
480        applied
481    }
482
483    /// Returns true if there are any tests that require capturing, based on the given default setting
484    /// and the per-test CaptureControl overrides.
485    pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
486        self.tests.iter().any(|test| {
487            test.props
488                .capture_control
489                .requires_capturing(capture_by_default)
490        }) || self
491            .inner
492            .iter()
493            .any(|inner| inner.requires_capturing(capture_by_default))
494    }
495
496    #[cfg(feature = "tokio")]
497    pub async fn pick_next(&mut self) -> Option<TestExecution> {
498        if self.is_empty() {
499            None
500        } else {
501            match self
502                .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
503                .await
504            {
505                Some((test, deps, seq_lock, in_progress_counter)) => {
506                    let index = self.idx;
507                    self.idx += 1;
508                    Some(TestExecution {
509                        test: test.clone(),
510                        deps: Arc::new(deps),
511                        index,
512                        _seq_lock: seq_lock,
513                        in_progress_counter,
514                    })
515                }
516                None => None,
517            }
518        }
519    }
520
521    pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
522        match self.pick_next_internal_sync(&HashMap::new()) {
523            Some((test, deps, seq_lock, in_progress_counter)) => {
524                let index = self.idx;
525                self.idx += 1;
526                Some(TestExecution {
527                    test: test.clone(),
528                    deps: Arc::new(deps),
529                    index,
530                    _seq_lock: seq_lock,
531                    in_progress_counter,
532                })
533            }
534            None => None,
535        }
536    }
537
    /// Recursive async selection of the next test in this subtree.
    ///
    /// `materialized_parent_deps` holds the dependency values made available
    /// by the ancestors. On success the result carries the picked test, the
    /// dependency map visible to it, the sequential-lock guard acquired for
    /// it, and a handle to the in-progress counter of the node the test was
    /// taken from. Returns `None` when the node is empty or nothing can be
    /// picked right now.
    #[cfg(feature = "tokio")]
    #[allow(clippy::type_complexity)]
    async fn pick_next_internal(
        &mut self,
        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
    ) -> Option<(
        RegisteredTest,
        HashMap<String, Arc<dyn Any + Send + Sync>>,
        SequentialExecutionLockGuard,
        Arc<AtomicUsize>,
    )> {
        if self.is_empty() {
            None
        } else {
            // Create this level's dependencies on first use; afterwards just
            // layer the already materialised values over the parent's map.
            let dependency_map = if !self.is_materialized() {
                self.materialize_deps(materialized_parent_deps).await
            } else {
                self.create_dependency_map(materialized_parent_deps)
            };

            let locked = self.sequential_lock.is_locked().await;
            // Own tests are only handed out while the sequential lock is free;
            // otherwise (or when none are left) try the children in order.
            let result = if self.tests.is_empty() || locked {
                let current = self.inner.iter_mut();
                let mut result = None;
                for inner in current {
                    if let Some((test, deps, seq_lock, in_progress_counter)) =
                        Box::pin(inner.pick_next_internal(&dependency_map)).await
                    {
                        result = Some((test, deps, seq_lock, in_progress_counter));
                        break;
                    }
                }
                // Forget children that have nothing left.
                self.inner.retain(|inner| !inner.is_empty());

                result
            } else {
                let guard = self.sequential_lock.lock(self.is_sequential).await;
                // Count the test as in progress before it leaves this node.
                self.in_progress.fetch_add(1, Ordering::Release);
                self.tests
                    .pop()
                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
            };
            // `drop_deps` is only called once nothing was picked, the subtree
            // is exhausted, the lock is free and no test of this level is
            // still counted as in progress.
            if result.is_none()
                && self.is_empty()
                && self.is_materialized()
                && !locked
                && self.in_progress.load(Ordering::Acquire) == 0
            {
                self.drop_deps();
            }
            if result.is_some() {
                self.remaining_count -= 1;
            }
            result
        }
    }
594
595    #[allow(clippy::type_complexity)]
596    fn pick_next_internal_sync(
597        &mut self,
598        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
599    ) -> Option<(
600        RegisteredTest,
601        HashMap<String, Arc<dyn Any + Send + Sync>>,
602        SequentialExecutionLockGuard,
603        Arc<AtomicUsize>,
604    )> {
605        if self.is_empty() {
606            None
607        } else {
608            let dependency_map = if !self.is_materialized() {
609                self.materialize_deps_sync(materialized_parent_deps)
610            } else {
611                self.create_dependency_map(materialized_parent_deps)
612            };
613
614            let locked = self.sequential_lock.is_locked_sync();
615            let result = if self.tests.is_empty() || locked {
616                let current = self.inner.iter_mut();
617                let mut result = None;
618                for inner in current {
619                    if let Some((test, deps, seq_lock, in_progress_counter)) =
620                        inner.pick_next_internal_sync(&dependency_map)
621                    {
622                        result = Some((test, deps, seq_lock, in_progress_counter));
623                        break;
624                    }
625                }
626
627                self.inner.retain(|inner| !inner.is_empty());
628                result
629            } else {
630                let guard = self.sequential_lock.lock_sync(self.is_sequential);
631                self.in_progress.fetch_add(1, Ordering::Release);
632                self.tests
633                    .pop()
634                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
635            };
636            if result.is_none()
637                && self.is_materialized()
638                && !locked
639                && self.in_progress.load(Ordering::Acquire) == 0
640            {
641                self.drop_deps();
642            }
643            if result.is_some() {
644                self.remaining_count -= 1;
645            }
646            result
647        }
648    }
649
650    fn create_dependency_map(
651        &self,
652        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
653    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
654        let mut result = parent_map.clone();
655        for (key, dep) in &self.materialized_dependencies {
656            result.insert(key.clone(), dep.clone());
657        }
658        result
659    }
660
661    fn root(
662        deps: Vec<RegisteredDependency>,
663        tests: Vec<RegisteredTest>,
664        props: Vec<RegisteredTestSuiteProperty>,
665    ) -> Self {
666        let total_count = tests.len();
667        let is_sequential = props
668            .iter()
669            .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
670            || tests.iter().any(|test| test.run.is_bench());
671        Self {
672            crate_and_module: String::new(),
673            dependencies: deps,
674            tests,
675            props,
676            inner: Vec::new(),
677            materialized_dependencies: HashMap::new(),
678            remaining_count: total_count,
679            idx: 0,
680            sequential_lock: SequentialExecutionLock::new(),
681            is_sequential,
682            skip_creating_dependencies: false,
683            in_progress: Arc::new(AtomicUsize::new(0)),
684        }
685    }
686
687    fn add_dependency(&mut self, dep: RegisteredDependency) {
688        let crate_and_module = dep.crate_and_module();
689        if self.crate_and_module == crate_and_module {
690            self.dependencies.push(dep);
691        } else {
692            let mut found = false;
693            for inner in &mut self.inner {
694                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
695                    inner.add_dependency(dep.clone());
696                    found = true;
697                    break;
698                }
699            }
700            if !found {
701                let mut inner = Self {
702                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
703                    dependencies: vec![],
704                    tests: vec![],
705                    inner: vec![],
706                    props: vec![],
707                    materialized_dependencies: HashMap::new(),
708                    remaining_count: 0,
709                    idx: 0,
710                    is_sequential: false,
711                    sequential_lock: SequentialExecutionLock::new(),
712                    skip_creating_dependencies: false,
713                    in_progress: Arc::new(AtomicUsize::new(0)),
714                };
715                inner.add_dependency(dep);
716                self.inner.push(inner);
717            }
718        }
719    }
720
721    fn add_test(&mut self, test: RegisteredTest) {
722        let crate_and_module = test.crate_and_module();
723        if self.crate_and_module == crate_and_module {
724            self.tests.push(test.clone());
725
726            if test.run.is_bench() {
727                self.is_sequential = true;
728            }
729        } else {
730            let mut found = false;
731            for inner in &mut self.inner {
732                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
733                    inner.add_test(test.clone());
734                    found = true;
735                    break;
736                }
737            }
738            if !found {
739                let mut inner = Self {
740                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
741                    dependencies: vec![],
742                    tests: vec![],
743                    inner: vec![],
744                    props: vec![],
745                    materialized_dependencies: HashMap::new(),
746                    remaining_count: 0,
747                    idx: 0,
748                    is_sequential: false,
749                    sequential_lock: SequentialExecutionLock::new(),
750                    skip_creating_dependencies: false,
751                    in_progress: Arc::new(AtomicUsize::new(0)),
752                };
753                inner.add_test(test);
754                self.inner.push(inner);
755            }
756        }
757        self.remaining_count += 1;
758    }
759
760    fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
761        let crate_and_module = prop.crate_and_module();
762        if self.crate_and_module == crate_and_module {
763            if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
764                self.is_sequential = true;
765            }
766            self.props.push(prop);
767        } else {
768            let mut found = false;
769            for inner in &mut self.inner {
770                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
771                    inner.add_prop(prop.clone());
772                    found = true;
773                    break;
774                }
775            }
776            if !found {
777                let mut inner = Self {
778                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
779                    dependencies: vec![],
780                    tests: vec![],
781                    inner: vec![],
782                    props: vec![],
783                    materialized_dependencies: HashMap::new(),
784                    remaining_count: 0,
785                    idx: 0,
786                    is_sequential: false,
787                    sequential_lock: SequentialExecutionLock::new(),
788                    skip_creating_dependencies: false,
789                    in_progress: Arc::new(AtomicUsize::new(0)),
790                };
791                inner.add_prop(prop);
792                self.inner.push(inner);
793            }
794        }
795    }
796
797    fn is_materialized(&self) -> bool {
798        self.skip_creating_dependencies
799            || self.materialized_dependencies.len() == self.dependencies.len()
800    }
801
802    #[cfg(feature = "tokio")]
803    async fn materialize_deps(
804        &mut self,
805        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
806    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
807        // Start with any pre-populated values (e.g. Cloneable deps received
808        // from the parent via ProvideCloneable IPC).
809        let mut deps = self.materialized_dependencies.clone();
810        let mut dependency_map = parent_map.clone();
811        for (k, v) in &deps {
812            dependency_map.insert(k.clone(), v.clone());
813        }
814
815        let sorted_dependencies = self.sorted_dependencies();
816        for dep in &sorted_dependencies {
817            if deps.contains_key(&dep.name) {
818                continue;
819            }
820            let materialized_dep = match &dep.constructor {
821                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
822                DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
823            };
824            deps.insert(dep.name.clone(), materialized_dep.clone());
825            dependency_map.insert(dep.name.clone(), materialized_dep);
826        }
827        self.materialized_dependencies = deps;
828        dependency_map
829    }
830
831    fn materialize_deps_sync(
832        &mut self,
833        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
834    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
835        // Start with any pre-populated values (e.g. Cloneable deps received
836        // from the parent via ProvideCloneable IPC).
837        let mut deps = self.materialized_dependencies.clone();
838        let mut dependency_map = parent_map.clone();
839        for (k, v) in &deps {
840            dependency_map.insert(k.clone(), v.clone());
841        }
842
843        let sorted_dependencies = self.sorted_dependencies();
844        for dep in &sorted_dependencies {
845            if deps.contains_key(&dep.name) {
846                continue;
847            }
848            let materialized_dep = match &dep.constructor {
849                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
850                DependencyConstructor::Async(cons) => {
851                    futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
852                }
853            };
854            deps.insert(dep.name.clone(), materialized_dep.clone());
855            dependency_map.insert(dep.name.clone(), materialized_dep);
856        }
857        self.materialized_dependencies = deps;
858        dependency_map
859    }
860
861    fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
862        let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
863        for dep in &self.dependencies {
864            let mut added = false;
865            for dep_dep_name in &dep.dependencies {
866                if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
867                    ts.add_dependency(dep_dep, dep);
868                    added = true;
869                } else {
870                    // otherwise it is expected to come from the parent level
871                }
872            }
873            if !added {
874                ts.insert(dep);
875            }
876        }
877        let mut result = Vec::with_capacity(self.dependencies.len());
878        loop {
879            let chunk = ts.pop_all();
880            if chunk.is_empty() {
881                break;
882            }
883            result.extend(chunk);
884        }
885        result
886    }
887
    /// Discards every materialised dependency value stored at this level by
    /// clearing `materialized_dependencies`. Child levels are not touched.
    fn drop_deps(&mut self) {
        self.materialized_dependencies.clear();
    }
891
892    /// Prunes dependencies that are not needed by any test in this subtree.
893    /// Returns `Some(needed_from_parent)` with dep names needed from ancestor levels,
894    /// or `None` if pruning is disabled for this subtree (unknown deps).
895    fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
896        // Collect dep names needed by tests at this level
897        let mut needed: Option<HashSet<String>> = Some(HashSet::new());
898        for test in &self.tests {
899            match &test.dependencies {
900                None => {
901                    needed = None;
902                    break;
903                }
904                Some(deps) => {
905                    if let Some(ref mut set) = needed {
906                        set.extend(deps.iter().cloned());
907                    }
908                }
909            }
910        }
911
912        // Merge children's needs
913        for inner in &mut self.inner {
914            let child_needs = inner.prune_unused_deps();
915            needed = match (needed, child_needs) {
916                (None, _) | (_, None) => None,
917                (Some(mut a), Some(b)) => {
918                    a.extend(b);
919                    Some(a)
920                }
921            };
922        }
923
924        // If any test has unknown deps, keep everything
925        let needed = needed?;
926
927        // Determine which local deps to keep
928        let local_names: HashSet<String> =
929            self.dependencies.iter().map(|d| d.name.clone()).collect();
930        let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
931
932        // Expand transitive closure for local deps only (fixpoint)
933        let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
934        let mut needed_from_parent: HashSet<String> =
935            needed.difference(&local_names).cloned().collect();
936
937        while let Some(dep_name) = queue.pop_front() {
938            if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
939                for transitive in &dep.dependencies {
940                    if local_names.contains(transitive) {
941                        if keep_local.insert(transitive.clone()) {
942                            queue.push_back(transitive.clone());
943                        }
944                    } else {
945                        needed_from_parent.insert(transitive.clone());
946                    }
947                }
948                // Companions are planner-only sibling links — no
949                // constructor argument is derived from them, but they
950                // must be retained together with the dep they are
951                // declared on. Used by the
952                // `#[test_dep(scope = Hosted, worker = both(T))]`
953                // lowering to keep the Hosted owner half and the
954                // HostedRpc stub half as a pair even when the
955                // selected tests only parameterise on one of them.
956                for companion in &dep.companions {
957                    if local_names.contains(companion) {
958                        if keep_local.insert(companion.clone()) {
959                            queue.push_back(companion.clone());
960                        }
961                    } else {
962                        needed_from_parent.insert(companion.clone());
963                    }
964                }
965            }
966        }
967
968        // Prune
969        self.dependencies.retain(|d| keep_local.contains(&d.name));
970
971        Some(needed_from_parent)
972    }
973
974    fn is_prefix_of(this: &str, that: &str) -> bool {
975        this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
976    }
977
978    fn next_level(from: &str, to: &str) -> String {
979        assert!(Self::is_prefix_of(from, to));
980        let remaining = if from.is_empty() {
981            to
982        } else {
983            &to[from.len() + 2..]
984        };
985
986        let result = if let Some((next, _tail)) = remaining.split_once("::") {
987            format!("{from}::{next}")
988        } else {
989            format!("{from}::{remaining}")
990        };
991        result.trim_start_matches("::").to_string()
992    }
993
994    fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
995        if let Some(parent_lock) = inherited_lock {
996            self.is_sequential = true;
997            self.sequential_lock = parent_lock.clone();
998        }
999
1000        let lock_for_children = if self.is_sequential {
1001            Some(self.sequential_lock.clone())
1002        } else {
1003            None
1004        };
1005
1006        for child in &mut self.inner {
1007            child.propagate_sequential(lock_for_children.as_ref());
1008        }
1009    }
1010}
1011
1012impl Debug for TestSuiteExecution {
1013    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1014        writeln!(
1015            f,
1016            "'{}' {} [{}]",
1017            self.crate_and_module,
1018            self.props
1019                .iter()
1020                .map(|x| format!("{x:?}"))
1021                .collect::<Vec<_>>()
1022                .join(", "),
1023            if self.is_sequential { "S" } else { "P" }
1024        )?;
1025        writeln!(f, "  deps:")?;
1026        for dep in &self.dependencies {
1027            writeln!(f, "    '{}'", dep.name)?;
1028        }
1029        writeln!(f, "  tests:")?;
1030        for test in &self.tests {
1031            writeln!(f, "    '{}' [{:?}]", test.name, test.props.test_type)?;
1032        }
1033        for inner in &self.inner {
1034            let inner_str = format!("{inner:?}");
1035            for inner_line in inner_str.lines() {
1036                writeln!(f, "  {inner_line}")?;
1037            }
1038        }
1039        Ok(())
1040    }
1041}
1042
1043impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1044    fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1045        self.get(name).cloned()
1046    }
1047}
1048
/// A single test handed out for execution, bundled with the dependency view
/// it runs against.
///
/// Dropping the value decrements the shared in-progress counter (see the
/// `Drop` impl).
pub struct TestExecution {
    /// The registered test to run.
    pub test: RegisteredTest,
    /// Name-keyed view of the dependency values available to the test.
    pub deps: Arc<dyn DependencyView + Send + Sync>,
    /// NOTE(review): presumably the position of this test within the run —
    /// confirm against the code that constructs `TestExecution`.
    pub index: usize,
    /// Guard owned for as long as this value lives; never read, it is only
    /// held so that it gets released when the execution is dropped.
    _seq_lock: SequentialExecutionLockGuard,
    /// Shared counter that is decremented by one when this value is dropped.
    in_progress_counter: Arc<AtomicUsize>,
}
1056
impl Drop for TestExecution {
    /// Decrements the shared in-progress counter by one.
    fn drop(&mut self) {
        // NOTE(review): `Release` presumably pairs with an `Acquire` load on
        // the reader side — confirm where the counter is read.
        self.in_progress_counter.fetch_sub(1, Ordering::Release);
    }
}
1062
/// Guard produced by `SequentialExecutionLock::lock` / `lock_sync`.
///
/// The wrapped mutex guards are owned values; the lock they represent is
/// released when the enum value is dropped.
// NOTE(review): `dead_code` is presumably allowed because the guard payloads
// are only held, never read — confirm.
#[allow(dead_code)]
enum SequentialExecutionLockGuard {
    /// No lock was taken (returned for non-sequential execution).
    None,
    /// Owned guard of the tokio mutex, returned by the async `lock`.
    #[cfg(feature = "tokio")]
    Async(tokio::sync::OwnedMutexGuard<()>),
    /// Owned guard of the parking_lot mutex, returned by `lock_sync`.
    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
}
1070
/// Lock used to serialise test execution.
///
/// Cloning yields a handle to the same underlying mutexes, because both are
/// stored behind `Arc`. The async and the sync mutex are independent of each
/// other: taking one does not block the other.
#[derive(Clone)]
struct SequentialExecutionLock {
    /// Mutex used by the async `lock` / `is_locked` methods.
    #[cfg(feature = "tokio")]
    async_mutex: Arc<tokio::sync::Mutex<()>>,
    /// Mutex used by the blocking `lock_sync` / `is_locked_sync` methods.
    sync_mutex: Arc<parking_lot::Mutex<()>>,
}
1077
1078impl SequentialExecutionLock {
1079    pub fn new() -> Self {
1080        Self {
1081            #[cfg(feature = "tokio")]
1082            async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1083            sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1084        }
1085    }
1086
1087    #[cfg(feature = "tokio")]
1088    pub async fn is_locked(&self) -> bool {
1089        self.async_mutex.try_lock().is_err()
1090    }
1091
1092    pub fn is_locked_sync(&self) -> bool {
1093        self.sync_mutex.try_lock().is_none()
1094    }
1095
1096    #[cfg(feature = "tokio")]
1097    pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1098        if is_sequential {
1099            let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1100            SequentialExecutionLockGuard::Async(permit)
1101        } else {
1102            SequentialExecutionLockGuard::None
1103        }
1104    }
1105
1106    pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1107        if is_sequential {
1108            let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1109            SequentialExecutionLockGuard::Sync(permit)
1110        } else {
1111            SequentialExecutionLockGuard::None
1112        }
1113    }
1114}
1115
1116#[cfg(test)]
1117mod cloneable_tests {
1118    use super::*;
1119    use crate::internal::{
1120        CloneableCodec, DependencyConstructor, RegisteredDependency, RegisteredTest, TestFunction,
1121        TestProperties,
1122    };
1123    use std::sync::atomic::{AtomicUsize, Ordering};
1124
    /// Shorthand for [`registered_test_in_module`] with an empty module path.
    fn registered_test(name: &str, deps: Vec<String>) -> RegisteredTest {
        registered_test_in_module(name, "", deps)
    }
1128
1129    fn registered_test_in_module(
1130        name: &str,
1131        module_path: &str,
1132        deps: Vec<String>,
1133    ) -> RegisteredTest {
1134        RegisteredTest {
1135            name: name.to_string(),
1136            crate_name: "tcrate".to_string(),
1137            module_path: module_path.to_string(),
1138            run: TestFunction::Sync(Arc::new(|_| Box::new(()))),
1139            props: TestProperties::default(),
1140            dependencies: Some(deps),
1141        }
1142    }
1143
    /// A Cloneable dep whose constructor increments `counter` (so a test can
    /// assert how often it ran) and whose value is encoded as little-endian
    /// bytes.
    ///
    /// Shorthand for [`registered_cloneable_dep_in`] with an empty module
    /// path and the constructor value `0xdead_beef`.
    fn registered_cloneable_dep(name: &str, counter: Arc<AtomicUsize>) -> RegisteredDependency {
        registered_cloneable_dep_in(name, "", 0xdead_beef, counter)
    }
1149
1150    /// Like [`registered_cloneable_dep`] but lets the caller pick the
1151    /// dep's module path and the constant the constructor emits — so a
1152    /// collision test can assert two same-named deps in different modules
1153    /// don't get crossed up.
1154    fn registered_cloneable_dep_in(
1155        name: &str,
1156        module_path: &str,
1157        constructor_value: u64,
1158        counter: Arc<AtomicUsize>,
1159    ) -> RegisteredDependency {
1160        let constructor_counter = counter.clone();
1161        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1162            constructor_counter.fetch_add(1, Ordering::SeqCst);
1163            Arc::new(constructor_value) as Arc<dyn Any + Send + Sync>
1164        }));
1165        let codec = CloneableCodec {
1166            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1167                let value: Arc<u64> = any.downcast::<u64>().unwrap();
1168                (*value).to_le_bytes().to_vec()
1169            }),
1170            from_wire_bytes: Arc::new(|bytes: &[u8]| {
1171                let arr: [u8; 8] = bytes.try_into().unwrap();
1172                let value = u64::from_le_bytes(arr);
1173                Arc::new(value) as Arc<dyn Any + Send + Sync>
1174            }),
1175        };
1176        RegisteredDependency {
1177            name: name.to_string(),
1178            crate_name: "tcrate".to_string(),
1179            module_path: module_path.to_string(),
1180            constructor,
1181            dependencies: Vec::new(),
1182            scope: DepScope::Cloneable,
1183            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
1184                |wire_payload, _deps| wire_payload,
1185            ))),
1186            cloneable_codec: Some(codec),
1187            hosted_codec: None,
1188            rpc_factory: None,
1189            companions: Vec::new(),
1190        }
1191    }
1192
1193    #[test]
1194    fn cloneable_wire_collection_runs_constructor_once_and_encodes_value() {
1195        let counter = Arc::new(AtomicUsize::new(0));
1196        let dep = registered_cloneable_dep("clone_dep", counter.clone());
1197        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1198
1199        let (execution, _filtered) =
1200            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1201
1202        let collected = execution.collect_cloneable_wire_bytes_sync();
1203        assert_eq!(collected.len(), 1, "exactly one cloneable dep expected");
1204        let (dep_id, wire_bytes) = &collected[0];
1205        assert_eq!(
1206            dep_id, "tcrate::clone_dep",
1207            "wire bytes must be keyed by the fully-qualified id, not the local name"
1208        );
1209        assert_eq!(
1210            wire_bytes.as_slice(),
1211            &0xdead_beef_u64.to_le_bytes(),
1212            "expected the codec-encoded value to round-trip via to_wire"
1213        );
1214        assert_eq!(
1215            counter.load(Ordering::SeqCst),
1216            1,
1217            "constructor must have run exactly once when collecting"
1218        );
1219    }
1220
1221    #[test]
1222    fn prune_unused_deps_retains_companion_when_only_one_half_is_referenced() {
1223        // Regression for `#[test_dep(scope = Hosted, worker = both(T))]`
1224        // pruning. That lowering registers two dep entries (Hosted owner
1225        // view + HostedRpc stub view) for a single logical dep, backed by
1226        // a shared `Arc<HostedBothShared>` cache; the macro now declares
1227        // the two halves as `companions` of each other so that the pruner
1228        // retains the Hosted half even when selected tests only reference
1229        // the stub half. Without that, the async-ctor flavour would panic
1230        // (`Poll::Pending`) at runtime because the shared cache stays
1231        // empty.
1232        //
1233        // This test reproduces the pruner-level invariant cheaply via two
1234        // Cloneable deps. The `keep_local` traversal in
1235        // `prune_unused_deps` should expand the keep-set across
1236        // companions in either direction.
1237
1238        // Case A: reference only `dep_a`; `dep_b` is declared as a
1239        // companion of `dep_a` and must survive pruning.
1240        let counter_a = Arc::new(AtomicUsize::new(0));
1241        let counter_b = Arc::new(AtomicUsize::new(0));
1242        let mut dep_a = registered_cloneable_dep("clone_a", counter_a.clone());
1243        let mut dep_b = registered_cloneable_dep("clone_b", counter_b.clone());
1244        dep_a.companions = vec!["clone_b".to_string()];
1245        dep_b.companions = vec!["clone_a".to_string()];
1246
1247        let test_a = registered_test("t_uses_a", vec!["clone_a".to_string()]);
1248
1249        let (execution, _filtered) =
1250            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test_a], &[]);
1251
1252        let kept: Vec<String> = execution
1253            .collect_cloneable_dependencies()
1254            .into_iter()
1255            .map(|d| d.name)
1256            .collect();
1257        assert!(
1258            kept.contains(&"clone_a".to_string()),
1259            "directly referenced dep must be retained, kept = {kept:?}"
1260        );
1261        assert!(
1262            kept.contains(&"clone_b".to_string()),
1263            "companion of a retained dep must also be retained (the planner-only \
1264             sibling link used by `worker = both(...)`), kept = {kept:?}"
1265        );
1266
1267        // Case B (reverse direction): reference only `dep_b`, with the
1268        // same companion link. `dep_a` must survive pruning.
1269        let counter_a = Arc::new(AtomicUsize::new(0));
1270        let counter_b = Arc::new(AtomicUsize::new(0));
1271        let mut dep_a = registered_cloneable_dep("clone_a", counter_a.clone());
1272        let mut dep_b = registered_cloneable_dep("clone_b", counter_b.clone());
1273        dep_a.companions = vec!["clone_b".to_string()];
1274        dep_b.companions = vec!["clone_a".to_string()];
1275
1276        let test_b = registered_test("t_uses_b", vec!["clone_b".to_string()]);
1277
1278        let (execution, _filtered) =
1279            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test_b], &[]);
1280
1281        let kept: Vec<String> = execution
1282            .collect_cloneable_dependencies()
1283            .into_iter()
1284            .map(|d| d.name)
1285            .collect();
1286        assert!(
1287            kept.contains(&"clone_a".to_string()),
1288            "companion of a stub-referenced dep must be retained, kept = {kept:?}"
1289        );
1290        assert!(
1291            kept.contains(&"clone_b".to_string()),
1292            "directly referenced dep must be retained, kept = {kept:?}"
1293        );
1294
1295        // Sanity: a dep with no companion link and not referenced
1296        // anywhere is still pruned. (Prevents the test above from
1297        // accidentally turning into "the pruner never drops anything".)
1298        let counter_a = Arc::new(AtomicUsize::new(0));
1299        let counter_b = Arc::new(AtomicUsize::new(0));
1300        let dep_a = registered_cloneable_dep("clone_a", counter_a.clone());
1301        let dep_b = registered_cloneable_dep("clone_b", counter_b.clone());
1302        let test_a = registered_test("t_uses_a", vec!["clone_a".to_string()]);
1303
1304        let (execution, _filtered) =
1305            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test_a], &[]);
1306
1307        let kept: Vec<String> = execution
1308            .collect_cloneable_dependencies()
1309            .into_iter()
1310            .map(|d| d.name)
1311            .collect();
1312        assert!(
1313            kept.contains(&"clone_a".to_string()),
1314            "directly referenced dep must be retained, kept = {kept:?}"
1315        );
1316        assert!(
1317            !kept.contains(&"clone_b".to_string()),
1318            "without a companion link, an unreferenced dep must be pruned; \
1319             kept = {kept:?}"
1320        );
1321    }
1322
1323    #[test]
1324    fn provide_cloneable_value_short_circuits_constructor() {
1325        let counter = Arc::new(AtomicUsize::new(0));
1326        let dep = registered_cloneable_dep("clone_dep", counter.clone());
1327        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1328
1329        let (mut execution, _filtered) =
1330            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1331
1332        let pre_value: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
1333        let applied = execution.provide_cloneable_value("tcrate::clone_dep", pre_value);
1334        assert!(
1335            applied,
1336            "pre-populated value should match the dep's qualified id"
1337        );
1338
1339        // Pick the test — materialize_deps_sync must reuse the pre-populated
1340        // value instead of running the original constructor.
1341        let next = execution.pick_next_sync().expect("test should be picked");
1342        assert_eq!(next.test.name, "t1");
1343
1344        let view = next.deps.get("clone_dep").expect("dep available");
1345        let value: Arc<u64> = view.downcast::<u64>().unwrap();
1346        assert_eq!(*value, 99);
1347
1348        assert_eq!(
1349            counter.load(Ordering::SeqCst),
1350            0,
1351            "constructor must not run when a pre-populated value is supplied"
1352        );
1353    }
1354
1355    #[test]
1356    fn provided_shared_value_is_a_worker_side_leaf() {
1357        let provided_counter = Arc::new(AtomicUsize::new(0));
1358        let parent_only_counter = Arc::new(AtomicUsize::new(0));
1359        let mut provided_dep = registered_cloneable_dep("clone_dep", provided_counter.clone());
1360        provided_dep.dependencies = vec!["parent_only_dep".to_string()];
1361        let parent_only_dep =
1362            registered_cloneable_dep("parent_only_dep", parent_only_counter.clone());
1363        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1364
1365        let (mut execution, _filtered) = TestSuiteExecution::construct(
1366            &Arguments::default(),
1367            &[provided_dep, parent_only_dep],
1368            &[test],
1369            &[],
1370        );
1371
1372        let pre_value: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
1373        let applied = execution.provide_cloneable_value("tcrate::clone_dep", pre_value);
1374        assert!(applied);
1375
1376        let next = execution.pick_next_sync().expect("test should be picked");
1377        let view = next.deps.get("clone_dep").expect("dep available");
1378        let value: Arc<u64> = view.downcast::<u64>().unwrap();
1379        assert_eq!(*value, 99);
1380        assert_eq!(
1381            provided_counter.load(Ordering::SeqCst),
1382            0,
1383            "worker-side provided values must not run their original constructor"
1384        );
1385        assert_eq!(
1386            parent_only_counter.load(Ordering::SeqCst),
1387            0,
1388            "constructor dependencies are parent-only once a value arrives from wire bytes or an RPC stub"
1389        );
1390    }
1391
1392    /// Async-constructor counterpart for the parent-side collector used by the
1393    /// tokio runner. Verifies that a Cloneable owner declared with
1394    /// `async fn` is awaited on the parent and its wire bytes are produced
1395    /// keyed by the dep's qualified id.
1396    #[cfg(feature = "tokio")]
1397    #[test]
1398    fn async_cloneable_wire_collection_awaits_async_constructor() {
1399        use std::pin::Pin;
1400
1401        let counter = Arc::new(AtomicUsize::new(0));
1402        let constructor_counter = counter.clone();
1403
1404        // Build a RegisteredDependency with an Async constructor that
1405        // genuinely awaits a future (tokio::task::yield_now) on the parent
1406        // side, then returns a u64.
1407        let constructor = DependencyConstructor::Async(Arc::new(move |_view| {
1408            let counter = constructor_counter.clone();
1409            Box::pin(async move {
1410                tokio::task::yield_now().await;
1411                counter.fetch_add(1, Ordering::SeqCst);
1412                let value: u64 = 0xdead_beef;
1413                Arc::new(value) as Arc<dyn Any + Send + Sync>
1414            }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
1415        }));
1416        let codec = CloneableCodec {
1417            to_wire: Arc::new(|any| {
1418                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1419                (*v).to_le_bytes().to_vec()
1420            }),
1421            from_wire_bytes: Arc::new(|bytes| {
1422                let arr: [u8; 8] = bytes.try_into().unwrap();
1423                Arc::new(u64::from_le_bytes(arr)) as Arc<dyn Any + Send + Sync>
1424            }),
1425        };
1426        let dep = RegisteredDependency {
1427            name: "clone_dep".to_string(),
1428            crate_name: "tcrate".to_string(),
1429            module_path: String::new(),
1430            constructor,
1431            dependencies: Vec::new(),
1432            scope: DepScope::Cloneable,
1433            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
1434                |wire_payload, _| wire_payload,
1435            ))),
1436            cloneable_codec: Some(codec),
1437            hosted_codec: None,
1438            rpc_factory: None,
1439            companions: Vec::new(),
1440        };
1441        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1442
1443        let (execution, _filtered) =
1444            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1445
1446        let runtime = tokio::runtime::Builder::new_current_thread()
1447            .enable_all()
1448            .build()
1449            .unwrap();
1450        let collected = runtime.block_on(execution.collect_cloneable_wire_bytes_async());
1451
1452        assert_eq!(collected.len(), 1);
1453        assert_eq!(collected[0].0, "tcrate::clone_dep");
1454        assert_eq!(collected[0].1.as_slice(), &0xdead_beef_u64.to_le_bytes());
1455        assert_eq!(
1456            counter.load(Ordering::SeqCst),
1457            1,
1458            "async constructor must have run exactly once"
1459        );
1460    }
1461
1462    /// Regression test: two cloneable deps that share a local `name` but live
1463    /// in different modules must not collide on the wire and must not
1464    /// cross-apply on the worker side.
1465    #[test]
1466    fn cloneable_value_routing_uses_qualified_id_across_modules() {
1467        let counter_a = Arc::new(AtomicUsize::new(0));
1468        let counter_b = Arc::new(AtomicUsize::new(0));
1469
1470        // Same local name `clone_dep`, different module paths.
1471        let dep_a = registered_cloneable_dep_in("clone_dep", "mod_a", 11, counter_a.clone());
1472        let dep_b = registered_cloneable_dep_in("clone_dep", "mod_b", 22, counter_b.clone());
1473
1474        // One test per module, each takes "clone_dep" from its own module.
1475        let test_a = registered_test_in_module("t_a", "mod_a", vec!["clone_dep".to_string()]);
1476        let test_b = registered_test_in_module("t_b", "mod_b", vec!["clone_dep".to_string()]);
1477
1478        let (execution, _filtered) = TestSuiteExecution::construct(
1479            &Arguments::default(),
1480            &[dep_a, dep_b],
1481            &[test_a, test_b],
1482            &[],
1483        );
1484
1485        // The wire bytes must carry distinct qualified ids and distinct payloads.
1486        let mut collected = execution.collect_cloneable_wire_bytes_sync();
1487        collected.sort_by(|l, r| l.0.cmp(&r.0));
1488        assert_eq!(collected.len(), 2);
1489        assert_eq!(collected[0].0, "tcrate::mod_a::clone_dep");
1490        assert_eq!(collected[1].0, "tcrate::mod_b::clone_dep");
1491        assert_eq!(collected[0].1.as_slice(), &11_u64.to_le_bytes());
1492        assert_eq!(collected[1].1.as_slice(), &22_u64.to_le_bytes());
1493
1494        // Worker-side routing: pre-populating with `mod_a`'s qualified id must
1495        // apply only to `mod_a`'s dep, and similarly for `mod_b`. If the
1496        // routing fell back to plain `name`, both nodes would be updated.
1497        let mut execution_a = execution;
1498        let applied_a =
1499            execution_a.provide_cloneable_value("tcrate::mod_a::clone_dep", Arc::new(111_u64));
1500        assert!(applied_a, "mod_a dep must be reachable by qualified id");
1501        let applied_b =
1502            execution_a.provide_cloneable_value("tcrate::mod_b::clone_dep", Arc::new(222_u64));
1503        assert!(applied_b, "mod_b dep must be reachable by qualified id");
1504
1505        // An unrelated qualified id must not apply to anything.
1506        let applied_unknown =
1507            execution_a.provide_cloneable_value("tcrate::mod_c::clone_dep", Arc::new(333_u64));
1508        assert!(
1509            !applied_unknown,
1510            "unknown qualified id must not be applied anywhere"
1511        );
1512
1513        // Pick both tests and confirm the per-module values stayed separate.
1514        let first = execution_a.pick_next_sync().expect("first test");
1515        let second = execution_a.pick_next_sync().expect("second test");
1516
1517        let pairs: Vec<(String, u64)> = [first, second]
1518            .into_iter()
1519            .map(|n| {
1520                let v: Arc<u64> = n
1521                    .deps
1522                    .get("clone_dep")
1523                    .expect("dep available")
1524                    .clone()
1525                    .downcast()
1526                    .unwrap();
1527                (n.test.name.clone(), *v)
1528            })
1529            .collect();
1530
1531        let val_a = pairs
1532            .iter()
1533            .find(|(n, _)| n == "t_a")
1534            .expect("t_a picked")
1535            .1;
1536        let val_b = pairs
1537            .iter()
1538            .find(|(n, _)| n == "t_b")
1539            .expect("t_b picked")
1540            .1;
1541        assert_eq!(
1542            val_a, 111,
1543            "mod_a test must see mod_a's pre-populated value"
1544        );
1545        assert_eq!(
1546            val_b, 222,
1547            "mod_b test must see mod_b's pre-populated value"
1548        );
1549
1550        // Each per-module constructor ran exactly once — during the parent-side
1551        // wire-bytes collection above. The worker-side `provide_cloneable_value`
1552        // calls must NOT have triggered the constructor a second time on either
1553        // node (otherwise the qualified-id routing is wrong / it cross-applied).
1554        assert_eq!(
1555            counter_a.load(Ordering::SeqCst),
1556            1,
1557            "mod_a constructor must have run exactly once (during wire collection)"
1558        );
1559        assert_eq!(
1560            counter_b.load(Ordering::SeqCst),
1561            1,
1562            "mod_b constructor must have run exactly once (during wire collection)"
1563        );
1564    }
1565
1566    // -------- Hosted dep tests --------
1567
1568    /// Builds a Hosted RegisteredDependency for tests. Owner value is a u64
1569    /// (`payload`). `descriptor()` is modelled by the codec's `to_wire` as
1570    /// the LE bytes of `payload`, and `from_descriptor` is modelled by the
1571    /// worker_fn which downcasts the bytes and rebuilds a u64 — so we can
1572    /// observe both halves of the Hosted round-trip without depending on
1573    /// the user-facing `HostedDep` trait inside this private test helper.
1574    fn registered_hosted_dep(
1575        name: &str,
1576        payload: u64,
1577        owner_counter: Arc<AtomicUsize>,
1578    ) -> RegisteredDependency {
1579        registered_hosted_dep_in(name, "", payload, owner_counter)
1580    }
1581
1582    /// Like [`registered_hosted_dep`] but lets the caller pick the dep's
1583    /// module path — so a collision test can assert two same-named Hosted
1584    /// deps in different modules don't get crossed up on the wire / in the
1585    /// worker routing.
1586    fn registered_hosted_dep_in(
1587        name: &str,
1588        module_path: &str,
1589        payload: u64,
1590        owner_counter: Arc<AtomicUsize>,
1591    ) -> RegisteredDependency {
1592        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1593            owner_counter.fetch_add(1, Ordering::SeqCst);
1594            Arc::new(payload) as Arc<dyn Any + Send + Sync>
1595        }));
1596        let codec = CloneableCodec {
1597            // descriptor() on the owner: encode the payload as LE bytes
1598            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1599                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1600                (*v).to_le_bytes().to_vec()
1601            }),
1602            // worker side: box the bytes as Any (worker_fn does
1603            // from_descriptor on them)
1604            from_wire_bytes: Arc::new(|bytes: &[u8]| {
1605                let boxed: Vec<u8> = bytes.to_vec();
1606                Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1607            }),
1608        };
1609        let worker_fn =
1610            crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
1611                let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1612                let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1613                let value: u64 = u64::from_le_bytes(arr);
1614                Arc::new(value) as Arc<dyn Any + Send + Sync>
1615            }));
1616        RegisteredDependency {
1617            name: name.to_string(),
1618            crate_name: "tcrate".to_string(),
1619            module_path: module_path.to_string(),
1620            constructor,
1621            dependencies: Vec::new(),
1622            scope: DepScope::Hosted,
1623            worker_fn: Some(worker_fn),
1624            cloneable_codec: None,
1625            hosted_codec: Some(codec),
1626            rpc_factory: None,
1627            companions: Vec::new(),
1628        }
1629    }
1630
1631    #[test]
1632    fn hosted_descriptor_collection_runs_owner_once_and_keeps_it_alive() {
1633        let owner_counter = Arc::new(AtomicUsize::new(0));
1634        let dep = registered_hosted_dep("hosted_dep", 0xcafe_babe_dead_beef, owner_counter.clone());
1635        let test = registered_test("t1", vec!["hosted_dep".to_string()]);
1636
1637        let (execution, _filtered) =
1638            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1639
1640        let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
1641        assert_eq!(descriptors.len(), 1, "exactly one hosted dep expected");
1642        assert_eq!(owners.len(), 1, "exactly one hosted owner kept alive");
1643
1644        let (dep_id, descriptor_bytes) = &descriptors[0];
1645        assert_eq!(
1646            dep_id, "tcrate::hosted_dep",
1647            "descriptor must be keyed by the fully-qualified id"
1648        );
1649        assert_eq!(
1650            descriptor_bytes.as_slice(),
1651            &0xcafe_babe_dead_beef_u64.to_le_bytes(),
1652            "expected descriptor bytes to match codec.to_wire of payload"
1653        );
1654        assert_eq!(
1655            owner_counter.load(Ordering::SeqCst),
1656            1,
1657            "owner constructor must have run exactly once"
1658        );
1659
1660        // The returned owner Arc<dyn Any> must wrap the same payload value
1661        // — i.e. the parent really is holding the owner alive.
1662        let held: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
1663        assert_eq!(*held, 0xcafe_babe_dead_beef);
1664    }
1665
1666    #[test]
1667    fn hosted_descriptor_roundtrips_to_worker_value_via_provide_cloneable_value() {
1668        let owner_counter = Arc::new(AtomicUsize::new(0));
1669        let dep = registered_hosted_dep("hosted_dep", 0x1234_5678_u64, owner_counter.clone());
1670        let test = registered_test("t1", vec!["hosted_dep".to_string()]);
1671
1672        let (mut execution, _filtered) =
1673            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1674
1675        // Worker-side simulation: pre-populate a reconstructed value (this
1676        // is what `apply_provided_wire_bytes` does after running the worker_fn
1677        // against the descriptor bytes).
1678        let pre_value: Arc<dyn Any + Send + Sync> = Arc::new(0x1234_5678_u64);
1679        let applied = execution.provide_cloneable_value("tcrate::hosted_dep", pre_value);
1680        assert!(
1681            applied,
1682            "Hosted dep must accept pre-populated values via the same path as Cloneable"
1683        );
1684
1685        // Pick the test — the owner constructor must NOT run on the worker
1686        // side (we provided a value directly).
1687        let next = execution.pick_next_sync().expect("test should be picked");
1688        let view = next.deps.get("hosted_dep").expect("dep available");
1689        let value: Arc<u64> = view.downcast::<u64>().unwrap();
1690        assert_eq!(*value, 0x1234_5678);
1691        assert_eq!(
1692            owner_counter.load(Ordering::SeqCst),
1693            0,
1694            "Hosted owner constructor must not run on the worker side"
1695        );
1696    }
1697
1698    #[test]
1699    fn has_hosted_dependencies_reports_correctly() {
1700        let dep = registered_hosted_dep("h", 0, Arc::new(AtomicUsize::new(0)));
1701        let test = registered_test("t1", vec!["h".to_string()]);
1702        let (execution, _filtered) =
1703            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1704        assert!(execution.has_hosted_dependencies());
1705        assert!(!execution.has_shared_dependencies());
1706        assert!(!execution.has_cloneable_dependencies());
1707    }
1708
1709    /// The owner constructor must run EXACTLY once even with multiple workers —
1710    /// descriptors are computed once on the parent and shipped to each worker.
1711    #[test]
1712    fn hosted_owner_runs_exactly_once_even_when_collecting_multiple_times() {
1713        // The collector is what the parent calls once; we verify the
1714        // expected invariant: a single collect call invokes the owner once
1715        // (even if multiple Hosted deps share the same dep id structure).
1716        let counter_a = Arc::new(AtomicUsize::new(0));
1717        let counter_b = Arc::new(AtomicUsize::new(0));
1718
1719        // Two distinct Hosted deps in the same module/crate.
1720        let mut dep_a = registered_hosted_dep("hosted_a", 1, counter_a.clone());
1721        dep_a.name = "hosted_a".to_string();
1722        let mut dep_b = registered_hosted_dep("hosted_b", 2, counter_b.clone());
1723        dep_b.name = "hosted_b".to_string();
1724        let test = registered_test("t1", vec!["hosted_a".to_string(), "hosted_b".to_string()]);
1725
1726        let (execution, _filtered) =
1727            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test], &[]);
1728
1729        let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
1730        assert_eq!(descriptors.len(), 2);
1731        assert_eq!(owners.len(), 2);
1732        assert_eq!(counter_a.load(Ordering::SeqCst), 1);
1733        assert_eq!(counter_b.load(Ordering::SeqCst), 1);
1734    }
1735
1736    /// Regression test for qualified-id routing on Hosted deps: two Hosted
1737    /// deps that share a local `name` but live in different modules must
1738    /// not collide on the wire and must not cross-apply on the worker side.
1739    /// This mirrors `cloneable_value_routing_uses_qualified_id_across_modules`
1740    /// to make sure the same hardened routing applies to descriptor bytes.
1741    #[test]
1742    fn hosted_descriptor_routing_uses_qualified_id_across_modules() {
1743        let counter_a = Arc::new(AtomicUsize::new(0));
1744        let counter_b = Arc::new(AtomicUsize::new(0));
1745
1746        // Same local name `hosted_dep`, different module paths.
1747        let dep_a = registered_hosted_dep_in("hosted_dep", "mod_a", 11, counter_a.clone());
1748        let dep_b = registered_hosted_dep_in("hosted_dep", "mod_b", 22, counter_b.clone());
1749
1750        let test_a = registered_test_in_module("t_a", "mod_a", vec!["hosted_dep".to_string()]);
1751        let test_b = registered_test_in_module("t_b", "mod_b", vec!["hosted_dep".to_string()]);
1752
1753        let (execution, _filtered) = TestSuiteExecution::construct(
1754            &Arguments::default(),
1755            &[dep_a, dep_b],
1756            &[test_a, test_b],
1757            &[],
1758        );
1759
1760        // Descriptor bytes must carry distinct qualified ids and distinct payloads.
1761        let (mut descriptors, _owners) = execution.collect_hosted_descriptor_bytes_sync();
1762        descriptors.sort_by(|l, r| l.0.cmp(&r.0));
1763        assert_eq!(descriptors.len(), 2);
1764        assert_eq!(descriptors[0].0, "tcrate::mod_a::hosted_dep");
1765        assert_eq!(descriptors[1].0, "tcrate::mod_b::hosted_dep");
1766        assert_eq!(descriptors[0].1.as_slice(), &11_u64.to_le_bytes());
1767        assert_eq!(descriptors[1].1.as_slice(), &22_u64.to_le_bytes());
1768
1769        // Worker-side routing: pre-populating with `mod_a`'s qualified id
1770        // must apply only to `mod_a`'s dep, and similarly for `mod_b`.
1771        // Hosted deps use the same routing pathway as Cloneable, so the same
1772        // qualified-id-routing guarantee applies here.
1773        let mut execution = execution;
1774        let applied_a =
1775            execution.provide_cloneable_value("tcrate::mod_a::hosted_dep", Arc::new(111_u64));
1776        assert!(
1777            applied_a,
1778            "mod_a hosted dep must be reachable by qualified id"
1779        );
1780        let applied_b =
1781            execution.provide_cloneable_value("tcrate::mod_b::hosted_dep", Arc::new(222_u64));
1782        assert!(
1783            applied_b,
1784            "mod_b hosted dep must be reachable by qualified id"
1785        );
1786
1787        let applied_unknown =
1788            execution.provide_cloneable_value("tcrate::mod_c::hosted_dep", Arc::new(333_u64));
1789        assert!(
1790            !applied_unknown,
1791            "unknown qualified id must not be applied to any dep"
1792        );
1793
1794        let first = execution.pick_next_sync().expect("first test");
1795        let second = execution.pick_next_sync().expect("second test");
1796        let pairs: Vec<(String, u64)> = [first, second]
1797            .into_iter()
1798            .map(|n| {
1799                let v: Arc<u64> = n
1800                    .deps
1801                    .get("hosted_dep")
1802                    .expect("dep available")
1803                    .clone()
1804                    .downcast()
1805                    .unwrap();
1806                (n.test.name.clone(), *v)
1807            })
1808            .collect();
1809
1810        let val_a = pairs
1811            .iter()
1812            .find(|(n, _)| n == "t_a")
1813            .expect("t_a picked")
1814            .1;
1815        let val_b = pairs
1816            .iter()
1817            .find(|(n, _)| n == "t_b")
1818            .expect("t_b picked")
1819            .1;
1820        assert_eq!(val_a, 111);
1821        assert_eq!(val_b, 222);
1822
1823        // Each per-module owner constructor must have run exactly once
1824        // (during the parent-side descriptor collection above); the
1825        // worker-side provide_cloneable_value calls must not have re-run
1826        // them on either node.
1827        assert_eq!(counter_a.load(Ordering::SeqCst), 1);
1828        assert_eq!(counter_b.load(Ordering::SeqCst), 1);
1829    }
1830
1831    /// Mode-consistency regression test for the Hosted scope: when the
1832    /// runner does NOT spawn workers (e.g. `--nocapture`), tests must
1833    /// still see the *worker-side handle* produced by the registered
1834    /// `worker_fn` (i.e. `HostedDep::from_descriptor`), not the raw owner
1835    /// value returned by the parent constructor. This exercises the same
1836    /// codec + worker_fn round-trip that the runner-side
1837    /// `apply_hosted_descriptors_locally` helpers in sync.rs / tokio.rs
1838    /// perform on the no-spawn-workers path.
1839    #[test]
1840    fn hosted_no_spawn_workers_uses_worker_side_handle() {
1841        // Build a Hosted dep whose owner is one u64 value but whose
1842        // worker reconstructor produces a DIFFERENT u64 value. If the
1843        // local code path goes through descriptor->worker_fn correctly,
1844        // the test must see the worker value (not the owner value).
1845        let owner_counter = Arc::new(AtomicUsize::new(0));
1846        let constructor_counter = owner_counter.clone();
1847        let owner_value: u64 = 0xAAAA_AAAA_AAAA_AAAA_u64;
1848        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1849            constructor_counter.fetch_add(1, Ordering::SeqCst);
1850            Arc::new(owner_value) as Arc<dyn Any + Send + Sync>
1851        }));
1852        // Owner-side codec serialises the owner value as raw LE bytes.
1853        // The worker side wraps those bytes in `Vec<u8>` and the
1854        // worker_fn flips every bit to demonstrate the worker reconstruction
1855        // path is taken (the bit-flip stands in for any non-identity
1856        // `HostedDep::from_descriptor` implementation).
1857        let codec = CloneableCodec {
1858            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1859                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1860                (*v).to_le_bytes().to_vec()
1861            }),
1862            from_wire_bytes: Arc::new(|bytes: &[u8]| {
1863                let boxed: Vec<u8> = bytes.to_vec();
1864                Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1865            }),
1866        };
1867        let worker_fn =
1868            crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
1869                let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1870                let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1871                let raw: u64 = u64::from_le_bytes(arr);
1872                let handle_value: u64 = !raw;
1873                Arc::new(handle_value) as Arc<dyn Any + Send + Sync>
1874            }));
1875        let dep = RegisteredDependency {
1876            name: "hosted_dep".to_string(),
1877            crate_name: "tcrate".to_string(),
1878            module_path: String::new(),
1879            constructor,
1880            dependencies: Vec::new(),
1881            scope: DepScope::Hosted,
1882            worker_fn: Some(worker_fn.clone()),
1883            cloneable_codec: None,
1884            hosted_codec: Some(codec.clone()),
1885            rpc_factory: None,
1886            companions: Vec::new(),
1887        };
1888        let test = registered_test("t1", vec!["hosted_dep".to_string()]);
1889
1890        let (mut execution, _filtered) =
1891            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1892
1893        // Parent runs the owner constructor once and collects descriptor bytes
1894        // (mirroring `collect_hosted_descriptor_bytes_sync` invoked by the
1895        // no-spawn-workers parent runner).
1896        let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
1897        assert_eq!(descriptors.len(), 1);
1898        assert_eq!(owners.len(), 1);
1899        let (dep_id, wire_bytes) = &descriptors[0];
1900
1901        // Parent reconstructs the WORKER-side handle locally via
1902        // codec.from_wire_bytes + worker_fn (mirroring
1903        // `apply_hosted_descriptors_locally` in sync.rs / tokio.rs).
1904        let wire_payload = (codec.from_wire_bytes)(wire_bytes.as_slice());
1905        let empty_deps: Arc<dyn crate::internal::DependencyView + Send + Sync> =
1906            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
1907        let reconstructed = match &worker_fn {
1908            crate::internal::WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
1909            crate::internal::WorkerReconstructor::Async(_) => unreachable!(),
1910        };
1911        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
1912        assert!(applied);
1913
1914        // Pick the test — it must see the WORKER handle (~owner), NOT the owner
1915        // constructor's return value.
1916        let next = execution.pick_next_sync().expect("test picked");
1917        let view = next.deps.get("hosted_dep").expect("dep available");
1918        let value: Arc<u64> = view.clone().downcast::<u64>().unwrap();
1919        assert_eq!(
1920            *value,
1921            !owner_value,
1922            "Hosted dep must expose the worker-side handle (from_descriptor) even in the no-spawn-workers path"
1923        );
1924        assert_eq!(
1925            owner_counter.load(Ordering::SeqCst),
1926            1,
1927            "owner constructor must have run exactly once during descriptor collection"
1928        );
1929    }
1930
1931    /// Hosted owners construct their dependencies in the parent collection
1932    /// context. Worker-side values reconstructed from descriptors are leaves;
1933    /// the constructor dependencies are not re-created from wire bytes.
1934    #[test]
1935    fn hosted_dep_with_owner_dependencies_constructs_in_parent_context() {
1936        let dep_counter = Arc::new(AtomicUsize::new(0));
1937        let owner_counter = Arc::new(AtomicUsize::new(0));
1938        let dep = registered_cloneable_dep("some_other_dep", dep_counter.clone());
1939        let mut hosted = registered_hosted_dep("h_with_deps", 0, owner_counter.clone());
1940        hosted.dependencies = vec!["some_other_dep".to_string()];
1941        let test = registered_test("t1", vec!["h_with_deps".to_string()]);
1942        let (execution, _filtered) =
1943            TestSuiteExecution::construct(&Arguments::default(), &[dep, hosted], &[test], &[]);
1944        let collected = execution.collect_parent_shared_dependencies_sync();
1945
1946        assert_eq!(collected.hosted_descriptor_bytes.len(), 1);
1947        assert_eq!(dep_counter.load(Ordering::SeqCst), 1);
1948        assert_eq!(owner_counter.load(Ordering::SeqCst), 1);
1949    }
1950
1951    /// Tokio path: async owner constructors are awaited on the parent's
1952    /// collector.
1953    #[cfg(feature = "tokio")]
1954    #[test]
1955    fn async_hosted_descriptor_collection_awaits_async_constructor() {
1956        use std::pin::Pin;
1957
1958        let counter = Arc::new(AtomicUsize::new(0));
1959        let constructor_counter = counter.clone();
1960
1961        let constructor = DependencyConstructor::Async(Arc::new(move |_view| {
1962            let counter = constructor_counter.clone();
1963            Box::pin(async move {
1964                tokio::task::yield_now().await;
1965                counter.fetch_add(1, Ordering::SeqCst);
1966                let value: u64 = 42;
1967                Arc::new(value) as Arc<dyn Any + Send + Sync>
1968            }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
1969        }));
1970        let codec = CloneableCodec {
1971            to_wire: Arc::new(|any| {
1972                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1973                (*v).to_le_bytes().to_vec()
1974            }),
1975            from_wire_bytes: Arc::new(|bytes| {
1976                let boxed: Vec<u8> = bytes.to_vec();
1977                Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1978            }),
1979        };
1980        let dep = RegisteredDependency {
1981            name: "hosted_async".to_string(),
1982            crate_name: "tcrate".to_string(),
1983            module_path: String::new(),
1984            constructor,
1985            dependencies: Vec::new(),
1986            scope: DepScope::Hosted,
1987            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
1988                |wire_payload, _| {
1989                    let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1990                    let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1991                    let value: u64 = u64::from_le_bytes(arr);
1992                    Arc::new(value) as Arc<dyn Any + Send + Sync>
1993                },
1994            ))),
1995            cloneable_codec: None,
1996            hosted_codec: Some(codec),
1997            rpc_factory: None,
1998            companions: Vec::new(),
1999        };
2000        let test = registered_test("t1", vec!["hosted_async".to_string()]);
2001
2002        let (execution, _filtered) =
2003            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2004
2005        let runtime = tokio::runtime::Builder::new_current_thread()
2006            .enable_all()
2007            .build()
2008            .unwrap();
2009        let (descriptors, owners) =
2010            runtime.block_on(execution.collect_hosted_descriptor_bytes_async());
2011
2012        assert_eq!(descriptors.len(), 1);
2013        assert_eq!(owners.len(), 1);
2014        assert_eq!(descriptors[0].0, "tcrate::hosted_async");
2015        assert_eq!(descriptors[0].1.as_slice(), &42_u64.to_le_bytes());
2016        assert_eq!(counter.load(Ordering::SeqCst), 1);
2017
2018        let held: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
2019        assert_eq!(*held, 42);
2020    }
2021
2022    // ===========================================================
2023    // HostedRpc unit tests
2024    // ===========================================================
2025
2026    use crate::internal::{
2027        HostedRpcChannel, HostedRpcDep, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
2028        InProcessHostedRpcTransport, RpcFactory,
2029    };
2030
2031    /// Owner type for HostedRpc unit tests. A trivial monotonic counter
2032    /// that increments on every `dispatch(method_idx=1, _)` call and
2033    /// returns the new value as big-endian bytes.
2034    struct RpcCounter {
2035        n: u64,
2036    }
2037
2038    impl HostedRpcDep for RpcCounter {
2039        type Stub = RpcCounterStub;
2040        fn dispatch(&mut self, method_idx: u32, args: &[u8]) -> Result<Vec<u8>, String> {
2041            match method_idx {
2042                1 => {
2043                    self.n += 1;
2044                    Ok(self.n.to_be_bytes().to_vec())
2045                }
2046                // Large-payload echo. Args are a 4-byte big-endian
2047                // u32 size; the owner returns `size` bytes filled with a
2048                // deterministic `i % 251` pattern so framing corruption
2049                // is caught explicitly, not just length mismatch.
2050                2 => {
2051                    let arr: [u8; 4] = args
2052                        .try_into()
2053                        .map_err(|_| "method_idx=2 requires exactly 4 bytes (size)".to_string())?;
2054                    let size = u32::from_be_bytes(arr) as usize;
2055                    let mut out = vec![0u8; size];
2056                    for (i, b) in out.iter_mut().enumerate() {
2057                        *b = (i % 251) as u8;
2058                    }
2059                    Ok(out)
2060                }
2061                other => Err(format!("RpcCounter: unknown method_idx {other}")),
2062            }
2063        }
2064        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2065            RpcCounterStub { channel }
2066        }
2067    }
2068
2069    /// Worker-visible stub for the test owner above.
2070    struct RpcCounterStub {
2071        channel: HostedRpcChannel,
2072    }
2073
2074    impl RpcCounterStub {
2075        fn next(&self) -> u64 {
2076            let bytes = self.channel.call(1, Vec::new()).expect("rpc call");
2077            let arr: [u8; 8] = bytes.as_slice().try_into().unwrap();
2078            u64::from_be_bytes(arr)
2079        }
2080
2081        /// Request `size` bytes back from the owner.
2082        fn echo(&self, size: u32) -> Vec<u8> {
2083            self.channel
2084                .call(2, size.to_be_bytes().to_vec())
2085                .expect("echo rpc call")
2086        }
2087    }
2088
2089    /// Builds a HostedRpc `RegisteredDependency` for tests. The constructor
2090    /// wraps an [`RpcCounter`] into a [`HostedRpcOwnerCell`] (mirroring the
2091    /// macro-emitted code), counts its own runs in `counter`, and the
2092    /// `RpcFactory` performs the symmetric downcast back to a cell.
2093    fn registered_hosted_rpc_dep(
2094        name: &str,
2095        module_path: &str,
2096        owner_counter: Arc<AtomicUsize>,
2097    ) -> RegisteredDependency {
2098        let ctor_counter = owner_counter.clone();
2099        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
2100            ctor_counter.fetch_add(1, Ordering::SeqCst);
2101            let cell = HostedRpcOwnerCell::from_owner(RpcCounter { n: 0 });
2102            Arc::new(cell) as Arc<dyn Any + Send + Sync>
2103        }));
2104        let factory = RpcFactory {
2105            owner_into_cell: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
2106                any.downcast::<HostedRpcOwnerCell>()
2107                    .expect("HostedRpc owner downcast")
2108            }),
2109            build_stub: Arc::new(|channel: HostedRpcChannel| {
2110                let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2111                Arc::new(stub) as Arc<dyn Any + Send + Sync>
2112            }),
2113        };
2114        RegisteredDependency {
2115            name: name.to_string(),
2116            crate_name: "tcrate".to_string(),
2117            module_path: module_path.to_string(),
2118            constructor,
2119            dependencies: Vec::new(),
2120            scope: DepScope::HostedRpc,
2121            worker_fn: None,
2122            cloneable_codec: None,
2123            hosted_codec: None,
2124            rpc_factory: Some(factory),
2125            companions: Vec::new(),
2126        }
2127    }
2128
2129    #[test]
2130    fn hosted_rpc_owner_cells_collected_once_and_keyed_by_qualified_id() {
2131        let counter = Arc::new(AtomicUsize::new(0));
2132        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter.clone());
2133        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2134
2135        let (execution, _filtered) =
2136            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2137
2138        assert!(execution.has_hosted_rpc_dependencies());
2139
2140        let cells = execution.collect_hosted_rpc_owner_cells_sync();
2141        assert_eq!(cells.len(), 1, "exactly one hosted rpc dep expected");
2142        let (dep_id, _cell) = &cells[0];
2143        assert_eq!(dep_id, "tcrate::rpc_dep");
2144        assert_eq!(
2145            counter.load(Ordering::SeqCst),
2146            1,
2147            "owner constructor must run exactly once on the parent"
2148        );
2149
2150        // Collecting again must not re-run the constructor in the same
2151        // execution tree (the constructor is called inside collect, not
2152        // memoised). This asserts that we don't accidentally double-collect
2153        // when the runner makes the call.
2154        let cells_b = execution.collect_hosted_rpc_owner_cells_sync();
2155        assert_eq!(cells_b.len(), 1);
2156        assert_eq!(
2157            counter.load(Ordering::SeqCst),
2158            2,
2159            "collect_hosted_rpc_owner_cells_sync runs the constructor on every call; \
2160             callers (the runner) are responsible for only calling it once per suite"
2161        );
2162    }
2163
2164    #[test]
2165    fn hosted_rpc_owner_dependencies_construct_in_parent_context() {
2166        let parent_only_counter = Arc::new(AtomicUsize::new(0));
2167        let owner_counter = Arc::new(AtomicUsize::new(0));
2168        let parent_only_dep =
2169            registered_cloneable_dep("parent_only_dep", parent_only_counter.clone());
2170        let mut rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", owner_counter.clone());
2171        rpc_dep.dependencies = vec!["parent_only_dep".to_string()];
2172        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2173
2174        let (execution, _filtered) = TestSuiteExecution::construct(
2175            &Arguments::default(),
2176            &[parent_only_dep, rpc_dep],
2177            &[test],
2178            &[],
2179        );
2180
2181        let cells = execution.collect_hosted_rpc_owner_cells_sync();
2182        assert_eq!(cells.len(), 1);
2183        assert_eq!(parent_only_counter.load(Ordering::SeqCst), 1);
2184        assert_eq!(owner_counter.load(Ordering::SeqCst), 1);
2185    }
2186
2187    #[test]
2188    fn hosted_rpc_in_process_transport_routes_to_owner_cell() {
2189        let counter = Arc::new(AtomicUsize::new(0));
2190        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter.clone());
2191        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2192
2193        let (execution, _filtered) =
2194            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2195
2196        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2197            .collect_hosted_rpc_owner_cells_sync()
2198            .into_iter()
2199            .collect();
2200
2201        let transport: Arc<dyn HostedRpcTransport> =
2202            Arc::new(InProcessHostedRpcTransport::new(cells.clone()));
2203        let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport.clone());
2204        let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2205
2206        assert_eq!(stub.next(), 1);
2207        assert_eq!(stub.next(), 2);
2208        assert_eq!(stub.next(), 3);
2209    }
2210
2211    #[test]
2212    fn hosted_rpc_in_process_transport_returns_dispatch_error_on_unknown_method() {
2213        let counter = Arc::new(AtomicUsize::new(0));
2214        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
2215        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2216
2217        let (execution, _filtered) =
2218            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2219
2220        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2221            .collect_hosted_rpc_owner_cells_sync()
2222            .into_iter()
2223            .collect();
2224        let transport: Arc<dyn HostedRpcTransport> =
2225            Arc::new(InProcessHostedRpcTransport::new(cells.clone()));
2226        let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport.clone());
2227
2228        // Call an unknown method index directly; the owner's `dispatch`
2229        // returns `Err("…")` which the transport surfaces as a Dispatch error.
2230        let err = channel.call(999, Vec::new()).unwrap_err();
2231        match err {
2232            HostedRpcError::Dispatch(msg) => {
2233                assert!(
2234                    msg.contains("unknown method_idx 999"),
2235                    "expected dispatch error to mention method_idx, got '{msg}'"
2236                );
2237            }
2238            HostedRpcError::Transport(msg) => {
2239                panic!("expected Dispatch error, got Transport({msg})");
2240            }
2241        }
2242    }
2243
2244    #[test]
2245    fn hosted_rpc_in_process_transport_returns_transport_error_on_unknown_dep_id() {
2246        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = HashMap::new();
2247        let transport: Arc<dyn HostedRpcTransport> =
2248            Arc::new(InProcessHostedRpcTransport::new(cells));
2249        let channel = HostedRpcChannel::new("tcrate::missing_dep".to_string(), transport.clone());
2250        let err = channel.call(1, Vec::new()).unwrap_err();
2251        match err {
2252            HostedRpcError::Transport(msg) => {
2253                assert!(
2254                    msg.contains("unknown dep id 'tcrate::missing_dep'"),
2255                    "expected transport error to mention dep id, got '{msg}'"
2256                );
2257            }
2258            HostedRpcError::Dispatch(msg) => {
2259                panic!("expected Transport error, got Dispatch({msg})");
2260            }
2261        }
2262    }
2263
2264    // -------------------------------------------------------------
2265    // Coverage for owner panic + mutex poisoning. The owner-cell catches the
2266    // panic, turns it into `Err("hosted rpc owner panicked: ...")` for the
2267    // first call, and subsequent calls hit the poisoned mutex and get the
2268    // stable `"hosted rpc owner poisoned"` error.
2269    // -------------------------------------------------------------
2270
2271    /// Owner that panics on every dispatch call. Used to exercise the
2272    /// catch_unwind + poisoned-mutex paths in `HostedRpcOwnerCell::dispatch`.
2273    struct PanickingRpcOwner;
2274
2275    impl HostedRpcDep for PanickingRpcOwner {
2276        type Stub = RpcCounterStub;
2277        fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2278            panic!("owner_panic_for_test");
2279        }
2280        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2281            RpcCounterStub { channel }
2282        }
2283    }
2284
2285    /// Async owner used to verify `AsyncHostedRpcDep`-flavoured cells
2286    /// dispatch through the async path and surface results correctly.
2287    /// Yields once to force the future to actually `.await` (a no-op
2288    /// `std::future::ready` wouldn't exercise the async cell machinery).
2289    #[cfg(feature = "tokio")]
2290    struct AsyncRpcCounter {
2291        n: u64,
2292    }
2293
2294    #[cfg(feature = "tokio")]
2295    impl crate::internal::AsyncHostedRpcDep for AsyncRpcCounter {
2296        type Stub = RpcCounterStub;
2297        async fn dispatch(&mut self, method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2298            // Force a real `.await` so the async dispatch machinery
2299            // is actually exercised (not just a `ready(...)` bridge).
2300            ::tokio::task::yield_now().await;
2301            if method_idx == 1 {
2302                self.n += 1;
2303                Ok(self.n.to_be_bytes().to_vec())
2304            } else {
2305                Err(format!("AsyncRpcCounter: unknown method_idx {method_idx}"))
2306            }
2307        }
2308        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2309            RpcCounterStub { channel }
2310        }
2311    }
2312
2313    /// Async owner that always panics — exercises the
2314    /// `futures::FutureExt::catch_unwind` + poison-flag machinery on
2315    /// the async cell variant.
2316    #[cfg(feature = "tokio")]
2317    struct PanickingAsyncRpcOwner;
2318
2319    #[cfg(feature = "tokio")]
2320    impl crate::internal::AsyncHostedRpcDep for PanickingAsyncRpcOwner {
2321        type Stub = RpcCounterStub;
2322        async fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2323            ::tokio::task::yield_now().await;
2324            panic!("async_owner_panic_for_test");
2325        }
2326        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2327            RpcCounterStub { channel }
2328        }
2329    }
2330
2331    /// End-to-end: an async owner registered via `from_async_owner`
2332    /// dispatches via `dispatch_async` and returns the expected bytes
2333    /// after actually `.await`ing inside its method body.
2334    #[cfg(feature = "tokio")]
2335    #[test]
2336    fn async_hosted_rpc_owner_dispatches_through_async_cell() {
2337        let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
2338        let rt = ::tokio::runtime::Builder::new_multi_thread()
2339            .enable_all()
2340            .build()
2341            .expect("build tokio runtime");
2342
2343        let bytes_a = rt
2344            .block_on(cell.dispatch_async(1, &[]))
2345            .expect("first async dispatch must succeed");
2346        assert_eq!(bytes_a, 1u64.to_be_bytes().to_vec());
2347
2348        let bytes_b = rt
2349            .block_on(cell.dispatch_async(1, &[]))
2350            .expect("second async dispatch must succeed");
2351        assert_eq!(bytes_b, 2u64.to_be_bytes().to_vec());
2352    }
2353
2354    /// An async owner panic must surface as
2355    /// `"hosted rpc owner panicked: ..."` and poison the cell so every
2356    /// subsequent dispatch short-circuits with the stable
2357    /// `"hosted rpc owner poisoned"` error. Mirrors the sync test.
2358    #[cfg(feature = "tokio")]
2359    #[test]
2360    fn async_hosted_rpc_owner_panic_surfaces_then_poisons() {
2361        let cell = HostedRpcOwnerCell::from_async_owner(PanickingAsyncRpcOwner);
2362        let rt = ::tokio::runtime::Builder::new_multi_thread()
2363            .enable_all()
2364            .build()
2365            .expect("build tokio runtime");
2366
2367        let err1 = rt
2368            .block_on(cell.dispatch_async(1, &[]))
2369            .expect_err("first async dispatch must surface the panic as Err");
2370        assert!(
2371            err1.contains("hosted rpc owner panicked: async_owner_panic_for_test"),
2372            "expected first-call error to wrap the async panic payload, got '{err1}'"
2373        );
2374
2375        let err2 = rt
2376            .block_on(cell.dispatch_async(1, &[]))
2377            .expect_err("second async dispatch must short-circuit on the poisoned cell");
2378        assert_eq!(
2379            err2, "hosted rpc owner poisoned",
2380            "expected poisoned-cell error on the second async call, got '{err2}'"
2381        );
2382    }
2383
2384    /// Regression for the async poison race: a second dispatch that
2385    /// parks on `tokio::sync::Mutex::lock().await` *before* the first
2386    /// dispatch panics must still observe the poison flag once it
2387    /// acquires the mutex, and must not re-enter the owner. Without
2388    /// the in-lock re-check the second waiter would get a fresh
2389    /// `MutexGuard` and call the user method on the half-mutated
2390    /// owner.
2391    #[cfg(feature = "tokio")]
2392    #[test]
2393    fn async_hosted_rpc_owner_poison_blocks_concurrent_waiter() {
2394        use std::sync::atomic::{AtomicUsize, Ordering};
2395        use std::sync::Arc;
2396        use std::time::Duration;
2397
2398        /// Counts dispatch entries to verify the second call never
2399        /// re-enters after the first call's panic.
2400        struct OnePanicThenForbidden {
2401            entries: Arc<AtomicUsize>,
2402        }
2403
2404        impl crate::internal::AsyncHostedRpcDep for OnePanicThenForbidden {
2405            type Stub = RpcCounterStub;
2406            async fn dispatch(
2407                &mut self,
2408                _method_idx: u32,
2409                _args: &[u8],
2410            ) -> Result<Vec<u8>, String> {
2411                let n = self.entries.fetch_add(1, Ordering::SeqCst);
2412                // First entry: hold the mutex for long enough that a
2413                // second dispatch parks on `lock().await`, then panic.
2414                if n == 0 {
2415                    ::tokio::time::sleep(Duration::from_millis(50)).await;
2416                    panic!("first_dispatch_panic_poison_race");
2417                }
2418                // Any subsequent re-entry is a bug: the poison flag
2419                // re-check inside the lock should have short-circuited
2420                // before we got here.
2421                panic!("second_dispatch_unexpectedly_re_entered_after_poison");
2422            }
2423            fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2424                RpcCounterStub { channel }
2425            }
2426        }
2427
2428        let entries = Arc::new(AtomicUsize::new(0));
2429        let cell = Arc::new(HostedRpcOwnerCell::from_async_owner(
2430            OnePanicThenForbidden {
2431                entries: entries.clone(),
2432            },
2433        ));
2434
2435        let rt = ::tokio::runtime::Builder::new_multi_thread()
2436            .enable_all()
2437            .build()
2438            .expect("build tokio runtime");
2439
2440        rt.block_on(async {
2441            let cell_a = cell.clone();
2442            let cell_b = cell.clone();
2443
2444            let first = ::tokio::spawn(async move { cell_a.dispatch_async(1, &[]).await });
2445
2446            // Give `first` a head start so it definitely owns the
2447            // mutex before `second` starts and parks on `lock().await`.
2448            ::tokio::time::sleep(Duration::from_millis(5)).await;
2449
2450            let second = ::tokio::spawn(async move { cell_b.dispatch_async(1, &[]).await });
2451
2452            let first_res = first.await.expect("first task must not be cancelled");
2453            let second_res = second.await.expect("second task must not be cancelled");
2454
2455            let first_err =
2456                first_res.expect_err("first dispatch must surface the panic as Err, not Ok");
2457            assert!(
2458                first_err.contains("hosted rpc owner panicked: first_dispatch_panic_poison_race"),
2459                "expected the first call to surface the panic; got '{first_err}'"
2460            );
2461
2462            let second_err = second_res
2463                .expect_err("second dispatch must short-circuit on the poisoned cell, not Ok");
2464            assert_eq!(
2465                second_err, "hosted rpc owner poisoned",
2466                "expected the second waiter to see the poison flag; got '{second_err}'"
2467            );
2468        });
2469
2470        // Exactly one entry into the owner: the second waiter must
2471        // have been turned away by the poison re-check, never reaching
2472        // the user dispatcher body.
2473        assert_eq!(
2474            entries.load(Ordering::SeqCst),
2475            1,
2476            "owner dispatcher must run at most once across the poisoned pair"
2477        );
2478    }
2479
2480    /// `dispatch_blocking` against an `Async` cell on a multi-thread
2481    /// tokio runtime must succeed (it bridges to `dispatch_async`
2482    /// via `block_in_place` + `block_on`).
2483    #[cfg(feature = "tokio")]
2484    #[test]
2485    fn async_hosted_rpc_dispatch_blocking_drives_async_cell() {
2486        let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
2487        let rt = ::tokio::runtime::Builder::new_multi_thread()
2488            .enable_all()
2489            .build()
2490            .expect("build tokio runtime");
2491        let bytes = rt
2492            .block_on(async {
2493                ::tokio::task::spawn_blocking(move || cell.dispatch_blocking(1, &[])).await
2494            })
2495            .expect("spawn_blocking joined")
2496            .expect("dispatch_blocking must succeed against an async cell on multi-thread rt");
2497        assert_eq!(bytes, 1u64.to_be_bytes().to_vec());
2498    }
2499
2500    /// `dispatch_blocking` on a `current_thread` runtime must return a
2501    /// clean `Err` rather than panicking inside `block_in_place`. The
2502    /// API contract is `Result<_, String>`; a `current_thread` runtime
2503    /// is unsupported but it must not blow up the dispatcher loop.
2504    #[cfg(feature = "tokio")]
2505    #[test]
2506    fn async_hosted_rpc_dispatch_blocking_rejects_current_thread_runtime() {
2507        let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
2508        let rt = ::tokio::runtime::Builder::new_current_thread()
2509            .enable_all()
2510            .build()
2511            .expect("build current-thread tokio runtime");
2512        // `dispatch_blocking` is invoked *from* the current-thread
2513        // runtime so that `Handle::try_current()` resolves to it.
2514        let err = rt
2515            .block_on(async { cell.dispatch_blocking(1, &[]) })
2516            .expect_err("dispatch_blocking must reject current-thread runtimes cleanly");
2517        assert!(
2518            err.contains("multi-threaded"),
2519            "expected the rejection error to mention multi-threaded requirement, got '{err}'"
2520        );
2521    }
2522
2523    /// `InProcessHostedRpcTransport` is the `--nocapture` / no-spawn
2524    /// codepath. Under the tokio runner it must route to async owner
2525    /// cells via the sync `call` -> `dispatch_blocking` bridge so the
2526    /// in-process and IPC modes look identical to the user.
2527    #[cfg(feature = "tokio")]
2528    #[test]
2529    fn async_hosted_rpc_in_process_transport_routes_to_async_cell() {
2530        use std::collections::HashMap;
2531        use std::sync::Arc;
2532
2533        let dep_id = "in_process_async_owner".to_string();
2534        let cell = Arc::new(HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter {
2535            n: 0,
2536        }));
2537        let mut cells = HashMap::new();
2538        cells.insert(dep_id.clone(), cell);
2539
2540        let transport: Arc<dyn crate::internal::HostedRpcTransport> =
2541            Arc::new(InProcessHostedRpcTransport::new(cells));
2542
2543        let rt = ::tokio::runtime::Builder::new_multi_thread()
2544            .enable_all()
2545            .build()
2546            .expect("build tokio runtime");
2547
2548        let transport_clone = transport.clone();
2549        let dep_id_clone = dep_id.clone();
2550        let bytes = rt
2551            .block_on(async move {
2552                ::tokio::task::spawn_blocking(move || {
2553                    transport_clone.call(&dep_id_clone, 1, vec![])
2554                })
2555                .await
2556                .expect("spawn_blocking joined")
2557            })
2558            .expect("first in-process dispatch must succeed");
2559        assert_eq!(bytes, 1u64.to_be_bytes().to_vec());
2560
2561        let transport_clone = transport.clone();
2562        let dep_id_clone = dep_id.clone();
2563        let bytes2 = rt
2564            .block_on(async move {
2565                ::tokio::task::spawn_blocking(move || {
2566                    transport_clone.call(&dep_id_clone, 1, vec![])
2567                })
2568                .await
2569                .expect("spawn_blocking joined")
2570            })
2571            .expect("second in-process dispatch must succeed");
2572        assert_eq!(bytes2, 2u64.to_be_bytes().to_vec());
2573    }
2574
2575    #[test]
2576    fn hosted_rpc_owner_panic_surfaces_then_poisons() {
2577        let cell = HostedRpcOwnerCell::from_owner(PanickingRpcOwner);
2578
2579        // First call: the owner's `dispatch` panics with the literal string
2580        // "owner_panic_for_test"; `HostedRpcOwnerCell::dispatch` catches the
2581        // unwind and converts it into a textual error containing the panic
2582        // payload prefixed with "hosted rpc owner panicked: ".
2583        //
2584        // The catch_unwind catches the panic AFTER the MutexGuard has
2585        // started unwinding, which still leaves the mutex poisoned (verified
2586        // by direct std::sync::Mutex behaviour).
2587        let err1 = cell
2588            .dispatch(1, &[])
2589            .expect_err("first call must surface the panic as Err");
2590        assert!(
2591            err1.contains("hosted rpc owner panicked: owner_panic_for_test"),
2592            "expected first-call error to wrap the panic payload, got '{err1}'"
2593        );
2594
2595        // Second call: the mutex is now poisoned from the panic above.
2596        // The cell must short-circuit with the stable "hosted rpc owner
2597        // poisoned" error and must NOT retry the owner.
2598        let err2 = cell
2599            .dispatch(1, &[])
2600            .expect_err("second call must short-circuit on the poisoned cell");
2601        assert_eq!(
2602            err2, "hosted rpc owner poisoned",
2603            "expected poisoned-cell error on the second call, got '{err2}'"
2604        );
2605    }
2606
2607    // -------------------------------------------------------------
2608    // Large-payload IPC framing coverage (>64 KiB) and
2609    // concurrent in-flight RPC requests routed through the in-process
2610    // transport without deadlock or framing corruption.
2611    // -------------------------------------------------------------
2612
2613    #[test]
2614    fn hosted_rpc_in_process_transport_round_trips_large_payload_exceeding_64_kib() {
2615        let counter = Arc::new(AtomicUsize::new(0));
2616        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
2617        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2618
2619        let (execution, _filtered) =
2620            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2621        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2622            .collect_hosted_rpc_owner_cells_sync()
2623            .into_iter()
2624            .collect();
2625        let transport: Arc<dyn HostedRpcTransport> =
2626            Arc::new(InProcessHostedRpcTransport::new(cells));
2627        let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);
2628        let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2629
2630        const SIZE: u32 = 256 * 1024; // 256 KiB
2631        let bytes = stub.echo(SIZE);
2632        assert_eq!(
2633            bytes.len(),
2634            SIZE as usize,
2635            "framing dropped/truncated bytes"
2636        );
2637        for (i, b) in bytes.iter().enumerate() {
2638            assert_eq!(
2639                *b,
2640                (i % 251) as u8,
2641                "framing corrupted byte at index {i}: expected {}, got {b}",
2642                (i % 251) as u8
2643            );
2644        }
2645    }
2646
2647    #[test]
2648    fn hosted_rpc_in_process_transport_multiplexes_concurrent_calls_from_threads() {
2649        use std::thread;
2650
2651        let counter = Arc::new(AtomicUsize::new(0));
2652        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
2653        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2654
2655        let (execution, _filtered) =
2656            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2657        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2658            .collect_hosted_rpc_owner_cells_sync()
2659            .into_iter()
2660            .collect();
2661        let transport: Arc<dyn HostedRpcTransport> =
2662            Arc::new(InProcessHostedRpcTransport::new(cells));
2663
2664        // Spawn N threads, each making M calls. Every call must return a
2665        // unique positive id (the owner is a single global counter
2666        // serialised by its own mutex). If the in-process transport ever
2667        // deadlocks or routes a reply to the wrong caller, the assertions
2668        // below would fire (duplicate ids, or the spawned thread would
2669        // panic and the join() would surface the failure).
2670        const N: usize = 4;
2671        const M: usize = 32;
2672        let mut handles = Vec::new();
2673        for _ in 0..N {
2674            let dep_id = "tcrate::rpc_dep".to_string();
2675            let transport = transport.clone();
2676            handles.push(thread::spawn(move || {
2677                let channel = HostedRpcChannel::new(dep_id, transport);
2678                let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2679                let mut ids = Vec::with_capacity(M);
2680                for _ in 0..M {
2681                    ids.push(stub.next());
2682                }
2683                ids
2684            }));
2685        }
2686        let mut all = Vec::with_capacity(N * M);
2687        for h in handles {
2688            all.extend(h.join().expect("thread panicked"));
2689        }
2690        all.sort();
2691        let mut prev: u64 = 0;
2692        for id in &all {
2693            assert!(
2694                *id > prev,
2695                "duplicate or non-monotonic id {id} after {prev}"
2696            );
2697            prev = *id;
2698        }
2699        assert_eq!(
2700            all.len(),
2701            N * M,
2702            "expected exactly {} ids in total, got {}",
2703            N * M,
2704            all.len()
2705        );
2706    }
2707}