#![cfg(feature = "automerge-backend")]
use automerge::transaction::Transactable;
use peat_protocol::network::{IrohTransport, PeerInfo};
use peat_protocol::qos::{PropagationDirection, Tombstone, TombstoneBatch, TombstoneSyncMessage};
use peat_protocol::storage::capabilities::SyncCapable;
use peat_protocol::storage::{AutomergeBackend, AutomergeStore};
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
fn create_peer_info_dynamic(name: &str, transport: &IrohTransport) -> PeerInfo {
use iroh::TransportAddr;
let endpoint_id = transport.endpoint_id();
let node_id_hex = hex::encode(endpoint_id.as_bytes());
let addr = transport.endpoint_addr();
let addresses: Vec<String> = addr
.addrs
.iter()
.filter_map(|a| {
if let TransportAddr::Ip(socket_addr) = a {
Some(socket_addr.to_string())
} else {
None
}
})
.collect();
PeerInfo {
name: name.to_string(),
node_id: node_id_hex,
addresses,
relay_url: None,
}
}
#[test]
fn test_tombstone_sync_message_wire_format() {
println!("=== Tombstone Sync Message Wire Format ===");
let tombstone = Tombstone::with_reason("doc-123", "tracks", "node-alpha", 42, "Test deletion");
let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::Bidirectional);
println!(" Original: {:?}", msg);
let encoded = msg.encode();
println!(" Encoded: {} bytes", encoded.len());
let decoded = TombstoneSyncMessage::decode(&encoded).expect("Should decode successfully");
println!(" Decoded: {:?}", decoded);
assert_eq!(msg.tombstone.document_id, decoded.tombstone.document_id);
assert_eq!(msg.tombstone.collection, decoded.tombstone.collection);
assert_eq!(msg.tombstone.deleted_by, decoded.tombstone.deleted_by);
assert_eq!(msg.tombstone.lamport, decoded.tombstone.lamport);
assert_eq!(msg.tombstone.reason, decoded.tombstone.reason);
assert_eq!(msg.direction, decoded.direction);
println!(" ✓ Wire format round-trip successful");
}
#[test]
fn test_tombstone_batch_wire_format() {
println!("=== Tombstone Batch Wire Format ===");
let tombstones = vec![
Tombstone::new("doc-1", "tracks", "node-a", 10),
Tombstone::with_reason("doc-2", "alerts", "node-b", 20, "Dismissed"),
Tombstone::new("doc-3", "nodes", "node-c", 30),
];
let batch = TombstoneBatch::from_tombstones(tombstones);
println!(" Batch size: {} tombstones", batch.len());
let encoded = batch.encode();
println!(" Encoded: {} bytes", encoded.len());
let decoded = TombstoneBatch::decode(&encoded).expect("Should decode successfully");
println!(" Decoded: {} tombstones", decoded.len());
assert_eq!(batch.len(), decoded.len());
assert_eq!(
batch.tombstones[0].tombstone.document_id,
decoded.tombstones[0].tombstone.document_id
);
assert_eq!(
batch.tombstones[1].tombstone.reason,
decoded.tombstones[1].tombstone.reason
);
assert_eq!(
batch.tombstones[2].tombstone.collection,
decoded.tombstones[2].tombstone.collection
);
println!(" ✓ Batch wire format round-trip successful");
}
#[test]
fn test_empty_tombstone_batch() {
println!("=== Empty Tombstone Batch ===");
let batch = TombstoneBatch::new();
assert!(batch.is_empty());
let encoded = batch.encode();
println!(" Encoded empty batch: {} bytes", encoded.len());
let decoded = TombstoneBatch::decode(&encoded).expect("Should decode empty batch");
assert!(decoded.is_empty());
println!(" ✓ Empty batch handled correctly");
}
#[test]
fn test_direction_defaults() {
println!("=== Direction-Based Propagation Defaults ===");
assert_eq!(
PropagationDirection::default_for_collection("tracks"),
PropagationDirection::Bidirectional
);
assert_eq!(
PropagationDirection::default_for_collection("nodes"),
PropagationDirection::Bidirectional
);
assert_eq!(
PropagationDirection::default_for_collection("alerts"),
PropagationDirection::Bidirectional
);
println!(" ✓ tracks/nodes/alerts → Bidirectional");
assert_eq!(
PropagationDirection::default_for_collection("cells"),
PropagationDirection::UpOnly
);
assert_eq!(
PropagationDirection::default_for_collection("contact_reports"),
PropagationDirection::UpOnly
);
println!(" ✓ cells/contact_reports → UpOnly");
assert_eq!(
PropagationDirection::default_for_collection("commands"),
PropagationDirection::DownOnly
);
println!(" ✓ commands → DownOnly");
assert!(PropagationDirection::Bidirectional.allows_up());
assert!(PropagationDirection::Bidirectional.allows_down());
assert!(PropagationDirection::UpOnly.allows_up());
assert!(!PropagationDirection::UpOnly.allows_down());
assert!(!PropagationDirection::DownOnly.allows_up());
assert!(PropagationDirection::DownOnly.allows_down());
assert!(PropagationDirection::SystemWide.allows_up());
assert!(PropagationDirection::SystemWide.allows_down());
println!(" ✓ Direction allows_up/allows_down correct");
}
#[test]
fn test_tombstone_sync_message_default_direction() {
println!("=== TombstoneSyncMessage Default Direction ===");
let tombstone = Tombstone::new("cmd-1", "commands", "leader", 1);
let msg = TombstoneSyncMessage::from_tombstone(tombstone);
assert_eq!(msg.direction, PropagationDirection::DownOnly);
println!(" ✓ commands → DownOnly");
let tombstone = Tombstone::new("report-1", "contact_reports", "soldier", 2);
let msg = TombstoneSyncMessage::from_tombstone(tombstone);
assert_eq!(msg.direction, PropagationDirection::UpOnly);
println!(" ✓ contact_reports → UpOnly");
let tombstone = Tombstone::new("track-1", "tracks", "any", 3);
let msg = TombstoneSyncMessage::from_tombstone(tombstone);
assert_eq!(msg.direction, PropagationDirection::Bidirectional);
println!(" ✓ tracks → Bidirectional");
}
#[test]
fn test_all_propagation_directions_encode_decode() {
println!("=== All Propagation Directions Encode/Decode ===");
let directions = [
PropagationDirection::Bidirectional,
PropagationDirection::UpOnly,
PropagationDirection::DownOnly,
PropagationDirection::SystemWide,
];
for direction in directions {
let tombstone = Tombstone::new("doc", "col", "node", 1);
let msg = TombstoneSyncMessage::new(tombstone, direction);
let encoded = msg.encode();
let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
assert_eq!(
direction, decoded.direction,
"Direction {:?} failed round-trip",
direction
);
println!(" ✓ {:?} round-trip OK", direction);
}
}
#[tokio::test]
async fn test_tombstone_storage() {
println!("=== Tombstone Storage ===");
let temp_dir = TempDir::new().unwrap();
let store = AutomergeStore::open(temp_dir.path()).unwrap();
let tombstones = store.get_all_tombstones().unwrap();
assert!(tombstones.is_empty());
println!(" ✓ Initially empty");
let t1 = Tombstone::new("doc-1", "tracks", "node-a", 10);
let t2 = Tombstone::with_reason("doc-2", "alerts", "node-b", 20, "Dismissed");
store.put_tombstone(&t1).unwrap();
store.put_tombstone(&t2).unwrap();
assert!(store.has_tombstone("tracks", "doc-1").unwrap());
assert!(store.has_tombstone("alerts", "doc-2").unwrap());
assert!(!store.has_tombstone("tracks", "doc-999").unwrap());
println!(" ✓ has_tombstone works");
let tombstones = store.get_all_tombstones().unwrap();
assert_eq!(tombstones.len(), 2);
println!(" ✓ Retrieved {} tombstones", tombstones.len());
}
#[tokio::test]
async fn test_two_node_tombstone_exchange() {
println!("=== Two-Node Tombstone Exchange ===");
let temp_dir1 = TempDir::new().unwrap();
let temp_dir2 = TempDir::new().unwrap();
let store1 = Arc::new(AutomergeStore::open(temp_dir1.path()).unwrap());
let store2 = Arc::new(AutomergeStore::open(temp_dir2.path()).unwrap());
let transport1 = Arc::new(IrohTransport::new().await.unwrap());
let transport2 = Arc::new(IrohTransport::new().await.unwrap());
let backend1 = AutomergeBackend::with_transport(Arc::clone(&store1), Arc::clone(&transport1));
let backend2 = AutomergeBackend::with_transport(Arc::clone(&store2), Arc::clone(&transport2));
println!(" Node 1 ID: {:?}", transport1.endpoint_id());
println!(" Node 2 ID: {:?}", transport2.endpoint_id());
let tombstone = Tombstone::new("doc-to-delete", "tracks", "node-1", 100);
store1.put_tombstone(&tombstone).unwrap();
println!(" ✓ Added tombstone on node1");
assert!(!store2.has_tombstone("tracks", "doc-to-delete").unwrap());
println!(" ✓ Node2 doesn't have tombstone yet");
transport1.start_accept_loop().unwrap();
transport2.start_accept_loop().unwrap();
println!(" ✓ Started accept loops on both nodes");
let peer2_info = create_peer_info_dynamic("node-2", &transport2);
let connected = transport1.connect_peer(&peer2_info).await.is_ok();
if !connected {
println!(" ⚠ Connection failed - skipping tombstone exchange test");
return;
}
println!(" ✓ Peers connected");
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(backend1.sync_stats().is_ok());
assert!(backend2.sync_stats().is_ok());
println!(" ✓ Tombstone exchange test completed");
}
#[tokio::test]
async fn test_tombstone_prevents_resurrection() {
println!("=== Tombstone Prevents Resurrection ===");
let temp_dir = TempDir::new().unwrap();
let store = AutomergeStore::open(temp_dir.path()).unwrap();
let doc_key = "tracks:track-001";
let mut doc = automerge::Automerge::new();
doc.transact::<_, _, automerge::AutomergeError>(|tx| {
tx.put(automerge::ROOT, "name", "Test Track")?;
tx.put(automerge::ROOT, "status", "active")?;
Ok(())
})
.unwrap();
store.put(doc_key, &doc).unwrap();
println!(" ✓ Created document {}", doc_key);
let tombstone = Tombstone::new("track-001", "tracks", "admin", 999);
store.put_tombstone(&tombstone).unwrap();
println!(" ✓ Created tombstone");
assert!(store.has_tombstone("tracks", "track-001").unwrap());
println!(" ✓ Tombstone infrastructure validated");
}
#[tokio::test]
async fn test_full_tombstone_sync_e2e() {
use peat_protocol::discovery::peer::{PeerInfo, StaticDiscovery};
use peat_protocol::sync::automerge::AutomergeIrohBackend;
use peat_protocol::sync::traits::DataSyncBackend;
use peat_protocol::sync::types::{BackendConfig, Document, TransportConfig};
use std::collections::HashMap;
let _ = tracing_subscriber::fmt()
.with_env_filter("peat_protocol::storage::automerge_sync=debug")
.with_test_writer()
.try_init();
println!("=== Full E2E: Tombstone Syncs to Peer and Deletes Document ===");
let temp_a = TempDir::new().unwrap();
let temp_b = TempDir::new().unwrap();
let transport_a = Arc::new(IrohTransport::new().await.unwrap());
let store_a = Arc::new(AutomergeStore::open(temp_a.path()).unwrap());
let backend_a = Arc::new(AutomergeIrohBackend::from_parts(
Arc::clone(&store_a),
Arc::clone(&transport_a),
));
let transport_b = Arc::new(IrohTransport::new().await.unwrap());
let store_b = Arc::new(AutomergeStore::open(temp_b.path()).unwrap());
let backend_b = Arc::new(AutomergeIrohBackend::from_parts(
Arc::clone(&store_b),
Arc::clone(&transport_b),
));
let endpoint_a = transport_a.endpoint_id();
let endpoint_b = transport_b.endpoint_id();
let addr_a = transport_a.endpoint_addr();
let addr_b = transport_b.endpoint_addr();
println!(" Node A: {:?}", endpoint_a);
println!(" Node B: {:?}", endpoint_b);
let peer_b_info = PeerInfo {
name: "Node B".to_string(),
node_id: hex::encode(endpoint_b.as_bytes()),
addresses: addr_b.ip_addrs().map(|a| a.to_string()).collect(),
relay_url: addr_b.relay_urls().next().map(|u| u.to_string()),
};
backend_a
.add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_b_info])))
.await
.unwrap();
let peer_a_info = PeerInfo {
name: "Node A".to_string(),
node_id: hex::encode(endpoint_a.as_bytes()),
addresses: addr_a.ip_addrs().map(|a| a.to_string()).collect(),
relay_url: addr_a.relay_urls().next().map(|u| u.to_string()),
};
backend_b
.add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_a_info])))
.await
.unwrap();
let test_secret = peat_protocol::security::FormationKey::generate_secret();
let config_a = BackendConfig {
app_id: "tombstone-test".to_string(),
persistence_dir: temp_a.path().to_path_buf(),
shared_key: Some(test_secret.clone()),
transport: TransportConfig::default(),
extra: HashMap::new(),
};
let config_b = BackendConfig {
app_id: "tombstone-test".to_string(),
persistence_dir: temp_b.path().to_path_buf(),
shared_key: Some(test_secret),
transport: TransportConfig::default(),
extra: HashMap::new(),
};
backend_a.initialize(config_a).await.unwrap();
backend_b.initialize(config_b).await.unwrap();
backend_a.sync_engine().start_sync().await.unwrap();
backend_b.sync_engine().start_sync().await.unwrap();
println!(" Waiting for connection establishment...");
tokio::time::sleep(Duration::from_secs(7)).await;
let connected =
!transport_a.connected_peers().is_empty() || !transport_b.connected_peers().is_empty();
if !connected {
println!(" ⚠ Peers didn't connect - skipping full E2E test");
let _ = backend_a.shutdown().await;
let _ = backend_b.shutdown().await;
return;
}
println!(" ✓ Peers connected");
let doc_store_b = backend_b.document_store();
let mut fields = HashMap::new();
fields.insert("name".to_string(), serde_json::json!("Test Track"));
fields.insert("status".to_string(), serde_json::json!("active"));
let doc = Document {
id: Some("track-to-delete".to_string()),
fields,
updated_at: std::time::SystemTime::now(),
};
doc_store_b.upsert("tracks", doc).await.unwrap();
println!(" ✓ Document created on Node B");
println!(" Waiting for document sync to Node A...");
let doc_store_a = backend_a.document_store();
let mut doc_synced = false;
for i in 0..20 {
tokio::time::sleep(Duration::from_millis(500)).await;
let docs = doc_store_a
.query("tracks", &peat_protocol::sync::types::Query::All)
.await
.unwrap();
if docs
.iter()
.any(|d| d.id.as_deref() == Some("track-to-delete"))
{
println!(" ✓ Document synced to Node A after {} attempts", i + 1);
doc_synced = true;
break;
}
}
if !doc_synced {
println!(" ⚠ Document didn't sync to Node A - skipping tombstone test");
let _ = backend_a.shutdown().await;
let _ = backend_b.shutdown().await;
return;
}
let delete_result = doc_store_a
.delete(
"tracks",
&"track-to-delete".to_string(),
Some("Test deletion"),
)
.await
.expect("delete should succeed");
assert!(
delete_result.deleted,
"Delete should succeed for tracks (Tombstone policy)"
);
assert!(
delete_result.tombstone_id.is_some(),
"Should create a tombstone"
);
println!(" ✓ Document deleted on Node A via DocumentStore::delete()");
tokio::time::sleep(Duration::from_millis(200)).await;
println!(" Waiting for tombstone propagation to Node B...");
let mut tombstone_received = false;
for i in 0..20 {
tokio::time::sleep(Duration::from_millis(500)).await;
if store_b.has_tombstone("tracks", "track-to-delete").unwrap() {
println!(" ✓ Tombstone received on Node B after {} attempts", i + 1);
tombstone_received = true;
break;
}
}
assert!(
tombstone_received,
"Tombstone should propagate from Node A to Node B"
);
let doc_b = store_b.get("tracks:track-to-delete").unwrap();
assert!(
doc_b.is_none(),
"Document should be deleted on Node B after receiving tombstone"
);
println!(" ✓ Document deleted on Node B via tombstone sync");
let _ = backend_a.shutdown().await;
let _ = backend_b.shutdown().await;
println!(" ✓ Full E2E tombstone test completed");
}
#[tokio::test]
async fn test_tombstone_batch_exchange_on_connect() {
use peat_protocol::discovery::peer::{PeerInfo, StaticDiscovery};
use peat_protocol::sync::automerge::AutomergeIrohBackend;
use peat_protocol::sync::traits::DataSyncBackend;
use peat_protocol::sync::types::{BackendConfig, TransportConfig};
use std::collections::HashMap;
println!("=== Tombstone Batch Exchange on Connect ===");
let temp_a = TempDir::new().unwrap();
let temp_b = TempDir::new().unwrap();
let store_a = Arc::new(AutomergeStore::open(temp_a.path()).unwrap());
let tombstone1 = Tombstone::new("old-doc-1", "tracks", "node-a", 100);
let tombstone2 = Tombstone::new("old-doc-2", "alerts", "node-a", 200);
let tombstone3 =
Tombstone::with_reason("old-doc-3", "nodes", "node-a", 300, "Deleted by admin");
store_a.put_tombstone(&tombstone1).unwrap();
store_a.put_tombstone(&tombstone2).unwrap();
store_a.put_tombstone(&tombstone3).unwrap();
println!(" ✓ Node A has 3 pre-existing tombstones");
let transport_a = Arc::new(IrohTransport::new().await.unwrap());
let backend_a = Arc::new(AutomergeIrohBackend::from_parts(
Arc::clone(&store_a),
Arc::clone(&transport_a),
));
let transport_b = Arc::new(IrohTransport::new().await.unwrap());
let store_b = Arc::new(AutomergeStore::open(temp_b.path()).unwrap());
let backend_b = Arc::new(AutomergeIrohBackend::from_parts(
Arc::clone(&store_b),
Arc::clone(&transport_b),
));
assert!(store_b.get_all_tombstones().unwrap().is_empty());
println!(" ✓ Node B has 0 tombstones initially");
let endpoint_a = transport_a.endpoint_id();
let endpoint_b = transport_b.endpoint_id();
let addr_a = transport_a.endpoint_addr();
let addr_b = transport_b.endpoint_addr();
let peer_b_info = PeerInfo {
name: "Node B".to_string(),
node_id: hex::encode(endpoint_b.as_bytes()),
addresses: addr_b.ip_addrs().map(|a| a.to_string()).collect(),
relay_url: addr_b.relay_urls().next().map(|u| u.to_string()),
};
backend_a
.add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_b_info])))
.await
.unwrap();
let peer_a_info = PeerInfo {
name: "Node A".to_string(),
node_id: hex::encode(endpoint_a.as_bytes()),
addresses: addr_a.ip_addrs().map(|a| a.to_string()).collect(),
relay_url: addr_a.relay_urls().next().map(|u| u.to_string()),
};
backend_b
.add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_a_info])))
.await
.unwrap();
let test_secret = peat_protocol::security::FormationKey::generate_secret();
backend_a
.initialize(BackendConfig {
app_id: "batch-test".to_string(),
persistence_dir: temp_a.path().to_path_buf(),
shared_key: Some(test_secret.clone()),
transport: TransportConfig::default(),
extra: HashMap::new(),
})
.await
.unwrap();
backend_b
.initialize(BackendConfig {
app_id: "batch-test".to_string(),
persistence_dir: temp_b.path().to_path_buf(),
shared_key: Some(test_secret),
transport: TransportConfig::default(),
extra: HashMap::new(),
})
.await
.unwrap();
backend_a.sync_engine().start_sync().await.unwrap();
backend_b.sync_engine().start_sync().await.unwrap();
#[cfg(target_os = "linux")]
let deadline = Duration::from_secs(15);
#[cfg(not(target_os = "linux"))]
let deadline = Duration::from_secs(60);
println!(" Waiting for connection and tombstone batch exchange...");
let started = std::time::Instant::now();
let (tombstones_on_b, has_doc1, has_doc2, has_doc3) = loop {
let ts = store_b.get_all_tombstones().unwrap();
let d1 = store_b.has_tombstone("tracks", "old-doc-1").unwrap();
let d2 = store_b.has_tombstone("alerts", "old-doc-2").unwrap();
let d3 = store_b.has_tombstone("nodes", "old-doc-3").unwrap();
if !ts.is_empty() && d1 && d2 && d3 {
break (ts, d1, d2, d3);
}
if started.elapsed() >= deadline {
break (ts, d1, d2, d3);
}
tokio::time::sleep(Duration::from_millis(500)).await;
};
println!(
" Node B received {} tombstones after {:?}",
tombstones_on_b.len(),
started.elapsed()
);
println!(
" Tombstone 1 (tracks:old-doc-1): {}",
if has_doc1 { "✓" } else { "✗" }
);
println!(
" Tombstone 2 (alerts:old-doc-2): {}",
if has_doc2 { "✓" } else { "✗" }
);
println!(
" Tombstone 3 (nodes:old-doc-3): {}",
if has_doc3 { "✓" } else { "✗" }
);
let _ = backend_a.shutdown().await;
let _ = backend_b.shutdown().await;
assert!(
!tombstones_on_b.is_empty(),
"Node B should receive tombstones from Node A via batch exchange on connect"
);
assert!(has_doc1, "Tombstone for tracks:old-doc-1 should propagate");
assert!(has_doc2, "Tombstone for alerts:old-doc-2 should propagate");
assert!(has_doc3, "Tombstone for nodes:old-doc-3 should propagate");
println!(" ✓ Tombstone batch exchange working!");
}
#[tokio::test]
async fn test_tombstone_blocks_document_sync() {
println!("=== Tombstone Blocks Document Sync ===");
let temp_dir = TempDir::new().unwrap();
let store = AutomergeStore::open(temp_dir.path()).unwrap();
let tombstone = Tombstone::new("blocked-doc", "tracks", "admin", 9999);
store.put_tombstone(&tombstone).unwrap();
println!(" ✓ Created tombstone for tracks:blocked-doc");
assert!(store.has_tombstone("tracks", "blocked-doc").unwrap());
let has_tombstone = store.has_tombstone("tracks", "blocked-doc").unwrap();
if has_tombstone {
println!(" ✓ Tombstone found - document creation would be blocked");
} else {
println!(" ✗ Tombstone not found - document creation would proceed");
}
let existing = store.get_tombstone("tracks", "blocked-doc").unwrap();
if let Some(ts) = existing {
let older_update_lamport = 5000u64;
let would_reject = older_update_lamport < ts.lamport;
println!(
" ✓ Update with lamport {} {} by tombstone lamport {}",
older_update_lamport,
if would_reject { "blocked" } else { "allowed" },
ts.lamport
);
assert!(would_reject, "Older updates should be blocked");
}
println!(" ✓ Tombstone blocking mechanism validated");
}