Skip to main content

hexeract_core/
handler.rs

1use crate::command::Command;
2use crate::context::HandlerContext;
3use crate::error::HexeractError;
4use crate::query::Query;
5
6/// Asynchronous handler for a [`Command`].
7///
8/// Each [`Command`] type has exactly one registered `CommandHandler`. The
9/// handler receives an immutable reference to itself, the command value and
10/// a [`HandlerContext`] carrying tracing and cancellation information.
11///
12/// # Example
13///
14/// ```
15/// use hexeract_core::{Command, CommandHandler, HandlerContext, HexeractError};
16/// use uuid::Uuid;
17///
18/// struct CreateUser {
19///     pub email: String,
20/// }
21///
22/// impl Command for CreateUser {
23///     type Output = Uuid;
24/// }
25///
26/// struct UserRepository;
27///
28/// impl CommandHandler<CreateUser> for UserRepository {
29///     type Error = HexeractError;
30///
31///     async fn handle(
32///         &self,
33///         cmd: CreateUser,
34///         _ctx: &HandlerContext,
35///     ) -> Result<Uuid, Self::Error> {
36///         let _ = cmd.email;
37///         Ok(Uuid::new_v4())
38///     }
39/// }
40/// ```
41#[trait_variant::make(Send)]
42pub trait CommandHandler<C: Command>: Send + Sync + 'static {
43    /// The handler-defined error type, convertible into [`HexeractError`].
44    type Error: Into<HexeractError> + Send + Sync + 'static;
45
46    /// Handles the command and produces its output.
47    async fn handle(&self, command: C, ctx: &HandlerContext) -> Result<C::Output, Self::Error>;
48}
49
50/// Asynchronous handler for a [`Query`].
51#[trait_variant::make(Send)]
52pub trait QueryHandler<Q: Query>: Send + Sync + 'static {
53    /// The handler-defined error type, convertible into [`HexeractError`].
54    type Error: Into<HexeractError> + Send + Sync + 'static;
55
56    /// Handles the query and produces its output.
57    async fn handle(&self, query: Q, ctx: &HandlerContext) -> Result<Q::Output, Self::Error>;
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{CorrelationId, MessageId};
    use std::sync::Arc;
    use std::time::Duration;
    use uuid::Uuid;

    // ------------------------------------------------------------------
    // Shared fixtures
    // ------------------------------------------------------------------

    /// Builds a context from a newly created message id and correlation id.
    fn fresh_ctx() -> HandlerContext {
        let message_id = MessageId::new();
        let correlation_id = CorrelationId::new();
        HandlerContext::new(message_id, correlation_id)
    }

    /// Compile-time probe: a call only type-checks when `T` is `Send`.
    fn assert_send<T>(_probe: &T)
    where
        T: Send,
    {
    }

    /// Output produced by the `CreateUser` command.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct UserCreated {
        id: Uuid,
        email: String,
    }

    /// Command fixture carrying the e-mail of the user to create.
    struct CreateUser {
        email: String,
    }

    impl Command for CreateUser {
        type Output = UserCreated;
    }

    /// Shorthand constructor for a `CreateUser` command.
    fn create_user(email: &str) -> CreateUser {
        CreateUser {
            email: email.to_owned(),
        }
    }

    /// Handler-specific error used to exercise the `Into<HexeractError>` bound.
    #[derive(Debug, thiserror::Error)]
    enum UserError {
        #[error("invalid email")]
        InvalidEmail,
    }

    impl From<UserError> for HexeractError {
        fn from(err: UserError) -> Self {
            HexeractError::handler_failed(err)
        }
    }

    /// Command handler that prepends `prefix` to every accepted e-mail.
    struct UserRepo {
        prefix: String,
    }

    /// Shorthand constructor for a `UserRepo` with the given prefix.
    fn user_repo(prefix: &str) -> UserRepo {
        UserRepo {
            prefix: prefix.to_owned(),
        }
    }

    impl CommandHandler<CreateUser> for UserRepo {
        type Error = UserError;

        /// Rejects an empty e-mail; otherwise returns a `UserCreated` with a
        /// new id and the e-mail formatted as `"<prefix>-<email>"`.
        async fn handle(
            &self,
            cmd: CreateUser,
            _ctx: &HandlerContext,
        ) -> Result<UserCreated, Self::Error> {
            let CreateUser { email } = cmd;
            if email.is_empty() {
                Err(UserError::InvalidEmail)
            } else {
                let id = Uuid::new_v4();
                let email = format!("{}-{}", self.prefix, email);
                Ok(UserCreated { id, email })
            }
        }
    }

    // ------------------------------------------------------------------
    // Command handler tests
    // ------------------------------------------------------------------

    /// A successful command yields the struct output with the prefixed e-mail.
    #[tokio::test]
    async fn command_handler_returns_complex_output() {
        let handler = user_repo("test");
        let ctx = fresh_ctx();
        let outcome = handler.handle(create_user("alice@example.com"), &ctx).await;
        let created = outcome.expect("handler should succeed");
        assert_eq!(created.email, "test-alice@example.com");
    }

    /// An empty e-mail surfaces the handler's own error type, which then
    /// converts into the framework error.
    #[tokio::test]
    async fn command_handler_returns_typed_error_for_invalid_input() {
        let handler = user_repo("test");
        let ctx = fresh_ctx();
        let outcome = handler.handle(create_user(""), &ctx).await;
        let typed_err = outcome.expect_err("empty email must fail");
        assert!(matches!(typed_err, UserError::InvalidEmail));

        let framework_err = HexeractError::from(typed_err);
        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
    }

    /// The future returned by `handle` satisfies `Send`.
    #[tokio::test]
    async fn handler_future_is_send() {
        let handler = user_repo("send");
        let ctx = fresh_ctx();
        let pending = handler.handle(create_user("send@test"), &ctx);
        assert_send(&pending);
        let _ = pending.await;
    }

    /// A handler behind an `Arc` can be moved into a spawned task and awaited
    /// there.
    #[tokio::test]
    async fn handler_runs_in_spawned_task() {
        let shared = Arc::new(user_repo("spawn"));
        let for_task = Arc::clone(&shared);
        let task = tokio::spawn(async move {
            let ctx = fresh_ctx();
            for_task.handle(create_user("ok"), &ctx).await
        });
        let outcome = task.await.expect("task panicked");
        assert!(outcome.is_ok());
    }

    /// Handler whose error type is the framework error itself; always fails.
    struct DirectErrorHandler;

    impl CommandHandler<CreateUser> for DirectErrorHandler {
        type Error = HexeractError;

        async fn handle(
            &self,
            _cmd: CreateUser,
            _ctx: &HandlerContext,
        ) -> Result<UserCreated, Self::Error> {
            let forced = HexeractError::Dispatch("forced".into());
            Err(forced)
        }
    }

    /// `HexeractError` is accepted directly as a handler's `Error` type.
    #[tokio::test]
    async fn handler_can_use_hexeract_error_directly_as_error_type() {
        let failing = DirectErrorHandler;
        let ctx = fresh_ctx();
        let outcome = failing.handle(create_user("any"), &ctx).await;
        let err = outcome.expect_err("must fail");
        assert!(matches!(err, HexeractError::Dispatch(_)));
    }

    /// Handler that returns the ids found in the context it was given.
    struct EchoIdsHandler;

    /// Payload-free command whose output is the `(message, correlation)` pair.
    struct EchoIds;

    impl Command for EchoIds {
        type Output = (MessageId, CorrelationId);
    }

    impl CommandHandler<EchoIds> for EchoIdsHandler {
        type Error = HexeractError;

        async fn handle(
            &self,
            _cmd: EchoIds,
            ctx: &HandlerContext,
        ) -> Result<(MessageId, CorrelationId), Self::Error> {
            let ids = (ctx.message_id, ctx.correlation_id);
            Ok(ids)
        }
    }

    /// The ids used to build the context are the ones the handler reads back.
    #[tokio::test]
    async fn handler_reads_message_and_correlation_ids_from_context() {
        let expected_msg = MessageId::new();
        let expected_corr = CorrelationId::new();
        let ctx = HandlerContext::new(expected_msg, expected_corr);

        let echo = EchoIdsHandler;
        let outcome = echo.handle(EchoIds, &ctx).await;
        let (seen_msg, seen_corr) = outcome.expect("handler should succeed");
        assert_eq!(seen_msg, expected_msg);
        assert_eq!(seen_corr, expected_corr);
    }

    /// Handler that sleeps for the requested time unless cancelled first.
    struct SleepHandler;

    /// Command carrying the sleep duration in milliseconds.
    struct SleepFor(u64);

    impl Command for SleepFor {
        type Output = &'static str;
    }

    impl CommandHandler<SleepFor> for SleepHandler {
        type Error = HexeractError;

        /// Races the context's cancellation against the sleep timer and
        /// reports whichever finishes first.
        async fn handle(
            &self,
            cmd: SleepFor,
            ctx: &HandlerContext,
        ) -> Result<&'static str, Self::Error> {
            let SleepFor(millis) = cmd;
            tokio::select! {
                () = ctx.cancellation.cancelled() => {
                    Err(HexeractError::Dispatch("cancelled".into()))
                }
                () = tokio::time::sleep(Duration::from_millis(millis)) => Ok("completed"),
            }
        }
    }

    /// Cancelling a clone of the context's token from outside the task makes
    /// the in-flight handler return its "cancelled" error.
    #[tokio::test]
    async fn handler_observes_external_cancellation() {
        let ctx = fresh_ctx();
        let token = ctx.cancellation.clone();

        let task = tokio::spawn(async move {
            let sleeper = SleepHandler;
            sleeper.handle(SleepFor(5_000), &ctx).await
        });

        // Wait briefly before cancelling; the handler itself would sleep 5 s.
        tokio::time::sleep(Duration::from_millis(50)).await;
        token.cancel();

        let outcome = task.await.expect("task panicked");
        assert!(matches!(outcome, Err(HexeractError::Dispatch(ref msg)) if msg == "cancelled"));
    }

    /// One handler instance can serve two spawned tasks through `Arc` clones.
    #[tokio::test]
    async fn handler_is_shareable_via_arc() {
        let handler: Arc<UserRepo> = Arc::new(user_repo("arc"));

        // Spawns a task that owns its own `Arc` clone of the handler.
        let spawn_create = |email: &'static str| {
            let shared = Arc::clone(&handler);
            tokio::spawn(async move {
                let ctx = fresh_ctx();
                shared.handle(create_user(email), &ctx).await
            })
        };

        let first_task = spawn_create("u1");
        let second_task = spawn_create("u2");

        let (first, second) = tokio::join!(first_task, second_task);
        assert!(first.unwrap().is_ok());
        assert!(second.unwrap().is_ok());
    }

    // ------------------------------------------------------------------
    // Query handler tests
    // ------------------------------------------------------------------

    /// Output element of the `FindUser` query.
    #[derive(Debug)]
    struct UserSummary {
        id: Uuid,
    }

    /// Query fixture looking a user up by id.
    struct FindUser {
        id: Uuid,
    }

    impl Query for FindUser {
        type Output = Option<UserSummary>;
    }

    /// Shorthand for a `FindUser` query with a newly generated id.
    fn find_random_user() -> FindUser {
        FindUser { id: Uuid::new_v4() }
    }

    /// Query handler that always "finds" a summary with the requested id.
    struct UserFinder;

    impl QueryHandler<FindUser> for UserFinder {
        type Error = HexeractError;

        async fn handle(
            &self,
            query: FindUser,
            _ctx: &HandlerContext,
        ) -> Result<Option<UserSummary>, Self::Error> {
            let FindUser { id } = query;
            Ok(Some(UserSummary { id }))
        }
    }

    /// A successful query returns a summary carrying the requested id.
    #[tokio::test]
    async fn query_handler_returns_output() {
        let id = Uuid::new_v4();
        let finder = UserFinder;
        let ctx = fresh_ctx();
        let outcome = finder.handle(FindUser { id }, &ctx).await;
        let found = outcome.expect("query should succeed");
        let summary = found.unwrap();
        assert_eq!(summary.id, id);
    }

    /// The future returned by a query handler satisfies `Send`.
    #[tokio::test]
    async fn query_handler_future_is_send() {
        let finder = UserFinder;
        let ctx = fresh_ctx();
        let pending = finder.handle(find_random_user(), &ctx);
        assert_send(&pending);
        let _ = pending.await;
    }

    /// A query handler behind an `Arc` can be awaited inside a spawned task.
    #[tokio::test]
    async fn query_handler_runs_in_spawned_task() {
        let shared = Arc::new(UserFinder);
        let for_task = Arc::clone(&shared);
        let task = tokio::spawn(async move {
            let ctx = fresh_ctx();
            for_task.handle(find_random_user(), &ctx).await
        });
        let outcome = task.await.expect("task panicked");
        assert!(outcome.is_ok());
    }

    /// Query handler that always fails with the handler-specific error.
    struct FailingQuery;

    impl QueryHandler<FindUser> for FailingQuery {
        type Error = UserError;

        async fn handle(
            &self,
            _query: FindUser,
            _ctx: &HandlerContext,
        ) -> Result<Option<UserSummary>, Self::Error> {
            Err(UserError::InvalidEmail)
        }
    }

    /// A query handler's own error converts into the framework error.
    #[tokio::test]
    async fn query_handler_error_converts_into_hexeract_error() {
        let failing = FailingQuery;
        let ctx = fresh_ctx();
        let outcome = failing.handle(find_random_user(), &ctx).await;
        let typed_err = outcome.expect_err("must fail");
        let framework_err = HexeractError::from(typed_err);
        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
    }
}