distributed_topic_tracker/gossip/topic/
topic.rs1use std::sync::{Arc, Weak};
4
5use crate::{
6 BubbleMergeConfig, Config, GossipSender, MessageOverlapMergeConfig,
7 config::PublisherConfig,
8 gossip::{
9 merge::{BubbleMerge, MessageOverlapMerge},
10 topic::{bootstrap::Bootstrap, publisher::Publisher},
11 },
12};
13use actor_helper::{Handle, act, act_ok};
14use anyhow::Result;
15use tokio_util::sync::CancellationToken;
16
17#[derive(Debug, Clone)]
22pub struct Topic {
23 api: Arc<Handle<TopicActor, anyhow::Error>>,
24 cancel_token: CancellationToken,
25}
26
27#[derive(Debug)]
28struct TopicActor {
29 bootstrap: Bootstrap,
30 publisher: Option<Publisher>,
31 bubble_merge: Option<BubbleMerge>,
32 message_overlap_merge: Option<MessageOverlapMerge>,
33 record_publisher: crate::crypto::RecordPublisher,
34 cancel_token: CancellationToken,
35}
36
37impl Drop for TopicActor {
38 fn drop(&mut self) {
39 self.cancel_token.cancel();
40 }
41}
42
43impl Topic {
44 pub async fn new(
52 record_publisher: crate::crypto::RecordPublisher,
53 gossip: iroh_gossip::net::Gossip,
54 async_bootstrap: bool,
55 ) -> Result<Self> {
56 tracing::debug!(
57 "Topic: creating new topic (async_bootstrap={})",
58 async_bootstrap
59 );
60
61 let cancel_token = CancellationToken::new();
62 let bootstrap = Bootstrap::new(
63 record_publisher.clone(),
64 gossip.clone(),
65 cancel_token.clone(),
66 record_publisher.config().timeouts().clone(),
67 record_publisher.config().bootstrap_config().clone(),
68 )
69 .await?;
70 tracing::debug!("Topic: bootstrap instance created");
71
72 let api = Arc::new(
73 Handle::spawn(TopicActor {
74 bootstrap: bootstrap.clone(),
75 record_publisher: record_publisher.clone(),
76 publisher: None,
77 bubble_merge: None,
78 message_overlap_merge: None,
79 cancel_token: cancel_token.clone(),
80 })
81 .0,
82 );
83
84 let bootstrap_done = bootstrap.bootstrap().await?;
85 let config = record_publisher.config().clone();
86 tokio::spawn({
87 let api = Arc::downgrade(&api);
88 let config = config.clone();
89 let cancel_token = cancel_token.clone();
90 async move {
91 if let Err(err) = wait_for_bootstrap(bootstrap_done, cancel_token.clone()).await {
92 tracing::warn!("bootstrap failed: {}", err);
93 return;
94 }
95
96 if async_bootstrap {
97 tracing::debug!("Bootstrap completed, now spawning workers");
98 if let Err(err) = spawn_workers(api, config, cancel_token.clone()).await {
99 cancel_token.cancel();
100 tracing::warn!("failed to spawn workers: {}", err);
101 }
102 }
103 }
104 });
105
106 if !async_bootstrap {
107 tracing::debug!("Topic: waiting for bootstrap to complete");
108 bootstrap.gossip_receiver().await?.joined().await?;
109 if let Err(err) =
110 spawn_workers(Arc::downgrade(&api), config, cancel_token.clone()).await
111 {
112 tracing::warn!("failed to spawn workers: {}", err);
113 cancel_token.cancel();
114 return Err(anyhow::anyhow!("failed to spawn workers: {}", err));
115 }
116 tracing::debug!("Topic: bootstrap completed");
117 } else {
118 tracing::debug!("Topic: bootstrap started asynchronously");
119 }
120
121 Ok(Self { api, cancel_token })
122 }
123
124 pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
126 let topic_ref = Arc::new(self.clone());
127 let mut sender = self.gossip_sender().await?;
128 let mut receiver = self.gossip_receiver().await?;
129 sender._topic_keep_alive = Some(topic_ref.clone());
130 receiver._topic_keep_alive = Some(topic_ref);
131 Ok((sender, receiver))
132 }
133
134 pub async fn gossip_sender(&self) -> Result<GossipSender> {
136 self.api
137 .call(act!(actor => actor.bootstrap.gossip_sender()))
138 .await
139 }
140
141 pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
143 self.api
144 .call(act!(actor => actor.bootstrap.gossip_receiver()))
145 .await
146 }
147
148 pub async fn record_creator(&self) -> Result<crate::crypto::RecordPublisher> {
150 self.api
151 .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
152 .await
153 }
154
155 #[allow(dead_code)]
156 pub(crate) fn cancel_token(&self) -> CancellationToken {
157 self.cancel_token.clone()
158 }
159}
160
161async fn wait_for_bootstrap(
162 bootstrap_done: tokio::sync::oneshot::Receiver<Result<()>>,
163 cancel_token: CancellationToken,
164) -> Result<()> {
165 if let Ok(Ok(_)) = bootstrap_done.await {
166 Ok(())
167 } else {
168 tracing::error!("Topic: bootstrap failed or cancelled, shutting down topic");
169 cancel_token.cancel();
170 Err(anyhow::anyhow!("bootstrap failed or cancelled"))
171 }
172}
173
174async fn spawn_workers(
175 api: Weak<Handle<TopicActor, anyhow::Error>>,
176 config: Config,
177 cancel_token: CancellationToken,
178) -> Result<()> {
179 if !cancel_token.is_cancelled() {
180 if matches!(config.publisher_config(), PublisherConfig::Enabled(_)) {
181 tracing::debug!("Topic: starting publisher");
182 match api.upgrade() {
183 Some(api) => {
184 if let Err(err) = api.call(act!(actor => actor.start_publishing())).await {
185 return Err(anyhow::anyhow!("failed to start publisher: {err}"));
186 }
187 }
188 None => {
189 return Err(anyhow::anyhow!(
190 "failed to start publisher, topic actor dropped"
191 ));
192 }
193 }
194 }
195
196 if matches!(
197 config.merge_config().bubble_merge(),
198 BubbleMergeConfig::Enabled(_)
199 ) {
200 tracing::debug!("Topic: starting bubble merge");
201 match api.upgrade() {
202 Some(api) => {
203 if let Err(err) = api.call(act!(actor => actor.start_bubble_merge())).await {
204 return Err(anyhow::anyhow!("failed to start bubble merge: {err}"));
205 }
206 }
207 None => {
208 return Err(anyhow::anyhow!(
209 "failed to start bubble merge, topic actor dropped"
210 ));
211 }
212 }
213 }
214
215 if matches!(
216 config.merge_config().message_overlap_merge(),
217 MessageOverlapMergeConfig::Enabled(_)
218 ) {
219 tracing::debug!("Topic: starting message overlap merge");
220 match api.upgrade() {
221 Some(api) => {
222 if let Err(err) = api
223 .call(act!(actor => actor.start_message_overlap_merge()))
224 .await
225 {
226 return Err(anyhow::anyhow!(
227 "failed to start message overlap merge: {err}"
228 ));
229 }
230 }
231 None => {
232 return Err(anyhow::anyhow!(
233 "failed to start message overlap merge, topic actor dropped"
234 ));
235 }
236 }
237 }
238 tracing::debug!("Topic: spawn_worker finished");
239 Ok(())
240 } else {
241 tracing::warn!("Topic: cancelled before workers could be spawned");
242 Err(anyhow::anyhow!("cancelled before workers could be spawned"))
243 }
244}
245
246impl TopicActor {
247 pub async fn start_publishing(&mut self) -> Result<()> {
248 if let PublisherConfig::Enabled(config) = self.record_publisher.config().publisher_config()
249 {
250 tracing::debug!("TopicActor: initializing publisher");
251 let publisher = async {
252 Publisher::new(
253 self.record_publisher.clone(),
254 self.bootstrap.gossip_receiver().await?,
255 self.cancel_token.clone(),
256 config.initial_delay(),
257 config.base_interval(),
258 config.max_jitter(),
259 )
260 };
261
262 match publisher.await {
263 Ok(publisher) => {
264 self.publisher = Some(publisher);
265 tracing::debug!("TopicActor: publisher started");
266 }
267 Err(err) => {
268 if config.fail_topic_creation_on_publishing_startup_failure() {
269 return Err(anyhow::anyhow!("failed to start publisher: {}", err));
270 } else {
271 tracing::warn!(
272 "TopicActor: failed to start publisher: {}, but continuing because Publisher.fail_topic_creation_on_publishing_startup_failure is false",
273 err
274 );
275 }
276 }
277 }
278 }
279 Ok(())
280 }
281
282 pub async fn start_bubble_merge(&mut self) -> Result<()> {
283 if let BubbleMergeConfig::Enabled(config) =
284 self.record_publisher.config().merge_config().bubble_merge()
285 {
286 tracing::debug!("TopicActor: initializing bubble merge");
287 let bubble_merge = async {
288 BubbleMerge::new(
289 self.record_publisher.clone(),
290 self.bootstrap.gossip_sender().await?,
291 self.bootstrap.gossip_receiver().await?,
292 self.cancel_token.clone(),
293 self.record_publisher.config().max_join_peer_count(),
294 config.base_interval(),
295 config.max_jitter(),
296 config.min_neighbors(),
297 )
298 };
299
300 match bubble_merge.await {
301 Ok(bubble_merge) => {
302 self.bubble_merge = Some(bubble_merge);
303 tracing::debug!("TopicActor: bubble merge started");
304 }
305 Err(err) => {
306 if config.fail_topic_creation_on_merge_startup_failure() {
307 return Err(anyhow::anyhow!("failed to start bubble merge: {}", err));
308 } else {
309 tracing::warn!(
310 "TopicActor: failed to start bubble merge: {}, but continuing because BubbleMerge.fail_topic_creation_on_merge_startup_failure is false",
311 err
312 );
313 }
314 }
315 }
316 }
317 Ok(())
318 }
319
320 pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
321 if let MessageOverlapMergeConfig::Enabled(config) = self
322 .record_publisher
323 .config()
324 .merge_config()
325 .message_overlap_merge()
326 {
327 tracing::debug!("TopicActor: initializing message overlap merge");
328 let message_overlap_merge = async {
329 MessageOverlapMerge::new(
330 self.record_publisher.clone(),
331 self.bootstrap.gossip_sender().await?,
332 self.bootstrap.gossip_receiver().await?,
333 self.cancel_token.clone(),
334 self.record_publisher.config().max_join_peer_count(),
335 config.base_interval(),
336 config.max_jitter(),
337 )
338 };
339
340 match message_overlap_merge.await {
341 Ok(message_overlap_merge) => {
342 self.message_overlap_merge = Some(message_overlap_merge);
343 tracing::debug!("TopicActor: message overlap merge started");
344 }
345 Err(err) => {
346 if config.fail_topic_creation_on_merge_startup_failure() {
347 return Err(anyhow::anyhow!(
348 "failed to start message overlap merge: {}",
349 err
350 ));
351 } else {
352 tracing::warn!(
353 "TopicActor: failed to start message overlap merge: {}, but continuing because MessageOverlapMerge.fail_topic_creation_on_merge_startup_failure is false",
354 err
355 );
356 }
357 }
358 }
359 }
360 Ok(())
361 }
362}
363
#[cfg(test)]
mod tests {
    /// Upper bound for every individual wait in these tests.
    const WAIT_LIMIT: std::time::Duration = std::time::Duration::from_secs(5);

    /// Binds a fresh endpoint, spawns gossip on it and builds a record
    /// publisher for `topic_name` with the default configuration.
    ///
    /// The endpoint and gossip instance are returned so the caller keeps them
    /// alive for the whole test.
    async fn fixture(
        topic_name: &str,
    ) -> (
        iroh::Endpoint,
        iroh_gossip::net::Gossip,
        crate::RecordPublisher,
    ) {
        let secret_key = iroh::SecretKey::generate(&mut rand::rng());
        let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
        let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
            .secret_key(secret_key)
            .bind()
            .await
            .expect("failed to bind endpoint");
        let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

        let record_publisher = crate::RecordPublisher::new(
            crate::TopicId::new(topic_name.to_string()),
            signing_key,
            None,
            b"my-initial-secret".to_vec(),
            crate::config::Config::default(),
        );

        (endpoint, gossip, record_publisher)
    }

    /// After the topic's token is cancelled, `next()` and `joined()` must
    /// return an error instead of hanging, both on a receiver cloned before
    /// the shutdown and on one cloned afterwards.
    #[tokio::test]
    async fn test_receiver_returns_none_after_shutdown() {
        let (_endpoint, gossip, record_publisher) = fixture("shutdown-receiver-test").await;

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create topic");

        let cancel_token = topic.cancel_token();
        let (_sender, receiver) = topic.split().await.expect("failed to split topic");

        // Clone taken while the topic is still running.
        let mut survivor = receiver.clone();

        cancel_token.cancel();

        let next_outcome = tokio::time::timeout(WAIT_LIMIT, survivor.next())
            .await
            .expect("next() hung after shutdown - broadcast channel didn't close");
        assert!(
            next_outcome.is_err(),
            "expected Err from next() after shutdown"
        );

        let joined_outcome = tokio::time::timeout(WAIT_LIMIT, survivor.joined())
            .await
            .expect("joined() hung after shutdown - broadcast channel didn't close");
        assert!(
            joined_outcome.is_err(),
            "expected Err from joined() after shutdown"
        );

        // Clone taken only after the shutdown happened.
        let mut late_clone = survivor.clone();

        let late_next = tokio::time::timeout(WAIT_LIMIT, late_clone.next())
            .await
            .expect("next() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            late_next.is_err(),
            "expected Err from next() on post shutdown clone"
        );

        let late_joined = tokio::time::timeout(WAIT_LIMIT, late_clone.joined())
            .await
            .expect("joined() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            late_joined.is_err(),
            "expected Err from joined() on post shutdown clone"
        );
    }

    /// Dropping the sender, the receiver and the topic itself must lead to the
    /// cancellation token being cancelled within the wait limit.
    #[tokio::test]
    async fn test_topic_full_shutdown_on_drop() {
        let (_endpoint, gossip, record_publisher) = fixture("my-iroh-gossip-topic").await;

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create Topic");

        let cancel_token = topic.cancel_token();

        let (sender, receiver) = topic.split().await.expect("failed to split topic");

        assert!(!cancel_token.is_cancelled());

        drop(sender);
        drop(receiver);
        drop(topic);

        tokio::time::timeout(WAIT_LIMIT, cancel_token.cancelled())
            .await
            .expect("cancel token timed out");

        assert!(cancel_token.is_cancelled());
    }
}