Skip to main content

tap_node/message/
mod.rs

1//! PlainMessage processing and routing for TAP Node
2//!
3//! This module provides functionality for processing and routing TAP messages between agents.
4
5pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9pub mod travel_rule_processor;
10pub mod trust_ping_processor;
11#[cfg(test)]
12pub mod trust_ping_tests;
13
14// Re-export processors, routers, and senders
15pub use processor::{
16    DefaultPlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessor,
17    StateMachineIntegrationProcessor, ValidationPlainMessageProcessor,
18};
19pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
20pub use router::{DefaultPlainMessageRouter, IntraNodePlainMessageRouter};
21pub use sender::{HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender};
22pub use travel_rule_processor::TravelRuleProcessor;
23pub use trust_ping_processor::TrustPingProcessor;
24
25// Import the PlainMessage type from tap-msg
26use crate::error::Result;
27use async_trait::async_trait;
28use tap_msg::didcomm::PlainMessage;
29
30/// Router to determine the destination agent for a message
31pub trait PlainMessageRouter: Send + Sync {
32    /// Route a message to determine the target agent DID
33    fn route_message_impl(&self, message: &PlainMessage) -> Result<String>;
34}
35
36/// Async extension trait for the PlainMessageRouter
37#[async_trait]
38pub trait RouterAsyncExt: PlainMessageRouter {
39    /// Route a message asynchronously
40    async fn route_message(&self, message: &PlainMessage) -> Result<String>;
41}
42
43#[async_trait]
44impl<T: PlainMessageRouter + Sync> RouterAsyncExt for T {
45    async fn route_message(&self, message: &PlainMessage) -> Result<String> {
46        self.route_message_impl(message)
47    }
48}
49
50/// Processor enum to replace trait objects
51#[derive(Clone, Debug)]
52pub enum PlainMessageProcessorType {
53    Default(DefaultPlainMessageProcessor),
54    Logging(LoggingPlainMessageProcessor),
55    Validation(ValidationPlainMessageProcessor),
56    StateMachine(StateMachineIntegrationProcessor),
57    TravelRule(TravelRuleProcessor),
58    TrustPing(TrustPingProcessor),
59    Composite(CompositePlainMessageProcessor),
60}
61
62/// Router enum to replace trait objects
63#[derive(Clone, Debug)]
64pub enum PlainMessageRouterType {
65    Default(DefaultPlainMessageRouter),
66    IntraNode(IntraNodePlainMessageRouter),
67}
68
69/// A message processor that applies multiple processors in sequence
70#[derive(Clone, Debug)]
71pub struct CompositePlainMessageProcessor {
72    processors: Vec<PlainMessageProcessorType>,
73}
74
75impl CompositePlainMessageProcessor {
76    /// Create a new composite message processor
77    pub fn new(processors: Vec<PlainMessageProcessorType>) -> Self {
78        Self { processors }
79    }
80
81    /// Add a processor to the chain
82    pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
83        self.processors.push(processor);
84    }
85}
86
87#[async_trait]
88impl PlainMessageProcessor for CompositePlainMessageProcessor {
89    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
90        let mut current_message = message;
91
92        for processor in &self.processors {
93            let processed = match processor {
94                PlainMessageProcessorType::Default(p) => {
95                    p.process_incoming(current_message).await?
96                }
97                PlainMessageProcessorType::Logging(p) => {
98                    p.process_incoming(current_message).await?
99                }
100                PlainMessageProcessorType::Validation(p) => {
101                    p.process_incoming(current_message).await?
102                }
103                PlainMessageProcessorType::StateMachine(p) => {
104                    p.process_incoming(current_message).await?
105                }
106                PlainMessageProcessorType::TravelRule(p) => {
107                    p.process_incoming(current_message).await?
108                }
109                PlainMessageProcessorType::TrustPing(p) => {
110                    p.process_incoming(current_message).await?
111                }
112                PlainMessageProcessorType::Composite(p) => {
113                    p.process_incoming(current_message).await?
114                }
115            };
116
117            if let Some(msg) = processed {
118                current_message = msg;
119            } else {
120                // PlainMessage was filtered out
121                return Ok(None);
122            }
123        }
124
125        Ok(Some(current_message))
126    }
127
128    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
129        let mut current_message = message;
130
131        for processor in &self.processors {
132            let processed = match processor {
133                PlainMessageProcessorType::Default(p) => {
134                    p.process_outgoing(current_message).await?
135                }
136                PlainMessageProcessorType::Logging(p) => {
137                    p.process_outgoing(current_message).await?
138                }
139                PlainMessageProcessorType::Validation(p) => {
140                    p.process_outgoing(current_message).await?
141                }
142                PlainMessageProcessorType::StateMachine(p) => {
143                    p.process_outgoing(current_message).await?
144                }
145                PlainMessageProcessorType::TravelRule(p) => {
146                    p.process_outgoing(current_message).await?
147                }
148                PlainMessageProcessorType::TrustPing(p) => {
149                    p.process_outgoing(current_message).await?
150                }
151                PlainMessageProcessorType::Composite(p) => {
152                    p.process_outgoing(current_message).await?
153                }
154            };
155
156            if let Some(msg) = processed {
157                current_message = msg;
158            } else {
159                // PlainMessage was filtered out
160                return Ok(None);
161            }
162        }
163
164        Ok(Some(current_message))
165    }
166}
167
168/// A composite router that tries multiple routers in sequence
169#[derive(Clone)]
170pub struct CompositePlainMessageRouter {
171    routers: Vec<PlainMessageRouterType>,
172}
173
174impl CompositePlainMessageRouter {
175    /// Create a new composite router
176    pub fn new(routers: Vec<PlainMessageRouterType>) -> Self {
177        Self { routers }
178    }
179
180    /// Add a router to the chain
181    pub fn add_router(&mut self, router: PlainMessageRouterType) {
182        self.routers.push(router);
183    }
184}
185
186impl PlainMessageRouter for CompositePlainMessageRouter {
187    fn route_message_impl(&self, message: &PlainMessage) -> Result<String> {
188        // Try each router in sequence until one succeeds
189        for router in &self.routers {
190            let result = match router {
191                PlainMessageRouterType::Default(r) => r.route_message_impl(message),
192                PlainMessageRouterType::IntraNode(r) => r.route_message_impl(message),
193            };
194
195            match result {
196                Ok(did) => return Ok(did),
197                Err(_) => continue, // Try the next router
198            }
199        }
200
201        // If we get here, all routers failed
202        Err(crate::error::Error::Routing(
203            "No router could handle the message".to_string(),
204        ))
205    }
206}