#![allow(clippy::result_large_err)]
mod cloudevents;
mod dispatch;
mod factory;
mod helpers;
mod saga_context;
mod state;
mod traits;
mod upcaster;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use prost_types::Any;
use tonic::Status;
use crate::proto::{
business_response, event_page, BusinessResponse, ContextualCommand, Cover, EventBook,
Notification, ProcessManagerHandleResponse, Projection, RejectionNotification,
RevocationResponse, SagaResponse,
};
pub use helpers::{event_book_from, event_page, new_event_book, new_event_book_multi, pack_event};
pub use saga_context::SagaContext;
pub use state::{EventApplier, EventApplierHOF, StateFactory, StateRouter};
pub use traits::{
CommandHandlerDomainHandler, CommandRejectedError, CommandResult, ProcessManagerDomainHandler,
ProcessManagerResponse, ProjectorDomainHandler, RejectionHandlerResponse, SagaDomainHandler,
SagaHandlerResponse, UnpackAny,
};
pub use upcaster::{BoxedUpcasterHandler, UpcasterHandler, UpcasterHandlerHOF, UpcasterMode, UpcasterRouter};
pub use factory::{BoxedHandlerFactory, HandlerFactory, HandlerHOF};
pub use cloudevents::{CloudEventsHandler, CloudEventsProjector, CloudEventsRouter};
pub use crate::dispatch_command;
pub use crate::dispatch_event;
/// Field-less (zero-sized) marker type named after the command-handler role.
///
/// NOTE(review): nothing in this file consumes this type apart from a size
/// check in the tests; presumably it is a type-level tag for generic code
/// elsewhere — confirm against its users.
pub struct CommandHandlerMode;
/// Field-less (zero-sized) marker type named after the saga role.
///
/// NOTE(review): presumably a type-level tag used by generic code outside
/// this file — confirm.
pub struct SagaMode;
/// Field-less (zero-sized) marker type named after the process-manager role.
///
/// NOTE(review): presumably a type-level tag used by generic code outside
/// this file — confirm.
pub struct ProcessManagerMode;
/// Field-less (zero-sized) marker type named after the projector role.
///
/// NOTE(review): presumably a type-level tag used by generic code outside
/// this file — confirm.
pub struct ProjectorMode;
/// Where a router obtains its handler from.
enum HandlerStorage<H> {
    /// A single handler instance owned by the router and lent out on request.
    Static(H),
    /// A closure that is invoked to build a new handler every time one is
    /// requested.
    Factory(Arc<dyn Fn() -> H + Send + Sync>),
}

impl<H> HandlerStorage<H> {
    /// Produces a handler for one unit of work.
    ///
    /// A `Static` storage lends out its stored instance; a `Factory` storage
    /// calls the closure and hands back the freshly built value.
    fn get(&self) -> HandlerRef<'_, H>
    where
        H: Clone,
    {
        match self {
            HandlerStorage::Factory(build) => HandlerRef::Owned(build()),
            HandlerStorage::Static(stored) => HandlerRef::Borrowed(stored),
        }
    }
}
/// A handler that is either borrowed from its owner or owned outright.
///
/// Dereferences to `H` in both cases, so callers can use it without caring
/// which variant they hold.
enum HandlerRef<'a, H> {
    /// Reference to a handler that lives elsewhere.
    Borrowed(&'a H),
    /// Handler value owned by this wrapper.
    Owned(H),
}

impl<H> std::ops::Deref for HandlerRef<'_, H> {
    type Target = H;

    /// Returns a shared reference to the wrapped handler.
    fn deref(&self) -> &H {
        match *self {
            HandlerRef::Borrowed(shared) => shared,
            HandlerRef::Owned(ref owned) => owned,
        }
    }
}
/// Routes contextual commands for one domain to a single command handler.
///
/// `S` is the state type the handler rebuilds from an event book. The handler
/// is either a fixed instance or built on demand by a factory closure.
pub struct CommandHandlerRouter<S, H>
where
    H: CommandHandlerDomainHandler<State = S>,
{
    /// Router name, exposed through `name()`.
    name: String,
    /// Domain this router is registered for, exposed through `domain()`.
    domain: String,
    /// Fixed handler instance or handler factory.
    storage: HandlerStorage<H>,
    /// Marker so the `S` type parameter appears in a field.
    _state: PhantomData<S>,
}

impl<S, H> CommandHandlerRouter<S, H>
where
    S: Default + Send + Sync + 'static,
    H: CommandHandlerDomainHandler<State = S> + Clone,
{
    /// Creates a router that reuses the given `handler` for every call.
    pub fn new(name: impl Into<String>, domain: impl Into<String>, handler: H) -> Self {
        let name = name.into();
        let domain = domain.into();
        Self {
            name,
            domain,
            storage: HandlerStorage::Static(handler),
            _state: PhantomData,
        }
    }

    /// Creates a router that calls `factory` whenever it needs a handler.
    pub fn with_factory<F>(name: impl Into<String>, domain: impl Into<String>, factory: F) -> Self
    where
        F: Fn() -> H + Send + Sync + 'static,
    {
        let name = name.into();
        let domain = domain.into();
        Self {
            name,
            domain,
            storage: HandlerStorage::Factory(Arc::new(factory)),
            _state: PhantomData,
        }
    }

    /// Name given to this router at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Domain given to this router at construction.
    pub fn domain(&self) -> &str {
        self.domain.as_str()
    }

    /// Command types reported by the handler.
    pub fn command_types(&self) -> Vec<String> {
        let handler = self.storage.get();
        handler.command_types()
    }

    /// Single-entry subscription list: this router's domain paired with the
    /// handler's command types.
    pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
        let entry = (self.domain.clone(), self.command_types());
        vec![entry]
    }

    /// Asks the handler to rebuild state from `events`.
    pub fn rebuild_state(&self, events: &EventBook) -> S {
        let handler = self.storage.get();
        handler.rebuild(events)
    }

    /// Dispatches the first command page of `cmd` to the handler.
    ///
    /// State is rebuilt from the accompanying event book before the handler
    /// runs. A command whose type URL ends with `"Notification"` is routed to
    /// the rejection path instead of the regular `handle` call.
    ///
    /// # Errors
    ///
    /// Returns `invalid_argument` when the command book, its first page, the
    /// command payload, or the event book is missing. Errors from the handler
    /// are propagated.
    pub fn dispatch(&self, cmd: &ContextualCommand) -> Result<BusinessResponse, Status> {
        let Some(command_book) = cmd.command.as_ref() else {
            return Err(Status::invalid_argument("Missing command book"));
        };
        let Some(first_page) = command_book.pages.first() else {
            return Err(Status::invalid_argument("Missing command page"));
        };
        let Some(crate::proto::command_page::Payload::Command(command_any)) =
            &first_page.payload
        else {
            return Err(Status::invalid_argument("Missing command"));
        };
        let Some(event_book) = cmd.events.as_ref() else {
            return Err(Status::invalid_argument("Missing event book"));
        };

        let handler = self.storage.get();
        let state = handler.rebuild(event_book);
        let seq = crate::EventBookExt::next_sequence(event_book);

        if command_any.type_url.ends_with("Notification") {
            return dispatch_command_handler_notification(&*handler, command_any, &state);
        }

        let produced = handler.handle(command_book, command_any, &state, seq)?;
        let result = business_response::Result::Events(produced);
        Ok(BusinessResponse {
            result: Some(result),
        })
    }
}
/// Routes a `Notification` command to the handler's rejection callback.
///
/// `command_any.value` is decoded as a `Notification`. If the notification
/// carries a payload, that payload is decoded as a `RejectionNotification`;
/// otherwise a default one is used. The rejected command's domain and type
/// suffix (from `extract_rejection_key`) are passed to `handler.on_rejected`
/// together with the notification and the current `state`.
///
/// The handler's response is mapped as follows:
/// * events present: returned as `Events` (a notification in the same
///   response is ignored);
/// * only a notification present: returned as `Notification`;
/// * neither present: a `Revocation` that requests a system revocation and
///   records the domain and command suffix in its reason.
///
/// # Errors
///
/// Returns `invalid_argument` when either protobuf message fails to decode,
/// and propagates any error returned by `on_rejected`.
fn dispatch_command_handler_notification<S: Default + 'static>(
    handler: &dyn CommandHandlerDomainHandler<State = S>,
    command_any: &Any,
    state: &S,
) -> Result<BusinessResponse, Status> {
    use prost::Message;

    let notification = Notification::decode(command_any.value.as_slice())
        .map_err(|e| Status::invalid_argument(format!("Failed to decode Notification: {}", e)))?;

    // A notification without a payload is treated as an empty rejection.
    let rejection = notification
        .payload
        .as_ref()
        .map(|p| RejectionNotification::decode(p.value.as_slice()))
        .transpose()
        .map_err(|e| {
            Status::invalid_argument(format!("Failed to decode RejectionNotification: {}", e))
        })?
        .unwrap_or_default();

    let (domain, cmd_suffix) = extract_rejection_key(&rejection);
    let response = handler.on_rejected(&notification, state, &domain, &cmd_suffix)?;

    match (response.events, response.notification) {
        // Compensation events take precedence over a forwarded notification.
        (Some(events), _) => Ok(BusinessResponse {
            result: Some(business_response::Result::Events(events)),
        }),
        (None, Some(notif)) => Ok(BusinessResponse {
            result: Some(business_response::Result::Notification(notif)),
        }),
        // The handler produced nothing: answer with a revocation.
        (None, None) => Ok(BusinessResponse {
            result: Some(business_response::Result::Revocation(RevocationResponse {
                emit_system_revocation: true,
                send_to_dead_letter_queue: false,
                escalate: false,
                abort: false,
                reason: format!(
                    "Handler returned empty response for {}/{}",
                    domain, cmd_suffix
                ),
            })),
        }),
    }
}
/// Routes events from one input domain to a single saga handler.
///
/// The handler is either a fixed instance or built on demand by a factory
/// closure.
pub struct SagaRouter<H>
where
    H: SagaDomainHandler,
{
    /// Router name, exposed through `name()`.
    name: String,
    /// Domain whose events feed this saga, exposed through `input_domain()`.
    domain: String,
    /// Fixed handler instance or handler factory.
    storage: HandlerStorage<H>,
}

impl<H> SagaRouter<H>
where
    H: SagaDomainHandler + Clone,
{
    /// Creates a router that reuses the given `handler` for every call.
    pub fn new(name: impl Into<String>, domain: impl Into<String>, handler: H) -> Self {
        let name = name.into();
        let domain = domain.into();
        Self {
            name,
            domain,
            storage: HandlerStorage::Static(handler),
        }
    }

    /// Creates a router that calls `factory` whenever it needs a handler.
    pub fn with_factory<F>(name: impl Into<String>, domain: impl Into<String>, factory: F) -> Self
    where
        F: Fn() -> H + Send + Sync + 'static,
    {
        let name = name.into();
        let domain = domain.into();
        Self {
            name,
            domain,
            storage: HandlerStorage::Factory(Arc::new(factory)),
        }
    }

    /// Name given to this router at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Domain given to this router at construction.
    pub fn input_domain(&self) -> &str {
        self.domain.as_str()
    }

    /// Event types reported by the handler.
    pub fn event_types(&self) -> Vec<String> {
        let handler = self.storage.get();
        handler.event_types()
    }

    /// Single-entry subscription list: the input domain paired with the
    /// handler's event types.
    pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
        let entry = (self.domain.clone(), self.event_types());
        vec![entry]
    }

    /// Dispatches the last event page of `source` to the handler.
    ///
    /// An event whose type URL ends with `"Notification"` is routed to the
    /// rejection path instead of the regular `handle` call.
    ///
    /// # Errors
    ///
    /// Returns `invalid_argument` when `source` has no pages or the last page
    /// carries no event payload. Errors from the handler are propagated.
    pub fn dispatch(&self, source: &EventBook) -> Result<SagaResponse, Status> {
        let Some(last_page) = source.pages.last() else {
            return Err(Status::invalid_argument("Source event book has no events"));
        };
        let Some(event_page::Payload::Event(event_any)) = &last_page.payload else {
            return Err(Status::invalid_argument("Missing event payload"));
        };

        let handler = self.storage.get();
        if event_any.type_url.ends_with("Notification") {
            return dispatch_saga_notification(&*handler, event_any);
        }

        let outcome = handler.handle(source, event_any)?;
        Ok(SagaResponse {
            commands: outcome.commands,
            events: outcome.events,
        })
    }
}
/// Routes a `Notification` event to the saga handler's rejection callback.
///
/// `event_any.value` is decoded as a `Notification`. If the notification
/// carries a payload, that payload is decoded as a `RejectionNotification`;
/// otherwise a default one is used. The rejected command's domain and type
/// suffix (from `extract_rejection_key`) are passed to `handler.on_rejected`.
///
/// The returned `SagaResponse` never contains commands; its events are
/// whatever the handler's response yields.
///
/// # Errors
///
/// Returns `invalid_argument` when either protobuf message fails to decode,
/// and propagates any error returned by `on_rejected`.
fn dispatch_saga_notification<H: SagaDomainHandler>(
    handler: &H,
    event_any: &Any,
) -> Result<SagaResponse, Status> {
    use prost::Message;

    let notification = Notification::decode(event_any.value.as_slice())
        .map_err(|e| Status::invalid_argument(format!("Failed to decode Notification: {}", e)))?;

    // A notification without a payload is treated as an empty rejection.
    let rejection = notification
        .payload
        .as_ref()
        .map(|p| RejectionNotification::decode(p.value.as_slice()))
        .transpose()
        .map_err(|e| {
            Status::invalid_argument(format!("Failed to decode RejectionNotification: {}", e))
        })?
        .unwrap_or_default();

    let (domain, cmd_suffix) = extract_rejection_key(&rejection);
    let response = handler.on_rejected(&notification, &domain, &cmd_suffix)?;

    Ok(SagaResponse {
        commands: vec![],
        events: response.events.into_iter().collect(),
    })
}
/// Derives the `(domain, command suffix)` pair that identifies a rejected
/// command.
///
/// * The domain comes from the rejected command's cover, or is empty when
///   there is no cover.
/// * The suffix is the part of the first page's command type URL after the
///   last `/` (the whole URL when it has no `/`), or empty when the first
///   page is missing or does not hold a command.
///
/// Both strings are empty when the rejection carries no rejected command.
fn extract_rejection_key(rejection: &RejectionNotification) -> (String, String) {
    let Some(rejected) = rejection.rejected_command.as_ref() else {
        return (String::new(), String::new());
    };

    let domain = match rejected.cover.as_ref() {
        Some(cover) => cover.domain.clone(),
        None => String::new(),
    };

    let first_command = rejected.pages.first().and_then(|page| match &page.payload {
        Some(crate::proto::command_page::Payload::Command(any)) => Some(any),
        _ => None,
    });

    let cmd_suffix = match first_command {
        // `rsplit` always yields at least one item, so the fallback to the
        // full URL is purely defensive.
        Some(any) => any
            .type_url
            .rsplit('/')
            .next()
            .unwrap_or(&any.type_url)
            .to_string(),
        None => String::new(),
    };

    (domain, cmd_suffix)
}
/// Routes trigger events to per-domain process-manager handlers that share
/// one state type `S`.
pub struct ProcessManagerRouter<S>
where
    S: Default + Send + Sync + 'static,
{
    /// Router name, exposed through `name()`.
    name: String,
    /// Process manager's own domain, exposed through `pm_domain()`.
    pm_domain: String,
    /// Closure that rebuilds process state from an event book.
    rebuild: Arc<dyn Fn(&EventBook) -> S + Send + Sync>,
    /// Handlers keyed by the domain of the trigger events they accept.
    domains: HashMap<String, Arc<dyn ProcessManagerDomainHandler<S>>>,
}

impl<S> ProcessManagerRouter<S>
where
    S: Default + Send + Sync + 'static,
{
    /// Creates a router with no domain handlers registered.
    ///
    /// `rebuild` is the closure used to turn a process-state event book into
    /// an `S`.
    pub fn new<R>(name: impl Into<String>, pm_domain: impl Into<String>, rebuild: R) -> Self
    where
        R: Fn(&EventBook) -> S + Send + Sync + 'static,
    {
        let name = name.into();
        let pm_domain = pm_domain.into();
        Self {
            name,
            pm_domain,
            rebuild: Arc::new(rebuild),
            domains: HashMap::new(),
        }
    }

    /// Registers `handler` for trigger events from domain `name`, replacing
    /// any handler previously registered under the same name.
    pub fn domain<H>(mut self, name: impl Into<String>, handler: H) -> Self
    where
        H: ProcessManagerDomainHandler<S> + 'static,
    {
        let key = name.into();
        let shared: Arc<dyn ProcessManagerDomainHandler<S>> = Arc::new(handler);
        self.domains.insert(key, shared);
        self
    }

    /// Name given to this router at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Process-manager domain given to this router at construction.
    pub fn pm_domain(&self) -> &str {
        self.pm_domain.as_str()
    }

    /// One `(domain, event types)` entry per registered handler, in the
    /// map's iteration order.
    pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
        let mut subs = Vec::with_capacity(self.domains.len());
        for (domain, handler) in &self.domains {
            subs.push((domain.clone(), handler.event_types()));
        }
        subs
    }

    /// Rebuilds process state from `events` using the configured closure.
    pub fn rebuild_state(&self, events: &EventBook) -> S {
        let rebuild = &self.rebuild;
        rebuild(events)
    }

    /// Asks the handler for the trigger's domain which destinations it needs.
    ///
    /// Returns an empty list when there is no trigger, the trigger has no
    /// pages, its last page has no event payload, or no handler is registered
    /// for the trigger's domain. State is rebuilt from `process_state` when
    /// present and defaults to `S::default()` otherwise.
    pub fn prepare_destinations(
        &self,
        trigger: &Option<EventBook>,
        process_state: &Option<EventBook>,
    ) -> Vec<Cover> {
        let Some(trigger) = trigger else {
            return Vec::new();
        };
        // A trigger without a cover is looked up under the empty domain.
        let trigger_domain = trigger.cover.as_ref().map_or("", |c| c.domain.as_str());
        let Some(last_page) = trigger.pages.last() else {
            return Vec::new();
        };
        let Some(event_page::Payload::Event(event_any)) = &last_page.payload else {
            return Vec::new();
        };

        // State is built before the handler lookup, so the rebuild closure
        // runs even when no handler matches.
        let state = match process_state {
            Some(book) => self.rebuild_state(book),
            None => S::default(),
        };

        match self.domains.get(trigger_domain) {
            Some(handler) => handler.prepare(trigger, &state, event_any),
            None => Vec::new(),
        }
    }

    /// Dispatches the last event page of `trigger` to the handler registered
    /// for the trigger's domain.
    ///
    /// An event whose type URL ends with `"Notification"` is routed to the
    /// rejection path instead of the regular `handle` call.
    ///
    /// # Errors
    ///
    /// Returns `unimplemented` when no handler is registered for the
    /// trigger's domain, and `invalid_argument` when the trigger has no pages
    /// or its last page carries no event payload. Errors from the handler are
    /// propagated.
    pub fn dispatch(
        &self,
        trigger: &EventBook,
        process_state: &EventBook,
        destinations: &[EventBook],
    ) -> Result<ProcessManagerHandleResponse, Status> {
        let trigger_domain = trigger.cover.as_ref().map_or("", |c| c.domain.as_str());
        let Some(handler) = self.domains.get(trigger_domain) else {
            return Err(Status::unimplemented(format!(
                "No handler for domain: {}",
                trigger_domain
            )));
        };
        let Some(last_page) = trigger.pages.last() else {
            return Err(Status::invalid_argument("Trigger event book has no events"));
        };
        let Some(event_page::Payload::Event(event_any)) = &last_page.payload else {
            return Err(Status::invalid_argument("Missing event payload"));
        };

        let state = self.rebuild_state(process_state);
        if event_any.type_url.ends_with("Notification") {
            return dispatch_pm_notification(handler.as_ref(), event_any, &state);
        }

        let outcome = handler.handle(trigger, &state, event_any, destinations)?;
        Ok(ProcessManagerHandleResponse {
            commands: outcome.commands,
            process_events: outcome.process_events,
            facts: outcome.facts,
        })
    }
}
/// Routes a `Notification` event to a process-manager handler's rejection
/// callback.
///
/// `event_any.value` is decoded as a `Notification`. If the notification
/// carries a payload, that payload is decoded as a `RejectionNotification`;
/// otherwise a default one is used. The rejected command's domain and type
/// suffix (from `extract_rejection_key`) are passed to `handler.on_rejected`
/// together with the notification and the current `state`.
///
/// The returned response never contains commands or facts; its process
/// events are taken from the handler's response.
///
/// # Errors
///
/// Returns `invalid_argument` when either protobuf message fails to decode,
/// and propagates any error returned by `on_rejected`.
fn dispatch_pm_notification<S: Default>(
    handler: &dyn ProcessManagerDomainHandler<S>,
    event_any: &Any,
    state: &S,
) -> Result<ProcessManagerHandleResponse, Status> {
    use prost::Message;

    let notification = Notification::decode(event_any.value.as_slice())
        .map_err(|e| Status::invalid_argument(format!("Failed to decode Notification: {}", e)))?;

    // A notification without a payload is treated as an empty rejection.
    let rejection = notification
        .payload
        .as_ref()
        .map(|p| RejectionNotification::decode(p.value.as_slice()))
        .transpose()
        .map_err(|e| {
            Status::invalid_argument(format!("Failed to decode RejectionNotification: {}", e))
        })?
        .unwrap_or_default();

    let (domain, cmd_suffix) = extract_rejection_key(&rejection);
    let response = handler.on_rejected(&notification, state, &domain, &cmd_suffix)?;

    Ok(ProcessManagerHandleResponse {
        commands: vec![],
        process_events: response.events,
        facts: vec![],
    })
}
/// Routes event books to per-domain projector handlers.
pub struct ProjectorRouter {
    /// Router name, exposed through `name()`.
    name: String,
    /// Handlers keyed by the domain of the event books they project.
    domains: HashMap<String, Arc<dyn ProjectorDomainHandler>>,
}

impl ProjectorRouter {
    /// Creates a router with no domain handlers registered.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            domains: HashMap::new(),
        }
    }

    /// Registers `handler` for event books from domain `name`, replacing any
    /// handler previously registered under the same name.
    pub fn domain<H>(mut self, name: impl Into<String>, handler: H) -> Self
    where
        H: ProjectorDomainHandler + 'static,
    {
        let key = name.into();
        let shared: Arc<dyn ProjectorDomainHandler> = Arc::new(handler);
        self.domains.insert(key, shared);
        self
    }

    /// Name given to this router at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// One `(domain, event types)` entry per registered handler, in the
    /// map's iteration order.
    pub fn subscriptions(&self) -> Vec<(String, Vec<String>)> {
        let mut subs = Vec::with_capacity(self.domains.len());
        for (domain, handler) in &self.domains {
            subs.push((domain.clone(), handler.event_types()));
        }
        subs
    }

    /// Projects `events` with the handler registered for the book's domain.
    ///
    /// An event book without a cover is looked up under the empty domain.
    ///
    /// # Errors
    ///
    /// Returns `unimplemented` when no handler is registered for the domain,
    /// and `internal` (carrying the error's string form) when the handler's
    /// projection fails.
    pub fn dispatch(&self, events: &EventBook) -> Result<Projection, Status> {
        let domain = events.cover.as_ref().map_or("", |c| c.domain.as_str());
        let Some(handler) = self.domains.get(domain) else {
            return Err(Status::unimplemented(format!(
                "No handler for domain: {}",
                domain
            )));
        };

        match handler.project(events) {
            Ok(projection) => Ok(projection),
            Err(e) => Err(Status::internal(e.to_string())),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem::size_of;

    /// The four mode markers are unit structs and must occupy no memory.
    #[test]
    fn mode_markers_are_zero_sized() {
        let sizes = [
            size_of::<CommandHandlerMode>(),
            size_of::<SagaMode>(),
            size_of::<ProcessManagerMode>(),
            size_of::<ProjectorMode>(),
        ];
        for size in sizes.iter() {
            assert_eq!(*size, 0);
        }
    }

    /// A process-manager router reports the name and domain it was built with.
    #[test]
    fn pm_router_creation() {
        let pm: ProcessManagerRouter<()> =
            ProcessManagerRouter::new("test-pm", "pm-domain", |_| ());
        let (name, pm_domain) = (pm.name(), pm.pm_domain());
        assert_eq!(name, "test-pm");
        assert_eq!(pm_domain, "pm-domain");
    }

    /// A projector router reports the name it was built with.
    #[test]
    fn projector_router_creation() {
        let projector = ProjectorRouter::new("test-prj");
        let name = projector.name();
        assert_eq!(name, "test-prj");
    }
}