tap-node 0.6.0

Transaction Authorization Protocol (TAP) node implementation for routing and processing messages
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
//! # PlainMessage Processor Implementations for TAP Node
//!
//! This module provides message processing functionality for TAP Node. PlainMessage processors
//! serve as middleware in the message handling pipeline, allowing for validation, transformation,
//! and filtering of messages as they flow through the system.
//!
//! ## PlainMessage Processing Pipeline
//!
//! The TAP Node uses a pipeline architecture for message processing, where messages pass through
//! a series of processors in sequence. Each processor can:
//!
//! - Pass the message through unchanged
//! - Transform the message in some way
//! - Filter out (drop) messages based on certain criteria
//! - Perform side effects (logging, metrics collection, etc.)
//!
//! ## Processor Types
//!
//! The module provides several built-in processor implementations:
//!
//! - `LoggingPlainMessageProcessor`: Logs information about messages passing through the system
//! - `ValidationPlainMessageProcessor`: Validates message structure and content
//! - `DefaultPlainMessageProcessor`: A simple pass-through processor with minimal functionality
//! - `CompositePlainMessageProcessor`: Combines multiple processors into a processing chain
//!
//! ## Custom Processors
//!
//! You can create custom processors by implementing the `PlainMessageProcessor` trait. This
//! allows for specialized processing such as:
//!
//! - PlainMessage transformation for protocol version compatibility
//! - Content-based filtering and routing
//! - Security scanning and anomaly detection
//! - Metrics collection and performance monitoring
//!
//! ## Processing Modes
//!
//! Each processor implements two key methods:
//!
//! - `process_incoming()`: For messages received by the node
//! - `process_outgoing()`: For messages being sent from the node
//!
//! This separation allows for different processing logic depending on message direction.

use async_trait::async_trait;
use log::{debug, info};
use std::sync::Arc;
use tap_msg::didcomm::PlainMessage;

use crate::error::Result;

/// Trait for processing DIDComm messages in TAP nodes
///
/// The `PlainMessageProcessor` trait defines the interface for message processors
/// that handle DIDComm messages flowing through the TAP node. Processors act
/// as middleware, allowing for validation, transformation, logging, metrics
/// collection, and other operations on messages.
///
/// # Design Patterns
///
/// This trait follows the Chain of Responsibility pattern, where each processor
/// can either:
/// - Pass the message along unchanged
/// - Transform the message before passing it along
/// - Filter out (drop) the message by returning None
/// - Perform side effects during processing (logging, metrics, etc.)
///
/// # Thread Safety
///
/// All implementations must be `Send + Sync + Clone` to ensure they can be
/// safely used in multithreaded environments and composed into processor chains.
///
/// # Implementation Guidelines
///
/// When implementing a custom processor:
/// - Ensure both `process_incoming` and `process_outgoing` are implemented
/// - Be mindful of performance in high-throughput environments
/// - Consider making processors stateless when possible
/// - Use the processor's Clone trait to avoid expensive setup/teardown
/// - Document any side effects or transformations clearly
///
/// # Examples
///
/// ```
/// # use async_trait::async_trait;
/// # use tap_node::error::Result;
/// # use tap_msg::didcomm::PlainMessage;
/// # use tap_node::message::processor::PlainMessageProcessor;
/// #
/// #[derive(Clone, Debug)]
/// struct MyCustomProcessor;
///
/// #[async_trait]
/// impl PlainMessageProcessor for MyCustomProcessor {
///     async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
///         // Process incoming message - e.g., validate fields, log, transform
///         println!("Processing incoming message: {}", message.id);
///         Ok(Some(message))  // Pass message along unchanged
///     }
///
///     async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
///         // Process outgoing message
///         println!("Processing outgoing message: {}", message.id);
///         Ok(Some(message))  // Pass message along unchanged
///     }
/// }
/// ```
#[async_trait]
pub trait PlainMessageProcessor: Send + Sync + Clone {
    /// Process an incoming message received by the node
    ///
    /// This method handles messages that are being received by the TAP node from
    /// external sources. Implementations can validate, transform, or filter these
    /// messages before they are routed to their target agents.
    ///
    /// # Parameters
    ///
    /// * `message` - The DIDComm message to process
    ///
    /// # Returns
    ///
    /// * `Ok(Some(message))` - The message to pass to the next processor
    /// * `Ok(None)` - Drop the message (do not process further)
    /// * `Err(e)` - Processing error
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;

    /// Process an outgoing message being sent from the node
    ///
    /// This method handles messages that are being sent from the TAP node to
    /// external recipients. Implementations can transform these messages for
    /// compatibility, add headers, perform logging, or filter messages before
    /// they are delivered.
    ///
    /// # Parameters
    ///
    /// * `message` - The DIDComm message to process
    ///
    /// # Returns
    ///
    /// * `Ok(Some(message))` - The message to pass to the next processor
    /// * `Ok(None)` - Drop the message (do not process further)
    /// * `Err(e)` - Processing error
    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
}

/// A message processor that logs messages
#[derive(Debug, Clone)]
pub struct LoggingPlainMessageProcessor;

#[async_trait]
impl PlainMessageProcessor for LoggingPlainMessageProcessor {
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        info!("Incoming message: {}", message.id);
        debug!("PlainMessage content: {:?}", message);
        Ok(Some(message))
    }

    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        info!("Outgoing message: {}", message.id);
        debug!("PlainMessage content: {:?}", message);
        Ok(Some(message))
    }
}

/// A message processor that validates messages
///
/// This processor validates incoming and outgoing DIDComm messages to ensure they
/// conform to the expected structure and protocol requirements.
///
/// In a production implementation, this would perform comprehensive validation including:
/// - Field validation (required fields, format, values)
/// - Protocol compliance checks for each message type
/// - Signature verification
/// - Timestamp and expiration checks
/// - Security and authorization checks
///
/// # Implementation
///
/// Currently, this implementation validates:
/// - The message ID is not empty
/// - The message type is not empty
/// - Any 'from' or 'to' DIDs follow the 'did:' prefix format
/// - Basic protocol-specific requirements based on message type
///
/// # PlainMessage Flow
///
/// The validator sits in the message processor pipeline and can filter out invalid
/// messages by returning Ok(None), or let valid messages continue through the
/// pipeline by returning Ok(Some(message)).
#[derive(Debug, Clone)]
pub struct ValidationPlainMessageProcessor;

#[async_trait]
impl PlainMessageProcessor for ValidationPlainMessageProcessor {
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        debug!("Validating incoming message: {}", message.id);

        // Basic validation - ID and type should not be empty
        if message.id.is_empty() {
            info!("PlainMessage has empty ID, rejecting");
            return Ok(None);
        }

        if message.typ.is_empty() {
            info!("PlainMessage has empty type, rejecting");
            return Ok(None);
        }

        // Validate DID format if present
        if !message.from.is_empty() && !message.from.starts_with("did:") {
            info!("Invalid 'from' DID format: {}", message.from);
            return Ok(None);
        }

        // Validate recipient DIDs
        if !message.to.is_empty() {
            // All DIDs should have valid format
            for recipient in &message.to {
                if !recipient.starts_with("did:") {
                    info!("Invalid recipient DID format: {}", recipient);
                    return Ok(None);
                }
            }
        }

        // Validate body
        if message.body == serde_json::json!(null) {
            info!("PlainMessage has null body, rejecting");
            return Ok(None);
        }

        // Validate pthid if present
        if let Some(pthid) = &message.pthid {
            if pthid.is_empty() {
                info!("PlainMessage has empty parent thread ID, rejecting");
                return Ok(None);
            }
        }

        // Validate timestamp
        if let Some(created_time) = message.created_time {
            // Detect if timestamp is in seconds or milliseconds
            // Timestamps in seconds since 1970 are much smaller than timestamps in milliseconds
            // A reasonable cutoff is 10^10 (around year 2286 in seconds, or year 1970 + 4 months in milliseconds)
            let (normalized_created_time, now) = if created_time < 10_000_000_000 {
                // Timestamp is likely in seconds, convert to milliseconds
                (
                    created_time * 1000,
                    chrono::Utc::now().timestamp_millis() as u64,
                )
            } else {
                // Timestamp is likely in milliseconds
                (created_time, chrono::Utc::now().timestamp_millis() as u64)
            };

            // Check if the timestamp is more than 5 minutes in the future (300,000 milliseconds)
            if normalized_created_time > now + 300_000 {
                info!("PlainMessage has future timestamp, rejecting");
                return Ok(None);
            }
        }

        // Protocol-specific validation based on message type
        let message_type = &message.type_;

        // Validate TAP messages
        if message_type.starts_with("https://tap.rsvp/schema/") {
            // TAP-specific validations
            // Check that it's a valid TAP message type
            if !message_type.contains("Transfer")
                && !message_type.contains("Authorize")
                && !message_type.contains("Reject")
                && !message_type.contains("Settle")
                && !message_type.contains("Payment")
                && !message_type.contains("Connect")
                && !message_type.contains("Cancel")
                && !message_type.contains("Revert")
                && !message_type.contains("AddAgents")
                && !message_type.contains("ReplaceAgent")
                && !message_type.contains("RemoveAgent")
                && !message_type.contains("UpdateParty")
                && !message_type.contains("UpdatePolicies")
                && !message_type.contains("ConfirmRelationship")
                && !message_type.contains("OutOfBand")
                && !message_type.contains("AuthorizationRequired")
                && !message_type.contains("RequestPresentation")
                && !message_type.contains("Presentation")
                && !message_type.contains("Error")
            {
                info!("Unknown TAP message type: {}", message_type);
                return Ok(None);
            }
        }
        // Validate DIDComm messages
        else if message_type.starts_with("https://didcomm.org/") {
            // DIDComm-specific validations
            // Check for common DIDComm message types
            if !message_type.contains("trust-ping")
                && !message_type.contains("basicmessage")
                && !message_type.contains("routing")
                && !message_type.contains("discover-features")
                && !message_type.contains("problem-report")
                && !message_type.contains("ack")
                && !message_type.contains("notification")
                && !message_type.contains("ping")
                && !message_type.contains("coordinate-mediation")
                && !message_type.contains("keylist")
                && !message_type.contains("out-of-band")
            {
                info!("Unknown DIDComm message type: {}", message_type);
                // For now, allow unknown DIDComm message types to pass through
                // In a production system, you might want stricter validation
            }
        }
        // Unknown message type protocol
        else if !message_type.starts_with("https://tap.rsvp/schema/")
            && !message_type.starts_with("https://didcomm.org/")
        {
            info!("Unknown message protocol: {}", message_type);
            // Reject unknown message protocols
            return Ok(None);
        }

        // PlainMessage passed validation
        Ok(Some(message))
    }

    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        debug!("Validating outgoing message: {}", message.id);

        // For outgoing messages, apply the same validations as incoming messages
        // In a production system, there might be different validations for outgoing vs incoming

        // Basic validation - ID and type should not be empty
        if message.id.is_empty() {
            info!("Outgoing message has empty ID, rejecting");
            return Ok(None);
        }

        if message.typ.is_empty() {
            info!("Outgoing message has empty type, rejecting");
            return Ok(None);
        }

        // Validate DID format if present
        if !message.from.is_empty() && !message.from.starts_with("did:") {
            info!(
                "Invalid 'from' DID format in outgoing message: {}",
                message.from
            );
            return Ok(None);
        }

        // Validate recipient DIDs
        if !message.to.is_empty() {
            // All DIDs should have valid format
            for recipient in &message.to {
                if !recipient.starts_with("did:") {
                    info!(
                        "Invalid recipient DID format in outgoing message: {}",
                        recipient
                    );
                    return Ok(None);
                }
            }
        }

        // Validate body
        if message.body == serde_json::json!(null) {
            info!("Outgoing message has null body, rejecting");
            return Ok(None);
        }

        // Validate pthid if present
        if let Some(pthid) = &message.pthid {
            if pthid.is_empty() {
                info!("Outgoing message has empty parent thread ID, rejecting");
                return Ok(None);
            }
        }

        // Validate timestamp
        if let Some(created_time) = message.created_time {
            // Detect if timestamp is in seconds or milliseconds
            // Timestamps in seconds since 1970 are much smaller than timestamps in milliseconds
            // A reasonable cutoff is 10^10 (around year 2286 in seconds, or year 1970 + 4 months in milliseconds)
            let (normalized_created_time, now) = if created_time < 10_000_000_000 {
                // Timestamp is likely in seconds, convert to milliseconds
                (
                    created_time * 1000,
                    chrono::Utc::now().timestamp_millis() as u64,
                )
            } else {
                // Timestamp is likely in milliseconds
                (created_time, chrono::Utc::now().timestamp_millis() as u64)
            };

            // Check if the timestamp is more than 5 minutes in the future (300,000 milliseconds)
            if normalized_created_time > now + 300_000 {
                info!("Outgoing message has future timestamp, rejecting");
                return Ok(None);
            }
        }

        // Protocol-specific validation based on message type
        let message_type = &message.type_;

        // Validate TAP messages
        if message_type.starts_with("https://tap.rsvp/schema/") {
            // TAP-specific validations
            // Check that it's a valid TAP message type
            if !message_type.contains("Transfer")
                && !message_type.contains("Authorize")
                && !message_type.contains("Reject")
                && !message_type.contains("Settle")
                && !message_type.contains("Payment")
                && !message_type.contains("Connect")
                && !message_type.contains("Cancel")
                && !message_type.contains("Revert")
                && !message_type.contains("AddAgents")
                && !message_type.contains("ReplaceAgent")
                && !message_type.contains("RemoveAgent")
                && !message_type.contains("UpdateParty")
                && !message_type.contains("UpdatePolicies")
                && !message_type.contains("ConfirmRelationship")
                && !message_type.contains("OutOfBand")
                && !message_type.contains("AuthorizationRequired")
                && !message_type.contains("RequestPresentation")
                && !message_type.contains("Presentation")
                && !message_type.contains("Error")
            {
                info!(
                    "Unknown TAP message type in outgoing message: {}",
                    message_type
                );
                return Ok(None);
            }
        }
        // Validate DIDComm messages
        else if message_type.starts_with("https://didcomm.org/") {
            // DIDComm-specific validations
            // Check for common DIDComm message types
            if !message_type.contains("trust-ping")
                && !message_type.contains("basicmessage")
                && !message_type.contains("routing")
                && !message_type.contains("discover-features")
                && !message_type.contains("problem-report")
                && !message_type.contains("ack")
                && !message_type.contains("notification")
                && !message_type.contains("ping")
                && !message_type.contains("coordinate-mediation")
                && !message_type.contains("keylist")
                && !message_type.contains("out-of-band")
            {
                info!(
                    "Unknown DIDComm message type in outgoing message: {}",
                    message_type
                );
                // For now, allow unknown DIDComm message types to pass through
                // In a production system, you might want stricter validation
            }
        }
        // Unknown message type protocol
        else if !message_type.starts_with("https://tap.rsvp/schema/")
            && !message_type.starts_with("https://didcomm.org/")
        {
            info!(
                "Unknown message protocol in outgoing message: {}",
                message_type
            );
            // Reject unknown message protocols
            return Ok(None);
        }

        // PlainMessage passed validation
        Ok(Some(message))
    }
}

/// Default message processor with core functionality
#[derive(Debug, Clone)]
pub struct DefaultPlainMessageProcessor;

#[async_trait]
impl PlainMessageProcessor for DefaultPlainMessageProcessor {
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        // By default, we just pass the message through
        Ok(Some(message))
    }

    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        // By default, we just pass the message through
        Ok(Some(message))
    }
}

/// Default message processor that logs and validates messages
#[derive(Clone, Debug)]
pub struct DefaultPlainMessageProcessorImpl {
    /// The internal processor
    processor: crate::message::PlainMessageProcessorType,
}

impl Default for DefaultPlainMessageProcessorImpl {
    fn default() -> Self {
        Self::new()
    }
}

impl DefaultPlainMessageProcessorImpl {
    /// Create a new default message processor
    pub fn new() -> Self {
        let logging_processor =
            crate::message::PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
        let validation_processor =
            crate::message::PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);

        let mut processor = crate::message::CompositePlainMessageProcessor::new(Vec::new());
        processor.add_processor(validation_processor);
        processor.add_processor(logging_processor);

        let processor = crate::message::PlainMessageProcessorType::Composite(processor);

        Self { processor }
    }
}

#[async_trait]
impl PlainMessageProcessor for DefaultPlainMessageProcessorImpl {
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        match &self.processor {
            crate::message::PlainMessageProcessorType::Default(p) => {
                p.process_incoming(message).await
            }
            crate::message::PlainMessageProcessorType::Logging(p) => {
                p.process_incoming(message).await
            }
            crate::message::PlainMessageProcessorType::Validation(p) => {
                p.process_incoming(message).await
            }
            crate::message::PlainMessageProcessorType::StateMachine(p) => {
                p.process_incoming(message).await
            }
            crate::message::PlainMessageProcessorType::Composite(p) => {
                p.process_incoming(message).await
            }
            crate::message::PlainMessageProcessorType::TravelRule(p) => {
                p.process_incoming(message).await
            }
            crate::message::PlainMessageProcessorType::TrustPing(p) => {
                p.process_incoming(message).await
            }
        }
    }

    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        match &self.processor {
            crate::message::PlainMessageProcessorType::Default(p) => {
                p.process_outgoing(message).await
            }
            crate::message::PlainMessageProcessorType::Logging(p) => {
                p.process_outgoing(message).await
            }
            crate::message::PlainMessageProcessorType::Validation(p) => {
                p.process_outgoing(message).await
            }
            crate::message::PlainMessageProcessorType::StateMachine(p) => {
                p.process_outgoing(message).await
            }
            crate::message::PlainMessageProcessorType::Composite(p) => {
                p.process_outgoing(message).await
            }
            crate::message::PlainMessageProcessorType::TravelRule(p) => {
                p.process_outgoing(message).await
            }
            crate::message::PlainMessageProcessorType::TrustPing(p) => {
                p.process_outgoing(message).await
            }
        }
    }
}

/// State machine integration processor
///
/// This processor integrates the message processing pipeline with the transaction state machine.
/// It processes incoming TAP messages and updates transaction state accordingly.
#[derive(Clone)]
pub struct StateMachineIntegrationProcessor {
    /// Arc-wrapped state processor for thread safety
    state_processor: Option<Arc<dyn crate::state_machine::TransactionStateProcessor>>,
}

impl std::fmt::Debug for StateMachineIntegrationProcessor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StateMachineIntegrationProcessor")
            .field("state_processor", &self.state_processor.is_some())
            .finish()
    }
}

impl Default for StateMachineIntegrationProcessor {
    fn default() -> Self {
        Self::new()
    }
}

impl StateMachineIntegrationProcessor {
    /// Create a new state machine integration processor
    pub fn new() -> Self {
        Self {
            state_processor: None,
        }
    }

    /// Set the state processor
    pub fn with_state_processor(
        mut self,
        processor: Arc<dyn crate::state_machine::TransactionStateProcessor>,
    ) -> Self {
        self.state_processor = Some(processor);
        self
    }
}

#[async_trait]
impl PlainMessageProcessor for StateMachineIntegrationProcessor {
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        // Process the message through the state machine if available
        if let Some(state_processor) = &self.state_processor {
            if let Err(e) = state_processor.process_message(&message).await {
                log::warn!(
                    "State machine processing failed for message {}: {}",
                    message.id,
                    e
                );
                // Don't fail the message processing, just log the error
            }
        }

        // Always pass the message through for further processing
        Ok(Some(message))
    }

    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        // For outgoing messages, we typically don't need state machine processing
        // since they're already being sent by the state machine or agents
        Ok(Some(message))
    }
}