Skip to main content

hexeract_core/
handler.rs

1use crate::command::Command;
2use crate::context::HandlerContext;
3use crate::error::HexeractError;
4use crate::notification::Notification;
5use crate::query::Query;
6
7/// Asynchronous handler for a [`Command`].
8///
9/// Each [`Command`] type has exactly one registered `CommandHandler`. The
10/// handler receives an immutable reference to itself, the command value and
11/// a [`HandlerContext`] carrying tracing and cancellation information.
12///
13/// # Example
14///
15/// ```
16/// use hexeract_core::{Command, CommandHandler, HandlerContext, HexeractError};
17/// use uuid::Uuid;
18///
19/// struct CreateUser {
20///     pub email: String,
21/// }
22///
23/// impl Command for CreateUser {
24///     type Output = Uuid;
25/// }
26///
27/// struct UserRepository;
28///
29/// impl CommandHandler<CreateUser> for UserRepository {
30///     type Error = HexeractError;
31///
32///     async fn handle(
33///         &self,
34///         cmd: CreateUser,
35///         _ctx: &HandlerContext,
36///     ) -> Result<Uuid, Self::Error> {
37///         let _ = cmd.email;
38///         Ok(Uuid::new_v4())
39///     }
40/// }
41/// ```
42#[trait_variant::make(Send)]
43pub trait CommandHandler<C: Command>: Send + Sync + 'static {
44    /// The handler-defined error type, convertible into [`HexeractError`].
45    type Error: Into<HexeractError> + Send + Sync + 'static;
46
47    /// Handles the command and produces its output.
48    async fn handle(&self, command: C, ctx: &HandlerContext) -> Result<C::Output, Self::Error>;
49}
50
51/// Asynchronous handler for a [`Query`].
52#[trait_variant::make(Send)]
53pub trait QueryHandler<Q: Query>: Send + Sync + 'static {
54    /// The handler-defined error type, convertible into [`HexeractError`].
55    type Error: Into<HexeractError> + Send + Sync + 'static;
56
57    /// Handles the query and produces its output.
58    async fn handle(&self, query: Q, ctx: &HandlerContext) -> Result<Q::Output, Self::Error>;
59}
60
61/// Asynchronous handler for a [`Notification`].
62///
63/// Multiple handlers may be registered for the same notification type; the
64/// mediator delivers the notification to each of them. A handler failure does
65/// not interrupt the fan-out: every registered handler is invoked regardless
66/// of sibling outcomes.
67#[trait_variant::make(Send)]
68pub trait NotificationHandler<N: Notification>: Send + Sync + 'static {
69    /// The handler-defined error type, convertible into [`HexeractError`].
70    type Error: Into<HexeractError> + Send + Sync + 'static;
71
72    /// Handles the notification.
73    async fn handle(&self, notification: N, ctx: &HandlerContext) -> Result<(), Self::Error>;
74}
75
76#[cfg(test)]
77mod tests {
78    use super::*;
79    use crate::ids::{CorrelationId, MessageId};
80    use std::sync::Arc;
81    use std::time::Duration;
82    use uuid::Uuid;
83
84    fn fresh_ctx() -> HandlerContext {
85        HandlerContext::new(MessageId::new(), CorrelationId::new())
86    }
87
88    fn assert_send<T: Send>(_: &T) {}
89
90    #[derive(Debug, PartialEq, Eq, Clone)]
91    struct UserCreated {
92        id: Uuid,
93        email: String,
94    }
95
96    struct CreateUser {
97        email: String,
98    }
99
100    impl Command for CreateUser {
101        type Output = UserCreated;
102    }
103
104    #[derive(Debug, thiserror::Error)]
105    enum UserError {
106        #[error("invalid email")]
107        InvalidEmail,
108    }
109
110    impl From<UserError> for HexeractError {
111        fn from(value: UserError) -> Self {
112            Self::handler_failed(value)
113        }
114    }
115
116    struct UserRepo {
117        prefix: String,
118    }
119
120    impl CommandHandler<CreateUser> for UserRepo {
121        type Error = UserError;
122        async fn handle(
123            &self,
124            cmd: CreateUser,
125            _ctx: &HandlerContext,
126        ) -> Result<UserCreated, Self::Error> {
127            if cmd.email.is_empty() {
128                return Err(UserError::InvalidEmail);
129            }
130            Ok(UserCreated {
131                id: Uuid::new_v4(),
132                email: format!("{}-{}", self.prefix, cmd.email),
133            })
134        }
135    }
136
137    #[tokio::test]
138    async fn command_handler_returns_complex_output() {
139        let repo = UserRepo {
140            prefix: "test".into(),
141        };
142        let ctx = fresh_ctx();
143        let result = repo
144            .handle(
145                CreateUser {
146                    email: "alice@example.com".into(),
147                },
148                &ctx,
149            )
150            .await
151            .expect("handler should succeed");
152        assert_eq!(result.email, "test-alice@example.com");
153    }
154
155    #[tokio::test]
156    async fn command_handler_returns_typed_error_for_invalid_input() {
157        let repo = UserRepo {
158            prefix: "test".into(),
159        };
160        let ctx = fresh_ctx();
161        let err = repo
162            .handle(
163                CreateUser {
164                    email: String::new(),
165                },
166                &ctx,
167            )
168            .await
169            .expect_err("empty email must fail");
170        assert!(matches!(err, UserError::InvalidEmail));
171        let framework_err: HexeractError = err.into();
172        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
173    }
174
175    #[tokio::test]
176    async fn handler_future_is_send() {
177        let repo = UserRepo {
178            prefix: "send".into(),
179        };
180        let ctx = fresh_ctx();
181        let future = repo.handle(
182            CreateUser {
183                email: "send@test".into(),
184            },
185            &ctx,
186        );
187        assert_send(&future);
188        let _ = future.await;
189    }
190
191    #[tokio::test]
192    async fn handler_runs_in_spawned_task() {
193        let repo = Arc::new(UserRepo {
194            prefix: "spawn".into(),
195        });
196        let cloned = Arc::clone(&repo);
197        let result = tokio::spawn(async move {
198            let ctx = fresh_ctx();
199            cloned.handle(CreateUser { email: "ok".into() }, &ctx).await
200        })
201        .await
202        .expect("task panicked");
203        assert!(result.is_ok());
204    }
205
206    struct DirectErrorHandler;
207    impl CommandHandler<CreateUser> for DirectErrorHandler {
208        type Error = HexeractError;
209        async fn handle(
210            &self,
211            _cmd: CreateUser,
212            _ctx: &HandlerContext,
213        ) -> Result<UserCreated, Self::Error> {
214            Err(HexeractError::Dispatch("forced".into()))
215        }
216    }
217
218    #[tokio::test]
219    async fn handler_can_use_hexeract_error_directly_as_error_type() {
220        let handler = DirectErrorHandler;
221        let ctx = fresh_ctx();
222        let err = handler
223            .handle(
224                CreateUser {
225                    email: "any".into(),
226                },
227                &ctx,
228            )
229            .await
230            .expect_err("must fail");
231        assert!(matches!(err, HexeractError::Dispatch(_)));
232    }
233
234    struct EchoIdsHandler;
235    struct EchoIds;
236    impl Command for EchoIds {
237        type Output = (MessageId, CorrelationId);
238    }
239
240    impl CommandHandler<EchoIds> for EchoIdsHandler {
241        type Error = HexeractError;
242        async fn handle(
243            &self,
244            _cmd: EchoIds,
245            ctx: &HandlerContext,
246        ) -> Result<(MessageId, CorrelationId), Self::Error> {
247            Ok((ctx.message_id, ctx.correlation_id))
248        }
249    }
250
251    #[tokio::test]
252    async fn handler_reads_message_and_correlation_ids_from_context() {
253        let message_id = MessageId::new();
254        let correlation_id = CorrelationId::new();
255        let ctx = HandlerContext::new(message_id, correlation_id);
256
257        let handler = EchoIdsHandler;
258        let (got_msg, got_corr) = handler
259            .handle(EchoIds, &ctx)
260            .await
261            .expect("handler should succeed");
262        assert_eq!(got_msg, message_id);
263        assert_eq!(got_corr, correlation_id);
264    }
265
266    struct SleepHandler;
267    struct SleepFor(u64);
268    impl Command for SleepFor {
269        type Output = &'static str;
270    }
271
272    impl CommandHandler<SleepFor> for SleepHandler {
273        type Error = HexeractError;
274        async fn handle(
275            &self,
276            cmd: SleepFor,
277            ctx: &HandlerContext,
278        ) -> Result<&'static str, Self::Error> {
279            tokio::select! {
280                () = ctx.cancellation.cancelled() => Err(HexeractError::Dispatch("cancelled".into())),
281                () = tokio::time::sleep(Duration::from_millis(cmd.0)) => Ok("completed"),
282            }
283        }
284    }
285
286    #[tokio::test]
287    async fn handler_observes_external_cancellation() {
288        let ctx = fresh_ctx();
289        let token = ctx.cancellation.clone();
290
291        let handle = tokio::spawn(async move {
292            let handler = SleepHandler;
293            handler.handle(SleepFor(5_000), &ctx).await
294        });
295
296        tokio::time::sleep(Duration::from_millis(50)).await;
297        token.cancel();
298
299        let result = handle.await.expect("task panicked");
300        assert!(matches!(result, Err(HexeractError::Dispatch(ref m)) if m == "cancelled"));
301    }
302
303    #[tokio::test]
304    async fn handler_is_shareable_via_arc() {
305        let handler: Arc<UserRepo> = Arc::new(UserRepo {
306            prefix: "arc".into(),
307        });
308        let h1 = Arc::clone(&handler);
309        let h2 = Arc::clone(&handler);
310
311        let t1 = tokio::spawn(async move {
312            let ctx = fresh_ctx();
313            h1.handle(CreateUser { email: "u1".into() }, &ctx).await
314        });
315        let t2 = tokio::spawn(async move {
316            let ctx = fresh_ctx();
317            h2.handle(CreateUser { email: "u2".into() }, &ctx).await
318        });
319
320        let (r1, r2) = tokio::join!(t1, t2);
321        assert!(r1.unwrap().is_ok());
322        assert!(r2.unwrap().is_ok());
323    }
324
325    #[derive(Debug)]
326    struct UserSummary {
327        id: Uuid,
328    }
329
330    struct FindUser {
331        id: Uuid,
332    }
333
334    impl Query for FindUser {
335        type Output = Option<UserSummary>;
336    }
337
338    struct UserFinder;
339
340    impl QueryHandler<FindUser> for UserFinder {
341        type Error = HexeractError;
342        async fn handle(
343            &self,
344            query: FindUser,
345            _ctx: &HandlerContext,
346        ) -> Result<Option<UserSummary>, Self::Error> {
347            Ok(Some(UserSummary { id: query.id }))
348        }
349    }
350
351    #[tokio::test]
352    async fn query_handler_returns_output() {
353        let id = Uuid::new_v4();
354        let handler = UserFinder;
355        let ctx = fresh_ctx();
356        let result = handler
357            .handle(FindUser { id }, &ctx)
358            .await
359            .expect("query should succeed");
360        assert_eq!(result.unwrap().id, id);
361    }
362
363    #[tokio::test]
364    async fn query_handler_future_is_send() {
365        let handler = UserFinder;
366        let ctx = fresh_ctx();
367        let future = handler.handle(FindUser { id: Uuid::new_v4() }, &ctx);
368        assert_send(&future);
369        let _ = future.await;
370    }
371
372    #[tokio::test]
373    async fn query_handler_runs_in_spawned_task() {
374        let handler = Arc::new(UserFinder);
375        let cloned = Arc::clone(&handler);
376        let result = tokio::spawn(async move {
377            let ctx = fresh_ctx();
378            cloned.handle(FindUser { id: Uuid::new_v4() }, &ctx).await
379        })
380        .await
381        .expect("task panicked");
382        assert!(result.is_ok());
383    }
384
385    struct FailingQuery;
386    impl QueryHandler<FindUser> for FailingQuery {
387        type Error = UserError;
388        async fn handle(
389            &self,
390            _query: FindUser,
391            _ctx: &HandlerContext,
392        ) -> Result<Option<UserSummary>, Self::Error> {
393            Err(UserError::InvalidEmail)
394        }
395    }
396
397    #[tokio::test]
398    async fn query_handler_error_converts_into_hexeract_error() {
399        let handler = FailingQuery;
400        let ctx = fresh_ctx();
401        let err = handler
402            .handle(FindUser { id: Uuid::new_v4() }, &ctx)
403            .await
404            .expect_err("must fail");
405        let framework_err: HexeractError = err.into();
406        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
407    }
408}