Skip to main content

distributed_topic_tracker/gossip/topic/
topic.rs

1//! Main topic handle combining bootstrap, publishing, and merging.
2
3use std::sync::{Arc, Weak};
4
5use crate::{
6    BubbleMergeConfig, Config, GossipSender, MessageOverlapMergeConfig,
7    config::PublisherConfig,
8    gossip::{
9        merge::{BubbleMerge, MessageOverlapMerge},
10        topic::{bootstrap::Bootstrap, publisher::Publisher},
11    },
12};
13use actor_helper::{Handle, act, act_ok};
14use anyhow::Result;
15use tokio_util::sync::CancellationToken;
16
17/// Handle to a joined gossip topic with auto-discovery.
18///
19/// Manages bootstrap, publishing, bubble detection, and split-brain recovery.
20/// Can be split into sender and receiver for message exchange.
21#[derive(Debug, Clone)]
22pub struct Topic {
23    api: Arc<Handle<TopicActor, anyhow::Error>>,
24    cancel_token: CancellationToken,
25}
26
27#[derive(Debug)]
28struct TopicActor {
29    bootstrap: Bootstrap,
30    publisher: Option<Publisher>,
31    bubble_merge: Option<BubbleMerge>,
32    message_overlap_merge: Option<MessageOverlapMerge>,
33    record_publisher: crate::crypto::RecordPublisher,
34    cancel_token: CancellationToken,
35}
36
37impl Drop for TopicActor {
38    fn drop(&mut self) {
39        self.cancel_token.cancel();
40    }
41}
42
43impl Topic {
44    /// Create and initialize a new topic with auto-discovery.
45    ///
46    /// # Arguments
47    ///
48    /// * `record_publisher` - Record publisher for DHT operations
49    /// * `gossip` - Gossip instance for topic subscription
50    /// * `async_bootstrap` - If false, awaits until bootstrap completes
51    pub async fn new(
52        record_publisher: crate::crypto::RecordPublisher,
53        gossip: iroh_gossip::net::Gossip,
54        async_bootstrap: bool,
55    ) -> Result<Self> {
56        tracing::debug!(
57            "Topic: creating new topic (async_bootstrap={})",
58            async_bootstrap
59        );
60
61        let cancel_token = CancellationToken::new();
62        let bootstrap = Bootstrap::new(
63            record_publisher.clone(),
64            gossip.clone(),
65            cancel_token.clone(),
66            record_publisher.config().timeouts().clone(),
67            record_publisher.config().bootstrap_config().clone(),
68        )
69        .await?;
70        tracing::debug!("Topic: bootstrap instance created");
71
72        let api = Arc::new(
73            Handle::spawn(TopicActor {
74                bootstrap: bootstrap.clone(),
75                record_publisher: record_publisher.clone(),
76                publisher: None,
77                bubble_merge: None,
78                message_overlap_merge: None,
79                cancel_token: cancel_token.clone(),
80            })
81            .0,
82        );
83
84        let bootstrap_done = bootstrap.bootstrap().await?;
85        let config = record_publisher.config().clone();
86        tokio::spawn({
87            let api = Arc::downgrade(&api);
88            let config = config.clone();
89            let cancel_token = cancel_token.clone();
90            async move {
91                if let Err(err) = wait_for_bootstrap(bootstrap_done, cancel_token.clone()).await {
92                    tracing::warn!("bootstrap failed: {}", err);
93                    return;
94                }
95
96                if async_bootstrap {
97                    tracing::debug!("Bootstrap completed, now spawning workers");
98                    if let Err(err) = spawn_workers(api, config, cancel_token.clone()).await {
99                        cancel_token.cancel();
100                        tracing::warn!("failed to spawn workers: {}", err);
101                    }
102                }
103            }
104        });
105
106        if !async_bootstrap {
107            tracing::debug!("Topic: waiting for bootstrap to complete");
108            bootstrap.gossip_receiver().await?.joined().await?;
109            if let Err(err) =
110                spawn_workers(Arc::downgrade(&api), config, cancel_token.clone()).await
111            {
112                tracing::warn!("failed to spawn workers: {}", err);
113                cancel_token.cancel();
114                return Err(anyhow::anyhow!("failed to spawn workers: {}", err));
115            }
116            tracing::debug!("Topic: bootstrap completed");
117        } else {
118            tracing::debug!("Topic: bootstrap started asynchronously");
119        }
120
121        Ok(Self { api, cancel_token })
122    }
123
124    /// Split into sender and receiver for message exchange.
125    pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
126        let topic_ref = Arc::new(self.clone());
127        let mut sender = self.gossip_sender().await?;
128        let mut receiver = self.gossip_receiver().await?;
129        sender._topic_keep_alive = Some(topic_ref.clone());
130        receiver._topic_keep_alive = Some(topic_ref);
131        Ok((sender, receiver))
132    }
133
134    /// Get the gossip sender for this topic.
135    pub async fn gossip_sender(&self) -> Result<GossipSender> {
136        self.api
137            .call(act!(actor => actor.bootstrap.gossip_sender()))
138            .await
139    }
140
141    /// Get the gossip receiver for this topic.
142    pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
143        self.api
144            .call(act!(actor => actor.bootstrap.gossip_receiver()))
145            .await
146    }
147
148    /// Get the record publisher for this topic.
149    pub async fn record_creator(&self) -> Result<crate::crypto::RecordPublisher> {
150        self.api
151            .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
152            .await
153    }
154
155    #[allow(dead_code)]
156    pub(crate) fn cancel_token(&self) -> CancellationToken {
157        self.cancel_token.clone()
158    }
159}
160
161async fn wait_for_bootstrap(
162    bootstrap_done: tokio::sync::oneshot::Receiver<Result<()>>,
163    cancel_token: CancellationToken,
164) -> Result<()> {
165    if let Ok(Ok(_)) = bootstrap_done.await {
166        Ok(())
167    } else {
168        tracing::error!("Topic: bootstrap failed or cancelled, shutting down topic");
169        cancel_token.cancel();
170        Err(anyhow::anyhow!("bootstrap failed or cancelled"))
171    }
172}
173
174async fn spawn_workers(
175    api: Weak<Handle<TopicActor, anyhow::Error>>,
176    config: Config,
177    cancel_token: CancellationToken,
178) -> Result<()> {
179    if !cancel_token.is_cancelled() {
180        if matches!(config.publisher_config(), PublisherConfig::Enabled(_)) {
181            tracing::debug!("Topic: starting publisher");
182            match api.upgrade() {
183                Some(api) => {
184                    if let Err(err) = api.call(act!(actor => actor.start_publishing())).await {
185                        return Err(anyhow::anyhow!("failed to start publisher: {err}"));
186                    }
187                }
188                None => {
189                    return Err(anyhow::anyhow!(
190                        "failed to start publisher, topic actor dropped"
191                    ));
192                }
193            }
194        }
195
196        if matches!(
197            config.merge_config().bubble_merge(),
198            BubbleMergeConfig::Enabled(_)
199        ) {
200            tracing::debug!("Topic: starting bubble merge");
201            match api.upgrade() {
202                Some(api) => {
203                    if let Err(err) = api.call(act!(actor => actor.start_bubble_merge())).await {
204                        return Err(anyhow::anyhow!("failed to start bubble merge: {err}"));
205                    }
206                }
207                None => {
208                    return Err(anyhow::anyhow!(
209                        "failed to start bubble merge, topic actor dropped"
210                    ));
211                }
212            }
213        }
214
215        if matches!(
216            config.merge_config().message_overlap_merge(),
217            MessageOverlapMergeConfig::Enabled(_)
218        ) {
219            tracing::debug!("Topic: starting message overlap merge");
220            match api.upgrade() {
221                Some(api) => {
222                    if let Err(err) = api
223                        .call(act!(actor => actor.start_message_overlap_merge()))
224                        .await
225                    {
226                        return Err(anyhow::anyhow!(
227                            "failed to start message overlap merge: {err}"
228                        ));
229                    }
230                }
231                None => {
232                    return Err(anyhow::anyhow!(
233                        "failed to start message overlap merge, topic actor dropped"
234                    ));
235                }
236            }
237        }
238        tracing::debug!("Topic: spawn_worker finished");
239        Ok(())
240    } else {
241        tracing::warn!("Topic: cancelled before workers could be spawned");
242        Err(anyhow::anyhow!("cancelled before workers could be spawned"))
243    }
244}
245
246impl TopicActor {
247    pub async fn start_publishing(&mut self) -> Result<()> {
248        if let PublisherConfig::Enabled(config) = self.record_publisher.config().publisher_config()
249        {
250            tracing::debug!("TopicActor: initializing publisher");
251            let publisher = async {
252                Publisher::new(
253                    self.record_publisher.clone(),
254                    self.bootstrap.gossip_receiver().await?,
255                    self.cancel_token.clone(),
256                    config.initial_delay(),
257                    config.base_interval(),
258                    config.max_jitter(),
259                )
260            };
261
262            match publisher.await {
263                Ok(publisher) => {
264                    self.publisher = Some(publisher);
265                    tracing::debug!("TopicActor: publisher started");
266                }
267                Err(err) => {
268                    if config.fail_topic_creation_on_publishing_startup_failure() {
269                        return Err(anyhow::anyhow!("failed to start publisher: {}", err));
270                    } else {
271                        tracing::warn!(
272                            "TopicActor: failed to start publisher: {}, but continuing because Publisher.fail_topic_creation_on_publishing_startup_failure is false",
273                            err
274                        );
275                    }
276                }
277            }
278        }
279        Ok(())
280    }
281
282    pub async fn start_bubble_merge(&mut self) -> Result<()> {
283        if let BubbleMergeConfig::Enabled(config) =
284            self.record_publisher.config().merge_config().bubble_merge()
285        {
286            tracing::debug!("TopicActor: initializing bubble merge");
287            let bubble_merge = async {
288                BubbleMerge::new(
289                    self.record_publisher.clone(),
290                    self.bootstrap.gossip_sender().await?,
291                    self.bootstrap.gossip_receiver().await?,
292                    self.cancel_token.clone(),
293                    self.record_publisher.config().max_join_peer_count(),
294                    config.base_interval(),
295                    config.max_jitter(),
296                    config.min_neighbors(),
297                )
298            };
299
300            match bubble_merge.await {
301                Ok(bubble_merge) => {
302                    self.bubble_merge = Some(bubble_merge);
303                    tracing::debug!("TopicActor: bubble merge started");
304                }
305                Err(err) => {
306                    if config.fail_topic_creation_on_merge_startup_failure() {
307                        return Err(anyhow::anyhow!("failed to start bubble merge: {}", err));
308                    } else {
309                        tracing::warn!(
310                            "TopicActor: failed to start bubble merge: {}, but continuing because BubbleMerge.fail_topic_creation_on_merge_startup_failure is false",
311                            err
312                        );
313                    }
314                }
315            }
316        }
317        Ok(())
318    }
319
320    pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
321        if let MessageOverlapMergeConfig::Enabled(config) = self
322            .record_publisher
323            .config()
324            .merge_config()
325            .message_overlap_merge()
326        {
327            tracing::debug!("TopicActor: initializing message overlap merge");
328            let message_overlap_merge = async {
329                MessageOverlapMerge::new(
330                    self.record_publisher.clone(),
331                    self.bootstrap.gossip_sender().await?,
332                    self.bootstrap.gossip_receiver().await?,
333                    self.cancel_token.clone(),
334                    self.record_publisher.config().max_join_peer_count(),
335                    config.base_interval(),
336                    config.max_jitter(),
337                )
338            };
339
340            match message_overlap_merge.await {
341                Ok(message_overlap_merge) => {
342                    self.message_overlap_merge = Some(message_overlap_merge);
343                    tracing::debug!("TopicActor: message overlap merge started");
344                }
345                Err(err) => {
346                    if config.fail_topic_creation_on_merge_startup_failure() {
347                        return Err(anyhow::anyhow!(
348                            "failed to start message overlap merge: {}",
349                            err
350                        ));
351                    } else {
352                        tracing::warn!(
353                            "TopicActor: failed to start message overlap merge: {}, but continuing because MessageOverlapMerge.fail_topic_creation_on_merge_startup_failure is false",
354                            err
355                        );
356                    }
357                }
358            }
359        }
360        Ok(())
361    }
362}
363
#[cfg(test)]
mod tests {
    use std::time::Duration;

    /// Upper bound for any single call that is expected to finish promptly;
    /// exceeding it is treated as a hang.
    const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

    /// Once the topic's token is cancelled, receivers cloned before and after
    /// the cancellation must fail fast instead of blocking.
    #[tokio::test]
    async fn test_receiver_returns_none_after_shutdown() {
        let secret_key = iroh::SecretKey::generate();
        let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
        let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
            .secret_key(secret_key.clone())
            .bind()
            .await
            .expect("failed to bind endpoint");
        let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

        let topic_id = crate::TopicId::new("shutdown-receiver-test".to_string());
        let initial_secret = b"my-initial-secret".to_vec();
        let record_publisher = crate::RecordPublisher::new(
            topic_id.clone(),
            signing_key.clone(),
            None,
            initial_secret,
            crate::config::Config::default(),
        );

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create topic");
        let cancel_token = topic.cancel_token();
        let (_sender, receiver) = topic.split().await.expect("failed to split topic");

        // Taken while the topic is still alive; this clone must not hang later.
        let mut survivor = receiver.clone();

        cancel_token.cancel();

        // A receiver that existed before shutdown has to observe the closed
        // channel; if the broadcast channel stayed open this would block forever.
        let next_on_survivor = tokio::time::timeout(SHUTDOWN_TIMEOUT, survivor.next())
            .await
            .expect("next() hung after shutdown - broadcast channel didn't close");
        assert!(
            next_on_survivor.is_err(),
            "expected Err from next() after shutdown"
        );

        // joined() must report the shutdown as an error as well.
        let joined_on_survivor = tokio::time::timeout(SHUTDOWN_TIMEOUT, survivor.joined())
            .await
            .expect("joined() hung after shutdown - broadcast channel didn't close");
        assert!(
            joined_on_survivor.is_err(),
            "expected Err from joined() after shutdown"
        );

        // A clone created after shutdown must fail immediately too
        // (WeakSender::upgrade fails -> it gets an already closed channel).
        let mut late_clone = survivor.clone();

        let next_on_late_clone = tokio::time::timeout(SHUTDOWN_TIMEOUT, late_clone.next())
            .await
            .expect("next() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            next_on_late_clone.is_err(),
            "expected Err from next() on post shutdown clone"
        );

        let joined_on_late_clone = tokio::time::timeout(SHUTDOWN_TIMEOUT, late_clone.joined())
            .await
            .expect("joined() hung on post shutdown clone, WeakSender upgrade should fail");
        assert!(
            joined_on_late_clone.is_err(),
            "expected Err from joined() on post shutdown clone"
        );
    }

    /// Dropping the sender, the receiver, and the topic itself must end with
    /// the topic's cancellation token being cancelled.
    #[tokio::test]
    async fn test_topic_full_shutdown_on_drop() {
        let secret_key = iroh::SecretKey::generate();
        let signing_key = mainline::SigningKey::from_bytes(&secret_key.to_bytes());
        let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
            .secret_key(secret_key.clone())
            .bind()
            .await
            .expect("failed to bind endpoint");
        let gossip = iroh_gossip::net::Gossip::builder().spawn(endpoint.clone());

        let topic_id = crate::TopicId::new("my-iroh-gossip-topic".to_string());
        let initial_secret = b"my-initial-secret".to_vec();
        let record_publisher = crate::RecordPublisher::new(
            topic_id.clone(),
            signing_key.clone(),
            None,
            initial_secret,
            crate::config::Config::default(),
        );

        let topic = crate::Topic::new(record_publisher, gossip.clone(), true)
            .await
            .expect("failed to create Topic");
        let cancel_token = topic.cancel_token();
        let (sender, receiver) = topic.split().await.expect("failed to split topic");

        // Nothing has been dropped yet, so the token must still be live.
        assert!(!cancel_token.is_cancelled());

        drop(sender);
        drop(receiver);
        drop(topic);

        tokio::time::timeout(SHUTDOWN_TIMEOUT, cancel_token.cancelled())
            .await
            .expect("cancel token timed out");

        assert!(cancel_token.is_cancelled());
    }
}
476}