distributed_topic_tracker/gossip/
sender.rs1use actor_helper::{Action, Actor, Handle, Receiver, act};
4use anyhow::Result;
5use rand::seq::SliceRandom;
6
7#[derive(Debug, Clone)]
12pub struct GossipSender {
13 api: Handle<GossipSenderActor, anyhow::Error>,
14 _gossip: iroh_gossip::net::Gossip,
15}
16
17#[derive(Debug)]
18pub struct GossipSenderActor {
19 rx: Receiver<Action<GossipSenderActor>>,
20 gossip_sender: iroh_gossip::api::GossipSender,
21 _gossip: iroh_gossip::net::Gossip,
22}
23
24impl GossipSender {
25 pub fn new(
27 gossip_sender: iroh_gossip::api::GossipSender,
28 gossip: iroh_gossip::net::Gossip,
29 ) -> Self {
30 let (api, rx) = Handle::channel();
31 tokio::spawn({
32 let gossip = gossip.clone();
33 async move {
34 let mut actor = GossipSenderActor {
35 rx,
36 gossip_sender,
37 _gossip: gossip.clone(),
38 };
39 let _ = actor.run().await;
40 }
41 });
42
43 Self {
44 api,
45 _gossip: gossip,
46 }
47 }
48
49 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
51 tracing::debug!("GossipSender: broadcasting message ({} bytes)", data.len());
52 self.api
53 .call(act!(actor => async move {
54 actor.gossip_sender
55 .broadcast(data.into()).await.map_err(|e| anyhow::anyhow!(e))
56 }))
57 .await
58 }
59
60 pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
62 tracing::debug!("GossipSender: broadcasting to neighbors ({} bytes)", data.len());
63 self.api
64 .call(act!(actor => async move {
65 actor.gossip_sender.broadcast_neighbors(data.into()).await.map_err(|e| anyhow::anyhow!(e))
66 }))
67 .await
68 }
69
70 pub async fn join_peers(
77 &self,
78 peers: Vec<iroh::NodeId>,
79 max_peers: Option<usize>,
80 ) -> Result<()> {
81 let mut peers = peers;
82 if let Some(max_peers) = max_peers {
83 peers.shuffle(&mut rand::rng());
84 peers.truncate(max_peers);
85 }
86
87 tracing::debug!("GossipSender: joining {} peers", peers.len());
88
89 self.api
90 .call(act!(actor => async move {
91 actor.gossip_sender
92 .join_peers(peers)
93 .await
94 .map_err(|e| anyhow::anyhow!(e))
95 }))
96 .await
97 }
98}
99
100impl Actor<anyhow::Error> for GossipSenderActor {
101 async fn run(&mut self) -> Result<()> {
102 loop {
103 tokio::select! {
104 Ok(action) = self.rx.recv_async() => {
105 action(self).await;
106 }
107 _ = tokio::signal::ctrl_c() => {
108 break;
109 }
110 }
111 }
112 Ok(())
113 }
114}