distributed_topic_tracker/gossip/
sender.rs

1//! Actor-based wrapper for iroh-gossip broadcast operations.
2
3use actor_helper::{Action, Actor, Handle, Receiver, act};
4use anyhow::Result;
5use rand::seq::SliceRandom;
6
7/// Gossip sender that broadcasts messages to peers.
8///
9/// Provides methods for broadcasting to all peers or just direct neighbors,
10/// with peer joining capabilities for topology management.
11#[derive(Debug, Clone)]
12pub struct GossipSender {
13    api: Handle<GossipSenderActor, anyhow::Error>,
14    _gossip: iroh_gossip::net::Gossip,
15}
16
17#[derive(Debug)]
18pub struct GossipSenderActor {
19    rx: Receiver<Action<GossipSenderActor>>,
20    gossip_sender: iroh_gossip::api::GossipSender,
21    _gossip: iroh_gossip::net::Gossip,
22}
23
24impl GossipSender {
25    /// Create a new gossip sender from an iroh topic sender.
26    pub fn new(
27        gossip_sender: iroh_gossip::api::GossipSender,
28        gossip: iroh_gossip::net::Gossip,
29    ) -> Self {
30        let (api, rx) = Handle::channel();
31        tokio::spawn({
32            let gossip = gossip.clone();
33            async move {
34                let mut actor = GossipSenderActor {
35                    rx,
36                    gossip_sender,
37                    _gossip: gossip.clone(),
38                };
39                let _ = actor.run().await;
40            }
41        });
42
43        Self {
44            api,
45            _gossip: gossip,
46        }
47    }
48
49    /// Broadcast a message to all peers in the topic.
50    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
51        tracing::debug!("GossipSender: broadcasting message ({} bytes)", data.len());
52        self.api
53            .call(act!(actor => async move {
54                    actor.gossip_sender
55                .broadcast(data.into()).await.map_err(|e| anyhow::anyhow!(e))
56            }))
57            .await
58    }
59
60    /// Broadcast a message only to direct neighbors.
61    pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
62        tracing::debug!("GossipSender: broadcasting to neighbors ({} bytes)", data.len());
63        self.api
64            .call(act!(actor => async move {
65                actor.gossip_sender.broadcast_neighbors(data.into()).await.map_err(|e| anyhow::anyhow!(e))
66            }))
67            .await
68    }
69
70    /// Join specific peer nodes.
71    ///
72    /// # Arguments
73    ///
74    /// * `peers` - List of node IDs to join
75    /// * `max_peers` - Optional maximum number of peers to join (randomly selected if exceeded)
76    pub async fn join_peers(
77        &self,
78        peers: Vec<iroh::NodeId>,
79        max_peers: Option<usize>,
80    ) -> Result<()> {
81        let mut peers = peers;
82        if let Some(max_peers) = max_peers {
83            peers.shuffle(&mut rand::rng());
84            peers.truncate(max_peers);
85        }
86
87        tracing::debug!("GossipSender: joining {} peers", peers.len());
88
89        self.api
90            .call(act!(actor => async move {
91                    actor.gossip_sender
92                .join_peers(peers)
93                .await
94                .map_err(|e| anyhow::anyhow!(e))
95            }))
96            .await
97    }
98}
99
100impl Actor<anyhow::Error> for GossipSenderActor {
101    async fn run(&mut self) -> Result<()> {
102        loop {
103            tokio::select! {
104                Ok(action) = self.rx.recv_async() => {
105                    action(self).await;
106                }
107                _ = tokio::signal::ctrl_c() => {
108                    break;
109                }
110            }
111        }
112        Ok(())
113    }
114}