distributed_topic_tracker/gossip/
receiver.rs1use std::collections::{HashSet, VecDeque};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use futures_lite::StreamExt;
8use sha2::Digest;
9
10#[derive(Debug, Clone)]
15pub struct GossipReceiver {
16 api: Handle<GossipReceiverActor, anyhow::Error>,
17 _gossip: iroh_gossip::net::Gossip,
18}
19
20#[derive(Debug)]
21pub struct GossipReceiverActor {
22 rx: Receiver<Action<GossipReceiverActor>>,
23 gossip_receiver: iroh_gossip::api::GossipReceiver,
24 last_message_hashes: Vec<[u8; 32]>,
25 msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
26 waiters: VecDeque<
27 tokio::sync::oneshot::Sender<
28 Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
29 >,
30 >,
31 _gossip: iroh_gossip::net::Gossip,
32}
33
34impl GossipReceiver {
35 pub fn new(
37 gossip_receiver: iroh_gossip::api::GossipReceiver,
38 gossip: iroh_gossip::net::Gossip,
39 ) -> Self {
40 let (api, rx) = Handle::channel();
41 tokio::spawn({
42 let gossip = gossip.clone();
43 async move {
44 let mut actor = GossipReceiverActor {
45 rx,
46 gossip_receiver,
47 last_message_hashes: Vec::new(),
48 msg_queue: VecDeque::new(),
49 waiters: VecDeque::new(),
50 _gossip: gossip.clone(),
51 };
52 let _ = actor.run().await;
53 }
54 });
55
56 Self {
57 api,
58 _gossip: gossip.clone(),
59 }
60 }
61
62 pub async fn neighbors(&self) -> HashSet<iroh::NodeId> {
64 self.api
65 .call(act_ok!(actor => async move {
66 actor.gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>()
67 }))
68 .await
69 .expect("actor stopped")
70 }
71
72 pub async fn is_joined(&self) -> bool {
74 self.api
75 .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
76 .await
77 .expect("actor stopped")
78 }
79
80 pub async fn next(
84 &self,
85 ) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
86 let (tx, rx) = tokio::sync::oneshot::channel();
87 self.api
88 .call(act!(actor => actor.register_next(tx)))
89 .await
90 .ok()?;
91 rx.await.ok()?
92 }
93
94 pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
98 self.api
99 .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
100 .await
101 .expect("void")
102 }
103}
104
105impl Actor<anyhow::Error> for GossipReceiverActor {
106 async fn run(&mut self) -> Result<()> {
107 tracing::debug!("GossipReceiver: starting gossip receiver actor");
108 loop {
109 tokio::select! {
110 Ok(action) = self.rx.recv_async() => {
111 action(self).await;
112 }
113 raw_event = self.gossip_receiver.next() => {
114 self.msg_queue.push_front(raw_event);
115
116 if let Some(waiter) = self.waiters.pop_back() {
117 if let Some(event) = self.msg_queue.pop_back() {
118 let _ = waiter.send(event);
119 } else {
120 let _ = waiter.send(None);
121 }
123 }
124 if let Some(Some(Ok(event))) = self.msg_queue.front() {
125 match event {
126 iroh_gossip::api::Event::Received(msg) => {
127 tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
128 let mut hash = sha2::Sha512::new();
129 hash.update(msg.content.clone());
130 if let Ok(lmh) = hash.finalize()[..32].try_into() {
131 self.last_message_hashes.push(lmh);
132 }
133 }
134 iroh_gossip::api::Event::NeighborUp(node_id) => {
135 tracing::debug!("GossipReceiver: neighbor UP: {}", node_id);
136 }
137 iroh_gossip::api::Event::NeighborDown(node_id) => {
138 tracing::debug!("GossipReceiver: neighbor DOWN: {}", node_id);
139 }
140 iroh_gossip::api::Event::Lagged => {
141 tracing::debug!("GossipReceiver: event stream lagged");
142 }
143 }
144 }
145 }
146 _ = tokio::signal::ctrl_c() => {
147 break;
148 }
149 }
150 }
151 Ok(())
152 }
153}
154
155impl GossipReceiverActor {
156 pub async fn register_next(
157 &mut self,
158 waiter: tokio::sync::oneshot::Sender<
159 Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
160 >,
161 ) -> Result<()> {
162 if let Some(event) = self.msg_queue.pop_back() {
163 let _ = waiter.send(event);
164 } else {
165 self.waiters.push_front(waiter);
166 }
167 Ok(())
168 }
169}