pub mod args;
pub mod bench;
mod execution;
pub mod internal;
mod ipc;
mod output;
mod panic_hook;
pub mod spawn;
mod stats;
#[cfg(feature = "tokio")]
mod tokio;
pub mod worker;
#[allow(dead_code)]
mod sync;
#[cfg(not(feature = "tokio"))]
pub use sync::test_runner;
#[cfg(feature = "tokio")]
pub use tokio::test_runner;
pub use worker::worker_index;
#[doc(hidden)]
pub use desert_rust;
/// Builds the [`internal::CloneableCodec`] for a descriptor-based hosted dependency `T`
/// (variant compiled when the `tokio` feature is enabled).
///
/// * `to_wire` downcasts the type-erased owner to `T` and returns the result of
///   `T::descriptor`.
/// * `from_wire_bytes` copies the received bytes into a type-erased `Arc<Vec<u8>>`.
///
/// # Panics
///
/// The `to_wire` closure panics if the erased value it receives is not a `T`.
#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn __test_r_make_hosted_codec<T>() -> internal::CloneableCodec
where
    T: internal::AsyncHostedDep,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::CloneableCodec {
        to_wire: Arc::new(|erased: Erased| {
            let owner = erased
                .downcast::<T>()
                .expect("Hosted dependency type mismatch in descriptor()");
            <T as internal::AsyncHostedDep>::descriptor(&owner)
        }),
        from_wire_bytes: Arc::new(|wire: &[u8]| {
            // The payload is kept as raw bytes here; nothing is decoded in this closure.
            let payload: Erased = Arc::new(Vec::from(wire));
            payload
        }),
    }
}
/// Builds the [`internal::CloneableCodec`] for a descriptor-based hosted dependency `T`
/// (variant compiled when the `tokio` feature is disabled).
///
/// * `to_wire` downcasts the type-erased owner to `T` and returns the result of
///   `T::descriptor`.
/// * `from_wire_bytes` copies the received bytes into a type-erased `Arc<Vec<u8>>`.
///
/// # Panics
///
/// The `to_wire` closure panics if the erased value it receives is not a `T`.
#[doc(hidden)]
#[cfg(not(feature = "tokio"))]
pub fn __test_r_make_hosted_codec<T>() -> internal::CloneableCodec
where
    T: internal::HostedDep,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::CloneableCodec {
        to_wire: Arc::new(|erased: Erased| {
            let owner = erased
                .downcast::<T>()
                .expect("Hosted dependency type mismatch in descriptor()");
            <T as internal::HostedDep>::descriptor(&owner)
        }),
        from_wire_bytes: Arc::new(|wire: &[u8]| {
            // The payload is kept as raw bytes here; nothing is decoded in this closure.
            let payload: Erased = Arc::new(Vec::from(wire));
            payload
        }),
    }
}
/// Builds the [`internal::WorkerReconstructor`] for a descriptor-based hosted dependency
/// `T` (variant compiled when the `tokio` feature is enabled).
///
/// The result is always the `Async` variant. Its closure returns a pinned, boxed future
/// that downcasts the payload to `Vec<u8>`, awaits `T::from_descriptor` on those bytes and
/// yields the rebuilt value as a type-erased `Arc`. The second closure argument is unused.
///
/// # Panics
///
/// The returned future panics if the payload is not a `Vec<u8>`.
#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn __test_r_make_hosted_worker_reconstructor<T>() -> internal::WorkerReconstructor
where
    T: internal::AsyncHostedDep,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::WorkerReconstructor::Async(Arc::new(|wire_payload: Erased, _deps| {
        Box::pin(async move {
            // The downcast happens inside the future, so a mismatch surfaces when the
            // future is polled rather than when the closure is called.
            let descriptor = wire_payload
                .downcast::<Vec<u8>>()
                .expect("Hosted worker reconstructor expected Vec<u8> descriptor payload");
            let instance = <T as internal::AsyncHostedDep>::from_descriptor(&descriptor).await;
            let rebuilt: Erased = Arc::new(instance);
            rebuilt
        })
    }))
}
/// Builds the [`internal::WorkerReconstructor`] for a descriptor-based hosted dependency
/// `T` (variant compiled when the `tokio` feature is disabled).
///
/// The result is always the `Sync` variant. Its closure downcasts the payload to
/// `Vec<u8>`, calls `T::from_descriptor` on those bytes and returns the rebuilt value as a
/// type-erased `Arc`. The second closure argument is unused.
///
/// # Panics
///
/// The closure panics if the payload is not a `Vec<u8>`.
#[doc(hidden)]
#[cfg(not(feature = "tokio"))]
pub fn __test_r_make_hosted_worker_reconstructor<T>() -> internal::WorkerReconstructor
where
    T: internal::HostedDep,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::WorkerReconstructor::Sync(Arc::new(|wire_payload: Erased, _deps| {
        let descriptor = wire_payload
            .downcast::<Vec<u8>>()
            .expect("Hosted worker reconstructor expected Vec<u8> descriptor payload");
        let instance = <T as internal::HostedDep>::from_descriptor(descriptor.as_slice());
        let rebuilt: Erased = Arc::new(instance);
        rebuilt
    }))
}
/// Wraps `owner` into an [`internal::HostedBothShared`] (variant compiled when the `tokio`
/// feature is enabled).
///
/// The shared value is constructed from the bytes returned by `owner.descriptor()` and an
/// `Arc`-wrapped [`internal::HostedRpcOwnerCell`] built from `owner` itself.
#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn __test_r_make_hosted_both_shared<T>(owner: T) -> internal::HostedBothShared
where
    T: internal::AsyncHostedDep + internal::AsyncHostedRpcDep,
{
    // The descriptor has to be taken first: building the RPC cell consumes `owner`.
    let snapshot = <T as internal::AsyncHostedDep>::descriptor(&owner);
    let cell = internal::HostedRpcOwnerCell::from_async_owner(owner);
    internal::HostedBothShared::new(snapshot, std::sync::Arc::new(cell))
}
/// Wraps `owner` into an [`internal::HostedBothShared`] (variant compiled when the `tokio`
/// feature is disabled).
///
/// The shared value is constructed from the bytes returned by `owner.descriptor()` and an
/// `Arc`-wrapped [`internal::HostedRpcOwnerCell`] built from `owner` itself.
#[doc(hidden)]
#[cfg(not(feature = "tokio"))]
pub fn __test_r_make_hosted_both_shared<T>(owner: T) -> internal::HostedBothShared
where
    T: internal::HostedDep + internal::HostedRpcDep,
{
    // The descriptor has to be taken first: building the RPC cell consumes `owner`.
    let snapshot = <T as internal::HostedDep>::descriptor(&owner);
    let cell = internal::HostedRpcOwnerCell::from_owner(owner);
    internal::HostedBothShared::new(snapshot, std::sync::Arc::new(cell))
}
/// Moves `owner` into an [`internal::HostedRpcOwnerCell`] via
/// `HostedRpcOwnerCell::from_async_owner` (variant compiled when the `tokio` feature is
/// enabled).
#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn __test_r_make_hosted_rpc_cell<T>(owner: T) -> internal::HostedRpcOwnerCell
where
    T: internal::AsyncHostedRpcDep,
{
    use internal::HostedRpcOwnerCell as OwnerCell;

    OwnerCell::from_async_owner(owner)
}
#[doc(hidden)]
#[cfg(not(feature = "tokio"))]
pub fn __test_r_make_hosted_rpc_cell<T>(owner: T) -> internal::HostedRpcOwnerCell
where
T: internal::HostedRpcDep,
{
internal::HostedRpcOwnerCell::from_owner(owner)
}
/// Builds the [`internal::CloneableCodec`] used for values stored as
/// [`internal::HostedBothShared`].
///
/// * `to_wire` downcasts the type-erased value to `HostedBothShared` and returns a copy of
///   its `descriptor_bytes()`.
/// * `from_wire_bytes` copies the received bytes into a type-erased `Arc<Vec<u8>>`.
///
/// # Panics
///
/// The `to_wire` closure panics if the erased value is not a `HostedBothShared`.
#[doc(hidden)]
pub fn __test_r_make_hosted_both_codec() -> internal::CloneableCodec {
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::CloneableCodec {
        to_wire: Arc::new(|erased: Erased| {
            erased
                .downcast::<internal::HostedBothShared>()
                .expect("HostedBothShared downcast failed in both-codec to_wire")
                .descriptor_bytes()
                .to_vec()
        }),
        from_wire_bytes: Arc::new(|wire: &[u8]| {
            let payload: Erased = Arc::new(Vec::from(wire));
            payload
        }),
    }
}
/// Builds the [`internal::RpcFactory`] for a dependency stored as
/// [`internal::HostedBothShared`] (variant compiled when the `tokio` feature is enabled).
///
/// * `owner_into_cell` downcasts the type-erased value to `HostedBothShared` and returns
///   its `rpc_cell()`.
/// * `build_stub` calls `T::build_stub` on the channel and returns the stub as a
///   type-erased `Arc`.
///
/// # Panics
///
/// The `owner_into_cell` closure panics if the erased value is not a `HostedBothShared`.
#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn __test_r_make_hosted_both_rpc_factory<T, Stub>() -> internal::RpcFactory
where
    T: internal::AsyncHostedRpcDep<Stub = Stub>,
    Stub: Send + Sync + 'static,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::RpcFactory {
        owner_into_cell: Arc::new(|erased: Erased| {
            erased
                .downcast::<internal::HostedBothShared>()
                .expect("HostedBothShared downcast failed in both-rpc-factory owner_into_cell")
                .rpc_cell()
        }),
        build_stub: Arc::new(|channel: internal::HostedRpcChannel| {
            let typed: Stub = <T as internal::AsyncHostedRpcDep>::build_stub(channel);
            let erased_stub: Erased = Arc::new(typed);
            erased_stub
        }),
    }
}
/// Builds the [`internal::RpcFactory`] for a dependency stored as
/// [`internal::HostedBothShared`] (variant compiled when the `tokio` feature is disabled).
///
/// * `owner_into_cell` downcasts the type-erased value to `HostedBothShared` and returns
///   its `rpc_cell()`.
/// * `build_stub` calls `T::build_stub` on the channel and returns the stub as a
///   type-erased `Arc`.
///
/// # Panics
///
/// The `owner_into_cell` closure panics if the erased value is not a `HostedBothShared`.
#[doc(hidden)]
#[cfg(not(feature = "tokio"))]
pub fn __test_r_make_hosted_both_rpc_factory<T, Stub>() -> internal::RpcFactory
where
    T: internal::HostedRpcDep<Stub = Stub>,
    Stub: Send + Sync + 'static,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::RpcFactory {
        owner_into_cell: Arc::new(|erased: Erased| {
            erased
                .downcast::<internal::HostedBothShared>()
                .expect("HostedBothShared downcast failed in both-rpc-factory owner_into_cell")
                .rpc_cell()
        }),
        build_stub: Arc::new(|channel: internal::HostedRpcChannel| {
            let typed: Stub = <T as internal::HostedRpcDep>::build_stub(channel);
            let erased_stub: Erased = Arc::new(typed);
            erased_stub
        }),
    }
}
/// Builds the [`internal::RpcFactory`] for an RPC-only hosted dependency `T` (variant
/// compiled when the `tokio` feature is enabled).
///
/// * `owner_into_cell` downcasts the type-erased owner to
///   [`internal::HostedRpcOwnerCell`] and returns that `Arc` unchanged.
/// * `build_stub` calls `T::build_stub` on the channel and returns the stub as a
///   type-erased `Arc`.
///
/// # Panics
///
/// The `owner_into_cell` closure panics if the erased value is not a `HostedRpcOwnerCell`.
#[doc(hidden)]
#[cfg(feature = "tokio")]
pub fn __test_r_make_hosted_rpc_factory<T, Stub>() -> internal::RpcFactory
where
    T: internal::AsyncHostedRpcDep<Stub = Stub>,
    Stub: Send + Sync + 'static,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::RpcFactory {
        owner_into_cell: Arc::new(|erased: Erased| {
            let cell: Arc<internal::HostedRpcOwnerCell> = erased
                .downcast::<internal::HostedRpcOwnerCell>()
                .expect("HostedRpc owner downcast to HostedRpcOwnerCell failed");
            cell
        }),
        build_stub: Arc::new(|channel: internal::HostedRpcChannel| {
            let typed: Stub = <T as internal::AsyncHostedRpcDep>::build_stub(channel);
            let erased_stub: Erased = Arc::new(typed);
            erased_stub
        }),
    }
}
/// Builds the [`internal::RpcFactory`] for an RPC-only hosted dependency `T` (variant
/// compiled when the `tokio` feature is disabled).
///
/// * `owner_into_cell` downcasts the type-erased owner to
///   [`internal::HostedRpcOwnerCell`] and returns that `Arc` unchanged.
/// * `build_stub` calls `T::build_stub` on the channel and returns the stub as a
///   type-erased `Arc`.
///
/// # Panics
///
/// The `owner_into_cell` closure panics if the erased value is not a `HostedRpcOwnerCell`.
#[doc(hidden)]
#[cfg(not(feature = "tokio"))]
pub fn __test_r_make_hosted_rpc_factory<T, Stub>() -> internal::RpcFactory
where
    T: internal::HostedRpcDep<Stub = Stub>,
    Stub: Send + Sync + 'static,
{
    use std::any::Any;
    use std::sync::Arc;

    type Erased = Arc<dyn Any + Send + Sync>;

    internal::RpcFactory {
        owner_into_cell: Arc::new(|erased: Erased| {
            let cell: Arc<internal::HostedRpcOwnerCell> = erased
                .downcast::<internal::HostedRpcOwnerCell>()
                .expect("HostedRpc owner downcast to HostedRpcOwnerCell failed");
            cell
        }),
        build_stub: Arc::new(|channel: internal::HostedRpcChannel| {
            let typed: Stub = <T as internal::HostedRpcDep>::build_stub(channel);
            let erased_stub: Erased = Arc::new(typed);
            erased_stub
        }),
    }
}
/// Unit tests for the hidden `__test_r_make_hosted_*` helper functions.
#[cfg(test)]
mod hosted_helper_tests {
    use super::*;
    use std::any::Any;
    use std::sync::Arc;

    /// Shorthand for the type-erased payload the helpers pass around.
    type Erased = Arc<dyn Any + Send + Sync>;

    /// Descriptor-only fixture whose descriptor is simply the bytes it holds.
    #[derive(Debug, PartialEq, Eq)]
    struct Fixture {
        bytes: Vec<u8>,
    }

    impl internal::HostedDep for Fixture {
        fn descriptor(&self) -> Vec<u8> {
            self.bytes.to_vec()
        }

        fn from_descriptor(bytes: &[u8]) -> Self {
            Fixture {
                bytes: Vec::from(bytes),
            }
        }
    }

    /// `to_wire` emits the descriptor bytes and `from_wire_bytes` hands them back as an
    /// `Arc<Vec<u8>>`.
    #[test]
    fn make_hosted_codec_round_trips_descriptor_bytes() {
        let expected: Vec<u8> = vec![1, 2, 3, 4];
        let codec = __test_r_make_hosted_codec::<Fixture>();
        let erased_owner: Erased = Arc::new(Fixture {
            bytes: expected.clone(),
        });

        let encoded = (codec.to_wire)(erased_owner);
        assert_eq!(encoded, expected);

        let decoded: Arc<Vec<u8>> = (codec.from_wire_bytes)(&encoded)
            .downcast::<Vec<u8>>()
            .expect("from_wire_bytes must produce Arc<Vec<u8>>");
        assert_eq!(*decoded, expected);
    }

    /// With the `tokio` feature the reconstructor must be the `Async` variant.
    #[cfg(feature = "tokio")]
    #[test]
    fn make_hosted_worker_reconstructor_is_async_under_tokio() {
        let reconstructor = __test_r_make_hosted_worker_reconstructor::<Fixture>();
        assert!(
            matches!(reconstructor, internal::WorkerReconstructor::Async(_)),
            "tokio build must produce a WorkerReconstructor::Async for Hosted deps; got Sync"
        );
    }

    /// Without the `tokio` feature the reconstructor must be the `Sync` variant.
    #[cfg(not(feature = "tokio"))]
    #[test]
    fn make_hosted_worker_reconstructor_is_sync_under_sync_runtime() {
        let reconstructor = __test_r_make_hosted_worker_reconstructor::<Fixture>();
        assert!(
            matches!(reconstructor, internal::WorkerReconstructor::Sync(_)),
            "sync build must produce a WorkerReconstructor::Sync for Hosted deps; got Async"
        );
    }

    /// Codec output fed through the sync reconstructor yields an equal `Fixture`.
    #[cfg(not(feature = "tokio"))]
    #[test]
    fn sync_worker_reconstructor_rebuilds_fixture_from_descriptor() {
        /// Dependency view that resolves nothing.
        #[derive(Debug)]
        struct EmptyView;

        impl internal::DependencyView for EmptyView {
            fn get(&self, _name: &str) -> Option<Erased> {
                None
            }
        }

        let descriptor: Vec<u8> = vec![5, 6, 7];
        let codec = __test_r_make_hosted_codec::<Fixture>();
        let reconstructor = __test_r_make_hosted_worker_reconstructor::<Fixture>();

        let erased_owner: Erased = Arc::new(Fixture {
            bytes: descriptor.clone(),
        });
        let encoded = (codec.to_wire)(erased_owner);
        let payload = (codec.from_wire_bytes)(&encoded);
        let deps: Arc<dyn internal::DependencyView + Send + Sync> = Arc::new(EmptyView);

        let erased_result = match reconstructor {
            internal::WorkerReconstructor::Sync(rebuild) => rebuild(payload, deps),
            internal::WorkerReconstructor::Async(_) => unreachable!(
                "sync build cannot return Async; pinned by \
                 make_hosted_worker_reconstructor_is_sync_under_sync_runtime",
            ),
        };
        let rebuilt: Arc<Fixture> = erased_result
            .downcast::<Fixture>()
            .expect("worker reconstructor must produce the original Hosted dep type");

        assert_eq!(*rebuilt, Fixture { bytes: descriptor });
    }

    /// Fixture that is both descriptor-based and RPC-capable; method 1 bumps a counter.
    #[derive(Debug)]
    struct BothFixture {
        bytes: Vec<u8>,
        counter: std::sync::Mutex<u64>,
    }

    impl BothFixture {
        /// Creates a fixture holding `bytes` with its counter at zero.
        fn new(bytes: Vec<u8>) -> Self {
            let counter = std::sync::Mutex::new(0);
            BothFixture { bytes, counter }
        }
    }

    impl internal::HostedDep for BothFixture {
        fn descriptor(&self) -> Vec<u8> {
            self.bytes.to_vec()
        }

        fn from_descriptor(bytes: &[u8]) -> Self {
            BothFixture::new(Vec::from(bytes))
        }
    }

    /// Stub type produced by `BothFixture::build_stub`; it only stores the channel.
    pub struct BothStub {
        _channel: internal::HostedRpcChannel,
    }

    impl internal::HostedRpcDep for BothFixture {
        type Stub = BothStub;

        /// Method index 1 increments the counter and replies with its big-endian bytes;
        /// any other index is an error.
        fn dispatch(&mut self, method_idx: u32, _args: &[u8]) -> Result<Vec<u8>, String> {
            if method_idx != 1 {
                return Err(format!("BothFixture: unknown method_idx {method_idx}"));
            }
            let mut count = self
                .counter
                .lock()
                .map_err(|poisoned| poisoned.to_string())?;
            *count += 1;
            Ok((*count).to_be_bytes().to_vec())
        }

        fn build_stub(channel: internal::HostedRpcChannel) -> Self::Stub {
            BothStub { _channel: channel }
        }
    }

    /// The shared value exposes the owner's descriptor bytes and a dispatchable RPC cell.
    #[test]
    fn make_hosted_both_shared_captures_descriptor_bytes() {
        let shared =
            __test_r_make_hosted_both_shared::<BothFixture>(BothFixture::new(vec![10, 20, 30]));
        assert_eq!(shared.descriptor_bytes(), &[10, 20, 30]);

        let cell = shared.rpc_cell();
        let reply = dispatch_cell_for_test(&cell, 1, &[]).expect("dispatch must succeed");
        let first_count = 1u64.to_be_bytes().to_vec();
        assert_eq!(reply, first_count);
    }

    /// Dispatches on `cell` by blocking on `dispatch_async` inside a freshly built
    /// multi-thread tokio runtime.
    #[cfg(feature = "tokio")]
    fn dispatch_cell_for_test(
        cell: &internal::HostedRpcOwnerCell,
        method_idx: u32,
        args: &[u8],
    ) -> Result<Vec<u8>, String> {
        let rt = ::tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("build tokio runtime");
        rt.block_on(cell.dispatch_async(method_idx, args))
    }

    /// Dispatches on `cell` through its synchronous `dispatch` method.
    #[cfg(not(feature = "tokio"))]
    fn dispatch_cell_for_test(
        cell: &internal::HostedRpcOwnerCell,
        method_idx: u32,
        args: &[u8],
    ) -> Result<Vec<u8>, String> {
        cell.dispatch(method_idx, args)
    }

    /// The both-codec serializes a `HostedBothShared` to its descriptor bytes.
    #[test]
    fn make_hosted_both_codec_serializes_descriptor_bytes() {
        let expected: Vec<u8> = vec![1, 2, 3, 4];
        let codec = __test_r_make_hosted_both_codec();
        let shared: Arc<internal::HostedBothShared> = Arc::new(
            __test_r_make_hosted_both_shared::<BothFixture>(BothFixture::new(expected.clone())),
        );
        let erased_shared: Erased = shared;

        let encoded = (codec.to_wire)(erased_shared);
        assert_eq!(encoded, expected);

        let decoded: Arc<Vec<u8>> = (codec.from_wire_bytes)(&encoded)
            .downcast::<Vec<u8>>()
            .expect("from_wire_bytes must produce Arc<Vec<u8>>");
        assert_eq!(*decoded, expected);
    }

    /// `owner_into_cell` must hand back the very `Arc` held by the shared value.
    #[test]
    fn make_hosted_both_rpc_factory_extracts_owner_cell() {
        let factory = __test_r_make_hosted_both_rpc_factory::<BothFixture, BothStub>();
        let shared: Arc<internal::HostedBothShared> = Arc::new(
            __test_r_make_hosted_both_shared::<BothFixture>(BothFixture::new(Vec::new())),
        );
        // Method-call `clone()` keeps the concrete `Arc` type so the annotation can unsize it.
        let erased_shared: Erased = shared.clone();

        let cell = (factory.owner_into_cell)(erased_shared);
        let same_cell = Arc::ptr_eq(&cell, &shared.rpc_cell());
        assert!(
            same_cell,
            "factory must return the exact same inner HostedRpcOwnerCell Arc the \
             shared cell holds; otherwise the RPC view would dispatch against a \
             different owner than the descriptor view captured"
        );

        let reply = dispatch_cell_for_test(&cell, 1, &[]).expect("dispatch must succeed");
        let first_count = 1u64.to_be_bytes().to_vec();
        assert_eq!(reply, first_count);
    }

    /// `build_stub` must produce a type-erased `BothStub`.
    #[test]
    fn make_hosted_both_rpc_factory_builds_stub() {
        use internal::{HostedRpcChannel, HostedRpcError, HostedRpcTransport};

        /// Transport that answers every call with an empty reply.
        struct DummyTransport;

        impl HostedRpcTransport for DummyTransport {
            fn call(
                &self,
                _dep_id: &str,
                _method_idx: u32,
                _args: Vec<u8>,
            ) -> Result<Vec<u8>, HostedRpcError> {
                Ok(vec![])
            }
        }

        let factory = __test_r_make_hosted_both_rpc_factory::<BothFixture, BothStub>();
        let transport: Arc<dyn HostedRpcTransport> = Arc::new(DummyTransport);
        let channel = HostedRpcChannel::new(String::from("test::both_fixture"), transport);

        let erased_stub: Erased = (factory.build_stub)(channel);
        let _stub: Arc<BothStub> = erased_stub
            .downcast::<BothStub>()
            .expect("build_stub must produce the BothFixture::Stub type");
    }
}