1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6 generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7 FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8 InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9 WorkerReconstructor,
10};
11use crate::ipc::{
12 ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38 tokio::runtime::Builder::new_multi_thread()
39 .enable_all()
40 .build()
41 .unwrap()
42 .block_on(async_test_runner())
43}
44
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47 crate::panic_hook::install_panic_hook();
48 let mut args = Arguments::from_args();
49 if let Some(idx) = args.worker_index {
53 crate::worker::set_worker_index(idx);
54 }
55 let output = test_runner_output(&args);
56
57 let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
58 let registered_dependency_constructors =
59 internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
60 let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
61 let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
62
63 let generated_tests = generate_tests(®istered_test_generators).await;
64
65 let all_tests: Vec<RegisteredTest> = registered_tests
66 .iter()
67 .cloned()
68 .chain(generated_tests)
69 .collect();
70
71 if args.list {
72 output.test_list(&all_tests);
73 ExitCode::SUCCESS
74 } else {
75 let mut remaining_retries = args.flaky_run.unwrap_or(1);
76
77 let mut exit_code = ExitCode::from(101);
78 while remaining_retries > 0 {
79 let (mut execution, filtered_tests) = TestSuiteExecution::construct(
80 &args,
81 registered_dependency_constructors.as_slice(),
82 &all_tests,
83 registered_testsuite_props.as_slice(),
84 );
85 args.finalize_for_execution(&execution, output.clone());
86 let is_top_level_parent = args.is_top_level_parent();
87 let has_selected_tests = execution.remaining() > 0;
88 let needs_parent_shared = execution.has_cloneable_dependencies()
93 || execution.has_hosted_dependencies()
94 || execution.has_hosted_rpc_dependencies();
95 let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
96 {
97 execution.collect_parent_shared_dependencies_async().await
98 } else {
99 crate::execution::ParentSharedDependencies {
100 cloneable_wire_bytes: Vec::new(),
101 cloneable_local_values: Vec::new(),
102 hosted_descriptor_bytes: Vec::new(),
103 hosted_owners: Vec::new(),
104 hosted_rpc_owner_cells: Vec::new(),
105 }
106 };
107 let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
108 let cloneable_local_values = parent_shared.cloneable_local_values;
109 let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
110 let _hosted_owners = parent_shared.hosted_owners;
111 let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
112 parent_shared.hosted_rpc_owner_cells.into_iter().collect();
113 let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
117 .iter()
118 .filter_map(|d| {
119 if d.scope == crate::internal::DepScope::HostedRpc {
120 d.rpc_factory
121 .as_ref()
122 .map(|f| (d.qualified_id(), f.clone()))
123 } else {
124 None
125 }
126 })
127 .collect();
128 let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
136 registered_dependency_constructors
137 .iter()
138 .filter_map(|d| {
139 let codec_opt = match d.scope {
140 crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
141 crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
142 _ => None,
143 };
144 match (codec_opt, &d.worker_fn) {
145 (Some(codec), Some(worker_fn)) => {
146 Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
147 }
148 _ => None,
149 }
150 })
151 .collect();
152 if is_top_level_parent && !args.spawn_workers && !cloneable_local_values.is_empty() {
162 apply_cloneable_values_locally(&mut execution, &cloneable_local_values);
163 }
164 if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
172 apply_hosted_descriptors_locally(
173 &mut execution,
174 &cloneable_codecs,
175 &hosted_descriptor_bytes,
176 )
177 .await;
178 }
179 if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
184 install_local_hosted_rpc_stubs(
185 &mut execution,
186 &rpc_factories,
187 &hosted_rpc_owner_cells,
188 );
189 }
190 if args.spawn_workers {
191 execution.skip_creating_dependencies();
192 }
193
194 let count = execution.remaining();
199 let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
200
201 let start = Instant::now();
202 output.start_suite(&filtered_tests);
203
204 let execution = Arc::new(Mutex::new(execution));
205 let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
206 let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
207 let cloneable_codecs = Arc::new(cloneable_codecs);
208 let rpc_factories = Arc::new(rpc_factories);
209 let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
210 let mut join_set = JoinSet::new();
211 let threads = args.test_threads().get();
212
213 for worker_idx in 0..threads {
214 let execution_clone = execution.clone();
215 let output_clone = output.clone();
216 let mut args_clone = args.clone();
219 if args_clone.spawn_workers {
220 args_clone.worker_index = Some(worker_idx);
221 }
222 let results_clone = results.clone();
223 let wire_bytes_clone = cloneable_wire_bytes.clone();
224 let hosted_bytes_clone = hosted_descriptor_bytes.clone();
225 let codecs_clone = cloneable_codecs.clone();
226 let rpc_factories_clone = rpc_factories.clone();
227 let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
228 let handle = tokio::runtime::Handle::current();
229 join_set.spawn_blocking(move || {
230 handle.block_on(test_thread(
231 args_clone,
232 execution_clone,
233 output_clone,
234 count,
235 results_clone,
236 wire_bytes_clone,
237 hosted_bytes_clone,
238 codecs_clone,
239 rpc_factories_clone,
240 hosted_rpc_owner_cells_clone,
241 ))
242 });
243 }
244
245 while let Some(res) = join_set.join_next().await {
246 res.expect("Failed to join task");
247 }
248
249 drop(execution);
250
251 let results = results.lock().await;
252 output.finished_suite(&all_tests, &results, start.elapsed());
253 exit_code = SuiteResult::exit_code(&results);
254
255 if exit_code == ExitCode::SUCCESS {
256 break;
257 } else {
258 remaining_retries -= 1;
259 }
260 }
261 exit_code
262 }
263}
264
265#[allow(clippy::too_many_arguments)]
266async fn test_thread(
267 args: Arguments,
268 execution: Arc<Mutex<TestSuiteExecution>>,
269 output: Arc<dyn TestRunnerOutput>,
270 count: usize,
271 results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
272 cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
273 hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
274 cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
275 rpc_factories: Arc<HashMap<String, RpcFactory>>,
276 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
277) {
278 let mut worker = spawn_worker_if_needed(&args).await;
279 if let Some(worker) = worker.as_mut() {
284 worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
285 }
286 let connection_arc = if let Some(ref name) = args.ipc {
287 let name = ipc_name(name.clone());
288 let stream = Stream::connect(name)
289 .await
290 .expect("Failed to connect to IPC socket");
291 Some(Arc::new(Mutex::new(stream)))
292 } else {
293 None
294 };
295
296 if let Some(worker) = worker.as_mut() {
297 for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
298 worker
299 .provide_cloneable(dep_id.clone(), wire_bytes.clone())
300 .await;
301 }
302 for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
304 worker
305 .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
306 .await;
307 }
308 }
309
310 if let Some(connection) = connection_arc.as_ref() {
315 if !rpc_factories.is_empty() {
316 install_worker_subprocess_hosted_rpc_stubs(
317 &execution,
318 &rpc_factories,
319 connection.clone(),
320 )
321 .await;
322 }
323 }
324
325 let mut expected_test = None;
326
327 while !is_done(&execution).await {
328 if let Some(connection) = connection_arc.as_ref() {
329 while expected_test.is_none() {
330 let mut conn = connection.lock().await;
331 let command_bytes = read_frame_async(&mut *conn)
332 .await
333 .expect("Failed to read IPC command frame");
334 drop(conn);
335 let command: IpcCommand =
336 deserialize(&command_bytes).expect("Failed to decode IPC command");
337
338 match command {
339 IpcCommand::RunTest {
340 name,
341 crate_name,
342 module_path,
343 } => {
344 expected_test = Some((name, crate_name, module_path));
345 }
346 IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
347 apply_provided_wire_bytes(
349 &execution,
350 &cloneable_codecs,
351 &dep_id,
352 &wire_bytes,
353 "ProvideCloneable",
354 )
355 .await;
356 let response = IpcResponse::CloneableAccepted { dep_id };
357 let msg = serialize_to_byte_vec(&response)
358 .expect("Failed to encode IPC response");
359 let mut conn = connection.lock().await;
360 write_frame_async(&mut *conn, &msg)
361 .await
362 .expect("Failed to write IPC response frame");
363 }
364 IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
365 apply_provided_wire_bytes(
369 &execution,
370 &cloneable_codecs,
371 &dep_id,
372 &wire_bytes,
373 "ProvideHostedDescriptor",
374 )
375 .await;
376 let response = IpcResponse::HostedDescriptorAccepted { dep_id };
377 let msg = serialize_to_byte_vec(&response)
378 .expect("Failed to encode IPC response");
379 let mut conn = connection.lock().await;
380 write_frame_async(&mut *conn, &msg)
381 .await
382 .expect("Failed to write IPC response frame");
383 }
384 IpcCommand::HostedRpcReply { .. } => {
385 panic!(
392 "unexpected `HostedRpcReply` while waiting for the next \
393 between-tests command in the tokio worker subprocess: a \
394 stub call must have left a reply on the wire without \
395 draining it inline"
396 );
397 }
398 }
399 }
400 }
401
402 if let Some(next) = pick_next(&execution).await {
403 let skip = if let Some((name, crate_name, module_path)) = &expected_test {
404 next.test.name != *name
405 || next.test.crate_name != *crate_name
406 || next.test.module_path != *module_path
407 } else {
408 false
409 };
410
411 if !skip {
412 expected_test = None;
413
414 let ensure_time = get_ensure_time(&args, &next.test);
415
416 output.start_running_test(&next.test, next.index, count);
417 let result = run_test(
418 output.clone(),
419 next.index,
420 count,
421 args.nocapture,
422 args.include_ignored,
423 ensure_time,
424 next.deps.clone(),
425 &next.test,
426 &mut worker,
427 )
428 .await;
429 output.finished_running_test(&next.test, next.index, count, &result);
430
431 if let Some(connection) = connection_arc.as_ref() {
432 let finish_marker = Uuid::new_v4().to_string();
433 let finish_marker_line = format!("{finish_marker}\n");
434 tokio::io::stdout()
435 .write_all(finish_marker_line.as_bytes())
436 .await
437 .unwrap();
438 tokio::io::stderr()
439 .write_all(finish_marker_line.as_bytes())
440 .await
441 .unwrap();
442 tokio::io::stdout().flush().await.unwrap();
443 tokio::io::stderr().flush().await.unwrap();
444
445 let response = IpcResponse::TestFinished {
446 result: (&result).into(),
447 finish_marker,
448 };
449 let msg =
450 serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
451 let mut conn = connection.lock().await;
452 write_frame_async(&mut *conn, &msg)
453 .await
454 .expect("Failed to write IPC response frame");
455 }
456
457 results.lock().await.push((next.test.clone(), result));
458 }
459 }
460 }
461}
462
463async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
464 let execution = execution.lock().await;
465 execution.is_done()
466}
467
468async fn apply_provided_wire_bytes(
478 execution: &Arc<Mutex<TestSuiteExecution>>,
479 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
480 dep_id: &str,
481 wire_bytes: &[u8],
482 source_command: &str,
483) {
484 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
485 panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
486 });
487
488 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
489 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
490 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
491 let reconstructed = match worker_fn {
492 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
493 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
494 };
495
496 let mut execution = execution.lock().await;
497 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
498 assert!(
499 applied,
500 "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
501 );
502}
503
504fn apply_cloneable_values_locally(
519 execution: &mut TestSuiteExecution,
520 cloneable_local_values: &[(String, Arc<dyn Any + Send + Sync>)],
521) {
522 for (dep_id, value) in cloneable_local_values {
523 let applied = execution.provide_cloneable_value(dep_id, value.clone());
524 assert!(
525 applied,
526 "Cloneable dep '{dep_id}' could not be pre-populated locally"
527 );
528 }
529}
530
531async fn apply_hosted_descriptors_locally(
544 execution: &mut TestSuiteExecution,
545 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
546 descriptor_bytes: &[DepWireBytes],
547) {
548 for (dep_id, wire_bytes) in descriptor_bytes {
549 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
550 panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
551 });
552 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
553 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
554 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
555 let reconstructed = match worker_fn {
556 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
557 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
558 };
559 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
560 assert!(
561 applied,
562 "Hosted dep '{dep_id}' could not be pre-populated locally"
563 );
564 }
565}
566
567async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
568 let mut execution = execution.lock().await;
569 execution.pick_next().await
570}
571
572async fn run_with_flakiness_control<F>(
573 output: Arc<dyn TestRunnerOutput>,
574 test_description: &RegisteredTest,
575 idx: usize,
576 count: usize,
577 test: F,
578) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
579where
580 F: Fn(
581 Instant,
582 )
583 -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
584 + Send
585 + Sync,
586{
587 match &test_description.props.flakiness_control {
588 FlakinessControl::None => {
589 let start = Instant::now();
590 test(start).await
591 }
592 FlakinessControl::ProveNonFlaky(tries) => {
593 for n in 0..*tries {
594 if n > 0 {
595 output.repeat_running_test(
596 test_description,
597 idx,
598 count,
599 n + 1,
600 *tries,
601 "to ensure test is not flaky",
602 );
603 }
604 let start = Instant::now();
605 match test(start).await {
606 Ok(Ok(())) => {}
607 Ok(Err(e)) => return Ok(Err(e)),
608 Err(e) => return Err(e),
609 };
610 }
611 Ok(Ok(()))
612 }
613 FlakinessControl::RetryKnownFlaky(max_retries) => {
614 let mut tries = 1;
615 loop {
616 let start = Instant::now();
617 let result = test(start).await;
618
619 if result.is_err() && tries < *max_retries {
620 tries += 1;
621 output.repeat_running_test(
622 test_description,
623 idx,
624 count,
625 tries,
626 *max_retries,
627 "because test is known to be flaky",
628 );
629 } else {
630 break result;
631 }
632 }
633 }
634 }
635}
636
637#[allow(clippy::too_many_arguments)]
638async fn run_test(
639 output: Arc<dyn TestRunnerOutput>,
640 idx: usize,
641 count: usize,
642 nocapture: bool,
643 include_ignored: bool,
644 ensure_time: Option<TimeThreshold>,
645 dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
646 test: &RegisteredTest,
647 worker: &mut Option<Worker>,
648) -> TestResult {
649 if test.props.is_ignored && !include_ignored {
650 TestResult::ignored()
651 } else if let Some(worker) = worker.as_mut() {
652 worker.run_test(nocapture, test).await
653 } else {
654 let start = Instant::now();
655 let test = test.clone();
656 match &test.run {
657 TestFunction::Sync(_) => {
658 let handle = spawn_blocking(move || {
659 let test = test.clone();
660 crate::sync::run_sync_test_function(
661 output,
662 &test,
663 idx,
664 count,
665 ensure_time,
666 dependency_view,
667 )
668 });
669 handle.await.unwrap_or_else(|join_error| {
670 TestResult::failed(
671 start.elapsed(),
672 FailureCause::HarnessError(format!(
673 "Failed joining test task: {join_error}"
674 )),
675 )
676 })
677 }
678 TestFunction::Async(test_fn) => {
679 let timeout = test.props.timeout;
680 let test_fn = test_fn.clone();
681 let detached_panic_policy = test.props.detached_panic_policy.clone();
682 let result = run_with_flakiness_control(output, &test, idx, count, |start| {
683 let dependency_view = dependency_view.clone();
684 let test_fn = test_fn.clone();
685 Box::pin(async move {
686 let test_id = crate::panic_hook::next_test_id();
687 crate::panic_hook::set_current_test_id(test_id);
688 crate::panic_hook::create_detached_collector(test_id);
689 let result = AssertUnwindSafe(Box::pin(async move {
690 match timeout {
691 None => test_fn(dependency_view).await,
692 Some(duration) => {
693 let result =
694 tokio::time::timeout(duration, test_fn(dependency_view))
695 .await;
696 match result {
697 Ok(result) => result,
698 Err(_) => {
699 return Err(FailureCause::HarnessError(
700 "Test timed out".to_string(),
701 ))
702 }
703 }
704 }
705 }
706 .into_result()?;
707 if let Some(ensure_time) = ensure_time {
708 let elapsed = start.elapsed();
709 if ensure_time.is_critical(&elapsed) {
710 return Err(FailureCause::HarnessError(format!(
711 "Test run time exceeds critical threshold: {elapsed:?}"
712 )));
713 }
714 }
715 Ok(())
716 }))
717 .catch_unwind()
718 .await;
719 result
720 })
721 })
722 .await;
723 let mut test_result =
724 TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
725 if let Some(test_id) = crate::panic_hook::current_test_id() {
726 if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
727 let panics = match collector.lock() {
728 Ok(p) => p,
729 Err(poisoned) => poisoned.into_inner(),
730 };
731 if !panics.is_empty()
732 && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
733 && test_result.is_passed()
734 {
735 let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
736 test_result = TestResult::failed(
737 start.elapsed(),
738 FailureCause::Panic(internal::PanicCause {
739 message: Some(format!(
740 "Detached task(s) panicked:\n{}",
741 messages.join("\n---\n")
742 )),
743 location: panics.first().and_then(|p| p.location.clone()),
744 backtrace: panics.first().and_then(|p| p.backtrace.clone()),
745 }),
746 );
747 }
748 }
749 }
750 crate::panic_hook::clear_current_test_id();
751 test_result
752 }
753 TestFunction::SyncBench(_) => {
754 let handle = spawn_blocking(move || {
755 let test = test.clone();
756 crate::sync::run_sync_test_function(
757 output,
758 &test,
759 idx,
760 count,
761 ensure_time,
762 dependency_view,
763 )
764 });
765 handle.await.unwrap_or_else(|join_error| {
766 TestResult::failed(
767 start.elapsed(),
768 FailureCause::HarnessError(format!(
769 "Failed joining test task: {join_error}"
770 )),
771 )
772 })
773 }
774 TestFunction::AsyncBench(bench_fn) => {
775 let mut bencher = AsyncBencher::new();
776 let test_id = crate::panic_hook::next_test_id();
777 crate::panic_hook::set_current_test_id(test_id);
778 let result = AssertUnwindSafe(async move {
779 bench_fn(&mut bencher, dependency_view).await;
780 (
781 bencher
782 .summary()
783 .expect("iter() was not called in bench function"),
784 bencher.bytes,
785 )
786 })
787 .catch_unwind()
788 .await;
789 let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
790 let test_result = TestResult::from_summary(
791 &test.props.should_panic,
792 start.elapsed(),
793 result.map(|(summary, _)| summary),
794 bytes,
795 );
796 crate::panic_hook::clear_current_test_id();
797 test_result
798 }
799 }
800 }
801}
802
/// Parent-side handle of a spawned worker process and the IPC connection to it.
struct Worker {
    /// Local socket listener on which the worker's connection was accepted;
    /// stored so it stays alive as long as the `Worker`.
    _listener: Listener,
    /// Handle of the spawned worker process.
    _process: Child,
    /// Task reading the worker's stdout line by line.
    _out_handle: JoinHandle<()>,
    /// Task reading the worker's stderr line by line.
    _err_handle: JoinHandle<()>,
    /// Stdout lines queued by the reader task while capturing is enabled.
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Stderr lines queued by the reader task while capturing is enabled.
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// When `true` the reader tasks queue the worker's output; otherwise they
    /// print it straight to this process' stdout / stderr.
    capture_enabled: Arc<Mutex<bool>>,
    /// IPC stream used to exchange framed commands and responses with the worker.
    connection: Stream,
    /// Owner cells, keyed by dependency id, used to answer `HostedRpcCall`s.
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
}
817
818impl Worker {
819 fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
823 self.hosted_rpc_owner_cells = cells;
824 }
825
826 async fn handle_hosted_rpc_call(
832 &mut self,
833 dump_on_ipc_failure: &DumpOnFailure,
834 request_id: u64,
835 dep_id: String,
836 method_idx: u32,
837 args_bytes: Vec<u8>,
838 ) {
839 let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
840 Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
846 Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
847 Err(message) => HostedRpcReplyBody::Err { message },
848 },
849 None => HostedRpcReplyBody::Err {
850 message: format!(
851 "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
852 ),
853 },
854 };
855 let reply = IpcCommand::HostedRpcReply { request_id, body };
856 let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
857 dump_on_ipc_failure
858 .run(write_frame_async(&mut self.connection, &msg).await)
859 .await;
860 }
861
862 pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
863 let mut capture_enabled = self.capture_enabled.lock().await;
864 *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
865 drop(capture_enabled);
866
867 let cmd = IpcCommand::RunTest {
869 name: test.name.clone(),
870 crate_name: test.crate_name.clone(),
871 module_path: test.module_path.clone(),
872 };
873
874 let dump_on_ipc_failure = self.dump_on_failure();
875
876 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
877 dump_on_ipc_failure
878 .run(write_frame_async(&mut self.connection, &msg).await)
879 .await;
880
881 let response = loop {
882 let response_bytes = dump_on_ipc_failure
883 .run(read_frame_async(&mut self.connection).await)
884 .await;
885 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
886 match response {
887 IpcResponse::TestFinished { .. } => break response,
888 IpcResponse::CloneableAccepted { .. }
889 | IpcResponse::HostedDescriptorAccepted { .. } => continue,
890 IpcResponse::HostedRpcCall {
891 request_id,
892 dep_id,
893 method_idx,
894 args_bytes,
895 } => {
896 self.handle_hosted_rpc_call(
897 &dump_on_ipc_failure,
898 request_id,
899 dep_id,
900 method_idx,
901 args_bytes,
902 )
903 .await;
904 continue;
905 }
906 }
907 };
908
909 let IpcResponse::TestFinished {
910 result,
911 finish_marker,
912 } = response
913 else {
914 unreachable!("loop only breaks on TestFinished")
915 };
916
917 if test.props.capture_control.requires_capturing(!nocapture) {
918 let out_lines: Vec<_> =
919 Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
920 let err_lines: Vec<_> =
921 Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
922 result.into_test_result(out_lines, err_lines)
923 } else {
924 result.into_test_result(Vec::new(), Vec::new())
925 }
926 }
927
928 async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
931 let dump_on_ipc_failure = self.dump_on_failure();
932 let cmd = IpcCommand::ProvideCloneable {
933 dep_id: dep_id.clone(),
934 wire_bytes,
935 };
936 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
937 dump_on_ipc_failure
938 .run(write_frame_async(&mut self.connection, &msg).await)
939 .await;
940
941 loop {
942 let response_bytes = dump_on_ipc_failure
943 .run(read_frame_async(&mut self.connection).await)
944 .await;
945 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
946 match response {
947 IpcResponse::CloneableAccepted { dep_id: ack_id } => {
948 if ack_id == dep_id {
949 return;
950 }
951 }
952 IpcResponse::HostedDescriptorAccepted { .. } => {
953 }
955 IpcResponse::TestFinished { .. } => {
956 }
958 IpcResponse::HostedRpcCall {
959 request_id,
960 dep_id: rpc_dep_id,
961 method_idx,
962 args_bytes,
963 } => {
964 self.handle_hosted_rpc_call(
969 &dump_on_ipc_failure,
970 request_id,
971 rpc_dep_id,
972 method_idx,
973 args_bytes,
974 )
975 .await;
976 }
977 }
978 }
979 }
980
981 async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
983 let dump_on_ipc_failure = self.dump_on_failure();
984 let cmd = IpcCommand::ProvideHostedDescriptor {
985 dep_id: dep_id.clone(),
986 wire_bytes,
987 };
988 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
989 dump_on_ipc_failure
990 .run(write_frame_async(&mut self.connection, &msg).await)
991 .await;
992
993 loop {
994 let response_bytes = dump_on_ipc_failure
995 .run(read_frame_async(&mut self.connection).await)
996 .await;
997 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
998 match response {
999 IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
1000 if ack_id == dep_id {
1001 return;
1002 }
1003 }
1004 IpcResponse::CloneableAccepted { .. } => {
1005 }
1007 IpcResponse::TestFinished { .. } => {
1008 }
1010 IpcResponse::HostedRpcCall {
1011 request_id,
1012 dep_id: rpc_dep_id,
1013 method_idx,
1014 args_bytes,
1015 } => {
1016 self.handle_hosted_rpc_call(
1019 &dump_on_ipc_failure,
1020 request_id,
1021 rpc_dep_id,
1022 method_idx,
1023 args_bytes,
1024 )
1025 .await;
1026 }
1027 }
1028 }
1029 }
1030
1031 fn dump_on_failure(&self) -> DumpOnFailure {
1032 DumpOnFailure {
1033 out_lines: self.out_lines.clone(),
1034 err_lines: self.err_lines.clone(),
1035 }
1036 }
1037
1038 async fn drain_until(
1039 source: Arc<Mutex<VecDeque<CapturedOutput>>>,
1040 finish_marker: String,
1041 ) -> Vec<CapturedOutput> {
1042 let mut result = Vec::new();
1043 loop {
1044 let mut source = source.lock().await;
1045 while let Some(line) = source.pop_front() {
1046 if line.line() == finish_marker {
1047 return result;
1048 } else {
1049 result.push(line.clone());
1050 }
1051 }
1052 drop(source);
1053
1054 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1055 }
1056 }
1057}
1058
/// Shares a worker's captured output queues so they can be dumped when an IPC
/// operation fails.
struct DumpOnFailure {
    /// Captured stdout lines of the worker.
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Captured stderr lines of the worker.
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
}
1063
1064impl DumpOnFailure {
1065 pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1066 match result {
1067 Ok(value) => value,
1068 Err(_error) => {
1069 let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1070 let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1071 let mut all_lines = [out_lines, err_lines].concat();
1072 all_lines.sort();
1073
1074 for line in all_lines {
1075 eprintln!("{}", line.line());
1076 }
1077
1078 std::process::exit(1);
1079 }
1080 }
1081 }
1082}
1083
1084async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1085 if args.spawn_workers {
1086 let id = Uuid::new_v4();
1087 let name_str = format!("{id}.sock");
1088 let name = name_str
1089 .clone()
1090 .to_ns_name::<GenericNamespaced>()
1091 .expect("Invalid local socket name");
1092 let opts = ListenerOptions::new().name(name.clone());
1093 let listener = opts
1094 .create_tokio()
1095 .expect("Failed to create local socket listener");
1096
1097 let exe = std::env::current_exe().expect("Failed to get current executable path");
1098
1099 let mut args = args.clone();
1100 args.ipc = Some(name_str);
1101 args.spawn_workers = false;
1102 args.logfile = None;
1103 let args = args.to_args();
1104
1105 let mut process = Command::new(exe)
1106 .args(args)
1107 .stdin(Stdio::inherit())
1108 .stderr(Stdio::piped())
1109 .stdout(Stdio::piped())
1110 .spawn()
1111 .expect("Failed to spawn worker process");
1112
1113 let stdout = process.stdout.take().unwrap();
1114 let stderr = process.stderr.take().unwrap();
1115
1116 let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1117 let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1118 let capture_enabled = Arc::new(Mutex::new(true));
1119
1120 let out_lines_clone = out_lines.clone();
1121 let capture_enabled_clone = capture_enabled.clone();
1122 let out_handle = spawn(async move {
1123 let reader = BufReader::new(stdout);
1124 let mut lines = reader.lines();
1125 while let Some(line) = lines
1126 .next_line()
1127 .await
1128 .expect("Failed to read from worker stdout")
1129 {
1130 if *capture_enabled_clone.lock().await {
1131 out_lines_clone
1132 .lock()
1133 .await
1134 .push_back(CapturedOutput::stdout(line));
1135 } else {
1136 println!("{line}");
1137 }
1138 }
1139 });
1140
1141 let err_lines_clone = err_lines.clone();
1142 let capture_enabled_clone = capture_enabled.clone();
1143 let err_handle = spawn(async move {
1144 let reader = BufReader::new(stderr);
1145 let mut lines = reader.lines();
1146 while let Some(line) = lines
1147 .next_line()
1148 .await
1149 .expect("Failed to read from worker stderr")
1150 {
1151 if *capture_enabled_clone.lock().await {
1152 err_lines_clone
1153 .lock()
1154 .await
1155 .push_back(CapturedOutput::stderr(line));
1156 } else {
1157 eprintln!("{line}");
1158 }
1159 }
1160 });
1161
1162 let connection = listener
1163 .accept()
1164 .await
1165 .expect("Failed to accept connection");
1166
1167 Some(Worker {
1168 _listener: listener,
1169 _process: process,
1170 _out_handle: out_handle,
1171 _err_handle: err_handle,
1172 out_lines,
1173 err_lines,
1174 connection,
1175 capture_enabled,
1176 hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1177 })
1178 } else {
1179 None
1180 }
1181}
1182
1183fn install_local_hosted_rpc_stubs(
1189 execution: &mut TestSuiteExecution,
1190 rpc_factories: &HashMap<String, RpcFactory>,
1191 owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1192) {
1193 let transport: Arc<dyn HostedRpcTransport> =
1194 Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1195 for (dep_id, factory) in rpc_factories.iter() {
1196 if !owner_cells.contains_key(dep_id) {
1197 continue;
1201 }
1202 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1203 let stub = (factory.build_stub)(channel);
1204 let applied = execution.provide_cloneable_value(dep_id, stub);
1205 assert!(
1206 applied,
1207 "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1208 );
1209 }
1210}
1211
1212async fn install_worker_subprocess_hosted_rpc_stubs(
1217 execution: &Arc<Mutex<TestSuiteExecution>>,
1218 rpc_factories: &HashMap<String, RpcFactory>,
1219 connection_arc: Arc<Mutex<Stream>>,
1220) {
1221 let transport: Arc<dyn HostedRpcTransport> =
1222 Arc::new(IpcHostedRpcTransport::new(connection_arc));
1223 for (dep_id, factory) in rpc_factories.iter() {
1224 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1225 let stub = (factory.build_stub)(channel);
1226 let mut execution = execution.lock().await;
1227 let applied = execution.provide_cloneable_value(dep_id, stub);
1228 let _ = applied;
1231 }
1232}
1233
/// [`HostedRpcTransport`] that forwards hosted RPC calls over the IPC
/// connection and waits for the matching reply.
struct IpcHostedRpcTransport {
    /// IPC stream on which calls are written and replies are read.
    connection: Arc<Mutex<Stream>>,
    /// Source of request ids; incremented once per call, starting at 0.
    next_request_id: AtomicU64,
}
1255
1256impl IpcHostedRpcTransport {
1257 fn new(connection: Arc<Mutex<Stream>>) -> Self {
1258 Self {
1259 connection,
1260 next_request_id: AtomicU64::new(0),
1261 }
1262 }
1263}
1264
1265impl HostedRpcTransport for IpcHostedRpcTransport {
1266 fn call(
1267 &self,
1268 dep_id: &str,
1269 method_idx: u32,
1270 args: Vec<u8>,
1271 ) -> Result<Vec<u8>, HostedRpcError> {
1272 let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1273 let call = IpcResponse::HostedRpcCall {
1274 request_id,
1275 dep_id: dep_id.to_string(),
1276 method_idx,
1277 args_bytes: args,
1278 };
1279 let msg = serialize_to_byte_vec(&call).map_err(|e| {
1280 HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1281 })?;
1282
1283 let connection = self.connection.clone();
1284 let handle = tokio::runtime::Handle::current();
1285
1286 tokio::task::block_in_place(move || {
1291 handle.block_on(async move {
1292 let mut conn = connection.lock().await;
1293 write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1294 HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1295 })?;
1296 let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1297 HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1298 })?;
1299 let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1300 HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1301 })?;
1302 match command {
1303 IpcCommand::HostedRpcReply {
1304 request_id: reply_id,
1305 body,
1306 } => {
1307 if reply_id != request_id {
1308 return Err(HostedRpcError::Transport(format!(
1309 "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1310 )));
1311 }
1312 match body {
1313 HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1314 HostedRpcReplyBody::Err { message } => {
1315 Err(HostedRpcError::Dispatch(message))
1316 }
1317 }
1318 }
1319 other => Err(HostedRpcError::Transport(format!(
1320 "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1321 ))),
1322 }
1323 })
1324 })
1325 }
1326}