atomr_patterns/reactor/
mod.rs1use std::marker::PhantomData;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use atomr_core::actor::ActorSystem;
23use tokio::sync::mpsc::UnboundedReceiver;
24
25use crate::topology::Topology;
26use crate::PatternError;
27
28type Reaction<E> = Arc<dyn Fn(E) -> futures::future::BoxFuture<'static, ()> + Send + Sync>;
29
/// Type-level entry point for the reactor pattern.
///
/// The struct stores no data; `E` (the event type) is carried only through
/// `PhantomData`.
pub struct ReactorPattern<E>(PhantomData<E>);
31
32impl<E: Send + 'static> ReactorPattern<E> {
33 pub fn builder() -> ReactorBuilder<E> {
34 ReactorBuilder { name: None, events: None, reaction: None }
35 }
36}
37
38pub struct ReactorBuilder<E: Send + 'static> {
39 name: Option<String>,
40 events: Option<UnboundedReceiver<E>>,
41 reaction: Option<Reaction<E>>,
42}
43
44impl<E: Send + 'static> ReactorBuilder<E> {
45 pub fn name(mut self, n: impl Into<String>) -> Self {
46 self.name = Some(n.into());
47 self
48 }
49 pub fn events(mut self, rx: UnboundedReceiver<E>) -> Self {
50 self.events = Some(rx);
51 self
52 }
53 pub fn reaction<F, Fut>(mut self, f: F) -> Self
54 where
55 F: Fn(E) -> Fut + Send + Sync + 'static,
56 Fut: std::future::Future<Output = ()> + Send + 'static,
57 {
58 let f = Arc::new(f);
59 self.reaction = Some(Arc::new(move |e| {
60 let f = f.clone();
61 Box::pin(async move { f(e).await })
62 }));
63 self
64 }
65 pub fn build(self) -> Result<ReactorTopology<E>, PatternError<()>> {
66 Ok(ReactorTopology {
67 name: self.name.unwrap_or_else(|| "reactor".into()),
68 events: self.events.ok_or(PatternError::NotConfigured("events"))?,
69 reaction: self.reaction.ok_or(PatternError::NotConfigured("reaction"))?,
70 })
71 }
72}
73
74pub struct ReactorTopology<E: Send + 'static> {
75 name: String,
76 events: UnboundedReceiver<E>,
77 reaction: Reaction<E>,
78}
79
/// Handles describing a reactor.
pub struct ReactorHandles {
    /// Name of the reactor.
    pub name: String,
}
83
84#[async_trait]
85impl<E: Send + 'static> Topology for ReactorTopology<E> {
86 type Handles = ReactorHandles;
87 async fn materialize(self, _system: &ActorSystem) -> Result<ReactorHandles, PatternError<()>> {
88 let ReactorTopology { name, mut events, reaction } = self;
89 tokio::spawn(async move {
90 while let Some(event) = events.recv().await {
91 (reaction)(event).await;
92 }
93 });
94 Ok(ReactorHandles { name })
95 }
96}