1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//! Link message dispatch and peer removal.
use crate::NodeAddr;
use crate::node::Node;
use tracing::{debug, info, trace};
impl Node {
/// Dispatch a decrypted link message to the appropriate handler.
///
/// Link messages are protocol messages exchanged between authenticated peers.
pub(in crate::node) async fn dispatch_link_message(
&mut self,
from: &NodeAddr,
plaintext: &[u8],
ce_flag: bool,
) {
if plaintext.is_empty() {
return;
}
let msg_type = plaintext[0];
let payload = &plaintext[1..];
match msg_type {
0x00 => {
// SessionDatagram
self.handle_session_datagram(from, payload, ce_flag).await;
}
0x01 => {
// SenderReport
self.handle_sender_report(from, payload);
}
0x02 => {
// ReceiverReport
self.handle_receiver_report(from, payload).await;
}
0x10 => {
// TreeAnnounce
self.handle_tree_announce(from, payload).await;
}
0x20 => {
// FilterAnnounce
self.handle_filter_announce(from, payload).await;
}
0x30 => {
// LookupRequest
self.handle_lookup_request(from, payload).await;
}
0x31 => {
// LookupResponse
self.handle_lookup_response(from, payload).await;
}
0x50 => {
// Disconnect
self.handle_disconnect(from, payload);
}
0x51 => {
// Heartbeat — no-op, last_recv_time already updated by record_recv()
trace!(peer = %self.peer_display_name(from), "Received heartbeat");
}
_ => {
debug!(msg_type = msg_type, "Unknown link message type");
}
}
}
/// Handle a Disconnect notification from a peer.
///
/// The peer is signaling an orderly departure. We immediately remove
/// them from all state rather than waiting for timeout detection, and
/// schedule a reconnect if the peer is configured as auto-connect.
/// Without this, a graceful upstream shutdown orphans auto-connect
/// entries — other removal paths (link-dead, decrypt failure, peer
/// restart) all schedule reconnect.
pub(in crate::node) fn handle_disconnect(&mut self, from: &NodeAddr, payload: &[u8]) {
let disconnect = match crate::protocol::Disconnect::decode(payload) {
Ok(msg) => msg,
Err(e) => {
debug!(from = %self.peer_display_name(from), error = %e, "Malformed disconnect message");
return;
}
};
info!(
peer = %self.peer_display_name(from),
reason = %disconnect.reason,
"Peer sent disconnect notification"
);
let addr = *from;
self.remove_active_peer(from);
let now_ms = Self::now_ms();
self.schedule_reconnect(addr, now_ms);
}
/// Remove an active peer and clean up all associated state.
///
/// Frees session index, removes link and address mappings. Used for
/// both graceful disconnect and timeout-based eviction.
///
/// Also handles tree state cleanup: if the removed peer was our parent,
/// selects an alternative or becomes root, and marks remaining peers
/// for pending tree announce (delivered on next tick).
pub(in crate::node) fn remove_active_peer(&mut self, node_addr: &NodeAddr) {
let peer = match self.peers.remove(node_addr) {
Some(p) => p,
None => {
debug!(peer = %self.peer_display_name(node_addr), "Peer already removed");
return;
}
};
// Log suppressed replay detection summary before teardown
let suppressed = peer.replay_suppressed_count();
if suppressed > 0 {
debug!(
peer = %self.peer_display_name(node_addr),
count = suppressed,
"Suppressed replay detections during link transition"
);
}
// MMP teardown log (before we drop the peer)
let peer_name = self
.peer_aliases
.get(node_addr)
.cloned()
.unwrap_or_else(|| peer.identity().short_npub());
if let Some(mmp) = peer.mmp() {
Self::log_mmp_teardown(&peer_name, mmp);
}
// Remove any end-to-end session associated with this peer.
//
// Sessions are tracked separately from peers (self.sessions vs self.peers).
// Leaving a stale session alive after removing the peer causes:
// 1. check_session_mmp_reports() keeps logging stale "MMP session metrics"
// with frozen counters until purge_idle_sessions() eventually fires.
// 2. initiate_session() finds is_established() == true on the stale entry
// and silently returns Ok(()), preventing a new session from being
// established even after the link layer reconnects successfully.
if let Some(session_entry) = self.sessions.remove(node_addr)
&& let Some(mmp) = session_entry.mmp()
{
Self::log_session_mmp_teardown(&peer_name, mmp);
}
self.pending_tun_packets.remove(node_addr);
let link_id = peer.link_id();
let transport_id = peer.transport_id();
// Free session indices (current, rekey, pending, previous)
if let Some(tid) = transport_id {
if let Some(idx) = peer.our_index() {
self.deregister_session_index((tid, idx.as_u32()));
let _ = self.index_allocator.free(idx);
}
if let Some(idx) = peer.rekey_our_index() {
self.pending_outbound.remove(&(tid, idx.as_u32()));
self.deregister_session_index((tid, idx.as_u32()));
let _ = self.index_allocator.free(idx);
}
if let Some(idx) = peer.pending_our_index() {
self.deregister_session_index((tid, idx.as_u32()));
let _ = self.index_allocator.free(idx);
}
if let Some(idx) = peer.previous_our_index() {
self.deregister_session_index((tid, idx.as_u32()));
let _ = self.index_allocator.free(idx);
}
}
// Remove link and address mapping
self.remove_link(&link_id);
if let Some(transport_id) = transport_id {
self.cleanup_bootstrap_transport_if_unused(transport_id);
}
// Tree state cleanup
let tree_changed = self.handle_peer_removal_tree_cleanup(node_addr);
if tree_changed {
// Mark all remaining peers for pending tree announce.
// These will be sent on the next tick via check_tree_state().
for peer in self.peers.values_mut() {
peer.mark_tree_announce_pending();
}
}
// Bloom filter cleanup: clear state for removed peer, mark all remaining peers
self.bloom_state.remove_peer_state(node_addr);
let remaining_peers: Vec<NodeAddr> = self.peers.keys().copied().collect();
self.bloom_state.mark_all_updates_needed(remaining_peers);
info!(
peer = %self.peer_display_name(node_addr),
link_id = %link_id,
tree_changed = tree_changed,
"Peer removed and state cleaned up"
);
}
}