distributed_topic_tracker/gossip/
sender.rs

1use crate::{
2    actor::{Action, Actor, Handle}
3};
4use anyhow::Result;
5use rand::seq::SliceRandom;
6
7#[derive(Debug, Clone)]
8pub struct GossipSender {
9    api: Handle<GossipSenderActor>,
10    _gossip: iroh_gossip::net::Gossip,
11}
12
13#[derive(Debug)]
14pub struct GossipSenderActor {
15    rx: tokio::sync::mpsc::Receiver<Action<GossipSenderActor>>,
16    gossip_sender: iroh_gossip::api::GossipSender,
17    _gossip: iroh_gossip::net::Gossip,
18}
19
20impl Actor for GossipSenderActor {
21    async fn run(&mut self) -> Result<()> {
22        loop {
23            tokio::select! {
24                Some(action) = self.rx.recv() => {
25                    action(self).await;
26                }
27                _ = tokio::signal::ctrl_c() => {
28                    break;
29                }
30            }
31        }
32        Ok(())
33    }
34}
35
36impl GossipSenderActor {
37    async fn broadcast(&mut self, data: Vec<u8>) -> Result<()> {
38        self.gossip_sender
39            .broadcast(data.into())
40            .await
41            .map_err(|e| anyhow::anyhow!(e))
42    }
43
44    async fn join_peers(&mut self, peers: Vec<iroh::NodeId>) -> Result<()> {
45        self.gossip_sender
46            .join_peers(peers)
47            .await
48            .map_err(|e| anyhow::anyhow!(e))
49    }
50}
51
52impl GossipSender {
53    pub fn new(
54        gossip_sender: iroh_gossip::api::GossipSender,
55        gossip: iroh_gossip::net::Gossip,
56    ) -> Self {
57        let (api, rx) = Handle::channel(1024);
58        tokio::spawn({
59            let gossip = gossip.clone();
60            async move {
61                let mut actor = GossipSenderActor {
62                    rx,
63                    gossip_sender,
64                    _gossip: gossip.clone(),
65                };
66                let _ = actor.run().await;
67            }
68        });
69
70        Self {
71            api,
72            _gossip: gossip,
73        }
74    }
75
76    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
77        self.api
78            .call(move |actor| Box::pin(actor.broadcast(data)))
79            .await
80    }
81
82    pub async fn join_peers(
83        &self,
84        peers: Vec<iroh::NodeId>,
85        max_peers: Option<usize>,
86    ) -> Result<()> {
87        let mut peers = peers;
88        if let Some(max_peers) = max_peers {
89            peers.shuffle(&mut rand::thread_rng());
90            peers.truncate(max_peers);
91        }
92
93        self.api
94            .call(move |actor| Box::pin(actor.join_peers(peers)))
95            .await
96    }
97}