1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
//! Integration tests for stage 2 of `docs/NAT_TRAVERSAL_PLAN.md`:
//! NAT-type classification + reflex-address piggyback on
//! `CapabilityAnnouncement`.
//!
//! Stage 1 (the reflex-probe subprotocol) is exercised in
//! `tests/reflex_probe.rs`; this file builds on top of that to verify
//! the classification FSM populates `MeshNode`'s atomic
//! `nat_class` + `reflex_addr` fields, and that the resulting
//! `nat:*` capability tag reaches a peer via the existing
//! capability-broadcast path.
//!
//! # Properties under test
//!
//! - **Classification populates on demand.** Calling
//! [`MeshNode::reclassify_nat`] with ≥2 connected peers on
//! localhost yields `NatClass::Open` (reflex == bind) and a
//! populated `reflex_addr()`.
//! - **`nat:*` tag rides the broadcast.** After reclassification,
//! the next `announce_capabilities` emits the `nat:*` tag, and a
//! peer can find the announcer via
//! `find_nodes_by_filter(require_tag("nat:open"))`.
//! - **Fewer than 2 peers leaves state at `Unknown`.** Running
//! reclassification with a lone peer is a no-op, preserving the
//! pre-classification `Unknown` state.
//! - **Background classify loop seeds state without explicit
//! upcall.** `spawn_nat_classify_loop` fires the first sweep
//! once ≥2 peers are reachable.
//!
//! Run: `cargo test --features net,nat-traversal --test nat_classify`
#![cfg(all(feature = "net", feature = "nat-traversal"))]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::behavior::capability::{CapabilityFilter, CapabilitySet};
use net::adapter::net::traversal::classify::NatClass;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];
/// Bind via `127.0.0.1:0` so the OS picks a free port — no
/// pre-bind reservation window, no TOCTOU race with parallel
/// tests.
fn test_config() -> MeshNodeConfig {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut cfg = MeshNodeConfig::new(addr, PSK)
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
.with_handshake(3, Duration::from_secs(2));
cfg.socket_buffers = SocketBufferConfig {
send_buffer_size: TEST_BUFFER_SIZE,
recv_buffer_size: TEST_BUFFER_SIZE,
};
cfg
}
async fn build_node() -> Arc<MeshNode> {
let cfg = test_config();
let keypair = EntityKeypair::generate();
Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
}
/// Three-node setup: A in the center, B + C as its peers. Runs
/// both handshakes before starting the receive loops so a running
/// A doesn't conflict with a second inbound accept. Returns the
/// three started nodes.
async fn three_node_star() -> (Arc<MeshNode>, Arc<MeshNode>, Arc<MeshNode>) {
let a = build_node().await;
let b = build_node().await;
let c = build_node().await;
// A ↔ B handshake
{
let a_id = a.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_id = b.node_id();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect A→B failed");
accept
.await
.expect("accept B panicked")
.expect("accept B failed");
}
// A ↔ C handshake
{
let a_id = a.node_id();
let c_pub = *c.public_key();
let c_addr = c.local_addr();
let c_id = c.node_id();
let c_clone = c.clone();
let accept = tokio::spawn(async move { c_clone.accept(a_id).await });
a.connect(c_addr, &c_pub, c_id)
.await
.expect("connect A→C failed");
accept
.await
.expect("accept C panicked")
.expect("accept C failed");
}
a.start();
b.start();
c.start();
(a, b, c)
}
/// Two-node variant of the star helper. Same order-of-operations
/// (handshake then start), kept as its own helper to keep the test
/// bodies readable.
async fn two_node_pair() -> (Arc<MeshNode>, Arc<MeshNode>) {
let a = build_node().await;
let b = build_node().await;
let a_id = a.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_id = b.node_id();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect A→B failed");
accept
.await
.expect("accept B panicked")
.expect("accept B failed");
a.start();
b.start();
(a, b)
}
/// A manual reclassification with two connected peers on localhost
/// should yield `Open` — reflex equals bind, no NAT in the path.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reclassify_on_localhost_is_open() {
let (a, _b, _c) = three_node_star().await;
// Pre-classification: atomic still at the Unknown default.
assert_eq!(
a.nat_class(),
NatClass::Unknown,
"pre-sweep classification should be Unknown",
);
assert!(
a.reflex_addr().is_none(),
"reflex_addr should be None before the first sweep",
);
a.reclassify_nat().await;
assert_eq!(
a.nat_class(),
NatClass::Open,
"localhost loopback: reflex equals bind → Open",
);
assert_eq!(
a.reflex_addr(),
Some(a.local_addr()),
"reflex_addr should equal the bind addr on localhost",
);
}
/// A peer receiving an announcement from a classified node can find
/// it via `find_nodes_by_filter` on the `nat:*` tag.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn nat_tag_propagates_through_capability_broadcast() {
let (a, b, _c) = three_node_star().await;
a.reclassify_nat().await;
assert_eq!(a.nat_class(), NatClass::Open);
// Empty caps — the announce path should still synthesize the
// `nat:open` tag from the classifier state.
a.announce_capabilities(CapabilitySet::new())
.await
.expect("announce_capabilities");
// Give the broadcast a few hundred ms to reach B and land in
// its capability index.
let filter = CapabilityFilter::new().require_tag("nat:open");
let mut found = false;
for _ in 0..30 {
let peers = b.find_nodes_by_filter(&filter);
if peers.contains(&a.node_id()) {
found = true;
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(
found,
"B should see A's `nat:open` tag within 3s; got peers: {:?}",
b.find_nodes_by_filter(&filter),
);
}
/// Running `reclassify_nat` with a lone peer leaves the node at
/// `Unknown`. The FSM needs at least two probes to distinguish
/// Cone from Symmetric; one probe is treated as "unclassified" and
/// must not flip the atomic into a bogus state.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reclassify_with_single_peer_stays_unknown() {
let (a, _b) = two_node_pair().await;
a.reclassify_nat().await;
assert_eq!(
a.nat_class(),
NatClass::Unknown,
"one peer is insufficient for classification",
);
assert!(
a.reflex_addr().is_none(),
"reflex_addr should stay None when classification didn't run",
);
}
/// The background classify loop (spawned separately from `start`)
/// should fire the first sweep automatically once ≥2 peers are
/// connected. No explicit `reclassify_nat` call required.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn background_classify_loop_seeds_state() {
let (a, _b, _c) = three_node_star().await;
// The loop polls every 200 ms for ≥2 peers, so the first sweep
// fires within that window after the handshakes finish.
let handle = a.spawn_nat_classify_loop();
// Wait up to 3 s for the sweep to land.
let mut classified = false;
for _ in 0..30 {
if a.nat_class() == NatClass::Open {
classified = true;
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(
classified,
"background loop should classify within 3s; got {:?}",
a.nat_class(),
);
handle.abort();
}