1#![allow(clippy::result_large_err)]
3
4mod cloudevents;
34mod dispatch;
35mod factory;
36mod helpers;
37mod saga_context;
38mod state;
39mod traits;
40mod upcaster;
41
42use std::collections::HashMap;
43use std::marker::PhantomData;
44use std::sync::Arc;
45
46use prost_types::Any;
47use tonic::Status;
48
49use crate::proto::{
50 business_response, event_page, BusinessResponse, ContextualCommand, Cover, EventBook,
51 Notification, ProcessManagerHandleResponse, Projection, RejectionNotification,
52 RevocationResponse, SagaResponse,
53};
54
55pub use helpers::{event_book_from, event_page, new_event_book, new_event_book_multi, pack_event};
57pub use saga_context::SagaContext;
58pub use state::{EventApplier, EventApplierHOF, StateFactory, StateRouter};
59pub use traits::{
60 CommandHandlerDomainHandler, CommandRejectedError, CommandResult, ProcessManagerDomainHandler,
61 ProcessManagerResponse, ProjectorDomainHandler, RejectionHandlerResponse, SagaDomainHandler,
62 SagaHandlerResponse, UnpackAny,
63};
64pub use upcaster::{BoxedUpcasterHandler, UpcasterHandler, UpcasterHandlerHOF, UpcasterMode, UpcasterRouter};
65
66pub use factory::{BoxedHandlerFactory, HandlerFactory, HandlerHOF};
68
69pub use cloudevents::{CloudEventsHandler, CloudEventsProjector, CloudEventsRouter};
71
72pub use crate::dispatch_command;
74pub use crate::dispatch_event;
75
/// Zero-sized marker type naming the command-handler routing mode.
///
/// NOTE(review): nothing in this file uses the marker beyond the size test;
/// presumably other modules use it as a type-level tag — confirm.
pub struct CommandHandlerMode;
82
/// Zero-sized marker type naming the saga routing mode.
///
/// NOTE(review): nothing in this file uses the marker beyond the size test;
/// presumably other modules use it as a type-level tag — confirm.
pub struct SagaMode;
85
/// Zero-sized marker type naming the process-manager routing mode.
///
/// NOTE(review): nothing in this file uses the marker beyond the size test;
/// presumably other modules use it as a type-level tag — confirm.
pub struct ProcessManagerMode;
88
/// Zero-sized marker type naming the projector routing mode.
///
/// NOTE(review): nothing in this file uses the marker beyond the size test;
/// presumably other modules use it as a type-level tag — confirm.
pub struct ProjectorMode;
91
92enum HandlerStorage<H> {
98 Static(H),
100 Factory(Arc<dyn Fn() -> H + Send + Sync>),
102}
103
104impl<H> HandlerStorage<H> {
105 fn get(&self) -> HandlerRef<'_, H>
107 where
108 H: Clone,
109 {
110 match self {
111 Self::Static(h) => HandlerRef::Borrowed(h),
112 Self::Factory(f) => HandlerRef::Owned(f()),
113 }
114 }
115}
116
/// Either a handler borrowed from somewhere else or one owned by this value.
///
/// Dereferences to `H` in both cases, so callers can use it without caring
/// which variant they hold.
enum HandlerRef<'a, H> {
    /// A reference to a handler that lives elsewhere.
    Borrowed(&'a H),
    /// A handler owned by this value and dropped together with it.
    Owned(H),
}

impl<'a, H> std::ops::Deref for HandlerRef<'a, H> {
    type Target = H;

    /// Yields a shared reference to the underlying handler for either variant.
    fn deref(&self) -> &Self::Target {
        match *self {
            HandlerRef::Borrowed(lent) => lent,
            HandlerRef::Owned(ref held) => held,
        }
    }
}
133
134pub struct CommandHandlerRouter<S, H>
154where
155 H: CommandHandlerDomainHandler<State = S>,
156{
157 name: String,
158 domain: String,
159 storage: HandlerStorage<H>,
160 _state: PhantomData<S>,
161}
162
163impl<S: Default + Send + Sync + 'static, H: CommandHandlerDomainHandler<State = S> + Clone>
164 CommandHandlerRouter<S, H>
165{
166 pub fn new(name: impl Into<String>, domain: impl Into<String>, handler: H) -> Self {
171 Self {
172 name: name.into(),
173 domain: domain.into(),
174 storage: HandlerStorage::Static(handler),
175 _state: PhantomData,
176 }
177 }
178
179 pub fn with_factory<F>(name: impl Into<String>, domain: impl Into<String>, factory: F) -> Self
195 where
196 F: Fn() -> H + Send + Sync + 'static,
197 {
198 Self {
199 name: name.into(),
200 domain: domain.into(),
201 storage: HandlerStorage::Factory(Arc::new(factory)),
202 _state: PhantomData,
203 }
204 }
205
206 pub fn name(&self) -> &str {
208 &self.name
209 }
210
211 pub fn domain(&self) -> &str {
213 &self.domain
214 }
215
216 pub fn command_types(&self) -> Vec<String> {
218 self.storage.get().command_types()
219 }
220
221 pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
223 vec![(self.domain.clone(), self.command_types())]
224 }
225
226 pub fn rebuild_state(&self, events: &EventBook) -> S {
228 self.storage.get().rebuild(events)
229 }
230
231 pub fn dispatch(&self, cmd: &ContextualCommand) -> Result<BusinessResponse, Status> {
233 let command_book = cmd
234 .command
235 .as_ref()
236 .ok_or_else(|| Status::invalid_argument("Missing command book"))?;
237
238 let command_page = command_book
239 .pages
240 .first()
241 .ok_or_else(|| Status::invalid_argument("Missing command page"))?;
242
243 let command_any = match &command_page.payload {
244 Some(crate::proto::command_page::Payload::Command(c)) => c,
245 _ => return Err(Status::invalid_argument("Missing command")),
246 };
247
248 let event_book = cmd
249 .events
250 .as_ref()
251 .ok_or_else(|| Status::invalid_argument("Missing event book"))?;
252
253 let handler = self.storage.get();
255
256 let state = handler.rebuild(event_book);
258 let seq = crate::EventBookExt::next_sequence(event_book);
259
260 let type_url = &command_any.type_url;
261
262 if type_url.ends_with("Notification") {
264 return dispatch_command_handler_notification(&*handler, command_any, &state);
265 }
266
267 let result_book = handler.handle(command_book, command_any, &state, seq)?;
269
270 Ok(BusinessResponse {
271 result: Some(business_response::Result::Events(result_book)),
272 })
273 }
274}
275
276fn dispatch_command_handler_notification<S: Default + 'static>(
278 handler: &dyn CommandHandlerDomainHandler<State = S>,
279 command_any: &Any,
280 state: &S,
281) -> Result<BusinessResponse, Status> {
282 use prost::Message;
283
284 let notification = Notification::decode(command_any.value.as_slice())
285 .map_err(|e| Status::invalid_argument(format!("Failed to decode Notification: {}", e)))?;
286
287 let rejection = notification
288 .payload
289 .as_ref()
290 .map(|p| RejectionNotification::decode(p.value.as_slice()))
291 .transpose()
292 .map_err(|e| {
293 Status::invalid_argument(format!("Failed to decode RejectionNotification: {}", e))
294 })?
295 .unwrap_or_default();
296
297 let (domain, cmd_suffix) = extract_rejection_key(&rejection);
298
299 let response = handler.on_rejected(¬ification, state, &domain, &cmd_suffix)?;
300
301 match (response.events, response.notification) {
302 (Some(events), _) => Ok(BusinessResponse {
303 result: Some(business_response::Result::Events(events)),
304 }),
305 (None, Some(notif)) => Ok(BusinessResponse {
306 result: Some(business_response::Result::Notification(notif)),
307 }),
308 (None, None) => Ok(BusinessResponse {
309 result: Some(business_response::Result::Revocation(RevocationResponse {
310 emit_system_revocation: true,
311 send_to_dead_letter_queue: false,
312 escalate: false,
313 abort: false,
314 reason: format!(
315 "Handler returned empty response for {}/{}",
316 domain, cmd_suffix
317 ),
318 })),
319 }),
320 }
321}
322
323pub struct SagaRouter<H>
343where
344 H: SagaDomainHandler,
345{
346 name: String,
347 domain: String,
348 storage: HandlerStorage<H>,
349}
350
351impl<H: SagaDomainHandler + Clone> SagaRouter<H> {
352 pub fn new(name: impl Into<String>, domain: impl Into<String>, handler: H) -> Self {
357 Self {
358 name: name.into(),
359 domain: domain.into(),
360 storage: HandlerStorage::Static(handler),
361 }
362 }
363
364 pub fn with_factory<F>(name: impl Into<String>, domain: impl Into<String>, factory: F) -> Self
379 where
380 F: Fn() -> H + Send + Sync + 'static,
381 {
382 Self {
383 name: name.into(),
384 domain: domain.into(),
385 storage: HandlerStorage::Factory(Arc::new(factory)),
386 }
387 }
388
389 pub fn name(&self) -> &str {
391 &self.name
392 }
393
394 pub fn input_domain(&self) -> &str {
396 &self.domain
397 }
398
399 pub fn event_types(&self) -> Vec<String> {
401 self.storage.get().event_types()
402 }
403
404 pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
406 vec![(self.domain.clone(), self.event_types())]
407 }
408
409 pub fn dispatch(&self, source: &EventBook) -> Result<SagaResponse, Status> {
414 let event_page = source
415 .pages
416 .last()
417 .ok_or_else(|| Status::invalid_argument("Source event book has no events"))?;
418
419 let event_any = match &event_page.payload {
420 Some(event_page::Payload::Event(e)) => e,
421 _ => return Err(Status::invalid_argument("Missing event payload")),
422 };
423
424 let handler = self.storage.get();
426
427 if event_any.type_url.ends_with("Notification") {
429 return dispatch_saga_notification(&*handler, event_any);
430 }
431
432 let response = handler.handle(source, event_any)?;
433
434 Ok(SagaResponse {
435 commands: response.commands,
436 events: response.events,
437 })
438 }
439}
440
441fn dispatch_saga_notification<H: SagaDomainHandler>(
443 handler: &H,
444 event_any: &Any,
445) -> Result<SagaResponse, Status> {
446 use prost::Message;
447
448 let notification = Notification::decode(event_any.value.as_slice())
449 .map_err(|e| Status::invalid_argument(format!("Failed to decode Notification: {}", e)))?;
450
451 let rejection = notification
452 .payload
453 .as_ref()
454 .map(|p| RejectionNotification::decode(p.value.as_slice()))
455 .transpose()
456 .map_err(|e| {
457 Status::invalid_argument(format!("Failed to decode RejectionNotification: {}", e))
458 })?
459 .unwrap_or_default();
460
461 let (domain, cmd_suffix) = extract_rejection_key(&rejection);
462
463 let response = handler.on_rejected(¬ification, &domain, &cmd_suffix)?;
464
465 Ok(SagaResponse {
467 commands: vec![],
468 events: response.events.into_iter().collect(),
469 })
470}
471
472fn extract_rejection_key(rejection: &RejectionNotification) -> (String, String) {
474 if let Some(rejected) = &rejection.rejected_command {
475 let domain = rejected
476 .cover
477 .as_ref()
478 .map(|c| c.domain.clone())
479 .unwrap_or_default();
480
481 let cmd_suffix = rejected
482 .pages
483 .first()
484 .and_then(|p| match &p.payload {
485 Some(crate::proto::command_page::Payload::Command(c)) => Some(c),
486 _ => None,
487 })
488 .map(|c| {
489 c.type_url
490 .rsplit('/')
491 .next()
492 .unwrap_or(&c.type_url)
493 .to_string()
494 })
495 .unwrap_or_default();
496
497 (domain, cmd_suffix)
498 } else {
499 (String::new(), String::new())
500 }
501}
502
503pub struct ProcessManagerRouter<S: Default + Send + Sync + 'static> {
511 name: String,
512 pm_domain: String,
513 rebuild: Arc<dyn Fn(&EventBook) -> S + Send + Sync>,
514 domains: HashMap<String, Arc<dyn ProcessManagerDomainHandler<S>>>,
515}
516
517impl<S: Default + Send + Sync + 'static> ProcessManagerRouter<S> {
518 pub fn new<R>(name: impl Into<String>, pm_domain: impl Into<String>, rebuild: R) -> Self
523 where
524 R: Fn(&EventBook) -> S + Send + Sync + 'static,
525 {
526 Self {
527 name: name.into(),
528 pm_domain: pm_domain.into(),
529 rebuild: Arc::new(rebuild),
530 domains: HashMap::new(),
531 }
532 }
533
534 pub fn domain<H>(mut self, name: impl Into<String>, handler: H) -> Self
538 where
539 H: ProcessManagerDomainHandler<S> + 'static,
540 {
541 self.domains.insert(name.into(), Arc::new(handler));
542 self
543 }
544
545 pub fn name(&self) -> &str {
547 &self.name
548 }
549
550 pub fn pm_domain(&self) -> &str {
552 &self.pm_domain
553 }
554
555 pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
557 self.domains
558 .iter()
559 .map(|(domain, handler)| (domain.clone(), handler.event_types()))
560 .collect()
561 }
562
563 pub fn rebuild_state(&self, events: &EventBook) -> S {
565 (self.rebuild)(events)
566 }
567
568 pub fn prepare_destinations(
570 &self,
571 trigger: &Option<EventBook>,
572 process_state: &Option<EventBook>,
573 ) -> Vec<Cover> {
574 let trigger = match trigger {
575 Some(t) => t,
576 None => return vec![],
577 };
578
579 let trigger_domain = trigger
580 .cover
581 .as_ref()
582 .map(|c| c.domain.as_str())
583 .unwrap_or("");
584
585 let event_page = match trigger.pages.last() {
586 Some(p) => p,
587 None => return vec![],
588 };
589
590 let event_any = match &event_page.payload {
591 Some(event_page::Payload::Event(e)) => e,
592 _ => return vec![],
593 };
594
595 let state = match process_state {
596 Some(ps) => self.rebuild_state(ps),
597 None => S::default(),
598 };
599
600 self.domains
601 .get(trigger_domain)
602 .map(|handler| handler.prepare(trigger, &state, event_any))
603 .unwrap_or_default()
604 }
605
606 pub fn dispatch(
608 &self,
609 trigger: &EventBook,
610 process_state: &EventBook,
611 destinations: &[EventBook],
612 ) -> Result<ProcessManagerHandleResponse, Status> {
613 let trigger_domain = trigger
614 .cover
615 .as_ref()
616 .map(|c| c.domain.as_str())
617 .unwrap_or("");
618
619 let handler = self.domains.get(trigger_domain).ok_or_else(|| {
620 Status::unimplemented(format!("No handler for domain: {}", trigger_domain))
621 })?;
622
623 let event_page = trigger
624 .pages
625 .last()
626 .ok_or_else(|| Status::invalid_argument("Trigger event book has no events"))?;
627
628 let event_any = match &event_page.payload {
629 Some(event_page::Payload::Event(e)) => e,
630 _ => return Err(Status::invalid_argument("Missing event payload")),
631 };
632
633 let state = self.rebuild_state(process_state);
634
635 if event_any.type_url.ends_with("Notification") {
637 return dispatch_pm_notification(handler.as_ref(), event_any, &state);
638 }
639
640 let response = handler.handle(trigger, &state, event_any, destinations)?;
641
642 Ok(ProcessManagerHandleResponse {
643 commands: response.commands,
644 process_events: response.process_events,
645 facts: response.facts,
646 })
647 }
648}
649
650fn dispatch_pm_notification<S: Default>(
652 handler: &dyn ProcessManagerDomainHandler<S>,
653 event_any: &Any,
654 state: &S,
655) -> Result<ProcessManagerHandleResponse, Status> {
656 use prost::Message;
657
658 let notification = Notification::decode(event_any.value.as_slice())
659 .map_err(|e| Status::invalid_argument(format!("Failed to decode Notification: {}", e)))?;
660
661 let rejection = notification
662 .payload
663 .as_ref()
664 .map(|p| RejectionNotification::decode(p.value.as_slice()))
665 .transpose()
666 .map_err(|e| {
667 Status::invalid_argument(format!("Failed to decode RejectionNotification: {}", e))
668 })?
669 .unwrap_or_default();
670
671 let (domain, cmd_suffix) = extract_rejection_key(&rejection);
672
673 let response = handler.on_rejected(¬ification, state, &domain, &cmd_suffix)?;
674
675 Ok(ProcessManagerHandleResponse {
676 commands: vec![],
677 process_events: response.events,
678 facts: vec![],
679 })
680}
681
682pub struct ProjectorRouter {
690 name: String,
691 domains: HashMap<String, Arc<dyn ProjectorDomainHandler>>,
692}
693
694impl ProjectorRouter {
695 pub fn new(name: impl Into<String>) -> Self {
699 Self {
700 name: name.into(),
701 domains: HashMap::new(),
702 }
703 }
704
705 pub fn domain<H>(mut self, name: impl Into<String>, handler: H) -> Self
709 where
710 H: ProjectorDomainHandler + 'static,
711 {
712 self.domains.insert(name.into(), Arc::new(handler));
713 self
714 }
715
716 pub fn name(&self) -> &str {
718 &self.name
719 }
720
721 pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
723 self.domains
724 .iter()
725 .map(|(domain, handler)| (domain.clone(), handler.event_types()))
726 .collect()
727 }
728
729 pub fn dispatch(&self, events: &EventBook) -> Result<Projection, Status> {
731 let domain = events
732 .cover
733 .as_ref()
734 .map(|c| c.domain.as_str())
735 .unwrap_or("");
736
737 let handler = self
738 .domains
739 .get(domain)
740 .ok_or_else(|| Status::unimplemented(format!("No handler for domain: {}", domain)))?;
741
742 handler
743 .project(events)
744 .map_err(|e| Status::internal(e.to_string()))
745 }
746}
747
/// Smoke tests for the marker types and the routers that can be built without
/// a handler.
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem::size_of;

    /// The mode markers must not occupy any memory.
    #[test]
    fn mode_markers_are_zero_sized() {
        assert_eq!(size_of::<CommandHandlerMode>(), 0);
        assert_eq!(size_of::<SagaMode>(), 0);
        assert_eq!(size_of::<ProcessManagerMode>(), 0);
        assert_eq!(size_of::<ProjectorMode>(), 0);
    }

    /// A new process-manager router reports the name and domain it was given.
    #[test]
    fn pm_router_creation() {
        let pm = ProcessManagerRouter::<()>::new("test-pm", "pm-domain", |_| ());
        assert_eq!(pm.name(), "test-pm");
        assert_eq!(pm.pm_domain(), "pm-domain");
    }

    /// A new projector router reports the name it was given.
    #[test]
    fn projector_router_creation() {
        let projector = ProjectorRouter::new("test-prj");
        assert_eq!(projector.name(), "test-prj");
    }
}