// distributed_topic_tracker/topic/topic.rs

1use crate::{GossipSender, actor::Actor};
2use anyhow::Result;
3use sha2::Digest;
4
5#[derive(Debug, Clone)]
6pub struct TopicId {
7    _raw: String,
8    hash: [u8; 32], // sha512( raw )[..32]
9}
10
11impl TopicId {
12    pub fn new(raw: String) -> Self {
13        let mut raw_hash = sha2::Sha512::new();
14        raw_hash.update(raw.as_bytes());
15
16        Self {
17            _raw: raw,
18            hash: raw_hash.finalize()[..32]
19                .try_into()
20                .expect("hashing 'raw' failed"),
21        }
22    }
23
24    pub fn hash(&self) -> [u8; 32] {
25        self.hash
26    }
27
28    #[allow(dead_code)]
29    pub(crate) fn raw(&self) -> &str {
30        &self._raw
31    }
32}
33
34#[derive(Debug, Clone)]
35pub struct Topic {
36    api: crate::actor::Handle<TopicActor>,
37}
38
39#[derive(Debug)]
40struct TopicActor {
41    rx: tokio::sync::mpsc::Receiver<crate::actor::Action<Self>>,
42    bootstrap: crate::topic::bootstrap::Bootstrap,
43    publisher: Option<crate::topic::publisher::Publisher>,
44    bubble_merge: Option<crate::merge::bubble::BubbleMerge>,
45    message_overlap_merge: Option<crate::merge::message_overlap::MessageOverlapMerge>,
46    record_publisher: crate::crypto::record::RecordPublisher,
47}
48
49impl Topic {
50    pub async fn new(
51        record_publisher: crate::crypto::record::RecordPublisher,
52        gossip: iroh_gossip::net::Gossip,
53        async_bootstrap: bool,
54    ) -> Result<Self> {
55        let (api, rx) = crate::actor::Handle::channel(32);
56
57        let bootstrap =
58            crate::topic::bootstrap::Bootstrap::new(record_publisher.clone(), gossip.clone())
59                .await?;
60
61        tokio::spawn({
62            let bootstrap = bootstrap.clone();
63            async move {
64                let mut actor = TopicActor {
65                    rx,
66                    bootstrap: bootstrap.clone(),
67                    record_publisher,
68                    publisher: None,
69                    bubble_merge: None,
70                    message_overlap_merge: None,
71                };
72                let _ = actor.run().await;
73            }
74        });
75
76        let bootstrap_done = bootstrap.bootstrap().await?;
77        if !async_bootstrap {
78            bootstrap_done.await?;
79        }
80
81        // Spawn publisher after bootstrap
82        let _ = api
83            .call(move |actor| Box::pin(actor.start_publishing()))
84            .await;
85
86        let _ = api
87            .call(move |actor| Box::pin(actor.start_bubble_merge()))
88            .await;
89
90        let _ = api
91            .call(move |actor| Box::pin(actor.start_message_overlap_merge()))
92            .await;
93
94        Ok(Self { api })
95    }
96
97    pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
98        Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
99    }
100
101    pub async fn gossip_sender(&self) -> Result<GossipSender> {
102        self.api
103            .call(move |actor| Box::pin(actor.gossip_sender()))
104            .await
105    }
106
107    pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
108        self.api
109            .call(move |actor| Box::pin(actor.gossip_receiver()))
110            .await
111    }
112
113    pub async fn record_creator(&self) -> Result<crate::crypto::record::RecordPublisher> {
114        self.api
115            .call(move |actor| Box::pin(async move { Ok(actor.record_publisher.clone()) }))
116            .await
117    }
118}
119
120impl Actor for TopicActor {
121    async fn run(&mut self) -> Result<()> {
122        loop {
123            tokio::select! {
124                Some(action) = self.rx.recv() => {
125                    let _ = action(self).await;
126                }
127                _ = tokio::signal::ctrl_c() => {
128                    break;
129                }
130            }
131        }
132        Ok(())
133    }
134}
135
136impl TopicActor {
137    pub async fn gossip_receiver(&mut self) -> Result<crate::gossip::receiver::GossipReceiver> {
138        self.bootstrap.gossip_receiver().await
139    }
140
141    pub async fn gossip_sender(&mut self) -> Result<GossipSender> {
142        self.bootstrap.gossip_sender().await
143    }
144
145    pub async fn start_publishing(&mut self) -> Result<()> {
146        let publisher = crate::topic::publisher::Publisher::new(
147            self.record_publisher.clone(),
148            self.gossip_receiver().await?,
149        )?;
150        self.publisher = Some(publisher);
151        Ok(())
152    }
153
154    pub async fn start_bubble_merge(&mut self) -> Result<()> {
155        let bubble_merge = crate::merge::bubble::BubbleMerge::new(
156            self.record_publisher.clone(),
157            self.gossip_sender().await?,
158            self.gossip_receiver().await?,
159        )?;
160        self.bubble_merge = Some(bubble_merge);
161        Ok(())
162    }
163
164    pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
165        let message_overlap_merge = crate::merge::message_overlap::MessageOverlapMerge::new(
166            self.record_publisher.clone(),
167            self.gossip_sender().await?,
168            self.gossip_receiver().await?,
169        )?;
170        self.message_overlap_merge = Some(message_overlap_merge);
171        Ok(())
172    }
173}