tap_node/message/
processor.rs

1//! # PlainMessage Processor Implementations for TAP Node
2//!
3//! This module provides message processing functionality for TAP Node. PlainMessage processors
4//! serve as middleware in the message handling pipeline, allowing for validation, transformation,
5//! and filtering of messages as they flow through the system.
6//!
7//! ## PlainMessage Processing Pipeline
8//!
9//! The TAP Node uses a pipeline architecture for message processing, where messages pass through
10//! a series of processors in sequence. Each processor can:
11//!
12//! - Pass the message through unchanged
13//! - Transform the message in some way
14//! - Filter out (drop) messages based on certain criteria
15//! - Perform side effects (logging, metrics collection, etc.)
16//!
17//! ## Processor Types
18//!
19//! The module provides several built-in processor implementations:
20//!
21//! - `LoggingPlainMessageProcessor`: Logs information about messages passing through the system
22//! - `ValidationPlainMessageProcessor`: Validates message structure and content
23//! - `DefaultPlainMessageProcessor`: A simple pass-through processor with minimal functionality
24//! - `CompositePlainMessageProcessor`: Combines multiple processors into a processing chain
25//!
26//! ## Custom Processors
27//!
28//! You can create custom processors by implementing the `PlainMessageProcessor` trait. This
29//! allows for specialized processing such as:
30//!
31//! - PlainMessage transformation for protocol version compatibility
32//! - Content-based filtering and routing
33//! - Security scanning and anomaly detection
34//! - Metrics collection and performance monitoring
35//!
36//! ## Processing Modes
37//!
38//! Each processor implements two key methods:
39//!
40//! - `process_incoming()`: For messages received by the node
41//! - `process_outgoing()`: For messages being sent from the node
42//!
43//! This separation allows for different processing logic depending on message direction.
44
45use async_trait::async_trait;
46use log::{debug, info};
47use std::sync::Arc;
48use tap_msg::didcomm::PlainMessage;
49
50use crate::error::Result;
51
52/// Trait for processing DIDComm messages in TAP nodes
53///
54/// The `PlainMessageProcessor` trait defines the interface for message processors
55/// that handle DIDComm messages flowing through the TAP node. Processors act
56/// as middleware, allowing for validation, transformation, logging, metrics
57/// collection, and other operations on messages.
58///
59/// # Design Patterns
60///
61/// This trait follows the Chain of Responsibility pattern, where each processor
62/// can either:
63/// - Pass the message along unchanged
64/// - Transform the message before passing it along
65/// - Filter out (drop) the message by returning None
66/// - Perform side effects during processing (logging, metrics, etc.)
67///
68/// # Thread Safety
69///
70/// All implementations must be `Send + Sync + Clone` to ensure they can be
71/// safely used in multithreaded environments and composed into processor chains.
72///
73/// # Implementation Guidelines
74///
75/// When implementing a custom processor:
76/// - Ensure both `process_incoming` and `process_outgoing` are implemented
77/// - Be mindful of performance in high-throughput environments
78/// - Consider making processors stateless when possible
79/// - Use the processor's Clone trait to avoid expensive setup/teardown
80/// - Document any side effects or transformations clearly
81///
82/// # Examples
83///
84/// ```
85/// # use async_trait::async_trait;
86/// # use tap_node::error::Result;
87/// # use tap_msg::didcomm::PlainMessage;
88/// # use tap_node::message::processor::PlainMessageProcessor;
89/// #
90/// #[derive(Clone, Debug)]
91/// struct MyCustomProcessor;
92///
93/// #[async_trait]
94/// impl PlainMessageProcessor for MyCustomProcessor {
95///     async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
96///         // Process incoming message - e.g., validate fields, log, transform
97///         println!("Processing incoming message: {}", message.id);
98///         Ok(Some(message))  // Pass message along unchanged
99///     }
100///
101///     async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
102///         // Process outgoing message
103///         println!("Processing outgoing message: {}", message.id);
104///         Ok(Some(message))  // Pass message along unchanged
105///     }
106/// }
107/// ```
108#[async_trait]
109pub trait PlainMessageProcessor: Send + Sync + Clone {
110    /// Process an incoming message received by the node
111    ///
112    /// This method handles messages that are being received by the TAP node from
113    /// external sources. Implementations can validate, transform, or filter these
114    /// messages before they are routed to their target agents.
115    ///
116    /// # Parameters
117    ///
118    /// * `message` - The DIDComm message to process
119    ///
120    /// # Returns
121    ///
122    /// * `Ok(Some(message))` - The message to pass to the next processor
123    /// * `Ok(None)` - Drop the message (do not process further)
124    /// * `Err(e)` - Processing error
125    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
126
127    /// Process an outgoing message being sent from the node
128    ///
129    /// This method handles messages that are being sent from the TAP node to
130    /// external recipients. Implementations can transform these messages for
131    /// compatibility, add headers, perform logging, or filter messages before
132    /// they are delivered.
133    ///
134    /// # Parameters
135    ///
136    /// * `message` - The DIDComm message to process
137    ///
138    /// # Returns
139    ///
140    /// * `Ok(Some(message))` - The message to pass to the next processor
141    /// * `Ok(None)` - Drop the message (do not process further)
142    /// * `Err(e)` - Processing error
143    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
144}
145
146/// A message processor that logs messages
147#[derive(Debug, Clone)]
148pub struct LoggingPlainMessageProcessor;
149
150#[async_trait]
151impl PlainMessageProcessor for LoggingPlainMessageProcessor {
152    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
153        info!("Incoming message: {}", message.id);
154        debug!("PlainMessage content: {:?}", message);
155        Ok(Some(message))
156    }
157
158    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
159        info!("Outgoing message: {}", message.id);
160        debug!("PlainMessage content: {:?}", message);
161        Ok(Some(message))
162    }
163}
164
165/// A message processor that validates messages
166///
167/// This processor validates incoming and outgoing DIDComm messages to ensure they
168/// conform to the expected structure and protocol requirements.
169///
170/// In a production implementation, this would perform comprehensive validation including:
171/// - Field validation (required fields, format, values)
172/// - Protocol compliance checks for each message type
173/// - Signature verification
174/// - Timestamp and expiration checks
175/// - Security and authorization checks
176///
177/// # Implementation
178///
179/// Currently, this implementation validates:
180/// - The message ID is not empty
181/// - The message type is not empty
182/// - Any 'from' or 'to' DIDs follow the 'did:' prefix format
183/// - Basic protocol-specific requirements based on message type
184///
185/// # PlainMessage Flow
186///
187/// The validator sits in the message processor pipeline and can filter out invalid
188/// messages by returning Ok(None), or let valid messages continue through the
189/// pipeline by returning Ok(Some(message)).
190#[derive(Debug, Clone)]
191pub struct ValidationPlainMessageProcessor;
192
193#[async_trait]
194impl PlainMessageProcessor for ValidationPlainMessageProcessor {
195    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
196        debug!("Validating incoming message: {}", message.id);
197
198        // Basic validation - ID and type should not be empty
199        if message.id.is_empty() {
200            info!("PlainMessage has empty ID, rejecting");
201            return Ok(None);
202        }
203
204        if message.typ.is_empty() {
205            info!("PlainMessage has empty type, rejecting");
206            return Ok(None);
207        }
208
209        // Validate DID format if present
210        if !message.from.is_empty() && !message.from.starts_with("did:") {
211            info!("Invalid 'from' DID format: {}", message.from);
212            return Ok(None);
213        }
214
215        // Validate recipient DIDs
216        if !message.to.is_empty() {
217            // All DIDs should have valid format
218            for recipient in &message.to {
219                if !recipient.starts_with("did:") {
220                    info!("Invalid recipient DID format: {}", recipient);
221                    return Ok(None);
222                }
223            }
224        }
225
226        // Validate body
227        if message.body == serde_json::json!(null) {
228            info!("PlainMessage has null body, rejecting");
229            return Ok(None);
230        }
231
232        // Validate pthid if present
233        if let Some(pthid) = &message.pthid {
234            if pthid.is_empty() {
235                info!("PlainMessage has empty parent thread ID, rejecting");
236                return Ok(None);
237            }
238        }
239
240        // Validate timestamp
241        if let Some(created_time) = message.created_time {
242            // Detect if timestamp is in seconds or milliseconds
243            // Timestamps in seconds since 1970 are much smaller than timestamps in milliseconds
244            // A reasonable cutoff is 10^10 (around year 2286 in seconds, or year 1970 + 4 months in milliseconds)
245            let (normalized_created_time, now) = if created_time < 10_000_000_000 {
246                // Timestamp is likely in seconds, convert to milliseconds
247                (
248                    created_time * 1000,
249                    chrono::Utc::now().timestamp_millis() as u64,
250                )
251            } else {
252                // Timestamp is likely in milliseconds
253                (created_time, chrono::Utc::now().timestamp_millis() as u64)
254            };
255
256            // Check if the timestamp is more than 5 minutes in the future (300,000 milliseconds)
257            if normalized_created_time > now + 300_000 {
258                info!("PlainMessage has future timestamp, rejecting");
259                return Ok(None);
260            }
261        }
262
263        // Protocol-specific validation based on message type
264        let message_type = &message.type_;
265
266        // Validate TAP messages
267        if message_type.starts_with("https://tap.rsvp/schema/") {
268            // TAP-specific validations
269            // Check that it's a valid TAP message type
270            if !message_type.contains("Transfer")
271                && !message_type.contains("Authorize")
272                && !message_type.contains("Reject")
273                && !message_type.contains("Settle")
274                && !message_type.contains("Payment")
275                && !message_type.contains("Connect")
276                && !message_type.contains("Cancel")
277                && !message_type.contains("Revert")
278                && !message_type.contains("AddAgents")
279                && !message_type.contains("ReplaceAgent")
280                && !message_type.contains("RemoveAgent")
281                && !message_type.contains("UpdateParty")
282                && !message_type.contains("UpdatePolicies")
283                && !message_type.contains("ConfirmRelationship")
284                && !message_type.contains("OutOfBand")
285                && !message_type.contains("AuthorizationRequired")
286                && !message_type.contains("RequestPresentation")
287                && !message_type.contains("Presentation")
288                && !message_type.contains("Error")
289            {
290                info!("Unknown TAP message type: {}", message_type);
291                return Ok(None);
292            }
293        }
294        // Validate DIDComm messages
295        else if message_type.starts_with("https://didcomm.org/") {
296            // DIDComm-specific validations
297            // Check for common DIDComm message types
298            if !message_type.contains("trust-ping")
299                && !message_type.contains("basicmessage")
300                && !message_type.contains("routing")
301                && !message_type.contains("discover-features")
302                && !message_type.contains("problem-report")
303                && !message_type.contains("ack")
304                && !message_type.contains("notification")
305                && !message_type.contains("ping")
306                && !message_type.contains("coordinate-mediation")
307                && !message_type.contains("keylist")
308                && !message_type.contains("out-of-band")
309            {
310                info!("Unknown DIDComm message type: {}", message_type);
311                // For now, allow unknown DIDComm message types to pass through
312                // In a production system, you might want stricter validation
313            }
314        }
315        // Unknown message type protocol
316        else if !message_type.starts_with("https://tap.rsvp/schema/")
317            && !message_type.starts_with("https://didcomm.org/")
318        {
319            info!("Unknown message protocol: {}", message_type);
320            // Reject unknown message protocols
321            return Ok(None);
322        }
323
324        // PlainMessage passed validation
325        Ok(Some(message))
326    }
327
328    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
329        debug!("Validating outgoing message: {}", message.id);
330
331        // For outgoing messages, apply the same validations as incoming messages
332        // In a production system, there might be different validations for outgoing vs incoming
333
334        // Basic validation - ID and type should not be empty
335        if message.id.is_empty() {
336            info!("Outgoing message has empty ID, rejecting");
337            return Ok(None);
338        }
339
340        if message.typ.is_empty() {
341            info!("Outgoing message has empty type, rejecting");
342            return Ok(None);
343        }
344
345        // Validate DID format if present
346        if !message.from.is_empty() && !message.from.starts_with("did:") {
347            info!(
348                "Invalid 'from' DID format in outgoing message: {}",
349                message.from
350            );
351            return Ok(None);
352        }
353
354        // Validate recipient DIDs
355        if !message.to.is_empty() {
356            // All DIDs should have valid format
357            for recipient in &message.to {
358                if !recipient.starts_with("did:") {
359                    info!(
360                        "Invalid recipient DID format in outgoing message: {}",
361                        recipient
362                    );
363                    return Ok(None);
364                }
365            }
366        }
367
368        // Validate body
369        if message.body == serde_json::json!(null) {
370            info!("Outgoing message has null body, rejecting");
371            return Ok(None);
372        }
373
374        // Validate pthid if present
375        if let Some(pthid) = &message.pthid {
376            if pthid.is_empty() {
377                info!("Outgoing message has empty parent thread ID, rejecting");
378                return Ok(None);
379            }
380        }
381
382        // Validate timestamp
383        if let Some(created_time) = message.created_time {
384            // Detect if timestamp is in seconds or milliseconds
385            // Timestamps in seconds since 1970 are much smaller than timestamps in milliseconds
386            // A reasonable cutoff is 10^10 (around year 2286 in seconds, or year 1970 + 4 months in milliseconds)
387            let (normalized_created_time, now) = if created_time < 10_000_000_000 {
388                // Timestamp is likely in seconds, convert to milliseconds
389                (
390                    created_time * 1000,
391                    chrono::Utc::now().timestamp_millis() as u64,
392                )
393            } else {
394                // Timestamp is likely in milliseconds
395                (created_time, chrono::Utc::now().timestamp_millis() as u64)
396            };
397
398            // Check if the timestamp is more than 5 minutes in the future (300,000 milliseconds)
399            if normalized_created_time > now + 300_000 {
400                info!("Outgoing message has future timestamp, rejecting");
401                return Ok(None);
402            }
403        }
404
405        // Protocol-specific validation based on message type
406        let message_type = &message.type_;
407
408        // Validate TAP messages
409        if message_type.starts_with("https://tap.rsvp/schema/") {
410            // TAP-specific validations
411            // Check that it's a valid TAP message type
412            if !message_type.contains("Transfer")
413                && !message_type.contains("Authorize")
414                && !message_type.contains("Reject")
415                && !message_type.contains("Settle")
416                && !message_type.contains("Payment")
417                && !message_type.contains("Connect")
418                && !message_type.contains("Cancel")
419                && !message_type.contains("Revert")
420                && !message_type.contains("AddAgents")
421                && !message_type.contains("ReplaceAgent")
422                && !message_type.contains("RemoveAgent")
423                && !message_type.contains("UpdateParty")
424                && !message_type.contains("UpdatePolicies")
425                && !message_type.contains("ConfirmRelationship")
426                && !message_type.contains("OutOfBand")
427                && !message_type.contains("AuthorizationRequired")
428                && !message_type.contains("RequestPresentation")
429                && !message_type.contains("Presentation")
430                && !message_type.contains("Error")
431            {
432                info!(
433                    "Unknown TAP message type in outgoing message: {}",
434                    message_type
435                );
436                return Ok(None);
437            }
438        }
439        // Validate DIDComm messages
440        else if message_type.starts_with("https://didcomm.org/") {
441            // DIDComm-specific validations
442            // Check for common DIDComm message types
443            if !message_type.contains("trust-ping")
444                && !message_type.contains("basicmessage")
445                && !message_type.contains("routing")
446                && !message_type.contains("discover-features")
447                && !message_type.contains("problem-report")
448                && !message_type.contains("ack")
449                && !message_type.contains("notification")
450                && !message_type.contains("ping")
451                && !message_type.contains("coordinate-mediation")
452                && !message_type.contains("keylist")
453                && !message_type.contains("out-of-band")
454            {
455                info!(
456                    "Unknown DIDComm message type in outgoing message: {}",
457                    message_type
458                );
459                // For now, allow unknown DIDComm message types to pass through
460                // In a production system, you might want stricter validation
461            }
462        }
463        // Unknown message type protocol
464        else if !message_type.starts_with("https://tap.rsvp/schema/")
465            && !message_type.starts_with("https://didcomm.org/")
466        {
467            info!(
468                "Unknown message protocol in outgoing message: {}",
469                message_type
470            );
471            // Reject unknown message protocols
472            return Ok(None);
473        }
474
475        // PlainMessage passed validation
476        Ok(Some(message))
477    }
478}
479
480/// Default message processor with core functionality
481#[derive(Debug, Clone)]
482pub struct DefaultPlainMessageProcessor;
483
484#[async_trait]
485impl PlainMessageProcessor for DefaultPlainMessageProcessor {
486    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
487        // By default, we just pass the message through
488        Ok(Some(message))
489    }
490
491    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
492        // By default, we just pass the message through
493        Ok(Some(message))
494    }
495}
496
497/// Default message processor that logs and validates messages
498#[derive(Clone, Debug)]
499pub struct DefaultPlainMessageProcessorImpl {
500    /// The internal processor
501    processor: crate::message::PlainMessageProcessorType,
502}
503
504impl Default for DefaultPlainMessageProcessorImpl {
505    fn default() -> Self {
506        Self::new()
507    }
508}
509
510impl DefaultPlainMessageProcessorImpl {
511    /// Create a new default message processor
512    pub fn new() -> Self {
513        let logging_processor =
514            crate::message::PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
515        let validation_processor =
516            crate::message::PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
517
518        let mut processor = crate::message::CompositePlainMessageProcessor::new(Vec::new());
519        processor.add_processor(validation_processor);
520        processor.add_processor(logging_processor);
521
522        let processor = crate::message::PlainMessageProcessorType::Composite(processor);
523
524        Self { processor }
525    }
526}
527
528#[async_trait]
529impl PlainMessageProcessor for DefaultPlainMessageProcessorImpl {
530    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
531        match &self.processor {
532            crate::message::PlainMessageProcessorType::Default(p) => {
533                p.process_incoming(message).await
534            }
535            crate::message::PlainMessageProcessorType::Logging(p) => {
536                p.process_incoming(message).await
537            }
538            crate::message::PlainMessageProcessorType::Validation(p) => {
539                p.process_incoming(message).await
540            }
541            crate::message::PlainMessageProcessorType::StateMachine(p) => {
542                p.process_incoming(message).await
543            }
544            crate::message::PlainMessageProcessorType::Composite(p) => {
545                p.process_incoming(message).await
546            }
547            crate::message::PlainMessageProcessorType::TravelRule(p) => {
548                p.process_incoming(message).await
549            }
550            crate::message::PlainMessageProcessorType::TrustPing(p) => {
551                p.process_incoming(message).await
552            }
553        }
554    }
555
556    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
557        match &self.processor {
558            crate::message::PlainMessageProcessorType::Default(p) => {
559                p.process_outgoing(message).await
560            }
561            crate::message::PlainMessageProcessorType::Logging(p) => {
562                p.process_outgoing(message).await
563            }
564            crate::message::PlainMessageProcessorType::Validation(p) => {
565                p.process_outgoing(message).await
566            }
567            crate::message::PlainMessageProcessorType::StateMachine(p) => {
568                p.process_outgoing(message).await
569            }
570            crate::message::PlainMessageProcessorType::Composite(p) => {
571                p.process_outgoing(message).await
572            }
573            crate::message::PlainMessageProcessorType::TravelRule(p) => {
574                p.process_outgoing(message).await
575            }
576            crate::message::PlainMessageProcessorType::TrustPing(p) => {
577                p.process_outgoing(message).await
578            }
579        }
580    }
581}
582
583/// State machine integration processor
584///
585/// This processor integrates the message processing pipeline with the transaction state machine.
586/// It processes incoming TAP messages and updates transaction state accordingly.
587#[derive(Clone)]
588pub struct StateMachineIntegrationProcessor {
589    /// Arc-wrapped state processor for thread safety
590    state_processor: Option<Arc<dyn crate::state_machine::TransactionStateProcessor>>,
591}
592
593impl std::fmt::Debug for StateMachineIntegrationProcessor {
594    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
595        f.debug_struct("StateMachineIntegrationProcessor")
596            .field("state_processor", &self.state_processor.is_some())
597            .finish()
598    }
599}
600
601impl Default for StateMachineIntegrationProcessor {
602    fn default() -> Self {
603        Self::new()
604    }
605}
606
607impl StateMachineIntegrationProcessor {
608    /// Create a new state machine integration processor
609    pub fn new() -> Self {
610        Self {
611            state_processor: None,
612        }
613    }
614
615    /// Set the state processor
616    pub fn with_state_processor(
617        mut self,
618        processor: Arc<dyn crate::state_machine::TransactionStateProcessor>,
619    ) -> Self {
620        self.state_processor = Some(processor);
621        self
622    }
623}
624
625#[async_trait]
626impl PlainMessageProcessor for StateMachineIntegrationProcessor {
627    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
628        // Process the message through the state machine if available
629        if let Some(state_processor) = &self.state_processor {
630            if let Err(e) = state_processor.process_message(&message).await {
631                log::warn!(
632                    "State machine processing failed for message {}: {}",
633                    message.id,
634                    e
635                );
636                // Don't fail the message processing, just log the error
637            }
638        }
639
640        // Always pass the message through for further processing
641        Ok(Some(message))
642    }
643
644    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
645        // For outgoing messages, we typically don't need state machine processing
646        // since they're already being sent by the state machine or agents
647        Ok(Some(message))
648    }
649}