tap-node 0.6.0

Transaction Authorization Protocol (TAP) node implementation for routing and processing messages
Documentation
use std::sync::Arc;
use tap_agent::TapAgent;
use tap_msg::{
    didcomm::PlainMessage,
    message::{transfer::Transfer, Party},
};
use tap_node::{NodeConfig, TapNode};
use tempfile::TempDir;

/// Test that internal message delivery between two agents on the same node
/// correctly populates all three tables:
/// 1. Delivery records in the deliveries table (for tracking delivery to recipient)
/// 2. Receiver's transactions table
/// 3. Receiver's received table
#[tokio::test]
async fn test_internal_message_delivery_all_tables() -> Result<(), Box<dyn std::error::Error>> {
    // Create a temporary directory for storage
    let temp_dir = TempDir::new()?;
    let tap_root = temp_dir.path().to_path_buf();

    // Create TAP Node with storage enabled
    let config = NodeConfig {
        tap_root: Some(tap_root.clone()),
        enable_message_logging: true,
        log_message_content: true,
        ..Default::default()
    };

    let node = Arc::new(TapNode::new(config));

    // Create two agents
    let (agent1, agent1_did) = TapAgent::from_ephemeral_key().await?;
    let (agent2, agent2_did) = TapAgent::from_ephemeral_key().await?;

    println!("Agent 1 DID: {}", agent1_did);
    println!("Agent 2 DID: {}", agent2_did);

    // Register both agents with the node
    node.register_agent(Arc::new(agent1)).await?;
    node.register_agent(Arc::new(agent2)).await?;

    // Create a transfer message from agent1 to agent2
    let transfer = Transfer {
        transaction_id: Some("test-tx-123".to_string()),
        asset: "eip155:1/erc20:0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48".parse()?,
        amount: "1000.00".to_string(),
        originator: Some(Party::new(&agent1_did)),
        beneficiary: Some(Party::new(&agent2_did)),
        agents: vec![],
        memo: Some("Test internal transfer".to_string()),
        settlement_id: None,
        expiry: None,
        transaction_value: None,
        connection_id: None,
        metadata: Default::default(),
    };

    // Create a PlainMessage
    let transfer_message = PlainMessage {
        id: "transfer-msg-456".to_string(),
        typ: "application/didcomm-plain+json".to_string(),
        type_: "https://tap.rsvp/message/transfer".to_string(),
        body: serde_json::to_value(&transfer)?,
        from: agent1_did.clone(),
        to: vec![agent2_did.clone()],
        thid: Some("test-thread-789".to_string()),
        pthid: None,
        extra_headers: Default::default(),
        attachments: None,
        created_time: Some(chrono::Utc::now().timestamp() as u64),
        expires_time: None,
        from_prior: None,
    };

    // Convert to JSON for the receive_message call
    let message_json = serde_json::to_value(&transfer_message)?;

    // Process the message
    node.receive_message(message_json).await?;

    // Allow some time for storage operations to complete
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

    // Get the agent storage manager
    let storage_manager = node
        .agent_storage_manager()
        .expect("Storage manager should exist");

    // 1. Verify delivery record exists (deliveries are tracked in sender's storage)
    let sender_storage = storage_manager.get_agent_storage(&agent1_did).await?;

    // Get deliveries for this specific message
    let deliveries = sender_storage
        .get_deliveries_for_message("transfer-msg-456")
        .await?;
    println!("Deliveries for message count: {}", deliveries.len());

    if !deliveries.is_empty() {
        println!("✓ Delivery record found in deliveries table");
        let delivery = &deliveries[0];
        assert_eq!(delivery.message_id, "transfer-msg-456");
        assert_eq!(delivery.recipient_did, agent2_did);
        println!("  - Message ID: {}", delivery.message_id);
        println!("  - Recipient: {}", delivery.recipient_did);
        println!("  - Status: {:?}", delivery.status);
        println!("  - Delivery Type: {:?}", delivery.delivery_type);
    } else {
        // Alternative: Check if the message was sent from agent1
        let messages = sender_storage.list_messages(10, 0, None).await?;
        println!("Sender has {} messages in their log", messages.len());

        // For internal deliveries, the delivery record might be in receiver's storage
        let receiver_deliveries = sender_storage
            .get_deliveries_by_recipient(&agent2_did, 10, 0)
            .await?;
        println!("Deliveries to receiver: {}", receiver_deliveries.len());
    }

    // 2. Verify receiver's transactions table
    let receiver_storage = storage_manager.get_agent_storage(&agent2_did).await?;

    let transactions = receiver_storage.list_transactions(10, 0).await?;
    println!("\nReceiver transactions count: {}", transactions.len());

    assert!(
        !transactions.is_empty(),
        "Receiver should have the transaction"
    );
    let transaction = &transactions[0];
    assert_eq!(transaction.reference_id, "transfer-msg-456");
    println!("✓ Transaction record found in receiver's transactions table");
    println!("  - Reference ID: {}", transaction.reference_id);
    println!("  - Type: {:?}", transaction.transaction_type);
    println!("  - Status: {:?}", transaction.status);

    // 3. Verify receiver's received table
    let received_messages = receiver_storage.list_received(10, 0, None, None).await?;
    println!(
        "\nReceiver received messages count: {}",
        received_messages.len()
    );

    if !received_messages.is_empty() {
        println!("✓ Received record found in receiver's received table");
        let received = &received_messages[0];
        println!("  - Record ID: {}", received.id);
        println!("  - Message ID: {:?}", received.message_id);
        println!("  - Source Type: {:?}", received.source_type);
        println!("  - Status: {:?}", received.status);

        // The message_id might be extracted from the raw message
        if let Some(msg_id) = &received.message_id {
            assert_eq!(msg_id, "transfer-msg-456");
        }
    } else {
        println!("❌ No received records found in receiver's received table (this was the bug!)");
    }

    // Additional verification: Check message content in messages table
    let messages = receiver_storage.list_messages(10, 0, None).await?;
    println!("\nReceiver messages count: {}", messages.len());

    if !messages.is_empty() {
        println!("✓ Message record found in receiver's messages table");
        let message = &messages[0];
        assert_eq!(message.message_id, "transfer-msg-456");
        assert_eq!(message.from_did, Some(agent1_did.clone()));
        assert_eq!(message.to_did, Some(agent2_did.clone()));
    }

    // Check for any pending deliveries (should be none if delivered successfully)
    let pending_deliveries = receiver_storage.get_pending_deliveries(10, 0).await?;
    println!("\nPending deliveries: {}", pending_deliveries.len());

    println!("\n✅ Test completed - checking all tables!");

    // Final summary
    println!("\n=== Summary ===");
    println!("Deliveries for message: {} records", deliveries.len());
    println!(
        "Receiver's transactions table: {} records",
        transactions.len()
    );
    println!(
        "Receiver's received table: {} records",
        received_messages.len()
    );
    println!("Receiver's messages table: {} records", messages.len());

    Ok(())
}

/// Test that internal delivery works correctly for non-transaction messages
#[tokio::test]
async fn test_internal_delivery_non_transaction_message() -> Result<(), Box<dyn std::error::Error>>
{
    let temp_dir = TempDir::new()?;
    let tap_root = temp_dir.path().to_path_buf();

    let config = NodeConfig {
        tap_root: Some(tap_root.clone()),
        enable_message_logging: true,
        ..Default::default()
    };

    let node = Arc::new(TapNode::new(config));

    // Create two agents
    let (agent1, agent1_did) = TapAgent::from_ephemeral_key().await?;
    let (agent2, agent2_did) = TapAgent::from_ephemeral_key().await?;

    node.register_agent(Arc::new(agent1)).await?;
    node.register_agent(Arc::new(agent2)).await?;

    // Create a basic message (non-transaction)
    let basic_message = serde_json::json!({
        "id": "basic-msg-123",
        "type": "https://didcomm.org/basicmessage/1.0/message",
        "from": agent1_did,
        "to": [agent2_did.clone()],
        "created_time": chrono::Utc::now().timestamp(),
        "body": {
            "content": "Hello from agent1!"
        }
    });

    // Process the message
    node.receive_message(basic_message).await?;
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

    let storage_manager = node
        .agent_storage_manager()
        .expect("Storage manager should exist");

    // Check sender's deliveries
    let sender_storage = storage_manager.get_agent_storage(&agent1_did).await?;
    let deliveries = sender_storage
        .get_deliveries_for_message("basic-msg-123")
        .await?;
    println!("Non-transaction message - Deliveries: {}", deliveries.len());

    // Check receiver's received table
    let receiver_storage = storage_manager.get_agent_storage(&agent2_did).await?;
    let received = receiver_storage.list_received(10, 0, None, None).await?;
    println!(
        "Non-transaction message - Receiver received: {}",
        received.len()
    );

    // Non-transaction messages should NOT appear in transactions table
    let transactions = receiver_storage.list_transactions(10, 0).await?;
    assert!(
        transactions.is_empty(),
        "Non-transaction messages should not be in transactions table"
    );

    // But should appear in messages table
    let messages = receiver_storage.list_messages(10, 0, None).await?;
    assert!(
        !messages.is_empty(),
        "Message should be logged in messages table"
    );

    println!("✅ Non-transaction message delivery test passed!");

    Ok(())
}

/// Test concurrent internal deliveries
#[tokio::test]
async fn test_concurrent_internal_deliveries() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = TempDir::new()?;
    let tap_root = temp_dir.path().to_path_buf();

    let config = NodeConfig {
        tap_root: Some(tap_root.clone()),
        enable_message_logging: true,
        ..Default::default()
    };

    let node = Arc::new(TapNode::new(config));

    // Create three agents
    let (agent1, did1) = TapAgent::from_ephemeral_key().await?;
    let (agent2, did2) = TapAgent::from_ephemeral_key().await?;
    let (agent3, did3) = TapAgent::from_ephemeral_key().await?;

    node.register_agent(Arc::new(agent1)).await?;
    node.register_agent(Arc::new(agent2)).await?;
    node.register_agent(Arc::new(agent3)).await?;

    let agents = [did1.clone(), did2.clone(), did3.clone()];

    // Each agent sends a message to every other agent
    let mut handles = vec![];

    for (i, sender_did) in agents.iter().enumerate() {
        for (j, receiver_did) in agents.iter().enumerate() {
            if i != j {
                let node_clone = node.clone();
                let sender = sender_did.clone();
                let receiver = receiver_did.clone();
                let message_id = format!("msg-{}-to-{}", i, j);

                let handle = tokio::spawn(async move {
                    let message = serde_json::json!({
                        "id": message_id,
                        "type": "https://didcomm.org/basicmessage/1.0/message",
                        "from": sender,
                        "to": [receiver],
                        "created_time": chrono::Utc::now().timestamp(),
                        "body": {
                            "content": format!("Hello from agent {} to agent {}", i, j)
                        }
                    });

                    node_clone.receive_message(message).await
                });

                handles.push(handle);
            }
        }
    }

    // Wait for all messages to be processed
    for handle in handles {
        handle.await??;
    }

    tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

    // Verify results
    let storage_manager = node
        .agent_storage_manager()
        .expect("Storage manager should exist");

    for (i, agent_did) in agents.iter().enumerate() {
        let storage = storage_manager.get_agent_storage(agent_did).await?;

        // Check received messages
        let received = storage.list_received(10, 0, None, None).await?;
        println!("Agent {} received {} messages", i, received.len());

        // Check messages in log
        let _messages = storage.list_messages(10, 0, None).await?;

        // Each agent should have received 2 messages
        assert_eq!(
            received.len(),
            2,
            "Agent {} should have received 2 messages",
            i
        );
    }

    println!("✅ Concurrent delivery test passed!");

    Ok(())
}