Skip to main content

distributed_topic_tracker/gossip/topic/
topic.rs

1//! Main topic handle combining bootstrap, publishing, and merging.
2
3use std::sync::{Arc, Weak};
4
5use crate::{
6    BubbleMergeConfig, Config, GossipReceiver, GossipSender, MessageOverlapMergeConfig, RecordPublisher, config::PublisherConfig, gossip::{
7        merge::{BubbleMerge, MessageOverlapMerge},
8        topic::{bootstrap::Bootstrap, publisher::Publisher},
9    }
10};
11use actor_helper::{Handle, act, act_ok};
12use anyhow::Result;
13use tokio_util::sync::CancellationToken;
14
/// Handle to a joined gossip topic with auto-discovery.
///
/// Manages bootstrap, publishing, bubble detection, and split-brain recovery.
/// Can be split into sender and receiver for message exchange.
///
/// Clones share the same actor handle: the `Arc` is cloned, not the actor
/// behind it.
#[derive(Debug, Clone)]
pub struct Topic {
    /// Shared handle used to send calls to the spawned `TopicActor`.
    api: Arc<Handle<TopicActor, anyhow::Error>>,
    /// Clone of the token that was handed to the bootstrap and to the actor
    /// when the topic was created.
    cancel_token: CancellationToken,
}
24
/// Actor state behind a [`Topic`] handle.
///
/// All access goes through the actor handle; the optional workers start out
/// as `None` and are filled in by the `start_*` methods.
#[derive(Debug)]
struct TopicActor {
    /// Source of the gossip sender and receiver for this topic.
    bootstrap: Bootstrap,
    /// Set by `start_publishing` once the publisher was created successfully.
    publisher: Option<Publisher>,
    /// Set by `start_bubble_merge` once the bubble merge worker was created.
    bubble_merge: Option<BubbleMerge>,
    /// Set by `start_message_overlap_merge` once that worker was created.
    message_overlap_merge: Option<MessageOverlapMerge>,
    /// Record publisher; also the source of the configuration read by the
    /// `start_*` methods.
    record_publisher: RecordPublisher,
    /// Token handed to every worker on creation and cancelled when the actor
    /// is dropped.
    cancel_token: CancellationToken,
}
34
/// Cancels the shared token when the actor goes away, so that everything that
/// received a clone of it (bootstrap and the workers) observes the shutdown.
impl Drop for TopicActor {
    fn drop(&mut self) {
        self.cancel_token.cancel();
    }
}
40
41impl Topic {
42    /// Create and initialize a new topic with auto-discovery.
43    ///
44    /// # Arguments
45    ///
46    /// * `record_publisher` - Record publisher for DHT operations
47    /// * `gossip` - Gossip instance for topic subscription
48    /// * `async_bootstrap` - If false, awaits until bootstrap completes
49    pub async fn new(
50        record_publisher: RecordPublisher,
51        gossip: iroh_gossip::net::Gossip,
52        async_bootstrap: bool,
53    ) -> Result<Self> {
54        tracing::debug!(
55            "Topic: creating new topic (async_bootstrap={})",
56            async_bootstrap
57        );
58
59        let cancel_token = CancellationToken::new();
60        let bootstrap = Bootstrap::new(
61            record_publisher.clone(),
62            gossip.clone(),
63            cancel_token.clone(),
64            record_publisher.config().timeouts().clone(),
65            record_publisher.config().bootstrap_config().clone(),
66        )
67        .await?;
68        tracing::debug!("Topic: bootstrap instance created");
69
70        let api = Arc::new(
71            Handle::spawn(TopicActor {
72                bootstrap: bootstrap.clone(),
73                record_publisher: record_publisher.clone(),
74                publisher: None,
75                bubble_merge: None,
76                message_overlap_merge: None,
77                cancel_token: cancel_token.clone(),
78            })
79            .0,
80        );
81
82        let bootstrap_done = bootstrap.bootstrap().await?;
83        let config = record_publisher.config().clone();
84        tokio::spawn({
85            let api = Arc::downgrade(&api);
86            let config = config.clone();
87            let cancel_token = cancel_token.clone();
88            async move {
89                if let Err(err) = wait_for_bootstrap(bootstrap_done, cancel_token.clone()).await {
90                    tracing::warn!("bootstrap failed: {}", err);
91                    return;
92                }
93
94                if async_bootstrap {
95                    tracing::debug!("Bootstrap completed, now spawning workers");
96                    if let Err(err) = spawn_workers(api, config, cancel_token.clone()).await {
97                        cancel_token.cancel();
98                        tracing::warn!("failed to spawn workers: {}", err);
99                    }
100                }
101            }
102        });
103
104        if !async_bootstrap {
105            tracing::debug!("Topic: waiting for bootstrap to complete");
106            bootstrap.gossip_receiver().await?.joined().await?;
107            if let Err(err) =
108                spawn_workers(Arc::downgrade(&api), config, cancel_token.clone()).await
109            {
110                tracing::warn!("failed to spawn workers: {}", err);
111                cancel_token.cancel();
112                return Err(anyhow::anyhow!("failed to spawn workers: {}", err));
113            }
114            tracing::debug!("Topic: bootstrap completed");
115        } else {
116            tracing::debug!("Topic: bootstrap started asynchronously");
117        }
118
119        Ok(Self { api, cancel_token })
120    }
121
122    /// Split into sender and receiver for message exchange.
123    pub async fn split(&self) -> Result<(GossipSender, GossipReceiver)> {
124        Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
125    }
126
127    /// Get the gossip sender for this topic.
128    pub async fn gossip_sender(&self) -> Result<GossipSender> {
129        let topic_ref = Arc::new(self.clone());
130        self.api
131            .call(act!(actor => async move {
132                let mut sender = actor.bootstrap.gossip_sender().await?;
133                sender._topic_keep_alive = Some(topic_ref.clone());
134                Ok(sender)
135
136            }))
137            .await
138    }
139
140    /// Get the gossip receiver for this topic.
141    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
142        let topic_ref = Arc::new(self.clone());
143        self.api
144            .call(act!(actor => async move {
145                let mut receiver = actor.bootstrap.gossip_receiver().await?;
146                receiver._topic_keep_alive = Some(topic_ref.clone());
147                Ok(receiver)
148            }))
149            .await
150    }
151
152    /// Get the record publisher for this topic.
153    pub async fn record_creator(&self) -> Result<RecordPublisher> {
154        self.api
155            .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
156            .await
157    }
158
159    #[allow(dead_code)]
160    pub(crate) fn cancel_token(&self) -> CancellationToken {
161        self.cancel_token.clone()
162    }
163}
164
165async fn wait_for_bootstrap(
166    bootstrap_done: tokio::sync::oneshot::Receiver<Result<()>>,
167    cancel_token: CancellationToken,
168) -> Result<()> {
169    if let Ok(Ok(_)) = bootstrap_done.await {
170        Ok(())
171    } else {
172        tracing::error!("Topic: bootstrap failed or cancelled, shutting down topic");
173        cancel_token.cancel();
174        Err(anyhow::anyhow!("bootstrap failed or cancelled"))
175    }
176}
177
178async fn spawn_workers(
179    api: Weak<Handle<TopicActor, anyhow::Error>>,
180    config: Config,
181    cancel_token: CancellationToken,
182) -> Result<()> {
183    if !cancel_token.is_cancelled() {
184        if matches!(config.publisher_config(), PublisherConfig::Enabled(_)) {
185            tracing::debug!("Topic: starting publisher");
186            match api.upgrade() {
187                Some(api) => {
188                    if let Err(err) = api.call(act!(actor => actor.start_publishing())).await {
189                        return Err(anyhow::anyhow!("failed to start publisher: {err}"));
190                    }
191                }
192                None => {
193                    return Err(anyhow::anyhow!(
194                        "failed to start publisher, topic actor dropped"
195                    ));
196                }
197            }
198        }
199
200        if matches!(
201            config.merge_config().bubble_merge(),
202            BubbleMergeConfig::Enabled(_)
203        ) {
204            tracing::debug!("Topic: starting bubble merge");
205            match api.upgrade() {
206                Some(api) => {
207                    if let Err(err) = api.call(act!(actor => actor.start_bubble_merge())).await {
208                        return Err(anyhow::anyhow!("failed to start bubble merge: {err}"));
209                    }
210                }
211                None => {
212                    return Err(anyhow::anyhow!(
213                        "failed to start bubble merge, topic actor dropped"
214                    ));
215                }
216            }
217        }
218
219        if matches!(
220            config.merge_config().message_overlap_merge(),
221            MessageOverlapMergeConfig::Enabled(_)
222        ) {
223            tracing::debug!("Topic: starting message overlap merge");
224            match api.upgrade() {
225                Some(api) => {
226                    if let Err(err) = api
227                        .call(act!(actor => actor.start_message_overlap_merge()))
228                        .await
229                    {
230                        return Err(anyhow::anyhow!(
231                            "failed to start message overlap merge: {err}"
232                        ));
233                    }
234                }
235                None => {
236                    return Err(anyhow::anyhow!(
237                        "failed to start message overlap merge, topic actor dropped"
238                    ));
239                }
240            }
241        }
242        tracing::debug!("Topic: spawn_worker finished");
243        Ok(())
244    } else {
245        tracing::warn!("Topic: cancelled before workers could be spawned");
246        Err(anyhow::anyhow!("cancelled before workers could be spawned"))
247    }
248}
249
250impl TopicActor {
251    pub async fn start_publishing(&mut self) -> Result<()> {
252        if let PublisherConfig::Enabled(config) = self.record_publisher.config().publisher_config()
253        {
254            tracing::debug!("TopicActor: initializing publisher");
255            let publisher = async {
256                Publisher::new(
257                    self.record_publisher.clone(),
258                    self.bootstrap.gossip_receiver().await?,
259                    self.cancel_token.clone(),
260                    config.initial_delay(),
261                    config.base_interval(),
262                    config.max_jitter(),
263                )
264            };
265
266            match publisher.await {
267                Ok(publisher) => {
268                    self.publisher = Some(publisher);
269                    tracing::debug!("TopicActor: publisher started");
270                }
271                Err(err) => {
272                    if config.fail_topic_creation_on_publishing_startup_failure() {
273                        return Err(anyhow::anyhow!("failed to start publisher: {}", err));
274                    } else {
275                        tracing::warn!(
276                            "TopicActor: failed to start publisher: {}, but continuing because Publisher.fail_topic_creation_on_publishing_startup_failure is false",
277                            err
278                        );
279                    }
280                }
281            }
282        }
283        Ok(())
284    }
285
286    pub async fn start_bubble_merge(&mut self) -> Result<()> {
287        if let BubbleMergeConfig::Enabled(config) =
288            self.record_publisher.config().merge_config().bubble_merge()
289        {
290            tracing::debug!("TopicActor: initializing bubble merge");
291            let bubble_merge = async {
292                BubbleMerge::new(
293                    self.record_publisher.clone(),
294                    self.bootstrap.gossip_sender().await?,
295                    self.bootstrap.gossip_receiver().await?,
296                    self.cancel_token.clone(),
297                    config.max_join_peers(),
298                    config.initial_interval(),
299                    config.base_interval(),
300                    config.max_jitter(),
301                    config.min_neighbors(),
302                )
303            };
304
305            match bubble_merge.await {
306                Ok(bubble_merge) => {
307                    self.bubble_merge = Some(bubble_merge);
308                    tracing::debug!("TopicActor: bubble merge started");
309                }
310                Err(err) => {
311                    if config.fail_topic_creation_on_merge_startup_failure() {
312                        return Err(anyhow::anyhow!("failed to start bubble merge: {}", err));
313                    } else {
314                        tracing::warn!(
315                            "TopicActor: failed to start bubble merge: {}, but continuing because BubbleMerge.fail_topic_creation_on_merge_startup_failure is false",
316                            err
317                        );
318                    }
319                }
320            }
321        }
322        Ok(())
323    }
324
325    pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
326        if let MessageOverlapMergeConfig::Enabled(config) = self
327            .record_publisher
328            .config()
329            .merge_config()
330            .message_overlap_merge()
331        {
332            tracing::debug!("TopicActor: initializing message overlap merge");
333            let message_overlap_merge = async {
334                MessageOverlapMerge::new(
335                    self.record_publisher.clone(),
336                    self.bootstrap.gossip_sender().await?,
337                    self.bootstrap.gossip_receiver().await?,
338                    self.cancel_token.clone(),
339                    config.max_join_peers(),
340                    config.initial_interval(),
341                    config.base_interval(),
342                    config.max_jitter(),
343                )
344            };
345
346            match message_overlap_merge.await {
347                Ok(message_overlap_merge) => {
348                    self.message_overlap_merge = Some(message_overlap_merge);
349                    tracing::debug!("TopicActor: message overlap merge started");
350                }
351                Err(err) => {
352                    if config.fail_topic_creation_on_merge_startup_failure() {
353                        return Err(anyhow::anyhow!(
354                            "failed to start message overlap merge: {}",
355                            err
356                        ));
357                    } else {
358                        tracing::warn!(
359                            "TopicActor: failed to start message overlap merge: {}, but continuing because MessageOverlapMerge.fail_topic_creation_on_merge_startup_failure is false",
360                            err
361                        );
362                    }
363                }
364            }
365        }
366        Ok(())
367    }
368}
369
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::RecordPublisher;

    /// Upper bound for calls that must complete promptly instead of hanging.
    const HANG_LIMIT: Duration = Duration::from_secs(5);
    /// Window used when probing whether the cancel token fires.
    const CANCEL_WINDOW: Duration = Duration::from_secs(2);

    /// Build the pieces every test needs: a bound endpoint, a gossip instance
    /// running on it, and a record publisher for the topic called `topic_name`.
    ///
    /// The endpoint and gossip instance are returned so the caller keeps them
    /// alive for the duration of the test.
    async fn fixture(
        topic_name: &str,
    ) -> (iroh::Endpoint, iroh_gossip::net::Gossip, RecordPublisher) {
        let secret_key = iroh::SecretKey::generate();
        let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
        let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
            .secret_key(secret_key.clone())
            .bind()
            .await
            .expect("failed to bind endpoint");
        let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

        let topic_id = crate::TopicId::new(topic_name.to_string());
        let initial_secret = b"my-initial-secret".to_vec();

        let record_publisher = RecordPublisher::new(
            topic_id.clone(),
            signing_key.clone(),
            None,
            initial_secret,
            crate::config::Config::default(),
        );

        (endpoint, gossip, record_publisher)
    }

    #[tokio::test]
    async fn test_receiver_returns_none_after_shutdown() {
        let (_endpoint, gossip, record_publisher) = fixture("shutdown-receiver-test").await;

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create topic");

        let cancel_token = topic.cancel_token();
        let (_sender, receiver) = topic.split().await.expect("failed to split topic");

        // This clone exists before the shutdown and must not hang afterwards.
        let mut survivor = receiver.clone();

        cancel_token.cancel();

        // A receiver that predates the shutdown has to report an error from
        // next(); blocking forever would mean the channel never closed.
        let next_after_shutdown = tokio::time::timeout(HANG_LIMIT, survivor.next())
            .await
            .expect("next() hung after shutdown - broadcast channel didn't close");
        assert!(
            next_after_shutdown.is_err(),
            "expected Err from next() after shutdown"
        );

        // The same holds for joined().
        let joined_after_shutdown = tokio::time::timeout(HANG_LIMIT, survivor.joined())
            .await
            .expect("joined() hung after shutdown - broadcast channel didn't close");
        assert!(
            joined_after_shutdown.is_err(),
            "expected Err from joined() after shutdown"
        );

        // A clone taken only after the shutdown must fail right away as well.
        let mut late_clone = survivor.clone();

        let late_next = tokio::time::timeout(HANG_LIMIT, late_clone.next())
            .await
            .expect("next() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            late_next.is_err(),
            "expected Err from next() on post shutdown clone"
        );

        let late_joined = tokio::time::timeout(HANG_LIMIT, late_clone.joined())
            .await
            .expect("joined() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            late_joined.is_err(),
            "expected Err from joined() on post shutdown clone"
        );
    }

    #[tokio::test]
    async fn test_topic_full_shutdown_on_drop() {
        let (_endpoint, gossip, record_publisher) = fixture("my-iroh-gossip-topic").await;

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create Topic");

        let cancel_token = topic.cancel_token();

        let (sender, receiver) = topic.split().await.expect("failed to split topic");

        assert!(!cancel_token.is_cancelled());

        // Once the topic and both halves are gone, the token has to fire.
        drop(sender);
        drop(receiver);
        drop(topic);

        tokio::time::timeout(HANG_LIMIT, cancel_token.cancelled())
            .await
            .expect("cancel token timed out");

        assert!(cancel_token.is_cancelled());
    }

    #[tokio::test]
    async fn test_topic_survives_topic_drop_split() {
        let (_endpoint, gossip, record_publisher) =
            fixture("my-iroh-gossip-topic-survives-drop-split").await;

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create Topic");

        let cancel_token = topic.cancel_token();
        let (sender, receiver) = topic.split().await.expect("failed to split topic");

        drop(topic);

        // Sender and receiver are still around, so nothing may be cancelled yet.
        let timed_out = tokio::time::timeout(CANCEL_WINDOW, cancel_token.cancelled())
            .await
            .is_err();
        assert!(timed_out);

        assert!(!cancel_token.is_cancelled());

        drop(sender);
        drop(receiver);

        // With the last holders gone, cancellation has to arrive in time.
        let cancelled_in_time = tokio::time::timeout(CANCEL_WINDOW, cancel_token.cancelled())
            .await
            .is_ok();
        assert!(cancelled_in_time);

        assert!(cancel_token.is_cancelled());
    }

    #[tokio::test]
    async fn test_topic_survives_topic_drop_manual_sender_receiver() {
        let (_endpoint, gossip, record_publisher) =
            fixture("my-iroh-gossip-topic-survives-drop-manual-sender-receiver").await;

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create Topic");

        let cancel_token = topic.cancel_token();
        let sender = topic
            .gossip_sender()
            .await
            .expect("failed to get gossip sender");
        let receiver = topic
            .gossip_receiver()
            .await
            .expect("failed to get gossip receiver");

        drop(topic);

        // Sender and receiver are still around, so nothing may be cancelled yet.
        let timed_out = tokio::time::timeout(CANCEL_WINDOW, cancel_token.cancelled())
            .await
            .is_err();
        assert!(timed_out);

        assert!(!cancel_token.is_cancelled());

        drop(sender);
        drop(receiver);

        // With the last holders gone, cancellation has to arrive in time.
        let cancelled_in_time = tokio::time::timeout(CANCEL_WINDOW, cancel_token.cancelled())
            .await
            .is_ok();
        assert!(cancelled_in_time);

        assert!(cancel_token.is_cancelled());
    }
}