pub struct GossipReceiver { /* private fields */ }
Gossip receiver that collects incoming messages and neighbor info.
Tracks SHA-512 message hashes (truncated to their first 32 bytes) for overlap detection and provides the neighbor list for topology analysis.
Implementations§
Source§impl GossipReceiver
impl GossipReceiver
Sourcepub fn new(gossip_receiver: GossipReceiver, gossip: Gossip) -> Self
pub fn new(gossip_receiver: GossipReceiver, gossip: Gossip) -> Self
Create a new gossip receiver from an iroh topic receiver.
Sourcepub async fn neighbors(&self) -> HashSet<NodeId>
pub async fn neighbors(&self) -> HashSet<NodeId>
Get the set of currently connected neighbor node IDs.
Sourcepub async fn next(&self) -> Option<Result<Event, ApiError>>
pub async fn next(&self) -> Option<Result<Event, ApiError>>
Receive the next gossip event.
Returns `None` if the receiver is closed.
Examples found in repository?
examples/e2e_test.rs (line 45)
9async fn main() -> Result<()> {
10 // Generate a new random secret key
11 let secret_key = SecretKey::generate(&mut rand::rng());
12
13 // Set up endpoint with discovery enabled
14 let endpoint = Endpoint::builder()
15 .secret_key(secret_key.clone())
16 .discovery_n0()
17 .bind()
18 .await?;
19
20 // Initialize gossip with auto-discovery
21 let gossip = Gossip::builder().spawn(endpoint.clone());
22
23 // Set up protocol router
24 let _router = iroh::protocol::Router::builder(endpoint.clone())
25 .accept(iroh_gossip::ALPN, gossip.clone())
26 .spawn();
27
28 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
29 let initial_secret = b"my-initial-secret".to_vec();
30
31 let record_publisher = RecordPublisher::new(
32 topic_id.clone(),
33 endpoint.node_id().public(),
34 secret_key.secret().clone(),
35 None,
36 initial_secret,
37 );
38 let (gossip_sender, gossip_receiver) = gossip
39 .subscribe_and_join_with_auto_discovery(record_publisher)
40 .await?
41 .split()
42 .await?;
43
44 tokio::spawn(async move {
45 while let Some(Ok(event)) = gossip_receiver.next().await {
46 println!("event: {event:?}");
47 }
48 });
49
50 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
51 gossip_sender
52 .broadcast(format!("hi from {}", endpoint.node_id()).into())
53 .await?;
54
55 println!("[joined topic]");
56
57 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
58
59 println!("[finished]");
60
61 // successfully joined
62 // exit with code 0
63 Ok(())
64}
More examples
examples/chat_no_wait.rs (line 48)
9async fn main() -> Result<()> {
10 // Generate a new random secret key
11 let secret_key = SecretKey::generate(&mut rand::rng());
12
13 // Set up endpoint with discovery enabled
14 let endpoint = Endpoint::builder()
15 .secret_key(secret_key.clone())
16 .discovery_n0()
17 .bind()
18 .await?;
19
20 // Initialize gossip with auto-discovery
21 let gossip = Gossip::builder().spawn(endpoint.clone());
22
23 // Set up protocol router
24 let _router = iroh::protocol::Router::builder(endpoint.clone())
25 .accept(iroh_gossip::ALPN, gossip.clone())
26 .spawn();
27
28 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
29 let initial_secret = b"my-initial-secret".to_vec();
30
31 let record_publisher = RecordPublisher::new(
32 topic_id.clone(),
33 endpoint.node_id().public(),
34 secret_key.secret().clone(),
35 None,
36 initial_secret,
37 );
38 let (gossip_sender, gossip_receiver) = gossip
39 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
40 .await?
41 .split()
42 .await?;
43
44 println!("Joined topic");
45
46 // Spawn listener for incoming messages
47 tokio::spawn(async move {
48 while let Some(Ok(event)) = gossip_receiver.next().await {
49 if let Event::Received(msg) = event {
50 println!(
51 "\nMessage from {}: {}",
52 &msg.delivered_from.to_string()[0..8],
53 String::from_utf8(msg.content.to_vec()).unwrap()
54 );
55 } else if let Event::NeighborUp(peer) = event {
56 println!("\nJoined by {}", &peer.to_string()[0..8]);
57 }
58 }
59 });
60
61 // Main input loop for sending messages
62 let mut buffer = String::new();
63 let stdin = std::io::stdin();
64 loop {
65 print!("\n> ");
66 stdin.read_line(&mut buffer).unwrap();
67 gossip_sender
68 .broadcast(buffer.clone().replace("\n", "").into())
69 .await
70 .unwrap();
71 println!(" - (sent)");
72 buffer.clear();
73 }
74}
examples/secret_rotation.rs (line 70)
29async fn main() -> Result<()> {
30 // Generate a new random secret key
31 let secret_key = SecretKey::generate(&mut rand::rng());
32
33 // Set up endpoint with discovery enabled
34 let endpoint = Endpoint::builder()
35 .secret_key(secret_key.clone())
36 .discovery_n0()
37 .bind()
38 .await?;
39
40 // Initialize gossip with auto-discovery
41 let gossip = Gossip::builder().spawn(endpoint.clone());
42
43 // Set up protocol router
44 let _router = iroh::protocol::Router::builder(endpoint.clone())
45 .accept(iroh_gossip::ALPN, gossip.clone())
46 .spawn();
47
48 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
49 let initial_secret = b"my-initial-secret".to_vec();
50
51 // Split into sink (sending) and stream (receiving)
52
53 let record_publisher = RecordPublisher::new(
54 topic_id.clone(),
55 endpoint.node_id().public(),
56 secret_key.secret().clone(),
57 Some(RotationHandle::new(MySecretRotation)),
58 initial_secret,
59 );
60 let (gossip_sender, gossip_receiver) = gossip
61 .subscribe_and_join_with_auto_discovery(record_publisher)
62 .await?
63 .split()
64 .await?;
65
66 println!("Joined topic");
67
68 // Spawn listener for incoming messages
69 tokio::spawn(async move {
70 while let Some(Ok(event)) = gossip_receiver.next().await {
71 if let Event::Received(msg) = event {
72 println!(
73 "\nMessage from {}: {}",
74 &msg.delivered_from.to_string()[0..8],
75 String::from_utf8(msg.content.to_vec()).unwrap()
76 );
77 } else if let Event::NeighborUp(peer) = event {
78 println!("\nJoined by {}", &peer.to_string()[0..8]);
79 }
80 }
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}
examples/chat.rs (line 63)
9async fn main() -> Result<()> {
10
11 // tracing init - only show distributed_topic_tracker logs
12 use tracing_subscriber::filter::EnvFilter;
13
14 tracing_subscriber::fmt()
15 .with_thread_ids(true)
16 .with_ansi(true)
17 .with_env_filter(
18 EnvFilter::try_from_default_env()
19 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug"))
20 )
21 .init();
22
23 // Generate a new random secret key
24 let secret_key = SecretKey::generate(&mut rand::rng());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder()
28 .secret_key(secret_key.clone())
29 .discovery_n0()
30 .bind()
31 .await?;
32
33 // Initialize gossip with auto-discovery
34 let gossip = Gossip::builder().spawn(endpoint.clone());
35
36 // Set up protocol router
37 let _router = iroh::protocol::Router::builder(endpoint.clone())
38 .accept(iroh_gossip::ALPN, gossip.clone())
39 .spawn();
40
41 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
42 let initial_secret = b"my-initial-secret".to_vec();
43
44 let record_publisher = RecordPublisher::new(
45 topic_id.clone(),
46 endpoint.node_id().public(),
47 secret_key.secret().clone(),
48 None,
49 initial_secret,
50 );
51
52 // Split into sink (sending) and stream (receiving)
53 let (gossip_sender, gossip_receiver) = gossip
54 .subscribe_and_join_with_auto_discovery(record_publisher)
55 .await?
56 .split()
57 .await?;
58
59 println!("Joined topic");
60
61 // Spawn listener for incoming messages
62 tokio::spawn(async move {
63 while let Some(Ok(event)) = gossip_receiver.next().await {
64 if let Event::Received(msg) = event {
65 println!(
66 "\nMessage from {}: {}",
67 &msg.delivered_from.to_string()[0..8],
68 String::from_utf8(msg.content.to_vec()).unwrap()
69 );
70 } else if let Event::NeighborUp(peer) = event {
71 println!("\nJoined by {}", &peer.to_string()[0..8]);
72 }
73 }
74 });
75
76 // Main input loop for sending messages
77 let mut buffer = String::new();
78 let stdin = std::io::stdin();
79 loop {
80 print!("\n> ");
81 stdin.read_line(&mut buffer).unwrap();
82 gossip_sender
83 .broadcast(buffer.clone().replace("\n", "").into())
84 .await
85 .unwrap();
86 println!(" - (sent)");
87 buffer.clear();
88 }
89}
pub async fn last_message_hashes(&self) -> Vec<[u8; 32]>
Get the SHA-512 hashes (truncated to their first 32 bytes) of recently received messages.
Used for detecting message overlap during network partition recovery.
Trait Implementations§
Source§impl Clone for GossipReceiver
impl Clone for GossipReceiver
Source§fn clone(&self) -> GossipReceiver
fn clone(&self) -> GossipReceiver
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Auto Trait Implementations§
impl Freeze for GossipReceiver
impl !RefUnwindSafe for GossipReceiver
impl Send for GossipReceiver
impl Sync for GossipReceiver
impl Unpin for GossipReceiver
impl !UnwindSafe for GossipReceiver
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more