distributed_topic_tracker/gossip/merge/
bubble.rs1use actor_helper::{Action, Actor, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18 _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23 rx: Receiver<Action<BubbleMergeActor>>,
24
25 record_publisher: RecordPublisher,
26 gossip_receiver: GossipReceiver,
27 gossip_sender: GossipSender,
28 ticker: tokio::time::Interval,
29}
30
31impl BubbleMerge {
32 pub fn new(
36 record_publisher: RecordPublisher,
37 gossip_sender: GossipSender,
38 gossip_receiver: GossipReceiver,
39 ) -> Result<Self> {
40 let (api, rx) = Handle::channel();
41
42 let mut ticker = tokio::time::interval(Duration::from_secs(10));
43 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
44
45 tokio::spawn(async move {
46 let mut actor = BubbleMergeActor {
47 rx,
48 record_publisher,
49 gossip_receiver,
50 gossip_sender,
51 ticker,
52 };
53 let _ = actor.run().await;
54 });
55
56 Ok(Self { _api: api })
57 }
58}
59
60impl Actor<anyhow::Error> for BubbleMergeActor {
61 async fn run(&mut self) -> Result<()> {
62 tracing::debug!("BubbleMerge: starting bubble merge actor");
63 loop {
64 tokio::select! {
65 Ok(action) = self.rx.recv_async() => {
66 action(self).await;
67 }
68 _ = self.ticker.tick() => {
69 tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
70 let _ = self.merge().await;
71 let next_interval = rand::random::<u64>() % 50;
72 tracing::debug!("BubbleMerge: next check in {}s", next_interval);
73 self.ticker.reset_after(Duration::from_secs(next_interval));
74 }
75 _ = tokio::signal::ctrl_c() => break,
76 }
77 }
78 Ok(())
79 }
80}
81
82impl BubbleMergeActor {
83 async fn merge(&mut self) -> Result<()> {
85 let unix_minute = crate::unix_minute(0);
86 let mut records = self.record_publisher.get_records(unix_minute - 1).await;
87 records.extend(self.record_publisher.get_records(unix_minute).await);
88
89 let neighbors = self.gossip_receiver.neighbors().await;
90 tracing::debug!(
91 "BubbleMerge: checking with {} neighbors and {} records",
92 neighbors.len(),
93 records.len()
94 );
95
96 if neighbors.len() < 4 && !records.is_empty() {
97 tracing::debug!(
98 "BubbleMerge: detected small bubble ({} neighbors < 4), attempting merge",
99 neighbors.len()
100 );
101 let node_ids = records
102 .iter()
103 .flat_map(|record| {
104 let mut endpoint_ids = if let Ok(content) =
105 record.content::<GossipRecordContent>()
106 {
107 content
108 .active_peers
109 .iter()
110 .filter_map(|&active_peer| {
111 if active_peer == [0; 32]
112 || neighbors.contains(&active_peer)
113 || active_peer.eq(record.node_id().to_vec().as_slice())
114 || active_peer.eq(self.record_publisher.pub_key().as_bytes())
115 {
116 None
117 } else {
118 iroh::EndpointId::from_bytes(&active_peer).ok()
119 }
120 })
121 .collect::<Vec<_>>()
122 } else {
123 vec![]
124 };
125 if let Ok(endpoint_id) = EndpointId::from_bytes(&record.node_id()) {
126 if endpoint_id
127 != EndpointId::from_verifying_key(self.record_publisher.pub_key())
128 {
129 endpoint_ids.push(endpoint_id);
130 }
131 }
132 endpoint_ids
133 })
134 .collect::<HashSet<_>>();
135
136 tracing::debug!(
137 "BubbleMerge: found {} potential peers to join",
138 node_ids.len()
139 );
140
141 if !node_ids.is_empty() {
142 self.gossip_sender
143 .join_peers(
144 node_ids.iter().cloned().collect::<Vec<_>>(),
145 Some(super::MAX_JOIN_PEERS_COUNT),
146 )
147 .await?;
148 tracing::debug!("BubbleMerge: join_peers request sent");
149 }
150 } else {
151 tracing::debug!(
152 "BubbleMerge: no merge needed (neighbors={}, records={})",
153 neighbors.len(),
154 records.len()
155 );
156 }
157 Ok(())
158 }
159}