Skip to main content

serverust_core/events/
mod.rs

1//! Suporte a event handlers tipados, paralelo ao roteador HTTP.
2//!
3//! [`EventHandler<E>`] permite plugar handlers para event sources não-HTTP
4//! (Kafka, SQS, EventBridge, S3) sem afetar o roteamento HTTP existente.
5//! O [`Container`](crate::Container) é compartilhado entre handlers HTTP e
6//! event handlers, garantindo a mesma injeção de dependências.
7
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use crate::container::Container;
13
14/// Erro retornado por [`EventHandler::handle`].
15#[derive(Debug)]
16pub struct EventError(pub String);
17
18impl std::fmt::Display for EventError {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        write!(f, "EventError: {}", self.0)
21    }
22}
23
24impl std::error::Error for EventError {}
25
26/// Handler para um tipo de evento `E`.
27///
28/// Implementadores recebem o evento por valor e o [`Container`] compartilhado
29/// para injeção de dependências, e retornam `Ok(())` ou um [`EventError`].
30///
31/// # Exemplo
32///
33/// ```rust
34/// use serverust_core::events::{EventHandler, EventError};
35/// use serverust_core::Container;
36/// use std::sync::Arc;
37///
38/// struct MyEvent { value: u32 }
39///
40/// struct MyHandler;
41///
42/// impl EventHandler<MyEvent> for MyHandler {
43///     async fn handle(&self, event: MyEvent, _ctx: &Container) -> Result<(), EventError> {
44///         println!("received: {}", event.value);
45///         Ok(())
46///     }
47/// }
48/// ```
49pub trait EventHandler<E: Send + 'static>: Send + Sync + 'static {
50    fn handle(
51        &self,
52        event: E,
53        ctx: &Container,
54    ) -> impl Future<Output = Result<(), EventError>> + Send;
55}
56
57// Trait interna apagada para permitir boxing de handlers heterogêneos.
58pub(crate) trait ErasedHandler<E: Send + 'static>: Send + Sync {
59    fn handle_erased<'a>(
60        &'a self,
61        event: E,
62        ctx: &'a Container,
63    ) -> Pin<Box<dyn Future<Output = Result<(), EventError>> + Send + 'a>>;
64}
65
66struct HandlerWrapper<H>(H);
67
68impl<E, H> ErasedHandler<E> for HandlerWrapper<H>
69where
70    E: Send + 'static,
71    H: EventHandler<E>,
72{
73    fn handle_erased<'a>(
74        &'a self,
75        event: E,
76        ctx: &'a Container,
77    ) -> Pin<Box<dyn Future<Output = Result<(), EventError>> + Send + 'a>> {
78        Box::pin(self.0.handle(event, ctx))
79    }
80}
81
82/// Despacha eventos `E` para os handlers registrados via [`crate::App::event`].
83///
84/// Constrói-se via [`crate::App::into_event_dispatcher`] e executa os
85/// handlers em sequência, parando no primeiro erro.
86pub struct EventDispatcher<E: Send + 'static> {
87    handlers: Vec<Arc<dyn ErasedHandler<E>>>,
88    container: Container,
89}
90
91impl<E: Send + 'static> EventDispatcher<E> {
92    pub(crate) fn new(handlers: Vec<Arc<dyn ErasedHandler<E>>>, container: Container) -> Self {
93        Self {
94            handlers,
95            container,
96        }
97    }
98
99    /// Despacha `event` para todos os handlers registrados em sequência.
100    /// Retorna no primeiro erro encontrado.
101    pub async fn dispatch_event(&self, event: E) -> Result<(), EventError>
102    where
103        E: Clone,
104    {
105        let last = self.handlers.len().saturating_sub(1);
106        for (i, handler) in self.handlers.iter().enumerate() {
107            let evt = if i == last {
108                // Último handler: consume sem clone extra (mas E: Clone já existe)
109                event.clone()
110            } else {
111                event.clone()
112            };
113            handler.handle_erased(evt, &self.container).await?;
114        }
115        Ok(())
116    }
117}
118
119/// Builder interno que acumula handlers antes de construir o EventDispatcher.
120pub(crate) struct EventHandlerRegistry<E: Send + 'static> {
121    handlers: Vec<Arc<dyn ErasedHandler<E>>>,
122}
123
124impl<E: Send + 'static> EventHandlerRegistry<E> {
125    pub(crate) fn new() -> Self {
126        Self {
127            handlers: Vec::new(),
128        }
129    }
130
131    pub(crate) fn register<H: EventHandler<E>>(&mut self, handler: H) {
132        self.handlers.push(Arc::new(HandlerWrapper(handler)));
133    }
134
135    pub(crate) fn into_dispatcher(self, container: Container) -> EventDispatcher<E> {
136        EventDispatcher::new(self.handlers, container)
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use std::sync::{Arc, Mutex};
143
144    use super::*;
145
146    #[derive(Clone)]
147    struct TestEvent {
148        value: u32,
149    }
150
151    struct RecordingHandler {
152        log: Arc<Mutex<Vec<u32>>>,
153    }
154
155    impl EventHandler<TestEvent> for RecordingHandler {
156        async fn handle(&self, event: TestEvent, _ctx: &Container) -> Result<(), EventError> {
157            self.log.lock().unwrap().push(event.value);
158            Ok(())
159        }
160    }
161
162    struct StateCheckHandler;
163
164    impl EventHandler<TestEvent> for StateCheckHandler {
165        async fn handle(&self, event: TestEvent, ctx: &Container) -> Result<(), EventError> {
166            let counter: Arc<Mutex<u32>> = ctx
167                .get::<Mutex<u32>>()
168                .ok_or_else(|| EventError("counter not found".into()))?;
169            let mut v = counter.lock().unwrap();
170            *v += event.value;
171            Ok(())
172        }
173    }
174
175    struct FailingHandler;
176
177    impl EventHandler<TestEvent> for FailingHandler {
178        async fn handle(&self, _event: TestEvent, _ctx: &Container) -> Result<(), EventError> {
179            Err(EventError("intentional failure".into()))
180        }
181    }
182
183    #[tokio::test]
184    async fn handler_stub_dispara_dispatch_event_e_verifica_retorno() {
185        let log = Arc::new(Mutex::new(Vec::<u32>::new()));
186        let mut registry = EventHandlerRegistry::<TestEvent>::new();
187        registry.register(RecordingHandler {
188            log: Arc::clone(&log),
189        });
190
191        let dispatcher = registry.into_dispatcher(Container::new());
192        dispatcher
193            .dispatch_event(TestEvent { value: 42 })
194            .await
195            .unwrap();
196
197        assert_eq!(*log.lock().unwrap(), vec![42]);
198    }
199
200    #[tokio::test]
201    async fn handler_acessa_state_injetado_no_container() {
202        let counter = Arc::new(Mutex::new(0u32));
203        let mut container = Container::new();
204        container.insert::<Mutex<u32>>(Arc::clone(&counter));
205
206        let mut registry = EventHandlerRegistry::<TestEvent>::new();
207        registry.register(StateCheckHandler);
208
209        let dispatcher = registry.into_dispatcher(container);
210        dispatcher
211            .dispatch_event(TestEvent { value: 10 })
212            .await
213            .unwrap();
214
215        assert_eq!(*counter.lock().unwrap(), 10);
216    }
217
218    #[tokio::test]
219    async fn multiplos_handlers_executam_em_sequencia() {
220        let log = Arc::new(Mutex::new(Vec::<u32>::new()));
221        let log2 = Arc::clone(&log);
222        let mut registry = EventHandlerRegistry::<TestEvent>::new();
223        registry.register(RecordingHandler {
224            log: Arc::clone(&log),
225        });
226        registry.register(RecordingHandler { log: log2 });
227
228        let dispatcher = registry.into_dispatcher(Container::new());
229        dispatcher
230            .dispatch_event(TestEvent { value: 7 })
231            .await
232            .unwrap();
233
234        assert_eq!(*log.lock().unwrap(), vec![7, 7]);
235    }
236
237    #[tokio::test]
238    async fn dispatch_para_no_primeiro_erro() {
239        let log = Arc::new(Mutex::new(Vec::<u32>::new()));
240        let mut registry = EventHandlerRegistry::<TestEvent>::new();
241        registry.register(FailingHandler);
242        registry.register(RecordingHandler {
243            log: Arc::clone(&log),
244        });
245
246        let dispatcher = registry.into_dispatcher(Container::new());
247        let result = dispatcher.dispatch_event(TestEvent { value: 1 }).await;
248
249        assert!(result.is_err());
250        assert!(
251            log.lock().unwrap().is_empty(),
252            "segundo handler não deve ter rodado"
253        );
254    }
255}