distributed_topic_tracker/gossip/
receiver.rs

1use std::collections::{HashSet, VecDeque};
2
3use actor_helper::{Action, Actor, Handle, act, act_ok};
4use anyhow::Result;
5use futures_lite::StreamExt;
6use sha2::Digest;
7
8#[derive(Debug, Clone)]
9pub struct GossipReceiver {
10    api: Handle<GossipReceiverActor>,
11    _gossip: iroh_gossip::net::Gossip,
12}
13
14#[derive(Debug)]
15pub struct GossipReceiverActor {
16    rx: tokio::sync::mpsc::Receiver<Action<GossipReceiverActor>>,
17    gossip_receiver: iroh_gossip::api::GossipReceiver,
18    last_message_hashes: Vec<[u8; 32]>,
19    msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
20    waiters: VecDeque<
21        tokio::sync::oneshot::Sender<
22            Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
23        >,
24    >,
25    _gossip: iroh_gossip::net::Gossip,
26}
27
28impl GossipReceiver {
29    pub fn new(
30        gossip_receiver: iroh_gossip::api::GossipReceiver,
31        gossip: iroh_gossip::net::Gossip,
32    ) -> Self {
33        let (api, rx) = Handle::channel(1024);
34        tokio::spawn({
35            let gossip = gossip.clone();
36            async move {
37                let mut actor = GossipReceiverActor {
38                    rx,
39                    gossip_receiver,
40                    last_message_hashes: Vec::new(),
41                    msg_queue: VecDeque::new(),
42                    waiters: VecDeque::new(),
43                    _gossip: gossip.clone(),
44                };
45                let _ = actor.run().await;
46            }
47        });
48
49        Self {
50            api,
51            _gossip: gossip.clone(),
52        }
53    }
54
55    pub async fn neighbors(&self) -> HashSet<iroh::NodeId> {
56        self.api
57            .call(act_ok!(actor => async move {
58                actor.gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>()
59            }))
60            .await
61            .expect("actor stopped")
62    }
63
64    pub async fn is_joined(&self) -> bool {
65        self.api
66            .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
67            .await
68            .expect("actor stopped")
69    }
70
71    pub async fn next(
72        &self,
73    ) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
74        let (tx, rx) = tokio::sync::oneshot::channel();
75        self.api
76            .call(act!(actor => actor.register_next(tx)))
77            .await
78            .ok()?;
79        rx.await.ok()?
80    }
81
82    pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
83        self.api
84            .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
85            .await
86            .expect("void")
87    }
88}
89
90impl Actor for GossipReceiverActor {
91    async fn run(&mut self) -> Result<()> {
92        loop {
93            tokio::select! {
94                Some(action) = self.rx.recv() => {
95                    action(self).await;
96                }
97                raw_event = self.gossip_receiver.next() => {
98                    self.msg_queue.push_front(raw_event);
99
100                    if let Some(waiter) = self.waiters.pop_back() {
101                        if let Some(event) = self.msg_queue.pop_back() {
102                            let _ = waiter.send(event);
103                        } else {
104                            let _ = waiter.send(None);
105                            // this should never happen
106                        }
107                    }
108                    if let Some(Some(Ok(event))) = self.msg_queue.front() {
109                        match event {
110                            iroh_gossip::api::Event::Received(msg) => {
111                                let mut hash = sha2::Sha512::new();
112                                hash.update(msg.content.clone());
113                                if let Ok(lmh) = hash.finalize()[..32].try_into() {
114                                    self.last_message_hashes.push(lmh);
115                                }
116                            }
117                            _ => {}
118                        }
119                    }
120                }
121                _ = tokio::signal::ctrl_c() => {
122                    break;
123                }
124            }
125        }
126        Ok(())
127    }
128}
129
130impl GossipReceiverActor {
131    pub async fn register_next(
132        &mut self,
133        waiter: tokio::sync::oneshot::Sender<
134            Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
135        >,
136    ) -> Result<()> {
137        if let Some(event) = self.msg_queue.pop_back() {
138            let _ = waiter.send(event);
139        } else {
140            self.waiters.push_front(waiter);
141        }
142        Ok(())
143    }
144}