distributed_topic_tracker/gossip/
sender.rs1use actor_helper::{Action, Actor, Handle, act};
2use anyhow::Result;
3use rand::seq::SliceRandom;
4
5#[derive(Debug, Clone)]
6pub struct GossipSender {
7 api: Handle<GossipSenderActor>,
8 _gossip: iroh_gossip::net::Gossip,
9}
10
11#[derive(Debug)]
12pub struct GossipSenderActor {
13 rx: tokio::sync::mpsc::Receiver<Action<GossipSenderActor>>,
14 gossip_sender: iroh_gossip::api::GossipSender,
15 _gossip: iroh_gossip::net::Gossip,
16}
17
18impl GossipSender {
19 pub fn new(
20 gossip_sender: iroh_gossip::api::GossipSender,
21 gossip: iroh_gossip::net::Gossip,
22 ) -> Self {
23 let (api, rx) = Handle::channel(1024);
24 tokio::spawn({
25 let gossip = gossip.clone();
26 async move {
27 let mut actor = GossipSenderActor {
28 rx,
29 gossip_sender,
30 _gossip: gossip.clone(),
31 };
32 let _ = actor.run().await;
33 }
34 });
35
36 Self {
37 api,
38 _gossip: gossip,
39 }
40 }
41
42 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
43 self.api
44 .call(act!(actor => async move {
45 actor.gossip_sender
46 .broadcast(data.into()).await.map_err(|e| anyhow::anyhow!(e))
47 }))
48 .await
49 }
50
51 pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
52 self.api
53 .call(act!(actor => async move {
54 actor.gossip_sender.broadcast_neighbors(data.into()).await.map_err(|e| anyhow::anyhow!(e))
55 }))
56 .await
57 }
58
59 pub async fn join_peers(
60 &self,
61 peers: Vec<iroh::NodeId>,
62 max_peers: Option<usize>,
63 ) -> Result<()> {
64 let mut peers = peers;
65 if let Some(max_peers) = max_peers {
66 peers.shuffle(&mut rand::thread_rng());
67 peers.truncate(max_peers);
68 }
69
70 self.api
71 .call(act!(actor => async move {
72 actor.gossip_sender
73 .join_peers(peers)
74 .await
75 .map_err(|e| anyhow::anyhow!(e))
76 })).await
77 }
78}
79
80impl Actor for GossipSenderActor {
81 async fn run(&mut self) -> Result<()> {
82 loop {
83 tokio::select! {
84 Some(action) = self.rx.recv() => {
85 action(self).await;
86 }
87 _ = tokio::signal::ctrl_c() => {
88 break;
89 }
90 }
91 }
92 Ok(())
93 }
94}