angzarr_client/router/traits.rs
1//! Handler traits for each component type.
2//!
3//! Each trait defines the contract for domain handlers. Implementors
4//! encapsulate their routing logic internally and declare which types
5//! they handle via `command_types()` or `event_types()`.
6
7use prost_types::Any;
8use std::error::Error;
9use tonic::Status;
10
11use crate::proto::{CommandBook, Cover, EventBook, Notification, Projection};
12use crate::router::StateRouter;
13
14// ============================================================================
15// Common Types
16// ============================================================================
17
18/// Error type for command/event rejection with a human-readable reason.
19#[derive(Debug, Clone)]
20pub struct CommandRejectedError {
21 pub reason: String,
22}
23
24impl CommandRejectedError {
25 pub fn new(reason: impl Into<String>) -> Self {
26 Self {
27 reason: reason.into(),
28 }
29 }
30}
31
32impl std::fmt::Display for CommandRejectedError {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "Command rejected: {}", self.reason)
35 }
36}
37
38impl std::error::Error for CommandRejectedError {}
39
40impl From<CommandRejectedError> for Status {
41 fn from(err: CommandRejectedError) -> Self {
42 Status::failed_precondition(err.reason)
43 }
44}
45
46/// Result type for handlers.
47pub type CommandResult<T> = std::result::Result<T, CommandRejectedError>;
48
49/// Response from rejection handlers.
50///
51/// Handlers may return:
52/// - Events to compensate/fix state
53/// - Notification to forward upstream
54/// - Both
55#[derive(Default)]
56pub struct RejectionHandlerResponse {
57 /// Events to persist (compensation).
58 pub events: Option<EventBook>,
59 /// Notification to forward upstream.
60 pub notification: Option<Notification>,
61}
62
63/// Response from saga handlers.
64#[derive(Default)]
65pub struct SagaHandlerResponse {
66 /// Commands to send to other aggregates.
67 pub commands: Vec<CommandBook>,
68 /// Facts/events to inject to other aggregates.
69 pub events: Vec<EventBook>,
70}
71
72/// Response from process manager handlers.
73#[derive(Default)]
74pub struct ProcessManagerResponse {
75 /// Commands to send to other aggregates.
76 pub commands: Vec<CommandBook>,
77 /// Events to persist to the PM's own domain.
78 pub process_events: Option<EventBook>,
79 /// Facts to inject to other aggregates.
80 pub facts: Vec<EventBook>,
81}
82
83/// Helper trait for unpacking Any messages.
84pub trait UnpackAny {
85 /// Unpack an Any to a specific message type.
86 fn unpack<M: prost::Message + Default>(&self) -> Result<M, prost::DecodeError>;
87}
88
89impl UnpackAny for Any {
90 fn unpack<M: prost::Message + Default>(&self) -> Result<M, prost::DecodeError> {
91 M::decode(self.value.as_slice())
92 }
93}
94
95// ============================================================================
96// Command Handler
97// ============================================================================
98
99/// Handler for a single domain's command handler logic.
100///
101/// Command handlers receive commands and emit events. They maintain state
102/// that is rebuilt from events using a `StateRouter`.
103///
104/// # Example
105///
106/// ```rust,ignore
107/// struct PlayerHandler {
108/// state_router: StateRouter<PlayerState>,
109/// }
110///
111/// impl CommandHandlerDomainHandler for PlayerHandler {
112/// type State = PlayerState;
113///
114/// fn command_types(&self) -> Vec<String> {
115/// vec!["RegisterPlayer".into(), "DepositFunds".into()]
116/// }
117///
118/// fn state_router(&self) -> &StateRouter<Self::State> {
119/// &self.state_router
120/// }
121///
122/// fn handle(
123/// &self,
124/// cmd: &CommandBook,
125/// payload: &Any,
126/// state: &Self::State,
127/// seq: u32,
128/// ) -> CommandResult<EventBook> {
129/// dispatch_command!(payload, cmd, state, seq, {
130/// "RegisterPlayer" => self.handle_register,
131/// "DepositFunds" => self.handle_deposit,
132/// })
133/// }
134/// }
135/// ```
136pub trait CommandHandlerDomainHandler: Send + Sync {
137 /// The state type for this aggregate.
138 type State: Default + 'static;
139
140 /// Command type suffixes this handler processes.
141 ///
142 /// Used for subscription derivation and routing.
143 fn command_types(&self) -> Vec<String>;
144
145 /// Get the state router for rebuilding state from events.
146 fn state_router(&self) -> &StateRouter<Self::State>;
147
148 /// Rebuild state from events.
149 ///
150 /// Default implementation uses `state_router().with_event_book()`.
151 fn rebuild(&self, events: &EventBook) -> Self::State {
152 self.state_router().with_event_book(events)
153 }
154
155 /// Handle a command and return resulting events.
156 ///
157 /// The handler should dispatch internally based on `payload.type_url`.
158 fn handle(
159 &self,
160 cmd: &CommandBook,
161 payload: &Any,
162 state: &Self::State,
163 seq: u32,
164 ) -> CommandResult<EventBook>;
165
166 /// Handle a rejection notification.
167 ///
168 /// Called when a command issued by a saga/PM targeting this aggregate's
169 /// domain was rejected. Override to provide custom compensation logic.
170 ///
171 /// Default implementation returns an empty response (framework handles).
172 fn on_rejected(
173 &self,
174 _notification: &Notification,
175 _state: &Self::State,
176 _target_domain: &str,
177 _target_command: &str,
178 ) -> CommandResult<RejectionHandlerResponse> {
179 Ok(RejectionHandlerResponse::default())
180 }
181}
182
183// ============================================================================
184// Saga Handler
185// ============================================================================
186
187/// Handler for a single domain's events in a saga.
188///
189/// Sagas are **pure translators**: they receive source events and produce
190/// commands for target domains. They do NOT receive destination state —
191/// the framework handles sequence stamping and delivery retries.
192///
193/// # Example
194///
195/// ```rust,ignore
196/// struct OrderSagaHandler;
197///
198/// impl SagaDomainHandler for OrderSagaHandler {
199/// fn event_types(&self) -> Vec<String> {
200/// vec!["OrderCompleted".into(), "OrderCancelled".into()]
201/// }
202///
203/// fn handle(
204/// &self,
205/// source: &EventBook,
206/// event: &Any,
207/// ) -> CommandResult<SagaHandlerResponse> {
208/// dispatch_event!(event, source, {
209/// "OrderCompleted" => self.handle_completed,
210/// "OrderCancelled" => self.handle_cancelled,
211/// })
212/// }
213/// }
214/// ```
215pub trait SagaDomainHandler: Send + Sync {
216 /// Event type suffixes this handler processes.
217 ///
218 /// Used for subscription derivation.
219 fn event_types(&self) -> Vec<String>;
220
221 /// Translate source events into commands for target domains.
222 ///
223 /// Commands should have `cover` set to identify the target aggregate.
224 /// The framework will stamp `angzarr_deferred` with source info and
225 /// handle sequence assignment on delivery.
226 ///
227 /// Returns commands to send to other aggregates and events/facts to inject.
228 fn handle(&self, source: &EventBook, event: &Any) -> CommandResult<SagaHandlerResponse>;
229
230 /// Handle a rejection notification.
231 ///
232 /// Called when a saga-issued command was rejected. Override to provide
233 /// custom compensation logic.
234 ///
235 /// Default implementation returns an empty response (framework handles).
236 fn on_rejected(
237 &self,
238 _notification: &Notification,
239 _target_domain: &str,
240 _target_command: &str,
241 ) -> CommandResult<RejectionHandlerResponse> {
242 Ok(RejectionHandlerResponse::default())
243 }
244}
245
246// ============================================================================
247// Process Manager Handler
248// ============================================================================
249
250/// Handler for a single domain's events in a process manager.
251///
252/// Process managers correlate events across multiple domains and maintain
253/// their own state. Each domain gets its own handler, but they all share
254/// the same PM state type.
255///
256/// # Example
257///
258/// ```rust,ignore
259/// struct OrderPmHandler;
260///
261/// impl ProcessManagerDomainHandler<HandFlowState> for OrderPmHandler {
262/// fn event_types(&self) -> Vec<String> {
263/// vec!["OrderCreated".into()]
264/// }
265///
266/// fn prepare(&self, trigger: &EventBook, state: &HandFlowState, event: &Any) -> Vec<Cover> {
267/// // Declare needed destinations
268/// vec![]
269/// }
270///
271/// fn handle(
272/// &self,
273/// trigger: &EventBook,
274/// state: &HandFlowState,
275/// event: &Any,
276/// destinations: &[EventBook],
277/// ) -> CommandResult<ProcessManagerResponse> {
278/// // Process event, emit commands and/or PM events
279/// Ok(ProcessManagerResponse::default())
280/// }
281/// }
282/// ```
283pub trait ProcessManagerDomainHandler<S>: Send + Sync {
284 /// Event type suffixes this handler processes.
285 fn event_types(&self) -> Vec<String>;
286
287 /// Prepare phase — declare destination covers needed.
288 fn prepare(&self, trigger: &EventBook, state: &S, event: &Any) -> Vec<Cover>;
289
290 /// Handle phase — produce commands and PM events.
291 fn handle(
292 &self,
293 trigger: &EventBook,
294 state: &S,
295 event: &Any,
296 destinations: &[EventBook],
297 ) -> CommandResult<ProcessManagerResponse>;
298
299 /// Handle a rejection notification.
300 ///
301 /// Called when a PM-issued command was rejected. Override to provide
302 /// custom compensation logic.
303 fn on_rejected(
304 &self,
305 _notification: &Notification,
306 _state: &S,
307 _target_domain: &str,
308 _target_command: &str,
309 ) -> CommandResult<RejectionHandlerResponse> {
310 Ok(RejectionHandlerResponse::default())
311 }
312}
313
314// ============================================================================
315// Projector Handler
316// ============================================================================
317
318/// Handler for a single domain's events in a projector.
319///
320/// Projectors consume events and produce external output (read models,
321/// caches, external systems).
322///
323/// # Example
324///
325/// ```rust,ignore
326/// struct PlayerProjectorHandler;
327///
328/// impl ProjectorDomainHandler for PlayerProjectorHandler {
329/// fn event_types(&self) -> Vec<String> {
330/// vec!["PlayerRegistered".into(), "FundsDeposited".into()]
331/// }
332///
333/// fn project(&self, events: &EventBook) -> Result<Projection, Box<dyn Error + Send + Sync>> {
334/// // Update external read model
335/// Ok(Projection::default())
336/// }
337/// }
338/// ```
339pub trait ProjectorDomainHandler: Send + Sync {
340 /// Event type suffixes this handler processes.
341 fn event_types(&self) -> Vec<String>;
342
343 /// Project events to external output.
344 fn project(&self, events: &EventBook) -> Result<Projection, Box<dyn Error + Send + Sync>>;
345}
346
347// ============================================================================
348// Tests
349// ============================================================================
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354
355 #[test]
356 fn command_rejected_error_display() {
357 let err = CommandRejectedError::new("insufficient funds");
358 assert_eq!(err.to_string(), "Command rejected: insufficient funds");
359 }
360
361 #[test]
362 fn command_rejected_error_to_status() {
363 let err = CommandRejectedError::new("invalid input");
364 let status: Status = err.into();
365 assert_eq!(status.code(), tonic::Code::FailedPrecondition);
366 }
367
368 #[test]
369 fn rejection_handler_response_default() {
370 let response = RejectionHandlerResponse::default();
371 assert!(response.events.is_none());
372 assert!(response.notification.is_none());
373 }
374
375 #[test]
376 fn process_manager_response_default() {
377 let response = ProcessManagerResponse::default();
378 assert!(response.commands.is_empty());
379 assert!(response.process_events.is_none());
380 assert!(response.facts.is_empty());
381 }
382
383 #[test]
384 fn saga_handler_response_default() {
385 let response = SagaHandlerResponse::default();
386 assert!(response.commands.is_empty());
387 assert!(response.events.is_empty());
388 }
389}