use std::sync::Arc;
use prost_types::Any;
use tonic::{Request, Response, Status};
use crate::proto::{
command_handler_service_server::CommandHandlerService,
process_manager_service_server::ProcessManagerService,
projector_service_server::ProjectorService, saga_service_server::SagaService,
upcaster_service_server::UpcasterService, BusinessResponse, ContextualCommand, EventBook,
ProcessManagerHandleRequest, ProcessManagerHandleResponse, ProcessManagerPrepareRequest,
ProcessManagerPrepareResponse, Projection, ReplayRequest, ReplayResponse, SagaHandleRequest,
SagaResponse, UpcastRequest, UpcastResponse,
};
use crate::router::{
CloudEventsRouter, CommandHandlerDomainHandler, CommandHandlerRouter, ProcessManagerRouter,
SagaDomainHandler, SagaRouter,
};
pub type StatePacker<S> = fn(&S) -> Result<Any, Status>;
pub struct CommandHandlerGrpc<S, H>
where
S: Default + Send + Sync + 'static,
H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
{
router: Arc<CommandHandlerRouter<S, H>>,
state_packer: Option<StatePacker<S>>,
}
impl<S, H> CommandHandlerGrpc<S, H>
where
S: Default + Send + Sync + 'static,
H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
{
pub fn new(router: CommandHandlerRouter<S, H>) -> Self {
Self {
router: Arc::new(router),
state_packer: None,
}
}
pub fn with_replay(mut self, packer: StatePacker<S>) -> Self {
self.state_packer = Some(packer);
self
}
pub fn router(&self) -> &CommandHandlerRouter<S, H> {
&self.router
}
}
#[tonic::async_trait]
impl<S, H> CommandHandlerService for CommandHandlerGrpc<S, H>
where
S: Default + Send + Sync + 'static,
H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
{
async fn handle(
&self,
request: Request<ContextualCommand>,
) -> Result<Response<BusinessResponse>, Status> {
let cmd = request.into_inner();
let response = self.router.dispatch(&cmd)?;
Ok(Response::new(response))
}
async fn replay(
&self,
request: Request<ReplayRequest>,
) -> Result<Response<ReplayResponse>, Status> {
let packer = self.state_packer.ok_or_else(|| {
Status::unimplemented(
"Replay not implemented. Call with_replay() to enable for MERGE_COMMUTATIVE strategy.",
)
})?;
let req = request.into_inner();
let event_book = build_event_book_for_replay(&req);
let state = self.router.rebuild_state(&event_book);
let state_any = packer(&state)?;
Ok(Response::new(ReplayResponse {
state: Some(state_any),
}))
}
}
fn build_event_book_for_replay(req: &ReplayRequest) -> EventBook {
EventBook {
cover: None,
pages: req.events.clone(),
snapshot: req.base_snapshot.clone(),
next_sequence: 0,
}
}
pub struct SagaHandler<H>
where
H: SagaDomainHandler + Clone + 'static,
{
router: Arc<SagaRouter<H>>,
}
impl<H: SagaDomainHandler + Clone + 'static> SagaHandler<H> {
pub fn new(router: SagaRouter<H>) -> Self {
Self {
router: Arc::new(router),
}
}
pub fn router(&self) -> &SagaRouter<H> {
&self.router
}
}
#[tonic::async_trait]
impl<H: SagaDomainHandler + Clone + 'static> SagaService for SagaHandler<H> {
async fn handle(
&self,
request: Request<SagaHandleRequest>,
) -> Result<Response<SagaResponse>, Status> {
let req = request.into_inner();
let source = req
.source
.as_ref()
.ok_or_else(|| Status::invalid_argument("Missing source event book"))?;
let response = self.router.dispatch(source)?;
Ok(Response::new(response))
}
}
pub type ProjectorHandleFn = fn(&EventBook) -> Result<Projection, Status>;
pub type ProjectorHandleClosureFn =
Arc<dyn Fn(&EventBook) -> Result<Projection, Status> + Send + Sync>;
enum ProjectorHandleType {
Fn(ProjectorHandleFn),
Closure(ProjectorHandleClosureFn),
}
pub struct ProjectorHandler {
name: String,
handle_type: Option<ProjectorHandleType>,
}
impl ProjectorHandler {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
handle_type: None,
}
}
pub fn with_handle(mut self, handle_fn: ProjectorHandleFn) -> Self {
self.handle_type = Some(ProjectorHandleType::Fn(handle_fn));
self
}
pub fn with_handle_fn<H>(mut self, handle_fn: H) -> Self
where
H: Fn(&EventBook) -> Result<Projection, Status> + Send + Sync + 'static,
{
self.handle_type = Some(ProjectorHandleType::Closure(Arc::new(handle_fn)));
self
}
pub fn name(&self) -> &str {
&self.name
}
}
#[tonic::async_trait]
impl ProjectorService for ProjectorHandler {
async fn handle(&self, request: Request<EventBook>) -> Result<Response<Projection>, Status> {
let event_book = request.into_inner();
match &self.handle_type {
Some(ProjectorHandleType::Fn(handle_fn)) => {
let projection = handle_fn(&event_book)?;
Ok(Response::new(projection))
}
Some(ProjectorHandleType::Closure(handle_fn)) => {
let projection = handle_fn(&event_book)?;
Ok(Response::new(projection))
}
None => Ok(Response::new(Projection::default())),
}
}
async fn handle_speculative(
&self,
request: Request<EventBook>,
) -> Result<Response<Projection>, Status> {
self.handle(request).await
}
}
pub struct ProcessManagerGrpcHandler<S: Default + Send + Sync + 'static> {
router: Arc<ProcessManagerRouter<S>>,
}
impl<S: Default + Send + Sync + 'static> ProcessManagerGrpcHandler<S> {
pub fn new(router: ProcessManagerRouter<S>) -> Self {
Self {
router: Arc::new(router),
}
}
pub fn router(&self) -> &ProcessManagerRouter<S> {
&self.router
}
}
#[tonic::async_trait]
impl<S: Default + Send + Sync + 'static> ProcessManagerService for ProcessManagerGrpcHandler<S> {
async fn prepare(
&self,
request: Request<ProcessManagerPrepareRequest>,
) -> Result<Response<ProcessManagerPrepareResponse>, Status> {
let req = request.into_inner();
let destinations = self
.router
.prepare_destinations(&req.trigger, &req.process_state);
Ok(Response::new(ProcessManagerPrepareResponse {
destinations,
}))
}
async fn handle(
&self,
request: Request<ProcessManagerHandleRequest>,
) -> Result<Response<ProcessManagerHandleResponse>, Status> {
let req = request.into_inner();
let trigger = req
.trigger
.as_ref()
.ok_or_else(|| Status::invalid_argument("Missing trigger event book"))?;
let process_state = req.process_state.as_ref().cloned().unwrap_or_default();
let response = self
.router
.dispatch(trigger, &process_state, &req.destinations)?;
Ok(Response::new(response))
}
}
pub type UpcasterHandleFn = fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage>;
pub type UpcasterHandleClosureFn =
Arc<dyn Fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage> + Send + Sync>;
enum UpcasterHandleType {
Fn(UpcasterHandleFn),
Closure(UpcasterHandleClosureFn),
}
pub struct UpcasterGrpcHandler {
name: String,
domain: String,
handle_type: Option<UpcasterHandleType>,
}
impl UpcasterGrpcHandler {
pub fn new(name: impl Into<String>, domain: impl Into<String>) -> Self {
Self {
name: name.into(),
domain: domain.into(),
handle_type: None,
}
}
pub fn with_handle(mut self, handle_fn: UpcasterHandleFn) -> Self {
self.handle_type = Some(UpcasterHandleType::Fn(handle_fn));
self
}
pub fn with_handle_fn<H>(mut self, handle_fn: H) -> Self
where
H: Fn(&[crate::proto::EventPage]) -> Vec<crate::proto::EventPage> + Send + Sync + 'static,
{
self.handle_type = Some(UpcasterHandleType::Closure(Arc::new(handle_fn)));
self
}
pub fn name(&self) -> &str {
&self.name
}
pub fn domain(&self) -> &str {
&self.domain
}
}
#[tonic::async_trait]
impl UpcasterService for UpcasterGrpcHandler {
async fn upcast(
&self,
request: Request<UpcastRequest>,
) -> Result<Response<UpcastResponse>, Status> {
let req = request.into_inner();
let events = req.events;
let result = match &self.handle_type {
Some(UpcasterHandleType::Fn(handle_fn)) => handle_fn(&events),
Some(UpcasterHandleType::Closure(handle_fn)) => handle_fn(&events),
None => events, };
Ok(Response::new(UpcastResponse { events: result }))
}
}
pub struct CloudEventsGrpcHandler {
router: Arc<CloudEventsRouter>,
}
impl CloudEventsGrpcHandler {
pub fn new(router: CloudEventsRouter) -> Self {
Self {
router: Arc::new(router),
}
}
pub fn router(&self) -> &CloudEventsRouter {
&self.router
}
}
#[tonic::async_trait]
impl ProjectorService for CloudEventsGrpcHandler {
async fn handle(&self, request: Request<EventBook>) -> Result<Response<Projection>, Status> {
let event_book = request.into_inner();
let response = self.router.project(&event_book);
let projection_any =
Any::from_msg(&response).map_err(|e| Status::internal(format!("Pack error: {}", e)))?;
Ok(Response::new(Projection {
cover: event_book.cover,
projector: self.router.name().to_string(),
sequence: event_book.next_sequence,
projection: Some(projection_any),
}))
}
async fn handle_speculative(
&self,
request: Request<EventBook>,
) -> Result<Response<Projection>, Status> {
self.handle(request).await
}
}