pub struct Topic { /* private fields */ }
Expand description
Handle to a joined gossip topic with auto-discovery.
Manages bootstrap, publishing, bubble detection, and split-brain recovery. Can be split into sender and receiver for message exchange.
Implementations§
Source§impl Topic
impl Topic
Source pub async fn new(
record_publisher: RecordPublisher,
gossip: Gossip,
async_bootstrap: bool,
) -> Result<Self>
pub async fn new( record_publisher: RecordPublisher, gossip: Gossip, async_bootstrap: bool, ) -> Result<Self>
Create and initialize a new topic with auto-discovery.
§Arguments
- `record_publisher` - Record publisher for DHT operations
- `gossip` - Gossip instance for topic subscription
- `async_bootstrap` - If `false`, awaits until bootstrap completes
Source pub async fn split(&self) -> Result<(GossipSender, GossipReceiver)>
pub async fn split(&self) -> Result<(GossipSender, GossipReceiver)>
Split into sender and receiver for message exchange.
Examples found in repository?
examples/simple.rs (line 49)
9async fn main() -> Result<()> {
10 // Generate a new random secret key
11 let secret_key = SecretKey::generate(&mut rand::rng());
12
13 // Set up endpoint with discovery enabled
14 let endpoint = Endpoint::builder()
15 .secret_key(secret_key.clone())
16 .discovery_n0()
17 .bind()
18 .await?;
19
20 // Initialize gossip with auto-discovery
21 let gossip = Gossip::builder().spawn(endpoint.clone());
22
23 // Set up protocol router
24 let _router = iroh::protocol::Router::builder(endpoint.clone())
25 .accept(iroh_gossip::ALPN, gossip.clone())
26 .spawn();
27
28 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
29 let initial_secret = b"my-initial-secret".to_vec();
30
31 // Split into sink (sending) and stream (receiving)
32
33 let record_publisher = RecordPublisher::new(
34 topic_id.clone(),
35 endpoint.node_id().public(),
36 secret_key.secret().clone(),
37 None,
38 initial_secret,
39 );
40
41 let topic = gossip
42 .subscribe_and_join_with_auto_discovery(record_publisher)
43 .await?;
44
45 println!("[joined topic]");
46
47 // Do something with the gossip topic
48 // (bonus: GossipSender and GossipReceiver are safely clonable)
49 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
50
51 Ok(())
52}
More examples
examples/e2e_test.rs (line 41)
9async fn main() -> Result<()> {
10 // Generate a new random secret key
11 let secret_key = SecretKey::generate(&mut rand::rng());
12
13 // Set up endpoint with discovery enabled
14 let endpoint = Endpoint::builder()
15 .secret_key(secret_key.clone())
16 .discovery_n0()
17 .bind()
18 .await?;
19
20 // Initialize gossip with auto-discovery
21 let gossip = Gossip::builder().spawn(endpoint.clone());
22
23 // Set up protocol router
24 let _router = iroh::protocol::Router::builder(endpoint.clone())
25 .accept(iroh_gossip::ALPN, gossip.clone())
26 .spawn();
27
28 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
29 let initial_secret = b"my-initial-secret".to_vec();
30
31 let record_publisher = RecordPublisher::new(
32 topic_id.clone(),
33 endpoint.node_id().public(),
34 secret_key.secret().clone(),
35 None,
36 initial_secret,
37 );
38 let (gossip_sender, gossip_receiver) = gossip
39 .subscribe_and_join_with_auto_discovery(record_publisher)
40 .await?
41 .split()
42 .await?;
43
44 tokio::spawn(async move {
45 while let Some(Ok(event)) = gossip_receiver.next().await {
46 println!("event: {event:?}");
47 }
48 });
49
50 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
51 gossip_sender
52 .broadcast(format!("hi from {}", endpoint.node_id()).into())
53 .await?;
54
55 println!("[joined topic]");
56
57 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
58
59 println!("[finished]");
60
61 // successfully joined
62 // exit with code 0
63 Ok(())
64}
examples/chat_no_wait.rs (line 41)
9async fn main() -> Result<()> {
10 // Generate a new random secret key
11 let secret_key = SecretKey::generate(&mut rand::rng());
12
13 // Set up endpoint with discovery enabled
14 let endpoint = Endpoint::builder()
15 .secret_key(secret_key.clone())
16 .discovery_n0()
17 .bind()
18 .await?;
19
20 // Initialize gossip with auto-discovery
21 let gossip = Gossip::builder().spawn(endpoint.clone());
22
23 // Set up protocol router
24 let _router = iroh::protocol::Router::builder(endpoint.clone())
25 .accept(iroh_gossip::ALPN, gossip.clone())
26 .spawn();
27
28 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
29 let initial_secret = b"my-initial-secret".to_vec();
30
31 let record_publisher = RecordPublisher::new(
32 topic_id.clone(),
33 endpoint.node_id().public(),
34 secret_key.secret().clone(),
35 None,
36 initial_secret,
37 );
38 let (gossip_sender, gossip_receiver) = gossip
39 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
40 .await?
41 .split()
42 .await?;
43
44 println!("Joined topic");
45
46 // Spawn listener for incoming messages
47 tokio::spawn(async move {
48 while let Some(Ok(event)) = gossip_receiver.next().await {
49 if let Event::Received(msg) = event {
50 println!(
51 "\nMessage from {}: {}",
52 &msg.delivered_from.to_string()[0..8],
53 String::from_utf8(msg.content.to_vec()).unwrap()
54 );
55 } else if let Event::NeighborUp(peer) = event {
56 println!("\nJoined by {}", &peer.to_string()[0..8]);
57 }
58 }
59 });
60
61 // Main input loop for sending messages
62 let mut buffer = String::new();
63 let stdin = std::io::stdin();
64 loop {
65 print!("\n> ");
66 stdin.read_line(&mut buffer).unwrap();
67 gossip_sender
68 .broadcast(buffer.clone().replace("\n", "").into())
69 .await
70 .unwrap();
71 println!(" - (sent)");
72 buffer.clear();
73 }
74}
examples/secret_rotation.rs (line 63)
29async fn main() -> Result<()> {
30 // Generate a new random secret key
31 let secret_key = SecretKey::generate(&mut rand::rng());
32
33 // Set up endpoint with discovery enabled
34 let endpoint = Endpoint::builder()
35 .secret_key(secret_key.clone())
36 .discovery_n0()
37 .bind()
38 .await?;
39
40 // Initialize gossip with auto-discovery
41 let gossip = Gossip::builder().spawn(endpoint.clone());
42
43 // Set up protocol router
44 let _router = iroh::protocol::Router::builder(endpoint.clone())
45 .accept(iroh_gossip::ALPN, gossip.clone())
46 .spawn();
47
48 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
49 let initial_secret = b"my-initial-secret".to_vec();
50
51 // Split into sink (sending) and stream (receiving)
52
53 let record_publisher = RecordPublisher::new(
54 topic_id.clone(),
55 endpoint.node_id().public(),
56 secret_key.secret().clone(),
57 Some(RotationHandle::new(MySecretRotation)),
58 initial_secret,
59 );
60 let (gossip_sender, gossip_receiver) = gossip
61 .subscribe_and_join_with_auto_discovery(record_publisher)
62 .await?
63 .split()
64 .await?;
65
66 println!("Joined topic");
67
68 // Spawn listener for incoming messages
69 tokio::spawn(async move {
70 while let Some(Ok(event)) = gossip_receiver.next().await {
71 if let Event::Received(msg) = event {
72 println!(
73 "\nMessage from {}: {}",
74 &msg.delivered_from.to_string()[0..8],
75 String::from_utf8(msg.content.to_vec()).unwrap()
76 );
77 } else if let Event::NeighborUp(peer) = event {
78 println!("\nJoined by {}", &peer.to_string()[0..8]);
79 }
80 }
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}
examples/chat.rs (line 56)
9async fn main() -> Result<()> {
10
11 // tracing init - only show distributed_topic_tracker logs
12 use tracing_subscriber::filter::EnvFilter;
13
14 tracing_subscriber::fmt()
15 .with_thread_ids(true)
16 .with_ansi(true)
17 .with_env_filter(
18 EnvFilter::try_from_default_env()
19 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug"))
20 )
21 .init();
22
23 // Generate a new random secret key
24 let secret_key = SecretKey::generate(&mut rand::rng());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder()
28 .secret_key(secret_key.clone())
29 .discovery_n0()
30 .bind()
31 .await?;
32
33 // Initialize gossip with auto-discovery
34 let gossip = Gossip::builder().spawn(endpoint.clone());
35
36 // Set up protocol router
37 let _router = iroh::protocol::Router::builder(endpoint.clone())
38 .accept(iroh_gossip::ALPN, gossip.clone())
39 .spawn();
40
41 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
42 let initial_secret = b"my-initial-secret".to_vec();
43
44 let record_publisher = RecordPublisher::new(
45 topic_id.clone(),
46 endpoint.node_id().public(),
47 secret_key.secret().clone(),
48 None,
49 initial_secret,
50 );
51
52 // Split into sink (sending) and stream (receiving)
53 let (gossip_sender, gossip_receiver) = gossip
54 .subscribe_and_join_with_auto_discovery(record_publisher)
55 .await?
56 .split()
57 .await?;
58
59 println!("Joined topic");
60
61 // Spawn listener for incoming messages
62 tokio::spawn(async move {
63 while let Some(Ok(event)) = gossip_receiver.next().await {
64 if let Event::Received(msg) = event {
65 println!(
66 "\nMessage from {}: {}",
67 &msg.delivered_from.to_string()[0..8],
68 String::from_utf8(msg.content.to_vec()).unwrap()
69 );
70 } else if let Event::NeighborUp(peer) = event {
71 println!("\nJoined by {}", &peer.to_string()[0..8]);
72 }
73 }
74 });
75
76 // Main input loop for sending messages
77 let mut buffer = String::new();
78 let stdin = std::io::stdin();
79 loop {
80 print!("\n> ");
81 stdin.read_line(&mut buffer).unwrap();
82 gossip_sender
83 .broadcast(buffer.clone().replace("\n", "").into())
84 .await
85 .unwrap();
86 println!(" - (sent)");
87 buffer.clear();
88 }
89}
Source pub async fn gossip_sender(&self) -> Result<GossipSender>
pub async fn gossip_sender(&self) -> Result<GossipSender>
Get the gossip sender for this topic.
Source pub async fn gossip_receiver(&self) -> Result<GossipReceiver>
pub async fn gossip_receiver(&self) -> Result<GossipReceiver>
Get the gossip receiver for this topic.
Source pub async fn record_creator(&self) -> Result<RecordPublisher>
pub async fn record_creator(&self) -> Result<RecordPublisher>
Get the record publisher for this topic.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Topic
impl RefUnwindSafe for Topic
impl Send for Topic
impl Sync for Topic
impl Unpin for Topic
impl UnwindSafe for Topic
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more