Skip to main content

test_r_core/
execution.rs

1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12    apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13    DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14    RegisteredTestSuiteProperty,
15};
16
17/// Wire bytes for a single Cloneable / Hosted dependency, keyed by its
18/// fully-qualified id (`{crate}::{module}::{name}`).
19pub type DepWireBytes = (String, Vec<u8>);
20
21/// Parent-held owner value (used only for `Hosted` deps — the parent keeps
22/// the owner alive for the suite's duration).
23pub type HostedOwner = Arc<dyn Any + Send + Sync>;
24
25#[cfg(feature = "tokio")]
26type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
27    Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
28>;
29
30#[cfg(test)]
31/// Result of [`TestSuiteExecution::collect_hosted_descriptor_bytes_sync`] /
32/// [`TestSuiteExecution::collect_hosted_descriptor_bytes_async`]: the
33/// descriptor bytes that get shipped to workers, plus the parent-held owner
34/// values that must outlive every worker.
35pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
37/// Parent-side materialisation output for dependency scopes whose worker-side
38/// value is derived from parent-owned state instead of by rerunning the user
39/// constructor in each worker process.
40pub struct ParentSharedDependencies {
41    pub cloneable_wire_bytes: Vec<DepWireBytes>,
42    /// Parent-constructed `Cloneable` values keyed by fully-qualified dep id.
43    /// In **no-spawn-workers** mode (e.g. `--nocapture`) the runner installs
44    /// these directly into the execution tree via
45    /// [`TestSuiteExecution::provide_cloneable_value`] so tests see the
46    /// parent's value without re-running the constructor in
47    /// `materialize_deps`. For Cloneable, the round-trip
48    /// `from_wire(to_wire(value))` is by contract semantics-preserving, so
49    /// reusing the parent value directly is equivalent to round-tripping
50    /// while avoiding the duplicate constructor run that historically
51    /// occurred on the no-spawn-workers code path.
52    ///
53    /// In spawn-workers mode this list is unused — workers receive
54    /// `cloneable_wire_bytes` over IPC instead.
55    pub cloneable_local_values: Vec<(String, Arc<dyn Any + Send + Sync>)>,
56    pub hosted_descriptor_bytes: Vec<DepWireBytes>,
57    pub hosted_owners: Vec<HostedOwner>,
58    pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
59}
60
61impl ParentSharedDependencies {
62    fn new() -> Self {
63        Self {
64            cloneable_wire_bytes: Vec::new(),
65            cloneable_local_values: Vec::new(),
66            hosted_descriptor_bytes: Vec::new(),
67            hosted_owners: Vec::new(),
68            hosted_rpc_owner_cells: Vec::new(),
69        }
70    }
71}
72
73pub(crate) struct TestSuiteExecution {
74    crate_and_module: String,
75    dependencies: Vec<RegisteredDependency>,
76    tests: Vec<RegisteredTest>,
77    props: Vec<RegisteredTestSuiteProperty>,
78    inner: Vec<TestSuiteExecution>,
79    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
80    sequential_lock: SequentialExecutionLock,
81    remaining_count: usize,
82    idx: usize,
83    is_sequential: bool,
84    skip_creating_dependencies: bool,
85    in_progress: Arc<AtomicUsize>,
86}
87
88impl TestSuiteExecution {
89    pub fn construct(
90        arguments: &Arguments,
91        dependencies: &[RegisteredDependency],
92        tests: &[RegisteredTest],
93        props: &[RegisteredTestSuiteProperty],
94    ) -> (Self, Vec<RegisteredTest>) {
95        let tests_with_props = apply_suite_props_to_tests(tests, props);
96        let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
97        Self::shuffle(arguments, &mut filtered_tests);
98        filtered_tests.reverse();
99
100        if filtered_tests.is_empty() {
101            (
102                Self::root(
103                    dependencies
104                        .iter()
105                        .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
106                        .cloned()
107                        .collect::<Vec<_>>(),
108                    Vec::new(),
109                    props
110                        .iter()
111                        .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
112                        .cloned()
113                        .collect::<Vec<_>>(),
114                ),
115                Vec::new(),
116            )
117        } else {
118            let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
119
120            for prop in props {
121                root.add_prop(prop.clone());
122            }
123
124            for dep in dependencies {
125                root.add_dependency(dep.clone());
126            }
127
128            for test in filtered_tests.clone() {
129                root.add_test(test.clone());
130            }
131
132            root.propagate_sequential(None);
133            root.prune_unused_deps();
134
135            (root, filtered_tests)
136        }
137    }
138
139    fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
140        if let Some(seed) = arguments.shuffle_seed {
141            let mut rng = StdRng::seed_from_u64(seed);
142            tests.shuffle(&mut rng);
143        }
144    }
145
146    /// Disables creating dependencies when picking the next test. This is useful when the execution plan
147    /// is only used to drive spawned workers instead of actually running the tests.
148    pub fn skip_creating_dependencies(&mut self) {
149        self.skip_creating_dependencies = true;
150        for inner in &mut self.inner {
151            inner.skip_creating_dependencies();
152        }
153    }
154
155    pub fn remaining(&self) -> usize {
156        self.remaining_count
157    }
158
159    pub fn is_empty(&self) -> bool {
160        self.tests.is_empty() && self.inner.is_empty()
161    }
162
163    pub fn is_done(&self) -> bool {
164        self.remaining_count == 0
165    }
166
167    /// Returns true if either this level, or any of the inner levels have dependencies
168    #[allow(dead_code)]
169    pub fn has_dependencies(&self) -> bool {
170        !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
171    }
172
173    /// Returns true if any dependency in this subtree uses `DepScope::Shared`
174    /// — those force single-threaded execution when output capture is on,
175    /// because the materialised value cannot cross the parent/worker boundary.
176    pub fn has_shared_dependencies(&self) -> bool {
177        self.dependencies
178            .iter()
179            .any(|d| d.scope == DepScope::Shared)
180            || self
181                .inner
182                .iter()
183                .any(|inner| inner.has_shared_dependencies())
184    }
185
186    /// Returns true if any dependency in this subtree uses `DepScope::Cloneable`.
187    #[allow(dead_code)]
188    pub fn has_cloneable_dependencies(&self) -> bool {
189        self.dependencies
190            .iter()
191            .any(|d| d.scope == DepScope::Cloneable)
192            || self
193                .inner
194                .iter()
195                .any(|inner| inner.has_cloneable_dependencies())
196    }
197
198    /// Returns true if any dependency in this subtree uses `DepScope::Hosted`.
199    /// The parent keeps Hosted owners alive for the duration of the suite
200    /// while shipping descriptors to workers.
201    #[allow(dead_code)]
202    pub fn has_hosted_dependencies(&self) -> bool {
203        self.dependencies
204            .iter()
205            .any(|d| d.scope == DepScope::Hosted)
206            || self
207                .inner
208                .iter()
209                .any(|inner| inner.has_hosted_dependencies())
210    }
211
212    /// Returns true if any dependency in this subtree uses `DepScope::HostedRpc`.
213    /// The parent keeps owner cells alive for the suite and routes
214    /// worker-initiated IPC calls to those cells.
215    #[allow(dead_code)]
216    pub fn has_hosted_rpc_dependencies(&self) -> bool {
217        self.dependencies
218            .iter()
219            .any(|d| d.scope == DepScope::HostedRpc)
220            || self
221                .inner
222                .iter()
223                .any(|inner| inner.has_hosted_rpc_dependencies())
224    }
225
226    /// Collects every Cloneable dependency in this subtree (depth-first).
227    #[allow(dead_code)]
228    pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
229        let mut out = Vec::new();
230        self.collect_cloneable_dependencies_into(&mut out);
231        out
232    }
233
234    #[allow(dead_code)]
235    fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
236        for dep in &self.dependencies {
237            if dep.scope == DepScope::Cloneable {
238                out.push(dep.clone());
239            }
240        }
241        for inner in &self.inner {
242            inner.collect_cloneable_dependencies_into(out);
243        }
244    }
245
246    /// Walks the subtree, materialising dependencies in dependency order and
247    /// collecting the parent-side wire/state needed by Cloneable, Hosted, and
248    /// HostedRpc scopes. Constructor dependencies are resolved in this parent
249    /// context, but workers still receive these shared scopes as dependency-free
250    /// leaves: Cloneable/Hosted values are reconstructed from bytes, and
251    /// HostedRpc values are stubs backed by a channel.
252    pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
253        let mut out = ParentSharedDependencies::new();
254        let parent_map = HashMap::new();
255        self.collect_parent_shared_dependencies_into_sync(&parent_map, &mut out);
256        out
257    }
258
259    fn collect_parent_shared_dependencies_into_sync(
260        &self,
261        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
262        out: &mut ParentSharedDependencies,
263    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
264        let mut dependency_map = parent_map.clone();
265        let sorted_dependencies = self.sorted_dependencies();
266
267        for dep in sorted_dependencies {
268            if dependency_map.contains_key(&dep.name) {
269                continue;
270            }
271
272            let value = Self::construct_dependency_sync(dep, &dependency_map);
273            match dep.scope {
274                DepScope::Cloneable => {
275                    let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
276                        panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
277                    });
278                    out.cloneable_wire_bytes
279                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
280                    // Keep the parent-constructed value too, for the
281                    // no-spawn-workers code path that installs Cloneable
282                    // values directly into the execution tree (instead of
283                    // re-running the constructor inside `materialize_deps`).
284                    out.cloneable_local_values
285                        .push((dep.qualified_id(), value.clone()));
286                }
287                DepScope::Hosted => {
288                    let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
289                        panic!("Hosted dep '{}' missing hosted codec", dep.name)
290                    });
291                    out.hosted_descriptor_bytes
292                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
293                    out.hosted_owners.push(value.clone());
294                }
295                DepScope::HostedRpc => {
296                    let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
297                        panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
298                    });
299                    let cell = (factory.owner_into_cell)(value.clone());
300                    out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
301                }
302                DepScope::Shared | DepScope::PerWorker => {}
303            }
304
305            dependency_map.insert(dep.name.clone(), value);
306        }
307
308        for inner in &self.inner {
309            inner.collect_parent_shared_dependencies_into_sync(&dependency_map, out);
310        }
311
312        dependency_map
313    }
314
315    fn construct_dependency_sync(
316        dep: &RegisteredDependency,
317        dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
318    ) -> Arc<dyn Any + Send + Sync> {
319        match &dep.constructor {
320            DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
321            DependencyConstructor::Async(cons) => {
322                futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
323            }
324        }
325    }
326
327    /// Collects only Cloneable wire bytes. The runner uses
328    /// [`Self::collect_parent_shared_dependencies_sync`] to collect all shared
329    /// parent-side values in one pass; this narrower helper remains for unit
330    /// tests and focused callers.
331    #[cfg(test)]
332    pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
333        self.collect_parent_shared_dependencies_sync()
334            .cloneable_wire_bytes
335    }
336
337    /// Parent-side materialisation for `Hosted` dependencies.
338    ///
339    /// The returned descriptor bytes are keyed by fully-qualified dep id and
340    /// the returned owner values must be kept alive for the duration of the
341    /// suite. Unlike Cloneable, Hosted owners may hold resources (TCP
342    /// listeners, Docker containers, gRPC clients, etc.) that workers'
343    /// reconstructed handles depend on.
344    #[cfg(test)]
345    pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
346        let collected = self.collect_parent_shared_dependencies_sync();
347        (collected.hosted_descriptor_bytes, collected.hosted_owners)
348    }
349
350    /// Parent-side materialisation for `HostedRpc` dependencies.
351    ///
352    /// Returns `(qualified_id, cell)` pairs that the runtime keeps alive for
353    /// the suite's lifetime and uses to dispatch worker-initiated RPC calls.
354    #[cfg(test)]
355    pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
356        self.collect_parent_shared_dependencies_sync()
357            .hosted_rpc_owner_cells
358    }
359
360    /// Async counterpart of [`Self::collect_parent_shared_dependencies_sync`].
361    /// Async constructors are awaited on the parent before workers receive
362    /// wire bytes, descriptors, or RPC stubs.
363    #[cfg(feature = "tokio")]
364    pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
365        let mut out = ParentSharedDependencies::new();
366        let parent_map = HashMap::new();
367        self.collect_parent_shared_dependencies_into_async(&parent_map, &mut out)
368            .await;
369        out
370    }
371
372    #[cfg(feature = "tokio")]
373    fn collect_parent_shared_dependencies_into_async<'a>(
374        &'a self,
375        parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
376        out: &'a mut ParentSharedDependencies,
377    ) -> ParentSharedDependenciesFuture<'a> {
378        Box::pin(async move {
379            let mut dependency_map = parent_map.clone();
380            let sorted_dependencies = self.sorted_dependencies();
381
382            for dep in sorted_dependencies {
383                if dependency_map.contains_key(&dep.name) {
384                    continue;
385                }
386
387                let value = match &dep.constructor {
388                    DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
389                    DependencyConstructor::Async(cons) => {
390                        cons(Arc::new(dependency_map.clone())).await
391                    }
392                };
393                match dep.scope {
394                    DepScope::Cloneable => {
395                        let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
396                            panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
397                        });
398                        out.cloneable_wire_bytes
399                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
400                        // Keep the parent-constructed value too, for the
401                        // no-spawn-workers code path that installs Cloneable
402                        // values directly into the execution tree (instead
403                        // of re-running the constructor inside
404                        // `materialize_deps`).
405                        out.cloneable_local_values
406                            .push((dep.qualified_id(), value.clone()));
407                    }
408                    DepScope::Hosted => {
409                        let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
410                            panic!("Hosted dep '{}' missing hosted codec", dep.name)
411                        });
412                        out.hosted_descriptor_bytes
413                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
414                        out.hosted_owners.push(value.clone());
415                    }
416                    DepScope::HostedRpc => {
417                        let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
418                            panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
419                        });
420                        let cell = (factory.owner_into_cell)(value.clone());
421                        out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
422                    }
423                    DepScope::Shared | DepScope::PerWorker => {}
424                }
425
426                dependency_map.insert(dep.name.clone(), value);
427            }
428
429            for inner in &self.inner {
430                inner
431                    .collect_parent_shared_dependencies_into_async(&dependency_map, out)
432                    .await;
433            }
434            dependency_map
435        })
436    }
437
438    /// Async Hosted-only collection helper retained for focused callers.
439    #[cfg(feature = "tokio")]
440    #[cfg(test)]
441    pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
442        let collected = self.collect_parent_shared_dependencies_async().await;
443        (collected.hosted_descriptor_bytes, collected.hosted_owners)
444    }
445
446    /// Async Cloneable-only collection helper retained for focused callers.
447    ///
448    /// **Intentionally `!Send`.** The underlying `DependencyConstructor::Async`
449    /// future is not `Send`, so the returned future from this collector cannot
450    /// be either. Must be awaited on the root runner task (i.e., under
451    /// `Runtime::block_on` or directly inside `test_runner`) — never inside
452    /// `tokio::spawn` / a `JoinSet`. If we ever want to spawn Cloneable
453    /// collection onto a worker, the constructor type would need to require
454    /// `Send` first.
455    #[cfg(feature = "tokio")]
456    #[cfg(test)]
457    pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
458        self.collect_parent_shared_dependencies_async()
459            .await
460            .cloneable_wire_bytes
461    }
462
463    /// Worker-side counterpart to [`Self::collect_cloneable_wire_bytes_sync`]:
464    /// pre-populates the Cloneable dep value at the node where the dep is
465    /// registered, so the upcoming `materialize_deps_sync` call uses the
466    /// provided value instead of running the original constructor. The lookup
467    /// is keyed by the dep's fully-qualified id
468    /// (`{crate}::{module}::{name}`), but the value is stored under the local
469    /// `name` so the rest of the materialisation logic keeps working unchanged.
470    /// Returns `true` if a matching dep was found in any node of the subtree.
471    pub fn provide_cloneable_value(
472        &mut self,
473        dep_id: &str,
474        value: Arc<dyn Any + Send + Sync>,
475    ) -> bool {
476        let applied = self.provide_cloneable_value_internal(dep_id, value);
477        if applied {
478            self.prune_unused_deps();
479        }
480        applied
481    }
482
483    fn provide_cloneable_value_internal(
484        &mut self,
485        dep_id: &str,
486        value: Arc<dyn Any + Send + Sync>,
487    ) -> bool {
488        let mut applied = false;
489        if let Some((local_name, dep_idx)) = self
490            .dependencies
491            .iter()
492            .enumerate()
493            .find(|(_, d)| d.qualified_id() == dep_id)
494            .map(|(idx, d)| (d.name.clone(), idx))
495        {
496            // From the worker execution tree's perspective this dependency is
497            // now a leaf: its value came from wire bytes or a HostedRpc channel,
498            // so the worker must not instantiate constructor-only dependencies
499            // that were needed solely in the parent collection context.
500            self.dependencies[dep_idx].dependencies.clear();
501            self.materialized_dependencies
502                .insert(local_name, value.clone());
503            applied = true;
504        }
505        for inner in &mut self.inner {
506            applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
507        }
508        applied
509    }
510
511    /// Returns true if there are any tests that require capturing, based on the given default setting
512    /// and the per-test CaptureControl overrides.
513    pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
514        self.tests.iter().any(|test| {
515            test.props
516                .capture_control
517                .requires_capturing(capture_by_default)
518        }) || self
519            .inner
520            .iter()
521            .any(|inner| inner.requires_capturing(capture_by_default))
522    }
523
524    #[cfg(feature = "tokio")]
525    pub async fn pick_next(&mut self) -> Option<TestExecution> {
526        if self.is_empty() {
527            None
528        } else {
529            match self
530                .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
531                .await
532            {
533                Some((test, deps, seq_lock, in_progress_counter)) => {
534                    let index = self.idx;
535                    self.idx += 1;
536                    Some(TestExecution {
537                        test: test.clone(),
538                        deps: Arc::new(deps),
539                        index,
540                        _seq_lock: seq_lock,
541                        in_progress_counter,
542                    })
543                }
544                None => None,
545            }
546        }
547    }
548
549    pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
550        match self.pick_next_internal_sync(&HashMap::new()) {
551            Some((test, deps, seq_lock, in_progress_counter)) => {
552                let index = self.idx;
553                self.idx += 1;
554                Some(TestExecution {
555                    test: test.clone(),
556                    deps: Arc::new(deps),
557                    index,
558                    _seq_lock: seq_lock,
559                    in_progress_counter,
560                })
561            }
562            None => None,
563        }
564    }
565
566    #[cfg(feature = "tokio")]
567    #[allow(clippy::type_complexity)]
568    async fn pick_next_internal(
569        &mut self,
570        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
571    ) -> Option<(
572        RegisteredTest,
573        HashMap<String, Arc<dyn Any + Send + Sync>>,
574        SequentialExecutionLockGuard,
575        Arc<AtomicUsize>,
576    )> {
577        if self.is_empty() {
578            None
579        } else {
580            let dependency_map = if !self.is_materialized() {
581                self.materialize_deps(materialized_parent_deps).await
582            } else {
583                self.create_dependency_map(materialized_parent_deps)
584            };
585
586            let locked = self.sequential_lock.is_locked().await;
587            let result = if self.tests.is_empty() || locked {
588                let current = self.inner.iter_mut();
589                let mut result = None;
590                for inner in current {
591                    if let Some((test, deps, seq_lock, in_progress_counter)) =
592                        Box::pin(inner.pick_next_internal(&dependency_map)).await
593                    {
594                        result = Some((test, deps, seq_lock, in_progress_counter));
595                        break;
596                    }
597                }
598                self.inner.retain(|inner| !inner.is_empty());
599
600                result
601            } else {
602                let guard = self.sequential_lock.lock(self.is_sequential).await;
603                self.in_progress.fetch_add(1, Ordering::Release);
604                self.tests
605                    .pop()
606                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
607            };
608            if result.is_none()
609                && self.is_empty()
610                && self.is_materialized()
611                && !locked
612                && self.in_progress.load(Ordering::Acquire) == 0
613            {
614                self.drop_deps();
615            }
616            if result.is_some() {
617                self.remaining_count -= 1;
618            }
619            result
620        }
621    }
622
623    #[allow(clippy::type_complexity)]
624    fn pick_next_internal_sync(
625        &mut self,
626        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
627    ) -> Option<(
628        RegisteredTest,
629        HashMap<String, Arc<dyn Any + Send + Sync>>,
630        SequentialExecutionLockGuard,
631        Arc<AtomicUsize>,
632    )> {
633        if self.is_empty() {
634            None
635        } else {
636            let dependency_map = if !self.is_materialized() {
637                self.materialize_deps_sync(materialized_parent_deps)
638            } else {
639                self.create_dependency_map(materialized_parent_deps)
640            };
641
642            let locked = self.sequential_lock.is_locked_sync();
643            let result = if self.tests.is_empty() || locked {
644                let current = self.inner.iter_mut();
645                let mut result = None;
646                for inner in current {
647                    if let Some((test, deps, seq_lock, in_progress_counter)) =
648                        inner.pick_next_internal_sync(&dependency_map)
649                    {
650                        result = Some((test, deps, seq_lock, in_progress_counter));
651                        break;
652                    }
653                }
654
655                self.inner.retain(|inner| !inner.is_empty());
656                result
657            } else {
658                let guard = self.sequential_lock.lock_sync(self.is_sequential);
659                self.in_progress.fetch_add(1, Ordering::Release);
660                self.tests
661                    .pop()
662                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
663            };
664            if result.is_none()
665                && self.is_materialized()
666                && !locked
667                && self.in_progress.load(Ordering::Acquire) == 0
668            {
669                self.drop_deps();
670            }
671            if result.is_some() {
672                self.remaining_count -= 1;
673            }
674            result
675        }
676    }
677
678    fn create_dependency_map(
679        &self,
680        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
681    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
682        let mut result = parent_map.clone();
683        for (key, dep) in &self.materialized_dependencies {
684            result.insert(key.clone(), dep.clone());
685        }
686        result
687    }
688
689    fn root(
690        deps: Vec<RegisteredDependency>,
691        tests: Vec<RegisteredTest>,
692        props: Vec<RegisteredTestSuiteProperty>,
693    ) -> Self {
694        let total_count = tests.len();
695        let is_sequential = props
696            .iter()
697            .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
698            || tests.iter().any(|test| test.run.is_bench());
699        Self {
700            crate_and_module: String::new(),
701            dependencies: deps,
702            tests,
703            props,
704            inner: Vec::new(),
705            materialized_dependencies: HashMap::new(),
706            remaining_count: total_count,
707            idx: 0,
708            sequential_lock: SequentialExecutionLock::new(),
709            is_sequential,
710            skip_creating_dependencies: false,
711            in_progress: Arc::new(AtomicUsize::new(0)),
712        }
713    }
714
715    fn add_dependency(&mut self, dep: RegisteredDependency) {
716        let crate_and_module = dep.crate_and_module();
717        if self.crate_and_module == crate_and_module {
718            self.dependencies.push(dep);
719        } else {
720            let mut found = false;
721            for inner in &mut self.inner {
722                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
723                    inner.add_dependency(dep.clone());
724                    found = true;
725                    break;
726                }
727            }
728            if !found {
729                let mut inner = Self {
730                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
731                    dependencies: vec![],
732                    tests: vec![],
733                    inner: vec![],
734                    props: vec![],
735                    materialized_dependencies: HashMap::new(),
736                    remaining_count: 0,
737                    idx: 0,
738                    is_sequential: false,
739                    sequential_lock: SequentialExecutionLock::new(),
740                    skip_creating_dependencies: false,
741                    in_progress: Arc::new(AtomicUsize::new(0)),
742                };
743                inner.add_dependency(dep);
744                self.inner.push(inner);
745            }
746        }
747    }
748
749    fn add_test(&mut self, test: RegisteredTest) {
750        let crate_and_module = test.crate_and_module();
751        if self.crate_and_module == crate_and_module {
752            self.tests.push(test.clone());
753
754            if test.run.is_bench() {
755                self.is_sequential = true;
756            }
757        } else {
758            let mut found = false;
759            for inner in &mut self.inner {
760                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
761                    inner.add_test(test.clone());
762                    found = true;
763                    break;
764                }
765            }
766            if !found {
767                let mut inner = Self {
768                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
769                    dependencies: vec![],
770                    tests: vec![],
771                    inner: vec![],
772                    props: vec![],
773                    materialized_dependencies: HashMap::new(),
774                    remaining_count: 0,
775                    idx: 0,
776                    is_sequential: false,
777                    sequential_lock: SequentialExecutionLock::new(),
778                    skip_creating_dependencies: false,
779                    in_progress: Arc::new(AtomicUsize::new(0)),
780                };
781                inner.add_test(test);
782                self.inner.push(inner);
783            }
784        }
785        self.remaining_count += 1;
786    }
787
788    fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
789        let crate_and_module = prop.crate_and_module();
790        if self.crate_and_module == crate_and_module {
791            if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
792                self.is_sequential = true;
793            }
794            self.props.push(prop);
795        } else {
796            let mut found = false;
797            for inner in &mut self.inner {
798                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
799                    inner.add_prop(prop.clone());
800                    found = true;
801                    break;
802                }
803            }
804            if !found {
805                let mut inner = Self {
806                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
807                    dependencies: vec![],
808                    tests: vec![],
809                    inner: vec![],
810                    props: vec![],
811                    materialized_dependencies: HashMap::new(),
812                    remaining_count: 0,
813                    idx: 0,
814                    is_sequential: false,
815                    sequential_lock: SequentialExecutionLock::new(),
816                    skip_creating_dependencies: false,
817                    in_progress: Arc::new(AtomicUsize::new(0)),
818                };
819                inner.add_prop(prop);
820                self.inner.push(inner);
821            }
822        }
823    }
824
825    fn is_materialized(&self) -> bool {
826        self.skip_creating_dependencies
827            || self.materialized_dependencies.len() == self.dependencies.len()
828    }
829
830    #[cfg(feature = "tokio")]
831    async fn materialize_deps(
832        &mut self,
833        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
834    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
835        // Start with any pre-populated values (e.g. Cloneable deps received
836        // from the parent via ProvideCloneable IPC).
837        let mut deps = self.materialized_dependencies.clone();
838        let mut dependency_map = parent_map.clone();
839        for (k, v) in &deps {
840            dependency_map.insert(k.clone(), v.clone());
841        }
842
843        let sorted_dependencies = self.sorted_dependencies();
844        for dep in &sorted_dependencies {
845            if deps.contains_key(&dep.name) {
846                continue;
847            }
848            let materialized_dep = match &dep.constructor {
849                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
850                DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
851            };
852            deps.insert(dep.name.clone(), materialized_dep.clone());
853            dependency_map.insert(dep.name.clone(), materialized_dep);
854        }
855        self.materialized_dependencies = deps;
856        dependency_map
857    }
858
859    fn materialize_deps_sync(
860        &mut self,
861        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
862    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
863        // Start with any pre-populated values (e.g. Cloneable deps received
864        // from the parent via ProvideCloneable IPC).
865        let mut deps = self.materialized_dependencies.clone();
866        let mut dependency_map = parent_map.clone();
867        for (k, v) in &deps {
868            dependency_map.insert(k.clone(), v.clone());
869        }
870
871        let sorted_dependencies = self.sorted_dependencies();
872        for dep in &sorted_dependencies {
873            if deps.contains_key(&dep.name) {
874                continue;
875            }
876            let materialized_dep = match &dep.constructor {
877                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
878                DependencyConstructor::Async(cons) => {
879                    futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
880                }
881            };
882            deps.insert(dep.name.clone(), materialized_dep.clone());
883            dependency_map.insert(dep.name.clone(), materialized_dep);
884        }
885        self.materialized_dependencies = deps;
886        dependency_map
887    }
888
889    fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
890        let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
891        for dep in &self.dependencies {
892            let mut added = false;
893            for dep_dep_name in &dep.dependencies {
894                if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
895                    ts.add_dependency(dep_dep, dep);
896                    added = true;
897                } else {
898                    // otherwise it is expected to come from the parent level
899                }
900            }
901            if !added {
902                ts.insert(dep);
903            }
904        }
905        let mut result = Vec::with_capacity(self.dependencies.len());
906        loop {
907            let chunk = ts.pop_all();
908            if chunk.is_empty() {
909                break;
910            }
911            result.extend(chunk);
912        }
913        result
914    }
915
916    fn drop_deps(&mut self) {
917        self.materialized_dependencies.clear();
918    }
919
920    /// Prunes dependencies that are not needed by any test in this subtree.
921    /// Returns `Some(needed_from_parent)` with dep names needed from ancestor levels,
922    /// or `None` if pruning is disabled for this subtree (unknown deps).
923    fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
924        // Collect dep names needed by tests at this level
925        let mut needed: Option<HashSet<String>> = Some(HashSet::new());
926        for test in &self.tests {
927            match &test.dependencies {
928                None => {
929                    needed = None;
930                    break;
931                }
932                Some(deps) => {
933                    if let Some(ref mut set) = needed {
934                        set.extend(deps.iter().cloned());
935                    }
936                }
937            }
938        }
939
940        // Merge children's needs
941        for inner in &mut self.inner {
942            let child_needs = inner.prune_unused_deps();
943            needed = match (needed, child_needs) {
944                (None, _) | (_, None) => None,
945                (Some(mut a), Some(b)) => {
946                    a.extend(b);
947                    Some(a)
948                }
949            };
950        }
951
952        // If any test has unknown deps, keep everything
953        let needed = needed?;
954
955        // Determine which local deps to keep
956        let local_names: HashSet<String> =
957            self.dependencies.iter().map(|d| d.name.clone()).collect();
958        let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
959
960        // Expand transitive closure for local deps only (fixpoint)
961        let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
962        let mut needed_from_parent: HashSet<String> =
963            needed.difference(&local_names).cloned().collect();
964
965        while let Some(dep_name) = queue.pop_front() {
966            if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
967                for transitive in &dep.dependencies {
968                    if local_names.contains(transitive) {
969                        if keep_local.insert(transitive.clone()) {
970                            queue.push_back(transitive.clone());
971                        }
972                    } else {
973                        needed_from_parent.insert(transitive.clone());
974                    }
975                }
976                // Companions are planner-only sibling links — no
977                // constructor argument is derived from them, but they
978                // must be retained together with the dep they are
979                // declared on. Used by the
980                // `#[test_dep(scope = Hosted, worker = both(T))]`
981                // lowering to keep the Hosted owner half and the
982                // HostedRpc stub half as a pair even when the
983                // selected tests only parameterise on one of them.
984                for companion in &dep.companions {
985                    if local_names.contains(companion) {
986                        if keep_local.insert(companion.clone()) {
987                            queue.push_back(companion.clone());
988                        }
989                    } else {
990                        needed_from_parent.insert(companion.clone());
991                    }
992                }
993            }
994        }
995
996        // Prune
997        self.dependencies.retain(|d| keep_local.contains(&d.name));
998
999        Some(needed_from_parent)
1000    }
1001
1002    fn is_prefix_of(this: &str, that: &str) -> bool {
1003        this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
1004    }
1005
1006    fn next_level(from: &str, to: &str) -> String {
1007        assert!(Self::is_prefix_of(from, to));
1008        let remaining = if from.is_empty() {
1009            to
1010        } else {
1011            &to[from.len() + 2..]
1012        };
1013
1014        let result = if let Some((next, _tail)) = remaining.split_once("::") {
1015            format!("{from}::{next}")
1016        } else {
1017            format!("{from}::{remaining}")
1018        };
1019        result.trim_start_matches("::").to_string()
1020    }
1021
1022    fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
1023        if let Some(parent_lock) = inherited_lock {
1024            self.is_sequential = true;
1025            self.sequential_lock = parent_lock.clone();
1026        }
1027
1028        let lock_for_children = if self.is_sequential {
1029            Some(self.sequential_lock.clone())
1030        } else {
1031            None
1032        };
1033
1034        for child in &mut self.inner {
1035            child.propagate_sequential(lock_for_children.as_ref());
1036        }
1037    }
1038}
1039
1040impl Debug for TestSuiteExecution {
1041    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1042        writeln!(
1043            f,
1044            "'{}' {} [{}]",
1045            self.crate_and_module,
1046            self.props
1047                .iter()
1048                .map(|x| format!("{x:?}"))
1049                .collect::<Vec<_>>()
1050                .join(", "),
1051            if self.is_sequential { "S" } else { "P" }
1052        )?;
1053        writeln!(f, "  deps:")?;
1054        for dep in &self.dependencies {
1055            writeln!(f, "    '{}'", dep.name)?;
1056        }
1057        writeln!(f, "  tests:")?;
1058        for test in &self.tests {
1059            writeln!(f, "    '{}' [{:?}]", test.name, test.props.test_type)?;
1060        }
1061        for inner in &self.inner {
1062            let inner_str = format!("{inner:?}");
1063            for inner_line in inner_str.lines() {
1064                writeln!(f, "  {inner_line}")?;
1065            }
1066        }
1067        Ok(())
1068    }
1069}
1070
1071impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1072    fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1073        self.get(name).cloned()
1074    }
1075}
1076
1077pub struct TestExecution {
1078    pub test: RegisteredTest,
1079    pub deps: Arc<dyn DependencyView + Send + Sync>,
1080    pub index: usize,
1081    _seq_lock: SequentialExecutionLockGuard,
1082    in_progress_counter: Arc<AtomicUsize>,
1083}
1084
1085impl Drop for TestExecution {
1086    fn drop(&mut self) {
1087        self.in_progress_counter.fetch_sub(1, Ordering::Release);
1088    }
1089}
1090
1091#[allow(dead_code)]
1092enum SequentialExecutionLockGuard {
1093    None,
1094    #[cfg(feature = "tokio")]
1095    Async(tokio::sync::OwnedMutexGuard<()>),
1096    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
1097}
1098
1099#[derive(Clone)]
1100struct SequentialExecutionLock {
1101    #[cfg(feature = "tokio")]
1102    async_mutex: Arc<tokio::sync::Mutex<()>>,
1103    sync_mutex: Arc<parking_lot::Mutex<()>>,
1104}
1105
1106impl SequentialExecutionLock {
1107    pub fn new() -> Self {
1108        Self {
1109            #[cfg(feature = "tokio")]
1110            async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1111            sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1112        }
1113    }
1114
1115    #[cfg(feature = "tokio")]
1116    pub async fn is_locked(&self) -> bool {
1117        self.async_mutex.try_lock().is_err()
1118    }
1119
1120    pub fn is_locked_sync(&self) -> bool {
1121        self.sync_mutex.try_lock().is_none()
1122    }
1123
1124    #[cfg(feature = "tokio")]
1125    pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1126        if is_sequential {
1127            let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1128            SequentialExecutionLockGuard::Async(permit)
1129        } else {
1130            SequentialExecutionLockGuard::None
1131        }
1132    }
1133
1134    pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1135        if is_sequential {
1136            let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1137            SequentialExecutionLockGuard::Sync(permit)
1138        } else {
1139            SequentialExecutionLockGuard::None
1140        }
1141    }
1142}
1143
1144#[cfg(test)]
1145mod cloneable_tests {
1146    use super::*;
1147    use crate::internal::{
1148        CloneableCodec, DependencyConstructor, RegisteredDependency, RegisteredTest, TestFunction,
1149        TestProperties,
1150    };
1151    use std::sync::atomic::{AtomicUsize, Ordering};
1152
1153    fn registered_test(name: &str, deps: Vec<String>) -> RegisteredTest {
1154        registered_test_in_module(name, "", deps)
1155    }
1156
1157    fn registered_test_in_module(
1158        name: &str,
1159        module_path: &str,
1160        deps: Vec<String>,
1161    ) -> RegisteredTest {
1162        RegisteredTest {
1163            name: name.to_string(),
1164            crate_name: "tcrate".to_string(),
1165            module_path: module_path.to_string(),
1166            run: TestFunction::Sync(Arc::new(|_| Box::new(()))),
1167            props: TestProperties::default(),
1168            dependencies: Some(deps),
1169        }
1170    }
1171
1172    /// A Cloneable dep whose constructor increments a counter (so we can
1173    /// assert it ran exactly once), encodes via simple little-endian bytes.
1174    fn registered_cloneable_dep(name: &str, counter: Arc<AtomicUsize>) -> RegisteredDependency {
1175        registered_cloneable_dep_in(name, "", 0xdead_beef, counter)
1176    }
1177
1178    /// Like [`registered_cloneable_dep`] but lets the caller pick the
1179    /// dep's module path and the constant the constructor emits — so a
1180    /// collision test can assert two same-named deps in different modules
1181    /// don't get crossed up.
1182    fn registered_cloneable_dep_in(
1183        name: &str,
1184        module_path: &str,
1185        constructor_value: u64,
1186        counter: Arc<AtomicUsize>,
1187    ) -> RegisteredDependency {
1188        let constructor_counter = counter.clone();
1189        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1190            constructor_counter.fetch_add(1, Ordering::SeqCst);
1191            Arc::new(constructor_value) as Arc<dyn Any + Send + Sync>
1192        }));
1193        let codec = CloneableCodec {
1194            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1195                let value: Arc<u64> = any.downcast::<u64>().unwrap();
1196                (*value).to_le_bytes().to_vec()
1197            }),
1198            from_wire_bytes: Arc::new(|bytes: &[u8]| {
1199                let arr: [u8; 8] = bytes.try_into().unwrap();
1200                let value = u64::from_le_bytes(arr);
1201                Arc::new(value) as Arc<dyn Any + Send + Sync>
1202            }),
1203        };
1204        RegisteredDependency {
1205            name: name.to_string(),
1206            crate_name: "tcrate".to_string(),
1207            module_path: module_path.to_string(),
1208            constructor,
1209            dependencies: Vec::new(),
1210            scope: DepScope::Cloneable,
1211            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
1212                |wire_payload, _deps| wire_payload,
1213            ))),
1214            cloneable_codec: Some(codec),
1215            hosted_codec: None,
1216            rpc_factory: None,
1217            companions: Vec::new(),
1218        }
1219    }
1220
1221    #[test]
1222    fn cloneable_wire_collection_runs_constructor_once_and_encodes_value() {
1223        let counter = Arc::new(AtomicUsize::new(0));
1224        let dep = registered_cloneable_dep("clone_dep", counter.clone());
1225        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1226
1227        let (execution, _filtered) =
1228            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1229
1230        let collected = execution.collect_cloneable_wire_bytes_sync();
1231        assert_eq!(collected.len(), 1, "exactly one cloneable dep expected");
1232        let (dep_id, wire_bytes) = &collected[0];
1233        assert_eq!(
1234            dep_id, "tcrate::clone_dep",
1235            "wire bytes must be keyed by the fully-qualified id, not the local name"
1236        );
1237        assert_eq!(
1238            wire_bytes.as_slice(),
1239            &0xdead_beef_u64.to_le_bytes(),
1240            "expected the codec-encoded value to round-trip via to_wire"
1241        );
1242        assert_eq!(
1243            counter.load(Ordering::SeqCst),
1244            1,
1245            "constructor must have run exactly once when collecting"
1246        );
1247    }
1248
1249    #[test]
1250    fn prune_unused_deps_retains_companion_when_only_one_half_is_referenced() {
1251        // Regression for `#[test_dep(scope = Hosted, worker = both(T))]`
1252        // pruning. That lowering registers two dep entries (Hosted owner
1253        // view + HostedRpc stub view) for a single logical dep, backed by
1254        // a shared `Arc<HostedBothShared>` cache; the macro now declares
1255        // the two halves as `companions` of each other so that the pruner
1256        // retains the Hosted half even when selected tests only reference
1257        // the stub half. Without that, the async-ctor flavour would panic
1258        // (`Poll::Pending`) at runtime because the shared cache stays
1259        // empty.
1260        //
1261        // This test reproduces the pruner-level invariant cheaply via two
1262        // Cloneable deps. The `keep_local` traversal in
1263        // `prune_unused_deps` should expand the keep-set across
1264        // companions in either direction.
1265
1266        // Case A: reference only `dep_a`; `dep_b` is declared as a
1267        // companion of `dep_a` and must survive pruning.
1268        let counter_a = Arc::new(AtomicUsize::new(0));
1269        let counter_b = Arc::new(AtomicUsize::new(0));
1270        let mut dep_a = registered_cloneable_dep("clone_a", counter_a.clone());
1271        let mut dep_b = registered_cloneable_dep("clone_b", counter_b.clone());
1272        dep_a.companions = vec!["clone_b".to_string()];
1273        dep_b.companions = vec!["clone_a".to_string()];
1274
1275        let test_a = registered_test("t_uses_a", vec!["clone_a".to_string()]);
1276
1277        let (execution, _filtered) =
1278            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test_a], &[]);
1279
1280        let kept: Vec<String> = execution
1281            .collect_cloneable_dependencies()
1282            .into_iter()
1283            .map(|d| d.name)
1284            .collect();
1285        assert!(
1286            kept.contains(&"clone_a".to_string()),
1287            "directly referenced dep must be retained, kept = {kept:?}"
1288        );
1289        assert!(
1290            kept.contains(&"clone_b".to_string()),
1291            "companion of a retained dep must also be retained (the planner-only \
1292             sibling link used by `worker = both(...)`), kept = {kept:?}"
1293        );
1294
1295        // Case B (reverse direction): reference only `dep_b`, with the
1296        // same companion link. `dep_a` must survive pruning.
1297        let counter_a = Arc::new(AtomicUsize::new(0));
1298        let counter_b = Arc::new(AtomicUsize::new(0));
1299        let mut dep_a = registered_cloneable_dep("clone_a", counter_a.clone());
1300        let mut dep_b = registered_cloneable_dep("clone_b", counter_b.clone());
1301        dep_a.companions = vec!["clone_b".to_string()];
1302        dep_b.companions = vec!["clone_a".to_string()];
1303
1304        let test_b = registered_test("t_uses_b", vec!["clone_b".to_string()]);
1305
1306        let (execution, _filtered) =
1307            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test_b], &[]);
1308
1309        let kept: Vec<String> = execution
1310            .collect_cloneable_dependencies()
1311            .into_iter()
1312            .map(|d| d.name)
1313            .collect();
1314        assert!(
1315            kept.contains(&"clone_a".to_string()),
1316            "companion of a stub-referenced dep must be retained, kept = {kept:?}"
1317        );
1318        assert!(
1319            kept.contains(&"clone_b".to_string()),
1320            "directly referenced dep must be retained, kept = {kept:?}"
1321        );
1322
1323        // Sanity: a dep with no companion link and not referenced
1324        // anywhere is still pruned. (Prevents the test above from
1325        // accidentally turning into "the pruner never drops anything".)
1326        let counter_a = Arc::new(AtomicUsize::new(0));
1327        let counter_b = Arc::new(AtomicUsize::new(0));
1328        let dep_a = registered_cloneable_dep("clone_a", counter_a.clone());
1329        let dep_b = registered_cloneable_dep("clone_b", counter_b.clone());
1330        let test_a = registered_test("t_uses_a", vec!["clone_a".to_string()]);
1331
1332        let (execution, _filtered) =
1333            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test_a], &[]);
1334
1335        let kept: Vec<String> = execution
1336            .collect_cloneable_dependencies()
1337            .into_iter()
1338            .map(|d| d.name)
1339            .collect();
1340        assert!(
1341            kept.contains(&"clone_a".to_string()),
1342            "directly referenced dep must be retained, kept = {kept:?}"
1343        );
1344        assert!(
1345            !kept.contains(&"clone_b".to_string()),
1346            "without a companion link, an unreferenced dep must be pruned; \
1347             kept = {kept:?}"
1348        );
1349    }
1350
1351    #[test]
1352    fn provide_cloneable_value_short_circuits_constructor() {
1353        let counter = Arc::new(AtomicUsize::new(0));
1354        let dep = registered_cloneable_dep("clone_dep", counter.clone());
1355        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1356
1357        let (mut execution, _filtered) =
1358            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1359
1360        let pre_value: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
1361        let applied = execution.provide_cloneable_value("tcrate::clone_dep", pre_value);
1362        assert!(
1363            applied,
1364            "pre-populated value should match the dep's qualified id"
1365        );
1366
1367        // Pick the test — materialize_deps_sync must reuse the pre-populated
1368        // value instead of running the original constructor.
1369        let next = execution.pick_next_sync().expect("test should be picked");
1370        assert_eq!(next.test.name, "t1");
1371
1372        let view = next.deps.get("clone_dep").expect("dep available");
1373        let value: Arc<u64> = view.downcast::<u64>().unwrap();
1374        assert_eq!(*value, 99);
1375
1376        assert_eq!(
1377            counter.load(Ordering::SeqCst),
1378            0,
1379            "constructor must not run when a pre-populated value is supplied"
1380        );
1381    }
1382
1383    #[test]
1384    fn provided_shared_value_is_a_worker_side_leaf() {
1385        let provided_counter = Arc::new(AtomicUsize::new(0));
1386        let parent_only_counter = Arc::new(AtomicUsize::new(0));
1387        let mut provided_dep = registered_cloneable_dep("clone_dep", provided_counter.clone());
1388        provided_dep.dependencies = vec!["parent_only_dep".to_string()];
1389        let parent_only_dep =
1390            registered_cloneable_dep("parent_only_dep", parent_only_counter.clone());
1391        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1392
1393        let (mut execution, _filtered) = TestSuiteExecution::construct(
1394            &Arguments::default(),
1395            &[provided_dep, parent_only_dep],
1396            &[test],
1397            &[],
1398        );
1399
1400        let pre_value: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
1401        let applied = execution.provide_cloneable_value("tcrate::clone_dep", pre_value);
1402        assert!(applied);
1403
1404        let next = execution.pick_next_sync().expect("test should be picked");
1405        let view = next.deps.get("clone_dep").expect("dep available");
1406        let value: Arc<u64> = view.downcast::<u64>().unwrap();
1407        assert_eq!(*value, 99);
1408        assert_eq!(
1409            provided_counter.load(Ordering::SeqCst),
1410            0,
1411            "worker-side provided values must not run their original constructor"
1412        );
1413        assert_eq!(
1414            parent_only_counter.load(Ordering::SeqCst),
1415            0,
1416            "constructor dependencies are parent-only once a value arrives from wire bytes or an RPC stub"
1417        );
1418    }
1419
1420    /// Async-constructor counterpart for the parent-side collector used by the
1421    /// tokio runner. Verifies that a Cloneable owner declared with
1422    /// `async fn` is awaited on the parent and its wire bytes are produced
1423    /// keyed by the dep's qualified id.
1424    #[cfg(feature = "tokio")]
1425    #[test]
1426    fn async_cloneable_wire_collection_awaits_async_constructor() {
1427        use std::pin::Pin;
1428
1429        let counter = Arc::new(AtomicUsize::new(0));
1430        let constructor_counter = counter.clone();
1431
1432        // Build a RegisteredDependency with an Async constructor that
1433        // genuinely awaits a future (tokio::task::yield_now) on the parent
1434        // side, then returns a u64.
1435        let constructor = DependencyConstructor::Async(Arc::new(move |_view| {
1436            let counter = constructor_counter.clone();
1437            Box::pin(async move {
1438                tokio::task::yield_now().await;
1439                counter.fetch_add(1, Ordering::SeqCst);
1440                let value: u64 = 0xdead_beef;
1441                Arc::new(value) as Arc<dyn Any + Send + Sync>
1442            }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
1443        }));
1444        let codec = CloneableCodec {
1445            to_wire: Arc::new(|any| {
1446                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1447                (*v).to_le_bytes().to_vec()
1448            }),
1449            from_wire_bytes: Arc::new(|bytes| {
1450                let arr: [u8; 8] = bytes.try_into().unwrap();
1451                Arc::new(u64::from_le_bytes(arr)) as Arc<dyn Any + Send + Sync>
1452            }),
1453        };
1454        let dep = RegisteredDependency {
1455            name: "clone_dep".to_string(),
1456            crate_name: "tcrate".to_string(),
1457            module_path: String::new(),
1458            constructor,
1459            dependencies: Vec::new(),
1460            scope: DepScope::Cloneable,
1461            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
1462                |wire_payload, _| wire_payload,
1463            ))),
1464            cloneable_codec: Some(codec),
1465            hosted_codec: None,
1466            rpc_factory: None,
1467            companions: Vec::new(),
1468        };
1469        let test = registered_test("t1", vec!["clone_dep".to_string()]);
1470
1471        let (execution, _filtered) =
1472            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1473
1474        let runtime = tokio::runtime::Builder::new_current_thread()
1475            .enable_all()
1476            .build()
1477            .unwrap();
1478        let collected = runtime.block_on(execution.collect_cloneable_wire_bytes_async());
1479
1480        assert_eq!(collected.len(), 1);
1481        assert_eq!(collected[0].0, "tcrate::clone_dep");
1482        assert_eq!(collected[0].1.as_slice(), &0xdead_beef_u64.to_le_bytes());
1483        assert_eq!(
1484            counter.load(Ordering::SeqCst),
1485            1,
1486            "async constructor must have run exactly once"
1487        );
1488    }
1489
1490    /// Regression test: two cloneable deps that share a local `name` but live
1491    /// in different modules must not collide on the wire and must not
1492    /// cross-apply on the worker side.
1493    #[test]
1494    fn cloneable_value_routing_uses_qualified_id_across_modules() {
1495        let counter_a = Arc::new(AtomicUsize::new(0));
1496        let counter_b = Arc::new(AtomicUsize::new(0));
1497
1498        // Same local name `clone_dep`, different module paths.
1499        let dep_a = registered_cloneable_dep_in("clone_dep", "mod_a", 11, counter_a.clone());
1500        let dep_b = registered_cloneable_dep_in("clone_dep", "mod_b", 22, counter_b.clone());
1501
1502        // One test per module, each takes "clone_dep" from its own module.
1503        let test_a = registered_test_in_module("t_a", "mod_a", vec!["clone_dep".to_string()]);
1504        let test_b = registered_test_in_module("t_b", "mod_b", vec!["clone_dep".to_string()]);
1505
1506        let (execution, _filtered) = TestSuiteExecution::construct(
1507            &Arguments::default(),
1508            &[dep_a, dep_b],
1509            &[test_a, test_b],
1510            &[],
1511        );
1512
1513        // The wire bytes must carry distinct qualified ids and distinct payloads.
1514        let mut collected = execution.collect_cloneable_wire_bytes_sync();
1515        collected.sort_by(|l, r| l.0.cmp(&r.0));
1516        assert_eq!(collected.len(), 2);
1517        assert_eq!(collected[0].0, "tcrate::mod_a::clone_dep");
1518        assert_eq!(collected[1].0, "tcrate::mod_b::clone_dep");
1519        assert_eq!(collected[0].1.as_slice(), &11_u64.to_le_bytes());
1520        assert_eq!(collected[1].1.as_slice(), &22_u64.to_le_bytes());
1521
1522        // Worker-side routing: pre-populating with `mod_a`'s qualified id must
1523        // apply only to `mod_a`'s dep, and similarly for `mod_b`. If the
1524        // routing fell back to plain `name`, both nodes would be updated.
1525        let mut execution_a = execution;
1526        let applied_a =
1527            execution_a.provide_cloneable_value("tcrate::mod_a::clone_dep", Arc::new(111_u64));
1528        assert!(applied_a, "mod_a dep must be reachable by qualified id");
1529        let applied_b =
1530            execution_a.provide_cloneable_value("tcrate::mod_b::clone_dep", Arc::new(222_u64));
1531        assert!(applied_b, "mod_b dep must be reachable by qualified id");
1532
1533        // An unrelated qualified id must not apply to anything.
1534        let applied_unknown =
1535            execution_a.provide_cloneable_value("tcrate::mod_c::clone_dep", Arc::new(333_u64));
1536        assert!(
1537            !applied_unknown,
1538            "unknown qualified id must not be applied anywhere"
1539        );
1540
1541        // Pick both tests and confirm the per-module values stayed separate.
1542        let first = execution_a.pick_next_sync().expect("first test");
1543        let second = execution_a.pick_next_sync().expect("second test");
1544
1545        let pairs: Vec<(String, u64)> = [first, second]
1546            .into_iter()
1547            .map(|n| {
1548                let v: Arc<u64> = n
1549                    .deps
1550                    .get("clone_dep")
1551                    .expect("dep available")
1552                    .clone()
1553                    .downcast()
1554                    .unwrap();
1555                (n.test.name.clone(), *v)
1556            })
1557            .collect();
1558
1559        let val_a = pairs
1560            .iter()
1561            .find(|(n, _)| n == "t_a")
1562            .expect("t_a picked")
1563            .1;
1564        let val_b = pairs
1565            .iter()
1566            .find(|(n, _)| n == "t_b")
1567            .expect("t_b picked")
1568            .1;
1569        assert_eq!(
1570            val_a, 111,
1571            "mod_a test must see mod_a's pre-populated value"
1572        );
1573        assert_eq!(
1574            val_b, 222,
1575            "mod_b test must see mod_b's pre-populated value"
1576        );
1577
1578        // Each per-module constructor ran exactly once — during the parent-side
1579        // wire-bytes collection above. The worker-side `provide_cloneable_value`
1580        // calls must NOT have triggered the constructor a second time on either
1581        // node (otherwise the qualified-id routing is wrong / it cross-applied).
1582        assert_eq!(
1583            counter_a.load(Ordering::SeqCst),
1584            1,
1585            "mod_a constructor must have run exactly once (during wire collection)"
1586        );
1587        assert_eq!(
1588            counter_b.load(Ordering::SeqCst),
1589            1,
1590            "mod_b constructor must have run exactly once (during wire collection)"
1591        );
1592    }
1593
1594    // -------- Hosted dep tests --------
1595
1596    /// Builds a Hosted RegisteredDependency for tests. Owner value is a u64
1597    /// (`payload`). `descriptor()` is modelled by the codec's `to_wire` as
1598    /// the LE bytes of `payload`, and `from_descriptor` is modelled by the
1599    /// worker_fn which downcasts the bytes and rebuilds a u64 — so we can
1600    /// observe both halves of the Hosted round-trip without depending on
1601    /// the user-facing `HostedDep` trait inside this private test helper.
1602    fn registered_hosted_dep(
1603        name: &str,
1604        payload: u64,
1605        owner_counter: Arc<AtomicUsize>,
1606    ) -> RegisteredDependency {
1607        registered_hosted_dep_in(name, "", payload, owner_counter)
1608    }
1609
1610    /// Like [`registered_hosted_dep`] but lets the caller pick the dep's
1611    /// module path — so a collision test can assert two same-named Hosted
1612    /// deps in different modules don't get crossed up on the wire / in the
1613    /// worker routing.
1614    fn registered_hosted_dep_in(
1615        name: &str,
1616        module_path: &str,
1617        payload: u64,
1618        owner_counter: Arc<AtomicUsize>,
1619    ) -> RegisteredDependency {
1620        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1621            owner_counter.fetch_add(1, Ordering::SeqCst);
1622            Arc::new(payload) as Arc<dyn Any + Send + Sync>
1623        }));
1624        let codec = CloneableCodec {
1625            // descriptor() on the owner: encode the payload as LE bytes
1626            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1627                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1628                (*v).to_le_bytes().to_vec()
1629            }),
1630            // worker side: box the bytes as Any (worker_fn does
1631            // from_descriptor on them)
1632            from_wire_bytes: Arc::new(|bytes: &[u8]| {
1633                let boxed: Vec<u8> = bytes.to_vec();
1634                Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1635            }),
1636        };
1637        let worker_fn =
1638            crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
1639                let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1640                let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1641                let value: u64 = u64::from_le_bytes(arr);
1642                Arc::new(value) as Arc<dyn Any + Send + Sync>
1643            }));
1644        RegisteredDependency {
1645            name: name.to_string(),
1646            crate_name: "tcrate".to_string(),
1647            module_path: module_path.to_string(),
1648            constructor,
1649            dependencies: Vec::new(),
1650            scope: DepScope::Hosted,
1651            worker_fn: Some(worker_fn),
1652            cloneable_codec: None,
1653            hosted_codec: Some(codec),
1654            rpc_factory: None,
1655            companions: Vec::new(),
1656        }
1657    }
1658
1659    #[test]
1660    fn hosted_descriptor_collection_runs_owner_once_and_keeps_it_alive() {
1661        let owner_counter = Arc::new(AtomicUsize::new(0));
1662        let dep = registered_hosted_dep("hosted_dep", 0xcafe_babe_dead_beef, owner_counter.clone());
1663        let test = registered_test("t1", vec!["hosted_dep".to_string()]);
1664
1665        let (execution, _filtered) =
1666            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1667
1668        let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
1669        assert_eq!(descriptors.len(), 1, "exactly one hosted dep expected");
1670        assert_eq!(owners.len(), 1, "exactly one hosted owner kept alive");
1671
1672        let (dep_id, descriptor_bytes) = &descriptors[0];
1673        assert_eq!(
1674            dep_id, "tcrate::hosted_dep",
1675            "descriptor must be keyed by the fully-qualified id"
1676        );
1677        assert_eq!(
1678            descriptor_bytes.as_slice(),
1679            &0xcafe_babe_dead_beef_u64.to_le_bytes(),
1680            "expected descriptor bytes to match codec.to_wire of payload"
1681        );
1682        assert_eq!(
1683            owner_counter.load(Ordering::SeqCst),
1684            1,
1685            "owner constructor must have run exactly once"
1686        );
1687
1688        // The returned owner Arc<dyn Any> must wrap the same payload value
1689        // — i.e. the parent really is holding the owner alive.
1690        let held: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
1691        assert_eq!(*held, 0xcafe_babe_dead_beef);
1692    }
1693
1694    #[test]
1695    fn hosted_descriptor_roundtrips_to_worker_value_via_provide_cloneable_value() {
1696        let owner_counter = Arc::new(AtomicUsize::new(0));
1697        let dep = registered_hosted_dep("hosted_dep", 0x1234_5678_u64, owner_counter.clone());
1698        let test = registered_test("t1", vec!["hosted_dep".to_string()]);
1699
1700        let (mut execution, _filtered) =
1701            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1702
1703        // Worker-side simulation: pre-populate a reconstructed value (this
1704        // is what `apply_provided_wire_bytes` does after running the worker_fn
1705        // against the descriptor bytes).
1706        let pre_value: Arc<dyn Any + Send + Sync> = Arc::new(0x1234_5678_u64);
1707        let applied = execution.provide_cloneable_value("tcrate::hosted_dep", pre_value);
1708        assert!(
1709            applied,
1710            "Hosted dep must accept pre-populated values via the same path as Cloneable"
1711        );
1712
1713        // Pick the test — the owner constructor must NOT run on the worker
1714        // side (we provided a value directly).
1715        let next = execution.pick_next_sync().expect("test should be picked");
1716        let view = next.deps.get("hosted_dep").expect("dep available");
1717        let value: Arc<u64> = view.downcast::<u64>().unwrap();
1718        assert_eq!(*value, 0x1234_5678);
1719        assert_eq!(
1720            owner_counter.load(Ordering::SeqCst),
1721            0,
1722            "Hosted owner constructor must not run on the worker side"
1723        );
1724    }
1725
1726    #[test]
1727    fn has_hosted_dependencies_reports_correctly() {
1728        let dep = registered_hosted_dep("h", 0, Arc::new(AtomicUsize::new(0)));
1729        let test = registered_test("t1", vec!["h".to_string()]);
1730        let (execution, _filtered) =
1731            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1732        assert!(execution.has_hosted_dependencies());
1733        assert!(!execution.has_shared_dependencies());
1734        assert!(!execution.has_cloneable_dependencies());
1735    }
1736
1737    /// The owner constructor must run EXACTLY once even with multiple workers —
1738    /// descriptors are computed once on the parent and shipped to each worker.
1739    #[test]
1740    fn hosted_owner_runs_exactly_once_even_when_collecting_multiple_times() {
1741        // The collector is what the parent calls once; we verify the
1742        // expected invariant: a single collect call invokes the owner once
1743        // (even if multiple Hosted deps share the same dep id structure).
1744        let counter_a = Arc::new(AtomicUsize::new(0));
1745        let counter_b = Arc::new(AtomicUsize::new(0));
1746
1747        // Two distinct Hosted deps in the same module/crate.
1748        let mut dep_a = registered_hosted_dep("hosted_a", 1, counter_a.clone());
1749        dep_a.name = "hosted_a".to_string();
1750        let mut dep_b = registered_hosted_dep("hosted_b", 2, counter_b.clone());
1751        dep_b.name = "hosted_b".to_string();
1752        let test = registered_test("t1", vec!["hosted_a".to_string(), "hosted_b".to_string()]);
1753
1754        let (execution, _filtered) =
1755            TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test], &[]);
1756
1757        let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
1758        assert_eq!(descriptors.len(), 2);
1759        assert_eq!(owners.len(), 2);
1760        assert_eq!(counter_a.load(Ordering::SeqCst), 1);
1761        assert_eq!(counter_b.load(Ordering::SeqCst), 1);
1762    }
1763
1764    /// Regression test for qualified-id routing on Hosted deps: two Hosted
1765    /// deps that share a local `name` but live in different modules must
1766    /// not collide on the wire and must not cross-apply on the worker side.
1767    /// This mirrors `cloneable_value_routing_uses_qualified_id_across_modules`
1768    /// to make sure the same hardened routing applies to descriptor bytes.
1769    #[test]
1770    fn hosted_descriptor_routing_uses_qualified_id_across_modules() {
1771        let counter_a = Arc::new(AtomicUsize::new(0));
1772        let counter_b = Arc::new(AtomicUsize::new(0));
1773
1774        // Same local name `hosted_dep`, different module paths.
1775        let dep_a = registered_hosted_dep_in("hosted_dep", "mod_a", 11, counter_a.clone());
1776        let dep_b = registered_hosted_dep_in("hosted_dep", "mod_b", 22, counter_b.clone());
1777
1778        let test_a = registered_test_in_module("t_a", "mod_a", vec!["hosted_dep".to_string()]);
1779        let test_b = registered_test_in_module("t_b", "mod_b", vec!["hosted_dep".to_string()]);
1780
1781        let (execution, _filtered) = TestSuiteExecution::construct(
1782            &Arguments::default(),
1783            &[dep_a, dep_b],
1784            &[test_a, test_b],
1785            &[],
1786        );
1787
1788        // Descriptor bytes must carry distinct qualified ids and distinct payloads.
1789        let (mut descriptors, _owners) = execution.collect_hosted_descriptor_bytes_sync();
1790        descriptors.sort_by(|l, r| l.0.cmp(&r.0));
1791        assert_eq!(descriptors.len(), 2);
1792        assert_eq!(descriptors[0].0, "tcrate::mod_a::hosted_dep");
1793        assert_eq!(descriptors[1].0, "tcrate::mod_b::hosted_dep");
1794        assert_eq!(descriptors[0].1.as_slice(), &11_u64.to_le_bytes());
1795        assert_eq!(descriptors[1].1.as_slice(), &22_u64.to_le_bytes());
1796
1797        // Worker-side routing: pre-populating with `mod_a`'s qualified id
1798        // must apply only to `mod_a`'s dep, and similarly for `mod_b`.
1799        // Hosted deps use the same routing pathway as Cloneable, so the same
1800        // qualified-id-routing guarantee applies here.
1801        let mut execution = execution;
1802        let applied_a =
1803            execution.provide_cloneable_value("tcrate::mod_a::hosted_dep", Arc::new(111_u64));
1804        assert!(
1805            applied_a,
1806            "mod_a hosted dep must be reachable by qualified id"
1807        );
1808        let applied_b =
1809            execution.provide_cloneable_value("tcrate::mod_b::hosted_dep", Arc::new(222_u64));
1810        assert!(
1811            applied_b,
1812            "mod_b hosted dep must be reachable by qualified id"
1813        );
1814
1815        let applied_unknown =
1816            execution.provide_cloneable_value("tcrate::mod_c::hosted_dep", Arc::new(333_u64));
1817        assert!(
1818            !applied_unknown,
1819            "unknown qualified id must not be applied to any dep"
1820        );
1821
1822        let first = execution.pick_next_sync().expect("first test");
1823        let second = execution.pick_next_sync().expect("second test");
1824        let pairs: Vec<(String, u64)> = [first, second]
1825            .into_iter()
1826            .map(|n| {
1827                let v: Arc<u64> = n
1828                    .deps
1829                    .get("hosted_dep")
1830                    .expect("dep available")
1831                    .clone()
1832                    .downcast()
1833                    .unwrap();
1834                (n.test.name.clone(), *v)
1835            })
1836            .collect();
1837
1838        let val_a = pairs
1839            .iter()
1840            .find(|(n, _)| n == "t_a")
1841            .expect("t_a picked")
1842            .1;
1843        let val_b = pairs
1844            .iter()
1845            .find(|(n, _)| n == "t_b")
1846            .expect("t_b picked")
1847            .1;
1848        assert_eq!(val_a, 111);
1849        assert_eq!(val_b, 222);
1850
1851        // Each per-module owner constructor must have run exactly once
1852        // (during the parent-side descriptor collection above); the
1853        // worker-side provide_cloneable_value calls must not have re-run
1854        // them on either node.
1855        assert_eq!(counter_a.load(Ordering::SeqCst), 1);
1856        assert_eq!(counter_b.load(Ordering::SeqCst), 1);
1857    }
1858
1859    /// Mode-consistency regression test for the Hosted scope: when the
1860    /// runner does NOT spawn workers (e.g. `--nocapture`), tests must
1861    /// still see the *worker-side handle* produced by the registered
1862    /// `worker_fn` (i.e. `HostedDep::from_descriptor`), not the raw owner
1863    /// value returned by the parent constructor. This exercises the same
1864    /// codec + worker_fn round-trip that the runner-side
1865    /// `apply_hosted_descriptors_locally` helpers in sync.rs / tokio.rs
1866    /// perform on the no-spawn-workers path.
1867    #[test]
1868    fn hosted_no_spawn_workers_uses_worker_side_handle() {
1869        // Build a Hosted dep whose owner is one u64 value but whose
1870        // worker reconstructor produces a DIFFERENT u64 value. If the
1871        // local code path goes through descriptor->worker_fn correctly,
1872        // the test must see the worker value (not the owner value).
1873        let owner_counter = Arc::new(AtomicUsize::new(0));
1874        let constructor_counter = owner_counter.clone();
1875        let owner_value: u64 = 0xAAAA_AAAA_AAAA_AAAA_u64;
1876        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
1877            constructor_counter.fetch_add(1, Ordering::SeqCst);
1878            Arc::new(owner_value) as Arc<dyn Any + Send + Sync>
1879        }));
1880        // Owner-side codec serialises the owner value as raw LE bytes.
1881        // The worker side wraps those bytes in `Vec<u8>` and the
1882        // worker_fn flips every bit to demonstrate the worker reconstruction
1883        // path is taken (the bit-flip stands in for any non-identity
1884        // `HostedDep::from_descriptor` implementation).
1885        let codec = CloneableCodec {
1886            to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
1887                let v: Arc<u64> = any.downcast::<u64>().unwrap();
1888                (*v).to_le_bytes().to_vec()
1889            }),
1890            from_wire_bytes: Arc::new(|bytes: &[u8]| {
1891                let boxed: Vec<u8> = bytes.to_vec();
1892                Arc::new(boxed) as Arc<dyn Any + Send + Sync>
1893            }),
1894        };
1895        let worker_fn =
1896            crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
1897                let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
1898                let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
1899                let raw: u64 = u64::from_le_bytes(arr);
1900                let handle_value: u64 = !raw;
1901                Arc::new(handle_value) as Arc<dyn Any + Send + Sync>
1902            }));
1903        let dep = RegisteredDependency {
1904            name: "hosted_dep".to_string(),
1905            crate_name: "tcrate".to_string(),
1906            module_path: String::new(),
1907            constructor,
1908            dependencies: Vec::new(),
1909            scope: DepScope::Hosted,
1910            worker_fn: Some(worker_fn.clone()),
1911            cloneable_codec: None,
1912            hosted_codec: Some(codec.clone()),
1913            rpc_factory: None,
1914            companions: Vec::new(),
1915        };
1916        let test = registered_test("t1", vec!["hosted_dep".to_string()]);
1917
1918        let (mut execution, _filtered) =
1919            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
1920
1921        // Parent runs the owner constructor once and collects descriptor bytes
1922        // (mirroring `collect_hosted_descriptor_bytes_sync` invoked by the
1923        // no-spawn-workers parent runner).
1924        let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
1925        assert_eq!(descriptors.len(), 1);
1926        assert_eq!(owners.len(), 1);
1927        let (dep_id, wire_bytes) = &descriptors[0];
1928
1929        // Parent reconstructs the WORKER-side handle locally via
1930        // codec.from_wire_bytes + worker_fn (mirroring
1931        // `apply_hosted_descriptors_locally` in sync.rs / tokio.rs).
1932        let wire_payload = (codec.from_wire_bytes)(wire_bytes.as_slice());
1933        let empty_deps: Arc<dyn crate::internal::DependencyView + Send + Sync> =
1934            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
1935        let reconstructed = match &worker_fn {
1936            crate::internal::WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
1937            crate::internal::WorkerReconstructor::Async(_) => unreachable!(),
1938        };
1939        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
1940        assert!(applied);
1941
1942        // Pick the test — it must see the WORKER handle (~owner), NOT the owner
1943        // constructor's return value.
1944        let next = execution.pick_next_sync().expect("test picked");
1945        let view = next.deps.get("hosted_dep").expect("dep available");
1946        let value: Arc<u64> = view.clone().downcast::<u64>().unwrap();
1947        assert_eq!(
1948            *value,
1949            !owner_value,
1950            "Hosted dep must expose the worker-side handle (from_descriptor) even in the no-spawn-workers path"
1951        );
1952        assert_eq!(
1953            owner_counter.load(Ordering::SeqCst),
1954            1,
1955            "owner constructor must have run exactly once during descriptor collection"
1956        );
1957    }
1958
1959    /// Hosted owners construct their dependencies in the parent collection
1960    /// context. Worker-side values reconstructed from descriptors are leaves;
1961    /// the constructor dependencies are not re-created from wire bytes.
1962    #[test]
1963    fn hosted_dep_with_owner_dependencies_constructs_in_parent_context() {
1964        let dep_counter = Arc::new(AtomicUsize::new(0));
1965        let owner_counter = Arc::new(AtomicUsize::new(0));
1966        let dep = registered_cloneable_dep("some_other_dep", dep_counter.clone());
1967        let mut hosted = registered_hosted_dep("h_with_deps", 0, owner_counter.clone());
1968        hosted.dependencies = vec!["some_other_dep".to_string()];
1969        let test = registered_test("t1", vec!["h_with_deps".to_string()]);
1970        let (execution, _filtered) =
1971            TestSuiteExecution::construct(&Arguments::default(), &[dep, hosted], &[test], &[]);
1972        let collected = execution.collect_parent_shared_dependencies_sync();
1973
1974        assert_eq!(collected.hosted_descriptor_bytes.len(), 1);
1975        assert_eq!(dep_counter.load(Ordering::SeqCst), 1);
1976        assert_eq!(owner_counter.load(Ordering::SeqCst), 1);
1977    }
1978
1979    /// Tokio path: async owner constructors are awaited on the parent's
1980    /// collector.
1981    #[cfg(feature = "tokio")]
1982    #[test]
1983    fn async_hosted_descriptor_collection_awaits_async_constructor() {
1984        use std::pin::Pin;
1985
1986        let counter = Arc::new(AtomicUsize::new(0));
1987        let constructor_counter = counter.clone();
1988
1989        let constructor = DependencyConstructor::Async(Arc::new(move |_view| {
1990            let counter = constructor_counter.clone();
1991            Box::pin(async move {
1992                tokio::task::yield_now().await;
1993                counter.fetch_add(1, Ordering::SeqCst);
1994                let value: u64 = 42;
1995                Arc::new(value) as Arc<dyn Any + Send + Sync>
1996            }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
1997        }));
1998        let codec = CloneableCodec {
1999            to_wire: Arc::new(|any| {
2000                let v: Arc<u64> = any.downcast::<u64>().unwrap();
2001                (*v).to_le_bytes().to_vec()
2002            }),
2003            from_wire_bytes: Arc::new(|bytes| {
2004                let boxed: Vec<u8> = bytes.to_vec();
2005                Arc::new(boxed) as Arc<dyn Any + Send + Sync>
2006            }),
2007        };
2008        let dep = RegisteredDependency {
2009            name: "hosted_async".to_string(),
2010            crate_name: "tcrate".to_string(),
2011            module_path: String::new(),
2012            constructor,
2013            dependencies: Vec::new(),
2014            scope: DepScope::Hosted,
2015            worker_fn: Some(crate::internal::WorkerReconstructor::Sync(Arc::new(
2016                |wire_payload, _| {
2017                    let bytes_arc: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
2018                    let arr: [u8; 8] = (*bytes_arc).as_slice().try_into().unwrap();
2019                    let value: u64 = u64::from_le_bytes(arr);
2020                    Arc::new(value) as Arc<dyn Any + Send + Sync>
2021                },
2022            ))),
2023            cloneable_codec: None,
2024            hosted_codec: Some(codec),
2025            rpc_factory: None,
2026            companions: Vec::new(),
2027        };
2028        let test = registered_test("t1", vec!["hosted_async".to_string()]);
2029
2030        let (execution, _filtered) =
2031            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2032
2033        let runtime = tokio::runtime::Builder::new_current_thread()
2034            .enable_all()
2035            .build()
2036            .unwrap();
2037        let (descriptors, owners) =
2038            runtime.block_on(execution.collect_hosted_descriptor_bytes_async());
2039
2040        assert_eq!(descriptors.len(), 1);
2041        assert_eq!(owners.len(), 1);
2042        assert_eq!(descriptors[0].0, "tcrate::hosted_async");
2043        assert_eq!(descriptors[0].1.as_slice(), &42_u64.to_le_bytes());
2044        assert_eq!(counter.load(Ordering::SeqCst), 1);
2045
2046        let held: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
2047        assert_eq!(*held, 42);
2048    }
2049
2050    // ===========================================================
2051    // HostedRpc unit tests
2052    // ===========================================================
2053
2054    use crate::internal::{
2055        HostedRpcChannel, HostedRpcDep, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
2056        InProcessHostedRpcTransport, RpcFactory,
2057    };
2058
2059    /// Owner type for HostedRpc unit tests. A trivial monotonic counter
2060    /// that increments on every `dispatch(method_idx=1, _)` call and
2061    /// returns the new value as big-endian bytes.
2062    struct RpcCounter {
2063        n: u64,
2064    }
2065
2066    impl HostedRpcDep for RpcCounter {
2067        type Stub = RpcCounterStub;
2068        fn dispatch(&mut self, method_idx: u32, args: &[u8]) -> Result<Vec<u8>, String> {
2069            match method_idx {
2070                1 => {
2071                    self.n += 1;
2072                    Ok(self.n.to_be_bytes().to_vec())
2073                }
2074                // Large-payload echo. Args are a 4-byte big-endian
2075                // u32 size; the owner returns `size` bytes filled with a
2076                // deterministic `i % 251` pattern so framing corruption
2077                // is caught explicitly, not just length mismatch.
2078                2 => {
2079                    let arr: [u8; 4] = args
2080                        .try_into()
2081                        .map_err(|_| "method_idx=2 requires exactly 4 bytes (size)".to_string())?;
2082                    let size = u32::from_be_bytes(arr) as usize;
2083                    let mut out = vec![0u8; size];
2084                    for (i, b) in out.iter_mut().enumerate() {
2085                        *b = (i % 251) as u8;
2086                    }
2087                    Ok(out)
2088                }
2089                other => Err(format!("RpcCounter: unknown method_idx {other}")),
2090            }
2091        }
2092        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2093            RpcCounterStub { channel }
2094        }
2095    }
2096
2097    /// Worker-visible stub for the test owner above.
2098    struct RpcCounterStub {
2099        channel: HostedRpcChannel,
2100    }
2101
2102    impl RpcCounterStub {
2103        fn next(&self) -> u64 {
2104            let bytes = self.channel.call(1, Vec::new()).expect("rpc call");
2105            let arr: [u8; 8] = bytes.as_slice().try_into().unwrap();
2106            u64::from_be_bytes(arr)
2107        }
2108
2109        /// Request `size` bytes back from the owner.
2110        fn echo(&self, size: u32) -> Vec<u8> {
2111            self.channel
2112                .call(2, size.to_be_bytes().to_vec())
2113                .expect("echo rpc call")
2114        }
2115    }
2116
2117    /// Builds a HostedRpc `RegisteredDependency` for tests. The constructor
2118    /// wraps an [`RpcCounter`] into a [`HostedRpcOwnerCell`] (mirroring the
2119    /// macro-emitted code), counts its own runs in `counter`, and the
2120    /// `RpcFactory` performs the symmetric downcast back to a cell.
2121    fn registered_hosted_rpc_dep(
2122        name: &str,
2123        module_path: &str,
2124        owner_counter: Arc<AtomicUsize>,
2125    ) -> RegisteredDependency {
2126        let ctor_counter = owner_counter.clone();
2127        let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
2128            ctor_counter.fetch_add(1, Ordering::SeqCst);
2129            let cell = HostedRpcOwnerCell::from_owner(RpcCounter { n: 0 });
2130            Arc::new(cell) as Arc<dyn Any + Send + Sync>
2131        }));
2132        let factory = RpcFactory {
2133            owner_into_cell: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
2134                any.downcast::<HostedRpcOwnerCell>()
2135                    .expect("HostedRpc owner downcast")
2136            }),
2137            build_stub: Arc::new(|channel: HostedRpcChannel| {
2138                let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2139                Arc::new(stub) as Arc<dyn Any + Send + Sync>
2140            }),
2141        };
2142        RegisteredDependency {
2143            name: name.to_string(),
2144            crate_name: "tcrate".to_string(),
2145            module_path: module_path.to_string(),
2146            constructor,
2147            dependencies: Vec::new(),
2148            scope: DepScope::HostedRpc,
2149            worker_fn: None,
2150            cloneable_codec: None,
2151            hosted_codec: None,
2152            rpc_factory: Some(factory),
2153            companions: Vec::new(),
2154        }
2155    }
2156
2157    #[test]
2158    fn hosted_rpc_owner_cells_collected_once_and_keyed_by_qualified_id() {
2159        let counter = Arc::new(AtomicUsize::new(0));
2160        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter.clone());
2161        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2162
2163        let (execution, _filtered) =
2164            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2165
2166        assert!(execution.has_hosted_rpc_dependencies());
2167
2168        let cells = execution.collect_hosted_rpc_owner_cells_sync();
2169        assert_eq!(cells.len(), 1, "exactly one hosted rpc dep expected");
2170        let (dep_id, _cell) = &cells[0];
2171        assert_eq!(dep_id, "tcrate::rpc_dep");
2172        assert_eq!(
2173            counter.load(Ordering::SeqCst),
2174            1,
2175            "owner constructor must run exactly once on the parent"
2176        );
2177
2178        // Collecting again must not re-run the constructor in the same
2179        // execution tree (the constructor is called inside collect, not
2180        // memoised). This asserts that we don't accidentally double-collect
2181        // when the runner makes the call.
2182        let cells_b = execution.collect_hosted_rpc_owner_cells_sync();
2183        assert_eq!(cells_b.len(), 1);
2184        assert_eq!(
2185            counter.load(Ordering::SeqCst),
2186            2,
2187            "collect_hosted_rpc_owner_cells_sync runs the constructor on every call; \
2188             callers (the runner) are responsible for only calling it once per suite"
2189        );
2190    }
2191
2192    #[test]
2193    fn hosted_rpc_owner_dependencies_construct_in_parent_context() {
2194        let parent_only_counter = Arc::new(AtomicUsize::new(0));
2195        let owner_counter = Arc::new(AtomicUsize::new(0));
2196        let parent_only_dep =
2197            registered_cloneable_dep("parent_only_dep", parent_only_counter.clone());
2198        let mut rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", owner_counter.clone());
2199        rpc_dep.dependencies = vec!["parent_only_dep".to_string()];
2200        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2201
2202        let (execution, _filtered) = TestSuiteExecution::construct(
2203            &Arguments::default(),
2204            &[parent_only_dep, rpc_dep],
2205            &[test],
2206            &[],
2207        );
2208
2209        let cells = execution.collect_hosted_rpc_owner_cells_sync();
2210        assert_eq!(cells.len(), 1);
2211        assert_eq!(parent_only_counter.load(Ordering::SeqCst), 1);
2212        assert_eq!(owner_counter.load(Ordering::SeqCst), 1);
2213    }
2214
2215    #[test]
2216    fn hosted_rpc_in_process_transport_routes_to_owner_cell() {
2217        let counter = Arc::new(AtomicUsize::new(0));
2218        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter.clone());
2219        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2220
2221        let (execution, _filtered) =
2222            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2223
2224        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2225            .collect_hosted_rpc_owner_cells_sync()
2226            .into_iter()
2227            .collect();
2228
2229        let transport: Arc<dyn HostedRpcTransport> =
2230            Arc::new(InProcessHostedRpcTransport::new(cells.clone()));
2231        let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport.clone());
2232        let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2233
2234        assert_eq!(stub.next(), 1);
2235        assert_eq!(stub.next(), 2);
2236        assert_eq!(stub.next(), 3);
2237    }
2238
2239    #[test]
2240    fn hosted_rpc_in_process_transport_returns_dispatch_error_on_unknown_method() {
2241        let counter = Arc::new(AtomicUsize::new(0));
2242        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
2243        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2244
2245        let (execution, _filtered) =
2246            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2247
2248        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2249            .collect_hosted_rpc_owner_cells_sync()
2250            .into_iter()
2251            .collect();
2252        let transport: Arc<dyn HostedRpcTransport> =
2253            Arc::new(InProcessHostedRpcTransport::new(cells.clone()));
2254        let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport.clone());
2255
2256        // Call an unknown method index directly; the owner's `dispatch`
2257        // returns `Err("…")` which the transport surfaces as a Dispatch error.
2258        let err = channel.call(999, Vec::new()).unwrap_err();
2259        match err {
2260            HostedRpcError::Dispatch(msg) => {
2261                assert!(
2262                    msg.contains("unknown method_idx 999"),
2263                    "expected dispatch error to mention method_idx, got '{msg}'"
2264                );
2265            }
2266            HostedRpcError::Transport(msg) => {
2267                panic!("expected Dispatch error, got Transport({msg})");
2268            }
2269        }
2270    }
2271
2272    #[test]
2273    fn hosted_rpc_in_process_transport_returns_transport_error_on_unknown_dep_id() {
2274        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = HashMap::new();
2275        let transport: Arc<dyn HostedRpcTransport> =
2276            Arc::new(InProcessHostedRpcTransport::new(cells));
2277        let channel = HostedRpcChannel::new("tcrate::missing_dep".to_string(), transport.clone());
2278        let err = channel.call(1, Vec::new()).unwrap_err();
2279        match err {
2280            HostedRpcError::Transport(msg) => {
2281                assert!(
2282                    msg.contains("unknown dep id 'tcrate::missing_dep'"),
2283                    "expected transport error to mention dep id, got '{msg}'"
2284                );
2285            }
2286            HostedRpcError::Dispatch(msg) => {
2287                panic!("expected Transport error, got Dispatch({msg})");
2288            }
2289        }
2290    }
2291
2292    // -------------------------------------------------------------
2293    // Coverage for owner panic + mutex poisoning. The owner-cell catches the
2294    // panic, turns it into `Err("hosted rpc owner panicked: ...")` for the
2295    // first call, and subsequent calls hit the poisoned mutex and get the
2296    // stable `"hosted rpc owner poisoned"` error.
2297    // -------------------------------------------------------------
2298
2299    /// Owner that panics on every dispatch call. Used to exercise the
2300    /// catch_unwind + poisoned-mutex paths in `HostedRpcOwnerCell::dispatch`.
2301    struct PanickingRpcOwner;
2302
2303    impl HostedRpcDep for PanickingRpcOwner {
2304        type Stub = RpcCounterStub;
2305        fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2306            panic!("owner_panic_for_test");
2307        }
2308        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2309            RpcCounterStub { channel }
2310        }
2311    }
2312
2313    /// Async owner used to verify `AsyncHostedRpcDep`-flavoured cells
2314    /// dispatch through the async path and surface results correctly.
2315    /// Yields once to force the future to actually `.await` (a no-op
2316    /// `std::future::ready` wouldn't exercise the async cell machinery).
2317    #[cfg(feature = "tokio")]
2318    struct AsyncRpcCounter {
2319        n: u64,
2320    }
2321
2322    #[cfg(feature = "tokio")]
2323    impl crate::internal::AsyncHostedRpcDep for AsyncRpcCounter {
2324        type Stub = RpcCounterStub;
2325        async fn dispatch(&mut self, method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2326            // Force a real `.await` so the async dispatch machinery
2327            // is actually exercised (not just a `ready(...)` bridge).
2328            ::tokio::task::yield_now().await;
2329            if method_idx == 1 {
2330                self.n += 1;
2331                Ok(self.n.to_be_bytes().to_vec())
2332            } else {
2333                Err(format!("AsyncRpcCounter: unknown method_idx {method_idx}"))
2334            }
2335        }
2336        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2337            RpcCounterStub { channel }
2338        }
2339    }
2340
2341    /// Async owner that always panics — exercises the
2342    /// `futures::FutureExt::catch_unwind` + poison-flag machinery on
2343    /// the async cell variant.
2344    #[cfg(feature = "tokio")]
2345    struct PanickingAsyncRpcOwner;
2346
2347    #[cfg(feature = "tokio")]
2348    impl crate::internal::AsyncHostedRpcDep for PanickingAsyncRpcOwner {
2349        type Stub = RpcCounterStub;
2350        async fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
2351            ::tokio::task::yield_now().await;
2352            panic!("async_owner_panic_for_test");
2353        }
2354        fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2355            RpcCounterStub { channel }
2356        }
2357    }
2358
2359    /// End-to-end: an async owner registered via `from_async_owner`
2360    /// dispatches via `dispatch_async` and returns the expected bytes
2361    /// after actually `.await`ing inside its method body.
2362    #[cfg(feature = "tokio")]
2363    #[test]
2364    fn async_hosted_rpc_owner_dispatches_through_async_cell() {
2365        let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
2366        let rt = ::tokio::runtime::Builder::new_multi_thread()
2367            .enable_all()
2368            .build()
2369            .expect("build tokio runtime");
2370
2371        let bytes_a = rt
2372            .block_on(cell.dispatch_async(1, &[]))
2373            .expect("first async dispatch must succeed");
2374        assert_eq!(bytes_a, 1u64.to_be_bytes().to_vec());
2375
2376        let bytes_b = rt
2377            .block_on(cell.dispatch_async(1, &[]))
2378            .expect("second async dispatch must succeed");
2379        assert_eq!(bytes_b, 2u64.to_be_bytes().to_vec());
2380    }
2381
2382    /// An async owner panic must surface as
2383    /// `"hosted rpc owner panicked: ..."` and poison the cell so every
2384    /// subsequent dispatch short-circuits with the stable
2385    /// `"hosted rpc owner poisoned"` error. Mirrors the sync test.
2386    #[cfg(feature = "tokio")]
2387    #[test]
2388    fn async_hosted_rpc_owner_panic_surfaces_then_poisons() {
2389        let cell = HostedRpcOwnerCell::from_async_owner(PanickingAsyncRpcOwner);
2390        let rt = ::tokio::runtime::Builder::new_multi_thread()
2391            .enable_all()
2392            .build()
2393            .expect("build tokio runtime");
2394
2395        let err1 = rt
2396            .block_on(cell.dispatch_async(1, &[]))
2397            .expect_err("first async dispatch must surface the panic as Err");
2398        assert!(
2399            err1.contains("hosted rpc owner panicked: async_owner_panic_for_test"),
2400            "expected first-call error to wrap the async panic payload, got '{err1}'"
2401        );
2402
2403        let err2 = rt
2404            .block_on(cell.dispatch_async(1, &[]))
2405            .expect_err("second async dispatch must short-circuit on the poisoned cell");
2406        assert_eq!(
2407            err2, "hosted rpc owner poisoned",
2408            "expected poisoned-cell error on the second async call, got '{err2}'"
2409        );
2410    }
2411
2412    /// Regression for the async poison race: a second dispatch that
2413    /// parks on `tokio::sync::Mutex::lock().await` *before* the first
2414    /// dispatch panics must still observe the poison flag once it
2415    /// acquires the mutex, and must not re-enter the owner. Without
2416    /// the in-lock re-check the second waiter would get a fresh
2417    /// `MutexGuard` and call the user method on the half-mutated
2418    /// owner.
2419    #[cfg(feature = "tokio")]
2420    #[test]
2421    fn async_hosted_rpc_owner_poison_blocks_concurrent_waiter() {
2422        use std::sync::atomic::{AtomicUsize, Ordering};
2423        use std::sync::Arc;
2424        use std::time::Duration;
2425
2426        /// Counts dispatch entries to verify the second call never
2427        /// re-enters after the first call's panic.
2428        struct OnePanicThenForbidden {
2429            entries: Arc<AtomicUsize>,
2430        }
2431
2432        impl crate::internal::AsyncHostedRpcDep for OnePanicThenForbidden {
2433            type Stub = RpcCounterStub;
2434            async fn dispatch(
2435                &mut self,
2436                _method_idx: u32,
2437                _args: &[u8],
2438            ) -> Result<Vec<u8>, String> {
2439                let n = self.entries.fetch_add(1, Ordering::SeqCst);
2440                // First entry: hold the mutex for long enough that a
2441                // second dispatch parks on `lock().await`, then panic.
2442                if n == 0 {
2443                    ::tokio::time::sleep(Duration::from_millis(50)).await;
2444                    panic!("first_dispatch_panic_poison_race");
2445                }
2446                // Any subsequent re-entry is a bug: the poison flag
2447                // re-check inside the lock should have short-circuited
2448                // before we got here.
2449                panic!("second_dispatch_unexpectedly_re_entered_after_poison");
2450            }
2451            fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
2452                RpcCounterStub { channel }
2453            }
2454        }
2455
2456        let entries = Arc::new(AtomicUsize::new(0));
2457        let cell = Arc::new(HostedRpcOwnerCell::from_async_owner(
2458            OnePanicThenForbidden {
2459                entries: entries.clone(),
2460            },
2461        ));
2462
2463        let rt = ::tokio::runtime::Builder::new_multi_thread()
2464            .enable_all()
2465            .build()
2466            .expect("build tokio runtime");
2467
2468        rt.block_on(async {
2469            let cell_a = cell.clone();
2470            let cell_b = cell.clone();
2471
2472            let first = ::tokio::spawn(async move { cell_a.dispatch_async(1, &[]).await });
2473
2474            // Give `first` a head start so it definitely owns the
2475            // mutex before `second` starts and parks on `lock().await`.
2476            ::tokio::time::sleep(Duration::from_millis(5)).await;
2477
2478            let second = ::tokio::spawn(async move { cell_b.dispatch_async(1, &[]).await });
2479
2480            let first_res = first.await.expect("first task must not be cancelled");
2481            let second_res = second.await.expect("second task must not be cancelled");
2482
2483            let first_err =
2484                first_res.expect_err("first dispatch must surface the panic as Err, not Ok");
2485            assert!(
2486                first_err.contains("hosted rpc owner panicked: first_dispatch_panic_poison_race"),
2487                "expected the first call to surface the panic; got '{first_err}'"
2488            );
2489
2490            let second_err = second_res
2491                .expect_err("second dispatch must short-circuit on the poisoned cell, not Ok");
2492            assert_eq!(
2493                second_err, "hosted rpc owner poisoned",
2494                "expected the second waiter to see the poison flag; got '{second_err}'"
2495            );
2496        });
2497
2498        // Exactly one entry into the owner: the second waiter must
2499        // have been turned away by the poison re-check, never reaching
2500        // the user dispatcher body.
2501        assert_eq!(
2502            entries.load(Ordering::SeqCst),
2503            1,
2504            "owner dispatcher must run at most once across the poisoned pair"
2505        );
2506    }
2507
2508    /// `dispatch_blocking` against an `Async` cell on a multi-thread
2509    /// tokio runtime must succeed (it bridges to `dispatch_async`
2510    /// via `block_in_place` + `block_on`).
2511    #[cfg(feature = "tokio")]
2512    #[test]
2513    fn async_hosted_rpc_dispatch_blocking_drives_async_cell() {
2514        let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
2515        let rt = ::tokio::runtime::Builder::new_multi_thread()
2516            .enable_all()
2517            .build()
2518            .expect("build tokio runtime");
2519        let bytes = rt
2520            .block_on(async {
2521                ::tokio::task::spawn_blocking(move || cell.dispatch_blocking(1, &[])).await
2522            })
2523            .expect("spawn_blocking joined")
2524            .expect("dispatch_blocking must succeed against an async cell on multi-thread rt");
2525        assert_eq!(bytes, 1u64.to_be_bytes().to_vec());
2526    }
2527
2528    /// `dispatch_blocking` on a `current_thread` runtime must return a
2529    /// clean `Err` rather than panicking inside `block_in_place`. The
2530    /// API contract is `Result<_, String>`; a `current_thread` runtime
2531    /// is unsupported but it must not blow up the dispatcher loop.
2532    #[cfg(feature = "tokio")]
2533    #[test]
2534    fn async_hosted_rpc_dispatch_blocking_rejects_current_thread_runtime() {
2535        let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
2536        let rt = ::tokio::runtime::Builder::new_current_thread()
2537            .enable_all()
2538            .build()
2539            .expect("build current-thread tokio runtime");
2540        // `dispatch_blocking` is invoked *from* the current-thread
2541        // runtime so that `Handle::try_current()` resolves to it.
2542        let err = rt
2543            .block_on(async { cell.dispatch_blocking(1, &[]) })
2544            .expect_err("dispatch_blocking must reject current-thread runtimes cleanly");
2545        assert!(
2546            err.contains("multi-threaded"),
2547            "expected the rejection error to mention multi-threaded requirement, got '{err}'"
2548        );
2549    }
2550
2551    /// `InProcessHostedRpcTransport` is the `--nocapture` / no-spawn
2552    /// codepath. Under the tokio runner it must route to async owner
2553    /// cells via the sync `call` -> `dispatch_blocking` bridge so the
2554    /// in-process and IPC modes look identical to the user.
2555    #[cfg(feature = "tokio")]
2556    #[test]
2557    fn async_hosted_rpc_in_process_transport_routes_to_async_cell() {
2558        use std::collections::HashMap;
2559        use std::sync::Arc;
2560
2561        let dep_id = "in_process_async_owner".to_string();
2562        let cell = Arc::new(HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter {
2563            n: 0,
2564        }));
2565        let mut cells = HashMap::new();
2566        cells.insert(dep_id.clone(), cell);
2567
2568        let transport: Arc<dyn crate::internal::HostedRpcTransport> =
2569            Arc::new(InProcessHostedRpcTransport::new(cells));
2570
2571        let rt = ::tokio::runtime::Builder::new_multi_thread()
2572            .enable_all()
2573            .build()
2574            .expect("build tokio runtime");
2575
2576        let transport_clone = transport.clone();
2577        let dep_id_clone = dep_id.clone();
2578        let bytes = rt
2579            .block_on(async move {
2580                ::tokio::task::spawn_blocking(move || {
2581                    transport_clone.call(&dep_id_clone, 1, vec![])
2582                })
2583                .await
2584                .expect("spawn_blocking joined")
2585            })
2586            .expect("first in-process dispatch must succeed");
2587        assert_eq!(bytes, 1u64.to_be_bytes().to_vec());
2588
2589        let transport_clone = transport.clone();
2590        let dep_id_clone = dep_id.clone();
2591        let bytes2 = rt
2592            .block_on(async move {
2593                ::tokio::task::spawn_blocking(move || {
2594                    transport_clone.call(&dep_id_clone, 1, vec![])
2595                })
2596                .await
2597                .expect("spawn_blocking joined")
2598            })
2599            .expect("second in-process dispatch must succeed");
2600        assert_eq!(bytes2, 2u64.to_be_bytes().to_vec());
2601    }
2602
2603    #[test]
2604    fn hosted_rpc_owner_panic_surfaces_then_poisons() {
2605        let cell = HostedRpcOwnerCell::from_owner(PanickingRpcOwner);
2606
2607        // First call: the owner's `dispatch` panics with the literal string
2608        // "owner_panic_for_test"; `HostedRpcOwnerCell::dispatch` catches the
2609        // unwind and converts it into a textual error containing the panic
2610        // payload prefixed with "hosted rpc owner panicked: ".
2611        //
2612        // The catch_unwind catches the panic AFTER the MutexGuard has
2613        // started unwinding, which still leaves the mutex poisoned (verified
2614        // by direct std::sync::Mutex behaviour).
2615        let err1 = cell
2616            .dispatch(1, &[])
2617            .expect_err("first call must surface the panic as Err");
2618        assert!(
2619            err1.contains("hosted rpc owner panicked: owner_panic_for_test"),
2620            "expected first-call error to wrap the panic payload, got '{err1}'"
2621        );
2622
2623        // Second call: the mutex is now poisoned from the panic above.
2624        // The cell must short-circuit with the stable "hosted rpc owner
2625        // poisoned" error and must NOT retry the owner.
2626        let err2 = cell
2627            .dispatch(1, &[])
2628            .expect_err("second call must short-circuit on the poisoned cell");
2629        assert_eq!(
2630            err2, "hosted rpc owner poisoned",
2631            "expected poisoned-cell error on the second call, got '{err2}'"
2632        );
2633    }
2634
2635    // -------------------------------------------------------------
2636    // Large-payload IPC framing coverage (>64 KiB) and
2637    // concurrent in-flight RPC requests routed through the in-process
2638    // transport without deadlock or framing corruption.
2639    // -------------------------------------------------------------
2640
2641    #[test]
2642    fn hosted_rpc_in_process_transport_round_trips_large_payload_exceeding_64_kib() {
2643        let counter = Arc::new(AtomicUsize::new(0));
2644        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
2645        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2646
2647        let (execution, _filtered) =
2648            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2649        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2650            .collect_hosted_rpc_owner_cells_sync()
2651            .into_iter()
2652            .collect();
2653        let transport: Arc<dyn HostedRpcTransport> =
2654            Arc::new(InProcessHostedRpcTransport::new(cells));
2655        let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);
2656        let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2657
2658        const SIZE: u32 = 256 * 1024; // 256 KiB
2659        let bytes = stub.echo(SIZE);
2660        assert_eq!(
2661            bytes.len(),
2662            SIZE as usize,
2663            "framing dropped/truncated bytes"
2664        );
2665        for (i, b) in bytes.iter().enumerate() {
2666            assert_eq!(
2667                *b,
2668                (i % 251) as u8,
2669                "framing corrupted byte at index {i}: expected {}, got {b}",
2670                (i % 251) as u8
2671            );
2672        }
2673    }
2674
2675    #[test]
2676    fn hosted_rpc_in_process_transport_multiplexes_concurrent_calls_from_threads() {
2677        use std::thread;
2678
2679        let counter = Arc::new(AtomicUsize::new(0));
2680        let dep = registered_hosted_rpc_dep("rpc_dep", "", counter);
2681        let test = registered_test("t1", vec!["rpc_dep".to_string()]);
2682
2683        let (execution, _filtered) =
2684            TestSuiteExecution::construct(&Arguments::default(), &[dep], &[test], &[]);
2685        let cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
2686            .collect_hosted_rpc_owner_cells_sync()
2687            .into_iter()
2688            .collect();
2689        let transport: Arc<dyn HostedRpcTransport> =
2690            Arc::new(InProcessHostedRpcTransport::new(cells));
2691
2692        // Spawn N threads, each making M calls. Every call must return a
2693        // unique positive id (the owner is a single global counter
2694        // serialised by its own mutex). If the in-process transport ever
2695        // deadlocks or routes a reply to the wrong caller, the assertions
2696        // below would fire (duplicate ids, or the spawned thread would
2697        // panic and the join() would surface the failure).
2698        const N: usize = 4;
2699        const M: usize = 32;
2700        let mut handles = Vec::new();
2701        for _ in 0..N {
2702            let dep_id = "tcrate::rpc_dep".to_string();
2703            let transport = transport.clone();
2704            handles.push(thread::spawn(move || {
2705                let channel = HostedRpcChannel::new(dep_id, transport);
2706                let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
2707                let mut ids = Vec::with_capacity(M);
2708                for _ in 0..M {
2709                    ids.push(stub.next());
2710                }
2711                ids
2712            }));
2713        }
2714        let mut all = Vec::with_capacity(N * M);
2715        for h in handles {
2716            all.extend(h.join().expect("thread panicked"));
2717        }
2718        all.sort();
2719        let mut prev: u64 = 0;
2720        for id in &all {
2721            assert!(
2722                *id > prev,
2723                "duplicate or non-monotonic id {id} after {prev}"
2724            );
2725            prev = *id;
2726        }
2727        assert_eq!(
2728            all.len(),
2729            N * M,
2730            "expected exactly {} ids in total, got {}",
2731            N * M,
2732            all.len()
2733        );
2734    }
2735}