Skip to main content

test_r_core/
tokio.rs

1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6    generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7    FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8    InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9    WorkerReconstructor,
10};
11use crate::ipc::{
12    ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38    tokio::runtime::Builder::new_multi_thread()
39        .enable_all()
40        .build()
41        .unwrap()
42        .block_on(async_test_runner())
43}
44
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47    crate::panic_hook::install_panic_hook();
48    let mut args = Arguments::from_args();
49    // When the parent spawned this process as a worker it passed
50    // `--worker-index <N>`. Stash it so `crate::worker::worker_index()`
51    // returns the correct value for PerWorker dep constructors.
52    if let Some(idx) = args.worker_index {
53        crate::worker::set_worker_index(idx);
54    }
55    let output = test_runner_output(&args);
56
57    let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
58    let registered_dependency_constructors =
59        internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
60    let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
61    let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
62
63    let generated_tests = generate_tests(&registered_test_generators).await;
64
65    let all_tests: Vec<RegisteredTest> = registered_tests
66        .iter()
67        .cloned()
68        .chain(generated_tests)
69        .collect();
70
71    if args.list {
72        output.test_list(&all_tests);
73        ExitCode::SUCCESS
74    } else {
75        let mut remaining_retries = args.flaky_run.unwrap_or(1);
76
77        let mut exit_code = ExitCode::from(101);
78        while remaining_retries > 0 {
79            let (mut execution, filtered_tests) = TestSuiteExecution::construct(
80                &args,
81                registered_dependency_constructors.as_slice(),
82                &all_tests,
83                registered_testsuite_props.as_slice(),
84            );
85            args.finalize_for_execution(&execution, output.clone());
86            let is_top_level_parent = args.is_top_level_parent();
87            let has_selected_tests = execution.remaining() > 0;
88            // Parent-side collection for dependency scopes whose worker-side
89            // value is shipped as bytes or represented as an RPC stub. Async
90            // constructors are awaited here, before workers receive their
91            // reconstructed values. Skip it for empty filtered runs.
92            let needs_parent_shared = execution.has_cloneable_dependencies()
93                || execution.has_hosted_dependencies()
94                || execution.has_hosted_rpc_dependencies();
95            let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
96            {
97                execution.collect_parent_shared_dependencies_async().await
98            } else {
99                crate::execution::ParentSharedDependencies {
100                    cloneable_wire_bytes: Vec::new(),
101                    cloneable_local_values: Vec::new(),
102                    hosted_descriptor_bytes: Vec::new(),
103                    hosted_owners: Vec::new(),
104                    hosted_rpc_owner_cells: Vec::new(),
105                }
106            };
107            let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
108            let cloneable_local_values = parent_shared.cloneable_local_values;
109            let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
110            let _hosted_owners = parent_shared.hosted_owners;
111            let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
112                parent_shared.hosted_rpc_owner_cells.into_iter().collect();
113            // Pre-built RpcFactory lookup keyed by qualified id, so worker
114            // subprocesses can build stubs without re-locking the global
115            // REGISTERED_DEPENDENCY_CONSTRUCTORS.
116            let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
117                .iter()
118                .filter_map(|d| {
119                    if d.scope == crate::internal::DepScope::HostedRpc {
120                        d.rpc_factory
121                            .as_ref()
122                            .map(|f| (d.qualified_id(), f.clone()))
123                    } else {
124                        None
125                    }
126                })
127                .collect();
128            // Build a combined Cloneable + Hosted codec/worker lookup table
129            // now, before `test_thread` workers are spawned (see sync.rs
130            // for rationale). Keyed by the dep's fully-qualified id
131            // (`{crate}::{module}::{name}`) so workers can route an incoming
132            // `ProvideCloneable` / `ProvideHostedDescriptor` to the correct
133            // dep even when two deps share a local `name` in different
134            // modules.
135            let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
136                registered_dependency_constructors
137                    .iter()
138                    .filter_map(|d| {
139                        let codec_opt = match d.scope {
140                            crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
141                            crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
142                            _ => None,
143                        };
144                        match (codec_opt, &d.worker_fn) {
145                            (Some(codec), Some(worker_fn)) => {
146                                Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
147                            }
148                            _ => None,
149                        }
150                    })
151                    .collect();
152            // Mode-consistent Cloneable semantics for the no-spawn-workers
153            // path (e.g. `--nocapture`): reuse the parent-constructed value
154            // directly instead of re-running the user constructor in
155            // `materialize_deps`. Without this, a Cloneable dep's
156            // constructor would run twice (once for parent-side
157            // `collect_parent_shared_dependencies_async`, once for the
158            // in-process test execution), which both violates the
159            // "constructor runs once" expectation and can deadlock when
160            // the constructor takes a runtime-wide lock.
161            if is_top_level_parent && !args.spawn_workers && !cloneable_local_values.is_empty() {
162                apply_cloneable_values_locally(&mut execution, &cloneable_local_values);
163            }
164            // Mode-consistent Hosted semantics: when this is the top-level
165            // parent AND we do NOT spawn workers (e.g. --nocapture), the
166            // test functions run in this same process, but they must still
167            // see the *worker-side handle* produced by
168            // `HostedDep::from_descriptor`. Reconstruct each handle locally
169            // via the descriptor round-trip and pre-populate the execution
170            // tree.
171            if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
172                apply_hosted_descriptors_locally(
173                    &mut execution,
174                    &cloneable_codecs,
175                    &hosted_descriptor_bytes,
176                )
177                .await;
178            }
179            // Mode-consistent HostedRpc semantics for the no-spawn-workers
180            // path: install in-process stubs that route straight to the
181            // parent-held owner cells, so tests see the same `Stub` value
182            // whether or not the runner spawns workers.
183            if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
184                install_local_hosted_rpc_stubs(
185                    &mut execution,
186                    &rpc_factories,
187                    &hosted_rpc_owner_cells,
188                );
189            }
190            if args.spawn_workers {
191                execution.skip_creating_dependencies();
192            }
193
194            // println!("Execution plan: {execution:?}");
195            // println!("Final args: {args:?}");
196            // println!("Has dependencies: {:?}", execution.has_dependencies());
197
198            let count = execution.remaining();
199            let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
200
201            let start = Instant::now();
202            output.start_suite(&filtered_tests);
203
204            let execution = Arc::new(Mutex::new(execution));
205            let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
206            let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
207            let cloneable_codecs = Arc::new(cloneable_codecs);
208            let rpc_factories = Arc::new(rpc_factories);
209            let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
210            let mut join_set = JoinSet::new();
211            let threads = args.test_threads().get();
212
213            for worker_idx in 0..threads {
214                let execution_clone = execution.clone();
215                let output_clone = output.clone();
216                // Stamp each test-thread's args with the worker index it will
217                // hand to its spawned child via `--worker-index <N>`.
218                let mut args_clone = args.clone();
219                if args_clone.spawn_workers {
220                    args_clone.worker_index = Some(worker_idx);
221                }
222                let results_clone = results.clone();
223                let wire_bytes_clone = cloneable_wire_bytes.clone();
224                let hosted_bytes_clone = hosted_descriptor_bytes.clone();
225                let codecs_clone = cloneable_codecs.clone();
226                let rpc_factories_clone = rpc_factories.clone();
227                let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
228                let handle = tokio::runtime::Handle::current();
229                join_set.spawn_blocking(move || {
230                    handle.block_on(test_thread(
231                        args_clone,
232                        execution_clone,
233                        output_clone,
234                        count,
235                        results_clone,
236                        wire_bytes_clone,
237                        hosted_bytes_clone,
238                        codecs_clone,
239                        rpc_factories_clone,
240                        hosted_rpc_owner_cells_clone,
241                    ))
242                });
243            }
244
245            while let Some(res) = join_set.join_next().await {
246                res.expect("Failed to join task");
247            }
248
249            drop(execution);
250
251            let results = results.lock().await;
252            output.finished_suite(&all_tests, &results, start.elapsed());
253            exit_code = SuiteResult::exit_code(&results);
254
255            if exit_code == ExitCode::SUCCESS {
256                break;
257            } else {
258                remaining_retries -= 1;
259            }
260        }
261        exit_code
262    }
263}
264
265#[allow(clippy::too_many_arguments)]
266async fn test_thread(
267    args: Arguments,
268    execution: Arc<Mutex<TestSuiteExecution>>,
269    output: Arc<dyn TestRunnerOutput>,
270    count: usize,
271    results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
272    cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
273    hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
274    cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
275    rpc_factories: Arc<HashMap<String, RpcFactory>>,
276    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
277) {
278    let mut worker = spawn_worker_if_needed(&args).await;
279    // Parent dispatches incoming `HostedRpcCall` frames against the owner
280    // cells materialised in the top-level parent. Workers don't need the owner
281    // cells (they own stubs instead), so they receive an empty map and the
282    // dispatch code path is never reached in subprocesses.
283    if let Some(worker) = worker.as_mut() {
284        worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
285    }
286    let connection_arc = if let Some(ref name) = args.ipc {
287        let name = ipc_name(name.clone());
288        let stream = Stream::connect(name)
289            .await
290            .expect("Failed to connect to IPC socket");
291        Some(Arc::new(Mutex::new(stream)))
292    } else {
293        None
294    };
295
296    if let Some(worker) = worker.as_mut() {
297        for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
298            worker
299                .provide_cloneable(dep_id.clone(), wire_bytes.clone())
300                .await;
301        }
302        // Ship every Hosted dep's descriptor bytes too.
303        for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
304            worker
305                .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
306                .await;
307        }
308    }
309
310    // Worker subprocess side: build a stub for every HostedRpc dep registered
311    // in this binary using the IPC-backed transport sharing the same socket as
312    // the main IPC loop. Install the stubs in the execution tree so dependency
313    // materialisation skips the parent-only owner constructor.
314    if let Some(connection) = connection_arc.as_ref() {
315        if !rpc_factories.is_empty() {
316            install_worker_subprocess_hosted_rpc_stubs(
317                &execution,
318                &rpc_factories,
319                connection.clone(),
320            )
321            .await;
322        }
323    }
324
325    let mut expected_test = None;
326
327    while !is_done(&execution).await {
328        if let Some(connection) = connection_arc.as_ref() {
329            while expected_test.is_none() {
330                let mut conn = connection.lock().await;
331                let command_bytes = read_frame_async(&mut *conn)
332                    .await
333                    .expect("Failed to read IPC command frame");
334                drop(conn);
335                let command: IpcCommand =
336                    deserialize(&command_bytes).expect("Failed to decode IPC command");
337
338                match command {
339                    IpcCommand::RunTest {
340                        name,
341                        crate_name,
342                        module_path,
343                    } => {
344                        expected_test = Some((name, crate_name, module_path));
345                    }
346                    IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
347                        // Worker-side reconstruction (see sync.rs::apply_provided_wire_bytes).
348                        apply_provided_wire_bytes(
349                            &execution,
350                            &cloneable_codecs,
351                            &dep_id,
352                            &wire_bytes,
353                            "ProvideCloneable",
354                        )
355                        .await;
356                        let response = IpcResponse::CloneableAccepted { dep_id };
357                        let msg = serialize_to_byte_vec(&response)
358                            .expect("Failed to encode IPC response");
359                        let mut conn = connection.lock().await;
360                        write_frame_async(&mut *conn, &msg)
361                            .await
362                            .expect("Failed to write IPC response frame");
363                    }
364                    IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
365                        // Worker-side reconstruction: same shape as
366                        // ProvideCloneable but routed through the registered
367                        // HostedDep worker_fn.
368                        apply_provided_wire_bytes(
369                            &execution,
370                            &cloneable_codecs,
371                            &dep_id,
372                            &wire_bytes,
373                            "ProvideHostedDescriptor",
374                        )
375                        .await;
376                        let response = IpcResponse::HostedDescriptorAccepted { dep_id };
377                        let msg = serialize_to_byte_vec(&response)
378                            .expect("Failed to encode IPC response");
379                        let mut conn = connection.lock().await;
380                        write_frame_async(&mut *conn, &msg)
381                            .await
382                            .expect("Failed to write IPC response frame");
383                    }
384                    IpcCommand::HostedRpcReply { .. } => {
385                        // HR1.2: replies for worker-initiated HostedRpc calls
386                        // are consumed inline by the IPC transport during
387                        // test execution, never by this between-tests
388                        // command loop. Receiving one here means the
389                        // protocol got out of sync; surface that loudly
390                        // rather than dropping the frame.
391                        panic!(
392                            "unexpected `HostedRpcReply` while waiting for the next \
393                             between-tests command in the tokio worker subprocess: a \
394                             stub call must have left a reply on the wire without \
395                             draining it inline"
396                        );
397                    }
398                }
399            }
400        }
401
402        if let Some(next) = pick_next(&execution).await {
403            let skip = if let Some((name, crate_name, module_path)) = &expected_test {
404                next.test.name != *name
405                    || next.test.crate_name != *crate_name
406                    || next.test.module_path != *module_path
407            } else {
408                false
409            };
410
411            if !skip {
412                expected_test = None;
413
414                let ensure_time = get_ensure_time(&args, &next.test);
415
416                output.start_running_test(&next.test, next.index, count);
417                let result = run_test(
418                    output.clone(),
419                    next.index,
420                    count,
421                    args.nocapture,
422                    args.include_ignored,
423                    ensure_time,
424                    next.deps.clone(),
425                    &next.test,
426                    &mut worker,
427                )
428                .await;
429                output.finished_running_test(&next.test, next.index, count, &result);
430
431                if let Some(connection) = connection_arc.as_ref() {
432                    let finish_marker = Uuid::new_v4().to_string();
433                    let finish_marker_line = format!("{finish_marker}\n");
434                    tokio::io::stdout()
435                        .write_all(finish_marker_line.as_bytes())
436                        .await
437                        .unwrap();
438                    tokio::io::stderr()
439                        .write_all(finish_marker_line.as_bytes())
440                        .await
441                        .unwrap();
442                    tokio::io::stdout().flush().await.unwrap();
443                    tokio::io::stderr().flush().await.unwrap();
444
445                    let response = IpcResponse::TestFinished {
446                        result: (&result).into(),
447                        finish_marker,
448                    };
449                    let msg =
450                        serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
451                    let mut conn = connection.lock().await;
452                    write_frame_async(&mut *conn, &msg)
453                        .await
454                        .expect("Failed to write IPC response frame");
455                }
456
457                results.lock().await.push((next.test.clone(), result));
458            }
459        }
460    }
461}
462
463async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
464    let execution = execution.lock().await;
465    execution.is_done()
466}
467
468/// Async counterpart to `sync::apply_provided_wire_bytes`. Decodes the wire
469/// bytes into a worker-side dependency value (looked up by the dep's
470/// fully-qualified id `{crate}::{module}::{name}`) and stores it in the
471/// execution tree so the next `materialize_deps` call uses the pre-resolved
472/// value.
473///
474/// `source_command` is the textual name of the IPC command that delivered
475/// the bytes (`"ProvideCloneable"` or `"ProvideHostedDescriptor"`); used
476/// only in panic messages.
477async fn apply_provided_wire_bytes(
478    execution: &Arc<Mutex<TestSuiteExecution>>,
479    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
480    dep_id: &str,
481    wire_bytes: &[u8],
482    source_command: &str,
483) {
484    let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
485        panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
486    });
487
488    let wire_payload = (codec.from_wire_bytes)(wire_bytes);
489    let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
490        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
491    let reconstructed = match worker_fn {
492        WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
493        WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
494    };
495
496    let mut execution = execution.lock().await;
497    let applied = execution.provide_cloneable_value(dep_id, reconstructed);
498    assert!(
499        applied,
500        "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
501    );
502}
503
504/// Mode-consistent Cloneable semantics for the no-spawn-workers path on
505/// the tokio runner. Mirrors `sync::apply_cloneable_values_locally`: takes
506/// the parent-constructed Cloneable values and installs them directly
507/// into the parent's `TestSuiteExecution`, so `materialize_deps` reuses
508/// them instead of re-running the user constructor.
509///
510/// For `Cloneable`, the documented round-trip
511/// `from_wire(to_wire(value))` is semantics-preserving, so reusing the
512/// parent value directly is equivalent to round-tripping it through the
513/// wire codec while avoiding the duplicate constructor run that would
514/// otherwise occur on the no-spawn-workers code path. The duplicate run
515/// historically caused user-visible problems (extra observable side
516/// effects under `--nocapture`, and deadlocks when the constructor takes
517/// a runtime-wide lock).
518fn apply_cloneable_values_locally(
519    execution: &mut TestSuiteExecution,
520    cloneable_local_values: &[(String, Arc<dyn Any + Send + Sync>)],
521) {
522    for (dep_id, value) in cloneable_local_values {
523        let applied = execution.provide_cloneable_value(dep_id, value.clone());
524        assert!(
525            applied,
526            "Cloneable dep '{dep_id}' could not be pre-populated locally"
527        );
528    }
529}
530
531/// Mode-consistent Hosted semantics for the no-spawn-workers path on the
532/// tokio runner. Mirrors `sync::apply_hosted_descriptors_locally`: takes
533/// the parent-collected descriptor bytes and reconstructs each Hosted
534/// dep's worker-side handle (via the registered codec + worker_fn)
535/// directly in the parent's `TestSuiteExecution`.
536///
537/// Both `WorkerReconstructor::Sync` (`HostedDep::from_descriptor`) and
538/// `WorkerReconstructor::Async` (`AsyncHostedDep::from_descriptor`) are
539/// supported here so that `async_worker` Hosted deps see the same
540/// worker-side handle whether the runner ended up in spawned-worker mode
541/// or in the no-spawn fallback, matching the documented mode-consistent
542/// Hosted contract in `book/src/advanced_features/dependency_sharing.md`.
543async fn apply_hosted_descriptors_locally(
544    execution: &mut TestSuiteExecution,
545    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
546    descriptor_bytes: &[DepWireBytes],
547) {
548    for (dep_id, wire_bytes) in descriptor_bytes {
549        let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
550            panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
551        });
552        let wire_payload = (codec.from_wire_bytes)(wire_bytes);
553        let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
554            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
555        let reconstructed = match worker_fn {
556            WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
557            WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
558        };
559        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
560        assert!(
561            applied,
562            "Hosted dep '{dep_id}' could not be pre-populated locally"
563        );
564    }
565}
566
567async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
568    let mut execution = execution.lock().await;
569    execution.pick_next().await
570}
571
572async fn run_with_flakiness_control<F>(
573    output: Arc<dyn TestRunnerOutput>,
574    test_description: &RegisteredTest,
575    idx: usize,
576    count: usize,
577    test: F,
578) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
579where
580    F: Fn(
581            Instant,
582        )
583            -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
584        + Send
585        + Sync,
586{
587    match &test_description.props.flakiness_control {
588        FlakinessControl::None => {
589            let start = Instant::now();
590            test(start).await
591        }
592        FlakinessControl::ProveNonFlaky(tries) => {
593            for n in 0..*tries {
594                if n > 0 {
595                    output.repeat_running_test(
596                        test_description,
597                        idx,
598                        count,
599                        n + 1,
600                        *tries,
601                        "to ensure test is not flaky",
602                    );
603                }
604                let start = Instant::now();
605                match test(start).await {
606                    Ok(Ok(())) => {}
607                    Ok(Err(e)) => return Ok(Err(e)),
608                    Err(e) => return Err(e),
609                };
610            }
611            Ok(Ok(()))
612        }
613        FlakinessControl::RetryKnownFlaky(max_retries) => {
614            let mut tries = 1;
615            loop {
616                let start = Instant::now();
617                let result = test(start).await;
618
619                if result.is_err() && tries < *max_retries {
620                    tries += 1;
621                    output.repeat_running_test(
622                        test_description,
623                        idx,
624                        count,
625                        tries,
626                        *max_retries,
627                        "because test is known to be flaky",
628                    );
629                } else {
630                    break result;
631                }
632            }
633        }
634    }
635}
636
637#[allow(clippy::too_many_arguments)]
638async fn run_test(
639    output: Arc<dyn TestRunnerOutput>,
640    idx: usize,
641    count: usize,
642    nocapture: bool,
643    include_ignored: bool,
644    ensure_time: Option<TimeThreshold>,
645    dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
646    test: &RegisteredTest,
647    worker: &mut Option<Worker>,
648) -> TestResult {
649    if test.props.is_ignored && !include_ignored {
650        TestResult::ignored()
651    } else if let Some(worker) = worker.as_mut() {
652        worker.run_test(nocapture, test).await
653    } else {
654        let start = Instant::now();
655        let test = test.clone();
656        match &test.run {
657            TestFunction::Sync(_) => {
658                let handle = spawn_blocking(move || {
659                    let test = test.clone();
660                    crate::sync::run_sync_test_function(
661                        output,
662                        &test,
663                        idx,
664                        count,
665                        ensure_time,
666                        dependency_view,
667                    )
668                });
669                handle.await.unwrap_or_else(|join_error| {
670                    TestResult::failed(
671                        start.elapsed(),
672                        FailureCause::HarnessError(format!(
673                            "Failed joining test task: {join_error}"
674                        )),
675                    )
676                })
677            }
678            TestFunction::Async(test_fn) => {
679                let timeout = test.props.timeout;
680                let test_fn = test_fn.clone();
681                let detached_panic_policy = test.props.detached_panic_policy.clone();
682                let result = run_with_flakiness_control(output, &test, idx, count, |start| {
683                    let dependency_view = dependency_view.clone();
684                    let test_fn = test_fn.clone();
685                    Box::pin(async move {
686                        let test_id = crate::panic_hook::next_test_id();
687                        crate::panic_hook::set_current_test_id(test_id);
688                        crate::panic_hook::create_detached_collector(test_id);
689                        let result = AssertUnwindSafe(Box::pin(async move {
690                            match timeout {
691                                None => test_fn(dependency_view).await,
692                                Some(duration) => {
693                                    let result =
694                                        tokio::time::timeout(duration, test_fn(dependency_view))
695                                            .await;
696                                    match result {
697                                        Ok(result) => result,
698                                        Err(_) => {
699                                            return Err(FailureCause::HarnessError(
700                                                "Test timed out".to_string(),
701                                            ))
702                                        }
703                                    }
704                                }
705                            }
706                            .into_result()?;
707                            if let Some(ensure_time) = ensure_time {
708                                let elapsed = start.elapsed();
709                                if ensure_time.is_critical(&elapsed) {
710                                    return Err(FailureCause::HarnessError(format!(
711                                        "Test run time exceeds critical threshold: {elapsed:?}"
712                                    )));
713                                }
714                            }
715                            Ok(())
716                        }))
717                        .catch_unwind()
718                        .await;
719                        result
720                    })
721                })
722                .await;
723                let mut test_result =
724                    TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
725                if let Some(test_id) = crate::panic_hook::current_test_id() {
726                    if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
727                        let panics = match collector.lock() {
728                            Ok(p) => p,
729                            Err(poisoned) => poisoned.into_inner(),
730                        };
731                        if !panics.is_empty()
732                            && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
733                            && test_result.is_passed()
734                        {
735                            let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
736                            test_result = TestResult::failed(
737                                start.elapsed(),
738                                FailureCause::Panic(internal::PanicCause {
739                                    message: Some(format!(
740                                        "Detached task(s) panicked:\n{}",
741                                        messages.join("\n---\n")
742                                    )),
743                                    location: panics.first().and_then(|p| p.location.clone()),
744                                    backtrace: panics.first().and_then(|p| p.backtrace.clone()),
745                                }),
746                            );
747                        }
748                    }
749                }
750                crate::panic_hook::clear_current_test_id();
751                test_result
752            }
753            TestFunction::SyncBench(_) => {
754                let handle = spawn_blocking(move || {
755                    let test = test.clone();
756                    crate::sync::run_sync_test_function(
757                        output,
758                        &test,
759                        idx,
760                        count,
761                        ensure_time,
762                        dependency_view,
763                    )
764                });
765                handle.await.unwrap_or_else(|join_error| {
766                    TestResult::failed(
767                        start.elapsed(),
768                        FailureCause::HarnessError(format!(
769                            "Failed joining test task: {join_error}"
770                        )),
771                    )
772                })
773            }
774            TestFunction::AsyncBench(bench_fn) => {
775                let mut bencher = AsyncBencher::new();
776                let test_id = crate::panic_hook::next_test_id();
777                crate::panic_hook::set_current_test_id(test_id);
778                let result = AssertUnwindSafe(async move {
779                    bench_fn(&mut bencher, dependency_view).await;
780                    (
781                        bencher
782                            .summary()
783                            .expect("iter() was not called in bench function"),
784                        bencher.bytes,
785                    )
786                })
787                .catch_unwind()
788                .await;
789                let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
790                let test_result = TestResult::from_summary(
791                    &test.props.should_panic,
792                    start.elapsed(),
793                    result.map(|(summary, _)| summary),
794                    bytes,
795                );
796                crate::panic_hook::clear_current_test_id();
797                test_result
798            }
799        }
800    }
801}
802
803struct Worker {
804    _listener: Listener,
805    _process: Child,
806    _out_handle: JoinHandle<()>,
807    _err_handle: JoinHandle<()>,
808    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
809    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
810    capture_enabled: Arc<Mutex<bool>>,
811    connection: Stream,
812    /// Parent-held HostedRpc owner cells keyed by fully-qualified dep id. Used
813    /// to dispatch incoming `IpcResponse::HostedRpcCall` frames from the worker
814    /// subprocess back to the right owner.
815    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
816}
817
818impl Worker {
819    /// Installs the parent-side map of HostedRpc owner cells so this worker can
820    /// route incoming `IpcResponse::HostedRpcCall` frames to the right
821    /// `HostedRpcOwnerCell` while waiting for a worker subprocess response.
822    fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
823        self.hosted_rpc_owner_cells = cells;
824    }
825
826    /// Parent-side dispatcher for a single `IpcResponse::HostedRpcCall`. Looks
827    /// up the owner cell by fully-qualified dep id, runs the dispatch on the
828    /// parent's stored owner, and writes the matching
829    /// `IpcCommand::HostedRpcReply` back to the worker subprocess. Mirrors
830    /// `sync::Worker::handle_hosted_rpc_call`.
831    async fn handle_hosted_rpc_call(
832        &mut self,
833        dump_on_ipc_failure: &DumpOnFailure,
834        request_id: u64,
835        dep_id: String,
836        method_idx: u32,
837        args_bytes: Vec<u8>,
838    ) {
839        let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
840            // Use the async dispatch entry point so an owner that implements
841            // `AsyncHostedRpcDep` directly can `.await` inside its dispatcher
842            // without blocking the tokio runtime. Sync owners reach this
843            // entry point through the blanket bridge and their dispatched
844            // future resolves immediately.
845            Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
846                Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
847                Err(message) => HostedRpcReplyBody::Err { message },
848            },
849            None => HostedRpcReplyBody::Err {
850                message: format!(
851                    "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
852                ),
853            },
854        };
855        let reply = IpcCommand::HostedRpcReply { request_id, body };
856        let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
857        dump_on_ipc_failure
858            .run(write_frame_async(&mut self.connection, &msg).await)
859            .await;
860    }
861
862    pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
863        let mut capture_enabled = self.capture_enabled.lock().await;
864        *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
865        drop(capture_enabled);
866
867        // Send IPC command and wait for IPC response, and in the meantime read from the stdout/stderr channels
868        let cmd = IpcCommand::RunTest {
869            name: test.name.clone(),
870            crate_name: test.crate_name.clone(),
871            module_path: test.module_path.clone(),
872        };
873
874        let dump_on_ipc_failure = self.dump_on_failure();
875
876        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
877        dump_on_ipc_failure
878            .run(write_frame_async(&mut self.connection, &msg).await)
879            .await;
880
881        let response = loop {
882            let response_bytes = dump_on_ipc_failure
883                .run(read_frame_async(&mut self.connection).await)
884                .await;
885            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
886            match response {
887                IpcResponse::TestFinished { .. } => break response,
888                IpcResponse::CloneableAccepted { .. }
889                | IpcResponse::HostedDescriptorAccepted { .. } => continue,
890                IpcResponse::HostedRpcCall {
891                    request_id,
892                    dep_id,
893                    method_idx,
894                    args_bytes,
895                } => {
896                    self.handle_hosted_rpc_call(
897                        &dump_on_ipc_failure,
898                        request_id,
899                        dep_id,
900                        method_idx,
901                        args_bytes,
902                    )
903                    .await;
904                    continue;
905                }
906            }
907        };
908
909        let IpcResponse::TestFinished {
910            result,
911            finish_marker,
912        } = response
913        else {
914            unreachable!("loop only breaks on TestFinished")
915        };
916
917        if test.props.capture_control.requires_capturing(!nocapture) {
918            let out_lines: Vec<_> =
919                Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
920            let err_lines: Vec<_> =
921                Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
922            result.into_test_result(out_lines, err_lines)
923        } else {
924            result.into_test_result(Vec::new(), Vec::new())
925        }
926    }
927
928    /// Async counterpart to `sync::Worker::provide_cloneable`. `dep_id` is the
929    /// dep's fully-qualified id (`{crate}::{module}::{name}`).
930    async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
931        let dump_on_ipc_failure = self.dump_on_failure();
932        let cmd = IpcCommand::ProvideCloneable {
933            dep_id: dep_id.clone(),
934            wire_bytes,
935        };
936        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
937        dump_on_ipc_failure
938            .run(write_frame_async(&mut self.connection, &msg).await)
939            .await;
940
941        loop {
942            let response_bytes = dump_on_ipc_failure
943                .run(read_frame_async(&mut self.connection).await)
944                .await;
945            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
946            match response {
947                IpcResponse::CloneableAccepted { dep_id: ack_id } => {
948                    if ack_id == dep_id {
949                        return;
950                    }
951                }
952                IpcResponse::HostedDescriptorAccepted { .. } => {
953                    // Out-of-band ack from a previous ProvideHostedDescriptor; ignore.
954                }
955                IpcResponse::TestFinished { .. } => {
956                    // Should not happen before any RunTest.
957                }
958                IpcResponse::HostedRpcCall {
959                    request_id,
960                    dep_id: rpc_dep_id,
961                    method_idx,
962                    args_bytes,
963                } => {
964                    // A worker subprocess can issue a HostedRpc call from
965                    // inside an in-progress test, even while the parent is
966                    // mid-`ProvideCloneable` for a different dep. Dispatch it
967                    // so the protocol doesn't desync.
968                    self.handle_hosted_rpc_call(
969                        &dump_on_ipc_failure,
970                        request_id,
971                        rpc_dep_id,
972                        method_idx,
973                        args_bytes,
974                    )
975                    .await;
976                }
977            }
978        }
979    }
980
981    /// Async counterpart to `sync::Worker::provide_hosted_descriptor`.
982    async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
983        let dump_on_ipc_failure = self.dump_on_failure();
984        let cmd = IpcCommand::ProvideHostedDescriptor {
985            dep_id: dep_id.clone(),
986            wire_bytes,
987        };
988        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
989        dump_on_ipc_failure
990            .run(write_frame_async(&mut self.connection, &msg).await)
991            .await;
992
993        loop {
994            let response_bytes = dump_on_ipc_failure
995                .run(read_frame_async(&mut self.connection).await)
996                .await;
997            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
998            match response {
999                IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
1000                    if ack_id == dep_id {
1001                        return;
1002                    }
1003                }
1004                IpcResponse::CloneableAccepted { .. } => {
1005                    // Out-of-band ack from a previous ProvideCloneable; ignore.
1006                }
1007                IpcResponse::TestFinished { .. } => {
1008                    // Should not happen before any RunTest.
1009                }
1010                IpcResponse::HostedRpcCall {
1011                    request_id,
1012                    dep_id: rpc_dep_id,
1013                    method_idx,
1014                    args_bytes,
1015                } => {
1016                    // See provide_cloneable arm. Dispatch the call inline so
1017                    // the IPC stream stays in sync.
1018                    self.handle_hosted_rpc_call(
1019                        &dump_on_ipc_failure,
1020                        request_id,
1021                        rpc_dep_id,
1022                        method_idx,
1023                        args_bytes,
1024                    )
1025                    .await;
1026                }
1027            }
1028        }
1029    }
1030
1031    fn dump_on_failure(&self) -> DumpOnFailure {
1032        DumpOnFailure {
1033            out_lines: self.out_lines.clone(),
1034            err_lines: self.err_lines.clone(),
1035        }
1036    }
1037
1038    async fn drain_until(
1039        source: Arc<Mutex<VecDeque<CapturedOutput>>>,
1040        finish_marker: String,
1041    ) -> Vec<CapturedOutput> {
1042        let mut result = Vec::new();
1043        loop {
1044            let mut source = source.lock().await;
1045            while let Some(line) = source.pop_front() {
1046                if line.line() == finish_marker {
1047                    return result;
1048                } else {
1049                    result.push(line.clone());
1050                }
1051            }
1052            drop(source);
1053
1054            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1055        }
1056    }
1057}
1058
1059struct DumpOnFailure {
1060    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1061    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1062}
1063
1064impl DumpOnFailure {
1065    pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1066        match result {
1067            Ok(value) => value,
1068            Err(_error) => {
1069                let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1070                let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1071                let mut all_lines = [out_lines, err_lines].concat();
1072                all_lines.sort();
1073
1074                for line in all_lines {
1075                    eprintln!("{}", line.line());
1076                }
1077
1078                std::process::exit(1);
1079            }
1080        }
1081    }
1082}
1083
1084async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1085    if args.spawn_workers {
1086        let id = Uuid::new_v4();
1087        let name_str = format!("{id}.sock");
1088        let name = name_str
1089            .clone()
1090            .to_ns_name::<GenericNamespaced>()
1091            .expect("Invalid local socket name");
1092        let opts = ListenerOptions::new().name(name.clone());
1093        let listener = opts
1094            .create_tokio()
1095            .expect("Failed to create local socket listener");
1096
1097        let exe = std::env::current_exe().expect("Failed to get current executable path");
1098
1099        let mut args = args.clone();
1100        args.ipc = Some(name_str);
1101        args.spawn_workers = false;
1102        args.logfile = None;
1103        let args = args.to_args();
1104
1105        let mut process = Command::new(exe)
1106            .args(args)
1107            .stdin(Stdio::inherit())
1108            .stderr(Stdio::piped())
1109            .stdout(Stdio::piped())
1110            .spawn()
1111            .expect("Failed to spawn worker process");
1112
1113        let stdout = process.stdout.take().unwrap();
1114        let stderr = process.stderr.take().unwrap();
1115
1116        let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1117        let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1118        let capture_enabled = Arc::new(Mutex::new(true));
1119
1120        let out_lines_clone = out_lines.clone();
1121        let capture_enabled_clone = capture_enabled.clone();
1122        let out_handle = spawn(async move {
1123            let reader = BufReader::new(stdout);
1124            let mut lines = reader.lines();
1125            while let Some(line) = lines
1126                .next_line()
1127                .await
1128                .expect("Failed to read from worker stdout")
1129            {
1130                if *capture_enabled_clone.lock().await {
1131                    out_lines_clone
1132                        .lock()
1133                        .await
1134                        .push_back(CapturedOutput::stdout(line));
1135                } else {
1136                    println!("{line}");
1137                }
1138            }
1139        });
1140
1141        let err_lines_clone = err_lines.clone();
1142        let capture_enabled_clone = capture_enabled.clone();
1143        let err_handle = spawn(async move {
1144            let reader = BufReader::new(stderr);
1145            let mut lines = reader.lines();
1146            while let Some(line) = lines
1147                .next_line()
1148                .await
1149                .expect("Failed to read from worker stderr")
1150            {
1151                if *capture_enabled_clone.lock().await {
1152                    err_lines_clone
1153                        .lock()
1154                        .await
1155                        .push_back(CapturedOutput::stderr(line));
1156                } else {
1157                    eprintln!("{line}");
1158                }
1159            }
1160        });
1161
1162        let connection = listener
1163            .accept()
1164            .await
1165            .expect("Failed to accept connection");
1166
1167        Some(Worker {
1168            _listener: listener,
1169            _process: process,
1170            _out_handle: out_handle,
1171            _err_handle: err_handle,
1172            out_lines,
1173            err_lines,
1174            connection,
1175            capture_enabled,
1176            hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1177        })
1178    } else {
1179        None
1180    }
1181}
1182
1183/// Parent-side `--nocapture` / no-spawn-workers helper. Builds one stub per
1184/// HostedRpc dep using an [`InProcessHostedRpcTransport`] that points at the
1185/// parent-held owner cells, and stashes it in the execution tree so dependency
1186/// materialisation skips the owner-only constructor. Mirrors
1187/// `sync::install_local_hosted_rpc_stubs`.
1188fn install_local_hosted_rpc_stubs(
1189    execution: &mut TestSuiteExecution,
1190    rpc_factories: &HashMap<String, RpcFactory>,
1191    owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1192) {
1193    let transport: Arc<dyn HostedRpcTransport> =
1194        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1195    for (dep_id, factory) in rpc_factories.iter() {
1196        if !owner_cells.contains_key(dep_id) {
1197            // No owner cell materialised for this dep (e.g. registered
1198            // globally but not pulled into the current filter). Skip so
1199            // we don't install a stub nothing routes.
1200            continue;
1201        }
1202        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1203        let stub = (factory.build_stub)(channel);
1204        let applied = execution.provide_cloneable_value(dep_id, stub);
1205        assert!(
1206            applied,
1207            "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1208        );
1209    }
1210}
1211
1212/// Worker subprocess helper. Builds one stub per registered HostedRpc dep backed
1213/// by [`IpcHostedRpcTransport`], and installs it in the execution tree. Mirrors
1214/// `sync::install_worker_subprocess_hosted_rpc_stubs` but runs on the tokio
1215/// `Arc<Mutex<Stream>>` connection.
1216async fn install_worker_subprocess_hosted_rpc_stubs(
1217    execution: &Arc<Mutex<TestSuiteExecution>>,
1218    rpc_factories: &HashMap<String, RpcFactory>,
1219    connection_arc: Arc<Mutex<Stream>>,
1220) {
1221    let transport: Arc<dyn HostedRpcTransport> =
1222        Arc::new(IpcHostedRpcTransport::new(connection_arc));
1223    for (dep_id, factory) in rpc_factories.iter() {
1224        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1225        let stub = (factory.build_stub)(channel);
1226        let mut execution = execution.lock().await;
1227        let applied = execution.provide_cloneable_value(dep_id, stub);
1228        // Not every binary registers a HostedRpc dep that the current
1229        // execution actually uses; if so, just move on.
1230        let _ = applied;
1231    }
1232}
1233
1234/// Worker subprocess `HostedRpcTransport` for the tokio runner.
1235/// Mirrors `sync::IpcHostedRpcTransport` but bridges a sync trait method
1236/// to the async tokio IPC primitives via
1237/// `tokio::task::block_in_place` + `Handle::current().block_on(...)`.
1238///
1239/// The shared `Arc<Mutex<Stream>>` is the same one used by the
1240/// worker subprocess's main IPC loop; the lock guarantees that a stub
1241/// call and the main loop never interleave a half-written frame. Calls
1242/// serialise across all in-flight stubs by acquiring the mutex for the
1243/// full request-then-reply round trip.
1244///
1245/// This relies on the temporal invariant documented on
1246/// [`crate::internal::HostedRpcChannel::call`]: stubs are only invoked
1247/// from inside a running test body, never from `build_stub`, and never
1248/// from detached background work that outlives the test. Under those
1249/// rules the main IPC loop is only reading between tests, so it cannot
1250/// race with a stub call.
1251struct IpcHostedRpcTransport {
1252    connection: Arc<Mutex<Stream>>,
1253    next_request_id: AtomicU64,
1254}
1255
1256impl IpcHostedRpcTransport {
1257    fn new(connection: Arc<Mutex<Stream>>) -> Self {
1258        Self {
1259            connection,
1260            next_request_id: AtomicU64::new(0),
1261        }
1262    }
1263}
1264
1265impl HostedRpcTransport for IpcHostedRpcTransport {
1266    fn call(
1267        &self,
1268        dep_id: &str,
1269        method_idx: u32,
1270        args: Vec<u8>,
1271    ) -> Result<Vec<u8>, HostedRpcError> {
1272        let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1273        let call = IpcResponse::HostedRpcCall {
1274            request_id,
1275            dep_id: dep_id.to_string(),
1276            method_idx,
1277            args_bytes: args,
1278        };
1279        let msg = serialize_to_byte_vec(&call).map_err(|e| {
1280            HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1281        })?;
1282
1283        let connection = self.connection.clone();
1284        let handle = tokio::runtime::Handle::current();
1285
1286        // Run the async I/O round-trip from inside this sync trait
1287        // method. `block_in_place` releases this worker thread back to
1288        // the scheduler so the parent's read loop can continue making
1289        // progress on other tasks while we wait for the reply.
1290        tokio::task::block_in_place(move || {
1291            handle.block_on(async move {
1292                let mut conn = connection.lock().await;
1293                write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1294                    HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1295                })?;
1296                let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1297                    HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1298                })?;
1299                let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1300                    HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1301                })?;
1302                match command {
1303                    IpcCommand::HostedRpcReply {
1304                        request_id: reply_id,
1305                        body,
1306                    } => {
1307                        if reply_id != request_id {
1308                            return Err(HostedRpcError::Transport(format!(
1309                                "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1310                            )));
1311                        }
1312                        match body {
1313                            HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1314                            HostedRpcReplyBody::Err { message } => {
1315                                Err(HostedRpcError::Dispatch(message))
1316                            }
1317                        }
1318                    }
1319                    other => Err(HostedRpcError::Transport(format!(
1320                        "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1321                    ))),
1322                }
1323            })
1324        })
1325    }
1326}