Skip to main content

hexeract_core/
handler.rs

1use std::sync::Arc;
2
3use crate::command::Command;
4use crate::context::HandlerContext;
5use crate::error::HexeractError;
6use crate::notification::Notification;
7use crate::query::Query;
8
9/// Asynchronous handler for a [`Command`].
10///
11/// Each [`Command`] type has exactly one registered `CommandHandler`. The
12/// handler receives an immutable reference to itself, the command value and
13/// a [`HandlerContext`] carrying tracing and cancellation information.
14///
15/// # Example
16///
17/// ```
18/// use hexeract_core::{Command, CommandHandler, HandlerContext, HexeractError};
19/// use uuid::Uuid;
20///
21/// struct CreateUser {
22///     pub email: String,
23/// }
24///
25/// impl Command for CreateUser {
26///     type Output = Uuid;
27/// }
28///
29/// struct UserRepository;
30///
31/// impl CommandHandler<CreateUser> for UserRepository {
32///     type Error = HexeractError;
33///
34///     async fn handle(
35///         &self,
36///         cmd: CreateUser,
37///         _ctx: &HandlerContext,
38///     ) -> Result<Uuid, Self::Error> {
39///         let _ = cmd.email;
40///         Ok(Uuid::new_v4())
41///     }
42/// }
43/// ```
44#[trait_variant::make(Send)]
45pub trait CommandHandler<C: Command>: Send + Sync + 'static {
46    /// The handler-defined error type, convertible into [`HexeractError`].
47    type Error: Into<HexeractError> + Send + Sync + 'static;
48
49    /// Handles the command and produces its output.
50    async fn handle(&self, command: C, ctx: &HandlerContext) -> Result<C::Output, Self::Error>;
51}
52
53/// Asynchronous handler for a [`Query`].
54#[trait_variant::make(Send)]
55pub trait QueryHandler<Q: Query>: Send + Sync + 'static {
56    /// The handler-defined error type, convertible into [`HexeractError`].
57    type Error: Into<HexeractError> + Send + Sync + 'static;
58
59    /// Handles the query and produces its output.
60    async fn handle(&self, query: Q, ctx: &HandlerContext) -> Result<Q::Output, Self::Error>;
61}
62
63/// Asynchronous handler for a [`Notification`].
64///
65/// Multiple handlers may be registered for the same notification type; the
66/// mediator delivers the notification to each of them. A handler failure does
67/// not interrupt the fan-out: every registered handler is invoked regardless
68/// of sibling outcomes.
69#[trait_variant::make(Send)]
70pub trait NotificationHandler<N: Notification>: Send + Sync + 'static {
71    /// The handler-defined error type, convertible into [`HexeractError`].
72    type Error: Into<HexeractError> + Send + Sync + 'static;
73
74    /// Handles the notification.
75    ///
76    /// The notification is shared across every registered handler as an
77    /// [`Arc`], so it is never deep-cloned per handler. Clone out of the `Arc`
78    /// if an owned value is needed.
79    async fn handle(&self, notification: Arc<N>, ctx: &HandlerContext) -> Result<(), Self::Error>;
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85    use crate::ids::{CorrelationId, MessageId};
86    use std::time::Duration;
87    use uuid::Uuid;
88
89    fn fresh_ctx() -> HandlerContext {
90        HandlerContext::new(MessageId::new(), CorrelationId::new())
91    }
92
93    fn assert_send<T: Send>(_: &T) {}
94
95    #[derive(Debug, PartialEq, Eq, Clone)]
96    struct UserCreated {
97        id: Uuid,
98        email: String,
99    }
100
101    struct CreateUser {
102        email: String,
103    }
104
105    impl Command for CreateUser {
106        type Output = UserCreated;
107    }
108
109    #[derive(Debug, thiserror::Error)]
110    enum UserError {
111        #[error("invalid email")]
112        InvalidEmail,
113    }
114
115    impl From<UserError> for HexeractError {
116        fn from(value: UserError) -> Self {
117            Self::handler_failed(value)
118        }
119    }
120
121    struct UserRepo {
122        prefix: String,
123    }
124
125    impl CommandHandler<CreateUser> for UserRepo {
126        type Error = UserError;
127        async fn handle(
128            &self,
129            cmd: CreateUser,
130            _ctx: &HandlerContext,
131        ) -> Result<UserCreated, Self::Error> {
132            if cmd.email.is_empty() {
133                return Err(UserError::InvalidEmail);
134            }
135            Ok(UserCreated {
136                id: Uuid::new_v4(),
137                email: format!("{}-{}", self.prefix, cmd.email),
138            })
139        }
140    }
141
142    #[tokio::test]
143    async fn command_handler_returns_complex_output() {
144        let repo = UserRepo {
145            prefix: "test".into(),
146        };
147        let ctx = fresh_ctx();
148        let result = repo
149            .handle(
150                CreateUser {
151                    email: "alice@example.com".into(),
152                },
153                &ctx,
154            )
155            .await
156            .expect("handler should succeed");
157        assert_eq!(result.email, "test-alice@example.com");
158    }
159
160    #[tokio::test]
161    async fn command_handler_returns_typed_error_for_invalid_input() {
162        let repo = UserRepo {
163            prefix: "test".into(),
164        };
165        let ctx = fresh_ctx();
166        let err = repo
167            .handle(
168                CreateUser {
169                    email: String::new(),
170                },
171                &ctx,
172            )
173            .await
174            .expect_err("empty email must fail");
175        assert!(matches!(err, UserError::InvalidEmail));
176        let framework_err: HexeractError = err.into();
177        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
178    }
179
180    #[tokio::test]
181    async fn handler_future_is_send() {
182        let repo = UserRepo {
183            prefix: "send".into(),
184        };
185        let ctx = fresh_ctx();
186        let future = repo.handle(
187            CreateUser {
188                email: "send@test".into(),
189            },
190            &ctx,
191        );
192        assert_send(&future);
193        let _ = future.await;
194    }
195
196    #[tokio::test]
197    async fn handler_runs_in_spawned_task() {
198        let repo = Arc::new(UserRepo {
199            prefix: "spawn".into(),
200        });
201        let cloned = Arc::clone(&repo);
202        let result = tokio::spawn(async move {
203            let ctx = fresh_ctx();
204            cloned.handle(CreateUser { email: "ok".into() }, &ctx).await
205        })
206        .await
207        .expect("task panicked");
208        assert!(result.is_ok());
209    }
210
211    struct DirectErrorHandler;
212    impl CommandHandler<CreateUser> for DirectErrorHandler {
213        type Error = HexeractError;
214        async fn handle(
215            &self,
216            _cmd: CreateUser,
217            _ctx: &HandlerContext,
218        ) -> Result<UserCreated, Self::Error> {
219            Err(HexeractError::Dispatch("forced".into()))
220        }
221    }
222
223    #[tokio::test]
224    async fn handler_can_use_hexeract_error_directly_as_error_type() {
225        let handler = DirectErrorHandler;
226        let ctx = fresh_ctx();
227        let err = handler
228            .handle(
229                CreateUser {
230                    email: "any".into(),
231                },
232                &ctx,
233            )
234            .await
235            .expect_err("must fail");
236        assert!(matches!(err, HexeractError::Dispatch(_)));
237    }
238
239    struct EchoIdsHandler;
240    struct EchoIds;
241    impl Command for EchoIds {
242        type Output = (MessageId, CorrelationId);
243    }
244
245    impl CommandHandler<EchoIds> for EchoIdsHandler {
246        type Error = HexeractError;
247        async fn handle(
248            &self,
249            _cmd: EchoIds,
250            ctx: &HandlerContext,
251        ) -> Result<(MessageId, CorrelationId), Self::Error> {
252            Ok((ctx.message_id, ctx.correlation_id))
253        }
254    }
255
256    #[tokio::test]
257    async fn handler_reads_message_and_correlation_ids_from_context() {
258        let message_id = MessageId::new();
259        let correlation_id = CorrelationId::new();
260        let ctx = HandlerContext::new(message_id, correlation_id);
261
262        let handler = EchoIdsHandler;
263        let (got_msg, got_corr) = handler
264            .handle(EchoIds, &ctx)
265            .await
266            .expect("handler should succeed");
267        assert_eq!(got_msg, message_id);
268        assert_eq!(got_corr, correlation_id);
269    }
270
271    struct SleepHandler;
272    struct SleepFor(u64);
273    impl Command for SleepFor {
274        type Output = &'static str;
275    }
276
277    impl CommandHandler<SleepFor> for SleepHandler {
278        type Error = HexeractError;
279        async fn handle(
280            &self,
281            cmd: SleepFor,
282            ctx: &HandlerContext,
283        ) -> Result<&'static str, Self::Error> {
284            tokio::select! {
285                () = ctx.cancellation.cancelled() => Err(HexeractError::Dispatch("cancelled".into())),
286                () = tokio::time::sleep(Duration::from_millis(cmd.0)) => Ok("completed"),
287            }
288        }
289    }
290
291    #[tokio::test]
292    async fn handler_observes_external_cancellation() {
293        let ctx = fresh_ctx();
294        let token = ctx.cancellation.clone();
295
296        let handle = tokio::spawn(async move {
297            let handler = SleepHandler;
298            handler.handle(SleepFor(5_000), &ctx).await
299        });
300
301        tokio::time::sleep(Duration::from_millis(50)).await;
302        token.cancel();
303
304        let result = handle.await.expect("task panicked");
305        assert!(matches!(result, Err(HexeractError::Dispatch(ref m)) if m == "cancelled"));
306    }
307
308    #[tokio::test]
309    async fn handler_is_shareable_via_arc() {
310        let handler: Arc<UserRepo> = Arc::new(UserRepo {
311            prefix: "arc".into(),
312        });
313        let h1 = Arc::clone(&handler);
314        let h2 = Arc::clone(&handler);
315
316        let t1 = tokio::spawn(async move {
317            let ctx = fresh_ctx();
318            h1.handle(CreateUser { email: "u1".into() }, &ctx).await
319        });
320        let t2 = tokio::spawn(async move {
321            let ctx = fresh_ctx();
322            h2.handle(CreateUser { email: "u2".into() }, &ctx).await
323        });
324
325        let (r1, r2) = tokio::join!(t1, t2);
326        assert!(r1.unwrap().is_ok());
327        assert!(r2.unwrap().is_ok());
328    }
329
330    #[derive(Debug)]
331    struct UserSummary {
332        id: Uuid,
333    }
334
335    struct FindUser {
336        id: Uuid,
337    }
338
339    impl Query for FindUser {
340        type Output = Option<UserSummary>;
341    }
342
343    struct UserFinder;
344
345    impl QueryHandler<FindUser> for UserFinder {
346        type Error = HexeractError;
347        async fn handle(
348            &self,
349            query: FindUser,
350            _ctx: &HandlerContext,
351        ) -> Result<Option<UserSummary>, Self::Error> {
352            Ok(Some(UserSummary { id: query.id }))
353        }
354    }
355
356    #[tokio::test]
357    async fn query_handler_returns_output() {
358        let id = Uuid::new_v4();
359        let handler = UserFinder;
360        let ctx = fresh_ctx();
361        let result = handler
362            .handle(FindUser { id }, &ctx)
363            .await
364            .expect("query should succeed");
365        assert_eq!(result.unwrap().id, id);
366    }
367
368    #[tokio::test]
369    async fn query_handler_future_is_send() {
370        let handler = UserFinder;
371        let ctx = fresh_ctx();
372        let future = handler.handle(FindUser { id: Uuid::new_v4() }, &ctx);
373        assert_send(&future);
374        let _ = future.await;
375    }
376
377    #[tokio::test]
378    async fn query_handler_runs_in_spawned_task() {
379        let handler = Arc::new(UserFinder);
380        let cloned = Arc::clone(&handler);
381        let result = tokio::spawn(async move {
382            let ctx = fresh_ctx();
383            cloned.handle(FindUser { id: Uuid::new_v4() }, &ctx).await
384        })
385        .await
386        .expect("task panicked");
387        assert!(result.is_ok());
388    }
389
390    struct FailingQuery;
391    impl QueryHandler<FindUser> for FailingQuery {
392        type Error = UserError;
393        async fn handle(
394            &self,
395            _query: FindUser,
396            _ctx: &HandlerContext,
397        ) -> Result<Option<UserSummary>, Self::Error> {
398            Err(UserError::InvalidEmail)
399        }
400    }
401
402    #[tokio::test]
403    async fn query_handler_error_converts_into_hexeract_error() {
404        let handler = FailingQuery;
405        let ctx = fresh_ctx();
406        let err = handler
407            .handle(FindUser { id: Uuid::new_v4() }, &ctx)
408            .await
409            .expect_err("must fail");
410        let framework_err: HexeractError = err.into();
411        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
412    }
413}