distributed_topic_tracker/gossip/
sender.rs1use crate::{
2 actor::{Action, Actor, Handle}
3};
4use anyhow::Result;
5use rand::seq::SliceRandom;
6
7#[derive(Debug, Clone)]
8pub struct GossipSender {
9 api: Handle<GossipSenderActor>,
10 _gossip: iroh_gossip::net::Gossip,
11}
12
13#[derive(Debug)]
14pub struct GossipSenderActor {
15 rx: tokio::sync::mpsc::Receiver<Action<GossipSenderActor>>,
16 gossip_sender: iroh_gossip::api::GossipSender,
17 _gossip: iroh_gossip::net::Gossip,
18}
19
20impl Actor for GossipSenderActor {
21 async fn run(&mut self) -> Result<()> {
22 loop {
23 tokio::select! {
24 Some(action) = self.rx.recv() => {
25 action(self).await;
26 }
27 _ = tokio::signal::ctrl_c() => {
28 break;
29 }
30 }
31 }
32 Ok(())
33 }
34}
35
36impl GossipSenderActor {
37 async fn broadcast(&mut self, data: Vec<u8>) -> Result<()> {
38 self.gossip_sender
39 .broadcast(data.into())
40 .await
41 .map_err(|e| anyhow::anyhow!(e))
42 }
43
44 async fn join_peers(&mut self, peers: Vec<iroh::NodeId>) -> Result<()> {
45 self.gossip_sender
46 .join_peers(peers)
47 .await
48 .map_err(|e| anyhow::anyhow!(e))
49 }
50}
51
52impl GossipSender {
53 pub fn new(
54 gossip_sender: iroh_gossip::api::GossipSender,
55 gossip: iroh_gossip::net::Gossip,
56 ) -> Self {
57 let (api, rx) = Handle::channel(1024);
58 tokio::spawn({
59 let gossip = gossip.clone();
60 async move {
61 let mut actor = GossipSenderActor {
62 rx,
63 gossip_sender,
64 _gossip: gossip.clone(),
65 };
66 let _ = actor.run().await;
67 }
68 });
69
70 Self {
71 api,
72 _gossip: gossip,
73 }
74 }
75
76 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
77 self.api
78 .call(move |actor| Box::pin(actor.broadcast(data)))
79 .await
80 }
81
82 pub async fn join_peers(
83 &self,
84 peers: Vec<iroh::NodeId>,
85 max_peers: Option<usize>,
86 ) -> Result<()> {
87 let mut peers = peers;
88 if let Some(max_peers) = max_peers {
89 peers.shuffle(&mut rand::thread_rng());
90 peers.truncate(max_peers);
91 }
92
93 self.api
94 .call(move |actor| Box::pin(actor.join_peers(peers)))
95 .await
96 }
97}