pub struct Topic { /* private fields */ }
Handle to a joined gossip topic with auto-discovery.
Manages bootstrap, publishing, bubble detection, and split-brain recovery. Can be split into sender and receiver for message exchange.
Implementations§
Source§impl Topic
impl Topic
Sourcepub async fn new(
record_publisher: RecordPublisher,
gossip: Gossip,
async_bootstrap: bool,
) -> Result<Self>
pub async fn new( record_publisher: RecordPublisher, gossip: Gossip, async_bootstrap: bool, ) -> Result<Self>
Create and initialize a new topic with auto-discovery.
§Arguments
- record_publisher - Record publisher for DHT operations
- gossip - Gossip instance for topic subscription
- async_bootstrap - If false, awaits until bootstrap completes
Sourcepub async fn split(&self) -> Result<(GossipSender, GossipReceiver)>
pub async fn split(&self) -> Result<(GossipSender, GossipReceiver)>
Split into sender and receiver for message exchange.
Examples found in repository?
examples/simple.rs (line 48)
10async fn main() -> Result<()> {
11 // Generate a new random secret key
12 let secret_key = SecretKey::generate();
13 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
14
15 // Set up endpoint with discovery enabled
16 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
17 .secret_key(secret_key.clone())
18 .bind()
19 .await?;
20
21 // Initialize gossip with auto-discovery
22 let gossip = Gossip::builder().spawn(endpoint.clone());
23
24 // Set up protocol router
25 let _router = iroh::protocol::Router::builder(endpoint.clone())
26 .accept(iroh_gossip::ALPN, gossip.clone())
27 .spawn();
28
29 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
30 let initial_secret = b"my-initial-secret".to_vec();
31
32 let record_publisher = RecordPublisher::new(
33 topic_id.clone(),
34 signing_key.clone(),
35 None,
36 initial_secret,
37 Config::default(),
38 );
39
40 let topic = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?;
43
44 println!("[joined topic]");
45
46 // Do something with the gossip topic
47 // (bonus: GossipSender and GossipReceiver are safely clonable)
48 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
49
50 Ok(())
51}
More examples
examples/full_config.rs (line 117)
81async fn main() -> Result<()> {
82 // Generate a new random secret key
83 let secret_key = SecretKey::generate();
84 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
85
86 // Set up endpoint with discovery enabled
87 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
88 .secret_key(secret_key.clone())
89 .bind()
90 .await?;
91
92 // Initialize gossip with auto-discovery
93 let gossip = Gossip::builder().spawn(endpoint.clone());
94
95 // Set up protocol router
96 let _router = iroh::protocol::Router::builder(endpoint.clone())
97 .accept(iroh_gossip::ALPN, gossip.clone())
98 .spawn();
99
100 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
101 let initial_secret = b"my-initial-secret".to_vec();
102
103 let record_publisher =
104 RecordPublisher::builder(topic_id.clone(), signing_key.clone(), initial_secret)
105 .config(config_builder())
106 .secret_rotation(RotationHandle::new(DefaultSecretRotation))
107 .build();
108
109 let topic = gossip
110 .subscribe_and_join_with_auto_discovery(record_publisher)
111 .await?;
112
113 println!("[joined topic]");
114
115 // Do something with the gossip topic
116 // (bonus: GossipSender and GossipReceiver are safely clonable)
117 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
118
119 Ok(())
120}
examples/without_mergers.rs (line 59)
13async fn main() -> Result<()> {
14 // Generate a new random secret key
15 let secret_key = SecretKey::generate();
16 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
17
18 // Set up endpoint with discovery enabled
19 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
20 .secret_key(secret_key.clone())
21 .bind()
22 .await?;
23
24 // Initialize gossip with auto-discovery
25 let gossip = Gossip::builder().spawn(endpoint.clone());
26
27 // Set up protocol router
28 let _router = iroh::protocol::Router::builder(endpoint.clone())
29 .accept(iroh_gossip::ALPN, gossip.clone())
30 .spawn();
31
32 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
33 let initial_secret = b"my-initial-secret".to_vec();
34
35 let record_publisher = RecordPublisher::new(
36 topic_id.clone(),
37 signing_key.clone(),
38 None,
39 initial_secret,
40 // [!] Disable merge workers (BubbleMerge and MessageOverlapMerge)
41 Config::builder()
42 .merge_config(
43 MergeConfig::builder()
44 .bubble_merge(BubbleMergeConfig::Disabled)
45 .message_overlap_merge(MessageOverlapMergeConfig::Disabled)
46 .build(),
47 )
48 .build(),
49 );
50
51 let topic = gossip
52 .subscribe_and_join_with_auto_discovery(record_publisher)
53 .await?;
54
55 println!("[joined topic]");
56
57 // Do something with the gossip topic
58 // (bonus: GossipSender and GossipReceiver are safely clonable)
59 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
60
61 Ok(())
62}
examples/e2e_test.rs (line 43)
11async fn main() -> Result<()> {
12 // Generate a new random secret key
13 let secret_key = SecretKey::generate();
14 let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
15
16 // Set up endpoint with address lookup enabled
17 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
18 .secret_key(secret_key.clone())
19 .bind()
20 .await?;
21
22 // Initialize gossip with auto-discovery
23 let gossip = Gossip::builder().spawn(endpoint.clone());
24
25 // Set up protocol router
26 let _router = iroh::protocol::Router::builder(endpoint.clone())
27 .accept(iroh_gossip::ALPN, gossip.clone())
28 .spawn();
29
30 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
31 let initial_secret = b"my-initial-secret".to_vec();
32
33 let record_publisher = RecordPublisher::new(
34 topic_id.clone(),
35 signing_key.clone(),
36 None,
37 initial_secret,
38 Config::default(),
39 );
40 let (gossip_sender, mut gossip_receiver) = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?
43 .split()
44 .await?;
45
46 tokio::spawn(async move {
47 while let Ok(event) = gossip_receiver.next().await {
48 println!("event: {event:?}");
49 }
50 println!("\nGossip receiver stream ended");
51 });
52
53 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
54 gossip_sender
55 .broadcast(format!("hi from {}", endpoint.id()).into())
56 .await?;
57
58 println!("[joined topic]");
59
60 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
61
62 println!("[finished]");
63
64 // successfully joined
65 // exit with code 0
66 Ok(())
67}
examples/secret_rotation.rs (line 62)
30async fn main() -> Result<()> {
31 // Generate a new random secret key
32 let secret_key = SecretKey::generate();
33 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
34
35 // Set up endpoint with discovery enabled
36 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
37 .secret_key(secret_key.clone())
38 .bind()
39 .await?;
40
41 // Initialize gossip with auto-discovery
42 let gossip = Gossip::builder().spawn(endpoint.clone());
43
44 // Set up protocol router
45 let _router = iroh::protocol::Router::builder(endpoint.clone())
46 .accept(iroh_gossip::ALPN, gossip.clone())
47 .spawn();
48
49 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
50 let initial_secret = b"my-initial-secret".to_vec();
51
52 let record_publisher = RecordPublisher::new(
53 topic_id.clone(),
54 signing_key.clone(),
55 Some(RotationHandle::new(MySecretRotation)),
56 initial_secret,
57 Config::default(),
58 );
59 let (gossip_sender, mut gossip_receiver) = gossip
60 .subscribe_and_join_with_auto_discovery(record_publisher)
61 .await?
62 .split()
63 .await?;
64
65 println!("Joined topic");
66
67 // Spawn listener for incoming messages
68 tokio::spawn(async move {
69 while let Ok(event) = gossip_receiver.next().await {
70 if let Event::Received(msg) = event {
71 println!(
72 "\nMessage from {}: {}",
73 &msg.delivered_from.to_string()[0..8],
74 String::from_utf8(msg.content.to_vec()).unwrap()
75 );
76 } else if let Event::NeighborUp(peer) = event {
77 println!("\nJoined by {}", &peer.to_string()[0..8]);
78 }
79 }
80 println!("\nGossip receiver stream ended");
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}
examples/chat_no_wait.rs (line 53)
12async fn main() -> Result<()> {
13 tracing_subscriber::fmt()
14 .with_thread_ids(true)
15 .with_ansi(true)
16 .with_env_filter(
17 EnvFilter::try_from_default_env()
18 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
19 )
20 .init();
21
22 // Generate a new random secret key
23 let secret_key = SecretKey::generate();
24 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
28 .secret_key(secret_key.clone())
29 .bind()
30 .await?;
31
32 // Initialize gossip with auto-discovery
33 let gossip = Gossip::builder().spawn(endpoint.clone());
34
35 // Set up protocol router
36 let _router = iroh::protocol::Router::builder(endpoint.clone())
37 .accept(iroh_gossip::ALPN, gossip.clone())
38 .spawn();
39
40 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
41 let initial_secret = b"my-initial-secret".to_vec();
42
43 let record_publisher = RecordPublisher::new(
44 topic_id.clone(),
45 signing_key.clone(),
46 None,
47 initial_secret,
48 Config::default(),
49 );
50 let (gossip_sender, mut gossip_receiver) = gossip
51 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
52 .await?
53 .split()
54 .await?;
55
56 println!("Joined topic");
57
58 // Spawn listener for incoming messages
59 tokio::spawn(async move {
60 while let Ok(event) = gossip_receiver.next().await {
61 if let Event::Received(msg) = event {
62 println!(
63 "\nMessage from {}: {}",
64 &msg.delivered_from.to_string()[0..8],
65 String::from_utf8(msg.content.to_vec()).unwrap()
66 );
67 } else if let Event::NeighborUp(peer) = event {
68 println!("\nJoined by {}", &peer.to_string()[0..8]);
69 }
70 }
71 println!("\nGossip receiver stream ended");
72 });
73
74 // Main input loop for sending messages
75 let mut buffer = String::new();
76 let stdin = std::io::stdin();
77 loop {
78 print!("\n> ");
79 stdin.read_line(&mut buffer).unwrap();
80 gossip_sender
81 .broadcast(
82 buffer
83 .trim_end_matches(&['\r', '\n'][..])
84 .as_bytes()
85 .to_vec(),
86 )
87 .await
88 .unwrap();
89 println!(" - (sent)");
90 buffer.clear();
91 }
92}
Additional examples can be found in:
Sourcepub async fn gossip_sender(&self) -> Result<GossipSender>
pub async fn gossip_sender(&self) -> Result<GossipSender>
Get the gossip sender for this topic.
Sourcepub async fn gossip_receiver(&self) -> Result<GossipReceiver>
pub async fn gossip_receiver(&self) -> Result<GossipReceiver>
Get the gossip receiver for this topic.
Sourcepub async fn record_creator(&self) -> Result<RecordPublisher>
pub async fn record_creator(&self) -> Result<RecordPublisher>
Get the record publisher for this topic.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Topic
impl RefUnwindSafe for Topic
impl Send for Topic
impl Sync for Topic
impl Unpin for Topic
impl UnsafeUnpin for Topic
impl UnwindSafe for Topic
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more