1use std::sync::Arc;
2
3use crate::command::Command;
4use crate::context::HandlerContext;
5use crate::error::HexeractError;
6use crate::notification::Notification;
7use crate::query::Query;
8
9#[trait_variant::make(Send)]
45pub trait CommandHandler<C: Command>: Send + Sync + 'static {
46 type Error: Into<HexeractError> + Send + Sync + 'static;
48
49 async fn handle(&self, command: C, ctx: &HandlerContext) -> Result<C::Output, Self::Error>;
51}
52
53#[trait_variant::make(Send)]
55pub trait QueryHandler<Q: Query>: Send + Sync + 'static {
56 type Error: Into<HexeractError> + Send + Sync + 'static;
58
59 async fn handle(&self, query: Q, ctx: &HandlerContext) -> Result<Q::Output, Self::Error>;
61}
62
63#[trait_variant::make(Send)]
70pub trait NotificationHandler<N: Notification>: Send + Sync + 'static {
71 type Error: Into<HexeractError> + Send + Sync + 'static;
73
74 async fn handle(&self, notification: Arc<N>, ctx: &HandlerContext) -> Result<(), Self::Error>;
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85 use crate::ids::{CorrelationId, MessageId};
86 use std::time::Duration;
87 use uuid::Uuid;
88
89 fn fresh_ctx() -> HandlerContext {
90 HandlerContext::new(MessageId::new(), CorrelationId::new())
91 }
92
/// Compile-time probe: a call only type-checks when `T` is `Send`.
/// It does nothing at runtime.
fn assert_send<T>(_value: &T)
where
    T: Send,
{
}
94
95 #[derive(Debug, PartialEq, Eq, Clone)]
96 struct UserCreated {
97 id: Uuid,
98 email: String,
99 }
100
101 struct CreateUser {
102 email: String,
103 }
104
105 impl Command for CreateUser {
106 type Output = UserCreated;
107 }
108
109 #[derive(Debug, thiserror::Error)]
110 enum UserError {
111 #[error("invalid email")]
112 InvalidEmail,
113 }
114
115 impl From<UserError> for HexeractError {
116 fn from(value: UserError) -> Self {
117 Self::handler_failed(value)
118 }
119 }
120
121 struct UserRepo {
122 prefix: String,
123 }
124
125 impl CommandHandler<CreateUser> for UserRepo {
126 type Error = UserError;
127 async fn handle(
128 &self,
129 cmd: CreateUser,
130 _ctx: &HandlerContext,
131 ) -> Result<UserCreated, Self::Error> {
132 if cmd.email.is_empty() {
133 return Err(UserError::InvalidEmail);
134 }
135 Ok(UserCreated {
136 id: Uuid::new_v4(),
137 email: format!("{}-{}", self.prefix, cmd.email),
138 })
139 }
140 }
141
142 #[tokio::test]
143 async fn command_handler_returns_complex_output() {
144 let repo = UserRepo {
145 prefix: "test".into(),
146 };
147 let ctx = fresh_ctx();
148 let result = repo
149 .handle(
150 CreateUser {
151 email: "alice@example.com".into(),
152 },
153 &ctx,
154 )
155 .await
156 .expect("handler should succeed");
157 assert_eq!(result.email, "test-alice@example.com");
158 }
159
160 #[tokio::test]
161 async fn command_handler_returns_typed_error_for_invalid_input() {
162 let repo = UserRepo {
163 prefix: "test".into(),
164 };
165 let ctx = fresh_ctx();
166 let err = repo
167 .handle(
168 CreateUser {
169 email: String::new(),
170 },
171 &ctx,
172 )
173 .await
174 .expect_err("empty email must fail");
175 assert!(matches!(err, UserError::InvalidEmail));
176 let framework_err: HexeractError = err.into();
177 assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
178 }
179
180 #[tokio::test]
181 async fn handler_future_is_send() {
182 let repo = UserRepo {
183 prefix: "send".into(),
184 };
185 let ctx = fresh_ctx();
186 let future = repo.handle(
187 CreateUser {
188 email: "send@test".into(),
189 },
190 &ctx,
191 );
192 assert_send(&future);
193 let _ = future.await;
194 }
195
196 #[tokio::test]
197 async fn handler_runs_in_spawned_task() {
198 let repo = Arc::new(UserRepo {
199 prefix: "spawn".into(),
200 });
201 let cloned = Arc::clone(&repo);
202 let result = tokio::spawn(async move {
203 let ctx = fresh_ctx();
204 cloned.handle(CreateUser { email: "ok".into() }, &ctx).await
205 })
206 .await
207 .expect("task panicked");
208 assert!(result.is_ok());
209 }
210
211 struct DirectErrorHandler;
212 impl CommandHandler<CreateUser> for DirectErrorHandler {
213 type Error = HexeractError;
214 async fn handle(
215 &self,
216 _cmd: CreateUser,
217 _ctx: &HandlerContext,
218 ) -> Result<UserCreated, Self::Error> {
219 Err(HexeractError::Dispatch("forced".into()))
220 }
221 }
222
223 #[tokio::test]
224 async fn handler_can_use_hexeract_error_directly_as_error_type() {
225 let handler = DirectErrorHandler;
226 let ctx = fresh_ctx();
227 let err = handler
228 .handle(
229 CreateUser {
230 email: "any".into(),
231 },
232 &ctx,
233 )
234 .await
235 .expect_err("must fail");
236 assert!(matches!(err, HexeractError::Dispatch(_)));
237 }
238
239 struct EchoIdsHandler;
240 struct EchoIds;
241 impl Command for EchoIds {
242 type Output = (MessageId, CorrelationId);
243 }
244
245 impl CommandHandler<EchoIds> for EchoIdsHandler {
246 type Error = HexeractError;
247 async fn handle(
248 &self,
249 _cmd: EchoIds,
250 ctx: &HandlerContext,
251 ) -> Result<(MessageId, CorrelationId), Self::Error> {
252 Ok((ctx.message_id, ctx.correlation_id))
253 }
254 }
255
256 #[tokio::test]
257 async fn handler_reads_message_and_correlation_ids_from_context() {
258 let message_id = MessageId::new();
259 let correlation_id = CorrelationId::new();
260 let ctx = HandlerContext::new(message_id, correlation_id);
261
262 let handler = EchoIdsHandler;
263 let (got_msg, got_corr) = handler
264 .handle(EchoIds, &ctx)
265 .await
266 .expect("handler should succeed");
267 assert_eq!(got_msg, message_id);
268 assert_eq!(got_corr, correlation_id);
269 }
270
271 struct SleepHandler;
272 struct SleepFor(u64);
273 impl Command for SleepFor {
274 type Output = &'static str;
275 }
276
277 impl CommandHandler<SleepFor> for SleepHandler {
278 type Error = HexeractError;
279 async fn handle(
280 &self,
281 cmd: SleepFor,
282 ctx: &HandlerContext,
283 ) -> Result<&'static str, Self::Error> {
284 tokio::select! {
285 () = ctx.cancellation.cancelled() => Err(HexeractError::Dispatch("cancelled".into())),
286 () = tokio::time::sleep(Duration::from_millis(cmd.0)) => Ok("completed"),
287 }
288 }
289 }
290
291 #[tokio::test]
292 async fn handler_observes_external_cancellation() {
293 let ctx = fresh_ctx();
294 let token = ctx.cancellation.clone();
295
296 let handle = tokio::spawn(async move {
297 let handler = SleepHandler;
298 handler.handle(SleepFor(5_000), &ctx).await
299 });
300
301 tokio::time::sleep(Duration::from_millis(50)).await;
302 token.cancel();
303
304 let result = handle.await.expect("task panicked");
305 assert!(matches!(result, Err(HexeractError::Dispatch(ref m)) if m == "cancelled"));
306 }
307
308 #[tokio::test]
309 async fn handler_is_shareable_via_arc() {
310 let handler: Arc<UserRepo> = Arc::new(UserRepo {
311 prefix: "arc".into(),
312 });
313 let h1 = Arc::clone(&handler);
314 let h2 = Arc::clone(&handler);
315
316 let t1 = tokio::spawn(async move {
317 let ctx = fresh_ctx();
318 h1.handle(CreateUser { email: "u1".into() }, &ctx).await
319 });
320 let t2 = tokio::spawn(async move {
321 let ctx = fresh_ctx();
322 h2.handle(CreateUser { email: "u2".into() }, &ctx).await
323 });
324
325 let (r1, r2) = tokio::join!(t1, t2);
326 assert!(r1.unwrap().is_ok());
327 assert!(r2.unwrap().is_ok());
328 }
329
330 #[derive(Debug)]
331 struct UserSummary {
332 id: Uuid,
333 }
334
335 struct FindUser {
336 id: Uuid,
337 }
338
339 impl Query for FindUser {
340 type Output = Option<UserSummary>;
341 }
342
343 struct UserFinder;
344
345 impl QueryHandler<FindUser> for UserFinder {
346 type Error = HexeractError;
347 async fn handle(
348 &self,
349 query: FindUser,
350 _ctx: &HandlerContext,
351 ) -> Result<Option<UserSummary>, Self::Error> {
352 Ok(Some(UserSummary { id: query.id }))
353 }
354 }
355
356 #[tokio::test]
357 async fn query_handler_returns_output() {
358 let id = Uuid::new_v4();
359 let handler = UserFinder;
360 let ctx = fresh_ctx();
361 let result = handler
362 .handle(FindUser { id }, &ctx)
363 .await
364 .expect("query should succeed");
365 assert_eq!(result.unwrap().id, id);
366 }
367
368 #[tokio::test]
369 async fn query_handler_future_is_send() {
370 let handler = UserFinder;
371 let ctx = fresh_ctx();
372 let future = handler.handle(FindUser { id: Uuid::new_v4() }, &ctx);
373 assert_send(&future);
374 let _ = future.await;
375 }
376
377 #[tokio::test]
378 async fn query_handler_runs_in_spawned_task() {
379 let handler = Arc::new(UserFinder);
380 let cloned = Arc::clone(&handler);
381 let result = tokio::spawn(async move {
382 let ctx = fresh_ctx();
383 cloned.handle(FindUser { id: Uuid::new_v4() }, &ctx).await
384 })
385 .await
386 .expect("task panicked");
387 assert!(result.is_ok());
388 }
389
390 struct FailingQuery;
391 impl QueryHandler<FindUser> for FailingQuery {
392 type Error = UserError;
393 async fn handle(
394 &self,
395 _query: FindUser,
396 _ctx: &HandlerContext,
397 ) -> Result<Option<UserSummary>, Self::Error> {
398 Err(UserError::InvalidEmail)
399 }
400 }
401
402 #[tokio::test]
403 async fn query_handler_error_converts_into_hexeract_error() {
404 let handler = FailingQuery;
405 let ctx = fresh_ctx();
406 let err = handler
407 .handle(FindUser { id: Uuid::new_v4() }, &ctx)
408 .await
409 .expect_err("must fail");
410 let framework_err: HexeractError = err.into();
411 assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
412 }
413}