distributed_topic_tracker/gossip/
receiver.rs1use std::collections::{HashSet, VecDeque};
2
3use crate::actor::{Action, Actor, Handle};
4use anyhow::Result;
5use futures::StreamExt;
6use sha2::Digest;
7
8#[derive(Debug, Clone)]
9pub struct GossipReceiver {
10 api: Handle<GossipReceiverActor>,
11 _gossip: iroh_gossip::net::Gossip,
12}
13
14#[derive(Debug)]
15pub struct GossipReceiverActor {
16 rx: tokio::sync::mpsc::Receiver<Action<GossipReceiverActor>>,
17 gossip_receiver: iroh_gossip::api::GossipReceiver,
18 last_message_hashes: Vec<[u8; 32]>,
19 msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
20 waiters: VecDeque<tokio::sync::oneshot::Sender<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>>,
21 _gossip: iroh_gossip::net::Gossip,
22}
23
24impl Actor for GossipReceiverActor {
25 async fn run(&mut self) -> Result<()> {
26 loop {
27 tokio::select! {
28 Some(action) = self.rx.recv() => {
29 action(self).await;
30 }
31 raw_event = self.gossip_receiver.next() => {
32 self.msg_queue.push_front(raw_event);
33
34 if let Some(waiter) = self.waiters.pop_back() {
35 if let Some(event) = self.msg_queue.pop_back() {
36 let _ = waiter.send(event);
37 } else {
38 let _ = waiter.send(None);
39 }
41 }
42 if let Some(Some(Ok(event))) = self.msg_queue.front() {
43 match event {
44 iroh_gossip::api::Event::Received(msg) => {
45 let mut hash = sha2::Sha512::new();
46 hash.update(msg.content.clone());
47 if let Ok(lmh) = hash.finalize()[..32].try_into() {
48 self.last_message_hashes.push(lmh);
49 }
50 }
51 _ => {}
52 }
53 }
54 }
55 _ = tokio::signal::ctrl_c() => {
56 break;
57 }
58 }
59 }
60 Ok(())
61 }
62}
63
64impl GossipReceiverActor {
65 pub fn neighbors(&mut self) -> HashSet<iroh::NodeId> {
66 self.gossip_receiver
67 .neighbors()
68 .collect::<HashSet<iroh::NodeId>>()
69 }
70
71 pub fn is_joined(&mut self) -> bool {
72 self.gossip_receiver.is_joined()
73 }
74
75 pub async fn register_next(
76 &mut self,
77 waiter: tokio::sync::oneshot::Sender<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
78 ) -> Result<()> {
79 if let Some(event) = self.msg_queue.pop_back() {
80 let _ = waiter.send(event);
81 } else {
82 self.waiters.push_front(waiter);
83 }
84 Ok(())
85 }
86
87 pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
88 self.last_message_hashes.clone()
89 }
90}
91
92impl GossipReceiver {
93 pub fn new(
94 gossip_receiver: iroh_gossip::api::GossipReceiver,
95 gossip: iroh_gossip::net::Gossip,
96 ) -> Self {
97 let (api, rx) = Handle::channel(1024);
98 tokio::spawn({
99 let gossip = gossip.clone();
100 async move {
101 let mut actor = GossipReceiverActor {
102 rx,
103 gossip_receiver,
104 last_message_hashes: Vec::new(),
105 msg_queue: VecDeque::new(),
106 waiters: VecDeque::new(),
107 _gossip: gossip.clone(),
108 };
109 let _ = actor.run().await;
110 }
111 });
112
113 Self {
114 api,
115 _gossip: gossip.clone(),
116 }
117 }
118
119 pub async fn neighbors(&self) -> HashSet<iroh::NodeId> {
120 self.api
121 .call(move |actor| Box::pin(async move { Ok(actor.neighbors()) }))
122 .await
123 .expect("actor stopped running: neighbors call failed")
124 }
125
126 pub async fn is_joined(&self) -> bool {
127 self.api
128 .call(move |actor| Box::pin(async move { Ok(actor.is_joined()) }))
129 .await
130 .expect("actor stopped running: is_joined call failed")
131 }
132
133 pub async fn next(&self) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
134 let (tx, rx) = tokio::sync::oneshot::channel();
135 self.api
137 .call(move |actor| {
138 Box::pin(async move {
139 actor.register_next(tx).await?;
140 Ok(())
141 })
142 })
143 .await.ok()?;
144 rx.await.ok()?
146 }
147
148 pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
149 self.api
150 .call(move |actor| Box::pin(async move { Ok(actor.last_message_hashes()) }))
151 .await
152 .expect("actor stopped running: last_message_hashes call failed")
153 }
154}