1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6 generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7 FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8 InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9 WorkerReconstructor,
10};
11use crate::ipc::{
12 ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38 tokio::runtime::Builder::new_multi_thread()
39 .enable_all()
40 .build()
41 .unwrap()
42 .block_on(async_test_runner())
43}
44
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47 crate::panic_hook::install_panic_hook();
48 let mut args = Arguments::from_args();
49 if let Some(idx) = args.worker_index {
53 crate::worker::set_worker_index(idx);
54 }
55 let output = test_runner_output(&args);
59
60 let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
61 let registered_dependency_constructors =
62 internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
63 let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
64 let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
65
66 let generated_tests = generate_tests(®istered_test_generators).await;
67
68 let all_tests: Vec<RegisteredTest> = registered_tests
69 .iter()
70 .cloned()
71 .chain(generated_tests)
72 .collect();
73
74 if args.list {
75 output.test_list(&all_tests);
76 ExitCode::SUCCESS
77 } else {
78 let mut remaining_retries = args.flaky_run.unwrap_or(1);
79
80 let mut exit_code = ExitCode::from(101);
81 while remaining_retries > 0 {
82 let (mut execution, filtered_tests) = TestSuiteExecution::construct(
83 &args,
84 registered_dependency_constructors.as_slice(),
85 &all_tests,
86 registered_testsuite_props.as_slice(),
87 );
88 args.finalize_for_execution(&execution, output.clone());
89 let mut host_capture = crate::host_capture::install_if_needed(&args);
93 let host_capture_epoch: Option<std::time::Instant> =
94 host_capture.as_ref().map(|hc| hc.epoch());
95 let host_capture_epoch_wall: Option<std::time::SystemTime> =
96 host_capture.as_ref().map(|hc| hc.epoch_wall());
97 let is_top_level_parent = args.is_top_level_parent();
98 let has_selected_tests = execution.remaining() > 0;
99 let needs_parent_shared = execution.has_cloneable_dependencies()
104 || execution.has_hosted_dependencies()
105 || execution.has_hosted_rpc_dependencies();
106 let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
107 {
108 execution.collect_parent_shared_dependencies_async().await
109 } else {
110 crate::execution::ParentSharedDependencies {
111 cloneable_wire_bytes: Vec::new(),
112 cloneable_local_values: Vec::new(),
113 hosted_descriptor_bytes: Vec::new(),
114 hosted_owners: Vec::new(),
115 hosted_rpc_owner_cells: Vec::new(),
116 parent_constructed_shared_values: Vec::new(),
117 }
118 };
119 let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
120 let cloneable_local_values = parent_shared.cloneable_local_values;
121 let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
122 let _hosted_owners = parent_shared.hosted_owners;
123 let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
124 parent_shared.hosted_rpc_owner_cells.into_iter().collect();
125 let parent_constructed_shared_values = parent_shared.parent_constructed_shared_values;
126 let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
130 .iter()
131 .filter_map(|d| {
132 if d.scope == crate::internal::DepScope::HostedRpc {
133 d.rpc_factory
134 .as_ref()
135 .map(|f| (d.qualified_id(), f.clone()))
136 } else {
137 None
138 }
139 })
140 .collect();
141 let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
149 registered_dependency_constructors
150 .iter()
151 .filter_map(|d| {
152 let codec_opt = match d.scope {
153 crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
154 crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
155 _ => None,
156 };
157 match (codec_opt, &d.worker_fn) {
158 (Some(codec), Some(worker_fn)) => {
159 Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
160 }
161 _ => None,
162 }
163 })
164 .collect();
165 if is_top_level_parent && !args.spawn_workers && !cloneable_local_values.is_empty() {
175 apply_cloneable_values_locally(&mut execution, &cloneable_local_values);
176 }
177 if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
185 apply_hosted_descriptors_locally(
186 &mut execution,
187 &cloneable_codecs,
188 &hosted_descriptor_bytes,
189 )
190 .await;
191 }
192 if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
197 install_local_hosted_rpc_stubs(
198 &mut execution,
199 &rpc_factories,
200 &hosted_rpc_owner_cells,
201 );
202 }
203 if is_top_level_parent
210 && !args.spawn_workers
211 && !parent_constructed_shared_values.is_empty()
212 {
213 apply_parent_constructed_shared_values_locally(
214 &mut execution,
215 &parent_constructed_shared_values,
216 );
217 }
218 if args.spawn_workers {
219 execution.skip_creating_dependencies();
220 }
221
222 let count = execution.remaining();
227 let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
228 let host_windows: Arc<Mutex<Vec<crate::host_capture::HostWindow>>> =
233 Arc::new(Mutex::new(Vec::with_capacity(count)));
234
235 let start = Instant::now();
236 output.start_suite(&filtered_tests);
237
238 let execution = Arc::new(Mutex::new(execution));
239 let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
240 let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
241 let cloneable_codecs = Arc::new(cloneable_codecs);
242 let rpc_factories = Arc::new(rpc_factories);
243 let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
244 let mut join_set = JoinSet::new();
245 let threads = args.test_threads().get();
246
247 for worker_idx in 0..threads {
248 let execution_clone = execution.clone();
249 let output_clone = output.clone();
250 let mut args_clone = args.clone();
253 if args_clone.spawn_workers {
254 args_clone.worker_index = Some(worker_idx);
255 }
256 let results_clone = results.clone();
257 let host_windows_clone = host_windows.clone();
258 let wire_bytes_clone = cloneable_wire_bytes.clone();
259 let hosted_bytes_clone = hosted_descriptor_bytes.clone();
260 let codecs_clone = cloneable_codecs.clone();
261 let rpc_factories_clone = rpc_factories.clone();
262 let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
263 let handle = tokio::runtime::Handle::current();
264 join_set.spawn_blocking(move || {
265 handle.block_on(test_thread(
266 args_clone,
267 execution_clone,
268 output_clone,
269 count,
270 results_clone,
271 host_windows_clone,
272 wire_bytes_clone,
273 hosted_bytes_clone,
274 codecs_clone,
275 rpc_factories_clone,
276 hosted_rpc_owner_cells_clone,
277 host_capture_epoch,
278 ))
279 });
280 }
281
282 while let Some(res) = join_set.join_next().await {
283 res.expect("Failed to join task");
284 }
285
286 drop(execution);
287
288 let mut results = results.lock().await;
289 drop(hosted_rpc_owner_cells);
297 drop(_hosted_owners);
298
299 if let Some(hc) = host_capture.take() {
303 let epoch_wall = host_capture_epoch_wall.unwrap_or_else(|| hc.epoch_wall());
304 let records = hc.finalize();
305 let windows = host_windows.lock().await;
306 let windows_indexed: Vec<(usize, crate::host_capture::HostWindow)> =
307 windows.iter().copied().enumerate().collect();
308 crate::host_capture::attribute_records_to_tests(
309 epoch_wall,
310 &records,
311 &windows_indexed,
312 &mut results,
313 );
314 }
315 output.finished_suite(&all_tests, &results, start.elapsed());
316 exit_code = SuiteResult::exit_code(&results);
317
318 if exit_code == ExitCode::SUCCESS {
319 break;
320 } else {
321 remaining_retries -= 1;
322 }
323 }
324 exit_code
325 }
326}
327
328#[allow(clippy::too_many_arguments)]
329async fn test_thread(
330 args: Arguments,
331 execution: Arc<Mutex<TestSuiteExecution>>,
332 output: Arc<dyn TestRunnerOutput>,
333 count: usize,
334 results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
335 host_windows: Arc<Mutex<Vec<crate::host_capture::HostWindow>>>,
336 cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
337 hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
338 cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
339 rpc_factories: Arc<HashMap<String, RpcFactory>>,
340 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
341 host_capture_epoch: Option<std::time::Instant>,
342) {
343 let mut worker = spawn_worker_if_needed(&args).await;
344 if let Some(worker) = worker.as_mut() {
349 worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
350 }
351 let connection_arc = if let Some(ref name) = args.ipc {
352 let name = ipc_name(name.clone());
353 let stream = Stream::connect(name)
354 .await
355 .expect("Failed to connect to IPC socket");
356 Some(Arc::new(Mutex::new(stream)))
357 } else {
358 None
359 };
360
361 if let Some(worker) = worker.as_mut() {
362 for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
363 worker
364 .provide_cloneable(dep_id.clone(), wire_bytes.clone())
365 .await;
366 }
367 for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
369 worker
370 .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
371 .await;
372 }
373 }
374
375 if let Some(connection) = connection_arc.as_ref() {
380 if !rpc_factories.is_empty() {
381 install_worker_subprocess_hosted_rpc_stubs(
382 &execution,
383 &rpc_factories,
384 connection.clone(),
385 )
386 .await;
387 }
388 }
389
390 let mut expected_test = None;
391
392 while !is_done(&execution).await {
393 if let Some(connection) = connection_arc.as_ref() {
394 while expected_test.is_none() {
395 let mut conn = connection.lock().await;
396 let command_bytes = read_frame_async(&mut *conn)
397 .await
398 .expect("Failed to read IPC command frame");
399 drop(conn);
400 let command: IpcCommand =
401 deserialize(&command_bytes).expect("Failed to decode IPC command");
402
403 match command {
404 IpcCommand::RunTest {
405 name,
406 crate_name,
407 module_path,
408 } => {
409 expected_test = Some((name, crate_name, module_path));
410 }
411 IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
412 apply_provided_wire_bytes(
414 &execution,
415 &cloneable_codecs,
416 &dep_id,
417 &wire_bytes,
418 "ProvideCloneable",
419 )
420 .await;
421 let response = IpcResponse::CloneableAccepted { dep_id };
422 let msg = serialize_to_byte_vec(&response)
423 .expect("Failed to encode IPC response");
424 let mut conn = connection.lock().await;
425 write_frame_async(&mut *conn, &msg)
426 .await
427 .expect("Failed to write IPC response frame");
428 }
429 IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
430 apply_provided_wire_bytes(
434 &execution,
435 &cloneable_codecs,
436 &dep_id,
437 &wire_bytes,
438 "ProvideHostedDescriptor",
439 )
440 .await;
441 let response = IpcResponse::HostedDescriptorAccepted { dep_id };
442 let msg = serialize_to_byte_vec(&response)
443 .expect("Failed to encode IPC response");
444 let mut conn = connection.lock().await;
445 write_frame_async(&mut *conn, &msg)
446 .await
447 .expect("Failed to write IPC response frame");
448 }
449 IpcCommand::HostedRpcReply { .. } => {
450 panic!(
457 "unexpected `HostedRpcReply` while waiting for the next \
458 between-tests command in the tokio worker subprocess: a \
459 stub call must have left a reply on the wire without \
460 draining it inline"
461 );
462 }
463 }
464 }
465 }
466
467 if let Some(next) = pick_next(&execution).await {
468 let skip = if let Some((name, crate_name, module_path)) = &expected_test {
469 next.test.name != *name
470 || next.test.crate_name != *crate_name
471 || next.test.module_path != *module_path
472 } else {
473 false
474 };
475
476 if !skip {
477 expected_test = None;
478
479 let ensure_time = get_ensure_time(&args, &next.test);
480
481 let window_start = std::time::Instant::now();
489
490 output.start_running_test(&next.test, next.index, count);
491 let result = run_test(
492 output.clone(),
493 next.index,
494 count,
495 args.nocapture,
496 args.include_ignored,
497 ensure_time,
498 next.deps.clone(),
499 &next.test,
500 &mut worker,
501 )
502 .await;
503 output.finished_running_test(&next.test, next.index, count, &result);
504 let window_end = std::time::Instant::now();
505
506 if let Some(connection) = connection_arc.as_ref() {
507 let finish_marker = Uuid::new_v4().to_string();
508 let finish_marker_line = format!("{finish_marker}\n");
509 tokio::io::stdout()
510 .write_all(finish_marker_line.as_bytes())
511 .await
512 .unwrap();
513 tokio::io::stderr()
514 .write_all(finish_marker_line.as_bytes())
515 .await
516 .unwrap();
517 tokio::io::stdout().flush().await.unwrap();
518 tokio::io::stderr().flush().await.unwrap();
519
520 let response = IpcResponse::TestFinished {
521 result: (&result).into(),
522 finish_marker,
523 };
524 let msg =
525 serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
526 let mut conn = connection.lock().await;
527 write_frame_async(&mut *conn, &msg)
528 .await
529 .expect("Failed to write IPC response frame");
530 }
531
532 let window = crate::host_capture::HostWindow::from_instants(
536 host_capture_epoch,
537 window_start,
538 window_end,
539 )
540 .unwrap_or(crate::host_capture::HostWindow {
541 start: std::time::Duration::ZERO,
542 end: std::time::Duration::ZERO,
543 });
544 let mut results_guard = results.lock().await;
545 let mut windows_guard = host_windows.lock().await;
546 results_guard.push((next.test.clone(), result));
547 windows_guard.push(window);
548 }
549 }
550 }
551}
552
553async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
554 let execution = execution.lock().await;
555 execution.is_done()
556}
557
558async fn apply_provided_wire_bytes(
568 execution: &Arc<Mutex<TestSuiteExecution>>,
569 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
570 dep_id: &str,
571 wire_bytes: &[u8],
572 source_command: &str,
573) {
574 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
575 panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
576 });
577
578 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
579 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
580 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
581 let reconstructed = match worker_fn {
582 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
583 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
584 };
585
586 let mut execution = execution.lock().await;
587 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
588 assert!(
589 applied,
590 "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
591 );
592}
593
594fn apply_cloneable_values_locally(
609 execution: &mut TestSuiteExecution,
610 cloneable_local_values: &[(String, Arc<dyn Any + Send + Sync>)],
611) {
612 for (dep_id, value) in cloneable_local_values {
613 let applied = execution.provide_cloneable_value(dep_id, value.clone());
614 assert!(
615 applied,
616 "Cloneable dep '{dep_id}' could not be pre-populated locally"
617 );
618 }
619}
620
621fn apply_parent_constructed_shared_values_locally(
627 execution: &mut TestSuiteExecution,
628 values: &[(String, Arc<dyn Any + Send + Sync>)],
629) {
630 for (dep_id, value) in values {
631 let applied = execution.provide_materialized_shared_value(dep_id, value.clone());
632 assert!(
633 applied,
634 "Shared/PerWorker dep '{dep_id}' could not be pre-populated locally"
635 );
636 }
637}
638
639async fn apply_hosted_descriptors_locally(
652 execution: &mut TestSuiteExecution,
653 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
654 descriptor_bytes: &[DepWireBytes],
655) {
656 for (dep_id, wire_bytes) in descriptor_bytes {
657 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
658 panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
659 });
660 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
661 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
662 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
663 let reconstructed = match worker_fn {
664 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
665 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
666 };
667 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
668 assert!(
669 applied,
670 "Hosted dep '{dep_id}' could not be pre-populated locally"
671 );
672 }
673}
674
675async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
676 let mut execution = execution.lock().await;
677 execution.pick_next().await
678}
679
680async fn run_with_flakiness_control<F>(
681 output: Arc<dyn TestRunnerOutput>,
682 test_description: &RegisteredTest,
683 idx: usize,
684 count: usize,
685 test: F,
686) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
687where
688 F: Fn(
689 Instant,
690 )
691 -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
692 + Send
693 + Sync,
694{
695 match &test_description.props.flakiness_control {
696 FlakinessControl::None => {
697 let start = Instant::now();
698 test(start).await
699 }
700 FlakinessControl::ProveNonFlaky(tries) => {
701 for n in 0..*tries {
702 if n > 0 {
703 output.repeat_running_test(
704 test_description,
705 idx,
706 count,
707 n + 1,
708 *tries,
709 "to ensure test is not flaky",
710 );
711 }
712 let start = Instant::now();
713 match test(start).await {
714 Ok(Ok(())) => {}
715 Ok(Err(e)) => return Ok(Err(e)),
716 Err(e) => return Err(e),
717 };
718 }
719 Ok(Ok(()))
720 }
721 FlakinessControl::RetryKnownFlaky(max_retries) => {
722 let mut tries = 1;
723 loop {
724 let start = Instant::now();
725 let result = test(start).await;
726
727 if result.is_err() && tries < *max_retries {
728 tries += 1;
729 output.repeat_running_test(
730 test_description,
731 idx,
732 count,
733 tries,
734 *max_retries,
735 "because test is known to be flaky",
736 );
737 } else {
738 break result;
739 }
740 }
741 }
742 }
743}
744
745#[allow(clippy::too_many_arguments)]
746async fn run_test(
747 output: Arc<dyn TestRunnerOutput>,
748 idx: usize,
749 count: usize,
750 nocapture: bool,
751 include_ignored: bool,
752 ensure_time: Option<TimeThreshold>,
753 dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
754 test: &RegisteredTest,
755 worker: &mut Option<Worker>,
756) -> TestResult {
757 if test.props.is_ignored && !include_ignored {
758 TestResult::ignored()
759 } else if let Some(worker) = worker.as_mut() {
760 worker.run_test(nocapture, test).await
761 } else {
762 let start = Instant::now();
763 let test = test.clone();
764 match &test.run {
765 TestFunction::Sync(_) => {
766 let handle = spawn_blocking(move || {
767 let test = test.clone();
768 crate::sync::run_sync_test_function(
769 output,
770 &test,
771 idx,
772 count,
773 ensure_time,
774 dependency_view,
775 )
776 });
777 handle.await.unwrap_or_else(|join_error| {
778 TestResult::failed(
779 start.elapsed(),
780 FailureCause::HarnessError(format!(
781 "Failed joining test task: {join_error}"
782 )),
783 )
784 })
785 }
786 TestFunction::Async(test_fn) => {
787 let timeout = test.props.timeout;
788 let test_fn = test_fn.clone();
789 let detached_panic_policy = test.props.detached_panic_policy.clone();
790 let result = run_with_flakiness_control(output, &test, idx, count, |start| {
791 let dependency_view = dependency_view.clone();
792 let test_fn = test_fn.clone();
793 Box::pin(async move {
794 let test_id = crate::panic_hook::next_test_id();
795 crate::panic_hook::set_current_test_id(test_id);
796 crate::panic_hook::create_detached_collector(test_id);
797 let result = AssertUnwindSafe(Box::pin(async move {
798 match timeout {
799 None => test_fn(dependency_view).await,
800 Some(duration) => {
801 let result =
802 tokio::time::timeout(duration, test_fn(dependency_view))
803 .await;
804 match result {
805 Ok(result) => result,
806 Err(_) => {
807 return Err(FailureCause::HarnessError(
808 "Test timed out".to_string(),
809 ))
810 }
811 }
812 }
813 }
814 .into_result()?;
815 if let Some(ensure_time) = ensure_time {
816 let elapsed = start.elapsed();
817 if ensure_time.is_critical(&elapsed) {
818 return Err(FailureCause::HarnessError(format!(
819 "Test run time exceeds critical threshold: {elapsed:?}"
820 )));
821 }
822 }
823 Ok(())
824 }))
825 .catch_unwind()
826 .await;
827 result
828 })
829 })
830 .await;
831 let mut test_result =
832 TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
833 if let Some(test_id) = crate::panic_hook::current_test_id() {
834 if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
835 let panics = match collector.lock() {
836 Ok(p) => p,
837 Err(poisoned) => poisoned.into_inner(),
838 };
839 if !panics.is_empty()
840 && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
841 && test_result.is_passed()
842 {
843 let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
844 test_result = TestResult::failed(
845 start.elapsed(),
846 FailureCause::Panic(internal::PanicCause {
847 message: Some(format!(
848 "Detached task(s) panicked:\n{}",
849 messages.join("\n---\n")
850 )),
851 location: panics.first().and_then(|p| p.location.clone()),
852 backtrace: panics.first().and_then(|p| p.backtrace.clone()),
853 }),
854 );
855 }
856 }
857 }
858 crate::panic_hook::clear_current_test_id();
859 test_result
860 }
861 TestFunction::SyncBench(_) => {
862 let handle = spawn_blocking(move || {
863 let test = test.clone();
864 crate::sync::run_sync_test_function(
865 output,
866 &test,
867 idx,
868 count,
869 ensure_time,
870 dependency_view,
871 )
872 });
873 handle.await.unwrap_or_else(|join_error| {
874 TestResult::failed(
875 start.elapsed(),
876 FailureCause::HarnessError(format!(
877 "Failed joining test task: {join_error}"
878 )),
879 )
880 })
881 }
882 TestFunction::AsyncBench(bench_fn) => {
883 let mut bencher = AsyncBencher::new();
884 let test_id = crate::panic_hook::next_test_id();
885 crate::panic_hook::set_current_test_id(test_id);
886 let result = AssertUnwindSafe(async move {
887 bench_fn(&mut bencher, dependency_view).await;
888 (
889 bencher
890 .summary()
891 .expect("iter() was not called in bench function"),
892 bencher.bytes,
893 )
894 })
895 .catch_unwind()
896 .await;
897 let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
898 let test_result = TestResult::from_summary(
899 &test.props.should_panic,
900 start.elapsed(),
901 result.map(|(summary, _)| summary),
902 bytes,
903 );
904 crate::panic_hook::clear_current_test_id();
905 test_result
906 }
907 }
908 }
909}
910
911struct Worker {
912 _listener: Listener,
913 _process: Child,
914 _out_handle: JoinHandle<()>,
915 _err_handle: JoinHandle<()>,
916 out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
917 err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
918 capture_enabled: Arc<Mutex<bool>>,
919 connection: Stream,
920 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
924}
925
926impl Worker {
927 fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
931 self.hosted_rpc_owner_cells = cells;
932 }
933
934 async fn handle_hosted_rpc_call(
940 &mut self,
941 dump_on_ipc_failure: &DumpOnFailure,
942 request_id: u64,
943 dep_id: String,
944 method_idx: u32,
945 args_bytes: Vec<u8>,
946 ) {
947 let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
948 Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
954 Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
955 Err(message) => HostedRpcReplyBody::Err { message },
956 },
957 None => HostedRpcReplyBody::Err {
958 message: format!(
959 "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
960 ),
961 },
962 };
963 let reply = IpcCommand::HostedRpcReply { request_id, body };
964 let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
965 dump_on_ipc_failure
966 .run(write_frame_async(&mut self.connection, &msg).await)
967 .await;
968 }
969
970 pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
971 let mut capture_enabled = self.capture_enabled.lock().await;
972 *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
973 drop(capture_enabled);
974
975 let cmd = IpcCommand::RunTest {
977 name: test.name.clone(),
978 crate_name: test.crate_name.clone(),
979 module_path: test.module_path.clone(),
980 };
981
982 let dump_on_ipc_failure = self.dump_on_failure();
983
984 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
985 dump_on_ipc_failure
986 .run(write_frame_async(&mut self.connection, &msg).await)
987 .await;
988
989 let response = loop {
990 let response_bytes = dump_on_ipc_failure
991 .run(read_frame_async(&mut self.connection).await)
992 .await;
993 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
994 match response {
995 IpcResponse::TestFinished { .. } => break response,
996 IpcResponse::CloneableAccepted { .. }
997 | IpcResponse::HostedDescriptorAccepted { .. } => continue,
998 IpcResponse::HostedRpcCall {
999 request_id,
1000 dep_id,
1001 method_idx,
1002 args_bytes,
1003 } => {
1004 self.handle_hosted_rpc_call(
1005 &dump_on_ipc_failure,
1006 request_id,
1007 dep_id,
1008 method_idx,
1009 args_bytes,
1010 )
1011 .await;
1012 continue;
1013 }
1014 }
1015 };
1016
1017 let IpcResponse::TestFinished {
1018 result,
1019 finish_marker,
1020 } = response
1021 else {
1022 unreachable!("loop only breaks on TestFinished")
1023 };
1024
1025 if test.props.capture_control.requires_capturing(!nocapture) {
1026 let out_lines: Vec<_> =
1027 Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
1028 let err_lines: Vec<_> =
1029 Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
1030 result.into_test_result(out_lines, err_lines)
1031 } else {
1032 result.into_test_result(Vec::new(), Vec::new())
1033 }
1034 }
1035
1036 async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
1039 let dump_on_ipc_failure = self.dump_on_failure();
1040 let cmd = IpcCommand::ProvideCloneable {
1041 dep_id: dep_id.clone(),
1042 wire_bytes,
1043 };
1044 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
1045 dump_on_ipc_failure
1046 .run(write_frame_async(&mut self.connection, &msg).await)
1047 .await;
1048
1049 loop {
1050 let response_bytes = dump_on_ipc_failure
1051 .run(read_frame_async(&mut self.connection).await)
1052 .await;
1053 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
1054 match response {
1055 IpcResponse::CloneableAccepted { dep_id: ack_id } => {
1056 if ack_id == dep_id {
1057 return;
1058 }
1059 }
1060 IpcResponse::HostedDescriptorAccepted { .. } => {
1061 }
1063 IpcResponse::TestFinished { .. } => {
1064 }
1066 IpcResponse::HostedRpcCall {
1067 request_id,
1068 dep_id: rpc_dep_id,
1069 method_idx,
1070 args_bytes,
1071 } => {
1072 self.handle_hosted_rpc_call(
1077 &dump_on_ipc_failure,
1078 request_id,
1079 rpc_dep_id,
1080 method_idx,
1081 args_bytes,
1082 )
1083 .await;
1084 }
1085 }
1086 }
1087 }
1088
1089 async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
1091 let dump_on_ipc_failure = self.dump_on_failure();
1092 let cmd = IpcCommand::ProvideHostedDescriptor {
1093 dep_id: dep_id.clone(),
1094 wire_bytes,
1095 };
1096 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
1097 dump_on_ipc_failure
1098 .run(write_frame_async(&mut self.connection, &msg).await)
1099 .await;
1100
1101 loop {
1102 let response_bytes = dump_on_ipc_failure
1103 .run(read_frame_async(&mut self.connection).await)
1104 .await;
1105 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
1106 match response {
1107 IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
1108 if ack_id == dep_id {
1109 return;
1110 }
1111 }
1112 IpcResponse::CloneableAccepted { .. } => {
1113 }
1115 IpcResponse::TestFinished { .. } => {
1116 }
1118 IpcResponse::HostedRpcCall {
1119 request_id,
1120 dep_id: rpc_dep_id,
1121 method_idx,
1122 args_bytes,
1123 } => {
1124 self.handle_hosted_rpc_call(
1127 &dump_on_ipc_failure,
1128 request_id,
1129 rpc_dep_id,
1130 method_idx,
1131 args_bytes,
1132 )
1133 .await;
1134 }
1135 }
1136 }
1137 }
1138
1139 fn dump_on_failure(&self) -> DumpOnFailure {
1140 DumpOnFailure {
1141 out_lines: self.out_lines.clone(),
1142 err_lines: self.err_lines.clone(),
1143 }
1144 }
1145
1146 async fn drain_until(
1147 source: Arc<Mutex<VecDeque<CapturedOutput>>>,
1148 finish_marker: String,
1149 ) -> Vec<CapturedOutput> {
1150 let mut result = Vec::new();
1151 loop {
1152 let mut source = source.lock().await;
1153 while let Some(line) = source.pop_front() {
1154 if line.line() == finish_marker {
1155 return result;
1156 } else {
1157 result.push(line.clone());
1158 }
1159 }
1160 drop(source);
1161
1162 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1163 }
1164 }
1165}
1166
1167struct DumpOnFailure {
1168 out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1169 err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1170}
1171
1172impl DumpOnFailure {
1173 pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1174 match result {
1175 Ok(value) => value,
1176 Err(_error) => {
1177 let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1178 let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1179 let mut all_lines = [out_lines, err_lines].concat();
1180 all_lines.sort();
1181
1182 use std::io::Write;
1190 let mut err = crate::host_capture::TerminalStderr;
1191 for line in all_lines {
1192 let _ = writeln!(err, "{}", line.line());
1193 }
1194 let _ = err.flush();
1195
1196 std::process::exit(1);
1197 }
1198 }
1199 }
1200}
1201
1202async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1203 if args.spawn_workers {
1204 let id = Uuid::new_v4();
1205 let name_str = format!("{id}.sock");
1206 let name = name_str
1207 .clone()
1208 .to_ns_name::<GenericNamespaced>()
1209 .expect("Invalid local socket name");
1210 let opts = ListenerOptions::new().name(name.clone());
1211 let listener = opts
1212 .create_tokio()
1213 .expect("Failed to create local socket listener");
1214
1215 let exe = std::env::current_exe().expect("Failed to get current executable path");
1216
1217 let mut args = args.clone();
1218 args.ipc = Some(name_str);
1219 args.spawn_workers = false;
1220 args.logfile = None;
1221 let args = args.to_args();
1222
1223 let mut process = Command::new(exe)
1224 .args(args)
1225 .stdin(Stdio::inherit())
1226 .stderr(Stdio::piped())
1227 .stdout(Stdio::piped())
1228 .spawn()
1229 .expect("Failed to spawn worker process");
1230
1231 let stdout = process.stdout.take().unwrap();
1232 let stderr = process.stderr.take().unwrap();
1233
1234 let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1235 let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1236 let capture_enabled = Arc::new(Mutex::new(true));
1237
1238 let out_lines_clone = out_lines.clone();
1239 let capture_enabled_clone = capture_enabled.clone();
1240 let out_handle = spawn(async move {
1241 let reader = BufReader::new(stdout);
1242 let mut lines = reader.lines();
1243 while let Some(line) = lines
1244 .next_line()
1245 .await
1246 .expect("Failed to read from worker stdout")
1247 {
1248 if *capture_enabled_clone.lock().await {
1249 out_lines_clone
1250 .lock()
1251 .await
1252 .push_back(CapturedOutput::stdout(line));
1253 } else {
1254 use std::io::Write;
1260 let mut out = crate::host_capture::TerminalStdout;
1261 let _ = writeln!(out, "{line}");
1262 let _ = out.flush();
1263 }
1264 }
1265 });
1266
1267 let err_lines_clone = err_lines.clone();
1268 let capture_enabled_clone = capture_enabled.clone();
1269 let err_handle = spawn(async move {
1270 let reader = BufReader::new(stderr);
1271 let mut lines = reader.lines();
1272 while let Some(line) = lines
1273 .next_line()
1274 .await
1275 .expect("Failed to read from worker stderr")
1276 {
1277 if *capture_enabled_clone.lock().await {
1278 err_lines_clone
1279 .lock()
1280 .await
1281 .push_back(CapturedOutput::stderr(line));
1282 } else {
1283 use std::io::Write;
1287 let mut err = crate::host_capture::TerminalStderr;
1288 let _ = writeln!(err, "{line}");
1289 let _ = err.flush();
1290 }
1291 }
1292 });
1293
1294 let connection = listener
1295 .accept()
1296 .await
1297 .expect("Failed to accept connection");
1298
1299 Some(Worker {
1300 _listener: listener,
1301 _process: process,
1302 _out_handle: out_handle,
1303 _err_handle: err_handle,
1304 out_lines,
1305 err_lines,
1306 connection,
1307 capture_enabled,
1308 hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1309 })
1310 } else {
1311 None
1312 }
1313}
1314
1315fn install_local_hosted_rpc_stubs(
1321 execution: &mut TestSuiteExecution,
1322 rpc_factories: &HashMap<String, RpcFactory>,
1323 owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1324) {
1325 let transport: Arc<dyn HostedRpcTransport> =
1326 Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1327 for (dep_id, factory) in rpc_factories.iter() {
1328 if !owner_cells.contains_key(dep_id) {
1329 continue;
1333 }
1334 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1335 let stub = (factory.build_stub)(channel);
1336 let applied = execution.provide_cloneable_value(dep_id, stub);
1337 assert!(
1338 applied,
1339 "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1340 );
1341 }
1342}
1343
1344async fn install_worker_subprocess_hosted_rpc_stubs(
1349 execution: &Arc<Mutex<TestSuiteExecution>>,
1350 rpc_factories: &HashMap<String, RpcFactory>,
1351 connection_arc: Arc<Mutex<Stream>>,
1352) {
1353 let transport: Arc<dyn HostedRpcTransport> =
1354 Arc::new(IpcHostedRpcTransport::new(connection_arc));
1355 for (dep_id, factory) in rpc_factories.iter() {
1356 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1357 let stub = (factory.build_stub)(channel);
1358 let mut execution = execution.lock().await;
1359 let applied = execution.provide_cloneable_value(dep_id, stub);
1360 let _ = applied;
1363 }
1364}
1365
1366struct IpcHostedRpcTransport {
1384 connection: Arc<Mutex<Stream>>,
1385 next_request_id: AtomicU64,
1386}
1387
1388impl IpcHostedRpcTransport {
1389 fn new(connection: Arc<Mutex<Stream>>) -> Self {
1390 Self {
1391 connection,
1392 next_request_id: AtomicU64::new(0),
1393 }
1394 }
1395}
1396
1397impl HostedRpcTransport for IpcHostedRpcTransport {
1398 fn call(
1399 &self,
1400 dep_id: &str,
1401 method_idx: u32,
1402 args: Vec<u8>,
1403 ) -> Result<Vec<u8>, HostedRpcError> {
1404 let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1405 let call = IpcResponse::HostedRpcCall {
1406 request_id,
1407 dep_id: dep_id.to_string(),
1408 method_idx,
1409 args_bytes: args,
1410 };
1411 let msg = serialize_to_byte_vec(&call).map_err(|e| {
1412 HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1413 })?;
1414
1415 let connection = self.connection.clone();
1416 let handle = tokio::runtime::Handle::current();
1417
1418 tokio::task::block_in_place(move || {
1423 handle.block_on(async move {
1424 let mut conn = connection.lock().await;
1425 write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1426 HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1427 })?;
1428 let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1429 HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1430 })?;
1431 let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1432 HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1433 })?;
1434 match command {
1435 IpcCommand::HostedRpcReply {
1436 request_id: reply_id,
1437 body,
1438 } => {
1439 if reply_id != request_id {
1440 return Err(HostedRpcError::Transport(format!(
1441 "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1442 )));
1443 }
1444 match body {
1445 HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1446 HostedRpcReplyBody::Err { message } => {
1447 Err(HostedRpcError::Dispatch(message))
1448 }
1449 }
1450 }
1451 other => Err(HostedRpcError::Transport(format!(
1452 "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1453 ))),
1454 }
1455 })
1456 })
1457 }
1458}