1use tokio::sync::mpsc::{channel, Sender};
2use tracing::{info, warn};
3
4use crate::completion_handler::CompletionHandler;
5use crate::consumer::Consumer;
6use crate::event_handler::{EventHandler, OutputEvent};
7use crate::event_retriever::PayloadRetriever;
8use std::fmt::Debug;
9
10use aktors::actor::Actor;
11use async_trait::async_trait;
12
13use serde::export::Formatter;
14use tracing::instrument;
15
/// Lifecycle state of an `EventProcessor`.
///
/// `PartialEq`/`Eq` are derived so a state can be compared directly with `==`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ProcessorState {
    /// Set by `start_processing`. While in this state, `process_event` asks the
    /// consumer for the next event after every message it finishes.
    Started,
    /// Initial state assigned by `EventProcessor::new`.
    Waiting,
    /// Set by `stop_processing`. `process_event` stops requesting further events.
    Complete,
}
22
23#[derive(Clone)]
24pub struct EventProcessor<M, C, EH, Input, Output, ER, CH>
25where
26 M: Send + Clone + Sync + 'static,
27 C: Consumer<M> + Clone + Send + Sync + 'static,
28 EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
29 Input: Send + Clone + 'static,
30 Output: Send + Sync + Clone + 'static,
31 ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
32 CH: CompletionHandler<
33 Message = M,
34 CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
35 > + Send
36 + Sync
37 + Clone
38 + 'static,
39{
40 consumer: C,
41 completion_handler: CH,
42 event_retriever: ER,
43 event_handler: EH,
44 state: ProcessorState,
45 self_actor: Option<EventProcessorActor<M>>,
46}
47
48impl<M, C, EH, Input, Output, ER, CH> std::fmt::Debug
49 for EventProcessor<M, C, EH, Input, Output, ER, CH>
50where
51 M: Send + Clone + Sync + 'static,
52 C: Consumer<M> + Clone + Send + Sync + 'static,
53 EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
54 Input: Send + Clone + 'static,
55 Output: Send + Sync + Clone + 'static,
56 ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
57 CH: CompletionHandler<
58 Message = M,
59 CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
60 > + Send
61 + Sync
62 + Clone
63 + 'static,
64{
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("EventProcessor").finish()
67 }
68}
69
70impl<M> std::fmt::Debug for EventProcessorActor<M>
71where
72 M: Send + Clone + Sync + 'static,
73{
74 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("EventProcessorActor")
76 .field("actor_name", &self.actor_name)
77 .finish()
78 }
79}
80
81impl<M, C, EH, Input, Output, ER, CH> EventProcessor<M, C, EH, Input, Output, ER, CH>
82where
83 M: Send + Clone + Sync + 'static,
84 C: Consumer<M> + Clone + Send + Sync + 'static,
85 EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
86 Input: Send + Clone + 'static,
87 Output: Send + Sync + Clone + 'static,
88 ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
89 CH: CompletionHandler<
90 Message = M,
91 CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
92 > + Send
93 + Sync
94 + Clone
95 + 'static,
96{
97 pub fn new(
98 consumer: C,
99 completion_handler: CH,
100 event_handler: EH,
101 event_retriever: ER,
102 ) -> Self {
103 Self {
104 consumer,
105 completion_handler,
106 event_handler,
107 event_retriever,
108 state: ProcessorState::Waiting,
109 self_actor: None,
110 }
111 }
112}
113
114impl<M, C, EH, Input, Output, ER, CH> EventProcessor<M, C, EH, Input, Output, ER, CH>
115where
116 M: Send + Clone + Sync + 'static,
117 C: Consumer<M> + Clone + Send + Sync + 'static,
118 EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
119 Input: Send + Clone + 'static,
120 Output: Send + Sync + Clone + 'static,
121 ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
122 CH: CompletionHandler<
123 Message = M,
124 CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
125 > + Send
126 + Sync
127 + Clone
128 + 'static,
129{
130 #[instrument(skip(event))]
131 pub async fn process_event(&mut self, event: M) {
132 info!("Processing event");
134
135 let mut unsupported = false;
136
137 let retrieved_event = match self.event_retriever.retrieve_event(&event).await {
138 Ok(event @ Some(_)) => {
139 info!("Retrieved event");
140 event
141 }
142 Ok(None) => {
143 unsupported = true;
148 None
149 }
150 Err(e) => {
151 warn!("Failed to retrieve event with: {:?}", e);
152 None
153 }
156 };
157
158 if let Some(retrieved_event) = retrieved_event {
159 info!("Handling retrieved event");
160 let output_event = self.event_handler.handle_event(retrieved_event).await;
161 self.completion_handler
162 .mark_complete(event, output_event)
163 .await;
164 } else {
165 if unsupported {
166 self.completion_handler.ack_message(event).await;
168 }
169 }
170
171 info!("self.processor_state {:?}", self.state);
172 if let ProcessorState::Started = self.state {
173 info!("Getting next event");
174 self.consumer
175 .get_next_event(
176 self.self_actor
177 .clone()
178 .expect("event_processor, self_actor"),
179 )
180 .await;
181 }
182 }
183
184 #[instrument]
185 pub async fn start_processing(&mut self) {
186 self.state = ProcessorState::Started;
187
188 info!("Getting next event from consumer");
189 self.consumer
190 .get_next_event(self.self_actor.clone().unwrap())
191 .await;
192 }
193
194 #[instrument]
195 pub fn stop_processing(&mut self) {
196 info!("stop_processing");
197 self.state = ProcessorState::Complete;
198 }
199}
200
/// Messages accepted by the `EventProcessor` actor.
///
/// Each variant is named after the `EventProcessor` method that
/// `route_message` dispatches it to, which is why the camel-case lint is
/// disabled for this enum.
#[allow(non_camel_case_types)]
pub enum EventProcessorMessage<M>
where
    M: Clone + Send + Sync + 'static,
{
    /// Run `EventProcessor::process_event` on `event`.
    process_event { event: M },
    /// Run `EventProcessor::start_processing`.
    start_processing {},
    /// Run `EventProcessor::stop_processing`.
    stop_processing {},
}
210
211#[async_trait]
212impl<M, C, EH, Input, Output, ER, CH> Actor<EventProcessorMessage<M>>
213 for EventProcessor<M, C, EH, Input, Output, ER, CH>
214where
215 M: Send + Clone + Sync + 'static,
216 C: Consumer<M> + Clone + Send + Sync + 'static,
217 EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
218 Input: Send + Clone + 'static,
219 Output: Send + Sync + Clone + 'static,
220 ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
221 CH: CompletionHandler<
222 Message = M,
223 CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
224 > + Send
225 + Sync
226 + Clone
227 + 'static,
228{
229 #[instrument(skip(self, msg))]
230 async fn route_message(&mut self, msg: EventProcessorMessage<M>) {
231 match msg {
232 EventProcessorMessage::process_event { event } => self.process_event(event).await,
233 EventProcessorMessage::start_processing {} => self.start_processing().await,
234 EventProcessorMessage::stop_processing {} => self.stop_processing(),
235 };
236 }
237
238 fn get_actor_name(&self) -> &str {
239 &self.self_actor.as_ref().unwrap().actor_name
240 }
241
242 fn close(&mut self) {
243 self.self_actor = None;
244 }
245}
246
247pub struct EventProcessorActor<M>
248where
249 M: Send + Clone + Sync + 'static,
250{
251 sender: Sender<EventProcessorMessage<M>>,
252 inner_rc: std::sync::Arc<std::sync::atomic::AtomicUsize>,
253 queue_len: std::sync::Arc<std::sync::atomic::AtomicUsize>,
254 actor_name: String,
255 actor_uuid: uuid::Uuid,
256 actor_num: usize,
257}
258
259impl<M> Clone for EventProcessorActor<M>
260where
261 M: Send + Clone + Sync + 'static,
262{
263 fn clone(&self) -> Self {
264 self.inner_rc
265 .clone()
266 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
267
268 Self {
269 sender: self.sender.clone(),
270 inner_rc: self.inner_rc.clone(),
271 queue_len: self.queue_len.clone(),
272 actor_name: format!(
273 "{} {} {}",
274 stringify!(EventProcessorActor),
275 self.actor_uuid,
276 self.actor_num + 1,
277 ),
278 actor_uuid: self.actor_uuid,
279 actor_num: self.actor_num + 1,
280 }
281 }
282}
283
284impl<M> EventProcessorActor<M>
285where
286 M: Send + Clone + Sync + 'static,
287{
288 #[instrument(skip(actor_impl))]
289 pub fn new<C, EH, Input, Output, ER, CH>(
290 mut actor_impl: EventProcessor<M, C, EH, Input, Output, ER, CH>,
291 ) -> (Self, tokio::task::JoinHandle<()>)
292 where
293 C: Consumer<M> + Clone + Send + Sync + 'static,
294 EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
295 Input: Send + Clone + 'static,
296 Output: Send + Sync + Clone + 'static,
297 ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
298 CH: CompletionHandler<
299 Message = M,
300 CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
301 > + Send
302 + Sync
303 + Clone
304 + 'static,
305 {
306 let (sender, receiver) = channel(1);
307 let inner_rc = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(1));
308 let queue_len = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
309
310 let actor_uuid = uuid::Uuid::new_v4();
311 let actor_name = format!("{} {} {}", stringify!(EventProcessor), actor_uuid, 0,);
312 let inner_actor = Self {
313 sender,
314 inner_rc: inner_rc.clone(),
315 queue_len: queue_len.clone(),
316 actor_name,
317 actor_uuid,
318 actor_num: 0,
319 };
320
321 let self_actor = inner_actor.clone();
322
323 actor_impl.self_actor = Some(inner_actor);
324
325 let handle = tokio::task::spawn(aktors::actor::route_wrapper(aktors::actor::Router::new(
326 actor_impl, receiver, inner_rc, queue_len,
327 )));
328
329 (self_actor, handle)
330 }
331
332 #[instrument(skip(event))]
333 pub async fn process_event(&self, event: M) {
334 let msg = EventProcessorMessage::process_event { event };
335
336 let mut sender = self.sender.clone();
337
338 let queue_len = self.queue_len.clone();
339 queue_len.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
340
341 tokio::task::spawn(async move {
342 if let Err(e) = sender.send(msg).await {
343 panic!(
344 concat!(
345 "Receiver has failed with {}, propagating error. ",
346 "EventProcessorActor.process_event"
347 ),
348 e
349 )
350 }
351 });
352 }
353
354 #[instrument]
355 pub async fn start_processing(&self) {
356 let msg = EventProcessorMessage::start_processing {};
357
358 let mut sender = self.sender.clone();
359
360 let queue_len = self.queue_len.clone();
361 queue_len.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
362
363 tokio::task::spawn(async move {
364 if let Err(e) = sender.send(msg).await {
365 panic!(
366 concat!(
367 "Receiver has failed with {}, propagating error. ",
368 "EventProcessorActor.start_processing"
369 ),
370 e
371 )
372 }
373 });
374 }
375
376 #[instrument]
377 pub async fn stop_processing(&self) {
378 let msg = EventProcessorMessage::stop_processing {};
379
380 let mut sender = self.sender.clone();
381
382 let queue_len = self.queue_len.clone();
383
384 queue_len.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
385 tokio::task::spawn(async move {
386 if let Err(e) = sender.send(msg).await {
387 panic!(
388 concat!(
389 "Receiver has failed with {}, propagating error. ",
390 "EventProcessorActor.stop_processing"
391 ),
392 e
393 )
394 }
395 });
396 }
397}
398
399impl<M> Drop for EventProcessorActor<M>
400where
401 M: Send + Clone + Sync + 'static,
402{
403 fn drop(&mut self) {
404 self.inner_rc
405 .clone()
406 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
407 }
408}