atomr_patterns/inbox/
mod.rs1use std::collections::HashSet;
22use std::marker::PhantomData;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use atomr_core::actor::ActorSystem;
27use parking_lot::Mutex;
28use tokio::sync::mpsc::UnboundedReceiver;
29
30use crate::topology::Topology;
31use crate::PatternError;
32
33#[async_trait]
35pub trait InboxStore: Send + Sync + 'static {
36 async fn was_seen(&self, key: &str) -> bool;
38 async fn mark_seen(&self, key: &str);
41}
42
43#[derive(Default)]
46pub struct InMemoryInboxStore {
47 inner: Arc<Mutex<HashSet<String>>>,
48}
49
50impl InMemoryInboxStore {
51 pub fn new() -> Self {
52 Self::default()
53 }
54}
55
56#[async_trait]
57impl InboxStore for InMemoryInboxStore {
58 async fn was_seen(&self, key: &str) -> bool {
59 self.inner.lock().contains(key)
60 }
61 async fn mark_seen(&self, key: &str) {
62 self.inner.lock().insert(key.into());
63 }
64}
65
66type KeyFn<E> = Arc<dyn Fn(&E) -> String + Send + Sync + 'static>;
67type Handler<E> = Arc<dyn Fn(E) -> futures::future::BoxFuture<'static, bool> + Send + Sync>;
68
/// Marker type for the inbox pattern over events of type `E`.
///
/// It stores no data; the `PhantomData` field only ties the type to `E`.
pub struct InboxPattern<E>(PhantomData<E>);
71
72impl<E: Send + 'static> InboxPattern<E> {
73 pub fn builder() -> InboxBuilder<E> {
74 InboxBuilder { name: None, key: None, source: None, handler: None, store: None }
75 }
76}
77
78pub struct InboxBuilder<E: Send + 'static> {
79 name: Option<String>,
80 key: Option<KeyFn<E>>,
81 source: Option<UnboundedReceiver<E>>,
82 handler: Option<Handler<E>>,
83 store: Option<Arc<dyn InboxStore>>,
84}
85
86impl<E: Send + 'static> InboxBuilder<E> {
87 pub fn name(mut self, n: impl Into<String>) -> Self {
88 self.name = Some(n.into());
89 self
90 }
91
92 pub fn key<F>(mut self, f: F) -> Self
94 where
95 F: Fn(&E) -> String + Send + Sync + 'static,
96 {
97 self.key = Some(Arc::new(f));
98 self
99 }
100
101 pub fn source(mut self, rx: UnboundedReceiver<E>) -> Self {
103 self.source = Some(rx);
104 self
105 }
106
107 pub fn handler<F, Fut>(mut self, f: F) -> Self
111 where
112 F: Fn(E) -> Fut + Send + Sync + 'static,
113 Fut: std::future::Future<Output = bool> + Send + 'static,
114 {
115 let f = Arc::new(f);
116 self.handler = Some(Arc::new(move |e| {
117 let f = f.clone();
118 Box::pin(async move { f(e).await })
119 }));
120 self
121 }
122
123 pub fn store<S: InboxStore>(mut self, store: Arc<S>) -> Self {
124 self.store = Some(store);
125 self
126 }
127
128 pub fn build(self) -> Result<InboxTopology<E>, PatternError<()>> {
129 Ok(InboxTopology {
130 name: self.name.unwrap_or_else(|| "inbox".into()),
131 key: self.key.ok_or(PatternError::NotConfigured("key"))?,
132 source: self.source.ok_or(PatternError::NotConfigured("source"))?,
133 handler: self.handler.ok_or(PatternError::NotConfigured("handler"))?,
134 store: self.store.unwrap_or_else(|| Arc::new(InMemoryInboxStore::new())),
135 })
136 }
137}
138
139pub struct InboxTopology<E: Send + 'static> {
140 name: String,
141 key: KeyFn<E>,
142 source: UnboundedReceiver<E>,
143 handler: Handler<E>,
144 store: Arc<dyn InboxStore>,
145}
146
/// Handles returned once an inbox has been materialized.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy and compare
/// handles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InboxHandles {
    /// Name of the inbox these handles belong to.
    pub name: String,
}
150
151#[async_trait]
152impl<E: Send + 'static> Topology for InboxTopology<E> {
153 type Handles = InboxHandles;
154
155 async fn materialize(self, _system: &ActorSystem) -> Result<InboxHandles, PatternError<()>> {
156 let InboxTopology { name, key, mut source, handler, store } = self;
157 let task_name = name.clone();
158 tokio::spawn(async move {
159 while let Some(msg) = source.recv().await {
160 let k = (key)(&msg);
161 if store.was_seen(&k).await {
162 tracing::trace!(inbox = %task_name, key = %k, "duplicate suppressed");
163 continue;
164 }
165 if (handler)(msg).await {
166 store.mark_seen(&k).await;
167 } else {
168 tracing::warn!(inbox = %task_name, key = %k, "handler returned false; key not marked");
169 }
170 }
171 });
172 Ok(InboxHandles { name })
173 }
174}