distributed_topic_tracker/gossip/
sender.rs1use actor_helper::{Action, Actor, Handle, Receiver, act};
4use anyhow::Result;
5use iroh::EndpointId;
6use rand::seq::SliceRandom;
7
8#[derive(Debug, Clone)]
13pub struct GossipSender {
14 api: Handle<GossipSenderActor, anyhow::Error>,
15 _gossip: iroh_gossip::net::Gossip,
16}
17
18#[derive(Debug)]
19pub struct GossipSenderActor {
20 rx: Receiver<Action<GossipSenderActor>>,
21 gossip_sender: iroh_gossip::api::GossipSender,
22 _gossip: iroh_gossip::net::Gossip,
23}
24
25impl GossipSender {
26 pub fn new(
28 gossip_sender: iroh_gossip::api::GossipSender,
29 gossip: iroh_gossip::net::Gossip,
30 ) -> Self {
31 let (api, rx) = Handle::channel();
32 tokio::spawn({
33 let gossip = gossip.clone();
34 async move {
35 let mut actor = GossipSenderActor {
36 rx,
37 gossip_sender,
38 _gossip: gossip.clone(),
39 };
40 let _ = actor.run().await;
41 }
42 });
43
44 Self {
45 api,
46 _gossip: gossip,
47 }
48 }
49
50 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
52 tracing::debug!("GossipSender: broadcasting message ({} bytes)", data.len());
53 self.api
54 .call(act!(actor => async move {
55 actor.gossip_sender
56 .broadcast(data.into()).await.map_err(|e| anyhow::anyhow!(e))
57 }))
58 .await
59 }
60
61 pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
63 tracing::debug!(
64 "GossipSender: broadcasting to neighbors ({} bytes)",
65 data.len()
66 );
67 self.api
68 .call(act!(actor => async move {
69 actor.gossip_sender.broadcast_neighbors(data.into()).await.map_err(|e| anyhow::anyhow!(e))
70 }))
71 .await
72 }
73
74 pub async fn join_peers(&self, peers: Vec<EndpointId>, max_peers: Option<usize>) -> Result<()> {
81 let mut peers = peers;
82 if let Some(max_peers) = max_peers {
83 peers.shuffle(&mut rand::rng());
84 peers.truncate(max_peers);
85 }
86
87 tracing::debug!("GossipSender: joining {} peers", peers.len());
88
89 self.api
90 .call(act!(actor => async move {
91 actor.gossip_sender
92 .join_peers(peers)
93 .await
94 .map_err(|e| anyhow::anyhow!(e))
95 }))
96 .await
97 }
98}
99
100impl Actor<anyhow::Error> for GossipSenderActor {
101 async fn run(&mut self) -> Result<()> {
102 loop {
103 tokio::select! {
104 Ok(action) = self.rx.recv_async() => {
105 action(self).await;
106 }
107 _ = tokio::signal::ctrl_c() => {
108 break;
109 }
110 }
111 }
112 Ok(())
113 }
114}