tap_node/message/
mod.rs

1//! Message processing and routing for TAP Node
2//!
3//! This module provides functionality for processing and routing TAP messages between agents.
4
5pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9
10// Re-export processors, routers, and senders
11pub use processor::{
12    DefaultMessageProcessor, LoggingMessageProcessor, MessageProcessor, ValidationMessageProcessor,
13};
14pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
15pub use router::DefaultMessageRouter;
16pub use sender::{HttpMessageSender, MessageSender, NodeMessageSender};
17
18// Import the Message type from tap-msg
19use crate::error::Result;
20use async_trait::async_trait;
21use tap_msg::didcomm::Message;
22
23/// Router to determine the destination agent for a message
24pub trait MessageRouter: Send + Sync {
25    /// Route a message to determine the target agent DID
26    fn route_message_impl(&self, message: &Message) -> Result<String>;
27}
28
29/// Async extension trait for the MessageRouter
30#[async_trait]
31pub trait RouterAsyncExt: MessageRouter {
32    /// Route a message asynchronously
33    async fn route_message(&self, message: &Message) -> Result<String>;
34}
35
36#[async_trait]
37impl<T: MessageRouter + Sync> RouterAsyncExt for T {
38    async fn route_message(&self, message: &Message) -> Result<String> {
39        self.route_message_impl(message)
40    }
41}
42
43/// Processor enum to replace trait objects
44#[derive(Clone, Debug)]
45pub enum MessageProcessorType {
46    Default(DefaultMessageProcessor),
47    Logging(LoggingMessageProcessor),
48    Validation(ValidationMessageProcessor),
49    Composite(CompositeMessageProcessor),
50}
51
52/// Router enum to replace trait objects
53#[derive(Clone, Debug)]
54pub enum MessageRouterType {
55    Default(DefaultMessageRouter),
56}
57
58/// A message processor that applies multiple processors in sequence
59#[derive(Clone, Debug)]
60pub struct CompositeMessageProcessor {
61    processors: Vec<MessageProcessorType>,
62}
63
64impl CompositeMessageProcessor {
65    /// Create a new composite message processor
66    pub fn new(processors: Vec<MessageProcessorType>) -> Self {
67        Self { processors }
68    }
69
70    /// Add a processor to the chain
71    pub fn add_processor(&mut self, processor: MessageProcessorType) {
72        self.processors.push(processor);
73    }
74}
75
76#[async_trait]
77impl MessageProcessor for CompositeMessageProcessor {
78    async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
79        let mut current_message = message;
80
81        for processor in &self.processors {
82            let processed = match processor {
83                MessageProcessorType::Default(p) => p.process_incoming(current_message).await?,
84                MessageProcessorType::Logging(p) => p.process_incoming(current_message).await?,
85                MessageProcessorType::Validation(p) => p.process_incoming(current_message).await?,
86                MessageProcessorType::Composite(p) => p.process_incoming(current_message).await?,
87            };
88
89            if let Some(msg) = processed {
90                current_message = msg;
91            } else {
92                // Message was filtered out
93                return Ok(None);
94            }
95        }
96
97        Ok(Some(current_message))
98    }
99
100    async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
101        let mut current_message = message;
102
103        for processor in &self.processors {
104            let processed = match processor {
105                MessageProcessorType::Default(p) => p.process_outgoing(current_message).await?,
106                MessageProcessorType::Logging(p) => p.process_outgoing(current_message).await?,
107                MessageProcessorType::Validation(p) => p.process_outgoing(current_message).await?,
108                MessageProcessorType::Composite(p) => p.process_outgoing(current_message).await?,
109            };
110
111            if let Some(msg) = processed {
112                current_message = msg;
113            } else {
114                // Message was filtered out
115                return Ok(None);
116            }
117        }
118
119        Ok(Some(current_message))
120    }
121}
122
123/// A composite router that tries multiple routers in sequence
124#[derive(Clone)]
125pub struct CompositeMessageRouter {
126    routers: Vec<MessageRouterType>,
127}
128
129impl CompositeMessageRouter {
130    /// Create a new composite router
131    pub fn new(routers: Vec<MessageRouterType>) -> Self {
132        Self { routers }
133    }
134
135    /// Add a router to the chain
136    pub fn add_router(&mut self, router: MessageRouterType) {
137        self.routers.push(router);
138    }
139}
140
141impl MessageRouter for CompositeMessageRouter {
142    fn route_message_impl(&self, message: &Message) -> Result<String> {
143        // Try each router in sequence until one succeeds
144        for router in &self.routers {
145            let result = match router {
146                MessageRouterType::Default(r) => r.route_message_impl(message),
147            };
148
149            match result {
150                Ok(did) => return Ok(did),
151                Err(_) => continue, // Try the next router
152            }
153        }
154
155        // If we get here, all routers failed
156        Err(crate::error::Error::Routing(
157            "No router could handle the message".to_string(),
158        ))
159    }
160}