1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6 generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7 FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8 InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9 WorkerReconstructor,
10};
11use crate::ipc::{
12 ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38 tokio::runtime::Builder::new_multi_thread()
39 .enable_all()
40 .build()
41 .unwrap()
42 .block_on(async_test_runner())
43}
44
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47 crate::panic_hook::install_panic_hook();
48 let mut args = Arguments::from_args();
49 if let Some(idx) = args.worker_index {
53 crate::worker::set_worker_index(idx);
54 }
55 let output = test_runner_output(&args);
56
57 let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
58 let registered_dependency_constructors =
59 internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
60 let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
61 let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
62
63 let generated_tests = generate_tests(®istered_test_generators).await;
64
65 let all_tests: Vec<RegisteredTest> = registered_tests
66 .iter()
67 .cloned()
68 .chain(generated_tests)
69 .collect();
70
71 if args.list {
72 output.test_list(&all_tests);
73 ExitCode::SUCCESS
74 } else {
75 let mut remaining_retries = args.flaky_run.unwrap_or(1);
76
77 let mut exit_code = ExitCode::from(101);
78 while remaining_retries > 0 {
79 let (mut execution, filtered_tests) = TestSuiteExecution::construct(
80 &args,
81 registered_dependency_constructors.as_slice(),
82 &all_tests,
83 registered_testsuite_props.as_slice(),
84 );
85 args.finalize_for_execution(&execution, output.clone());
86 let is_top_level_parent = args.is_top_level_parent();
87 let has_selected_tests = execution.remaining() > 0;
88 let needs_parent_shared = execution.has_cloneable_dependencies()
93 || execution.has_hosted_dependencies()
94 || execution.has_hosted_rpc_dependencies();
95 let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
96 {
97 execution.collect_parent_shared_dependencies_async().await
98 } else {
99 crate::execution::ParentSharedDependencies {
100 cloneable_wire_bytes: Vec::new(),
101 cloneable_local_values: Vec::new(),
102 hosted_descriptor_bytes: Vec::new(),
103 hosted_owners: Vec::new(),
104 hosted_rpc_owner_cells: Vec::new(),
105 parent_constructed_shared_values: Vec::new(),
106 }
107 };
108 let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
109 let cloneable_local_values = parent_shared.cloneable_local_values;
110 let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
111 let _hosted_owners = parent_shared.hosted_owners;
112 let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
113 parent_shared.hosted_rpc_owner_cells.into_iter().collect();
114 let parent_constructed_shared_values = parent_shared.parent_constructed_shared_values;
115 let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
119 .iter()
120 .filter_map(|d| {
121 if d.scope == crate::internal::DepScope::HostedRpc {
122 d.rpc_factory
123 .as_ref()
124 .map(|f| (d.qualified_id(), f.clone()))
125 } else {
126 None
127 }
128 })
129 .collect();
130 let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
138 registered_dependency_constructors
139 .iter()
140 .filter_map(|d| {
141 let codec_opt = match d.scope {
142 crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
143 crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
144 _ => None,
145 };
146 match (codec_opt, &d.worker_fn) {
147 (Some(codec), Some(worker_fn)) => {
148 Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
149 }
150 _ => None,
151 }
152 })
153 .collect();
154 if is_top_level_parent && !args.spawn_workers && !cloneable_local_values.is_empty() {
164 apply_cloneable_values_locally(&mut execution, &cloneable_local_values);
165 }
166 if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
174 apply_hosted_descriptors_locally(
175 &mut execution,
176 &cloneable_codecs,
177 &hosted_descriptor_bytes,
178 )
179 .await;
180 }
181 if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
186 install_local_hosted_rpc_stubs(
187 &mut execution,
188 &rpc_factories,
189 &hosted_rpc_owner_cells,
190 );
191 }
192 if is_top_level_parent
199 && !args.spawn_workers
200 && !parent_constructed_shared_values.is_empty()
201 {
202 apply_parent_constructed_shared_values_locally(
203 &mut execution,
204 &parent_constructed_shared_values,
205 );
206 }
207 if args.spawn_workers {
208 execution.skip_creating_dependencies();
209 }
210
211 let count = execution.remaining();
216 let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
217
218 let start = Instant::now();
219 output.start_suite(&filtered_tests);
220
221 let execution = Arc::new(Mutex::new(execution));
222 let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
223 let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
224 let cloneable_codecs = Arc::new(cloneable_codecs);
225 let rpc_factories = Arc::new(rpc_factories);
226 let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
227 let mut join_set = JoinSet::new();
228 let threads = args.test_threads().get();
229
230 for worker_idx in 0..threads {
231 let execution_clone = execution.clone();
232 let output_clone = output.clone();
233 let mut args_clone = args.clone();
236 if args_clone.spawn_workers {
237 args_clone.worker_index = Some(worker_idx);
238 }
239 let results_clone = results.clone();
240 let wire_bytes_clone = cloneable_wire_bytes.clone();
241 let hosted_bytes_clone = hosted_descriptor_bytes.clone();
242 let codecs_clone = cloneable_codecs.clone();
243 let rpc_factories_clone = rpc_factories.clone();
244 let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
245 let handle = tokio::runtime::Handle::current();
246 join_set.spawn_blocking(move || {
247 handle.block_on(test_thread(
248 args_clone,
249 execution_clone,
250 output_clone,
251 count,
252 results_clone,
253 wire_bytes_clone,
254 hosted_bytes_clone,
255 codecs_clone,
256 rpc_factories_clone,
257 hosted_rpc_owner_cells_clone,
258 ))
259 });
260 }
261
262 while let Some(res) = join_set.join_next().await {
263 res.expect("Failed to join task");
264 }
265
266 drop(execution);
267
268 let results = results.lock().await;
269 output.finished_suite(&all_tests, &results, start.elapsed());
270 exit_code = SuiteResult::exit_code(&results);
271
272 if exit_code == ExitCode::SUCCESS {
273 break;
274 } else {
275 remaining_retries -= 1;
276 }
277 }
278 exit_code
279 }
280}
281
282#[allow(clippy::too_many_arguments)]
283async fn test_thread(
284 args: Arguments,
285 execution: Arc<Mutex<TestSuiteExecution>>,
286 output: Arc<dyn TestRunnerOutput>,
287 count: usize,
288 results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
289 cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
290 hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
291 cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
292 rpc_factories: Arc<HashMap<String, RpcFactory>>,
293 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
294) {
295 let mut worker = spawn_worker_if_needed(&args).await;
296 if let Some(worker) = worker.as_mut() {
301 worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
302 }
303 let connection_arc = if let Some(ref name) = args.ipc {
304 let name = ipc_name(name.clone());
305 let stream = Stream::connect(name)
306 .await
307 .expect("Failed to connect to IPC socket");
308 Some(Arc::new(Mutex::new(stream)))
309 } else {
310 None
311 };
312
313 if let Some(worker) = worker.as_mut() {
314 for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
315 worker
316 .provide_cloneable(dep_id.clone(), wire_bytes.clone())
317 .await;
318 }
319 for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
321 worker
322 .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
323 .await;
324 }
325 }
326
327 if let Some(connection) = connection_arc.as_ref() {
332 if !rpc_factories.is_empty() {
333 install_worker_subprocess_hosted_rpc_stubs(
334 &execution,
335 &rpc_factories,
336 connection.clone(),
337 )
338 .await;
339 }
340 }
341
342 let mut expected_test = None;
343
344 while !is_done(&execution).await {
345 if let Some(connection) = connection_arc.as_ref() {
346 while expected_test.is_none() {
347 let mut conn = connection.lock().await;
348 let command_bytes = read_frame_async(&mut *conn)
349 .await
350 .expect("Failed to read IPC command frame");
351 drop(conn);
352 let command: IpcCommand =
353 deserialize(&command_bytes).expect("Failed to decode IPC command");
354
355 match command {
356 IpcCommand::RunTest {
357 name,
358 crate_name,
359 module_path,
360 } => {
361 expected_test = Some((name, crate_name, module_path));
362 }
363 IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
364 apply_provided_wire_bytes(
366 &execution,
367 &cloneable_codecs,
368 &dep_id,
369 &wire_bytes,
370 "ProvideCloneable",
371 )
372 .await;
373 let response = IpcResponse::CloneableAccepted { dep_id };
374 let msg = serialize_to_byte_vec(&response)
375 .expect("Failed to encode IPC response");
376 let mut conn = connection.lock().await;
377 write_frame_async(&mut *conn, &msg)
378 .await
379 .expect("Failed to write IPC response frame");
380 }
381 IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
382 apply_provided_wire_bytes(
386 &execution,
387 &cloneable_codecs,
388 &dep_id,
389 &wire_bytes,
390 "ProvideHostedDescriptor",
391 )
392 .await;
393 let response = IpcResponse::HostedDescriptorAccepted { dep_id };
394 let msg = serialize_to_byte_vec(&response)
395 .expect("Failed to encode IPC response");
396 let mut conn = connection.lock().await;
397 write_frame_async(&mut *conn, &msg)
398 .await
399 .expect("Failed to write IPC response frame");
400 }
401 IpcCommand::HostedRpcReply { .. } => {
402 panic!(
409 "unexpected `HostedRpcReply` while waiting for the next \
410 between-tests command in the tokio worker subprocess: a \
411 stub call must have left a reply on the wire without \
412 draining it inline"
413 );
414 }
415 }
416 }
417 }
418
419 if let Some(next) = pick_next(&execution).await {
420 let skip = if let Some((name, crate_name, module_path)) = &expected_test {
421 next.test.name != *name
422 || next.test.crate_name != *crate_name
423 || next.test.module_path != *module_path
424 } else {
425 false
426 };
427
428 if !skip {
429 expected_test = None;
430
431 let ensure_time = get_ensure_time(&args, &next.test);
432
433 output.start_running_test(&next.test, next.index, count);
434 let result = run_test(
435 output.clone(),
436 next.index,
437 count,
438 args.nocapture,
439 args.include_ignored,
440 ensure_time,
441 next.deps.clone(),
442 &next.test,
443 &mut worker,
444 )
445 .await;
446 output.finished_running_test(&next.test, next.index, count, &result);
447
448 if let Some(connection) = connection_arc.as_ref() {
449 let finish_marker = Uuid::new_v4().to_string();
450 let finish_marker_line = format!("{finish_marker}\n");
451 tokio::io::stdout()
452 .write_all(finish_marker_line.as_bytes())
453 .await
454 .unwrap();
455 tokio::io::stderr()
456 .write_all(finish_marker_line.as_bytes())
457 .await
458 .unwrap();
459 tokio::io::stdout().flush().await.unwrap();
460 tokio::io::stderr().flush().await.unwrap();
461
462 let response = IpcResponse::TestFinished {
463 result: (&result).into(),
464 finish_marker,
465 };
466 let msg =
467 serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
468 let mut conn = connection.lock().await;
469 write_frame_async(&mut *conn, &msg)
470 .await
471 .expect("Failed to write IPC response frame");
472 }
473
474 results.lock().await.push((next.test.clone(), result));
475 }
476 }
477 }
478}
479
480async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
481 let execution = execution.lock().await;
482 execution.is_done()
483}
484
485async fn apply_provided_wire_bytes(
495 execution: &Arc<Mutex<TestSuiteExecution>>,
496 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
497 dep_id: &str,
498 wire_bytes: &[u8],
499 source_command: &str,
500) {
501 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
502 panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
503 });
504
505 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
506 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
507 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
508 let reconstructed = match worker_fn {
509 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
510 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
511 };
512
513 let mut execution = execution.lock().await;
514 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
515 assert!(
516 applied,
517 "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
518 );
519}
520
521fn apply_cloneable_values_locally(
536 execution: &mut TestSuiteExecution,
537 cloneable_local_values: &[(String, Arc<dyn Any + Send + Sync>)],
538) {
539 for (dep_id, value) in cloneable_local_values {
540 let applied = execution.provide_cloneable_value(dep_id, value.clone());
541 assert!(
542 applied,
543 "Cloneable dep '{dep_id}' could not be pre-populated locally"
544 );
545 }
546}
547
548fn apply_parent_constructed_shared_values_locally(
554 execution: &mut TestSuiteExecution,
555 values: &[(String, Arc<dyn Any + Send + Sync>)],
556) {
557 for (dep_id, value) in values {
558 let applied = execution.provide_materialized_shared_value(dep_id, value.clone());
559 assert!(
560 applied,
561 "Shared/PerWorker dep '{dep_id}' could not be pre-populated locally"
562 );
563 }
564}
565
566async fn apply_hosted_descriptors_locally(
579 execution: &mut TestSuiteExecution,
580 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
581 descriptor_bytes: &[DepWireBytes],
582) {
583 for (dep_id, wire_bytes) in descriptor_bytes {
584 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
585 panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
586 });
587 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
588 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
589 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
590 let reconstructed = match worker_fn {
591 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
592 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
593 };
594 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
595 assert!(
596 applied,
597 "Hosted dep '{dep_id}' could not be pre-populated locally"
598 );
599 }
600}
601
602async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
603 let mut execution = execution.lock().await;
604 execution.pick_next().await
605}
606
607async fn run_with_flakiness_control<F>(
608 output: Arc<dyn TestRunnerOutput>,
609 test_description: &RegisteredTest,
610 idx: usize,
611 count: usize,
612 test: F,
613) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
614where
615 F: Fn(
616 Instant,
617 )
618 -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
619 + Send
620 + Sync,
621{
622 match &test_description.props.flakiness_control {
623 FlakinessControl::None => {
624 let start = Instant::now();
625 test(start).await
626 }
627 FlakinessControl::ProveNonFlaky(tries) => {
628 for n in 0..*tries {
629 if n > 0 {
630 output.repeat_running_test(
631 test_description,
632 idx,
633 count,
634 n + 1,
635 *tries,
636 "to ensure test is not flaky",
637 );
638 }
639 let start = Instant::now();
640 match test(start).await {
641 Ok(Ok(())) => {}
642 Ok(Err(e)) => return Ok(Err(e)),
643 Err(e) => return Err(e),
644 };
645 }
646 Ok(Ok(()))
647 }
648 FlakinessControl::RetryKnownFlaky(max_retries) => {
649 let mut tries = 1;
650 loop {
651 let start = Instant::now();
652 let result = test(start).await;
653
654 if result.is_err() && tries < *max_retries {
655 tries += 1;
656 output.repeat_running_test(
657 test_description,
658 idx,
659 count,
660 tries,
661 *max_retries,
662 "because test is known to be flaky",
663 );
664 } else {
665 break result;
666 }
667 }
668 }
669 }
670}
671
672#[allow(clippy::too_many_arguments)]
673async fn run_test(
674 output: Arc<dyn TestRunnerOutput>,
675 idx: usize,
676 count: usize,
677 nocapture: bool,
678 include_ignored: bool,
679 ensure_time: Option<TimeThreshold>,
680 dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
681 test: &RegisteredTest,
682 worker: &mut Option<Worker>,
683) -> TestResult {
684 if test.props.is_ignored && !include_ignored {
685 TestResult::ignored()
686 } else if let Some(worker) = worker.as_mut() {
687 worker.run_test(nocapture, test).await
688 } else {
689 let start = Instant::now();
690 let test = test.clone();
691 match &test.run {
692 TestFunction::Sync(_) => {
693 let handle = spawn_blocking(move || {
694 let test = test.clone();
695 crate::sync::run_sync_test_function(
696 output,
697 &test,
698 idx,
699 count,
700 ensure_time,
701 dependency_view,
702 )
703 });
704 handle.await.unwrap_or_else(|join_error| {
705 TestResult::failed(
706 start.elapsed(),
707 FailureCause::HarnessError(format!(
708 "Failed joining test task: {join_error}"
709 )),
710 )
711 })
712 }
713 TestFunction::Async(test_fn) => {
714 let timeout = test.props.timeout;
715 let test_fn = test_fn.clone();
716 let detached_panic_policy = test.props.detached_panic_policy.clone();
717 let result = run_with_flakiness_control(output, &test, idx, count, |start| {
718 let dependency_view = dependency_view.clone();
719 let test_fn = test_fn.clone();
720 Box::pin(async move {
721 let test_id = crate::panic_hook::next_test_id();
722 crate::panic_hook::set_current_test_id(test_id);
723 crate::panic_hook::create_detached_collector(test_id);
724 let result = AssertUnwindSafe(Box::pin(async move {
725 match timeout {
726 None => test_fn(dependency_view).await,
727 Some(duration) => {
728 let result =
729 tokio::time::timeout(duration, test_fn(dependency_view))
730 .await;
731 match result {
732 Ok(result) => result,
733 Err(_) => {
734 return Err(FailureCause::HarnessError(
735 "Test timed out".to_string(),
736 ))
737 }
738 }
739 }
740 }
741 .into_result()?;
742 if let Some(ensure_time) = ensure_time {
743 let elapsed = start.elapsed();
744 if ensure_time.is_critical(&elapsed) {
745 return Err(FailureCause::HarnessError(format!(
746 "Test run time exceeds critical threshold: {elapsed:?}"
747 )));
748 }
749 }
750 Ok(())
751 }))
752 .catch_unwind()
753 .await;
754 result
755 })
756 })
757 .await;
758 let mut test_result =
759 TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
760 if let Some(test_id) = crate::panic_hook::current_test_id() {
761 if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
762 let panics = match collector.lock() {
763 Ok(p) => p,
764 Err(poisoned) => poisoned.into_inner(),
765 };
766 if !panics.is_empty()
767 && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
768 && test_result.is_passed()
769 {
770 let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
771 test_result = TestResult::failed(
772 start.elapsed(),
773 FailureCause::Panic(internal::PanicCause {
774 message: Some(format!(
775 "Detached task(s) panicked:\n{}",
776 messages.join("\n---\n")
777 )),
778 location: panics.first().and_then(|p| p.location.clone()),
779 backtrace: panics.first().and_then(|p| p.backtrace.clone()),
780 }),
781 );
782 }
783 }
784 }
785 crate::panic_hook::clear_current_test_id();
786 test_result
787 }
788 TestFunction::SyncBench(_) => {
789 let handle = spawn_blocking(move || {
790 let test = test.clone();
791 crate::sync::run_sync_test_function(
792 output,
793 &test,
794 idx,
795 count,
796 ensure_time,
797 dependency_view,
798 )
799 });
800 handle.await.unwrap_or_else(|join_error| {
801 TestResult::failed(
802 start.elapsed(),
803 FailureCause::HarnessError(format!(
804 "Failed joining test task: {join_error}"
805 )),
806 )
807 })
808 }
809 TestFunction::AsyncBench(bench_fn) => {
810 let mut bencher = AsyncBencher::new();
811 let test_id = crate::panic_hook::next_test_id();
812 crate::panic_hook::set_current_test_id(test_id);
813 let result = AssertUnwindSafe(async move {
814 bench_fn(&mut bencher, dependency_view).await;
815 (
816 bencher
817 .summary()
818 .expect("iter() was not called in bench function"),
819 bencher.bytes,
820 )
821 })
822 .catch_unwind()
823 .await;
824 let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
825 let test_result = TestResult::from_summary(
826 &test.props.should_panic,
827 start.elapsed(),
828 result.map(|(summary, _)| summary),
829 bytes,
830 );
831 crate::panic_hook::clear_current_test_id();
832 test_result
833 }
834 }
835 }
836}
837
838struct Worker {
839 _listener: Listener,
840 _process: Child,
841 _out_handle: JoinHandle<()>,
842 _err_handle: JoinHandle<()>,
843 out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
844 err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
845 capture_enabled: Arc<Mutex<bool>>,
846 connection: Stream,
847 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
851}
852
853impl Worker {
854 fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
858 self.hosted_rpc_owner_cells = cells;
859 }
860
861 async fn handle_hosted_rpc_call(
867 &mut self,
868 dump_on_ipc_failure: &DumpOnFailure,
869 request_id: u64,
870 dep_id: String,
871 method_idx: u32,
872 args_bytes: Vec<u8>,
873 ) {
874 let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
875 Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
881 Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
882 Err(message) => HostedRpcReplyBody::Err { message },
883 },
884 None => HostedRpcReplyBody::Err {
885 message: format!(
886 "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
887 ),
888 },
889 };
890 let reply = IpcCommand::HostedRpcReply { request_id, body };
891 let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
892 dump_on_ipc_failure
893 .run(write_frame_async(&mut self.connection, &msg).await)
894 .await;
895 }
896
897 pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
898 let mut capture_enabled = self.capture_enabled.lock().await;
899 *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
900 drop(capture_enabled);
901
902 let cmd = IpcCommand::RunTest {
904 name: test.name.clone(),
905 crate_name: test.crate_name.clone(),
906 module_path: test.module_path.clone(),
907 };
908
909 let dump_on_ipc_failure = self.dump_on_failure();
910
911 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
912 dump_on_ipc_failure
913 .run(write_frame_async(&mut self.connection, &msg).await)
914 .await;
915
916 let response = loop {
917 let response_bytes = dump_on_ipc_failure
918 .run(read_frame_async(&mut self.connection).await)
919 .await;
920 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
921 match response {
922 IpcResponse::TestFinished { .. } => break response,
923 IpcResponse::CloneableAccepted { .. }
924 | IpcResponse::HostedDescriptorAccepted { .. } => continue,
925 IpcResponse::HostedRpcCall {
926 request_id,
927 dep_id,
928 method_idx,
929 args_bytes,
930 } => {
931 self.handle_hosted_rpc_call(
932 &dump_on_ipc_failure,
933 request_id,
934 dep_id,
935 method_idx,
936 args_bytes,
937 )
938 .await;
939 continue;
940 }
941 }
942 };
943
944 let IpcResponse::TestFinished {
945 result,
946 finish_marker,
947 } = response
948 else {
949 unreachable!("loop only breaks on TestFinished")
950 };
951
952 if test.props.capture_control.requires_capturing(!nocapture) {
953 let out_lines: Vec<_> =
954 Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
955 let err_lines: Vec<_> =
956 Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
957 result.into_test_result(out_lines, err_lines)
958 } else {
959 result.into_test_result(Vec::new(), Vec::new())
960 }
961 }
962
963 async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
966 let dump_on_ipc_failure = self.dump_on_failure();
967 let cmd = IpcCommand::ProvideCloneable {
968 dep_id: dep_id.clone(),
969 wire_bytes,
970 };
971 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
972 dump_on_ipc_failure
973 .run(write_frame_async(&mut self.connection, &msg).await)
974 .await;
975
976 loop {
977 let response_bytes = dump_on_ipc_failure
978 .run(read_frame_async(&mut self.connection).await)
979 .await;
980 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
981 match response {
982 IpcResponse::CloneableAccepted { dep_id: ack_id } => {
983 if ack_id == dep_id {
984 return;
985 }
986 }
987 IpcResponse::HostedDescriptorAccepted { .. } => {
988 }
990 IpcResponse::TestFinished { .. } => {
991 }
993 IpcResponse::HostedRpcCall {
994 request_id,
995 dep_id: rpc_dep_id,
996 method_idx,
997 args_bytes,
998 } => {
999 self.handle_hosted_rpc_call(
1004 &dump_on_ipc_failure,
1005 request_id,
1006 rpc_dep_id,
1007 method_idx,
1008 args_bytes,
1009 )
1010 .await;
1011 }
1012 }
1013 }
1014 }
1015
1016 async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
1018 let dump_on_ipc_failure = self.dump_on_failure();
1019 let cmd = IpcCommand::ProvideHostedDescriptor {
1020 dep_id: dep_id.clone(),
1021 wire_bytes,
1022 };
1023 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
1024 dump_on_ipc_failure
1025 .run(write_frame_async(&mut self.connection, &msg).await)
1026 .await;
1027
1028 loop {
1029 let response_bytes = dump_on_ipc_failure
1030 .run(read_frame_async(&mut self.connection).await)
1031 .await;
1032 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
1033 match response {
1034 IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
1035 if ack_id == dep_id {
1036 return;
1037 }
1038 }
1039 IpcResponse::CloneableAccepted { .. } => {
1040 }
1042 IpcResponse::TestFinished { .. } => {
1043 }
1045 IpcResponse::HostedRpcCall {
1046 request_id,
1047 dep_id: rpc_dep_id,
1048 method_idx,
1049 args_bytes,
1050 } => {
1051 self.handle_hosted_rpc_call(
1054 &dump_on_ipc_failure,
1055 request_id,
1056 rpc_dep_id,
1057 method_idx,
1058 args_bytes,
1059 )
1060 .await;
1061 }
1062 }
1063 }
1064 }
1065
1066 fn dump_on_failure(&self) -> DumpOnFailure {
1067 DumpOnFailure {
1068 out_lines: self.out_lines.clone(),
1069 err_lines: self.err_lines.clone(),
1070 }
1071 }
1072
1073 async fn drain_until(
1074 source: Arc<Mutex<VecDeque<CapturedOutput>>>,
1075 finish_marker: String,
1076 ) -> Vec<CapturedOutput> {
1077 let mut result = Vec::new();
1078 loop {
1079 let mut source = source.lock().await;
1080 while let Some(line) = source.pop_front() {
1081 if line.line() == finish_marker {
1082 return result;
1083 } else {
1084 result.push(line.clone());
1085 }
1086 }
1087 drop(source);
1088
1089 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1090 }
1091 }
1092}
1093
1094struct DumpOnFailure {
1095 out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1096 err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1097}
1098
1099impl DumpOnFailure {
1100 pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1101 match result {
1102 Ok(value) => value,
1103 Err(_error) => {
1104 let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1105 let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1106 let mut all_lines = [out_lines, err_lines].concat();
1107 all_lines.sort();
1108
1109 for line in all_lines {
1110 eprintln!("{}", line.line());
1111 }
1112
1113 std::process::exit(1);
1114 }
1115 }
1116 }
1117}
1118
1119async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1120 if args.spawn_workers {
1121 let id = Uuid::new_v4();
1122 let name_str = format!("{id}.sock");
1123 let name = name_str
1124 .clone()
1125 .to_ns_name::<GenericNamespaced>()
1126 .expect("Invalid local socket name");
1127 let opts = ListenerOptions::new().name(name.clone());
1128 let listener = opts
1129 .create_tokio()
1130 .expect("Failed to create local socket listener");
1131
1132 let exe = std::env::current_exe().expect("Failed to get current executable path");
1133
1134 let mut args = args.clone();
1135 args.ipc = Some(name_str);
1136 args.spawn_workers = false;
1137 args.logfile = None;
1138 let args = args.to_args();
1139
1140 let mut process = Command::new(exe)
1141 .args(args)
1142 .stdin(Stdio::inherit())
1143 .stderr(Stdio::piped())
1144 .stdout(Stdio::piped())
1145 .spawn()
1146 .expect("Failed to spawn worker process");
1147
1148 let stdout = process.stdout.take().unwrap();
1149 let stderr = process.stderr.take().unwrap();
1150
1151 let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1152 let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1153 let capture_enabled = Arc::new(Mutex::new(true));
1154
1155 let out_lines_clone = out_lines.clone();
1156 let capture_enabled_clone = capture_enabled.clone();
1157 let out_handle = spawn(async move {
1158 let reader = BufReader::new(stdout);
1159 let mut lines = reader.lines();
1160 while let Some(line) = lines
1161 .next_line()
1162 .await
1163 .expect("Failed to read from worker stdout")
1164 {
1165 if *capture_enabled_clone.lock().await {
1166 out_lines_clone
1167 .lock()
1168 .await
1169 .push_back(CapturedOutput::stdout(line));
1170 } else {
1171 println!("{line}");
1172 }
1173 }
1174 });
1175
1176 let err_lines_clone = err_lines.clone();
1177 let capture_enabled_clone = capture_enabled.clone();
1178 let err_handle = spawn(async move {
1179 let reader = BufReader::new(stderr);
1180 let mut lines = reader.lines();
1181 while let Some(line) = lines
1182 .next_line()
1183 .await
1184 .expect("Failed to read from worker stderr")
1185 {
1186 if *capture_enabled_clone.lock().await {
1187 err_lines_clone
1188 .lock()
1189 .await
1190 .push_back(CapturedOutput::stderr(line));
1191 } else {
1192 eprintln!("{line}");
1193 }
1194 }
1195 });
1196
1197 let connection = listener
1198 .accept()
1199 .await
1200 .expect("Failed to accept connection");
1201
1202 Some(Worker {
1203 _listener: listener,
1204 _process: process,
1205 _out_handle: out_handle,
1206 _err_handle: err_handle,
1207 out_lines,
1208 err_lines,
1209 connection,
1210 capture_enabled,
1211 hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1212 })
1213 } else {
1214 None
1215 }
1216}
1217
1218fn install_local_hosted_rpc_stubs(
1224 execution: &mut TestSuiteExecution,
1225 rpc_factories: &HashMap<String, RpcFactory>,
1226 owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1227) {
1228 let transport: Arc<dyn HostedRpcTransport> =
1229 Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1230 for (dep_id, factory) in rpc_factories.iter() {
1231 if !owner_cells.contains_key(dep_id) {
1232 continue;
1236 }
1237 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1238 let stub = (factory.build_stub)(channel);
1239 let applied = execution.provide_cloneable_value(dep_id, stub);
1240 assert!(
1241 applied,
1242 "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1243 );
1244 }
1245}
1246
1247async fn install_worker_subprocess_hosted_rpc_stubs(
1252 execution: &Arc<Mutex<TestSuiteExecution>>,
1253 rpc_factories: &HashMap<String, RpcFactory>,
1254 connection_arc: Arc<Mutex<Stream>>,
1255) {
1256 let transport: Arc<dyn HostedRpcTransport> =
1257 Arc::new(IpcHostedRpcTransport::new(connection_arc));
1258 for (dep_id, factory) in rpc_factories.iter() {
1259 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1260 let stub = (factory.build_stub)(channel);
1261 let mut execution = execution.lock().await;
1262 let applied = execution.provide_cloneable_value(dep_id, stub);
1263 let _ = applied;
1266 }
1267}
1268
1269struct IpcHostedRpcTransport {
1287 connection: Arc<Mutex<Stream>>,
1288 next_request_id: AtomicU64,
1289}
1290
1291impl IpcHostedRpcTransport {
1292 fn new(connection: Arc<Mutex<Stream>>) -> Self {
1293 Self {
1294 connection,
1295 next_request_id: AtomicU64::new(0),
1296 }
1297 }
1298}
1299
1300impl HostedRpcTransport for IpcHostedRpcTransport {
1301 fn call(
1302 &self,
1303 dep_id: &str,
1304 method_idx: u32,
1305 args: Vec<u8>,
1306 ) -> Result<Vec<u8>, HostedRpcError> {
1307 let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1308 let call = IpcResponse::HostedRpcCall {
1309 request_id,
1310 dep_id: dep_id.to_string(),
1311 method_idx,
1312 args_bytes: args,
1313 };
1314 let msg = serialize_to_byte_vec(&call).map_err(|e| {
1315 HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1316 })?;
1317
1318 let connection = self.connection.clone();
1319 let handle = tokio::runtime::Handle::current();
1320
1321 tokio::task::block_in_place(move || {
1326 handle.block_on(async move {
1327 let mut conn = connection.lock().await;
1328 write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1329 HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1330 })?;
1331 let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1332 HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1333 })?;
1334 let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1335 HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1336 })?;
1337 match command {
1338 IpcCommand::HostedRpcReply {
1339 request_id: reply_id,
1340 body,
1341 } => {
1342 if reply_id != request_id {
1343 return Err(HostedRpcError::Transport(format!(
1344 "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1345 )));
1346 }
1347 match body {
1348 HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1349 HostedRpcReplyBody::Err { message } => {
1350 Err(HostedRpcError::Dispatch(message))
1351 }
1352 }
1353 }
1354 other => Err(HostedRpcError::Transport(format!(
1355 "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1356 ))),
1357 }
1358 })
1359 })
1360 }
1361}