distributed_topic_tracker/gossip/topic/
topic.rs

1use crate::{
2    crypto::RecordTopic, gossip::{
3        merge::{BubbleMerge, MessageOverlapMerge},
4        topic::{bootstrap::Bootstrap, publisher::Publisher},
5    }, GossipSender
6};
7use actor_helper::{act, act_ok, Action, Actor, Handle};
8use anyhow::Result;
9use sha2::Digest;
10
11#[derive(Debug, Clone)]
12pub struct TopicId {
13    _raw: String,
14    hash: [u8; 32], // sha512( raw )[..32]
15}
16
17impl Into<RecordTopic> for TopicId {
18    fn into(self) -> RecordTopic {
19        RecordTopic::from_bytes(&self.hash)
20    }
21}
22
23impl TopicId {
24    pub fn new(raw: String) -> Self {
25        let mut raw_hash = sha2::Sha512::new();
26        raw_hash.update(raw.as_bytes());
27
28        Self {
29            _raw: raw,
30            hash: raw_hash.finalize()[..32]
31                .try_into()
32                .expect("hashing 'raw' failed"),
33        }
34    }
35
36    pub fn hash(&self) -> [u8; 32] {
37        self.hash
38    }
39
40    #[allow(dead_code)]
41    pub fn raw(&self) -> &str {
42        &self._raw
43    }
44}
45
46#[derive(Debug, Clone)]
47pub struct Topic {
48    api: Handle<TopicActor>,
49}
50
51#[derive(Debug)]
52struct TopicActor {
53    rx: tokio::sync::mpsc::Receiver<Action<Self>>,
54    bootstrap: Bootstrap,
55    publisher: Option<Publisher>,
56    bubble_merge: Option<BubbleMerge>,
57    message_overlap_merge: Option<MessageOverlapMerge>,
58    record_publisher: crate::crypto::RecordPublisher,
59}
60
61impl Topic {
62    pub async fn new(
63        record_publisher: crate::crypto::RecordPublisher,
64        gossip: iroh_gossip::net::Gossip,
65        async_bootstrap: bool,
66    ) -> Result<Self> {
67        let (api, rx) = Handle::channel(32);
68
69        let bootstrap = Bootstrap::new(record_publisher.clone(), gossip.clone()).await?;
70
71        tokio::spawn({
72            let bootstrap = bootstrap.clone();
73            async move {
74                let mut actor = TopicActor {
75                    rx,
76                    bootstrap: bootstrap.clone(),
77                    record_publisher,
78                    publisher: None,
79                    bubble_merge: None,
80                    message_overlap_merge: None,
81                };
82                let _ = actor.run().await;
83            }
84        });
85
86        let bootstrap_done = bootstrap.bootstrap().await?;
87        if !async_bootstrap {
88            bootstrap_done.await?;
89        }
90        
91        // Spawn publisher after bootstrap
92        let _ = api
93            .call(act!(actor => actor.start_publishing()))
94            .await;
95
96        let _ = api
97            .call(act!(actor => actor.start_bubble_merge()))
98            .await;
99
100        let _ = api
101            .call(act!(actor => actor.start_message_overlap_merge()))
102            .await;
103
104        Ok(Self { api })
105    }
106
107    pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
108        Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
109    }
110
111    pub async fn gossip_sender(&self) -> Result<GossipSender> {
112        self.api
113            .call(act!(actor => actor.bootstrap.gossip_sender()))
114            .await
115    }
116
117    pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
118        self.api
119            .call(act!(actor => actor.bootstrap.gossip_receiver()))
120            .await
121    }
122
123    pub async fn record_creator(&self) -> Result<crate::crypto::RecordPublisher> {
124        self.api
125            .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
126            .await
127    }
128}
129
130impl Actor for TopicActor {
131    async fn run(&mut self) -> Result<()> {
132        loop {
133            tokio::select! {
134                Some(action) = self.rx.recv() => {
135                    let _ = action(self).await;
136                }
137                _ = tokio::signal::ctrl_c() => {
138                    break;
139                }
140            }
141        }
142        Ok(())
143    }
144}
145
146impl TopicActor {
147
148    pub async fn start_publishing(&mut self) -> Result<()> {
149        let publisher =
150            Publisher::new(self.record_publisher.clone(), self.bootstrap.gossip_receiver().await?)?;
151        self.publisher = Some(publisher);
152        Ok(())
153    }
154
155    pub async fn start_bubble_merge(&mut self) -> Result<()> {
156        let bubble_merge = BubbleMerge::new(
157            self.record_publisher.clone(),
158            self.bootstrap.gossip_sender().await?,
159            self.bootstrap.gossip_receiver().await?,
160        )?;
161        self.bubble_merge = Some(bubble_merge);
162        Ok(())
163    }
164
165    pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
166        let message_overlap_merge = MessageOverlapMerge::new(
167            self.record_publisher.clone(),
168            self.bootstrap.gossip_sender().await?,
169            self.bootstrap.gossip_receiver().await?,
170        )?;
171        self.message_overlap_merge = Some(message_overlap_merge);
172        Ok(())
173    }
174}