1pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9
10pub use processor::{
12 DefaultMessageProcessor, LoggingMessageProcessor, MessageProcessor, ValidationMessageProcessor,
13};
14pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
15pub use router::DefaultMessageRouter;
16pub use sender::{HttpMessageSender, MessageSender, NodeMessageSender};
17
18use crate::error::Result;
20use async_trait::async_trait;
21use tap_msg::didcomm::Message;
22
23pub trait MessageRouter: Send + Sync {
25 fn route_message_impl(&self, message: &Message) -> Result<String>;
27}
28
29#[async_trait]
31pub trait RouterAsyncExt: MessageRouter {
32 async fn route_message(&self, message: &Message) -> Result<String>;
34}
35
36#[async_trait]
37impl<T: MessageRouter + Sync> RouterAsyncExt for T {
38 async fn route_message(&self, message: &Message) -> Result<String> {
39 self.route_message_impl(message)
40 }
41}
42
43#[derive(Clone, Debug)]
45pub enum MessageProcessorType {
46 Default(DefaultMessageProcessor),
47 Logging(LoggingMessageProcessor),
48 Validation(ValidationMessageProcessor),
49 Composite(CompositeMessageProcessor),
50}
51
52#[derive(Clone, Debug)]
54pub enum MessageRouterType {
55 Default(DefaultMessageRouter),
56}
57
58#[derive(Clone, Debug)]
60pub struct CompositeMessageProcessor {
61 processors: Vec<MessageProcessorType>,
62}
63
64impl CompositeMessageProcessor {
65 pub fn new(processors: Vec<MessageProcessorType>) -> Self {
67 Self { processors }
68 }
69
70 pub fn add_processor(&mut self, processor: MessageProcessorType) {
72 self.processors.push(processor);
73 }
74}
75
76#[async_trait]
77impl MessageProcessor for CompositeMessageProcessor {
78 async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
79 let mut current_message = message;
80
81 for processor in &self.processors {
82 let processed = match processor {
83 MessageProcessorType::Default(p) => p.process_incoming(current_message).await?,
84 MessageProcessorType::Logging(p) => p.process_incoming(current_message).await?,
85 MessageProcessorType::Validation(p) => p.process_incoming(current_message).await?,
86 MessageProcessorType::Composite(p) => p.process_incoming(current_message).await?,
87 };
88
89 if let Some(msg) = processed {
90 current_message = msg;
91 } else {
92 return Ok(None);
94 }
95 }
96
97 Ok(Some(current_message))
98 }
99
100 async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
101 let mut current_message = message;
102
103 for processor in &self.processors {
104 let processed = match processor {
105 MessageProcessorType::Default(p) => p.process_outgoing(current_message).await?,
106 MessageProcessorType::Logging(p) => p.process_outgoing(current_message).await?,
107 MessageProcessorType::Validation(p) => p.process_outgoing(current_message).await?,
108 MessageProcessorType::Composite(p) => p.process_outgoing(current_message).await?,
109 };
110
111 if let Some(msg) = processed {
112 current_message = msg;
113 } else {
114 return Ok(None);
116 }
117 }
118
119 Ok(Some(current_message))
120 }
121}
122
123#[derive(Clone)]
125pub struct CompositeMessageRouter {
126 routers: Vec<MessageRouterType>,
127}
128
129impl CompositeMessageRouter {
130 pub fn new(routers: Vec<MessageRouterType>) -> Self {
132 Self { routers }
133 }
134
135 pub fn add_router(&mut self, router: MessageRouterType) {
137 self.routers.push(router);
138 }
139}
140
141impl MessageRouter for CompositeMessageRouter {
142 fn route_message_impl(&self, message: &Message) -> Result<String> {
143 for router in &self.routers {
145 let result = match router {
146 MessageRouterType::Default(r) => r.route_message_impl(message),
147 };
148
149 match result {
150 Ok(did) => return Ok(did),
151 Err(_) => continue, }
153 }
154
155 Err(crate::error::Error::Routing(
157 "No router could handle the message".to_string(),
158 ))
159 }
160}