1use crate::args::{Arguments, TimeThreshold};
2use crate::bench::AsyncBencher;
3use crate::execution::{DepWireBytes, TestExecution, TestSuiteExecution};
4use crate::internal;
5use crate::internal::{
6 generate_tests, get_ensure_time, CapturedOutput, CloneableCodec, FailureCause,
7 FlakinessControl, HostedRpcChannel, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
8 InProcessHostedRpcTransport, RegisteredTest, RpcFactory, SuiteResult, TestFunction, TestResult,
9 WorkerReconstructor,
10};
11use crate::ipc::{
12 ipc_name, read_frame_async, write_frame_async, HostedRpcReplyBody, IpcCommand, IpcResponse,
13};
14use crate::output::{test_runner_output, TestRunnerOutput};
15use desert_rust::{deserialize, serialize_to_byte_vec};
16use futures::FutureExt;
17use interprocess::local_socket::tokio::prelude::*;
18use interprocess::local_socket::tokio::{Listener, Stream};
19use interprocess::local_socket::{GenericNamespaced, ListenerOptions};
20use std::any::Any;
21use std::collections::HashMap;
22use std::collections::VecDeque;
23use std::future::Future;
24use std::panic::AssertUnwindSafe;
25use std::pin::Pin;
26use std::process::{ExitCode, Stdio};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::process::{Child, Command};
31use tokio::spawn;
32use tokio::sync::Mutex;
33use tokio::task::{spawn_blocking, JoinHandle, JoinSet};
34use tokio::time::Instant;
35use uuid::Uuid;
36
37pub fn test_runner() -> ExitCode {
38 tokio::runtime::Builder::new_multi_thread()
39 .enable_all()
40 .build()
41 .unwrap()
42 .block_on(async_test_runner())
43}
44
45#[allow(clippy::await_holding_lock)]
46async fn async_test_runner() -> ExitCode {
47 crate::panic_hook::install_panic_hook();
48 let mut args = Arguments::from_args();
49 if let Some(idx) = args.worker_index {
53 crate::worker::set_worker_index(idx);
54 }
55 let output = test_runner_output(&args);
56
57 let registered_tests = internal::REGISTERED_TESTS.lock().unwrap();
58 let registered_dependency_constructors =
59 internal::REGISTERED_DEPENDENCY_CONSTRUCTORS.lock().unwrap();
60 let registered_testsuite_props = internal::REGISTERED_TESTSUITE_PROPS.lock().unwrap();
61 let registered_test_generators = internal::REGISTERED_TEST_GENERATORS.lock().unwrap();
62
63 let generated_tests = generate_tests(®istered_test_generators).await;
64
65 let all_tests: Vec<RegisteredTest> = registered_tests
66 .iter()
67 .cloned()
68 .chain(generated_tests)
69 .collect();
70
71 if args.list {
72 output.test_list(&all_tests);
73 ExitCode::SUCCESS
74 } else {
75 let mut remaining_retries = args.flaky_run.unwrap_or(1);
76
77 let mut exit_code = ExitCode::from(101);
78 while remaining_retries > 0 {
79 let (mut execution, filtered_tests) = TestSuiteExecution::construct(
80 &args,
81 registered_dependency_constructors.as_slice(),
82 &all_tests,
83 registered_testsuite_props.as_slice(),
84 );
85 args.finalize_for_execution(&execution, output.clone());
86 let is_top_level_parent = args.is_top_level_parent();
87 let has_selected_tests = execution.remaining() > 0;
88 let needs_parent_shared = execution.has_cloneable_dependencies()
93 || execution.has_hosted_dependencies()
94 || execution.has_hosted_rpc_dependencies();
95 let parent_shared = if is_top_level_parent && has_selected_tests && needs_parent_shared
96 {
97 execution.collect_parent_shared_dependencies_async().await
98 } else {
99 crate::execution::ParentSharedDependencies {
100 cloneable_wire_bytes: Vec::new(),
101 hosted_descriptor_bytes: Vec::new(),
102 hosted_owners: Vec::new(),
103 hosted_rpc_owner_cells: Vec::new(),
104 }
105 };
106 let cloneable_wire_bytes = parent_shared.cloneable_wire_bytes;
107 let hosted_descriptor_bytes = parent_shared.hosted_descriptor_bytes;
108 let _hosted_owners = parent_shared.hosted_owners;
109 let hosted_rpc_owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> =
110 parent_shared.hosted_rpc_owner_cells.into_iter().collect();
111 let rpc_factories: HashMap<String, RpcFactory> = registered_dependency_constructors
115 .iter()
116 .filter_map(|d| {
117 if d.scope == crate::internal::DepScope::HostedRpc {
118 d.rpc_factory
119 .as_ref()
120 .map(|f| (d.qualified_id(), f.clone()))
121 } else {
122 None
123 }
124 })
125 .collect();
126 let cloneable_codecs: HashMap<String, (CloneableCodec, WorkerReconstructor)> =
134 registered_dependency_constructors
135 .iter()
136 .filter_map(|d| {
137 let codec_opt = match d.scope {
138 crate::internal::DepScope::Cloneable => d.cloneable_codec.as_ref(),
139 crate::internal::DepScope::Hosted => d.hosted_codec.as_ref(),
140 _ => None,
141 };
142 match (codec_opt, &d.worker_fn) {
143 (Some(codec), Some(worker_fn)) => {
144 Some((d.qualified_id(), (codec.clone(), worker_fn.clone())))
145 }
146 _ => None,
147 }
148 })
149 .collect();
150 if is_top_level_parent && !args.spawn_workers && !hosted_descriptor_bytes.is_empty() {
158 apply_hosted_descriptors_locally(
159 &mut execution,
160 &cloneable_codecs,
161 &hosted_descriptor_bytes,
162 )
163 .await;
164 }
165 if is_top_level_parent && !args.spawn_workers && !hosted_rpc_owner_cells.is_empty() {
170 install_local_hosted_rpc_stubs(
171 &mut execution,
172 &rpc_factories,
173 &hosted_rpc_owner_cells,
174 );
175 }
176 if args.spawn_workers {
177 execution.skip_creating_dependencies();
178 }
179
180 let count = execution.remaining();
185 let results = Arc::new(Mutex::new(Vec::with_capacity(count)));
186
187 let start = Instant::now();
188 output.start_suite(&filtered_tests);
189
190 let execution = Arc::new(Mutex::new(execution));
191 let cloneable_wire_bytes = Arc::new(cloneable_wire_bytes);
192 let hosted_descriptor_bytes = Arc::new(hosted_descriptor_bytes);
193 let cloneable_codecs = Arc::new(cloneable_codecs);
194 let rpc_factories = Arc::new(rpc_factories);
195 let hosted_rpc_owner_cells = Arc::new(hosted_rpc_owner_cells);
196 let mut join_set = JoinSet::new();
197 let threads = args.test_threads().get();
198
199 for worker_idx in 0..threads {
200 let execution_clone = execution.clone();
201 let output_clone = output.clone();
202 let mut args_clone = args.clone();
205 if args_clone.spawn_workers {
206 args_clone.worker_index = Some(worker_idx);
207 }
208 let results_clone = results.clone();
209 let wire_bytes_clone = cloneable_wire_bytes.clone();
210 let hosted_bytes_clone = hosted_descriptor_bytes.clone();
211 let codecs_clone = cloneable_codecs.clone();
212 let rpc_factories_clone = rpc_factories.clone();
213 let hosted_rpc_owner_cells_clone = hosted_rpc_owner_cells.clone();
214 let handle = tokio::runtime::Handle::current();
215 join_set.spawn_blocking(move || {
216 handle.block_on(test_thread(
217 args_clone,
218 execution_clone,
219 output_clone,
220 count,
221 results_clone,
222 wire_bytes_clone,
223 hosted_bytes_clone,
224 codecs_clone,
225 rpc_factories_clone,
226 hosted_rpc_owner_cells_clone,
227 ))
228 });
229 }
230
231 while let Some(res) = join_set.join_next().await {
232 res.expect("Failed to join task");
233 }
234
235 drop(execution);
236
237 let results = results.lock().await;
238 output.finished_suite(&all_tests, &results, start.elapsed());
239 exit_code = SuiteResult::exit_code(&results);
240
241 if exit_code == ExitCode::SUCCESS {
242 break;
243 } else {
244 remaining_retries -= 1;
245 }
246 }
247 exit_code
248 }
249}
250
/// Body of a single test thread.
///
/// Depending on `args` this function plays one of these roles:
/// * with `args.spawn_workers` set, it spawns a worker subprocess (see
///   `spawn_worker_if_needed`), ships the cloneable wire bytes and hosted descriptors to it
///   and delegates every test to that worker;
/// * with `args.ipc` set, it connects to the named local socket and only runs the tests
///   requested through `IpcCommand::RunTest`, reporting back with `IpcResponse` frames;
/// * otherwise it simply picks and runs tests in-process.
///
/// Results are appended to `results`; `count` is the total number of tests, passed through
/// to the output callbacks.
///
/// # Panics
///
/// Panics when the IPC socket cannot be connected, when an IPC frame cannot be read,
/// written, encoded or decoded, and when a `HostedRpcReply` arrives between tests.
#[allow(clippy::too_many_arguments)]
async fn test_thread(
    args: Arguments,
    execution: Arc<Mutex<TestSuiteExecution>>,
    output: Arc<dyn TestRunnerOutput>,
    count: usize,
    results: Arc<Mutex<Vec<(RegisteredTest, TestResult)>>>,
    cloneable_wire_bytes: Arc<Vec<DepWireBytes>>,
    hosted_descriptor_bytes: Arc<Vec<DepWireBytes>>,
    cloneable_codecs: Arc<HashMap<String, (CloneableCodec, WorkerReconstructor)>>,
    rpc_factories: Arc<HashMap<String, RpcFactory>>,
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
) {
    let mut worker = spawn_worker_if_needed(&args).await;
    if let Some(worker) = worker.as_mut() {
        // The worker needs the owner cells to answer `HostedRpcCall` requests.
        worker.set_hosted_rpc_owner_cells(hosted_rpc_owner_cells.clone());
    }
    // `args.ipc` carries the socket name when this process has to take commands over IPC.
    let connection_arc = if let Some(ref name) = args.ipc {
        let name = ipc_name(name.clone());
        let stream = Stream::connect(name)
            .await
            .expect("Failed to connect to IPC socket");
        Some(Arc::new(Mutex::new(stream)))
    } else {
        None
    };

    // Ship every parent-shared dependency to the freshly spawned worker before any test runs.
    if let Some(worker) = worker.as_mut() {
        for (dep_id, wire_bytes) in cloneable_wire_bytes.iter() {
            worker
                .provide_cloneable(dep_id.clone(), wire_bytes.clone())
                .await;
        }
        for (dep_id, descriptor_bytes) in hosted_descriptor_bytes.iter() {
            worker
                .provide_hosted_descriptor(dep_id.clone(), descriptor_bytes.clone())
                .await;
        }
    }

    // When connected over IPC, hosted-RPC dependencies are replaced by stubs that forward
    // their calls over the connection.
    if let Some(connection) = connection_arc.as_ref() {
        if !rpc_factories.is_empty() {
            install_worker_subprocess_hosted_rpc_stubs(
                &execution,
                &rpc_factories,
                connection.clone(),
            )
            .await;
        }
    }

    // Identity (name, crate name, module path) of the test requested over IPC, if any.
    let mut expected_test = None;

    while !is_done(&execution).await {
        if let Some(connection) = connection_arc.as_ref() {
            // Process incoming commands until one of them names the next test to run.
            while expected_test.is_none() {
                let mut conn = connection.lock().await;
                let command_bytes = read_frame_async(&mut *conn)
                    .await
                    .expect("Failed to read IPC command frame");
                // Release the connection before handling the command; the handlers below
                // lock it again to send their responses.
                drop(conn);
                let command: IpcCommand =
                    deserialize(&command_bytes).expect("Failed to decode IPC command");

                match command {
                    IpcCommand::RunTest {
                        name,
                        crate_name,
                        module_path,
                    } => {
                        expected_test = Some((name, crate_name, module_path));
                    }
                    IpcCommand::ProvideCloneable { dep_id, wire_bytes } => {
                        apply_provided_wire_bytes(
                            &execution,
                            &cloneable_codecs,
                            &dep_id,
                            &wire_bytes,
                            "ProvideCloneable",
                        )
                        .await;
                        // Acknowledge so the sender can stop waiting for this dependency.
                        let response = IpcResponse::CloneableAccepted { dep_id };
                        let msg = serialize_to_byte_vec(&response)
                            .expect("Failed to encode IPC response");
                        let mut conn = connection.lock().await;
                        write_frame_async(&mut *conn, &msg)
                            .await
                            .expect("Failed to write IPC response frame");
                    }
                    IpcCommand::ProvideHostedDescriptor { dep_id, wire_bytes } => {
                        apply_provided_wire_bytes(
                            &execution,
                            &cloneable_codecs,
                            &dep_id,
                            &wire_bytes,
                            "ProvideHostedDescriptor",
                        )
                        .await;
                        // Acknowledge so the sender can stop waiting for this descriptor.
                        let response = IpcResponse::HostedDescriptorAccepted { dep_id };
                        let msg = serialize_to_byte_vec(&response)
                            .expect("Failed to encode IPC response");
                        let mut conn = connection.lock().await;
                        write_frame_async(&mut *conn, &msg)
                            .await
                            .expect("Failed to write IPC response frame");
                    }
                    IpcCommand::HostedRpcReply { .. } => {
                        panic!(
                            "unexpected `HostedRpcReply` while waiting for the next \
                             between-tests command in the tokio worker subprocess: a \
                             stub call must have left a reply on the wire without \
                             draining it inline"
                        );
                    }
                }
            }
        }

        if let Some(next) = pick_next(&execution).await {
            // When a specific test was requested, every picked test that does not match it
            // is skipped (not run and not reported).
            let skip = if let Some((name, crate_name, module_path)) = &expected_test {
                next.test.name != *name
                    || next.test.crate_name != *crate_name
                    || next.test.module_path != *module_path
            } else {
                false
            };

            if !skip {
                expected_test = None;

                let ensure_time = get_ensure_time(&args, &next.test);

                output.start_running_test(&next.test, next.index, count);
                let result = run_test(
                    output.clone(),
                    next.index,
                    count,
                    args.nocapture,
                    args.include_ignored,
                    ensure_time,
                    next.deps.clone(),
                    &next.test,
                    &mut worker,
                )
                .await;
                output.finished_running_test(&next.test, next.index, count, &result);

                if let Some(connection) = connection_arc.as_ref() {
                    // Write a unique marker line to both stdout and stderr and send the same
                    // marker in `TestFinished`; `Worker::drain_until` looks for this line to
                    // find the end of the test's captured output.
                    let finish_marker = Uuid::new_v4().to_string();
                    let finish_marker_line = format!("{finish_marker}\n");
                    tokio::io::stdout()
                        .write_all(finish_marker_line.as_bytes())
                        .await
                        .unwrap();
                    tokio::io::stderr()
                        .write_all(finish_marker_line.as_bytes())
                        .await
                        .unwrap();
                    tokio::io::stdout().flush().await.unwrap();
                    tokio::io::stderr().flush().await.unwrap();

                    let response = IpcResponse::TestFinished {
                        result: (&result).into(),
                        finish_marker,
                    };
                    let msg =
                        serialize_to_byte_vec(&response).expect("Failed to encode IPC response");
                    let mut conn = connection.lock().await;
                    write_frame_async(&mut *conn, &msg)
                        .await
                        .expect("Failed to write IPC response frame");
                }

                results.lock().await.push((next.test.clone(), result));
            }
        }
    }
}
448
449async fn is_done(execution: &Arc<Mutex<TestSuiteExecution>>) -> bool {
450 let execution = execution.lock().await;
451 execution.is_done()
452}
453
454async fn apply_provided_wire_bytes(
464 execution: &Arc<Mutex<TestSuiteExecution>>,
465 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
466 dep_id: &str,
467 wire_bytes: &[u8],
468 source_command: &str,
469) {
470 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
471 panic!("{source_command} referenced unknown wire-shipped dep '{dep_id}'")
472 });
473
474 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
475 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
476 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
477 let reconstructed = match worker_fn {
478 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
479 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
480 };
481
482 let mut execution = execution.lock().await;
483 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
484 assert!(
485 applied,
486 "{source_command} for dep '{dep_id}' did not match any registered dep in this worker"
487 );
488}
489
490async fn apply_hosted_descriptors_locally(
503 execution: &mut TestSuiteExecution,
504 wire_codecs: &HashMap<String, (CloneableCodec, WorkerReconstructor)>,
505 descriptor_bytes: &[DepWireBytes],
506) {
507 for (dep_id, wire_bytes) in descriptor_bytes {
508 let (codec, worker_fn) = wire_codecs.get(dep_id).unwrap_or_else(|| {
509 panic!("Hosted dep '{dep_id}' missing codec/worker_fn for local handle reconstruction")
510 });
511 let wire_payload = (codec.from_wire_bytes)(wire_bytes);
512 let empty_deps: Arc<dyn internal::DependencyView + Send + Sync> =
513 Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
514 let reconstructed = match worker_fn {
515 WorkerReconstructor::Sync(f) => f(wire_payload, empty_deps),
516 WorkerReconstructor::Async(f) => f(wire_payload, empty_deps).await,
517 };
518 let applied = execution.provide_cloneable_value(dep_id, reconstructed);
519 assert!(
520 applied,
521 "Hosted dep '{dep_id}' could not be pre-populated locally"
522 );
523 }
524}
525
526async fn pick_next(execution: &Arc<Mutex<TestSuiteExecution>>) -> Option<TestExecution> {
527 let mut execution = execution.lock().await;
528 execution.pick_next().await
529}
530
531async fn run_with_flakiness_control<F>(
532 output: Arc<dyn TestRunnerOutput>,
533 test_description: &RegisteredTest,
534 idx: usize,
535 count: usize,
536 test: F,
537) -> Result<Result<(), FailureCause>, Box<dyn Any + Send>>
538where
539 F: Fn(
540 Instant,
541 )
542 -> Pin<Box<dyn Future<Output = Result<Result<(), FailureCause>, Box<dyn Any + Send>>>>>
543 + Send
544 + Sync,
545{
546 match &test_description.props.flakiness_control {
547 FlakinessControl::None => {
548 let start = Instant::now();
549 test(start).await
550 }
551 FlakinessControl::ProveNonFlaky(tries) => {
552 for n in 0..*tries {
553 if n > 0 {
554 output.repeat_running_test(
555 test_description,
556 idx,
557 count,
558 n + 1,
559 *tries,
560 "to ensure test is not flaky",
561 );
562 }
563 let start = Instant::now();
564 match test(start).await {
565 Ok(Ok(())) => {}
566 Ok(Err(e)) => return Ok(Err(e)),
567 Err(e) => return Err(e),
568 };
569 }
570 Ok(Ok(()))
571 }
572 FlakinessControl::RetryKnownFlaky(max_retries) => {
573 let mut tries = 1;
574 loop {
575 let start = Instant::now();
576 let result = test(start).await;
577
578 if result.is_err() && tries < *max_retries {
579 tries += 1;
580 output.repeat_running_test(
581 test_description,
582 idx,
583 count,
584 tries,
585 *max_retries,
586 "because test is known to be flaky",
587 );
588 } else {
589 break result;
590 }
591 }
592 }
593 }
594}
595
596#[allow(clippy::too_many_arguments)]
597async fn run_test(
598 output: Arc<dyn TestRunnerOutput>,
599 idx: usize,
600 count: usize,
601 nocapture: bool,
602 include_ignored: bool,
603 ensure_time: Option<TimeThreshold>,
604 dependency_view: Arc<dyn internal::DependencyView + Send + Sync>,
605 test: &RegisteredTest,
606 worker: &mut Option<Worker>,
607) -> TestResult {
608 if test.props.is_ignored && !include_ignored {
609 TestResult::ignored()
610 } else if let Some(worker) = worker.as_mut() {
611 worker.run_test(nocapture, test).await
612 } else {
613 let start = Instant::now();
614 let test = test.clone();
615 match &test.run {
616 TestFunction::Sync(_) => {
617 let handle = spawn_blocking(move || {
618 let test = test.clone();
619 crate::sync::run_sync_test_function(
620 output,
621 &test,
622 idx,
623 count,
624 ensure_time,
625 dependency_view,
626 )
627 });
628 handle.await.unwrap_or_else(|join_error| {
629 TestResult::failed(
630 start.elapsed(),
631 FailureCause::HarnessError(format!(
632 "Failed joining test task: {join_error}"
633 )),
634 )
635 })
636 }
637 TestFunction::Async(test_fn) => {
638 let timeout = test.props.timeout;
639 let test_fn = test_fn.clone();
640 let detached_panic_policy = test.props.detached_panic_policy.clone();
641 let result = run_with_flakiness_control(output, &test, idx, count, |start| {
642 let dependency_view = dependency_view.clone();
643 let test_fn = test_fn.clone();
644 Box::pin(async move {
645 let test_id = crate::panic_hook::next_test_id();
646 crate::panic_hook::set_current_test_id(test_id);
647 crate::panic_hook::create_detached_collector(test_id);
648 let result = AssertUnwindSafe(Box::pin(async move {
649 match timeout {
650 None => test_fn(dependency_view).await,
651 Some(duration) => {
652 let result =
653 tokio::time::timeout(duration, test_fn(dependency_view))
654 .await;
655 match result {
656 Ok(result) => result,
657 Err(_) => {
658 return Err(FailureCause::HarnessError(
659 "Test timed out".to_string(),
660 ))
661 }
662 }
663 }
664 }
665 .into_result()?;
666 if let Some(ensure_time) = ensure_time {
667 let elapsed = start.elapsed();
668 if ensure_time.is_critical(&elapsed) {
669 return Err(FailureCause::HarnessError(format!(
670 "Test run time exceeds critical threshold: {elapsed:?}"
671 )));
672 }
673 }
674 Ok(())
675 }))
676 .catch_unwind()
677 .await;
678 result
679 })
680 })
681 .await;
682 let mut test_result =
683 TestResult::from_result(&test.props.should_panic, start.elapsed(), result);
684 if let Some(test_id) = crate::panic_hook::current_test_id() {
685 if let Some(collector) = crate::panic_hook::take_detached_collector(test_id) {
686 let panics = match collector.lock() {
687 Ok(p) => p,
688 Err(poisoned) => poisoned.into_inner(),
689 };
690 if !panics.is_empty()
691 && detached_panic_policy == internal::DetachedPanicPolicy::FailTest
692 && test_result.is_passed()
693 {
694 let messages: Vec<String> = panics.iter().map(|p| p.render()).collect();
695 test_result = TestResult::failed(
696 start.elapsed(),
697 FailureCause::Panic(internal::PanicCause {
698 message: Some(format!(
699 "Detached task(s) panicked:\n{}",
700 messages.join("\n---\n")
701 )),
702 location: panics.first().and_then(|p| p.location.clone()),
703 backtrace: panics.first().and_then(|p| p.backtrace.clone()),
704 }),
705 );
706 }
707 }
708 }
709 crate::panic_hook::clear_current_test_id();
710 test_result
711 }
712 TestFunction::SyncBench(_) => {
713 let handle = spawn_blocking(move || {
714 let test = test.clone();
715 crate::sync::run_sync_test_function(
716 output,
717 &test,
718 idx,
719 count,
720 ensure_time,
721 dependency_view,
722 )
723 });
724 handle.await.unwrap_or_else(|join_error| {
725 TestResult::failed(
726 start.elapsed(),
727 FailureCause::HarnessError(format!(
728 "Failed joining test task: {join_error}"
729 )),
730 )
731 })
732 }
733 TestFunction::AsyncBench(bench_fn) => {
734 let mut bencher = AsyncBencher::new();
735 let test_id = crate::panic_hook::next_test_id();
736 crate::panic_hook::set_current_test_id(test_id);
737 let result = AssertUnwindSafe(async move {
738 bench_fn(&mut bencher, dependency_view).await;
739 (
740 bencher
741 .summary()
742 .expect("iter() was not called in bench function"),
743 bencher.bytes,
744 )
745 })
746 .catch_unwind()
747 .await;
748 let bytes = result.as_ref().map(|(_, bytes)| *bytes).unwrap_or_default();
749 let test_result = TestResult::from_summary(
750 &test.props.should_panic,
751 start.elapsed(),
752 result.map(|(summary, _)| summary),
753 bytes,
754 );
755 crate::panic_hook::clear_current_test_id();
756 test_result
757 }
758 }
759 }
760}
761
/// Parent-side handle of a spawned worker subprocess, created by `spawn_worker_if_needed`.
struct Worker {
    /// Local socket listener on which the worker's connection was accepted; only kept alive.
    _listener: Listener,
    /// The spawned worker process; only kept alive.
    _process: Child,
    /// Task reading the worker's stdout line by line; only kept alive.
    _out_handle: JoinHandle<()>,
    /// Task reading the worker's stderr line by line; only kept alive.
    _err_handle: JoinHandle<()>,
    /// Stdout lines captured from the worker and not yet consumed.
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Stderr lines captured from the worker and not yet consumed.
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Shared with the reader tasks: when `true` lines are queued, otherwise they are
    /// echoed to this process's stdout/stderr.
    capture_enabled: Arc<Mutex<bool>>,
    /// IPC connection to the worker, used for commands and responses.
    connection: Stream,
    /// Owner cells used to answer `HostedRpcCall` requests, keyed by dependency id.
    hosted_rpc_owner_cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>,
}
776
777impl Worker {
778 fn set_hosted_rpc_owner_cells(&mut self, cells: Arc<HashMap<String, Arc<HostedRpcOwnerCell>>>) {
782 self.hosted_rpc_owner_cells = cells;
783 }
784
785 async fn handle_hosted_rpc_call(
791 &mut self,
792 dump_on_ipc_failure: &DumpOnFailure,
793 request_id: u64,
794 dep_id: String,
795 method_idx: u32,
796 args_bytes: Vec<u8>,
797 ) {
798 let body = match self.hosted_rpc_owner_cells.get(&dep_id) {
799 Some(cell) => match cell.dispatch(method_idx, &args_bytes) {
800 Ok(result_bytes) => HostedRpcReplyBody::Ok { result_bytes },
801 Err(message) => HostedRpcReplyBody::Err { message },
802 },
803 None => HostedRpcReplyBody::Err {
804 message: format!(
805 "HostedRpc dispatch: unknown dep id '{dep_id}' in parent owner-cell map"
806 ),
807 },
808 };
809 let reply = IpcCommand::HostedRpcReply { request_id, body };
810 let msg = serialize_to_byte_vec(&reply).expect("Failed to encode HostedRpcReply");
811 dump_on_ipc_failure
812 .run(write_frame_async(&mut self.connection, &msg).await)
813 .await;
814 }
815
816 pub async fn run_test(&mut self, nocapture: bool, test: &RegisteredTest) -> TestResult {
817 let mut capture_enabled = self.capture_enabled.lock().await;
818 *capture_enabled = test.props.capture_control.requires_capturing(!nocapture);
819 drop(capture_enabled);
820
821 let cmd = IpcCommand::RunTest {
823 name: test.name.clone(),
824 crate_name: test.crate_name.clone(),
825 module_path: test.module_path.clone(),
826 };
827
828 let dump_on_ipc_failure = self.dump_on_failure();
829
830 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
831 dump_on_ipc_failure
832 .run(write_frame_async(&mut self.connection, &msg).await)
833 .await;
834
835 let response = loop {
836 let response_bytes = dump_on_ipc_failure
837 .run(read_frame_async(&mut self.connection).await)
838 .await;
839 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
840 match response {
841 IpcResponse::TestFinished { .. } => break response,
842 IpcResponse::CloneableAccepted { .. }
843 | IpcResponse::HostedDescriptorAccepted { .. } => continue,
844 IpcResponse::HostedRpcCall {
845 request_id,
846 dep_id,
847 method_idx,
848 args_bytes,
849 } => {
850 self.handle_hosted_rpc_call(
851 &dump_on_ipc_failure,
852 request_id,
853 dep_id,
854 method_idx,
855 args_bytes,
856 )
857 .await;
858 continue;
859 }
860 }
861 };
862
863 let IpcResponse::TestFinished {
864 result,
865 finish_marker,
866 } = response
867 else {
868 unreachable!("loop only breaks on TestFinished")
869 };
870
871 if test.props.capture_control.requires_capturing(!nocapture) {
872 let out_lines: Vec<_> =
873 Self::drain_until(self.out_lines.clone(), finish_marker.clone()).await;
874 let err_lines: Vec<_> =
875 Self::drain_until(self.err_lines.clone(), finish_marker.clone()).await;
876 result.into_test_result(out_lines, err_lines)
877 } else {
878 result.into_test_result(Vec::new(), Vec::new())
879 }
880 }
881
882 async fn provide_cloneable(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
885 let dump_on_ipc_failure = self.dump_on_failure();
886 let cmd = IpcCommand::ProvideCloneable {
887 dep_id: dep_id.clone(),
888 wire_bytes,
889 };
890 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
891 dump_on_ipc_failure
892 .run(write_frame_async(&mut self.connection, &msg).await)
893 .await;
894
895 loop {
896 let response_bytes = dump_on_ipc_failure
897 .run(read_frame_async(&mut self.connection).await)
898 .await;
899 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
900 match response {
901 IpcResponse::CloneableAccepted { dep_id: ack_id } => {
902 if ack_id == dep_id {
903 return;
904 }
905 }
906 IpcResponse::HostedDescriptorAccepted { .. } => {
907 }
909 IpcResponse::TestFinished { .. } => {
910 }
912 IpcResponse::HostedRpcCall {
913 request_id,
914 dep_id: rpc_dep_id,
915 method_idx,
916 args_bytes,
917 } => {
918 self.handle_hosted_rpc_call(
923 &dump_on_ipc_failure,
924 request_id,
925 rpc_dep_id,
926 method_idx,
927 args_bytes,
928 )
929 .await;
930 }
931 }
932 }
933 }
934
935 async fn provide_hosted_descriptor(&mut self, dep_id: String, wire_bytes: Vec<u8>) {
937 let dump_on_ipc_failure = self.dump_on_failure();
938 let cmd = IpcCommand::ProvideHostedDescriptor {
939 dep_id: dep_id.clone(),
940 wire_bytes,
941 };
942 let msg = serialize_to_byte_vec(&cmd).expect("Failed to encode IPC command");
943 dump_on_ipc_failure
944 .run(write_frame_async(&mut self.connection, &msg).await)
945 .await;
946
947 loop {
948 let response_bytes = dump_on_ipc_failure
949 .run(read_frame_async(&mut self.connection).await)
950 .await;
951 let response: IpcResponse = dump_on_ipc_failure.run(deserialize(&response_bytes)).await;
952 match response {
953 IpcResponse::HostedDescriptorAccepted { dep_id: ack_id } => {
954 if ack_id == dep_id {
955 return;
956 }
957 }
958 IpcResponse::CloneableAccepted { .. } => {
959 }
961 IpcResponse::TestFinished { .. } => {
962 }
964 IpcResponse::HostedRpcCall {
965 request_id,
966 dep_id: rpc_dep_id,
967 method_idx,
968 args_bytes,
969 } => {
970 self.handle_hosted_rpc_call(
973 &dump_on_ipc_failure,
974 request_id,
975 rpc_dep_id,
976 method_idx,
977 args_bytes,
978 )
979 .await;
980 }
981 }
982 }
983 }
984
985 fn dump_on_failure(&self) -> DumpOnFailure {
986 DumpOnFailure {
987 out_lines: self.out_lines.clone(),
988 err_lines: self.err_lines.clone(),
989 }
990 }
991
992 async fn drain_until(
993 source: Arc<Mutex<VecDeque<CapturedOutput>>>,
994 finish_marker: String,
995 ) -> Vec<CapturedOutput> {
996 let mut result = Vec::new();
997 loop {
998 let mut source = source.lock().await;
999 while let Some(line) = source.pop_front() {
1000 if line.line() == finish_marker {
1001 return result;
1002 } else {
1003 result.push(line.clone());
1004 }
1005 }
1006 drop(source);
1007
1008 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1009 }
1010 }
1011}
1012
/// Helper that unwraps IPC results and, on error, prints the worker's captured output
/// before terminating the process (see `DumpOnFailure::run`).
struct DumpOnFailure {
    /// Queue of stdout lines captured from the worker.
    out_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
    /// Queue of stderr lines captured from the worker.
    err_lines: Arc<Mutex<VecDeque<CapturedOutput>>>,
}
1017
1018impl DumpOnFailure {
1019 pub async fn run<T, E>(&self, result: Result<T, E>) -> T {
1020 match result {
1021 Ok(value) => value,
1022 Err(_error) => {
1023 let out_lines: Vec<_> = self.out_lines.lock().await.drain(..).collect();
1024 let err_lines: Vec<_> = self.err_lines.lock().await.drain(..).collect();
1025 let mut all_lines = [out_lines, err_lines].concat();
1026 all_lines.sort();
1027
1028 for line in all_lines {
1029 eprintln!("{}", line.line());
1030 }
1031
1032 std::process::exit(1);
1033 }
1034 }
1035 }
1036}
1037
1038async fn spawn_worker_if_needed(args: &Arguments) -> Option<Worker> {
1039 if args.spawn_workers {
1040 let id = Uuid::new_v4();
1041 let name_str = format!("{id}.sock");
1042 let name = name_str
1043 .clone()
1044 .to_ns_name::<GenericNamespaced>()
1045 .expect("Invalid local socket name");
1046 let opts = ListenerOptions::new().name(name.clone());
1047 let listener = opts
1048 .create_tokio()
1049 .expect("Failed to create local socket listener");
1050
1051 let exe = std::env::current_exe().expect("Failed to get current executable path");
1052
1053 let mut args = args.clone();
1054 args.ipc = Some(name_str);
1055 args.spawn_workers = false;
1056 args.logfile = None;
1057 let args = args.to_args();
1058
1059 let mut process = Command::new(exe)
1060 .args(args)
1061 .stdin(Stdio::inherit())
1062 .stderr(Stdio::piped())
1063 .stdout(Stdio::piped())
1064 .spawn()
1065 .expect("Failed to spawn worker process");
1066
1067 let stdout = process.stdout.take().unwrap();
1068 let stderr = process.stderr.take().unwrap();
1069
1070 let out_lines = Arc::new(Mutex::new(VecDeque::new()));
1071 let err_lines = Arc::new(Mutex::new(VecDeque::new()));
1072 let capture_enabled = Arc::new(Mutex::new(true));
1073
1074 let out_lines_clone = out_lines.clone();
1075 let capture_enabled_clone = capture_enabled.clone();
1076 let out_handle = spawn(async move {
1077 let reader = BufReader::new(stdout);
1078 let mut lines = reader.lines();
1079 while let Some(line) = lines
1080 .next_line()
1081 .await
1082 .expect("Failed to read from worker stdout")
1083 {
1084 if *capture_enabled_clone.lock().await {
1085 out_lines_clone
1086 .lock()
1087 .await
1088 .push_back(CapturedOutput::stdout(line));
1089 } else {
1090 println!("{line}");
1091 }
1092 }
1093 });
1094
1095 let err_lines_clone = err_lines.clone();
1096 let capture_enabled_clone = capture_enabled.clone();
1097 let err_handle = spawn(async move {
1098 let reader = BufReader::new(stderr);
1099 let mut lines = reader.lines();
1100 while let Some(line) = lines
1101 .next_line()
1102 .await
1103 .expect("Failed to read from worker stderr")
1104 {
1105 if *capture_enabled_clone.lock().await {
1106 err_lines_clone
1107 .lock()
1108 .await
1109 .push_back(CapturedOutput::stderr(line));
1110 } else {
1111 eprintln!("{line}");
1112 }
1113 }
1114 });
1115
1116 let connection = listener
1117 .accept()
1118 .await
1119 .expect("Failed to accept connection");
1120
1121 Some(Worker {
1122 _listener: listener,
1123 _process: process,
1124 _out_handle: out_handle,
1125 _err_handle: err_handle,
1126 out_lines,
1127 err_lines,
1128 connection,
1129 capture_enabled,
1130 hosted_rpc_owner_cells: Arc::new(HashMap::new()),
1131 })
1132 } else {
1133 None
1134 }
1135}
1136
1137fn install_local_hosted_rpc_stubs(
1143 execution: &mut TestSuiteExecution,
1144 rpc_factories: &HashMap<String, RpcFactory>,
1145 owner_cells: &HashMap<String, Arc<HostedRpcOwnerCell>>,
1146) {
1147 let transport: Arc<dyn HostedRpcTransport> =
1148 Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
1149 for (dep_id, factory) in rpc_factories.iter() {
1150 if !owner_cells.contains_key(dep_id) {
1151 continue;
1155 }
1156 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1157 let stub = (factory.build_stub)(channel);
1158 let applied = execution.provide_cloneable_value(dep_id, stub);
1159 assert!(
1160 applied,
1161 "Local HostedRpc stub for '{dep_id}' did not match any registered dep"
1162 );
1163 }
1164}
1165
1166async fn install_worker_subprocess_hosted_rpc_stubs(
1171 execution: &Arc<Mutex<TestSuiteExecution>>,
1172 rpc_factories: &HashMap<String, RpcFactory>,
1173 connection_arc: Arc<Mutex<Stream>>,
1174) {
1175 let transport: Arc<dyn HostedRpcTransport> =
1176 Arc::new(IpcHostedRpcTransport::new(connection_arc));
1177 for (dep_id, factory) in rpc_factories.iter() {
1178 let channel = HostedRpcChannel::new(dep_id.clone(), transport.clone());
1179 let stub = (factory.build_stub)(channel);
1180 let mut execution = execution.lock().await;
1181 let applied = execution.provide_cloneable_value(dep_id, stub);
1182 let _ = applied;
1185 }
1186}
1187
/// [`HostedRpcTransport`] implementation that sends hosted-RPC calls over an IPC stream
/// and waits for the matching reply.
struct IpcHostedRpcTransport {
    /// Shared IPC stream; locked for the whole request/reply exchange of a call.
    connection: Arc<Mutex<Stream>>,
    /// Counter producing the request id of the next call.
    next_request_id: AtomicU64,
}
1209
1210impl IpcHostedRpcTransport {
1211 fn new(connection: Arc<Mutex<Stream>>) -> Self {
1212 Self {
1213 connection,
1214 next_request_id: AtomicU64::new(0),
1215 }
1216 }
1217}
1218
1219impl HostedRpcTransport for IpcHostedRpcTransport {
1220 fn call(
1221 &self,
1222 dep_id: &str,
1223 method_idx: u32,
1224 args: Vec<u8>,
1225 ) -> Result<Vec<u8>, HostedRpcError> {
1226 let request_id = self.next_request_id.fetch_add(1, Ordering::SeqCst);
1227 let call = IpcResponse::HostedRpcCall {
1228 request_id,
1229 dep_id: dep_id.to_string(),
1230 method_idx,
1231 args_bytes: args,
1232 };
1233 let msg = serialize_to_byte_vec(&call).map_err(|e| {
1234 HostedRpcError::Transport(format!("encode HostedRpcCall failed: {e:?}"))
1235 })?;
1236
1237 let connection = self.connection.clone();
1238 let handle = tokio::runtime::Handle::current();
1239
1240 tokio::task::block_in_place(move || {
1245 handle.block_on(async move {
1246 let mut conn = connection.lock().await;
1247 write_frame_async(&mut *conn, &msg).await.map_err(|e| {
1248 HostedRpcError::Transport(format!("write HostedRpcCall failed: {e:?}"))
1249 })?;
1250 let reply_bytes = read_frame_async(&mut *conn).await.map_err(|e| {
1251 HostedRpcError::Transport(format!("read HostedRpcReply failed: {e:?}"))
1252 })?;
1253 let command: IpcCommand = deserialize(&reply_bytes).map_err(|e| {
1254 HostedRpcError::Transport(format!("decode HostedRpcReply failed: {e:?}"))
1255 })?;
1256 match command {
1257 IpcCommand::HostedRpcReply {
1258 request_id: reply_id,
1259 body,
1260 } => {
1261 if reply_id != request_id {
1262 return Err(HostedRpcError::Transport(format!(
1263 "HostedRpcReply request_id mismatch: expected {request_id}, got {reply_id}"
1264 )));
1265 }
1266 match body {
1267 HostedRpcReplyBody::Ok { result_bytes } => Ok(result_bytes),
1268 HostedRpcReplyBody::Err { message } => {
1269 Err(HostedRpcError::Dispatch(message))
1270 }
1271 }
1272 }
1273 other => Err(HostedRpcError::Transport(format!(
1274 "unexpected IpcCommand while waiting for HostedRpcReply: {other:?}"
1275 ))),
1276 }
1277 })
1278 })
1279 }
1280}