distributed_topic_tracker/gossip/topic/
topic.rs

1//! Main topic handle combining bootstrap, publishing, and merging.
2
3use crate::{
4    GossipSender,
5    crypto::RecordTopic,
6    gossip::{
7        merge::{BubbleMerge, MessageOverlapMerge},
8        topic::{bootstrap::Bootstrap, publisher::Publisher},
9    },
10};
11use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
12use anyhow::Result;
13use sha2::Digest;
14
15/// Topic identifier derived from a string via SHA512 hashing.
16///
17/// Used as the stable identifier for gossip subscriptions and DHT records.
18///
19/// # Example
20///
21/// ```ignore
22/// let topic_id = TopicId::new("chat-room-1".to_string());
23/// ```
24#[derive(Debug, Clone)]
25pub struct TopicId {
26    _raw: String,
27    hash: [u8; 32],
28}
29
30impl Into<RecordTopic> for TopicId {
31    fn into(self) -> RecordTopic {
32        RecordTopic::from_bytes(&self.hash)
33    }
34}
35
36impl TopicId {
37    /// Create a new topic ID from a string.
38    ///
39    /// String is hashed with SHA512; the first 32 bytes produce the identifier.
40    pub fn new(raw: String) -> Self {
41        let mut raw_hash = sha2::Sha512::new();
42        raw_hash.update(raw.as_bytes());
43
44        Self {
45            _raw: raw,
46            hash: raw_hash.finalize()[..32]
47                .try_into()
48                .expect("hashing 'raw' failed"),
49        }
50    }
51
52    /// Get the hash bytes.
53    pub fn hash(&self) -> [u8; 32] {
54        self.hash
55    }
56
57    /// Get the original string.
58    #[allow(dead_code)]
59    pub fn raw(&self) -> &str {
60        &self._raw
61    }
62}
63
64/// Handle to a joined gossip topic with auto-discovery.
65///
66/// Manages bootstrap, publishing, bubble detection, and split-brain recovery.
67/// Can be split into sender and receiver for message exchange.
68#[derive(Debug, Clone)]
69pub struct Topic {
70    api: Handle<TopicActor, anyhow::Error>,
71}
72
73#[derive(Debug)]
74struct TopicActor {
75    rx: Receiver<Action<Self>>,
76    bootstrap: Bootstrap,
77    publisher: Option<Publisher>,
78    bubble_merge: Option<BubbleMerge>,
79    message_overlap_merge: Option<MessageOverlapMerge>,
80    record_publisher: crate::crypto::RecordPublisher,
81}
82
83impl Topic {
84    /// Create and initialize a new topic with auto-discovery.
85    ///
86    /// # Arguments
87    ///
88    /// * `record_publisher` - Record publisher for DHT operations
89    /// * `gossip` - Gossip instance for topic subscription
90    /// * `async_bootstrap` - If false, awaits until bootstrap completes
91    pub async fn new(
92        record_publisher: crate::crypto::RecordPublisher,
93        gossip: iroh_gossip::net::Gossip,
94        async_bootstrap: bool,
95    ) -> Result<Self> {
96        tracing::debug!("Topic: creating new topic (async_bootstrap={})", async_bootstrap);
97        
98        let (api, rx) = Handle::channel();
99
100        let bootstrap = Bootstrap::new(record_publisher.clone(), gossip.clone()).await?;
101        tracing::debug!("Topic: bootstrap instance created");
102
103        tokio::spawn({
104            let bootstrap = bootstrap.clone();
105            async move {
106                tracing::debug!("Topic: starting topic actor");
107                let mut actor = TopicActor {
108                    rx,
109                    bootstrap: bootstrap.clone(),
110                    record_publisher,
111                    publisher: None,
112                    bubble_merge: None,
113                    message_overlap_merge: None,
114                };
115                let _ = actor.run().await;
116            }
117        });
118
119        let bootstrap_done = bootstrap.bootstrap().await?;
120        if !async_bootstrap {
121            tracing::debug!("Topic: waiting for bootstrap to complete");
122            bootstrap_done.await?;
123            tracing::debug!("Topic: bootstrap completed");
124        } else {
125            tracing::debug!("Topic: bootstrap started asynchronously");
126        }
127
128        tracing::debug!("Topic: starting publisher");
129        let _ = api.call(act!(actor => actor.start_publishing())).await;
130        
131        tracing::debug!("Topic: starting bubble merge");
132        let _ = api.call(act!(actor => actor.start_bubble_merge())).await;
133        
134        tracing::debug!("Topic: starting message overlap merge");
135        let _ = api
136            .call(act!(actor => actor.start_message_overlap_merge()))
137            .await;
138
139        tracing::debug!("Topic: fully initialized");
140        Ok(Self { api })
141    }
142
143    /// Split into sender and receiver for message exchange.
144    pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
145        Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
146    }
147
148    /// Get the gossip sender for this topic.
149    pub async fn gossip_sender(&self) -> Result<GossipSender> {
150        self.api
151            .call(act!(actor => actor.bootstrap.gossip_sender()))
152            .await
153    }
154
155    /// Get the gossip receiver for this topic.
156    pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
157        self.api
158            .call(act!(actor => actor.bootstrap.gossip_receiver()))
159            .await
160    }
161
162    /// Get the record publisher for this topic.
163    pub async fn record_creator(&self) -> Result<crate::crypto::RecordPublisher> {
164        self.api
165            .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
166            .await
167    }
168}
169
170impl Actor<anyhow::Error> for TopicActor {
171    async fn run(&mut self) -> Result<()> {
172        loop {
173            tokio::select! {
174                Ok(action) = self.rx.recv_async() => {
175                    let _ = action(self).await;
176                }
177                _ = tokio::signal::ctrl_c() => {
178                    break;
179                }
180            }
181        }
182        Ok(())
183    }
184}
185
186impl TopicActor {
187    pub async fn start_publishing(&mut self) -> Result<()> {
188        tracing::debug!("TopicActor: initializing publisher");
189        let publisher = Publisher::new(
190            self.record_publisher.clone(),
191            self.bootstrap.gossip_receiver().await?,
192        )?;
193        self.publisher = Some(publisher);
194        tracing::debug!("TopicActor: publisher started");
195        Ok(())
196    }
197
198    pub async fn start_bubble_merge(&mut self) -> Result<()> {
199        tracing::debug!("TopicActor: initializing bubble merge");
200        let bubble_merge = BubbleMerge::new(
201            self.record_publisher.clone(),
202            self.bootstrap.gossip_sender().await?,
203            self.bootstrap.gossip_receiver().await?,
204        )?;
205        self.bubble_merge = Some(bubble_merge);
206        tracing::debug!("TopicActor: bubble merge started");
207        Ok(())
208    }
209
210    pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
211        tracing::debug!("TopicActor: initializing message overlap merge");
212        let message_overlap_merge = MessageOverlapMerge::new(
213            self.record_publisher.clone(),
214            self.bootstrap.gossip_sender().await?,
215            self.bootstrap.gossip_receiver().await?,
216        )?;
217        self.message_overlap_merge = Some(message_overlap_merge);
218        tracing::debug!("TopicActor: message overlap merge started");
219        Ok(())
220    }
221}