peat-protocol 0.9.0-rc.10

Peat Coordination Protocol — hierarchical capability composition over CRDTs for heterogeneous mesh networks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
//! End-to-End Tests for Startup Optimizations
//!
//! These tests validate the startup performance optimizations for the AutomergeIroh backend:
//!
//! 1. **Fast Transport Constructor**: `from_seed_at_addr()` creates working transports without mDNS
//! 2. **Deferred mDNS Discovery**: `enable_mdns_discovery()` can be called after transport creation
//! 3. **Parallel Initialization**: Store and transport can be initialized concurrently
//! 4. **Functional Sync**: Fast-created transports can still sync documents with peers
//!
//! # Background
//!
//! The AutomergeIroh backend was observed to have significantly longer startup times than
//! the Ditto backend in large-scale deployments (384-node hierarchical simulations).
//! This caused Docker API timeouts and required staged deployment workarounds.
//!
//! These optimizations reduce startup latency and load by:
//! - Running store opening and transport creation in parallel
//! - Deferring mDNS discovery initialization until after critical startup path
//! - Providing a fast constructor that skips mDNS entirely for static peer configurations

#![cfg(feature = "automerge-backend")]

use iroh::TransportAddr;
use peat_protocol::network::{IrohTransport, PeerInfo};
use peat_protocol::storage::capabilities::SyncCapable;
use peat_protocol::storage::{AutomergeBackend, AutomergeStore};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;

/// Verifies that `from_seed_at_addr` (the fast, mDNS-free constructor) yields a usable
/// transport whose endpoint identity is derived deterministically from the seed.
#[tokio::test]
async fn test_fast_transport_constructor_creates_functional_transport() {
    let node_seed = "test-fast-constructor/node-1";
    let loopback: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let fast = IrohTransport::from_seed_at_addr(node_seed, loopback)
        .await
        .expect("Fast constructor should succeed");

    // The fast path must not have wired up mDNS.
    let mdns_enabled = fast.has_discovery();
    assert!(!mdns_enabled, "Fast constructor should NOT enable mDNS");

    let id = fast.endpoint_id();
    let id_bytes = id.as_bytes();
    assert!(!id_bytes.is_empty(), "Should have valid endpoint ID");

    let bound = fast.endpoint_addr();
    assert!(!bound.addrs.is_empty(), "Should have bound addresses");

    // Same seed in, same identity out.
    assert_eq!(
        id,
        IrohTransport::endpoint_id_from_seed(node_seed),
        "Fast constructor should produce deterministic endpoint ID"
    );
}

/// Verifies that mDNS discovery can be switched on after a transport was built without it.
#[tokio::test]
async fn test_deferred_mdns_discovery_enablement() {
    let loopback: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let node = IrohTransport::from_seed_at_addr("test-deferred-mdns/node-1", loopback)
        .await
        .expect("Fast constructor should succeed");

    // Precondition: the fast constructor starts without discovery.
    assert!(!node.has_discovery(), "Should start without mDNS");

    node.enable_mdns_discovery()
        .await
        .expect("Deferred mDNS enablement should succeed");

    // Postcondition: discovery is both flagged and reachable through the accessor.
    assert!(node.has_discovery(), "Should have mDNS after enablement");
    assert!(
        node.mdns_discovery().is_some(),
        "Should be able to access mDNS discovery"
    );
}

/// Verifies that a second `enable_mdns_discovery` call is rejected with an
/// "already enabled" error instead of re-initialising discovery.
#[tokio::test]
async fn test_double_mdns_enablement_fails() {
    let loopback: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let node = IrohTransport::from_seed_at_addr("test-double-mdns/node-1", loopback)
        .await
        .expect("Fast constructor should succeed");

    node.enable_mdns_discovery()
        .await
        .expect("First mDNS enablement should succeed");

    // The repeat call must fail; destructure the error directly instead of
    // checking `is_err()` and then unwrapping.
    let Err(err) = node.enable_mdns_discovery().await else {
        panic!("Double mDNS enablement should fail");
    };
    let message = err.to_string();
    assert!(
        message.contains("already enabled"),
        "Error should indicate mDNS is already enabled"
    );
}

/// Test that fast constructor is measurably faster than mDNS constructor
#[tokio::test]
async fn test_fast_constructor_is_faster_than_mdns_constructor() {
    let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    // Warm up (first transport creation may have one-time costs)
    let _ = IrohTransport::from_seed_at_addr("warmup", bind_addr).await;

    // Measure fast constructor (no mDNS) - run sequentially for consistent timing
    let mut fast_times = Vec::new();
    for i in 0..3 {
        let start = Instant::now();
        let seed = format!("fast-timing-test/node-{}", i);
        let _ = IrohTransport::from_seed_at_addr(&seed, bind_addr)
            .await
            .unwrap();
        fast_times.push(start.elapsed().as_millis());
    }

    // Measure mDNS constructor
    let mut mdns_times = Vec::new();
    for i in 0..3 {
        let start = Instant::now();
        let seed = format!("mdns-timing-test/node-{}", i);
        let _ = IrohTransport::from_seed_with_discovery_at_addr(&seed, bind_addr)
            .await
            .unwrap();
        mdns_times.push(start.elapsed().as_millis());
    }

    let avg_fast: u128 = fast_times.iter().sum::<u128>() / fast_times.len() as u128;
    let avg_mdns: u128 = mdns_times.iter().sum::<u128>() / mdns_times.len() as u128;

    // Telemetry-only: wall-clock comparison flaked on shared CI runners.
    // The functional guarantee (fast constructor skips mDNS setup) is covered
    // by test_fast_transport_constructor_creates_functional_transport. Perf
    // tracking for this lives in the benchmark job (see issue #786).
    eprintln!(
        "[STARTUP TIMING] Fast constructor avg: {}ms, mDNS constructor avg: {}ms",
        avg_fast, avg_mdns
    );
}

/// Test that parallel store + transport initialization works correctly
#[tokio::test]
async fn test_parallel_store_and_transport_initialization() {
    let temp_dir = TempDir::new().unwrap();
    let seed = "parallel-init-test/node-1";
    let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let storage_path = temp_dir.path().to_path_buf();

    let start = Instant::now();

    // Run store and transport creation in parallel (simulating FFI create_node pattern)
    let (store_result, transport_result) = tokio::join!(
        tokio::task::spawn_blocking({
            let path = storage_path.clone();
            move || AutomergeStore::open(&path)
        }),
        IrohTransport::from_seed_at_addr(seed, bind_addr)
    );

    let parallel_time = start.elapsed();

    let store = Arc::new(store_result.unwrap().unwrap());
    let transport = Arc::new(transport_result.unwrap());

    eprintln!(
        "[STARTUP TIMING] Parallel store+transport init: {}ms",
        parallel_time.as_millis()
    );

    // Verify both are functional
    assert!(!store.is_in_memory());
    assert!(!transport.endpoint_id().as_bytes().is_empty());

    // Create backend to verify they work together
    let backend = AutomergeBackend::with_transport(store, transport);
    assert!(backend.sync_stats().is_ok());
}

/// Test that a transport created with fast constructor can establish peer connections
///
/// Note: Full document sync requires the AutomergeIrohBackend with FormationKey authentication.
/// This test validates that the fast constructor creates transports capable of P2P connections.
#[tokio::test]
async fn test_fast_transport_can_connect_to_peers() {
    // Create two nodes using fast constructor
    let transport1 = Arc::new(
        IrohTransport::from_seed_at_addr("connect-test/node-1", "127.0.0.1:0".parse().unwrap())
            .await
            .unwrap(),
    );
    let transport2 = Arc::new(
        IrohTransport::from_seed_at_addr("connect-test/node-2", "127.0.0.1:0".parse().unwrap())
            .await
            .unwrap(),
    );

    // Start accept loops so transports can receive connections
    transport1.start_accept_loop().unwrap();
    transport2.start_accept_loop().unwrap();

    // Get actual bound addresses
    let addr2 = get_first_ip_addr(&transport2);

    let peer2_info = PeerInfo {
        name: "node-2".to_string(),
        node_id: hex::encode(transport2.endpoint_id().as_bytes()),
        addresses: vec![addr2.to_string()],
        relay_url: None,
    };

    // Connect transport1 to transport2
    transport1.connect_peer(&peer2_info).await.unwrap();

    // Poll until at least one side registers the connection, with a hard ceiling
    // that IS the failure signal (no fixed sleep + optimistic assert).
    let connect_deadline = Duration::from_secs(5);
    let poll_interval = Duration::from_millis(25);
    let connected = tokio::time::timeout(connect_deadline, async {
        loop {
            if transport1.peer_count() > 0 || transport2.peer_count() > 0 {
                return;
            }
            tokio::time::sleep(poll_interval).await;
        }
    })
    .await
    .is_ok();

    let peer_count_1 = transport1.peer_count();
    let peer_count_2 = transport2.peer_count();

    eprintln!(
        "[FAST TRANSPORT] Connection test - transport1 peers: {}, transport2 peers: {}",
        peer_count_1, peer_count_2
    );

    assert!(
        connected,
        "Peers should connect using fast-created transports (no mDNS required); \
         timed out after {:?} with peer counts {}/{}",
        connect_deadline, peer_count_1, peer_count_2
    );

    // Cleanup
    let _ = transport1.stop_accept_loop();
    let _ = transport2.stop_accept_loop();
}

/// Logs startup timing for the old sequential pattern versus the new parallel pattern.
///
/// Telemetry-only (no timing assertion). Both measurements stop the clock while the store
/// and transport are still alive, so teardown cost is excluded on both sides and the
/// comparison is like-for-like.
#[tokio::test]
async fn test_sequential_vs_parallel_initialization_timing() {
    let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    // Sequential initialization (old pattern)
    let sequential_time = {
        let temp_dir = TempDir::new().unwrap();
        let start = Instant::now();

        let _store = AutomergeStore::open(temp_dir.path()).unwrap();
        let _transport = IrohTransport::from_seed_at_addr("sequential/node", bind_addr)
            .await
            .unwrap();

        // Clock stops here; `_transport` and `_store` are dropped (in that order, before
        // `temp_dir`) only when this scope ends.
        start.elapsed()
    };

    // Parallel initialization (new pattern)
    let parallel_time = {
        let temp_dir = TempDir::new().unwrap();
        let storage_path = temp_dir.path().to_path_buf();
        let start = Instant::now();

        let (store_result, transport_result) = tokio::join!(
            tokio::task::spawn_blocking(move || AutomergeStore::open(&storage_path)),
            IrohTransport::from_seed_at_addr("parallel/node", bind_addr)
        );

        // Named bindings (not `let _ =`, which drops immediately) so teardown happens
        // after the clock stops, matching the sequential measurement above.
        let _store = store_result.unwrap().unwrap();
        let _transport = transport_result.unwrap();
        start.elapsed()
    };

    // Guard the ratio against a zero-length sequential measurement.
    let improvement_pct = if sequential_time.is_zero() {
        0.0
    } else {
        (1.0 - parallel_time.as_secs_f64() / sequential_time.as_secs_f64()) * 100.0
    };

    // Telemetry-only: wall-clock comparisons flaked on shared CI runners even
    // with tolerance windows. Performance-regression tracking belongs in the
    // benchmark job (see issue #786), not a yes/no correctness gate.
    eprintln!(
        "[STARTUP TIMING] Sequential: {}ms, Parallel: {}ms, Improvement: {:.1}%",
        sequential_time.as_millis(),
        parallel_time.as_millis(),
        improvement_pct
    );
}

/// Test that mimics FFI create_node timing to show actual startup performance
///
/// This test replicates the initialization pattern from peat-ffi/src/lib.rs create_node()
/// to provide timing output for the optimized startup path.
#[tokio::test]
async fn test_full_startup_timing_like_ffi() {
    use std::time::Instant;

    let temp_dir = TempDir::new().unwrap();
    let storage_path = temp_dir.path().to_path_buf();
    let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let seed = "timing-test/full-startup";

    let total_start = Instant::now();

    // Phase 1: Parallel store + transport initialization (like FFI)
    let phase_start = Instant::now();
    let storage_path_for_store = storage_path.clone();

    let (store_result, transport_result) = tokio::join!(
        tokio::task::spawn_blocking(move || {
            let start = Instant::now();
            let result = AutomergeStore::open(&storage_path_for_store);
            (result, start.elapsed().as_millis())
        }),
        async {
            let start = Instant::now();
            let result = IrohTransport::from_seed_at_addr(seed, bind_addr).await;
            (result, start.elapsed().as_millis())
        }
    );

    let (store, store_ms) = store_result.unwrap();
    let store = Arc::new(store.unwrap());
    let (transport, transport_ms) = transport_result;
    let transport = Arc::new(transport.unwrap());
    let parallel_ms = phase_start.elapsed().as_millis();

    // Phase 2: Create backend
    let phase_start = Instant::now();
    let backend = AutomergeBackend::with_transport(Arc::clone(&store), Arc::clone(&transport));
    let backend_ms = phase_start.elapsed().as_millis();

    // Phase 3: Start sync (like sync_backend.initialize in FFI)
    let phase_start = Instant::now();
    backend.start_sync().unwrap();
    let sync_init_ms = phase_start.elapsed().as_millis();

    let total_ms = total_start.elapsed().as_millis();

    // Output timing in same format as FFI
    eprintln!("\n=== FFI-EQUIVALENT STARTUP TIMING ===");
    eprintln!("[Peat TIMING] Store open: {}ms", store_ms);
    eprintln!(
        "[Peat TIMING] Transport create (no mDNS): {}ms",
        transport_ms
    );
    eprintln!(
        "[Peat TIMING] Parallel total (max of above): {}ms",
        parallel_ms
    );
    eprintln!("[Peat TIMING] Backend creation: {}ms", backend_ms);
    eprintln!("[Peat TIMING] Sync init: {}ms", sync_init_ms);
    eprintln!("[Peat TIMING] === TOTAL: {}ms ===\n", total_ms);

    // Cleanup
    backend.stop_sync().unwrap();

    // Telemetry-only: a hard wall-clock budget flaked on shared CI runners
    // (e.g. 1366ms on a 1000ms budget). Startup-performance tracking belongs
    // in the benchmark job (see issue #786). This test still proves the FFI
    // startup sequence completes end-to-end via the unwraps above.
    let _ = total_ms;
}

/// Returns the first `TransportAddr::Ip` socket address advertised by `transport`'s endpoint.
///
/// # Panics
///
/// Panics if none of the endpoint's advertised addresses is a `TransportAddr::Ip`.
fn get_first_ip_addr(transport: &IrohTransport) -> SocketAddr {
    let endpoint = transport.endpoint_addr();
    let first_ip = endpoint.addrs.iter().find_map(|candidate| match candidate {
        TransportAddr::Ip(socket) => Some(*socket),
        _ => None,
    });
    first_ip.expect("Transport should have at least one IP address")
}