pub struct RecordPublisher { /* private fields */ }
Expand description
Publisher for creating and distributing signed DHT records.
Checks existing DHT record count before publishing to respect capacity limits.
Implementations§
Source§impl RecordPublisher
impl RecordPublisher
Sourcepub fn builder(
topic_id: impl Into<TopicId>,
signing_key: SigningKey,
initial_secret: impl Into<Vec<u8>>,
) -> RecordPublisherBuilder
pub fn builder( topic_id: impl Into<TopicId>, signing_key: SigningKey, initial_secret: impl Into<Vec<u8>>, ) -> RecordPublisherBuilder
Create a new RecordPublisherBuilder.
Examples found in repository?
81async fn main() -> Result<()> {
82 // Generate a new random secret key
83 let secret_key = SecretKey::generate();
84 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
85
86 // Set up endpoint with discovery enabled
87 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
88 .secret_key(secret_key.clone())
89 .bind()
90 .await?;
91
92 // Initialize gossip with auto-discovery
93 let gossip = Gossip::builder().spawn(endpoint.clone());
94
95 // Set up protocol router
96 let _router = iroh::protocol::Router::builder(endpoint.clone())
97 .accept(iroh_gossip::ALPN, gossip.clone())
98 .spawn();
99
100 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
101 let initial_secret = b"my-initial-secret".to_vec();
102
103 let record_publisher =
104 RecordPublisher::builder(topic_id.clone(), signing_key.clone(), initial_secret)
105 .config(config_builder())
106 .secret_rotation(RotationHandle::new(DefaultSecretRotation))
107 .build();
108
109 let topic = gossip
110 .subscribe_and_join_with_auto_discovery(record_publisher)
111 .await?;
112
113 println!("[joined topic]");
114
115 // Do something with the gossip topic
116 // (bonus: GossipSender and GossipReceiver are safely clonable)
117 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
118
119 Ok(())
120}
Sourcepub fn new(
topic_id: impl Into<TopicId>,
signing_key: SigningKey,
secret_rotation: Option<RotationHandle>,
initial_secret: impl Into<Vec<u8>>,
config: Config,
) -> Self
pub fn new( topic_id: impl Into<TopicId>, signing_key: SigningKey, secret_rotation: Option<RotationHandle>, initial_secret: impl Into<Vec<u8>>, config: Config, ) -> Self
Create a new record publisher.
§Arguments
- topic_id - Topic identifier
- signing_key - Ed25519 secret key (signing key)
- secret_rotation - Optional custom key rotation strategy
- initial_secret - Initial secret for key derivation
- config - Configuration settings
Examples found in repository?
10async fn main() -> Result<()> {
11 // Generate a new random secret key
12 let secret_key = SecretKey::generate();
13 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
14
15 // Set up endpoint with discovery enabled
16 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
17 .secret_key(secret_key.clone())
18 .bind()
19 .await?;
20
21 // Initialize gossip with auto-discovery
22 let gossip = Gossip::builder().spawn(endpoint.clone());
23
24 // Set up protocol router
25 let _router = iroh::protocol::Router::builder(endpoint.clone())
26 .accept(iroh_gossip::ALPN, gossip.clone())
27 .spawn();
28
29 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
30 let initial_secret = b"my-initial-secret".to_vec();
31
32 let record_publisher = RecordPublisher::new(
33 topic_id.clone(),
34 signing_key.clone(),
35 None,
36 initial_secret,
37 Config::default(),
38 );
39
40 let topic = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?;
43
44 println!("[joined topic]");
45
46 // Do something with the gossip topic
47 // (bonus: GossipSender and GossipReceiver are safely clonable)
48 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
49
50 Ok(())
51}
More examples
13async fn main() -> Result<()> {
14 // Generate a new random secret key
15 let secret_key = SecretKey::generate();
16 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
17
18 // Set up endpoint with discovery enabled
19 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
20 .secret_key(secret_key.clone())
21 .bind()
22 .await?;
23
24 // Initialize gossip with auto-discovery
25 let gossip = Gossip::builder().spawn(endpoint.clone());
26
27 // Set up protocol router
28 let _router = iroh::protocol::Router::builder(endpoint.clone())
29 .accept(iroh_gossip::ALPN, gossip.clone())
30 .spawn();
31
32 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
33 let initial_secret = b"my-initial-secret".to_vec();
34
35 let record_publisher = RecordPublisher::new(
36 topic_id.clone(),
37 signing_key.clone(),
38 None,
39 initial_secret,
40 // [!] Disable merge workers (BubbleMerge and MessageOverlapMerge)
41 Config::builder()
42 .merge_config(
43 MergeConfig::builder()
44 .bubble_merge(BubbleMergeConfig::Disabled)
45 .message_overlap_merge(MessageOverlapMergeConfig::Disabled)
46 .build(),
47 )
48 .build(),
49 );
50
51 let topic = gossip
52 .subscribe_and_join_with_auto_discovery(record_publisher)
53 .await?;
54
55 println!("[joined topic]");
56
57 // Do something with the gossip topic
58 // (bonus: GossipSender and GossipReceiver are safely clonable)
59 let (_gossip_sender, _gossip_receiver) = topic.split().await?;
60
61 Ok(())
62}11async fn main() -> Result<()> {
12 // Generate a new random secret key
13 let secret_key = SecretKey::generate();
14 let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
15
16 // Set up endpoint with address lookup enabled
17 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
18 .secret_key(secret_key.clone())
19 .bind()
20 .await?;
21
22 // Initialize gossip with auto-discovery
23 let gossip = Gossip::builder().spawn(endpoint.clone());
24
25 // Set up protocol router
26 let _router = iroh::protocol::Router::builder(endpoint.clone())
27 .accept(iroh_gossip::ALPN, gossip.clone())
28 .spawn();
29
30 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
31 let initial_secret = b"my-initial-secret".to_vec();
32
33 let record_publisher = RecordPublisher::new(
34 topic_id.clone(),
35 signing_key.clone(),
36 None,
37 initial_secret,
38 Config::default(),
39 );
40 let (gossip_sender, mut gossip_receiver) = gossip
41 .subscribe_and_join_with_auto_discovery(record_publisher)
42 .await?
43 .split()
44 .await?;
45
46 tokio::spawn(async move {
47 while let Ok(event) = gossip_receiver.next().await {
48 println!("event: {event:?}");
49 }
50 println!("\nGossip receiver stream ended");
51 });
52
53 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
54 gossip_sender
55 .broadcast(format!("hi from {}", endpoint.id()).into())
56 .await?;
57
58 println!("[joined topic]");
59
60 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
61
62 println!("[finished]");
63
64 // successfully joined
65 // exit with code 0
66 Ok(())
67}30async fn main() -> Result<()> {
31 // Generate a new random secret key
32 let secret_key = SecretKey::generate();
33 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
34
35 // Set up endpoint with discovery enabled
36 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
37 .secret_key(secret_key.clone())
38 .bind()
39 .await?;
40
41 // Initialize gossip with auto-discovery
42 let gossip = Gossip::builder().spawn(endpoint.clone());
43
44 // Set up protocol router
45 let _router = iroh::protocol::Router::builder(endpoint.clone())
46 .accept(iroh_gossip::ALPN, gossip.clone())
47 .spawn();
48
49 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
50 let initial_secret = b"my-initial-secret".to_vec();
51
52 let record_publisher = RecordPublisher::new(
53 topic_id.clone(),
54 signing_key.clone(),
55 Some(RotationHandle::new(MySecretRotation)),
56 initial_secret,
57 Config::default(),
58 );
59 let (gossip_sender, mut gossip_receiver) = gossip
60 .subscribe_and_join_with_auto_discovery(record_publisher)
61 .await?
62 .split()
63 .await?;
64
65 println!("Joined topic");
66
67 // Spawn listener for incoming messages
68 tokio::spawn(async move {
69 while let Ok(event) = gossip_receiver.next().await {
70 if let Event::Received(msg) = event {
71 println!(
72 "\nMessage from {}: {}",
73 &msg.delivered_from.to_string()[0..8],
74 String::from_utf8(msg.content.to_vec()).unwrap()
75 );
76 } else if let Event::NeighborUp(peer) = event {
77 println!("\nJoined by {}", &peer.to_string()[0..8]);
78 }
79 }
80 println!("\nGossip receiver stream ended");
81 });
82
83 // Main input loop for sending messages
84 let mut buffer = String::new();
85 let stdin = std::io::stdin();
86 loop {
87 print!("\n> ");
88 stdin.read_line(&mut buffer).unwrap();
89 gossip_sender
90 .broadcast(buffer.clone().replace("\n", "").into())
91 .await
92 .unwrap();
93 println!(" - (sent)");
94 buffer.clear();
95 }
96}12async fn main() -> Result<()> {
13 tracing_subscriber::fmt()
14 .with_thread_ids(true)
15 .with_ansi(true)
16 .with_env_filter(
17 EnvFilter::try_from_default_env()
18 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
19 )
20 .init();
21
22 // Generate a new random secret key
23 let secret_key = SecretKey::generate();
24 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
25
26 // Set up endpoint with discovery enabled
27 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
28 .secret_key(secret_key.clone())
29 .bind()
30 .await?;
31
32 // Initialize gossip with auto-discovery
33 let gossip = Gossip::builder().spawn(endpoint.clone());
34
35 // Set up protocol router
36 let _router = iroh::protocol::Router::builder(endpoint.clone())
37 .accept(iroh_gossip::ALPN, gossip.clone())
38 .spawn();
39
40 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
41 let initial_secret = b"my-initial-secret".to_vec();
42
43 let record_publisher = RecordPublisher::new(
44 topic_id.clone(),
45 signing_key.clone(),
46 None,
47 initial_secret,
48 Config::default(),
49 );
50 let (gossip_sender, mut gossip_receiver) = gossip
51 .subscribe_and_join_with_auto_discovery_no_wait(record_publisher)
52 .await?
53 .split()
54 .await?;
55
56 println!("Joined topic");
57
58 // Spawn listener for incoming messages
59 tokio::spawn(async move {
60 while let Ok(event) = gossip_receiver.next().await {
61 if let Event::Received(msg) = event {
62 println!(
63 "\nMessage from {}: {}",
64 &msg.delivered_from.to_string()[0..8],
65 String::from_utf8(msg.content.to_vec()).unwrap()
66 );
67 } else if let Event::NeighborUp(peer) = event {
68 println!("\nJoined by {}", &peer.to_string()[0..8]);
69 }
70 }
71 println!("\nGossip receiver stream ended");
72 });
73
74 // Main input loop for sending messages
75 let mut buffer = String::new();
76 let stdin = std::io::stdin();
77 loop {
78 print!("\n> ");
79 stdin.read_line(&mut buffer).unwrap();
80 gossip_sender
81 .broadcast(
82 buffer
83 .trim_end_matches(&['\r', '\n'][..])
84 .as_bytes()
85 .to_vec(),
86 )
87 .await
88 .unwrap();
89 println!(" - (sent)");
90 buffer.clear();
91 }
92}13async fn main() -> Result<()> {
14 // tracing init - only show distributed_topic_tracker logs
15 use tracing_subscriber::filter::EnvFilter;
16
17 tracing_subscriber::fmt()
18 .with_thread_ids(true)
19 .with_ansi(true)
20 .with_env_filter(
21 EnvFilter::try_from_default_env()
22 .unwrap_or_else(|_| EnvFilter::new("distributed_topic_tracker=debug")),
23 )
24 .init();
25
26 // Generate a new random secret key
27 let secret_key = SecretKey::generate();
28 let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
29
30 // Set up endpoint with discovery enabled
31 let endpoint = Endpoint::builder(iroh::endpoint::presets::N0)
32 .secret_key(secret_key.clone())
33 .bind()
34 .await?;
35
36 // Initialize gossip with auto-discovery
37 let gossip = Gossip::builder().spawn(endpoint.clone());
38
39 // Set up protocol router
40 let _router = iroh::protocol::Router::builder(endpoint.clone())
41 .accept(iroh_gossip::ALPN, gossip.clone())
42 .spawn();
43
44 let topic_id = TopicId::new("my-iroh-gossip-topic".to_string());
45 let initial_secret = b"my-initial-secret".to_vec();
46
47 let record_publisher = RecordPublisher::new(
48 topic_id.clone(),
49 signing_key.clone(),
50 None,
51 initial_secret,
52 Config::builder()
53 .bootstrap_config(
54 BootstrapConfig::builder()
55 .check_older_records_first_on_startup(false)
56 .build(),
57 )
58 .build(),
59 );
60
61 // Split into sink (sending) and stream (receiving)
62 let (gossip_sender, mut gossip_receiver) = gossip
63 .subscribe_and_join_with_auto_discovery(record_publisher)
64 .await?
65 .split()
66 .await?;
67
68 println!("Joined topic");
69
70 // Spawn listener for incoming messages
71 tokio::spawn(async move {
72 while let Ok(event) = gossip_receiver.next().await {
73 if let Event::Received(msg) = event {
74 println!(
75 "\nMessage from {}: {}",
76 &msg.delivered_from.to_string()[0..8],
77 String::from_utf8(msg.content.to_vec()).unwrap()
78 );
79 } else if let Event::NeighborUp(peer) = event {
80 println!("\nJoined by {}", &peer.to_string()[0..8]);
81 }
82 }
83 println!("\nGossip receiver stream ended");
84 });
85
86 // Main input loop for sending messages
87 let mut buffer = String::new();
88 let stdin = std::io::stdin();
89 loop {
90 print!("\n> ");
91 stdin.read_line(&mut buffer).unwrap();
92 gossip_sender
93 .broadcast(
94 buffer
95 .trim_end_matches(&['\r', '\n'][..])
96 .as_bytes()
97 .to_vec(),
98 )
99 .await
100 .unwrap();
101 println!(" - (sent)");
102 buffer.clear();
103 }
104}Sourcepub fn new_record<'a>(
&'a self,
unix_minute: u64,
record_content: impl Serialize + Deserialize<'a>,
) -> Result<Record>
pub fn new_record<'a>( &'a self, unix_minute: u64, record_content: impl Serialize + Deserialize<'a>, ) -> Result<Record>
Create a new signed record with content.
§Arguments
- unix_minute - Time slot for this record
- record_content - Serializable content
Sourcepub fn pub_key(&self) -> VerifyingKey
pub fn pub_key(&self) -> VerifyingKey
Get this publisher’s Ed25519 verifying key.
Sourcepub fn signing_key(&self) -> &SigningKey
pub fn signing_key(&self) -> &SigningKey
Get the signing key.
Sourcepub fn secret_rotation(&self) -> Option<RotationHandle>
pub fn secret_rotation(&self) -> Option<RotationHandle>
Get the secret rotation handle if set.
Sourcepub fn initial_secret_hash(&self) -> [u8; 32]
pub fn initial_secret_hash(&self) -> [u8; 32]
Get the initial secret hash.
Source§impl RecordPublisher
impl RecordPublisher
Sourcepub async fn publish_record(
&self,
record: Record,
cancel_token: CancellationToken,
) -> Result<()>
pub async fn publish_record( &self, record: Record, cancel_token: CancellationToken, ) -> Result<()>
Publish a record to the DHT if slot capacity allows.
Checks the existing record count for this time slot and skips publishing if
the self.config.bootstrap_config().max_bootstrap_records() limit has been reached.
Sourcepub async fn publish_record_cached_records(
&self,
record: Record,
cached_records: Option<HashSet<Record>>,
cancel_token: CancellationToken,
) -> Result<()>
pub async fn publish_record_cached_records( &self, record: Record, cached_records: Option<HashSet<Record>>, cancel_token: CancellationToken, ) -> Result<()>
Publish a record to the DHT (using cached get_records results) if slot capacity allows.
Checks the existing record count for this time slot and skips publishing if
the self.config.bootstrap_config().max_bootstrap_records() limit has been reached.
Sourcepub async fn get_records(
&self,
unix_minute: u64,
cancel_token: CancellationToken,
) -> Result<HashSet<Record>>
pub async fn get_records( &self, unix_minute: u64, cancel_token: CancellationToken, ) -> Result<HashSet<Record>>
Retrieve all verified records for a given time slot from the DHT.
Filters out records from this publisher’s own node ID. Deduplicates records by pub_key, keeping only the record with the highest sequence number for each pub_key.
Trait Implementations§
Source§impl Clone for RecordPublisher
impl Clone for RecordPublisher
Source§fn clone(&self) -> RecordPublisher
fn clone(&self) -> RecordPublisher
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Auto Trait Implementations§
impl Freeze for RecordPublisher
impl !RefUnwindSafe for RecordPublisher
impl Send for RecordPublisher
impl Sync for RecordPublisher
impl Unpin for RecordPublisher
impl UnsafeUnpin for RecordPublisher
impl !UnwindSafe for RecordPublisher
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more