Skip to main content

test_r_core/
tokio.rs

1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6    generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7    FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8    InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9    WorkerReconstructor,
10};
11use crate::ipc::{
12    ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38    tokio::runtime::Builder::new_multi_thread()
39        .enable_all()
40        .build()
41        .unwrap()
42        .block_on(async_test_runner())
43}
44
/// Async driver behind `test_runner`: discovers the registered and generated
/// tests, then either lists them (`--list`) or executes the filtered suite and
/// returns the resulting process exit code.
///
/// Execution is repeated up to `args.flaky_run.unwrap_or(1)` times. Each
/// iteration:
/// - rebuilds the `TestSuiteExecution` plan;
/// - prepares the parent-side shared dependencies;
/// - runs the plan on `args.test_threads()` test threads (one `test_thread`
///   future per blocking-pool thread);
/// - derives the exit code with `SuiteResult::exit_code`.
///
/// The loop stops at the first `ExitCode::SUCCESS`, and the exit code of the
/// last iteration is returned.
///
/// NOTE(review): when `flaky_run` is `Some(0)` the loop body never executes
/// and the initial `ExitCode::from(101)` is returned without running any
/// test — confirm that is intended.
///
/// `clippy::await_holding_lock` is allowed because the guards taken from the
/// global registries below (`REGISTERED_TESTS`,
/// `REGISTERED_DEPENDENCY_CONSTRUCTORS`, ...) stay alive for the whole
/// function, across every `.await`.
#[allow(clippy::await_holding_lock)]
async fn async_test_runner() -> ExitCode {
    crate::panic_hook::install_panic_hook();
    let mut args = Arguments::from_args();
    // When the parent spawned this process as a worker it passed
    // `--worker-index <N>`. Stash it so `crate::worker::worker_index()`
    // returns the correct value for PerWorker dep constructors.
    if let Some(idx) = args.worker_index {
        crate::worker::set_worker_index(idx);
    }
    let output = test_runner_output(&args);

    // Registry guards: kept until this function returns (see the
    // `await_holding_lock` note in the doc comment).
    let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
    let registered_dependency_constructors =
        internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
    let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
    let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();

    // Generated once and reused by every flaky-run iteration below.
    let generated_tests = generate_tests(&registered_test_generators).await;

    // Statically registered tests followed by the generated ones.
    let all_tests: Vec<RegisteredTest> = registered_tests
        .iter()
        .cloned()
        .chain(generated_tests)
        .collect();

    if args.list {
        output.test_list(&all_tests);
        ExitCode::SUCCESS
    } else {
        // Number of suite runs still allowed (`--flaky-run`, default 1).
        let mut remaining_retries = args.flaky_run.unwrap_or(1);

        // Only returned as-is if the loop below never executes.
        let mut exit_code = ExitCode::from(101);
        while remaining_retries > 0 {
            // A fresh plan every iteration; `args` is re-finalized against it.
            let (mut execution, filtered_tests) = TestSuiteExecution::construct(
                &args,
                registered_dependency_constructors.as_slice(),
                &all_tests,
                registered_testsuite_props.as_slice(),
            );
            args.finalize_for_execution(&execution, output.clone());
            let is_top_level_parent = args.is_top_level_parent();
            let has_selected_tests = execution.remaining() > 0;
            // Parent-side collection for dependency scopes whose worker-side
            // value is shipped as bytes or represented as an RPC stub. Async
            // constructors are awaited here, before workers receive their
            // reconstructed values. Skip it for empty filtered runs.
            let needs_parent_shared = execution.has_cloneable_dependencies()
                || execution.has_hosted_dependencies()
                || execution.has_hosted_rpc_dependencies();
            let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
            {
                execution.collect_parent_shared_dependencies_async().await
            } else {
                crate::execution::ParentSharedDependencies {
                    cloneable_wire_bytes: Vec::new(),
                    cloneable_local_values: Vec::new(),
                    hosted_descriptor_bytes: Vec::new(),
                    hosted_owners: Vec::new(),
                    hosted_rpc_owner_cells: Vec::new(),
                    parent_constructed_shared_values: Vec::new(),
                }
            };
            let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
            let cloneable_local_values = parent_shared.cloneable_local_values;
            let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
            // Named binding (not `_`): keeps the hosted owners alive until the
            // end of this loop iteration, i.e. after all test threads joined.
            let _hosted_owners = parent_shared.hosted_owners;
            let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
                parent_shared.hosted_rpc_owner_cells.into_iter().collect();
            let parent_constructed_shared_values = parent_shared.parent_constructed_shared_values;
            // Pre-built RpcFactory lookup keyed by qualified id, so worker
            // subprocesses can build stubs without re-locking the global
            // REGISTERED_DEPENDENCY_CONSTRUCTORS.
            let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
                .iter()
                .filter_map(|d| {
                    if d.scope == crate::internal::DepScope::HostedRpc {
                        d.rpc_factory
                            .as_ref()
                            .map(|f| (d.qualified_id(), f.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            // Build a combined Cloneable + Hosted codec/worker lookup table
            // now, before `test_thread` workers are spawned (see sync.rs
            // for rationale). Keyed by the dep's fully-qualified id
            // (`{crate}::{module}::{name}`) so workers can route an incoming
            // `ProvideCloneable` / `ProvideHostedDescriptor` to the correct
            // dep even when two deps share a local `name` in different
            // modules.
            let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
                registered_dependency_constructors
                    .iter()
                    .filter_map(|d| {
                        let codec_opt = match d.scope {
                            crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
                            crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
                            _ => None,
                        };
                        match (codec_opt, &d.worker_fn) {
                            (Some(codec), Some(worker_fn)) => {
                                Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
                            }
                            _ => None,
                        }
                    })
                    .collect();
            // Mode-consistent Cloneable semantics for the no-spawn-workers
            // path (e.g. `--nocapture`): reuse the parent-constructed value
            // directly instead of re-running the user constructor in
            // `materialize_deps`. Without this, a Cloneable dep's
            // constructor would run twice (once for parent-side
            // `collect_parent_shared_dependencies_async`, once for the
            // in-process test execution), which both violates the
            // "constructor runs once" expectation and can deadlock when
            // the constructor takes a runtime-wide lock.
            if is_top_level_parent && !args.spawn_workers && !cloneable_local_values.is_empty() {
                apply_cloneable_values_locally(&mut execution, &cloneable_local_values);
            }
            // Mode-consistent Hosted semantics: when this is the top-level
            // parent AND we do NOT spawn workers (e.g. --nocapture), the
            // test functions run in this same process, but they must still
            // see the *worker-side handle* produced by
            // `HostedDep::from_descriptor`. Reconstruct each handle locally
            // via the descriptor round-trip and pre-populate the execution
            // tree.
            if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
                apply_hosted_descriptors_locally(
                    &mut execution,
                    &cloneable_codecs,
                    &hosted_descriptor_bytes,
                )
                .await;
            }
            // Mode-consistent HostedRpc semantics for the no-spawn-workers
            // path: install in-process stubs that route straight to the
            // parent-held owner cells, so tests see the same `Stub` value
            // whether or not the runner spawns workers.
            if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
                install_local_hosted_rpc_stubs(
                    &mut execution,
                    &rpc_factories,
                    &hosted_rpc_owner_cells,
                );
            }
            // Mirror of `sync::apply_parent_constructed_shared_values_locally`:
            // in no-spawn-workers mode, install any `Shared`/`PerWorker` dep
            // values the parent had to construct as transitive inputs to a
            // Cloneable/Hosted/HostedRpc dep. The in-process test thread's
            // `materialize_deps` then reuses them instead of re-running the
            // constructor in the same process.
            if is_top_level_parent
                && !args.spawn_workers
                && !parent_constructed_shared_values.is_empty()
            {
                apply_parent_constructed_shared_values_locally(
                    &mut execution,
                    &parent_constructed_shared_values,
                );
            }
            // Tests run in worker subprocesses in this mode; presumably the
            // workers construct their own dependencies — confirm in
            // `skip_creating_dependencies`.
            if args.spawn_workers {
                execution.skip_creating_dependencies();
            }

            // println!("Execution plan: {execution:?}");
            // println!("Final args: {args:?}");
            // println!("Has dependencies: {:?}", execution.has_dependencies());

            // Tests left in the plan before any thread starts; every test
            // thread receives it for progress reporting.
            let count = execution.remaining();
            let results = Arc::new(Mutex::new(Vec::with_capacity(count)));

            let start = Instant::now();
            output.start_suite(&filtered_tests);

            // Share the plan and the parent-side lookup tables with all
            // test threads.
            let execution = Arc::new(Mutex::new(execution));
            let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
            let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
            let cloneable_codecs = Arc::new(cloneable_codecs);
            let rpc_factories = Arc::new(rpc_factories);
            let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
            let mut join_set = JoinSet::new();
            let threads = args.test_threads().get();

            for worker_idx in 0..threads {
                let execution_clone = execution.clone();
                let output_clone = output.clone();
                // Stamp each test-thread's args with the worker index it will
                // hand to its spawned child via `--worker-index <N>`.
                let mut args_clone = args.clone();
                if args_clone.spawn_workers {
                    args_clone.worker_index = Some(worker_idx);
                }
                let results_clone = results.clone();
                let wire_bytes_clone = cloneable_wire_bytes.clone();
                let hosted_bytes_clone = hosted_descriptor_bytes.clone();
                let codecs_clone = cloneable_codecs.clone();
                let rpc_factories_clone = rpc_factories.clone();
                let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
                // Each test thread gets its own blocking-pool thread and
                // drives its `test_thread` future there through the current
                // runtime's handle.
                let handle = tokio::runtime::Handle::current();
                join_set.spawn_blocking(move || {
                    handle.block_on(test_thread(
                        args_clone,
                        execution_clone,
                        output_clone,
                        count,
                        results_clone,
                        wire_bytes_clone,
                        hosted_bytes_clone,
                        codecs_clone,
                        rpc_factories_clone,
                        hosted_rpc_owner_cells_clone,
                    ))
                });
            }

            // Wait for every test thread; a panicked test thread makes the
            // runner panic here.
            while let Some(res) = join_set.join_next().await {
                res.expect("Failed to join task");
            }

            // Release this function's reference to the shared plan.
            drop(execution);

            let results = results.lock().await;
            output.finished_suite(&all_tests, &results, start.elapsed());
            exit_code = SuiteResult::exit_code(&results);

            if exit_code == ExitCode::SUCCESS {
                break;
            } else {
                remaining_retries -= 1;
            }
        }
        exit_code
    }
}
281
/// Body of a single test thread; loops until the shared plan reports
/// `is_done`.
///
/// Each iteration picks the next test from `execution` and runs it through
/// `run_test`. `run_test` uses `worker` when `spawn_worker_if_needed`
/// returned one. The thread then appends `(test, result)` to `results`.
///
/// Parameters:
/// - `count`: total test count forwarded to the progress output calls.
/// - `cloneable_wire_bytes` / `hosted_descriptor_bytes`: shipped to the
///   spawned worker (if any) before the first test.
/// - `cloneable_codecs`: qualified dep id -> codec + worker reconstructor.
///   Used to decode `ProvideCloneable` / `ProvideHostedDescriptor` frames.
/// - `rpc_factories`: qualified dep id -> stub factory. Used to install
///   IPC-backed HostedRpc stubs when `args.ipc` is set.
/// - `hosted_rpc_owner_cells`: handed to the spawned worker handle (if any).
///
/// When `args.ipc` is set (worker-subprocess side), the thread connects to
/// the parent's IPC socket. Before every test it serves parent commands until
/// a `RunTest` names the test to run.
/// - A picked test that does not match the requested one is neither run nor
///   recorded in `results`.
/// - After each test it writes a fresh UUID marker line to stdout and stderr
///   and reports `TestFinished` (carrying the same marker) over IPC.
///
/// Panics on any IPC connect/read/write/decode failure, on an unexpected
/// `HostedRpcReply`, and if writing or flushing the finish marker fails.
#[allow(clippy::too_many_arguments)]
async fn test_thread(
    args: Arguments,
    execution: Arc<Mutex<TestSuiteExecution>>,
    output: Arc<dyn TestRunnerOutput>,
    count: usize,
    results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
    cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
    hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
    cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
    rpc_factories: Arc<HashMap<String, RpcFactory>>,
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
) {
    let mut worker = spawn_worker_if_needed(&args).await;
    // Parent dispatches incoming `HostedRpcCall` frames against the owner
    // cells materialised in the top-level parent. Workers don't need the owner
    // cells (they own stubs instead), so they receive an empty map and the
    // dispatch code path is never reached in subprocesses.
    if let Some(worker) = worker.as_mut() {
        worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
    }
    // Worker-subprocess side: connect back to the parent's IPC socket. The
    // stream sits behind a mutex because it is also shared with the HostedRpc
    // stub transport installed below.
    let connection_arc = if let Some(ref name) = args.ipc {
        let name = ipc_name(name.clone());
        let stream = Stream::connect(name)
            .await
            .expect("Failed to connect to IPC socket");
        Some(Arc::new(Mutex::new(stream)))
    } else {
        None
    };

    // Parent side: ship every Cloneable wire payload to the spawned worker
    // before its first test.
    if let Some(worker) = worker.as_mut() {
        for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
            worker
                .provide_cloneable(dep_id.clone(), wire_bytes.clone())
                .await;
        }
        // Ship every Hosted dep's descriptor bytes too.
        for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
            worker
                .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
                .await;
        }
    }

    // Worker subprocess side: build a stub for every HostedRpc dep registered
    // in this binary using the IPC-backed transport sharing the same socket as
    // the main IPC loop. Install the stubs in the execution tree so dependency
    // materialisation skips the parent-only owner constructor.
    if let Some(connection) = connection_arc.as_ref() {
        if !rpc_factories.is_empty() {
            install_worker_subprocess_hosted_rpc_stubs(
                &execution,
                &rpc_factories,
                connection.clone(),
            )
            .await;
        }
    }

    // `(name, crate_name, module_path)` of the test the parent requested via
    // `RunTest`; `None` while no request is pending (always `None` without IPC).
    let mut expected_test = None;

    while !is_done(&execution).await {
        if let Some(connection) = connection_arc.as_ref() {
            // Serve parent commands until a `RunTest` arrives.
            while expected_test.is_none() {
                // The connection lock is released as soon as one frame has
                // been read, before the command is handled.
                let mut conn = connection.lock().await;
                let command_bytes = read_frame_async(&mut *conn)
                    .await
                    .expect("Failed to read IPC command frame");
                drop(conn);
                let command: IpcCommand =
                    deserialize(&command_bytes).expect("Failed to decode IPC command");

                match command {
                    IpcCommand::RunTest {
                        name,
                        crate_name,
                        module_path,
                    } => {
                        expected_test = Some((name, crate_name, module_path));
                    }
                    IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
                        // Worker-side reconstruction (see sync.rs::apply_provided_wire_bytes).
                        apply_provided_wire_bytes(
                            &execution,
                            &cloneable_codecs,
                            &dep_id,
                            &wire_bytes,
                            "ProvideCloneable",
                        )
                        .await;
                        let response = IpcResponse::CloneableAccepted { dep_id };
                        let msg = serialize_to_byte_vec(&response)
                            .expect("Failed to encode IPC response");
                        let mut conn = connection.lock().await;
                        write_frame_async(&mut *conn, &msg)
                            .await
                            .expect("Failed to write IPC response frame");
                    }
                    IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
                        // Worker-side reconstruction: same shape as
                        // ProvideCloneable but routed through the registered
                        // HostedDep worker_fn.
                        apply_provided_wire_bytes(
                            &execution,
                            &cloneable_codecs,
                            &dep_id,
                            &wire_bytes,
                            "ProvideHostedDescriptor",
                        )
                        .await;
                        let response = IpcResponse::HostedDescriptorAccepted { dep_id };
                        let msg = serialize_to_byte_vec(&response)
                            .expect("Failed to encode IPC response");
                        let mut conn = connection.lock().await;
                        write_frame_async(&mut *conn, &msg)
                            .await
                            .expect("Failed to write IPC response frame");
                    }
                    IpcCommand::HostedRpcReply { .. } => {
                        // HR1.2: replies for worker-initiated HostedRpc calls
                        // are consumed inline by the IPC transport during
                        // test execution, never by this between-tests
                        // command loop. Receiving one here means the
                        // protocol got out of sync; surface that loudly
                        // rather than dropping the frame.
                        panic!(
                            "unexpected `HostedRpcReply` while waiting for the next \
                             between-tests command in the tokio worker subprocess: a \
                             stub call must have left a reply on the wire without \
                             draining it inline"
                        );
                    }
                }
            }
        }

        if let Some(next) = pick_next(&execution).await {
            // Without IPC every picked test runs; with IPC only the one the
            // parent requested does.
            let skip = if let Some((name, crate_name, module_path)) = &expected_test {
                next.test.name != *name
                    || next.test.crate_name != *crate_name
                    || next.test.module_path != *module_path
            } else {
                false
            };

            if !skip {
                expected_test = None;

                let ensure_time = get_ensure_time(&args, &next.test);

                output.start_running_test(&next.test, next.index, count);
                let result = run_test(
                    output.clone(),
                    next.index,
                    count,
                    args.nocapture,
                    args.include_ignored,
                    ensure_time,
                    next.deps.clone(),
                    &next.test,
                    &mut worker,
                )
                .await;
                output.finished_running_test(&next.test, next.index, count, &result);

                if let Some(connection) = connection_arc.as_ref() {
                    // Emit a unique marker line on both streams, flush them,
                    // then send the same marker with `TestFinished`.
                    // Presumably the parent uses it to delimit this test's
                    // captured output — confirm on the parent-side reader.
                    let finish_marker = Uuid::new_v4().to_string();
                    let finish_marker_line = format!("{finish_marker}\n");
                    tokio::io::stdout()
                        .write_all(finish_marker_line.as_bytes())
                        .await
                        .unwrap();
                    tokio::io::stderr()
                        .write_all(finish_marker_line.as_bytes())
                        .await
                        .unwrap();
                    tokio::io::stdout().flush().await.unwrap();
                    tokio::io::stderr().flush().await.unwrap();

                    let response = IpcResponse::TestFinished {
                        result: (&result).into(),
                        finish_marker,
                    };
                    let msg =
                        serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
                    let mut conn = connection.lock().await;
                    write_frame_async(&mut *conn, &msg)
                        .await
                        .expect("Failed to write IPC response frame");
                }

                results.lock().await.push((next.test.clone(), result));
            }
        }
    }
}
479
480async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
481    let execution = execution.lock().await;
482    execution.is_done()
483}
484
485/// Async counterpart to `sync::apply_provided_wire_bytes`. Decodes the wire
486/// bytes into a worker-side dependency value (looked up by the dep's
487/// fully-qualified id `{crate}::{module}::{name}`) and stores it in the
488/// execution tree so the next `materialize_deps` call uses the pre-resolved
489/// value.
490///
491/// `source_command` is the textual name of the IPC command that delivered
492/// the bytes (`"ProvideCloneable"` or `"ProvideHostedDescriptor"`); used
493/// only in panic messages.
494async fn apply_provided_wire_bytes(
495    execution: &Arc<Mutex<TestSuiteExecution>>,
496    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
497    dep_id: &str,
498    wire_bytes: &[u8],
499    source_command: &str,
500) {
501    let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
502        panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
503    });
504
505    let wire_payload = (codec.from_wire_bytes)(wire_bytes);
506    let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
507        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
508    let reconstructed = match worker_fn {
509        WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
510        WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
511    };
512
513    let mut execution = execution.lock().await;
514    let applied = execution.provide_cloneable_value(dep_id, reconstructed);
515    assert!(
516        applied,
517        "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
518    );
519}
520
521/// Mode-consistent Cloneable semantics for the no-spawn-workers path on
522/// the tokio runner. Mirrors `sync::apply_cloneable_values_locally`: takes
523/// the parent-constructed Cloneable values and installs them directly
524/// into the parent's `TestSuiteExecution`, so `materialize_deps` reuses
525/// them instead of re-running the user constructor.
526///
527/// For `Cloneable`, the documented round-trip
528/// `from_wire(to_wire(value))` is semantics-preserving, so reusing the
529/// parent value directly is equivalent to round-tripping it through the
530/// wire codec while avoiding the duplicate constructor run that would
531/// otherwise occur on the no-spawn-workers code path. The duplicate run
532/// historically caused user-visible problems (extra observable side
533/// effects under `--nocapture`, and deadlocks when the constructor takes
534/// a runtime-wide lock).
535fn apply_cloneable_values_locally(
536    execution: &mut TestSuiteExecution,
537    cloneable_local_values: &[(String, Arc<dyn Any + Send + Sync>)],
538) {
539    for (dep_id, value) in cloneable_local_values {
540        let applied = execution.provide_cloneable_value(dep_id, value.clone());
541        assert!(
542            applied,
543            "Cloneable dep '{dep_id}' could not be pre-populated locally"
544        );
545    }
546}
547
548/// Tokio counterpart to `sync::apply_parent_constructed_shared_values_locally`.
549/// In no-spawn-workers mode, installs `Shared`/`PerWorker` dep values that
550/// the parent had to construct as transitive inputs to a
551/// Cloneable/Hosted/HostedRpc dep, so the in-process test thread reuses
552/// them instead of re-running the constructor in the same process.
553fn apply_parent_constructed_shared_values_locally(
554    execution: &mut TestSuiteExecution,
555    values: &[(String, Arc<dyn Any + Send + Sync>)],
556) {
557    for (dep_id, value) in values {
558        let applied = execution.provide_materialized_shared_value(dep_id, value.clone());
559        assert!(
560            applied,
561            "Shared/PerWorker dep '{dep_id}' could not be pre-populated locally"
562        );
563    }
564}
565
566/// Mode-consistent Hosted semantics for the no-spawn-workers path on the
567/// tokio runner. Mirrors `sync::apply_hosted_descriptors_locally`: takes
568/// the parent-collected descriptor bytes and reconstructs each Hosted
569/// dep's worker-side handle (via the registered codec + worker_fn)
570/// directly in the parent's `TestSuiteExecution`.
571///
572/// Both `WorkerReconstructor::Sync` (`HostedDep::from_descriptor`) and
573/// `WorkerReconstructor::Async` (`AsyncHostedDep::from_descriptor`) are
574/// supported here so that `async_worker` Hosted deps see the same
575/// worker-side handle whether the runner ended up in spawned-worker mode
576/// or in the no-spawn fallback, matching the documented mode-consistent
577/// Hosted contract in `book/src/advanced_features/dependency_sharing.md`.
578async fn apply_hosted_descriptors_locally(
579    execution: &mut TestSuiteExecution,
580    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
581    descriptor_bytes: &[DepWireBytes],
582) {
583    for (dep_id, wire_bytes) in descriptor_bytes {
584        let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
585            panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
586        });
587        let wire_payload = (codec.from_wire_bytes)(wire_bytes);
588        let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
589            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
590        let reconstructed = match worker_fn {
591            WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
592            WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
593        };
594        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
595        assert!(
596            applied,
597            "Hosted dep '{dep_id}' could not be pre-populated locally"
598        );
599    }
600}
601
602async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
603    let mut execution = execution.lock().await;
604    execution.pick_next().await
605}
606
607async fn run_with_flakiness_control<F>(
608    output: Arc<dyn TestRunnerOutput>,
609    test_description: &RegisteredTest,
610    idx: usize,
611    count: usize,
612    test: F,
613) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
614where
615    F: Fn(
616            Instant,
617        )
618            -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
619        + Send
620        + Sync,
621{
622    match &test_description.props.flakiness_control {
623        FlakinessControl::None => {
624            let start = Instant::now();
625            test(start).await
626        }
627        FlakinessControl::ProveNonFlaky(tries) => {
628            for n in 0..*tries {
629                if n > 0 {
630                    output.repeat_running_test(
631                        test_description,
632                        idx,
633                        count,
634                        n + 1,
635                        *tries,
636                        "to ensure test is not flaky",
637                    );
638                }
639                let start = Instant::now();
640                match test(start).await {
641                    Ok(Ok(())) => {}
642                    Ok(Err(e)) => return Ok(Err(e)),
643                    Err(e) => return Err(e),
644                };
645            }
646            Ok(Ok(()))
647        }
648        FlakinessControl::RetryKnownFlaky(max_retries) => {
649            let mut tries = 1;
650            loop {
651                let start = Instant::now();
652                let result = test(start).await;
653
654                if result.is_err() && tries < *max_retries {
655                    tries += 1;
656                    output.repeat_running_test(
657                        test_description,
658                        idx,
659                        count,
660                        tries,
661                        *max_retries,
662                        "because test is known to be flaky",
663                    );
664                } else {
665                    break result;
666                }
667            }
668        }
669    }
670}
671
672#[allow(clippy::too_many_arguments)]
673async fn run_test(
674    output: Arc<dyn TestRunnerOutput>,
675    idx: usize,
676    count: usize,
677    nocapture: bool,
678    include_ignored: bool,
679    ensure_time: Option<TimeThreshold>,
680    dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
681    test: &RegisteredTest,
682    worker: &mut Option<Worker>,
683) -> TestResult {
684    if test.props.is_ignored && !include_ignored {
685        TestResult::ignored()
686    } else if let Some(worker) = worker.as_mut() {
687        worker.run_test(nocapture, test).await
688    } else {
689        let start = Instant::now();
690        let test = test.clone();
691        match &test.run {
692            TestFunction::Sync(_) => {
693                let handle = spawn_blocking(move || {
694                    let test = test.clone();
695                    crate::sync::run_sync_test_function(
696                        output,
697                        &test,
698                        idx,
699                        count,
700                        ensure_time,
701                        dependency_view,
702                    )
703                });
704                handle.await.unwrap_or_else(|join_error| {
705                    TestResult::failed(
706                        start.elapsed(),
707                        FailureCause::HarnessError(format!(
708                            "Failed joining test task: {join_error}"
709                        )),
710                    )
711                })
712            }
713            TestFunction::Async(test_fn) => {
714                let timeout = test.props.timeout;
715                let test_fn = test_fn.clone();
716                let detached_panic_policy = test.props.detached_panic_policy.clone();
717                let result = run_with_flakiness_control(output, &test, idx, count, |start| {
718                    let dependency_view = dependency_view.clone();
719                    let test_fn = test_fn.clone();
720                    Box::pin(async move {
721                        let test_id = crate::panic_hook::next_test_id();
722                        crate::panic_hook::set_current_test_id(test_id);
723                        crate::panic_hook::create_detached_collector(test_id);
724                        let result = AssertUnwindSafe(Box::pin(async move {
725                            match timeout {
726                                None => test_fn(dependency_view).await,
727                                Some(duration) => {
728                                    let result =
729                                        tokio::time::timeout(duration, test_fn(dependency_view))
730                                            .await;
731                                    match result {
732                                        Ok(result) => result,
733                                        Err(_) => {
734                                            return Err(FailureCause::HarnessError(
735                                                "Test timed out".to_string(),
736                                            ))
737                                        }
738                                    }
739                                }
740                            }
741                            .into_result()?;
742                            if let Some(ensure_time) = ensure_time {
743                                let elapsed = start.elapsed();
744                                if ensure_time.is_critical(&elapsed) {
745                                    return Err(FailureCause::HarnessError(format!(
746                                        "Test run time exceeds critical threshold: {elapsed:?}"
747                                    )));
748                                }
749                            }
750                            Ok(())
751                        }))
752                        .catch_unwind()
753                        .await;
754                        result
755                    })
756                })
757                .await;
758                let mut test_result =
759                    TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
760                if let Some(test_id) = crate::panic_hook::current_test_id() {
761                    if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
762                        let panics = match collector.lock() {
763                            Ok(p) => p,
764                            Err(poisoned) => poisoned.into_inner(),
765                        };
766                        if !panics.is_empty()
767                            && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
768                            && test_result.is_passed()
769                        {
770                            let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
771                            test_result = TestResult::failed(
772                                start.elapsed(),
773                                FailureCause::Panic(internal::PanicCause {
774                                    message: Some(format!(
775                                        "Detached task(s) panicked:\n{}",
776                                        messages.join("\n---\n")
777                                    )),
778                                    location: panics.first().and_then(|p| p.location.clone()),
779                                    backtrace: panics.first().and_then(|p| p.backtrace.clone()),
780                                }),
781                            );
782                        }
783                    }
784                }
785                crate::panic_hook::clear_current_test_id();
786                test_result
787            }
788            TestFunction::SyncBench(_) => {
789                let handle = spawn_blocking(move || {
790                    let test = test.clone();
791                    crate::sync::run_sync_test_function(
792                        output,
793                        &test,
794                        idx,
795                        count,
796                        ensure_time,
797                        dependency_view,
798                    )
799                });
800                handle.await.unwrap_or_else(|join_error| {
801                    TestResult::failed(
802                        start.elapsed(),
803                        FailureCause::HarnessError(format!(
804                            "Failed joining test task: {join_error}"
805                        )),
806                    )
807                })
808            }
809            TestFunction::AsyncBench(bench_fn) => {
810                let mut bencher = AsyncBencher::new();
811                let test_id = crate::panic_hook::next_test_id();
812                crate::panic_hook::set_current_test_id(test_id);
813                let result = AssertUnwindSafe(async move {
814                    bench_fn(&mut bencher, dependency_view).await;
815                    (
816                        bencher
817                            .summary()
818                            .expect("iter() was not called in bench function"),
819                        bencher.bytes,
820                    )
821                })
822                .catch_unwind()
823                .await;
824                let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
825                let test_result = TestResult::from_summary(
826                    &test.props.should_panic,
827                    start.elapsed(),
828                    result.map(|(summary, _)| summary),
829                    bytes,
830                );
831                crate::panic_hook::clear_current_test_id();
832                test_result
833            }
834        }
835    }
836}
837
/// Parent-side handle to a worker subprocess created by `spawn_worker_if_needed`.
///
/// Fields are dropped in declaration order; keep that in mind before
/// reordering them.
struct Worker {
    /// Local-socket listener the worker connected to. Held (never read) so it
    /// lives as long as the worker handle.
    _listener: Listener,
    /// Handle to the spawned worker child process.
    _process: Child,
    /// Task forwarding the worker's stdout (queued or echoed, see `capture_enabled`).
    _out_handle: JoinHandle<()>,
    /// Task forwarding the worker's stderr (queued or echoed, see `capture_enabled`).
    _err_handle: JoinHandle<()>,
    /// Worker stdout lines queued by the forwarder while capturing is enabled.
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Worker stderr lines queued by the forwarder while capturing is enabled.
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Read by the forwarder tasks: `true` queues lines, `false` echoes them
    /// to the parent's own stdout/stderr. Set per test in `run_test`.
    capture_enabled: Arc<Mutex<bool>>,
    /// IPC stream accepted from `_listener`; carries `IpcCommand` frames to the
    /// worker and `IpcResponse` frames back.
    connection: Stream,
    /// Parent-held HostedRpc owner cells keyed by fully-qualified dep id. Used
    /// to dispatch incoming `IpcResponse::HostedRpcCall` frames from the worker
    /// subprocess back to the right owner.
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
}
852
853impl Worker {
854    /// Installs the parent-side map of HostedRpc owner cells so this worker can
855    /// route incoming `IpcResponse::HostedRpcCall` frames to the right
856    /// `HostedRpcOwnerCell` while waiting for a worker subprocess response.
857    fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
858        self.hosted_rpc_owner_cells = cells;
859    }
860
861    /// Parent-side dispatcher for a single `IpcResponse::HostedRpcCall`. Looks
862    /// up the owner cell by fully-qualified dep id, runs the dispatch on the
863    /// parent's stored owner, and writes the matching
864    /// `IpcCommand::HostedRpcReply` back to the worker subprocess. Mirrors
865    /// `sync::Worker::handle_hosted_rpc_call`.
866    async fn handle_hosted_rpc_call(
867        &mut self,
868        dump_on_ipc_failure: &DumpOnFailure,
869        request_id: u64,
870        dep_id: String,
871        method_idx: u32,
872        args_bytes: Vec<u8>,
873    ) {
874        let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
875            // Use the async dispatch entry point so an owner that implements
876            // `AsyncHostedRpcDep` directly can `.await` inside its dispatcher
877            // without blocking the tokio runtime. Sync owners reach this
878            // entry point through the blanket bridge and their dispatched
879            // future resolves immediately.
880            Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
881                Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
882                Err(message) => HostedRpcReplyBody::Err { message },
883            },
884            None => HostedRpcReplyBody::Err {
885                message: format!(
886                    "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
887                ),
888            },
889        };
890        let reply = IpcCommand::HostedRpcReply { request_id, body };
891        let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
892        dump_on_ipc_failure
893            .run(write_frame_async(&mut self.connection, &msg).await)
894            .await;
895    }
896
897    pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
898        let mut capture_enabled = self.capture_enabled.lock().await;
899        *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
900        drop(capture_enabled);
901
902        // Send IPC command and wait for IPC response, and in the meantime read from the stdout/stderr channels
903        let cmd = IpcCommand::RunTest {
904            name: test.name.clone(),
905            crate_name: test.crate_name.clone(),
906            module_path: test.module_path.clone(),
907        };
908
909        let dump_on_ipc_failure = self.dump_on_failure();
910
911        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
912        dump_on_ipc_failure
913            .run(write_frame_async(&mut self.connection, &msg).await)
914            .await;
915
916        let response = loop {
917            let response_bytes = dump_on_ipc_failure
918                .run(read_frame_async(&mut self.connection).await)
919                .await;
920            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
921            match response {
922                IpcResponse::TestFinished { .. } => break response,
923                IpcResponse::CloneableAccepted { .. }
924                | IpcResponse::HostedDescriptorAccepted { .. } => continue,
925                IpcResponse::HostedRpcCall {
926                    request_id,
927                    dep_id,
928                    method_idx,
929                    args_bytes,
930                } => {
931                    self.handle_hosted_rpc_call(
932                        &dump_on_ipc_failure,
933                        request_id,
934                        dep_id,
935                        method_idx,
936                        args_bytes,
937                    )
938                    .await;
939                    continue;
940                }
941            }
942        };
943
944        let IpcResponse::TestFinished {
945            result,
946            finish_marker,
947        } = response
948        else {
949            unreachable!("loop only breaks on TestFinished")
950        };
951
952        if test.props.capture_control.requires_capturing(!nocapture) {
953            let out_lines: Vec<_> =
954                Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
955            let err_lines: Vec<_> =
956                Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
957            result.into_test_result(out_lines, err_lines)
958        } else {
959            result.into_test_result(Vec::new(), Vec::new())
960        }
961    }
962
963    /// Async counterpart to `sync::Worker::provide_cloneable`. `dep_id` is the
964    /// dep's fully-qualified id (`{crate}::{module}::{name}`).
965    async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
966        let dump_on_ipc_failure = self.dump_on_failure();
967        let cmd = IpcCommand::ProvideCloneable {
968            dep_id: dep_id.clone(),
969            wire_bytes,
970        };
971        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
972        dump_on_ipc_failure
973            .run(write_frame_async(&mut self.connection, &msg).await)
974            .await;
975
976        loop {
977            let response_bytes = dump_on_ipc_failure
978                .run(read_frame_async(&mut self.connection).await)
979                .await;
980            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
981            match response {
982                IpcResponse::CloneableAccepted { dep_id: ack_id } => {
983                    if ack_id == dep_id {
984                        return;
985                    }
986                }
987                IpcResponse::HostedDescriptorAccepted { .. } => {
988                    // Out-of-band ack from a previous ProvideHostedDescriptor; ignore.
989                }
990                IpcResponse::TestFinished { .. } => {
991                    // Should not happen before any RunTest.
992                }
993                IpcResponse::HostedRpcCall {
994                    request_id,
995                    dep_id: rpc_dep_id,
996                    method_idx,
997                    args_bytes,
998                } => {
999                    // A worker subprocess can issue a HostedRpc call from
1000                    // inside an in-progress test, even while the parent is
1001                    // mid-`ProvideCloneable` for a different dep. Dispatch it
1002                    // so the protocol doesn't desync.
1003                    self.handle_hosted_rpc_call(
1004                        &dump_on_ipc_failure,
1005                        request_id,
1006                        rpc_dep_id,
1007                        method_idx,
1008                        args_bytes,
1009                    )
1010                    .await;
1011                }
1012            }
1013        }
1014    }
1015
1016    /// Async counterpart to `sync::Worker::provide_hosted_descriptor`.
1017    async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
1018        let dump_on_ipc_failure = self.dump_on_failure();
1019        let cmd = IpcCommand::ProvideHostedDescriptor {
1020            dep_id: dep_id.clone(),
1021            wire_bytes,
1022        };
1023        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
1024        dump_on_ipc_failure
1025            .run(write_frame_async(&mut self.connection, &msg).await)
1026            .await;
1027
1028        loop {
1029            let response_bytes = dump_on_ipc_failure
1030                .run(read_frame_async(&mut self.connection).await)
1031                .await;
1032            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
1033            match response {
1034                IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
1035                    if ack_id == dep_id {
1036                        return;
1037                    }
1038                }
1039                IpcResponse::CloneableAccepted { .. } => {
1040                    // Out-of-band ack from a previous ProvideCloneable; ignore.
1041                }
1042                IpcResponse::TestFinished { .. } => {
1043                    // Should not happen before any RunTest.
1044                }
1045                IpcResponse::HostedRpcCall {
1046                    request_id,
1047                    dep_id: rpc_dep_id,
1048                    method_idx,
1049                    args_bytes,
1050                } => {
1051                    // See provide_cloneable arm. Dispatch the call inline so
1052                    // the IPC stream stays in sync.
1053                    self.handle_hosted_rpc_call(
1054                        &dump_on_ipc_failure,
1055                        request_id,
1056                        rpc_dep_id,
1057                        method_idx,
1058                        args_bytes,
1059                    )
1060                    .await;
1061                }
1062            }
1063        }
1064    }
1065
1066    fn dump_on_failure(&self) -> DumpOnFailure {
1067        DumpOnFailure {
1068            out_lines: self.out_lines.clone(),
1069            err_lines: self.err_lines.clone(),
1070        }
1071    }
1072
1073    async fn drain_until(
1074        source: Arc<Mutex<VecDeque<CapturedOutput>>>,
1075        finish_marker: String,
1076    ) -> Vec<CapturedOutput> {
1077        let mut result = Vec::new();
1078        loop {
1079            let mut source = source.lock().await;
1080            while let Some(line) = source.pop_front() {
1081                if line.line() == finish_marker {
1082                    return result;
1083                } else {
1084                    result.push(line.clone());
1085                }
1086            }
1087            drop(source);
1088
1089            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1090        }
1091    }
1092}
1093
/// Last-resort handler for fatal IPC errors with a worker subprocess.
///
/// Holds shared handles to the worker's captured-output queues, so that
/// everything captured so far can be printed before the parent process exits.
struct DumpOnFailure {
    /// Captured worker stdout lines (same queue as `Worker::out_lines`).
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Captured worker stderr lines (same queue as `Worker::err_lines`).
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
}
1098
1099impl DumpOnFailure {
1100    pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1101        match result {
1102            Ok(value) => value,
1103            Err(_error) => {
1104                let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1105                let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1106                let mut all_lines = [out_lines, err_lines].concat();
1107                all_lines.sort();
1108
1109                for line in all_lines {
1110                    eprintln!("{}", line.line());
1111                }
1112
1113                std::process::exit(1);
1114            }
1115        }
1116    }
1117}
1118
1119async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1120    if args.spawn_workers {
1121        let id = Uuid::new_v4();
1122        let name_str = format!("{id}.sock");
1123        let name = name_str
1124            .clone()
1125            .to_ns_name::<GenericNamespaced>()
1126            .expect("Invalid local socket name");
1127        let opts = ListenerOptions::new().name(name.clone());
1128        let listener = opts
1129            .create_tokio()
1130            .expect("Failed to create local socket listener");
1131
1132        let exe = std::env::current_exe().expect("Failed to get current executable path");
1133
1134        let mut args = args.clone();
1135        args.ipc = Some(name_str);
1136        args.spawn_workers = false;
1137        args.logfile = None;
1138        let args = args.to_args();
1139
1140        let mut process = Command::new(exe)
1141            .args(args)
1142            .stdin(Stdio::inherit())
1143            .stderr(Stdio::piped())
1144            .stdout(Stdio::piped())
1145            .spawn()
1146            .expect("Failed to spawn worker process");
1147
1148        let stdout = process.stdout.take().unwrap();
1149        let stderr = process.stderr.take().unwrap();
1150
1151        let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1152        let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1153        let capture_enabled = Arc::new(Mutex::new(true));
1154
1155        let out_lines_clone = out_lines.clone();
1156        let capture_enabled_clone = capture_enabled.clone();
1157        let out_handle = spawn(async move {
1158            let reader = BufReader::new(stdout);
1159            let mut lines = reader.lines();
1160            while let Some(line) = lines
1161                .next_line()
1162                .await
1163                .expect("Failed to read from worker stdout")
1164            {
1165                if *capture_enabled_clone.lock().await {
1166                    out_lines_clone
1167                        .lock()
1168                        .await
1169                        .push_back(CapturedOutput::stdout(line));
1170                } else {
1171                    println!("{line}");
1172                }
1173            }
1174        });
1175
1176        let err_lines_clone = err_lines.clone();
1177        let capture_enabled_clone = capture_enabled.clone();
1178        let err_handle = spawn(async move {
1179            let reader = BufReader::new(stderr);
1180            let mut lines = reader.lines();
1181            while let Some(line) = lines
1182                .next_line()
1183                .await
1184                .expect("Failed to read from worker stderr")
1185            {
1186                if *capture_enabled_clone.lock().await {
1187                    err_lines_clone
1188                        .lock()
1189                        .await
1190                        .push_back(CapturedOutput::stderr(line));
1191                } else {
1192                    eprintln!("{line}");
1193                }
1194            }
1195        });
1196
1197        let connection = listener
1198            .accept()
1199            .await
1200            .expect("Failed to accept connection");
1201
1202        Some(Worker {
1203            _listener: listener,
1204            _process: process,
1205            _out_handle: out_handle,
1206            _err_handle: err_handle,
1207            out_lines,
1208            err_lines,
1209            connection,
1210            capture_enabled,
1211            hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1212        })
1213    } else {
1214        None
1215    }
1216}
1217
1218/// Parent-side `--nocapture` / no-spawn-workers helper. Builds one stub per
1219/// HostedRpc dep using an [`InProcessHostedRpcTransport`] that points at the
1220/// parent-held owner cells, and stashes it in the execution tree so dependency
1221/// materialisation skips the owner-only constructor. Mirrors
1222/// `sync::install_local_hosted_rpc_stubs`.
1223fn install_local_hosted_rpc_stubs(
1224    execution: &mut TestSuiteExecution,
1225    rpc_factories: &HashMap<String, RpcFactory>,
1226    owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1227) {
1228    let transport: Arc<dyn HostedRpcTransport> =
1229        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1230    for (dep_id, factory) in rpc_factories.iter() {
1231        if !owner_cells.contains_key(dep_id) {
1232            // No owner cell materialised for this dep (e.g. registered
1233            // globally but not pulled into the current filter). Skip so
1234            // we don't install a stub nothing routes.
1235            continue;
1236        }
1237        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1238        let stub = (factory.build_stub)(channel);
1239        let applied = execution.provide_cloneable_value(dep_id, stub);
1240        assert!(
1241            applied,
1242            "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1243        );
1244    }
1245}
1246
1247/// Worker subprocess helper. Builds one stub per registered HostedRpc dep backed
1248/// by [`IpcHostedRpcTransport`], and installs it in the execution tree. Mirrors
1249/// `sync::install_worker_subprocess_hosted_rpc_stubs` but runs on the tokio
1250/// `Arc<Mutex<Stream>>` connection.
1251async fn install_worker_subprocess_hosted_rpc_stubs(
1252    execution: &Arc<Mutex<TestSuiteExecution>>,
1253    rpc_factories: &HashMap<String, RpcFactory>,
1254    connection_arc: Arc<Mutex<Stream>>,
1255) {
1256    let transport: Arc<dyn HostedRpcTransport> =
1257        Arc::new(IpcHostedRpcTransport::new(connection_arc));
1258    for (dep_id, factory) in rpc_factories.iter() {
1259        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1260        let stub = (factory.build_stub)(channel);
1261        let mut execution = execution.lock().await;
1262        let applied = execution.provide_cloneable_value(dep_id, stub);
1263        // Not every binary registers a HostedRpc dep that the current
1264        // execution actually uses; if so, just move on.
1265        let _ = applied;
1266    }
1267}
1268
/// Worker subprocess `HostedRpcTransport` for the tokio runner.
/// Mirrors `sync::IpcHostedRpcTransport`, but bridges a sync trait method
/// to the async tokio IPC primitives via `tokio::task::block_in_place`
/// plus a runtime `Handle`'s `block_on(...)`.
///
/// The shared `Arc<Mutex<Stream>>` is the same one used by the worker
/// subprocess's main IPC loop. The lock guarantees that a stub call and
/// the main loop never interleave a half-written frame. Calls serialise
/// across all in-flight stubs, because each one holds the mutex for its
/// full request-then-reply round trip.
///
/// This relies on the temporal invariant documented on
/// [`crate::internal::HostedRpcChannel::call`]. Stubs are only invoked
/// from inside a running test body; never from `build_stub`; and never
/// from detached background work that outlives the test. Under those
/// rules the main IPC loop is only reading between tests, so it cannot
/// race with a stub call.
///
/// NOTE(review): tokio documents that `block_in_place` panics on a
/// `current_thread` runtime. `test_runner` builds a multi-thread runtime;
/// keep it that way for processes that install these stubs.
struct IpcHostedRpcTransport {
    // IPC stream to the parent, shared with the worker's main IPC loop.
    connection: Arc<Mutex<Stream>>,
    // Source of per-call `request_id`s, echoed back in `HostedRpcReply`.
    next_request_id: AtomicU64,
}
1290
1291impl IpcHostedRpcTransport {
1292    fn new(connection: Arc<Mutex<Stream>>) -> Self {
1293        Self {
1294            connection,
1295            next_request_id: AtomicU64::new(0),
1296        }
1297    }
1298}
1299
1300impl HostedRpcTransport for IpcHostedRpcTransport {
1301    fn call(
1302        &self,
1303        dep_id: &str,
1304        method_idx: u32,
1305        args: Vec<u8>,
1306    ) -> Result<Vec<u8>, HostedRpcError> {
1307        let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1308        let call = IpcResponse::HostedRpcCall {
1309            request_id,
1310            dep_id: dep_id.to_string(),
1311            method_idx,
1312            args_bytes: args,
1313        };
1314        let msg = serialize_to_byte_vec(&call).map_err(|e| {
1315            HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1316        })?;
1317
1318        let connection = self.connection.clone();
1319        let handle = tokio::runtime::Handle::current();
1320
1321        // Run the async I/O round-trip from inside this sync trait
1322        // method. `block_in_place` releases this worker thread back to
1323        // the scheduler so the parent's read loop can continue making
1324        // progress on other tasks while we wait for the reply.
1325        tokio::task::block_in_place(move || {
1326            handle.block_on(async move {
1327                let mut conn = connection.lock().await;
1328                write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1329                    HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1330                })?;
1331                let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1332                    HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1333                })?;
1334                let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1335                    HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1336                })?;
1337                match command {
1338                    IpcCommand::HostedRpcReply {
1339                        request_id: reply_id,
1340                        body,
1341                    } => {
1342                        if reply_id != request_id {
1343                            return Err(HostedRpcError::Transport(format!(
1344                                "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1345                            )));
1346                        }
1347                        match body {
1348                            HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1349                            HostedRpcReplyBody::Err { message } => {
1350                                Err(HostedRpcError::Dispatch(message))
1351                            }
1352                        }
1353                    }
1354                    other => Err(HostedRpcError::Transport(format!(
1355                        "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1356                    ))),
1357                }
1358            })
1359        })
1360    }
1361}