tap_node/message/processor.rs
1//! # PlainMessage Processor Implementations for TAP Node
2//!
3//! This module provides message processing functionality for TAP Node. PlainMessage processors
4//! serve as middleware in the message handling pipeline, allowing for validation, transformation,
5//! and filtering of messages as they flow through the system.
6//!
7//! ## PlainMessage Processing Pipeline
8//!
9//! The TAP Node uses a pipeline architecture for message processing, where messages pass through
10//! a series of processors in sequence. Each processor can:
11//!
12//! - Pass the message through unchanged
13//! - Transform the message in some way
14//! - Filter out (drop) messages based on certain criteria
15//! - Perform side effects (logging, metrics collection, etc.)
16//!
17//! ## Processor Types
18//!
19//! The module provides several built-in processor implementations:
20//!
21//! - `LoggingPlainMessageProcessor`: Logs information about messages passing through the system
22//! - `ValidationPlainMessageProcessor`: Validates message structure and content
23//! - `DefaultPlainMessageProcessor`: A simple pass-through processor with minimal functionality
24//! - `CompositePlainMessageProcessor`: Combines multiple processors into a processing chain
25//!
26//! ## Custom Processors
27//!
28//! You can create custom processors by implementing the `PlainMessageProcessor` trait. This
29//! allows for specialized processing such as:
30//!
31//! - PlainMessage transformation for protocol version compatibility
32//! - Content-based filtering and routing
33//! - Security scanning and anomaly detection
34//! - Metrics collection and performance monitoring
35//!
36//! ## Processing Modes
37//!
38//! Each processor implements two key methods:
39//!
40//! - `process_incoming()`: For messages received by the node
41//! - `process_outgoing()`: For messages being sent from the node
42//!
43//! This separation allows for different processing logic depending on message direction.
44
45use async_trait::async_trait;
46use log::{debug, info};
47use std::sync::Arc;
48use tap_msg::didcomm::PlainMessage;
49
50use crate::error::Result;
51
52/// Trait for processing DIDComm messages in TAP nodes
53///
54/// The `PlainMessageProcessor` trait defines the interface for message processors
55/// that handle DIDComm messages flowing through the TAP node. Processors act
56/// as middleware, allowing for validation, transformation, logging, metrics
57/// collection, and other operations on messages.
58///
59/// # Design Patterns
60///
61/// This trait follows the Chain of Responsibility pattern, where each processor
62/// can either:
63/// - Pass the message along unchanged
64/// - Transform the message before passing it along
65/// - Filter out (drop) the message by returning None
66/// - Perform side effects during processing (logging, metrics, etc.)
67///
68/// # Thread Safety
69///
70/// All implementations must be `Send + Sync + Clone` to ensure they can be
71/// safely used in multithreaded environments and composed into processor chains.
72///
73/// # Implementation Guidelines
74///
75/// When implementing a custom processor:
76/// - Ensure both `process_incoming` and `process_outgoing` are implemented
77/// - Be mindful of performance in high-throughput environments
78/// - Consider making processors stateless when possible
79/// - Use the processor's Clone trait to avoid expensive setup/teardown
80/// - Document any side effects or transformations clearly
81///
82/// # Examples
83///
84/// ```
85/// # use async_trait::async_trait;
86/// # use tap_node::error::Result;
87/// # use tap_msg::didcomm::PlainMessage;
88/// # use tap_node::message::processor::PlainMessageProcessor;
89/// #
90/// #[derive(Clone, Debug)]
91/// struct MyCustomProcessor;
92///
93/// #[async_trait]
94/// impl PlainMessageProcessor for MyCustomProcessor {
95/// async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
96/// // Process incoming message - e.g., validate fields, log, transform
97/// println!("Processing incoming message: {}", message.id);
98/// Ok(Some(message)) // Pass message along unchanged
99/// }
100///
101/// async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
102/// // Process outgoing message
103/// println!("Processing outgoing message: {}", message.id);
104/// Ok(Some(message)) // Pass message along unchanged
105/// }
106/// }
107/// ```
108#[async_trait]
109pub trait PlainMessageProcessor: Send + Sync + Clone {
110 /// Process an incoming message received by the node
111 ///
112 /// This method handles messages that are being received by the TAP node from
113 /// external sources. Implementations can validate, transform, or filter these
114 /// messages before they are routed to their target agents.
115 ///
116 /// # Parameters
117 ///
118 /// * `message` - The DIDComm message to process
119 ///
120 /// # Returns
121 ///
122 /// * `Ok(Some(message))` - The message to pass to the next processor
123 /// * `Ok(None)` - Drop the message (do not process further)
124 /// * `Err(e)` - Processing error
125 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
126
127 /// Process an outgoing message being sent from the node
128 ///
129 /// This method handles messages that are being sent from the TAP node to
130 /// external recipients. Implementations can transform these messages for
131 /// compatibility, add headers, perform logging, or filter messages before
132 /// they are delivered.
133 ///
134 /// # Parameters
135 ///
136 /// * `message` - The DIDComm message to process
137 ///
138 /// # Returns
139 ///
140 /// * `Ok(Some(message))` - The message to pass to the next processor
141 /// * `Ok(None)` - Drop the message (do not process further)
142 /// * `Err(e)` - Processing error
143 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
144}
145
146/// A message processor that logs messages
147#[derive(Debug, Clone)]
148pub struct LoggingPlainMessageProcessor;
149
150#[async_trait]
151impl PlainMessageProcessor for LoggingPlainMessageProcessor {
152 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
153 info!("Incoming message: {}", message.id);
154 debug!("PlainMessage content: {:?}", message);
155 Ok(Some(message))
156 }
157
158 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
159 info!("Outgoing message: {}", message.id);
160 debug!("PlainMessage content: {:?}", message);
161 Ok(Some(message))
162 }
163}
164
165/// A message processor that validates messages
166///
167/// This processor validates incoming and outgoing DIDComm messages to ensure they
168/// conform to the expected structure and protocol requirements.
169///
170/// In a production implementation, this would perform comprehensive validation including:
171/// - Field validation (required fields, format, values)
172/// - Protocol compliance checks for each message type
173/// - Signature verification
174/// - Timestamp and expiration checks
175/// - Security and authorization checks
176///
177/// # Implementation
178///
179/// Currently, this implementation validates:
180/// - The message ID is not empty
181/// - The message type is not empty
182/// - Any 'from' or 'to' DIDs follow the 'did:' prefix format
183/// - Basic protocol-specific requirements based on message type
184///
185/// # PlainMessage Flow
186///
187/// The validator sits in the message processor pipeline and can filter out invalid
188/// messages by returning Ok(None), or let valid messages continue through the
189/// pipeline by returning Ok(Some(message)).
190#[derive(Debug, Clone)]
191pub struct ValidationPlainMessageProcessor;
192
193#[async_trait]
194impl PlainMessageProcessor for ValidationPlainMessageProcessor {
195 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
196 debug!("Validating incoming message: {}", message.id);
197
198 // Basic validation - ID and type should not be empty
199 if message.id.is_empty() {
200 info!("PlainMessage has empty ID, rejecting");
201 return Ok(None);
202 }
203
204 if message.typ.is_empty() {
205 info!("PlainMessage has empty type, rejecting");
206 return Ok(None);
207 }
208
209 // Validate DID format if present
210 if !message.from.is_empty() && !message.from.starts_with("did:") {
211 info!("Invalid 'from' DID format: {}", message.from);
212 return Ok(None);
213 }
214
215 // Validate recipient DIDs
216 if !message.to.is_empty() {
217 // All DIDs should have valid format
218 for recipient in &message.to {
219 if !recipient.starts_with("did:") {
220 info!("Invalid recipient DID format: {}", recipient);
221 return Ok(None);
222 }
223 }
224 }
225
226 // Validate body
227 if message.body == serde_json::json!(null) {
228 info!("PlainMessage has null body, rejecting");
229 return Ok(None);
230 }
231
232 // Validate pthid if present
233 if let Some(pthid) = &message.pthid {
234 if pthid.is_empty() {
235 info!("PlainMessage has empty parent thread ID, rejecting");
236 return Ok(None);
237 }
238 }
239
240 // Validate timestamp
241 if let Some(created_time) = message.created_time {
242 let now = chrono::Utc::now().timestamp() as u64;
243 // Check if the timestamp is more than 5 minutes in the future
244 if created_time > now + 300 {
245 info!("PlainMessage has future timestamp, rejecting");
246 return Ok(None);
247 }
248 }
249
250 // Protocol-specific validation based on message type
251 let message_type = &message.type_;
252
253 // Validate TAP messages
254 if message_type.starts_with("https://tap.rsvp/schema/") {
255 // TAP-specific validations
256 // Check that it's a valid TAP message type
257 if !message_type.contains("Transfer")
258 && !message_type.contains("Authorize")
259 && !message_type.contains("Reject")
260 && !message_type.contains("Settle")
261 && !message_type.contains("Payment")
262 && !message_type.contains("Connect")
263 && !message_type.contains("Cancel")
264 && !message_type.contains("Revert")
265 && !message_type.contains("AddAgents")
266 && !message_type.contains("ReplaceAgent")
267 && !message_type.contains("RemoveAgent")
268 && !message_type.contains("UpdateParty")
269 && !message_type.contains("UpdatePolicies")
270 && !message_type.contains("ConfirmRelationship")
271 && !message_type.contains("OutOfBand")
272 && !message_type.contains("AuthorizationRequired")
273 && !message_type.contains("RequestPresentation")
274 && !message_type.contains("Presentation")
275 && !message_type.contains("Error")
276 {
277 info!("Unknown TAP message type: {}", message_type);
278 return Ok(None);
279 }
280 }
281 // Validate DIDComm messages
282 else if message_type.starts_with("https://didcomm.org/") {
283 // DIDComm-specific validations
284 // Add more specific DIDComm validations here
285 }
286 // Unknown message type protocol
287 else if !message_type.starts_with("https://tap.rsvp/schema/")
288 && !message_type.starts_with("https://didcomm.org/")
289 {
290 info!("Unknown message protocol: {}", message_type);
291 // Reject unknown message protocols
292 return Ok(None);
293 }
294
295 // PlainMessage passed validation
296 Ok(Some(message))
297 }
298
299 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
300 debug!("Validating outgoing message: {}", message.id);
301
302 // For outgoing messages, apply the same validations as incoming messages
303 // In a production system, there might be different validations for outgoing vs incoming
304
305 // Basic validation - ID and type should not be empty
306 if message.id.is_empty() {
307 info!("Outgoing message has empty ID, rejecting");
308 return Ok(None);
309 }
310
311 if message.typ.is_empty() {
312 info!("Outgoing message has empty type, rejecting");
313 return Ok(None);
314 }
315
316 // Validate DID format if present
317 if !message.from.is_empty() && !message.from.starts_with("did:") {
318 info!(
319 "Invalid 'from' DID format in outgoing message: {}",
320 message.from
321 );
322 return Ok(None);
323 }
324
325 // Validate recipient DIDs
326 if !message.to.is_empty() {
327 // All DIDs should have valid format
328 for recipient in &message.to {
329 if !recipient.starts_with("did:") {
330 info!(
331 "Invalid recipient DID format in outgoing message: {}",
332 recipient
333 );
334 return Ok(None);
335 }
336 }
337 }
338
339 // Validate body
340 if message.body == serde_json::json!(null) {
341 info!("Outgoing message has null body, rejecting");
342 return Ok(None);
343 }
344
345 // Validate pthid if present
346 if let Some(pthid) = &message.pthid {
347 if pthid.is_empty() {
348 info!("Outgoing message has empty parent thread ID, rejecting");
349 return Ok(None);
350 }
351 }
352
353 // Validate timestamp
354 if let Some(created_time) = message.created_time {
355 let now = chrono::Utc::now().timestamp() as u64;
356 // Check if the timestamp is more than 5 minutes in the future
357 if created_time > now + 300 {
358 info!("Outgoing message has future timestamp, rejecting");
359 return Ok(None);
360 }
361 }
362
363 // Protocol-specific validation based on message type
364 let message_type = &message.type_;
365
366 // Validate TAP messages
367 if message_type.starts_with("https://tap.rsvp/schema/") {
368 // TAP-specific validations
369 // Check that it's a valid TAP message type
370 if !message_type.contains("Transfer")
371 && !message_type.contains("Authorize")
372 && !message_type.contains("Reject")
373 && !message_type.contains("Settle")
374 && !message_type.contains("Payment")
375 && !message_type.contains("Connect")
376 && !message_type.contains("Cancel")
377 && !message_type.contains("Revert")
378 && !message_type.contains("AddAgents")
379 && !message_type.contains("ReplaceAgent")
380 && !message_type.contains("RemoveAgent")
381 && !message_type.contains("UpdateParty")
382 && !message_type.contains("UpdatePolicies")
383 && !message_type.contains("ConfirmRelationship")
384 && !message_type.contains("OutOfBand")
385 && !message_type.contains("AuthorizationRequired")
386 && !message_type.contains("RequestPresentation")
387 && !message_type.contains("Presentation")
388 && !message_type.contains("Error")
389 {
390 info!(
391 "Unknown TAP message type in outgoing message: {}",
392 message_type
393 );
394 return Ok(None);
395 }
396 }
397 // Validate DIDComm messages
398 else if message_type.starts_with("https://didcomm.org/") {
399 // DIDComm-specific validations
400 // Add more specific DIDComm validations here
401 }
402 // Unknown message type protocol
403 else if !message_type.starts_with("https://tap.rsvp/schema/")
404 && !message_type.starts_with("https://didcomm.org/")
405 {
406 info!(
407 "Unknown message protocol in outgoing message: {}",
408 message_type
409 );
410 // Reject unknown message protocols
411 return Ok(None);
412 }
413
414 // PlainMessage passed validation
415 Ok(Some(message))
416 }
417}
418
419/// Default message processor with core functionality
420#[derive(Debug, Clone)]
421pub struct DefaultPlainMessageProcessor;
422
423#[async_trait]
424impl PlainMessageProcessor for DefaultPlainMessageProcessor {
425 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
426 // By default, we just pass the message through
427 Ok(Some(message))
428 }
429
430 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
431 // By default, we just pass the message through
432 Ok(Some(message))
433 }
434}
435
436/// Default message processor that logs and validates messages
437#[derive(Clone, Debug)]
438pub struct DefaultPlainMessageProcessorImpl {
439 /// The internal processor
440 processor: crate::message::PlainMessageProcessorType,
441}
442
443impl Default for DefaultPlainMessageProcessorImpl {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
449impl DefaultPlainMessageProcessorImpl {
450 /// Create a new default message processor
451 pub fn new() -> Self {
452 let logging_processor =
453 crate::message::PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
454 let validation_processor =
455 crate::message::PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
456
457 let mut processor = crate::message::CompositePlainMessageProcessor::new(Vec::new());
458 processor.add_processor(validation_processor);
459 processor.add_processor(logging_processor);
460
461 let processor = crate::message::PlainMessageProcessorType::Composite(processor);
462
463 Self { processor }
464 }
465}
466
467#[async_trait]
468impl PlainMessageProcessor for DefaultPlainMessageProcessorImpl {
469 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
470 match &self.processor {
471 crate::message::PlainMessageProcessorType::Default(p) => {
472 p.process_incoming(message).await
473 }
474 crate::message::PlainMessageProcessorType::Logging(p) => {
475 p.process_incoming(message).await
476 }
477 crate::message::PlainMessageProcessorType::Validation(p) => {
478 p.process_incoming(message).await
479 }
480 crate::message::PlainMessageProcessorType::StateMachine(p) => {
481 p.process_incoming(message).await
482 }
483 crate::message::PlainMessageProcessorType::Composite(p) => {
484 p.process_incoming(message).await
485 }
486 }
487 }
488
489 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
490 match &self.processor {
491 crate::message::PlainMessageProcessorType::Default(p) => {
492 p.process_outgoing(message).await
493 }
494 crate::message::PlainMessageProcessorType::Logging(p) => {
495 p.process_outgoing(message).await
496 }
497 crate::message::PlainMessageProcessorType::Validation(p) => {
498 p.process_outgoing(message).await
499 }
500 crate::message::PlainMessageProcessorType::StateMachine(p) => {
501 p.process_outgoing(message).await
502 }
503 crate::message::PlainMessageProcessorType::Composite(p) => {
504 p.process_outgoing(message).await
505 }
506 }
507 }
508}
509
510/// State machine integration processor
511///
512/// This processor integrates the message processing pipeline with the transaction state machine.
513/// It processes incoming TAP messages and updates transaction state accordingly.
514#[derive(Clone)]
515pub struct StateMachineIntegrationProcessor {
516 /// Arc-wrapped state processor for thread safety
517 state_processor: Option<Arc<dyn crate::state_machine::TransactionStateProcessor>>,
518}
519
520impl std::fmt::Debug for StateMachineIntegrationProcessor {
521 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522 f.debug_struct("StateMachineIntegrationProcessor")
523 .field("state_processor", &self.state_processor.is_some())
524 .finish()
525 }
526}
527
528impl Default for StateMachineIntegrationProcessor {
529 fn default() -> Self {
530 Self::new()
531 }
532}
533
534impl StateMachineIntegrationProcessor {
535 /// Create a new state machine integration processor
536 pub fn new() -> Self {
537 Self {
538 state_processor: None,
539 }
540 }
541
542 /// Set the state processor
543 pub fn with_state_processor(
544 mut self,
545 processor: Arc<dyn crate::state_machine::TransactionStateProcessor>,
546 ) -> Self {
547 self.state_processor = Some(processor);
548 self
549 }
550}
551
552#[async_trait]
553impl PlainMessageProcessor for StateMachineIntegrationProcessor {
554 async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
555 // Process the message through the state machine if available
556 if let Some(state_processor) = &self.state_processor {
557 if let Err(e) = state_processor.process_message(&message).await {
558 log::warn!(
559 "State machine processing failed for message {}: {}",
560 message.id,
561 e
562 );
563 // Don't fail the message processing, just log the error
564 }
565 }
566
567 // Always pass the message through for further processing
568 Ok(Some(message))
569 }
570
571 async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
572 // For outgoing messages, we typically don't need state machine processing
573 // since they're already being sent by the state machine or agents
574 Ok(Some(message))
575 }
576}