distributed_topic_tracker/gossip/
receiver.rs1use std::collections::{HashSet, VecDeque};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use futures_lite::StreamExt;
8use iroh::EndpointId;
9use sha2::Digest;
10
11#[derive(Debug, Clone)]
16pub struct GossipReceiver {
17 api: Handle<GossipReceiverActor, anyhow::Error>,
18 _gossip: iroh_gossip::net::Gossip,
19}
20
21#[derive(Debug)]
22pub struct GossipReceiverActor {
23 rx: Receiver<Action<GossipReceiverActor>>,
24 gossip_receiver: iroh_gossip::api::GossipReceiver,
25 last_message_hashes: Vec<[u8; 32]>,
26 msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
27 waiters: VecDeque<
28 tokio::sync::oneshot::Sender<
29 Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
30 >,
31 >,
32 _gossip: iroh_gossip::net::Gossip,
33}
34
35impl GossipReceiver {
36 pub fn new(
38 gossip_receiver: iroh_gossip::api::GossipReceiver,
39 gossip: iroh_gossip::net::Gossip,
40 ) -> Self {
41 let (api, rx) = Handle::channel();
42 tokio::spawn({
43 let gossip = gossip.clone();
44 async move {
45 let mut actor = GossipReceiverActor {
46 rx,
47 gossip_receiver,
48 last_message_hashes: Vec::new(),
49 msg_queue: VecDeque::new(),
50 waiters: VecDeque::new(),
51 _gossip: gossip.clone(),
52 };
53 let _ = actor.run().await;
54 }
55 });
56
57 Self {
58 api,
59 _gossip: gossip.clone(),
60 }
61 }
62
63 pub async fn neighbors(&self) -> HashSet<EndpointId> {
65 self.api
66 .call(act_ok!(actor => async move {
67 actor.gossip_receiver.neighbors().collect::<HashSet<EndpointId>>()
68 }))
69 .await
70 .expect("actor stopped")
71 }
72
73 pub async fn is_joined(&self) -> bool {
75 self.api
76 .call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
77 .await
78 .expect("actor stopped")
79 }
80
81 pub async fn next(
85 &self,
86 ) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
87 let (tx, rx) = tokio::sync::oneshot::channel();
88 self.api
89 .call(act!(actor => actor.register_next(tx)))
90 .await
91 .ok()?;
92 rx.await.ok()?
93 }
94
95 pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
99 self.api
100 .call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
101 .await
102 .expect("void")
103 }
104}
105
106impl Actor<anyhow::Error> for GossipReceiverActor {
107 async fn run(&mut self) -> Result<()> {
108 tracing::debug!("GossipReceiver: starting gossip receiver actor");
109 loop {
110 tokio::select! {
111 Ok(action) = self.rx.recv_async() => {
112 action(self).await;
113 }
114 raw_event = self.gossip_receiver.next() => {
115 self.msg_queue.push_front(raw_event);
116
117 if let Some(waiter) = self.waiters.pop_back() {
118 if let Some(event) = self.msg_queue.pop_back() {
119 let _ = waiter.send(event);
120 } else {
121 let _ = waiter.send(None);
122 }
124 }
125 if let Some(Some(Ok(event))) = self.msg_queue.front() {
126 match event {
127 iroh_gossip::api::Event::Received(msg) => {
128 tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
129 let mut hash = sha2::Sha512::new();
130 hash.update(msg.content.clone());
131 if let Ok(lmh) = hash.finalize()[..32].try_into() {
132 self.last_message_hashes.push(lmh);
133 }
134 }
135 iroh_gossip::api::Event::NeighborUp(node_id) => {
136 tracing::debug!("GossipReceiver: neighbor UP: {}", node_id);
137 }
138 iroh_gossip::api::Event::NeighborDown(node_id) => {
139 tracing::debug!("GossipReceiver: neighbor DOWN: {}", node_id);
140 }
141 iroh_gossip::api::Event::Lagged => {
142 tracing::debug!("GossipReceiver: event stream lagged");
143 }
144 }
145 }
146 }
147 _ = tokio::signal::ctrl_c() => {
148 break;
149 }
150 }
151 }
152 Ok(())
153 }
154}
155
156impl GossipReceiverActor {
157 pub async fn register_next(
158 &mut self,
159 waiter: tokio::sync::oneshot::Sender<
160 Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
161 >,
162 ) -> Result<()> {
163 if let Some(event) = self.msg_queue.pop_back() {
164 let _ = waiter.send(event);
165 } else {
166 self.waiters.push_front(waiter);
167 }
168 Ok(())
169 }
170}