distributed_topic_tracker/gossip/
receiver.rs

1use std::collections::{HashSet, VecDeque};
2
3use crate::actor::{Action, Actor, Handle};
4use anyhow::Result;
5use futures::StreamExt;
6use sha2::Digest;
7
8#[derive(Debug, Clone)]
9pub struct GossipReceiver {
10    api: Handle<GossipReceiverActor>,
11    _gossip: iroh_gossip::net::Gossip,
12}
13
14#[derive(Debug)]
15pub struct GossipReceiverActor {
16    rx: tokio::sync::mpsc::Receiver<Action<GossipReceiverActor>>,
17    gossip_receiver: iroh_gossip::api::GossipReceiver,
18    last_message_hashes: Vec<[u8; 32]>,
19    msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
20    waiters: VecDeque<tokio::sync::oneshot::Sender<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>>,
21    _gossip: iroh_gossip::net::Gossip,
22}
23
24impl Actor for GossipReceiverActor {
25    async fn run(&mut self) -> Result<()> {
26        loop {
27            tokio::select! {
28                Some(action) = self.rx.recv() => {
29                    action(self).await;
30                }
31                raw_event = self.gossip_receiver.next() => {
32                    self.msg_queue.push_front(raw_event);
33
34                    if let Some(waiter) = self.waiters.pop_back() {
35                        if let Some(event) = self.msg_queue.pop_back() {
36                            let _ = waiter.send(event);
37                        } else {
38                            let _ = waiter.send(None);
39                            // this should never happen
40                        }
41                    }
42                    if let Some(Some(Ok(event))) = self.msg_queue.front() {
43                        match event {
44                            iroh_gossip::api::Event::Received(msg) => {
45                                let mut hash = sha2::Sha512::new();
46                                hash.update(msg.content.clone());
47                                if let Ok(lmh) = hash.finalize()[..32].try_into() {
48                                    self.last_message_hashes.push(lmh);
49                                }
50                            }
51                            _ => {}
52                        }
53                    }
54                }
55                _ = tokio::signal::ctrl_c() => {
56                    break;
57                }
58            }
59        }
60        Ok(())
61    }
62}
63
64impl GossipReceiverActor {
65    pub fn neighbors(&mut self) -> HashSet<iroh::NodeId> {
66        self.gossip_receiver
67            .neighbors()
68            .collect::<HashSet<iroh::NodeId>>()
69    }
70
71    pub fn is_joined(&mut self) -> bool {
72        self.gossip_receiver.is_joined()
73    }
74
75    pub async fn register_next(
76        &mut self,
77        waiter: tokio::sync::oneshot::Sender<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
78    ) -> Result<()> {
79        if let Some(event) = self.msg_queue.pop_back() {
80            let _ = waiter.send(event);
81        } else {
82            self.waiters.push_front(waiter);
83        }
84        Ok(())
85    }
86
87    pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
88        self.last_message_hashes.clone()
89    }
90}
91
92impl GossipReceiver {
93    pub fn new(
94        gossip_receiver: iroh_gossip::api::GossipReceiver,
95        gossip: iroh_gossip::net::Gossip,
96    ) -> Self {
97        let (api, rx) = Handle::channel(1024);
98        tokio::spawn({
99            let gossip = gossip.clone();
100            async move {
101                let mut actor = GossipReceiverActor {
102                    rx,
103                    gossip_receiver,
104                    last_message_hashes: Vec::new(),
105                    msg_queue: VecDeque::new(),
106                    waiters: VecDeque::new(),
107                    _gossip: gossip.clone(),
108                };
109                let _ = actor.run().await;
110            }
111        });
112
113        Self {
114            api,
115            _gossip: gossip.clone(),
116        }
117    }
118
119    pub async fn neighbors(&self) -> HashSet<iroh::NodeId> {
120        self.api
121            .call(move |actor| Box::pin(async move { Ok(actor.neighbors()) }))
122            .await
123            .expect("void")
124    }
125
126    pub async fn is_joined(&self) -> bool {
127        self.api
128            .call(move |actor| Box::pin(async move { Ok(actor.is_joined()) }))
129            .await
130            .expect("void")
131    }
132
133    pub async fn next(&self) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
134        let (tx, rx) = tokio::sync::oneshot::channel();
135        // Enqueue request
136        self.api
137            .call(move |actor| {
138                Box::pin(async move {
139                    actor.register_next(tx).await?;
140                    Ok(())
141                })
142            })
143            .await.ok()?;
144        // Await delivery
145        rx.await.ok()?
146    }
147
148    pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
149        self.api
150            .call(move |actor| Box::pin(async move { Ok(actor.last_message_hashes()) }))
151            .await
152            .expect("void")
153    }
154}