distributed_topic_tracker/gossip/topic/
topic.rs1use std::sync::{Arc, Weak};
4
5use crate::{
6 BubbleMergeConfig, Config, GossipReceiver, GossipSender, MessageOverlapMergeConfig, RecordPublisher, config::PublisherConfig, gossip::{
7 merge::{BubbleMerge, MessageOverlapMerge},
8 topic::{bootstrap::Bootstrap, publisher::Publisher},
9 }
10};
11use actor_helper::{Handle, act, act_ok};
12use anyhow::Result;
13use tokio_util::sync::CancellationToken;
14
/// Handle to a gossip topic created by [`Topic::new`].
///
/// Cloning is cheap: every clone shares the same actor handle and the same
/// cancellation token. Senders and receivers obtained from a `Topic` hold a
/// clone of it, which keeps the underlying actor alive while they exist.
#[derive(Debug, Clone)]
pub struct Topic {
    /// Shared handle to the `TopicActor` that owns the bootstrap and the
    /// background workers.
    api: Arc<Handle<TopicActor, anyhow::Error>>,
    /// Token shared with the actor, the bootstrap and the workers; cancelling
    /// it is how the topic is shut down.
    cancel_token: CancellationToken,
}
24
/// State owned by the actor behind a [`Topic`]'s handle.
///
/// The worker fields stay `None` until the matching `start_*` method succeeds.
/// They also stay `None` if the worker is not enabled in the config, or if its
/// startup failed and the config says to continue anyway.
#[derive(Debug)]
struct TopicActor {
    /// Bootstrap instance that hands out the gossip sender/receiver.
    bootstrap: Bootstrap,
    /// Record publisher worker, set by `start_publishing`.
    publisher: Option<Publisher>,
    /// Bubble-merge worker, set by `start_bubble_merge`.
    bubble_merge: Option<BubbleMerge>,
    /// Message-overlap-merge worker, set by `start_message_overlap_merge`.
    message_overlap_merge: Option<MessageOverlapMerge>,
    /// Record publisher whose config decides which workers are enabled.
    record_publisher: RecordPublisher,
    /// Token shared with `Topic`, the bootstrap and the workers; cancelled
    /// when this actor is dropped.
    cancel_token: CancellationToken,
}
34
/// Cancels the shared token when the actor state is dropped.
///
/// The bootstrap and every worker were handed clones of this token.
/// Presumably they observe the cancellation and stop.
impl Drop for TopicActor {
    fn drop(&mut self) {
        // NOTE(review): the exact trigger depends on `actor_helper::Handle`
        // dropping the actor once the last strong handle is gone. The tests in
        // this module expect cancellation after the `Topic` and all
        // senders/receivers have been dropped.
        self.cancel_token.cancel();
    }
}
40
41impl Topic {
42 pub async fn new(
50 record_publisher: RecordPublisher,
51 gossip: iroh_gossip::net::Gossip,
52 async_bootstrap: bool,
53 ) -> Result<Self> {
54 tracing::debug!(
55 "Topic: creating new topic (async_bootstrap={})",
56 async_bootstrap
57 );
58
59 let cancel_token = CancellationToken::new();
60 let bootstrap = Bootstrap::new(
61 record_publisher.clone(),
62 gossip.clone(),
63 cancel_token.clone(),
64 record_publisher.config().timeouts().clone(),
65 record_publisher.config().bootstrap_config().clone(),
66 )
67 .await?;
68 tracing::debug!("Topic: bootstrap instance created");
69
70 let api = Arc::new(
71 Handle::spawn(TopicActor {
72 bootstrap: bootstrap.clone(),
73 record_publisher: record_publisher.clone(),
74 publisher: None,
75 bubble_merge: None,
76 message_overlap_merge: None,
77 cancel_token: cancel_token.clone(),
78 })
79 .0,
80 );
81
82 let bootstrap_done = bootstrap.bootstrap().await?;
83 let config = record_publisher.config().clone();
84 tokio::spawn({
85 let api = Arc::downgrade(&api);
86 let config = config.clone();
87 let cancel_token = cancel_token.clone();
88 async move {
89 if let Err(err) = wait_for_bootstrap(bootstrap_done, cancel_token.clone()).await {
90 tracing::warn!("bootstrap failed: {}", err);
91 return;
92 }
93
94 if async_bootstrap {
95 tracing::debug!("Bootstrap completed, now spawning workers");
96 if let Err(err) = spawn_workers(api, config, cancel_token.clone()).await {
97 cancel_token.cancel();
98 tracing::warn!("failed to spawn workers: {}", err);
99 }
100 }
101 }
102 });
103
104 if !async_bootstrap {
105 tracing::debug!("Topic: waiting for bootstrap to complete");
106 bootstrap.gossip_receiver().await?.joined().await?;
107 if let Err(err) =
108 spawn_workers(Arc::downgrade(&api), config, cancel_token.clone()).await
109 {
110 tracing::warn!("failed to spawn workers: {}", err);
111 cancel_token.cancel();
112 return Err(anyhow::anyhow!("failed to spawn workers: {}", err));
113 }
114 tracing::debug!("Topic: bootstrap completed");
115 } else {
116 tracing::debug!("Topic: bootstrap started asynchronously");
117 }
118
119 Ok(Self { api, cancel_token })
120 }
121
122 pub async fn split(&self) -> Result<(GossipSender, GossipReceiver)> {
124 Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
125 }
126
127 pub async fn gossip_sender(&self) -> Result<GossipSender> {
129 let topic_ref = Arc::new(self.clone());
130 self.api
131 .call(act!(actor => async move {
132 let mut sender = actor.bootstrap.gossip_sender().await?;
133 sender._topic_keep_alive = Some(topic_ref.clone());
134 Ok(sender)
135
136 }))
137 .await
138 }
139
140 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
142 let topic_ref = Arc::new(self.clone());
143 self.api
144 .call(act!(actor => async move {
145 let mut receiver = actor.bootstrap.gossip_receiver().await?;
146 receiver._topic_keep_alive = Some(topic_ref.clone());
147 Ok(receiver)
148 }))
149 .await
150 }
151
152 pub async fn record_creator(&self) -> Result<RecordPublisher> {
154 self.api
155 .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
156 .await
157 }
158
159 #[allow(dead_code)]
160 pub(crate) fn cancel_token(&self) -> CancellationToken {
161 self.cancel_token.clone()
162 }
163}
164
165async fn wait_for_bootstrap(
166 bootstrap_done: tokio::sync::oneshot::Receiver<Result<()>>,
167 cancel_token: CancellationToken,
168) -> Result<()> {
169 if let Ok(Ok(_)) = bootstrap_done.await {
170 Ok(())
171 } else {
172 tracing::error!("Topic: bootstrap failed or cancelled, shutting down topic");
173 cancel_token.cancel();
174 Err(anyhow::anyhow!("bootstrap failed or cancelled"))
175 }
176}
177
178async fn spawn_workers(
179 api: Weak<Handle<TopicActor, anyhow::Error>>,
180 config: Config,
181 cancel_token: CancellationToken,
182) -> Result<()> {
183 if !cancel_token.is_cancelled() {
184 if matches!(config.publisher_config(), PublisherConfig::Enabled(_)) {
185 tracing::debug!("Topic: starting publisher");
186 match api.upgrade() {
187 Some(api) => {
188 if let Err(err) = api.call(act!(actor => actor.start_publishing())).await {
189 return Err(anyhow::anyhow!("failed to start publisher: {err}"));
190 }
191 }
192 None => {
193 return Err(anyhow::anyhow!(
194 "failed to start publisher, topic actor dropped"
195 ));
196 }
197 }
198 }
199
200 if matches!(
201 config.merge_config().bubble_merge(),
202 BubbleMergeConfig::Enabled(_)
203 ) {
204 tracing::debug!("Topic: starting bubble merge");
205 match api.upgrade() {
206 Some(api) => {
207 if let Err(err) = api.call(act!(actor => actor.start_bubble_merge())).await {
208 return Err(anyhow::anyhow!("failed to start bubble merge: {err}"));
209 }
210 }
211 None => {
212 return Err(anyhow::anyhow!(
213 "failed to start bubble merge, topic actor dropped"
214 ));
215 }
216 }
217 }
218
219 if matches!(
220 config.merge_config().message_overlap_merge(),
221 MessageOverlapMergeConfig::Enabled(_)
222 ) {
223 tracing::debug!("Topic: starting message overlap merge");
224 match api.upgrade() {
225 Some(api) => {
226 if let Err(err) = api
227 .call(act!(actor => actor.start_message_overlap_merge()))
228 .await
229 {
230 return Err(anyhow::anyhow!(
231 "failed to start message overlap merge: {err}"
232 ));
233 }
234 }
235 None => {
236 return Err(anyhow::anyhow!(
237 "failed to start message overlap merge, topic actor dropped"
238 ));
239 }
240 }
241 }
242 tracing::debug!("Topic: spawn_worker finished");
243 Ok(())
244 } else {
245 tracing::warn!("Topic: cancelled before workers could be spawned");
246 Err(anyhow::anyhow!("cancelled before workers could be spawned"))
247 }
248}
249
250impl TopicActor {
251 pub async fn start_publishing(&mut self) -> Result<()> {
252 if let PublisherConfig::Enabled(config) = self.record_publisher.config().publisher_config()
253 {
254 tracing::debug!("TopicActor: initializing publisher");
255 let publisher = async {
256 Publisher::new(
257 self.record_publisher.clone(),
258 self.bootstrap.gossip_receiver().await?,
259 self.cancel_token.clone(),
260 config.initial_delay(),
261 config.base_interval(),
262 config.max_jitter(),
263 )
264 };
265
266 match publisher.await {
267 Ok(publisher) => {
268 self.publisher = Some(publisher);
269 tracing::debug!("TopicActor: publisher started");
270 }
271 Err(err) => {
272 if config.fail_topic_creation_on_publishing_startup_failure() {
273 return Err(anyhow::anyhow!("failed to start publisher: {}", err));
274 } else {
275 tracing::warn!(
276 "TopicActor: failed to start publisher: {}, but continuing because Publisher.fail_topic_creation_on_publishing_startup_failure is false",
277 err
278 );
279 }
280 }
281 }
282 }
283 Ok(())
284 }
285
286 pub async fn start_bubble_merge(&mut self) -> Result<()> {
287 if let BubbleMergeConfig::Enabled(config) =
288 self.record_publisher.config().merge_config().bubble_merge()
289 {
290 tracing::debug!("TopicActor: initializing bubble merge");
291 let bubble_merge = async {
292 BubbleMerge::new(
293 self.record_publisher.clone(),
294 self.bootstrap.gossip_sender().await?,
295 self.bootstrap.gossip_receiver().await?,
296 self.cancel_token.clone(),
297 config.max_join_peers(),
298 config.initial_interval(),
299 config.base_interval(),
300 config.max_jitter(),
301 config.min_neighbors(),
302 )
303 };
304
305 match bubble_merge.await {
306 Ok(bubble_merge) => {
307 self.bubble_merge = Some(bubble_merge);
308 tracing::debug!("TopicActor: bubble merge started");
309 }
310 Err(err) => {
311 if config.fail_topic_creation_on_merge_startup_failure() {
312 return Err(anyhow::anyhow!("failed to start bubble merge: {}", err));
313 } else {
314 tracing::warn!(
315 "TopicActor: failed to start bubble merge: {}, but continuing because BubbleMerge.fail_topic_creation_on_merge_startup_failure is false",
316 err
317 );
318 }
319 }
320 }
321 }
322 Ok(())
323 }
324
325 pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
326 if let MessageOverlapMergeConfig::Enabled(config) = self
327 .record_publisher
328 .config()
329 .merge_config()
330 .message_overlap_merge()
331 {
332 tracing::debug!("TopicActor: initializing message overlap merge");
333 let message_overlap_merge = async {
334 MessageOverlapMerge::new(
335 self.record_publisher.clone(),
336 self.bootstrap.gossip_sender().await?,
337 self.bootstrap.gossip_receiver().await?,
338 self.cancel_token.clone(),
339 config.max_join_peers(),
340 config.initial_interval(),
341 config.base_interval(),
342 config.max_jitter(),
343 )
344 };
345
346 match message_overlap_merge.await {
347 Ok(message_overlap_merge) => {
348 self.message_overlap_merge = Some(message_overlap_merge);
349 tracing::debug!("TopicActor: message overlap merge started");
350 }
351 Err(err) => {
352 if config.fail_topic_creation_on_merge_startup_failure() {
353 return Err(anyhow::anyhow!(
354 "failed to start message overlap merge: {}",
355 err
356 ));
357 } else {
358 tracing::warn!(
359 "TopicActor: failed to start message overlap merge: {}, but continuing because MessageOverlapMerge.fail_topic_creation_on_merge_startup_failure is false",
360 err
361 );
362 }
363 }
364 }
365 }
366 Ok(())
367 }
368}
369
#[cfg(test)]
mod tests {
    use crate::RecordPublisher;

    /// Creates a topic for a test.
    ///
    /// Binds a fresh iroh endpoint and spawns gossip on it. Then it creates a
    /// topic named `name` with the default config and
    /// `async_bootstrap = true`.
    ///
    /// The endpoint and gossip handles are returned alongside the topic. The
    /// calling test binds them to named locals, which keeps them alive until
    /// the test ends, exactly as when each test built them inline.
    async fn spawn_test_topic(
        name: &str,
    ) -> (iroh::Endpoint, iroh_gossip::net::Gossip, crate::Topic) {
        let secret_key = iroh::SecretKey::generate();
        let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
        let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
            .secret_key(secret_key)
            .bind()
            .await
            .expect("failed to bind endpoint");
        let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

        let record_publisher = RecordPublisher::new(
            crate::TopicId::new(name.to_string()),
            signing_key,
            None,
            b"my-initial-secret".to_vec(),
            crate::config::Config::default(),
        );

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create topic");

        (endpoint, gossip, topic)
    }

    /// After the topic is cancelled, `next()` and `joined()` must return
    /// `Err` promptly instead of hanging. This applies to a receiver that
    /// existed before shutdown and to one cloned afterwards.
    #[tokio::test]
    async fn test_receiver_errors_after_shutdown() {
        let (_endpoint, _gossip, topic) = spawn_test_topic("shutdown-receiver-test").await;

        let cancel_token = topic.cancel_token();
        let (_sender, receiver) = topic.split().await.expect("failed to split topic");

        let mut survivor = receiver.clone();

        cancel_token.cancel();

        let result = tokio::time::timeout(std::time::Duration::from_secs(5), survivor.next())
            .await
            .expect("next() hung after shutdown - broadcast channel didn't close");
        assert!(result.is_err(), "expected Err from next() after shutdown");

        let result = tokio::time::timeout(std::time::Duration::from_secs(5), survivor.joined())
            .await
            .expect("joined() hung after shutdown - broadcast channel didn't close");
        assert!(result.is_err(), "expected Err from joined() after shutdown");

        // A clone created after shutdown must fail as well.
        let mut late_clone = survivor.clone();

        let result = tokio::time::timeout(std::time::Duration::from_secs(5), late_clone.next())
            .await
            .expect("next() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            result.is_err(),
            "expected Err from next() on post shutdown clone"
        );

        let result = tokio::time::timeout(std::time::Duration::from_secs(5), late_clone.joined())
            .await
            .expect("joined() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            result.is_err(),
            "expected Err from joined() on post shutdown clone"
        );
    }

    /// Dropping the topic together with its sender and receiver must
    /// cancel the token.
    #[tokio::test]
    async fn test_topic_full_shutdown_on_drop() {
        let (_endpoint, _gossip, topic) = spawn_test_topic("my-iroh-gossip-topic").await;

        let cancel_token = topic.cancel_token();

        let (sender, receiver) = topic.split().await.expect("failed to split topic");

        assert!(!cancel_token.is_cancelled());

        drop(sender);
        drop(receiver);
        drop(topic);

        tokio::time::timeout(std::time::Duration::from_secs(5), cancel_token.cancelled())
            .await
            .expect("cancel token timed out");

        assert!(cancel_token.is_cancelled());
    }

    /// Halves obtained via `split()` keep the topic alive after the `Topic`
    /// itself is dropped. Dropping them afterwards shuts it down.
    #[tokio::test]
    async fn test_topic_survives_topic_drop_split() {
        let (_endpoint, _gossip, topic) =
            spawn_test_topic("my-iroh-gossip-topic-survives-drop-split").await;

        let cancel_token = topic.cancel_token();
        let (_sender, _receiver) = topic.split().await.expect("failed to split topic");

        drop(topic);

        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
                .await
                .is_err()
        );

        assert!(!cancel_token.is_cancelled());

        drop(_sender);
        drop(_receiver);

        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
                .await
                .is_ok()
        );

        assert!(cancel_token.is_cancelled());
    }

    /// This test obtains the sender and receiver separately via
    /// `gossip_sender()` and `gossip_receiver()`. They must keep the topic
    /// alive after the `Topic` is dropped, just like the halves from
    /// `split()`.
    #[tokio::test]
    async fn test_topic_survives_topic_drop_manual_sender_receiver() {
        let (_endpoint, _gossip, topic) =
            spawn_test_topic("my-iroh-gossip-topic-survives-drop-manual-sender-receiver").await;

        let cancel_token = topic.cancel_token();
        let (_sender, _receiver) = (
            topic
                .gossip_sender()
                .await
                .expect("failed to get gossip sender"),
            topic
                .gossip_receiver()
                .await
                .expect("failed to get gossip receiver"),
        );

        drop(topic);

        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
                .await
                .is_err()
        );

        assert!(!cancel_token.is_cancelled());

        drop(_sender);
        drop(_receiver);

        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(2), cancel_token.cancelled())
                .await
                .is_ok()
        );

        assert!(cancel_token.is_cancelled());
    }
}