Skip to main content

distributed_topic_tracker/gossip/
sender.rs

//! Actor-based wrapper for iroh-gossip broadcast operations.
2
3use std::sync::Arc;
4
5use actor_helper::{Handle, act};
6use anyhow::Result;
7use iroh::EndpointId;
8use rand::seq::SliceRandom;
9
10use crate::{TimeoutConfig, Topic};
11
12/// Gossip sender that broadcasts messages to peers.
13///
14/// Provides methods for broadcasting to all peers or just direct neighbors,
15/// with peer joining capabilities for topology management.
16#[derive(Debug, Clone)]
17pub struct GossipSender {
18    api: Handle<GossipSenderActor, anyhow::Error>,
19    pub(crate) _topic_keep_alive: Option<Arc<Topic>>,
20    pub(crate) timeout_config: TimeoutConfig,
21}
22
23/// Internal actor for gossip send operations.
24#[derive(Debug)]
25pub struct GossipSenderActor {
26    gossip_sender: iroh_gossip::api::GossipSender,
27}
28
29impl GossipSender {
30    /// Create a new gossip sender from an iroh topic sender.
31    pub fn new(gossip_sender: iroh_gossip::api::GossipSender, timeout_config: TimeoutConfig) -> Self {
32        let api = Handle::spawn(GossipSenderActor { gossip_sender }).0;
33
34        Self {
35            api,
36            _topic_keep_alive: None,
37            timeout_config,
38        }
39    }
40
41    /// Broadcast a message to all peers in the topic.
42    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
43        tracing::debug!("GossipSender: broadcasting message ({} bytes)", data.len());
44        let broadcast_timeout = self.timeout_config.broadcast_timeout();
45        self.api
46            .call(act!(actor => async move {
47                tokio::time::timeout(
48                    broadcast_timeout,
49                    actor.gossip_sender.broadcast(data.into())
50                ).await
51                .map_err(|_| anyhow::anyhow!("broadcast timed out"))?
52                .map_err(|e| anyhow::anyhow!(e))
53            }))
54            .await
55    }
56
57    /// Broadcast a message only to direct neighbors.
58    pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
59        tracing::debug!(
60            "GossipSender: broadcasting to neighbors ({} bytes)",
61            data.len()
62        );
63        let broadcast_neighbors_timeout = self.timeout_config.broadcast_neighbors_timeout();
64        self.api
65            .call(act!(actor => async move {
66                tokio::time::timeout(
67                    broadcast_neighbors_timeout,
68                    actor.gossip_sender.broadcast_neighbors(data.into())
69                ).await
70                .map_err(|_| anyhow::anyhow!("broadcast_neighbors timed out"))?
71                .map_err(|e| anyhow::anyhow!(e))
72            }))
73            .await
74    }
75
76    /// Join specific peer nodes.
77    ///
78    /// # Arguments
79    ///
80    /// * `peers` - List of node IDs to join
81    /// * `max_peers` - Optional maximum number of peers to join (randomly selected if exceeded)
82    pub async fn join_peers(&self, peers: Vec<EndpointId>, max_peers: Option<usize>) -> Result<()> {
83        let mut peers = peers;
84        if let Some(max_peers) = max_peers {
85            peers.shuffle(&mut rand::rng());
86            peers.truncate(max_peers);
87        }
88
89        tracing::debug!("GossipSender: joining {} peers", peers.len());
90
91        let join_peers_timeout = self.timeout_config.join_peer_timeout();
92        self.api
93            .call(act!(actor => async move {
94                tokio::time::timeout(
95                    join_peers_timeout,
96                    actor.gossip_sender.join_peers(peers)
97                ).await
98                .map_err(|_| anyhow::anyhow!("join_peers timed out"))?
99                .map_err(|e| anyhow::anyhow!(e))
100            }))
101            .await
102    }
103}