1#![cfg_attr(docsrs, feature(doc_cfg))]
60
61mod erased;
62mod terminal;
63
64use std::any::{TypeId, type_name};
65use std::collections::hash_map::Entry;
66use std::collections::{HashMap, HashSet};
67use std::fmt;
68use std::sync::{Arc, Mutex};
69
70use futures_util::future::join_all;
71use hexeract_core::{
72 Command, CommandHandler, CorrelationId, DynMiddleware, HandlerContext, HandlerKind,
73 HandlerRegistration, HexeractError, MessageEnvelope, MessageId, Middleware, Next, Notification,
74 NotificationFailure, NotificationHandler, Query, QueryHandler,
75};
76
77use crate::erased::{
78 BoxAny, ErasedCommandHandler, ErasedNotificationHandler, ErasedQueryHandler,
79 TypedCommandHandler, TypedNotificationHandler, TypedQueryHandler,
80};
81use crate::terminal::{CommandTerminal, NotificationTerminal, QueryTerminal};
82
83#[derive(Debug, thiserror::Error)]
90#[non_exhaustive]
91pub enum MediatorBuildError {
92 #[error("duplicate handler registered for {type_name}")]
96 DuplicateHandler {
97 type_name: &'static str,
99 },
100}
101
102#[derive(Debug, Clone)]
105pub struct MissingHandler {
106 pub kind: HandlerKind,
108 pub message_type_name: &'static str,
110 pub handler_type_name: &'static str,
112}
113
114#[derive(Debug, thiserror::Error)]
119#[non_exhaustive]
120pub enum HandlersVerificationError {
121 #[error("{} handler(s) declared via #[handler] are missing from the registry", missing.len())]
124 Missing {
125 missing: Vec<MissingHandler>,
127 },
128}
129
130pub struct MediatorBuilder {
143 command_handlers: HashMap<TypeId, Arc<dyn ErasedCommandHandler>>,
144 query_handlers: HashMap<TypeId, Arc<dyn ErasedQueryHandler>>,
145 notification_handlers: HashMap<TypeId, Vec<Arc<dyn ErasedNotificationHandler>>>,
146 registered_command_types: HashSet<&'static str>,
147 registered_query_types: HashSet<&'static str>,
148 registered_notification_types: HashSet<&'static str>,
149 middlewares: Vec<Arc<dyn DynMiddleware>>,
150 errors: Vec<MediatorBuildError>,
151}
152
153impl MediatorBuilder {
154 #[must_use]
156 pub fn new() -> Self {
157 Self {
158 command_handlers: HashMap::new(),
159 query_handlers: HashMap::new(),
160 notification_handlers: HashMap::new(),
161 registered_command_types: HashSet::new(),
162 registered_query_types: HashSet::new(),
163 registered_notification_types: HashSet::new(),
164 middlewares: Vec::new(),
165 errors: Vec::new(),
166 }
167 }
168
169 #[must_use]
174 pub fn register_command_handler<C, H>(mut self, handler: H) -> Self
175 where
176 C: Command,
177 H: CommandHandler<C>,
178 {
179 let tid = TypeId::of::<C>();
180 match self.command_handlers.entry(tid) {
181 Entry::Vacant(slot) => {
182 slot.insert(Arc::new(TypedCommandHandler::<C, H>::new(handler)));
183 self.registered_command_types.insert(type_name::<C>());
184 }
185 Entry::Occupied(_) => {
186 self.errors.push(MediatorBuildError::DuplicateHandler {
187 type_name: type_name::<C>(),
188 });
189 }
190 }
191 self
192 }
193
194 #[must_use]
199 pub fn register_query_handler<Q, H>(mut self, handler: H) -> Self
200 where
201 Q: Query,
202 H: QueryHandler<Q>,
203 {
204 let tid = TypeId::of::<Q>();
205 match self.query_handlers.entry(tid) {
206 Entry::Vacant(slot) => {
207 slot.insert(Arc::new(TypedQueryHandler::<Q, H>::new(handler)));
208 self.registered_query_types.insert(type_name::<Q>());
209 }
210 Entry::Occupied(_) => {
211 self.errors.push(MediatorBuildError::DuplicateHandler {
212 type_name: type_name::<Q>(),
213 });
214 }
215 }
216 self
217 }
218
219 #[must_use]
224 pub fn register_notification_handler<N, H>(mut self, handler: H) -> Self
225 where
226 N: Notification,
227 H: NotificationHandler<N>,
228 {
229 let tid = TypeId::of::<N>();
230 self.notification_handlers
231 .entry(tid)
232 .or_default()
233 .push(Arc::new(TypedNotificationHandler::<N, H>::new(handler)));
234 self.registered_notification_types.insert(type_name::<N>());
235 self
236 }
237
238 #[must_use]
241 pub fn with_middleware<M: Middleware>(mut self, middleware: M) -> Self {
242 self.middlewares.push(Arc::new(middleware));
243 self
244 }
245
246 pub fn verify_handlers(&self) -> Result<(), HandlersVerificationError> {
289 let mut missing = Vec::new();
290 for reg in inventory::iter::<HandlerRegistration> {
291 let message_type_name = (reg.message_type_name)();
292 let present = match reg.kind {
293 HandlerKind::Command => self.registered_command_types.contains(message_type_name),
294 HandlerKind::Query => self.registered_query_types.contains(message_type_name),
295 HandlerKind::Notification => self
296 .registered_notification_types
297 .contains(message_type_name),
298 };
299 if !present {
300 missing.push(MissingHandler {
301 kind: reg.kind,
302 message_type_name,
303 handler_type_name: (reg.handler_type_name)(),
304 });
305 }
306 }
307 if missing.is_empty() {
308 Ok(())
309 } else {
310 Err(HandlersVerificationError::Missing { missing })
311 }
312 }
313
314 pub fn build(self) -> Result<Mediator, MediatorBuildError> {
325 if let Some(err) = self.errors.into_iter().next() {
326 return Err(err);
327 }
328 Ok(Mediator {
329 inner: Arc::new(MediatorInner {
330 command_handlers: self.command_handlers,
331 query_handlers: self.query_handlers,
332 notification_handlers: self.notification_handlers,
333 middlewares: self.middlewares.into(),
334 }),
335 })
336 }
337}
338
339impl Default for MediatorBuilder {
340 fn default() -> Self {
341 Self::new()
342 }
343}
344
345impl fmt::Debug for MediatorBuilder {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 f.debug_struct("MediatorBuilder")
348 .field("command_handlers", &self.command_handlers.len())
349 .field("query_handlers", &self.query_handlers.len())
350 .field(
351 "notification_handlers",
352 &self
353 .notification_handlers
354 .values()
355 .map(Vec::len)
356 .sum::<usize>(),
357 )
358 .field("middlewares", &self.middlewares.len())
359 .field("errors", &self.errors.len())
360 .finish_non_exhaustive()
361 }
362}
363
364#[derive(Clone)]
370pub struct Mediator {
371 inner: Arc<MediatorInner>,
372}
373
374impl fmt::Debug for Mediator {
375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376 f.debug_struct("Mediator")
377 .field("command_handlers", &self.inner.command_handlers.len())
378 .field("query_handlers", &self.inner.query_handlers.len())
379 .field(
380 "notification_handlers",
381 &self
382 .inner
383 .notification_handlers
384 .values()
385 .map(Vec::len)
386 .sum::<usize>(),
387 )
388 .field("middlewares", &self.inner.middlewares.len())
389 .finish()
390 }
391}
392
393struct MediatorInner {
394 command_handlers: HashMap<TypeId, Arc<dyn ErasedCommandHandler>>,
395 query_handlers: HashMap<TypeId, Arc<dyn ErasedQueryHandler>>,
396 notification_handlers: HashMap<TypeId, Vec<Arc<dyn ErasedNotificationHandler>>>,
397 middlewares: Arc<[Arc<dyn DynMiddleware>]>,
398}
399
400impl Mediator {
401 pub async fn send<C: Command>(&self, command: C) -> Result<C::Output, HexeractError> {
414 self.send_with_correlation_id(command, CorrelationId::new())
415 .await
416 }
417
418 pub async fn send_with_correlation_id<C: Command>(
434 &self,
435 command: C,
436 correlation_id: CorrelationId,
437 ) -> Result<C::Output, HexeractError> {
438 let tid = TypeId::of::<C>();
439 let handler = self
440 .inner
441 .command_handlers
442 .get(&tid)
443 .ok_or_else(|| HexeractError::handler_not_found(type_name::<C>()))?;
444
445 let message_id = MessageId::new();
446 let envelope = MessageEnvelope::for_command::<C>(message_id, correlation_id);
447 let ctx = HandlerContext::new(message_id, correlation_id);
448
449 let terminal = Arc::new(CommandTerminal {
450 handler: Arc::clone(handler),
451 payload: Mutex::new(Some(Box::new(command) as BoxAny)),
452 });
453
454 let next = Next::new(self.inner.middlewares.clone(), terminal);
455 let output = next.run(&envelope, &ctx).await?;
456
457 output
458 .downcast::<C::Output>()
459 .map(|boxed| *boxed)
460 .map_err(|_| HexeractError::downcast_failed(type_name::<C::Output>()))
461 }
462
463 pub async fn query<Q: Query>(&self, query: Q) -> Result<Q::Output, HexeractError> {
476 self.query_with_correlation_id(query, CorrelationId::new())
477 .await
478 }
479
480 pub async fn query_with_correlation_id<Q: Query>(
496 &self,
497 query: Q,
498 correlation_id: CorrelationId,
499 ) -> Result<Q::Output, HexeractError> {
500 let tid = TypeId::of::<Q>();
501 let handler = self
502 .inner
503 .query_handlers
504 .get(&tid)
505 .ok_or_else(|| HexeractError::handler_not_found(type_name::<Q>()))?;
506
507 let message_id = MessageId::new();
508 let envelope = MessageEnvelope::for_query::<Q>(message_id, correlation_id);
509 let ctx = HandlerContext::new(message_id, correlation_id);
510
511 let terminal = Arc::new(QueryTerminal {
512 handler: Arc::clone(handler),
513 payload: Mutex::new(Some(Box::new(query) as BoxAny)),
514 });
515
516 let next = Next::new(self.inner.middlewares.clone(), terminal);
517 let output = next.run(&envelope, &ctx).await?;
518
519 output
520 .downcast::<Q::Output>()
521 .map(|boxed| *boxed)
522 .map_err(|_| HexeractError::downcast_failed(type_name::<Q::Output>()))
523 }
524
525 pub async fn publish<N: Notification>(&self, notification: N) -> Result<(), HexeractError> {
556 self.publish_with_correlation_id(notification, CorrelationId::new())
557 .await
558 }
559
560 pub async fn publish_with_correlation_id<N: Notification>(
579 &self,
580 notification: N,
581 correlation_id: CorrelationId,
582 ) -> Result<(), HexeractError> {
583 let tid = TypeId::of::<N>();
584 let Some(handlers) = self.inner.notification_handlers.get(&tid) else {
585 return Ok(());
586 };
587 if handlers.is_empty() {
588 return Ok(());
589 }
590
591 let total = handlers.len();
592
593 let shared = Arc::new(notification);
596
597 let dispatches = handlers.iter().map(|handler| {
598 let handler_name = handler.handler_type_name();
599 let message_id = MessageId::new();
600 let envelope = MessageEnvelope::for_notification::<N>(message_id, correlation_id);
601 let ctx = HandlerContext::new(message_id, correlation_id);
602
603 let payload = Box::new(Arc::clone(&shared)) as BoxAny;
604 let terminal = Arc::new(NotificationTerminal {
605 handler: Arc::clone(handler),
606 payload: Mutex::new(Some(payload)),
607 });
608 let next = Next::new(self.inner.middlewares.clone(), terminal);
609
610 async move {
611 next.run(&envelope, &ctx)
612 .await
613 .map_err(|error| NotificationFailure {
614 handler: handler_name,
615 error,
616 })
617 }
618 });
619
620 let failures: Vec<NotificationFailure> = join_all(dispatches)
621 .await
622 .into_iter()
623 .filter_map(Result::err)
624 .collect();
625
626 if failures.is_empty() {
627 Ok(())
628 } else {
629 Err(HexeractError::publish_failed(
630 type_name::<N>(),
631 total,
632 failures,
633 ))
634 }
635 }
636}
637
638#[cfg(test)]
639mod tests {
640 use super::*;
641 use hexeract_core::HandlerContext;
642
643 struct Ping {
644 value: u32,
645 }
646
647 impl Command for Ping {
648 type Output = u32;
649 }
650
651 struct PingHandler;
652
653 impl CommandHandler<Ping> for PingHandler {
654 type Error = HexeractError;
655
656 async fn handle(&self, cmd: Ping, _ctx: &HandlerContext) -> Result<u32, Self::Error> {
657 Ok(cmd.value * 2)
658 }
659 }
660
661 struct GetCount;
662
663 impl Query for GetCount {
664 type Output = u32;
665 }
666
667 struct CountHandler;
668
669 impl QueryHandler<GetCount> for CountHandler {
670 type Error = HexeractError;
671
672 async fn handle(&self, _q: GetCount, _ctx: &HandlerContext) -> Result<u32, Self::Error> {
673 Ok(99)
674 }
675 }
676
677 #[derive(Clone)]
678 struct UserCreated {
679 id: u32,
680 }
681
682 impl Notification for UserCreated {}
683
684 struct AuditHandler;
685
686 impl NotificationHandler<UserCreated> for AuditHandler {
687 type Error = HexeractError;
688
689 async fn handle(
690 &self,
691 _n: Arc<UserCreated>,
692 _ctx: &HandlerContext,
693 ) -> Result<(), Self::Error> {
694 Ok(())
695 }
696 }
697
698 struct RecordingNotifHandler {
699 label: &'static str,
700 seen: Arc<Mutex<Vec<(&'static str, u32)>>>,
701 }
702
703 impl NotificationHandler<UserCreated> for RecordingNotifHandler {
704 type Error = HexeractError;
705
706 async fn handle(
707 &self,
708 notif: Arc<UserCreated>,
709 _ctx: &HandlerContext,
710 ) -> Result<(), Self::Error> {
711 self.seen
712 .lock()
713 .expect("recorder mutex poisoned")
714 .push((self.label, notif.id));
715 Ok(())
716 }
717 }
718
719 struct FailingNotifHandler;
720
721 impl NotificationHandler<UserCreated> for FailingNotifHandler {
722 type Error = HexeractError;
723
724 async fn handle(
725 &self,
726 _n: Arc<UserCreated>,
727 _ctx: &HandlerContext,
728 ) -> Result<(), Self::Error> {
729 Err(HexeractError::Dispatch("boom".into()))
730 }
731 }
732
733 struct BarrierNotifHandler {
734 barrier: Arc<tokio::sync::Barrier>,
735 }
736
737 impl NotificationHandler<UserCreated> for BarrierNotifHandler {
738 type Error = HexeractError;
739
740 async fn handle(
741 &self,
742 _n: Arc<UserCreated>,
743 _ctx: &HandlerContext,
744 ) -> Result<(), Self::Error> {
745 self.barrier.wait().await;
746 Ok(())
747 }
748 }
749
750 #[derive(Debug, thiserror::Error)]
751 enum PersistError {
752 #[error("database unavailable")]
753 Unavailable,
754 }
755
756 impl From<PersistError> for HexeractError {
757 fn from(err: PersistError) -> Self {
758 HexeractError::handler_failed(err)
759 }
760 }
761
762 struct SourcedFailingHandler;
763
764 impl NotificationHandler<UserCreated> for SourcedFailingHandler {
765 type Error = PersistError;
766
767 async fn handle(
768 &self,
769 _n: Arc<UserCreated>,
770 _ctx: &HandlerContext,
771 ) -> Result<(), Self::Error> {
772 Err(PersistError::Unavailable)
773 }
774 }
775
776 #[test]
777 fn default_builder_is_empty() {
778 let builder = MediatorBuilder::default();
779 assert!(builder.command_handlers.is_empty());
780 assert!(builder.query_handlers.is_empty());
781 assert!(builder.notification_handlers.is_empty());
782 assert!(builder.middlewares.is_empty());
783 assert!(builder.errors.is_empty());
784 }
785
786 #[test]
787 fn registers_one_command_handler_then_builds_ok() {
788 let mediator = MediatorBuilder::new()
789 .register_command_handler::<Ping, _>(PingHandler)
790 .build()
791 .expect("build must succeed");
792 let _clone = mediator.clone();
793 }
794
795 #[tokio::test]
796 async fn send_routes_to_command_handler_and_returns_output() {
797 let mediator = MediatorBuilder::new()
798 .register_command_handler::<Ping, _>(PingHandler)
799 .build()
800 .expect("build must succeed");
801 let out = mediator
802 .send(Ping { value: 21 })
803 .await
804 .expect("dispatch must succeed");
805 assert_eq!(out, 42);
806 }
807
808 #[tokio::test]
809 async fn send_returns_handler_not_found_when_unregistered() {
810 let mediator = MediatorBuilder::new().build().expect("empty build is ok");
811 let err = mediator
812 .send(Ping { value: 0 })
813 .await
814 .expect_err("missing handler must fail");
815 assert!(matches!(
816 err,
817 HexeractError::HandlerNotFound { message_type, .. } if message_type.ends_with("::Ping")
818 ));
819 }
820
821 #[tokio::test]
822 async fn query_routes_to_query_handler_and_returns_output() {
823 let mediator = MediatorBuilder::new()
824 .register_query_handler::<GetCount, _>(CountHandler)
825 .build()
826 .expect("build must succeed");
827 let out = mediator.query(GetCount).await.expect("query must succeed");
828 assert_eq!(out, 99);
829 }
830
831 #[tokio::test]
832 async fn query_returns_handler_not_found_when_unregistered() {
833 let mediator = MediatorBuilder::new().build().expect("empty build is ok");
834 let err = mediator
835 .query(GetCount)
836 .await
837 .expect_err("missing handler must fail");
838 assert!(matches!(
839 err,
840 HexeractError::HandlerNotFound { message_type, .. } if message_type.ends_with("::GetCount")
841 ));
842 }
843
844 #[tokio::test]
845 async fn publish_fans_out_to_all_notification_handlers() {
846 let seen = Arc::new(Mutex::new(Vec::new()));
847 let mediator = MediatorBuilder::new()
848 .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
849 label: "audit",
850 seen: Arc::clone(&seen),
851 })
852 .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
853 label: "email",
854 seen: Arc::clone(&seen),
855 })
856 .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
857 label: "search",
858 seen: Arc::clone(&seen),
859 })
860 .build()
861 .expect("build must succeed");
862
863 mediator
864 .publish(UserCreated { id: 7 })
865 .await
866 .expect("publish must succeed");
867
868 let recorded = seen.lock().unwrap().clone();
869 assert_eq!(
870 recorded,
871 vec![("audit", 7), ("email", 7), ("search", 7)],
872 "every handler must observe the notification once, in registration order"
873 );
874 }
875
876 #[tokio::test]
877 async fn publish_with_no_handlers_is_ok() {
878 let mediator = MediatorBuilder::new().build().expect("empty build is ok");
879 mediator
880 .publish(UserCreated { id: 1 })
881 .await
882 .expect("publish with zero handlers must succeed");
883 }
884
885 #[tokio::test]
886 async fn publish_invokes_all_handlers_even_when_one_fails() {
887 let seen = Arc::new(Mutex::new(Vec::new()));
888 let mediator = MediatorBuilder::new()
889 .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
890 label: "first",
891 seen: Arc::clone(&seen),
892 })
893 .register_notification_handler::<UserCreated, _>(FailingNotifHandler)
894 .register_notification_handler::<UserCreated, _>(RecordingNotifHandler {
895 label: "third",
896 seen: Arc::clone(&seen),
897 })
898 .build()
899 .expect("build must succeed");
900
901 let err = mediator
902 .publish(UserCreated { id: 42 })
903 .await
904 .expect_err("at least one handler failed");
905
906 let HexeractError::PublishFailed {
907 total,
908 ref failures,
909 ..
910 } = err
911 else {
912 panic!("unexpected variant: {err:?}");
913 };
914 assert_eq!(total, 3);
915 assert_eq!(failures.len(), 1);
916 assert!(matches!(failures[0].error, HexeractError::Dispatch(_)));
917 assert!(failures[0].error.to_string().contains("boom"));
918 assert!(
919 err.to_string()
920 .starts_with("publish: 1 of 3 handlers failed")
921 );
922
923 let recorded = seen.lock().unwrap().clone();
924 assert_eq!(
925 recorded,
926 vec![("first", 42), ("third", 42)],
927 "siblings must run even after a failure"
928 );
929 }
930
931 #[tokio::test]
932 async fn publish_runs_handlers_concurrently() {
933 let barrier = Arc::new(tokio::sync::Barrier::new(2));
934 let mediator = MediatorBuilder::new()
935 .register_notification_handler::<UserCreated, _>(BarrierNotifHandler {
936 barrier: Arc::clone(&barrier),
937 })
938 .register_notification_handler::<UserCreated, _>(BarrierNotifHandler {
939 barrier: Arc::clone(&barrier),
940 })
941 .build()
942 .expect("build must succeed");
943
944 tokio::time::timeout(
948 std::time::Duration::from_secs(5),
949 mediator.publish(UserCreated { id: 1 }),
950 )
951 .await
952 .expect("handlers must run concurrently, not sequentially")
953 .expect("publish must succeed");
954 }
955
956 #[tokio::test]
957 async fn publish_aggregates_typed_failures_with_handler_names() {
958 let mediator = MediatorBuilder::new()
959 .register_notification_handler::<UserCreated, _>(SourcedFailingHandler)
960 .register_notification_handler::<UserCreated, _>(AuditHandler)
961 .register_notification_handler::<UserCreated, _>(FailingNotifHandler)
962 .build()
963 .expect("build must succeed");
964
965 let err = mediator
966 .publish(UserCreated { id: 9 })
967 .await
968 .expect_err("two of three handlers failed");
969
970 let HexeractError::PublishFailed {
971 notification_type,
972 total,
973 failures,
974 ..
975 } = err
976 else {
977 panic!("expected PublishFailed, got {err:?}");
978 };
979
980 assert!(notification_type.ends_with("::UserCreated"));
981 assert_eq!(total, 3);
982 assert_eq!(failures.len(), 2);
983
984 assert!(failures[0].handler.ends_with("SourcedFailingHandler"));
986 assert!(matches!(
987 failures[0].error,
988 HexeractError::HandlerFailed { .. }
989 ));
990 assert!(
991 std::error::Error::source(&failures[0].error).is_some(),
992 "the handler's typed source must survive aggregation"
993 );
994
995 assert!(failures[1].handler.ends_with("FailingNotifHandler"));
996 assert!(matches!(failures[1].error, HexeractError::Dispatch(_)));
997 }
998
999 #[tokio::test]
1000 async fn audit_handler_stub_compiles() {
1001 let mediator = MediatorBuilder::new()
1003 .register_notification_handler::<UserCreated, _>(AuditHandler)
1004 .build()
1005 .expect("build must succeed");
1006 mediator
1007 .publish(UserCreated { id: 0 })
1008 .await
1009 .expect("audit handler must succeed");
1010 }
1011
1012 #[test]
1013 fn detects_duplicate_command_handler() {
1014 let err = MediatorBuilder::new()
1015 .register_command_handler::<Ping, _>(PingHandler)
1016 .register_command_handler::<Ping, _>(PingHandler)
1017 .build()
1018 .expect_err("second registration must fail at build");
1019 let MediatorBuildError::DuplicateHandler { type_name } = err;
1020 assert!(type_name.ends_with("::Ping"));
1021 }
1022
1023 #[test]
1024 fn detects_duplicate_query_handler() {
1025 let err = MediatorBuilder::new()
1026 .register_query_handler::<GetCount, _>(CountHandler)
1027 .register_query_handler::<GetCount, _>(CountHandler)
1028 .build()
1029 .expect_err("second registration must fail at build");
1030 let MediatorBuildError::DuplicateHandler { type_name } = err;
1031 assert!(type_name.ends_with("::GetCount"));
1032 }
1033
1034 #[test]
1035 fn allows_multiple_notification_handlers_for_same_type() {
1036 let builder = MediatorBuilder::new()
1037 .register_notification_handler::<UserCreated, _>(AuditHandler)
1038 .register_notification_handler::<UserCreated, _>(AuditHandler)
1039 .register_notification_handler::<UserCreated, _>(AuditHandler);
1040 let tid = TypeId::of::<UserCreated>();
1041 assert_eq!(builder.notification_handlers[&tid].len(), 3);
1042 let mediator = builder.build().expect("notifications must not collide");
1043 assert_eq!(
1044 mediator.inner.notification_handlers[&TypeId::of::<UserCreated>()].len(),
1045 3
1046 );
1047 }
1048
1049 #[test]
1050 fn mediator_is_clone_and_shares_registry() {
1051 let mediator = MediatorBuilder::new()
1052 .register_command_handler::<Ping, _>(PingHandler)
1053 .build()
1054 .expect("build must succeed");
1055 let clone = mediator.clone();
1056 assert!(Arc::ptr_eq(&mediator.inner, &clone.inner));
1057 }
1058
1059 fn verify_probe_cmd_name() -> &'static str {
1060 "hexeract_mediator::tests::VerifyProbeCmd"
1061 }
1062
1063 fn verify_probe_handler_name() -> &'static str {
1064 "hexeract_mediator::tests::VerifyProbeHandler"
1065 }
1066
1067 fn verify_probe_query_name() -> &'static str {
1068 "hexeract_mediator::tests::VerifyProbeQuery"
1069 }
1070
1071 fn verify_probe_query_handler_name() -> &'static str {
1072 "hexeract_mediator::tests::VerifyProbeQueryHandler"
1073 }
1074
1075 inventory::submit!(HandlerRegistration {
1076 kind: HandlerKind::Command,
1077 message_type_name: verify_probe_cmd_name,
1078 handler_type_name: verify_probe_handler_name,
1079 });
1080
1081 inventory::submit!(HandlerRegistration {
1082 kind: HandlerKind::Query,
1083 message_type_name: verify_probe_query_name,
1084 handler_type_name: verify_probe_query_handler_name,
1085 });
1086
1087 #[test]
1088 fn verify_handlers_reports_every_inventory_entry_when_builder_is_empty() {
1089 let err = MediatorBuilder::new()
1090 .verify_handlers()
1091 .expect_err("empty builder must report all inventory entries as missing");
1092 let HandlersVerificationError::Missing { missing } = err;
1093 assert!(missing.iter().any(|m| {
1094 m.kind == HandlerKind::Command
1095 && m.message_type_name == "hexeract_mediator::tests::VerifyProbeCmd"
1096 }));
1097 assert!(missing.iter().any(|m| {
1098 m.kind == HandlerKind::Query
1099 && m.message_type_name == "hexeract_mediator::tests::VerifyProbeQuery"
1100 }));
1101 }
1102
1103 #[test]
1104 fn verify_handlers_uses_message_type_name_strings_to_match_registrations() {
1105 let missing = MediatorBuilder::new()
1110 .register_command_handler::<Ping, _>(PingHandler)
1111 .register_query_handler::<GetCount, _>(CountHandler)
1112 .verify_handlers()
1113 .map_or_else(
1114 |HandlersVerificationError::Missing { missing }| missing,
1115 |()| Vec::new(),
1116 );
1117 assert!(
1118 missing.iter().all(|m| {
1119 m.message_type_name != type_name::<Ping>()
1120 && m.message_type_name != type_name::<GetCount>()
1121 }),
1122 "registered handlers must not appear as missing"
1123 );
1124 }
1125}