distributed_topic_tracker/gossip/merge/
bubble.rs1use actor_helper::{Action, Actor, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18 _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23 rx: Receiver<Action<BubbleMergeActor>>,
24
25 record_publisher: RecordPublisher,
26 gossip_receiver: GossipReceiver,
27 gossip_sender: GossipSender,
28 ticker: tokio::time::Interval,
29}
30
31impl BubbleMerge {
32 pub fn new(
36 record_publisher: RecordPublisher,
37 gossip_sender: GossipSender,
38 gossip_receiver: GossipReceiver,
39 ) -> Result<Self> {
40 let (api, rx) = Handle::channel();
41
42 let mut ticker = tokio::time::interval(Duration::from_secs(10));
43 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
44
45 tokio::spawn(async move {
46 let mut actor = BubbleMergeActor {
47 rx,
48 record_publisher,
49 gossip_receiver,
50 gossip_sender,
51 ticker,
52 };
53 let _ = actor.run().await;
54 });
55
56 Ok(Self { _api: api })
57 }
58}
59
60impl Actor<anyhow::Error> for BubbleMergeActor {
61 async fn run(&mut self) -> Result<()> {
62 tracing::debug!("BubbleMerge: starting bubble merge actor");
63 loop {
64 tokio::select! {
65 Ok(action) = self.rx.recv_async() => {
66 action(self).await;
67 }
68 _ = self.ticker.tick() => {
69 tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
70 let _ = self.merge().await;
71 let next_interval = rand::random::<u64>() % 50;
72 tracing::debug!("BubbleMerge: next check in {}s", next_interval);
73 self.ticker.reset_after(Duration::from_secs(next_interval));
74 }
75 else => break Ok(()),
76 }
77 }
78 }
79}
80
81impl BubbleMergeActor {
82 async fn merge(&mut self) -> Result<()> {
84 let unix_minute = crate::unix_minute(0);
85 let mut records = self.record_publisher.get_records(unix_minute - 1).await;
86 records.extend(self.record_publisher.get_records(unix_minute).await);
87
88 let neighbors = self.gossip_receiver.neighbors().await;
89 tracing::debug!(
90 "BubbleMerge: checking with {} neighbors and {} records",
91 neighbors.len(),
92 records.len()
93 );
94
95 if neighbors.len() < 4 && !records.is_empty() {
96 tracing::debug!(
97 "BubbleMerge: detected small bubble ({} neighbors < 4), attempting merge",
98 neighbors.len()
99 );
100 let node_ids = records
101 .iter()
102 .flat_map(|record| {
103 let mut endpoint_ids = if let Ok(content) =
104 record.content::<GossipRecordContent>()
105 {
106 content
107 .active_peers
108 .iter()
109 .filter_map(|&active_peer| {
110 if active_peer == [0; 32]
111 || neighbors.contains(&active_peer)
112 || active_peer.eq(record.node_id().to_vec().as_slice())
113 || active_peer.eq(self.record_publisher.pub_key().as_bytes())
114 {
115 None
116 } else {
117 iroh::EndpointId::from_bytes(&active_peer).ok()
118 }
119 })
120 .collect::<Vec<_>>()
121 } else {
122 vec![]
123 };
124 if let Ok(endpoint_id) = EndpointId::from_bytes(&record.node_id())
125 && endpoint_id
126 != EndpointId::from_verifying_key(self.record_publisher.pub_key())
127 {
128 endpoint_ids.push(endpoint_id);
129 }
130 endpoint_ids
131 })
132 .collect::<HashSet<_>>();
133
134 tracing::debug!(
135 "BubbleMerge: found {} potential peers to join",
136 node_ids.len()
137 );
138
139 if !node_ids.is_empty() {
140 self.gossip_sender
141 .join_peers(
142 node_ids.iter().cloned().collect::<Vec<_>>(),
143 Some(super::MAX_JOIN_PEERS_COUNT),
144 )
145 .await?;
146 tracing::debug!("BubbleMerge: join_peers request sent");
147 }
148 } else {
149 tracing::debug!(
150 "BubbleMerge: no merge needed (neighbors={}, records={})",
151 neighbors.len(),
152 records.len()
153 );
154 }
155 Ok(())
156 }
157}