tap_node/message/
mod.rs

1//! PlainMessage processing and routing for TAP Node
2//!
3//! This module provides functionality for processing and routing TAP messages between agents.
4
5pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9
10// Re-export processors, routers, and senders
11pub use processor::{
12    DefaultPlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessor,
13    StateMachineIntegrationProcessor, ValidationPlainMessageProcessor,
14};
15pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
16pub use router::{DefaultPlainMessageRouter, IntraNodePlainMessageRouter};
17pub use sender::{HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender};
18
19// Import the PlainMessage type from tap-msg
20use crate::error::Result;
21use async_trait::async_trait;
22use tap_msg::didcomm::PlainMessage;
23
24/// Router to determine the destination agent for a message
25pub trait PlainMessageRouter: Send + Sync {
26    /// Route a message to determine the target agent DID
27    fn route_message_impl(&self, message: &PlainMessage) -> Result<String>;
28}
29
30/// Async extension trait for the PlainMessageRouter
31#[async_trait]
32pub trait RouterAsyncExt: PlainMessageRouter {
33    /// Route a message asynchronously
34    async fn route_message(&self, message: &PlainMessage) -> Result<String>;
35}
36
37#[async_trait]
38impl<T: PlainMessageRouter + Sync> RouterAsyncExt for T {
39    async fn route_message(&self, message: &PlainMessage) -> Result<String> {
40        self.route_message_impl(message)
41    }
42}
43
44/// Processor enum to replace trait objects
45#[derive(Clone, Debug)]
46pub enum PlainMessageProcessorType {
47    Default(DefaultPlainMessageProcessor),
48    Logging(LoggingPlainMessageProcessor),
49    Validation(ValidationPlainMessageProcessor),
50    StateMachine(StateMachineIntegrationProcessor),
51    Composite(CompositePlainMessageProcessor),
52}
53
54/// Router enum to replace trait objects
55#[derive(Clone, Debug)]
56pub enum PlainMessageRouterType {
57    Default(DefaultPlainMessageRouter),
58    IntraNode(IntraNodePlainMessageRouter),
59}
60
61/// A message processor that applies multiple processors in sequence
62#[derive(Clone, Debug)]
63pub struct CompositePlainMessageProcessor {
64    processors: Vec<PlainMessageProcessorType>,
65}
66
67impl CompositePlainMessageProcessor {
68    /// Create a new composite message processor
69    pub fn new(processors: Vec<PlainMessageProcessorType>) -> Self {
70        Self { processors }
71    }
72
73    /// Add a processor to the chain
74    pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
75        self.processors.push(processor);
76    }
77}
78
79#[async_trait]
80impl PlainMessageProcessor for CompositePlainMessageProcessor {
81    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
82        let mut current_message = message;
83
84        for processor in &self.processors {
85            let processed = match processor {
86                PlainMessageProcessorType::Default(p) => {
87                    p.process_incoming(current_message).await?
88                }
89                PlainMessageProcessorType::Logging(p) => {
90                    p.process_incoming(current_message).await?
91                }
92                PlainMessageProcessorType::Validation(p) => {
93                    p.process_incoming(current_message).await?
94                }
95                PlainMessageProcessorType::StateMachine(p) => {
96                    p.process_incoming(current_message).await?
97                }
98                PlainMessageProcessorType::Composite(p) => {
99                    p.process_incoming(current_message).await?
100                }
101            };
102
103            if let Some(msg) = processed {
104                current_message = msg;
105            } else {
106                // PlainMessage was filtered out
107                return Ok(None);
108            }
109        }
110
111        Ok(Some(current_message))
112    }
113
114    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
115        let mut current_message = message;
116
117        for processor in &self.processors {
118            let processed = match processor {
119                PlainMessageProcessorType::Default(p) => {
120                    p.process_outgoing(current_message).await?
121                }
122                PlainMessageProcessorType::Logging(p) => {
123                    p.process_outgoing(current_message).await?
124                }
125                PlainMessageProcessorType::Validation(p) => {
126                    p.process_outgoing(current_message).await?
127                }
128                PlainMessageProcessorType::StateMachine(p) => {
129                    p.process_outgoing(current_message).await?
130                }
131                PlainMessageProcessorType::Composite(p) => {
132                    p.process_outgoing(current_message).await?
133                }
134            };
135
136            if let Some(msg) = processed {
137                current_message = msg;
138            } else {
139                // PlainMessage was filtered out
140                return Ok(None);
141            }
142        }
143
144        Ok(Some(current_message))
145    }
146}
147
148/// A composite router that tries multiple routers in sequence
149#[derive(Clone)]
150pub struct CompositePlainMessageRouter {
151    routers: Vec<PlainMessageRouterType>,
152}
153
154impl CompositePlainMessageRouter {
155    /// Create a new composite router
156    pub fn new(routers: Vec<PlainMessageRouterType>) -> Self {
157        Self { routers }
158    }
159
160    /// Add a router to the chain
161    pub fn add_router(&mut self, router: PlainMessageRouterType) {
162        self.routers.push(router);
163    }
164}
165
166impl PlainMessageRouter for CompositePlainMessageRouter {
167    fn route_message_impl(&self, message: &PlainMessage) -> Result<String> {
168        // Try each router in sequence until one succeeds
169        for router in &self.routers {
170            let result = match router {
171                PlainMessageRouterType::Default(r) => r.route_message_impl(message),
172                PlainMessageRouterType::IntraNode(r) => r.route_message_impl(message),
173            };
174
175            match result {
176                Ok(did) => return Ok(did),
177                Err(_) => continue, // Try the next router
178            }
179        }
180
181        // If we get here, all routers failed
182        Err(crate::error::Error::Routing(
183            "No router could handle the message".to_string(),
184        ))
185    }
186}