distributed_topic_tracker/gossip/
receiver.rs

1//! Actor-based wrapper for iroh-gossip message receiving.
2
3use std::collections::{HashSet, VecDeque};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use futures_lite::StreamExt;
8use sha2::Digest;
9
10/// Gossip receiver that collects incoming messages and neighbor info.
11///
12/// Tracks SHA512 message hashes (first 32 bytes) for overlap detection and provides
13/// neighbor list for topology analysis.
14#[derive(Debug, Clone)]
15pub struct GossipReceiver {
16    api: Handle<GossipReceiverActor, anyhow::Error>,
17    _gossip: iroh_gossip::net::Gossip,
18}
19
20#[derive(Debug)]
21pub struct GossipReceiverActor {
22    rx: Receiver<Action<GossipReceiverActor>>,
23    gossip_receiver: iroh_gossip::api::GossipReceiver,
24    last_message_hashes: Vec<[u8; 32]>,
25    msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
26    waiters: VecDeque<
27        tokio::sync::oneshot::Sender<
28            Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
29        >,
30    >,
31    _gossip: iroh_gossip::net::Gossip,
32}
33
34impl GossipReceiver {
35    /// Create a new gossip receiver from an iroh topic receiver.
36    pub fn new(
37        gossip_receiver: iroh_gossip::api::GossipReceiver,
38        gossip: iroh_gossip::net::Gossip,
39    ) -> Self {
40        let (api, rx) = Handle::channel();
41        tokio::spawn({
42            let gossip = gossip.clone();
43            async move {
44                let mut actor = GossipReceiverActor {
45                    rx,
46                    gossip_receiver,
47                    last_message_hashes: Vec::new(),
48                    msg_queue: VecDeque::new(),
49                    waiters: VecDeque::new(),
50                    _gossip: gossip.clone(),
51                };
52                let _ = actor.run().await;
53            }
54        });
55
56        Self {
57            api,
58            _gossip: gossip.clone(),
59        }
60    }
61
62    /// Get the set of currently connected neighbor node IDs.
63    pub async fn neighbors(&self) -> HashSet<iroh::NodeId> {
64        self.api
65            .call(act_ok!(actor => async move {
66                actor.gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>()
67            }))
68            .await
69            .expect("actor stopped")
70    }
71
72    /// Check if the local node has joined the topic.
73    pub async fn is_joined(&self) -> bool {
74        self.api
75            .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
76            .await
77            .expect("actor stopped")
78    }
79
80    /// Receive the next gossip event.
81    ///
82    /// Returns `None` if the receiver is closed.
83    pub async fn next(
84        &self,
85    ) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
86        let (tx, rx) = tokio::sync::oneshot::channel();
87        self.api
88            .call(act!(actor => actor.register_next(tx)))
89            .await
90            .ok()?;
91        rx.await.ok()?
92    }
93
94    /// Get SHA512 hashes (first 32 bytes) of recently received messages.
95    ///
96    /// Used for detecting message overlap during network partition recovery.
97    pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
98        self.api
99            .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
100            .await
101            .expect("void")
102    }
103}
104
105impl Actor<anyhow::Error> for GossipReceiverActor {
106    async fn run(&mut self) -> Result<()> {
107        tracing::debug!("GossipReceiver: starting gossip receiver actor");
108        loop {
109            tokio::select! {
110                Ok(action) = self.rx.recv_async() => {
111                    action(self).await;
112                }
113                raw_event = self.gossip_receiver.next() => {
114                    self.msg_queue.push_front(raw_event);
115
116                    if let Some(waiter) = self.waiters.pop_back() {
117                        if let Some(event) = self.msg_queue.pop_back() {
118                            let _ = waiter.send(event);
119                        } else {
120                            let _ = waiter.send(None);
121                            // this should never happen
122                        }
123                    }
124                    if let Some(Some(Ok(event))) = self.msg_queue.front() {
125                        match event {
126                            iroh_gossip::api::Event::Received(msg) => {
127                                tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
128                                let mut hash = sha2::Sha512::new();
129                                hash.update(msg.content.clone());
130                                if let Ok(lmh) = hash.finalize()[..32].try_into() {
131                                    self.last_message_hashes.push(lmh);
132                                }
133                            }
134                            iroh_gossip::api::Event::NeighborUp(node_id) => {
135                                tracing::debug!("GossipReceiver: neighbor UP: {}", node_id);
136                            }
137                            iroh_gossip::api::Event::NeighborDown(node_id) => {
138                                tracing::debug!("GossipReceiver: neighbor DOWN: {}", node_id);
139                            }
140                            iroh_gossip::api::Event::Lagged => {
141                                tracing::debug!("GossipReceiver: event stream lagged");
142                            }
143                        }
144                    }
145                }
146                _ = tokio::signal::ctrl_c() => {
147                    break;
148                }
149            }
150        }
151        Ok(())
152    }
153}
154
155impl GossipReceiverActor {
156    pub async fn register_next(
157        &mut self,
158        waiter: tokio::sync::oneshot::Sender<
159            Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
160        >,
161    ) -> Result<()> {
162        if let Some(event) = self.msg_queue.pop_back() {
163            let _ = waiter.send(event);
164        } else {
165            self.waiters.push_front(waiter);
166        }
167        Ok(())
168    }
169}