pub struct RecordPublisher { /* private fields */ }Expand description
Publisher for creating and distributing signed DHT records.
Checks existing DHT record count before publishing to respect capacity limits.
Implementations§
Source§impl RecordPublisher
impl RecordPublisher
Sourcepub fn builder(
topic_id: impl Into<TopicId>,
signing_key: SigningKey,
initial_secret: impl Into<Vec<u8>>,
) -> RecordPublisherBuilder
pub fn builder( topic_id: impl Into<TopicId>, signing_key: SigningKey, initial_secret: impl Into<Vec<u8>>, ) -> RecordPublisherBuilder
Create a new RecordPublisherBuilder.
Examples found in repository?
77async fn main() -> Result<()> {
78 // Generate a new random secret key
79 let secret_key = SecretKey::generate(&mut rand::rng());
80 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
81
82 // Set up endpoint with discovery enabled
83 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
84 .secret_key(secret_key.clone())
85 .bind()
86 .await?;
87
88 // Initialize gossip with auto-discovery
89 let gossip = Gossip::builder().spawn(endpoint.clone());
90
91 // Set up protocol router
92 let _router = iroh::protocol::Router::builder(endpoint.clone())
93 .accept(iroh_gossip::ALPN, gossip.clone())
94 .spawn();
95
96 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
97 let initial_secret = b"my-initial-secret".to_vec();
98
99 let record_publisher =
100 RecordPublisher::builder(topic_id.clone(), signing_key.clone(), initial_secret)
101 .config(config_builder())
102 .secret_rotation(RotationHandle::new(DefaultSecretRotation))
103 .build();
104
105 let topic = gossip
106 .subscribe_and_join_with_auto_discovery(record_publisher)
107 .await?;
108
109 println!("[joined topic]");
110
111 // Do something with the gossip topic
112 // (bonus: GossipSender and GossipReceiver are safely clonable)
113 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
114
115 Ok(())
116}Sourcepub fn new(
topic_id: impl Into<TopicId>,
signing_key: SigningKey,
secret_rotation: Option<RotationHandle>,
initial_secret: impl Into<Vec<u8>>,
config: Config,
) -> Self
pub fn new( topic_id: impl Into<TopicId>, signing_key: SigningKey, secret_rotation: Option<RotationHandle>, initial_secret: impl Into<Vec<u8>>, config: Config, ) -> Self
Create a new record publisher.
§Arguments
- topic_id - Topic identifier
- signing_key - Ed25519 secret key (signing key)
- secret_rotation - Optional custom key rotation strategy
- initial_secret - Initial secret for key derivation
- config - Configuration settings
Examples found in repository?
10async fn main() -> Result<()> {
11 // Generate a new random secret key
12 let secret_key = SecretKey::generate(&mut rand::rng());
13 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
14
15 // Set up endpoint with discovery enabled
16 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
17 .secret_key(secret_key.clone())
18 .bind()
19 .await?;
20
21 // Initialize gossip with auto-discovery
22 let gossip = Gossip::builder().spawn(endpoint.clone());
23
24 // Set up protocol router
25 let _router = iroh::protocol::Router::builder(endpoint.clone())
26 .accept(iroh_gossip::ALPN, gossip.clone())
27 .spawn();
28
29 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
30 let initial_secret = b"my-initial-secret".to_vec();
31
32 let record_publisher = RecordPublisher::new(
33 topic_id.clone(),
34 signing_key.clone(),
35 None,
36 initial_secret,
37 Config::default(),
38 );
39
40 let topic = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?;
43
44 println!("[joined topic]");
45
46 // Do something with the gossip topic
47 // (bonus: GossipSender and GossipReceiver are safely clonable)
48 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
49
50 Ok(())
51}More examples
13async fn main() -> Result<()> {
14 // Generate a new random secret key
15 let secret_key = SecretKey::generate(&mut rand::rng());
16 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
17
18 // Set up endpoint with discovery enabled
19 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
20 .secret_key(secret_key.clone())
21 .bind()
22 .await?;
23
24 // Initialize gossip with auto-discovery
25 let gossip = Gossip::builder().spawn(endpoint.clone());
26
27 // Set up protocol router
28 let _router = iroh::protocol::Router::builder(endpoint.clone())
29 .accept(iroh_gossip::ALPN, gossip.clone())
30 .spawn();
31
32 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
33 let initial_secret = b"my-initial-secret".to_vec();
34
35 let record_publisher = RecordPublisher::new(
36 topic_id.clone(),
37 signing_key.clone(),
38 None,
39 initial_secret,
40 // [!] Disable merge workers (BubbleMerge and MessageOverlapMerge)
41 Config::builder()
42 .merge_config(
43 MergeConfig::builder()
44 .bubble_merge(BubbleMergeConfig::Disabled)
45 .message_overlap_merge(MessageOverlapMergeConfig::Disabled)
46 .build(),
47 )
48 .build(),
49 );
50
51 let topic = gossip
52 .subscribe_and_join_with_auto_discovery(record_publisher)
53 .await?;
54
55 println!("[joined topic]");
56
57 // Do something with the gossip topic
58 // (bonus: GossipSender and GossipReceiver are safely clonable)
59 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
60
61 Ok(())
62}11async fn main() -> Result<()> {
12 // Generate a new random secret key
13 let secret_key = SecretKey::generate(&mut rand::rng());
14 let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
15
16 // Set up endpoint with address lookup enabled
17 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
18 .secret_key(secret_key.clone())
19 .bind()
20 .await?;
21
22 // Initialize gossip with auto-discovery
23 let gossip = Gossip::builder().spawn(endpoint.clone());
24
25 // Set up protocol router
26 let _router = iroh::protocol::Router::builder(endpoint.clone())
27 .accept(iroh_gossip::ALPN, gossip.clone())
28 .spawn();
29
30 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
31 let initial_secret = b"my-initial-secret".to_vec();
32
33 let record_publisher = RecordPublisher::new(
34 topic_id.clone(),
35 signing_key.clone(),
36 None,
37 initial_secret,
38 Config::default(),
39 );
40 let (gossip_sender, mut gossip_receiver) = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?
43 .split()
44 .await?;
45
46 tokio::spawn(async move {
47 while let Ok(event) = gossip_receiver.next().await {
48 println!("event: {event:?}");
49 }
50 println!("\nGossip receiver stream ended");
51 });
52
53 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
54 gossip_sender
55 .broadcast(format!("hi from {}", endpoint.id()).into())
56 .await?;
57
58 println!("[joined topic]");
59
60 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
61
62 println!("[finished]");
63
64 // successfully joined
65 // exit with code 0
66 Ok(())
67}30async fn main() -> Result<()> {
31 // Generate a new random secret key
32 let secret_key = SecretKey::generate(&mut rand::rng());
33 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
34
35 // Set up endpoint with discovery enabled
36 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
37 .secret_key(secret_key.clone())
38 .bind()
39 .await?;
40
41 // Initialize gossip with auto-discovery
42 let gossip = Gossip::builder().spawn(endpoint.clone());
43
44 // Set up protocol router
45 let _router = iroh::protocol::Router::builder(endpoint.clone())
46 .accept(iroh_gossip::ALPN, gossip.clone())
47 .spawn();
48
49 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
50 let initial_secret = b"my-initial-secret".to_vec();
51
52 let record_publisher = RecordPublisher::new(
53 topic_id.clone(),
54 signing_key.clone(),
55 Some(RotationHandle::new(MySecretRotation)),
56 initial_secret,
57 Config::default(),
58 );
59 let (gossip_sender, mut gossip_receiver) = gossip
60 .subscribe_and_join_with_auto_discovery(record_publisher)
61 .await?
62 .split()
63 .await?;
64
65 println!("Joined topic");
66
67 // Spawn listener for incoming messages
68 tokio::spawn(async move {
69 while let Ok(event) = gossip_receiver.next().await {
70 if let Event::Received(msg) = event {
71 println!(
72 "\nMessage from {}: {}",
73 &msg.delivered_from.to_string()[0..8],
74 String::from_utf8(msg.content.to_vec()).unwrap()
75 );
76 } else if let Event::NeighborUp(peer) = event {
77 println!("\nJoined by {}", &peer.to_string()[0..8]);
78 }
79 }
80 println!("\nGossip receiver stream ended");
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}12async fn main() -> Result<()> {
13 tracing_subscriber::fmt()
14 .with_thread_ids(true)
15 .with_ansi(true)
16 .with_env_filter(
17 EnvFilter::try_from_default_env()
18 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
19 )
20 .init();
21
22 // Generate a new random secret key
23 let secret_key = SecretKey::generate(&mut rand::rng());
24 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
28 .secret_key(secret_key.clone())
29 .bind()
30 .await?;
31
32 // Initialize gossip with auto-discovery
33 let gossip = Gossip::builder().spawn(endpoint.clone());
34
35 // Set up protocol router
36 let _router = iroh::protocol::Router::builder(endpoint.clone())
37 .accept(iroh_gossip::ALPN, gossip.clone())
38 .spawn();
39
40 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
41 let initial_secret = b"my-initial-secret".to_vec();
42
43 let record_publisher = RecordPublisher::new(
44 topic_id.clone(),
45 signing_key.clone(),
46 None,
47 initial_secret,
48 Config::default(),
49 );
50 let (gossip_sender, mut gossip_receiver) = gossip
51 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
52 .await?
53 .split()
54 .await?;
55
56 println!("Joined topic");
57
58 // Spawn listener for incoming messages
59 tokio::spawn(async move {
60 while let Ok(event) = gossip_receiver.next().await {
61 if let Event::Received(msg) = event {
62 println!(
63 "\nMessage from {}: {}",
64 &msg.delivered_from.to_string()[0..8],
65 String::from_utf8(msg.content.to_vec()).unwrap()
66 );
67 } else if let Event::NeighborUp(peer) = event {
68 println!("\nJoined by {}", &peer.to_string()[0..8]);
69 }
70 }
71 println!("\nGossip receiver stream ended");
72 });
73
74 // Main input loop for sending messages
75 let mut buffer = String::new();
76 let stdin = std::io::stdin();
77 loop {
78 print!("\n> ");
79 stdin.read_line(&mut buffer).unwrap();
80 gossip_sender
81 .broadcast(
82 buffer
83 .trim_end_matches(&['\r', '\n'][..])
84 .as_bytes()
85 .to_vec(),
86 )
87 .await
88 .unwrap();
89 println!(" - (sent)");
90 buffer.clear();
91 }
92}13async fn main() -> Result<()> {
14 // tracing init - only show distributed_topic_tracker logs
15 use tracing_subscriber::filter::EnvFilter;
16
17 tracing_subscriber::fmt()
18 .with_thread_ids(true)
19 .with_ansi(true)
20 .with_env_filter(
21 EnvFilter::try_from_default_env()
22 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
23 )
24 .init();
25
26 // Generate a new random secret key
27 let secret_key = SecretKey::generate(&mut rand::rng());
28 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
29
30 // Set up endpoint with discovery enabled
31 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
32 .secret_key(secret_key.clone())
33 .bind()
34 .await?;
35
36 // Initialize gossip with auto-discovery
37 let gossip = Gossip::builder().spawn(endpoint.clone());
38
39 // Set up protocol router
40 let _router = iroh::protocol::Router::builder(endpoint.clone())
41 .accept(iroh_gossip::ALPN, gossip.clone())
42 .spawn();
43
44 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
45 let initial_secret = b"my-initial-secret".to_vec();
46
47 let record_publisher = RecordPublisher::new(
48 topic_id.clone(),
49 signing_key.clone(),
50 None,
51 initial_secret,
52 Config::builder()
53 .bootstrap_config(
54 BootstrapConfig::builder()
55 .check_older_records_first_on_startup(false)
56 .build(),
57 )
58 .build(),
59 );
60
61 // Split into sink (sending) and stream (receiving)
62 let (gossip_sender, mut gossip_receiver) = gossip
63 .subscribe_and_join_with_auto_discovery(record_publisher)
64 .await?
65 .split()
66 .await?;
67
68 println!("Joined topic");
69
70 // Spawn listener for incoming messages
71 tokio::spawn(async move {
72 while let Ok(event) = gossip_receiver.next().await {
73 if let Event::Received(msg) = event {
74 println!(
75 "\nMessage from {}: {}",
76 &msg.delivered_from.to_string()[0..8],
77 String::from_utf8(msg.content.to_vec()).unwrap()
78 );
79 } else if let Event::NeighborUp(peer) = event {
80 println!("\nJoined by {}", &peer.to_string()[0..8]);
81 }
82 }
83 println!("\nGossip receiver stream ended");
84 });
85
86 // Main input loop for sending messages
87 let mut buffer = String::new();
88 let stdin = std::io::stdin();
89 loop {
90 print!("\n> ");
91 stdin.read_line(&mut buffer).unwrap();
92 gossip_sender
93 .broadcast(
94 buffer
95 .trim_end_matches(&['\r', '\n'][..])
96 .as_bytes()
97 .to_vec(),
98 )
99 .await
100 .unwrap();
101 println!(" - (sent)");
102 buffer.clear();
103 }
104}Sourcepub fn new_record<'a>(
&'a self,
unix_minute: u64,
record_content: impl Serialize + Deserialize<'a>,
) -> Result<Record>
pub fn new_record<'a>( &'a self, unix_minute: u64, record_content: impl Serialize + Deserialize<'a>, ) -> Result<Record>
Create a new signed record with content.
§Arguments
- unix_minute - Time slot for this record
- record_content - Serializable content
Sourcepub fn pub_key(&self) -> VerifyingKey
pub fn pub_key(&self) -> VerifyingKey
Get this publisher’s Ed25519 verifying key.
Sourcepub fn signing_key(&self) -> &SigningKey
pub fn signing_key(&self) -> &SigningKey
Get the signing key.
Sourcepub fn secret_rotation(&self) -> Option<RotationHandle>
pub fn secret_rotation(&self) -> Option<RotationHandle>
Get the secret rotation handle if set.
Sourcepub fn initial_secret_hash(&self) -> [u8; 32]
pub fn initial_secret_hash(&self) -> [u8; 32]
Get the initial secret hash.
Source§impl RecordPublisher
impl RecordPublisher
Sourcepub async fn publish_record(
&self,
record: Record,
cancel_token: CancellationToken,
) -> Result<()>
pub async fn publish_record( &self, record: Record, cancel_token: CancellationToken, ) -> Result<()>
Publish a record to the DHT if slot capacity allows.
Checks the existing record count for this time slot and skips publishing if the
self.config.bootstrap_config().max_bootstrap_records() limit has been reached.
Sourcepub async fn publish_record_cached_records(
&self,
record: Record,
cached_records: Option<HashSet<Record>>,
cancel_token: CancellationToken,
) -> Result<()>
pub async fn publish_record_cached_records( &self, record: Record, cached_records: Option<HashSet<Record>>, cancel_token: CancellationToken, ) -> Result<()>
Publish a record to the DHT (using cached get_records) if slot capacity allows.
Checks the existing record count for this time slot and skips publishing if the
self.config.bootstrap_config().max_bootstrap_records() limit has been reached.
Sourcepub async fn get_records(
&self,
unix_minute: u64,
cancel_token: CancellationToken,
) -> Result<HashSet<Record>>
pub async fn get_records( &self, unix_minute: u64, cancel_token: CancellationToken, ) -> Result<HashSet<Record>>
Retrieve all verified records for a given time slot from the DHT.
Filters out records from this publisher’s own node ID. Deduplicates records by pub_key, keeping only the record with the highest sequence number for each pub_key.
Trait Implementations§
Source§impl Clone for RecordPublisher
impl Clone for RecordPublisher
Source§fn clone(&self) -> RecordPublisher
fn clone(&self) -> RecordPublisher
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more