1use std::sync::Arc;
6
7use prost_types::Any;
8use tonic::{Request, Response, Status};
9
10use crate::proto::{
11 command_handler_service_server::CommandHandlerService,
12 process_manager_service_server::ProcessManagerService,
13 projector_service_server::ProjectorService, saga_service_server::SagaService,
14 upcaster_service_server::UpcasterService, BusinessResponse, ContextualCommand, EventBook,
15 ProcessManagerHandleRequest, ProcessManagerHandleResponse, ProcessManagerPrepareRequest,
16 ProcessManagerPrepareResponse, Projection, ReplayRequest, ReplayResponse, SagaHandleRequest,
17 SagaResponse, UpcastRequest, UpcastResponse,
18};
19use crate::router::{
20 CloudEventsRouter, CommandHandlerDomainHandler, CommandHandlerRouter, ProcessManagerRouter,
21 SagaDomainHandler, SagaRouter,
22};
23
24pub type StatePacker<S> = fn(&S) -> Result<Any, Status>;
28
29pub struct CommandHandlerGrpc<S, H>
34where
35 S: Default + Send + Sync + 'static,
36 H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
37{
38 router: Arc<CommandHandlerRouter<S, H>>,
39 state_packer: Option<StatePacker<S>>,
41}
42
43impl<S, H> CommandHandlerGrpc<S, H>
44where
45 S: Default + Send + Sync + 'static,
46 H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
47{
48 pub fn new(router: CommandHandlerRouter<S, H>) -> Self {
50 Self {
51 router: Arc::new(router),
52 state_packer: None,
53 }
54 }
55
56 pub fn with_replay(mut self, packer: StatePacker<S>) -> Self {
58 self.state_packer = Some(packer);
59 self
60 }
61
62 pub fn router(&self) -> &CommandHandlerRouter<S, H> {
64 &self.router
65 }
66}
67
68#[tonic::async_trait]
69impl<S, H> CommandHandlerService for CommandHandlerGrpc<S, H>
70where
71 S: Default + Send + Sync + 'static,
72 H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
73{
74 async fn handle(
75 &self,
76 request: Request<ContextualCommand>,
77 ) -> Result<Response<BusinessResponse>, Status> {
78 let cmd = request.into_inner();
79 let response = self.router.dispatch(&cmd)?;
80 Ok(Response::new(response))
81 }
82
83 async fn replay(
84 &self,
85 request: Request<ReplayRequest>,
86 ) -> Result<Response<ReplayResponse>, Status> {
87 let packer = self.state_packer.ok_or_else(|| {
88 Status::unimplemented(
89 "Replay not implemented. Call with_replay() to enable for MERGE_COMMUTATIVE strategy.",
90 )
91 })?;
92
93 let req = request.into_inner();
94 let event_book = build_event_book_for_replay(&req);
95 let state = self.router.rebuild_state(&event_book);
96 let state_any = packer(&state)?;
97
98 Ok(Response::new(ReplayResponse {
99 state: Some(state_any),
100 }))
101 }
102}
103
104fn build_event_book_for_replay(req: &ReplayRequest) -> EventBook {
106 EventBook {
107 cover: None,
108 pages: req.events.clone(),
109 snapshot: req.base_snapshot.clone(),
110 next_sequence: 0,
111 }
112}
113
114pub struct SagaHandler<H>
118where
119 H: SagaDomainHandler + Clone + 'static,
120{
121 router: Arc<SagaRouter<H>>,
122}
123
124impl<H: SagaDomainHandler + Clone + 'static> SagaHandler<H> {
125 pub fn new(router: SagaRouter<H>) -> Self {
127 Self {
128 router: Arc::new(router),
129 }
130 }
131
132 pub fn router(&self) -> &SagaRouter<H> {
134 &self.router
135 }
136}
137
138#[tonic::async_trait]
139impl<H: SagaDomainHandler + Clone + 'static> SagaService for SagaHandler<H> {
140 async fn handle(
141 &self,
142 request: Request<SagaHandleRequest>,
143 ) -> Result<Response<SagaResponse>, Status> {
144 let req = request.into_inner();
145 let source = req
146 .source
147 .as_ref()
148 .ok_or_else(|| Status::invalid_argument("Missing source event book"))?;
149
150 let response = self.router.dispatch(source)?;
151 Ok(Response::new(response))
152 }
153}
154
155pub type ProjectorHandleFn = fn(&EventBook) -> Result<Projection, Status>;
157
158pub type ProjectorHandleClosureFn =
160 Arc<dyn Fn(&EventBook) -> Result<Projection, Status> + Send + Sync>;
161
162enum ProjectorHandleType {
164 Fn(ProjectorHandleFn),
165 Closure(ProjectorHandleClosureFn),
166}
167
168pub struct ProjectorHandler {
172 name: String,
173 handle_type: Option<ProjectorHandleType>,
174}
175
176impl ProjectorHandler {
177 pub fn new(name: impl Into<String>) -> Self {
179 Self {
180 name: name.into(),
181 handle_type: None,
182 }
183 }
184
185 pub fn with_handle(mut self, handle_fn: ProjectorHandleFn) -> Self {
187 self.handle_type = Some(ProjectorHandleType::Fn(handle_fn));
188 self
189 }
190
191 pub fn with_handle_fn<H>(mut self, handle_fn: H) -> Self
193 where
194 H: Fn(&EventBook) -> Result<Projection, Status> + Send + Sync + 'static,
195 {
196 self.handle_type = Some(ProjectorHandleType::Closure(Arc::new(handle_fn)));
197 self
198 }
199
200 pub fn name(&self) -> &str {
202 &self.name
203 }
204}
205
206#[tonic::async_trait]
207impl ProjectorService for ProjectorHandler {
208 async fn handle(&self, request: Request<EventBook>) -> Result<Response<Projection>, Status> {
209 let event_book = request.into_inner();
210 match &self.handle_type {
211 Some(ProjectorHandleType::Fn(handle_fn)) => {
212 let projection = handle_fn(&event_book)?;
213 Ok(Response::new(projection))
214 }
215 Some(ProjectorHandleType::Closure(handle_fn)) => {
216 let projection = handle_fn(&event_book)?;
217 Ok(Response::new(projection))
218 }
219 None => Ok(Response::new(Projection::default())),
220 }
221 }
222
223 async fn handle_speculative(
224 &self,
225 request: Request<EventBook>,
226 ) -> Result<Response<Projection>, Status> {
227 self.handle(request).await
228 }
229}
230
231pub struct ProcessManagerGrpcHandler<S: Default + Send + Sync + 'static> {
235 router: Arc<ProcessManagerRouter<S>>,
236}
237
238impl<S: Default + Send + Sync + 'static> ProcessManagerGrpcHandler<S> {
239 pub fn new(router: ProcessManagerRouter<S>) -> Self {
241 Self {
242 router: Arc::new(router),
243 }
244 }
245
246 pub fn router(&self) -> &ProcessManagerRouter<S> {
248 &self.router
249 }
250}
251
252#[tonic::async_trait]
253impl<S: Default + Send + Sync + 'static> ProcessManagerService for ProcessManagerGrpcHandler<S> {
254 async fn prepare(
255 &self,
256 request: Request<ProcessManagerPrepareRequest>,
257 ) -> Result<Response<ProcessManagerPrepareResponse>, Status> {
258 let req = request.into_inner();
259 let destinations = self
260 .router
261 .prepare_destinations(&req.trigger, &req.process_state);
262
263 Ok(Response::new(ProcessManagerPrepareResponse {
264 destinations,
265 }))
266 }
267
268 async fn handle(
269 &self,
270 request: Request<ProcessManagerHandleRequest>,
271 ) -> Result<Response<ProcessManagerHandleResponse>, Status> {
272 let req = request.into_inner();
273
274 let trigger = req
275 .trigger
276 .as_ref()
277 .ok_or_else(|| Status::invalid_argument("Missing trigger event book"))?;
278
279 let process_state = req.process_state.as_ref().cloned().unwrap_or_default();
280
281 let response = self
282 .router
283 .dispatch(trigger, &process_state, &req.destinations)?;
284
285 Ok(Response::new(response))
286 }
287}
288
289pub type UpcasterHandleFn = fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage>;
291
292pub type UpcasterHandleClosureFn =
294 Arc<dyn Fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage> + Send + Sync>;
295
296enum UpcasterHandleType {
298 Fn(UpcasterHandleFn),
299 Closure(UpcasterHandleClosureFn),
300}
301
302pub struct UpcasterGrpcHandler {
306 name: String,
307 domain: String,
308 handle_type: Option<UpcasterHandleType>,
309}
310
311impl UpcasterGrpcHandler {
312 pub fn new(name: impl Into<String>, domain: impl Into<String>) -> Self {
314 Self {
315 name: name.into(),
316 domain: domain.into(),
317 handle_type: None,
318 }
319 }
320
321 pub fn with_handle(mut self, handle_fn: UpcasterHandleFn) -> Self {
323 self.handle_type = Some(UpcasterHandleType::Fn(handle_fn));
324 self
325 }
326
327 pub fn with_handle_fn<H>(mut self, handle_fn: H) -> Self
329 where
330 H: Fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage> + Send + Sync + 'static,
331 {
332 self.handle_type = Some(UpcasterHandleType::Closure(Arc::new(handle_fn)));
333 self
334 }
335
336 pub fn name(&self) -> &str {
338 &self.name
339 }
340
341 pub fn domain(&self) -> &str {
343 &self.domain
344 }
345}
346
347#[tonic::async_trait]
348impl UpcasterService for UpcasterGrpcHandler {
349 async fn upcast(
350 &self,
351 request: Request<UpcastRequest>,
352 ) -> Result<Response<UpcastResponse>, Status> {
353 let req = request.into_inner();
354 let events = req.events;
355
356 let result = match &self.handle_type {
357 Some(UpcasterHandleType::Fn(handle_fn)) => handle_fn(&events),
358 Some(UpcasterHandleType::Closure(handle_fn)) => handle_fn(&events),
359 None => events, };
361
362 Ok(Response::new(UpcastResponse { events: result }))
363 }
364}
365
366pub struct CloudEventsGrpcHandler {
372 router: Arc<CloudEventsRouter>,
373}
374
375impl CloudEventsGrpcHandler {
376 pub fn new(router: CloudEventsRouter) -> Self {
378 Self {
379 router: Arc::new(router),
380 }
381 }
382
383 pub fn router(&self) -> &CloudEventsRouter {
385 &self.router
386 }
387}
388
389#[tonic::async_trait]
390impl ProjectorService for CloudEventsGrpcHandler {
391 async fn handle(&self, request: Request<EventBook>) -> Result<Response<Projection>, Status> {
392 let event_book = request.into_inner();
393 let response = self.router.project(&event_book);
394
395 let projection_any =
397 Any::from_msg(&response).map_err(|e| Status::internal(format!("Pack error: {}", e)))?;
398
399 Ok(Response::new(Projection {
400 cover: event_book.cover,
401 projector: self.router.name().to_string(),
402 sequence: event_book.next_sequence,
403 projection: Some(projection_any),
404 }))
405 }
406
407 async fn handle_speculative(
408 &self,
409 request: Request<EventBook>,
410 ) -> Result<Response<Projection>, Status> {
411 self.handle(request).await
412 }
413}