use std::collections::HashMap;
use crate::NodeAddr;
use crate::protocol::TreeAnnounce;
use super::{Node, NodeError};
use tracing::{debug, info, trace, warn};
impl Node {
fn build_tree_announce(&self) -> Result<TreeAnnounce, NodeError> {
let decl = self.tree_state.my_declaration().clone();
let ancestry = self.tree_state.my_coords().clone();
if !decl.is_signed() {
return Err(NodeError::SendFailed {
node_addr: *self.identity.node_addr(),
reason: "declaration not signed".into(),
});
}
Ok(TreeAnnounce::new(decl, ancestry))
}
pub(super) async fn send_tree_announce_to_peer(
&mut self,
peer_addr: &NodeAddr,
) -> Result<(), NodeError> {
let now_ms = Self::now_ms();
let peer = match self.peers.get_mut(peer_addr) {
Some(p) => p,
None => return Err(NodeError::PeerNotFound(*peer_addr)),
};
if !peer.can_send_tree_announce(now_ms) {
peer.mark_tree_announce_pending();
self.stats_mut().tree.rate_limited += 1;
debug!(
peer = %self.peer_display_name(peer_addr),
"TreeAnnounce rate-limited, marking pending"
);
return Ok(());
}
let announce = self.build_tree_announce()?;
let encoded = announce.encode().map_err(|e| NodeError::SendFailed {
node_addr: *peer_addr,
reason: format!("encode failed: {}", e),
})?;
if let Err(e) = self.send_encrypted_link_message(peer_addr, &encoded).await {
self.stats_mut().tree.send_failed += 1;
return Err(e);
}
self.stats_mut().tree.sent += 1;
if let Some(peer) = self.peers.get_mut(peer_addr) {
peer.record_tree_announce_sent(now_ms);
}
trace!(peer = %self.peer_display_name(peer_addr), "Sent TreeAnnounce");
Ok(())
}
pub(super) async fn send_tree_announce_to_all(&mut self) {
let peer_addrs: Vec<NodeAddr> = self.peers.keys().copied().collect();
for peer_addr in peer_addrs {
if let Err(e) = self.send_tree_announce_to_peer(&peer_addr).await {
debug!(
peer = %self.peer_display_name(&peer_addr),
error = %e,
"Failed to send TreeAnnounce"
);
}
}
}
pub(super) async fn send_pending_tree_announces(&mut self) {
let now_ms = Self::now_ms();
let ready: Vec<NodeAddr> = self
.peers
.iter()
.filter(|(_, peer)| {
peer.has_pending_tree_announce() && peer.can_send_tree_announce(now_ms)
})
.map(|(addr, _)| *addr)
.collect();
for peer_addr in ready {
if let Err(e) = self.send_tree_announce_to_peer(&peer_addr).await {
debug!(
peer = %self.peer_display_name(&peer_addr),
error = %e,
"Failed to send pending TreeAnnounce"
);
}
}
}
pub(super) async fn handle_tree_announce(&mut self, from: &NodeAddr, payload: &[u8]) {
self.stats_mut().tree.received += 1;
let announce = match TreeAnnounce::decode(payload) {
Ok(a) => a,
Err(e) => {
self.stats_mut().tree.decode_error += 1;
debug!(from = %self.peer_display_name(from), error = %e, "Malformed TreeAnnounce");
return;
}
};
let pubkey = match self.peers.get(from) {
Some(peer) => peer.pubkey(),
None => {
self.stats_mut().tree.unknown_peer += 1;
debug!(from = %self.peer_display_name(from), "TreeAnnounce from unknown peer");
return;
}
};
if announce.declaration.node_addr() != from {
self.stats_mut().tree.addr_mismatch += 1;
debug!(
from = %self.peer_display_name(from),
declared = %announce.declaration.node_addr(),
"TreeAnnounce node_addr mismatch"
);
return;
}
if let Err(e) = announce.declaration.verify(&pubkey) {
self.stats_mut().tree.sig_failed += 1;
warn!(
from = %self.peer_display_name(from),
error = %e,
"TreeAnnounce signature verification failed"
);
return;
}
if let Err(e) = announce.validate_semantics() {
warn!(
from = %self.peer_display_name(from),
error = %e,
"Rejected TreeAnnounce with invalid ancestry"
);
return;
}
let now_ms = Self::now_ms();
if let Some(peer) = self.peers.get_mut(from) {
peer.update_tree_position(
announce.declaration.clone(),
announce.ancestry.clone(),
now_ms,
);
}
let updated = self
.tree_state
.update_peer(announce.declaration.clone(), announce.ancestry.clone());
if !updated {
self.stats_mut().tree.stale += 1;
debug!(from = %self.peer_display_name(from), "TreeAnnounce not fresher than existing, ignored");
return;
}
self.stats_mut().tree.accepted += 1;
debug!(
from = %self.peer_display_name(from),
seq = announce.declaration.sequence(),
depth = announce.ancestry.depth(),
root = %announce.ancestry.root_id(),
"Processed TreeAnnounce"
);
let peer_costs: HashMap<NodeAddr, f64> = self
.peers
.iter()
.filter(|(_, peer)| peer.has_srtt())
.map(|(addr, peer)| (*addr, peer.link_cost()))
.collect();
if let Some(new_parent) = self.tree_state.evaluate_parent(&peer_costs) {
let new_seq = self.tree_state.my_declaration().sequence() + 1;
let timestamp = crate::time::now_secs();
let flap_dampened = self.tree_state.set_parent(new_parent, new_seq, timestamp);
self.tree_state.recompute_coords();
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign declaration after parent switch");
return;
}
self.coord_cache.clear();
self.reset_discovery_backoff();
self.stats_mut().tree.parent_switched += 1;
self.stats_mut().tree.parent_switches += 1;
info!(
new_parent = %self.peer_display_name(&new_parent),
new_seq = new_seq,
new_root = %self.tree_state.root(),
depth = self.tree_state.my_coords().depth(),
"Parent switched, flushed coord cache, announcing to all peers"
);
if flap_dampened {
self.stats_mut().tree.flap_dampened += 1;
warn!("Flap dampening engaged: excessive parent switches detected");
}
self.send_tree_announce_to_all().await;
let all_peers: Vec<NodeAddr> = self.peers.keys().copied().collect();
self.bloom_state.mark_all_updates_needed(all_peers);
} else if !self.tree_state.is_root() && self.tree_state.should_be_root() {
self.tree_state.become_root();
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign self-root declaration");
return;
}
self.coord_cache.clear();
self.reset_discovery_backoff();
self.stats_mut().tree.parent_switched += 1;
self.stats_mut().tree.parent_switches += 1;
info!(
new_root = %self.tree_state.root(),
"Self-promoted to root: smallest visible NodeAddr"
);
self.send_tree_announce_to_all().await;
let all_peers: Vec<NodeAddr> = self.peers.keys().copied().collect();
self.bloom_state.mark_all_updates_needed(all_peers);
} else if !self.tree_state.is_root()
&& *self.tree_state.my_declaration().parent_id() == *from
{
if let Some(parent_coords) = self.tree_state.peer_coords(from)
&& parent_coords.contains(self.identity.node_addr())
{
self.stats_mut().tree.loop_detected += 1;
warn!(
parent = %self.peer_display_name(from),
"Parent ancestry contains us — loop detected, dropping parent"
);
let peer_costs: HashMap<NodeAddr, f64> = self
.peers
.iter()
.filter(|(_, peer)| peer.has_srtt())
.map(|(addr, peer)| (*addr, peer.link_cost()))
.collect();
if self.tree_state.handle_parent_lost(&peer_costs) {
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign declaration after loop detection");
return;
}
self.coord_cache.clear();
self.reset_discovery_backoff();
self.send_tree_announce_to_all().await;
}
return;
}
let old_root = *self.tree_state.root();
let old_depth = self.tree_state.my_coords().depth();
let old_addrs: Vec<NodeAddr> =
self.tree_state.my_coords().node_addrs().copied().collect();
let new_seq = self.tree_state.my_declaration().sequence() + 1;
let timestamp = crate::time::now_secs();
self.tree_state.set_parent(*from, new_seq, timestamp);
self.tree_state.recompute_coords();
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign declaration after parent update");
return;
}
self.coord_cache.clear();
self.reset_discovery_backoff();
let new_addrs: Vec<NodeAddr> =
self.tree_state.my_coords().node_addrs().copied().collect();
if old_addrs != new_addrs {
self.stats_mut().tree.ancestry_changed += 1;
info!(
parent = %self.peer_display_name(from),
old_root = %old_root,
new_root = %self.tree_state.root(),
old_depth = old_depth,
new_depth = self.tree_state.my_coords().depth(),
"Parent ancestry changed, re-announcing"
);
self.send_tree_announce_to_all().await;
let peer_addrs: Vec<NodeAddr> = self.peers.keys().copied().collect();
let peer_filters = self.peer_inbound_filters();
self.bloom_state
.mark_changed_peers(from, &peer_addrs, &peer_filters);
}
}
}
pub(super) async fn check_tree_state(&mut self) {
self.send_pending_tree_announces().await;
self.check_periodic_parent_reeval().await;
}
async fn check_periodic_parent_reeval(&mut self) {
let interval_secs = self.config.node.tree.reeval_interval_secs;
if interval_secs == 0 {
return;
}
if self.peers.len() < 2 {
return;
}
let now = crate::time::instant_now();
let interval = std::time::Duration::from_secs(interval_secs);
if let Some(last) = self.last_parent_reeval
&& now.duration_since(last) < interval
{
return;
}
self.last_parent_reeval = Some(now);
let peer_costs: HashMap<NodeAddr, f64> = self
.peers
.iter()
.filter(|(_, peer)| peer.has_srtt())
.map(|(addr, peer)| (*addr, peer.link_cost()))
.collect();
if let Some(new_parent) = self.tree_state.evaluate_parent(&peer_costs) {
let new_seq = self.tree_state.my_declaration().sequence() + 1;
let timestamp = crate::time::now_secs();
let flap_dampened = self.tree_state.set_parent(new_parent, new_seq, timestamp);
self.tree_state.recompute_coords();
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign declaration after periodic parent re-eval");
return;
}
self.coord_cache.clear();
self.reset_discovery_backoff();
self.stats_mut().tree.parent_switched += 1;
self.stats_mut().tree.parent_switches += 1;
info!(
new_parent = %self.peer_display_name(&new_parent),
new_seq = new_seq,
new_root = %self.tree_state.root(),
depth = self.tree_state.my_coords().depth(),
trigger = "periodic",
"Parent switched via periodic cost re-evaluation"
);
if flap_dampened {
self.stats_mut().tree.flap_dampened += 1;
warn!("Flap dampening engaged: excessive parent switches detected");
}
self.send_tree_announce_to_all().await;
let all_peers: Vec<NodeAddr> = self.peers.keys().copied().collect();
self.bloom_state.mark_all_updates_needed(all_peers);
} else if !self.tree_state.is_root() && self.tree_state.should_be_root() {
self.tree_state.become_root();
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign self-root declaration in periodic reeval");
return;
}
self.coord_cache.clear();
self.reset_discovery_backoff();
self.stats_mut().tree.parent_switched += 1;
self.stats_mut().tree.parent_switches += 1;
info!(
new_root = %self.tree_state.root(),
trigger = "periodic",
"Self-promoted to root in periodic reeval: smallest visible NodeAddr"
);
self.send_tree_announce_to_all().await;
let all_peers: Vec<NodeAddr> = self.peers.keys().copied().collect();
self.bloom_state.mark_all_updates_needed(all_peers);
}
}
pub(super) fn handle_peer_removal_tree_cleanup(&mut self, node_addr: &NodeAddr) -> bool {
let was_parent =
!self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == node_addr;
self.tree_state.remove_peer(node_addr);
if was_parent {
self.stats_mut().tree.parent_losses += 1;
let peer_costs: HashMap<NodeAddr, f64> = self
.peers
.iter()
.filter(|(_, peer)| peer.has_srtt())
.map(|(addr, peer)| (*addr, peer.link_cost()))
.collect();
let changed = self.tree_state.handle_parent_lost(&peer_costs);
if changed {
if let Err(e) = self.tree_state.sign_declaration(&self.identity) {
warn!(error = %e, "Failed to sign declaration after parent loss");
}
info!(
new_root = %self.tree_state.root(),
is_root = self.tree_state.is_root(),
"Tree state updated after parent loss"
);
}
changed
} else {
false
}
}
}