distributed_topic_tracker/topic/
topic.rs

1use crate::{GossipSender, actor::Actor};
2use anyhow::Result;
3use sha2::Digest;
4
5#[derive(Debug, Clone)]
6pub struct TopicId {
7    _raw: String,
8    hash: [u8; 32], // sha512( raw )[..32]
9}
10
11impl TopicId {
12    pub fn new(raw: String) -> Self {
13        let mut raw_hash = sha2::Sha512::new();
14        raw_hash.update(raw.as_bytes());
15
16        Self {
17            _raw: raw,
18            hash: raw_hash.finalize()[..32]
19                .try_into()
20                .expect("hashing 'raw' failed"),
21        }
22    }
23
24    pub fn hash(&self) -> [u8; 32] {
25        self.hash
26    }
27
28    #[allow(dead_code)]
29    pub(crate) fn raw(&self) -> &str {
30        &self._raw
31    }
32}
33
34pub struct Topic {
35    api: crate::actor::Handle<TopicActor>,
36}
37
38struct TopicActor {
39    rx: tokio::sync::mpsc::Receiver<crate::actor::Action<Self>>,
40    bootstrap: crate::topic::bootstrap::Bootstrap,
41    publisher: Option<crate::topic::publisher::Publisher>,
42    bubble_merge: Option<crate::merge::bubble::BubbleMerge>,
43    message_overlap_merge: Option<crate::merge::message_overlap::MessageOverlapMerge>,
44    record_publisher: crate::crypto::record::RecordPublisher,
45}
46
47impl Topic {
48    pub async fn new(
49        record_publisher: crate::crypto::record::RecordPublisher,
50        gossip: iroh_gossip::net::Gossip,
51        async_bootstrap: bool,
52    ) -> Result<Self> {
53        let (api, rx) = crate::actor::Handle::channel(32);
54
55        let bootstrap =
56            crate::topic::bootstrap::Bootstrap::new(record_publisher.clone(), gossip.clone())
57                .await?;
58
59        tokio::spawn({
60            let bootstrap = bootstrap.clone();
61            async move {
62                let mut actor = TopicActor {
63                    rx,
64                    bootstrap: bootstrap.clone(),
65                    record_publisher,
66                    publisher: None,
67                    bubble_merge: None,
68                    message_overlap_merge: None,
69                };
70                let _ = actor.run().await;
71            }
72        });
73
74        let bootstrap_done = bootstrap.bootstrap().await?;
75        if !async_bootstrap {
76            bootstrap_done.await?;
77        }
78
79        // Spawn publisher after bootstrap
80        let _ = api
81            .call(move |actor| Box::pin(actor.start_publishing()))
82            .await;
83
84        let _ = api
85            .call(move |actor| Box::pin(actor.start_bubble_merge()))
86            .await;
87
88        let _ = api
89            .call(move |actor| Box::pin(actor.start_message_overlap_merge()))
90            .await;
91
92        Ok(Self { api })
93    }
94
95    pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
96        Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
97    }
98
99    pub async fn gossip_sender(&self) -> Result<GossipSender> {
100        self.api
101            .call(move |actor| Box::pin(actor.gossip_sender()))
102            .await
103    }
104
105    pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
106        self.api
107            .call(move |actor| Box::pin(actor.gossip_receiver()))
108            .await
109    }
110
111    pub async fn record_creator(&self) -> Result<crate::crypto::record::RecordPublisher> {
112        self.api
113            .call(move |actor| Box::pin(async move { Ok(actor.record_publisher.clone()) }))
114            .await
115    }
116}
117
118impl Actor for TopicActor {
119    async fn run(&mut self) -> Result<()> {
120        loop {
121            tokio::select! {
122                Some(action) = self.rx.recv() => {
123                    let _ = action(self).await;
124                }
125                _ = tokio::signal::ctrl_c() => {
126                    break;
127                }
128            }
129        }
130        Ok(())
131    }
132}
133
134impl TopicActor {
135    pub async fn gossip_receiver(&mut self) -> Result<crate::gossip::receiver::GossipReceiver> {
136        self.bootstrap.gossip_receiver().await
137    }
138
139    pub async fn gossip_sender(&mut self) -> Result<GossipSender> {
140        self.bootstrap.gossip_sender().await
141    }
142
143    pub async fn start_publishing(&mut self) -> Result<()> {
144        let publisher = crate::topic::publisher::Publisher::new(
145            self.record_publisher.clone(),
146            self.gossip_receiver().await?,
147        )?;
148        self.publisher = Some(publisher);
149        Ok(())
150    }
151
152    pub async fn start_bubble_merge(&mut self) -> Result<()> {
153        let bubble_merge = crate::merge::bubble::BubbleMerge::new(
154            self.record_publisher.clone(),
155            self.gossip_sender().await?,
156            self.gossip_receiver().await?,
157        )?;
158        self.bubble_merge = Some(bubble_merge);
159        Ok(())
160    }
161
162    pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
163        let message_overlap_merge = crate::merge::message_overlap::MessageOverlapMerge::new(
164            self.record_publisher.clone(),
165            self.gossip_sender().await?,
166            self.gossip_receiver().await?,
167        )?;
168        self.message_overlap_merge = Some(message_overlap_merge);
169        Ok(())
170    }
171}