1pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9pub mod travel_rule_processor;
10pub mod trust_ping_processor;
11#[cfg(test)]
12pub mod trust_ping_tests;
13
14pub use processor::{
16 DefaultPlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessor,
17 StateMachineIntegrationProcessor, ValidationPlainMessageProcessor,
18};
19pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
20pub use router::{DefaultPlainMessageRouter, IntraNodePlainMessageRouter};
21pub use sender::{HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender};
22pub use travel_rule_processor::TravelRuleProcessor;
23pub use trust_ping_processor::TrustPingProcessor;
24
25use crate::error::Result;
27use async_trait::async_trait;
28use tap_msg::didcomm::PlainMessage;
29
30pub trait PlainMessageRouter: Send + Sync {
32 fn route_message_impl(&self, message: &PlainMessage) -> Result<String>;
34}
35
36#[async_trait]
38pub trait RouterAsyncExt: PlainMessageRouter {
39 async fn route_message(&self, message: &PlainMessage) -> Result<String>;
41}
42
43#[async_trait]
44impl<T: PlainMessageRouter + Sync> RouterAsyncExt for T {
45 async fn route_message(&self, message: &PlainMessage) -> Result<String> {
46 self.route_message_impl(message)
47 }
48}
49
50#[derive(Clone, Debug)]
52pub enum PlainMessageProcessorType {
53 Default(DefaultPlainMessageProcessor),
54 Logging(LoggingPlainMessageProcessor),
55 Validation(ValidationPlainMessageProcessor),
56 StateMachine(StateMachineIntegrationProcessor),
57 TravelRule(TravelRuleProcessor),
58 TrustPing(TrustPingProcessor),
59 Composite(CompositePlainMessageProcessor),
60}
61
62#[derive(Clone, Debug)]
64pub enum PlainMessageRouterType {
65 Default(DefaultPlainMessageRouter),
66 IntraNode(IntraNodePlainMessageRouter),
67}
68
69#[derive(Clone, Debug)]
71pub struct CompositePlainMessageProcessor {
72 processors: Vec<PlainMessageProcessorType>,
73}
74
75impl CompositePlainMessageProcessor {
76 pub fn new(processors: Vec<PlainMessageProcessorType>) -> Self {
78 Self { processors }
79 }
80
81 pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
83 self.processors.push(processor);
84 }
85}
86
87#[async_trait]
88impl PlainMessageProcessor for CompositePlainMessageProcessor {
89 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
90 let mut current_message = message;
91
92 for processor in &self.processors {
93 let processed = match processor {
94 PlainMessageProcessorType::Default(p) => {
95 p.process_incoming(current_message).await?
96 }
97 PlainMessageProcessorType::Logging(p) => {
98 p.process_incoming(current_message).await?
99 }
100 PlainMessageProcessorType::Validation(p) => {
101 p.process_incoming(current_message).await?
102 }
103 PlainMessageProcessorType::StateMachine(p) => {
104 p.process_incoming(current_message).await?
105 }
106 PlainMessageProcessorType::TravelRule(p) => {
107 p.process_incoming(current_message).await?
108 }
109 PlainMessageProcessorType::TrustPing(p) => {
110 p.process_incoming(current_message).await?
111 }
112 PlainMessageProcessorType::Composite(p) => {
113 p.process_incoming(current_message).await?
114 }
115 };
116
117 if let Some(msg) = processed {
118 current_message = msg;
119 } else {
120 return Ok(None);
122 }
123 }
124
125 Ok(Some(current_message))
126 }
127
128 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
129 let mut current_message = message;
130
131 for processor in &self.processors {
132 let processed = match processor {
133 PlainMessageProcessorType::Default(p) => {
134 p.process_outgoing(current_message).await?
135 }
136 PlainMessageProcessorType::Logging(p) => {
137 p.process_outgoing(current_message).await?
138 }
139 PlainMessageProcessorType::Validation(p) => {
140 p.process_outgoing(current_message).await?
141 }
142 PlainMessageProcessorType::StateMachine(p) => {
143 p.process_outgoing(current_message).await?
144 }
145 PlainMessageProcessorType::TravelRule(p) => {
146 p.process_outgoing(current_message).await?
147 }
148 PlainMessageProcessorType::TrustPing(p) => {
149 p.process_outgoing(current_message).await?
150 }
151 PlainMessageProcessorType::Composite(p) => {
152 p.process_outgoing(current_message).await?
153 }
154 };
155
156 if let Some(msg) = processed {
157 current_message = msg;
158 } else {
159 return Ok(None);
161 }
162 }
163
164 Ok(Some(current_message))
165 }
166}
167
168#[derive(Clone)]
170pub struct CompositePlainMessageRouter {
171 routers: Vec<PlainMessageRouterType>,
172}
173
174impl CompositePlainMessageRouter {
175 pub fn new(routers: Vec<PlainMessageRouterType>) -> Self {
177 Self { routers }
178 }
179
180 pub fn add_router(&mut self, router: PlainMessageRouterType) {
182 self.routers.push(router);
183 }
184}
185
186impl PlainMessageRouter for CompositePlainMessageRouter {
187 fn route_message_impl(&self, message: &PlainMessage) -> Result<String> {
188 for router in &self.routers {
190 let result = match router {
191 PlainMessageRouterType::Default(r) => r.route_message_impl(message),
192 PlainMessageRouterType::IntraNode(r) => r.route_message_impl(message),
193 };
194
195 match result {
196 Ok(did) => return Ok(did),
197 Err(_) => continue, }
199 }
200
201 Err(crate::error::Error::Routing(
203 "No router could handle the message".to_string(),
204 ))
205 }
206}