Skip to main content

test_r_core/
tokio.rs

1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6    generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7    FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8    InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9    WorkerReconstructor,
10};
11use crate::ipc::{
12    ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38    tokio::runtime::Builder::new_multi_thread()
39        .enable_all()
40        .build()
41        .unwrap()
42        .block_on(async_test_runner())
43}
44
/// Async body of [`test_runner`].
///
/// Parses the command-line [`Arguments`] and collects every statically
/// registered and generated test. With `args.list` set it only prints the
/// test list. Otherwise it runs the suite up to `args.flaky_run` times
/// (default 1), stopping at the first attempt whose exit code is
/// `ExitCode::SUCCESS`. It returns the exit code of the last attempt.
///
/// Each attempt:
/// - builds a fresh `TestSuiteExecution`;
/// - installs per-attempt host output capture;
/// - when needed, collects parent-side shared dependencies;
/// - pre-populates those dependencies locally when no worker subprocesses
///   are spawned;
/// - runs `args.test_threads()` instances of `test_thread` on the blocking
///   pool;
/// - drops parent-owned hosted owners;
/// - finalises host capture and reports the suite.
// NOTE(review): the `REGISTERED_*` guards taken below come from
// `.lock().unwrap()`, i.e. appear to be synchronous (non-tokio) mutex
// guards, and they stay alive across every `.await` in this function — hence
// the `allow`. Presumably nothing else locks these registries while the
// runner is active; TODO confirm.
#[allow(clippy::await_holding_lock)]
async fn async_test_runner() -> ExitCode {
    crate::panic_hook::install_panic_hook();
    let mut args = Arguments::from_args();
    // When the parent spawned this process as a worker it passed
    // `--worker-index <N>`. Stash it so `crate::worker::worker_index()`
    // returns the correct value for PerWorker dep constructors.
    if let Some(idx) = args.worker_index {
        crate::worker::set_worker_index(idx);
    }
    // Host-side output capture is installed PER retry attempt below
    // (after `finalize_for_execution`), mirroring the sync runner.
    // See `crate::host_capture` for the pipeline.
    let output = test_runner_output(&args);

    // Snapshot the global registries; these guards live until the function
    // returns (see the NOTE on the `allow` attribute above).
    let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
    let registered_dependency_constructors =
        internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
    let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
    let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();

    let generated_tests = generate_tests(&registered_test_generators).await;

    // Statically registered tests first, followed by generator output.
    let all_tests: Vec<RegisteredTest> = registered_tests
        .iter()
        .cloned()
        .chain(generated_tests)
        .collect();

    if args.list {
        output.test_list(&all_tests);
        ExitCode::SUCCESS
    } else {
        let mut remaining_retries = args.flaky_run.unwrap_or(1);

        // Returned unchanged only if no attempt runs at all (i.e.
        // `flaky_run == Some(0)`); otherwise overwritten by every attempt.
        let mut exit_code = ExitCode::from(101);
        while remaining_retries > 0 {
            let (mut execution, filtered_tests) = TestSuiteExecution::construct(
                &args,
                registered_dependency_constructors.as_slice(),
                &all_tests,
                registered_testsuite_props.as_slice(),
            );
            args.finalize_for_execution(&execution, output.clone());
            // Install host capture for this attempt, after
            // `finalize_for_execution` has decided whether workers
            // will spawn. See `sync::test_runner` for the rationale.
            let mut host_capture = crate::host_capture::install_if_needed(&args);
            let host_capture_epoch: Option<std::time::Instant> =
                host_capture.as_ref().map(|hc| hc.epoch());
            let host_capture_epoch_wall: Option<std::time::SystemTime> =
                host_capture.as_ref().map(|hc| hc.epoch_wall());
            let is_top_level_parent = args.is_top_level_parent();
            let has_selected_tests = execution.remaining() > 0;
            // Parent-side collection for dependency scopes whose worker-side
            // value is shipped as bytes or represented as an RPC stub. Async
            // constructors are awaited here, before workers receive their
            // reconstructed values. Skip it for empty filtered runs.
            let needs_parent_shared = execution.has_cloneable_dependencies()
                || execution.has_hosted_dependencies()
                || execution.has_hosted_rpc_dependencies();
            let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
            {
                execution.collect_parent_shared_dependencies_async().await
            } else {
                crate::execution::ParentSharedDependencies {
                    cloneable_wire_bytes: Vec::new(),
                    cloneable_local_values: Vec::new(),
                    hosted_descriptor_bytes: Vec::new(),
                    hosted_owners: Vec::new(),
                    hosted_rpc_owner_cells: Vec::new(),
                    parent_constructed_shared_values: Vec::new(),
                }
            };
            let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
            let cloneable_local_values = parent_shared.cloneable_local_values;
            let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
            // Not read directly: held only so the hosted owners stay alive
            // while tests run; dropped explicitly after all test threads join.
            let _hosted_owners = parent_shared.hosted_owners;
            let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
                parent_shared.hosted_rpc_owner_cells.into_iter().collect();
            let parent_constructed_shared_values = parent_shared.parent_constructed_shared_values;
            // Pre-built RpcFactory lookup keyed by qualified id, so worker
            // subprocesses can build stubs without re-locking the global
            // REGISTERED_DEPENDENCY_CONSTRUCTORS.
            let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
                .iter()
                .filter_map(|d| {
                    if d.scope == crate::internal::DepScope::HostedRpc {
                        d.rpc_factory
                            .as_ref()
                            .map(|f| (d.qualified_id(), f.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            // Build a combined Cloneable + Hosted codec/worker lookup table
            // now, before `test_thread` workers are spawned (see sync.rs
            // for rationale). Keyed by the dep's fully-qualified id
            // (`{crate}::{module}::{name}`) so workers can route an incoming
            // `ProvideCloneable` / `ProvideHostedDescriptor` to the correct
            // dep even when two deps share a local `name` in different
            // modules.
            let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
                registered_dependency_constructors
                    .iter()
                    .filter_map(|d| {
                        let codec_opt = match d.scope {
                            crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
                            crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
                            _ => None,
                        };
                        match (codec_opt, &d.worker_fn) {
                            (Some(codec), Some(worker_fn)) => {
                                Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
                            }
                            _ => None,
                        }
                    })
                    .collect();
            // Mode-consistent Cloneable semantics for the no-spawn-workers
            // path (e.g. `--nocapture`): reuse the parent-constructed value
            // directly instead of re-running the user constructor in
            // `materialize_deps`. Without this, a Cloneable dep's
            // constructor would run twice (once for parent-side
            // `collect_parent_shared_dependencies_async`, once for the
            // in-process test execution), which both violates the
            // "constructor runs once" expectation and can deadlock when
            // the constructor takes a runtime-wide lock.
            if is_top_level_parent && !args.spawn_workers && !cloneable_local_values.is_empty() {
                apply_cloneable_values_locally(&mut execution, &cloneable_local_values);
            }
            // Mode-consistent Hosted semantics: when this is the top-level
            // parent AND we do NOT spawn workers (e.g. --nocapture), the
            // test functions run in this same process, but they must still
            // see the *worker-side handle* produced by
            // `HostedDep::from_descriptor`. Reconstruct each handle locally
            // via the descriptor round-trip and pre-populate the execution
            // tree.
            if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
                apply_hosted_descriptors_locally(
                    &mut execution,
                    &cloneable_codecs,
                    &hosted_descriptor_bytes,
                )
                .await;
            }
            // Mode-consistent HostedRpc semantics for the no-spawn-workers
            // path: install in-process stubs that route straight to the
            // parent-held owner cells, so tests see the same `Stub` value
            // whether or not the runner spawns workers.
            if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
                install_local_hosted_rpc_stubs(
                    &mut execution,
                    &rpc_factories,
                    &hosted_rpc_owner_cells,
                );
            }
            // Mirror of `sync::apply_parent_constructed_shared_values_locally`:
            // in no-spawn-workers mode, install any `Shared`/`PerWorker` dep
            // values the parent had to construct as transitive inputs to a
            // Cloneable/Hosted/HostedRpc dep. The in-process test thread's
            // `materialize_deps` then reuses them instead of re-running the
            // constructor in the same process.
            if is_top_level_parent
                && !args.spawn_workers
                && !parent_constructed_shared_values.is_empty()
            {
                apply_parent_constructed_shared_values_locally(
                    &mut execution,
                    &parent_constructed_shared_values,
                );
            }
            // With worker subprocesses, dependencies are not created in this
            // process's execution tree.
            if args.spawn_workers {
                execution.skip_creating_dependencies();
            }

            // println!("Execution plan: {execution:?}");
            // println!("Final args: {args:?}");
            // println!("Has dependencies: {:?}", execution.has_dependencies());

            let count = execution.remaining();
            let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
            // Parent-side per-test execution windows aligned 1:1 with
            // `results`. The host-capture finaliser uses them after all
            // test_threads finish to attribute spilled host-log records
            // to the test(s) whose window contains each record.
            let host_windows: Arc<Mutex<Vec<crate::host_capture::HostWindow>>> =
                Arc::new(Mutex::new(Vec::with_capacity(count)));

            let start = Instant::now();
            output.start_suite(&filtered_tests);

            // Everything the test threads share is moved behind `Arc`s.
            let execution = Arc::new(Mutex::new(execution));
            let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
            let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
            let cloneable_codecs = Arc::new(cloneable_codecs);
            let rpc_factories = Arc::new(rpc_factories);
            let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
            let mut join_set = JoinSet::new();
            let threads = args.test_threads().get();

            for worker_idx in 0..threads {
                let execution_clone = execution.clone();
                let output_clone = output.clone();
                // Stamp each test-thread's args with the worker index it will
                // hand to its spawned child via `--worker-index <N>`.
                let mut args_clone = args.clone();
                if args_clone.spawn_workers {
                    args_clone.worker_index = Some(worker_idx);
                }
                let results_clone = results.clone();
                let host_windows_clone = host_windows.clone();
                let wire_bytes_clone = cloneable_wire_bytes.clone();
                let hosted_bytes_clone = hosted_descriptor_bytes.clone();
                let codecs_clone = cloneable_codecs.clone();
                let rpc_factories_clone = rpc_factories.clone();
                let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
                // Each test thread gets its own blocking-pool thread and
                // drives its future there with `Handle::block_on`.
                let handle = tokio::runtime::Handle::current();
                join_set.spawn_blocking(move || {
                    handle.block_on(test_thread(
                        args_clone,
                        execution_clone,
                        output_clone,
                        count,
                        results_clone,
                        host_windows_clone,
                        wire_bytes_clone,
                        hosted_bytes_clone,
                        codecs_clone,
                        rpc_factories_clone,
                        hosted_rpc_owner_cells_clone,
                        host_capture_epoch,
                    ))
                });
            }

            // A panicking test thread is re-raised here.
            while let Some(res) = join_set.join_next().await {
                res.expect("Failed to join task");
            }

            drop(execution);

            let mut results = results.lock().await;
            // Drop parent-owned hosted / hosted-rpc owners BEFORE
            // finalising host capture so any `Drop` impls that
            // shutdown background threads / subprocesses get a chance
            // to emit their final lines through the still-active
            // capture pipe. If we restored fd 1/2 first, those late
            // lines would either land on the about-to-render
            // structured output or be swallowed entirely.
            drop(hosted_rpc_owner_cells);
            drop(_hosted_owners);

            // Finalise host capture (if any) BEFORE rendering the
            // suite so the attributed host-log records make it into
            // the per-test captured-output vecs the formatter walks.
            if let Some(hc) = host_capture.take() {
                let epoch_wall = host_capture_epoch_wall.unwrap_or_else(|| hc.epoch_wall());
                let records = hc.finalize();
                let windows = host_windows.lock().await;
                let windows_indexed: Vec<(usize, crate::host_capture::HostWindow)> =
                    windows.iter().copied().enumerate().collect();
                crate::host_capture::attribute_records_to_tests(
                    epoch_wall,
                    &records,
                    &windows_indexed,
                    &mut results,
                );
            }
            output.finished_suite(&all_tests, &results, start.elapsed());
            exit_code = SuiteResult::exit_code(&results);

            if exit_code == ExitCode::SUCCESS {
                break;
            } else {
                remaining_retries -= 1;
            }
        }
        exit_code
    }
}
327
328#[allow(clippy::too_many_arguments)]
329async fn test_thread(
330    args: Arguments,
331    execution: Arc<Mutex<TestSuiteExecution>>,
332    output: Arc<dyn TestRunnerOutput>,
333    count: usize,
334    results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
335    host_windows: Arc<Mutex<Vec<crate::host_capture::HostWindow>>>,
336    cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
337    hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
338    cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
339    rpc_factories: Arc<HashMap<String, RpcFactory>>,
340    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
341    host_capture_epoch: Option<std::time::Instant>,
342) {
343    let mut worker = spawn_worker_if_needed(&args).await;
344    // Parent dispatches incoming `HostedRpcCall` frames against the owner
345    // cells materialised in the top-level parent. Workers don't need the owner
346    // cells (they own stubs instead), so they receive an empty map and the
347    // dispatch code path is never reached in subprocesses.
348    if let Some(worker) = worker.as_mut() {
349        worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
350    }
351    let connection_arc = if let Some(ref name) = args.ipc {
352        let name = ipc_name(name.clone());
353        let stream = Stream::connect(name)
354            .await
355            .expect("Failed to connect to IPC socket");
356        Some(Arc::new(Mutex::new(stream)))
357    } else {
358        None
359    };
360
361    if let Some(worker) = worker.as_mut() {
362        for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
363            worker
364                .provide_cloneable(dep_id.clone(), wire_bytes.clone())
365                .await;
366        }
367        // Ship every Hosted dep's descriptor bytes too.
368        for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
369            worker
370                .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
371                .await;
372        }
373    }
374
375    // Worker subprocess side: build a stub for every HostedRpc dep registered
376    // in this binary using the IPC-backed transport sharing the same socket as
377    // the main IPC loop. Install the stubs in the execution tree so dependency
378    // materialisation skips the parent-only owner constructor.
379    if let Some(connection) = connection_arc.as_ref() {
380        if !rpc_factories.is_empty() {
381            install_worker_subprocess_hosted_rpc_stubs(
382                &execution,
383                &rpc_factories,
384                connection.clone(),
385            )
386            .await;
387        }
388    }
389
390    let mut expected_test = None;
391
392    while !is_done(&execution).await {
393        if let Some(connection) = connection_arc.as_ref() {
394            while expected_test.is_none() {
395                let mut conn = connection.lock().await;
396                let command_bytes = read_frame_async(&mut *conn)
397                    .await
398                    .expect("Failed to read IPC command frame");
399                drop(conn);
400                let command: IpcCommand =
401                    deserialize(&command_bytes).expect("Failed to decode IPC command");
402
403                match command {
404                    IpcCommand::RunTest {
405                        name,
406                        crate_name,
407                        module_path,
408                    } => {
409                        expected_test = Some((name, crate_name, module_path));
410                    }
411                    IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
412                        // Worker-side reconstruction (see sync.rs::apply_provided_wire_bytes).
413                        apply_provided_wire_bytes(
414                            &execution,
415                            &cloneable_codecs,
416                            &dep_id,
417                            &wire_bytes,
418                            "ProvideCloneable",
419                        )
420                        .await;
421                        let response = IpcResponse::CloneableAccepted { dep_id };
422                        let msg = serialize_to_byte_vec(&response)
423                            .expect("Failed to encode IPC response");
424                        let mut conn = connection.lock().await;
425                        write_frame_async(&mut *conn, &msg)
426                            .await
427                            .expect("Failed to write IPC response frame");
428                    }
429                    IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
430                        // Worker-side reconstruction: same shape as
431                        // ProvideCloneable but routed through the registered
432                        // HostedDep worker_fn.
433                        apply_provided_wire_bytes(
434                            &execution,
435                            &cloneable_codecs,
436                            &dep_id,
437                            &wire_bytes,
438                            "ProvideHostedDescriptor",
439                        )
440                        .await;
441                        let response = IpcResponse::HostedDescriptorAccepted { dep_id };
442                        let msg = serialize_to_byte_vec(&response)
443                            .expect("Failed to encode IPC response");
444                        let mut conn = connection.lock().await;
445                        write_frame_async(&mut *conn, &msg)
446                            .await
447                            .expect("Failed to write IPC response frame");
448                    }
449                    IpcCommand::HostedRpcReply { .. } => {
450                        // HR1.2: replies for worker-initiated HostedRpc calls
451                        // are consumed inline by the IPC transport during
452                        // test execution, never by this between-tests
453                        // command loop. Receiving one here means the
454                        // protocol got out of sync; surface that loudly
455                        // rather than dropping the frame.
456                        panic!(
457                            "unexpected `HostedRpcReply` while waiting for the next \
458                             between-tests command in the tokio worker subprocess: a \
459                             stub call must have left a reply on the wire without \
460                             draining it inline"
461                        );
462                    }
463                }
464            }
465        }
466
467        if let Some(next) = pick_next(&execution).await {
468            let skip = if let Some((name, crate_name, module_path)) = &expected_test {
469                next.test.name != *name
470                    || next.test.crate_name != *crate_name
471                    || next.test.module_path != *module_path
472            } else {
473                false
474            };
475
476            if !skip {
477                expected_test = None;
478
479                let ensure_time = get_ensure_time(&args, &next.test);
480
481                // Snapshot the parent's monotonic-clock view of the
482                // test start. The matching end-instant is captured
483                // after `finished_running_test`, and the pair becomes
484                // a `HostWindow` for record attribution. Uses
485                // `std::time::Instant` because the host-capture epoch
486                // is `std::time::Instant` (tokio's `Instant` is a
487                // different type without `From` interop).
488                let window_start = std::time::Instant::now();
489
490                output.start_running_test(&next.test, next.index, count);
491                let result = run_test(
492                    output.clone(),
493                    next.index,
494                    count,
495                    args.nocapture,
496                    args.include_ignored,
497                    ensure_time,
498                    next.deps.clone(),
499                    &next.test,
500                    &mut worker,
501                )
502                .await;
503                output.finished_running_test(&next.test, next.index, count, &result);
504                let window_end = std::time::Instant::now();
505
506                if let Some(connection) = connection_arc.as_ref() {
507                    let finish_marker = Uuid::new_v4().to_string();
508                    let finish_marker_line = format!("{finish_marker}\n");
509                    tokio::io::stdout()
510                        .write_all(finish_marker_line.as_bytes())
511                        .await
512                        .unwrap();
513                    tokio::io::stderr()
514                        .write_all(finish_marker_line.as_bytes())
515                        .await
516                        .unwrap();
517                    tokio::io::stdout().flush().await.unwrap();
518                    tokio::io::stderr().flush().await.unwrap();
519
520                    let response = IpcResponse::TestFinished {
521                        result: (&result).into(),
522                        finish_marker,
523                    };
524                    let msg =
525                        serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
526                    let mut conn = connection.lock().await;
527                    write_frame_async(&mut *conn, &msg)
528                        .await
529                        .expect("Failed to write IPC response frame");
530                }
531
532                // Push the result and its window under the same
533                // critical section so the two vecs stay aligned in the
534                // face of concurrent pushes from sibling test_threads.
535                let window = crate::host_capture::HostWindow::from_instants(
536                    host_capture_epoch,
537                    window_start,
538                    window_end,
539                )
540                .unwrap_or(crate::host_capture::HostWindow {
541                    start: std::time::Duration::ZERO,
542                    end: std::time::Duration::ZERO,
543                });
544                let mut results_guard = results.lock().await;
545                let mut windows_guard = host_windows.lock().await;
546                results_guard.push((next.test.clone(), result));
547                windows_guard.push(window);
548            }
549        }
550    }
551}
552
553async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
554    let execution = execution.lock().await;
555    execution.is_done()
556}
557
558/// Async counterpart to `sync::apply_provided_wire_bytes`. Decodes the wire
559/// bytes into a worker-side dependency value (looked up by the dep's
560/// fully-qualified id `{crate}::{module}::{name}`) and stores it in the
561/// execution tree so the next `materialize_deps` call uses the pre-resolved
562/// value.
563///
564/// `source_command` is the textual name of the IPC command that delivered
565/// the bytes (`"ProvideCloneable"` or `"ProvideHostedDescriptor"`); used
566/// only in panic messages.
567async fn apply_provided_wire_bytes(
568    execution: &Arc<Mutex<TestSuiteExecution>>,
569    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
570    dep_id: &str,
571    wire_bytes: &[u8],
572    source_command: &str,
573) {
574    let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
575        panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
576    });
577
578    let wire_payload = (codec.from_wire_bytes)(wire_bytes);
579    let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
580        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
581    let reconstructed = match worker_fn {
582        WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
583        WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
584    };
585
586    let mut execution = execution.lock().await;
587    let applied = execution.provide_cloneable_value(dep_id, reconstructed);
588    assert!(
589        applied,
590        "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
591    );
592}
593
594/// Mode-consistent Cloneable semantics for the no-spawn-workers path on
595/// the tokio runner. Mirrors `sync::apply_cloneable_values_locally`: takes
596/// the parent-constructed Cloneable values and installs them directly
597/// into the parent's `TestSuiteExecution`, so `materialize_deps` reuses
598/// them instead of re-running the user constructor.
599///
600/// For `Cloneable`, the documented round-trip
601/// `from_wire(to_wire(value))` is semantics-preserving, so reusing the
602/// parent value directly is equivalent to round-tripping it through the
603/// wire codec while avoiding the duplicate constructor run that would
604/// otherwise occur on the no-spawn-workers code path. The duplicate run
605/// historically caused user-visible problems (extra observable side
606/// effects under `--nocapture`, and deadlocks when the constructor takes
607/// a runtime-wide lock).
608fn apply_cloneable_values_locally(
609    execution: &mut TestSuiteExecution,
610    cloneable_local_values: &[(String, Arc<dyn Any + Send + Sync>)],
611) {
612    for (dep_id, value) in cloneable_local_values {
613        let applied = execution.provide_cloneable_value(dep_id, value.clone());
614        assert!(
615            applied,
616            "Cloneable dep '{dep_id}' could not be pre-populated locally"
617        );
618    }
619}
620
621/// Tokio counterpart to `sync::apply_parent_constructed_shared_values_locally`.
622/// In no-spawn-workers mode, installs `Shared`/`PerWorker` dep values that
623/// the parent had to construct as transitive inputs to a
624/// Cloneable/Hosted/HostedRpc dep, so the in-process test thread reuses
625/// them instead of re-running the constructor in the same process.
626fn apply_parent_constructed_shared_values_locally(
627    execution: &mut TestSuiteExecution,
628    values: &[(String, Arc<dyn Any + Send + Sync>)],
629) {
630    for (dep_id, value) in values {
631        let applied = execution.provide_materialized_shared_value(dep_id, value.clone());
632        assert!(
633            applied,
634            "Shared/PerWorker dep '{dep_id}' could not be pre-populated locally"
635        );
636    }
637}
638
639/// Mode-consistent Hosted semantics for the no-spawn-workers path on the
640/// tokio runner. Mirrors `sync::apply_hosted_descriptors_locally`: takes
641/// the parent-collected descriptor bytes and reconstructs each Hosted
642/// dep's worker-side handle (via the registered codec + worker_fn)
643/// directly in the parent's `TestSuiteExecution`.
644///
645/// Both `WorkerReconstructor::Sync` (`HostedDep::from_descriptor`) and
646/// `WorkerReconstructor::Async` (`AsyncHostedDep::from_descriptor`) are
647/// supported here so that `async_worker` Hosted deps see the same
648/// worker-side handle whether the runner ended up in spawned-worker mode
649/// or in the no-spawn fallback, matching the documented mode-consistent
650/// Hosted contract in `book/src/advanced_features/dependency_sharing.md`.
651async fn apply_hosted_descriptors_locally(
652    execution: &mut TestSuiteExecution,
653    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
654    descriptor_bytes: &[DepWireBytes],
655) {
656    for (dep_id, wire_bytes) in descriptor_bytes {
657        let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
658            panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
659        });
660        let wire_payload = (codec.from_wire_bytes)(wire_bytes);
661        let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
662            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
663        let reconstructed = match worker_fn {
664            WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
665            WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
666        };
667        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
668        assert!(
669            applied,
670            "Hosted dep '{dep_id}' could not be pre-populated locally"
671        );
672    }
673}
674
675async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
676    let mut execution = execution.lock().await;
677    execution.pick_next().await
678}
679
680async fn run_with_flakiness_control<F>(
681    output: Arc<dyn TestRunnerOutput>,
682    test_description: &RegisteredTest,
683    idx: usize,
684    count: usize,
685    test: F,
686) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
687where
688    F: Fn(
689            Instant,
690        )
691            -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
692        + Send
693        + Sync,
694{
695    match &test_description.props.flakiness_control {
696        FlakinessControl::None => {
697            let start = Instant::now();
698            test(start).await
699        }
700        FlakinessControl::ProveNonFlaky(tries) => {
701            for n in 0..*tries {
702                if n > 0 {
703                    output.repeat_running_test(
704                        test_description,
705                        idx,
706                        count,
707                        n + 1,
708                        *tries,
709                        "to ensure test is not flaky",
710                    );
711                }
712                let start = Instant::now();
713                match test(start).await {
714                    Ok(Ok(())) => {}
715                    Ok(Err(e)) => return Ok(Err(e)),
716                    Err(e) => return Err(e),
717                };
718            }
719            Ok(Ok(()))
720        }
721        FlakinessControl::RetryKnownFlaky(max_retries) => {
722            let mut tries = 1;
723            loop {
724                let start = Instant::now();
725                let result = test(start).await;
726
727                if result.is_err() && tries < *max_retries {
728                    tries += 1;
729                    output.repeat_running_test(
730                        test_description,
731                        idx,
732                        count,
733                        tries,
734                        *max_retries,
735                        "because test is known to be flaky",
736                    );
737                } else {
738                    break result;
739                }
740            }
741        }
742    }
743}
744
745#[allow(clippy::too_many_arguments)]
746async fn run_test(
747    output: Arc<dyn TestRunnerOutput>,
748    idx: usize,
749    count: usize,
750    nocapture: bool,
751    include_ignored: bool,
752    ensure_time: Option<TimeThreshold>,
753    dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
754    test: &RegisteredTest,
755    worker: &mut Option<Worker>,
756) -> TestResult {
757    if test.props.is_ignored && !include_ignored {
758        TestResult::ignored()
759    } else if let Some(worker) = worker.as_mut() {
760        worker.run_test(nocapture, test).await
761    } else {
762        let start = Instant::now();
763        let test = test.clone();
764        match &test.run {
765            TestFunction::Sync(_) => {
766                let handle = spawn_blocking(move || {
767                    let test = test.clone();
768                    crate::sync::run_sync_test_function(
769                        output,
770                        &test,
771                        idx,
772                        count,
773                        ensure_time,
774                        dependency_view,
775                    )
776                });
777                handle.await.unwrap_or_else(|join_error| {
778                    TestResult::failed(
779                        start.elapsed(),
780                        FailureCause::HarnessError(format!(
781                            "Failed joining test task: {join_error}"
782                        )),
783                    )
784                })
785            }
786            TestFunction::Async(test_fn) => {
787                let timeout = test.props.timeout;
788                let test_fn = test_fn.clone();
789                let detached_panic_policy = test.props.detached_panic_policy.clone();
790                let result = run_with_flakiness_control(output, &test, idx, count, |start| {
791                    let dependency_view = dependency_view.clone();
792                    let test_fn = test_fn.clone();
793                    Box::pin(async move {
794                        let test_id = crate::panic_hook::next_test_id();
795                        crate::panic_hook::set_current_test_id(test_id);
796                        crate::panic_hook::create_detached_collector(test_id);
797                        let result = AssertUnwindSafe(Box::pin(async move {
798                            match timeout {
799                                None => test_fn(dependency_view).await,
800                                Some(duration) => {
801                                    let result =
802                                        tokio::time::timeout(duration, test_fn(dependency_view))
803                                            .await;
804                                    match result {
805                                        Ok(result) => result,
806                                        Err(_) => {
807                                            return Err(FailureCause::HarnessError(
808                                                "Test timed out".to_string(),
809                                            ))
810                                        }
811                                    }
812                                }
813                            }
814                            .into_result()?;
815                            if let Some(ensure_time) = ensure_time {
816                                let elapsed = start.elapsed();
817                                if ensure_time.is_critical(&elapsed) {
818                                    return Err(FailureCause::HarnessError(format!(
819                                        "Test run time exceeds critical threshold: {elapsed:?}"
820                                    )));
821                                }
822                            }
823                            Ok(())
824                        }))
825                        .catch_unwind()
826                        .await;
827                        result
828                    })
829                })
830                .await;
831                let mut test_result =
832                    TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
833                if let Some(test_id) = crate::panic_hook::current_test_id() {
834                    if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
835                        let panics = match collector.lock() {
836                            Ok(p) => p,
837                            Err(poisoned) => poisoned.into_inner(),
838                        };
839                        if !panics.is_empty()
840                            && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
841                            && test_result.is_passed()
842                        {
843                            let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
844                            test_result = TestResult::failed(
845                                start.elapsed(),
846                                FailureCause::Panic(internal::PanicCause {
847                                    message: Some(format!(
848                                        "Detached task(s) panicked:\n{}",
849                                        messages.join("\n---\n")
850                                    )),
851                                    location: panics.first().and_then(|p| p.location.clone()),
852                                    backtrace: panics.first().and_then(|p| p.backtrace.clone()),
853                                }),
854                            );
855                        }
856                    }
857                }
858                crate::panic_hook::clear_current_test_id();
859                test_result
860            }
861            TestFunction::SyncBench(_) => {
862                let handle = spawn_blocking(move || {
863                    let test = test.clone();
864                    crate::sync::run_sync_test_function(
865                        output,
866                        &test,
867                        idx,
868                        count,
869                        ensure_time,
870                        dependency_view,
871                    )
872                });
873                handle.await.unwrap_or_else(|join_error| {
874                    TestResult::failed(
875                        start.elapsed(),
876                        FailureCause::HarnessError(format!(
877                            "Failed joining test task: {join_error}"
878                        )),
879                    )
880                })
881            }
882            TestFunction::AsyncBench(bench_fn) => {
883                let mut bencher = AsyncBencher::new();
884                let test_id = crate::panic_hook::next_test_id();
885                crate::panic_hook::set_current_test_id(test_id);
886                let result = AssertUnwindSafe(async move {
887                    bench_fn(&mut bencher, dependency_view).await;
888                    (
889                        bencher
890                            .summary()
891                            .expect("iter() was not called in bench function"),
892                        bencher.bytes,
893                    )
894                })
895                .catch_unwind()
896                .await;
897                let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
898                let test_result = TestResult::from_summary(
899                    &test.props.should_panic,
900                    start.elapsed(),
901                    result.map(|(summary, _)| summary),
902                    bytes,
903                );
904                crate::panic_hook::clear_current_test_id();
905                test_result
906            }
907        }
908    }
909}
910
911struct Worker {
912    _listener: Listener,
913    _process: Child,
914    _out_handle: JoinHandle<()>,
915    _err_handle: JoinHandle<()>,
916    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
917    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
918    capture_enabled: Arc<Mutex<bool>>,
919    connection: Stream,
920    /// Parent-held HostedRpc owner cells keyed by fully-qualified dep id. Used
921    /// to dispatch incoming `IpcResponse::HostedRpcCall` frames from the worker
922    /// subprocess back to the right owner.
923    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
924}
925
926impl Worker {
927    /// Installs the parent-side map of HostedRpc owner cells so this worker can
928    /// route incoming `IpcResponse::HostedRpcCall` frames to the right
929    /// `HostedRpcOwnerCell` while waiting for a worker subprocess response.
930    fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
931        self.hosted_rpc_owner_cells = cells;
932    }
933
934    /// Parent-side dispatcher for a single `IpcResponse::HostedRpcCall`. Looks
935    /// up the owner cell by fully-qualified dep id, runs the dispatch on the
936    /// parent's stored owner, and writes the matching
937    /// `IpcCommand::HostedRpcReply` back to the worker subprocess. Mirrors
938    /// `sync::Worker::handle_hosted_rpc_call`.
939    async fn handle_hosted_rpc_call(
940        &mut self,
941        dump_on_ipc_failure: &DumpOnFailure,
942        request_id: u64,
943        dep_id: String,
944        method_idx: u32,
945        args_bytes: Vec<u8>,
946    ) {
947        let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
948            // Use the async dispatch entry point so an owner that implements
949            // `AsyncHostedRpcDep` directly can `.await` inside its dispatcher
950            // without blocking the tokio runtime. Sync owners reach this
951            // entry point through the blanket bridge and their dispatched
952            // future resolves immediately.
953            Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
954                Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
955                Err(message) => HostedRpcReplyBody::Err { message },
956            },
957            None => HostedRpcReplyBody::Err {
958                message: format!(
959                    "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
960                ),
961            },
962        };
963        let reply = IpcCommand::HostedRpcReply { request_id, body };
964        let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
965        dump_on_ipc_failure
966            .run(write_frame_async(&mut self.connection, &msg).await)
967            .await;
968    }
969
970    pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
971        let mut capture_enabled = self.capture_enabled.lock().await;
972        *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
973        drop(capture_enabled);
974
975        // Send IPC command and wait for IPC response, and in the meantime read from the stdout/stderr channels
976        let cmd = IpcCommand::RunTest {
977            name: test.name.clone(),
978            crate_name: test.crate_name.clone(),
979            module_path: test.module_path.clone(),
980        };
981
982        let dump_on_ipc_failure = self.dump_on_failure();
983
984        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
985        dump_on_ipc_failure
986            .run(write_frame_async(&mut self.connection, &msg).await)
987            .await;
988
989        let response = loop {
990            let response_bytes = dump_on_ipc_failure
991                .run(read_frame_async(&mut self.connection).await)
992                .await;
993            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
994            match response {
995                IpcResponse::TestFinished { .. } => break response,
996                IpcResponse::CloneableAccepted { .. }
997                | IpcResponse::HostedDescriptorAccepted { .. } => continue,
998                IpcResponse::HostedRpcCall {
999                    request_id,
1000                    dep_id,
1001                    method_idx,
1002                    args_bytes,
1003                } => {
1004                    self.handle_hosted_rpc_call(
1005                        &dump_on_ipc_failure,
1006                        request_id,
1007                        dep_id,
1008                        method_idx,
1009                        args_bytes,
1010                    )
1011                    .await;
1012                    continue;
1013                }
1014            }
1015        };
1016
1017        let IpcResponse::TestFinished {
1018            result,
1019            finish_marker,
1020        } = response
1021        else {
1022            unreachable!("loop only breaks on TestFinished")
1023        };
1024
1025        if test.props.capture_control.requires_capturing(!nocapture) {
1026            let out_lines: Vec<_> =
1027                Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
1028            let err_lines: Vec<_> =
1029                Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
1030            result.into_test_result(out_lines, err_lines)
1031        } else {
1032            result.into_test_result(Vec::new(), Vec::new())
1033        }
1034    }
1035
1036    /// Async counterpart to `sync::Worker::provide_cloneable`. `dep_id` is the
1037    /// dep's fully-qualified id (`{crate}::{module}::{name}`).
1038    async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
1039        let dump_on_ipc_failure = self.dump_on_failure();
1040        let cmd = IpcCommand::ProvideCloneable {
1041            dep_id: dep_id.clone(),
1042            wire_bytes,
1043        };
1044        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
1045        dump_on_ipc_failure
1046            .run(write_frame_async(&mut self.connection, &msg).await)
1047            .await;
1048
1049        loop {
1050            let response_bytes = dump_on_ipc_failure
1051                .run(read_frame_async(&mut self.connection).await)
1052                .await;
1053            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
1054            match response {
1055                IpcResponse::CloneableAccepted { dep_id: ack_id } => {
1056                    if ack_id == dep_id {
1057                        return;
1058                    }
1059                }
1060                IpcResponse::HostedDescriptorAccepted { .. } => {
1061                    // Out-of-band ack from a previous ProvideHostedDescriptor; ignore.
1062                }
1063                IpcResponse::TestFinished { .. } => {
1064                    // Should not happen before any RunTest.
1065                }
1066                IpcResponse::HostedRpcCall {
1067                    request_id,
1068                    dep_id: rpc_dep_id,
1069                    method_idx,
1070                    args_bytes,
1071                } => {
1072                    // A worker subprocess can issue a HostedRpc call from
1073                    // inside an in-progress test, even while the parent is
1074                    // mid-`ProvideCloneable` for a different dep. Dispatch it
1075                    // so the protocol doesn't desync.
1076                    self.handle_hosted_rpc_call(
1077                        &dump_on_ipc_failure,
1078                        request_id,
1079                        rpc_dep_id,
1080                        method_idx,
1081                        args_bytes,
1082                    )
1083                    .await;
1084                }
1085            }
1086        }
1087    }
1088
1089    /// Async counterpart to `sync::Worker::provide_hosted_descriptor`.
1090    async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
1091        let dump_on_ipc_failure = self.dump_on_failure();
1092        let cmd = IpcCommand::ProvideHostedDescriptor {
1093            dep_id: dep_id.clone(),
1094            wire_bytes,
1095        };
1096        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
1097        dump_on_ipc_failure
1098            .run(write_frame_async(&mut self.connection, &msg).await)
1099            .await;
1100
1101        loop {
1102            let response_bytes = dump_on_ipc_failure
1103                .run(read_frame_async(&mut self.connection).await)
1104                .await;
1105            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
1106            match response {
1107                IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
1108                    if ack_id == dep_id {
1109                        return;
1110                    }
1111                }
1112                IpcResponse::CloneableAccepted { .. } => {
1113                    // Out-of-band ack from a previous ProvideCloneable; ignore.
1114                }
1115                IpcResponse::TestFinished { .. } => {
1116                    // Should not happen before any RunTest.
1117                }
1118                IpcResponse::HostedRpcCall {
1119                    request_id,
1120                    dep_id: rpc_dep_id,
1121                    method_idx,
1122                    args_bytes,
1123                } => {
1124                    // See provide_cloneable arm. Dispatch the call inline so
1125                    // the IPC stream stays in sync.
1126                    self.handle_hosted_rpc_call(
1127                        &dump_on_ipc_failure,
1128                        request_id,
1129                        rpc_dep_id,
1130                        method_idx,
1131                        args_bytes,
1132                    )
1133                    .await;
1134                }
1135            }
1136        }
1137    }
1138
1139    fn dump_on_failure(&self) -> DumpOnFailure {
1140        DumpOnFailure {
1141            out_lines: self.out_lines.clone(),
1142            err_lines: self.err_lines.clone(),
1143        }
1144    }
1145
1146    async fn drain_until(
1147        source: Arc<Mutex<VecDeque<CapturedOutput>>>,
1148        finish_marker: String,
1149    ) -> Vec<CapturedOutput> {
1150        let mut result = Vec::new();
1151        loop {
1152            let mut source = source.lock().await;
1153            while let Some(line) = source.pop_front() {
1154                if line.line() == finish_marker {
1155                    return result;
1156                } else {
1157                    result.push(line.clone());
1158                }
1159            }
1160            drop(source);
1161
1162            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1163        }
1164    }
1165}
1166
1167struct DumpOnFailure {
1168    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1169    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1170}
1171
1172impl DumpOnFailure {
1173    pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1174        match result {
1175            Ok(value) => value,
1176            Err(_error) => {
1177                let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1178                let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1179                let mut all_lines = [out_lines, err_lines].concat();
1180                all_lines.sort();
1181
1182                // Route the diagnostic through the real terminal even
1183                // when host capture has fd 2 redirected into its pipe.
1184                // We `process::exit(1)` immediately after this and
1185                // never reach `host_capture.finalize()`, so anything we
1186                // wrote into the host pipe would be lost. The capture
1187                // pipe and its reader are abandoned on exit; that is
1188                // acceptable for this fatal-IPC path.
1189                use std::io::Write;
1190                let mut err = crate::host_capture::TerminalStderr;
1191                for line in all_lines {
1192                    let _ = writeln!(err, "{}", line.line());
1193                }
1194                let _ = err.flush();
1195
1196                std::process::exit(1);
1197            }
1198        }
1199    }
1200}
1201
1202async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1203    if args.spawn_workers {
1204        let id = Uuid::new_v4();
1205        let name_str = format!("{id}.sock");
1206        let name = name_str
1207            .clone()
1208            .to_ns_name::<GenericNamespaced>()
1209            .expect("Invalid local socket name");
1210        let opts = ListenerOptions::new().name(name.clone());
1211        let listener = opts
1212            .create_tokio()
1213            .expect("Failed to create local socket listener");
1214
1215        let exe = std::env::current_exe().expect("Failed to get current executable path");
1216
1217        let mut args = args.clone();
1218        args.ipc = Some(name_str);
1219        args.spawn_workers = false;
1220        args.logfile = None;
1221        let args = args.to_args();
1222
1223        let mut process = Command::new(exe)
1224            .args(args)
1225            .stdin(Stdio::inherit())
1226            .stderr(Stdio::piped())
1227            .stdout(Stdio::piped())
1228            .spawn()
1229            .expect("Failed to spawn worker process");
1230
1231        let stdout = process.stdout.take().unwrap();
1232        let stderr = process.stderr.take().unwrap();
1233
1234        let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1235        let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1236        let capture_enabled = Arc::new(Mutex::new(true));
1237
1238        let out_lines_clone = out_lines.clone();
1239        let capture_enabled_clone = capture_enabled.clone();
1240        let out_handle = spawn(async move {
1241            let reader = BufReader::new(stdout);
1242            let mut lines = reader.lines();
1243            while let Some(line) = lines
1244                .next_line()
1245                .await
1246                .expect("Failed to read from worker stdout")
1247            {
1248                if *capture_enabled_clone.lock().await {
1249                    out_lines_clone
1250                        .lock()
1251                        .await
1252                        .push_back(CapturedOutput::stdout(line));
1253                } else {
1254                    // `#[never_capture]` pass-through: write to the real
1255                    // terminal even when host capture has redirected
1256                    // fd 1 into its pipe, so the worker line stays
1257                    // uncaptured live output and is not later
1258                    // re-labelled `[host]`.
1259                    use std::io::Write;
1260                    let mut out = crate::host_capture::TerminalStdout;
1261                    let _ = writeln!(out, "{line}");
1262                    let _ = out.flush();
1263                }
1264            }
1265        });
1266
1267        let err_lines_clone = err_lines.clone();
1268        let capture_enabled_clone = capture_enabled.clone();
1269        let err_handle = spawn(async move {
1270            let reader = BufReader::new(stderr);
1271            let mut lines = reader.lines();
1272            while let Some(line) = lines
1273                .next_line()
1274                .await
1275                .expect("Failed to read from worker stderr")
1276            {
1277                if *capture_enabled_clone.lock().await {
1278                    err_lines_clone
1279                        .lock()
1280                        .await
1281                        .push_back(CapturedOutput::stderr(line));
1282                } else {
1283                    // Same as the stdout pass-through above: route the
1284                    // never-captured worker line to the real terminal
1285                    // stderr, not the host capture pipe.
1286                    use std::io::Write;
1287                    let mut err = crate::host_capture::TerminalStderr;
1288                    let _ = writeln!(err, "{line}");
1289                    let _ = err.flush();
1290                }
1291            }
1292        });
1293
1294        let connection = listener
1295            .accept()
1296            .await
1297            .expect("Failed to accept connection");
1298
1299        Some(Worker {
1300            _listener: listener,
1301            _process: process,
1302            _out_handle: out_handle,
1303            _err_handle: err_handle,
1304            out_lines,
1305            err_lines,
1306            connection,
1307            capture_enabled,
1308            hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1309        })
1310    } else {
1311        None
1312    }
1313}
1314
1315/// Parent-side `--nocapture` / no-spawn-workers helper. Builds one stub per
1316/// HostedRpc dep using an [`InProcessHostedRpcTransport`] that points at the
1317/// parent-held owner cells, and stashes it in the execution tree so dependency
1318/// materialisation skips the owner-only constructor. Mirrors
1319/// `sync::install_local_hosted_rpc_stubs`.
1320fn install_local_hosted_rpc_stubs(
1321    execution: &mut TestSuiteExecution,
1322    rpc_factories: &HashMap<String, RpcFactory>,
1323    owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1324) {
1325    let transport: Arc<dyn HostedRpcTransport> =
1326        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1327    for (dep_id, factory) in rpc_factories.iter() {
1328        if !owner_cells.contains_key(dep_id) {
1329            // No owner cell materialised for this dep (e.g. registered
1330            // globally but not pulled into the current filter). Skip so
1331            // we don't install a stub nothing routes.
1332            continue;
1333        }
1334        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1335        let stub = (factory.build_stub)(channel);
1336        let applied = execution.provide_cloneable_value(dep_id, stub);
1337        assert!(
1338            applied,
1339            "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1340        );
1341    }
1342}
1343
1344/// Worker subprocess helper. Builds one stub per registered HostedRpc dep backed
1345/// by [`IpcHostedRpcTransport`], and installs it in the execution tree. Mirrors
1346/// `sync::install_worker_subprocess_hosted_rpc_stubs` but runs on the tokio
1347/// `Arc<Mutex<Stream>>` connection.
1348async fn install_worker_subprocess_hosted_rpc_stubs(
1349    execution: &Arc<Mutex<TestSuiteExecution>>,
1350    rpc_factories: &HashMap<String, RpcFactory>,
1351    connection_arc: Arc<Mutex<Stream>>,
1352) {
1353    let transport: Arc<dyn HostedRpcTransport> =
1354        Arc::new(IpcHostedRpcTransport::new(connection_arc));
1355    for (dep_id, factory) in rpc_factories.iter() {
1356        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1357        let stub = (factory.build_stub)(channel);
1358        let mut execution = execution.lock().await;
1359        let applied = execution.provide_cloneable_value(dep_id, stub);
1360        // Not every binary registers a HostedRpc dep that the current
1361        // execution actually uses; if so, just move on.
1362        let _ = applied;
1363    }
1364}
1365
1366/// Worker subprocess `HostedRpcTransport` for the tokio runner.
1367/// Mirrors `sync::IpcHostedRpcTransport` but bridges a sync trait method
1368/// to the async tokio IPC primitives via
1369/// `tokio::task::block_in_place` + `Handle::current().block_on(...)`.
1370///
1371/// The shared `Arc<Mutex<Stream>>` is the same one used by the
1372/// worker subprocess's main IPC loop; the lock guarantees that a stub
1373/// call and the main loop never interleave a half-written frame. Calls
1374/// serialise across all in-flight stubs by acquiring the mutex for the
1375/// full request-then-reply round trip.
1376///
1377/// This relies on the temporal invariant documented on
1378/// [`crate::internal::HostedRpcChannel::call`]: stubs are only invoked
1379/// from inside a running test body, never from `build_stub`, and never
1380/// from detached background work that outlives the test. Under those
1381/// rules the main IPC loop is only reading between tests, so it cannot
1382/// race with a stub call.
1383struct IpcHostedRpcTransport {
1384    connection: Arc<Mutex<Stream>>,
1385    next_request_id: AtomicU64,
1386}
1387
1388impl IpcHostedRpcTransport {
1389    fn new(connection: Arc<Mutex<Stream>>) -> Self {
1390        Self {
1391            connection,
1392            next_request_id: AtomicU64::new(0),
1393        }
1394    }
1395}
1396
1397impl HostedRpcTransport for IpcHostedRpcTransport {
1398    fn call(
1399        &self,
1400        dep_id: &str,
1401        method_idx: u32,
1402        args: Vec<u8>,
1403    ) -> Result<Vec<u8>, HostedRpcError> {
1404        let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1405        let call = IpcResponse::HostedRpcCall {
1406            request_id,
1407            dep_id: dep_id.to_string(),
1408            method_idx,
1409            args_bytes: args,
1410        };
1411        let msg = serialize_to_byte_vec(&call).map_err(|e| {
1412            HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1413        })?;
1414
1415        let connection = self.connection.clone();
1416        let handle = tokio::runtime::Handle::current();
1417
1418        // Run the async I/O round-trip from inside this sync trait
1419        // method. `block_in_place` releases this worker thread back to
1420        // the scheduler so the parent's read loop can continue making
1421        // progress on other tasks while we wait for the reply.
1422        tokio::task::block_in_place(move || {
1423            handle.block_on(async move {
1424                let mut conn = connection.lock().await;
1425                write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1426                    HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1427                })?;
1428                let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1429                    HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1430                })?;
1431                let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1432                    HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1433                })?;
1434                match command {
1435                    IpcCommand::HostedRpcReply {
1436                        request_id: reply_id,
1437                        body,
1438                    } => {
1439                        if reply_id != request_id {
1440                            return Err(HostedRpcError::Transport(format!(
1441                                "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1442                            )));
1443                        }
1444                        match body {
1445                            HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1446                            HostedRpcReplyBody::Err { message } => {
1447                                Err(HostedRpcError::Dispatch(message))
1448                            }
1449                        }
1450                    }
1451                    other => Err(HostedRpcError::Transport(format!(
1452                        "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1453                    ))),
1454                }
1455            })
1456        })
1457    }
1458}