distributed_topic_tracker/gossip/
sender.rs1use crate::{
2 actor::{Action, Actor, Handle}
3};
4use anyhow::Result;
5use rand::seq::SliceRandom;
6
7#[derive(Debug, Clone)]
8pub struct GossipSender {
9 api: Handle<GossipSenderActor>,
10 _gossip: iroh_gossip::net::Gossip,
11}
12
13#[derive(Debug)]
14pub struct GossipSenderActor {
15 rx: tokio::sync::mpsc::Receiver<Action<GossipSenderActor>>,
16 gossip_sender: iroh_gossip::api::GossipSender,
17 _gossip: iroh_gossip::net::Gossip,
18}
19
20impl Actor for GossipSenderActor {
21 async fn run(&mut self) -> Result<()> {
22 loop {
23 tokio::select! {
24 Some(action) = self.rx.recv() => {
25 action(self).await;
26 }
27 _ = tokio::signal::ctrl_c() => {
28 break;
29 }
30 }
31 }
32 Ok(())
33 }
34}
35
36impl GossipSenderActor {
37 async fn broadcast(&mut self, data: Vec<u8>) -> Result<()> {
38 self.gossip_sender
39 .broadcast(data.into())
40 .await
41 .map_err(|e| anyhow::anyhow!(e))
42 }
43
44 async fn broadcast_neighbors(&mut self, data: Vec<u8>) -> Result<()> {
45 self.gossip_sender
46 .broadcast_neighbors(data.into())
47 .await
48 .map_err(|e| anyhow::anyhow!(e))
49 }
50
51 async fn join_peers(&mut self, peers: Vec<iroh::NodeId>) -> Result<()> {
52 self.gossip_sender
53 .join_peers(peers)
54 .await
55 .map_err(|e| anyhow::anyhow!(e))
56 }
57}
58
59impl GossipSender {
60 pub fn new(
61 gossip_sender: iroh_gossip::api::GossipSender,
62 gossip: iroh_gossip::net::Gossip,
63 ) -> Self {
64 let (api, rx) = Handle::channel(1024);
65 tokio::spawn({
66 let gossip = gossip.clone();
67 async move {
68 let mut actor = GossipSenderActor {
69 rx,
70 gossip_sender,
71 _gossip: gossip.clone(),
72 };
73 let _ = actor.run().await;
74 }
75 });
76
77 Self {
78 api,
79 _gossip: gossip,
80 }
81 }
82
83 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
84 self.api
85 .call(move |actor| Box::pin(actor.broadcast(data)))
86 .await
87 }
88
89 pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()> {
90 self.api
91 .call(move |actor| Box::pin(actor.broadcast_neighbors(data)))
92 .await
93 }
94
95 pub async fn join_peers(
96 &self,
97 peers: Vec<iroh::NodeId>,
98 max_peers: Option<usize>,
99 ) -> Result<()> {
100 let mut peers = peers;
101 if let Some(max_peers) = max_peers {
102 peers.shuffle(&mut rand::thread_rng());
103 peers.truncate(max_peers);
104 }
105
106 self.api
107 .call(move |actor| Box::pin(actor.join_peers(peers)))
108 .await
109 }
110}