use async_trait::async_trait;
use log::{debug, info};
use std::sync::Arc;
use tap_msg::didcomm::PlainMessage;
use crate::error::Result;
/// A processing stage applied to DIDComm plain messages.
///
/// Each method takes ownership of a message and returns
/// `Result<Option<PlainMessage>>`. The implementations in this file return
/// `Ok(Some(message))` to hand the message on and `Ok(None)` to reject it.
///
/// Implementors must be `Send + Sync + Clone`.
#[async_trait]
pub trait PlainMessageProcessor: Send + Sync + Clone {
/// Processes a message travelling in the incoming direction.
async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
/// Processes a message travelling in the outgoing direction.
async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>>;
}
/// Processor that logs every message it sees and hands it on untouched.
///
/// The message ID is written at `info` level and the full `Debug`
/// representation of the message at `debug` level.
#[derive(Debug, Clone)]
pub struct LoggingPlainMessageProcessor;

#[async_trait]
impl PlainMessageProcessor for LoggingPlainMessageProcessor {
    /// Logs an incoming message and returns it unchanged as `Ok(Some(_))`.
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let message_id = &message.id;
        info!("Incoming message: {}", message_id);
        debug!("PlainMessage content: {:?}", &message);

        let forwarded = Some(message);
        Ok(forwarded)
    }

    /// Logs an outgoing message and returns it unchanged as `Ok(Some(_))`.
    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let message_id = &message.id;
        info!("Outgoing message: {}", message_id);
        debug!("PlainMessage content: {:?}", &message);

        let forwarded = Some(message);
        Ok(forwarded)
    }
}
/// Processor that screens messages against a set of structural rules.
///
/// A message that fails a rule is rejected by returning `Ok(None)`; a message
/// that passes is returned unchanged as `Ok(Some(message))`. The same rules are
/// applied to incoming and outgoing messages; only the wording of the log
/// output differs between the two directions.
#[derive(Debug, Clone)]
pub struct ValidationPlainMessageProcessor;

/// Direction a message is travelling in; selects log wording only.
#[derive(Debug, Clone, Copy)]
enum ValidationDirection {
    Incoming,
    Outgoing,
}

impl ValidationDirection {
    /// Word used in the "Validating ... message" debug line.
    fn label(self) -> &'static str {
        match self {
            ValidationDirection::Incoming => "incoming",
            ValidationDirection::Outgoing => "outgoing",
        }
    }

    /// Sentence subject for the "... has <problem>, rejecting" log lines.
    fn subject(self) -> &'static str {
        match self {
            ValidationDirection::Incoming => "PlainMessage",
            ValidationDirection::Outgoing => "Outgoing message",
        }
    }

    /// Qualifier inserted before the colon in "Invalid ..." / "Unknown ..." log lines.
    fn qualifier(self) -> &'static str {
        match self {
            ValidationDirection::Incoming => "",
            ValidationDirection::Outgoing => " in outgoing message",
        }
    }
}

impl ValidationPlainMessageProcessor {
    /// Type URI prefix that selects the TAP marker check.
    const TAP_TYPE_PREFIX: &'static str = "https://tap.rsvp/schema/";

    /// Type URI prefix that selects the DIDComm marker check.
    const DIDCOMM_TYPE_PREFIX: &'static str = "https://didcomm.org/";

    /// Substrings of which at least one must occur in a TAP type URI.
    const KNOWN_TAP_TYPE_MARKERS: &'static [&'static str] = &[
        "Transfer",
        "Authorize",
        "Reject",
        "Settle",
        "Payment",
        "Connect",
        "Cancel",
        "Revert",
        "AddAgents",
        "ReplaceAgent",
        "RemoveAgent",
        "UpdateParty",
        "UpdatePolicies",
        "ConfirmRelationship",
        "OutOfBand",
        "AuthorizationRequired",
        "RequestPresentation",
        "Presentation",
        "Error",
    ];

    /// Substrings looked for in a DIDComm type URI (a miss is only logged).
    const KNOWN_DIDCOMM_TYPE_MARKERS: &'static [&'static str] = &[
        "trust-ping",
        "basicmessage",
        "routing",
        "discover-features",
        "problem-report",
        "ack",
        "notification",
        "ping",
        "coordinate-mediation",
        "keylist",
        "out-of-band",
    ];

    /// `created_time` values below this are multiplied by 1000 before the
    /// comparison with the current time in milliseconds.
    const SECONDS_TIMESTAMP_CUTOFF: u64 = 10_000_000_000;

    /// How far ahead of the local clock (in milliseconds) a `created_time`
    /// may lie before the message is rejected.
    const MAX_FUTURE_SKEW_MS: u64 = 300_000;

    /// Returns `true` when `message_type` contains at least one of `markers`.
    fn mentions_any(message_type: &str, markers: &[&str]) -> bool {
        markers.iter().any(|marker| message_type.contains(marker))
    }

    /// Runs every validation rule against `message`.
    ///
    /// Returns `true` when the message passes and `false` when it must be
    /// rejected. The first failing rule is logged at `info` level and ends
    /// the check. Rules are evaluated in this order: ID, `typ`, sender DID,
    /// recipient DIDs, body, parent thread ID, creation time, message type.
    fn is_valid(message: &PlainMessage, direction: ValidationDirection) -> bool {
        let subject = direction.subject();
        let qualifier = direction.qualifier();

        debug!("Validating {} message: {}", direction.label(), message.id);

        if message.id.is_empty() {
            info!("{} has empty ID, rejecting", subject);
            return false;
        }

        if message.typ.is_empty() {
            info!("{} has empty type, rejecting", subject);
            return false;
        }

        // An empty sender is tolerated; a non-empty one must start with "did:".
        if !message.from.is_empty() && !message.from.starts_with("did:") {
            info!("Invalid 'from' DID format{}: {}", qualifier, message.from);
            return false;
        }

        // An empty recipient list passes trivially; the first offender is reported.
        if let Some(recipient) = message.to.iter().find(|r| !r.starts_with("did:")) {
            info!("Invalid recipient DID format{}: {}", qualifier, recipient);
            return false;
        }

        if message.body.is_null() {
            info!("{} has null body, rejecting", subject);
            return false;
        }

        // A missing parent thread ID is fine; a present-but-empty one is not.
        if message.pthid.as_ref().map_or(false, |pthid| pthid.is_empty()) {
            info!("{} has empty parent thread ID, rejecting", subject);
            return false;
        }

        if let Some(created_time) = message.created_time {
            let created_ms = if created_time < Self::SECONDS_TIMESTAMP_CUTOFF {
                created_time * 1000
            } else {
                created_time
            };
            // Same cast as before; the saturating add keeps the sum from
            // overflowing if the cast ever produced a value near `u64::MAX`.
            let now_ms = chrono::Utc::now().timestamp_millis() as u64;
            if created_ms > now_ms.saturating_add(Self::MAX_FUTURE_SKEW_MS) {
                info!("{} has future timestamp, rejecting", subject);
                return false;
            }
        }

        let message_type = message.type_.as_str();
        if message_type.starts_with(Self::TAP_TYPE_PREFIX) {
            if !Self::mentions_any(message_type, Self::KNOWN_TAP_TYPE_MARKERS) {
                info!("Unknown TAP message type{}: {}", qualifier, message_type);
                return false;
            }
        } else if message_type.starts_with(Self::DIDCOMM_TYPE_PREFIX) {
            if !Self::mentions_any(message_type, Self::KNOWN_DIDCOMM_TYPE_MARKERS) {
                // Logged only: an unrecognised DIDComm type is not a rejection.
                info!("Unknown DIDComm message type{}: {}", qualifier, message_type);
            }
        } else {
            // Neither known prefix matched.
            info!("Unknown message protocol{}: {}", qualifier, message_type);
            return false;
        }

        true
    }
}

#[async_trait]
impl PlainMessageProcessor for ValidationPlainMessageProcessor {
    /// Validates an incoming message.
    ///
    /// Returns `Ok(Some(message))` when every rule passes and `Ok(None)` when
    /// the message is rejected. This implementation never returns `Err`.
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        if ValidationPlainMessageProcessor::is_valid(&message, ValidationDirection::Incoming) {
            Ok(Some(message))
        } else {
            Ok(None)
        }
    }

    /// Validates an outgoing message.
    ///
    /// Returns `Ok(Some(message))` when every rule passes and `Ok(None)` when
    /// the message is rejected. This implementation never returns `Err`.
    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        if ValidationPlainMessageProcessor::is_valid(&message, ValidationDirection::Outgoing) {
            Ok(Some(message))
        } else {
            Ok(None)
        }
    }
}
/// No-op processor: every message is returned exactly as it was received.
#[derive(Debug, Clone)]
pub struct DefaultPlainMessageProcessor;

#[async_trait]
impl PlainMessageProcessor for DefaultPlainMessageProcessor {
    /// Returns the incoming message unchanged as `Ok(Some(_))`.
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let unchanged = Some(message);
        Ok(unchanged)
    }

    /// Returns the outgoing message unchanged as `Ok(Some(_))`.
    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let unchanged = Some(message);
        Ok(unchanged)
    }
}
/// Wrapper around a single `PlainMessageProcessorType` value to which all
/// message processing is delegated.
#[derive(Clone, Debug)]
pub struct DefaultPlainMessageProcessorImpl {
// The wrapped processor; `new()` fills this with a composite processor.
processor: crate::message::PlainMessageProcessorType,
}
impl Default for DefaultPlainMessageProcessorImpl {
fn default() -> Self {
Self::new()
}
}
impl DefaultPlainMessageProcessorImpl {
pub fn new() -> Self {
let logging_processor =
crate::message::PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
let validation_processor =
crate::message::PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
let mut processor = crate::message::CompositePlainMessageProcessor::new(Vec::new());
processor.add_processor(validation_processor);
processor.add_processor(logging_processor);
let processor = crate::message::PlainMessageProcessorType::Composite(processor);
Self { processor }
}
}
#[async_trait]
impl PlainMessageProcessor for DefaultPlainMessageProcessorImpl {
async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
match &self.processor {
crate::message::PlainMessageProcessorType::Default(p) => {
p.process_incoming(message).await
}
crate::message::PlainMessageProcessorType::Logging(p) => {
p.process_incoming(message).await
}
crate::message::PlainMessageProcessorType::Validation(p) => {
p.process_incoming(message).await
}
crate::message::PlainMessageProcessorType::StateMachine(p) => {
p.process_incoming(message).await
}
crate::message::PlainMessageProcessorType::Composite(p) => {
p.process_incoming(message).await
}
crate::message::PlainMessageProcessorType::TravelRule(p) => {
p.process_incoming(message).await
}
crate::message::PlainMessageProcessorType::TrustPing(p) => {
p.process_incoming(message).await
}
}
}
async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
match &self.processor {
crate::message::PlainMessageProcessorType::Default(p) => {
p.process_outgoing(message).await
}
crate::message::PlainMessageProcessorType::Logging(p) => {
p.process_outgoing(message).await
}
crate::message::PlainMessageProcessorType::Validation(p) => {
p.process_outgoing(message).await
}
crate::message::PlainMessageProcessorType::StateMachine(p) => {
p.process_outgoing(message).await
}
crate::message::PlainMessageProcessorType::Composite(p) => {
p.process_outgoing(message).await
}
crate::message::PlainMessageProcessorType::TravelRule(p) => {
p.process_outgoing(message).await
}
crate::message::PlainMessageProcessorType::TrustPing(p) => {
p.process_outgoing(message).await
}
}
}
}
/// Processor that passes incoming messages to an optional transaction state
/// processor before handing them on.
#[derive(Clone)]
pub struct StateMachineIntegrationProcessor {
// Shared state processor; when `None`, incoming messages pass straight through.
state_processor: Option<Arc<dyn crate::state_machine::TransactionStateProcessor>>,
}
impl std::fmt::Debug for StateMachineIntegrationProcessor {
    /// Formats the processor, showing only whether a state processor is
    /// configured (as a `bool`) rather than the processor itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_state_processor = self.state_processor.is_some();

        let mut repr = f.debug_struct("StateMachineIntegrationProcessor");
        repr.field("state_processor", &has_state_processor);
        repr.finish()
    }
}
impl Default for StateMachineIntegrationProcessor {
fn default() -> Self {
Self::new()
}
}
impl StateMachineIntegrationProcessor {
    /// Creates a processor with no state processor attached.
    pub fn new() -> Self {
        let state_processor = None;
        Self { state_processor }
    }

    /// Consumes `self` and returns a processor that uses `processor`,
    /// replacing any state processor that was set before.
    pub fn with_state_processor(
        self,
        processor: Arc<dyn crate::state_machine::TransactionStateProcessor>,
    ) -> Self {
        // `state_processor` is the only field, so a fresh value carries everything over.
        Self {
            state_processor: Some(processor),
        }
    }
}
#[async_trait]
impl PlainMessageProcessor for StateMachineIntegrationProcessor {
    /// Runs the configured state processor (if any) on the incoming message
    /// and then returns the message unchanged as `Ok(Some(_))`.
    ///
    /// A failure reported by the state processor is logged at `warn` level
    /// and otherwise ignored; it never turns into an `Err` or a rejection.
    async fn process_incoming(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        if let Some(tracker) = self.state_processor.as_ref() {
            let outcome = tracker.process_message(&message).await;
            if let Err(failure) = outcome {
                log::warn!(
                    "State machine processing failed for message {}: {}",
                    message.id,
                    failure
                );
            }
        }

        let forwarded = Some(message);
        Ok(forwarded)
    }

    /// Returns the outgoing message unchanged; the state processor is not consulted.
    async fn process_outgoing(&self, message: PlainMessage) -> Result<Option<PlainMessage>> {
        let forwarded = Some(message);
        Ok(forwarded)
    }
}