use std::sync::Arc;
use tap_caip::{AssetId, ChainId};
use tap_msg::didcomm::PlainMessage;
use tap_msg::message::tap_message_trait::TapMessageBody;
use tap_msg::message::{Agent, Party, Payment, Transfer};
use tap_node::agent::AgentRegistry;
use tap_node::event::EventBus;
use tap_node::message::{PlainMessageProcessor, StateMachineIntegrationProcessor};
use tap_node::state_machine::fsm::DecisionMode;
use tap_node::state_machine::{StandardTransactionProcessor, TransactionStateProcessor};
use tap_node::storage::Storage;
/// Builds the test DID for `name` by prefixing it with `did:test:`.
fn test_agent_did(name: &str) -> String {
    let mut did = String::from("did:test:");
    did.push_str(name);
    did
}
/// Creates a [`Party`] identified by the test DID derived from `name`.
fn test_party(name: &str) -> Party {
    let did = test_agent_did(name);
    Party::new(&did)
}
/// Creates an [`Agent`] with the given `role`.
///
/// Both `name` and `for_party` are plain labels; each is expanded into a test
/// DID via `test_agent_did` before being handed to `Agent::new`.
fn test_agent(name: &str, role: &str, for_party: &str) -> Agent {
    let agent_did = test_agent_did(name);
    let party_did = test_agent_did(for_party);
    Agent::new(&agent_did, role, &party_did)
}
/// Returns the fixed `erc20` asset identifier on chain `eip155:1` shared by the tests.
///
/// # Panics
/// Panics if either the chain id or the asset id is rejected by its constructor.
fn test_asset() -> AssetId {
    let chain = ChainId::new("eip155", "1").unwrap();
    let token_address = "0x6b175474e89094c44da98b954eedeac495271d0f";
    AssetId::new(chain, "erc20", token_address).unwrap()
}
/// Sends a `Transfer` through both the state processor and the
/// `StateMachineIntegrationProcessor`, then checks the resulting storage state.
///
/// Verified here:
/// * the state processor accepts the message,
/// * the integration processor passes the message on (returns `Some`),
/// * a transaction is stored under the message id,
/// * `are_all_agents_authorized` reports `false` for that transaction.
#[tokio::test]
async fn test_complete_state_machine_integration() {
    let storage = Arc::new(
        Storage::new(Some(":memory:".into()))
            .await
            .expect("in-memory storage should initialise"),
    );
    let event_bus = Arc::new(EventBus::new());
    let agents = Arc::new(AgentRegistry::new(None));
    // `event_bus` and `agents` are not needed afterwards, so they are moved
    // into the processor instead of being cloned.
    let state_processor = Arc::new(StandardTransactionProcessor::new(
        storage.clone(),
        event_bus,
        agents,
        DecisionMode::AutoApprove,
    ));
    let integration_processor =
        StateMachineIntegrationProcessor::new().with_state_processor(state_processor.clone());

    let originator = test_party("alice");
    let beneficiary = test_party("bob");
    let compliance_agent = test_agent("compliance1", "compliance", "alice");
    let transfer = Transfer {
        asset: test_asset(),
        originator: Some(originator),
        beneficiary: Some(beneficiary),
        amount: "100.0".to_string(),
        agents: vec![compliance_agent],
        memo: None,
        settlement_id: None,
        expiry: None,
        transaction_value: None,
        transaction_id: Some("test-tx-001".to_string()),
        connection_id: None,
        metadata: std::collections::HashMap::new(),
    };

    let mut plain_message = transfer
        .to_didcomm(&test_agent_did("alice"))
        .expect("transfer should convert to a DIDComm message");
    // Pin the message id so the storage lookups below use a known key.
    plain_message.id = "test-tx-001".to_string();

    // `expect` (rather than `assert!(.. .is_ok())`) keeps the error in the
    // failure output.
    state_processor
        .process_message(&plain_message)
        .await
        .expect("state processor should accept the transfer");

    // NOTE(review): the same message is also fed through the integration
    // processor, so it presumably reaches the state processor twice — confirm
    // this is intended.
    let forwarded = integration_processor
        .process_incoming(plain_message.clone())
        .await
        .expect("integration processor should not fail on a valid transfer");
    assert!(
        forwarded.is_some(),
        "integration processor should pass the message on"
    );

    let stored_transaction = storage
        .get_transaction_by_id(&plain_message.id)
        .await
        .expect("transaction lookup should succeed");
    assert!(
        stored_transaction.is_some(),
        "transaction '{}' should have been stored",
        plain_message.id
    );

    let agent_authorized = storage
        .are_all_agents_authorized(&plain_message.id)
        .await
        .expect("agent authorization lookup should succeed");
    assert!(
        !agent_authorized,
        "agents should not all be authorized right after the initial transfer"
    );
}
/// Processes a `Payment` directly through the state processor and checks that
/// a transaction is stored under the message id.
///
/// NOTE(review): despite the test name and `DecisionMode::AutoApprove`, the
/// final assertion expects `are_all_agents_authorized` to be `false` — confirm
/// that this is the intended expectation.
#[tokio::test]
async fn test_automatic_authorization() {
    let storage = Arc::new(
        Storage::new(Some(":memory:".into()))
            .await
            .expect("in-memory storage should initialise"),
    );
    let event_bus = Arc::new(EventBus::new());
    let agents = Arc::new(AgentRegistry::new(None));
    // `event_bus` and `agents` are not needed afterwards, so they are moved
    // into the processor instead of being cloned.
    let state_processor = Arc::new(StandardTransactionProcessor::new(
        storage.clone(),
        event_bus,
        agents,
        DecisionMode::AutoApprove,
    ));

    let customer = test_party("customer1");
    let merchant = test_party("merchant1");
    let compliance_agent = test_agent("compliance1", "compliance", "customer1");
    let payment = Payment {
        asset: Some(test_asset()),
        amount: "50.0".to_string(),
        currency_code: None,
        supported_assets: None,
        customer: Some(customer),
        merchant,
        transaction_id: Some("test-payment-001".to_string()),
        memo: None,
        expiry: None,
        invoice: None,
        agents: vec![compliance_agent],
        connection_id: None,
        fallback_settlement_addresses: None,
        metadata: std::collections::HashMap::new(),
    };

    let mut plain_message = payment
        .to_didcomm(&test_agent_did("customer1"))
        .expect("payment should convert to a DIDComm message");
    // Pin the message id so the storage lookups below use a known key.
    plain_message.id = "test-payment-001".to_string();

    // `expect` (rather than `assert!(.. .is_ok())`) keeps the error in the
    // failure output.
    state_processor
        .process_message(&plain_message)
        .await
        .expect("state processor should accept the payment");

    let stored_transaction = storage
        .get_transaction_by_id(&plain_message.id)
        .await
        .expect("transaction lookup should succeed");
    assert!(
        stored_transaction.is_some(),
        "transaction '{}' should have been stored",
        plain_message.id
    );

    let agent_authorized = storage
        .are_all_agents_authorized(&plain_message.id)
        .await
        .expect("agent authorization lookup should succeed");
    assert!(
        !agent_authorized,
        "agents should not all be authorized right after the initial payment"
    );
}
/// Runs a `Transfer` through a composite pipeline built, in order, from the
/// validation, logging and state-machine processors.
///
/// The test expects the pipeline to pass the message on (return `Some`) and a
/// transaction to be stored under the message id afterwards.
#[tokio::test]
async fn test_processing_pipeline_order() {
    use tap_node::message::{
        CompositePlainMessageProcessor, LoggingPlainMessageProcessor, PlainMessageProcessorType,
        ValidationPlainMessageProcessor,
    };

    let storage = Arc::new(
        Storage::new(Some(":memory:".into()))
            .await
            .expect("in-memory storage should initialise"),
    );
    let event_bus = Arc::new(EventBus::new());
    let agents = Arc::new(AgentRegistry::new(None));
    // `event_bus` and `agents` are not needed afterwards, so they are moved
    // into the processor instead of being cloned.
    let state_processor = Arc::new(StandardTransactionProcessor::new(
        storage.clone(),
        event_bus,
        agents,
        DecisionMode::AutoApprove,
    ));

    // Processors are registered in the order validation -> logging -> state machine.
    let mut composite = CompositePlainMessageProcessor::new(vec![]);
    composite.add_processor(PlainMessageProcessorType::Validation(
        ValidationPlainMessageProcessor,
    ));
    composite.add_processor(PlainMessageProcessorType::Logging(
        LoggingPlainMessageProcessor,
    ));
    composite.add_processor(PlainMessageProcessorType::StateMachine(
        StateMachineIntegrationProcessor::new().with_state_processor(state_processor),
    ));

    let transfer = Transfer {
        asset: test_asset(),
        originator: Some(test_party("alice")),
        beneficiary: Some(test_party("bob")),
        amount: "100.0".to_string(),
        agents: vec![test_agent("compliance1", "compliance", "alice")],
        memo: None,
        settlement_id: None,
        expiry: None,
        transaction_value: None,
        transaction_id: Some("test-pipeline-001".to_string()),
        connection_id: None,
        metadata: std::collections::HashMap::new(),
    };

    let mut plain_message = transfer
        .to_didcomm(&test_agent_did("alice"))
        .expect("transfer should convert to a DIDComm message");
    // Pin the message id so the storage lookup below uses a known key.
    plain_message.id = "test-pipeline-001".to_string();

    let processed_message = composite
        .process_incoming(plain_message.clone())
        .await
        .expect("pipeline should not fail on a valid transfer");

    // The message details are part of the assertion message so they appear
    // in the panic output if the pipeline drops the message.
    assert!(
        processed_message.is_some(),
        "Message was filtered out by the pipeline: typ='{}', type_='{}', id='{}', from='{}', to={:?}, body={:?}",
        plain_message.typ,
        plain_message.type_,
        plain_message.id,
        plain_message.from,
        plain_message.to,
        plain_message.body
    );

    let stored_transaction = storage
        .get_transaction_by_id(&plain_message.id)
        .await
        .expect("transaction lookup should succeed");
    assert!(
        stored_transaction.is_some(),
        "transaction '{}' should have been stored",
        plain_message.id
    );
}
/// Feeds a message with an empty `id` and an empty JSON body through a
/// validation + state-machine pipeline.
///
/// The test expects the pipeline to succeed but drop the message (return
/// `None`), and expects no transaction to be listed in storage afterwards.
#[tokio::test]
async fn test_invalid_message_filtering() {
    use tap_node::message::{
        CompositePlainMessageProcessor, PlainMessageProcessorType, ValidationPlainMessageProcessor,
    };

    let storage = Arc::new(
        Storage::new(Some(":memory:".into()))
            .await
            .expect("in-memory storage should initialise"),
    );
    let event_bus = Arc::new(EventBus::new());
    let agents = Arc::new(AgentRegistry::new(None));
    // `event_bus` and `agents` are not needed afterwards, so they are moved
    // into the processor instead of being cloned.
    let state_processor = Arc::new(StandardTransactionProcessor::new(
        storage.clone(),
        event_bus,
        agents,
        DecisionMode::AutoApprove,
    ));

    // Validation is registered before the state machine.
    let mut composite = CompositePlainMessageProcessor::new(vec![]);
    composite.add_processor(PlainMessageProcessorType::Validation(
        ValidationPlainMessageProcessor,
    ));
    composite.add_processor(PlainMessageProcessorType::StateMachine(
        StateMachineIntegrationProcessor::new().with_state_processor(state_processor),
    ));

    let invalid_message = PlainMessage {
        // Deliberately empty id.
        id: "".to_string(),
        typ: "application/didcomm-plain+json".to_string(),
        type_: "https://tap.rsvp/schema/1.0#Transfer".to_string(),
        from: test_agent_did("alice"),
        to: vec![test_agent_did("bob")],
        thid: None,
        pthid: None,
        // `timestamp()` is signed; the cast assumes the clock is not set
        // before the Unix epoch.
        created_time: Some(chrono::Utc::now().timestamp() as u64),
        expires_time: None,
        extra_headers: std::collections::HashMap::new(),
        from_prior: None,
        // Deliberately empty body.
        body: serde_json::json!({}),
        attachments: None,
    };

    let filtered = composite
        .process_incoming(invalid_message)
        .await
        .expect("pipeline should filter the invalid message, not return an error");
    assert!(
        filtered.is_none(),
        "invalid message should have been dropped by the pipeline"
    );

    let transactions = storage
        .list_transactions(10, 0)
        .await
        .expect("listing transactions should succeed");
    assert_eq!(
        transactions.len(),
        0,
        "no transaction should be stored for a filtered message"
    );
}