Skip to main content

angzarr_client/router/
traits.rs

1//! Handler traits for each component type.
2//!
3//! Each trait defines the contract for domain handlers. Implementors
4//! encapsulate their routing logic internally and declare which types
5//! they handle via `command_types()` or `event_types()`.
6
7use prost_types::Any;
8use std::error::Error;
9use tonic::Status;
10
11use crate::proto::{CommandBook, Cover, EventBook, Notification, Projection};
12use crate::router::StateRouter;
13
14// ============================================================================
15// Common Types
16// ============================================================================
17
/// Rejection raised by a handler, carrying a human-readable explanation.
///
/// Applies to both command and event rejection.
#[derive(Debug, Clone)]
pub struct CommandRejectedError {
    /// Human-readable explanation of why the input was rejected.
    pub reason: String,
}

impl CommandRejectedError {
    /// Build a rejection from anything convertible into a `String`.
    pub fn new(reason: impl Into<String>) -> Self {
        let reason = reason.into();
        Self { reason }
    }
}

impl std::fmt::Display for CommandRejectedError {
    /// Renders as `Command rejected: <reason>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Command rejected: ")?;
        f.write_str(&self.reason)
    }
}

// No underlying cause to expose, so the default `Error` methods are used.
impl std::error::Error for CommandRejectedError {}
39
40impl From<CommandRejectedError> for Status {
41    fn from(err: CommandRejectedError) -> Self {
42        Status::failed_precondition(err.reason)
43    }
44}
45
46/// Result type for handlers.
47pub type CommandResult<T> = std::result::Result<T, CommandRejectedError>;
48
49/// Response from rejection handlers.
50///
51/// Handlers may return:
52/// - Events to compensate/fix state
53/// - Notification to forward upstream
54/// - Both
55#[derive(Default)]
56pub struct RejectionHandlerResponse {
57    /// Events to persist (compensation).
58    pub events: Option<EventBook>,
59    /// Notification to forward upstream.
60    pub notification: Option<Notification>,
61}
62
63/// Response from saga handlers.
64#[derive(Default)]
65pub struct SagaHandlerResponse {
66    /// Commands to send to other aggregates.
67    pub commands: Vec<CommandBook>,
68    /// Facts/events to inject to other aggregates.
69    pub events: Vec<EventBook>,
70}
71
72/// Response from process manager handlers.
73#[derive(Default)]
74pub struct ProcessManagerResponse {
75    /// Commands to send to other aggregates.
76    pub commands: Vec<CommandBook>,
77    /// Events to persist to the PM's own domain.
78    pub process_events: Option<EventBook>,
79    /// Facts to inject to other aggregates.
80    pub facts: Vec<EventBook>,
81}
82
83/// Helper trait for unpacking Any messages.
84pub trait UnpackAny {
85    /// Unpack an Any to a specific message type.
86    fn unpack<M: prost::Message + Default>(&self) -> Result<M, prost::DecodeError>;
87}
88
89impl UnpackAny for Any {
90    fn unpack<M: prost::Message + Default>(&self) -> Result<M, prost::DecodeError> {
91        M::decode(self.value.as_slice())
92    }
93}
94
95// ============================================================================
96// Command Handler
97// ============================================================================
98
99/// Handler for a single domain's command handler logic.
100///
101/// Command handlers receive commands and emit events. They maintain state
102/// that is rebuilt from events using a `StateRouter`.
103///
104/// # Example
105///
106/// ```rust,ignore
107/// struct PlayerHandler {
108///     state_router: StateRouter<PlayerState>,
109/// }
110///
111/// impl CommandHandlerDomainHandler for PlayerHandler {
112///     type State = PlayerState;
113///
114///     fn command_types(&self) -> Vec<String> {
115///         vec!["RegisterPlayer".into(), "DepositFunds".into()]
116///     }
117///
118///     fn state_router(&self) -> &StateRouter<Self::State> {
119///         &self.state_router
120///     }
121///
122///     fn handle(
123///         &self,
124///         cmd: &CommandBook,
125///         payload: &Any,
126///         state: &Self::State,
127///         seq: u32,
128///     ) -> CommandResult<EventBook> {
129///         dispatch_command!(payload, cmd, state, seq, {
130///             "RegisterPlayer" => self.handle_register,
131///             "DepositFunds" => self.handle_deposit,
132///         })
133///     }
134/// }
135/// ```
136pub trait CommandHandlerDomainHandler: Send + Sync {
137    /// The state type for this aggregate.
138    type State: Default + 'static;
139
140    /// Command type suffixes this handler processes.
141    ///
142    /// Used for subscription derivation and routing.
143    fn command_types(&self) -> Vec<String>;
144
145    /// Get the state router for rebuilding state from events.
146    fn state_router(&self) -> &StateRouter<Self::State>;
147
148    /// Rebuild state from events.
149    ///
150    /// Default implementation uses `state_router().with_event_book()`.
151    fn rebuild(&self, events: &EventBook) -> Self::State {
152        self.state_router().with_event_book(events)
153    }
154
155    /// Handle a command and return resulting events.
156    ///
157    /// The handler should dispatch internally based on `payload.type_url`.
158    fn handle(
159        &self,
160        cmd: &CommandBook,
161        payload: &Any,
162        state: &Self::State,
163        seq: u32,
164    ) -> CommandResult<EventBook>;
165
166    /// Handle a rejection notification.
167    ///
168    /// Called when a command issued by a saga/PM targeting this aggregate's
169    /// domain was rejected. Override to provide custom compensation logic.
170    ///
171    /// Default implementation returns an empty response (framework handles).
172    fn on_rejected(
173        &self,
174        _notification: &Notification,
175        _state: &Self::State,
176        _target_domain: &str,
177        _target_command: &str,
178    ) -> CommandResult<RejectionHandlerResponse> {
179        Ok(RejectionHandlerResponse::default())
180    }
181}
182
183// ============================================================================
184// Saga Handler
185// ============================================================================
186
187/// Handler for a single domain's events in a saga.
188///
189/// Sagas are **pure translators**: they receive source events and produce
190/// commands for target domains. They do NOT receive destination state —
191/// the framework handles sequence stamping and delivery retries.
192///
193/// # Example
194///
195/// ```rust,ignore
196/// struct OrderSagaHandler;
197///
198/// impl SagaDomainHandler for OrderSagaHandler {
199///     fn event_types(&self) -> Vec<String> {
200///         vec!["OrderCompleted".into(), "OrderCancelled".into()]
201///     }
202///
203///     fn handle(
204///         &self,
205///         source: &EventBook,
206///         event: &Any,
207///     ) -> CommandResult<SagaHandlerResponse> {
208///         dispatch_event!(event, source, {
209///             "OrderCompleted" => self.handle_completed,
210///             "OrderCancelled" => self.handle_cancelled,
211///         })
212///     }
213/// }
214/// ```
215pub trait SagaDomainHandler: Send + Sync {
216    /// Event type suffixes this handler processes.
217    ///
218    /// Used for subscription derivation.
219    fn event_types(&self) -> Vec<String>;
220
221    /// Translate source events into commands for target domains.
222    ///
223    /// Commands should have `cover` set to identify the target aggregate.
224    /// The framework will stamp `angzarr_deferred` with source info and
225    /// handle sequence assignment on delivery.
226    ///
227    /// Returns commands to send to other aggregates and events/facts to inject.
228    fn handle(&self, source: &EventBook, event: &Any) -> CommandResult<SagaHandlerResponse>;
229
230    /// Handle a rejection notification.
231    ///
232    /// Called when a saga-issued command was rejected. Override to provide
233    /// custom compensation logic.
234    ///
235    /// Default implementation returns an empty response (framework handles).
236    fn on_rejected(
237        &self,
238        _notification: &Notification,
239        _target_domain: &str,
240        _target_command: &str,
241    ) -> CommandResult<RejectionHandlerResponse> {
242        Ok(RejectionHandlerResponse::default())
243    }
244}
245
246// ============================================================================
247// Process Manager Handler
248// ============================================================================
249
250/// Handler for a single domain's events in a process manager.
251///
252/// Process managers correlate events across multiple domains and maintain
253/// their own state. Each domain gets its own handler, but they all share
254/// the same PM state type.
255///
256/// # Example
257///
258/// ```rust,ignore
259/// struct OrderPmHandler;
260///
261/// impl ProcessManagerDomainHandler<HandFlowState> for OrderPmHandler {
262///     fn event_types(&self) -> Vec<String> {
263///         vec!["OrderCreated".into()]
264///     }
265///
266///     fn prepare(&self, trigger: &EventBook, state: &HandFlowState, event: &Any) -> Vec<Cover> {
267///         // Declare needed destinations
268///         vec![]
269///     }
270///
271///     fn handle(
272///         &self,
273///         trigger: &EventBook,
274///         state: &HandFlowState,
275///         event: &Any,
276///         destinations: &[EventBook],
277///     ) -> CommandResult<ProcessManagerResponse> {
278///         // Process event, emit commands and/or PM events
279///         Ok(ProcessManagerResponse::default())
280///     }
281/// }
282/// ```
283pub trait ProcessManagerDomainHandler<S>: Send + Sync {
284    /// Event type suffixes this handler processes.
285    fn event_types(&self) -> Vec<String>;
286
287    /// Prepare phase — declare destination covers needed.
288    fn prepare(&self, trigger: &EventBook, state: &S, event: &Any) -> Vec<Cover>;
289
290    /// Handle phase — produce commands and PM events.
291    fn handle(
292        &self,
293        trigger: &EventBook,
294        state: &S,
295        event: &Any,
296        destinations: &[EventBook],
297    ) -> CommandResult<ProcessManagerResponse>;
298
299    /// Handle a rejection notification.
300    ///
301    /// Called when a PM-issued command was rejected. Override to provide
302    /// custom compensation logic.
303    fn on_rejected(
304        &self,
305        _notification: &Notification,
306        _state: &S,
307        _target_domain: &str,
308        _target_command: &str,
309    ) -> CommandResult<RejectionHandlerResponse> {
310        Ok(RejectionHandlerResponse::default())
311    }
312}
313
314// ============================================================================
315// Projector Handler
316// ============================================================================
317
318/// Handler for a single domain's events in a projector.
319///
320/// Projectors consume events and produce external output (read models,
321/// caches, external systems).
322///
323/// # Example
324///
325/// ```rust,ignore
326/// struct PlayerProjectorHandler;
327///
328/// impl ProjectorDomainHandler for PlayerProjectorHandler {
329///     fn event_types(&self) -> Vec<String> {
330///         vec!["PlayerRegistered".into(), "FundsDeposited".into()]
331///     }
332///
333///     fn project(&self, events: &EventBook) -> Result<Projection, Box<dyn Error + Send + Sync>> {
334///         // Update external read model
335///         Ok(Projection::default())
336///     }
337/// }
338/// ```
339pub trait ProjectorDomainHandler: Send + Sync {
340    /// Event type suffixes this handler processes.
341    fn event_types(&self) -> Vec<String>;
342
343    /// Project events to external output.
344    fn project(&self, events: &EventBook) -> Result<Projection, Box<dyn Error + Send + Sync>>;
345}
346
347// ============================================================================
348// Tests
349// ============================================================================
350
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` prefixes the reason with "Command rejected: ".
    #[test]
    fn command_rejected_error_display() {
        let rendered = CommandRejectedError::new("insufficient funds").to_string();
        assert_eq!(rendered, "Command rejected: insufficient funds");
    }

    /// Converting a rejection into a `Status` yields `FailedPrecondition`.
    #[test]
    fn command_rejected_error_to_status() {
        let status = Status::from(CommandRejectedError::new("invalid input"));
        assert_eq!(status.code(), tonic::Code::FailedPrecondition);
    }

    /// A default rejection response carries neither events nor notification.
    #[test]
    fn rejection_handler_response_default() {
        let RejectionHandlerResponse {
            events,
            notification,
        } = RejectionHandlerResponse::default();
        assert!(events.is_none());
        assert!(notification.is_none());
    }

    /// A default PM response carries no commands, PM events, or facts.
    #[test]
    fn process_manager_response_default() {
        let ProcessManagerResponse {
            commands,
            process_events,
            facts,
        } = ProcessManagerResponse::default();
        assert!(commands.is_empty());
        assert!(process_events.is_none());
        assert!(facts.is_empty());
    }

    /// A default saga response carries no commands or events.
    #[test]
    fn saga_handler_response_default() {
        let SagaHandlerResponse { commands, events } = SagaHandlerResponse::default();
        assert!(commands.is_empty());
        assert!(events.is_empty());
    }
}