pub mod processor;
pub mod processor_pool;
pub mod router;
pub mod sender;
pub mod travel_rule_processor;
pub mod trust_ping_processor;
#[cfg(test)]
pub mod trust_ping_tests;
pub use processor::{
DefaultPlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessor,
StateMachineIntegrationProcessor, ValidationPlainMessageProcessor,
};
pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
pub use router::{DefaultPlainMessageRouter, IntraNodePlainMessageRouter};
pub use sender::{HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender};
pub use travel_rule_processor::TravelRuleProcessor;
pub use trust_ping_processor::TrustPingProcessor;
use crate::error::Result;
use async_trait::async_trait;
use tap_msg::didcomm::PlainMessage;
/// Synchronous routing interface for [`PlainMessage`] values.
///
/// Implementors must be `Send + Sync`.
pub trait PlainMessageRouter: Send + Sync {
/// Decides where `message` should go.
///
/// Returns a `String` on success — presumably an identifier of the routing
/// target (e.g. a DID); confirm against the concrete implementors.
fn route_message_impl(&self, message: &PlainMessage) -> Result<String>;
}
/// Async extension of [`PlainMessageRouter`].
///
/// Uses `#[async_trait]` so the trait can declare an `async fn`.
#[async_trait]
pub trait RouterAsyncExt: PlainMessageRouter {
/// Async entry point with the same parameters and return type as
/// [`PlainMessageRouter::route_message_impl`].
async fn route_message(&self, message: &PlainMessage) -> Result<String>;
}
#[async_trait]
impl<T: PlainMessageRouter + Sync> RouterAsyncExt for T {
async fn route_message(&self, message: &PlainMessage) -> Result<String> {
self.route_message_impl(message)
}
}
/// Closed set of processor implementations that can be stored together,
/// one enum variant per concrete processor type.
#[derive(Clone, Debug)]
pub enum PlainMessageProcessorType {
/// Wraps a [`DefaultPlainMessageProcessor`].
Default(DefaultPlainMessageProcessor),
/// Wraps a [`LoggingPlainMessageProcessor`].
Logging(LoggingPlainMessageProcessor),
/// Wraps a [`ValidationPlainMessageProcessor`].
Validation(ValidationPlainMessageProcessor),
/// Wraps a [`StateMachineIntegrationProcessor`].
StateMachine(StateMachineIntegrationProcessor),
/// Wraps a [`TravelRuleProcessor`].
TravelRule(TravelRuleProcessor),
/// Wraps a [`TrustPingProcessor`].
TrustPing(TrustPingProcessor),
/// Wraps a nested [`CompositePlainMessageProcessor`], allowing chains of chains.
Composite(CompositePlainMessageProcessor),
}
/// Closed set of router implementations that can be stored together,
/// one enum variant per concrete router type.
#[derive(Clone, Debug)]
pub enum PlainMessageRouterType {
/// Wraps a [`DefaultPlainMessageRouter`].
Default(DefaultPlainMessageRouter),
/// Wraps an [`IntraNodePlainMessageRouter`].
IntraNode(IntraNodePlainMessageRouter),
}
/// Processor that holds an ordered list of other processors.
#[derive(Clone, Debug)]
pub struct CompositePlainMessageProcessor {
// Processors in the order they were supplied to `new` / appended by `add_processor`.
processors: Vec<PlainMessageProcessorType>,
}
impl CompositePlainMessageProcessor {
pub fn new(processors: Vec<PlainMessageProcessorType>) -> Self {
Self { processors }
}
pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
self.processors.push(processor);
}
}
/// Runs a message through every contained processor, in list order.
///
/// For both directions the contract is the same:
/// * each processor receives the output of the previous one;
/// * the first `Err` is returned immediately;
/// * the first `Ok(None)` stops the chain and `Ok(None)` is returned;
/// * if every processor yields a message, the final one is returned as
///   `Ok(Some(..))` (an empty list returns the input unchanged).
#[async_trait]
impl PlainMessageProcessor for CompositePlainMessageProcessor {
    /// Applies `process_incoming` of each contained processor in sequence.
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let mut pending = message;
        for stage in self.processors.iter() {
            // Each arm names a different concrete type, so the dispatch has to
            // be spelled out per variant.
            let outcome = match stage {
                PlainMessageProcessorType::Default(inner) => {
                    inner.process_incoming(pending).await
                }
                PlainMessageProcessorType::Logging(inner) => {
                    inner.process_incoming(pending).await
                }
                PlainMessageProcessorType::Validation(inner) => {
                    inner.process_incoming(pending).await
                }
                PlainMessageProcessorType::StateMachine(inner) => {
                    inner.process_incoming(pending).await
                }
                PlainMessageProcessorType::TravelRule(inner) => {
                    inner.process_incoming(pending).await
                }
                PlainMessageProcessorType::TrustPing(inner) => {
                    inner.process_incoming(pending).await
                }
                PlainMessageProcessorType::Composite(inner) => {
                    inner.process_incoming(pending).await
                }
            }?;
            match outcome {
                Some(next) => pending = next,
                // A processor dropped the message: stop the chain here.
                None => return Ok(None),
            }
        }
        Ok(Some(pending))
    }

    /// Applies `process_outgoing` of each contained processor in sequence.
    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let mut pending = message;
        for stage in self.processors.iter() {
            let outcome = match stage {
                PlainMessageProcessorType::Default(inner) => {
                    inner.process_outgoing(pending).await
                }
                PlainMessageProcessorType::Logging(inner) => {
                    inner.process_outgoing(pending).await
                }
                PlainMessageProcessorType::Validation(inner) => {
                    inner.process_outgoing(pending).await
                }
                PlainMessageProcessorType::StateMachine(inner) => {
                    inner.process_outgoing(pending).await
                }
                PlainMessageProcessorType::TravelRule(inner) => {
                    inner.process_outgoing(pending).await
                }
                PlainMessageProcessorType::TrustPing(inner) => {
                    inner.process_outgoing(pending).await
                }
                PlainMessageProcessorType::Composite(inner) => {
                    inner.process_outgoing(pending).await
                }
            }?;
            match outcome {
                Some(next) => pending = next,
                // A processor dropped the message: stop the chain here.
                None => return Ok(None),
            }
        }
        Ok(Some(pending))
    }
}
/// Router that holds an ordered list of other routers.
///
/// Derives `Debug` as well as `Clone` so it is consistent with
/// `CompositePlainMessageProcessor` and with `PlainMessageRouterType`, and so
/// that types embedding this router can themselves derive `Debug`.
#[derive(Clone, Debug)]
pub struct CompositePlainMessageRouter {
    /// Routers in the order they were supplied to `new` / appended by `add_router`.
    routers: Vec<PlainMessageRouterType>,
}
impl CompositePlainMessageRouter {
pub fn new(routers: Vec<PlainMessageRouterType>) -> Self {
Self { routers }
}
pub fn add_router(&mut self, router: PlainMessageRouterType) {
self.routers.push(router);
}
}
impl PlainMessageRouter for CompositePlainMessageRouter {
fn route_message_impl(&self, message: &PlainMessage) -> Result<String> {
for router in &self.routers {
let result = match router {
PlainMessageRouterType::Default(r) => r.route_message_impl(message),
PlainMessageRouterType::IntraNode(r) => r.route_message_impl(message),
};
match result {
Ok(did) => return Ok(did),
Err(_) => continue, }
}
Err(crate::error::Error::Routing(
"No router could handle the message".to_string(),
))
}
}