distributed_topic_tracker/gossip/
receiver.rs1use std::{
4 collections::{HashSet, VecDeque},
5 fmt::Display,
6 sync::Arc,
7};
8
9use actor_helper::{Action, Handle, Receiver, act_ok};
10use anyhow::Result;
11use futures_lite::StreamExt;
12use iroh::EndpointId;
13use sha2::Digest;
14
15use crate::{MAX_MESSAGE_HASHES, Topic};
16
17#[derive(Debug)]
22pub struct GossipReceiver {
23 api: Handle<GossipReceiverActor, anyhow::Error>,
24 pub(crate) _topic_keep_alive: Option<Arc<Topic>>,
25 _next_channel_sender: tokio::sync::broadcast::WeakSender<Option<iroh_gossip::api::Event>>,
26 next_channel_receiver: tokio::sync::broadcast::Receiver<Option<iroh_gossip::api::Event>>,
27 _join_channel_sender: tokio::sync::broadcast::WeakSender<Option<()>>,
28 join_channel_receiver: tokio::sync::broadcast::Receiver<Option<()>>,
29}
30
31impl Clone for GossipReceiver {
32 fn clone(&self) -> Self {
33 let next_rx = match self._next_channel_sender.upgrade() {
34 Some(sender) => sender.subscribe(),
35 None => {
36 let (tx, rx) = tokio::sync::broadcast::channel(1);
37 drop(tx);
38 rx
39 }
40 };
41 let join_rx = match self._join_channel_sender.upgrade() {
42 Some(sender) => sender.subscribe(),
43 None => {
44 let (tx, rx) = tokio::sync::broadcast::channel(1);
45 drop(tx);
46 rx
47 }
48 };
49 Self {
50 api: self.api.clone(),
51 _topic_keep_alive: self._topic_keep_alive.clone(),
52 _next_channel_sender: self._next_channel_sender.clone(),
53 next_channel_receiver: next_rx,
54 _join_channel_sender: self._join_channel_sender.clone(),
55 join_channel_receiver: join_rx,
56 }
57 }
58}
59
60#[derive(Debug)]
62pub struct GossipReceiverActor {
63 gossip_receiver: iroh_gossip::api::GossipReceiver,
64 last_message_hashes: VecDeque<[u8; 32]>,
65 cancel_token: tokio_util::sync::CancellationToken,
66 next_channel_sender: tokio::sync::broadcast::Sender<Option<iroh_gossip::api::Event>>,
67 join_channel_sender: tokio::sync::broadcast::Sender<Option<()>>,
68}
69
/// Failure modes reported when reading from one of the receiver's broadcast
/// channels.
#[derive(Debug)]
pub enum ChannelError {
    /// The channel is closed, or the actor published its `None` close marker.
    Closed,
    /// The subscription fell behind; the payload is the number of skipped
    /// messages.
    Lagged(u64),
}
75
76impl From<tokio::sync::broadcast::error::RecvError> for ChannelError {
77 fn from(err: tokio::sync::broadcast::error::RecvError) -> Self {
78 match err {
79 tokio::sync::broadcast::error::RecvError::Closed => ChannelError::Closed,
80 tokio::sync::broadcast::error::RecvError::Lagged(skipped) => {
81 ChannelError::Lagged(skipped)
82 }
83 }
84 }
85}
86
87impl std::error::Error for ChannelError {}
88impl Display for ChannelError {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 ChannelError::Closed => write!(f, "channel closed"),
92 ChannelError::Lagged(skipped) => {
93 write!(f, "channel lagged, skipped {} messages", skipped)
94 }
95 }
96 }
97}
98
99impl GossipReceiver {
100 pub fn new(
102 gossip_receiver: iroh_gossip::api::GossipReceiver,
103 cancel_token: tokio_util::sync::CancellationToken,
104 ) -> Self {
105 let (next_tx, next_rx) = tokio::sync::broadcast::channel(64);
106 let (join_tx, join_rx) = tokio::sync::broadcast::channel(64);
107 let api = Handle::spawn_with(
108 GossipReceiverActor {
109 gossip_receiver,
110 last_message_hashes: VecDeque::with_capacity(MAX_MESSAGE_HASHES),
111 cancel_token,
112 next_channel_sender: next_tx.clone(),
113 join_channel_sender: join_tx.clone(),
114 },
115 |mut actor, rx| async move { actor.run(rx).await },
116 )
117 .0;
118
119 Self {
120 api,
121 _topic_keep_alive: None,
122 next_channel_receiver: next_rx,
123 _next_channel_sender: next_tx.downgrade(),
124 join_channel_receiver: join_rx,
125 _join_channel_sender: join_tx.downgrade(),
126 }
127 }
128
129 pub async fn neighbors(&self) -> Result<HashSet<EndpointId>> {
131 self.api
132 .call(act_ok!(actor => async move {
133 actor.gossip_receiver.neighbors().collect::<HashSet<EndpointId>>()
134 }))
135 .await
136 }
137
138 pub async fn is_joined(&self) -> Result<bool> {
140 self.api
141 .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
142 .await
143 }
144
145 pub async fn next(&mut self) -> Result<iroh_gossip::api::Event, ChannelError> {
147 match self.next_channel_receiver.recv().await {
148 Ok(event) => match event {
149 Some(event) => Ok(event),
150 None => Err(ChannelError::Closed),
151 },
152 Err(err) => Err(err.into()),
153 }
154 }
155
156 pub async fn joined(&mut self) -> Result<(), ChannelError> {
158 if self.is_joined().await.map_err(|_| ChannelError::Closed)? {
159 return Ok(());
160 }
161 match self.join_channel_receiver.recv().await {
162 Ok(event) => match event {
163 Some(event) => Ok(event),
164 None => Err(ChannelError::Closed),
165 },
166 Err(err) => Err(err.into()),
167 }
168 }
169
170 pub async fn last_message_hashes(&self) -> Result<VecDeque<[u8; 32]>> {
174 self.api
175 .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
176 .await
177 }
178}
179
180impl GossipReceiverActor {
181 async fn run(&mut self, rx: Receiver<Action<GossipReceiverActor>>) -> Result<()> {
182 tracing::debug!("GossipReceiver: starting gossip receiver actor");
183 loop {
184 tokio::select! {
185 result = rx.recv_async() => {
186 match result {
187 Ok(action) => action(self).await,
188 Err(_) => break Ok(()),
189 }
190 }
191 raw_event = self.gossip_receiver.next() => {
192 let event = match raw_event {
193 None => {
194 tracing::debug!("GossipReceiver: gossip receiver closed");
195 self.join_channel_sender.send(None).ok();
196 self.next_channel_sender.send(None).ok();
197 self.cancel_token.cancel();
198 break Ok(());
199 }
200 Some(Err(err)) => {
201 tracing::warn!("GossipReceiver: error receiving gossip event: {err}");
202 self.next_channel_sender.send(None).ok();
203 self.join_channel_sender.send(None).ok();
204 self.cancel_token.cancel();
205 break Ok(());
206 }
207 Some(Ok(ref event)) => {
208 match event {
209 iroh_gossip::api::Event::Received(msg) => {
210 tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
211 let mut hash = sha2::Sha512::new();
212 hash.update(&msg.content);
213 if let Ok(lmh) = hash.finalize()[..32].try_into() {
214 if self.last_message_hashes.len() == MAX_MESSAGE_HASHES {
215 self.last_message_hashes.pop_front();
216 }
217 self.last_message_hashes.push_back(lmh);
218 }
219 self.join_channel_sender.send(Some(())).ok();
220 }
221 iroh_gossip::api::Event::NeighborUp(pub_key) => {
222 tracing::debug!("GossipReceiver: neighbor UP: {}", pub_key);
223 self.join_channel_sender.send(Some(())).ok();
224 }
225 iroh_gossip::api::Event::NeighborDown(pub_key) => {
226 tracing::debug!("GossipReceiver: neighbor DOWN: {}", pub_key);
227 }
228 iroh_gossip::api::Event::Lagged => {
229 tracing::debug!("GossipReceiver: event stream lagged");
230 }
231 };
232 event.clone()
233 }
234 };
235
236 self.next_channel_sender.send(Some(event)).ok();
237 }
238 _ = self.cancel_token.cancelled() => {
239 self.join_channel_sender.send(None).ok();
240 self.next_channel_sender.send(None).ok();
241 break Ok(());
242 }
243 else => break Ok(()),
244 }
245 }
246 }
247}