Skip to main content

test_r_core/execution/
mod.rs

1use rand::prelude::{SliceRandom, StdRng};
2use rand::SeedableRng;
3use std::any::Any;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::{Debug, Formatter};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use topological_sort::TopologicalSort;
9
10use crate::args::Arguments;
11use crate::internal::{
12    apply_suite_props_to_tests, filter_registered_tests, DepScope, DependencyConstructor,
13    DependencyView, HostedRpcOwnerCell, RegisteredDependency, RegisteredTest,
14    RegisteredTestSuiteProperty,
15};
16
17/// Wire bytes for a single Cloneable / Hosted dependency, keyed by its
18/// fully-qualified id (`{crate}::{module}::{name}`).
19pub type DepWireBytes = (String, Vec<u8>);
20
21/// Parent-held owner value (used only for `Hosted` deps — the parent keeps
22/// the owner alive for the suite's duration).
23pub type HostedOwner = Arc<dyn Any + Send + Sync>;
24
25#[cfg(feature = "tokio")]
26type ParentSharedDependenciesFuture<'a> = std::pin::Pin<
27    Box<dyn std::future::Future<Output = HashMap<String, Arc<dyn Any + Send + Sync>>> + 'a>,
28>;
29
30#[cfg(test)]
31/// Result of [`TestSuiteExecution::collect_hosted_descriptor_bytes_sync`] /
32/// [`TestSuiteExecution::collect_hosted_descriptor_bytes_async`]: the
33/// descriptor bytes that get shipped to workers, plus the parent-held owner
34/// values that must outlive every worker.
35pub type HostedDescriptorCollection = (Vec<DepWireBytes>, Vec<HostedOwner>);
36
37/// Parent-side materialisation output for dependency scopes whose worker-side
38/// value is derived from parent-owned state instead of by rerunning the user
39/// constructor in each worker process.
40pub struct ParentSharedDependencies {
41    pub cloneable_wire_bytes: Vec<DepWireBytes>,
42    /// Parent-constructed `Cloneable` values keyed by fully-qualified dep id.
43    /// In **no-spawn-workers** mode (e.g. `--nocapture`) the runner installs
44    /// these directly into the execution tree via
45    /// [`TestSuiteExecution::provide_cloneable_value`] so tests see the
46    /// parent's value without re-running the constructor in
47    /// `materialize_deps`. For Cloneable, the round-trip
48    /// `from_wire(to_wire(value))` is by contract semantics-preserving, so
49    /// reusing the parent value directly is equivalent to round-tripping
50    /// while avoiding the duplicate constructor run that historically
51    /// occurred on the no-spawn-workers code path.
52    ///
53    /// In spawn-workers mode this list is unused — workers receive
54    /// `cloneable_wire_bytes` over IPC instead.
55    pub cloneable_local_values: Vec<(String, Arc<dyn Any + Send + Sync>)>,
56    pub hosted_descriptor_bytes: Vec<DepWireBytes>,
57    pub hosted_owners: Vec<HostedOwner>,
58    pub hosted_rpc_owner_cells: Vec<(String, Arc<HostedRpcOwnerCell>)>,
59}
60
61impl ParentSharedDependencies {
62    fn new() -> Self {
63        Self {
64            cloneable_wire_bytes: Vec::new(),
65            cloneable_local_values: Vec::new(),
66            hosted_descriptor_bytes: Vec::new(),
67            hosted_owners: Vec::new(),
68            hosted_rpc_owner_cells: Vec::new(),
69        }
70    }
71}
72
73pub(crate) struct TestSuiteExecution {
74    crate_and_module: String,
75    dependencies: Vec<RegisteredDependency>,
76    tests: Vec<RegisteredTest>,
77    props: Vec<RegisteredTestSuiteProperty>,
78    inner: Vec<TestSuiteExecution>,
79    materialized_dependencies: HashMap<String, Arc<dyn Any + Send + Sync>>,
80    sequential_lock: SequentialExecutionLock,
81    remaining_count: usize,
82    idx: usize,
83    is_sequential: bool,
84    skip_creating_dependencies: bool,
85    in_progress: Arc<AtomicUsize>,
86}
87
88impl TestSuiteExecution {
89    pub fn construct(
90        arguments: &Arguments,
91        dependencies: &[RegisteredDependency],
92        tests: &[RegisteredTest],
93        props: &[RegisteredTestSuiteProperty],
94    ) -> (Self, Vec<RegisteredTest>) {
95        let tests_with_props = apply_suite_props_to_tests(tests, props);
96        let mut filtered_tests = filter_registered_tests(arguments, &tests_with_props);
97        Self::shuffle(arguments, &mut filtered_tests);
98        filtered_tests.reverse();
99
100        if filtered_tests.is_empty() {
101            (
102                Self::root(
103                    dependencies
104                        .iter()
105                        .filter(|dep| dep.crate_name.is_empty() && dep.module_path.is_empty())
106                        .cloned()
107                        .collect::<Vec<_>>(),
108                    Vec::new(),
109                    props
110                        .iter()
111                        .filter(|dep| dep.crate_name().is_empty() && dep.module_path().is_empty())
112                        .cloned()
113                        .collect::<Vec<_>>(),
114                ),
115                Vec::new(),
116            )
117        } else {
118            let mut root = Self::root(Vec::new(), Vec::new(), Vec::new());
119
120            for prop in props {
121                root.add_prop(prop.clone());
122            }
123
124            for dep in dependencies {
125                root.add_dependency(dep.clone());
126            }
127
128            for test in filtered_tests.clone() {
129                root.add_test(test.clone());
130            }
131
132            root.propagate_sequential(None);
133            root.prune_unused_deps();
134
135            (root, filtered_tests)
136        }
137    }
138
139    fn shuffle(arguments: &Arguments, tests: &mut [RegisteredTest]) {
140        if let Some(seed) = arguments.shuffle_seed {
141            let mut rng = StdRng::seed_from_u64(seed);
142            tests.shuffle(&mut rng);
143        }
144    }
145
146    /// Disables creating dependencies when picking the next test. This is useful when the execution plan
147    /// is only used to drive spawned workers instead of actually running the tests.
148    pub fn skip_creating_dependencies(&mut self) {
149        self.skip_creating_dependencies = true;
150        for inner in &mut self.inner {
151            inner.skip_creating_dependencies();
152        }
153    }
154
155    pub fn remaining(&self) -> usize {
156        self.remaining_count
157    }
158
159    pub fn is_empty(&self) -> bool {
160        self.tests.is_empty() && self.inner.is_empty()
161    }
162
163    pub fn is_done(&self) -> bool {
164        self.remaining_count == 0
165    }
166
167    /// Returns true if either this level, or any of the inner levels have dependencies
168    #[allow(dead_code)]
169    pub fn has_dependencies(&self) -> bool {
170        !self.dependencies.is_empty() || self.inner.iter().any(|inner| inner.has_dependencies())
171    }
172
173    /// Returns true if any dependency in this subtree uses `DepScope::Shared`
174    /// — those force single-threaded execution when output capture is on,
175    /// because the materialised value cannot cross the parent/worker boundary.
176    pub fn has_shared_dependencies(&self) -> bool {
177        self.dependencies
178            .iter()
179            .any(|d| d.scope == DepScope::Shared)
180            || self
181                .inner
182                .iter()
183                .any(|inner| inner.has_shared_dependencies())
184    }
185
186    /// Returns true if any dependency in this subtree uses `DepScope::Cloneable`.
187    #[allow(dead_code)]
188    pub fn has_cloneable_dependencies(&self) -> bool {
189        self.dependencies
190            .iter()
191            .any(|d| d.scope == DepScope::Cloneable)
192            || self
193                .inner
194                .iter()
195                .any(|inner| inner.has_cloneable_dependencies())
196    }
197
198    /// Returns true if any dependency in this subtree uses `DepScope::Hosted`.
199    /// The parent keeps Hosted owners alive for the duration of the suite
200    /// while shipping descriptors to workers.
201    #[allow(dead_code)]
202    pub fn has_hosted_dependencies(&self) -> bool {
203        self.dependencies
204            .iter()
205            .any(|d| d.scope == DepScope::Hosted)
206            || self
207                .inner
208                .iter()
209                .any(|inner| inner.has_hosted_dependencies())
210    }
211
212    /// Returns true if any dependency in this subtree uses `DepScope::HostedRpc`.
213    /// The parent keeps owner cells alive for the suite and routes
214    /// worker-initiated IPC calls to those cells.
215    #[allow(dead_code)]
216    pub fn has_hosted_rpc_dependencies(&self) -> bool {
217        self.dependencies
218            .iter()
219            .any(|d| d.scope == DepScope::HostedRpc)
220            || self
221                .inner
222                .iter()
223                .any(|inner| inner.has_hosted_rpc_dependencies())
224    }
225
226    /// Collects every Cloneable dependency in this subtree (depth-first).
227    #[allow(dead_code)]
228    pub fn collect_cloneable_dependencies(&self) -> Vec<RegisteredDependency> {
229        let mut out = Vec::new();
230        self.collect_cloneable_dependencies_into(&mut out);
231        out
232    }
233
234    #[allow(dead_code)]
235    fn collect_cloneable_dependencies_into(&self, out: &mut Vec<RegisteredDependency>) {
236        for dep in &self.dependencies {
237            if dep.scope == DepScope::Cloneable {
238                out.push(dep.clone());
239            }
240        }
241        for inner in &self.inner {
242            inner.collect_cloneable_dependencies_into(out);
243        }
244    }
245
246    /// Walks the subtree, materialising dependencies in dependency order and
247    /// collecting the parent-side wire/state needed by Cloneable, Hosted, and
248    /// HostedRpc scopes. Constructor dependencies are resolved in this parent
249    /// context, but workers still receive these shared scopes as dependency-free
250    /// leaves: Cloneable/Hosted values are reconstructed from bytes, and
251    /// HostedRpc values are stubs backed by a channel.
252    pub fn collect_parent_shared_dependencies_sync(&self) -> ParentSharedDependencies {
253        let mut out = ParentSharedDependencies::new();
254        let parent_map = HashMap::new();
255        self.collect_parent_shared_dependencies_into_sync(&parent_map, &mut out);
256        out
257    }
258
259    fn collect_parent_shared_dependencies_into_sync(
260        &self,
261        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
262        out: &mut ParentSharedDependencies,
263    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
264        let mut dependency_map = parent_map.clone();
265        let sorted_dependencies = self.sorted_dependencies();
266
267        for dep in sorted_dependencies {
268            if dependency_map.contains_key(&dep.name) {
269                continue;
270            }
271
272            let value = Self::construct_dependency_sync(dep, &dependency_map);
273            match dep.scope {
274                DepScope::Cloneable => {
275                    let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
276                        panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
277                    });
278                    out.cloneable_wire_bytes
279                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
280                    // Keep the parent-constructed value too, for the
281                    // no-spawn-workers code path that installs Cloneable
282                    // values directly into the execution tree (instead of
283                    // re-running the constructor inside `materialize_deps`).
284                    out.cloneable_local_values
285                        .push((dep.qualified_id(), value.clone()));
286                }
287                DepScope::Hosted => {
288                    let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
289                        panic!("Hosted dep '{}' missing hosted codec", dep.name)
290                    });
291                    out.hosted_descriptor_bytes
292                        .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
293                    out.hosted_owners.push(value.clone());
294                }
295                DepScope::HostedRpc => {
296                    let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
297                        panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
298                    });
299                    let cell = (factory.owner_into_cell)(value.clone());
300                    out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
301                }
302                DepScope::Shared | DepScope::PerWorker => {}
303            }
304
305            dependency_map.insert(dep.name.clone(), value);
306        }
307
308        for inner in &self.inner {
309            inner.collect_parent_shared_dependencies_into_sync(&dependency_map, out);
310        }
311
312        dependency_map
313    }
314
315    fn construct_dependency_sync(
316        dep: &RegisteredDependency,
317        dependency_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
318    ) -> Arc<dyn Any + Send + Sync> {
319        match &dep.constructor {
320            DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
321            DependencyConstructor::Async(cons) => {
322                futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
323            }
324        }
325    }
326
327    /// Collects only Cloneable wire bytes. The runner uses
328    /// [`Self::collect_parent_shared_dependencies_sync`] to collect all shared
329    /// parent-side values in one pass; this narrower helper remains for unit
330    /// tests and focused callers.
331    #[cfg(test)]
332    pub fn collect_cloneable_wire_bytes_sync(&self) -> Vec<(String, Vec<u8>)> {
333        self.collect_parent_shared_dependencies_sync()
334            .cloneable_wire_bytes
335    }
336
337    /// Parent-side materialisation for `Hosted` dependencies.
338    ///
339    /// The returned descriptor bytes are keyed by fully-qualified dep id and
340    /// the returned owner values must be kept alive for the duration of the
341    /// suite. Unlike Cloneable, Hosted owners may hold resources (TCP
342    /// listeners, Docker containers, gRPC clients, etc.) that workers'
343    /// reconstructed handles depend on.
344    #[cfg(test)]
345    pub fn collect_hosted_descriptor_bytes_sync(&self) -> HostedDescriptorCollection {
346        let collected = self.collect_parent_shared_dependencies_sync();
347        (collected.hosted_descriptor_bytes, collected.hosted_owners)
348    }
349
350    /// Parent-side materialisation for `HostedRpc` dependencies.
351    ///
352    /// Returns `(qualified_id, cell)` pairs that the runtime keeps alive for
353    /// the suite's lifetime and uses to dispatch worker-initiated RPC calls.
354    #[cfg(test)]
355    pub fn collect_hosted_rpc_owner_cells_sync(&self) -> Vec<(String, Arc<HostedRpcOwnerCell>)> {
356        self.collect_parent_shared_dependencies_sync()
357            .hosted_rpc_owner_cells
358    }
359
360    /// Async counterpart of [`Self::collect_parent_shared_dependencies_sync`].
361    /// Async constructors are awaited on the parent before workers receive
362    /// wire bytes, descriptors, or RPC stubs.
363    #[cfg(feature = "tokio")]
364    pub async fn collect_parent_shared_dependencies_async(&self) -> ParentSharedDependencies {
365        let mut out = ParentSharedDependencies::new();
366        let parent_map = HashMap::new();
367        self.collect_parent_shared_dependencies_into_async(&parent_map, &mut out)
368            .await;
369        out
370    }
371
372    #[cfg(feature = "tokio")]
373    fn collect_parent_shared_dependencies_into_async<'a>(
374        &'a self,
375        parent_map: &'a HashMap<String, Arc<dyn Any + Send + Sync>>,
376        out: &'a mut ParentSharedDependencies,
377    ) -> ParentSharedDependenciesFuture<'a> {
378        Box::pin(async move {
379            let mut dependency_map = parent_map.clone();
380            let sorted_dependencies = self.sorted_dependencies();
381
382            for dep in sorted_dependencies {
383                if dependency_map.contains_key(&dep.name) {
384                    continue;
385                }
386
387                let value = match &dep.constructor {
388                    DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
389                    DependencyConstructor::Async(cons) => {
390                        cons(Arc::new(dependency_map.clone())).await
391                    }
392                };
393                match dep.scope {
394                    DepScope::Cloneable => {
395                        let codec = dep.cloneable_codec.as_ref().unwrap_or_else(|| {
396                            panic!("Cloneable dep '{}' missing CloneableCodec", dep.name)
397                        });
398                        out.cloneable_wire_bytes
399                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
400                        // Keep the parent-constructed value too, for the
401                        // no-spawn-workers code path that installs Cloneable
402                        // values directly into the execution tree (instead
403                        // of re-running the constructor inside
404                        // `materialize_deps`).
405                        out.cloneable_local_values
406                            .push((dep.qualified_id(), value.clone()));
407                    }
408                    DepScope::Hosted => {
409                        let codec = dep.hosted_codec.as_ref().unwrap_or_else(|| {
410                            panic!("Hosted dep '{}' missing hosted codec", dep.name)
411                        });
412                        out.hosted_descriptor_bytes
413                            .push((dep.qualified_id(), (codec.to_wire)(value.clone())));
414                        out.hosted_owners.push(value.clone());
415                    }
416                    DepScope::HostedRpc => {
417                        let factory = dep.rpc_factory.as_ref().unwrap_or_else(|| {
418                            panic!("HostedRpc dep '{}' missing RpcFactory", dep.name)
419                        });
420                        let cell = (factory.owner_into_cell)(value.clone());
421                        out.hosted_rpc_owner_cells.push((dep.qualified_id(), cell));
422                    }
423                    DepScope::Shared | DepScope::PerWorker => {}
424                }
425
426                dependency_map.insert(dep.name.clone(), value);
427            }
428
429            for inner in &self.inner {
430                inner
431                    .collect_parent_shared_dependencies_into_async(&dependency_map, out)
432                    .await;
433            }
434            dependency_map
435        })
436    }
437
438    /// Async Hosted-only collection helper retained for focused callers.
439    #[cfg(feature = "tokio")]
440    #[cfg(test)]
441    pub async fn collect_hosted_descriptor_bytes_async(&self) -> HostedDescriptorCollection {
442        let collected = self.collect_parent_shared_dependencies_async().await;
443        (collected.hosted_descriptor_bytes, collected.hosted_owners)
444    }
445
446    /// Async Cloneable-only collection helper retained for focused callers.
447    ///
448    /// **Intentionally `!Send`.** The underlying `DependencyConstructor::Async`
449    /// future is not `Send`, so the returned future from this collector cannot
450    /// be either. Must be awaited on the root runner task (i.e., under
451    /// `Runtime::block_on` or directly inside `test_runner`) — never inside
452    /// `tokio::spawn` / a `JoinSet`. If we ever want to spawn Cloneable
453    /// collection onto a worker, the constructor type would need to require
454    /// `Send` first.
455    #[cfg(feature = "tokio")]
456    #[cfg(test)]
457    pub async fn collect_cloneable_wire_bytes_async(&self) -> Vec<(String, Vec<u8>)> {
458        self.collect_parent_shared_dependencies_async()
459            .await
460            .cloneable_wire_bytes
461    }
462
463    /// Worker-side counterpart to [`Self::collect_cloneable_wire_bytes_sync`]:
464    /// pre-populates the Cloneable dep value at the node where the dep is
465    /// registered, so the upcoming `materialize_deps_sync` call uses the
466    /// provided value instead of running the original constructor. The lookup
467    /// is keyed by the dep's fully-qualified id
468    /// (`{crate}::{module}::{name}`), but the value is stored under the local
469    /// `name` so the rest of the materialisation logic keeps working unchanged.
470    /// Returns `true` if a matching dep was found in any node of the subtree.
471    pub fn provide_cloneable_value(
472        &mut self,
473        dep_id: &str,
474        value: Arc<dyn Any + Send + Sync>,
475    ) -> bool {
476        let applied = self.provide_cloneable_value_internal(dep_id, value);
477        if applied {
478            self.prune_unused_deps();
479        }
480        applied
481    }
482
483    fn provide_cloneable_value_internal(
484        &mut self,
485        dep_id: &str,
486        value: Arc<dyn Any + Send + Sync>,
487    ) -> bool {
488        let mut applied = false;
489        if let Some((local_name, dep_idx)) = self
490            .dependencies
491            .iter()
492            .enumerate()
493            .find(|(_, d)| d.qualified_id() == dep_id)
494            .map(|(idx, d)| (d.name.clone(), idx))
495        {
496            // From the worker execution tree's perspective this dependency is
497            // now a leaf: its value came from wire bytes or a HostedRpc channel,
498            // so the worker must not instantiate constructor-only dependencies
499            // that were needed solely in the parent collection context.
500            self.dependencies[dep_idx].dependencies.clear();
501            self.materialized_dependencies
502                .insert(local_name, value.clone());
503            applied = true;
504        }
505        for inner in &mut self.inner {
506            applied |= inner.provide_cloneable_value_internal(dep_id, value.clone());
507        }
508        applied
509    }
510
511    /// Returns true if there are any tests that require capturing, based on the given default setting
512    /// and the per-test CaptureControl overrides.
513    pub fn requires_capturing(&self, capture_by_default: bool) -> bool {
514        self.tests.iter().any(|test| {
515            test.props
516                .capture_control
517                .requires_capturing(capture_by_default)
518        }) || self
519            .inner
520            .iter()
521            .any(|inner| inner.requires_capturing(capture_by_default))
522    }
523
524    #[cfg(feature = "tokio")]
525    pub async fn pick_next(&mut self) -> Option<TestExecution> {
526        if self.is_empty() {
527            None
528        } else {
529            match self
530                .pick_next_internal(&self.create_dependency_map(&HashMap::new()))
531                .await
532            {
533                Some((test, deps, seq_lock, in_progress_counter)) => {
534                    let index = self.idx;
535                    self.idx += 1;
536                    Some(TestExecution {
537                        test: test.clone(),
538                        deps: Arc::new(deps),
539                        index,
540                        _seq_lock: seq_lock,
541                        in_progress_counter,
542                    })
543                }
544                None => None,
545            }
546        }
547    }
548
549    pub fn pick_next_sync(&mut self) -> Option<TestExecution> {
550        match self.pick_next_internal_sync(&HashMap::new()) {
551            Some((test, deps, seq_lock, in_progress_counter)) => {
552                let index = self.idx;
553                self.idx += 1;
554                Some(TestExecution {
555                    test: test.clone(),
556                    deps: Arc::new(deps),
557                    index,
558                    _seq_lock: seq_lock,
559                    in_progress_counter,
560                })
561            }
562            None => None,
563        }
564    }
565
566    #[cfg(feature = "tokio")]
567    #[allow(clippy::type_complexity)]
568    async fn pick_next_internal(
569        &mut self,
570        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
571    ) -> Option<(
572        RegisteredTest,
573        HashMap<String, Arc<dyn Any + Send + Sync>>,
574        SequentialExecutionLockGuard,
575        Arc<AtomicUsize>,
576    )> {
577        if self.is_empty() {
578            None
579        } else {
580            let dependency_map = if !self.is_materialized() {
581                self.materialize_deps(materialized_parent_deps).await
582            } else {
583                self.create_dependency_map(materialized_parent_deps)
584            };
585
586            let locked = self.sequential_lock.is_locked().await;
587            let result = if self.tests.is_empty() || locked {
588                let current = self.inner.iter_mut();
589                let mut result = None;
590                for inner in current {
591                    if let Some((test, deps, seq_lock, in_progress_counter)) =
592                        Box::pin(inner.pick_next_internal(&dependency_map)).await
593                    {
594                        result = Some((test, deps, seq_lock, in_progress_counter));
595                        break;
596                    }
597                }
598                self.inner.retain(|inner| !inner.is_empty());
599
600                result
601            } else {
602                let guard = self.sequential_lock.lock(self.is_sequential).await;
603                self.in_progress.fetch_add(1, Ordering::Release);
604                self.tests
605                    .pop()
606                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
607            };
608            if result.is_none()
609                && self.is_empty()
610                && self.is_materialized()
611                && !locked
612                && self.in_progress.load(Ordering::Acquire) == 0
613            {
614                self.drop_deps();
615            }
616            if result.is_some() {
617                self.remaining_count -= 1;
618            }
619            result
620        }
621    }
622
623    #[allow(clippy::type_complexity)]
624    fn pick_next_internal_sync(
625        &mut self,
626        materialized_parent_deps: &HashMap<String, Arc<dyn Any + Send + Sync>>,
627    ) -> Option<(
628        RegisteredTest,
629        HashMap<String, Arc<dyn Any + Send + Sync>>,
630        SequentialExecutionLockGuard,
631        Arc<AtomicUsize>,
632    )> {
633        if self.is_empty() {
634            None
635        } else {
636            let dependency_map = if !self.is_materialized() {
637                self.materialize_deps_sync(materialized_parent_deps)
638            } else {
639                self.create_dependency_map(materialized_parent_deps)
640            };
641
642            let locked = self.sequential_lock.is_locked_sync();
643            let result = if self.tests.is_empty() || locked {
644                let current = self.inner.iter_mut();
645                let mut result = None;
646                for inner in current {
647                    if let Some((test, deps, seq_lock, in_progress_counter)) =
648                        inner.pick_next_internal_sync(&dependency_map)
649                    {
650                        result = Some((test, deps, seq_lock, in_progress_counter));
651                        break;
652                    }
653                }
654
655                self.inner.retain(|inner| !inner.is_empty());
656                result
657            } else {
658                let guard = self.sequential_lock.lock_sync(self.is_sequential);
659                self.in_progress.fetch_add(1, Ordering::Release);
660                self.tests
661                    .pop()
662                    .map(|test| (test, dependency_map, guard, self.in_progress.clone()))
663            };
664            // `is_empty()` matches `pick_next_internal`: a `None` result
665            // can mean "descendant is temporarily locked", not "subtree
666            // done" — dropping deps here would force rematerialisation.
667            if result.is_none()
668                && self.is_empty()
669                && self.is_materialized()
670                && !locked
671                && self.in_progress.load(Ordering::Acquire) == 0
672            {
673                self.drop_deps();
674            }
675            if result.is_some() {
676                self.remaining_count -= 1;
677            }
678            result
679        }
680    }
681
682    fn create_dependency_map(
683        &self,
684        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
685    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
686        let mut result = parent_map.clone();
687        for (key, dep) in &self.materialized_dependencies {
688            result.insert(key.clone(), dep.clone());
689        }
690        result
691    }
692
693    fn root(
694        deps: Vec<RegisteredDependency>,
695        tests: Vec<RegisteredTest>,
696        props: Vec<RegisteredTestSuiteProperty>,
697    ) -> Self {
698        let total_count = tests.len();
699        let is_sequential = props
700            .iter()
701            .any(|prop| matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }))
702            || tests.iter().any(|test| test.run.is_bench());
703        Self {
704            crate_and_module: String::new(),
705            dependencies: deps,
706            tests,
707            props,
708            inner: Vec::new(),
709            materialized_dependencies: HashMap::new(),
710            remaining_count: total_count,
711            idx: 0,
712            sequential_lock: SequentialExecutionLock::new(),
713            is_sequential,
714            skip_creating_dependencies: false,
715            in_progress: Arc::new(AtomicUsize::new(0)),
716        }
717    }
718
719    fn add_dependency(&mut self, dep: RegisteredDependency) {
720        let crate_and_module = dep.crate_and_module();
721        if self.crate_and_module == crate_and_module {
722            self.dependencies.push(dep);
723        } else {
724            let mut found = false;
725            for inner in &mut self.inner {
726                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
727                    inner.add_dependency(dep.clone());
728                    found = true;
729                    break;
730                }
731            }
732            if !found {
733                let mut inner = Self {
734                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
735                    dependencies: vec![],
736                    tests: vec![],
737                    inner: vec![],
738                    props: vec![],
739                    materialized_dependencies: HashMap::new(),
740                    remaining_count: 0,
741                    idx: 0,
742                    is_sequential: false,
743                    sequential_lock: SequentialExecutionLock::new(),
744                    skip_creating_dependencies: false,
745                    in_progress: Arc::new(AtomicUsize::new(0)),
746                };
747                inner.add_dependency(dep);
748                self.inner.push(inner);
749            }
750        }
751    }
752
753    fn add_test(&mut self, test: RegisteredTest) {
754        let crate_and_module = test.crate_and_module();
755        if self.crate_and_module == crate_and_module {
756            self.tests.push(test.clone());
757
758            if test.run.is_bench() {
759                self.is_sequential = true;
760            }
761        } else {
762            let mut found = false;
763            for inner in &mut self.inner {
764                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
765                    inner.add_test(test.clone());
766                    found = true;
767                    break;
768                }
769            }
770            if !found {
771                let mut inner = Self {
772                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
773                    dependencies: vec![],
774                    tests: vec![],
775                    inner: vec![],
776                    props: vec![],
777                    materialized_dependencies: HashMap::new(),
778                    remaining_count: 0,
779                    idx: 0,
780                    is_sequential: false,
781                    sequential_lock: SequentialExecutionLock::new(),
782                    skip_creating_dependencies: false,
783                    in_progress: Arc::new(AtomicUsize::new(0)),
784                };
785                inner.add_test(test);
786                self.inner.push(inner);
787            }
788        }
789        self.remaining_count += 1;
790    }
791
792    fn add_prop(&mut self, prop: RegisteredTestSuiteProperty) {
793        let crate_and_module = prop.crate_and_module();
794        if self.crate_and_module == crate_and_module {
795            if matches!(prop, RegisteredTestSuiteProperty::Sequential { .. }) {
796                self.is_sequential = true;
797            }
798            self.props.push(prop);
799        } else {
800            let mut found = false;
801            for inner in &mut self.inner {
802                if Self::is_prefix_of(&inner.crate_and_module, &crate_and_module) {
803                    inner.add_prop(prop.clone());
804                    found = true;
805                    break;
806                }
807            }
808            if !found {
809                let mut inner = Self {
810                    crate_and_module: Self::next_level(&self.crate_and_module, &crate_and_module),
811                    dependencies: vec![],
812                    tests: vec![],
813                    inner: vec![],
814                    props: vec![],
815                    materialized_dependencies: HashMap::new(),
816                    remaining_count: 0,
817                    idx: 0,
818                    is_sequential: false,
819                    sequential_lock: SequentialExecutionLock::new(),
820                    skip_creating_dependencies: false,
821                    in_progress: Arc::new(AtomicUsize::new(0)),
822                };
823                inner.add_prop(prop);
824                self.inner.push(inner);
825            }
826        }
827    }
828
829    fn is_materialized(&self) -> bool {
830        self.skip_creating_dependencies
831            || self.materialized_dependencies.len() == self.dependencies.len()
832    }
833
834    #[cfg(feature = "tokio")]
835    async fn materialize_deps(
836        &mut self,
837        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
838    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
839        // Start with any pre-populated values (e.g. Cloneable deps received
840        // from the parent via ProvideCloneable IPC).
841        let mut deps = self.materialized_dependencies.clone();
842        let mut dependency_map = parent_map.clone();
843        for (k, v) in &deps {
844            dependency_map.insert(k.clone(), v.clone());
845        }
846
847        let sorted_dependencies = self.sorted_dependencies();
848        for dep in &sorted_dependencies {
849            if deps.contains_key(&dep.name) {
850                continue;
851            }
852            let materialized_dep = match &dep.constructor {
853                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
854                DependencyConstructor::Async(cons) => cons(Arc::new(dependency_map.clone())).await,
855            };
856            deps.insert(dep.name.clone(), materialized_dep.clone());
857            dependency_map.insert(dep.name.clone(), materialized_dep);
858        }
859        self.materialized_dependencies = deps;
860        dependency_map
861    }
862
863    fn materialize_deps_sync(
864        &mut self,
865        parent_map: &HashMap<String, Arc<dyn Any + Send + Sync>>,
866    ) -> HashMap<String, Arc<dyn Any + Send + Sync>> {
867        // Start with any pre-populated values (e.g. Cloneable deps received
868        // from the parent via ProvideCloneable IPC).
869        let mut deps = self.materialized_dependencies.clone();
870        let mut dependency_map = parent_map.clone();
871        for (k, v) in &deps {
872            dependency_map.insert(k.clone(), v.clone());
873        }
874
875        let sorted_dependencies = self.sorted_dependencies();
876        for dep in &sorted_dependencies {
877            if deps.contains_key(&dep.name) {
878                continue;
879            }
880            let materialized_dep = match &dep.constructor {
881                DependencyConstructor::Sync(cons) => cons(Arc::new(dependency_map.clone())),
882                DependencyConstructor::Async(cons) => {
883                    futures::executor::block_on(cons(Arc::new(dependency_map.clone())))
884                }
885            };
886            deps.insert(dep.name.clone(), materialized_dep.clone());
887            dependency_map.insert(dep.name.clone(), materialized_dep);
888        }
889        self.materialized_dependencies = deps;
890        dependency_map
891    }
892
893    fn sorted_dependencies(&self) -> Vec<&RegisteredDependency> {
894        let mut ts: TopologicalSort<&RegisteredDependency> = TopologicalSort::new();
895        for dep in &self.dependencies {
896            let mut added = false;
897            for dep_dep_name in &dep.dependencies {
898                if let Some(dep_dep) = self.dependencies.iter().find(|d| &d.name == dep_dep_name) {
899                    ts.add_dependency(dep_dep, dep);
900                    added = true;
901                } else {
902                    // otherwise it is expected to come from the parent level
903                }
904            }
905            if !added {
906                ts.insert(dep);
907            }
908        }
909        let mut result = Vec::with_capacity(self.dependencies.len());
910        loop {
911            let chunk = ts.pop_all();
912            if chunk.is_empty() {
913                break;
914            }
915            result.extend(chunk);
916        }
917        result
918    }
919
920    fn drop_deps(&mut self) {
921        self.materialized_dependencies.clear();
922    }
923
924    /// Prunes dependencies that are not needed by any test in this subtree.
925    /// Returns `Some(needed_from_parent)` with dep names needed from ancestor levels,
926    /// or `None` if pruning is disabled for this subtree (unknown deps).
927    fn prune_unused_deps(&mut self) -> Option<HashSet<String>> {
928        // Collect dep names needed by tests at this level
929        let mut needed: Option<HashSet<String>> = Some(HashSet::new());
930        for test in &self.tests {
931            match &test.dependencies {
932                None => {
933                    needed = None;
934                    break;
935                }
936                Some(deps) => {
937                    if let Some(ref mut set) = needed {
938                        set.extend(deps.iter().cloned());
939                    }
940                }
941            }
942        }
943
944        // Merge children's needs
945        for inner in &mut self.inner {
946            let child_needs = inner.prune_unused_deps();
947            needed = match (needed, child_needs) {
948                (None, _) | (_, None) => None,
949                (Some(mut a), Some(b)) => {
950                    a.extend(b);
951                    Some(a)
952                }
953            };
954        }
955
956        // If any test has unknown deps, keep everything
957        let needed = needed?;
958
959        // Determine which local deps to keep
960        let local_names: HashSet<String> =
961            self.dependencies.iter().map(|d| d.name.clone()).collect();
962        let mut keep_local: HashSet<String> = needed.intersection(&local_names).cloned().collect();
963
964        // Expand transitive closure for local deps only (fixpoint)
965        let mut queue: VecDeque<String> = keep_local.iter().cloned().collect();
966        let mut needed_from_parent: HashSet<String> =
967            needed.difference(&local_names).cloned().collect();
968
969        while let Some(dep_name) = queue.pop_front() {
970            if let Some(dep) = self.dependencies.iter().find(|d| d.name == dep_name) {
971                for transitive in &dep.dependencies {
972                    if local_names.contains(transitive) {
973                        if keep_local.insert(transitive.clone()) {
974                            queue.push_back(transitive.clone());
975                        }
976                    } else {
977                        needed_from_parent.insert(transitive.clone());
978                    }
979                }
980                // Companions are planner-only sibling links — no
981                // constructor argument is derived from them, but they
982                // must be retained together with the dep they are
983                // declared on. Used by the
984                // `#[test_dep(scope = Hosted, worker = both(T))]`
985                // lowering to keep the Hosted owner half and the
986                // HostedRpc stub half as a pair even when the
987                // selected tests only parameterise on one of them.
988                for companion in &dep.companions {
989                    if local_names.contains(companion) {
990                        if keep_local.insert(companion.clone()) {
991                            queue.push_back(companion.clone());
992                        }
993                    } else {
994                        needed_from_parent.insert(companion.clone());
995                    }
996                }
997            }
998        }
999
1000        // Prune
1001        self.dependencies.retain(|d| keep_local.contains(&d.name));
1002
1003        Some(needed_from_parent)
1004    }
1005
1006    fn is_prefix_of(this: &str, that: &str) -> bool {
1007        this.is_empty() || this == that || that.starts_with(&format!("{this}::"))
1008    }
1009
1010    fn next_level(from: &str, to: &str) -> String {
1011        assert!(Self::is_prefix_of(from, to));
1012        let remaining = if from.is_empty() {
1013            to
1014        } else {
1015            &to[from.len() + 2..]
1016        };
1017
1018        let result = if let Some((next, _tail)) = remaining.split_once("::") {
1019            format!("{from}::{next}")
1020        } else {
1021            format!("{from}::{remaining}")
1022        };
1023        result.trim_start_matches("::").to_string()
1024    }
1025
1026    fn propagate_sequential(&mut self, inherited_lock: Option<&SequentialExecutionLock>) {
1027        if let Some(parent_lock) = inherited_lock {
1028            self.is_sequential = true;
1029            self.sequential_lock = parent_lock.clone();
1030        }
1031
1032        let lock_for_children = if self.is_sequential {
1033            Some(self.sequential_lock.clone())
1034        } else {
1035            None
1036        };
1037
1038        for child in &mut self.inner {
1039            child.propagate_sequential(lock_for_children.as_ref());
1040        }
1041    }
1042}
1043
1044impl Debug for TestSuiteExecution {
1045    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1046        writeln!(
1047            f,
1048            "'{}' {} [{}]",
1049            self.crate_and_module,
1050            self.props
1051                .iter()
1052                .map(|x| format!("{x:?}"))
1053                .collect::<Vec<_>>()
1054                .join(", "),
1055            if self.is_sequential { "S" } else { "P" }
1056        )?;
1057        writeln!(f, "  deps:")?;
1058        for dep in &self.dependencies {
1059            writeln!(f, "    '{}'", dep.name)?;
1060        }
1061        writeln!(f, "  tests:")?;
1062        for test in &self.tests {
1063            writeln!(f, "    '{}' [{:?}]", test.name, test.props.test_type)?;
1064        }
1065        for inner in &self.inner {
1066            let inner_str = format!("{inner:?}");
1067            for inner_line in inner_str.lines() {
1068                writeln!(f, "  {inner_line}")?;
1069            }
1070        }
1071        Ok(())
1072    }
1073}
1074
1075impl DependencyView for HashMap<String, Arc<dyn Any + Send + Sync>> {
1076    fn get(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>> {
1077        self.get(name).cloned()
1078    }
1079}
1080
1081pub struct TestExecution {
1082    pub test: RegisteredTest,
1083    pub deps: Arc<dyn DependencyView + Send + Sync>,
1084    pub index: usize,
1085    _seq_lock: SequentialExecutionLockGuard,
1086    in_progress_counter: Arc<AtomicUsize>,
1087}
1088
1089impl Drop for TestExecution {
1090    fn drop(&mut self) {
1091        self.in_progress_counter.fetch_sub(1, Ordering::Release);
1092    }
1093}
1094
1095#[allow(dead_code)]
1096enum SequentialExecutionLockGuard {
1097    None,
1098    #[cfg(feature = "tokio")]
1099    Async(tokio::sync::OwnedMutexGuard<()>),
1100    Sync(parking_lot::ArcMutexGuard<parking_lot::RawMutex, ()>),
1101}
1102
1103#[derive(Clone)]
1104struct SequentialExecutionLock {
1105    #[cfg(feature = "tokio")]
1106    async_mutex: Arc<tokio::sync::Mutex<()>>,
1107    sync_mutex: Arc<parking_lot::Mutex<()>>,
1108}
1109
1110impl SequentialExecutionLock {
1111    pub fn new() -> Self {
1112        Self {
1113            #[cfg(feature = "tokio")]
1114            async_mutex: Arc::new(tokio::sync::Mutex::new(())),
1115            sync_mutex: Arc::new(parking_lot::Mutex::new(())),
1116        }
1117    }
1118
1119    #[cfg(feature = "tokio")]
1120    pub async fn is_locked(&self) -> bool {
1121        self.async_mutex.try_lock().is_err()
1122    }
1123
1124    pub fn is_locked_sync(&self) -> bool {
1125        self.sync_mutex.try_lock().is_none()
1126    }
1127
1128    #[cfg(feature = "tokio")]
1129    pub async fn lock(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1130        if is_sequential {
1131            let permit = tokio::sync::Mutex::lock_owned(self.async_mutex.clone()).await;
1132            SequentialExecutionLockGuard::Async(permit)
1133        } else {
1134            SequentialExecutionLockGuard::None
1135        }
1136    }
1137
1138    pub fn lock_sync(&self, is_sequential: bool) -> SequentialExecutionLockGuard {
1139        if is_sequential {
1140            let permit = parking_lot::Mutex::lock_arc(&self.sync_mutex);
1141            SequentialExecutionLockGuard::Sync(permit)
1142        } else {
1143            SequentialExecutionLockGuard::None
1144        }
1145    }
1146}
1147
1148#[cfg(test)]
1149mod tests;