1use crate::command::Command;
2use crate::context::HandlerContext;
3use crate::error::HexeractError;
4use crate::notification::Notification;
5use crate::query::Query;
6
7#[trait_variant::make(Send)]
43pub trait CommandHandler<C: Command>: Send + Sync + 'static {
44 type Error: Into<HexeractError> + Send + Sync + 'static;
46
47 async fn handle(&self, command: C, ctx: &HandlerContext) -> Result<C::Output, Self::Error>;
49}
50
51#[trait_variant::make(Send)]
53pub trait QueryHandler<Q: Query>: Send + Sync + 'static {
54 type Error: Into<HexeractError> + Send + Sync + 'static;
56
57 async fn handle(&self, query: Q, ctx: &HandlerContext) -> Result<Q::Output, Self::Error>;
59}
60
61#[trait_variant::make(Send)]
68pub trait NotificationHandler<N: Notification>: Send + Sync + 'static {
69 type Error: Into<HexeractError> + Send + Sync + 'static;
71
72 async fn handle(&self, notification: N, ctx: &HandlerContext) -> Result<(), Self::Error>;
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{CorrelationId, MessageId};
    use std::sync::Arc;
    use std::time::Duration;
    use uuid::Uuid;

    // ---------------------------------------------------------------------
    // Shared helpers
    // ---------------------------------------------------------------------

    /// Creates a context carrying newly generated message and correlation ids.
    fn fresh_ctx() -> HandlerContext {
        let message_id = MessageId::new();
        let correlation_id = CorrelationId::new();
        HandlerContext::new(message_id, correlation_id)
    }

    /// Compiles only when `T` is `Send`; does nothing at runtime.
    fn assert_send<T>(_value: &T)
    where
        T: Send,
    {
    }

    /// Builds a [`UserRepo`] whose output emails are prefixed with `prefix`.
    fn repo_with_prefix(prefix: &str) -> UserRepo {
        UserRepo {
            prefix: prefix.to_owned(),
        }
    }

    /// Builds a [`CreateUser`] command for the given email.
    fn create_user(email: &str) -> CreateUser {
        CreateUser {
            email: email.to_owned(),
        }
    }

    /// Builds a [`FindUser`] query for a freshly generated id.
    fn find_random_user() -> FindUser {
        FindUser { id: Uuid::new_v4() }
    }

    // ---------------------------------------------------------------------
    // Command fixtures
    // ---------------------------------------------------------------------

    /// Output produced by [`CreateUser`].
    #[derive(Clone, Debug, Eq, PartialEq)]
    struct UserCreated {
        id: Uuid,
        email: String,
    }

    /// Command that asks for a user with the given email.
    struct CreateUser {
        email: String,
    }

    impl Command for CreateUser {
        type Output = UserCreated;
    }

    /// Handler-specific error used to exercise the `Into<HexeractError>` bound.
    #[derive(Debug, thiserror::Error)]
    enum UserError {
        #[error("invalid email")]
        InvalidEmail,
    }

    impl From<UserError> for HexeractError {
        fn from(source: UserError) -> Self {
            HexeractError::handler_failed(source)
        }
    }

    /// Stateful handler: rejects empty emails and prefixes accepted ones.
    struct UserRepo {
        prefix: String,
    }

    impl CommandHandler<CreateUser> for UserRepo {
        type Error = UserError;

        async fn handle(
            &self,
            cmd: CreateUser,
            _ctx: &HandlerContext,
        ) -> Result<UserCreated, Self::Error> {
            let CreateUser { email } = cmd;
            if email.is_empty() {
                Err(UserError::InvalidEmail)
            } else {
                let prefix = &self.prefix;
                Ok(UserCreated {
                    id: Uuid::new_v4(),
                    email: format!("{prefix}-{email}"),
                })
            }
        }
    }

    // ---------------------------------------------------------------------
    // Command handler tests
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn command_handler_returns_complex_output() {
        let repo = repo_with_prefix("test");
        let ctx = fresh_ctx();

        let outcome = repo.handle(create_user("alice@example.com"), &ctx).await;
        let created = outcome.expect("handler should succeed");

        assert_eq!(created.email, "test-alice@example.com");
    }

    #[tokio::test]
    async fn command_handler_returns_typed_error_for_invalid_input() {
        let repo = repo_with_prefix("test");
        let ctx = fresh_ctx();

        let outcome = repo.handle(create_user(""), &ctx).await;
        let err = outcome.expect_err("empty email must fail");
        assert!(matches!(err, UserError::InvalidEmail));

        // The typed error must also convert into the framework error.
        let framework_err = HexeractError::from(err);
        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
    }

    #[tokio::test]
    async fn handler_future_is_send() {
        let repo = repo_with_prefix("send");
        let ctx = fresh_ctx();

        let pending = repo.handle(create_user("send@test"), &ctx);
        assert_send(&pending);

        // Only `Send`-ness is under test; the result itself is discarded.
        let _ = pending.await;
    }

    #[tokio::test]
    async fn handler_runs_in_spawned_task() {
        let repo = Arc::new(repo_with_prefix("spawn"));
        let worker = Arc::clone(&repo);

        let task = tokio::spawn(async move {
            let ctx = fresh_ctx();
            worker.handle(create_user("ok"), &ctx).await
        });

        let outcome = task.await.expect("task panicked");
        assert!(outcome.is_ok());
    }

    /// Handler that uses [`HexeractError`] itself as its error type and always fails.
    struct DirectErrorHandler;

    impl CommandHandler<CreateUser> for DirectErrorHandler {
        type Error = HexeractError;

        async fn handle(
            &self,
            _cmd: CreateUser,
            _ctx: &HandlerContext,
        ) -> Result<UserCreated, Self::Error> {
            let failure = HexeractError::Dispatch("forced".into());
            Err(failure)
        }
    }

    #[tokio::test]
    async fn handler_can_use_hexeract_error_directly_as_error_type() {
        let handler = DirectErrorHandler;
        let ctx = fresh_ctx();

        let outcome = handler.handle(create_user("any"), &ctx).await;
        let err = outcome.expect_err("must fail");

        assert!(matches!(err, HexeractError::Dispatch(_)));
    }

    /// Handler that returns the ids found on the context it was given.
    struct EchoIdsHandler;

    /// Payload-free command answered by [`EchoIdsHandler`].
    struct EchoIds;

    impl Command for EchoIds {
        type Output = (MessageId, CorrelationId);
    }

    impl CommandHandler<EchoIds> for EchoIdsHandler {
        type Error = HexeractError;

        async fn handle(
            &self,
            _cmd: EchoIds,
            ctx: &HandlerContext,
        ) -> Result<(MessageId, CorrelationId), Self::Error> {
            let ids = (ctx.message_id, ctx.correlation_id);
            Ok(ids)
        }
    }

    #[tokio::test]
    async fn handler_reads_message_and_correlation_ids_from_context() {
        let expected_msg = MessageId::new();
        let expected_corr = CorrelationId::new();
        let ctx = HandlerContext::new(expected_msg, expected_corr);

        let handler = EchoIdsHandler;
        let outcome = handler.handle(EchoIds, &ctx).await;
        let echoed = outcome.expect("handler should succeed");

        assert_eq!(echoed.0, expected_msg);
        assert_eq!(echoed.1, expected_corr);
    }

    /// Handler that sleeps for the requested time unless the context is cancelled.
    struct SleepHandler;

    /// Command carrying the sleep duration in milliseconds.
    struct SleepFor(u64);

    impl Command for SleepFor {
        type Output = &'static str;
    }

    impl CommandHandler<SleepFor> for SleepHandler {
        type Error = HexeractError;

        async fn handle(
            &self,
            cmd: SleepFor,
            ctx: &HandlerContext,
        ) -> Result<&'static str, Self::Error> {
            let SleepFor(millis) = cmd;
            let cancelled = ctx.cancellation.cancelled();
            let nap = tokio::time::sleep(Duration::from_millis(millis));

            // Whichever future completes first decides the outcome.
            tokio::select! {
                () = cancelled => Err(HexeractError::Dispatch("cancelled".into())),
                () = nap => Ok("completed"),
            }
        }
    }

    #[tokio::test]
    async fn handler_observes_external_cancellation() {
        let ctx = fresh_ctx();
        // Keep a clone of the token so the test can cancel after `ctx` moves into the task.
        let token = ctx.cancellation.clone();

        let task = tokio::spawn(async move {
            let handler = SleepHandler;
            handler.handle(SleepFor(5_000), &ctx).await
        });

        let head_start = Duration::from_millis(50);
        tokio::time::sleep(head_start).await;
        token.cancel();

        let outcome = task.await.expect("task panicked");
        let was_cancelled = matches!(
            &outcome,
            Err(HexeractError::Dispatch(reason)) if reason == "cancelled"
        );
        assert!(was_cancelled);
    }

    #[tokio::test]
    async fn handler_is_shareable_via_arc() {
        let shared = Arc::new(repo_with_prefix("arc"));

        // Each task owns its own `Arc` clone and builds its own context.
        let launch = |repo: Arc<UserRepo>, email: &'static str| {
            tokio::spawn(async move {
                let ctx = fresh_ctx();
                repo.handle(create_user(email), &ctx).await
            })
        };

        let first = launch(Arc::clone(&shared), "u1");
        let second = launch(Arc::clone(&shared), "u2");

        let (first, second) = tokio::join!(first, second);
        assert!(first.unwrap().is_ok());
        assert!(second.unwrap().is_ok());
    }

    // ---------------------------------------------------------------------
    // Query fixtures
    // ---------------------------------------------------------------------

    /// Output produced by [`FindUser`] when a user is found.
    #[derive(Debug)]
    struct UserSummary {
        id: Uuid,
    }

    /// Query that looks a user up by id.
    struct FindUser {
        id: Uuid,
    }

    impl Query for FindUser {
        type Output = Option<UserSummary>;
    }

    /// Query handler that always "finds" a user with the requested id.
    struct UserFinder;

    impl QueryHandler<FindUser> for UserFinder {
        type Error = HexeractError;

        async fn handle(
            &self,
            query: FindUser,
            _ctx: &HandlerContext,
        ) -> Result<Option<UserSummary>, Self::Error> {
            let FindUser { id } = query;
            Ok(Some(UserSummary { id }))
        }
    }

    // ---------------------------------------------------------------------
    // Query handler tests
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn query_handler_returns_output() {
        let wanted = Uuid::new_v4();
        let handler = UserFinder;
        let ctx = fresh_ctx();

        let outcome = handler.handle(FindUser { id: wanted }, &ctx).await;
        let found = outcome.expect("query should succeed");
        let summary = found.unwrap();

        assert_eq!(summary.id, wanted);
    }

    #[tokio::test]
    async fn query_handler_future_is_send() {
        let handler = UserFinder;
        let ctx = fresh_ctx();

        let pending = handler.handle(find_random_user(), &ctx);
        assert_send(&pending);

        // Only `Send`-ness is under test; the result itself is discarded.
        let _ = pending.await;
    }

    #[tokio::test]
    async fn query_handler_runs_in_spawned_task() {
        let handler = Arc::new(UserFinder);
        let worker = Arc::clone(&handler);

        let task = tokio::spawn(async move {
            let ctx = fresh_ctx();
            worker.handle(find_random_user(), &ctx).await
        });

        let outcome = task.await.expect("task panicked");
        assert!(outcome.is_ok());
    }

    /// Query handler that always fails with a handler-specific error.
    struct FailingQuery;

    impl QueryHandler<FindUser> for FailingQuery {
        type Error = UserError;

        async fn handle(
            &self,
            _query: FindUser,
            _ctx: &HandlerContext,
        ) -> Result<Option<UserSummary>, Self::Error> {
            let failure = UserError::InvalidEmail;
            Err(failure)
        }
    }

    #[tokio::test]
    async fn query_handler_error_converts_into_hexeract_error() {
        let handler = FailingQuery;
        let ctx = fresh_ctx();

        let outcome = handler.handle(find_random_user(), &ctx).await;
        let err = outcome.expect_err("must fail");

        let framework_err = HexeractError::from(err);
        assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
    }
}