distributed_topic_tracker/gossip/
receiver.rs1use std::collections::{HashSet, VecDeque};
2
3use actor_helper::{Action, Actor, Handle, act, act_ok};
4use anyhow::Result;
5use futures_lite::StreamExt;
6use sha2::Digest;
7
8#[derive(Debug, Clone)]
9pub struct GossipReceiver {
10 api: Handle<GossipReceiverActor>,
11 _gossip: iroh_gossip::net::Gossip,
12}
13
14#[derive(Debug)]
15pub struct GossipReceiverActor {
16 rx: tokio::sync::mpsc::Receiver<Action<GossipReceiverActor>>,
17 gossip_receiver: iroh_gossip::api::GossipReceiver,
18 last_message_hashes: Vec<[u8; 32]>,
19 msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
20 waiters: VecDeque<
21 tokio::sync::oneshot::Sender<
22 Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
23 >,
24 >,
25 _gossip: iroh_gossip::net::Gossip,
26}
27
28impl GossipReceiver {
29 pub fn new(
30 gossip_receiver: iroh_gossip::api::GossipReceiver,
31 gossip: iroh_gossip::net::Gossip,
32 ) -> Self {
33 let (api, rx) = Handle::channel(1024);
34 tokio::spawn({
35 let gossip = gossip.clone();
36 async move {
37 let mut actor = GossipReceiverActor {
38 rx,
39 gossip_receiver,
40 last_message_hashes: Vec::new(),
41 msg_queue: VecDeque::new(),
42 waiters: VecDeque::new(),
43 _gossip: gossip.clone(),
44 };
45 let _ = actor.run().await;
46 }
47 });
48
49 Self {
50 api,
51 _gossip: gossip.clone(),
52 }
53 }
54
55 pub async fn neighbors(&self) -> HashSet<iroh::NodeId> {
56 self.api
57 .call(act_ok!(actor => async move {
58 actor.gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>()
59 }))
60 .await
61 .expect("actor stopped")
62 }
63
64 pub async fn is_joined(&self) -> bool {
65 self.api
66 .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
67 .await
68 .expect("actor stopped")
69 }
70
71 pub async fn next(
72 &self,
73 ) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
74 let (tx, rx) = tokio::sync::oneshot::channel();
75 self.api
76 .call(act!(actor => actor.register_next(tx)))
77 .await
78 .ok()?;
79 rx.await.ok()?
80 }
81
82 pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
83 self.api
84 .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
85 .await
86 .expect("void")
87 }
88}
89
90impl Actor for GossipReceiverActor {
91 async fn run(&mut self) -> Result<()> {
92 loop {
93 tokio::select! {
94 Some(action) = self.rx.recv() => {
95 action(self).await;
96 }
97 raw_event = self.gossip_receiver.next() => {
98 self.msg_queue.push_front(raw_event);
99
100 if let Some(waiter) = self.waiters.pop_back() {
101 if let Some(event) = self.msg_queue.pop_back() {
102 let _ = waiter.send(event);
103 } else {
104 let _ = waiter.send(None);
105 }
107 }
108 if let Some(Some(Ok(event))) = self.msg_queue.front() {
109 match event {
110 iroh_gossip::api::Event::Received(msg) => {
111 let mut hash = sha2::Sha512::new();
112 hash.update(msg.content.clone());
113 if let Ok(lmh) = hash.finalize()[..32].try_into() {
114 self.last_message_hashes.push(lmh);
115 }
116 }
117 _ => {}
118 }
119 }
120 }
121 _ = tokio::signal::ctrl_c() => {
122 break;
123 }
124 }
125 }
126 Ok(())
127 }
128}
129
130impl GossipReceiverActor {
131 pub async fn register_next(
132 &mut self,
133 waiter: tokio::sync::oneshot::Sender<
134 Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
135 >,
136 ) -> Result<()> {
137 if let Some(event) = self.msg_queue.pop_back() {
138 let _ = waiter.send(event);
139 } else {
140 self.waiters.push_front(waiter);
141 }
142 Ok(())
143 }
144}