tap_node/message/processor.rs
1//! # Message Processor Implementations for TAP Node
2//!
3//! This module provides message processing functionality for TAP Node. Message processors
4//! serve as middleware in the message handling pipeline, allowing for validation, transformation,
5//! and filtering of messages as they flow through the system.
6//!
7//! ## Message Processing Pipeline
8//!
9//! The TAP Node uses a pipeline architecture for message processing, where messages pass through
10//! a series of processors in sequence. Each processor can:
11//!
12//! - Pass the message through unchanged
13//! - Transform the message in some way
14//! - Filter out (drop) messages based on certain criteria
15//! - Perform side effects (logging, metrics collection, etc.)
16//!
17//! ## Processor Types
18//!
19//! The module provides several built-in processor implementations:
20//!
21//! - `LoggingMessageProcessor`: Logs information about messages passing through the system
22//! - `ValidationMessageProcessor`: Validates message structure and content
23//! - `DefaultMessageProcessor`: A simple pass-through processor with minimal functionality
24//! - `CompositeMessageProcessor`: Combines multiple processors into a processing chain
25//!
26//! ## Custom Processors
27//!
28//! You can create custom processors by implementing the `MessageProcessor` trait. This
29//! allows for specialized processing such as:
30//!
31//! - Message transformation for protocol version compatibility
32//! - Content-based filtering and routing
33//! - Security scanning and anomaly detection
34//! - Metrics collection and performance monitoring
35//!
36//! ## Processing Modes
37//!
38//! Each processor implements two key methods:
39//!
40//! - `process_incoming()`: For messages received by the node
41//! - `process_outgoing()`: For messages being sent from the node
42//!
43//! This separation allows for different processing logic depending on message direction.
44
45use async_trait::async_trait;
46use log::{debug, info};
47use tap_msg::didcomm::Message;
48
49use crate::error::Result;
50
51/// Trait for processing DIDComm messages in TAP nodes
52///
53/// The `MessageProcessor` trait defines the interface for message processors
54/// that handle DIDComm messages flowing through the TAP node. Processors act
55/// as middleware, allowing for validation, transformation, logging, metrics
56/// collection, and other operations on messages.
57///
58/// # Design Patterns
59///
60/// This trait follows the Chain of Responsibility pattern, where each processor
61/// can either:
62/// - Pass the message along unchanged
63/// - Transform the message before passing it along
64/// - Filter out (drop) the message by returning None
65/// - Perform side effects during processing (logging, metrics, etc.)
66///
67/// # Thread Safety
68///
69/// All implementations must be `Send + Sync + Clone` to ensure they can be
70/// safely used in multithreaded environments and composed into processor chains.
71///
72/// # Implementation Guidelines
73///
74/// When implementing a custom processor:
75/// - Ensure both `process_incoming` and `process_outgoing` are implemented
76/// - Be mindful of performance in high-throughput environments
77/// - Consider making processors stateless when possible
78/// - Use the processor's Clone trait to avoid expensive setup/teardown
79/// - Document any side effects or transformations clearly
80///
81/// # Examples
82///
83/// ```
84/// # use async_trait::async_trait;
85/// # use tap_node::error::Result;
86/// # use tap_msg::didcomm::Message;
87/// # use tap_node::message::processor::MessageProcessor;
88/// #
89/// #[derive(Clone, Debug)]
90/// struct MyCustomProcessor;
91///
92/// #[async_trait]
93/// impl MessageProcessor for MyCustomProcessor {
94/// async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
95/// // Process incoming message - e.g., validate fields, log, transform
96/// println!("Processing incoming message: {}", message.id);
97/// Ok(Some(message)) // Pass message along unchanged
98/// }
99///
100/// async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
101/// // Process outgoing message
102/// println!("Processing outgoing message: {}", message.id);
103/// Ok(Some(message)) // Pass message along unchanged
104/// }
105/// }
106/// ```
107#[async_trait]
108pub trait MessageProcessor: Send + Sync + Clone {
109 /// Process an incoming message received by the node
110 ///
111 /// This method handles messages that are being received by the TAP node from
112 /// external sources. Implementations can validate, transform, or filter these
113 /// messages before they are routed to their target agents.
114 ///
115 /// # Parameters
116 ///
117 /// * `message` - The DIDComm message to process
118 ///
119 /// # Returns
120 ///
121 /// * `Ok(Some(message))` - The message to pass to the next processor
122 /// * `Ok(None)` - Drop the message (do not process further)
123 /// * `Err(e)` - Processing error
124 async fn process_incoming(&self, message: Message) -> Result<Option<Message>>;
125
126 /// Process an outgoing message being sent from the node
127 ///
128 /// This method handles messages that are being sent from the TAP node to
129 /// external recipients. Implementations can transform these messages for
130 /// compatibility, add headers, perform logging, or filter messages before
131 /// they are delivered.
132 ///
133 /// # Parameters
134 ///
135 /// * `message` - The DIDComm message to process
136 ///
137 /// # Returns
138 ///
139 /// * `Ok(Some(message))` - The message to pass to the next processor
140 /// * `Ok(None)` - Drop the message (do not process further)
141 /// * `Err(e)` - Processing error
142 async fn process_outgoing(&self, message: Message) -> Result<Option<Message>>;
143}
144
145/// A message processor that logs messages
146#[derive(Debug, Clone)]
147pub struct LoggingMessageProcessor;
148
149#[async_trait]
150impl MessageProcessor for LoggingMessageProcessor {
151 async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
152 info!("Incoming message: {}", message.id);
153 debug!("Message content: {:?}", message);
154 Ok(Some(message))
155 }
156
157 async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
158 info!("Outgoing message: {}", message.id);
159 debug!("Message content: {:?}", message);
160 Ok(Some(message))
161 }
162}
163
164/// A message processor that validates messages
165///
166/// This processor validates incoming and outgoing DIDComm messages to ensure they
167/// conform to the expected structure and protocol requirements.
168///
169/// In a production implementation, this would perform comprehensive validation including:
170/// - Field validation (required fields, format, values)
171/// - Protocol compliance checks for each message type
172/// - Signature verification
173/// - Timestamp and expiration checks
174/// - Security and authorization checks
175///
176/// # Implementation
177///
178/// Currently, this implementation validates:
179/// - The message ID is not empty
180/// - The message type is not empty
181/// - Any 'from' or 'to' DIDs follow the 'did:' prefix format
182/// - Basic protocol-specific requirements based on message type
183///
184/// # Message Flow
185///
186/// The validator sits in the message processor pipeline and can filter out invalid
187/// messages by returning Ok(None), or let valid messages continue through the
188/// pipeline by returning Ok(Some(message)).
189#[derive(Debug, Clone)]
190pub struct ValidationMessageProcessor;
191
192#[async_trait]
193impl MessageProcessor for ValidationMessageProcessor {
194 async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
195 debug!("Validating incoming message: {}", message.id);
196
197 // Basic validation - ID and type should not be empty
198 if message.id.is_empty() {
199 info!("Message has empty ID, rejecting");
200 return Ok(None);
201 }
202
203 if message.typ.is_empty() {
204 info!("Message has empty type, rejecting");
205 return Ok(None);
206 }
207
208 // Validate DID format if present
209 if let Some(from) = &message.from {
210 if !from.starts_with("did:") {
211 info!("Invalid 'from' DID format: {}", from);
212 return Ok(None);
213 }
214 }
215
216 // Validate recipient DIDs
217 if let Some(to) = &message.to {
218 if to.is_empty() {
219 info!("Message has empty 'to' field");
220 return Ok(None);
221 }
222
223 // All DIDs should have valid format
224 for recipient in to {
225 if !recipient.starts_with("did:") {
226 info!("Invalid recipient DID format: {}", recipient);
227 return Ok(None);
228 }
229 }
230 }
231
232 // Validate body
233 if message.body == serde_json::json!(null) {
234 info!("Message has null body, rejecting");
235 return Ok(None);
236 }
237
238 // Validate pthid if present
239 if let Some(pthid) = &message.pthid {
240 if pthid.is_empty() {
241 info!("Message has empty parent thread ID, rejecting");
242 return Ok(None);
243 }
244 }
245
246 // Validate timestamp
247 if let Some(created_time) = message.created_time {
248 let now = chrono::Utc::now().timestamp() as u64;
249 // Check if the timestamp is more than 5 minutes in the future
250 if created_time > now + 300 {
251 info!("Message has future timestamp, rejecting");
252 return Ok(None);
253 }
254 }
255
256 // Protocol-specific validation based on message type
257 let typ = &message.typ;
258
259 // Validate TAP messages
260 if typ.starts_with("https://tap.rsvp/schema/") {
261 // TAP-specific validations
262 // Check that it's a valid TAP message type
263 if !typ.contains("transfer")
264 && !typ.contains("authorize")
265 && !typ.contains("reject")
266 && !typ.contains("settle")
267 {
268 info!("Unknown TAP message type: {}", typ);
269 return Ok(None);
270 }
271 }
272 // Validate DIDComm messages
273 else if typ.starts_with("https://didcomm.org/") {
274 // DIDComm-specific validations
275 // Add more specific DIDComm validations here
276 }
277 // Unknown message type protocol
278 else if !typ.starts_with("https://tap.rsvp/schema/")
279 && !typ.starts_with("https://didcomm.org/")
280 {
281 info!("Unknown message protocol: {}", typ);
282 // Reject unknown message protocols
283 return Ok(None);
284 }
285
286 // Message passed validation
287 Ok(Some(message))
288 }
289
290 async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
291 debug!("Validating outgoing message: {}", message.id);
292
293 // For outgoing messages, apply the same validations as incoming messages
294 // In a production system, there might be different validations for outgoing vs incoming
295
296 // Basic validation - ID and type should not be empty
297 if message.id.is_empty() {
298 info!("Outgoing message has empty ID, rejecting");
299 return Ok(None);
300 }
301
302 if message.typ.is_empty() {
303 info!("Outgoing message has empty type, rejecting");
304 return Ok(None);
305 }
306
307 // Validate DID format if present
308 if let Some(from) = &message.from {
309 if !from.starts_with("did:") {
310 info!("Invalid 'from' DID format in outgoing message: {}", from);
311 return Ok(None);
312 }
313 }
314
315 // Validate recipient DIDs
316 if let Some(to) = &message.to {
317 if to.is_empty() {
318 info!("Outgoing message has empty 'to' field");
319 return Ok(None);
320 }
321
322 // All DIDs should have valid format
323 for recipient in to {
324 if !recipient.starts_with("did:") {
325 info!(
326 "Invalid recipient DID format in outgoing message: {}",
327 recipient
328 );
329 return Ok(None);
330 }
331 }
332 }
333
334 // Validate body
335 if message.body == serde_json::json!(null) {
336 info!("Outgoing message has null body, rejecting");
337 return Ok(None);
338 }
339
340 // Validate pthid if present
341 if let Some(pthid) = &message.pthid {
342 if pthid.is_empty() {
343 info!("Outgoing message has empty parent thread ID, rejecting");
344 return Ok(None);
345 }
346 }
347
348 // Validate timestamp
349 if let Some(created_time) = message.created_time {
350 let now = chrono::Utc::now().timestamp() as u64;
351 // Check if the timestamp is more than 5 minutes in the future
352 if created_time > now + 300 {
353 info!("Outgoing message has future timestamp, rejecting");
354 return Ok(None);
355 }
356 }
357
358 // Protocol-specific validation based on message type
359 let typ = &message.typ;
360
361 // Validate TAP messages
362 if typ.starts_with("https://tap.rsvp/schema/") {
363 // TAP-specific validations
364 // Check that it's a valid TAP message type
365 if !typ.contains("transfer")
366 && !typ.contains("authorize")
367 && !typ.contains("reject")
368 && !typ.contains("settle")
369 {
370 info!("Unknown TAP message type in outgoing message: {}", typ);
371 return Ok(None);
372 }
373 }
374 // Validate DIDComm messages
375 else if typ.starts_with("https://didcomm.org/") {
376 // DIDComm-specific validations
377 // Add more specific DIDComm validations here
378 }
379 // Unknown message type protocol
380 else if !typ.starts_with("https://tap.rsvp/schema/")
381 && !typ.starts_with("https://didcomm.org/")
382 {
383 info!("Unknown message protocol in outgoing message: {}", typ);
384 // Reject unknown message protocols
385 return Ok(None);
386 }
387
388 // Message passed validation
389 Ok(Some(message))
390 }
391}
392
393/// Default message processor with core functionality
394#[derive(Debug, Clone)]
395pub struct DefaultMessageProcessor;
396
397#[async_trait]
398impl MessageProcessor for DefaultMessageProcessor {
399 async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
400 // By default, we just pass the message through
401 Ok(Some(message))
402 }
403
404 async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
405 // By default, we just pass the message through
406 Ok(Some(message))
407 }
408}
409
410/// Default message processor that logs and validates messages
411#[derive(Clone, Debug)]
412pub struct DefaultMessageProcessorImpl {
413 /// The internal processor
414 processor: crate::message::MessageProcessorType,
415}
416
417impl Default for DefaultMessageProcessorImpl {
418 fn default() -> Self {
419 Self::new()
420 }
421}
422
423impl DefaultMessageProcessorImpl {
424 /// Create a new default message processor
425 pub fn new() -> Self {
426 let logging_processor =
427 crate::message::MessageProcessorType::Logging(LoggingMessageProcessor);
428 let validation_processor =
429 crate::message::MessageProcessorType::Validation(ValidationMessageProcessor);
430
431 let mut processor = crate::message::CompositeMessageProcessor::new(Vec::new());
432 processor.add_processor(validation_processor);
433 processor.add_processor(logging_processor);
434
435 let processor = crate::message::MessageProcessorType::Composite(processor);
436
437 Self { processor }
438 }
439}
440
441#[async_trait]
442impl MessageProcessor for DefaultMessageProcessorImpl {
443 async fn process_incoming(&self, message: Message) -> Result<Option<Message>> {
444 match &self.processor {
445 crate::message::MessageProcessorType::Default(p) => p.process_incoming(message).await,
446 crate::message::MessageProcessorType::Logging(p) => p.process_incoming(message).await,
447 crate::message::MessageProcessorType::Validation(p) => {
448 p.process_incoming(message).await
449 }
450 crate::message::MessageProcessorType::Composite(p) => p.process_incoming(message).await,
451 }
452 }
453
454 async fn process_outgoing(&self, message: Message) -> Result<Option<Message>> {
455 match &self.processor {
456 crate::message::MessageProcessorType::Default(p) => p.process_outgoing(message).await,
457 crate::message::MessageProcessorType::Logging(p) => p.process_outgoing(message).await,
458 crate::message::MessageProcessorType::Validation(p) => {
459 p.process_outgoing(message).await
460 }
461 crate::message::MessageProcessorType::Composite(p) => p.process_outgoing(message).await,
462 }
463 }
464}