use std::sync::Arc;
use crate::command::Command;
use crate::context::HandlerContext;
use crate::error::HexeractError;
use crate::notification::Notification;
use crate::query::Query;
#[trait_variant::make(Send)]
pub trait CommandHandler<C: Command>: Send + Sync + 'static {
type Error: Into<HexeractError> + Send + Sync + 'static;
async fn handle(&self, command: C, ctx: &HandlerContext) -> Result<C::Output, Self::Error>;
}
#[trait_variant::make(Send)]
pub trait QueryHandler<Q: Query>: Send + Sync + 'static {
type Error: Into<HexeractError> + Send + Sync + 'static;
async fn handle(&self, query: Q, ctx: &HandlerContext) -> Result<Q::Output, Self::Error>;
}
#[trait_variant::make(Send)]
pub trait NotificationHandler<N: Notification>: Send + Sync + 'static {
type Error: Into<HexeractError> + Send + Sync + 'static;
async fn handle(&self, notification: Arc<N>, ctx: &HandlerContext) -> Result<(), Self::Error>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ids::{CorrelationId, MessageId};
use std::time::Duration;
use uuid::Uuid;
fn fresh_ctx() -> HandlerContext {
HandlerContext::new(MessageId::new(), CorrelationId::new())
}
fn assert_send<T: Send>(_: &T) {}
#[derive(Debug, PartialEq, Eq, Clone)]
struct UserCreated {
id: Uuid,
email: String,
}
struct CreateUser {
email: String,
}
impl Command for CreateUser {
type Output = UserCreated;
}
#[derive(Debug, thiserror::Error)]
enum UserError {
#[error("invalid email")]
InvalidEmail,
}
impl From<UserError> for HexeractError {
fn from(value: UserError) -> Self {
Self::handler_failed(value)
}
}
struct UserRepo {
prefix: String,
}
impl CommandHandler<CreateUser> for UserRepo {
type Error = UserError;
async fn handle(
&self,
cmd: CreateUser,
_ctx: &HandlerContext,
) -> Result<UserCreated, Self::Error> {
if cmd.email.is_empty() {
return Err(UserError::InvalidEmail);
}
Ok(UserCreated {
id: Uuid::new_v4(),
email: format!("{}-{}", self.prefix, cmd.email),
})
}
}
#[tokio::test]
async fn command_handler_returns_complex_output() {
let repo = UserRepo {
prefix: "test".into(),
};
let ctx = fresh_ctx();
let result = repo
.handle(
CreateUser {
email: "alice@example.com".into(),
},
&ctx,
)
.await
.expect("handler should succeed");
assert_eq!(result.email, "test-alice@example.com");
}
#[tokio::test]
async fn command_handler_returns_typed_error_for_invalid_input() {
let repo = UserRepo {
prefix: "test".into(),
};
let ctx = fresh_ctx();
let err = repo
.handle(
CreateUser {
email: String::new(),
},
&ctx,
)
.await
.expect_err("empty email must fail");
assert!(matches!(err, UserError::InvalidEmail));
let framework_err: HexeractError = err.into();
assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
}
#[tokio::test]
async fn handler_future_is_send() {
let repo = UserRepo {
prefix: "send".into(),
};
let ctx = fresh_ctx();
let future = repo.handle(
CreateUser {
email: "send@test".into(),
},
&ctx,
);
assert_send(&future);
let _ = future.await;
}
#[tokio::test]
async fn handler_runs_in_spawned_task() {
let repo = Arc::new(UserRepo {
prefix: "spawn".into(),
});
let cloned = Arc::clone(&repo);
let result = tokio::spawn(async move {
let ctx = fresh_ctx();
cloned.handle(CreateUser { email: "ok".into() }, &ctx).await
})
.await
.expect("task panicked");
assert!(result.is_ok());
}
struct DirectErrorHandler;
impl CommandHandler<CreateUser> for DirectErrorHandler {
type Error = HexeractError;
async fn handle(
&self,
_cmd: CreateUser,
_ctx: &HandlerContext,
) -> Result<UserCreated, Self::Error> {
Err(HexeractError::Dispatch("forced".into()))
}
}
#[tokio::test]
async fn handler_can_use_hexeract_error_directly_as_error_type() {
let handler = DirectErrorHandler;
let ctx = fresh_ctx();
let err = handler
.handle(
CreateUser {
email: "any".into(),
},
&ctx,
)
.await
.expect_err("must fail");
assert!(matches!(err, HexeractError::Dispatch(_)));
}
struct EchoIdsHandler;
struct EchoIds;
impl Command for EchoIds {
type Output = (MessageId, CorrelationId);
}
impl CommandHandler<EchoIds> for EchoIdsHandler {
type Error = HexeractError;
async fn handle(
&self,
_cmd: EchoIds,
ctx: &HandlerContext,
) -> Result<(MessageId, CorrelationId), Self::Error> {
Ok((ctx.message_id, ctx.correlation_id))
}
}
#[tokio::test]
async fn handler_reads_message_and_correlation_ids_from_context() {
let message_id = MessageId::new();
let correlation_id = CorrelationId::new();
let ctx = HandlerContext::new(message_id, correlation_id);
let handler = EchoIdsHandler;
let (got_msg, got_corr) = handler
.handle(EchoIds, &ctx)
.await
.expect("handler should succeed");
assert_eq!(got_msg, message_id);
assert_eq!(got_corr, correlation_id);
}
struct SleepHandler;
struct SleepFor(u64);
impl Command for SleepFor {
type Output = &'static str;
}
impl CommandHandler<SleepFor> for SleepHandler {
type Error = HexeractError;
async fn handle(
&self,
cmd: SleepFor,
ctx: &HandlerContext,
) -> Result<&'static str, Self::Error> {
tokio::select! {
() = ctx.cancellation.cancelled() => Err(HexeractError::Dispatch("cancelled".into())),
() = tokio::time::sleep(Duration::from_millis(cmd.0)) => Ok("completed"),
}
}
}
#[tokio::test]
async fn handler_observes_external_cancellation() {
let ctx = fresh_ctx();
let token = ctx.cancellation.clone();
let handle = tokio::spawn(async move {
let handler = SleepHandler;
handler.handle(SleepFor(5_000), &ctx).await
});
tokio::time::sleep(Duration::from_millis(50)).await;
token.cancel();
let result = handle.await.expect("task panicked");
assert!(matches!(result, Err(HexeractError::Dispatch(ref m)) if m == "cancelled"));
}
#[tokio::test]
async fn handler_is_shareable_via_arc() {
let handler: Arc<UserRepo> = Arc::new(UserRepo {
prefix: "arc".into(),
});
let h1 = Arc::clone(&handler);
let h2 = Arc::clone(&handler);
let t1 = tokio::spawn(async move {
let ctx = fresh_ctx();
h1.handle(CreateUser { email: "u1".into() }, &ctx).await
});
let t2 = tokio::spawn(async move {
let ctx = fresh_ctx();
h2.handle(CreateUser { email: "u2".into() }, &ctx).await
});
let (r1, r2) = tokio::join!(t1, t2);
assert!(r1.unwrap().is_ok());
assert!(r2.unwrap().is_ok());
}
#[derive(Debug)]
struct UserSummary {
id: Uuid,
}
struct FindUser {
id: Uuid,
}
impl Query for FindUser {
type Output = Option<UserSummary>;
}
struct UserFinder;
impl QueryHandler<FindUser> for UserFinder {
type Error = HexeractError;
async fn handle(
&self,
query: FindUser,
_ctx: &HandlerContext,
) -> Result<Option<UserSummary>, Self::Error> {
Ok(Some(UserSummary { id: query.id }))
}
}
#[tokio::test]
async fn query_handler_returns_output() {
let id = Uuid::new_v4();
let handler = UserFinder;
let ctx = fresh_ctx();
let result = handler
.handle(FindUser { id }, &ctx)
.await
.expect("query should succeed");
assert_eq!(result.unwrap().id, id);
}
#[tokio::test]
async fn query_handler_future_is_send() {
let handler = UserFinder;
let ctx = fresh_ctx();
let future = handler.handle(FindUser { id: Uuid::new_v4() }, &ctx);
assert_send(&future);
let _ = future.await;
}
#[tokio::test]
async fn query_handler_runs_in_spawned_task() {
let handler = Arc::new(UserFinder);
let cloned = Arc::clone(&handler);
let result = tokio::spawn(async move {
let ctx = fresh_ctx();
cloned.handle(FindUser { id: Uuid::new_v4() }, &ctx).await
})
.await
.expect("task panicked");
assert!(result.is_ok());
}
struct FailingQuery;
impl QueryHandler<FindUser> for FailingQuery {
type Error = UserError;
async fn handle(
&self,
_query: FindUser,
_ctx: &HandlerContext,
) -> Result<Option<UserSummary>, Self::Error> {
Err(UserError::InvalidEmail)
}
}
#[tokio::test]
async fn query_handler_error_converts_into_hexeract_error() {
let handler = FailingQuery;
let ctx = fresh_ctx();
let err = handler
.handle(FindUser { id: Uuid::new_v4() }, &ctx)
.await
.expect_err("must fail");
let framework_err: HexeractError = err.into();
assert!(matches!(framework_err, HexeractError::HandlerFailed { .. }));
}
}