distributed_topic_tracker/gossip/merge/
bubble.rs1use actor_helper::{Action, Actor, Handle, Receiver};
6use std::{collections::HashSet, time::Duration};
7
8use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
9use anyhow::Result;
10
11#[derive(Debug, Clone)]
16pub struct BubbleMerge {
17 _api: Handle<BubbleMergeActor, anyhow::Error>,
18}
19
20#[derive(Debug)]
21struct BubbleMergeActor {
22 rx: Receiver<Action<BubbleMergeActor>>,
23
24 record_publisher: RecordPublisher,
25 gossip_receiver: GossipReceiver,
26 gossip_sender: GossipSender,
27 ticker: tokio::time::Interval,
28}
29
30impl BubbleMerge {
31 pub fn new(
35 record_publisher: RecordPublisher,
36 gossip_sender: GossipSender,
37 gossip_receiver: GossipReceiver,
38 ) -> Result<Self> {
39 let (api, rx) = Handle::channel();
40
41 let mut ticker = tokio::time::interval(Duration::from_secs(10));
42 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
43
44 tokio::spawn(async move {
45 let mut actor = BubbleMergeActor {
46 rx,
47 record_publisher,
48 gossip_receiver,
49 gossip_sender,
50 ticker,
51 };
52 let _ = actor.run().await;
53 });
54
55 Ok(Self { _api: api })
56 }
57}
58
59impl Actor<anyhow::Error> for BubbleMergeActor {
60 async fn run(&mut self) -> Result<()> {
61 tracing::debug!("BubbleMerge: starting bubble merge actor");
62 loop {
63 tokio::select! {
64 Ok(action) = self.rx.recv_async() => {
65 action(self).await;
66 }
67 _ = self.ticker.tick() => {
68 tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
69 let _ = self.merge().await;
70 let next_interval = rand::random::<u64>() % 50;
71 tracing::debug!("BubbleMerge: next check in {}s", next_interval);
72 self.ticker.reset_after(Duration::from_secs(next_interval));
73 }
74 _ = tokio::signal::ctrl_c() => break,
75 }
76 }
77 Ok(())
78 }
79}
80
81impl BubbleMergeActor {
82 async fn merge(&mut self) -> Result<()> {
84 let unix_minute = crate::unix_minute(0);
85 let mut records = self.record_publisher.get_records(unix_minute - 1).await;
86 records.extend(self.record_publisher.get_records(unix_minute).await);
87
88 let neighbors = self.gossip_receiver.neighbors().await;
89 tracing::debug!(
90 "BubbleMerge: checking with {} neighbors and {} records",
91 neighbors.len(),
92 records.len()
93 );
94
95 if neighbors.len() < 4 && !records.is_empty() {
96 tracing::debug!(
97 "BubbleMerge: detected small bubble ({} neighbors < 4), attempting merge",
98 neighbors.len()
99 );
100 let node_ids = records
101 .iter()
102 .flat_map(|record| {
103 let mut node_ids = if let Ok(content) = record.content::<GossipRecordContent>()
104 {
105 content
106 .active_peers
107 .iter()
108 .filter_map(|&active_peer| {
109 if active_peer == [0; 32]
110 || neighbors.contains(&active_peer)
111 || active_peer.eq(record.node_id().to_vec().as_slice())
112 || active_peer.eq(self.record_publisher.pub_key().as_bytes())
113 {
114 None
115 } else {
116 iroh::NodeId::from_bytes(&active_peer).ok()
117 }
118 })
119 .collect::<Vec<_>>()
120 } else {
121 vec![]
122 };
123 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
124 if node_id != self.record_publisher.pub_key().into() {
125 node_ids.push(node_id);
126 }
127 }
128 node_ids
129 })
130 .collect::<HashSet<_>>();
131
132 tracing::debug!(
133 "BubbleMerge: found {} potential peers to join",
134 node_ids.len()
135 );
136
137 if !node_ids.is_empty() {
138 self.gossip_sender
139 .join_peers(
140 node_ids.iter().cloned().collect::<Vec<_>>(),
141 Some(super::MAX_JOIN_PEERS_COUNT),
142 )
143 .await?;
144 tracing::debug!("BubbleMerge: join_peers request sent");
145 }
146 } else {
147 tracing::debug!(
148 "BubbleMerge: no merge needed (neighbors={}, records={})",
149 neighbors.len(),
150 records.len()
151 );
152 }
153 Ok(())
154 }
155}