pub struct Topic { /* private fields */ }Expand description
Handle to a joined gossip topic with auto-discovery.
Manages bootstrap, publishing, bubble detection, and split-brain recovery. Can be split into sender and receiver for message exchange.
Implementations§
Source§impl Topic
impl Topic
Sourcepub async fn new(
record_publisher: RecordPublisher,
gossip: Gossip,
async_bootstrap: bool,
) -> Result<Self>
pub async fn new( record_publisher: RecordPublisher, gossip: Gossip, async_bootstrap: bool, ) -> Result<Self>
Create and initialize a new topic with auto-discovery.
§Arguments
record_publisher- Record publisher for DHT operationsgossip- Gossip instance for topic subscriptionasync_bootstrap- If false, awaits until bootstrap completes
Sourcepub async fn split(&self) -> Result<(GossipSender, GossipReceiver)>
pub async fn split(&self) -> Result<(GossipSender, GossipReceiver)>
Split into sender and receiver for message exchange.
Examples found in repository?
examples/simple.rs (line 48)
10async fn main() -> Result<()> {
11 // Generate a new random secret key
12 let secret_key = SecretKey::generate(&mut rand::rng());
13 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
14
15 // Set up endpoint with discovery enabled
16 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
17 .secret_key(secret_key.clone())
18 .bind()
19 .await?;
20
21 // Initialize gossip with auto-discovery
22 let gossip = Gossip::builder().spawn(endpoint.clone());
23
24 // Set up protocol router
25 let _router = iroh::protocol::Router::builder(endpoint.clone())
26 .accept(iroh_gossip::ALPN, gossip.clone())
27 .spawn();
28
29 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
30 let initial_secret = b"my-initial-secret".to_vec();
31
32 let record_publisher = RecordPublisher::new(
33 topic_id.clone(),
34 signing_key.clone(),
35 None,
36 initial_secret,
37 Config::default(),
38 );
39
40 let topic = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?;
43
44 println!("[joined topic]");
45
46 // Do something with the gossip topic
47 // (bonus: GossipSender and GossipReceiver are safely clonable)
48 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
49
50 Ok(())
51}More examples
examples/full_config.rs (line 113)
77async fn main() -> Result<()> {
78 // Generate a new random secret key
79 let secret_key = SecretKey::generate(&mut rand::rng());
80 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
81
82 // Set up endpoint with discovery enabled
83 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
84 .secret_key(secret_key.clone())
85 .bind()
86 .await?;
87
88 // Initialize gossip with auto-discovery
89 let gossip = Gossip::builder().spawn(endpoint.clone());
90
91 // Set up protocol router
92 let _router = iroh::protocol::Router::builder(endpoint.clone())
93 .accept(iroh_gossip::ALPN, gossip.clone())
94 .spawn();
95
96 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
97 let initial_secret = b"my-initial-secret".to_vec();
98
99 let record_publisher =
100 RecordPublisher::builder(topic_id.clone(), signing_key.clone(), initial_secret)
101 .config(config_builder())
102 .secret_rotation(RotationHandle::new(DefaultSecretRotation))
103 .build();
104
105 let topic = gossip
106 .subscribe_and_join_with_auto_discovery(record_publisher)
107 .await?;
108
109 println!("[joined topic]");
110
111 // Do something with the gossip topic
112 // (bonus: GossipSender and GossipReceiver are safely clonable)
113 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
114
115 Ok(())
116}examples/without_mergers.rs (line 59)
13async fn main() -> Result<()> {
14 // Generate a new random secret key
15 let secret_key = SecretKey::generate(&mut rand::rng());
16 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
17
18 // Set up endpoint with discovery enabled
19 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
20 .secret_key(secret_key.clone())
21 .bind()
22 .await?;
23
24 // Initialize gossip with auto-discovery
25 let gossip = Gossip::builder().spawn(endpoint.clone());
26
27 // Set up protocol router
28 let _router = iroh::protocol::Router::builder(endpoint.clone())
29 .accept(iroh_gossip::ALPN, gossip.clone())
30 .spawn();
31
32 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
33 let initial_secret = b"my-initial-secret".to_vec();
34
35 let record_publisher = RecordPublisher::new(
36 topic_id.clone(),
37 signing_key.clone(),
38 None,
39 initial_secret,
40 // [!] Disable merge workers (BubbleMerge and MessageOverlapMerge)
41 Config::builder()
42 .merge_config(
43 MergeConfig::builder()
44 .bubble_merge(BubbleMergeConfig::Disabled)
45 .message_overlap_merge(MessageOverlapMergeConfig::Disabled)
46 .build(),
47 )
48 .build(),
49 );
50
51 let topic = gossip
52 .subscribe_and_join_with_auto_discovery(record_publisher)
53 .await?;
54
55 println!("[joined topic]");
56
57 // Do something with the gossip topic
58 // (bonus: GossipSender and GossipReceiver are safely clonable)
59 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
60
61 Ok(())
62}examples/e2e_test.rs (line 43)
11async fn main() -> Result<()> {
12 // Generate a new random secret key
13 let secret_key = SecretKey::generate(&mut rand::rng());
14 let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
15
16 // Set up endpoint with address lookup enabled
17 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
18 .secret_key(secret_key.clone())
19 .bind()
20 .await?;
21
22 // Initialize gossip with auto-discovery
23 let gossip = Gossip::builder().spawn(endpoint.clone());
24
25 // Set up protocol router
26 let _router = iroh::protocol::Router::builder(endpoint.clone())
27 .accept(iroh_gossip::ALPN, gossip.clone())
28 .spawn();
29
30 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
31 let initial_secret = b"my-initial-secret".to_vec();
32
33 let record_publisher = RecordPublisher::new(
34 topic_id.clone(),
35 signing_key.clone(),
36 None,
37 initial_secret,
38 Config::default(),
39 );
40 let (gossip_sender, mut gossip_receiver) = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?
43 .split()
44 .await?;
45
46 tokio::spawn(async move {
47 while let Ok(event) = gossip_receiver.next().await {
48 println!("event: {event:?}");
49 }
50 println!("\nGossip receiver stream ended");
51 });
52
53 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
54 gossip_sender
55 .broadcast(format!("hi from {}", endpoint.id()).into())
56 .await?;
57
58 println!("[joined topic]");
59
60 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
61
62 println!("[finished]");
63
64 // successfully joined
65 // exit with code 0
66 Ok(())
67}examples/secret_rotation.rs (line 62)
30async fn main() -> Result<()> {
31 // Generate a new random secret key
32 let secret_key = SecretKey::generate(&mut rand::rng());
33 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
34
35 // Set up endpoint with discovery enabled
36 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
37 .secret_key(secret_key.clone())
38 .bind()
39 .await?;
40
41 // Initialize gossip with auto-discovery
42 let gossip = Gossip::builder().spawn(endpoint.clone());
43
44 // Set up protocol router
45 let _router = iroh::protocol::Router::builder(endpoint.clone())
46 .accept(iroh_gossip::ALPN, gossip.clone())
47 .spawn();
48
49 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
50 let initial_secret = b"my-initial-secret".to_vec();
51
52 let record_publisher = RecordPublisher::new(
53 topic_id.clone(),
54 signing_key.clone(),
55 Some(RotationHandle::new(MySecretRotation)),
56 initial_secret,
57 Config::default(),
58 );
59 let (gossip_sender, mut gossip_receiver) = gossip
60 .subscribe_and_join_with_auto_discovery(record_publisher)
61 .await?
62 .split()
63 .await?;
64
65 println!("Joined topic");
66
67 // Spawn listener for incoming messages
68 tokio::spawn(async move {
69 while let Ok(event) = gossip_receiver.next().await {
70 if let Event::Received(msg) = event {
71 println!(
72 "\nMessage from {}: {}",
73 &msg.delivered_from.to_string()[0..8],
74 String::from_utf8(msg.content.to_vec()).unwrap()
75 );
76 } else if let Event::NeighborUp(peer) = event {
77 println!("\nJoined by {}", &peer.to_string()[0..8]);
78 }
79 }
80 println!("\nGossip receiver stream ended");
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}examples/chat_no_wait.rs (line 53)
12async fn main() -> Result<()> {
13 tracing_subscriber::fmt()
14 .with_thread_ids(true)
15 .with_ansi(true)
16 .with_env_filter(
17 EnvFilter::try_from_default_env()
18 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
19 )
20 .init();
21
22 // Generate a new random secret key
23 let secret_key = SecretKey::generate(&mut rand::rng());
24 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
28 .secret_key(secret_key.clone())
29 .bind()
30 .await?;
31
32 // Initialize gossip with auto-discovery
33 let gossip = Gossip::builder().spawn(endpoint.clone());
34
35 // Set up protocol router
36 let _router = iroh::protocol::Router::builder(endpoint.clone())
37 .accept(iroh_gossip::ALPN, gossip.clone())
38 .spawn();
39
40 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
41 let initial_secret = b"my-initial-secret".to_vec();
42
43 let record_publisher = RecordPublisher::new(
44 topic_id.clone(),
45 signing_key.clone(),
46 None,
47 initial_secret,
48 Config::default(),
49 );
50 let (gossip_sender, mut gossip_receiver) = gossip
51 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
52 .await?
53 .split()
54 .await?;
55
56 println!("Joined topic");
57
58 // Spawn listener for incoming messages
59 tokio::spawn(async move {
60 while let Ok(event) = gossip_receiver.next().await {
61 if let Event::Received(msg) = event {
62 println!(
63 "\nMessage from {}: {}",
64 &msg.delivered_from.to_string()[0..8],
65 String::from_utf8(msg.content.to_vec()).unwrap()
66 );
67 } else if let Event::NeighborUp(peer) = event {
68 println!("\nJoined by {}", &peer.to_string()[0..8]);
69 }
70 }
71 println!("\nGossip receiver stream ended");
72 });
73
74 // Main input loop for sending messages
75 let mut buffer = String::new();
76 let stdin = std::io::stdin();
77 loop {
78 print!("\n> ");
79 stdin.read_line(&mut buffer).unwrap();
80 gossip_sender
81 .broadcast(
82 buffer
83 .trim_end_matches(&['\r', '\n'][..])
84 .as_bytes()
85 .to_vec(),
86 )
87 .await
88 .unwrap();
89 println!(" - (sent)");
90 buffer.clear();
91 }
92}Additional examples can be found in:
Sourcepub async fn gossip_sender(&self) -> Result<GossipSender>
pub async fn gossip_sender(&self) -> Result<GossipSender>
Get the gossip sender for this topic.
Sourcepub async fn gossip_receiver(&self) -> Result<GossipReceiver>
pub async fn gossip_receiver(&self) -> Result<GossipReceiver>
Get the gossip receiver for this topic.
Sourcepub async fn record_creator(&self) -> Result<RecordPublisher>
pub async fn record_creator(&self) -> Result<RecordPublisher>
Get the record publisher for this topic.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Topic
impl RefUnwindSafe for Topic
impl Send for Topic
impl Sync for Topic
impl Unpin for Topic
impl UnsafeUnpin for Topic
impl UnwindSafe for Topic
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more