mod step;
mod with_step;
use crate::event::Event;
use crate::{Bus, Close, Drain, Emit, ToEmitter};
pub use step::*;
pub use with_step::*;
#[derive(Debug, Clone)]
pub struct Chain<T, U> {
emitter: T,
step: U,
}
impl<T: Emit, U: Step> Chain<T, U> {
pub fn prepend<P: Step>(self, step: P) -> Chain<Self, P> {
Chain {
emitter: self,
step,
}
}
}
impl<T: Drain, U: Send> Drain for Chain<T, U> {
async fn drain(self) {
self.emitter.drain().await;
}
}
impl<T: Close, U: Send> Close for Chain<T, U> {
async fn close(self) {
self.emitter.close().await;
}
}
impl<T: Emit, U: Step> Emit for Chain<T, U> {
async fn emit<E: Event>(&self, event: E) {
let event = self.step.clone().process(event).await;
self.emitter.emit(event).await
}
}
impl<ToE: ToEmitter, U: Step> ToEmitter for Chain<ToE, U> {
type Emitter<E: Event> = WithStep<E, ToE::Emitter<U::Event<E>>, U>;
fn to_emitter<E: Event>(&self) -> Self::Emitter<E> {
let emitter = self.emitter.to_emitter();
WithStep::new(emitter, self.step.clone())
}
}
impl From<Bus> for Chain<Bus, Identity> {
fn from(value: Bus) -> Self {
Self {
emitter: value,
step: Identity,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::listener::from_fn;
use crate::{Bus, Guard, TypedEmit};
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::oneshot;
#[derive(Debug, Clone, PartialEq)]
struct Scoped<E, C> {
event: E,
context: C,
}
#[derive(Debug, Clone, PartialEq)]
struct OriginalEvent(String);
#[derive(Debug, Clone, PartialEq)]
struct RequestContext {
request_id: u64,
}
#[derive(Debug, Clone, PartialEq)]
struct UserContext {
user_id: u32,
}
#[derive(Clone)]
struct WithRequestStep {
request_counter: std::sync::Arc<AtomicU64>,
}
impl WithRequestStep {
fn new() -> Self {
Self {
request_counter: std::sync::Arc::new(AtomicU64::new(1)),
}
}
}
impl Step for WithRequestStep {
type Event<E: Event> = Scoped<E, RequestContext>;
async fn process<E: Event>(self, event: E) -> Self::Event<E> {
let request_id = self.request_counter.fetch_add(1, Ordering::Relaxed);
Scoped {
event,
context: RequestContext { request_id },
}
}
}
#[derive(Clone)]
struct WithUserStep {
user_id: u32,
}
impl Step for WithUserStep {
type Event<E: Event> = Scoped<E, UserContext>;
async fn process<E: Event>(self, event: E) -> Self::Event<E> {
Scoped {
event,
context: UserContext {
user_id: self.user_id,
},
}
}
}
#[tokio::test]
async fn test_identity_chain_passes_event_through() {
let bus = Bus::new(2);
let chain = Chain::from(bus.clone());
let (tx, rx) = oneshot::channel();
let mut tx_wrap = Some(tx);
bus.on(from_fn(move |event: Guard<OriginalEvent>| {
let tx = tx_wrap.take().unwrap();
async move {
tx.send(event.clone()).unwrap_or(());
}
}));
let original_event = OriginalEvent("hello".to_string());
chain.emit(original_event.clone()).await;
let received_event = rx.await.unwrap();
assert_eq!(received_event, original_event);
}
#[tokio::test]
async fn test_single_step_injects_context() {
let bus = Bus::new(2);
let chain = Chain::from(bus.clone()).prepend(WithRequestStep::new());
let (tx, rx) = oneshot::channel();
let mut tx_wrap = Some(tx);
bus.on(from_fn(
move |event: Guard<Scoped<OriginalEvent, RequestContext>>| {
let tx = tx_wrap.take().unwrap();
async move {
tx.send(event.clone()).unwrap_or(());
}
},
));
let original_event = OriginalEvent("find user".to_string());
chain.emit(original_event.clone()).await;
let received_event = rx.await.unwrap();
assert_eq!(received_event.event, original_event);
assert_eq!(received_event.context.request_id, 1);
}
#[tokio::test]
async fn test_chained_steps_nest_context_correctly() {
let bus = Bus::new(2);
let chain = Chain::from(bus.clone())
.prepend(WithUserStep { user_id: 123 }) .prepend(WithRequestStep::new());
let (tx, rx) = oneshot::channel();
let mut tx_wrap = Some(tx);
bus.on(from_fn(
move |event: Guard<Scoped<Scoped<OriginalEvent, RequestContext>, UserContext>>| {
let tx = tx_wrap.take().unwrap();
async move {
tx.send(event.clone()).unwrap_or(());
}
},
));
let original_event = OriginalEvent("update profile".to_string());
chain.emit(original_event.clone()).await;
let received = rx.await.unwrap();
assert_eq!(received.context.user_id, 123);
assert_eq!(received.event.context.request_id, 1);
assert_eq!(received.event.event, original_event);
}
#[tokio::test]
async fn test_chain_to_emitter_with_context_injection() {
let bus = Bus::new(2);
let chain = Chain::from(bus.clone())
.prepend(WithUserStep { user_id: 456 })
.prepend(WithRequestStep::new());
let (tx, rx) = oneshot::channel();
let mut tx_wrap = Some(tx);
bus.on(from_fn(
move |event: Guard<Scoped<Scoped<OriginalEvent, RequestContext>, UserContext>>| {
let tx = tx_wrap.take().unwrap();
async move {
tx.send(event.clone()).unwrap_or(());
}
},
));
let typed_emitter = chain.to_emitter::<OriginalEvent>();
let original_event = OriginalEvent("create post".to_string());
typed_emitter.emit(original_event.clone()).await;
let received = rx.await.unwrap();
assert_eq!(received.context.user_id, 456);
assert_eq!(received.event.context.request_id, 1);
assert_eq!(received.event.event, original_event);
}
}