distributed_topic_tracker/gossip/
sender.rs

1use actor_helper::{Action, Actor, Handle, act};
2use anyhow::Result;
3use rand::seq::SliceRandom;
4
5#[derive(Debug, Clone)]
6pub struct GossipSender {
7    api: Handle<GossipSenderActor>,
8    _gossip: iroh_gossip::net::Gossip,
9}
10
11#[derive(Debug)]
12pub struct GossipSenderActor {
13    rx: tokio::sync::mpsc::Receiver<Action<GossipSenderActor>>,
14    gossip_sender: iroh_gossip::api::GossipSender,
15    _gossip: iroh_gossip::net::Gossip,
16}
17
18impl GossipSender {
19    pub fn new(
20        gossip_sender: iroh_gossip::api::GossipSender,
21        gossip: iroh_gossip::net::Gossip,
22    ) -> Self {
23        let (api, rx) = Handle::channel(1024);
24        tokio::spawn({
25            let gossip = gossip.clone();
26            async move {
27                let mut actor = GossipSenderActor {
28                    rx,
29                    gossip_sender,
30                    _gossip: gossip.clone(),
31                };
32                let _ = actor.run().await;
33            }
34        });
35
36        Self {
37            api,
38            _gossip: gossip,
39        }
40    }
41
42    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
43        self.api
44            .call(act!(actor => async move {
45                        actor.gossip_sender
46                    .broadcast(data.into()).await.map_err(|e| anyhow::anyhow!(e))
47                }))
48            .await
49    }
50
51    pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
52        self.api
53            .call(act!(actor => async move {
54                actor.gossip_sender.broadcast_neighbors(data.into()).await.map_err(|e| anyhow::anyhow!(e))
55            }))
56            .await
57    }
58
59    pub async fn join_peers(
60        &self,
61        peers: Vec<iroh::NodeId>,
62        max_peers: Option<usize>,
63    ) -> Result<()> {
64        let mut peers = peers;
65        if let Some(max_peers) = max_peers {
66            peers.shuffle(&mut rand::thread_rng());
67            peers.truncate(max_peers);
68        }
69
70        self.api
71            .call(act!(actor => async move {
72                actor.gossip_sender
73            .join_peers(peers)
74            .await
75            .map_err(|e| anyhow::anyhow!(e))
76        })).await
77    }
78}
79
80impl Actor for GossipSenderActor {
81    async fn run(&mut self) -> Result<()> {
82        loop {
83            tokio::select! {
84                Some(action) = self.rx.recv() => {
85                    action(self).await;
86                }
87                _ = tokio::signal::ctrl_c() => {
88                    break;
89                }
90            }
91        }
92        Ok(())
93    }
94}