distributed_topic_tracker/gossip/
sender.rs

1use crate::{
2    actor::{Action, Actor, Handle}
3};
4use anyhow::Result;
5use rand::seq::SliceRandom;
6
7#[derive(Debug, Clone)]
8pub struct GossipSender {
9    api: Handle<GossipSenderActor>,
10    _gossip: iroh_gossip::net::Gossip,
11}
12
13#[derive(Debug)]
14pub struct GossipSenderActor {
15    rx: tokio::sync::mpsc::Receiver<Action<GossipSenderActor>>,
16    gossip_sender: iroh_gossip::api::GossipSender,
17    _gossip: iroh_gossip::net::Gossip,
18}
19
20impl Actor for GossipSenderActor {
21    async fn run(&mut self) -> Result<()> {
22        loop {
23            tokio::select! {
24                Some(action) = self.rx.recv() => {
25                    action(self).await;
26                }
27                _ = tokio::signal::ctrl_c() => {
28                    break;
29                }
30            }
31        }
32        Ok(())
33    }
34}
35
36impl GossipSenderActor {
37    async fn broadcast(&mut self, data: Vec<u8>) -> Result<()> {
38        self.gossip_sender
39            .broadcast(data.into())
40            .await
41            .map_err(|e| anyhow::anyhow!(e))
42    }
43
44    async fn broadcast_neighbors(&mut self, data: Vec<u8>) -> Result<()> {
45        self.gossip_sender
46            .broadcast_neighbors(data.into())
47            .await
48            .map_err(|e| anyhow::anyhow!(e))
49    }
50
51    async fn join_peers(&mut self, peers: Vec<iroh::NodeId>) -> Result<()> {
52        self.gossip_sender
53            .join_peers(peers)
54            .await
55            .map_err(|e| anyhow::anyhow!(e))
56    }
57}
58
59impl GossipSender {
60    pub fn new(
61        gossip_sender: iroh_gossip::api::GossipSender,
62        gossip: iroh_gossip::net::Gossip,
63    ) -> Self {
64        let (api, rx) = Handle::channel(1024);
65        tokio::spawn({
66            let gossip = gossip.clone();
67            async move {
68                let mut actor = GossipSenderActor {
69                    rx,
70                    gossip_sender,
71                    _gossip: gossip.clone(),
72                };
73                let _ = actor.run().await;
74            }
75        });
76
77        Self {
78            api,
79            _gossip: gossip,
80        }
81    }
82
83    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
84        self.api
85            .call(move |actor| Box::pin(actor.broadcast(data)))
86            .await
87    }
88
89    pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
90        self.api
91            .call(move |actor| Box::pin(actor.broadcast_neighbors(data)))
92            .await
93    }
94
95    pub async fn join_peers(
96        &self,
97        peers: Vec<iroh::NodeId>,
98        max_peers: Option<usize>,
99    ) -> Result<()> {
100        let mut peers = peers;
101        if let Some(max_peers) = max_peers {
102            peers.shuffle(&mut rand::thread_rng());
103            peers.truncate(max_peers);
104        }
105
106        self.api
107            .call(move |actor| Box::pin(actor.join_peers(peers)))
108            .await
109    }
110}