pub struct GossipSender { /* private fields */ }Expand description
Gossip sender that broadcasts messages to peers.
Provides methods for broadcasting to all peers or just direct neighbors, with peer joining capabilities for topology management.
Implementations§
Source§impl GossipSender
impl GossipSender
Sourcepub fn new(gossip_sender: GossipSender, timeout_config: TimeoutConfig) -> Self
pub fn new(gossip_sender: GossipSender, timeout_config: TimeoutConfig) -> Self
Create a new gossip sender from an iroh topic sender.
Sourcepub async fn broadcast(&self, data: Vec<u8>) -> Result<()>
pub async fn broadcast(&self, data: Vec<u8>) -> Result<()>
Broadcast a message to all peers in the topic.
Examples found in repository?
examples/e2e_test.rs (line 55)
11async fn main() -> Result<()> {
12 // Generate a new random secret key
13 let secret_key = SecretKey::generate(&mut rand::rng());
14 let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
15
16 // Set up endpoint with address lookup enabled
17 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
18 .secret_key(secret_key.clone())
19 .bind()
20 .await?;
21
22 // Initialize gossip with auto-discovery
23 let gossip = Gossip::builder().spawn(endpoint.clone());
24
25 // Set up protocol router
26 let _router = iroh::protocol::Router::builder(endpoint.clone())
27 .accept(iroh_gossip::ALPN, gossip.clone())
28 .spawn();
29
30 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
31 let initial_secret = b"my-initial-secret".to_vec();
32
33 let record_publisher = RecordPublisher::new(
34 topic_id.clone(),
35 signing_key.clone(),
36 None,
37 initial_secret,
38 Config::default(),
39 );
40 let (gossip_sender, mut gossip_receiver) = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?
43 .split()
44 .await?;
45
46 tokio::spawn(async move {
47 while let Ok(event) = gossip_receiver.next().await {
48 println!("event: {event:?}");
49 }
50 println!("\nGossip receiver stream ended");
51 });
52
53 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
54 gossip_sender
55 .broadcast(format!("hi from {}", endpoint.id()).into())
56 .await?;
57
58 println!("[joined topic]");
59
60 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
61
62 println!("[finished]");
63
64 // successfully joined
65 // exit with code 0
66 Ok(())
67}More examples
examples/secret_rotation.rs (line 90)
30async fn main() -> Result<()> {
31 // Generate a new random secret key
32 let secret_key = SecretKey::generate(&mut rand::rng());
33 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
34
35 // Set up endpoint with discovery enabled
36 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
37 .secret_key(secret_key.clone())
38 .bind()
39 .await?;
40
41 // Initialize gossip with auto-discovery
42 let gossip = Gossip::builder().spawn(endpoint.clone());
43
44 // Set up protocol router
45 let _router = iroh::protocol::Router::builder(endpoint.clone())
46 .accept(iroh_gossip::ALPN, gossip.clone())
47 .spawn();
48
49 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
50 let initial_secret = b"my-initial-secret".to_vec();
51
52 let record_publisher = RecordPublisher::new(
53 topic_id.clone(),
54 signing_key.clone(),
55 Some(RotationHandle::new(MySecretRotation)),
56 initial_secret,
57 Config::default(),
58 );
59 let (gossip_sender, mut gossip_receiver) = gossip
60 .subscribe_and_join_with_auto_discovery(record_publisher)
61 .await?
62 .split()
63 .await?;
64
65 println!("Joined topic");
66
67 // Spawn listener for incoming messages
68 tokio::spawn(async move {
69 while let Ok(event) = gossip_receiver.next().await {
70 if let Event::Received(msg) = event {
71 println!(
72 "\nMessage from {}: {}",
73 &msg.delivered_from.to_string()[0..8],
74 String::from_utf8(msg.content.to_vec()).unwrap()
75 );
76 } else if let Event::NeighborUp(peer) = event {
77 println!("\nJoined by {}", &peer.to_string()[0..8]);
78 }
79 }
80 println!("\nGossip receiver stream ended");
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}examples/chat_no_wait.rs (lines 81-86)
12async fn main() -> Result<()> {
13 tracing_subscriber::fmt()
14 .with_thread_ids(true)
15 .with_ansi(true)
16 .with_env_filter(
17 EnvFilter::try_from_default_env()
18 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
19 )
20 .init();
21
22 // Generate a new random secret key
23 let secret_key = SecretKey::generate(&mut rand::rng());
24 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
28 .secret_key(secret_key.clone())
29 .bind()
30 .await?;
31
32 // Initialize gossip with auto-discovery
33 let gossip = Gossip::builder().spawn(endpoint.clone());
34
35 // Set up protocol router
36 let _router = iroh::protocol::Router::builder(endpoint.clone())
37 .accept(iroh_gossip::ALPN, gossip.clone())
38 .spawn();
39
40 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
41 let initial_secret = b"my-initial-secret".to_vec();
42
43 let record_publisher = RecordPublisher::new(
44 topic_id.clone(),
45 signing_key.clone(),
46 None,
47 initial_secret,
48 Config::default(),
49 );
50 let (gossip_sender, mut gossip_receiver) = gossip
51 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
52 .await?
53 .split()
54 .await?;
55
56 println!("Joined topic");
57
58 // Spawn listener for incoming messages
59 tokio::spawn(async move {
60 while let Ok(event) = gossip_receiver.next().await {
61 if let Event::Received(msg) = event {
62 println!(
63 "\nMessage from {}: {}",
64 &msg.delivered_from.to_string()[0..8],
65 String::from_utf8(msg.content.to_vec()).unwrap()
66 );
67 } else if let Event::NeighborUp(peer) = event {
68 println!("\nJoined by {}", &peer.to_string()[0..8]);
69 }
70 }
71 println!("\nGossip receiver stream ended");
72 });
73
74 // Main input loop for sending messages
75 let mut buffer = String::new();
76 let stdin = std::io::stdin();
77 loop {
78 print!("\n> ");
79 stdin.read_line(&mut buffer).unwrap();
80 gossip_sender
81 .broadcast(
82 buffer
83 .trim_end_matches(&['\r', '\n'][..])
84 .as_bytes()
85 .to_vec(),
86 )
87 .await
88 .unwrap();
89 println!(" - (sent)");
90 buffer.clear();
91 }
92}examples/chat.rs (lines 93-98)
13async fn main() -> Result<()> {
14 // tracing init - only show distributed_topic_tracker logs
15 use tracing_subscriber::filter::EnvFilter;
16
17 tracing_subscriber::fmt()
18 .with_thread_ids(true)
19 .with_ansi(true)
20 .with_env_filter(
21 EnvFilter::try_from_default_env()
22 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
23 )
24 .init();
25
26 // Generate a new random secret key
27 let secret_key = SecretKey::generate(&mut rand::rng());
28 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
29
30 // Set up endpoint with discovery enabled
31 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
32 .secret_key(secret_key.clone())
33 .bind()
34 .await?;
35
36 // Initialize gossip with auto-discovery
37 let gossip = Gossip::builder().spawn(endpoint.clone());
38
39 // Set up protocol router
40 let _router = iroh::protocol::Router::builder(endpoint.clone())
41 .accept(iroh_gossip::ALPN, gossip.clone())
42 .spawn();
43
44 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
45 let initial_secret = b"my-initial-secret".to_vec();
46
47 let record_publisher = RecordPublisher::new(
48 topic_id.clone(),
49 signing_key.clone(),
50 None,
51 initial_secret,
52 Config::builder()
53 .bootstrap_config(
54 BootstrapConfig::builder()
55 .check_older_records_first_on_startup(false)
56 .build(),
57 )
58 .build(),
59 );
60
61 // Split into sink (sending) and stream (receiving)
62 let (gossip_sender, mut gossip_receiver) = gossip
63 .subscribe_and_join_with_auto_discovery(record_publisher)
64 .await?
65 .split()
66 .await?;
67
68 println!("Joined topic");
69
70 // Spawn listener for incoming messages
71 tokio::spawn(async move {
72 while let Ok(event) = gossip_receiver.next().await {
73 if let Event::Received(msg) = event {
74 println!(
75 "\nMessage from {}: {}",
76 &msg.delivered_from.to_string()[0..8],
77 String::from_utf8(msg.content.to_vec()).unwrap()
78 );
79 } else if let Event::NeighborUp(peer) = event {
80 println!("\nJoined by {}", &peer.to_string()[0..8]);
81 }
82 }
83 println!("\nGossip receiver stream ended");
84 });
85
86 // Main input loop for sending messages
87 let mut buffer = String::new();
88 let stdin = std::io::stdin();
89 loop {
90 print!("\n> ");
91 stdin.read_line(&mut buffer).unwrap();
92 gossip_sender
93 .broadcast(
94 buffer
95 .trim_end_matches(&['\r', '\n'][..])
96 .as_bytes()
97 .to_vec(),
98 )
99 .await
100 .unwrap();
101 println!(" - (sent)");
102 buffer.clear();
103 }
104}Sourcepub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()>
pub async fn broadcast_neighbors(&self, data: Vec<u8>) -> Result<()>
Broadcast a message only to direct neighbors.
Sourcepub async fn join_peers(
&self,
peers: Vec<EndpointId>,
max_peers: Option<usize>,
) -> Result<()>
pub async fn join_peers( &self, peers: Vec<EndpointId>, max_peers: Option<usize>, ) -> Result<()>
Join specific peer nodes.
§Arguments
peers- List of node IDs to joinmax_peers- Optional maximum number of peers to join (randomly selected if exceeded)
Trait Implementations§
Source§impl Clone for GossipSender
impl Clone for GossipSender
Source§fn clone(&self) -> GossipSender
fn clone(&self) -> GossipSender
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreAuto Trait Implementations§
impl Freeze for GossipSender
impl RefUnwindSafe for GossipSender
impl Send for GossipSender
impl Sync for GossipSender
impl Unpin for GossipSender
impl UnsafeUnpin for GossipSender
impl UnwindSafe for GossipSender
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more