1use std::{hash::Hash, sync::Arc};
16use ulid::Ulid;
17
18use crate::{
19 cursor::{Args, ReadResult, Value},
20 Event, RoutingKey, WriteError,
21};
22
23#[derive(Clone, PartialEq, Eq)]
40pub struct ReadAggregator {
41 pub aggregator_type: String,
43 pub aggregator_id: Option<String>,
45 pub name: Option<String>,
47}
48
49impl ReadAggregator {
50 pub fn new(
51 aggregator_type: impl Into<String>,
52 id: impl Into<String>,
53 name: impl Into<String>,
54 ) -> Self {
55 Self {
56 aggregator_type: aggregator_type.into(),
57 aggregator_id: Some(id.into()),
58 name: Some(name.into()),
59 }
60 }
61
62 pub fn aggregator(value: impl Into<String>) -> Self {
63 Self {
64 aggregator_type: value.into(),
65 aggregator_id: None,
66 name: None,
67 }
68 }
69
70 pub fn id(aggregator_type: impl Into<String>, id: impl Into<String>) -> Self {
71 Self {
72 aggregator_type: aggregator_type.into(),
73 aggregator_id: Some(id.into()),
74 name: None,
75 }
76 }
77
78 pub fn event(aggregator_type: impl Into<String>, name: impl Into<String>) -> Self {
79 Self {
80 aggregator_type: aggregator_type.into(),
81 aggregator_id: None,
82 name: Some(name.into()),
83 }
84 }
85}
86
87impl Hash for ReadAggregator {
88 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
89 self.aggregator_type.hash(state);
90 self.aggregator_id.hash(state);
91 self.name.hash(state);
92 }
93}
94
95#[async_trait::async_trait]
109pub trait Executor: Send + Sync + 'static {
110 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError>;
114
115 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>>;
117
118 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool>;
120
121 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()>;
123
124 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()>;
126
127 async fn read(
129 &self,
130 aggregators: Option<Vec<ReadAggregator>>,
131 routing_key: Option<RoutingKey>,
132 args: Args,
133 ) -> anyhow::Result<ReadResult<Event>>;
134}
135
136pub struct Evento(Arc<Box<dyn Executor>>);
151
152impl Clone for Evento {
153 fn clone(&self) -> Self {
154 Self(self.0.clone())
155 }
156}
157
158#[async_trait::async_trait]
159impl Executor for Evento {
160 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
161 self.0.write(events).await
162 }
163
164 async fn read(
165 &self,
166 aggregators: Option<Vec<ReadAggregator>>,
167 routing_key: Option<RoutingKey>,
168 args: Args,
169 ) -> anyhow::Result<ReadResult<Event>> {
170 self.0.read(aggregators, routing_key, args).await
171 }
172
173 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
174 self.0.get_subscriber_cursor(key).await
175 }
176
177 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
178 self.0.is_subscriber_running(key, worker_id).await
179 }
180
181 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
182 self.0.upsert_subscriber(key, worker_id).await
183 }
184
185 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
186 self.0.acknowledge(key, cursor, lag).await
187 }
188}
189
190impl Evento {
191 pub fn new<E: Executor>(executor: E) -> Self {
192 Self(Arc::new(Box::new(executor)))
193 }
194}
195
196#[cfg(feature = "group")]
203#[derive(Clone, Default)]
204pub struct EventoGroup {
205 executors: Vec<Evento>,
206}
207
208#[cfg(feature = "group")]
209impl EventoGroup {
210 pub fn executor(mut self, executor: impl Into<Evento>) -> Self {
211 self.executors.push(executor.into());
212
213 self
214 }
215
216 pub fn first(&self) -> &Evento {
217 self.executors
218 .first()
219 .expect("EventoGroup must have at least one executor")
220 }
221}
222
223#[cfg(feature = "group")]
224#[async_trait::async_trait]
225impl Executor for EventoGroup {
226 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
227 self.first().write(events).await
228 }
229
230 async fn read(
231 &self,
232 aggregators: Option<Vec<ReadAggregator>>,
233 routing_key: Option<RoutingKey>,
234 args: Args,
235 ) -> anyhow::Result<ReadResult<Event>> {
236 use crate::cursor;
237 let futures = self
238 .executors
239 .iter()
240 .map(|e| e.read(aggregators.to_owned(), routing_key.to_owned(), args.clone()));
241
242 let results = futures_util::future::join_all(futures).await;
243 let mut events = vec![];
244 for res in results {
245 for edge in res?.edges {
246 events.push(edge.node);
247 }
248 }
249
250 Ok(cursor::Reader::new(events).args(args).execute()?)
251 }
252
253 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
254 self.first().get_subscriber_cursor(key).await
255 }
256
257 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
258 self.first().is_subscriber_running(key, worker_id).await
259 }
260
261 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
262 self.first().upsert_subscriber(key, worker_id).await
263 }
264
265 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
266 self.first().acknowledge(key, cursor, lag).await
267 }
268}
269
270#[cfg(feature = "rw")]
281pub struct Rw<R: Executor, W: Executor> {
282 r: R,
283 w: W,
284}
285
286#[cfg(feature = "rw")]
287impl<R: Executor + Clone, W: Executor + Clone> Clone for Rw<R, W> {
288 fn clone(&self) -> Self {
289 Self {
290 r: self.r.clone(),
291 w: self.w.clone(),
292 }
293 }
294}
295
296#[cfg(feature = "rw")]
297#[async_trait::async_trait]
298impl<R: Executor, W: Executor> Executor for Rw<R, W> {
299 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
300 self.w.write(events).await
301 }
302
303 async fn read(
304 &self,
305 aggregators: Option<Vec<ReadAggregator>>,
306 routing_key: Option<RoutingKey>,
307 args: Args,
308 ) -> anyhow::Result<ReadResult<Event>> {
309 self.r.read(aggregators, routing_key, args).await
310 }
311
312 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
313 self.r.get_subscriber_cursor(key).await
314 }
315
316 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
317 self.r.is_subscriber_running(key, worker_id).await
318 }
319
320 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
321 self.w.upsert_subscriber(key, worker_id).await
322 }
323
324 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
325 self.w.acknowledge(key, cursor, lag).await
326 }
327}
328
329#[cfg(feature = "rw")]
330impl<R: Executor, W: Executor> From<(R, W)> for Rw<R, W> {
331 fn from((r, w): (R, W)) -> Self {
332 Self { r, w }
333 }
334}