tap_node/message/
mod.rs

//! PlainMessage processing and routing for TAP Node
//!
//! This module provides functionality for processing and routing TAP messages between agents.
4
5pub mod processor;
6pub mod processor_pool;
7pub mod router;
8pub mod sender;
9
10// Re-export processors, routers, and senders
11pub use processor::{
12    DefaultPlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessor,
13    ValidationPlainMessageProcessor,
14};
15pub use processor_pool::{ProcessorPool, ProcessorPoolConfig};
16pub use router::DefaultPlainMessageRouter;
17pub use sender::{HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender};
18
// Import the crate-wide Result alias, the async_trait macro, and the PlainMessage type from tap-msg
20use crate::error::Result;
21use async_trait::async_trait;
22use tap_msg::didcomm::PlainMessage;
23
24/// Router to determine the destination agent for a message
25pub trait PlainMessageRouter: Send + Sync {
26    /// Route a message to determine the target agent DID
27    fn route_message_impl(&self, message: &PlainMessage) -> Result<String>;
28}
29
30/// Async extension trait for the PlainMessageRouter
31#[async_trait]
32pub trait RouterAsyncExt: PlainMessageRouter {
33    /// Route a message asynchronously
34    async fn route_message(&self, message: &PlainMessage) -> Result<String>;
35}
36
37#[async_trait]
38impl<T: PlainMessageRouter + Sync> RouterAsyncExt for T {
39    async fn route_message(&self, message: &PlainMessage) -> Result<String> {
40        self.route_message_impl(message)
41    }
42}
43
44/// Processor enum to replace trait objects
45#[derive(Clone, Debug)]
46pub enum PlainMessageProcessorType {
47    Default(DefaultPlainMessageProcessor),
48    Logging(LoggingPlainMessageProcessor),
49    Validation(ValidationPlainMessageProcessor),
50    Composite(CompositePlainMessageProcessor),
51}
52
53/// Router enum to replace trait objects
54#[derive(Clone, Debug)]
55pub enum PlainMessageRouterType {
56    Default(DefaultPlainMessageRouter),
57}
58
59/// A message processor that applies multiple processors in sequence
60#[derive(Clone, Debug)]
61pub struct CompositePlainMessageProcessor {
62    processors: Vec<PlainMessageProcessorType>,
63}
64
65impl CompositePlainMessageProcessor {
66    /// Create a new composite message processor
67    pub fn new(processors: Vec<PlainMessageProcessorType>) -> Self {
68        Self { processors }
69    }
70
71    /// Add a processor to the chain
72    pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
73        self.processors.push(processor);
74    }
75}
76
77#[async_trait]
78impl PlainMessageProcessor for CompositePlainMessageProcessor {
79    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
80        let mut current_message = message;
81
82        for processor in &self.processors {
83            let processed = match processor {
84                PlainMessageProcessorType::Default(p) => {
85                    p.process_incoming(current_message).await?
86                }
87                PlainMessageProcessorType::Logging(p) => {
88                    p.process_incoming(current_message).await?
89                }
90                PlainMessageProcessorType::Validation(p) => {
91                    p.process_incoming(current_message).await?
92                }
93                PlainMessageProcessorType::Composite(p) => {
94                    p.process_incoming(current_message).await?
95                }
96            };
97
98            if let Some(msg) = processed {
99                current_message = msg;
100            } else {
101                // PlainMessage was filtered out
102                return Ok(None);
103            }
104        }
105
106        Ok(Some(current_message))
107    }
108
109    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
110        let mut current_message = message;
111
112        for processor in &self.processors {
113            let processed = match processor {
114                PlainMessageProcessorType::Default(p) => {
115                    p.process_outgoing(current_message).await?
116                }
117                PlainMessageProcessorType::Logging(p) => {
118                    p.process_outgoing(current_message).await?
119                }
120                PlainMessageProcessorType::Validation(p) => {
121                    p.process_outgoing(current_message).await?
122                }
123                PlainMessageProcessorType::Composite(p) => {
124                    p.process_outgoing(current_message).await?
125                }
126            };
127
128            if let Some(msg) = processed {
129                current_message = msg;
130            } else {
131                // PlainMessage was filtered out
132                return Ok(None);
133            }
134        }
135
136        Ok(Some(current_message))
137    }
138}
139
140/// A composite router that tries multiple routers in sequence
141#[derive(Clone)]
142pub struct CompositePlainMessageRouter {
143    routers: Vec<PlainMessageRouterType>,
144}
145
146impl CompositePlainMessageRouter {
147    /// Create a new composite router
148    pub fn new(routers: Vec<PlainMessageRouterType>) -> Self {
149        Self { routers }
150    }
151
152    /// Add a router to the chain
153    pub fn add_router(&mut self, router: PlainMessageRouterType) {
154        self.routers.push(router);
155    }
156}
157
158impl PlainMessageRouter for CompositePlainMessageRouter {
159    fn route_message_impl(&self, message: &PlainMessage) -> Result<String> {
160        // Try each router in sequence until one succeeds
161        for router in &self.routers {
162            let result = match router {
163                PlainMessageRouterType::Default(r) => r.route_message_impl(message),
164            };
165
166            match result {
167                Ok(did) => return Ok(did),
168                Err(_) => continue, // Try the next router
169            }
170        }
171
172        // If we get here, all routers failed
173        Err(crate::error::Error::Routing(
174            "No router could handle the message".to_string(),
175        ))
176    }
177}