distributed_topic_tracker/gossip/
sender.rs1use std::sync::Arc;
4
5use actor_helper::{Handle, act};
6use anyhow::Result;
7use iroh::EndpointId;
8use rand::seq::SliceRandom;
9
10use crate::{TimeoutConfig, Topic};
11
12#[derive(Debug, Clone)]
17pub struct GossipSender {
18 api: Handle<GossipSenderActor, anyhow::Error>,
19 pub(crate) _topic_keep_alive: Option<Arc<Topic>>,
20 pub(crate) timeout_config: TimeoutConfig,
21}
22
23#[derive(Debug)]
25pub struct GossipSenderActor {
26 gossip_sender: iroh_gossip::api::GossipSender,
27}
28
29impl GossipSender {
30 pub fn new(gossip_sender: iroh_gossip::api::GossipSender, timeout_config: TimeoutConfig) -> Self {
32 let api = Handle::spawn(GossipSenderActor { gossip_sender }).0;
33
34 Self {
35 api,
36 _topic_keep_alive: None,
37 timeout_config,
38 }
39 }
40
41 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
43 tracing::debug!("GossipSender: broadcasting message ({} bytes)", data.len());
44 let broadcast_timeout = self.timeout_config.broadcast_timeout();
45 self.api
46 .call(act!(actor => async move {
47 tokio::time::timeout(
48 broadcast_timeout,
49 actor.gossip_sender.broadcast(data.into())
50 ).await
51 .map_err(|_| anyhow::anyhow!("broadcast timed out"))?
52 .map_err(|e| anyhow::anyhow!(e))
53 }))
54 .await
55 }
56
57 pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
59 tracing::debug!(
60 "GossipSender: broadcasting to neighbors ({} bytes)",
61 data.len()
62 );
63 let broadcast_neighbors_timeout = self.timeout_config.broadcast_neighbors_timeout();
64 self.api
65 .call(act!(actor => async move {
66 tokio::time::timeout(
67 broadcast_neighbors_timeout,
68 actor.gossip_sender.broadcast_neighbors(data.into())
69 ).await
70 .map_err(|_| anyhow::anyhow!("broadcast_neighbors timed out"))?
71 .map_err(|e| anyhow::anyhow!(e))
72 }))
73 .await
74 }
75
76 pub async fn join_peers(&self, peers: Vec<EndpointId>, max_peers: Option<usize>) -> Result<()> {
83 let mut peers = peers;
84 if let Some(max_peers) = max_peers {
85 peers.shuffle(&mut rand::rng());
86 peers.truncate(max_peers);
87 }
88
89 tracing::debug!("GossipSender: joining {} peers", peers.len());
90
91 let join_peers_timeout = self.timeout_config.join_peer_timeout();
92 self.api
93 .call(act!(actor => async move {
94 tokio::time::timeout(
95 join_peers_timeout,
96 actor.gossip_sender.join_peers(peers)
97 ).await
98 .map_err(|_| anyhow::anyhow!("join_peers timed out"))?
99 .map_err(|e| anyhow::anyhow!(e))
100 }))
101 .await
102 }
103}