sqs_lambda/
event_processor.rs

1use tokio::sync::mpsc::{channel, Sender};
2use tracing::{info, warn};
3
4use crate::completion_handler::CompletionHandler;
5use crate::consumer::Consumer;
6use crate::event_handler::{EventHandler, OutputEvent};
7use crate::event_retriever::PayloadRetriever;
8use std::fmt::Debug;
9
10use aktors::actor::Actor;
11use async_trait::async_trait;
12
13use serde::export::Formatter;
14use tracing::instrument;
15
16#[derive(Copy, Clone, Debug)]
17pub enum ProcessorState {
18    Started,
19    Waiting,
20    Complete,
21}
22
23#[derive(Clone)]
24pub struct EventProcessor<M, C, EH, Input, Output, ER, CH>
25where
26    M: Send + Clone + Sync + 'static,
27    C: Consumer<M> + Clone + Send + Sync + 'static,
28    EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
29    Input: Send + Clone + 'static,
30    Output: Send + Sync + Clone + 'static,
31    ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
32    CH: CompletionHandler<
33            Message = M,
34            CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
35        > + Send
36        + Sync
37        + Clone
38        + 'static,
39{
40    consumer: C,
41    completion_handler: CH,
42    event_retriever: ER,
43    event_handler: EH,
44    state: ProcessorState,
45    self_actor: Option<EventProcessorActor<M>>,
46}
47
48impl<M, C, EH, Input, Output, ER, CH> std::fmt::Debug
49    for EventProcessor<M, C, EH, Input, Output, ER, CH>
50where
51    M: Send + Clone + Sync + 'static,
52    C: Consumer<M> + Clone + Send + Sync + 'static,
53    EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
54    Input: Send + Clone + 'static,
55    Output: Send + Sync + Clone + 'static,
56    ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
57    CH: CompletionHandler<
58            Message = M,
59            CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
60        > + Send
61        + Sync
62        + Clone
63        + 'static,
64{
65    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("EventProcessor").finish()
67    }
68}
69
70impl<M> std::fmt::Debug for EventProcessorActor<M>
71where
72    M: Send + Clone + Sync + 'static,
73{
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("EventProcessorActor")
76            .field("actor_name", &self.actor_name)
77            .finish()
78    }
79}
80
81impl<M, C, EH, Input, Output, ER, CH> EventProcessor<M, C, EH, Input, Output, ER, CH>
82where
83    M: Send + Clone + Sync + 'static,
84    C: Consumer<M> + Clone + Send + Sync + 'static,
85    EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
86    Input: Send + Clone + 'static,
87    Output: Send + Sync + Clone + 'static,
88    ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
89    CH: CompletionHandler<
90            Message = M,
91            CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
92        > + Send
93        + Sync
94        + Clone
95        + 'static,
96{
97    pub fn new(
98        consumer: C,
99        completion_handler: CH,
100        event_handler: EH,
101        event_retriever: ER,
102    ) -> Self {
103        Self {
104            consumer,
105            completion_handler,
106            event_handler,
107            event_retriever,
108            state: ProcessorState::Waiting,
109            self_actor: None,
110        }
111    }
112}
113
114impl<M, C, EH, Input, Output, ER, CH> EventProcessor<M, C, EH, Input, Output, ER, CH>
115where
116    M: Send + Clone + Sync + 'static,
117    C: Consumer<M> + Clone + Send + Sync + 'static,
118    EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
119    Input: Send + Clone + 'static,
120    Output: Send + Sync + Clone + 'static,
121    ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
122    CH: CompletionHandler<
123            Message = M,
124            CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
125        > + Send
126        + Sync
127        + Clone
128        + 'static,
129{
130    #[instrument(skip(event))]
131    pub async fn process_event(&mut self, event: M) {
132        // TODO: Handle errors
133        info!("Processing event");
134
135        let mut unsupported = false;
136
137        let retrieved_event = match self.event_retriever.retrieve_event(&event).await {
138            Ok(event @ Some(_)) => {
139                info!("Retrieved event");
140                event
141            }
142            Ok(None) => {
143                // ack this event as it's unsupported and does not require processing
144                // self.completion_handler
145                //     .ack_message(event.clone())
146                //     .await;
147                unsupported = true;
148                None
149            }
150            Err(e) => {
151                warn!("Failed to retrieve event with: {:?}", e);
152                None
153                // TODO: Retry
154                // TODO: We could reset the message visibility to 0 so it gets picked up again?
155            }
156        };
157
158        if let Some(retrieved_event) = retrieved_event {
159            info!("Handling retrieved event");
160            let output_event = self.event_handler.handle_event(retrieved_event).await;
161            self.completion_handler
162                .mark_complete(event, output_event)
163                .await;
164        } else {
165            if unsupported {
166                // ack this event as it's unsupported and does not require processing
167                self.completion_handler.ack_message(event).await;
168            }
169        }
170
171        info!("self.processor_state {:?}", self.state);
172        if let ProcessorState::Started = self.state {
173            info!("Getting next event");
174            self.consumer
175                .get_next_event(
176                    self.self_actor
177                        .clone()
178                        .expect("event_processor, self_actor"),
179                )
180                .await;
181        }
182    }
183
184    #[instrument]
185    pub async fn start_processing(&mut self) {
186        self.state = ProcessorState::Started;
187
188        info!("Getting next event from consumer");
189        self.consumer
190            .get_next_event(self.self_actor.clone().unwrap())
191            .await;
192    }
193
194    #[instrument]
195    pub fn stop_processing(&mut self) {
196        info!("stop_processing");
197        self.state = ProcessorState::Complete;
198    }
199}
200
201#[allow(non_camel_case_types)]
202pub enum EventProcessorMessage<M>
203where
204    M: Send + Clone + Sync + 'static,
205{
206    process_event { event: M },
207    start_processing {},
208    stop_processing {},
209}
210
211#[async_trait]
212impl<M, C, EH, Input, Output, ER, CH> Actor<EventProcessorMessage<M>>
213    for EventProcessor<M, C, EH, Input, Output, ER, CH>
214where
215    M: Send + Clone + Sync + 'static,
216    C: Consumer<M> + Clone + Send + Sync + 'static,
217    EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
218    Input: Send + Clone + 'static,
219    Output: Send + Sync + Clone + 'static,
220    ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
221    CH: CompletionHandler<
222            Message = M,
223            CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
224        > + Send
225        + Sync
226        + Clone
227        + 'static,
228{
229    #[instrument(skip(self, msg))]
230    async fn route_message(&mut self, msg: EventProcessorMessage<M>) {
231        match msg {
232            EventProcessorMessage::process_event { event } => self.process_event(event).await,
233            EventProcessorMessage::start_processing {} => self.start_processing().await,
234            EventProcessorMessage::stop_processing {} => self.stop_processing(),
235        };
236    }
237
238    fn get_actor_name(&self) -> &str {
239        &self.self_actor.as_ref().unwrap().actor_name
240    }
241
242    fn close(&mut self) {
243        self.self_actor = None;
244    }
245}
246
247pub struct EventProcessorActor<M>
248where
249    M: Send + Clone + Sync + 'static,
250{
251    sender: Sender<EventProcessorMessage<M>>,
252    inner_rc: std::sync::Arc<std::sync::atomic::AtomicUsize>,
253    queue_len: std::sync::Arc<std::sync::atomic::AtomicUsize>,
254    actor_name: String,
255    actor_uuid: uuid::Uuid,
256    actor_num: usize,
257}
258
259impl<M> Clone for EventProcessorActor<M>
260where
261    M: Send + Clone + Sync + 'static,
262{
263    fn clone(&self) -> Self {
264        self.inner_rc
265            .clone()
266            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
267
268        Self {
269            sender: self.sender.clone(),
270            inner_rc: self.inner_rc.clone(),
271            queue_len: self.queue_len.clone(),
272            actor_name: format!(
273                "{} {} {}",
274                stringify!(EventProcessorActor),
275                self.actor_uuid,
276                self.actor_num + 1,
277            ),
278            actor_uuid: self.actor_uuid,
279            actor_num: self.actor_num + 1,
280        }
281    }
282}
283
284impl<M> EventProcessorActor<M>
285where
286    M: Send + Clone + Sync + 'static,
287{
288    #[instrument(skip(actor_impl))]
289    pub fn new<C, EH, Input, Output, ER, CH>(
290        mut actor_impl: EventProcessor<M, C, EH, Input, Output, ER, CH>,
291    ) -> (Self, tokio::task::JoinHandle<()>)
292    where
293        C: Consumer<M> + Clone + Send + Sync + 'static,
294        EH: EventHandler<InputEvent = Input, OutputEvent = Output> + Send + Sync + Clone + 'static,
295        Input: Send + Clone + 'static,
296        Output: Send + Sync + Clone + 'static,
297        ER: PayloadRetriever<Input, Message = M> + Send + Sync + Clone + 'static,
298        CH: CompletionHandler<
299                Message = M,
300                CompletedEvent = OutputEvent<Output, <EH as EventHandler>::Error>,
301            > + Send
302            + Sync
303            + Clone
304            + 'static,
305    {
306        let (sender, receiver) = channel(1);
307        let inner_rc = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(1));
308        let queue_len = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
309
310        let actor_uuid = uuid::Uuid::new_v4();
311        let actor_name = format!("{} {} {}", stringify!(EventProcessor), actor_uuid, 0,);
312        let inner_actor = Self {
313            sender,
314            inner_rc: inner_rc.clone(),
315            queue_len: queue_len.clone(),
316            actor_name,
317            actor_uuid,
318            actor_num: 0,
319        };
320
321        let self_actor = inner_actor.clone();
322
323        actor_impl.self_actor = Some(inner_actor);
324
325        let handle = tokio::task::spawn(aktors::actor::route_wrapper(aktors::actor::Router::new(
326            actor_impl, receiver, inner_rc, queue_len,
327        )));
328
329        (self_actor, handle)
330    }
331
332    #[instrument(skip(event))]
333    pub async fn process_event(&self, event: M) {
334        let msg = EventProcessorMessage::process_event { event };
335
336        let mut sender = self.sender.clone();
337
338        let queue_len = self.queue_len.clone();
339        queue_len.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
340
341        tokio::task::spawn(async move {
342            if let Err(e) = sender.send(msg).await {
343                panic!(
344                    concat!(
345                        "Receiver has failed with {}, propagating error. ",
346                        "EventProcessorActor.process_event"
347                    ),
348                    e
349                )
350            }
351        });
352    }
353
354    #[instrument]
355    pub async fn start_processing(&self) {
356        let msg = EventProcessorMessage::start_processing {};
357
358        let mut sender = self.sender.clone();
359
360        let queue_len = self.queue_len.clone();
361        queue_len.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
362
363        tokio::task::spawn(async move {
364            if let Err(e) = sender.send(msg).await {
365                panic!(
366                    concat!(
367                        "Receiver has failed with {}, propagating error. ",
368                        "EventProcessorActor.start_processing"
369                    ),
370                    e
371                )
372            }
373        });
374    }
375
376    #[instrument]
377    pub async fn stop_processing(&self) {
378        let msg = EventProcessorMessage::stop_processing {};
379
380        let mut sender = self.sender.clone();
381
382        let queue_len = self.queue_len.clone();
383
384        queue_len.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
385        tokio::task::spawn(async move {
386            if let Err(e) = sender.send(msg).await {
387                panic!(
388                    concat!(
389                        "Receiver has failed with {}, propagating error. ",
390                        "EventProcessorActor.stop_processing"
391                    ),
392                    e
393                )
394            }
395        });
396    }
397}
398
399impl<M> Drop for EventProcessorActor<M>
400where
401    M: Send + Clone + Sync + 'static,
402{
403    fn drop(&mut self) {
404        self.inner_rc
405            .clone()
406            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
407    }
408}