// hexeract_mediator/lib.rs

1//! In-process mediator for the Hexeract messaging framework.
2//!
3//! The mediator dispatches a [`Command`], a [`Query`] or a [`Notification`]
4//! to the handlers registered with a [`MediatorBuilder`] at startup. Dispatch
5//! is type-safe and reflection-free: each call to
6//! [`Mediator::send`], [`Mediator::query`] or [`Mediator::publish`] resolves
7//! to the matching handler through a compile-time generic, while the internal
8//! registry erases the handler types behind a `TypeId` lookup table.
9//!
10//! Commands and queries are single-handler: registering a second handler for
11//! the same type is a build-time error. Notifications are multi-handler and
12//! fan out concurrently; every handler runs regardless of its siblings, and
13//! failures are aggregated so one handler's error never hides another.
14//!
15//! To propagate a [`CorrelationId`] across in-process dispatches (for example
16//! forwarding `ctx.correlation_id` from a command handler to a follow-up
17//! notification), use [`Mediator::send_with_correlation_id`],
18//! [`Mediator::query_with_correlation_id`] or
19//! [`Mediator::publish_with_correlation_id`]. The plain `send` / `query` /
20//! `publish` methods mint a fresh id each time.
21//!
22//! The `hexeract-middleware` crate ships two built-in middlewares:
23//! `TracingMiddleware` and `TimeoutMiddleware`. Wire them through
24//! [`MediatorBuilder::with_middleware`], or supply your own [`Middleware`]
25//! implementations for other cross-cutting concerns.
26//!
27//! # Example
28//!
29//! ```
30//! use hexeract_core::{Command, CommandHandler, HandlerContext, HexeractError};
31//! use hexeract_mediator::MediatorBuilder;
32//!
33//! struct Greet { name: String }
34//!
35//! impl Command for Greet {
36//!     type Output = String;
37//! }
38//!
39//! struct GreetHandler;
40//!
41//! impl CommandHandler<Greet> for GreetHandler {
42//!     type Error = HexeractError;
43//!     async fn handle(&self, cmd: Greet, _ctx: &HandlerContext)
44//!         -> Result<String, Self::Error>
45//!     {
46//!         Ok(format!("hello {}", cmd.name))
47//!     }
48//! }
49//!
50//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
51//! let mediator = MediatorBuilder::new()
52//!     .register_command_handler::<Greet, _>(GreetHandler)
53//!     .build()?;
54//!
55//! let greeting = mediator.send(Greet { name: "world".into() }).await?;
56//! assert_eq!(greeting, "hello world");
57//! # Ok(()) }
58//! ```
59#![cfg_attr(docsrs, feature(doc_cfg))]
60
61mod erased;
62mod terminal;
63
64use std::any::{TypeId, type_name};
65use std::collections::hash_map::Entry;
66use std::collections::{HashMap, HashSet};
67use std::fmt;
68use std::sync::{Arc, Mutex};
69
70use futures_util::future::join_all;
71use hexeract_core::{
72    Command, CommandHandler, CorrelationId, DynMiddleware, HandlerContext, HandlerKind,
73    HandlerRegistration, HexeractError, MessageEnvelope, MessageId, Middleware, Next, Notification,
74    NotificationFailure, NotificationHandler, Query, QueryHandler,
75};
76
77use crate::erased::{
78    BoxAny, ErasedCommandHandler, ErasedNotificationHandler, ErasedQueryHandler,
79    TypedCommandHandler, TypedNotificationHandler, TypedQueryHandler,
80};
81use crate::terminal::{CommandTerminal, NotificationTerminal, QueryTerminal};
82
/// Configuration errors reported by [`MediatorBuilder::build`].
///
/// These describe an inconsistent builder setup only. Failures raised while
/// dispatching a message keep flowing through [`HexeractError`].
///
/// Marked `#[non_exhaustive]` so that new variants can be added in minor
/// versions without breaking downstream `match` arms.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum MediatorBuildError {
    /// More than one handler was registered for the same [`Command`] or
    /// [`Query`] type. Commands and queries are single-handler by contract;
    /// notifications accept any number of handlers and never produce this
    /// error.
    #[error("duplicate handler registered for {type_name}")]
    DuplicateHandler {
        /// Name of the offending message type, as reported by
        /// `std::any::type_name`.
        type_name: &'static str,
    },
}
101
/// One handler that was declared via the `#[handler]` attribute macro but
/// has no matching registration on the [`MediatorBuilder`] being verified.
///
/// Instances are produced by [`MediatorBuilder::verify_handlers`].
#[derive(Debug, Clone)]
pub struct MissingHandler {
    /// Kind of handler that was expected (command, query or notification).
    pub kind: HandlerKind,
    /// Type name of the message the handler was declared for.
    pub message_type_name: &'static str,
    /// Type name of the declared handler itself.
    pub handler_type_name: &'static str,
}
113
/// Errors raised by [`MediatorBuilder::verify_handlers`].
///
/// Marked `#[non_exhaustive]` so that new variants can be added in minor
/// versions without breaking downstream `match` arms.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum HandlersVerificationError {
    /// One or more handlers declared via the `#[handler]` macro were not
    /// registered through the fluent builder. The rendered message reports
    /// how many are missing; the individual entries are in `missing`.
    #[error("{} handler(s) declared via #[handler] are missing from the registry", missing.len())]
    Missing {
        /// The missing handlers, in the order `inventory` yielded them.
        missing: Vec<MissingHandler>,
    },
}
129
/// Fluent builder that wires handlers and middlewares into a [`Mediator`].
///
/// Every `register_*` / `with_*` method takes the builder by value and hands
/// it back, so calls chain naturally; [`MediatorBuilder::build`] ends the
/// chain and reports any configuration error collected along the way.
///
/// # Example
///
/// ```ignore
/// let mediator = MediatorBuilder::new()
///     .register_command_handler::<CreateUser, _>(UserRepository)
///     .register_query_handler::<GetUser, _>(UserReadModel)
///     .register_notification_handler::<UserCreated, _>(AuditWriter)
///     .register_notification_handler::<UserCreated, _>(EmailNotifier)
///     .build()?;
/// ```
pub struct MediatorBuilder {
    // Type-erased command handlers keyed by the command's `TypeId`
    // (at most one per command type).
    command_handlers: HashMap<TypeId, Arc<dyn ErasedCommandHandler>>,
    // Type-erased query handlers keyed by the query's `TypeId`
    // (at most one per query type).
    query_handlers: HashMap<TypeId, Arc<dyn ErasedQueryHandler>>,
    // Type-erased notification handlers keyed by the notification's `TypeId`;
    // each list is kept in registration order.
    notification_handlers: HashMap<TypeId, Vec<Arc<dyn ErasedNotificationHandler>>>,
    // `type_name` strings of the message types registered so far, one set per
    // handler kind. Only `verify_handlers` reads these.
    registered_command_types: HashSet<&'static str>,
    registered_query_types: HashSet<&'static str>,
    registered_notification_types: HashSet<&'static str>,
    // Middlewares in the order they were added.
    middlewares: Vec<Arc<dyn DynMiddleware>>,
    // Configuration errors accumulated by the `register_*` methods and
    // surfaced by `build`.
    errors: Vec<MediatorBuildError>,
}
152
153impl MediatorBuilder {
154    /// Creates a fresh builder with no handlers and no middlewares.
155    #[must_use]
156    pub fn new() -> Self {
157        Self {
158            command_handlers: HashMap::new(),
159            query_handlers: HashMap::new(),
160            notification_handlers: HashMap::new(),
161            registered_command_types: HashSet::new(),
162            registered_query_types: HashSet::new(),
163            registered_notification_types: HashSet::new(),
164            middlewares: Vec::new(),
165            errors: Vec::new(),
166        }
167    }
168
169    /// Registers the single [`CommandHandler`] responsible for command `C`.
170    ///
171    /// Calling this twice for the same `C` accumulates a
172    /// [`MediatorBuildError::DuplicateHandler`] surfaced by [`Self::build`].
173    #[must_use]
174    pub fn register_command_handler<C, H>(mut self, handler: H) -> Self
175    where
176        C: Command,
177        H: CommandHandler<C>,
178    {
179        let tid = TypeId::of::<C>();
180        match self.command_handlers.entry(tid) {
181            Entry::Vacant(slot) => {
182                slot.insert(Arc::new(TypedCommandHandler::<C, H>::new(handler)));
183                self.registered_command_types.insert(type_name::<C>());
184            }
185            Entry::Occupied(_) => {
186                self.errors.push(MediatorBuildError::DuplicateHandler {
187                    type_name: type_name::<C>(),
188                });
189            }
190        }
191        self
192    }
193
194    /// Registers the single [`QueryHandler`] responsible for query `Q`.
195    ///
196    /// Calling this twice for the same `Q` accumulates a
197    /// [`MediatorBuildError::DuplicateHandler`] surfaced by [`Self::build`].
198    #[must_use]
199    pub fn register_query_handler<Q, H>(mut self, handler: H) -> Self
200    where
201        Q: Query,
202        H: QueryHandler<Q>,
203    {
204        let tid = TypeId::of::<Q>();
205        match self.query_handlers.entry(tid) {
206            Entry::Vacant(slot) => {
207                slot.insert(Arc::new(TypedQueryHandler::<Q, H>::new(handler)));
208                self.registered_query_types.insert(type_name::<Q>());
209            }
210            Entry::Occupied(_) => {
211                self.errors.push(MediatorBuildError::DuplicateHandler {
212                    type_name: type_name::<Q>(),
213                });
214            }
215        }
216        self
217    }
218
219    /// Registers one of possibly many [`NotificationHandler`]s for `N`.
220    ///
221    /// Notification dispatch fans out to every handler registered for `N`
222    /// in registration order.
223    #[must_use]
224    pub fn register_notification_handler<N, H>(mut self, handler: H) -> Self
225    where
226        N: Notification,
227        H: NotificationHandler<N>,
228    {
229        let tid = TypeId::of::<N>();
230        self.notification_handlers
231            .entry(tid)
232            .or_default()
233            .push(Arc::new(TypedNotificationHandler::<N, H>::new(handler)));
234        self.registered_notification_types.insert(type_name::<N>());
235        self
236    }
237
238    /// Appends a [`Middleware`] to the dispatch pipeline. Middlewares are
239    /// invoked in the order they are added, around every handler invocation.
240    #[must_use]
241    pub fn with_middleware<M: Middleware>(mut self, middleware: M) -> Self {
242        self.middlewares.push(Arc::new(middleware));
243        self
244    }
245
246    /// Verifies that every handler declared with the `#[handler]` attribute
247    /// macro from `hexeract-macros` was also registered through the fluent
248    /// builder.
249    ///
250    /// The macro emits a [`HandlerRegistration`] for every annotated item
251    /// via [`inventory`]; this method iterates the collected entries and
252    /// returns the set of declared-but-not-registered handlers. The check
253    /// is a sanity guard for typos and forgotten wirings; it does not
254    /// auto-populate the registry, since stateful handlers cannot be
255    /// constructed from metadata alone.
256    ///
257    /// # Known limitations
258    ///
259    /// **Type-name matching is not guaranteed unique.** Matching is performed
260    /// by comparing `std::any::type_name()` strings, which are not guaranteed
261    /// to be unique across different crates in the same build. Two distinct
262    /// types with the same short name in different crates but identical paths
263    /// would be treated as the same handler. This is unlikely in practice but
264    /// callers should be aware that the check is a best-effort heuristic, not a
265    /// formal proof. `type_name` output is also not stable across compiler
266    /// versions, so the strings should never be persisted or compared across
267    /// compilation units.
268    ///
269    /// **`inventory::iter` is process-global.** All `#[handler]`-annotated
270    /// items in the current process are visible to `inventory`, regardless of
271    /// which `MediatorBuilder` instance they were intended for. In a single
272    /// process that constructs two or more mediators each covering a subset of
273    /// handlers, both builders will see every registered declaration from every
274    /// crate. Each builder will therefore report the handlers it does not cover
275    /// as `Missing`. Use this method only when a single mediator is expected to
276    /// cover all declared handlers in the process.
277    ///
278    /// # Ordering
279    ///
280    /// Call this method on the builder before [`Self::build`], which
281    /// consumes the builder. It is safe to call multiple times: the
282    /// method takes `&self` and does not mutate state.
283    ///
284    /// # Errors
285    ///
286    /// Returns [`HandlersVerificationError::Missing`] listing the handlers
287    /// that are visible to `inventory` but not present in the registry.
288    pub fn verify_handlers(&self) -> Result<(), HandlersVerificationError> {
289        let mut missing = Vec::new();
290        for reg in inventory::iter::<HandlerRegistration> {
291            let message_type_name = (reg.message_type_name)();
292            let present = match reg.kind {
293                HandlerKind::Command => self.registered_command_types.contains(message_type_name),
294                HandlerKind::Query => self.registered_query_types.contains(message_type_name),
295                HandlerKind::Notification => self
296                    .registered_notification_types
297                    .contains(message_type_name),
298            };
299            if !present {
300                missing.push(MissingHandler {
301                    kind: reg.kind,
302                    message_type_name,
303                    handler_type_name: (reg.handler_type_name)(),
304                });
305            }
306        }
307        if missing.is_empty() {
308            Ok(())
309        } else {
310            Err(HandlersVerificationError::Missing { missing })
311        }
312    }
313
314    /// Consumes the builder and produces an immutable, ready-to-use
315    /// [`Mediator`].
316    ///
317    /// # Errors
318    ///
319    /// Returns the first accumulated [`MediatorBuildError`] when the
320    /// configuration is inconsistent (for example a duplicate command or
321    /// query handler registration). Only the first error is surfaced: if
322    /// several invalid registrations were performed, fix the reported one
323    /// and call [`Self::build`] again to see the next.
324    pub fn build(self) -> Result<Mediator, MediatorBuildError> {
325        if let Some(err) = self.errors.into_iter().next() {
326            return Err(err);
327        }
328        Ok(Mediator {
329            inner: Arc::new(MediatorInner {
330                command_handlers: self.command_handlers,
331                query_handlers: self.query_handlers,
332                notification_handlers: self.notification_handlers,
333                middlewares: self.middlewares.into(),
334            }),
335        })
336    }
337}
338
339impl Default for MediatorBuilder {
340    fn default() -> Self {
341        Self::new()
342    }
343}
344
345impl fmt::Debug for MediatorBuilder {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        f.debug_struct("MediatorBuilder")
348            .field("command_handlers", &self.command_handlers.len())
349            .field("query_handlers", &self.query_handlers.len())
350            .field(
351                "notification_handlers",
352                &self
353                    .notification_handlers
354                    .values()
355                    .map(Vec::len)
356                    .sum::<usize>(),
357            )
358            .field("middlewares", &self.middlewares.len())
359            .field("errors", &self.errors.len())
360            .finish_non_exhaustive()
361    }
362}
363
/// In-process dispatcher for commands, queries and notifications.
///
/// Construct one with [`MediatorBuilder`], clone it freely (the registry is
/// shared behind an [`Arc`], so a clone only bumps a reference count), and
/// call [`Mediator::send`], [`Mediator::query`] or [`Mediator::publish`]
/// from anywhere in your async runtime.
#[derive(Clone)]
pub struct Mediator {
    // Handler registries and middleware chain, shared by every clone.
    inner: Arc<MediatorInner>,
}
373
374impl fmt::Debug for Mediator {
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        f.debug_struct("Mediator")
377            .field("command_handlers", &self.inner.command_handlers.len())
378            .field("query_handlers", &self.inner.query_handlers.len())
379            .field(
380                "notification_handlers",
381                &self
382                    .inner
383                    .notification_handlers
384                    .values()
385                    .map(Vec::len)
386                    .sum::<usize>(),
387            )
388            .field("middlewares", &self.inner.middlewares.len())
389            .finish()
390    }
391}
392
/// Immutable state shared by every clone of a [`Mediator`]: the handler
/// registries produced by [`MediatorBuilder::build`] plus the middleware
/// chain.
struct MediatorInner {
    // Single handler per command type, keyed by the command's `TypeId`.
    command_handlers: HashMap<TypeId, Arc<dyn ErasedCommandHandler>>,
    // Single handler per query type, keyed by the query's `TypeId`.
    query_handlers: HashMap<TypeId, Arc<dyn ErasedQueryHandler>>,
    // Handlers per notification type, in registration order.
    notification_handlers: HashMap<TypeId, Vec<Arc<dyn ErasedNotificationHandler>>>,
    // Middleware chain as a shared slice, cloned by reference count on each
    // dispatch.
    middlewares: Arc<[Arc<dyn DynMiddleware>]>,
}
399
400impl Mediator {
401    /// Dispatches a [`Command`] to its registered handler and returns the
402    /// handler's output.
403    ///
404    /// A fresh [`CorrelationId`] is minted for this dispatch. To continue an
405    /// existing causal chain (e.g. from inside another handler), use
406    /// [`Mediator::send_with_correlation_id`] instead.
407    ///
408    /// # Errors
409    ///
410    /// Returns [`HexeractError::HandlerNotFound`] if no handler is
411    /// registered for `C`, or the handler's own error converted into
412    /// [`HexeractError`] when the handler itself fails.
413    pub async fn send<C: Command>(&self, command: C) -> Result<C::Output, HexeractError> {
414        self.send_with_correlation_id(command, CorrelationId::new())
415            .await
416    }
417
418    /// Dispatches a [`Command`] to its registered handler using the supplied
419    /// [`CorrelationId`], preserving the causal chain across in-process
420    /// dispatches.
421    ///
422    /// Use this variant when the caller already holds a [`CorrelationId`]
423    /// (for example `ctx.correlation_id` inside a handler) and wants all
424    /// follow-up messages to share the same identifier. The plain
425    /// [`Mediator::send`] method is equivalent to calling this with
426    /// `CorrelationId::new()`.
427    ///
428    /// # Errors
429    ///
430    /// Returns [`HexeractError::HandlerNotFound`] if no handler is
431    /// registered for `C`, or the handler's own error converted into
432    /// [`HexeractError`] when the handler itself fails.
433    pub async fn send_with_correlation_id<C: Command>(
434        &self,
435        command: C,
436        correlation_id: CorrelationId,
437    ) -> Result<C::Output, HexeractError> {
438        let tid = TypeId::of::<C>();
439        let handler = self
440            .inner
441            .command_handlers
442            .get(&tid)
443            .ok_or_else(|| HexeractError::handler_not_found(type_name::<C>()))?;
444
445        let message_id = MessageId::new();
446        let envelope = MessageEnvelope::for_command::<C>(message_id, correlation_id);
447        let ctx = HandlerContext::new(message_id, correlation_id);
448
449        let terminal = Arc::new(CommandTerminal {
450            handler: Arc::clone(handler),
451            payload: Mutex::new(Some(Box::new(command) as BoxAny)),
452        });
453
454        let next = Next::new(self.inner.middlewares.clone(), terminal);
455        let output = next.run(&envelope, &ctx).await?;
456
457        output
458            .downcast::<C::Output>()
459            .map(|boxed| *boxed)
460            .map_err(|_| HexeractError::downcast_failed(type_name::<C::Output>()))
461    }
462
463    /// Dispatches a [`Query`] to its registered handler and returns the
464    /// handler's output.
465    ///
466    /// A fresh [`CorrelationId`] is minted for this dispatch. To continue an
467    /// existing causal chain, use [`Mediator::query_with_correlation_id`]
468    /// instead.
469    ///
470    /// # Errors
471    ///
472    /// Returns [`HexeractError::HandlerNotFound`] if no handler is
473    /// registered for `Q`, or the handler's own error converted into
474    /// [`HexeractError`] when the handler itself fails.
475    pub async fn query<Q: Query>(&self, query: Q) -> Result<Q::Output, HexeractError> {
476        self.query_with_correlation_id(query, CorrelationId::new())
477            .await
478    }
479
480    /// Dispatches a [`Query`] to its registered handler using the supplied
481    /// [`CorrelationId`], preserving the causal chain across in-process
482    /// dispatches.
483    ///
484    /// Use this variant when the caller already holds a [`CorrelationId`]
485    /// (for example `ctx.correlation_id` inside a handler) and wants all
486    /// follow-up messages to share the same identifier. The plain
487    /// [`Mediator::query`] method is equivalent to calling this with
488    /// `CorrelationId::new()`.
489    ///
490    /// # Errors
491    ///
492    /// Returns [`HexeractError::HandlerNotFound`] if no handler is
493    /// registered for `Q`, or the handler's own error converted into
494    /// [`HexeractError`] when the handler itself fails.
495    pub async fn query_with_correlation_id<Q: Query>(
496        &self,
497        query: Q,
498        correlation_id: CorrelationId,
499    ) -> Result<Q::Output, HexeractError> {
500        let tid = TypeId::of::<Q>();
501        let handler = self
502            .inner
503            .query_handlers
504            .get(&tid)
505            .ok_or_else(|| HexeractError::handler_not_found(type_name::<Q>()))?;
506
507        let message_id = MessageId::new();
508        let envelope = MessageEnvelope::for_query::<Q>(message_id, correlation_id);
509        let ctx = HandlerContext::new(message_id, correlation_id);
510
511        let terminal = Arc::new(QueryTerminal {
512            handler: Arc::clone(handler),
513            payload: Mutex::new(Some(Box::new(query) as BoxAny)),
514        });
515
516        let next = Next::new(self.inner.middlewares.clone(), terminal);
517        let output = next.run(&envelope, &ctx).await?;
518
519        output
520            .downcast::<Q::Output>()
521            .map(|boxed| *boxed)
522            .map_err(|_| HexeractError::downcast_failed(type_name::<Q::Output>()))
523    }
524
525    /// Publishes a [`Notification`] to every handler registered for `N`.
526    /// A notification with zero handlers is a no-op.
527    ///
528    /// A fresh [`CorrelationId`] is minted for this dispatch. To continue an
529    /// existing causal chain, use [`Mediator::publish_with_correlation_id`]
530    /// instead.
531    ///
532    /// Handlers are dispatched **concurrently**: every handler future is built
533    /// up front and driven together with [`join_all`], so a slow handler no
534    /// longer blocks the handlers registered after it. The fan-out is
535    /// cooperative and runtime-agnostic: it runs on the caller's task without
536    /// spawning, so a CPU-bound handler that never `.await`s still blocks its
537    /// siblings until it yields. Every handler shares the same
538    /// [`CorrelationId`] so traces can link the fan-out to its source publish
539    /// call, but each handler receives a dedicated [`MessageId`].
540    ///
541    /// # Ordering
542    ///
543    /// Execution is interleaved, so the order in which handlers observe the
544    /// notification is unspecified. Only the *collection* order is
545    /// deterministic: the [`NotificationFailure`]s aggregated on failure
546    /// appear in handler registration order.
547    ///
548    /// # Errors
549    ///
550    /// Every handler runs regardless of its siblings. If one or more fail,
551    /// their typed errors are aggregated into [`HexeractError::PublishFailed`],
552    /// each [`NotificationFailure`] retaining the failing handler's name and
553    /// the error `source` chain so the caller can recover an individual
554    /// failure instead of parsing a flattened string.
555    pub async fn publish<N: Notification>(&self, notification: N) -> Result<(), HexeractError> {
556        self.publish_with_correlation_id(notification, CorrelationId::new())
557            .await
558    }
559
560    /// Publishes a [`Notification`] to every handler registered for `N` using
561    /// the supplied [`CorrelationId`], preserving the causal chain across
562    /// in-process dispatches.
563    ///
564    /// Use this variant when the caller already holds a [`CorrelationId`]
565    /// (for example `ctx.correlation_id` inside a handler) and wants all
566    /// follow-up messages to share the same identifier. Every handler in the
567    /// fan-out receives the same supplied id and a dedicated [`MessageId`].
568    /// The plain [`Mediator::publish`] method is equivalent to calling this
569    /// with `CorrelationId::new()`.
570    ///
571    /// # Ordering
572    ///
573    /// See [`Mediator::publish`] for ordering and concurrency guarantees.
574    ///
575    /// # Errors
576    ///
577    /// See [`Mediator::publish`] for error aggregation semantics.
578    pub async fn publish_with_correlation_id<N: Notification>(
579        &self,
580        notification: N,
581        correlation_id: CorrelationId,
582    ) -> Result<(), HexeractError> {
583        let tid = TypeId::of::<N>();
584        let Some(handlers) = self.inner.notification_handlers.get(&tid) else {
585            return Ok(());
586        };
587        if handlers.is_empty() {
588            return Ok(());
589        }
590
591        let total = handlers.len();
592
593        // Shared once across the fan-out: each handler receives a cheap
594        // `Arc` clone (refcount bump) rather than a deep clone of the payload.
595        let shared = Arc::new(notification);
596
597        let dispatches = handlers.iter().map(|handler| {
598            let handler_name = handler.handler_type_name();
599            let message_id = MessageId::new();
600            let envelope = MessageEnvelope::for_notification::<N>(message_id, correlation_id);
601            let ctx = HandlerContext::new(message_id, correlation_id);
602
603            let payload = Box::new(Arc::clone(&shared)) as BoxAny;
604            let terminal = Arc::new(NotificationTerminal {
605                handler: Arc::clone(handler),
606                payload: Mutex::new(Some(payload)),
607            });
608            let next = Next::new(self.inner.middlewares.clone(), terminal);
609
610            async move {
611                next.run(&envelope, &ctx)
612                    .await
613                    .map_err(|error| NotificationFailure {
614                        handler: handler_name,
615                        error,
616                    })
617            }
618        });
619
620        let failures: Vec<NotificationFailure> = join_all(dispatches)
621            .await
622            .into_iter()
623            .filter_map(Result::err)
624            .collect();
625
626        if failures.is_empty() {
627            Ok(())
628        } else {
629            Err(HexeractError::publish_failed(
630                type_name::<N>(),
631                total,
632                failures,
633            ))
634        }
635    }
636}
637
638#[cfg(test)]
639mod tests {
640    use super::*;
641    use hexeract_core::HandlerContext;
642
643    struct Ping {
644        value: u32,
645    }
646
647    impl Command for Ping {
648        type Output = u32;
649    }
650
651    struct PingHandler;
652
653    impl CommandHandler<Ping> for PingHandler {
654        type Error = HexeractError;
655
656        async fn handle(&self, cmd: Ping, _ctx: &HandlerContext) -> Result<u32, Self::Error> {
657            Ok(cmd.value * 2)
658        }
659    }
660
661    struct GetCount;
662
663    impl Query for GetCount {
664        type Output = u32;
665    }
666
667    struct CountHandler;
668
669    impl QueryHandler<GetCount> for CountHandler {
670        type Error = HexeractError;
671
672        async fn handle(&self, _q: GetCount, _ctx: &HandlerContext) -> Result<u32, Self::Error> {
673            Ok(99)
674        }
675    }
676
677    #[derive(Clone)]
678    struct UserCreated {
679        id: u32,
680    }
681
682    impl Notification for UserCreated {}
683
684    struct AuditHandler;
685
686    impl NotificationHandler<UserCreated> for AuditHandler {
687        type Error = HexeractError;
688
689        async fn handle(
690            &self,
691            _n: Arc<UserCreated>,
692            _ctx: &HandlerContext,
693        ) -> Result<(), Self::Error> {
694            Ok(())
695        }
696    }
697
698    struct RecordingNotifHandler {
699        label: &'static str,
700        seen: Arc<Mutex<Vec<(&'static str, u32)>>>,
701    }
702
703    impl NotificationHandler<UserCreated> for RecordingNotifHandler {
704        type Error = HexeractError;
705
706        async fn handle(
707            &self,
708            notif: Arc<UserCreated>,
709            _ctx: &HandlerContext,
710        ) -> Result<(), Self::Error> {
711            self.seen
712                .lock()
713                .expect("recorder mutex poisoned")
714                .push((self.label, notif.id));
715            Ok(())
716        }
717    }
718
719    struct FailingNotifHandler;
720
721    impl NotificationHandler<UserCreated> for FailingNotifHandler {
722        type Error = HexeractError;
723
724        async fn handle(
725            &self,
726            _n: Arc<UserCreated>,
727            _ctx: &HandlerContext,
728        ) -> Result<(), Self::Error> {
729            Err(HexeractError::Dispatch("boom".into()))
730        }
731    }
732
733    struct BarrierNotifHandler {
734        barrier: Arc<tokio::sync::Barrier>,
735    }
736
737    impl NotificationHandler<UserCreated> for BarrierNotifHandler {
738        type Error = HexeractError;
739
740        async fn handle(
741            &self,
742            _n: Arc<UserCreated>,
743            _ctx: &HandlerContext,
744        ) -> Result<(), Self::Error> {
745            self.barrier.wait().await;
746            Ok(())
747        }
748    }
749
750    #[derive(Debug, thiserror::Error)]
751    enum PersistError {
752        #[error("database unavailable")]
753        Unavailable,
754    }
755
756    impl From<PersistError> for HexeractError {
757        fn from(err: PersistError) -> Self {
758            HexeractError::handler_failed(err)
759        }
760    }
761
762    struct SourcedFailingHandler;
763
764    impl NotificationHandler<UserCreated> for SourcedFailingHandler {
765        type Error = PersistError;
766
767        async fn handle(
768            &self,
769            _n: Arc<UserCreated>,
770            _ctx: &HandlerContext,
771        ) -> Result<(), Self::Error> {
772            Err(PersistError::Unavailable)
773        }
774    }
775
776    #[test]
777    fn default_builder_is_empty() {
778        let builder = MediatorBuilder::default();
779        assert!(builder.command_handlers.is_empty());
780        assert!(builder.query_handlers.is_empty());
781        assert!(builder.notification_handlers.is_empty());
782        assert!(builder.middlewares.is_empty());
783        assert!(builder.errors.is_empty());
784    }
785
786    #[test]
787    fn registers_one_command_handler_then_builds_ok() {
788        let mediator = MediatorBuilder::new()
789            .register_command_handler::<Ping, _>(PingHandler)
790            .build()
791            .expect("build must succeed");
792        let _clone = mediator.clone();
793    }
794
795    #[tokio::test]
796    async fn send_routes_to_command_handler_and_returns_output() {
797        let mediator = MediatorBuilder::new()
798            .register_command_handler::<Ping, _>(PingHandler)
799            .build()
800            .expect("build must succeed");
801        let out = mediator
802            .send(Ping { value: 21 })
803            .await
804            .expect("dispatch must succeed");
805        assert_eq!(out, 42);
806    }
807
808    #[tokio::test]
809    async fn send_returns_handler_not_found_when_unregistered() {
810        let mediator = MediatorBuilder::new().build().expect("empty build is ok");
811        let err = mediator
812            .send(Ping { value: 0 })
813            .await
814            .expect_err("missing handler must fail");
815        assert!(matches!(
816            err,
817            HexeractError::HandlerNotFound { message_type, .. } if message_type.ends_with("::Ping")
818        ));
819    }
820
821    #[tokio::test]
822    async fn query_routes_to_query_handler_and_returns_output() {
823        let mediator = MediatorBuilder::new()
824            .register_query_handler::<GetCount, _>(CountHandler)
825            .build()
826            .expect("build must succeed");
827        let out = mediator.query(GetCount).await.expect("query must succeed");
828        assert_eq!(out, 99);
829    }
830
831    #[tokio::test]
832    async fn query_returns_handler_not_found_when_unregistered() {
833        let mediator = MediatorBuilder::new().build().expect("empty build is ok");
834        let err = mediator
835            .query(GetCount)
836            .await
837            .expect_err("missing handler must fail");
838        assert!(matches!(
839            err,
840            HexeractError::HandlerNotFound { message_type, .. } if message_type.ends_with("::GetCount")
841        ));
842    }
843
844    #[tokio::test]
845    async fn publish_fans_out_to_all_notification_handlers() {
846        let seen = Arc::new(Mutex::new(Vec::new()));
847        let mediator = MediatorBuilder::new()
848            .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
849                label: "audit",
850                seen: Arc::clone(&seen),
851            })
852            .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
853                label: "email",
854                seen: Arc::clone(&seen),
855            })
856            .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
857                label: "search",
858                seen: Arc::clone(&seen),
859            })
860            .build()
861            .expect("build must succeed");
862
863        mediator
864            .publish(UserCreated { id: 7 })
865            .await
866            .expect("publish must succeed");
867
868        let recorded = seen.lock().unwrap().clone();
869        assert_eq!(
870            recorded,
871            vec![("audit", 7), ("email", 7), ("search", 7)],
872            "every handler must observe the notification once, in registration order"
873        );
874    }
875
876    #[tokio::test]
877    async fn publish_with_no_handlers_is_ok() {
878        let mediator = MediatorBuilder::new().build().expect("empty build is ok");
879        mediator
880            .publish(UserCreated { id: 1 })
881            .await
882            .expect("publish with zero handlers must succeed");
883    }
884
885    #[tokio::test]
886    async fn publish_invokes_all_handlers_even_when_one_fails() {
887        let seen = Arc::new(Mutex::new(Vec::new()));
888        let mediator = MediatorBuilder::new()
889            .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
890                label: "first",
891                seen: Arc::clone(&seen),
892            })
893            .register_notification_handler::<UserCreated, _>(FailingNotifHandler)
894            .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
895                label: "third",
896                seen: Arc::clone(&seen),
897            })
898            .build()
899            .expect("build must succeed");
900
901        let err = mediator
902            .publish(UserCreated { id: 42 })
903            .await
904            .expect_err("at least one handler failed");
905
906        let HexeractError::PublishFailed {
907            total,
908            ref failures,
909            ..
910        } = err
911        else {
912            panic!("unexpected variant: {err:?}");
913        };
914        assert_eq!(total, 3);
915        assert_eq!(failures.len(), 1);
916        assert!(matches!(failures[0].error, HexeractError::Dispatch(_)));
917        assert!(failures[0].error.to_string().contains("boom"));
918        assert!(
919            err.to_string()
920                .starts_with("publish: 1 of 3 handlers failed")
921        );
922
923        let recorded = seen.lock().unwrap().clone();
924        assert_eq!(
925            recorded,
926            vec![("first", 42), ("third", 42)],
927            "siblings must run even after a failure"
928        );
929    }
930
931    #[tokio::test]
932    async fn publish_runs_handlers_concurrently() {
933        let barrier = Arc::new(tokio::sync::Barrier::new(2));
934        let mediator = MediatorBuilder::new()
935            .register_notification_handler::<UserCreated, _>(BarrierNotifHandler {
936                barrier: Arc::clone(&barrier),
937            })
938            .register_notification_handler::<UserCreated, _>(BarrierNotifHandler {
939                barrier: Arc::clone(&barrier),
940            })
941            .build()
942            .expect("build must succeed");
943
944        // Each handler parks on the shared barrier and only completes once the
945        // other has arrived. Sequential dispatch would park the first handler
946        // forever and trip the timeout; concurrent fan-out lets both arrive.
947        tokio::time::timeout(
948            std::time::Duration::from_secs(5),
949            mediator.publish(UserCreated { id: 1 }),
950        )
951        .await
952        .expect("handlers must run concurrently, not sequentially")
953        .expect("publish must succeed");
954    }
955
956    #[tokio::test]
957    async fn publish_aggregates_typed_failures_with_handler_names() {
958        let mediator = MediatorBuilder::new()
959            .register_notification_handler::<UserCreated, _>(SourcedFailingHandler)
960            .register_notification_handler::<UserCreated, _>(AuditHandler)
961            .register_notification_handler::<UserCreated, _>(FailingNotifHandler)
962            .build()
963            .expect("build must succeed");
964
965        let err = mediator
966            .publish(UserCreated { id: 9 })
967            .await
968            .expect_err("two of three handlers failed");
969
970        let HexeractError::PublishFailed {
971            notification_type,
972            total,
973            failures,
974            ..
975        } = err
976        else {
977            panic!("expected PublishFailed, got {err:?}");
978        };
979
980        assert!(notification_type.ends_with("::UserCreated"));
981        assert_eq!(total, 3);
982        assert_eq!(failures.len(), 2);
983
984        // Failures keep registration order: the sourced failure comes first.
985        assert!(failures[0].handler.ends_with("SourcedFailingHandler"));
986        assert!(matches!(
987            failures[0].error,
988            HexeractError::HandlerFailed { .. }
989        ));
990        assert!(
991            std::error::Error::source(&failures[0].error).is_some(),
992            "the handler's typed source must survive aggregation"
993        );
994
995        assert!(failures[1].handler.ends_with("FailingNotifHandler"));
996        assert!(matches!(failures[1].error, HexeractError::Dispatch(_)));
997    }
998
999    #[tokio::test]
1000    async fn audit_handler_stub_compiles() {
1001        // The `AuditHandler` fixture is kept for symmetry with prior tests.
1002        let mediator = MediatorBuilder::new()
1003            .register_notification_handler::<UserCreated, _>(AuditHandler)
1004            .build()
1005            .expect("build must succeed");
1006        mediator
1007            .publish(UserCreated { id: 0 })
1008            .await
1009            .expect("audit handler must succeed");
1010    }
1011
1012    #[test]
1013    fn detects_duplicate_command_handler() {
1014        let err = MediatorBuilder::new()
1015            .register_command_handler::<Ping, _>(PingHandler)
1016            .register_command_handler::<Ping, _>(PingHandler)
1017            .build()
1018            .expect_err("second registration must fail at build");
1019        let MediatorBuildError::DuplicateHandler { type_name } = err;
1020        assert!(type_name.ends_with("::Ping"));
1021    }
1022
1023    #[test]
1024    fn detects_duplicate_query_handler() {
1025        let err = MediatorBuilder::new()
1026            .register_query_handler::<GetCount, _>(CountHandler)
1027            .register_query_handler::<GetCount, _>(CountHandler)
1028            .build()
1029            .expect_err("second registration must fail at build");
1030        let MediatorBuildError::DuplicateHandler { type_name } = err;
1031        assert!(type_name.ends_with("::GetCount"));
1032    }
1033
1034    #[test]
1035    fn allows_multiple_notification_handlers_for_same_type() {
1036        let builder = MediatorBuilder::new()
1037            .register_notification_handler::<UserCreated, _>(AuditHandler)
1038            .register_notification_handler::<UserCreated, _>(AuditHandler)
1039            .register_notification_handler::<UserCreated, _>(AuditHandler);
1040        let tid = TypeId::of::<UserCreated>();
1041        assert_eq!(builder.notification_handlers[&tid].len(), 3);
1042        let mediator = builder.build().expect("notifications must not collide");
1043        assert_eq!(
1044            mediator.inner.notification_handlers[&TypeId::of::<UserCreated>()].len(),
1045            3
1046        );
1047    }
1048
1049    #[test]
1050    fn mediator_is_clone_and_shares_registry() {
1051        let mediator = MediatorBuilder::new()
1052            .register_command_handler::<Ping, _>(PingHandler)
1053            .build()
1054            .expect("build must succeed");
1055        let clone = mediator.clone();
1056        assert!(Arc::ptr_eq(&mediator.inner, &clone.inner));
1057    }
1058
1059    fn verify_probe_cmd_name() -> &'static str {
1060        "hexeract_mediator::tests::VerifyProbeCmd"
1061    }
1062
1063    fn verify_probe_handler_name() -> &'static str {
1064        "hexeract_mediator::tests::VerifyProbeHandler"
1065    }
1066
1067    fn verify_probe_query_name() -> &'static str {
1068        "hexeract_mediator::tests::VerifyProbeQuery"
1069    }
1070
1071    fn verify_probe_query_handler_name() -> &'static str {
1072        "hexeract_mediator::tests::VerifyProbeQueryHandler"
1073    }
1074
1075    inventory::submit!(HandlerRegistration {
1076        kind: HandlerKind::Command,
1077        message_type_name: verify_probe_cmd_name,
1078        handler_type_name: verify_probe_handler_name,
1079    });
1080
1081    inventory::submit!(HandlerRegistration {
1082        kind: HandlerKind::Query,
1083        message_type_name: verify_probe_query_name,
1084        handler_type_name: verify_probe_query_handler_name,
1085    });
1086
1087    #[test]
1088    fn verify_handlers_reports_every_inventory_entry_when_builder_is_empty() {
1089        let err = MediatorBuilder::new()
1090            .verify_handlers()
1091            .expect_err("empty builder must report all inventory entries as missing");
1092        let HandlersVerificationError::Missing { missing } = err;
1093        assert!(missing.iter().any(|m| {
1094            m.kind == HandlerKind::Command
1095                && m.message_type_name == "hexeract_mediator::tests::VerifyProbeCmd"
1096        }));
1097        assert!(missing.iter().any(|m| {
1098            m.kind == HandlerKind::Query
1099                && m.message_type_name == "hexeract_mediator::tests::VerifyProbeQuery"
1100        }));
1101    }
1102
1103    #[test]
1104    fn verify_handlers_uses_message_type_name_strings_to_match_registrations() {
1105        // The probe entries above name fictional types. We register handlers
1106        // for `Ping` and `GetCount`, whose `type_name`s do not match, so
1107        // verify_handlers should still report the probes as missing while
1108        // never complaining about Ping or GetCount themselves.
1109        let missing = MediatorBuilder::new()
1110            .register_command_handler::<Ping, _>(PingHandler)
1111            .register_query_handler::<GetCount, _>(CountHandler)
1112            .verify_handlers()
1113            .map_or_else(
1114                |HandlersVerificationError::Missing { missing }| missing,
1115                |()| Vec::new(),
1116            );
1117        assert!(
1118            missing.iter().all(|m| {
1119                m.message_type_name != type_name::<Ping>()
1120                    && m.message_type_name != type_name::<GetCount>()
1121            }),
1122            "registered handlers must not appear as missing"
1123        );
1124    }
1125}