1pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9
10pub use processor::{
12 DefaultPlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessor,
13 StateMachineIntegrationProcessor, ValidationPlainMessageProcessor,
14};
15pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
16pub use router::{DefaultPlainMessageRouter, IntraNodePlainMessageRouter};
17pub use sender::{HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender};
18
19use crate::error::Result;
21use async_trait::async_trait;
22use tap_msg::didcomm::PlainMessage;
23
24pub trait PlainMessageRouter: Send + Sync {
26 fn route_message_impl(&self, message: &PlainMessage) -> Result<String>;
28}
29
30#[async_trait]
32pub trait RouterAsyncExt: PlainMessageRouter {
33 async fn route_message(&self, message: &PlainMessage) -> Result<String>;
35}
36
37#[async_trait]
38impl<T: PlainMessageRouter + Sync> RouterAsyncExt for T {
39 async fn route_message(&self, message: &PlainMessage) -> Result<String> {
40 self.route_message_impl(message)
41 }
42}
43
44#[derive(Clone, Debug)]
46pub enum PlainMessageProcessorType {
47 Default(DefaultPlainMessageProcessor),
48 Logging(LoggingPlainMessageProcessor),
49 Validation(ValidationPlainMessageProcessor),
50 StateMachine(StateMachineIntegrationProcessor),
51 Composite(CompositePlainMessageProcessor),
52}
53
54#[derive(Clone, Debug)]
56pub enum PlainMessageRouterType {
57 Default(DefaultPlainMessageRouter),
58 IntraNode(IntraNodePlainMessageRouter),
59}
60
61#[derive(Clone, Debug)]
63pub struct CompositePlainMessageProcessor {
64 processors: Vec<PlainMessageProcessorType>,
65}
66
67impl CompositePlainMessageProcessor {
68 pub fn new(processors: Vec<PlainMessageProcessorType>) -> Self {
70 Self { processors }
71 }
72
73 pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
75 self.processors.push(processor);
76 }
77}
78
79#[async_trait]
80impl PlainMessageProcessor for CompositePlainMessageProcessor {
81 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
82 let mut current_message = message;
83
84 for processor in &self.processors {
85 let processed = match processor {
86 PlainMessageProcessorType::Default(p) => {
87 p.process_incoming(current_message).await?
88 }
89 PlainMessageProcessorType::Logging(p) => {
90 p.process_incoming(current_message).await?
91 }
92 PlainMessageProcessorType::Validation(p) => {
93 p.process_incoming(current_message).await?
94 }
95 PlainMessageProcessorType::StateMachine(p) => {
96 p.process_incoming(current_message).await?
97 }
98 PlainMessageProcessorType::Composite(p) => {
99 p.process_incoming(current_message).await?
100 }
101 };
102
103 if let Some(msg) = processed {
104 current_message = msg;
105 } else {
106 return Ok(None);
108 }
109 }
110
111 Ok(Some(current_message))
112 }
113
114 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
115 let mut current_message = message;
116
117 for processor in &self.processors {
118 let processed = match processor {
119 PlainMessageProcessorType::Default(p) => {
120 p.process_outgoing(current_message).await?
121 }
122 PlainMessageProcessorType::Logging(p) => {
123 p.process_outgoing(current_message).await?
124 }
125 PlainMessageProcessorType::Validation(p) => {
126 p.process_outgoing(current_message).await?
127 }
128 PlainMessageProcessorType::StateMachine(p) => {
129 p.process_outgoing(current_message).await?
130 }
131 PlainMessageProcessorType::Composite(p) => {
132 p.process_outgoing(current_message).await?
133 }
134 };
135
136 if let Some(msg) = processed {
137 current_message = msg;
138 } else {
139 return Ok(None);
141 }
142 }
143
144 Ok(Some(current_message))
145 }
146}
147
148#[derive(Clone)]
150pub struct CompositePlainMessageRouter {
151 routers: Vec<PlainMessageRouterType>,
152}
153
154impl CompositePlainMessageRouter {
155 pub fn new(routers: Vec<PlainMessageRouterType>) -> Self {
157 Self { routers }
158 }
159
160 pub fn add_router(&mut self, router: PlainMessageRouterType) {
162 self.routers.push(router);
163 }
164}
165
166impl PlainMessageRouter for CompositePlainMessageRouter {
167 fn route_message_impl(&self, message: &PlainMessage) -> Result<String> {
168 for router in &self.routers {
170 let result = match router {
171 PlainMessageRouterType::Default(r) => r.route_message_impl(message),
172 PlainMessageRouterType::IntraNode(r) => r.route_message_impl(message),
173 };
174
175 match result {
176 Ok(did) => return Ok(did),
177 Err(_) => continue, }
179 }
180
181 Err(crate::error::Error::Routing(
183 "No router could handle the message".to_string(),
184 ))
185 }
186}