1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6 generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7 FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8 InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9 WorkerReconstructor,
10};
11use crate::ipc::{
12 ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38 tokio::runtime::Builder::new_multi_thread()
39 .enable_all()
40 .build()
41 .unwrap()
42 .block_on(async_test_runner())
43}
44
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47 crate::panic_hook::install_panic_hook();
48 let mut args = Arguments::from_args();
49 if let Some(idx) = args.worker_index {
53 crate::worker::set_worker_index(idx);
54 }
55 let output = test_runner_output(&args);
56
57 let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
58 let registered_dependency_constructors =
59 internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
60 let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
61 let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
62
63 let generated_tests = generate_tests(®istered_test_generators).await;
64
65 let all_tests: Vec<RegisteredTest> = registered_tests
66 .iter()
67 .cloned()
68 .chain(generated_tests)
69 .collect();
70
71 if args.list {
72 output.test_list(&all_tests);
73 ExitCode::SUCCESS
74 } else {
75 let mut remaining_retries = args.flaky_run.unwrap_or(1);
76
77 let mut exit_code = ExitCode::from(101);
78 while remaining_retries > 0 {
79 let (mut execution, filtered_tests) = TestSuiteExecution::construct(
80 &args,
81 registered_dependency_constructors.as_slice(),
82 &all_tests,
83 registered_testsuite_props.as_slice(),
84 );
85 args.finalize_for_execution(&execution, output.clone());
86 let is_top_level_parent = args.is_top_level_parent();
87 let has_selected_tests = execution.remaining() > 0;
88 let needs_parent_shared = execution.has_cloneable_dependencies()
93 || execution.has_hosted_dependencies()
94 || execution.has_hosted_rpc_dependencies();
95 let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
96 {
97 execution.collect_parent_shared_dependencies_async().await
98 } else {
99 crate::execution::ParentSharedDependencies {
100 cloneable_wire_bytes: Vec::new(),
101 hosted_descriptor_bytes: Vec::new(),
102 hosted_owners: Vec::new(),
103 hosted_rpc_owner_cells: Vec::new(),
104 }
105 };
106 let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
107 let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
108 let _hosted_owners = parent_shared.hosted_owners;
109 let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
110 parent_shared.hosted_rpc_owner_cells.into_iter().collect();
111 let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
115 .iter()
116 .filter_map(|d| {
117 if d.scope == crate::internal::DepScope::HostedRpc {
118 d.rpc_factory
119 .as_ref()
120 .map(|f| (d.qualified_id(), f.clone()))
121 } else {
122 None
123 }
124 })
125 .collect();
126 let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
134 registered_dependency_constructors
135 .iter()
136 .filter_map(|d| {
137 let codec_opt = match d.scope {
138 crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
139 crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
140 _ => None,
141 };
142 match (codec_opt, &d.worker_fn) {
143 (Some(codec), Some(worker_fn)) => {
144 Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
145 }
146 _ => None,
147 }
148 })
149 .collect();
150 if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
158 apply_hosted_descriptors_locally(
159 &mut execution,
160 &cloneable_codecs,
161 &hosted_descriptor_bytes,
162 )
163 .await;
164 }
165 if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
170 install_local_hosted_rpc_stubs(
171 &mut execution,
172 &rpc_factories,
173 &hosted_rpc_owner_cells,
174 );
175 }
176 if args.spawn_workers {
177 execution.skip_creating_dependencies();
178 }
179
180 let count = execution.remaining();
185 let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
186
187 let start = Instant::now();
188 output.start_suite(&filtered_tests);
189
190 let execution = Arc::new(Mutex::new(execution));
191 let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
192 let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
193 let cloneable_codecs = Arc::new(cloneable_codecs);
194 let rpc_factories = Arc::new(rpc_factories);
195 let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
196 let mut join_set = JoinSet::new();
197 let threads = args.test_threads().get();
198
199 for worker_idx in 0..threads {
200 let execution_clone = execution.clone();
201 let output_clone = output.clone();
202 let mut args_clone = args.clone();
205 if args_clone.spawn_workers {
206 args_clone.worker_index = Some(worker_idx);
207 }
208 let results_clone = results.clone();
209 let wire_bytes_clone = cloneable_wire_bytes.clone();
210 let hosted_bytes_clone = hosted_descriptor_bytes.clone();
211 let codecs_clone = cloneable_codecs.clone();
212 let rpc_factories_clone = rpc_factories.clone();
213 let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
214 let handle = tokio::runtime::Handle::current();
215 join_set.spawn_blocking(move || {
216 handle.block_on(test_thread(
217 args_clone,
218 execution_clone,
219 output_clone,
220 count,
221 results_clone,
222 wire_bytes_clone,
223 hosted_bytes_clone,
224 codecs_clone,
225 rpc_factories_clone,
226 hosted_rpc_owner_cells_clone,
227 ))
228 });
229 }
230
231 while let Some(res) = join_set.join_next().await {
232 res.expect("Failed to join task");
233 }
234
235 drop(execution);
236
237 let results = results.lock().await;
238 output.finished_suite(&all_tests, &results, start.elapsed());
239 exit_code = SuiteResult::exit_code(&results);
240
241 if exit_code == ExitCode::SUCCESS {
242 break;
243 } else {
244 remaining_retries -= 1;
245 }
246 }
247 exit_code
248 }
249}
250
251#[allow(clippy::too_many_arguments)]
252async fn test_thread(
253 args: Arguments,
254 execution: Arc<Mutex<TestSuiteExecution>>,
255 output: Arc<dyn TestRunnerOutput>,
256 count: usize,
257 results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
258 cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
259 hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
260 cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
261 rpc_factories: Arc<HashMap<String, RpcFactory>>,
262 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
263) {
264 let mut worker = spawn_worker_if_needed(&args).await;
265 if let Some(worker) = worker.as_mut() {
270 worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
271 }
272 let connection_arc = if let Some(ref name) = args.ipc {
273 let name = ipc_name(name.clone());
274 let stream = Stream::connect(name)
275 .await
276 .expect("Failed to connect to IPC socket");
277 Some(Arc::new(Mutex::new(stream)))
278 } else {
279 None
280 };
281
282 if let Some(worker) = worker.as_mut() {
283 for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
284 worker
285 .provide_cloneable(dep_id.clone(), wire_bytes.clone())
286 .await;
287 }
288 for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
290 worker
291 .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
292 .await;
293 }
294 }
295
296 if let Some(connection) = connection_arc.as_ref() {
301 if !rpc_factories.is_empty() {
302 install_worker_subprocess_hosted_rpc_stubs(
303 &execution,
304 &rpc_factories,
305 connection.clone(),
306 )
307 .await;
308 }
309 }
310
311 let mut expected_test = None;
312
313 while !is_done(&execution).await {
314 if let Some(connection) = connection_arc.as_ref() {
315 while expected_test.is_none() {
316 let mut conn = connection.lock().await;
317 let command_bytes = read_frame_async(&mut *conn)
318 .await
319 .expect("Failed to read IPC command frame");
320 drop(conn);
321 let command: IpcCommand =
322 deserialize(&command_bytes).expect("Failed to decode IPC command");
323
324 match command {
325 IpcCommand::RunTest {
326 name,
327 crate_name,
328 module_path,
329 } => {
330 expected_test = Some((name, crate_name, module_path));
331 }
332 IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
333 apply_provided_wire_bytes(
335 &execution,
336 &cloneable_codecs,
337 &dep_id,
338 &wire_bytes,
339 "ProvideCloneable",
340 )
341 .await;
342 let response = IpcResponse::CloneableAccepted { dep_id };
343 let msg = serialize_to_byte_vec(&response)
344 .expect("Failed to encode IPC response");
345 let mut conn = connection.lock().await;
346 write_frame_async(&mut *conn, &msg)
347 .await
348 .expect("Failed to write IPC response frame");
349 }
350 IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
351 apply_provided_wire_bytes(
355 &execution,
356 &cloneable_codecs,
357 &dep_id,
358 &wire_bytes,
359 "ProvideHostedDescriptor",
360 )
361 .await;
362 let response = IpcResponse::HostedDescriptorAccepted { dep_id };
363 let msg = serialize_to_byte_vec(&response)
364 .expect("Failed to encode IPC response");
365 let mut conn = connection.lock().await;
366 write_frame_async(&mut *conn, &msg)
367 .await
368 .expect("Failed to write IPC response frame");
369 }
370 IpcCommand::HostedRpcReply { .. } => {
371 panic!(
378 "unexpected `HostedRpcReply` while waiting for the next \
379 between-tests command in the tokio worker subprocess: a \
380 stub call must have left a reply on the wire without \
381 draining it inline"
382 );
383 }
384 }
385 }
386 }
387
388 if let Some(next) = pick_next(&execution).await {
389 let skip = if let Some((name, crate_name, module_path)) = &expected_test {
390 next.test.name != *name
391 || next.test.crate_name != *crate_name
392 || next.test.module_path != *module_path
393 } else {
394 false
395 };
396
397 if !skip {
398 expected_test = None;
399
400 let ensure_time = get_ensure_time(&args, &next.test);
401
402 output.start_running_test(&next.test, next.index, count);
403 let result = run_test(
404 output.clone(),
405 next.index,
406 count,
407 args.nocapture,
408 args.include_ignored,
409 ensure_time,
410 next.deps.clone(),
411 &next.test,
412 &mut worker,
413 )
414 .await;
415 output.finished_running_test(&next.test, next.index, count, &result);
416
417 if let Some(connection) = connection_arc.as_ref() {
418 let finish_marker = Uuid::new_v4().to_string();
419 let finish_marker_line = format!("{finish_marker}\n");
420 tokio::io::stdout()
421 .write_all(finish_marker_line.as_bytes())
422 .await
423 .unwrap();
424 tokio::io::stderr()
425 .write_all(finish_marker_line.as_bytes())
426 .await
427 .unwrap();
428 tokio::io::stdout().flush().await.unwrap();
429 tokio::io::stderr().flush().await.unwrap();
430
431 let response = IpcResponse::TestFinished {
432 result: (&result).into(),
433 finish_marker,
434 };
435 let msg =
436 serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
437 let mut conn = connection.lock().await;
438 write_frame_async(&mut *conn, &msg)
439 .await
440 .expect("Failed to write IPC response frame");
441 }
442
443 results.lock().await.push((next.test.clone(), result));
444 }
445 }
446 }
447}
448
449async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
450 let execution = execution.lock().await;
451 execution.is_done()
452}
453
454async fn apply_provided_wire_bytes(
464 execution: &Arc<Mutex<TestSuiteExecution>>,
465 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
466 dep_id: &str,
467 wire_bytes: &[u8],
468 source_command: &str,
469) {
470 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
471 panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
472 });
473
474 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
475 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
476 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
477 let reconstructed = match worker_fn {
478 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
479 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
480 };
481
482 let mut execution = execution.lock().await;
483 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
484 assert!(
485 applied,
486 "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
487 );
488}
489
490async fn apply_hosted_descriptors_locally(
503 execution: &mut TestSuiteExecution,
504 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
505 descriptor_bytes: &[DepWireBytes],
506) {
507 for (dep_id, wire_bytes) in descriptor_bytes {
508 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
509 panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
510 });
511 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
512 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
513 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
514 let reconstructed = match worker_fn {
515 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
516 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
517 };
518 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
519 assert!(
520 applied,
521 "Hosted dep '{dep_id}' could not be pre-populated locally"
522 );
523 }
524}
525
526async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
527 let mut execution = execution.lock().await;
528 execution.pick_next().await
529}
530
531async fn run_with_flakiness_control<F>(
532 output: Arc<dyn TestRunnerOutput>,
533 test_description: &RegisteredTest,
534 idx: usize,
535 count: usize,
536 test: F,
537) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
538where
539 F: Fn(
540 Instant,
541 )
542 -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
543 + Send
544 + Sync,
545{
546 match &test_description.props.flakiness_control {
547 FlakinessControl::None => {
548 let start = Instant::now();
549 test(start).await
550 }
551 FlakinessControl::ProveNonFlaky(tries) => {
552 for n in 0..*tries {
553 if n > 0 {
554 output.repeat_running_test(
555 test_description,
556 idx,
557 count,
558 n + 1,
559 *tries,
560 "to ensure test is not flaky",
561 );
562 }
563 let start = Instant::now();
564 match test(start).await {
565 Ok(Ok(())) => {}
566 Ok(Err(e)) => return Ok(Err(e)),
567 Err(e) => return Err(e),
568 };
569 }
570 Ok(Ok(()))
571 }
572 FlakinessControl::RetryKnownFlaky(max_retries) => {
573 let mut tries = 1;
574 loop {
575 let start = Instant::now();
576 let result = test(start).await;
577
578 if result.is_err() && tries < *max_retries {
579 tries += 1;
580 output.repeat_running_test(
581 test_description,
582 idx,
583 count,
584 tries,
585 *max_retries,
586 "because test is known to be flaky",
587 );
588 } else {
589 break result;
590 }
591 }
592 }
593 }
594}
595
596#[allow(clippy::too_many_arguments)]
597async fn run_test(
598 output: Arc<dyn TestRunnerOutput>,
599 idx: usize,
600 count: usize,
601 nocapture: bool,
602 include_ignored: bool,
603 ensure_time: Option<TimeThreshold>,
604 dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
605 test: &RegisteredTest,
606 worker: &mut Option<Worker>,
607) -> TestResult {
608 if test.props.is_ignored && !include_ignored {
609 TestResult::ignored()
610 } else if let Some(worker) = worker.as_mut() {
611 worker.run_test(nocapture, test).await
612 } else {
613 let start = Instant::now();
614 let test = test.clone();
615 match &test.run {
616 TestFunction::Sync(_) => {
617 let handle = spawn_blocking(move || {
618 let test = test.clone();
619 crate::sync::run_sync_test_function(
620 output,
621 &test,
622 idx,
623 count,
624 ensure_time,
625 dependency_view,
626 )
627 });
628 handle.await.unwrap_or_else(|join_error| {
629 TestResult::failed(
630 start.elapsed(),
631 FailureCause::HarnessError(format!(
632 "Failed joining test task: {join_error}"
633 )),
634 )
635 })
636 }
637 TestFunction::Async(test_fn) => {
638 let timeout = test.props.timeout;
639 let test_fn = test_fn.clone();
640 let detached_panic_policy = test.props.detached_panic_policy.clone();
641 let result = run_with_flakiness_control(output, &test, idx, count, |start| {
642 let dependency_view = dependency_view.clone();
643 let test_fn = test_fn.clone();
644 Box::pin(async move {
645 let test_id = crate::panic_hook::next_test_id();
646 crate::panic_hook::set_current_test_id(test_id);
647 crate::panic_hook::create_detached_collector(test_id);
648 let result = AssertUnwindSafe(Box::pin(async move {
649 match timeout {
650 None => test_fn(dependency_view).await,
651 Some(duration) => {
652 let result =
653 tokio::time::timeout(duration, test_fn(dependency_view))
654 .await;
655 match result {
656 Ok(result) => result,
657 Err(_) => {
658 return Err(FailureCause::HarnessError(
659 "Test timed out".to_string(),
660 ))
661 }
662 }
663 }
664 }
665 .into_result()?;
666 if let Some(ensure_time) = ensure_time {
667 let elapsed = start.elapsed();
668 if ensure_time.is_critical(&elapsed) {
669 return Err(FailureCause::HarnessError(format!(
670 "Test run time exceeds critical threshold: {elapsed:?}"
671 )));
672 }
673 }
674 Ok(())
675 }))
676 .catch_unwind()
677 .await;
678 result
679 })
680 })
681 .await;
682 let mut test_result =
683 TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
684 if let Some(test_id) = crate::panic_hook::current_test_id() {
685 if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
686 let panics = match collector.lock() {
687 Ok(p) => p,
688 Err(poisoned) => poisoned.into_inner(),
689 };
690 if !panics.is_empty()
691 && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
692 && test_result.is_passed()
693 {
694 let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
695 test_result = TestResult::failed(
696 start.elapsed(),
697 FailureCause::Panic(internal::PanicCause {
698 message: Some(format!(
699 "Detached task(s) panicked:\n{}",
700 messages.join("\n---\n")
701 )),
702 location: panics.first().and_then(|p| p.location.clone()),
703 backtrace: panics.first().and_then(|p| p.backtrace.clone()),
704 }),
705 );
706 }
707 }
708 }
709 crate::panic_hook::clear_current_test_id();
710 test_result
711 }
712 TestFunction::SyncBench(_) => {
713 let handle = spawn_blocking(move || {
714 let test = test.clone();
715 crate::sync::run_sync_test_function(
716 output,
717 &test,
718 idx,
719 count,
720 ensure_time,
721 dependency_view,
722 )
723 });
724 handle.await.unwrap_or_else(|join_error| {
725 TestResult::failed(
726 start.elapsed(),
727 FailureCause::HarnessError(format!(
728 "Failed joining test task: {join_error}"
729 )),
730 )
731 })
732 }
733 TestFunction::AsyncBench(bench_fn) => {
734 let mut bencher = AsyncBencher::new();
735 let test_id = crate::panic_hook::next_test_id();
736 crate::panic_hook::set_current_test_id(test_id);
737 let result = AssertUnwindSafe(async move {
738 bench_fn(&mut bencher, dependency_view).await;
739 (
740 bencher
741 .summary()
742 .expect("iter() was not called in bench function"),
743 bencher.bytes,
744 )
745 })
746 .catch_unwind()
747 .await;
748 let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
749 let test_result = TestResult::from_summary(
750 &test.props.should_panic,
751 start.elapsed(),
752 result.map(|(summary, _)| summary),
753 bytes,
754 );
755 crate::panic_hook::clear_current_test_id();
756 test_result
757 }
758 }
759 }
760}
761
762struct Worker {
763 _listener: Listener,
764 _process: Child,
765 _out_handle: JoinHandle<()>,
766 _err_handle: JoinHandle<()>,
767 out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
768 err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
769 capture_enabled: Arc<Mutex<bool>>,
770 connection: Stream,
771 hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
775}
776
777impl Worker {
778 fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
782 self.hosted_rpc_owner_cells = cells;
783 }
784
785 async fn handle_hosted_rpc_call(
791 &mut self,
792 dump_on_ipc_failure: &DumpOnFailure,
793 request_id: u64,
794 dep_id: String,
795 method_idx: u32,
796 args_bytes: Vec<u8>,
797 ) {
798 let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
799 Some(cell) => match cell.dispatch_async(method_idx, &args_bytes).await {
805 Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
806 Err(message) => HostedRpcReplyBody::Err { message },
807 },
808 None => HostedRpcReplyBody::Err {
809 message: format!(
810 "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
811 ),
812 },
813 };
814 let reply = IpcCommand::HostedRpcReply { request_id, body };
815 let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
816 dump_on_ipc_failure
817 .run(write_frame_async(&mut self.connection, &msg).await)
818 .await;
819 }
820
821 pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
822 let mut capture_enabled = self.capture_enabled.lock().await;
823 *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
824 drop(capture_enabled);
825
826 let cmd = IpcCommand::RunTest {
828 name: test.name.clone(),
829 crate_name: test.crate_name.clone(),
830 module_path: test.module_path.clone(),
831 };
832
833 let dump_on_ipc_failure = self.dump_on_failure();
834
835 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
836 dump_on_ipc_failure
837 .run(write_frame_async(&mut self.connection, &msg).await)
838 .await;
839
840 let response = loop {
841 let response_bytes = dump_on_ipc_failure
842 .run(read_frame_async(&mut self.connection).await)
843 .await;
844 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
845 match response {
846 IpcResponse::TestFinished { .. } => break response,
847 IpcResponse::CloneableAccepted { .. }
848 | IpcResponse::HostedDescriptorAccepted { .. } => continue,
849 IpcResponse::HostedRpcCall {
850 request_id,
851 dep_id,
852 method_idx,
853 args_bytes,
854 } => {
855 self.handle_hosted_rpc_call(
856 &dump_on_ipc_failure,
857 request_id,
858 dep_id,
859 method_idx,
860 args_bytes,
861 )
862 .await;
863 continue;
864 }
865 }
866 };
867
868 let IpcResponse::TestFinished {
869 result,
870 finish_marker,
871 } = response
872 else {
873 unreachable!("loop only breaks on TestFinished")
874 };
875
876 if test.props.capture_control.requires_capturing(!nocapture) {
877 let out_lines: Vec<_> =
878 Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
879 let err_lines: Vec<_> =
880 Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
881 result.into_test_result(out_lines, err_lines)
882 } else {
883 result.into_test_result(Vec::new(), Vec::new())
884 }
885 }
886
887 async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
890 let dump_on_ipc_failure = self.dump_on_failure();
891 let cmd = IpcCommand::ProvideCloneable {
892 dep_id: dep_id.clone(),
893 wire_bytes,
894 };
895 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
896 dump_on_ipc_failure
897 .run(write_frame_async(&mut self.connection, &msg).await)
898 .await;
899
900 loop {
901 let response_bytes = dump_on_ipc_failure
902 .run(read_frame_async(&mut self.connection).await)
903 .await;
904 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
905 match response {
906 IpcResponse::CloneableAccepted { dep_id: ack_id } => {
907 if ack_id == dep_id {
908 return;
909 }
910 }
911 IpcResponse::HostedDescriptorAccepted { .. } => {
912 }
914 IpcResponse::TestFinished { .. } => {
915 }
917 IpcResponse::HostedRpcCall {
918 request_id,
919 dep_id: rpc_dep_id,
920 method_idx,
921 args_bytes,
922 } => {
923 self.handle_hosted_rpc_call(
928 &dump_on_ipc_failure,
929 request_id,
930 rpc_dep_id,
931 method_idx,
932 args_bytes,
933 )
934 .await;
935 }
936 }
937 }
938 }
939
940 async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
942 let dump_on_ipc_failure = self.dump_on_failure();
943 let cmd = IpcCommand::ProvideHostedDescriptor {
944 dep_id: dep_id.clone(),
945 wire_bytes,
946 };
947 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
948 dump_on_ipc_failure
949 .run(write_frame_async(&mut self.connection, &msg).await)
950 .await;
951
952 loop {
953 let response_bytes = dump_on_ipc_failure
954 .run(read_frame_async(&mut self.connection).await)
955 .await;
956 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
957 match response {
958 IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
959 if ack_id == dep_id {
960 return;
961 }
962 }
963 IpcResponse::CloneableAccepted { .. } => {
964 }
966 IpcResponse::TestFinished { .. } => {
967 }
969 IpcResponse::HostedRpcCall {
970 request_id,
971 dep_id: rpc_dep_id,
972 method_idx,
973 args_bytes,
974 } => {
975 self.handle_hosted_rpc_call(
978 &dump_on_ipc_failure,
979 request_id,
980 rpc_dep_id,
981 method_idx,
982 args_bytes,
983 )
984 .await;
985 }
986 }
987 }
988 }
989
990 fn dump_on_failure(&self) -> DumpOnFailure {
991 DumpOnFailure {
992 out_lines: self.out_lines.clone(),
993 err_lines: self.err_lines.clone(),
994 }
995 }
996
997 async fn drain_until(
998 source: Arc<Mutex<VecDeque<CapturedOutput>>>,
999 finish_marker: String,
1000 ) -> Vec<CapturedOutput> {
1001 let mut result = Vec::new();
1002 loop {
1003 let mut source = source.lock().await;
1004 while let Some(line) = source.pop_front() {
1005 if line.line() == finish_marker {
1006 return result;
1007 } else {
1008 result.push(line.clone());
1009 }
1010 }
1011 drop(source);
1012
1013 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1014 }
1015 }
1016}
1017
1018struct DumpOnFailure {
1019 out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1020 err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
1021}
1022
1023impl DumpOnFailure {
1024 pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1025 match result {
1026 Ok(value) => value,
1027 Err(_error) => {
1028 let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1029 let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1030 let mut all_lines = [out_lines, err_lines].concat();
1031 all_lines.sort();
1032
1033 for line in all_lines {
1034 eprintln!("{}", line.line());
1035 }
1036
1037 std::process::exit(1);
1038 }
1039 }
1040 }
1041}
1042
1043async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1044 if args.spawn_workers {
1045 let id = Uuid::new_v4();
1046 let name_str = format!("{id}.sock");
1047 let name = name_str
1048 .clone()
1049 .to_ns_name::<GenericNamespaced>()
1050 .expect("Invalid local socket name");
1051 let opts = ListenerOptions::new().name(name.clone());
1052 let listener = opts
1053 .create_tokio()
1054 .expect("Failed to create local socket listener");
1055
1056 let exe = std::env::current_exe().expect("Failed to get current executable path");
1057
1058 let mut args = args.clone();
1059 args.ipc = Some(name_str);
1060 args.spawn_workers = false;
1061 args.logfile = None;
1062 let args = args.to_args();
1063
1064 let mut process = Command::new(exe)
1065 .args(args)
1066 .stdin(Stdio::inherit())
1067 .stderr(Stdio::piped())
1068 .stdout(Stdio::piped())
1069 .spawn()
1070 .expect("Failed to spawn worker process");
1071
1072 let stdout = process.stdout.take().unwrap();
1073 let stderr = process.stderr.take().unwrap();
1074
1075 let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1076 let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1077 let capture_enabled = Arc::new(Mutex::new(true));
1078
1079 let out_lines_clone = out_lines.clone();
1080 let capture_enabled_clone = capture_enabled.clone();
1081 let out_handle = spawn(async move {
1082 let reader = BufReader::new(stdout);
1083 let mut lines = reader.lines();
1084 while let Some(line) = lines
1085 .next_line()
1086 .await
1087 .expect("Failed to read from worker stdout")
1088 {
1089 if *capture_enabled_clone.lock().await {
1090 out_lines_clone
1091 .lock()
1092 .await
1093 .push_back(CapturedOutput::stdout(line));
1094 } else {
1095 println!("{line}");
1096 }
1097 }
1098 });
1099
1100 let err_lines_clone = err_lines.clone();
1101 let capture_enabled_clone = capture_enabled.clone();
1102 let err_handle = spawn(async move {
1103 let reader = BufReader::new(stderr);
1104 let mut lines = reader.lines();
1105 while let Some(line) = lines
1106 .next_line()
1107 .await
1108 .expect("Failed to read from worker stderr")
1109 {
1110 if *capture_enabled_clone.lock().await {
1111 err_lines_clone
1112 .lock()
1113 .await
1114 .push_back(CapturedOutput::stderr(line));
1115 } else {
1116 eprintln!("{line}");
1117 }
1118 }
1119 });
1120
1121 let connection = listener
1122 .accept()
1123 .await
1124 .expect("Failed to accept connection");
1125
1126 Some(Worker {
1127 _listener: listener,
1128 _process: process,
1129 _out_handle: out_handle,
1130 _err_handle: err_handle,
1131 out_lines,
1132 err_lines,
1133 connection,
1134 capture_enabled,
1135 hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1136 })
1137 } else {
1138 None
1139 }
1140}
1141
1142fn install_local_hosted_rpc_stubs(
1148 execution: &mut TestSuiteExecution,
1149 rpc_factories: &HashMap<String, RpcFactory>,
1150 owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1151) {
1152 let transport: Arc<dyn HostedRpcTransport> =
1153 Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1154 for (dep_id, factory) in rpc_factories.iter() {
1155 if !owner_cells.contains_key(dep_id) {
1156 continue;
1160 }
1161 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1162 let stub = (factory.build_stub)(channel);
1163 let applied = execution.provide_cloneable_value(dep_id, stub);
1164 assert!(
1165 applied,
1166 "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1167 );
1168 }
1169}
1170
1171async fn install_worker_subprocess_hosted_rpc_stubs(
1176 execution: &Arc<Mutex<TestSuiteExecution>>,
1177 rpc_factories: &HashMap<String, RpcFactory>,
1178 connection_arc: Arc<Mutex<Stream>>,
1179) {
1180 let transport: Arc<dyn HostedRpcTransport> =
1181 Arc::new(IpcHostedRpcTransport::new(connection_arc));
1182 for (dep_id, factory) in rpc_factories.iter() {
1183 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1184 let stub = (factory.build_stub)(channel);
1185 let mut execution = execution.lock().await;
1186 let applied = execution.provide_cloneable_value(dep_id, stub);
1187 let _ = applied;
1190 }
1191}
1192
1193struct IpcHostedRpcTransport {
1211 connection: Arc<Mutex<Stream>>,
1212 next_request_id: AtomicU64,
1213}
1214
1215impl IpcHostedRpcTransport {
1216 fn new(connection: Arc<Mutex<Stream>>) -> Self {
1217 Self {
1218 connection,
1219 next_request_id: AtomicU64::new(0),
1220 }
1221 }
1222}
1223
1224impl HostedRpcTransport for IpcHostedRpcTransport {
1225 fn call(
1226 &self,
1227 dep_id: &str,
1228 method_idx: u32,
1229 args: Vec<u8>,
1230 ) -> Result<Vec<u8>, HostedRpcError> {
1231 let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1232 let call = IpcResponse::HostedRpcCall {
1233 request_id,
1234 dep_id: dep_id.to_string(),
1235 method_idx,
1236 args_bytes: args,
1237 };
1238 let msg = serialize_to_byte_vec(&call).map_err(|e| {
1239 HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1240 })?;
1241
1242 let connection = self.connection.clone();
1243 let handle = tokio::runtime::Handle::current();
1244
1245 tokio::task::block_in_place(move || {
1250 handle.block_on(async move {
1251 let mut conn = connection.lock().await;
1252 write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1253 HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1254 })?;
1255 let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1256 HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1257 })?;
1258 let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1259 HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1260 })?;
1261 match command {
1262 IpcCommand::HostedRpcReply {
1263 request_id: reply_id,
1264 body,
1265 } => {
1266 if reply_id != request_id {
1267 return Err(HostedRpcError::Transport(format!(
1268 "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1269 )));
1270 }
1271 match body {
1272 HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1273 HostedRpcReplyBody::Err { message } => {
1274 Err(HostedRpcError::Dispatch(message))
1275 }
1276 }
1277 }
1278 other => Err(HostedRpcError::Transport(format!(
1279 "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1280 ))),
1281 }
1282 })
1283 })
1284 }
1285}