Skip to main content

test_r_core/
tokio.rs

1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6    generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7    FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8    InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9    WorkerReconstructor,
10};
11use crate::ipc::{
12    ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38    tokio::runtime::Builder::new_multi_thread()
39        .enable_all()
40        .build()
41        .unwrap()
42        .block_on(async_test_runner())
43}
44
/// Async body of the tokio test runner; resolves to the process exit code.
///
/// Flow, as implemented below:
/// 1. Installs the panic hook, parses the arguments, records the worker index
///    (when one was passed) and builds the output implementation.
/// 2. Locks the global registries, runs the registered test generators and
///    merges registered and generated tests into `all_tests`.
/// 3. With `args.list` set, prints the test list and returns
///    `ExitCode::SUCCESS` without running anything.
/// 4. Otherwise runs the suite up to `args.flaky_run` times (once when unset),
///    stopping after the first attempt whose exit code is
///    `ExitCode::SUCCESS`. The exit code starts out as `101`, which is what is
///    returned if the retry loop never executes.
///
/// # Panics
///
/// Panics if a registry lock cannot be acquired or if a test-thread task
/// cannot be joined.
// The registry guards below are obtained with a blocking `.lock().unwrap()`
// and stay alive across `.await` points for the whole run, hence the lint
// exemption.
#[allow(clippy::await_holding_lock)]
async fn async_test_runner() -> ExitCode {
    crate::panic_hook::install_panic_hook();
    let mut args = Arguments::from_args();
    // When the parent spawned this process as a worker it passed
    // `--worker-index <N>`. Stash it so `crate::worker::worker_index()`
    // returns the correct value for PerWorker dep constructors.
    if let Some(idx) = args.worker_index {
        crate::worker::set_worker_index(idx);
    }
    let output = test_runner_output(&args);

    let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
    let registered_dependency_constructors =
        internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
    let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
    let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();

    let generated_tests = generate_tests(&registered_test_generators).await;

    // Statically registered tests first, followed by the generated ones.
    let all_tests: Vec<RegisteredTest> = registered_tests
        .iter()
        .cloned()
        .chain(generated_tests)
        .collect();

    if args.list {
        output.test_list(&all_tests);
        ExitCode::SUCCESS
    } else {
        let mut remaining_retries = args.flaky_run.unwrap_or(1);

        let mut exit_code = ExitCode::from(101);
        while remaining_retries > 0 {
            // Every attempt builds a fresh execution plan from the registries.
            let (mut execution, filtered_tests) = TestSuiteExecution::construct(
                &args,
                registered_dependency_constructors.as_slice(),
                &all_tests,
                registered_testsuite_props.as_slice(),
            );
            args.finalize_for_execution(&execution, output.clone());
            let is_top_level_parent = args.is_top_level_parent();
            let has_selected_tests = execution.remaining() > 0;
            // Parent-side collection for dependency scopes whose worker-side
            // value is shipped as bytes or represented as an RPC stub. Async
            // constructors are awaited here, before workers receive their
            // reconstructed values. Skip it for empty filtered runs.
            let needs_parent_shared = execution.has_cloneable_dependencies()
                || execution.has_hosted_dependencies()
                || execution.has_hosted_rpc_dependencies();
            let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
            {
                execution.collect_parent_shared_dependencies_async().await
            } else {
                crate::execution::ParentSharedDependencies {
                    cloneable_wire_bytes: Vec::new(),
                    hosted_descriptor_bytes: Vec::new(),
                    hosted_owners: Vec::new(),
                    hosted_rpc_owner_cells: Vec::new(),
                }
            };
            let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
            let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
            // Bound to a named local (not `_`) so the value lives until the end
            // of this attempt instead of being dropped right away.
            // NOTE(review): presumably this keeps the parent-side Hosted owners
            // alive while the tests run — confirm.
            let _hosted_owners = parent_shared.hosted_owners;
            let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
                parent_shared.hosted_rpc_owner_cells.into_iter().collect();
            // Pre-built RpcFactory lookup keyed by qualified id, so worker
            // subprocesses can build stubs without re-locking the global
            // REGISTERED_DEPENDENCY_CONSTRUCTORS.
            let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
                .iter()
                .filter_map(|d| {
                    if d.scope == crate::internal::DepScope::HostedRpc {
                        d.rpc_factory
                            .as_ref()
                            .map(|f| (d.qualified_id(), f.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            // Build a combined Cloneable + Hosted codec/worker lookup table
            // now, before `test_thread` workers are spawned (see sync.rs
            // for rationale). Keyed by the dep's fully-qualified id
            // (`{crate}::{module}::{name}`) so workers can route an incoming
            // `ProvideCloneable` / `ProvideHostedDescriptor` to the correct
            // dep even when two deps share a local `name` in different
            // modules.
            let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
                registered_dependency_constructors
                    .iter()
                    .filter_map(|d| {
                        // Only Cloneable and Hosted deps carry a wire codec.
                        let codec_opt = match d.scope {
                            crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
                            crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
                            _ => None,
                        };
                        // Entries lacking either the codec or the worker
                        // reconstructor are left out of the table.
                        match (codec_opt, &d.worker_fn) {
                            (Some(codec), Some(worker_fn)) => {
                                Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
                            }
                            _ => None,
                        }
                    })
                    .collect();
            // Mode-consistent Hosted semantics: when this is the top-level
            // parent AND we do NOT spawn workers (e.g. --nocapture), the
            // test functions run in this same process, but they must still
            // see the *worker-side handle* produced by
            // `HostedDep::from_descriptor`. Reconstruct each handle locally
            // via the descriptor round-trip and pre-populate the execution
            // tree.
            if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
                apply_hosted_descriptors_locally(
                    &mut execution,
                    &cloneable_codecs,
                    &hosted_descriptor_bytes,
                )
                .await;
            }
            // Mode-consistent HostedRpc semantics for the no-spawn-workers
            // path: install in-process stubs that route straight to the
            // parent-held owner cells, so tests see the same `Stub` value
            // whether or not the runner spawns workers.
            if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
                install_local_hosted_rpc_stubs(
                    &mut execution,
                    &rpc_factories,
                    &hosted_rpc_owner_cells,
                );
            }
            if args.spawn_workers {
                execution.skip_creating_dependencies();
            }

            // println!("Execution plan: {execution:?}");
            // println!("Final args: {args:?}");
            // println!("Has dependencies: {:?}", execution.has_dependencies());

            let count = execution.remaining();
            let results = Arc::new(Mutex::new(Vec::with_capacity(count)));

            let start = Instant::now();
            output.start_suite(&filtered_tests);

            // Everything handed to the test threads is wrapped in `Arc` so each
            // thread gets its own cheap handle.
            let execution = Arc::new(Mutex::new(execution));
            let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
            let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
            let cloneable_codecs = Arc::new(cloneable_codecs);
            let rpc_factories = Arc::new(rpc_factories);
            let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
            let mut join_set = JoinSet::new();
            let threads = args.test_threads().get();

            for worker_idx in 0..threads {
                let execution_clone = execution.clone();
                let output_clone = output.clone();
                // Stamp each test-thread's args with the worker index it will
                // hand to its spawned child via `--worker-index <N>`.
                let mut args_clone = args.clone();
                if args_clone.spawn_workers {
                    args_clone.worker_index = Some(worker_idx);
                }
                let results_clone = results.clone();
                let wire_bytes_clone = cloneable_wire_bytes.clone();
                let hosted_bytes_clone = hosted_descriptor_bytes.clone();
                let codecs_clone = cloneable_codecs.clone();
                let rpc_factories_clone = rpc_factories.clone();
                let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
                let handle = tokio::runtime::Handle::current();
                // Each test thread occupies a blocking-pool thread and drives
                // `test_thread` to completion through the runtime handle.
                join_set.spawn_blocking(move || {
                    handle.block_on(test_thread(
                        args_clone,
                        execution_clone,
                        output_clone,
                        count,
                        results_clone,
                        wire_bytes_clone,
                        hosted_bytes_clone,
                        codecs_clone,
                        rpc_factories_clone,
                        hosted_rpc_owner_cells_clone,
                    ))
                });
            }

            // Wait for every test thread; a failed join aborts the run.
            while let Some(res) = join_set.join_next().await {
                res.expect("Failed to join task");
            }

            // Release this scope's handle on the execution plan before the
            // summary is produced.
            // NOTE(review): presumably this is what tears down the remaining
            // suite-level dependencies — confirm.
            drop(execution);

            let results = results.lock().await;
            output.finished_suite(&all_tests, &results, start.elapsed());
            exit_code = SuiteResult::exit_code(&results);

            // A successful attempt ends the retry loop; otherwise one retry is
            // consumed and the whole suite is attempted again.
            if exit_code == ExitCode::SUCCESS {
                break;
            } else {
                remaining_retries -= 1;
            }
        }
        exit_code
    }
}
250
/// Loop executed by every test thread: repeatedly picks a test from the shared
/// execution plan and runs it until the plan reports that it is done.
///
/// Two optional IPC roles are handled here:
/// * When `spawn_worker_if_needed` yields a worker, that worker first receives
///   the hosted-RPC owner cells and the parent-collected Cloneable / Hosted
///   payloads, and is then passed on to `run_test` for each test.
/// * When `args.ipc` is set, this process connects to that socket, waits for
///   commands before each test and reports every finished test back with
///   `IpcResponse::TestFinished`.
///
/// Every executed test is appended to `results` together with its
/// `RegisteredTest`; `count` is forwarded to the output callbacks.
///
/// # Panics
///
/// Panics when connecting to, reading from or writing to the IPC socket
/// fails, when a frame cannot be encoded or decoded, when writing the finish
/// marker to stdout/stderr fails, or when a `HostedRpcReply` arrives in the
/// between-tests command loop.
#[allow(clippy::too_many_arguments)]
async fn test_thread(
    args: Arguments,
    execution: Arc<Mutex<TestSuiteExecution>>,
    output: Arc<dyn TestRunnerOutput>,
    count: usize,
    results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
    cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
    hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
    cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
    rpc_factories: Arc<HashMap<String, RpcFactory>>,
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
) {
    let mut worker = spawn_worker_if_needed(&args).await;
    // Parent dispatches incoming `HostedRpcCall` frames against the owner
    // cells materialised in the top-level parent. Workers don't need the owner
    // cells (they own stubs instead), so they receive an empty map and the
    // dispatch code path is never reached in subprocesses.
    if let Some(worker) = worker.as_mut() {
        worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
    }
    // `args.ipc` carries the socket name to connect to; the stream sits behind
    // a mutex so it can be shared with the stub transport installed below.
    let connection_arc = if let Some(ref name) = args.ipc {
        let name = ipc_name(name.clone());
        let stream = Stream::connect(name)
            .await
            .expect("Failed to connect to IPC socket");
        Some(Arc::new(Mutex::new(stream)))
    } else {
        None
    };

    if let Some(worker) = worker.as_mut() {
        // Ship every Cloneable dep's wire bytes to the worker.
        for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
            worker
                .provide_cloneable(dep_id.clone(), wire_bytes.clone())
                .await;
        }
        // Ship every Hosted dep's descriptor bytes too.
        for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
            worker
                .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
                .await;
        }
    }

    // Worker subprocess side: build a stub for every HostedRpc dep registered
    // in this binary using the IPC-backed transport sharing the same socket as
    // the main IPC loop. Install the stubs in the execution tree so dependency
    // materialisation skips the parent-only owner constructor.
    if let Some(connection) = connection_arc.as_ref() {
        if !rpc_factories.is_empty() {
            install_worker_subprocess_hosted_rpc_stubs(
                &execution,
                &rpc_factories,
                connection.clone(),
            )
            .await;
        }
    }

    // Identity (name, crate, module path) of the test requested by the most
    // recent `RunTest` command; only ever set in IPC mode.
    let mut expected_test = None;

    while !is_done(&execution).await {
        if let Some(connection) = connection_arc.as_ref() {
            // Keep handling commands until one of them names a test to run.
            while expected_test.is_none() {
                // The socket lock is held only while reading one frame.
                let mut conn = connection.lock().await;
                let command_bytes = read_frame_async(&mut *conn)
                    .await
                    .expect("Failed to read IPC command frame");
                drop(conn);
                let command: IpcCommand =
                    deserialize(&command_bytes).expect("Failed to decode IPC command");

                match command {
                    IpcCommand::RunTest {
                        name,
                        crate_name,
                        module_path,
                    } => {
                        expected_test = Some((name, crate_name, module_path));
                    }
                    IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
                        // Worker-side reconstruction (see sync.rs::apply_provided_wire_bytes).
                        apply_provided_wire_bytes(
                            &execution,
                            &cloneable_codecs,
                            &dep_id,
                            &wire_bytes,
                            "ProvideCloneable",
                        )
                        .await;
                        // Acknowledge only after the value has been installed.
                        let response = IpcResponse::CloneableAccepted { dep_id };
                        let msg = serialize_to_byte_vec(&response)
                            .expect("Failed to encode IPC response");
                        let mut conn = connection.lock().await;
                        write_frame_async(&mut *conn, &msg)
                            .await
                            .expect("Failed to write IPC response frame");
                    }
                    IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
                        // Worker-side reconstruction: same shape as
                        // ProvideCloneable but routed through the registered
                        // HostedDep worker_fn.
                        apply_provided_wire_bytes(
                            &execution,
                            &cloneable_codecs,
                            &dep_id,
                            &wire_bytes,
                            "ProvideHostedDescriptor",
                        )
                        .await;
                        let response = IpcResponse::HostedDescriptorAccepted { dep_id };
                        let msg = serialize_to_byte_vec(&response)
                            .expect("Failed to encode IPC response");
                        let mut conn = connection.lock().await;
                        write_frame_async(&mut *conn, &msg)
                            .await
                            .expect("Failed to write IPC response frame");
                    }
                    IpcCommand::HostedRpcReply { .. } => {
                        // HR1.2: replies for worker-initiated HostedRpc calls
                        // are consumed inline by the IPC transport during
                        // test execution, never by this between-tests
                        // command loop. Receiving one here means the
                        // protocol got out of sync; surface that loudly
                        // rather than dropping the frame.
                        panic!(
                            "unexpected `HostedRpcReply` while waiting for the next \
                             between-tests command in the tokio worker subprocess: a \
                             stub call must have left a reply on the wire without \
                             draining it inline"
                        );
                    }
                }
            }
        }

        if let Some(next) = pick_next(&execution).await {
            // In IPC mode only the test named by the last `RunTest` command is
            // executed; any other picked test is dropped without running and
            // the loop picks again.
            let skip = if let Some((name, crate_name, module_path)) = &expected_test {
                next.test.name != *name
                    || next.test.crate_name != *crate_name
                    || next.test.module_path != *module_path
            } else {
                false
            };

            if !skip {
                // The request is consumed; the next iteration waits for a new
                // `RunTest` command again.
                expected_test = None;

                let ensure_time = get_ensure_time(&args, &next.test);

                output.start_running_test(&next.test, next.index, count);
                let result = run_test(
                    output.clone(),
                    next.index,
                    count,
                    args.nocapture,
                    args.include_ignored,
                    ensure_time,
                    next.deps.clone(),
                    &next.test,
                    &mut worker,
                )
                .await;
                output.finished_running_test(&next.test, next.index, count, &result);

                if let Some(connection) = connection_arc.as_ref() {
                    // Write the same fresh UUID line to stdout and stderr,
                    // flush both, then send the marker in `TestFinished`.
                    // NOTE(review): presumably the parent uses the marker to
                    // find where this test's captured output ends — confirm.
                    let finish_marker = Uuid::new_v4().to_string();
                    let finish_marker_line = format!("{finish_marker}\n");
                    tokio::io::stdout()
                        .write_all(finish_marker_line.as_bytes())
                        .await
                        .unwrap();
                    tokio::io::stderr()
                        .write_all(finish_marker_line.as_bytes())
                        .await
                        .unwrap();
                    tokio::io::stdout().flush().await.unwrap();
                    tokio::io::stderr().flush().await.unwrap();

                    let response = IpcResponse::TestFinished {
                        result: (&result).into(),
                        finish_marker,
                    };
                    let msg =
                        serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
                    let mut conn = connection.lock().await;
                    write_frame_async(&mut *conn, &msg)
                        .await
                        .expect("Failed to write IPC response frame");
                }

                results.lock().await.push((next.test.clone(), result));
            }
        }
    }
}
448
449async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
450    let execution = execution.lock().await;
451    execution.is_done()
452}
453
454/// Async counterpart to `sync::apply_provided_wire_bytes`. Decodes the wire
455/// bytes into a worker-side dependency value (looked up by the dep's
456/// fully-qualified id `{crate}::{module}::{name}`) and stores it in the
457/// execution tree so the next `materialize_deps` call uses the pre-resolved
458/// value.
459///
460/// `source_command` is the textual name of the IPC command that delivered
461/// the bytes (`"ProvideCloneable"` or `"ProvideHostedDescriptor"`); used
462/// only in panic messages.
463async fn apply_provided_wire_bytes(
464    execution: &Arc<Mutex<TestSuiteExecution>>,
465    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
466    dep_id: &str,
467    wire_bytes: &[u8],
468    source_command: &str,
469) {
470    let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
471        panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
472    });
473
474    let wire_payload = (codec.from_wire_bytes)(wire_bytes);
475    let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
476        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
477    let reconstructed = match worker_fn {
478        WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
479        WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
480    };
481
482    let mut execution = execution.lock().await;
483    let applied = execution.provide_cloneable_value(dep_id, reconstructed);
484    assert!(
485        applied,
486        "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
487    );
488}
489
490/// Mode-consistent Hosted semantics for the no-spawn-workers path on the
491/// tokio runner. Mirrors `sync::apply_hosted_descriptors_locally`: takes
492/// the parent-collected descriptor bytes and reconstructs each Hosted
493/// dep's worker-side handle (via the registered codec + worker_fn)
494/// directly in the parent's `TestSuiteExecution`.
495///
496/// Both `WorkerReconstructor::Sync` (`HostedDep::from_descriptor`) and
497/// `WorkerReconstructor::Async` (`AsyncHostedDep::from_descriptor`) are
498/// supported here so that `async_worker` Hosted deps see the same
499/// worker-side handle whether the runner ended up in spawned-worker mode
500/// or in the no-spawn fallback, matching the documented mode-consistent
501/// Hosted contract in `book/src/advanced_features/dependency_sharing.md`.
502async fn apply_hosted_descriptors_locally(
503    execution: &mut TestSuiteExecution,
504    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
505    descriptor_bytes: &[DepWireBytes],
506) {
507    for (dep_id, wire_bytes) in descriptor_bytes {
508        let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
509            panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
510        });
511        let wire_payload = (codec.from_wire_bytes)(wire_bytes);
512        let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
513            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
514        let reconstructed = match worker_fn {
515            WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
516            WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
517        };
518        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
519        assert!(
520            applied,
521            "Hosted dep '{dep_id}' could not be pre-populated locally"
522        );
523    }
524}
525
526async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
527    let mut execution = execution.lock().await;
528    execution.pick_next().await
529}
530
531async fn run_with_flakiness_control<F>(
532    output: Arc<dyn TestRunnerOutput>,
533    test_description: &RegisteredTest,
534    idx: usize,
535    count: usize,
536    test: F,
537) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
538where
539    F: Fn(
540            Instant,
541        )
542            -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
543        + Send
544        + Sync,
545{
546    match &test_description.props.flakiness_control {
547        FlakinessControl::None => {
548            let start = Instant::now();
549            test(start).await
550        }
551        FlakinessControl::ProveNonFlaky(tries) => {
552            for n in 0..*tries {
553                if n > 0 {
554                    output.repeat_running_test(
555                        test_description,
556                        idx,
557                        count,
558                        n + 1,
559                        *tries,
560                        "to ensure test is not flaky",
561                    );
562                }
563                let start = Instant::now();
564                match test(start).await {
565                    Ok(Ok(())) => {}
566                    Ok(Err(e)) => return Ok(Err(e)),
567                    Err(e) => return Err(e),
568                };
569            }
570            Ok(Ok(()))
571        }
572        FlakinessControl::RetryKnownFlaky(max_retries) => {
573            let mut tries = 1;
574            loop {
575                let start = Instant::now();
576                let result = test(start).await;
577
578                if result.is_err() && tries < *max_retries {
579                    tries += 1;
580                    output.repeat_running_test(
581                        test_description,
582                        idx,
583                        count,
584                        tries,
585                        *max_retries,
586                        "because test is known to be flaky",
587                    );
588                } else {
589                    break result;
590                }
591            }
592        }
593    }
594}
595
596#[allow(clippy::too_many_arguments)]
597async fn run_test(
598    output: Arc<dyn TestRunnerOutput>,
599    idx: usize,
600    count: usize,
601    nocapture: bool,
602    include_ignored: bool,
603    ensure_time: Option<TimeThreshold>,
604    dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
605    test: &RegisteredTest,
606    worker: &mut Option<Worker>,
607) -> TestResult {
608    if test.props.is_ignored && !include_ignored {
609        TestResult::ignored()
610    } else if let Some(worker) = worker.as_mut() {
611        worker.run_test(nocapture, test).await
612    } else {
613        let start = Instant::now();
614        let test = test.clone();
615        match &test.run {
616            TestFunction::Sync(_) => {
617                let handle = spawn_blocking(move || {
618                    let test = test.clone();
619                    crate::sync::run_sync_test_function(
620                        output,
621                        &test,
622                        idx,
623                        count,
624                        ensure_time,
625                        dependency_view,
626                    )
627                });
628                handle.await.unwrap_or_else(|join_error| {
629                    TestResult::failed(
630                        start.elapsed(),
631                        FailureCause::HarnessError(format!(
632                            "Failed joining test task: {join_error}"
633                        )),
634                    )
635                })
636            }
637            TestFunction::Async(test_fn) => {
638                let timeout = test.props.timeout;
639                let test_fn = test_fn.clone();
640                let detached_panic_policy = test.props.detached_panic_policy.clone();
641                let result = run_with_flakiness_control(output, &test, idx, count, |start| {
642                    let dependency_view = dependency_view.clone();
643                    let test_fn = test_fn.clone();
644                    Box::pin(async move {
645                        let test_id = crate::panic_hook::next_test_id();
646                        crate::panic_hook::set_current_test_id(test_id);
647                        crate::panic_hook::create_detached_collector(test_id);
648                        let result = AssertUnwindSafe(Box::pin(async move {
649                            match timeout {
650                                None => test_fn(dependency_view).await,
651                                Some(duration) => {
652                                    let result =
653                                        tokio::time::timeout(duration, test_fn(dependency_view))
654                                            .await;
655                                    match result {
656                                        Ok(result) => result,
657                                        Err(_) => {
658                                            return Err(FailureCause::HarnessError(
659                                                "Test timed out".to_string(),
660                                            ))
661                                        }
662                                    }
663                                }
664                            }
665                            .into_result()?;
666                            if let Some(ensure_time) = ensure_time {
667                                let elapsed = start.elapsed();
668                                if ensure_time.is_critical(&elapsed) {
669                                    return Err(FailureCause::HarnessError(format!(
670                                        "Test run time exceeds critical threshold: {elapsed:?}"
671                                    )));
672                                }
673                            }
674                            Ok(())
675                        }))
676                        .catch_unwind()
677                        .await;
678                        result
679                    })
680                })
681                .await;
682                let mut test_result =
683                    TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
684                if let Some(test_id) = crate::panic_hook::current_test_id() {
685                    if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
686                        let panics = match collector.lock() {
687                            Ok(p) => p,
688                            Err(poisoned) => poisoned.into_inner(),
689                        };
690                        if !panics.is_empty()
691                            && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
692                            && test_result.is_passed()
693                        {
694                            let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
695                            test_result = TestResult::failed(
696                                start.elapsed(),
697                                FailureCause::Panic(internal::PanicCause {
698                                    message: Some(format!(
699                                        "Detached task(s) panicked:\n{}",
700                                        messages.join("\n---\n")
701                                    )),
702                                    location: panics.first().and_then(|p| p.location.clone()),
703                                    backtrace: panics.first().and_then(|p| p.backtrace.clone()),
704                                }),
705                            );
706                        }
707                    }
708                }
709                crate::panic_hook::clear_current_test_id();
710                test_result
711            }
712            TestFunction::SyncBench(_) => {
713                let handle = spawn_blocking(move || {
714                    let test = test.clone();
715                    crate::sync::run_sync_test_function(
716                        output,
717                        &test,
718                        idx,
719                        count,
720                        ensure_time,
721                        dependency_view,
722                    )
723                });
724                handle.await.unwrap_or_else(|join_error| {
725                    TestResult::failed(
726                        start.elapsed(),
727                        FailureCause::HarnessError(format!(
728                            "Failed joining test task: {join_error}"
729                        )),
730                    )
731                })
732            }
733            TestFunction::AsyncBench(bench_fn) => {
734                let mut bencher = AsyncBencher::new();
735                let test_id = crate::panic_hook::next_test_id();
736                crate::panic_hook::set_current_test_id(test_id);
737                let result = AssertUnwindSafe(async move {
738                    bench_fn(&mut bencher, dependency_view).await;
739                    (
740                        bencher
741                            .summary()
742                            .expect("iter() was not called in bench function"),
743                        bencher.bytes,
744                    )
745                })
746                .catch_unwind()
747                .await;
748                let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
749                let test_result = TestResult::from_summary(
750                    &test.props.should_panic,
751                    start.elapsed(),
752                    result.map(|(summary, _)| summary),
753                    bytes,
754                );
755                crate::panic_hook::clear_current_test_id();
756                test_result
757            }
758        }
759    }
760}
761
762struct Worker {
763    _listener: Listener,
764    _process: Child,
765    _out_handle: JoinHandle<()>,
766    _err_handle: JoinHandle<()>,
767    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
768    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
769    capture_enabled: Arc<Mutex<bool>>,
770    connection: Stream,
771    /// Parent-held HostedRpc owner cells keyed by fully-qualified dep id. Used
772    /// to dispatch incoming `IpcResponse::HostedRpcCall` frames from the worker
773    /// subprocess back to the right owner.
774    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
775}
776
777impl Worker {
778    /// Installs the parent-side map of HostedRpc owner cells so this worker can
779    /// route incoming `IpcResponse::HostedRpcCall` frames to the right
780    /// `HostedRpcOwnerCell` while waiting for a worker subprocess response.
781    fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
782        self.hosted_rpc_owner_cells = cells;
783    }
784
785    /// Parent-side dispatcher for a single `IpcResponse::HostedRpcCall`. Looks
786    /// up the owner cell by fully-qualified dep id, runs the dispatch on the
787    /// parent's stored owner, and writes the matching
788    /// `IpcCommand::HostedRpcReply` back to the worker subprocess. Mirrors
789    /// `sync::Worker::handle_hosted_rpc_call`.
790    async fn handle_hosted_rpc_call(
791        &mut self,
792        dump_on_ipc_failure: &DumpOnFailure,
793        request_id: u64,
794        dep_id: String,
795        method_idx: u32,
796        args_bytes: Vec<u8>,
797    ) {
798        let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
799            // Use the async dispatch entry point so an owner that implements
800            // `AsyncHostedRpcDep` directly can `.await` inside its dispatcher
801            // without blocking the tokio runtime. Sync owners reach this
802            // entry point through the blanket bridge and their dispatched
803            // future resolves immediately.
804            Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
805                Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
806                Err(message) => HostedRpcReplyBody::Err { message },
807            },
808            None => HostedRpcReplyBody::Err {
809                message: format!(
810                    "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
811                ),
812            },
813        };
814        let reply = IpcCommand::HostedRpcReply { request_id, body };
815        let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
816        dump_on_ipc_failure
817            .run(write_frame_async(&mut self.connection, &msg).await)
818            .await;
819    }
820
821    pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
822        let mut capture_enabled = self.capture_enabled.lock().await;
823        *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
824        drop(capture_enabled);
825
826        // Send IPC command and wait for IPC response, and in the meantime read from the stdout/stderr channels
827        let cmd = IpcCommand::RunTest {
828            name: test.name.clone(),
829            crate_name: test.crate_name.clone(),
830            module_path: test.module_path.clone(),
831        };
832
833        let dump_on_ipc_failure = self.dump_on_failure();
834
835        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
836        dump_on_ipc_failure
837            .run(write_frame_async(&mut self.connection, &msg).await)
838            .await;
839
840        let response = loop {
841            let response_bytes = dump_on_ipc_failure
842                .run(read_frame_async(&mut self.connection).await)
843                .await;
844            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
845            match response {
846                IpcResponse::TestFinished { .. } => break response,
847                IpcResponse::CloneableAccepted { .. }
848                | IpcResponse::HostedDescriptorAccepted { .. } => continue,
849                IpcResponse::HostedRpcCall {
850                    request_id,
851                    dep_id,
852                    method_idx,
853                    args_bytes,
854                } => {
855                    self.handle_hosted_rpc_call(
856                        &dump_on_ipc_failure,
857                        request_id,
858                        dep_id,
859                        method_idx,
860                        args_bytes,
861                    )
862                    .await;
863                    continue;
864                }
865            }
866        };
867
868        let IpcResponse::TestFinished {
869            result,
870            finish_marker,
871        } = response
872        else {
873            unreachable!("loop only breaks on TestFinished")
874        };
875
876        if test.props.capture_control.requires_capturing(!nocapture) {
877            let out_lines: Vec<_> =
878                Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
879            let err_lines: Vec<_> =
880                Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
881            result.into_test_result(out_lines, err_lines)
882        } else {
883            result.into_test_result(Vec::new(), Vec::new())
884        }
885    }
886
887    /// Async counterpart to `sync::Worker::provide_cloneable`. `dep_id` is the
888    /// dep's fully-qualified id (`{crate}::{module}::{name}`).
889    async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
890        let dump_on_ipc_failure = self.dump_on_failure();
891        let cmd = IpcCommand::ProvideCloneable {
892            dep_id: dep_id.clone(),
893            wire_bytes,
894        };
895        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
896        dump_on_ipc_failure
897            .run(write_frame_async(&mut self.connection, &msg).await)
898            .await;
899
900        loop {
901            let response_bytes = dump_on_ipc_failure
902                .run(read_frame_async(&mut self.connection).await)
903                .await;
904            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
905            match response {
906                IpcResponse::CloneableAccepted { dep_id: ack_id } => {
907                    if ack_id == dep_id {
908                        return;
909                    }
910                }
911                IpcResponse::HostedDescriptorAccepted { .. } => {
912                    // Out-of-band ack from a previous ProvideHostedDescriptor; ignore.
913                }
914                IpcResponse::TestFinished { .. } => {
915                    // Should not happen before any RunTest.
916                }
917                IpcResponse::HostedRpcCall {
918                    request_id,
919                    dep_id: rpc_dep_id,
920                    method_idx,
921                    args_bytes,
922                } => {
923                    // A worker subprocess can issue a HostedRpc call from
924                    // inside an in-progress test, even while the parent is
925                    // mid-`ProvideCloneable` for a different dep. Dispatch it
926                    // so the protocol doesn't desync.
927                    self.handle_hosted_rpc_call(
928                        &dump_on_ipc_failure,
929                        request_id,
930                        rpc_dep_id,
931                        method_idx,
932                        args_bytes,
933                    )
934                    .await;
935                }
936            }
937        }
938    }
939
940    /// Async counterpart to `sync::Worker::provide_hosted_descriptor`.
941    async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
942        let dump_on_ipc_failure = self.dump_on_failure();
943        let cmd = IpcCommand::ProvideHostedDescriptor {
944            dep_id: dep_id.clone(),
945            wire_bytes,
946        };
947        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
948        dump_on_ipc_failure
949            .run(write_frame_async(&mut self.connection, &msg).await)
950            .await;
951
952        loop {
953            let response_bytes = dump_on_ipc_failure
954                .run(read_frame_async(&mut self.connection).await)
955                .await;
956            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
957            match response {
958                IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
959                    if ack_id == dep_id {
960                        return;
961                    }
962                }
963                IpcResponse::CloneableAccepted { .. } => {
964                    // Out-of-band ack from a previous ProvideCloneable; ignore.
965                }
966                IpcResponse::TestFinished { .. } => {
967                    // Should not happen before any RunTest.
968                }
969                IpcResponse::HostedRpcCall {
970                    request_id,
971                    dep_id: rpc_dep_id,
972                    method_idx,
973                    args_bytes,
974                } => {
975                    // See provide_cloneable arm. Dispatch the call inline so
976                    // the IPC stream stays in sync.
977                    self.handle_hosted_rpc_call(
978                        &dump_on_ipc_failure,
979                        request_id,
980                        rpc_dep_id,
981                        method_idx,
982                        args_bytes,
983                    )
984                    .await;
985                }
986            }
987        }
988    }
989
990    fn dump_on_failure(&self) -> DumpOnFailure {
991        DumpOnFailure {
992            out_lines: self.out_lines.clone(),
993            err_lines: self.err_lines.clone(),
994        }
995    }
996
997    async fn drain_until(
998        source: Arc<Mutex<VecDeque<CapturedOutput>>>,
999        finish_marker: String,
1000    ) -> Vec<CapturedOutput> {
1001        let mut result = Vec::new();
1002        loop {
1003            let mut source = source.lock().await;
1004            while let Some(line) = source.pop_front() {
1005                if line.line() == finish_marker {
1006                    return result;
1007                } else {
1008                    result.push(line.clone());
1009                }
1010            }
1011            drop(source);
1012
1013            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1014        }
1015    }
1016}
1017
1018struct DumpOnFailure {
1019    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1020    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1021}
1022
1023impl DumpOnFailure {
1024    pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1025        match result {
1026            Ok(value) => value,
1027            Err(_error) => {
1028                let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1029                let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1030                let mut all_lines = [out_lines, err_lines].concat();
1031                all_lines.sort();
1032
1033                for line in all_lines {
1034                    eprintln!("{}", line.line());
1035                }
1036
1037                std::process::exit(1);
1038            }
1039        }
1040    }
1041}
1042
1043async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1044    if args.spawn_workers {
1045        let id = Uuid::new_v4();
1046        let name_str = format!("{id}.sock");
1047        let name = name_str
1048            .clone()
1049            .to_ns_name::<GenericNamespaced>()
1050            .expect("Invalid local socket name");
1051        let opts = ListenerOptions::new().name(name.clone());
1052        let listener = opts
1053            .create_tokio()
1054            .expect("Failed to create local socket listener");
1055
1056        let exe = std::env::current_exe().expect("Failed to get current executable path");
1057
1058        let mut args = args.clone();
1059        args.ipc = Some(name_str);
1060        args.spawn_workers = false;
1061        args.logfile = None;
1062        let args = args.to_args();
1063
1064        let mut process = Command::new(exe)
1065            .args(args)
1066            .stdin(Stdio::inherit())
1067            .stderr(Stdio::piped())
1068            .stdout(Stdio::piped())
1069            .spawn()
1070            .expect("Failed to spawn worker process");
1071
1072        let stdout = process.stdout.take().unwrap();
1073        let stderr = process.stderr.take().unwrap();
1074
1075        let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1076        let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1077        let capture_enabled = Arc::new(Mutex::new(true));
1078
1079        let out_lines_clone = out_lines.clone();
1080        let capture_enabled_clone = capture_enabled.clone();
1081        let out_handle = spawn(async move {
1082            let reader = BufReader::new(stdout);
1083            let mut lines = reader.lines();
1084            while let Some(line) = lines
1085                .next_line()
1086                .await
1087                .expect("Failed to read from worker stdout")
1088            {
1089                if *capture_enabled_clone.lock().await {
1090                    out_lines_clone
1091                        .lock()
1092                        .await
1093                        .push_back(CapturedOutput::stdout(line));
1094                } else {
1095                    println!("{line}");
1096                }
1097            }
1098        });
1099
1100        let err_lines_clone = err_lines.clone();
1101        let capture_enabled_clone = capture_enabled.clone();
1102        let err_handle = spawn(async move {
1103            let reader = BufReader::new(stderr);
1104            let mut lines = reader.lines();
1105            while let Some(line) = lines
1106                .next_line()
1107                .await
1108                .expect("Failed to read from worker stderr")
1109            {
1110                if *capture_enabled_clone.lock().await {
1111                    err_lines_clone
1112                        .lock()
1113                        .await
1114                        .push_back(CapturedOutput::stderr(line));
1115                } else {
1116                    eprintln!("{line}");
1117                }
1118            }
1119        });
1120
1121        let connection = listener
1122            .accept()
1123            .await
1124            .expect("Failed to accept connection");
1125
1126        Some(Worker {
1127            _listener: listener,
1128            _process: process,
1129            _out_handle: out_handle,
1130            _err_handle: err_handle,
1131            out_lines,
1132            err_lines,
1133            connection,
1134            capture_enabled,
1135            hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1136        })
1137    } else {
1138        None
1139    }
1140}
1141
1142/// Parent-side `--nocapture` / no-spawn-workers helper. Builds one stub per
1143/// HostedRpc dep using an [`InProcessHostedRpcTransport`] that points at the
1144/// parent-held owner cells, and stashes it in the execution tree so dependency
1145/// materialisation skips the owner-only constructor. Mirrors
1146/// `sync::install_local_hosted_rpc_stubs`.
1147fn install_local_hosted_rpc_stubs(
1148    execution: &mut TestSuiteExecution,
1149    rpc_factories: &HashMap<String, RpcFactory>,
1150    owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1151) {
1152    let transport: Arc<dyn HostedRpcTransport> =
1153        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1154    for (dep_id, factory) in rpc_factories.iter() {
1155        if !owner_cells.contains_key(dep_id) {
1156            // No owner cell materialised for this dep (e.g. registered
1157            // globally but not pulled into the current filter). Skip so
1158            // we don't install a stub nothing routes.
1159            continue;
1160        }
1161        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1162        let stub = (factory.build_stub)(channel);
1163        let applied = execution.provide_cloneable_value(dep_id, stub);
1164        assert!(
1165            applied,
1166            "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1167        );
1168    }
1169}
1170
1171/// Worker subprocess helper. Builds one stub per registered HostedRpc dep backed
1172/// by [`IpcHostedRpcTransport`], and installs it in the execution tree. Mirrors
1173/// `sync::install_worker_subprocess_hosted_rpc_stubs` but runs on the tokio
1174/// `Arc<Mutex<Stream>>` connection.
1175async fn install_worker_subprocess_hosted_rpc_stubs(
1176    execution: &Arc<Mutex<TestSuiteExecution>>,
1177    rpc_factories: &HashMap<String, RpcFactory>,
1178    connection_arc: Arc<Mutex<Stream>>,
1179) {
1180    let transport: Arc<dyn HostedRpcTransport> =
1181        Arc::new(IpcHostedRpcTransport::new(connection_arc));
1182    for (dep_id, factory) in rpc_factories.iter() {
1183        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1184        let stub = (factory.build_stub)(channel);
1185        let mut execution = execution.lock().await;
1186        let applied = execution.provide_cloneable_value(dep_id, stub);
1187        // Not every binary registers a HostedRpc dep that the current
1188        // execution actually uses; if so, just move on.
1189        let _ = applied;
1190    }
1191}
1192
1193/// Worker subprocess `HostedRpcTransport` for the tokio runner.
1194/// Mirrors `sync::IpcHostedRpcTransport` but bridges a sync trait method
1195/// to the async tokio IPC primitives via
1196/// `tokio::task::block_in_place` + `Handle::current().block_on(...)`.
1197///
1198/// The shared `Arc<Mutex<Stream>>` is the same one used by the
1199/// worker subprocess's main IPC loop; the lock guarantees that a stub
1200/// call and the main loop never interleave a half-written frame. Calls
1201/// serialise across all in-flight stubs by acquiring the mutex for the
1202/// full request-then-reply round trip.
1203///
1204/// This relies on the temporal invariant documented on
1205/// [`crate::internal::HostedRpcChannel::call`]: stubs are only invoked
1206/// from inside a running test body, never from `build_stub`, and never
1207/// from detached background work that outlives the test. Under those
1208/// rules the main IPC loop is only reading between tests, so it cannot
1209/// race with a stub call.
1210struct IpcHostedRpcTransport {
1211    connection: Arc<Mutex<Stream>>,
1212    next_request_id: AtomicU64,
1213}
1214
1215impl IpcHostedRpcTransport {
1216    fn new(connection: Arc<Mutex<Stream>>) -> Self {
1217        Self {
1218            connection,
1219            next_request_id: AtomicU64::new(0),
1220        }
1221    }
1222}
1223
1224impl HostedRpcTransport for IpcHostedRpcTransport {
1225    fn call(
1226        &self,
1227        dep_id: &str,
1228        method_idx: u32,
1229        args: Vec<u8>,
1230    ) -> Result<Vec<u8>, HostedRpcError> {
1231        let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1232        let call = IpcResponse::HostedRpcCall {
1233            request_id,
1234            dep_id: dep_id.to_string(),
1235            method_idx,
1236            args_bytes: args,
1237        };
1238        let msg = serialize_to_byte_vec(&call).map_err(|e| {
1239            HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1240        })?;
1241
1242        let connection = self.connection.clone();
1243        let handle = tokio::runtime::Handle::current();
1244
1245        // Run the async I/O round-trip from inside this sync trait
1246        // method. `block_in_place` releases this worker thread back to
1247        // the scheduler so the parent's read loop can continue making
1248        // progress on other tasks while we wait for the reply.
1249        tokio::task::block_in_place(move || {
1250            handle.block_on(async move {
1251                let mut conn = connection.lock().await;
1252                write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1253                    HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1254                })?;
1255                let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1256                    HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1257                })?;
1258                let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1259                    HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1260                })?;
1261                match command {
1262                    IpcCommand::HostedRpcReply {
1263                        request_id: reply_id,
1264                        body,
1265                    } => {
1266                        if reply_id != request_id {
1267                            return Err(HostedRpcError::Transport(format!(
1268                                "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1269                            )));
1270                        }
1271                        match body {
1272                            HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1273                            HostedRpcReplyBody::Err { message } => {
1274                                Err(HostedRpcError::Dispatch(message))
1275                            }
1276                        }
1277                    }
1278                    other => Err(HostedRpcError::Transport(format!(
1279                        "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1280                    ))),
1281                }
1282            })
1283        })
1284    }
1285}