Skip to main content

test_r_core/
tokio.rs

1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6    generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7    FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8    InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9    WorkerReconstructor,
10};
11use crate::ipc::{
12    ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38    tokio::runtime::Builder::new_multi_thread()
39        .enable_all()
40        .build()
41        .unwrap()
42        .block_on(async_test_runner())
43}
44
/// Async body of the tokio test harness.
///
/// Installs the panic hook and parses the command line. It then gathers every
/// registered test, followed by the tests produced by the registered test
/// generators. Finally it either prints the test list (`args.list`) or
/// executes the suite.
///
/// Execution is attempted up to `args.flaky_run` times (default 1). It stops
/// at the first run whose `SuiteResult::exit_code` is `ExitCode::SUCCESS`, and
/// the exit code of the last run is returned. A retry count of 0 skips
/// execution entirely and returns `ExitCode::from(101)`.
///
/// Each run wraps the `TestSuiteExecution` in an `Arc<Mutex<_>>`. It hands
/// that to `args.test_threads()` blocking-pool tasks, each of which drives one
/// `test_thread` future.
// Allowed on purpose: the global registry guards taken below come from a
// synchronous `.lock().unwrap()` (no `.await`) and are kept for the whole
// function, across every `.await` in it.
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47    crate::panic_hook::install_panic_hook();
48    let mut args = Arguments::from_args();
49    // When the parent spawned this process as a worker it passed
50    // `--worker-index <N>`. Stash it so `crate::worker::worker_index()`
51    // returns the correct value for PerWorker dep constructors.
52    if let Some(idx) = args.worker_index {
53        crate::worker::set_worker_index(idx);
54    }
55    let output = test_runner_output(&args);
56
    // These registry guards stay held until the function returns (see the
    // `await_holding_lock` allow above).
57    let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
58    let registered_dependency_constructors =
59        internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
60    let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
61    let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
62
    // Generated tests are appended after the statically registered ones.
63    let generated_tests = generate_tests(&registered_test_generators).await;
64
65    let all_tests: Vec<RegisteredTest> = registered_tests
66        .iter()
67        .cloned()
68        .chain(generated_tests)
69        .collect();
70
71    if args.list {
72        output.test_list(&all_tests);
73        ExitCode::SUCCESS
74    } else {
        // Maximum number of full-suite runs. The loop below stops at the
        // first successful run.
75        let mut remaining_retries = args.flaky_run.unwrap_or(1);
76
        // This value is only returned unchanged when the loop never runs
        // (`flaky_run == Some(0)`).
77        let mut exit_code = ExitCode::from(101);
78        while remaining_retries > 0 {
            // Every retry rebuilds the execution plan from scratch.
79            let (mut execution, filtered_tests) = TestSuiteExecution::construct(
80                &args,
81                registered_dependency_constructors.as_slice(),
82                &all_tests,
83                registered_testsuite_props.as_slice(),
84            );
85            args.finalize_for_execution(&execution, output.clone());
86            let is_top_level_parent = args.is_top_level_parent();
87            let has_selected_tests = execution.remaining() > 0;
88            // Parent-side collection for dependency scopes whose worker-side
89            // value is shipped as bytes or represented as an RPC stub. Async
90            // constructors are awaited here, before workers receive their
91            // reconstructed values. Skip it for empty filtered runs.
92            let needs_parent_shared = execution.has_cloneable_dependencies()
93                || execution.has_hosted_dependencies()
94                || execution.has_hosted_rpc_dependencies();
95            let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
96            {
97                execution.collect_parent_shared_dependencies_async().await
98            } else {
99                crate::execution::ParentSharedDependencies {
100                    cloneable_wire_bytes: Vec::new(),
101                    hosted_descriptor_bytes: Vec::new(),
102                    hosted_owners: Vec::new(),
103                    hosted_rpc_owner_cells: Vec::new(),
104                }
105            };
106            let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
107            let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
            // The owners are bound to a named `_`-prefixed variable rather than
            // `let _ = ...`, so they are not dropped immediately. They live
            // until the end of this retry iteration, which is after every test
            // thread has been joined. NOTE(review): presumably this keeps the
            // hosted resources alive while tests use them; confirm.
108            let _hosted_owners = parent_shared.hosted_owners;
109            let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
110                parent_shared.hosted_rpc_owner_cells.into_iter().collect();
111            // Pre-built RpcFactory lookup keyed by qualified id, so worker
112            // subprocesses can build stubs without re-locking the global
113            // REGISTERED_DEPENDENCY_CONSTRUCTORS.
114            let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
115                .iter()
116                .filter_map(|d| {
117                    if d.scope == crate::internal::DepScope::HostedRpc {
118                        d.rpc_factory
119                            .as_ref()
120                            .map(|f| (d.qualified_id(), f.clone()))
121                    } else {
122                        None
123                    }
124                })
125                .collect();
126            // Build a combined Cloneable + Hosted codec/worker lookup table
127            // now, before `test_thread` workers are spawned (see sync.rs
128            // for rationale). Keyed by the dep's fully-qualified id
129            // (`{crate}::{module}::{name}`) so workers can route an incoming
130            // `ProvideCloneable` / `ProvideHostedDescriptor` to the correct
131            // dep even when two deps share a local `name` in different
132            // modules.
133            let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
134                registered_dependency_constructors
135                    .iter()
136                    .filter_map(|d| {
137                        let codec_opt = match d.scope {
138                            crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
139                            crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
140                            _ => None,
141                        };
142                        match (codec_opt, &d.worker_fn) {
143                            (Some(codec), Some(worker_fn)) => {
144                                Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
145                            }
146                            _ => None,
147                        }
148                    })
149                    .collect();
150            // Mode-consistent Hosted semantics: when this is the top-level
151            // parent AND we do NOT spawn workers (e.g. --nocapture), the
152            // test functions run in this same process, but they must still
153            // see the *worker-side handle* produced by
154            // `HostedDep::from_descriptor`. Reconstruct each handle locally
155            // via the descriptor round-trip and pre-populate the execution
156            // tree.
157            if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
158                apply_hosted_descriptors_locally(
159                    &mut execution,
160                    &cloneable_codecs,
161                    &hosted_descriptor_bytes,
162                )
163                .await;
164            }
165            // Mode-consistent HostedRpc semantics for the no-spawn-workers
166            // path: install in-process stubs that route straight to the
167            // parent-held owner cells, so tests see the same `Stub` value
168            // whether or not the runner spawns workers.
169            if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
170                install_local_hosted_rpc_stubs(
171                    &mut execution,
172                    &rpc_factories,
173                    &hosted_rpc_owner_cells,
174                );
175            }
            // With spawned workers, tests are delegated to subprocesses.
            // NOTE(review): presumably this process therefore never needs the
            // dependency values itself; confirm.
176            if args.spawn_workers {
177                execution.skip_creating_dependencies();
178            }
179
180            // println!("Execution plan: {execution:?}");
181            // println!("Final args: {args:?}");
182            // println!("Has dependencies: {:?}", execution.has_dependencies());
183
            // `count` is the number of tests left to run in this plan. It sizes
            // `results` and is passed along for per-test progress output.
184            let count = execution.remaining();
185            let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
186
187            let start = Instant::now();
188            output.start_suite(&filtered_tests);
189
            // Shared state is wrapped in `Arc`s so each test thread gets a
            // cheap clone.
190            let execution = Arc::new(Mutex::new(execution));
191            let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
192            let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
193            let cloneable_codecs = Arc::new(cloneable_codecs);
194            let rpc_factories = Arc::new(rpc_factories);
195            let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
196            let mut join_set = JoinSet::new();
197            let threads = args.test_threads().get();
198
            // One blocking-pool task per test thread. Each task drives its
            // `test_thread` future with `Handle::block_on` on the current
            // runtime. NOTE(review): presumably this is done so that
            // synchronous work inside a test thread cannot stall the runtime's
            // async worker threads; confirm.
199            for worker_idx in 0..threads {
200                let execution_clone = execution.clone();
201                let output_clone = output.clone();
202                // Stamp each test-thread's args with the worker index it will
203                // hand to its spawned child via `--worker-index <N>`.
204                let mut args_clone = args.clone();
205                if args_clone.spawn_workers {
206                    args_clone.worker_index = Some(worker_idx);
207                }
208                let results_clone = results.clone();
209                let wire_bytes_clone = cloneable_wire_bytes.clone();
210                let hosted_bytes_clone = hosted_descriptor_bytes.clone();
211                let codecs_clone = cloneable_codecs.clone();
212                let rpc_factories_clone = rpc_factories.clone();
213                let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
214                let handle = tokio::runtime::Handle::current();
215                join_set.spawn_blocking(move || {
216                    handle.block_on(test_thread(
217                        args_clone,
218                        execution_clone,
219                        output_clone,
220                        count,
221                        results_clone,
222                        wire_bytes_clone,
223                        hosted_bytes_clone,
224                        codecs_clone,
225                        rpc_factories_clone,
226                        hosted_rpc_owner_cells_clone,
227                    ))
228                });
229            }
230
            // A panicking test thread surfaces here as a join error, and the
            // `expect` aborts the whole runner.
231            while let Some(res) = join_set.join_next().await {
232                res.expect("Failed to join task");
233            }
234
            // All test threads are done; release this function's handle to the
            // shared execution.
235            drop(execution);
236
237            let results = results.lock().await;
            // NOTE(review): the summary receives `all_tests`, not
            // `filtered_tests`. Confirm this is what `finished_suite` expects.
238            output.finished_suite(&all_tests, &results, start.elapsed());
239            exit_code = SuiteResult::exit_code(&results);
240
241            if exit_code == ExitCode::SUCCESS {
242                break;
243            } else {
244                remaining_retries -= 1;
245            }
246        }
247        exit_code
248    }
249}
250
/// Drives one test thread until the shared suite execution is exhausted.
///
/// Each iteration picks the next test from `execution` and runs it through
/// `run_test`. It reports start and finish to `output` and appends the
/// `(test, result)` pair to `results`. `count` is the total number of tests
/// in the plan and is used for progress reporting.
///
/// The same function serves two roles, depending on `args`:
///
/// * **Parent with a worker**: this applies when `spawn_worker_if_needed`
///   yields a worker.
///   - The worker receives `hosted_rpc_owner_cells` for dispatching
///     `HostedRpcCall` frames.
///   - Before the first test, it is sent every Cloneable wire payload and
///     every Hosted descriptor.
///   - `run_test` then delegates each test to it.
/// * **Worker subprocess**: this applies when `args.ipc` is set.
///   - The thread connects to that IPC socket and installs IPC-backed stubs
///     for every entry in `rpc_factories`.
///   - Before each test it reads commands until a `RunTest` names the next
///     test. `ProvideCloneable` / `ProvideHostedDescriptor` commands are
///     applied to `execution` and acknowledged.
///   - Locally picked tests that do not match the requested one are dropped
///     without being run or recorded.
///   - After each test, a fresh UUID marker line is written to both stdout
///     and stderr, and `IpcResponse::TestFinished` is sent back.
///
/// # Panics
///
/// Panics in the following cases:
/// - any IPC connect, read, decode or write failure;
/// - an unexpected `HostedRpcReply` arriving between tests;
/// - a failed write or flush of stdout or stderr.
251#[allow(clippy::too_many_arguments)]
252async fn test_thread(
253    args: Arguments,
254    execution: Arc<Mutex<TestSuiteExecution>>,
255    output: Arc<dyn TestRunnerOutput>,
256    count: usize,
257    results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
258    cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
259    hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
260    cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
261    rpc_factories: Arc<HashMap<String, RpcFactory>>,
262    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
263) {
264    let mut worker = spawn_worker_if_needed(&args).await;
265    // Parent dispatches incoming `HostedRpcCall` frames against the owner
266    // cells materialised in the top-level parent. Workers don't need the owner
267    // cells (they own stubs instead), so they receive an empty map and the
268    // dispatch code path is never reached in subprocesses.
269    if let Some(worker) = worker.as_mut() {
270        worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
271    }
    // Present only in a worker subprocess (`args.ipc` set). The socket is
    // shared behind a mutex with the HostedRpc stubs installed below.
272    let connection_arc = if let Some(ref name) = args.ipc {
273        let name = ipc_name(name.clone());
274        let stream = Stream::connect(name)
275            .await
276            .expect("Failed to connect to IPC socket");
277        Some(Arc::new(Mutex::new(stream)))
278    } else {
279        None
280    };
281
    // Parent side: push every wire-shipped dependency to the worker before
    // any test is dispatched to it.
282    if let Some(worker) = worker.as_mut() {
283        for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
284            worker
285                .provide_cloneable(dep_id.clone(), wire_bytes.clone())
286                .await;
287        }
288        // Ship every Hosted dep's descriptor bytes too.
289        for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
290            worker
291                .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
292                .await;
293        }
294    }
295
296    // Worker subprocess side: build a stub for every HostedRpc dep registered
297    // in this binary using the IPC-backed transport sharing the same socket as
298    // the main IPC loop. Install the stubs in the execution tree so dependency
299    // materialisation skips the parent-only owner constructor.
300    if let Some(connection) = connection_arc.as_ref() {
301        if !rpc_factories.is_empty() {
302            install_worker_subprocess_hosted_rpc_stubs(
303                &execution,
304                &rpc_factories,
305                connection.clone(),
306            )
307            .await;
308        }
309    }
310
    // In IPC mode, `expected_test` holds the `(name, crate_name, module_path)`
    // from the last `RunTest` until that test has been run. Outside IPC mode
    // it stays `None`, so every picked test runs.
311    let mut expected_test = None;
312
313    while !is_done(&execution).await {
314        if let Some(connection) = connection_arc.as_ref() {
            // Handle between-tests commands until the parent names the next test.
315            while expected_test.is_none() {
316                let mut conn = connection.lock().await;
317                let command_bytes = read_frame_async(&mut *conn)
318                    .await
319                    .expect("Failed to read IPC command frame");
                // Release the socket before handling the command; each reply
                // path below re-locks it to write its response.
320                drop(conn);
321                let command: IpcCommand =
322                    deserialize(&command_bytes).expect("Failed to decode IPC command");
323
324                match command {
325                    IpcCommand::RunTest {
326                        name,
327                        crate_name,
328                        module_path,
329                    } => {
330                        expected_test = Some((name, crate_name, module_path));
331                    }
332                    IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
333                        // Worker-side reconstruction (see sync.rs::apply_provided_wire_bytes).
334                        apply_provided_wire_bytes(
335                            &execution,
336                            &cloneable_codecs,
337                            &dep_id,
338                            &wire_bytes,
339                            "ProvideCloneable",
340                        )
341                        .await;
342                        let response = IpcResponse::CloneableAccepted { dep_id };
343                        let msg = serialize_to_byte_vec(&response)
344                            .expect("Failed to encode IPC response");
345                        let mut conn = connection.lock().await;
346                        write_frame_async(&mut *conn, &msg)
347                            .await
348                            .expect("Failed to write IPC response frame");
349                    }
350                    IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
351                        // Worker-side reconstruction: same shape as
352                        // ProvideCloneable but routed through the registered
353                        // HostedDep worker_fn.
354                        apply_provided_wire_bytes(
355                            &execution,
356                            &cloneable_codecs,
357                            &dep_id,
358                            &wire_bytes,
359                            "ProvideHostedDescriptor",
360                        )
361                        .await;
362                        let response = IpcResponse::HostedDescriptorAccepted { dep_id };
363                        let msg = serialize_to_byte_vec(&response)
364                            .expect("Failed to encode IPC response");
365                        let mut conn = connection.lock().await;
366                        write_frame_async(&mut *conn, &msg)
367                            .await
368                            .expect("Failed to write IPC response frame");
369                    }
370                    IpcCommand::HostedRpcReply { .. } => {
371                        // HR1.2: replies for worker-initiated HostedRpc calls
372                        // are consumed inline by the IPC transport during
373                        // test execution, never by this between-tests
374                        // command loop. Receiving one here means the
375                        // protocol got out of sync; surface that loudly
376                        // rather than dropping the frame.
377                        panic!(
378                            "unexpected `HostedRpcReply` while waiting for the next \
379                             between-tests command in the tokio worker subprocess: a \
380                             stub call must have left a reply on the wire without \
381                             draining it inline"
382                        );
383                    }
384                }
385            }
386        }
387
        // `pick_next` removes the test from the shared plan. When a `RunTest`
        // is pending for a different test, `skip` is true and the picked test
        // is discarded without being run or recorded.
388        if let Some(next) = pick_next(&execution).await {
389            let skip = if let Some((name, crate_name, module_path)) = &expected_test {
390                next.test.name != *name
391                    || next.test.crate_name != *crate_name
392                    || next.test.module_path != *module_path
393            } else {
394                false
395            };
396
397            if !skip {
398                expected_test = None;
399
400                let ensure_time = get_ensure_time(&args, &next.test);
401
402                output.start_running_test(&next.test, next.index, count);
403                let result = run_test(
404                    output.clone(),
405                    next.index,
406                    count,
407                    args.nocapture,
408                    args.include_ignored,
409                    ensure_time,
410                    next.deps.clone(),
411                    &next.test,
412                    &mut worker,
413                )
414                .await;
415                output.finished_running_test(&next.test, next.index, count, &result);
416
                // Worker subprocess: the same unique marker is written to
                // stdout and stderr, and also sent in `TestFinished`.
                // NOTE(review): presumably this lets the parent find where this
                // test's captured output ends; confirm.
417                if let Some(connection) = connection_arc.as_ref() {
418                    let finish_marker = Uuid::new_v4().to_string();
419                    let finish_marker_line = format!("{finish_marker}\n");
420                    tokio::io::stdout()
421                        .write_all(finish_marker_line.as_bytes())
422                        .await
423                        .unwrap();
424                    tokio::io::stderr()
425                        .write_all(finish_marker_line.as_bytes())
426                        .await
427                        .unwrap();
428                    tokio::io::stdout().flush().await.unwrap();
429                    tokio::io::stderr().flush().await.unwrap();
430
431                    let response = IpcResponse::TestFinished {
432                        result: (&result).into(),
433                        finish_marker,
434                    };
435                    let msg =
436                        serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
437                    let mut conn = connection.lock().await;
438                    write_frame_async(&mut *conn, &msg)
439                        .await
440                        .expect("Failed to write IPC response frame");
441                }
442
443                results.lock().await.push((next.test.clone(), result));
444            }
445        }
446    }
447}
448
449async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
450    let execution = execution.lock().await;
451    execution.is_done()
452}
453
454/// Async counterpart to `sync::apply_provided_wire_bytes`. Decodes the wire
455/// bytes into a worker-side dependency value (looked up by the dep's
456/// fully-qualified id `{crate}::{module}::{name}`) and stores it in the
457/// execution tree so the next `materialize_deps` call uses the pre-resolved
458/// value.
459///
460/// `source_command` is the textual name of the IPC command that delivered
461/// the bytes (`"ProvideCloneable"` or `"ProvideHostedDescriptor"`); used
462/// only in panic messages.
463async fn apply_provided_wire_bytes(
464    execution: &Arc<Mutex<TestSuiteExecution>>,
465    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
466    dep_id: &str,
467    wire_bytes: &[u8],
468    source_command: &str,
469) {
470    let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
471        panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
472    });
473
474    let wire_payload = (codec.from_wire_bytes)(wire_bytes);
475    let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
476        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
477    let reconstructed = match worker_fn {
478        WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
479        WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
480    };
481
482    let mut execution = execution.lock().await;
483    let applied = execution.provide_cloneable_value(dep_id, reconstructed);
484    assert!(
485        applied,
486        "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
487    );
488}
489
490/// Mode-consistent Hosted semantics for the no-spawn-workers path on the
491/// tokio runner. Mirrors `sync::apply_hosted_descriptors_locally`: takes
492/// the parent-collected descriptor bytes and reconstructs each Hosted
493/// dep's worker-side handle (via the registered codec + worker_fn)
494/// directly in the parent's `TestSuiteExecution`.
495///
496/// Both `WorkerReconstructor::Sync` (`HostedDep::from_descriptor`) and
497/// `WorkerReconstructor::Async` (`AsyncHostedDep::from_descriptor`) are
498/// supported here so that `async_worker` Hosted deps see the same
499/// worker-side handle whether the runner ended up in spawned-worker mode
500/// or in the no-spawn fallback, matching the documented mode-consistent
501/// Hosted contract in `book/src/advanced_features/dependency_sharing.md`.
502async fn apply_hosted_descriptors_locally(
503    execution: &mut TestSuiteExecution,
504    wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
505    descriptor_bytes: &[DepWireBytes],
506) {
507    for (dep_id, wire_bytes) in descriptor_bytes {
508        let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
509            panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
510        });
511        let wire_payload = (codec.from_wire_bytes)(wire_bytes);
512        let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
513            Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
514        let reconstructed = match worker_fn {
515            WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
516            WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
517        };
518        let applied = execution.provide_cloneable_value(dep_id, reconstructed);
519        assert!(
520            applied,
521            "Hosted dep '{dep_id}' could not be pre-populated locally"
522        );
523    }
524}
525
526async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
527    let mut execution = execution.lock().await;
528    execution.pick_next().await
529}
530
531async fn run_with_flakiness_control<F>(
532    output: Arc<dyn TestRunnerOutput>,
533    test_description: &RegisteredTest,
534    idx: usize,
535    count: usize,
536    test: F,
537) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
538where
539    F: Fn(
540            Instant,
541        )
542            -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
543        + Send
544        + Sync,
545{
546    match &test_description.props.flakiness_control {
547        FlakinessControl::None => {
548            let start = Instant::now();
549            test(start).await
550        }
551        FlakinessControl::ProveNonFlaky(tries) => {
552            for n in 0..*tries {
553                if n > 0 {
554                    output.repeat_running_test(
555                        test_description,
556                        idx,
557                        count,
558                        n + 1,
559                        *tries,
560                        "to ensure test is not flaky",
561                    );
562                }
563                let start = Instant::now();
564                match test(start).await {
565                    Ok(Ok(())) => {}
566                    Ok(Err(e)) => return Ok(Err(e)),
567                    Err(e) => return Err(e),
568                };
569            }
570            Ok(Ok(()))
571        }
572        FlakinessControl::RetryKnownFlaky(max_retries) => {
573            let mut tries = 1;
574            loop {
575                let start = Instant::now();
576                let result = test(start).await;
577
578                if result.is_err() && tries < *max_retries {
579                    tries += 1;
580                    output.repeat_running_test(
581                        test_description,
582                        idx,
583                        count,
584                        tries,
585                        *max_retries,
586                        "because test is known to be flaky",
587                    );
588                } else {
589                    break result;
590                }
591            }
592        }
593    }
594}
595
596#[allow(clippy::too_many_arguments)]
597async fn run_test(
598    output: Arc<dyn TestRunnerOutput>,
599    idx: usize,
600    count: usize,
601    nocapture: bool,
602    include_ignored: bool,
603    ensure_time: Option<TimeThreshold>,
604    dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
605    test: &RegisteredTest,
606    worker: &mut Option<Worker>,
607) -> TestResult {
608    if test.props.is_ignored && !include_ignored {
609        TestResult::ignored()
610    } else if let Some(worker) = worker.as_mut() {
611        worker.run_test(nocapture, test).await
612    } else {
613        let start = Instant::now();
614        let test = test.clone();
615        match &test.run {
616            TestFunction::Sync(_) => {
617                let handle = spawn_blocking(move || {
618                    let test = test.clone();
619                    crate::sync::run_sync_test_function(
620                        output,
621                        &test,
622                        idx,
623                        count,
624                        ensure_time,
625                        dependency_view,
626                    )
627                });
628                handle.await.unwrap_or_else(|join_error| {
629                    TestResult::failed(
630                        start.elapsed(),
631                        FailureCause::HarnessError(format!(
632                            "Failed joining test task: {join_error}"
633                        )),
634                    )
635                })
636            }
637            TestFunction::Async(test_fn) => {
638                let timeout = test.props.timeout;
639                let test_fn = test_fn.clone();
640                let detached_panic_policy = test.props.detached_panic_policy.clone();
641                let result = run_with_flakiness_control(output, &test, idx, count, |start| {
642                    let dependency_view = dependency_view.clone();
643                    let test_fn = test_fn.clone();
644                    Box::pin(async move {
645                        let test_id = crate::panic_hook::next_test_id();
646                        crate::panic_hook::set_current_test_id(test_id);
647                        crate::panic_hook::create_detached_collector(test_id);
648                        let result = AssertUnwindSafe(Box::pin(async move {
649                            match timeout {
650                                None => test_fn(dependency_view).await,
651                                Some(duration) => {
652                                    let result =
653                                        tokio::time::timeout(duration, test_fn(dependency_view))
654                                            .await;
655                                    match result {
656                                        Ok(result) => result,
657                                        Err(_) => {
658                                            return Err(FailureCause::HarnessError(
659                                                "Test timed out".to_string(),
660                                            ))
661                                        }
662                                    }
663                                }
664                            }
665                            .into_result()?;
666                            if let Some(ensure_time) = ensure_time {
667                                let elapsed = start.elapsed();
668                                if ensure_time.is_critical(&elapsed) {
669                                    return Err(FailureCause::HarnessError(format!(
670                                        "Test run time exceeds critical threshold: {elapsed:?}"
671                                    )));
672                                }
673                            }
674                            Ok(())
675                        }))
676                        .catch_unwind()
677                        .await;
678                        result
679                    })
680                })
681                .await;
682                let mut test_result =
683                    TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
684                if let Some(test_id) = crate::panic_hook::current_test_id() {
685                    if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
686                        let panics = match collector.lock() {
687                            Ok(p) => p,
688                            Err(poisoned) => poisoned.into_inner(),
689                        };
690                        if !panics.is_empty()
691                            && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
692                            && test_result.is_passed()
693                        {
694                            let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
695                            test_result = TestResult::failed(
696                                start.elapsed(),
697                                FailureCause::Panic(internal::PanicCause {
698                                    message: Some(format!(
699                                        "Detached task(s) panicked:\n{}",
700                                        messages.join("\n---\n")
701                                    )),
702                                    location: panics.first().and_then(|p| p.location.clone()),
703                                    backtrace: panics.first().and_then(|p| p.backtrace.clone()),
704                                }),
705                            );
706                        }
707                    }
708                }
709                crate::panic_hook::clear_current_test_id();
710                test_result
711            }
712            TestFunction::SyncBench(_) => {
713                let handle = spawn_blocking(move || {
714                    let test = test.clone();
715                    crate::sync::run_sync_test_function(
716                        output,
717                        &test,
718                        idx,
719                        count,
720                        ensure_time,
721                        dependency_view,
722                    )
723                });
724                handle.await.unwrap_or_else(|join_error| {
725                    TestResult::failed(
726                        start.elapsed(),
727                        FailureCause::HarnessError(format!(
728                            "Failed joining test task: {join_error}"
729                        )),
730                    )
731                })
732            }
733            TestFunction::AsyncBench(bench_fn) => {
734                let mut bencher = AsyncBencher::new();
735                let test_id = crate::panic_hook::next_test_id();
736                crate::panic_hook::set_current_test_id(test_id);
737                let result = AssertUnwindSafe(async move {
738                    bench_fn(&mut bencher, dependency_view).await;
739                    (
740                        bencher
741                            .summary()
742                            .expect("iter() was not called in bench function"),
743                        bencher.bytes,
744                    )
745                })
746                .catch_unwind()
747                .await;
748                let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
749                let test_result = TestResult::from_summary(
750                    &test.props.should_panic,
751                    start.elapsed(),
752                    result.map(|(summary, _)| summary),
753                    bytes,
754                );
755                crate::panic_hook::clear_current_test_id();
756                test_result
757            }
758        }
759    }
760}
761
/// Parent-side handle to one spawned worker subprocess.
///
/// Bundles the local-socket listener and child process created by
/// `spawn_worker_if_needed`, the two background tasks that read the child's
/// stdout/stderr, the queues those tasks fill while capturing is enabled, and
/// the IPC stream used to send `IpcCommand`s and receive `IpcResponse`s.
///
/// NOTE(review): Rust drops struct fields in declaration order, so the
/// listener and child are dropped before the reader tasks and the
/// connection. Check teardown behaviour before reordering fields.
struct Worker {
    // Never read; presumably stored only so it is not dropped while the worker runs.
    _listener: Listener,
    // The worker subprocess (stdout/stderr were taken and handed to the reader tasks).
    _process: Child,
    // Background task reading the child's stdout.
    _out_handle: JoinHandle<()>,
    // Background task reading the child's stderr.
    _err_handle: JoinHandle<()>,
    // Captured stdout lines not yet drained by `run_test` or dumped on IPC failure.
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    // Captured stderr lines not yet drained by `run_test` or dumped on IPC failure.
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    // When `true` the reader tasks queue lines; when `false` they echo them directly.
    capture_enabled: Arc<Mutex<bool>>,
    // Framed IPC connection accepted from the worker subprocess.
    connection: Stream,
    /// Parent-held HostedRpc owner cells keyed by fully-qualified dep id. Used
    /// to dispatch incoming `IpcResponse::HostedRpcCall` frames from the worker
    /// subprocess back to the right owner.
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
}
776
777impl Worker {
778    /// Installs the parent-side map of HostedRpc owner cells so this worker can
779    /// route incoming `IpcResponse::HostedRpcCall` frames to the right
780    /// `HostedRpcOwnerCell` while waiting for a worker subprocess response.
781    fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
782        self.hosted_rpc_owner_cells = cells;
783    }
784
785    /// Parent-side dispatcher for a single `IpcResponse::HostedRpcCall`. Looks
786    /// up the owner cell by fully-qualified dep id, runs the dispatch on the
787    /// parent's stored owner, and writes the matching
788    /// `IpcCommand::HostedRpcReply` back to the worker subprocess. Mirrors
789    /// `sync::Worker::handle_hosted_rpc_call`.
790    async fn handle_hosted_rpc_call(
791        &mut self,
792        dump_on_ipc_failure: &DumpOnFailure,
793        request_id: u64,
794        dep_id: String,
795        method_idx: u32,
796        args_bytes: Vec<u8>,
797    ) {
798        let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
799            Some(cell) => match cell.dispatch(method_idx, &args_bytes) {
800                Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
801                Err(message) => HostedRpcReplyBody::Err { message },
802            },
803            None => HostedRpcReplyBody::Err {
804                message: format!(
805                    "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
806                ),
807            },
808        };
809        let reply = IpcCommand::HostedRpcReply { request_id, body };
810        let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
811        dump_on_ipc_failure
812            .run(write_frame_async(&mut self.connection, &msg).await)
813            .await;
814    }
815
816    pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
817        let mut capture_enabled = self.capture_enabled.lock().await;
818        *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
819        drop(capture_enabled);
820
821        // Send IPC command and wait for IPC response, and in the meantime read from the stdout/stderr channels
822        let cmd = IpcCommand::RunTest {
823            name: test.name.clone(),
824            crate_name: test.crate_name.clone(),
825            module_path: test.module_path.clone(),
826        };
827
828        let dump_on_ipc_failure = self.dump_on_failure();
829
830        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
831        dump_on_ipc_failure
832            .run(write_frame_async(&mut self.connection, &msg).await)
833            .await;
834
835        let response = loop {
836            let response_bytes = dump_on_ipc_failure
837                .run(read_frame_async(&mut self.connection).await)
838                .await;
839            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
840            match response {
841                IpcResponse::TestFinished { .. } => break response,
842                IpcResponse::CloneableAccepted { .. }
843                | IpcResponse::HostedDescriptorAccepted { .. } => continue,
844                IpcResponse::HostedRpcCall {
845                    request_id,
846                    dep_id,
847                    method_idx,
848                    args_bytes,
849                } => {
850                    self.handle_hosted_rpc_call(
851                        &dump_on_ipc_failure,
852                        request_id,
853                        dep_id,
854                        method_idx,
855                        args_bytes,
856                    )
857                    .await;
858                    continue;
859                }
860            }
861        };
862
863        let IpcResponse::TestFinished {
864            result,
865            finish_marker,
866        } = response
867        else {
868            unreachable!("loop only breaks on TestFinished")
869        };
870
871        if test.props.capture_control.requires_capturing(!nocapture) {
872            let out_lines: Vec<_> =
873                Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
874            let err_lines: Vec<_> =
875                Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
876            result.into_test_result(out_lines, err_lines)
877        } else {
878            result.into_test_result(Vec::new(), Vec::new())
879        }
880    }
881
882    /// Async counterpart to `sync::Worker::provide_cloneable`. `dep_id` is the
883    /// dep's fully-qualified id (`{crate}::{module}::{name}`).
884    async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
885        let dump_on_ipc_failure = self.dump_on_failure();
886        let cmd = IpcCommand::ProvideCloneable {
887            dep_id: dep_id.clone(),
888            wire_bytes,
889        };
890        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
891        dump_on_ipc_failure
892            .run(write_frame_async(&mut self.connection, &msg).await)
893            .await;
894
895        loop {
896            let response_bytes = dump_on_ipc_failure
897                .run(read_frame_async(&mut self.connection).await)
898                .await;
899            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
900            match response {
901                IpcResponse::CloneableAccepted { dep_id: ack_id } => {
902                    if ack_id == dep_id {
903                        return;
904                    }
905                }
906                IpcResponse::HostedDescriptorAccepted { .. } => {
907                    // Out-of-band ack from a previous ProvideHostedDescriptor; ignore.
908                }
909                IpcResponse::TestFinished { .. } => {
910                    // Should not happen before any RunTest.
911                }
912                IpcResponse::HostedRpcCall {
913                    request_id,
914                    dep_id: rpc_dep_id,
915                    method_idx,
916                    args_bytes,
917                } => {
918                    // A worker subprocess can issue a HostedRpc call from
919                    // inside an in-progress test, even while the parent is
920                    // mid-`ProvideCloneable` for a different dep. Dispatch it
921                    // so the protocol doesn't desync.
922                    self.handle_hosted_rpc_call(
923                        &dump_on_ipc_failure,
924                        request_id,
925                        rpc_dep_id,
926                        method_idx,
927                        args_bytes,
928                    )
929                    .await;
930                }
931            }
932        }
933    }
934
935    /// Async counterpart to `sync::Worker::provide_hosted_descriptor`.
936    async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
937        let dump_on_ipc_failure = self.dump_on_failure();
938        let cmd = IpcCommand::ProvideHostedDescriptor {
939            dep_id: dep_id.clone(),
940            wire_bytes,
941        };
942        let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
943        dump_on_ipc_failure
944            .run(write_frame_async(&mut self.connection, &msg).await)
945            .await;
946
947        loop {
948            let response_bytes = dump_on_ipc_failure
949                .run(read_frame_async(&mut self.connection).await)
950                .await;
951            let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
952            match response {
953                IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
954                    if ack_id == dep_id {
955                        return;
956                    }
957                }
958                IpcResponse::CloneableAccepted { .. } => {
959                    // Out-of-band ack from a previous ProvideCloneable; ignore.
960                }
961                IpcResponse::TestFinished { .. } => {
962                    // Should not happen before any RunTest.
963                }
964                IpcResponse::HostedRpcCall {
965                    request_id,
966                    dep_id: rpc_dep_id,
967                    method_idx,
968                    args_bytes,
969                } => {
970                    // See provide_cloneable arm. Dispatch the call inline so
971                    // the IPC stream stays in sync.
972                    self.handle_hosted_rpc_call(
973                        &dump_on_ipc_failure,
974                        request_id,
975                        rpc_dep_id,
976                        method_idx,
977                        args_bytes,
978                    )
979                    .await;
980                }
981            }
982        }
983    }
984
985    fn dump_on_failure(&self) -> DumpOnFailure {
986        DumpOnFailure {
987            out_lines: self.out_lines.clone(),
988            err_lines: self.err_lines.clone(),
989        }
990    }
991
992    async fn drain_until(
993        source: Arc<Mutex<VecDeque<CapturedOutput>>>,
994        finish_marker: String,
995    ) -> Vec<CapturedOutput> {
996        let mut result = Vec::new();
997        loop {
998            let mut source = source.lock().await;
999            while let Some(line) = source.pop_front() {
1000                if line.line() == finish_marker {
1001                    return result;
1002                } else {
1003                    result.push(line.clone());
1004                }
1005            }
1006            drop(source);
1007
1008            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1009        }
1010    }
1011}
1012
1013struct DumpOnFailure {
1014    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1015    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1016}
1017
1018impl DumpOnFailure {
1019    pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1020        match result {
1021            Ok(value) => value,
1022            Err(_error) => {
1023                let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1024                let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1025                let mut all_lines = [out_lines, err_lines].concat();
1026                all_lines.sort();
1027
1028                for line in all_lines {
1029                    eprintln!("{}", line.line());
1030                }
1031
1032                std::process::exit(1);
1033            }
1034        }
1035    }
1036}
1037
1038async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1039    if args.spawn_workers {
1040        let id = Uuid::new_v4();
1041        let name_str = format!("{id}.sock");
1042        let name = name_str
1043            .clone()
1044            .to_ns_name::<GenericNamespaced>()
1045            .expect("Invalid local socket name");
1046        let opts = ListenerOptions::new().name(name.clone());
1047        let listener = opts
1048            .create_tokio()
1049            .expect("Failed to create local socket listener");
1050
1051        let exe = std::env::current_exe().expect("Failed to get current executable path");
1052
1053        let mut args = args.clone();
1054        args.ipc = Some(name_str);
1055        args.spawn_workers = false;
1056        args.logfile = None;
1057        let args = args.to_args();
1058
1059        let mut process = Command::new(exe)
1060            .args(args)
1061            .stdin(Stdio::inherit())
1062            .stderr(Stdio::piped())
1063            .stdout(Stdio::piped())
1064            .spawn()
1065            .expect("Failed to spawn worker process");
1066
1067        let stdout = process.stdout.take().unwrap();
1068        let stderr = process.stderr.take().unwrap();
1069
1070        let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1071        let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1072        let capture_enabled = Arc::new(Mutex::new(true));
1073
1074        let out_lines_clone = out_lines.clone();
1075        let capture_enabled_clone = capture_enabled.clone();
1076        let out_handle = spawn(async move {
1077            let reader = BufReader::new(stdout);
1078            let mut lines = reader.lines();
1079            while let Some(line) = lines
1080                .next_line()
1081                .await
1082                .expect("Failed to read from worker stdout")
1083            {
1084                if *capture_enabled_clone.lock().await {
1085                    out_lines_clone
1086                        .lock()
1087                        .await
1088                        .push_back(CapturedOutput::stdout(line));
1089                } else {
1090                    println!("{line}");
1091                }
1092            }
1093        });
1094
1095        let err_lines_clone = err_lines.clone();
1096        let capture_enabled_clone = capture_enabled.clone();
1097        let err_handle = spawn(async move {
1098            let reader = BufReader::new(stderr);
1099            let mut lines = reader.lines();
1100            while let Some(line) = lines
1101                .next_line()
1102                .await
1103                .expect("Failed to read from worker stderr")
1104            {
1105                if *capture_enabled_clone.lock().await {
1106                    err_lines_clone
1107                        .lock()
1108                        .await
1109                        .push_back(CapturedOutput::stderr(line));
1110                } else {
1111                    eprintln!("{line}");
1112                }
1113            }
1114        });
1115
1116        let connection = listener
1117            .accept()
1118            .await
1119            .expect("Failed to accept connection");
1120
1121        Some(Worker {
1122            _listener: listener,
1123            _process: process,
1124            _out_handle: out_handle,
1125            _err_handle: err_handle,
1126            out_lines,
1127            err_lines,
1128            connection,
1129            capture_enabled,
1130            hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1131        })
1132    } else {
1133        None
1134    }
1135}
1136
1137/// Parent-side `--nocapture` / no-spawn-workers helper. Builds one stub per
1138/// HostedRpc dep using an [`InProcessHostedRpcTransport`] that points at the
1139/// parent-held owner cells, and stashes it in the execution tree so dependency
1140/// materialisation skips the owner-only constructor. Mirrors
1141/// `sync::install_local_hosted_rpc_stubs`.
1142fn install_local_hosted_rpc_stubs(
1143    execution: &mut TestSuiteExecution,
1144    rpc_factories: &HashMap<String, RpcFactory>,
1145    owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1146) {
1147    let transport: Arc<dyn HostedRpcTransport> =
1148        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1149    for (dep_id, factory) in rpc_factories.iter() {
1150        if !owner_cells.contains_key(dep_id) {
1151            // No owner cell materialised for this dep (e.g. registered
1152            // globally but not pulled into the current filter). Skip so
1153            // we don't install a stub nothing routes.
1154            continue;
1155        }
1156        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1157        let stub = (factory.build_stub)(channel);
1158        let applied = execution.provide_cloneable_value(dep_id, stub);
1159        assert!(
1160            applied,
1161            "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1162        );
1163    }
1164}
1165
1166/// Worker subprocess helper. Builds one stub per registered HostedRpc dep backed
1167/// by [`IpcHostedRpcTransport`], and installs it in the execution tree. Mirrors
1168/// `sync::install_worker_subprocess_hosted_rpc_stubs` but runs on the tokio
1169/// `Arc<Mutex<Stream>>` connection.
1170async fn install_worker_subprocess_hosted_rpc_stubs(
1171    execution: &Arc<Mutex<TestSuiteExecution>>,
1172    rpc_factories: &HashMap<String, RpcFactory>,
1173    connection_arc: Arc<Mutex<Stream>>,
1174) {
1175    let transport: Arc<dyn HostedRpcTransport> =
1176        Arc::new(IpcHostedRpcTransport::new(connection_arc));
1177    for (dep_id, factory) in rpc_factories.iter() {
1178        let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1179        let stub = (factory.build_stub)(channel);
1180        let mut execution = execution.lock().await;
1181        let applied = execution.provide_cloneable_value(dep_id, stub);
1182        // Not every binary registers a HostedRpc dep that the current
1183        // execution actually uses; if so, just move on.
1184        let _ = applied;
1185    }
1186}
1187
1188/// Worker subprocess `HostedRpcTransport` for the tokio runner.
1189/// Mirrors `sync::IpcHostedRpcTransport` but bridges a sync trait method
1190/// to the async tokio IPC primitives via
1191/// `tokio::task::block_in_place` + `Handle::current().block_on(...)`.
1192///
1193/// The shared `Arc<Mutex<Stream>>` is the same one used by the
1194/// worker subprocess's main IPC loop; the lock guarantees that a stub
1195/// call and the main loop never interleave a half-written frame. Calls
1196/// serialise across all in-flight stubs by acquiring the mutex for the
1197/// full request-then-reply round trip.
1198///
1199/// This relies on the temporal invariant documented on
1200/// [`crate::internal::HostedRpcChannel::call`]: stubs are only invoked
1201/// from inside a running test body, never from `build_stub`, and never
1202/// from detached background work that outlives the test. Under those
1203/// rules the main IPC loop is only reading between tests, so it cannot
1204/// race with a stub call.
1205struct IpcHostedRpcTransport {
1206    connection: Arc<Mutex<Stream>>,
1207    next_request_id: AtomicU64,
1208}
1209
1210impl IpcHostedRpcTransport {
1211    fn new(connection: Arc<Mutex<Stream>>) -> Self {
1212        Self {
1213            connection,
1214            next_request_id: AtomicU64::new(0),
1215        }
1216    }
1217}
1218
1219impl HostedRpcTransport for IpcHostedRpcTransport {
1220    fn call(
1221        &self,
1222        dep_id: &str,
1223        method_idx: u32,
1224        args: Vec<u8>,
1225    ) -> Result<Vec<u8>, HostedRpcError> {
1226        let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1227        let call = IpcResponse::HostedRpcCall {
1228            request_id,
1229            dep_id: dep_id.to_string(),
1230            method_idx,
1231            args_bytes: args,
1232        };
1233        let msg = serialize_to_byte_vec(&call).map_err(|e| {
1234            HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1235        })?;
1236
1237        let connection = self.connection.clone();
1238        let handle = tokio::runtime::Handle::current();
1239
1240        // Run the async I/O round-trip from inside this sync trait
1241        // method. `block_in_place` releases this worker thread back to
1242        // the scheduler so the parent's read loop can continue making
1243        // progress on other tasks while we wait for the reply.
1244        tokio::task::block_in_place(move || {
1245            handle.block_on(async move {
1246                let mut conn = connection.lock().await;
1247                write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1248                    HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1249                })?;
1250                let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1251                    HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1252                })?;
1253                let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1254                    HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1255                })?;
1256                match command {
1257                    IpcCommand::HostedRpcReply {
1258                        request_id: reply_id,
1259                        body,
1260                    } => {
1261                        if reply_id != request_id {
1262                            return Err(HostedRpcError::Transport(format!(
1263                                "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1264                            )));
1265                        }
1266                        match body {
1267                            HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1268                            HostedRpcReplyBody::Err { message } => {
1269                                Err(HostedRpcError::Dispatch(message))
1270                            }
1271                        }
1272                    }
1273                    other => Err(HostedRpcError::Transport(format!(
1274                        "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1275                    ))),
1276                }
1277            })
1278        })
1279    }
1280}