Skip to main content

atomr_patterns/
extensions.rs

1//! Extension hooks shared across patterns.
2//!
3//! Two flavors:
4//!
5//! 1. **Named slots** — typed closures that the framework runs at
6//!    well-known points (`on_command`, `on_event`). These can fail and
7//!    surface a [`crate::PatternError`].
8//! 2. **Generic taps** — fire-and-forget [`tokio::sync::mpsc`] senders
9//!    that receive a clone of every command/event. Use these to bridge
10//!    into [`atomr_streams`] without coupling the pattern surface to a
11//!    specific Source/Sink shape.
12
13use std::sync::Arc;
14
15use crate::PatternError;
16
17/// A pre-handler interceptor. Receives the command by reference; may
18/// reject it (turns into [`PatternError::Intercepted`]).
19pub type CommandInterceptor<C, E> = Arc<dyn Fn(&C) -> Result<(), PatternError<E>> + Send + Sync + 'static>;
20
21/// A post-persist event listener. Synchronous; fast hooks only — for
22/// async work, push events into a tap channel and react out-of-band.
23pub type EventListener<EV> = Arc<dyn Fn(&EV) + Send + Sync + 'static>;
24
25/// Bundle of extension hooks. Reused by every pattern that wants to let
26/// users plug in their own actors / sinks at well-known points.
27pub struct ExtensionSlots<C, EV, DE> {
28    pub command_interceptors: Vec<CommandInterceptor<C, DE>>,
29    pub event_listeners: Vec<EventListener<EV>>,
30    pub command_taps: Vec<tokio::sync::mpsc::UnboundedSender<C>>,
31    pub event_taps: Vec<tokio::sync::mpsc::UnboundedSender<EV>>,
32}
33
34impl<C, EV, DE> Default for ExtensionSlots<C, EV, DE> {
35    fn default() -> Self {
36        Self {
37            command_interceptors: Vec::new(),
38            event_listeners: Vec::new(),
39            command_taps: Vec::new(),
40            event_taps: Vec::new(),
41        }
42    }
43}
44
45impl<C, EV, DE> Clone for ExtensionSlots<C, EV, DE> {
46    fn clone(&self) -> Self {
47        Self {
48            command_interceptors: self.command_interceptors.clone(),
49            event_listeners: self.event_listeners.clone(),
50            command_taps: self.command_taps.clone(),
51            event_taps: self.event_taps.clone(),
52        }
53    }
54}
55
56impl<C, EV, DE> ExtensionSlots<C, EV, DE> {
57    /// Run every interceptor; bail on the first rejection.
58    pub fn run_interceptors(&self, cmd: &C) -> Result<(), PatternError<DE>> {
59        for hook in &self.command_interceptors {
60            hook(cmd)?;
61        }
62        Ok(())
63    }
64
65    /// Notify every event listener.
66    pub fn notify_listeners(&self, ev: &EV) {
67        for hook in &self.event_listeners {
68            hook(ev);
69        }
70    }
71}
72
73impl<C: Clone, EV, DE> ExtensionSlots<C, EV, DE> {
74    /// Push a command clone to every command tap. Closed receivers are
75    /// silently pruned.
76    pub fn push_command_taps(&mut self, cmd: &C) {
77        self.command_taps.retain(|tx| tx.send(cmd.clone()).is_ok());
78    }
79}
80
81impl<C, EV: Clone, DE> ExtensionSlots<C, EV, DE> {
82    /// Push an event clone to every event tap. Closed receivers are
83    /// silently pruned.
84    pub fn push_event_taps(&mut self, ev: &EV) {
85        self.event_taps.retain(|tx| tx.send(ev.clone()).is_ok());
86    }
87}