use dyn_hash::DynHash;
use irys::*;
use serde::Serialize;
use std::any::Any;
use std::fmt;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::{mpsc, Arc};
use std::thread;
struct CoreRegistry;
struct ObservabilityRegistry;
trait DynCloneAny: Send + Sync {
fn clone_boxed(&self) -> Box<dyn Any + Send + Sync>;
}
impl<T: Clone + Send + Sync + 'static> DynCloneAny for T {
fn clone_boxed(&self) -> Box<dyn Any + Send + Sync> { Box::new(self.clone()) }
}
struct SerializeCap;
impl Capability for SerializeCap { type Handle = dyn erased_serde::Serialize + Send + Sync; }
struct DisplayCap;
impl Capability for DisplayCap { type Handle = dyn fmt::Display + Send + Sync; }
struct DebugCap;
impl Capability for DebugCap { type Handle = dyn fmt::Debug + Send + Sync; }
struct ErrorCap;
impl Capability for ErrorCap { type Handle = dyn std::error::Error + Send + Sync; }
struct CloneCap;
impl Capability for CloneCap { type Handle = dyn DynCloneAny; }
struct HashCap;
impl Capability for HashCap { type Handle = dyn DynHash + Send + Sync; }
register_capability! { registry: CoreRegistry, slot: 0, cap: SerializeCap, trait_bound: erased_serde::Serialize + Send + Sync }
register_capability! { registry: CoreRegistry, slot: 1, cap: DisplayCap, trait_bound: fmt::Display + Send + Sync }
register_capability! { registry: CoreRegistry, slot: 2, cap: DebugCap, trait_bound: fmt::Debug + Send + Sync }
register_capability! { registry: CoreRegistry, slot: 3, cap: CloneCap, trait_bound: DynCloneAny }
register_capability! { registry: ObservabilityRegistry, slot: 0, cap: ErrorCap, trait_bound: std::error::Error + Send + Sync }
register_capability! { registry: ObservabilityRegistry, slot: 1, cap: HashCap, trait_bound: DynHash + Send + Sync }
macro_rules! bus_reflect {
($msg:expr) => {
reflect!($msg, [
{ registry: CoreRegistry, slots: 0..4 },
{ registry: ObservabilityRegistry, slots: 0..2 },
])
}
}
#[derive(Debug, Clone, Hash, Serialize)]
struct UserLoginEvent { user_id: u64, ip: String, success: bool }
impl fmt::Display for UserLoginEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Login {} for user #{}", if self.success { "OK" } else { "FAILED" }, self.user_id)
}
}
#[derive(Debug, Clone, Serialize)]
struct AuthError { user_id: u64, reason: String }
impl fmt::Display for AuthError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Auth failed for user {}: {}", self.user_id, self.reason)
}
}
impl std::error::Error for AuthError {}
#[derive(Debug, Serialize)]
struct MetricEvent { name: String, value: f64 }
struct OpaqueBlob { _bytes: Vec<u8> }
#[test]
fn test_event_bus_routing() {
let (tx, rx) = mpsc::channel::<Envelope>();
let router = thread::spawn(move || {
let mut serialized = 0u32;
let mut errors = 0u32;
let mut dropped = 0u32;
while let Ok(envelope) = rx.recv() {
if envelope.capability_count() == 0 {
dropped += 1;
continue;
}
if envelope.has::<ErrorCap>() { errors += 1; }
if envelope.has::<SerializeCap>() { serialized += 1; }
}
(serialized, errors, dropped)
});
tx.send(bus_reflect!(UserLoginEvent { user_id: 1, ip: "x".into(), success: true })).unwrap();
tx.send(bus_reflect!(AuthError { user_id: 2, reason: "bad password".into() })).unwrap();
tx.send(bus_reflect!(MetricEvent { name: "cpu".into(), value: 72.0 })).unwrap();
tx.send(bus_reflect!(OpaqueBlob { _bytes: vec![0xFF] })).unwrap();
drop(tx);
let (serialized, errors, dropped) = router.join().unwrap();
assert_eq!(serialized, 3); assert_eq!(errors, 1); assert_eq!(dropped, 1); }
#[test]
fn test_arc_fanout_multiple_readers() {
let envelope = Arc::new(bus_reflect!(UserLoginEvent {
user_id: 42, ip: "10.0.0.1".into(), success: false
}));
let handles: Vec<_> = (0..5).map(|_| {
let env = Arc::clone(&envelope);
thread::spawn(move || {
let json = serde_json::to_string(env.get::<SerializeCap>().unwrap()).unwrap();
assert!(json.contains("42"));
let mut hasher = DefaultHasher::new();
env.get::<HashCap>().unwrap().dyn_hash(&mut hasher);
hasher.finish()
})
}).collect();
let hashes: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
assert!(hashes.windows(2).all(|w| w[0] == w[1]));
}
#[test]
fn test_clone_and_reserialize() {
let envelope = bus_reflect!(UserLoginEvent { user_id: 99, ip: "y".into(), success: true });
let json_orig = serde_json::to_string(envelope.get::<SerializeCap>().unwrap()).unwrap();
let cloned_data = envelope.get::<CloneCap>().unwrap().clone_boxed();
let cloned_env = Envelope::from_raw(cloned_data, envelope.caps().clone());
let json_clone = serde_json::to_string(cloned_env.get::<SerializeCap>().unwrap()).unwrap();
assert_eq!(json_orig, json_clone);
}
#[test]
fn test_priority_routing_and_dead_letter() {
let events: Vec<Envelope> = vec![
bus_reflect!(AuthError { user_id: 1, reason: "hack".into() }),
bus_reflect!(UserLoginEvent { user_id: 2, ip: "x".into(), success: true }),
bus_reflect!(MetricEvent { name: "req".into(), value: 100.0 }),
bus_reflect!(OpaqueBlob { _bytes: vec![] }),
];
let (mut fast, mut batch, mut dlq) = (vec![], vec![], vec![]);
for env in events {
if env.capability_count() == 0 { dlq.push(env); }
else if env.has::<ErrorCap>() { fast.push(env); }
else { batch.push(env); }
}
assert_eq!(fast.len(), 1);
assert_eq!(batch.len(), 2);
assert_eq!(dlq.len(), 1);
}
#[test]
fn test_generic_iterator() {
use std::marker::PhantomData;
struct IterCap<I>(PhantomData<I>);
impl<I: 'static> Capability for IterCap<I> {
type Handle = dyn Iterator<Item = I>;
}
register_capability! {
slot: 0, cap: IterCap<I>,
trait_bound: Iterator<Item = I>,
generics: [I: 'static],
}
let mut envelope = reflect!(vec!["a", "b", "c"].into_iter());
assert!(envelope.has::<IterCap<&str>>());
let iter = envelope.get_mut::<IterCap<&str>>().unwrap();
assert_eq!(iter.collect::<Vec<_>>(), vec!["a", "b", "c"]);
}
#[test]
fn test_generic_future() {
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
struct FutureCap<O>(PhantomData<O>);
impl<O: 'static> Capability for FutureCap<O> {
type Handle = dyn Future<Output = O> + Unpin;
}
register_capability! {
slot: 1, cap: FutureCap<O>,
trait_bound: Future<Output = O> + Unpin,
generics: [O: 'static],
}
struct Ready<T>(Option<T>);
impl<T: Unpin> Future for Ready<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
Poll::Ready(self.0.take().unwrap())
}
}
let mut envelope = reflect!(Ready(Some(42u64)));
assert!(envelope.has::<FutureCap<u64>>());
assert!(!envelope.has::<FutureCap<String>>());
let waker = Waker::noop();
let mut cx = Context::from_waker(&waker);
let future = envelope.get_mut::<FutureCap<u64>>().unwrap();
assert_eq!(Pin::new(future).poll(&mut cx), Poll::Ready(42u64));
}
#[tokio::test]
async fn test_generic_stream() {
use futures_util::stream::{self, Stream, StreamExt};
use std::marker::PhantomData;
struct StreamCap<I>(PhantomData<I>);
impl<I: 'static> Capability for StreamCap<I> {
type Handle = dyn Stream<Item = I> + Unpin;
}
register_capability! {
slot: 2, cap: StreamCap<I>,
trait_bound: Stream<Item = I> + Unpin,
generics: [I: 'static],
}
let mut envelope = reflect!(stream::iter(vec![10u32, 20, 30]));
assert!(envelope.has::<StreamCap<u32>>());
let stream = envelope.get_mut::<StreamCap<u32>>().unwrap();
let mut collected = Vec::new();
while let Some(item) = stream.next().await {
collected.push(item);
}
assert_eq!(collected, vec![10, 20, 30]);
}
trait SerializableIterator {
fn next_ser(&mut self) -> Option<Box<dyn erased_serde::Serialize>>;
}
impl<T> SerializableIterator for T
where
T: Iterator,
T::Item: erased_serde::Serialize + 'static,
{
fn next_ser(&mut self) -> Option<Box<dyn erased_serde::Serialize>> {
Iterator::next(self).map(|s| Box::new(s) as _)
}
}
struct SerializableIteratorCap;
impl Capability for SerializableIteratorCap { type Handle = dyn SerializableIterator; }
register_capability! { slot: 3, cap: SerializableIteratorCap, trait_bound: SerializableIterator }
#[test]
fn test_adapter_trait_serializable_iterator() {
#[derive(Serialize)]
struct LogEntry { msg: String }
#[derive(Serialize)]
struct Metric { name: String, value: f64 }
let mut log_env = reflect!(vec![
LogEntry { msg: "started".into() },
LogEntry { msg: "stopped".into() },
].into_iter());
let mut metric_env = reflect!(vec![
Metric { name: "cpu".into(), value: 72.5 },
].into_iter());
assert!(log_env.has::<SerializableIteratorCap>());
assert!(metric_env.has::<SerializableIteratorCap>());
let iter = log_env.get_mut::<SerializableIteratorCap>().unwrap();
let v = serde_json::to_value(&*iter.next_ser().unwrap()).unwrap();
assert_eq!(v["msg"], "started");
let iter = metric_env.get_mut::<SerializableIteratorCap>().unwrap();
let v = serde_json::to_value(&*iter.next_ser().unwrap()).unwrap();
assert_eq!(v["value"], 72.5);
struct Opaque;
let opaque_env = reflect!(vec![Opaque].into_iter());
assert!(!opaque_env.has::<SerializableIteratorCap>());
}