Skip to main content

atomr_patterns/reactor/
mod.rs

1//! Reactor pattern — fire-and-forget side effects in response to events.
2//!
3//! Lighter than [`crate::saga::SagaPattern`]: no per-correlation
4//! state, no command dispatch, no compensation. Just "for each event,
5//! run this side-effect closure." Commonly used for notifications,
6//! metrics emission, log aggregation.
7//!
8//! ```ignore
9//! ReactorPattern::<OrderEvent>::builder()
10//!     .name("notifier")
11//!     .events(bus.subscribe())
12//!     .reaction(|e| async move { send_notification(e).await; })
13//!     .build()?
14//!     .materialize(&system)
15//!     .await?;
16//! ```
17
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use atomr_core::actor::ActorSystem;
23use tokio::sync::mpsc::UnboundedReceiver;
24
25use crate::topology::Topology;
26use crate::PatternError;
27
28type Reaction<E> = Arc<dyn Fn(E) -> futures::future::BoxFuture<'static, ()> + Send + Sync>;
29
30pub struct ReactorPattern<E>(PhantomData<E>);
31
32impl<E: Send + 'static> ReactorPattern<E> {
33    pub fn builder() -> ReactorBuilder<E> {
34        ReactorBuilder { name: None, events: None, reaction: None }
35    }
36}
37
38pub struct ReactorBuilder<E: Send + 'static> {
39    name: Option<String>,
40    events: Option<UnboundedReceiver<E>>,
41    reaction: Option<Reaction<E>>,
42}
43
44impl<E: Send + 'static> ReactorBuilder<E> {
45    pub fn name(mut self, n: impl Into<String>) -> Self {
46        self.name = Some(n.into());
47        self
48    }
49    pub fn events(mut self, rx: UnboundedReceiver<E>) -> Self {
50        self.events = Some(rx);
51        self
52    }
53    pub fn reaction<F, Fut>(mut self, f: F) -> Self
54    where
55        F: Fn(E) -> Fut + Send + Sync + 'static,
56        Fut: std::future::Future<Output = ()> + Send + 'static,
57    {
58        let f = Arc::new(f);
59        self.reaction = Some(Arc::new(move |e| {
60            let f = f.clone();
61            Box::pin(async move { f(e).await })
62        }));
63        self
64    }
65    pub fn build(self) -> Result<ReactorTopology<E>, PatternError<()>> {
66        Ok(ReactorTopology {
67            name: self.name.unwrap_or_else(|| "reactor".into()),
68            events: self.events.ok_or(PatternError::NotConfigured("events"))?,
69            reaction: self.reaction.ok_or(PatternError::NotConfigured("reaction"))?,
70        })
71    }
72}
73
74pub struct ReactorTopology<E: Send + 'static> {
75    name: String,
76    events: UnboundedReceiver<E>,
77    reaction: Reaction<E>,
78}
79
80pub struct ReactorHandles {
81    pub name: String,
82}
83
84#[async_trait]
85impl<E: Send + 'static> Topology for ReactorTopology<E> {
86    type Handles = ReactorHandles;
87    async fn materialize(self, _system: &ActorSystem) -> Result<ReactorHandles, PatternError<()>> {
88        let ReactorTopology { name, mut events, reaction } = self;
89        tokio::spawn(async move {
90            while let Some(event) = events.recv().await {
91                (reaction)(event).await;
92            }
93        });
94        Ok(ReactorHandles { name })
95    }
96}