Skip to main content

distributed_topic_tracker/gossip/
receiver.rs

1//! Actor-based wrapper for iroh-gossip message receiving.
2
3use std::{
4    collections::{HashSet, VecDeque},
5    fmt::Display,
6    sync::Arc,
7};
8
9use actor_helper::{Action, Handle, Receiver, act_ok};
10use anyhow::Result;
11use futures_lite::StreamExt;
12use iroh::EndpointId;
13use sha2::Digest;
14
15use crate::{MAX_MESSAGE_HASHES, Topic};
16
17/// Gossip receiver that collects incoming messages and neighbor info.
18///
19/// Tracks SHA512 message hashes (first 32 bytes) for overlap detection and provides
20/// neighbor list for topology analysis.
21#[derive(Debug)]
22pub struct GossipReceiver {
23    api: Handle<GossipReceiverActor, anyhow::Error>,
24    pub(crate) _topic_keep_alive: Option<Arc<Topic>>,
25    _next_channel_sender: tokio::sync::broadcast::WeakSender<Option<iroh_gossip::api::Event>>,
26    next_channel_receiver: tokio::sync::broadcast::Receiver<Option<iroh_gossip::api::Event>>,
27    _join_channel_sender: tokio::sync::broadcast::WeakSender<Option<()>>,
28    join_channel_receiver: tokio::sync::broadcast::Receiver<Option<()>>,
29}
30
31impl Clone for GossipReceiver {
32    fn clone(&self) -> Self {
33        let next_rx = match self._next_channel_sender.upgrade() {
34            Some(sender) => sender.subscribe(),
35            None => {
36                let (tx, rx) = tokio::sync::broadcast::channel(1);
37                drop(tx);
38                rx
39            }
40        };
41        let join_rx = match self._join_channel_sender.upgrade() {
42            Some(sender) => sender.subscribe(),
43            None => {
44                let (tx, rx) = tokio::sync::broadcast::channel(1);
45                drop(tx);
46                rx
47            }
48        };
49        Self {
50            api: self.api.clone(),
51            _topic_keep_alive: self._topic_keep_alive.clone(),
52            _next_channel_sender: self._next_channel_sender.clone(),
53            next_channel_receiver: next_rx,
54            _join_channel_sender: self._join_channel_sender.clone(),
55            join_channel_receiver: join_rx,
56        }
57    }
58}
59
60/// Internal actor for gossip receive operations.
61#[derive(Debug)]
62pub struct GossipReceiverActor {
63    gossip_receiver: iroh_gossip::api::GossipReceiver,
64    last_message_hashes: VecDeque<[u8; 32]>,
65    cancel_token: tokio_util::sync::CancellationToken,
66    next_channel_sender: tokio::sync::broadcast::Sender<Option<iroh_gossip::api::Event>>,
67    join_channel_sender: tokio::sync::broadcast::Sender<Option<()>>,
68}
69
/// Failure modes when receiving from one of the receiver's broadcast channels.
#[derive(Debug)]
pub enum ChannelError {
    /// The channel is closed and no further values will arrive.
    Closed,
    /// The receiver fell behind; the payload is the number of skipped values.
    Lagged(u64),
}
75
76impl From<tokio::sync::broadcast::error::RecvError> for ChannelError {
77    fn from(err: tokio::sync::broadcast::error::RecvError) -> Self {
78        match err {
79            tokio::sync::broadcast::error::RecvError::Closed => ChannelError::Closed,
80            tokio::sync::broadcast::error::RecvError::Lagged(skipped) => {
81                ChannelError::Lagged(skipped)
82            }
83        }
84    }
85}
86
87impl std::error::Error for ChannelError {}
88impl Display for ChannelError {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        match self {
91            ChannelError::Closed => write!(f, "channel closed"),
92            ChannelError::Lagged(skipped) => {
93                write!(f, "channel lagged, skipped {} messages", skipped)
94            }
95        }
96    }
97}
98
99impl GossipReceiver {
100    /// Create a new gossip receiver from an iroh topic receiver.
101    pub fn new(
102        gossip_receiver: iroh_gossip::api::GossipReceiver,
103        cancel_token: tokio_util::sync::CancellationToken,
104    ) -> Self {
105        let (next_tx, next_rx) = tokio::sync::broadcast::channel(64);
106        let (join_tx, join_rx) = tokio::sync::broadcast::channel(64);
107        let api = Handle::spawn_with(
108            GossipReceiverActor {
109                gossip_receiver,
110                last_message_hashes: VecDeque::with_capacity(MAX_MESSAGE_HASHES),
111                cancel_token,
112                next_channel_sender: next_tx.clone(),
113                join_channel_sender: join_tx.clone(),
114            },
115            |mut actor, rx| async move { actor.run(rx).await },
116        )
117        .0;
118
119        Self {
120            api,
121            _topic_keep_alive: None,
122            next_channel_receiver: next_rx,
123            _next_channel_sender: next_tx.downgrade(),
124            join_channel_receiver: join_rx,
125            _join_channel_sender: join_tx.downgrade(),
126        }
127    }
128
129    /// Get the set of currently connected neighbor node IDs.
130    pub async fn neighbors(&self) -> Result<HashSet<EndpointId>> {
131        self.api
132            .call(act_ok!(actor => async move {
133                actor.gossip_receiver.neighbors().collect::<HashSet<EndpointId>>()
134            }))
135            .await
136    }
137
138    /// Check if the local node has joined the topic.
139    pub async fn is_joined(&self) -> Result<bool> {
140        self.api
141            .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
142            .await
143    }
144
145    /// Receive the next gossip event.
146    pub async fn next(&mut self) -> Result<iroh_gossip::api::Event, ChannelError> {
147        match self.next_channel_receiver.recv().await {
148            Ok(event) => match event {
149                Some(event) => Ok(event),
150                None => Err(ChannelError::Closed),
151            },
152            Err(err) => Err(err.into()),
153        }
154    }
155
156    /// Waits for a NeighborUp or a message Received event then returns `Ok(())`.
157    pub async fn joined(&mut self) -> Result<(), ChannelError> {
158        if self.is_joined().await.map_err(|_| ChannelError::Closed)? {
159            return Ok(());
160        }
161        match self.join_channel_receiver.recv().await {
162            Ok(event) => match event {
163                Some(event) => Ok(event),
164                None => Err(ChannelError::Closed),
165            },
166            Err(err) => Err(err.into()),
167        }
168    }
169
170    /// Get SHA512 hashes (first 32 bytes) of recently received messages.
171    ///
172    /// Used for detecting message overlap during network partition recovery.
173    pub async fn last_message_hashes(&self) -> Result<VecDeque<[u8; 32]>> {
174        self.api
175            .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
176            .await
177    }
178}
179
180impl GossipReceiverActor {
181    async fn run(&mut self, rx: Receiver<Action<GossipReceiverActor>>) -> Result<()> {
182        tracing::debug!("GossipReceiver: starting gossip receiver actor");
183        loop {
184            tokio::select! {
185                result = rx.recv_async() => {
186                    match result {
187                        Ok(action) => action(self).await,
188                        Err(_) => break Ok(()),
189                    }
190                }
191                raw_event = self.gossip_receiver.next() => {
192                    let event = match raw_event {
193                        None => {
194                            tracing::debug!("GossipReceiver: gossip receiver closed");
195                            self.join_channel_sender.send(None).ok();
196                            self.next_channel_sender.send(None).ok();
197                            self.cancel_token.cancel();
198                            break Ok(());
199                        }
200                        Some(Err(err)) => {
201                            tracing::warn!("GossipReceiver: error receiving gossip event: {err}");
202                            self.next_channel_sender.send(None).ok();
203                            self.join_channel_sender.send(None).ok();
204                            self.cancel_token.cancel();
205                            break Ok(());
206                        }
207                        Some(Ok(ref event)) => {
208                            match event {
209                                iroh_gossip::api::Event::Received(msg) => {
210                                    tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
211                                    let mut hash = sha2::Sha512::new();
212                                    hash.update(&msg.content);
213                                    if let Ok(lmh) = hash.finalize()[..32].try_into() {
214                                        if self.last_message_hashes.len() == MAX_MESSAGE_HASHES {
215                                            self.last_message_hashes.pop_front();
216                                        }
217                                        self.last_message_hashes.push_back(lmh);
218                                    }
219                                    self.join_channel_sender.send(Some(())).ok();
220                                }
221                                iroh_gossip::api::Event::NeighborUp(pub_key) => {
222                                    tracing::debug!("GossipReceiver: neighbor UP: {}", pub_key);
223                                    self.join_channel_sender.send(Some(())).ok();
224                                }
225                                iroh_gossip::api::Event::NeighborDown(pub_key) => {
226                                    tracing::debug!("GossipReceiver: neighbor DOWN: {}", pub_key);
227                                }
228                                iroh_gossip::api::Event::Lagged => {
229                                    tracing::debug!("GossipReceiver: event stream lagged");
230                                }
231                            };
232                            event.clone()
233                        }
234                    };
235
236                    self.next_channel_sender.send(Some(event)).ok();
237                }
238                _ = self.cancel_token.cancelled() => {
239                    self.join_channel_sender.send(None).ok();
240                    self.next_channel_sender.send(None).ok();
241                    break Ok(());
242                }
243                else => break Ok(()),
244            }
245        }
246    }
247}