freenet 0.2.29

Freenet core software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
use anyhow::{Context, anyhow, bail};
use freenet::test_utils::{
    self, TestContext, TestResult, make_get, make_put, make_subscribe, make_update,
};
use freenet_macros::freenet_test;
use freenet_stdlib::{
    client_api::{ClientRequest, ContractResponse, HostResponse, NodeQuery, QueryResponse, WebApi},
    prelude::*,
};
use std::{net::SocketAddr, time::Duration};
use tokio_tungstenite::connect_async;

/// Query a node for its connected peers with resilient error handling.
///
/// Returns `Some(peers)` if the query succeeded, `None` if it failed (timeout, error, etc.).
/// All failures are logged with the attempt number for diagnostics.
///
/// This helper is designed for use in retry loops where transient failures should not
/// immediately fail the test.
/// Query a node for its connected peers with resilient error handling.
///
/// Returns `Some(peers)` when the query round-trip succeeds, and `None` on any
/// failure: send error, unexpected response, websocket error, or timeout.
/// Every failure path is logged with the attempt number so retry loops can
/// diagnose flaky runs without immediately failing the test.
async fn query_connected_peers(
    client: &mut WebApi,
    node_name: &str,
    timeout: Duration,
    attempt: u32,
) -> Option<Vec<(String, SocketAddr)>> {
    // Fire off the query first; a failed send is retryable by the caller.
    if let Err(e) = client
        .send(ClientRequest::NodeQueries(NodeQuery::ConnectedPeers))
        .await
    {
        tracing::warn!(
            attempt,
            node = node_name,
            error = %e,
            "Failed to send query, will retry..."
        );
        return None;
    }

    // Await the reply, bounded by `timeout`.
    let outcome = tokio::time::timeout(timeout, client.recv()).await;
    match outcome {
        Ok(Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers }))) => Some(peers),
        Ok(Ok(other)) => {
            // Might be an out-of-order notification rather than our answer.
            tracing::warn!(
                attempt,
                node = node_name,
                ?other,
                "Unexpected response, will retry..."
            );
            None
        }
        Ok(Err(e)) => {
            // Transport-level websocket failure.
            tracing::warn!(
                attempt,
                node = node_name,
                error = %e,
                "WebSocket error, will retry..."
            );
            None
        }
        Err(_) => {
            // No reply arrived within the allotted window.
            tracing::warn!(
                attempt,
                node = node_name,
                timeout_secs = timeout.as_secs(),
                "Query timed out, will retry..."
            );
            None
        }
    }
}

/// Test gateway reconnection:
/// 1. Start a gateway and a peer connected to it
/// 2. Perform operations to verify connectivity
/// 3. Force disconnect
/// 4. Verify that the peer can reconnect and operate normally
/// NOTE: The freenet_test macro configures each peer with a public network port
/// so auto_connect_peers ensures they can form a full mesh rather than only
/// connecting to the gateway.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 300,
    startup_wait_secs = 30,
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_gateway_reconnection(ctx: &mut TestContext) -> TestResult {
    // Load test contract
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    // Get nodes from context
    let peer = ctx.node("peer")?;

    // Give extra time for peer to connect to gateway
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Connect to peer's websocket API
    let uri = peer.ws_url();
    let (stream, _) = connect_async(&uri).await?;
    let mut client_api = WebApi::start(stream);

    // Perform initial PUT to verify connectivity
    tracing::info!("Performing initial PUT to verify connectivity");
    make_put(
        &mut client_api,
        wrapped_state.clone(),
        contract.clone(),
        false,
    )
    .await?;

    // Wait for put response, draining any stale responses under CI load
    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for put response");
        }
        match tokio::time::timeout(remaining, client_api.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
                assert_eq!(key, contract_key);
                tracing::info!("Initial PUT successful");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for PutResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => {
                bail!("Error receiving put response: {}", e);
            }
            Err(_) => {
                bail!("Timeout waiting for put response");
            }
        }
    }

    // Verify with GET, draining stale responses under CI load — consistent
    // with the PUT wait above and the post-reconnect GET below. Previously a
    // single recv() that bailed on any unexpected message, so a late
    // notification could fail the test spuriously.
    tracing::info!("Verifying with GET");
    make_get(&mut client_api, contract_key, true, false).await?;
    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for get response");
        }
        match tokio::time::timeout(remaining, client_api.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
                contract: recv_contract,
                state: recv_state,
                ..
            }))) => {
                assert_eq!(
                    recv_contract.as_ref().expect("Contract should exist").key(),
                    contract_key
                );
                if recv_state != wrapped_state {
                    // Dump both states before the assert so CI logs show the diff.
                    tracing::error!("State mismatch!");
                    tracing::error!(
                        "Expected state: {:?}",
                        String::from_utf8_lossy(wrapped_state.as_ref())
                    );
                    tracing::error!(
                        "Received state: {:?}",
                        String::from_utf8_lossy(recv_state.as_ref())
                    );
                }
                assert_eq!(recv_state, wrapped_state);
                tracing::info!("Initial GET successful");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for GetResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => {
                bail!("Error receiving get response: {}", e);
            }
            Err(_) => {
                bail!("Timeout waiting for get response");
            }
        }
    }

    // Disconnect from peer
    tracing::info!("Disconnecting from peer");
    client_api
        .send(ClientRequest::Disconnect { cause: None })
        .await?;

    // Wait for disconnect to complete
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Reconnect to the peer's websocket API
    tracing::info!("Reconnecting to peer");
    let (stream, _) = connect_async(&uri).await?;
    let mut client_api = WebApi::start(stream);

    // Wait for reconnection to establish (peer should reconnect to gateway)
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Perform GET to verify reconnection worked and peer can operate normally
    tracing::info!("Performing GET after reconnection");
    make_get(&mut client_api, contract_key, true, false).await?;
    // Drain stale responses until we get the GetResponse
    let deadline = tokio::time::Instant::now() + Duration::from_secs(60);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for get response after reconnection");
        }
        match tokio::time::timeout(remaining, client_api.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
                contract: recv_contract,
                state: recv_state,
                ..
            }))) => {
                assert_eq!(
                    recv_contract.as_ref().expect("Contract should exist").key(),
                    contract_key
                );
                assert_eq!(recv_state, wrapped_state);
                tracing::info!(
                    "Reconnection test successful - peer can perform operations after reconnecting"
                );
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for GetResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => {
                bail!("Error receiving get response after reconnection: {}", e);
            }
            Err(_) => {
                bail!("Timeout waiting for get response after reconnection");
            }
        }
    }

    // Clean disconnect
    client_api
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;

    Ok(())
}

/// Simplified test to verify basic gateway connectivity
/// Simplified test to verify basic gateway connectivity
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 60,
    startup_wait_secs = 5,
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4,
)]
async fn test_basic_gateway_connectivity(ctx: &mut TestContext) -> TestResult {
    // Resolve the gateway node and its WebSocket endpoint.
    let gateway = ctx.node("gateway")?;
    let uri = gateway.ws_url();

    // Attempt the connection with a 10s cap; unwrap the nested
    // timeout/connect results up front so the happy path reads straight-line.
    let connect_attempt = tokio::time::timeout(Duration::from_secs(10), connect_async(&uri)).await;
    let (stream, _) = match connect_attempt {
        Ok(Ok(conn)) => conn,
        Ok(Err(e)) => bail!("Failed to connect to gateway: {}", e),
        Err(_) => bail!("Timeout connecting to gateway"),
    };

    tracing::info!("Successfully connected to gateway WebSocket");
    let mut client = WebApi::start(stream);

    // Disconnect cleanly
    client
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}

/// Test three-node network connectivity with full mesh formation
/// This test verifies that a network of 3 nodes (1 gateway + 2 peers) can:
/// 1. Establish connections to form a full mesh
/// 2. Successfully perform PUT/GET operations across the network
/// 3. Propagate UPDATE via proximity cache (issue #2294 regression test)
///
/// # Port Configuration for P2P Mesh
///
/// For peers to participate in P2P mesh connectivity, they must have BOTH
/// `public_address` AND `public_port` configured. This ensures the peer's
/// PeerId is set from config (see config.rs:242-251).
///
/// ## Port Types
///
/// - **network_port**: The local port the peer binds to for listening
/// - **public_port**: The external port peers should connect to
///   - In localhost tests (no NAT): public_port = network_port
///   - In production with NAT: public_port = router's external port
///
/// ## How It Works
///
/// ### Localhost Tests (this test)
/// 1. Peer binds UDP socket to network_port (e.g., 53425)
/// 2. When sending to gateway, UDP uses bound port as source (53425)
/// 3. Gateway sees source port 53425 in handshake
/// 4. Gateway sends back "your external address is 127.0.0.1:53425"
/// 5. Peer's PeerId is already set from config with public_port=53425
/// 6. Other peers connect directly to 127.0.0.1:53425 ✅
///
/// ### Real P2P Network (with NAT)
/// 1. Peer behind NAT binds to network_port (e.g., 8080)
/// 2. Peer sets public_port to router's external port (e.g., 54321)
/// 3. Router forwards external port 54321 → internal port 8080
/// 4. When peer sends to gateway, NAT translates:
///    - Source: 192.168.1.100:8080 → PublicIP:54321
/// 5. Gateway sees source as PublicIP:54321
/// 6. Peer's PeerId is set from config: PublicIP:54321
/// 7. Other peers connect to PublicIP:54321
/// 8. Router forwards to peer's internal 192.168.1.100:8080 ✅
///
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer1", "peer2"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 300,
    startup_wait_secs = 30,
    // Locations are derived from varied loopback IPs (127.x.y.1) which gives each node
    // a unique location without needing explicit configuration
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResult {
    // Load test contract
    const TEST_CONTRACT: &str = "test-contract-integration";
    let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
    let contract_key = contract.key();
    let initial_state = test_utils::create_empty_todo_list();
    let wrapped_state = WrappedState::from(initial_state);

    // Get node information from context
    let gateway = ctx.node("gateway")?;
    let peer1 = ctx.node("peer1")?;
    let peer2 = ctx.node("peer2")?;
    tracing::info!(
        "Using deterministic node locations: gateway={:.3}, peer1={:.3}, peer2={:.3}",
        gateway.location,
        peer1.location,
        peer2.location
    );

    let peer1_public_port = peer1.network_port.context(
        "peer1 missing network port; auto_connect_peers requires public_port for mesh connectivity",
    )?;
    let peer2_public_port = peer2.network_port.context(
        "peer2 missing network port; auto_connect_peers requires public_port for mesh connectivity",
    )?;
    tracing::info!(
        peer1_port = peer1_public_port,
        peer2_port = peer2_public_port,
        "Verified peer network ports for direct connectivity"
    );

    // Give extra time for peers to connect to gateway
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Connect to websockets using node-specific IPs
    let (stream_gw, _) = connect_async(&gateway.ws_url()).await?;
    let mut client_gw = WebApi::start(stream_gw);

    let (stream1, _) = connect_async(&peer1.ws_url()).await?;
    let mut client1 = WebApi::start(stream1);

    let (stream2, _) = connect_async(&peer2.ws_url()).await?;
    let mut client2 = WebApi::start(stream2);

    // Retry loop to wait for full mesh connectivity.
    // Use a deadline-based approach to ensure we leave time for PUT/GET operations.
    //
    // Timeout budget (test_timeout=180s, startup_wait=30s → 150s available):
    //   - Mesh formation: up to 90s (deadline-based, not retry count)
    //   - PUT with retries: ~30s
    //   - GET: ~30s
    //   - Safety margin: ~30s for CI variability
    const MESH_FORMATION_TIMEOUT: Duration = Duration::from_secs(90);
    const QUERY_TIMEOUT: Duration = Duration::from_secs(10);
    const RETRY_DELAY: Duration = Duration::from_secs(2);
    let mesh_deadline = tokio::time::Instant::now() + MESH_FORMATION_TIMEOUT;
    let mut mesh_established = false;
    let mut last_snapshot = (String::new(), String::new(), String::new());
    let mut attempt: u32 = 0;

    while tokio::time::Instant::now() < mesh_deadline {
        attempt += 1;
        let remaining_secs = mesh_deadline
            .saturating_duration_since(tokio::time::Instant::now())
            .as_secs();
        // Log first 5 attempts at info level for visibility
        if attempt <= 5 {
            tracing::info!(
                "Attempt {} ({}s remaining): Querying all nodes for connected peers...",
                attempt,
                remaining_secs
            );
        }
        tracing::info!(
            attempt,
            remaining_secs,
            "Querying all nodes for connected peers..."
        );

        // Query each node for connections using resilient helper
        let Some(gw_peers) =
            query_connected_peers(&mut client_gw, "gateway", QUERY_TIMEOUT, attempt).await
        else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };

        let Some(peer1_peers) =
            query_connected_peers(&mut client1, "peer1", QUERY_TIMEOUT, attempt).await
        else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };

        let Some(peer2_peers) =
            query_connected_peers(&mut client2, "peer2", QUERY_TIMEOUT, attempt).await
        else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };

        // Log first 5 attempts at info level for visibility
        if attempt <= 5 {
            tracing::info!(
                "  - Gateway has {} connections, Peer1 has {}, Peer2 has {}",
                gw_peers.len(),
                peer1_peers.len(),
                peer2_peers.len()
            );
        }
        tracing::info!(
            gateway_connections = gw_peers.len(),
            peer1_connections = peer1_peers.len(),
            peer2_connections = peer2_peers.len(),
            "Connection counts"
        );
        tracing::debug!("Gateway peers: {:?}", gw_peers);
        tracing::debug!("Peer1 peers: {:?}", peer1_peers);
        tracing::debug!("Peer2 peers: {:?}", peer2_peers);

        last_snapshot = (
            format!("{:?}", gw_peers),
            format!("{:?}", peer1_peers),
            format!("{:?}", peer2_peers),
        );

        // With terminus-only acceptance (accept only when we can't forward to a closer peer),
        // the gateway may have fewer direct connections in small networks. What matters is
        // that the network is connected (all nodes can reach each other through some path).
        //
        // Minimum connectivity requirements:
        // - Gateway: at least 1 connection (to be part of the network)
        // - Each peer: at least 1 connection
        //
        // Note: In a 3-node network with terminus-only acceptance, the topology might be:
        // - Gateway ← Peer1 ← Peer2 (linear)
        // - Gateway ← Peer1 ↔ Peer2 (gateway to peer1, peer1 to both)
        // Both topologies are valid as long as the network is connected.
        let gateway_has_minimum = !gw_peers.is_empty();
        let peer1_has_minimum = !peer1_peers.is_empty();
        let peer2_has_minimum = !peer2_peers.is_empty();

        if gateway_has_minimum && peer1_has_minimum && peer2_has_minimum {
            // Check if we have full mesh (ideal) or minimum connectivity (acceptable)
            let is_full_mesh =
                gw_peers.len() >= 2 && peer1_peers.len() >= 2 && peer2_peers.len() >= 2;
            if is_full_mesh {
                tracing::info!("Full mesh connectivity established!");
            } else {
                tracing::info!(
                    "Minimum connectivity achieved (all nodes connected, network is reachable)"
                );
            }
            mesh_established = true;
            break;
        }

        tracing::info!("Network not yet meeting minimum connectivity, waiting...");
        tokio::time::sleep(RETRY_DELAY).await;
    }

    if !mesh_established {
        tracing::error!(
            gateway_peers = %last_snapshot.0,
            peer1_peers = %last_snapshot.1,
            peer2_peers = %last_snapshot.2,
            "Connectivity check failed; dumping last snapshot"
        );

        if let Ok(aggregator) = ctx.aggregate_events().await {
            if let Ok(events) = aggregator.get_all_events().await {
                tracing::error!(total_events = events.len(), "Aggregated events at timeout");
                for event in events.iter().rev().take(10).rev() {
                    tracing::error!(?event.kind, peer=%event.peer_id, ts=%event.datetime, "Recent event");
                }
            }
        }

        bail!(
            "Failed to establish minimum connectivity after {} attempts ({}s timeout). Gateway peers: {}; peer1 peers: {}; peer2 peers: {}",
            attempt,
            MESH_FORMATION_TIMEOUT.as_secs(),
            last_snapshot.0,
            last_snapshot.1,
            last_snapshot.2
        );
    }

    // Allow a brief settling period before exercising contract operations.
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Verify functionality with PUT/GET
    tracing::info!("Verifying network functionality with PUT/GET operations");

    const PUT_RETRIES: usize = 3;
    perform_put_with_retries(
        &mut client1,
        &wrapped_state,
        &contract,
        &contract_key,
        PUT_RETRIES,
    )
    .await?;

    // GET so peer2 caches the contract (return_contract_code=true, subscribe=false)
    make_get(&mut client2, contract_key, true, false).await?;
    let get_response = tokio::time::timeout(Duration::from_secs(60), client2.recv()).await;
    match get_response {
        Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
            contract: recv_contract,
            state: recv_state,
            ..
        }))) => {
            assert_eq!(recv_contract.as_ref().unwrap().key(), contract_key);
            assert_eq!(recv_state, wrapped_state);
            tracing::info!("✅ Peer2 successfully retrieved data from network");
        }
        Ok(Ok(other)) => bail!("Unexpected GET response: {:?}", other),
        Ok(Err(e)) => bail!("Error receiving GET response: {}", e),
        Err(_) => bail!("Timeout waiting for GET response"),
    }

    // Test UPDATE propagation via proximity cache (issue #2294 regression test).
    // At this point:
    // - Peer1 hosted the contract via PUT, sent HostingAnnounce to neighbors
    // - Peer2 hosted the contract via GET, sent HostingAnnounce to neighbors
    // - Both peers should have each other in neighbor_hosting.neighbors_with_contract()
    // - UPDATE from peer1 should route to peer2 via neighbor hosting, not ring routing
    tracing::info!("Testing UPDATE propagation via neighbor hosting");

    // Explicitly subscribe peer2 to receive update notifications
    make_subscribe(&mut client2, contract_key).await?;
    loop {
        let resp = tokio::time::timeout(Duration::from_secs(30), client2.recv()).await;
        match resp {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
                key,
                subscribed,
            }))) => {
                assert_eq!(key, contract_key, "Subscribe response key mismatch");
                assert!(subscribed, "Subscribe should succeed");
                tracing::info!("✅ Peer2 subscribed to contract updates");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!("Ignoring non-subscribe response: {:?}", other);
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving subscribe response: {}", e),
            Err(_) => bail!("Timeout waiting for subscribe response"),
        }
    }

    // Allow time for HostingAnnounce messages to propagate
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Create updated state
    let mut todo_list: test_utils::TodoList =
        serde_json::from_slice(wrapped_state.as_ref()).expect("deserialize state");
    todo_list.tasks.push(test_utils::Task {
        id: todo_list.tasks.len() as u64 + 1,
        title: "Proximity cache test".to_string(),
        description: "Update via proximity cache test".to_string(),
        completed: false,
        priority: 1,
    });
    todo_list.version += 1;
    let updated_bytes = serde_json::to_vec(&todo_list).expect("serialize updated state");
    let updated_state = WrappedState::from(updated_bytes);

    // Peer1 sends UPDATE
    make_update(&mut client1, contract_key, updated_state.clone()).await?;

    // Wait for UPDATE response on peer1, draining any stale responses
    // (e.g. late PutResponse from previous operation under CI load)
    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!("Timeout waiting for UpdateResponse on peer1");
        }
        match tokio::time::timeout(remaining, client1.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
                key,
                ..
            }))) => {
                assert_eq!(key, contract_key);
                tracing::info!("✅ Peer1 received UpdateResponse");
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for UpdateResponse: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving UpdateResponse: {}", e),
            Err(_) => bail!("Timeout waiting for UpdateResponse on peer1"),
        }
    }

    // Wait for UPDATE notification on peer2 (subscribed via GET), draining stale responses
    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            bail!(
                "Timeout waiting for UpdateNotification on peer2 - \
                 UPDATE may not have propagated via proximity cache (issue #2294)"
            );
        }
        match tokio::time::timeout(remaining, client2.recv()).await {
            Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
                key,
                update,
            }))) => {
                assert_eq!(key, contract_key);
                match update {
                    UpdateData::State(state) => {
                        let received_list: test_utils::TodoList =
                            serde_json::from_slice(state.as_ref())
                                .expect("deserialize update notification state");
                        let has_our_task = received_list
                            .tasks
                            .iter()
                            .any(|t| t.title == "Proximity cache test");
                        assert!(
                            has_our_task,
                            "Update notification state should contain our task. Got: {:?}",
                            received_list.tasks
                        );
                        tracing::info!(
                            "✅ Peer2 received UpdateNotification via proximity cache (issue #2294 regression test passed)"
                        );
                    }
                    other @ UpdateData::Delta(_)
                    | other @ UpdateData::StateAndDelta { .. }
                    | other @ UpdateData::RelatedState { .. }
                    | other @ UpdateData::RelatedDelta { .. }
                    | other @ UpdateData::RelatedStateAndDelta { .. } => {
                        bail!("Unexpected update data type: {:?}", other)
                    }
                }
                break;
            }
            Ok(Ok(other)) => {
                tracing::debug!(
                    "Skipping stale response while waiting for UpdateNotification: {:?}",
                    other
                );
                continue;
            }
            Ok(Err(e)) => bail!("Error receiving UpdateNotification: {}", e),
            Err(_) => bail!(
                "Timeout waiting for UpdateNotification on peer2 - \
                 UPDATE may not have propagated via proximity cache (issue #2294)"
            ),
        }
    }

    // Clean disconnect
    client_gw
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    client1
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    client2
        .send(ClientRequest::Disconnect { cause: None })
        .await?;
    tokio::time::sleep(Duration::from_millis(100)).await;

    Ok(())
}

/// Performs a PUT for `contract` with `wrapped_state` via `client`, retrying up to
/// `max_attempts` times.
///
/// Each attempt sends the PUT and then waits (up to 60 s) for a matching
/// `PutResponse` carrying `contract_key`. Any other response, receive error, or
/// timeout counts as a failed attempt. Between attempts (but not after the final
/// one) a short fixed delay is inserted to give the network time to settle.
///
/// Returns `Ok(())` on the first successful, key-matching PUT response; otherwise
/// returns the error from the last failed attempt.
async fn perform_put_with_retries(
    client: &mut WebApi,
    wrapped_state: &WrappedState,
    contract: &ContractContainer,
    contract_key: &ContractKey,
    max_attempts: usize,
) -> TestResult {
    let mut last_err: Option<anyhow::Error> = None;

    for attempt in 1..=max_attempts {
        tracing::info!(attempt, max_attempts, "Starting PUT attempt");
        if let Err(err) = make_put(client, wrapped_state.clone(), contract.clone(), false).await {
            last_err = Some(err);
        } else {
            match tokio::time::timeout(Duration::from_secs(60), client.recv()).await {
                Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
                    if key == *contract_key {
                        tracing::info!(attempt, "Peer1 successfully performed PUT");
                        return Ok(());
                    }
                    // A PutResponse for a different key means we consumed a stale
                    // or cross-talk response; treat it as a failed attempt.
                    last_err = Some(anyhow!(
                        "Received PUT response for unexpected key {key:?} (expected {contract_key:?})"
                    ));
                }
                Ok(Ok(other)) => {
                    last_err = Some(anyhow!("Unexpected PUT response: {other:?}"));
                }
                Ok(Err(e)) => {
                    last_err = Some(anyhow!("Error receiving PUT response: {e}"));
                }
                Err(_) => {
                    last_err = Some(anyhow!("Timeout waiting for PUT response"));
                }
            }
        }

        // Only log "retrying" and back off when another attempt is actually
        // coming; previously this also ran after the final attempt, adding a
        // misleading log line and a pointless 2 s delay on the failure path.
        if attempt < max_attempts {
            tracing::warn!(
                attempt,
                max_attempts,
                "PUT attempt failed; retrying after short delay"
            );
            tokio::time::sleep(Duration::from_secs(2)).await;
        }
    }

    // `last_err` is always `Some` if at least one attempt ran; the fallback
    // covers the degenerate `max_attempts == 0` call.
    Err(last_err.unwrap_or_else(|| anyhow!("PUT failed after {max_attempts} attempts")))
}

/// Regression test for: when a gateway promotes a transient connection,
/// the peer's identity must be immediately visible via QueryConnections.
///
/// This test validates that after the Connect operation completes:
/// 1. The gateway's QueryConnections returns the peer (not empty)
/// 2. The peer's QueryConnections returns the gateway
///
/// Previously, transient connections were stored with `pub_key: None` and
/// only updated via message-based identity learning, which created race
/// conditions where QueryConnections could return empty despite successful
/// connections.
///
/// The fix ensures handle_connect_peer updates the transport entry's pub_key
/// when promoting transients.
#[freenet_test(
    health_check_readiness = true,
    nodes = ["gateway", "peer"],
    // Increased timeout for CI where 8 parallel tests compete for resources
    timeout_secs = 120,
    // Reduced startup wait - we'll poll for actual connection establishment
    startup_wait_secs = 5,
    aggregate_events = "always",
    tokio_flavor = "multi_thread",
    tokio_worker_threads = 4
)]
async fn test_gateway_reports_peer_identity_after_connect(ctx: &mut TestContext) -> TestResult {
    // Overall budget for the connection to become visible, per-query budget,
    // and the pause between polling rounds.
    const CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
    const QUERY_TIMEOUT: Duration = Duration::from_secs(10);
    const RETRY_DELAY: Duration = Duration::from_secs(2);

    let gateway = ctx.node("gateway")?;
    let peer = ctx.node("peer")?;

    // Open a websocket client against each node's own address.
    let (gw_stream, _) = connect_async(&gateway.ws_url()).await?;
    let mut client_gw = WebApi::start(gw_stream);

    let (peer_stream, _) = connect_async(&peer.ws_url()).await?;
    let mut client_peer = WebApi::start(peer_stream);

    // Poll until the deadline for both sides to report the connection.
    // Under the #2211 bug, QueryConnections would NEVER surface the peer's
    // identity even after the link was up; here we only require that it shows
    // up within a reasonable window once established.
    let deadline = tokio::time::Instant::now() + CONNECTION_TIMEOUT;

    let mut attempt: u32 = 0;
    let mut gw_peers = Vec::new();
    let mut peer_peers = Vec::new();

    loop {
        let now = tokio::time::Instant::now();
        if now >= deadline {
            break;
        }
        attempt += 1;
        let remaining_secs = deadline.saturating_duration_since(now).as_secs();

        tracing::info!(
            attempt,
            remaining_secs,
            "Querying nodes for connected peers..."
        );

        // Query the gateway first, then the peer; a failed query on either
        // side backs off and restarts the round.
        let round = match query_connected_peers(&mut client_gw, "gateway", QUERY_TIMEOUT, attempt)
            .await
        {
            Some(gw_result) => {
                match query_connected_peers(&mut client_peer, "peer", QUERY_TIMEOUT, attempt).await
                {
                    Some(peer_result) => Some((gw_result, peer_result)),
                    None => None,
                }
            }
            None => None,
        };

        let Some((gw_result, peer_result)) = round else {
            tokio::time::sleep(RETRY_DELAY).await;
            continue;
        };

        gw_peers = gw_result;
        peer_peers = peer_result;

        tracing::info!(
            gateway_connections = gw_peers.len(),
            peer_connections = peer_peers.len(),
            "Connection visibility check"
        );

        // Done as soon as each side can see the other.
        if !gw_peers.is_empty() && !peer_peers.is_empty() {
            break;
        }

        tokio::time::sleep(RETRY_DELAY).await;
    }

    // CRITICAL ASSERTIONS - the whole point of this regression test.
    // With the #2211 bug these failed no matter how long we waited, because
    // the peer identity never reached the transport layer.
    if gw_peers.is_empty() {
        bail!(
            "REGRESSION: Gateway's QueryConnections returned empty after {} attempts ({}s timeout)! \
             This indicates the peer's identity was not propagated to the \
             transport layer when the transient connection was promoted. \
             See PR #2211 for the original bug fix.",
            attempt,
            CONNECTION_TIMEOUT.as_secs()
        );
    }

    if peer_peers.is_empty() {
        bail!(
            "REGRESSION: Peer's QueryConnections returned empty after {} attempts ({}s timeout)! \
             This indicates the connection wasn't properly established.",
            attempt,
            CONNECTION_TIMEOUT.as_secs()
        );
    }

    tracing::info!(
        "Connection identity propagation verified: gateway sees {} peer(s), peer sees {} connection(s)",
        gw_peers.len(),
        peer_peers.len()
    );

    Ok(())
}