distributed_topic_tracker/gossip/
sender.rs

1//! Actor-based wrapper for iroh-gossip broadcast operations.
2
3use actor_helper::{Action, Actor, Handle, Receiver, act};
4use anyhow::Result;
5use iroh::EndpointId;
6use rand::seq::SliceRandom;
7
8/// Gossip sender that broadcasts messages to peers.
9///
10/// Provides methods for broadcasting to all peers or just direct neighbors,
11/// with peer joining capabilities for topology management.
12#[derive(Debug, Clone)]
13pub struct GossipSender {
14    api: Handle<GossipSenderActor, anyhow::Error>,
15    _gossip: iroh_gossip::net::Gossip,
16}
17
18#[derive(Debug)]
19pub struct GossipSenderActor {
20    rx: Receiver<Action<GossipSenderActor>>,
21    gossip_sender: iroh_gossip::api::GossipSender,
22    _gossip: iroh_gossip::net::Gossip,
23}
24
25impl GossipSender {
26    /// Create a new gossip sender from an iroh topic sender.
27    pub fn new(
28        gossip_sender: iroh_gossip::api::GossipSender,
29        gossip: iroh_gossip::net::Gossip,
30    ) -> Self {
31        let (api, rx) = Handle::channel();
32        tokio::spawn({
33            let gossip = gossip.clone();
34            async move {
35                let mut actor = GossipSenderActor {
36                    rx,
37                    gossip_sender,
38                    _gossip: gossip.clone(),
39                };
40                let _ = actor.run().await;
41            }
42        });
43
44        Self {
45            api,
46            _gossip: gossip,
47        }
48    }
49
50    /// Broadcast a message to all peers in the topic.
51    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
52        tracing::debug!("GossipSender: broadcasting message ({} bytes)", data.len());
53        self.api
54            .call(act!(actor => async move {
55                    actor.gossip_sender
56                .broadcast(data.into()).await.map_err(|e| anyhow::anyhow!(e))
57            }))
58            .await
59    }
60
61    /// Broadcast a message only to direct neighbors.
62    pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
63        tracing::debug!(
64            "GossipSender: broadcasting to neighbors ({} bytes)",
65            data.len()
66        );
67        self.api
68            .call(act!(actor => async move {
69                actor.gossip_sender.broadcast_neighbors(data.into()).await.map_err(|e| anyhow::anyhow!(e))
70            }))
71            .await
72    }
73
74    /// Join specific peer nodes.
75    ///
76    /// # Arguments
77    ///
78    /// * `peers` - List of node IDs to join
79    /// * `max_peers` - Optional maximum number of peers to join (randomly selected if exceeded)
80    pub async fn join_peers(&self, peers: Vec<EndpointId>, max_peers: Option<usize>) -> Result<()> {
81        let mut peers = peers;
82        if let Some(max_peers) = max_peers {
83            peers.shuffle(&mut rand::rng());
84            peers.truncate(max_peers);
85        }
86
87        tracing::debug!("GossipSender: joining {} peers", peers.len());
88
89        self.api
90            .call(act!(actor => async move {
91                    actor.gossip_sender
92                .join_peers(peers)
93                .await
94                .map_err(|e| anyhow::anyhow!(e))
95            }))
96            .await
97    }
98}
99
100impl Actor<anyhow::Error> for GossipSenderActor {
101    async fn run(&mut self) -> Result<()> {
102        loop {
103            tokio::select! {
104                Ok(action) = self.rx.recv_async() => {
105                    action(self).await;
106                }
107                _ = tokio::signal::ctrl_c() => {
108                    break;
109                }
110            }
111        }
112        Ok(())
113    }
114}