Skip to main content

angzarr_client/
handler.rs

1//! gRPC service handlers for aggregates, sagas, and process managers.
2//!
3//! This module provides gRPC service implementations that wrap routers.
4
5use std::sync::Arc;
6
7use prost_types::Any;
8use tonic::{Request, Response, Status};
9
10use crate::proto::{
11    command_handler_service_server::CommandHandlerService,
12    process_manager_service_server::ProcessManagerService,
13    projector_service_server::ProjectorService, saga_service_server::SagaService,
14    upcaster_service_server::UpcasterService, BusinessResponse, ContextualCommand, EventBook,
15    ProcessManagerHandleRequest, ProcessManagerHandleResponse, ProcessManagerPrepareRequest,
16    ProcessManagerPrepareResponse, Projection, ReplayRequest, ReplayResponse, SagaHandleRequest,
17    SagaResponse, UpcastRequest, UpcastResponse,
18};
19use crate::router::{
20    CloudEventsRouter, CommandHandlerDomainHandler, CommandHandlerRouter, ProcessManagerRouter,
21    SagaDomainHandler, SagaRouter,
22};
23
24/// Function type for packing state into protobuf Any.
25///
26/// Used by Replay RPC to return state as a serializable message.
27pub type StatePacker<S> = fn(&S) -> Result<Any, Status>;
28
29/// gRPC command handler service implementation.
30///
31/// Wraps a `CommandHandlerRouter` to handle commands.
32/// Optionally supports Replay RPC for MERGE_COMMUTATIVE conflict detection.
33pub struct CommandHandlerGrpc<S, H>
34where
35    S: Default + Send + Sync + 'static,
36    H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
37{
38    router: Arc<CommandHandlerRouter<S, H>>,
39    /// Optional state packer for Replay RPC support.
40    state_packer: Option<StatePacker<S>>,
41}
42
43impl<S, H> CommandHandlerGrpc<S, H>
44where
45    S: Default + Send + Sync + 'static,
46    H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
47{
48    /// Create a new command handler from a router.
49    pub fn new(router: CommandHandlerRouter<S, H>) -> Self {
50        Self {
51            router: Arc::new(router),
52            state_packer: None,
53        }
54    }
55
56    /// Enable Replay RPC support by providing a state packer.
57    pub fn with_replay(mut self, packer: StatePacker<S>) -> Self {
58        self.state_packer = Some(packer);
59        self
60    }
61
62    /// Get the underlying router.
63    pub fn router(&self) -> &CommandHandlerRouter<S, H> {
64        &self.router
65    }
66}
67
68#[tonic::async_trait]
69impl<S, H> CommandHandlerService for CommandHandlerGrpc<S, H>
70where
71    S: Default + Send + Sync + 'static,
72    H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
73{
74    async fn handle(
75        &self,
76        request: Request<ContextualCommand>,
77    ) -> Result<Response<BusinessResponse>, Status> {
78        let cmd = request.into_inner();
79        let response = self.router.dispatch(&cmd)?;
80        Ok(Response::new(response))
81    }
82
83    async fn replay(
84        &self,
85        request: Request<ReplayRequest>,
86    ) -> Result<Response<ReplayResponse>, Status> {
87        let packer = self.state_packer.ok_or_else(|| {
88            Status::unimplemented(
89                "Replay not implemented. Call with_replay() to enable for MERGE_COMMUTATIVE strategy.",
90            )
91        })?;
92
93        let req = request.into_inner();
94        let event_book = build_event_book_for_replay(&req);
95        let state = self.router.rebuild_state(&event_book);
96        let state_any = packer(&state)?;
97
98        Ok(Response::new(ReplayResponse {
99            state: Some(state_any),
100        }))
101    }
102}
103
104/// Build an EventBook from a ReplayRequest for state reconstruction.
105fn build_event_book_for_replay(req: &ReplayRequest) -> EventBook {
106    EventBook {
107        cover: None,
108        pages: req.events.clone(),
109        snapshot: req.base_snapshot.clone(),
110        next_sequence: 0,
111    }
112}
113
114/// gRPC saga service implementation.
115///
116/// Wraps a `SagaRouter` to handle saga events.
117pub struct SagaHandler<H>
118where
119    H: SagaDomainHandler + Clone + 'static,
120{
121    router: Arc<SagaRouter<H>>,
122}
123
124impl<H: SagaDomainHandler + Clone + 'static> SagaHandler<H> {
125    /// Create a new saga handler from a router.
126    pub fn new(router: SagaRouter<H>) -> Self {
127        Self {
128            router: Arc::new(router),
129        }
130    }
131
132    /// Get the underlying router.
133    pub fn router(&self) -> &SagaRouter<H> {
134        &self.router
135    }
136}
137
138#[tonic::async_trait]
139impl<H: SagaDomainHandler + Clone + 'static> SagaService for SagaHandler<H> {
140    async fn handle(
141        &self,
142        request: Request<SagaHandleRequest>,
143    ) -> Result<Response<SagaResponse>, Status> {
144        let req = request.into_inner();
145        let source = req
146            .source
147            .as_ref()
148            .ok_or_else(|| Status::invalid_argument("Missing source event book"))?;
149
150        let response = self.router.dispatch(source)?;
151        Ok(Response::new(response))
152    }
153}
154
155/// Handle function type for projectors (function pointer).
156pub type ProjectorHandleFn = fn(&EventBook) -> Result<Projection, Status>;
157
158/// Handle closure type for projectors.
159pub type ProjectorHandleClosureFn =
160    Arc<dyn Fn(&EventBook) -> Result<Projection, Status> + Send + Sync>;
161
162/// Internal handle type - either fn pointer or closure.
163enum ProjectorHandleType {
164    Fn(ProjectorHandleFn),
165    Closure(ProjectorHandleClosureFn),
166}
167
168/// gRPC projector service implementation.
169///
170/// Wraps a handle function to process events and produce projections.
171pub struct ProjectorHandler {
172    name: String,
173    handle_type: Option<ProjectorHandleType>,
174}
175
176impl ProjectorHandler {
177    /// Create a new projector handler.
178    pub fn new(name: impl Into<String>) -> Self {
179        Self {
180            name: name.into(),
181            handle_type: None,
182        }
183    }
184
185    /// Set the handle function (function pointer).
186    pub fn with_handle(mut self, handle_fn: ProjectorHandleFn) -> Self {
187        self.handle_type = Some(ProjectorHandleType::Fn(handle_fn));
188        self
189    }
190
191    /// Set the handle function (closure).
192    pub fn with_handle_fn<H>(mut self, handle_fn: H) -> Self
193    where
194        H: Fn(&EventBook) -> Result<Projection, Status> + Send + Sync + 'static,
195    {
196        self.handle_type = Some(ProjectorHandleType::Closure(Arc::new(handle_fn)));
197        self
198    }
199
200    /// Get the projector name.
201    pub fn name(&self) -> &str {
202        &self.name
203    }
204}
205
206#[tonic::async_trait]
207impl ProjectorService for ProjectorHandler {
208    async fn handle(&self, request: Request<EventBook>) -> Result<Response<Projection>, Status> {
209        let event_book = request.into_inner();
210        match &self.handle_type {
211            Some(ProjectorHandleType::Fn(handle_fn)) => {
212                let projection = handle_fn(&event_book)?;
213                Ok(Response::new(projection))
214            }
215            Some(ProjectorHandleType::Closure(handle_fn)) => {
216                let projection = handle_fn(&event_book)?;
217                Ok(Response::new(projection))
218            }
219            None => Ok(Response::new(Projection::default())),
220        }
221    }
222
223    async fn handle_speculative(
224        &self,
225        request: Request<EventBook>,
226    ) -> Result<Response<Projection>, Status> {
227        self.handle(request).await
228    }
229}
230
231/// gRPC process manager service implementation.
232///
233/// Wraps a `ProcessManagerRouter` to handle PM events.
234pub struct ProcessManagerGrpcHandler<S: Default + Send + Sync + 'static> {
235    router: Arc<ProcessManagerRouter<S>>,
236}
237
238impl<S: Default + Send + Sync + 'static> ProcessManagerGrpcHandler<S> {
239    /// Create a new process manager handler from a router.
240    pub fn new(router: ProcessManagerRouter<S>) -> Self {
241        Self {
242            router: Arc::new(router),
243        }
244    }
245
246    /// Get the underlying router.
247    pub fn router(&self) -> &ProcessManagerRouter<S> {
248        &self.router
249    }
250}
251
252#[tonic::async_trait]
253impl<S: Default + Send + Sync + 'static> ProcessManagerService for ProcessManagerGrpcHandler<S> {
254    async fn prepare(
255        &self,
256        request: Request<ProcessManagerPrepareRequest>,
257    ) -> Result<Response<ProcessManagerPrepareResponse>, Status> {
258        let req = request.into_inner();
259        let destinations = self
260            .router
261            .prepare_destinations(&req.trigger, &req.process_state);
262
263        Ok(Response::new(ProcessManagerPrepareResponse {
264            destinations,
265        }))
266    }
267
268    async fn handle(
269        &self,
270        request: Request<ProcessManagerHandleRequest>,
271    ) -> Result<Response<ProcessManagerHandleResponse>, Status> {
272        let req = request.into_inner();
273
274        let trigger = req
275            .trigger
276            .as_ref()
277            .ok_or_else(|| Status::invalid_argument("Missing trigger event book"))?;
278
279        let process_state = req.process_state.as_ref().cloned().unwrap_or_default();
280
281        let response = self
282            .router
283            .dispatch(trigger, &process_state, &req.destinations)?;
284
285        Ok(Response::new(response))
286    }
287}
288
289/// Handle function type for upcasters (function pointer).
290pub type UpcasterHandleFn = fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage>;
291
292/// Handle closure type for upcasters.
293pub type UpcasterHandleClosureFn =
294    Arc<dyn Fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage> + Send + Sync>;
295
296/// Internal handle type - either fn pointer or closure.
297enum UpcasterHandleType {
298    Fn(UpcasterHandleFn),
299    Closure(UpcasterHandleClosureFn),
300}
301
302/// gRPC upcaster service implementation.
303///
304/// Wraps a handle function to transform events to current versions.
305pub struct UpcasterGrpcHandler {
306    name: String,
307    domain: String,
308    handle_type: Option<UpcasterHandleType>,
309}
310
311impl UpcasterGrpcHandler {
312    /// Create a new upcaster handler.
313    pub fn new(name: impl Into<String>, domain: impl Into<String>) -> Self {
314        Self {
315            name: name.into(),
316            domain: domain.into(),
317            handle_type: None,
318        }
319    }
320
321    /// Set the handle function (function pointer).
322    pub fn with_handle(mut self, handle_fn: UpcasterHandleFn) -> Self {
323        self.handle_type = Some(UpcasterHandleType::Fn(handle_fn));
324        self
325    }
326
327    /// Set the handle function (closure).
328    pub fn with_handle_fn<H>(mut self, handle_fn: H) -> Self
329    where
330        H: Fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage> + Send + Sync + 'static,
331    {
332        self.handle_type = Some(UpcasterHandleType::Closure(Arc::new(handle_fn)));
333        self
334    }
335
336    /// Get the upcaster name.
337    pub fn name(&self) -> &str {
338        &self.name
339    }
340
341    /// Get the domain this upcaster handles.
342    pub fn domain(&self) -> &str {
343        &self.domain
344    }
345}
346
347#[tonic::async_trait]
348impl UpcasterService for UpcasterGrpcHandler {
349    async fn upcast(
350        &self,
351        request: Request<UpcastRequest>,
352    ) -> Result<Response<UpcastResponse>, Status> {
353        let req = request.into_inner();
354        let events = req.events;
355
356        let result = match &self.handle_type {
357            Some(UpcasterHandleType::Fn(handle_fn)) => handle_fn(&events),
358            Some(UpcasterHandleType::Closure(handle_fn)) => handle_fn(&events),
359            None => events, // Passthrough if no handler
360        };
361
362        Ok(Response::new(UpcastResponse { events: result }))
363    }
364}
365
366/// gRPC CloudEvents projector service implementation.
367///
368/// Wraps a `CloudEventsRouter` to transform events into CloudEvents.
369/// Uses the standard ProjectorService protocol but returns CloudEventsResponse
370/// packed into Projection.projection.
371pub struct CloudEventsGrpcHandler {
372    router: Arc<CloudEventsRouter>,
373}
374
375impl CloudEventsGrpcHandler {
376    /// Create a new CloudEvents handler from a router.
377    pub fn new(router: CloudEventsRouter) -> Self {
378        Self {
379            router: Arc::new(router),
380        }
381    }
382
383    /// Get the underlying router.
384    pub fn router(&self) -> &CloudEventsRouter {
385        &self.router
386    }
387}
388
389#[tonic::async_trait]
390impl ProjectorService for CloudEventsGrpcHandler {
391    async fn handle(&self, request: Request<EventBook>) -> Result<Response<Projection>, Status> {
392        let event_book = request.into_inner();
393        let response = self.router.project(&event_book);
394
395        // Pack CloudEventsResponse into Projection.projection
396        let projection_any =
397            Any::from_msg(&response).map_err(|e| Status::internal(format!("Pack error: {}", e)))?;
398
399        Ok(Response::new(Projection {
400            cover: event_book.cover,
401            projector: self.router.name().to_string(),
402            sequence: event_book.next_sequence,
403            projection: Some(projection_any),
404        }))
405    }
406
407    async fn handle_speculative(
408        &self,
409        request: Request<EventBook>,
410    ) -> Result<Response<Projection>, Status> {
411        self.handle(request).await
412    }
413}