1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
//! RX event loop and packet dispatch.
use crate::control::queries;
use crate::control::{ControlSocket, commands};
use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
use crate::node::wire::{
COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
PHASE_MSG2,
};
use crate::node::{Node, NodeError};
use crate::transport::ReceivedPacket;
use crate::transport::TransportHandle;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, info, warn};
/// How often the raw-packet drain loop yields a slice of work to the
/// decrypt-fallback drain (every N packets processed inside the inner
/// `packet_rx` burst loop). Keeps TCP ACK / heartbeat / handshake
/// progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Cap on the per-interleave fallback drain so a hot inbound spike
/// can't starve the outer raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
impl Node {
/// Run the receive event loop.
///
/// Processes packets from all transports, dispatching based on
/// the phase field in the 4-byte common prefix:
/// - Phase 0x0: Encrypted frame (session data)
/// - Phase 0x1: Handshake message 1 (initiator -> responder)
/// - Phase 0x2: Handshake message 2 (responder -> initiator)
///
/// Also processes outbound IPv6 packets from the TUN reader for session
/// encapsulation and routing through the mesh.
///
/// Also processes DNS-resolved identities for identity cache population.
///
/// Also runs a periodic maintenance tick (interval taken from
/// `config.node.tick_interval_secs`) to clean up stale handshake
/// connections that never received a response. This prevents resource
/// leaks when peers are unreachable.
///
/// This method takes ownership of the packet_rx channel and runs
/// until the channel is closed (typically when stop() is called).
pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
    let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
    // Take the TUN outbound receiver, or create a dummy channel that never
    // produces messages (when TUN is disabled). Holding the sender prevents
    // the channel from closing.
    let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
        Some(rx) => (rx, None),
        None => {
            let (tx, rx) = tokio::sync::mpsc::channel(1);
            (rx, Some(tx))
        }
    };
    // Take the DNS identity receiver, or create a dummy channel (when DNS
    // is disabled). Same pattern as TUN outbound.
    let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
        Some(rx) => (rx, None),
        None => {
            let (tx, rx) = tokio::sync::mpsc::channel(1);
            (rx, Some(tx))
        }
    };
    // Take the endpoint-data command receiver, or create a dummy channel
    // when the embedded endpoint API is not in use.
    let (mut endpoint_command_rx, _endpoint_command_guard) =
        match self.endpoint_command_rx.take() {
            Some(rx) => (rx, None),
            None => {
                let (tx, rx) = tokio::sync::mpsc::channel(1);
                (rx, Some(tx))
            }
        };
    // Take the decrypt worker fallback receiver if a worker pool
    // is in use. The worker pushes non-fast-path packets (anything
    // that's not bulk EndpointData) here for the legacy dispatch.
    // Dummy channel is unbounded to match the real receiver's type.
    let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
        match self.decrypt_fallback_rx.take() {
            Some(rx) => (rx, None),
            None => {
                let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
                (rx, Some(tx))
            }
        };
    let mut tick =
        tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
    // Set up control socket channel
    let (control_tx, mut control_rx) =
        tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
    if self.config.node.control.enabled {
        let config = self.config.node.control.clone();
        let tx = control_tx.clone();
        tokio::spawn(async move {
            match ControlSocket::bind(&config) {
                Ok(socket) => {
                    socket.accept_loop(tx).await;
                }
                Err(e) => {
                    warn!(error = %e, "Failed to bind control socket");
                }
            }
        });
    }
    // Drop unused sender to avoid keeping channel open if control is disabled
    drop(control_tx);
    info!("RX event loop started");
    // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
    crate::perf_profile::maybe_spawn_reporter();
    loop {
        tokio::select! {
            // `biased` makes arm order the priority order: fallback
            // first, then maintenance tick, then the hot packet drains.
            biased;
            // Decrypt-worker fallback drains FIRST. The previous
            // ordering put `packet_rx` first, which under sustained
            // inbound bursts let the raw-packet drain (up to 256
            // packets + flush) starve fallback work for tens of
            // milliseconds. UDP throughput tolerates that; TCP
            // doesn't — late FMP plaintexts mean late ACKs,
            // dup-ACK fast retransmits, and cwnd collapse.
            // Reproduced on native macOS / Wi-Fi where TCP fell to
            // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
            // tunnel. Promoting fallback gives the kernel-side
            // TCP machinery a fair chance to see its ACKs and
            // keep cwnd growing.
            Some(event) = decrypt_fallback_rx.recv() => {
                self.process_decrypt_worker_event(event).await;
                // 255 more after the head item above = 256 total per turn.
                self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
                self.flush_pending_sends().await;
            }
            // Keep maintenance above the hot packet drains in this
            // biased select. Sustained UDP/TUN traffic can otherwise
            // keep packet branches ready long enough to delay MMP
            // reports, rekey retries, congestion samples, and connected
            // UDP activation until after the dataplane already looks
            // unhealthy.
            _ = tick.tick() => {
                self.run_rx_loop_maintenance_tick().await;
            }
            packet = packet_rx.recv() => {
                match packet {
                    Some(p) => self.process_packet(p).await,
                    None => break, // channel closed — exits the outer loop
                }
                // Drain remaining ready inbound packets in a tight loop
                // before yielding back to select! — every yield is a
                // futex hop on tokio's multi-thread scheduler, and at
                // line rate the kernel UDP queue typically has several
                // datagrams available per wake. Caps at a batch
                // boundary so other branches (tick, control) eventually
                // get a turn even under sustained load.
                let mut drained: usize = 1; // counts the packet processed above
                while drained < 256 {
                    if drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
                        // Interleave fallback drain so bounced FMP
                        // plaintexts (heartbeats, post-FSP-decrypt
                        // TCP ACKs, control frames) don't sit in
                        // the fallback queue for a full 256-packet
                        // burst. Without this, even the
                        // priority-first ordering above can't help
                        // once we're inside this inner loop.
                        self.drain_decrypt_fallback(
                            &mut decrypt_fallback_rx,
                            FALLBACK_INTERLEAVE_BUDGET,
                        )
                        .await;
                    }
                    match packet_rx.try_recv() {
                        Ok(p) => {
                            self.process_packet(p).await;
                            drained += 1;
                        }
                        Err(_) => break,
                    }
                }
                // One trailing fallback drain so the last bounced
                // packets of the burst aren't held up by the
                // post-burst send flush.
                self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 256)
                    .await;
                // Flush any batched sends triggered by inbound
                // packets (e.g. forwarded SessionDatagrams, MMP
                // reports, tree announces).
                self.flush_pending_sends().await;
            }
            Some(ipv6_packet) = tun_outbound_rx.recv() => {
                self.handle_tun_outbound(ipv6_packet).await;
                // Same burst-drain pattern as packet_rx above.
                let mut drained = 0;
                while drained < 256 {
                    match tun_outbound_rx.try_recv() {
                        Ok(p) => {
                            self.handle_tun_outbound(p).await;
                            drained += 1;
                        }
                        Err(_) => break,
                    }
                }
                // Flush any trailing batched sends so the last
                // packets of a burst don't sit in the per-transport
                // sendmmsg buffer waiting for the threshold.
                self.flush_pending_sends().await;
            }
            Some(identity) = dns_identity_rx.recv() => {
                debug!(
                    node_addr = %identity.node_addr,
                    "Registering identity from DNS resolution"
                );
                self.register_identity(identity.node_addr, identity.pubkey);
            }
            Some(command) = endpoint_command_rx.recv() => {
                self.handle_endpoint_data_command(command).await;
                // Same drain pattern: when the application is shoving
                // tunnel data in via send_oneway, several Send commands
                // typically queue up between scheduler hops.
                let mut drained = 0;
                while drained < 256 {
                    match endpoint_command_rx.try_recv() {
                        Ok(c) => {
                            self.handle_endpoint_data_command(c).await;
                            drained += 1;
                        }
                        Err(_) => break,
                    }
                }
                // Flush any trailing batched sends from the
                // per-transport sendmmsg buffer.
                self.flush_pending_sends().await;
            }
            Some((request, response_tx)) = control_rx.recv() => {
                // "show_*" commands are read-only queries; everything
                // else goes through the (async) mutating command path.
                let response = if request.command.starts_with("show_") {
                    queries::dispatch(self, &request.command, request.params.as_ref())
                } else {
                    commands::dispatch(
                        self,
                        &request.command,
                        request.params.as_ref(),
                    ).await
                };
                // Receiver may have gone away (client disconnect) —
                // dropping the response is fine.
                let _ = response_tx.send(response);
            }
        }
    }
    info!("RX event loop stopped (channel closed)");
    Ok(())
}
/// One maintenance pass, driven by the RX loop's periodic tick.
///
/// Covers timeout/cleanup sweeps, handshake/rekey resends, discovery
/// polls, stats/MMP reporting, heartbeats, and congestion sampling.
/// `now_ms` is sampled once up front and shared by the time-based
/// checks so they all agree on the same notion of "now" within a tick.
async fn run_rx_loop_maintenance_tick(&mut self) {
    self.check_timeouts();
    let now_ms = Self::now_ms();
    self.reload_peer_acl();
    self.poll_pending_connects().await;
    self.poll_nostr_discovery().await;
    self.poll_lan_discovery().await;
    self.resend_pending_handshakes(now_ms).await;
    self.resend_pending_rekeys(now_ms).await;
    self.resend_pending_session_handshakes(now_ms).await;
    self.purge_idle_sessions(now_ms);
    self.purge_learned_routes(now_ms);
    self.process_pending_retries(now_ms).await;
    self.check_tree_state().await;
    self.check_bloom_state().await;
    self.compute_mesh_size();
    self.record_stats_history();
    self.check_mmp_reports().await;
    self.check_session_mmp_reports().await;
    self.check_link_heartbeats().await;
    self.check_rekey().await;
    self.check_session_rekey().await;
    self.check_pending_lookups(now_ms).await;
    self.poll_transport_discovery().await;
    self.sample_transport_congestion();
    self.activate_connected_udp_sessions().await;
}
/// Route a single event from the decrypt worker pool to the matching
/// handler: successfully decrypted non-fast-path plaintexts go to the
/// fallback processor, AEAD decryption failures to the failure-report
/// path.
async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
    match event {
        DecryptWorkerEvent::DecryptFailure(failure) => {
            self.process_decrypt_failure_report(failure).await;
        }
        DecryptWorkerEvent::Plaintext(plaintext) => {
            self.process_decrypt_fallback(plaintext).await;
        }
    }
}
/// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
/// processor.
///
/// Reconstructs `ce_flag` / `sp_flag` from the FMP header flag byte
/// the worker captured into `DecryptFallback::fmp_flags` (without
/// this both ECN CE propagation and spin-bit RTT observation are
/// dropped on the worker path) and slices the plaintext out of the
/// original wire buffer with zero allocation.
async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
    let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
    let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
    // Checked slicing: the original indexed slice
    // (`&data[off..off + len]`) would panic the whole RX event loop
    // if the worker ever produced an inconsistent offset/length pair
    // (and `off + len` itself can overflow). Use `saturating_add` +
    // `get` so a malformed fallback drops the packet instead.
    let end = fallback
        .fmp_plaintext_offset
        .saturating_add(fallback.fmp_plaintext_len);
    let Some(plaintext) = fallback.packet_data.get(fallback.fmp_plaintext_offset..end)
    else {
        // Malformed bounds from the worker — drop rather than panic.
        return;
    };
    self.process_authentic_fmp_plaintext(
        &fallback.source_node_addr,
        fallback.transport_id,
        &fallback.remote_addr,
        fallback.timestamp_ms,
        fallback.packet_len,
        fallback.fmp_counter,
        ce_flag,
        sp_flag,
        plaintext,
    )
    .await;
}
/// Log a worker-side FMP AEAD decryption failure at debug level, then
/// forward the report to the canonical failure handler.
async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
    debug!(
        peer = %self.peer_display_name(&report.source_node_addr),
        counter = report.fmp_counter,
        replay_highest = report.fmp_replay_highest,
        "Worker FMP AEAD decryption failed"
    );
    self.handle_decrypt_failure_report(&report).await;
}
/// Drain up to `budget` queued fallbacks without yielding back to
/// `select!`, returning how many were handled. Invoked both from the
/// promoted-fallback select arm (after its head item) and from the
/// interleave points inside the packet_rx drain loop, so bounced FMP
/// plaintexts can't pile up behind a 256-packet inbound burst.
async fn drain_decrypt_fallback(
    &mut self,
    rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
    budget: usize,
) -> usize {
    let mut handled = 0;
    while handled < budget {
        // Non-blocking receive: stop as soon as the queue is empty
        // (or the sender side has gone away).
        let Ok(event) = rx.try_recv() else { break };
        self.process_decrypt_worker_event(event).await;
        handled += 1;
    }
    handled
}
/// Flush pending batched sends on every UDP transport. Today each
/// transport's `flush_pending_send` is a no-op — the UDP transport's
/// per-transport `pending_send` buffer was removed when the bulk data
/// path moved into `encrypt_worker` (which does its own target-grouped
/// `sendmmsg(2)` directly). The call sites are retained so any future
/// batched transport can opt in by overriding `flush_pending_send`
/// without touching the rx_loop.
async fn flush_pending_sends(&self) {
    let udp_transports = self
        .transports
        .values()
        .filter(|transport| matches!(transport, TransportHandle::Udp(_)));
    for transport in udp_transports {
        transport.flush_pending_send().await;
    }
}
/// Process a single received packet.
///
/// Dispatches based on the phase field in the 4-byte common prefix.
/// Packets that are too short, have a malformed prefix, an unknown
/// FMP version, or an unknown phase are silently dropped (with a
/// debug log where useful).
async fn process_packet(&mut self, packet: ReceivedPacket) {
    // Perf instrumentation: total processing time plus how long the
    // packet waited in the transport queue before this wake.
    let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
    crate::perf_profile::record_since(
        crate::perf_profile::Stage::TransportQueueWait,
        packet.trace_enqueued_at,
    );
    if packet.data.len() < COMMON_PREFIX_SIZE {
        return; // Drop packets too short for common prefix
    }
    let prefix = match CommonPrefix::parse(&packet.data) {
        Some(p) => p,
        None => return, // Malformed prefix
    };
    if prefix.version != FMP_VERSION {
        debug!(
            version = prefix.version,
            transport_id = %packet.transport_id,
            "Unknown FMP version, dropping"
        );
        // If the packet arrived on an adopted Nostr-NAT bootstrap
        // transport, the originating peer is necessarily on a
        // different FMP-protocol version than us — the discovery
        // sweep would otherwise re-traverse them every cycle even
        // though no msg1/msg2 exchange can ever succeed. Bump the
        // discovery-layer cooldown to the long protocol-mismatch
        // window and emit a single WARN per fresh observation.
        if self.bootstrap_transports.contains(&packet.transport_id)
            && let Some(npub) = self
                .bootstrap_transport_npubs
                .get(&packet.transport_id)
                .cloned()
            && let Some(handle) = self.nostr_discovery_handle()
        {
            let now_ms = Self::now_ms();
            let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
            // record_protocol_mismatch returns true only on a fresh
            // observation, so the WARN fires once per cooldown window.
            if handle.record_protocol_mismatch(&npub, now_ms) {
                warn!(
                    peer_npub = %npub,
                    transport_id = %packet.transport_id,
                    peer_version = prefix.version,
                    our_version = FMP_VERSION,
                    cooldown_secs,
                    "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
                );
            }
        }
        return;
    }
    match prefix.phase {
        PHASE_ESTABLISHED => {
            self.handle_encrypted_frame(packet).await;
        }
        PHASE_MSG1 => {
            self.handle_msg1(packet).await;
        }
        PHASE_MSG2 => {
            self.handle_msg2(packet).await;
        }
        _ => {
            debug!(
                phase = prefix.phase,
                transport_id = %packet.transport_id,
                "Unknown FMP phase, dropping"
            );
        }
    }
}
}