tap_node/message/
processor.rs

1//! # PlainMessage Processor Implementations for TAP Node
2//!
3//! This module provides message processing functionality for TAP Node. PlainMessage processors
4//! serve as middleware in the message handling pipeline, allowing for validation, transformation,
5//! and filtering of messages as they flow through the system.
6//!
7//! ## PlainMessage Processing Pipeline
8//!
9//! The TAP Node uses a pipeline architecture for message processing, where messages pass through
10//! a series of processors in sequence. Each processor can:
11//!
12//! - Pass the message through unchanged
13//! - Transform the message in some way
14//! - Filter out (drop) messages based on certain criteria
15//! - Perform side effects (logging, metrics collection, etc.)
16//!
17//! ## Processor Types
18//!
19//! The module provides several built-in processor implementations:
20//!
21//! - `LoggingPlainMessageProcessor`: Logs information about messages passing through the system
22//! - `ValidationPlainMessageProcessor`: Validates message structure and content
23//! - `DefaultPlainMessageProcessor`: A simple pass-through processor with minimal functionality
24//! - `CompositePlainMessageProcessor`: Combines multiple processors into a processing chain
25//!
26//! ## Custom Processors
27//!
28//! You can create custom processors by implementing the `PlainMessageProcessor` trait. This
29//! allows for specialized processing such as:
30//!
31//! - PlainMessage transformation for protocol version compatibility
32//! - Content-based filtering and routing
33//! - Security scanning and anomaly detection
34//! - Metrics collection and performance monitoring
35//!
36//! ## Processing Modes
37//!
38//! Each processor implements two key methods:
39//!
40//! - `process_incoming()`: For messages received by the node
41//! - `process_outgoing()`: For messages being sent from the node
42//!
43//! This separation allows for different processing logic depending on message direction.
44
45use async_trait::async_trait;
46use log::{debug, info};
47use tap_msg::didcomm::PlainMessage;
48
49use crate::error::Result;
50
51/// Trait for processing DIDComm messages in TAP nodes
52///
53/// The `PlainMessageProcessor` trait defines the interface for message processors
54/// that handle DIDComm messages flowing through the TAP node. Processors act
55/// as middleware, allowing for validation, transformation, logging, metrics
56/// collection, and other operations on messages.
57///
58/// # Design Patterns
59///
60/// This trait follows the Chain of Responsibility pattern, where each processor
61/// can either:
62/// - Pass the message along unchanged
63/// - Transform the message before passing it along
64/// - Filter out (drop) the message by returning None
65/// - Perform side effects during processing (logging, metrics, etc.)
66///
67/// # Thread Safety
68///
69/// All implementations must be `Send + Sync + Clone` to ensure they can be
70/// safely used in multithreaded environments and composed into processor chains.
71///
72/// # Implementation Guidelines
73///
74/// When implementing a custom processor:
75/// - Ensure both `process_incoming` and `process_outgoing` are implemented
76/// - Be mindful of performance in high-throughput environments
77/// - Consider making processors stateless when possible
78/// - Use the processor's Clone trait to avoid expensive setup/teardown
79/// - Document any side effects or transformations clearly
80///
81/// # Examples
82///
83/// ```
84/// # use async_trait::async_trait;
85/// # use tap_node::error::Result;
86/// # use tap_msg::didcomm::PlainMessage;
87/// # use tap_node::message::processor::PlainMessageProcessor;
88/// #
89/// #[derive(Clone, Debug)]
90/// struct MyCustomProcessor;
91///
92/// #[async_trait]
93/// impl PlainMessageProcessor for MyCustomProcessor {
94///     async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
95///         // Process incoming message - e.g., validate fields, log, transform
96///         println!("Processing incoming message: {}", message.id);
97///         Ok(Some(message))  // Pass message along unchanged
98///     }
99///
100///     async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
101///         // Process outgoing message
102///         println!("Processing outgoing message: {}", message.id);
103///         Ok(Some(message))  // Pass message along unchanged
104///     }
105/// }
106/// ```
107#[async_trait]
108pub trait PlainMessageProcessor: Send + Sync + Clone {
109    /// Process an incoming message received by the node
110    ///
111    /// This method handles messages that are being received by the TAP node from
112    /// external sources. Implementations can validate, transform, or filter these
113    /// messages before they are routed to their target agents.
114    ///
115    /// # Parameters
116    ///
117    /// * `message` - The DIDComm message to process
118    ///
119    /// # Returns
120    ///
121    /// * `Ok(Some(message))` - The message to pass to the next processor
122    /// * `Ok(None)` - Drop the message (do not process further)
123    /// * `Err(e)` - Processing error
124    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
125
126    /// Process an outgoing message being sent from the node
127    ///
128    /// This method handles messages that are being sent from the TAP node to
129    /// external recipients. Implementations can transform these messages for
130    /// compatibility, add headers, perform logging, or filter messages before
131    /// they are delivered.
132    ///
133    /// # Parameters
134    ///
135    /// * `message` - The DIDComm message to process
136    ///
137    /// # Returns
138    ///
139    /// * `Ok(Some(message))` - The message to pass to the next processor
140    /// * `Ok(None)` - Drop the message (do not process further)
141    /// * `Err(e)` - Processing error
142    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
143}
144
145/// A message processor that logs messages
146#[derive(Debug, Clone)]
147pub struct LoggingPlainMessageProcessor;
148
149#[async_trait]
150impl PlainMessageProcessor for LoggingPlainMessageProcessor {
151    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
152        info!("Incoming message: {}", message.id);
153        debug!("PlainMessage content: {:?}", message);
154        Ok(Some(message))
155    }
156
157    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
158        info!("Outgoing message: {}", message.id);
159        debug!("PlainMessage content: {:?}", message);
160        Ok(Some(message))
161    }
162}
163
164/// A message processor that validates messages
165///
166/// This processor validates incoming and outgoing DIDComm messages to ensure they
167/// conform to the expected structure and protocol requirements.
168///
169/// In a production implementation, this would perform comprehensive validation including:
170/// - Field validation (required fields, format, values)
171/// - Protocol compliance checks for each message type
172/// - Signature verification
173/// - Timestamp and expiration checks
174/// - Security and authorization checks
175///
176/// # Implementation
177///
178/// Currently, this implementation validates:
179/// - The message ID is not empty
180/// - The message type is not empty
181/// - Any 'from' or 'to' DIDs follow the 'did:' prefix format
182/// - Basic protocol-specific requirements based on message type
183///
184/// # PlainMessage Flow
185///
186/// The validator sits in the message processor pipeline and can filter out invalid
187/// messages by returning Ok(None), or let valid messages continue through the
188/// pipeline by returning Ok(Some(message)).
189#[derive(Debug, Clone)]
190pub struct ValidationPlainMessageProcessor;
191
192#[async_trait]
193impl PlainMessageProcessor for ValidationPlainMessageProcessor {
194    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
195        debug!("Validating incoming message: {}", message.id);
196
197        // Basic validation - ID and type should not be empty
198        if message.id.is_empty() {
199            info!("PlainMessage has empty ID, rejecting");
200            return Ok(None);
201        }
202
203        if message.typ.is_empty() {
204            info!("PlainMessage has empty type, rejecting");
205            return Ok(None);
206        }
207
208        // Validate DID format if present
209        if !message.from.is_empty() && !message.from.starts_with("did:") {
210            info!("Invalid 'from' DID format: {}", message.from);
211            return Ok(None);
212        }
213
214        // Validate recipient DIDs
215        if !message.to.is_empty() {
216            // All DIDs should have valid format
217            for recipient in &message.to {
218                if !recipient.starts_with("did:") {
219                    info!("Invalid recipient DID format: {}", recipient);
220                    return Ok(None);
221                }
222            }
223        }
224
225        // Validate body
226        if message.body == serde_json::json!(null) {
227            info!("PlainMessage has null body, rejecting");
228            return Ok(None);
229        }
230
231        // Validate pthid if present
232        if let Some(pthid) = &message.pthid {
233            if pthid.is_empty() {
234                info!("PlainMessage has empty parent thread ID, rejecting");
235                return Ok(None);
236            }
237        }
238
239        // Validate timestamp
240        if let Some(created_time) = message.created_time {
241            let now = chrono::Utc::now().timestamp() as u64;
242            // Check if the timestamp is more than 5 minutes in the future
243            if created_time > now + 300 {
244                info!("PlainMessage has future timestamp, rejecting");
245                return Ok(None);
246            }
247        }
248
249        // Protocol-specific validation based on message type
250        let typ = &message.typ;
251
252        // Validate TAP messages
253        if typ.starts_with("https://tap.rsvp/schema/") {
254            // TAP-specific validations
255            // Check that it's a valid TAP message type
256            if !typ.contains("transfer")
257                && !typ.contains("authorize")
258                && !typ.contains("reject")
259                && !typ.contains("settle")
260            {
261                info!("Unknown TAP message type: {}", typ);
262                return Ok(None);
263            }
264        }
265        // Validate DIDComm messages
266        else if typ.starts_with("https://didcomm.org/") {
267            // DIDComm-specific validations
268            // Add more specific DIDComm validations here
269        }
270        // Unknown message type protocol
271        else if !typ.starts_with("https://tap.rsvp/schema/")
272            && !typ.starts_with("https://didcomm.org/")
273        {
274            info!("Unknown message protocol: {}", typ);
275            // Reject unknown message protocols
276            return Ok(None);
277        }
278
279        // PlainMessage passed validation
280        Ok(Some(message))
281    }
282
283    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
284        debug!("Validating outgoing message: {}", message.id);
285
286        // For outgoing messages, apply the same validations as incoming messages
287        // In a production system, there might be different validations for outgoing vs incoming
288
289        // Basic validation - ID and type should not be empty
290        if message.id.is_empty() {
291            info!("Outgoing message has empty ID, rejecting");
292            return Ok(None);
293        }
294
295        if message.typ.is_empty() {
296            info!("Outgoing message has empty type, rejecting");
297            return Ok(None);
298        }
299
300        // Validate DID format if present
301        if !message.from.is_empty() && !message.from.starts_with("did:") {
302            info!(
303                "Invalid 'from' DID format in outgoing message: {}",
304                message.from
305            );
306            return Ok(None);
307        }
308
309        // Validate recipient DIDs
310        if !message.to.is_empty() {
311            // All DIDs should have valid format
312            for recipient in &message.to {
313                if !recipient.starts_with("did:") {
314                    info!(
315                        "Invalid recipient DID format in outgoing message: {}",
316                        recipient
317                    );
318                    return Ok(None);
319                }
320            }
321        }
322
323        // Validate body
324        if message.body == serde_json::json!(null) {
325            info!("Outgoing message has null body, rejecting");
326            return Ok(None);
327        }
328
329        // Validate pthid if present
330        if let Some(pthid) = &message.pthid {
331            if pthid.is_empty() {
332                info!("Outgoing message has empty parent thread ID, rejecting");
333                return Ok(None);
334            }
335        }
336
337        // Validate timestamp
338        if let Some(created_time) = message.created_time {
339            let now = chrono::Utc::now().timestamp() as u64;
340            // Check if the timestamp is more than 5 minutes in the future
341            if created_time > now + 300 {
342                info!("Outgoing message has future timestamp, rejecting");
343                return Ok(None);
344            }
345        }
346
347        // Protocol-specific validation based on message type
348        let typ = &message.typ;
349
350        // Validate TAP messages
351        if typ.starts_with("https://tap.rsvp/schema/") {
352            // TAP-specific validations
353            // Check that it's a valid TAP message type
354            if !typ.contains("transfer")
355                && !typ.contains("authorize")
356                && !typ.contains("reject")
357                && !typ.contains("settle")
358            {
359                info!("Unknown TAP message type in outgoing message: {}", typ);
360                return Ok(None);
361            }
362        }
363        // Validate DIDComm messages
364        else if typ.starts_with("https://didcomm.org/") {
365            // DIDComm-specific validations
366            // Add more specific DIDComm validations here
367        }
368        // Unknown message type protocol
369        else if !typ.starts_with("https://tap.rsvp/schema/")
370            && !typ.starts_with("https://didcomm.org/")
371        {
372            info!("Unknown message protocol in outgoing message: {}", typ);
373            // Reject unknown message protocols
374            return Ok(None);
375        }
376
377        // PlainMessage passed validation
378        Ok(Some(message))
379    }
380}
381
382/// Default message processor with core functionality
383#[derive(Debug, Clone)]
384pub struct DefaultPlainMessageProcessor;
385
386#[async_trait]
387impl PlainMessageProcessor for DefaultPlainMessageProcessor {
388    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
389        // By default, we just pass the message through
390        Ok(Some(message))
391    }
392
393    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
394        // By default, we just pass the message through
395        Ok(Some(message))
396    }
397}
398
399/// Default message processor that logs and validates messages
400#[derive(Clone, Debug)]
401pub struct DefaultPlainMessageProcessorImpl {
402    /// The internal processor
403    processor: crate::message::PlainMessageProcessorType,
404}
405
406impl Default for DefaultPlainMessageProcessorImpl {
407    fn default() -> Self {
408        Self::new()
409    }
410}
411
412impl DefaultPlainMessageProcessorImpl {
413    /// Create a new default message processor
414    pub fn new() -> Self {
415        let logging_processor =
416            crate::message::PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
417        let validation_processor =
418            crate::message::PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
419
420        let mut processor = crate::message::CompositePlainMessageProcessor::new(Vec::new());
421        processor.add_processor(validation_processor);
422        processor.add_processor(logging_processor);
423
424        let processor = crate::message::PlainMessageProcessorType::Composite(processor);
425
426        Self { processor }
427    }
428}
429
430#[async_trait]
431impl PlainMessageProcessor for DefaultPlainMessageProcessorImpl {
432    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
433        match &self.processor {
434            crate::message::PlainMessageProcessorType::Default(p) => {
435                p.process_incoming(message).await
436            }
437            crate::message::PlainMessageProcessorType::Logging(p) => {
438                p.process_incoming(message).await
439            }
440            crate::message::PlainMessageProcessorType::Validation(p) => {
441                p.process_incoming(message).await
442            }
443            crate::message::PlainMessageProcessorType::Composite(p) => {
444                p.process_incoming(message).await
445            }
446        }
447    }
448
449    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
450        match &self.processor {
451            crate::message::PlainMessageProcessorType::Default(p) => {
452                p.process_outgoing(message).await
453            }
454            crate::message::PlainMessageProcessorType::Logging(p) => {
455                p.process_outgoing(message).await
456            }
457            crate::message::PlainMessageProcessorType::Validation(p) => {
458                p.process_outgoing(message).await
459            }
460            crate::message::PlainMessageProcessorType::Composite(p) => {
461                p.process_outgoing(message).await
462            }
463        }
464    }
465}