use super::*;
use crate::internal::{
CloneableCodec, DependencyConstructor, RegisteredDependency, RegisteredTest,
RegisteredTestSuiteProperty, TestFunction, TestProperties,
};
use std::sync::atomic::{AtomicUsize, Ordering};
/// Registers a no-op test at the crate root (empty module path).
///
/// Thin wrapper around [`registered_test_in_module`]; `deps` are the names of
/// the dependencies the test declares.
fn registered_test(name: &str, deps: Vec<String>) -> RegisteredTest {
    let crate_root_module = "";
    registered_test_in_module(name, crate_root_module, deps)
}
/// Builds a synchronous [`RegisteredTest`] in crate `tcrate` under
/// `module_path`, declaring `deps` as its dependency names.
///
/// The test body ignores its argument and returns a boxed unit; properties
/// are left at their defaults.
fn registered_test_in_module(name: &str, module_path: &str, deps: Vec<String>) -> RegisteredTest {
    let noop_body = TestFunction::Sync(Arc::new(|_| Box::new(())));
    RegisteredTest {
        name: name.to_owned(),
        crate_name: String::from("tcrate"),
        module_path: module_path.to_owned(),
        run: noop_body,
        props: TestProperties::default(),
        dependencies: Some(deps),
    }
}
/// Registers a cloneable dependency at the crate root whose constructor
/// yields `0xdead_beef` and bumps `counter` on every run.
///
/// Thin wrapper around [`registered_cloneable_dep_in`].
fn registered_cloneable_dep(name: &str, counter: Arc<AtomicUsize>) -> RegisteredDependency {
    let default_value: u64 = 0xdead_beef;
    registered_cloneable_dep_in(name, "", default_value, counter)
}
/// Builds a `DepScope::Cloneable` dependency in crate `tcrate` carrying a `u64`.
///
/// * `constructor_value` – the value returned by the synchronous constructor.
/// * `counter` – incremented (SeqCst) each time the constructor runs, so tests
///   can assert how often construction happened.
///
/// The attached codec writes the value as 8 little-endian bytes and decodes
/// the same layout; the worker reconstructor returns the wire payload as-is.
fn registered_cloneable_dep_in(
    name: &str,
    module_path: &str,
    constructor_value: u64,
    counter: Arc<AtomicUsize>,
) -> RegisteredDependency {
    let invocations = Arc::clone(&counter);
    let constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
        invocations.fetch_add(1, Ordering::SeqCst);
        Arc::new(constructor_value) as Arc<dyn Any + Send + Sync>
    }));

    let passthrough_worker = crate::internal::WorkerReconstructor::Sync(Arc::new(
        |wire_payload, _deps| wire_payload,
    ));

    let le_u64_codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let number: u64 = *any.downcast::<u64>().unwrap();
            number.to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            // Panics unless exactly eight bytes were supplied.
            let number = u64::from_le_bytes(bytes.try_into().unwrap());
            Arc::new(number) as Arc<dyn Any + Send + Sync>
        }),
    };

    RegisteredDependency {
        name: name.to_owned(),
        crate_name: String::from("tcrate"),
        module_path: module_path.to_owned(),
        constructor,
        dependencies: Vec::new(),
        scope: DepScope::Cloneable,
        worker_fn: Some(passthrough_worker),
        cloneable_codec: Some(le_u64_codec),
        hosted_codec: None,
        rpc_factory: None,
        companions: Vec::new(),
    }
}
/// Builds a `DepScope::PerWorker` dependency in crate `tcrate` whose
/// synchronous constructor returns `0u64` and increments `counter` (SeqCst)
/// on every invocation.
///
/// No worker reconstructor, codecs, RPC factory or companions are attached.
fn registered_perworker_counting_dep(
    name: &str,
    module_path: &str,
    counter: Arc<AtomicUsize>,
) -> RegisteredDependency {
    let invocations = Arc::clone(&counter);
    let counting_constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
        invocations.fetch_add(1, Ordering::SeqCst);
        Arc::new(0u64) as Arc<dyn Any + Send + Sync>
    }));

    RegisteredDependency {
        name: name.to_owned(),
        crate_name: String::from("tcrate"),
        module_path: module_path.to_owned(),
        constructor: counting_constructor,
        dependencies: Vec::new(),
        scope: DepScope::PerWorker,
        worker_fn: None,
        cloneable_codec: None,
        hosted_codec: None,
        rpc_factory: None,
        companions: Vec::new(),
    }
}
/// Two tests in `parent::child` share a PerWorker dependency registered in
/// `parent`, and `child` is marked sequential. While the first picked test is
/// still held, a second pick must yield nothing; after it is dropped the
/// second test is picked. The dependency constructor must have run exactly
/// once across the whole sequence (sync picking path).
#[test]
fn perworker_dep_not_rematerialised_when_descendant_subtree_is_locked() {
    let constructions = Arc::new(AtomicUsize::new(0));
    let uses_perworker_dep = || vec![String::from("perworker_dep")];

    let perworker_dep =
        registered_perworker_counting_dep("perworker_dep", "parent", Arc::clone(&constructions));
    let test_a = registered_test_in_module("t_a", "parent::child", uses_perworker_dep());
    let test_b = registered_test_in_module("t_b", "parent::child", uses_perworker_dep());
    let child_is_sequential = RegisteredTestSuiteProperty::Sequential {
        name: String::from("child"),
        crate_name: String::from("tcrate"),
        module_path: String::from("parent"),
    };

    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[perworker_dep],
        &[test_a, test_b],
        &[child_is_sequential],
    );

    let first_pick = execution
        .pick_next_sync()
        .expect("first test should be picked");
    assert_eq!(
        constructions.load(Ordering::SeqCst),
        1,
        "PerWorker constructor must run once when the parent subtree first materialises"
    );

    // The first pick is still alive here, so the sequential subtree is busy.
    assert!(
        execution.pick_next_sync().is_none(),
        "no test should be picked while the sequential lock is held"
    );

    drop(first_pick);
    let second_pick = execution
        .pick_next_sync()
        .expect("second test should be picked");
    drop(second_pick);

    assert_eq!(
        constructions.load(Ordering::SeqCst),
        1,
        "PerWorker constructor must remain one per process — a temporarily-locked \
         descendant subtree must not cause the parent's materialised deps to be \
         released and rematerialised"
    );
}
/// Async counterpart of the sync "not rematerialised" test: the same
/// scenario is driven through `pick_next().await` on a current-thread tokio
/// runtime, and the PerWorker constructor must still have run exactly once.
#[cfg(feature = "tokio")]
#[test]
fn perworker_dep_not_rematerialised_when_descendant_subtree_is_locked_async() {
    let scenario = async {
        let constructions = Arc::new(AtomicUsize::new(0));
        let uses_perworker_dep = || vec![String::from("perworker_dep")];

        let perworker_dep = registered_perworker_counting_dep(
            "perworker_dep",
            "parent",
            Arc::clone(&constructions),
        );
        let test_a = registered_test_in_module("t_a", "parent::child", uses_perworker_dep());
        let test_b = registered_test_in_module("t_b", "parent::child", uses_perworker_dep());
        let child_is_sequential = RegisteredTestSuiteProperty::Sequential {
            name: String::from("child"),
            crate_name: String::from("tcrate"),
            module_path: String::from("parent"),
        };

        let (mut execution, _filtered) = TestSuiteExecution::construct(
            &Arguments::default(),
            &[perworker_dep],
            &[test_a, test_b],
            &[child_is_sequential],
        );

        let first_pick = execution
            .pick_next()
            .await
            .expect("first test should be picked");
        assert_eq!(
            constructions.load(Ordering::SeqCst),
            1,
            "PerWorker constructor must run once when the parent subtree first materialises"
        );

        // The first pick is still alive here, so the sequential subtree is busy.
        assert!(
            execution.pick_next().await.is_none(),
            "no test should be picked while the sequential lock is held"
        );

        drop(first_pick);
        let second_pick = execution
            .pick_next()
            .await
            .expect("second test should be picked");
        drop(second_pick);

        assert_eq!(
            constructions.load(Ordering::SeqCst),
            1,
            "PerWorker constructor must remain one per process on the async path — \
             keep `pick_next_internal` and `pick_next_internal_sync` symmetrical"
        );
    };

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(scenario);
}
/// Collecting cloneable wire bytes for a single dependency must produce one
/// entry keyed by the qualified id, holding the codec-encoded constructor
/// value, with the constructor having run exactly once.
#[test]
fn cloneable_wire_collection_runs_constructor_once_and_encodes_value() {
    let constructor_runs = Arc::new(AtomicUsize::new(0));
    let clone_dep = registered_cloneable_dep("clone_dep", Arc::clone(&constructor_runs));
    let consumer = registered_test("t1", vec![String::from("clone_dep")]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[clone_dep], &[consumer], &[]);

    let wire_entries = execution.collect_cloneable_wire_bytes_sync();
    assert_eq!(wire_entries.len(), 1, "exactly one cloneable dep expected");

    let (qualified_id, encoded) = &wire_entries[0];
    assert_eq!(
        qualified_id, "tcrate::clone_dep",
        "wire bytes must be keyed by the fully-qualified id, not the local name"
    );

    let expected_bytes = 0xdead_beef_u64.to_le_bytes();
    assert_eq!(
        encoded.as_slice(),
        &expected_bytes,
        "expected the codec-encoded value to round-trip via to_wire"
    );

    assert_eq!(
        constructor_runs.load(Ordering::SeqCst),
        1,
        "constructor must have run exactly once when collecting"
    );
}
/// Exercises dependency pruning with companion links in three scenarios:
///
/// 1. `clone_a` <-> `clone_b` are companions and only `clone_a` is referenced;
/// 2. same link, but only `clone_b` is referenced;
/// 3. no companion link and only `clone_a` is referenced.
///
/// In the linked scenarios both deps must be reported by
/// `collect_cloneable_dependencies`; in the unlinked one `clone_b` must not be.
#[test]
fn prune_unused_deps_retains_companion_when_only_one_half_is_referenced() {
    /// Registers `clone_a` and `clone_b` (mutually linked as companions when
    /// `link_companions` is set) plus a single test depending on
    /// `referenced_dep`, and returns the names of the cloneable deps the
    /// constructed execution reports.
    fn kept_cloneable_names(
        link_companions: bool,
        test_name: &str,
        referenced_dep: &str,
    ) -> Vec<String> {
        let mut dep_a = registered_cloneable_dep("clone_a", Arc::new(AtomicUsize::new(0)));
        let mut dep_b = registered_cloneable_dep("clone_b", Arc::new(AtomicUsize::new(0)));
        if link_companions {
            dep_a.companions = vec!["clone_b".to_string()];
            dep_b.companions = vec!["clone_a".to_string()];
        }
        let only_test = registered_test(test_name, vec![referenced_dep.to_string()]);
        let (execution, _filtered) = TestSuiteExecution::construct(
            &Arguments::default(),
            &[dep_a, dep_b],
            &[only_test],
            &[],
        );
        let names: Vec<String> = execution
            .collect_cloneable_dependencies()
            .into_iter()
            .map(|d| d.name)
            .collect();
        names
    }

    // Scenario 1: linked companions, test references `clone_a` only.
    let kept = kept_cloneable_names(true, "t_uses_a", "clone_a");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        kept.contains(&"clone_b".to_string()),
        "companion of a retained dep must also be retained (the planner-only \
         sibling link used by `worker = both(...)`), kept = {kept:?}"
    );

    // Scenario 2: linked companions, test references `clone_b` only.
    let kept = kept_cloneable_names(true, "t_uses_b", "clone_b");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "companion of a stub-referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        kept.contains(&"clone_b".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );

    // Scenario 3: no companion link, so the unreferenced dep must disappear.
    let kept = kept_cloneable_names(false, "t_uses_a", "clone_a");
    assert!(
        kept.contains(&"clone_a".to_string()),
        "directly referenced dep must be retained, kept = {kept:?}"
    );
    assert!(
        !kept.contains(&"clone_b".to_string()),
        "without a companion link, an unreferenced dep must be pruned; \
         kept = {kept:?}"
    );
}
/// A value supplied through `provide_cloneable_value` must be what the picked
/// test sees for that dependency, and the dependency's own constructor must
/// never run.
#[test]
fn provide_cloneable_value_short_circuits_constructor() {
    let constructor_runs = Arc::new(AtomicUsize::new(0));
    let clone_dep = registered_cloneable_dep("clone_dep", Arc::clone(&constructor_runs));
    let consumer = registered_test("t1", vec![String::from("clone_dep")]);
    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[clone_dep], &[consumer], &[]);

    let supplied: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
    assert!(
        execution.provide_cloneable_value("tcrate::clone_dep", supplied),
        "pre-populated value should match the dep's qualified id"
    );

    let picked = execution.pick_next_sync().expect("test should be picked");
    assert_eq!(picked.test.name, "t1");

    let seen: Arc<u64> = picked
        .deps
        .get("clone_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 99);

    assert_eq!(
        constructor_runs.load(Ordering::SeqCst),
        0,
        "constructor must not run when a pre-populated value is supplied"
    );
}
/// When a value is provided for a dependency that itself declares another
/// dependency, picking the test must run neither the provided dependency's
/// constructor nor the constructor of the dependency it declares.
#[test]
fn provided_shared_value_is_a_worker_side_leaf() {
    let provided_runs = Arc::new(AtomicUsize::new(0));
    let parent_only_runs = Arc::new(AtomicUsize::new(0));

    let parent_only_dep =
        registered_cloneable_dep("parent_only_dep", Arc::clone(&parent_only_runs));
    let mut provided_dep = registered_cloneable_dep("clone_dep", Arc::clone(&provided_runs));
    provided_dep.dependencies = vec![String::from("parent_only_dep")];

    let consumer = registered_test("t1", vec![String::from("clone_dep")]);
    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[provided_dep, parent_only_dep],
        &[consumer],
        &[],
    );

    let supplied: Arc<dyn Any + Send + Sync> = Arc::new(99_u64);
    assert!(execution.provide_cloneable_value("tcrate::clone_dep", supplied));

    let picked = execution.pick_next_sync().expect("test should be picked");
    let seen: Arc<u64> = picked
        .deps
        .get("clone_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 99);

    assert_eq!(
        provided_runs.load(Ordering::SeqCst),
        0,
        "worker-side provided values must not run their original constructor"
    );
    assert_eq!(
        parent_only_runs.load(Ordering::SeqCst),
        0,
        "constructor dependencies are parent-only once a value arrives from wire bytes or an RPC stub"
    );
}
/// Async wire collection must await an `Async` constructor (which yields once
/// before producing its value), encode the result through the codec, key it
/// by qualified id, and run the constructor exactly once.
#[cfg(feature = "tokio")]
#[test]
fn async_cloneable_wire_collection_awaits_async_constructor() {
    use std::pin::Pin;

    let constructor_runs = Arc::new(AtomicUsize::new(0));
    let runs_for_constructor = Arc::clone(&constructor_runs);
    let async_constructor = DependencyConstructor::Async(Arc::new(move |_view| {
        let runs = runs_for_constructor.clone();
        Box::pin(async move {
            // Yield first so the value is only produced after a real await point.
            tokio::task::yield_now().await;
            runs.fetch_add(1, Ordering::SeqCst);
            let produced: u64 = 0xdead_beef;
            Arc::new(produced) as Arc<dyn Any + Send + Sync>
        }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
    }));

    let le_u64_codec = CloneableCodec {
        to_wire: Arc::new(|any| {
            let number: Arc<u64> = any.downcast::<u64>().unwrap();
            number.to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes| {
            let raw: [u8; 8] = bytes.try_into().unwrap();
            Arc::new(u64::from_le_bytes(raw)) as Arc<dyn Any + Send + Sync>
        }),
    };

    let passthrough_worker = crate::internal::WorkerReconstructor::Sync(Arc::new(
        |wire_payload, _| wire_payload,
    ));

    let clone_dep = RegisteredDependency {
        name: String::from("clone_dep"),
        crate_name: String::from("tcrate"),
        module_path: String::new(),
        constructor: async_constructor,
        dependencies: Vec::new(),
        scope: DepScope::Cloneable,
        worker_fn: Some(passthrough_worker),
        cloneable_codec: Some(le_u64_codec),
        hosted_codec: None,
        rpc_factory: None,
        companions: Vec::new(),
    };
    let consumer = registered_test("t1", vec![String::from("clone_dep")]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[clone_dep], &[consumer], &[]);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let wire_entries = runtime.block_on(execution.collect_cloneable_wire_bytes_async());

    assert_eq!(wire_entries.len(), 1);
    assert_eq!(wire_entries[0].0, "tcrate::clone_dep");
    assert_eq!(wire_entries[0].1.as_slice(), &0xdead_beef_u64.to_le_bytes());
    assert_eq!(
        constructor_runs.load(Ordering::SeqCst),
        1,
        "async constructor must have run exactly once"
    );
}
/// Two cloneable deps share the local name `clone_dep` but live in `mod_a`
/// and `mod_b`. Wire collection must key them by distinct qualified ids,
/// `provide_cloneable_value` must accept exactly those ids (and reject an
/// unknown one), and each module's test must observe its own provided value.
#[test]
fn cloneable_value_routing_uses_qualified_id_across_modules() {
    let runs_a = Arc::new(AtomicUsize::new(0));
    let runs_b = Arc::new(AtomicUsize::new(0));
    let dep_in_a = registered_cloneable_dep_in("clone_dep", "mod_a", 11, Arc::clone(&runs_a));
    let dep_in_b = registered_cloneable_dep_in("clone_dep", "mod_b", 22, Arc::clone(&runs_b));
    let test_in_a = registered_test_in_module("t_a", "mod_a", vec![String::from("clone_dep")]);
    let test_in_b = registered_test_in_module("t_b", "mod_b", vec![String::from("clone_dep")]);
    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[dep_in_a, dep_in_b],
        &[test_in_a, test_in_b],
        &[],
    );

    // Parent side: collect wire bytes; sort by id so the order is deterministic.
    let mut wire_entries = execution.collect_cloneable_wire_bytes_sync();
    wire_entries.sort_by(|l, r| l.0.cmp(&r.0));
    assert_eq!(wire_entries.len(), 2);
    assert_eq!(wire_entries[0].0, "tcrate::mod_a::clone_dep");
    assert_eq!(wire_entries[1].0, "tcrate::mod_b::clone_dep");
    assert_eq!(wire_entries[0].1.as_slice(), &11_u64.to_le_bytes());
    assert_eq!(wire_entries[1].1.as_slice(), &22_u64.to_le_bytes());

    // Worker side: provide distinct values per qualified id.
    assert!(
        execution.provide_cloneable_value("tcrate::mod_a::clone_dep", Arc::new(111_u64)),
        "mod_a dep must be reachable by qualified id"
    );
    assert!(
        execution.provide_cloneable_value("tcrate::mod_b::clone_dep", Arc::new(222_u64)),
        "mod_b dep must be reachable by qualified id"
    );
    assert!(
        !execution.provide_cloneable_value("tcrate::mod_c::clone_dep", Arc::new(333_u64)),
        "unknown qualified id must not be applied anywhere"
    );

    let first = execution.pick_next_sync().expect("first test");
    let second = execution.pick_next_sync().expect("second test");

    // Pick order is not asserted, so bucket the observed values by test name.
    let mut seen_by_a = None;
    let mut seen_by_b = None;
    for picked in [first, second] {
        let v: Arc<u64> = picked
            .deps
            .get("clone_dep")
            .expect("dep available")
            .clone()
            .downcast()
            .unwrap();
        match picked.test.name.as_str() {
            "t_a" => seen_by_a = Some(*v),
            "t_b" => seen_by_b = Some(*v),
            _ => {}
        }
    }
    let val_a = seen_by_a.expect("t_a picked");
    let val_b = seen_by_b.expect("t_b picked");

    assert_eq!(
        val_a, 111,
        "mod_a test must see mod_a's pre-populated value"
    );
    assert_eq!(
        val_b, 222,
        "mod_b test must see mod_b's pre-populated value"
    );
    assert_eq!(
        runs_a.load(Ordering::SeqCst),
        1,
        "mod_a constructor must have run exactly once (during wire collection)"
    );
    assert_eq!(
        runs_b.load(Ordering::SeqCst),
        1,
        "mod_b constructor must have run exactly once (during wire collection)"
    );
}
/// Registers a hosted dependency at the crate root (empty module path).
///
/// Thin wrapper around [`registered_hosted_dep_in`]; `payload` is the value
/// the owner constructor yields and `owner_counter` counts its invocations.
fn registered_hosted_dep(
    name: &str,
    payload: u64,
    owner_counter: Arc<AtomicUsize>,
) -> RegisteredDependency {
    let crate_root_module = "";
    registered_hosted_dep_in(name, crate_root_module, payload, owner_counter)
}
/// Builds a `DepScope::Hosted` dependency in crate `tcrate`.
///
/// * The owner constructor returns `payload` as a `u64` and increments
///   `owner_counter` (SeqCst) each time it runs.
/// * The hosted codec encodes the owner value as 8 little-endian bytes;
///   decoding only copies the raw bytes into a `Vec<u8>`.
/// * The worker reconstructor turns that `Vec<u8>` back into a `u64`
///   (panicking unless it holds exactly eight bytes).
fn registered_hosted_dep_in(
    name: &str,
    module_path: &str,
    payload: u64,
    owner_counter: Arc<AtomicUsize>,
) -> RegisteredDependency {
    let owner_constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
        owner_counter.fetch_add(1, Ordering::SeqCst);
        Arc::new(payload) as Arc<dyn Any + Send + Sync>
    }));

    let descriptor_codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let owner_value: u64 = *any.downcast::<u64>().unwrap();
            owner_value.to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            Arc::new(bytes.to_vec()) as Arc<dyn Any + Send + Sync>
        }),
    };

    let rebuild_on_worker =
        crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
            let raw: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
            let eight: [u8; 8] = raw.as_slice().try_into().unwrap();
            Arc::new(u64::from_le_bytes(eight)) as Arc<dyn Any + Send + Sync>
        }));

    RegisteredDependency {
        name: name.to_owned(),
        crate_name: String::from("tcrate"),
        module_path: module_path.to_owned(),
        constructor: owner_constructor,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(rebuild_on_worker),
        cloneable_codec: None,
        hosted_codec: Some(descriptor_codec),
        rpc_factory: None,
        companions: Vec::new(),
    }
}
/// Collecting hosted descriptors for one dependency must return one
/// descriptor (keyed by qualified id, holding the codec-encoded payload) and
/// one owner value equal to the payload, with the owner constructor having
/// run exactly once.
#[test]
fn hosted_descriptor_collection_runs_owner_once_and_keeps_it_alive() {
    const PAYLOAD: u64 = 0xcafe_babe_dead_beef;

    let owner_runs = Arc::new(AtomicUsize::new(0));
    let hosted = registered_hosted_dep("hosted_dep", PAYLOAD, Arc::clone(&owner_runs));
    let consumer = registered_test("t1", vec![String::from("hosted_dep")]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 1, "exactly one hosted dep expected");
    assert_eq!(owners.len(), 1, "exactly one hosted owner kept alive");

    let (qualified_id, encoded) = &descriptors[0];
    assert_eq!(
        qualified_id, "tcrate::hosted_dep",
        "descriptor must be keyed by the fully-qualified id"
    );
    assert_eq!(
        encoded.as_slice(),
        &PAYLOAD.to_le_bytes(),
        "expected descriptor bytes to match codec.to_wire of payload"
    );
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must have run exactly once"
    );

    let held_owner: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
    assert_eq!(*held_owner, 0xcafe_babe_dead_beef);
}
/// A hosted dependency must accept a value through `provide_cloneable_value`;
/// the picked test then sees that value and the owner constructor never runs.
#[test]
fn hosted_descriptor_roundtrips_to_worker_value_via_provide_cloneable_value() {
    let owner_runs = Arc::new(AtomicUsize::new(0));
    let hosted = registered_hosted_dep("hosted_dep", 0x1234_5678_u64, Arc::clone(&owner_runs));
    let consumer = registered_test("t1", vec![String::from("hosted_dep")]);
    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let supplied: Arc<dyn Any + Send + Sync> = Arc::new(0x1234_5678_u64);
    assert!(
        execution.provide_cloneable_value("tcrate::hosted_dep", supplied),
        "Hosted dep must accept pre-populated values via the same path as Cloneable"
    );

    let picked = execution.pick_next_sync().expect("test should be picked");
    let seen: Arc<u64> = picked
        .deps
        .get("hosted_dep")
        .expect("dep available")
        .downcast::<u64>()
        .unwrap();
    assert_eq!(*seen, 0x1234_5678);

    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        0,
        "Hosted owner constructor must not run on the worker side"
    );
}
/// With only a hosted dependency registered, the execution must report
/// hosted dependencies and must not report shared or cloneable ones.
#[test]
fn has_hosted_dependencies_reports_correctly() {
    let hosted = registered_hosted_dep("h", 0, Arc::new(AtomicUsize::new(0)));
    let consumer = registered_test("t1", vec![String::from("h")]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let reports_hosted = execution.has_hosted_dependencies();
    assert!(reports_hosted);
    let reports_shared = execution.has_shared_dependencies();
    assert!(!reports_shared);
    let reports_cloneable = execution.has_cloneable_dependencies();
    assert!(!reports_cloneable);
}
/// One test depends on two hosted deps; a single descriptor collection must
/// yield two descriptors and two owners, with each owner constructor having
/// run exactly once.
///
/// NOTE(review): the test name mentions collecting multiple times, but only
/// one collection call is made here — confirm whether a second call was
/// intended before extending the assertions.
#[test]
fn hosted_owner_runs_exactly_once_even_when_collecting_multiple_times() {
    let counter_a = Arc::new(AtomicUsize::new(0));
    let counter_b = Arc::new(AtomicUsize::new(0));
    // `registered_hosted_dep` already stores the given name, so no follow-up
    // assignment (or `mut` binding) is needed.
    let dep_a = registered_hosted_dep("hosted_a", 1, counter_a.clone());
    let dep_b = registered_hosted_dep("hosted_b", 2, counter_b.clone());
    let test = registered_test("t1", vec!["hosted_a".to_string(), "hosted_b".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[dep_a, dep_b], &[test], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 2);
    assert_eq!(owners.len(), 2);
    assert_eq!(counter_a.load(Ordering::SeqCst), 1);
    assert_eq!(counter_b.load(Ordering::SeqCst), 1);
}
/// Two hosted deps share the local name `hosted_dep` but live in `mod_a` and
/// `mod_b`. Descriptor collection must key them by distinct qualified ids,
/// `provide_cloneable_value` must accept exactly those ids (and reject an
/// unknown one), and each module's test must observe its own provided value.
#[test]
fn hosted_descriptor_routing_uses_qualified_id_across_modules() {
    let runs_a = Arc::new(AtomicUsize::new(0));
    let runs_b = Arc::new(AtomicUsize::new(0));
    let dep_in_a = registered_hosted_dep_in("hosted_dep", "mod_a", 11, Arc::clone(&runs_a));
    let dep_in_b = registered_hosted_dep_in("hosted_dep", "mod_b", 22, Arc::clone(&runs_b));
    let test_in_a = registered_test_in_module("t_a", "mod_a", vec![String::from("hosted_dep")]);
    let test_in_b = registered_test_in_module("t_b", "mod_b", vec![String::from("hosted_dep")]);
    let (mut execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[dep_in_a, dep_in_b],
        &[test_in_a, test_in_b],
        &[],
    );

    // Parent side: collect descriptors; sort by id so the order is deterministic.
    let (mut descriptors, _owners) = execution.collect_hosted_descriptor_bytes_sync();
    descriptors.sort_by(|l, r| l.0.cmp(&r.0));
    assert_eq!(descriptors.len(), 2);
    assert_eq!(descriptors[0].0, "tcrate::mod_a::hosted_dep");
    assert_eq!(descriptors[1].0, "tcrate::mod_b::hosted_dep");
    assert_eq!(descriptors[0].1.as_slice(), &11_u64.to_le_bytes());
    assert_eq!(descriptors[1].1.as_slice(), &22_u64.to_le_bytes());

    // Worker side: provide distinct values per qualified id.
    assert!(
        execution.provide_cloneable_value("tcrate::mod_a::hosted_dep", Arc::new(111_u64)),
        "mod_a hosted dep must be reachable by qualified id"
    );
    assert!(
        execution.provide_cloneable_value("tcrate::mod_b::hosted_dep", Arc::new(222_u64)),
        "mod_b hosted dep must be reachable by qualified id"
    );
    assert!(
        !execution.provide_cloneable_value("tcrate::mod_c::hosted_dep", Arc::new(333_u64)),
        "unknown qualified id must not be applied to any dep"
    );

    let first = execution.pick_next_sync().expect("first test");
    let second = execution.pick_next_sync().expect("second test");

    // Pick order is not asserted, so bucket the observed values by test name.
    let mut seen_by_a = None;
    let mut seen_by_b = None;
    for picked in [first, second] {
        let v: Arc<u64> = picked
            .deps
            .get("hosted_dep")
            .expect("dep available")
            .clone()
            .downcast()
            .unwrap();
        match picked.test.name.as_str() {
            "t_a" => seen_by_a = Some(*v),
            "t_b" => seen_by_b = Some(*v),
            _ => {}
        }
    }
    let val_a = seen_by_a.expect("t_a picked");
    let val_b = seen_by_b.expect("t_b picked");

    assert_eq!(val_a, 111);
    assert_eq!(val_b, 222);
    assert_eq!(runs_a.load(Ordering::SeqCst), 1);
    assert_eq!(runs_b.load(Ordering::SeqCst), 1);
}
/// Walks the hosted-dependency pipeline by hand inside one process: collect
/// the descriptor, decode it with the codec, run the worker reconstructor,
/// and feed the result back via `provide_cloneable_value`.
///
/// The reconstructor returns the bitwise complement of the owner value, so
/// the assertion can tell the reconstructed handle apart from the owner. The
/// owner constructor must have run exactly once.
#[test]
fn hosted_no_spawn_workers_uses_worker_side_handle() {
    let owner_value: u64 = 0xAAAA_AAAA_AAAA_AAAA_u64;
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let runs_for_constructor = Arc::clone(&owner_runs);
    let owner_constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
        runs_for_constructor.fetch_add(1, Ordering::SeqCst);
        Arc::new(owner_value) as Arc<dyn Any + Send + Sync>
    }));

    let codec = CloneableCodec {
        to_wire: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            let owner: u64 = *any.downcast::<u64>().unwrap();
            owner.to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes: &[u8]| {
            Arc::new(bytes.to_vec()) as Arc<dyn Any + Send + Sync>
        }),
    };

    let worker_fn = crate::internal::WorkerReconstructor::Sync(Arc::new(|wire_payload, _deps| {
        let raw_bytes: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
        let eight: [u8; 8] = raw_bytes.as_slice().try_into().unwrap();
        // Complement the decoded value so the handle is distinguishable from the owner.
        let handle_value: u64 = !u64::from_le_bytes(eight);
        Arc::new(handle_value) as Arc<dyn Any + Send + Sync>
    }));

    let hosted = RegisteredDependency {
        name: String::from("hosted_dep"),
        crate_name: String::from("tcrate"),
        module_path: String::new(),
        constructor: owner_constructor,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(worker_fn.clone()),
        cloneable_codec: None,
        hosted_codec: Some(codec.clone()),
        rpc_factory: None,
        companions: Vec::new(),
    };
    let consumer = registered_test("t1", vec![String::from("hosted_dep")]);
    let (mut execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let (descriptors, owners) = execution.collect_hosted_descriptor_bytes_sync();
    assert_eq!(descriptors.len(), 1);
    assert_eq!(owners.len(), 1);

    let (dep_id, wire_bytes) = &descriptors[0];
    let decoded_payload = (codec.from_wire_bytes)(wire_bytes.as_slice());
    let no_deps: Arc<dyn crate::internal::DependencyView + Send + Sync> =
        Arc::new(HashMap::<String, Arc<dyn Any + Send + Sync>>::new());
    let worker_side_value = match &worker_fn {
        crate::internal::WorkerReconstructor::Sync(rebuild) => rebuild(decoded_payload, no_deps),
        crate::internal::WorkerReconstructor::Async(_) => unreachable!(),
    };
    assert!(execution.provide_cloneable_value(dep_id, worker_side_value));

    let picked = execution.pick_next_sync().expect("test picked");
    let seen: Arc<u64> = picked
        .deps
        .get("hosted_dep")
        .expect("dep available")
        .clone()
        .downcast::<u64>()
        .unwrap();
    assert_eq!(
        *seen,
        !owner_value,
        "Hosted dep must expose the worker-side handle (from_descriptor) even in the no-spawn-workers path"
    );
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must have run exactly once during descriptor collection"
    );
}
/// A hosted dep that declares a cloneable dep as its own dependency: parent
/// shared-dependency collection must produce one hosted descriptor and run
/// both the cloneable constructor and the hosted owner constructor once.
#[test]
fn hosted_dep_with_owner_dependencies_constructs_in_parent_context() {
    let inner_runs = Arc::new(AtomicUsize::new(0));
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let inner_dep = registered_cloneable_dep("some_other_dep", Arc::clone(&inner_runs));
    let mut hosted = registered_hosted_dep("h_with_deps", 0, Arc::clone(&owner_runs));
    hosted.dependencies = vec![String::from("some_other_dep")];

    let consumer = registered_test("t1", vec![String::from("h_with_deps")]);
    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[inner_dep, hosted],
        &[consumer],
        &[],
    );

    let parent_side = execution.collect_parent_shared_dependencies_sync();
    assert_eq!(parent_side.hosted_descriptor_bytes.len(), 1);
    assert_eq!(inner_runs.load(Ordering::SeqCst), 1);
    assert_eq!(owner_runs.load(Ordering::SeqCst), 1);
}
/// Async hosted-descriptor collection must await an `Async` owner constructor
/// (which yields once before producing `42`), return one descriptor keyed by
/// qualified id with the encoded value, return one owner equal to `42`, and
/// run the constructor exactly once.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_descriptor_collection_awaits_async_constructor() {
    use std::pin::Pin;

    let owner_runs = Arc::new(AtomicUsize::new(0));
    let runs_for_constructor = Arc::clone(&owner_runs);
    let async_owner_constructor = DependencyConstructor::Async(Arc::new(move |_view| {
        let runs = runs_for_constructor.clone();
        Box::pin(async move {
            // Yield first so the value is only produced after a real await point.
            tokio::task::yield_now().await;
            runs.fetch_add(1, Ordering::SeqCst);
            let produced: u64 = 42;
            Arc::new(produced) as Arc<dyn Any + Send + Sync>
        }) as Pin<Box<dyn std::future::Future<Output = Arc<dyn Any + Send + Sync>>>>
    }));

    let descriptor_codec = CloneableCodec {
        to_wire: Arc::new(|any| {
            let owner: Arc<u64> = any.downcast::<u64>().unwrap();
            owner.to_le_bytes().to_vec()
        }),
        from_wire_bytes: Arc::new(|bytes| {
            Arc::new(bytes.to_vec()) as Arc<dyn Any + Send + Sync>
        }),
    };

    let rebuild_on_worker = crate::internal::WorkerReconstructor::Sync(Arc::new(
        |wire_payload, _| {
            let raw_bytes: Arc<Vec<u8>> = wire_payload.downcast::<Vec<u8>>().unwrap();
            let eight: [u8; 8] = raw_bytes.as_slice().try_into().unwrap();
            Arc::new(u64::from_le_bytes(eight)) as Arc<dyn Any + Send + Sync>
        },
    ));

    let hosted = RegisteredDependency {
        name: String::from("hosted_async"),
        crate_name: String::from("tcrate"),
        module_path: String::new(),
        constructor: async_owner_constructor,
        dependencies: Vec::new(),
        scope: DepScope::Hosted,
        worker_fn: Some(rebuild_on_worker),
        cloneable_codec: None,
        hosted_codec: Some(descriptor_codec),
        rpc_factory: None,
        companions: Vec::new(),
    };
    let consumer = registered_test("t1", vec![String::from("hosted_async")]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[hosted], &[consumer], &[]);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let (descriptors, owners) =
        runtime.block_on(execution.collect_hosted_descriptor_bytes_async());

    assert_eq!(descriptors.len(), 1);
    assert_eq!(owners.len(), 1);
    assert_eq!(descriptors[0].0, "tcrate::hosted_async");
    assert_eq!(descriptors[0].1.as_slice(), &42_u64.to_le_bytes());
    assert_eq!(owner_runs.load(Ordering::SeqCst), 1);

    let held_owner: Arc<u64> = owners[0].clone().downcast::<u64>().unwrap();
    assert_eq!(*held_owner, 42);
}
use crate::internal::{
HostedRpcChannel, HostedRpcDep, HostedRpcError, HostedRpcOwnerCell, HostedRpcTransport,
InProcessHostedRpcTransport, RpcFactory,
};
/// Minimal RPC-hosted owner used by the tests: a counter with two methods.
struct RpcCounter {
    /// Number of times method 1 has been dispatched.
    n: u64,
}

impl HostedRpcDep for RpcCounter {
    type Stub = RpcCounterStub;

    /// Handles one call:
    ///
    /// * `1` – increments the counter and returns it as 8 big-endian bytes.
    /// * `2` – reads a big-endian `u32` size from `args` (exactly 4 bytes) and
    ///   returns that many bytes where byte `i` is `i % 251`.
    /// * anything else – an error naming the unknown method index.
    fn dispatch(&mut self, method_idx: u32, args: &[u8]) -> Result<Vec<u8>, String> {
        match method_idx {
            1 => {
                self.n += 1;
                Ok(Vec::from(self.n.to_be_bytes()))
            }
            2 => {
                let size_bytes: [u8; 4] = args
                    .try_into()
                    .map_err(|_| "method_idx=2 requires exactly 4 bytes (size)".to_string())?;
                let size = u32::from_be_bytes(size_bytes) as usize;
                // `i % 251` is always below 256, so the cast cannot truncate.
                Ok((0..size).map(|i| (i % 251) as u8).collect())
            }
            other => Err(format!("RpcCounter: unknown method_idx {other}")),
        }
    }

    /// Wraps `channel` in the matching stub type.
    fn build_stub(channel: HostedRpcChannel) -> Self::Stub {
        RpcCounterStub { channel }
    }
}
/// Stub for [`RpcCounter`]; forwards calls over a [`HostedRpcChannel`].
struct RpcCounterStub {
    channel: HostedRpcChannel,
}

impl RpcCounterStub {
    /// Calls method 1 with no arguments and decodes the 8-byte big-endian
    /// reply. Panics if the call fails or the reply is not eight bytes long.
    fn next(&self) -> u64 {
        let reply = self.channel.call(1, Vec::new()).expect("rpc call");
        u64::from_be_bytes(reply.as_slice().try_into().unwrap())
    }

    /// Calls method 2 with `size` encoded as 4 big-endian bytes and returns
    /// the raw reply. Panics if the call fails.
    fn echo(&self, size: u32) -> Vec<u8> {
        let request = size.to_be_bytes().to_vec();
        self.channel.call(2, request).expect("echo rpc call")
    }
}
/// Builds a `DepScope::HostedRpc` dependency in crate `tcrate`.
///
/// The owner constructor wraps a fresh `RpcCounter { n: 0 }` in a
/// [`HostedRpcOwnerCell`] and increments `owner_counter` (SeqCst) on every
/// run. The attached [`RpcFactory`] downcasts the owner back to its cell and
/// builds `RpcCounter` stubs from a channel.
fn registered_hosted_rpc_dep(
    name: &str,
    module_path: &str,
    owner_counter: Arc<AtomicUsize>,
) -> RegisteredDependency {
    let owner_runs = Arc::clone(&owner_counter);
    let owner_constructor = DependencyConstructor::Sync(Arc::new(move |_view| {
        owner_runs.fetch_add(1, Ordering::SeqCst);
        let owner_cell = HostedRpcOwnerCell::from_owner(RpcCounter { n: 0 });
        Arc::new(owner_cell) as Arc<dyn Any + Send + Sync>
    }));

    let rpc_factory = RpcFactory {
        owner_into_cell: Arc::new(|any: Arc<dyn Any + Send + Sync>| {
            any.downcast::<HostedRpcOwnerCell>()
                .expect("HostedRpc owner downcast")
        }),
        build_stub: Arc::new(|channel: HostedRpcChannel| {
            Arc::new(<RpcCounter as HostedRpcDep>::build_stub(channel))
                as Arc<dyn Any + Send + Sync>
        }),
    };

    RegisteredDependency {
        name: name.to_owned(),
        crate_name: String::from("tcrate"),
        module_path: module_path.to_owned(),
        constructor: owner_constructor,
        dependencies: Vec::new(),
        scope: DepScope::HostedRpc,
        worker_fn: None,
        cloneable_codec: None,
        hosted_codec: None,
        rpc_factory: Some(rpc_factory),
        companions: Vec::new(),
    }
}
/// Collecting hosted-RPC owner cells must return one cell keyed by the
/// qualified id and run the owner constructor once per collection call: the
/// counter reads 1 after the first call and 2 after the second.
#[test]
fn hosted_rpc_owner_cells_collected_once_and_keyed_by_qualified_id() {
    let owner_runs = Arc::new(AtomicUsize::new(0));
    let rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", Arc::clone(&owner_runs));
    let consumer = registered_test("t1", vec![String::from("rpc_dep")]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[rpc_dep], &[consumer], &[]);
    assert!(execution.has_hosted_rpc_dependencies());

    let first_collection = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(
        first_collection.len(),
        1,
        "exactly one hosted rpc dep expected"
    );
    assert_eq!(first_collection[0].0, "tcrate::rpc_dep");
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        1,
        "owner constructor must run exactly once on the parent"
    );

    let second_collection = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(second_collection.len(), 1);
    assert_eq!(
        owner_runs.load(Ordering::SeqCst),
        2,
        "collect_hosted_rpc_owner_cells_sync runs the constructor on every call; \
         callers (the runner) are responsible for only calling it once per suite"
    );
}
/// A hosted-RPC dep that declares a cloneable dep as its own dependency:
/// collecting owner cells must return one cell and run both the cloneable
/// constructor and the RPC owner constructor exactly once.
#[test]
fn hosted_rpc_owner_dependencies_construct_in_parent_context() {
    let parent_only_runs = Arc::new(AtomicUsize::new(0));
    let owner_runs = Arc::new(AtomicUsize::new(0));

    let parent_only_dep =
        registered_cloneable_dep("parent_only_dep", Arc::clone(&parent_only_runs));
    let mut rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", Arc::clone(&owner_runs));
    rpc_dep.dependencies = vec![String::from("parent_only_dep")];

    let consumer = registered_test("t1", vec![String::from("rpc_dep")]);
    let (execution, _filtered) = TestSuiteExecution::construct(
        &Arguments::default(),
        &[parent_only_dep, rpc_dep],
        &[consumer],
        &[],
    );

    let owner_cells = execution.collect_hosted_rpc_owner_cells_sync();
    assert_eq!(owner_cells.len(), 1);
    assert_eq!(parent_only_runs.load(Ordering::SeqCst), 1);
    assert_eq!(owner_runs.load(Ordering::SeqCst), 1);
}
/// Builds an `RpcCounter` stub on top of an `InProcessHostedRpcTransport`
/// populated from the suite's owner cells and checks that three consecutive
/// `next()` calls return 1, 2 and 3.
#[test]
fn hosted_rpc_in_process_transport_routes_to_owner_cell() {
    let constructions = Arc::new(AtomicUsize::new(0));
    let rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", constructions.clone());
    let consumer = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[rpc_dep], &[consumer], &[]);

    let owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), Arc::clone(&transport));
    let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);

    // Each call goes through the channel; the returned values must count up from 1.
    for expected in 1..=3 {
        assert_eq!(stub.next(), expected);
    }
}
/// Calls method index 999 on a channel bound to `"tcrate::rpc_dep"` and
/// expects the call to fail with `HostedRpcError::Dispatch` whose message
/// contains `"unknown method_idx 999"`.
#[test]
fn hosted_rpc_in_process_transport_returns_dispatch_error_on_unknown_method() {
    let constructions = Arc::new(AtomicUsize::new(0));
    let rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", constructions);
    let consumer = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[rpc_dep], &[consumer], &[]);

    let owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells.clone()));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), Arc::clone(&transport));

    // The match stays exhaustive so a new error variant forces this test to be revisited.
    match channel.call(999, Vec::new()).unwrap_err() {
        HostedRpcError::Transport(msg) => {
            panic!("expected Dispatch error, got Transport({msg})");
        }
        HostedRpcError::Dispatch(msg) => {
            assert!(
                msg.contains("unknown method_idx 999"),
                "expected dispatch error to mention method_idx, got '{msg}'"
            );
        }
    }
}
/// Uses a transport built from an empty cell map and expects a call addressed
/// to `"tcrate::missing_dep"` to fail with `HostedRpcError::Transport` whose
/// message contains `"unknown dep id 'tcrate::missing_dep'"`.
#[test]
fn hosted_rpc_in_process_transport_returns_transport_error_on_unknown_dep_id() {
    let no_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = HashMap::new();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(no_cells));
    let channel = HostedRpcChannel::new("tcrate::missing_dep".to_string(), Arc::clone(&transport));

    // The match stays exhaustive so a new error variant forces this test to be revisited.
    match channel.call(1, Vec::new()).unwrap_err() {
        HostedRpcError::Dispatch(msg) => {
            panic!("expected Transport error, got Dispatch({msg})");
        }
        HostedRpcError::Transport(msg) => {
            assert!(
                msg.contains("unknown dep id 'tcrate::missing_dep'"),
                "expected transport error to mention dep id, got '{msg}'"
            );
        }
    }
}
/// Test-only synchronous owner whose `dispatch` panics unconditionally with
/// the payload `"owner_panic_for_test"`.
struct PanickingRpcOwner;

impl HostedRpcDep for PanickingRpcOwner {
    type Stub = RpcCounterStub;

    /// Never returns: panics on every call, regardless of the arguments.
    fn dispatch(&mut self, _idx: u32, _payload: &[u8]) -> Result<Vec<u8>, String> {
        panic!("owner_panic_for_test");
    }

    /// Wraps the given channel in an `RpcCounterStub`.
    fn build_stub(rpc_channel: HostedRpcChannel) -> Self::Stub {
        RpcCounterStub {
            channel: rpc_channel,
        }
    }
}
/// Test-only async owner holding a running counter `n`.
#[cfg(feature = "tokio")]
struct AsyncRpcCounter {
    n: u64,
}

#[cfg(feature = "tokio")]
impl crate::internal::AsyncHostedRpcDep for AsyncRpcCounter {
    type Stub = RpcCounterStub;

    /// Yields to the scheduler once, then handles the request.
    ///
    /// Method index 1 increments `n` and returns its new value as 8 big-endian
    /// bytes; any other index yields an `Err` naming the offending index.
    async fn dispatch(&mut self, method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
        ::tokio::task::yield_now().await;
        match method_idx {
            1 => {
                self.n += 1;
                Ok(self.n.to_be_bytes().to_vec())
            }
            _ => Err(format!("AsyncRpcCounter: unknown method_idx {method_idx}")),
        }
    }

    /// Wraps the given channel in an `RpcCounterStub`.
    fn build_stub(rpc_channel: HostedRpcChannel) -> Self::Stub {
        RpcCounterStub {
            channel: rpc_channel,
        }
    }
}
/// Test-only async owner whose `dispatch` yields once and then panics with
/// the payload `"async_owner_panic_for_test"`.
#[cfg(feature = "tokio")]
struct PanickingAsyncRpcOwner;

#[cfg(feature = "tokio")]
impl crate::internal::AsyncHostedRpcDep for PanickingAsyncRpcOwner {
    type Stub = RpcCounterStub;

    /// Never resolves to a value: the panic fires after one `yield_now`.
    async fn dispatch(&mut self, _idx: u32, _payload: &[u8]) -> Result<Vec<u8>, String> {
        ::tokio::task::yield_now().await;
        panic!("async_owner_panic_for_test");
    }

    /// Wraps the given channel in an `RpcCounterStub`.
    fn build_stub(rpc_channel: HostedRpcChannel) -> Self::Stub {
        RpcCounterStub {
            channel: rpc_channel,
        }
    }
}
/// Dispatches method 1 twice through a cell built with `from_async_owner`
/// on a multi-thread tokio runtime and expects the big-endian encodings of
/// 1 and then 2.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_owner_dispatches_through_async_cell() {
    let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
    let rt = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // One blocking round-trip through the async cell.
    let dispatch_next = || rt.block_on(cell.dispatch_async(1, &[]));

    let bytes_a = dispatch_next().expect("first async dispatch must succeed");
    assert_eq!(bytes_a, 1u64.to_be_bytes().to_vec());

    let bytes_b = dispatch_next().expect("second async dispatch must succeed");
    assert_eq!(bytes_b, 2u64.to_be_bytes().to_vec());
}
/// Dispatches twice against an async owner that always panics.
///
/// The first call must return an `Err` containing the panic payload; the
/// second must return exactly `"hosted rpc owner poisoned"`.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_owner_panic_surfaces_then_poisons() {
    let cell = HostedRpcOwnerCell::from_async_owner(PanickingAsyncRpcOwner);
    let rt = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // One blocking round-trip through the async cell.
    let dispatch_once = || rt.block_on(cell.dispatch_async(1, &[]));

    let err1 = dispatch_once().expect_err("first async dispatch must surface the panic as Err");
    assert!(
        err1.contains("hosted rpc owner panicked: async_owner_panic_for_test"),
        "expected first-call error to wrap the async panic payload, got '{err1}'"
    );

    let err2 = dispatch_once()
        .expect_err("second async dispatch must short-circuit on the poisoned cell");
    assert_eq!(
        err2, "hosted rpc owner poisoned",
        "expected poisoned-cell error on the second async call, got '{err2}'"
    );
}
/// Races two dispatches against one async cell whose owner panics.
///
/// The first task is spawned, the test sleeps 5 ms, and then a second task is
/// spawned while the first owner dispatch is still inside its 50 ms sleep.
/// The first result must carry the panic payload, the second must be exactly
/// `"hosted rpc owner poisoned"`, and the owner's entry counter must end at 1.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_owner_poison_blocks_concurrent_waiter() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Owner that counts how many times `dispatch` was entered and panics
    /// with a different payload on the first entry than on later ones.
    struct OnePanicThenForbidden {
        entries: Arc<AtomicUsize>,
    }

    impl crate::internal::AsyncHostedRpcDep for OnePanicThenForbidden {
        type Stub = RpcCounterStub;

        async fn dispatch(&mut self, _method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
            let earlier_entries = self.entries.fetch_add(1, Ordering::SeqCst);
            if earlier_entries != 0 {
                // Any entry after the first is a failure of the scenario under test.
                panic!("second_dispatch_unexpectedly_re_entered_after_poison");
            }
            // Stay inside the dispatch long enough for the second caller to arrive.
            ::tokio::time::sleep(Duration::from_millis(50)).await;
            panic!("first_dispatch_panic_poison_race");
        }

        fn build_stub(rpc_channel: HostedRpcChannel) -> Self::Stub {
            RpcCounterStub {
                channel: rpc_channel,
            }
        }
    }

    let dispatch_entries = Arc::new(AtomicUsize::new(0));
    let shared_cell = Arc::new(HostedRpcOwnerCell::from_async_owner(
        OnePanicThenForbidden {
            entries: Arc::clone(&dispatch_entries),
        },
    ));
    let rt = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    rt.block_on(async {
        let leader_cell = Arc::clone(&shared_cell);
        let waiter_cell = Arc::clone(&shared_cell);

        let leader = ::tokio::spawn(async move { leader_cell.dispatch_async(1, &[]).await });
        // Give the first task a head start before the second one is spawned.
        ::tokio::time::sleep(Duration::from_millis(5)).await;
        let waiter = ::tokio::spawn(async move { waiter_cell.dispatch_async(1, &[]).await });

        let leader_outcome = leader.await.expect("first task must not be cancelled");
        let waiter_outcome = waiter.await.expect("second task must not be cancelled");

        let first_err =
            leader_outcome.expect_err("first dispatch must surface the panic as Err, not Ok");
        assert!(
            first_err.contains("hosted rpc owner panicked: first_dispatch_panic_poison_race"),
            "expected the first call to surface the panic; got '{first_err}'"
        );

        let second_err = waiter_outcome
            .expect_err("second dispatch must short-circuit on the poisoned cell, not Ok");
        assert_eq!(
            second_err, "hosted rpc owner poisoned",
            "expected the second waiter to see the poison flag; got '{second_err}'"
        );
    });

    let total_entries = dispatch_entries.load(Ordering::SeqCst);
    assert_eq!(
        total_entries, 1,
        "owner dispatcher must run at most once across the poisoned pair"
    );
}
/// Calls `dispatch_blocking` on an async cell from a `spawn_blocking` task of
/// a multi-thread tokio runtime and expects the big-endian encoding of 1.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_dispatch_blocking_drives_async_cell() {
    let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
    let rt = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // Outer layer: the join result of the blocking task; inner layer: the dispatch result.
    let joined = rt.block_on(async {
        ::tokio::task::spawn_blocking(move || cell.dispatch_blocking(1, &[])).await
    });
    let dispatched = joined.expect("spawn_blocking joined");
    let bytes = dispatched
        .expect("dispatch_blocking must succeed against an async cell on multi-thread rt");

    assert_eq!(bytes, 1u64.to_be_bytes().to_vec());
}
/// Calls `dispatch_blocking` on an async cell from inside `block_on` of a
/// current-thread tokio runtime and expects an `Err` whose message contains
/// `"multi-threaded"`.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_dispatch_blocking_rejects_current_thread_runtime() {
    let cell = HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter { n: 0 });
    let rt = ::tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build current-thread tokio runtime");

    let outcome = rt.block_on(async { cell.dispatch_blocking(1, &[]) });
    let err =
        outcome.expect_err("dispatch_blocking must reject current-thread runtimes cleanly");

    assert!(
        err.contains("multi-threaded"),
        "expected the rejection error to mention multi-threaded requirement, got '{err}'"
    );
}
/// Registers one async owner cell under `"in_process_async_owner"` in an
/// `InProcessHostedRpcTransport` and calls method 1 twice through the
/// transport from `spawn_blocking` tasks, expecting the big-endian encodings
/// of 1 and then 2.
#[cfg(feature = "tokio")]
#[test]
fn async_hosted_rpc_in_process_transport_routes_to_async_cell() {
    use std::collections::HashMap;
    use std::sync::Arc;

    let dep_id = "in_process_async_owner".to_string();
    let cell = Arc::new(HostedRpcOwnerCell::from_async_owner(AsyncRpcCounter {
        n: 0,
    }));
    let cells = HashMap::from([(dep_id.clone(), cell)]);
    let transport: Arc<dyn crate::internal::HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(cells));
    let rt = ::tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    // One transport call made from the runtime's blocking pool; `failure` is
    // the message used if the call itself returns an error.
    let call_next = |failure: &str| {
        let transport = Arc::clone(&transport);
        let dep_id = dep_id.clone();
        rt.block_on(async move {
            ::tokio::task::spawn_blocking(move || transport.call(&dep_id, 1, vec![]))
                .await
                .expect("spawn_blocking joined")
        })
        .expect(failure)
    };

    let bytes = call_next("first in-process dispatch must succeed");
    assert_eq!(bytes, 1u64.to_be_bytes().to_vec());

    let bytes2 = call_next("second in-process dispatch must succeed");
    assert_eq!(bytes2, 2u64.to_be_bytes().to_vec());
}
/// Dispatches twice against a synchronous owner that always panics.
///
/// The first call must return an `Err` containing the panic payload; the
/// second must return exactly `"hosted rpc owner poisoned"`.
#[test]
fn hosted_rpc_owner_panic_surfaces_then_poisons() {
    let cell = HostedRpcOwnerCell::from_owner(PanickingRpcOwner);
    let dispatch_once = || cell.dispatch(1, &[]);

    let err1 = dispatch_once().expect_err("first call must surface the panic as Err");
    assert!(
        err1.contains("hosted rpc owner panicked: owner_panic_for_test"),
        "expected first-call error to wrap the panic payload, got '{err1}'"
    );

    let err2 = dispatch_once().expect_err("second call must short-circuit on the poisoned cell");
    assert_eq!(
        err2, "hosted rpc owner poisoned",
        "expected poisoned-cell error on the second call, got '{err2}'"
    );
}
/// Requests a 256 KiB payload via `stub.echo` over the in-process transport
/// and checks that the returned buffer has exactly `SIZE` bytes, with the
/// byte at index `i` equal to `i % 251`.
#[test]
fn hosted_rpc_in_process_transport_round_trips_large_payload_exceeding_64_kib() {
    const SIZE: u32 = 256 * 1024;

    let constructions = Arc::new(AtomicUsize::new(0));
    let rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", constructions);
    let consumer = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[rpc_dep], &[consumer], &[]);

    let owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells));
    let channel = HostedRpcChannel::new("tcrate::rpc_dep".to_string(), transport);
    let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);

    let bytes = stub.echo(SIZE);
    assert_eq!(
        bytes.len(),
        SIZE as usize,
        "framing dropped/truncated bytes"
    );

    // Check every byte against the expected `i % 251` pattern.
    for (i, b) in bytes.iter().enumerate() {
        let expected = (i % 251) as u8;
        assert_eq!(
            *b, expected,
            "framing corrupted byte at index {i}: expected {}, got {b}",
            expected
        );
    }
}
/// Spawns `N` threads that each build their own stub over a shared in-process
/// transport and call `next()` `M` times.
///
/// After joining, the sorted ids must be strictly increasing starting above
/// 0 (so no duplicates), and there must be exactly `N * M` of them.
#[test]
fn hosted_rpc_in_process_transport_multiplexes_concurrent_calls_from_threads() {
    use std::thread;

    const N: usize = 4;
    const M: usize = 32;

    let constructions = Arc::new(AtomicUsize::new(0));
    let rpc_dep = registered_hosted_rpc_dep("rpc_dep", "", constructions);
    let consumer = registered_test("t1", vec!["rpc_dep".to_string()]);
    let (execution, _filtered) =
        TestSuiteExecution::construct(&Arguments::default(), &[rpc_dep], &[consumer], &[]);

    let owner_cells: HashMap<String, Arc<HostedRpcOwnerCell>> = execution
        .collect_hosted_rpc_owner_cells_sync()
        .into_iter()
        .collect();
    let transport: Arc<dyn HostedRpcTransport> =
        Arc::new(InProcessHostedRpcTransport::new(owner_cells));

    // Collect eagerly so every thread is spawned before any of them is joined.
    let workers: Vec<_> = (0..N)
        .map(|_| {
            let dep_id = "tcrate::rpc_dep".to_string();
            let transport = Arc::clone(&transport);
            thread::spawn(move || {
                let channel = HostedRpcChannel::new(dep_id, transport);
                let stub = <RpcCounter as HostedRpcDep>::build_stub(channel);
                (0..M).map(|_| stub.next()).collect::<Vec<_>>()
            })
        })
        .collect();

    let mut all = Vec::with_capacity(N * M);
    for worker in workers {
        all.extend(worker.join().expect("thread panicked"));
    }
    all.sort();

    // Walk the sorted ids; each must be strictly greater than the one before it.
    let mut prev: u64 = 0;
    for &id in &all {
        assert!(
            id > prev,
            "duplicate or non-monotonic id {id} after {prev}"
        );
        prev = id;
    }

    assert_eq!(
        all.len(),
        N * M,
        "expected exactly {} ids in total, got {}",
        N * M,
        all.len()
    );
}