peat-protocol 0.9.0-rc.10

Peat Coordination Protocol — hierarchical capability composition over CRDTs for heterogeneous mesh networks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
//! End-to-End Tests for Tombstone Sync Protocol (ADR-034 Phase 2)
//!
//! These tests validate tombstone synchronization between peers using the
//! wire format v3 message types (0x04 Tombstone, 0x05 TombstoneBatch, 0x06 TombstoneAck).
//!
//! # Test Focus
//!
//! - **Tombstone Wire Format**: TombstoneSyncMessage encode/decode
//! - **Batch Exchange**: Tombstone batches sent on peer connect
//! - **Direction Propagation**: Bidirectional vs UpOnly/DownOnly
//! - **Document Deletion**: Received tombstones delete local documents

#![cfg(feature = "automerge-backend")]

use automerge::transaction::Transactable;
use peat_protocol::network::{IrohTransport, PeerInfo};
use peat_protocol::qos::{PropagationDirection, Tombstone, TombstoneBatch, TombstoneSyncMessage};
use peat_protocol::storage::capabilities::SyncCapable;
use peat_protocol::storage::{AutomergeBackend, AutomergeStore};
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;

/// Helper to create PeerInfo from transport using its actual bound address
fn create_peer_info_dynamic(name: &str, transport: &IrohTransport) -> PeerInfo {
    use iroh::TransportAddr;

    let endpoint_id = transport.endpoint_id();
    let node_id_hex = hex::encode(endpoint_id.as_bytes());
    let addr = transport.endpoint_addr();

    let addresses: Vec<String> = addr
        .addrs
        .iter()
        .filter_map(|a| {
            if let TransportAddr::Ip(socket_addr) = a {
                Some(socket_addr.to_string())
            } else {
                None
            }
        })
        .collect();

    PeerInfo {
        name: name.to_string(),
        node_id: node_id_hex,
        addresses,
        relay_url: None,
    }
}

/// Test 1: TombstoneSyncMessage Encoding/Decoding
///
/// Validates the wire format for individual tombstone messages.
#[test]
fn test_tombstone_sync_message_wire_format() {
    println!("=== Tombstone Sync Message Wire Format ===");

    // Create a tombstone
    let tombstone = Tombstone::with_reason("doc-123", "tracks", "node-alpha", 42, "Test deletion");

    // Create sync message with bidirectional direction
    let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::Bidirectional);

    println!("  Original: {:?}", msg);

    // Encode to wire format
    let encoded = msg.encode();
    println!("  Encoded: {} bytes", encoded.len());

    // Decode back
    let decoded = TombstoneSyncMessage::decode(&encoded).expect("Should decode successfully");

    println!("  Decoded: {:?}", decoded);

    // Verify all fields match
    assert_eq!(msg.tombstone.document_id, decoded.tombstone.document_id);
    assert_eq!(msg.tombstone.collection, decoded.tombstone.collection);
    assert_eq!(msg.tombstone.deleted_by, decoded.tombstone.deleted_by);
    assert_eq!(msg.tombstone.lamport, decoded.tombstone.lamport);
    assert_eq!(msg.tombstone.reason, decoded.tombstone.reason);
    assert_eq!(msg.direction, decoded.direction);

    println!("  ✓ Wire format round-trip successful");
}

/// Test 2: TombstoneBatch Encoding/Decoding
///
/// Validates the wire format for tombstone batches.
#[test]
fn test_tombstone_batch_wire_format() {
    println!("=== Tombstone Batch Wire Format ===");

    // Create multiple tombstones
    let tombstones = vec![
        Tombstone::new("doc-1", "tracks", "node-a", 10),
        Tombstone::with_reason("doc-2", "alerts", "node-b", 20, "Dismissed"),
        Tombstone::new("doc-3", "nodes", "node-c", 30),
    ];

    // Create batch
    let batch = TombstoneBatch::from_tombstones(tombstones);

    println!("  Batch size: {} tombstones", batch.len());

    // Encode
    let encoded = batch.encode();
    println!("  Encoded: {} bytes", encoded.len());

    // Decode
    let decoded = TombstoneBatch::decode(&encoded).expect("Should decode successfully");

    println!("  Decoded: {} tombstones", decoded.len());

    // Verify
    assert_eq!(batch.len(), decoded.len());
    assert_eq!(
        batch.tombstones[0].tombstone.document_id,
        decoded.tombstones[0].tombstone.document_id
    );
    assert_eq!(
        batch.tombstones[1].tombstone.reason,
        decoded.tombstones[1].tombstone.reason
    );
    assert_eq!(
        batch.tombstones[2].tombstone.collection,
        decoded.tombstones[2].tombstone.collection
    );

    println!("  ✓ Batch wire format round-trip successful");
}

/// Test 3: Empty Batch Handling
///
/// Validates that empty batches encode/decode correctly.
#[test]
fn test_empty_tombstone_batch() {
    println!("=== Empty Tombstone Batch ===");

    let batch = TombstoneBatch::new();
    assert!(batch.is_empty());

    let encoded = batch.encode();
    println!("  Encoded empty batch: {} bytes", encoded.len());

    let decoded = TombstoneBatch::decode(&encoded).expect("Should decode empty batch");
    assert!(decoded.is_empty());

    println!("  ✓ Empty batch handled correctly");
}

/// Test 4: Direction-Based Propagation Defaults
///
/// Validates that default directions match ADR-034 strategy matrix.
#[test]
fn test_direction_defaults() {
    println!("=== Direction-Based Propagation Defaults ===");

    // Bidirectional for tracks, nodes, alerts
    assert_eq!(
        PropagationDirection::default_for_collection("tracks"),
        PropagationDirection::Bidirectional
    );
    assert_eq!(
        PropagationDirection::default_for_collection("nodes"),
        PropagationDirection::Bidirectional
    );
    assert_eq!(
        PropagationDirection::default_for_collection("alerts"),
        PropagationDirection::Bidirectional
    );
    println!("  ✓ tracks/nodes/alerts → Bidirectional");

    // UpOnly for cells, contact_reports
    assert_eq!(
        PropagationDirection::default_for_collection("cells"),
        PropagationDirection::UpOnly
    );
    assert_eq!(
        PropagationDirection::default_for_collection("contact_reports"),
        PropagationDirection::UpOnly
    );
    println!("  ✓ cells/contact_reports → UpOnly");

    // DownOnly for commands
    assert_eq!(
        PropagationDirection::default_for_collection("commands"),
        PropagationDirection::DownOnly
    );
    println!("  ✓ commands → DownOnly");

    // Verify allows_up/allows_down
    assert!(PropagationDirection::Bidirectional.allows_up());
    assert!(PropagationDirection::Bidirectional.allows_down());
    assert!(PropagationDirection::UpOnly.allows_up());
    assert!(!PropagationDirection::UpOnly.allows_down());
    assert!(!PropagationDirection::DownOnly.allows_up());
    assert!(PropagationDirection::DownOnly.allows_down());
    assert!(PropagationDirection::SystemWide.allows_up());
    assert!(PropagationDirection::SystemWide.allows_down());
    println!("  ✓ Direction allows_up/allows_down correct");
}

/// Test 5: TombstoneSyncMessage from_tombstone uses default direction
///
/// Validates that from_tombstone() uses the correct default direction.
#[test]
fn test_tombstone_sync_message_default_direction() {
    println!("=== TombstoneSyncMessage Default Direction ===");

    // commands collection should default to DownOnly
    let tombstone = Tombstone::new("cmd-1", "commands", "leader", 1);
    let msg = TombstoneSyncMessage::from_tombstone(tombstone);
    assert_eq!(msg.direction, PropagationDirection::DownOnly);
    println!("  ✓ commands → DownOnly");

    // contact_reports should default to UpOnly
    let tombstone = Tombstone::new("report-1", "contact_reports", "soldier", 2);
    let msg = TombstoneSyncMessage::from_tombstone(tombstone);
    assert_eq!(msg.direction, PropagationDirection::UpOnly);
    println!("  ✓ contact_reports → UpOnly");

    // tracks should default to Bidirectional
    let tombstone = Tombstone::new("track-1", "tracks", "any", 3);
    let msg = TombstoneSyncMessage::from_tombstone(tombstone);
    assert_eq!(msg.direction, PropagationDirection::Bidirectional);
    println!("  ✓ tracks → Bidirectional");
}

/// Test 6: All propagation directions encode/decode correctly
#[test]
fn test_all_propagation_directions_encode_decode() {
    println!("=== All Propagation Directions Encode/Decode ===");

    let directions = [
        PropagationDirection::Bidirectional,
        PropagationDirection::UpOnly,
        PropagationDirection::DownOnly,
        PropagationDirection::SystemWide,
    ];

    for direction in directions {
        let tombstone = Tombstone::new("doc", "col", "node", 1);
        let msg = TombstoneSyncMessage::new(tombstone, direction);
        let encoded = msg.encode();
        let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
        assert_eq!(
            direction, decoded.direction,
            "Direction {:?} failed round-trip",
            direction
        );
        println!("{:?} round-trip OK", direction);
    }
}

/// Test 7: Store and Retrieve Tombstones
///
/// Validates tombstone storage in AutomergeStore.
#[tokio::test]
async fn test_tombstone_storage() {
    println!("=== Tombstone Storage ===");

    let temp_dir = TempDir::new().unwrap();
    let store = AutomergeStore::open(temp_dir.path()).unwrap();

    // Initially no tombstones
    let tombstones = store.get_all_tombstones().unwrap();
    assert!(tombstones.is_empty());
    println!("  ✓ Initially empty");

    // Add tombstones
    let t1 = Tombstone::new("doc-1", "tracks", "node-a", 10);
    let t2 = Tombstone::with_reason("doc-2", "alerts", "node-b", 20, "Dismissed");

    store.put_tombstone(&t1).unwrap();
    store.put_tombstone(&t2).unwrap();

    // Check existence
    assert!(store.has_tombstone("tracks", "doc-1").unwrap());
    assert!(store.has_tombstone("alerts", "doc-2").unwrap());
    assert!(!store.has_tombstone("tracks", "doc-999").unwrap());
    println!("  ✓ has_tombstone works");

    // Get all tombstones
    let tombstones = store.get_all_tombstones().unwrap();
    assert_eq!(tombstones.len(), 2);
    println!("  ✓ Retrieved {} tombstones", tombstones.len());
}

/// Test 8: Two-Node Tombstone Exchange
///
/// Validates that tombstones are exchanged when peers connect.
#[tokio::test]
async fn test_two_node_tombstone_exchange() {
    println!("=== Two-Node Tombstone Exchange ===");

    // Create two backends with stores
    let temp_dir1 = TempDir::new().unwrap();
    let temp_dir2 = TempDir::new().unwrap();
    let store1 = Arc::new(AutomergeStore::open(temp_dir1.path()).unwrap());
    let store2 = Arc::new(AutomergeStore::open(temp_dir2.path()).unwrap());
    let transport1 = Arc::new(IrohTransport::new().await.unwrap());
    let transport2 = Arc::new(IrohTransport::new().await.unwrap());

    // Create backends using with_transport
    let backend1 = AutomergeBackend::with_transport(Arc::clone(&store1), Arc::clone(&transport1));
    let backend2 = AutomergeBackend::with_transport(Arc::clone(&store2), Arc::clone(&transport2));

    println!("  Node 1 ID: {:?}", transport1.endpoint_id());
    println!("  Node 2 ID: {:?}", transport2.endpoint_id());

    // Add a tombstone on node1 before connecting
    let tombstone = Tombstone::new("doc-to-delete", "tracks", "node-1", 100);
    store1.put_tombstone(&tombstone).unwrap();
    println!("  ✓ Added tombstone on node1");

    // Verify node2 doesn't have the tombstone yet
    assert!(!store2.has_tombstone("tracks", "doc-to-delete").unwrap());
    println!("  ✓ Node2 doesn't have tombstone yet");

    // Start accept loops
    transport1.start_accept_loop().unwrap();
    transport2.start_accept_loop().unwrap();
    println!("  ✓ Started accept loops on both nodes");

    // Connect peers
    let peer2_info = create_peer_info_dynamic("node-2", &transport2);
    let connected = transport1.connect_peer(&peer2_info).await.is_ok();

    if !connected {
        println!("  ⚠ Connection failed - skipping tombstone exchange test");
        return;
    }
    println!("  ✓ Peers connected");

    // Give time for potential tombstone exchange
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Note: Full tombstone exchange requires the sync coordinator to be called
    // on connect, which happens in the PeatMesh layer. This test validates
    // the building blocks (store, transport) are in place.

    // Verify backends are valid
    assert!(backend1.sync_stats().is_ok());
    assert!(backend2.sync_stats().is_ok());

    println!("  ✓ Tombstone exchange test completed");
}

/// Test 9: Tombstone Prevents Resurrection
///
/// Validates that a document with a tombstone cannot be resurrected by sync.
#[tokio::test]
async fn test_tombstone_prevents_resurrection() {
    println!("=== Tombstone Prevents Resurrection ===");

    let temp_dir = TempDir::new().unwrap();
    let store = AutomergeStore::open(temp_dir.path()).unwrap();

    // Create a document
    let doc_key = "tracks:track-001";
    let mut doc = automerge::Automerge::new();
    doc.transact::<_, _, automerge::AutomergeError>(|tx| {
        tx.put(automerge::ROOT, "name", "Test Track")?;
        tx.put(automerge::ROOT, "status", "active")?;
        Ok(())
    })
    .unwrap();
    store.put(doc_key, &doc).unwrap();
    println!("  ✓ Created document {}", doc_key);

    // Create tombstone
    let tombstone = Tombstone::new("track-001", "tracks", "admin", 999);
    store.put_tombstone(&tombstone).unwrap();
    println!("  ✓ Created tombstone");

    // Verify tombstone exists
    assert!(store.has_tombstone("tracks", "track-001").unwrap());

    // In a real system, the sync coordinator would check for tombstones
    // before applying incoming document updates. This test validates
    // the tombstone infrastructure is in place.

    println!("  ✓ Tombstone infrastructure validated");
}

/// Test 10: Full E2E - Tombstone Syncs to Peer and Deletes Document
///
/// This is the CRITICAL E2E test that validates tombstones actually sync
/// between nodes and delete documents on the receiving side.
///
/// Flow:
/// 1. Node A and Node B connect
/// 2. Create document on Node B
/// 3. Wait for document to sync to Node A
/// 4. Delete document on Node A via DocumentStore::delete() (creates tombstone + propagates)
/// 5. Verify Node B receives tombstone via sync
/// 6. Verify document is deleted on Node B
#[tokio::test]
async fn test_full_tombstone_sync_e2e() {
    use peat_protocol::discovery::peer::{PeerInfo, StaticDiscovery};
    use peat_protocol::sync::automerge::AutomergeIrohBackend;
    use peat_protocol::sync::traits::DataSyncBackend;
    use peat_protocol::sync::types::{BackendConfig, Document, TransportConfig};
    use std::collections::HashMap;

    let _ = tracing_subscriber::fmt()
        .with_env_filter("peat_protocol::storage::automerge_sync=debug")
        .with_test_writer()
        .try_init();

    println!("=== Full E2E: Tombstone Syncs to Peer and Deletes Document ===");

    // Create two backends
    let temp_a = TempDir::new().unwrap();
    let temp_b = TempDir::new().unwrap();

    let transport_a = Arc::new(IrohTransport::new().await.unwrap());
    let store_a = Arc::new(AutomergeStore::open(temp_a.path()).unwrap());
    let backend_a = Arc::new(AutomergeIrohBackend::from_parts(
        Arc::clone(&store_a),
        Arc::clone(&transport_a),
    ));

    let transport_b = Arc::new(IrohTransport::new().await.unwrap());
    let store_b = Arc::new(AutomergeStore::open(temp_b.path()).unwrap());
    let backend_b = Arc::new(AutomergeIrohBackend::from_parts(
        Arc::clone(&store_b),
        Arc::clone(&transport_b),
    ));

    // Setup bidirectional discovery
    let endpoint_a = transport_a.endpoint_id();
    let endpoint_b = transport_b.endpoint_id();
    let addr_a = transport_a.endpoint_addr();
    let addr_b = transport_b.endpoint_addr();

    println!("  Node A: {:?}", endpoint_a);
    println!("  Node B: {:?}", endpoint_b);

    let peer_b_info = PeerInfo {
        name: "Node B".to_string(),
        node_id: hex::encode(endpoint_b.as_bytes()),
        addresses: addr_b.ip_addrs().map(|a| a.to_string()).collect(),
        relay_url: addr_b.relay_urls().next().map(|u| u.to_string()),
    };
    backend_a
        .add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_b_info])))
        .await
        .unwrap();

    let peer_a_info = PeerInfo {
        name: "Node A".to_string(),
        node_id: hex::encode(endpoint_a.as_bytes()),
        addresses: addr_a.ip_addrs().map(|a| a.to_string()).collect(),
        relay_url: addr_a.relay_urls().next().map(|u| u.to_string()),
    };
    backend_b
        .add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_a_info])))
        .await
        .unwrap();

    // Initialize with shared credentials
    let test_secret = peat_protocol::security::FormationKey::generate_secret();

    let config_a = BackendConfig {
        app_id: "tombstone-test".to_string(),
        persistence_dir: temp_a.path().to_path_buf(),
        shared_key: Some(test_secret.clone()),
        transport: TransportConfig::default(),
        extra: HashMap::new(),
    };

    let config_b = BackendConfig {
        app_id: "tombstone-test".to_string(),
        persistence_dir: temp_b.path().to_path_buf(),
        shared_key: Some(test_secret),
        transport: TransportConfig::default(),
        extra: HashMap::new(),
    };

    backend_a.initialize(config_a).await.unwrap();
    backend_b.initialize(config_b).await.unwrap();

    // Start sync
    backend_a.sync_engine().start_sync().await.unwrap();
    backend_b.sync_engine().start_sync().await.unwrap();

    // Wait for connection
    println!("  Waiting for connection establishment...");
    tokio::time::sleep(Duration::from_secs(7)).await;

    // Verify connection
    let connected =
        !transport_a.connected_peers().is_empty() || !transport_b.connected_peers().is_empty();
    if !connected {
        println!("  ⚠ Peers didn't connect - skipping full E2E test");
        let _ = backend_a.shutdown().await;
        let _ = backend_b.shutdown().await;
        return;
    }
    println!("  ✓ Peers connected");

    // Create a document on Node B
    let doc_store_b = backend_b.document_store();
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), serde_json::json!("Test Track"));
    fields.insert("status".to_string(), serde_json::json!("active"));

    let doc = Document {
        id: Some("track-to-delete".to_string()),
        fields,
        updated_at: std::time::SystemTime::now(),
    };
    doc_store_b.upsert("tracks", doc).await.unwrap();
    println!("  ✓ Document created on Node B");

    // Wait for document to sync to Node A
    println!("  Waiting for document sync to Node A...");
    let doc_store_a = backend_a.document_store();
    let mut doc_synced = false;
    for i in 0..20 {
        tokio::time::sleep(Duration::from_millis(500)).await;
        let docs = doc_store_a
            .query("tracks", &peat_protocol::sync::types::Query::All)
            .await
            .unwrap();
        if docs
            .iter()
            .any(|d| d.id.as_deref() == Some("track-to-delete"))
        {
            println!("  ✓ Document synced to Node A after {} attempts", i + 1);
            doc_synced = true;
            break;
        }
    }

    if !doc_synced {
        println!("  ⚠ Document didn't sync to Node A - skipping tombstone test");
        let _ = backend_a.shutdown().await;
        let _ = backend_b.shutdown().await;
        return;
    }

    // Delete via the DocumentStore API (routes through IrohDocumentStore::delete
    // which creates the tombstone, stores it, removes the doc, and propagates
    // to all connected peers via sync_tombstones_with_peer)
    let delete_result = doc_store_a
        .delete(
            "tracks",
            &"track-to-delete".to_string(),
            Some("Test deletion"),
        )
        .await
        .expect("delete should succeed");
    assert!(
        delete_result.deleted,
        "Delete should succeed for tracks (Tombstone policy)"
    );
    assert!(
        delete_result.tombstone_id.is_some(),
        "Should create a tombstone"
    );
    println!("  ✓ Document deleted on Node A via DocumentStore::delete()");

    // Brief pause to let the spawned propagation task run
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Wait for tombstone to propagate to Node B
    println!("  Waiting for tombstone propagation to Node B...");
    let mut tombstone_received = false;
    for i in 0..20 {
        tokio::time::sleep(Duration::from_millis(500)).await;
        if store_b.has_tombstone("tracks", "track-to-delete").unwrap() {
            println!("  ✓ Tombstone received on Node B after {} attempts", i + 1);
            tombstone_received = true;
            break;
        }
    }

    assert!(
        tombstone_received,
        "Tombstone should propagate from Node A to Node B"
    );

    // Verify document was deleted on Node B
    let doc_b = store_b.get("tracks:track-to-delete").unwrap();
    assert!(
        doc_b.is_none(),
        "Document should be deleted on Node B after receiving tombstone"
    );
    println!("  ✓ Document deleted on Node B via tombstone sync");

    // Cleanup
    let _ = backend_a.shutdown().await;
    let _ = backend_b.shutdown().await;

    println!("  ✓ Full E2E tombstone test completed");
}

/// Test 11: Tombstone Batch Exchange on Fresh Connect
///
/// Validates that when two nodes connect for the first time,
/// existing tombstones are exchanged immediately.
#[tokio::test]
async fn test_tombstone_batch_exchange_on_connect() {
    use peat_protocol::discovery::peer::{PeerInfo, StaticDiscovery};
    use peat_protocol::sync::automerge::AutomergeIrohBackend;
    use peat_protocol::sync::traits::DataSyncBackend;
    use peat_protocol::sync::types::{BackendConfig, TransportConfig};
    use std::collections::HashMap;

    println!("=== Tombstone Batch Exchange on Connect ===");

    // Create Node A with pre-existing tombstones (before connection)
    let temp_a = TempDir::new().unwrap();
    let temp_b = TempDir::new().unwrap();

    let store_a = Arc::new(AutomergeStore::open(temp_a.path()).unwrap());

    // Add tombstones to Node A BEFORE starting sync
    let tombstone1 = Tombstone::new("old-doc-1", "tracks", "node-a", 100);
    let tombstone2 = Tombstone::new("old-doc-2", "alerts", "node-a", 200);
    let tombstone3 =
        Tombstone::with_reason("old-doc-3", "nodes", "node-a", 300, "Deleted by admin");
    store_a.put_tombstone(&tombstone1).unwrap();
    store_a.put_tombstone(&tombstone2).unwrap();
    store_a.put_tombstone(&tombstone3).unwrap();
    println!("  ✓ Node A has 3 pre-existing tombstones");

    // Now create backends
    let transport_a = Arc::new(IrohTransport::new().await.unwrap());
    let backend_a = Arc::new(AutomergeIrohBackend::from_parts(
        Arc::clone(&store_a),
        Arc::clone(&transport_a),
    ));

    let transport_b = Arc::new(IrohTransport::new().await.unwrap());
    let store_b = Arc::new(AutomergeStore::open(temp_b.path()).unwrap());
    let backend_b = Arc::new(AutomergeIrohBackend::from_parts(
        Arc::clone(&store_b),
        Arc::clone(&transport_b),
    ));

    // Verify Node B has no tombstones initially
    assert!(store_b.get_all_tombstones().unwrap().is_empty());
    println!("  ✓ Node B has 0 tombstones initially");

    // Setup bidirectional discovery
    let endpoint_a = transport_a.endpoint_id();
    let endpoint_b = transport_b.endpoint_id();
    let addr_a = transport_a.endpoint_addr();
    let addr_b = transport_b.endpoint_addr();

    let peer_b_info = PeerInfo {
        name: "Node B".to_string(),
        node_id: hex::encode(endpoint_b.as_bytes()),
        addresses: addr_b.ip_addrs().map(|a| a.to_string()).collect(),
        relay_url: addr_b.relay_urls().next().map(|u| u.to_string()),
    };
    backend_a
        .add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_b_info])))
        .await
        .unwrap();

    let peer_a_info = PeerInfo {
        name: "Node A".to_string(),
        node_id: hex::encode(endpoint_a.as_bytes()),
        addresses: addr_a.ip_addrs().map(|a| a.to_string()).collect(),
        relay_url: addr_a.relay_urls().next().map(|u| u.to_string()),
    };
    backend_b
        .add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_a_info])))
        .await
        .unwrap();

    // Initialize and start sync
    let test_secret = peat_protocol::security::FormationKey::generate_secret();

    backend_a
        .initialize(BackendConfig {
            app_id: "batch-test".to_string(),
            persistence_dir: temp_a.path().to_path_buf(),
            shared_key: Some(test_secret.clone()),
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        })
        .await
        .unwrap();

    backend_b
        .initialize(BackendConfig {
            app_id: "batch-test".to_string(),
            persistence_dir: temp_b.path().to_path_buf(),
            shared_key: Some(test_secret),
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        })
        .await
        .unwrap();

    backend_a.sync_engine().start_sync().await.unwrap();
    backend_b.sync_engine().start_sync().await.unwrap();

    // Wait for connection and tombstone batch exchange. Poll with a
    // deadline rather than a fixed sleep — macOS loopback QUIC handshake
    // runs ~3–5× slower than Linux CI, and raw sleeps flake when the
    // machine is busy. Linux sees completion well under 8s; give Darwin
    // a generous ceiling.
    #[cfg(target_os = "linux")]
    let deadline = Duration::from_secs(15);
    #[cfg(not(target_os = "linux"))]
    let deadline = Duration::from_secs(60);
    println!("  Waiting for connection and tombstone batch exchange...");
    let started = std::time::Instant::now();
    let (tombstones_on_b, has_doc1, has_doc2, has_doc3) = loop {
        let ts = store_b.get_all_tombstones().unwrap();
        let d1 = store_b.has_tombstone("tracks", "old-doc-1").unwrap();
        let d2 = store_b.has_tombstone("alerts", "old-doc-2").unwrap();
        let d3 = store_b.has_tombstone("nodes", "old-doc-3").unwrap();
        if !ts.is_empty() && d1 && d2 && d3 {
            break (ts, d1, d2, d3);
        }
        if started.elapsed() >= deadline {
            break (ts, d1, d2, d3);
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    };
    println!(
        "  Node B received {} tombstones after {:?}",
        tombstones_on_b.len(),
        started.elapsed()
    );

    println!(
        "  Tombstone 1 (tracks:old-doc-1): {}",
        if has_doc1 { "" } else { "" }
    );
    println!(
        "  Tombstone 2 (alerts:old-doc-2): {}",
        if has_doc2 { "" } else { "" }
    );
    println!(
        "  Tombstone 3 (nodes:old-doc-3): {}",
        if has_doc3 { "" } else { "" }
    );

    // Cleanup
    let _ = backend_a.shutdown().await;
    let _ = backend_b.shutdown().await;

    assert!(
        !tombstones_on_b.is_empty(),
        "Node B should receive tombstones from Node A via batch exchange on connect"
    );
    assert!(has_doc1, "Tombstone for tracks:old-doc-1 should propagate");
    assert!(has_doc2, "Tombstone for alerts:old-doc-2 should propagate");
    assert!(has_doc3, "Tombstone for nodes:old-doc-3 should propagate");
    println!("  ✓ Tombstone batch exchange working!");
}

/// Test 12: Document Sync Blocked by Existing Tombstone
///
/// Validates that if Node B has a tombstone for a document,
/// syncing that document from Node A is blocked.
#[tokio::test]
async fn test_tombstone_blocks_document_sync() {
    println!("=== Tombstone Blocks Document Sync ===");

    let temp_dir = TempDir::new().unwrap();
    let store = AutomergeStore::open(temp_dir.path()).unwrap();

    // Create a tombstone first
    let tombstone = Tombstone::new("blocked-doc", "tracks", "admin", 9999);
    store.put_tombstone(&tombstone).unwrap();
    println!("  ✓ Created tombstone for tracks:blocked-doc");

    // Verify tombstone exists
    assert!(store.has_tombstone("tracks", "blocked-doc").unwrap());

    // Now try to create a document with the same ID
    // In the real sync flow, the coordinator checks has_tombstone before applying
    // This test validates the tombstone lookup mechanism works
    let has_tombstone = store.has_tombstone("tracks", "blocked-doc").unwrap();

    if has_tombstone {
        println!("  ✓ Tombstone found - document creation would be blocked");
    } else {
        println!("  ✗ Tombstone not found - document creation would proceed");
    }

    // Verify lamport comparison would reject older updates
    let existing = store.get_tombstone("tracks", "blocked-doc").unwrap();
    if let Some(ts) = existing {
        // An update with lamport < 9999 should be rejected
        let older_update_lamport = 5000u64;
        let would_reject = older_update_lamport < ts.lamport;
        println!(
            "  ✓ Update with lamport {} {} by tombstone lamport {}",
            older_update_lamport,
            if would_reject { "blocked" } else { "allowed" },
            ts.lamport
        );
        assert!(would_reject, "Older updates should be blocked");
    }

    println!("  ✓ Tombstone blocking mechanism validated");
}