use std::sync::Arc;
use tap_agent::TapAgent;
use tap_msg::{
didcomm::PlainMessage,
message::{transfer::Transfer, Party},
};
use tap_node::{NodeConfig, TapNode};
use tempfile::TempDir;
/// Sends a TAP `Transfer` from one agent to another agent registered on the same node, then
/// inspects the sender's and receiver's storage for the rows that message produced.
///
/// Always asserted: the receiver lists at least one transaction and the first one has
/// `reference_id == "transfer-msg-456"`. Delivery, received and message rows are validated
/// only when present; when they are absent the test merely reports that on stdout.
///
/// # Errors
///
/// Propagates any error from temp-dir creation, agent creation/registration, serialization,
/// timestamp conversion, message submission or storage queries.
#[tokio::test]
async fn test_internal_message_delivery_all_tables() -> Result<(), Box<dyn std::error::Error>> {
    // Single source of truth for the id that is sent and later looked up in every table.
    const MESSAGE_ID: &str = "transfer-msg-456";

    // Use a fresh temporary directory as the node's `tap_root`.
    let temp_dir = TempDir::new()?;
    let config = NodeConfig {
        tap_root: Some(temp_dir.path().to_path_buf()),
        enable_message_logging: true,
        log_message_content: true,
        ..Default::default()
    };
    let node = Arc::new(TapNode::new(config));

    // Two ephemeral agents, both registered on the same node.
    let (agent1, agent1_did) = TapAgent::from_ephemeral_key().await?;
    let (agent2, agent2_did) = TapAgent::from_ephemeral_key().await?;
    println!("Agent 1 DID: {}", agent1_did);
    println!("Agent 2 DID: {}", agent2_did);
    for agent in [agent1, agent2] {
        node.register_agent(Arc::new(agent)).await?;
    }

    // Transfer body: agent1 is the originator, agent2 the beneficiary.
    let transfer = Transfer {
        transaction_id: Some("test-tx-123".to_string()),
        asset: "eip155:1/erc20:0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48".parse()?,
        amount: "1000.00".to_string(),
        originator: Some(Party::new(&agent1_did)),
        beneficiary: Some(Party::new(&agent2_did)),
        agents: vec![],
        memo: Some("Test internal transfer".to_string()),
        settlement_id: None,
        expiry: None,
        transaction_value: None,
        connection_id: None,
        metadata: Default::default(),
    };
    let transfer_message = PlainMessage {
        id: MESSAGE_ID.to_string(),
        typ: "application/didcomm-plain+json".to_string(),
        type_: "https://tap.rsvp/message/transfer".to_string(),
        body: serde_json::to_value(&transfer)?,
        from: agent1_did.clone(),
        to: vec![agent2_did.clone()],
        thid: Some("test-thread-789".to_string()),
        pthid: None,
        extra_headers: Default::default(),
        attachments: None,
        // Checked conversion: a negative timestamp becomes an error instead of silently
        // wrapping to a huge u64 the way an `as` cast would.
        created_time: Some(u64::try_from(chrono::Utc::now().timestamp())?),
        expires_time: None,
        from_prior: None,
    };

    // Hand the plain message to the node as JSON, then pause before reading storage.
    node.receive_message(serde_json::to_value(&transfer_message)?)
        .await?;
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

    let storage_manager = node
        .agent_storage_manager()
        .expect("Storage manager should exist");

    // --- Sender side: delivery rows for the message -------------------------------------
    let sender_storage = storage_manager.get_agent_storage(&agent1_did).await?;
    let deliveries = sender_storage
        .get_deliveries_for_message(MESSAGE_ID)
        .await?;
    println!("Deliveries for message count: {}", deliveries.len());
    if let Some(delivery) = deliveries.first() {
        println!("✓ Delivery record found in deliveries table");
        assert_eq!(delivery.message_id, MESSAGE_ID);
        assert_eq!(delivery.recipient_did, agent2_did);
        println!(" - Message ID: {}", delivery.message_id);
        println!(" - Recipient: {}", delivery.recipient_did);
        println!(" - Status: {:?}", delivery.status);
        println!(" - Delivery Type: {:?}", delivery.delivery_type);
    } else {
        // No delivery row: print diagnostics instead of failing.
        let messages = sender_storage.list_messages(10, 0, None).await?;
        println!("Sender has {} messages in their log", messages.len());
        let receiver_deliveries = sender_storage
            .get_deliveries_by_recipient(&agent2_did, 10, 0)
            .await?;
        println!("Deliveries to receiver: {}", receiver_deliveries.len());
    }

    // --- Receiver side: transactions table (hard requirement) ---------------------------
    let receiver_storage = storage_manager.get_agent_storage(&agent2_did).await?;
    let transactions = receiver_storage.list_transactions(10, 0).await?;
    println!("\nReceiver transactions count: {}", transactions.len());
    assert!(
        !transactions.is_empty(),
        "Receiver should have the transaction"
    );
    // Indexing cannot panic: non-emptiness was asserted just above.
    let transaction = &transactions[0];
    assert_eq!(transaction.reference_id, MESSAGE_ID);
    println!("✓ Transaction record found in receiver's transactions table");
    println!(" - Reference ID: {}", transaction.reference_id);
    println!(" - Type: {:?}", transaction.transaction_type);
    println!(" - Status: {:?}", transaction.status);

    // --- Receiver side: received table (reported, validated only when present) ----------
    let received_messages = receiver_storage.list_received(10, 0, None, None).await?;
    println!(
        "\nReceiver received messages count: {}",
        received_messages.len()
    );
    if let Some(received) = received_messages.first() {
        println!("✓ Received record found in receiver's received table");
        println!(" - Record ID: {}", received.id);
        println!(" - Message ID: {:?}", received.message_id);
        println!(" - Source Type: {:?}", received.source_type);
        println!(" - Status: {:?}", received.status);
        if let Some(msg_id) = &received.message_id {
            assert_eq!(msg_id, MESSAGE_ID);
        }
    } else {
        println!("❌ No received records found in receiver's received table (this was the bug!)");
    }

    // --- Receiver side: messages table (validated only when present) --------------------
    let messages = receiver_storage.list_messages(10, 0, None).await?;
    println!("\nReceiver messages count: {}", messages.len());
    if let Some(message) = messages.first() {
        println!("✓ Message record found in receiver's messages table");
        assert_eq!(message.message_id, MESSAGE_ID);
        // Compare as borrowed strs so no DID has to be cloned for the assertion.
        assert_eq!(message.from_did.as_deref(), Some(agent1_did.as_str()));
        assert_eq!(message.to_did.as_deref(), Some(agent2_did.as_str()));
    }

    let pending_deliveries = receiver_storage.get_pending_deliveries(10, 0).await?;
    println!("\nPending deliveries: {}", pending_deliveries.len());

    println!("\n✅ Test completed - checking all tables!");
    println!("\n=== Summary ===");
    println!("Deliveries for message: {} records", deliveries.len());
    println!(
        "Receiver's transactions table: {} records",
        transactions.len()
    );
    println!(
        "Receiver's received table: {} records",
        received_messages.len()
    );
    println!("Receiver's messages table: {} records", messages.len());
    Ok(())
}
/// Sends a basic (non-transaction) message between two agents registered on the same node.
///
/// Asserts that the receiver's transactions listing stays empty and that its messages listing
/// is non-empty. Delivery and received counts are only printed.
///
/// # Errors
///
/// Propagates any error from temp-dir creation, agent creation/registration, message
/// submission or storage queries.
#[tokio::test]
async fn test_internal_delivery_non_transaction_message() -> Result<(), Box<dyn std::error::Error>>
{
    // Id of the basic message, reused for the delivery lookup below.
    const MESSAGE_ID: &str = "basic-msg-123";

    // Node rooted in a fresh temporary directory, with message logging switched on.
    let scratch = TempDir::new()?;
    let node = Arc::new(TapNode::new(NodeConfig {
        tap_root: Some(scratch.path().to_path_buf()),
        enable_message_logging: true,
        ..Default::default()
    }));

    let (agent1, agent1_did) = TapAgent::from_ephemeral_key().await?;
    let (agent2, agent2_did) = TapAgent::from_ephemeral_key().await?;
    for agent in [agent1, agent2] {
        node.register_agent(Arc::new(agent)).await?;
    }

    // Hand-built JSON message of the basicmessage type, agent1 -> agent2.
    let payload = serde_json::json!({
        "id": MESSAGE_ID,
        "type": "https://didcomm.org/basicmessage/1.0/message",
        "from": agent1_did.as_str(),
        "to": [agent2_did.as_str()],
        "created_time": chrono::Utc::now().timestamp(),
        "body": {
            "content": "Hello from agent1!"
        }
    });
    node.receive_message(payload).await?;

    // Pause before reading storage back.
    let settle = tokio::time::Duration::from_millis(200);
    tokio::time::sleep(settle).await;

    let storage_manager = node
        .agent_storage_manager()
        .expect("Storage manager should exist");

    // Sender side: informational only.
    let sender_storage = storage_manager.get_agent_storage(&agent1_did).await?;
    let delivery_rows = sender_storage
        .get_deliveries_for_message(MESSAGE_ID)
        .await?;
    println!(
        "Non-transaction message - Deliveries: {}",
        delivery_rows.len()
    );

    // Receiver side: received count is informational, the two listings below are asserted.
    let receiver_storage = storage_manager.get_agent_storage(&agent2_did).await?;
    let received_rows = receiver_storage.list_received(10, 0, None, None).await?;
    println!(
        "Non-transaction message - Receiver received: {}",
        received_rows.len()
    );

    let transaction_rows = receiver_storage.list_transactions(10, 0).await?;
    assert!(
        transaction_rows.is_empty(),
        "Non-transaction messages should not be in transactions table"
    );

    let message_rows = receiver_storage.list_messages(10, 0, None).await?;
    assert!(
        !message_rows.is_empty(),
        "Message should be logged in messages table"
    );

    println!("✅ Non-transaction message delivery test passed!");
    Ok(())
}
#[tokio::test]
async fn test_concurrent_internal_deliveries() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let tap_root = temp_dir.path().to_path_buf();
let config = NodeConfig {
tap_root: Some(tap_root.clone()),
enable_message_logging: true,
..Default::default()
};
let node = Arc::new(TapNode::new(config));
let (agent1, did1) = TapAgent::from_ephemeral_key().await?;
let (agent2, did2) = TapAgent::from_ephemeral_key().await?;
let (agent3, did3) = TapAgent::from_ephemeral_key().await?;
node.register_agent(Arc::new(agent1)).await?;
node.register_agent(Arc::new(agent2)).await?;
node.register_agent(Arc::new(agent3)).await?;
let agents = [did1.clone(), did2.clone(), did3.clone()];
let mut handles = vec![];
for (i, sender_did) in agents.iter().enumerate() {
for (j, receiver_did) in agents.iter().enumerate() {
if i != j {
let node_clone = node.clone();
let sender = sender_did.clone();
let receiver = receiver_did.clone();
let message_id = format!("msg-{}-to-{}", i, j);
let handle = tokio::spawn(async move {
let message = serde_json::json!({
"id": message_id,
"type": "https://didcomm.org/basicmessage/1.0/message",
"from": sender,
"to": [receiver],
"created_time": chrono::Utc::now().timestamp(),
"body": {
"content": format!("Hello from agent {} to agent {}", i, j)
}
});
node_clone.receive_message(message).await
});
handles.push(handle);
}
}
}
for handle in handles {
handle.await??;
}
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
let storage_manager = node
.agent_storage_manager()
.expect("Storage manager should exist");
for (i, agent_did) in agents.iter().enumerate() {
let storage = storage_manager.get_agent_storage(agent_did).await?;
let received = storage.list_received(10, 0, None, None).await?;
println!("Agent {} received {} messages", i, received.len());
let _messages = storage.list_messages(10, 0, None).await?;
assert_eq!(
received.len(),
2,
"Agent {} should have received 2 messages",
i
);
}
println!("✅ Concurrent delivery test passed!");
Ok(())
}