distributed_topic_tracker/gossip/
receiver.rs

1//! Actor-based wrapper for iroh-gossip message receiving.
2
3use std::collections::{HashSet, VecDeque};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use futures_lite::StreamExt;
8use iroh::EndpointId;
9use sha2::Digest;
10
11/// Gossip receiver that collects incoming messages and neighbor info.
12///
13/// Tracks SHA512 message hashes (first 32 bytes) for overlap detection and provides
14/// neighbor list for topology analysis.
15#[derive(Debug, Clone)]
16pub struct GossipReceiver {
17    api: Handle<GossipReceiverActor, anyhow::Error>,
18    _gossip: iroh_gossip::net::Gossip,
19}
20
21#[derive(Debug)]
22pub struct GossipReceiverActor {
23    rx: Receiver<Action<GossipReceiverActor>>,
24    gossip_receiver: iroh_gossip::api::GossipReceiver,
25    last_message_hashes: Vec<[u8; 32]>,
26    msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
27    waiters: VecDeque<
28        tokio::sync::oneshot::Sender<
29            Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
30        >,
31    >,
32    _gossip: iroh_gossip::net::Gossip,
33}
34
35impl GossipReceiver {
36    /// Create a new gossip receiver from an iroh topic receiver.
37    pub fn new(
38        gossip_receiver: iroh_gossip::api::GossipReceiver,
39        gossip: iroh_gossip::net::Gossip,
40    ) -> Self {
41        let (api, rx) = Handle::channel();
42        tokio::spawn({
43            let gossip = gossip.clone();
44            async move {
45                let mut actor = GossipReceiverActor {
46                    rx,
47                    gossip_receiver,
48                    last_message_hashes: Vec::new(),
49                    msg_queue: VecDeque::new(),
50                    waiters: VecDeque::new(),
51                    _gossip: gossip.clone(),
52                };
53                let _ = actor.run().await;
54            }
55        });
56
57        Self {
58            api,
59            _gossip: gossip.clone(),
60        }
61    }
62
63    /// Get the set of currently connected neighbor node IDs.
64    pub async fn neighbors(&self) -> HashSet<EndpointId> {
65        self.api
66            .call(act_ok!(actor => async move {
67                actor.gossip_receiver.neighbors().collect::<HashSet<EndpointId>>()
68            }))
69            .await
70            .expect("actor stopped")
71    }
72
73    /// Check if the local node has joined the topic.
74    pub async fn is_joined(&self) -> bool {
75        self.api
76            .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
77            .await
78            .expect("actor stopped")
79    }
80
81    /// Receive the next gossip event.
82    ///
83    /// Returns `None` if the receiver is closed.
84    pub async fn next(
85        &self,
86    ) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
87        let (tx, rx) = tokio::sync::oneshot::channel();
88        self.api
89            .call(act!(actor => actor.register_next(tx)))
90            .await
91            .ok()?;
92        rx.await.ok()?
93    }
94
95    /// Get SHA512 hashes (first 32 bytes) of recently received messages.
96    ///
97    /// Used for detecting message overlap during network partition recovery.
98    pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
99        self.api
100            .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
101            .await
102            .expect("void")
103    }
104}
105
106impl Actor<anyhow::Error> for GossipReceiverActor {
107    async fn run(&mut self) -> Result<()> {
108        tracing::debug!("GossipReceiver: starting gossip receiver actor");
109        loop {
110            tokio::select! {
111                Ok(action) = self.rx.recv_async() => {
112                    action(self).await;
113                }
114                raw_event = self.gossip_receiver.next() => {
115                    self.msg_queue.push_front(raw_event);
116
117                    if let Some(waiter) = self.waiters.pop_back() {
118                        if let Some(event) = self.msg_queue.pop_back() {
119                            let _ = waiter.send(event);
120                        } else {
121                            let _ = waiter.send(None);
122                            // this should never happen
123                        }
124                    }
125                    if let Some(Some(Ok(event))) = self.msg_queue.front() {
126                        match event {
127                            iroh_gossip::api::Event::Received(msg) => {
128                                tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
129                                let mut hash = sha2::Sha512::new();
130                                hash.update(msg.content.clone());
131                                if let Ok(lmh) = hash.finalize()[..32].try_into() {
132                                    self.last_message_hashes.push(lmh);
133                                }
134                            }
135                            iroh_gossip::api::Event::NeighborUp(node_id) => {
136                                tracing::debug!("GossipReceiver: neighbor UP: {}", node_id);
137                            }
138                            iroh_gossip::api::Event::NeighborDown(node_id) => {
139                                tracing::debug!("GossipReceiver: neighbor DOWN: {}", node_id);
140                            }
141                            iroh_gossip::api::Event::Lagged => {
142                                tracing::debug!("GossipReceiver: event stream lagged");
143                            }
144                        }
145                    }
146                }
147                _ = tokio::signal::ctrl_c() => {
148                    break;
149                }
150            }
151        }
152        Ok(())
153    }
154}
155
156impl GossipReceiverActor {
157    pub async fn register_next(
158        &mut self,
159        waiter: tokio::sync::oneshot::Sender<
160            Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
161        >,
162    ) -> Result<()> {
163        if let Some(event) = self.msg_queue.pop_back() {
164            let _ = waiter.send(event);
165        } else {
166            self.waiters.push_front(waiter);
167        }
168        Ok(())
169    }
170}