distributed_topic_tracker/gossip/topic/
topic.rs1use crate::{
4 GossipSender,
5 crypto::RecordTopic,
6 gossip::{
7 merge::{BubbleMerge, MessageOverlapMerge},
8 topic::{bootstrap::Bootstrap, publisher::Publisher},
9 },
10};
11use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
12use anyhow::Result;
13use sha2::Digest;
14
/// Identifier of a gossip topic.
///
/// Stores the string the id was created from together with a 32-byte hash.
/// `TopicId::new` fills the hash with the first 32 bytes of the SHA-512
/// digest of that string.
#[derive(Clone, Debug)]
pub struct TopicId {
    /// The string this id was built from; exposed through `raw()`.
    _raw: String,
    /// 32-byte hash of `_raw`; exposed through `hash()`.
    hash: [u8; 32],
}
29
30impl From<TopicId> for RecordTopic {
31 fn from(val: TopicId) -> Self {
32 RecordTopic::from_bytes(&val.hash)
33 }
34}
35
36impl TopicId {
37 pub fn new(raw: String) -> Self {
41 let mut raw_hash = sha2::Sha512::new();
42 raw_hash.update(raw.as_bytes());
43
44 Self {
45 _raw: raw,
46 hash: raw_hash.finalize()[..32]
47 .try_into()
48 .expect("hashing 'raw' failed"),
49 }
50 }
51
52 pub fn hash(&self) -> [u8; 32] {
54 self.hash
55 }
56
57 #[allow(dead_code)]
59 pub fn raw(&self) -> &str {
60 &self._raw
61 }
62}
63
64#[derive(Debug, Clone)]
69pub struct Topic {
70 api: Handle<TopicActor, anyhow::Error>,
71}
72
73#[derive(Debug)]
74struct TopicActor {
75 rx: Receiver<Action<Self>>,
76 bootstrap: Bootstrap,
77 publisher: Option<Publisher>,
78 bubble_merge: Option<BubbleMerge>,
79 message_overlap_merge: Option<MessageOverlapMerge>,
80 record_publisher: crate::crypto::RecordPublisher,
81}
82
83impl Topic {
84 pub async fn new(
92 record_publisher: crate::crypto::RecordPublisher,
93 gossip: iroh_gossip::net::Gossip,
94 async_bootstrap: bool,
95 ) -> Result<Self> {
96 tracing::debug!(
97 "Topic: creating new topic (async_bootstrap={})",
98 async_bootstrap
99 );
100
101 let (api, rx) = Handle::channel();
102
103 let bootstrap = Bootstrap::new(record_publisher.clone(), gossip.clone()).await?;
104 tracing::debug!("Topic: bootstrap instance created");
105
106 tokio::spawn({
107 let bootstrap = bootstrap.clone();
108 async move {
109 tracing::debug!("Topic: starting topic actor");
110 let mut actor = TopicActor {
111 rx,
112 bootstrap: bootstrap.clone(),
113 record_publisher,
114 publisher: None,
115 bubble_merge: None,
116 message_overlap_merge: None,
117 };
118 let _ = actor.run().await;
119 }
120 });
121
122 let bootstrap_done = bootstrap.bootstrap().await?;
123 if !async_bootstrap {
124 tracing::debug!("Topic: waiting for bootstrap to complete");
125 bootstrap_done.await?;
126 tracing::debug!("Topic: bootstrap completed");
127 } else {
128 tracing::debug!("Topic: bootstrap started asynchronously");
129 }
130
131 tracing::debug!("Topic: starting publisher");
132 let _ = api.call(act!(actor => actor.start_publishing())).await;
133
134 tracing::debug!("Topic: starting bubble merge");
135 let _ = api.call(act!(actor => actor.start_bubble_merge())).await;
136
137 tracing::debug!("Topic: starting message overlap merge");
138 let _ = api
139 .call(act!(actor => actor.start_message_overlap_merge()))
140 .await;
141
142 tracing::debug!("Topic: fully initialized");
143 Ok(Self { api })
144 }
145
146 pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
148 Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
149 }
150
151 pub async fn gossip_sender(&self) -> Result<GossipSender> {
153 self.api
154 .call(act!(actor => actor.bootstrap.gossip_sender()))
155 .await
156 }
157
158 pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
160 self.api
161 .call(act!(actor => actor.bootstrap.gossip_receiver()))
162 .await
163 }
164
165 pub async fn record_creator(&self) -> Result<crate::crypto::RecordPublisher> {
167 self.api
168 .call(act_ok!(actor => async move { actor.record_publisher.clone() }))
169 .await
170 }
171}
172
173impl Actor<anyhow::Error> for TopicActor {
174 async fn run(&mut self) -> Result<()> {
175 loop {
176 tokio::select! {
177 Ok(action) = self.rx.recv_async() => {
178 let _ = action(self).await;
179 }
180 _ = tokio::signal::ctrl_c() => {
181 break;
182 }
183 }
184 }
185 Ok(())
186 }
187}
188
189impl TopicActor {
190 pub async fn start_publishing(&mut self) -> Result<()> {
191 tracing::debug!("TopicActor: initializing publisher");
192 let publisher = Publisher::new(
193 self.record_publisher.clone(),
194 self.bootstrap.gossip_receiver().await?,
195 )?;
196 self.publisher = Some(publisher);
197 tracing::debug!("TopicActor: publisher started");
198 Ok(())
199 }
200
201 pub async fn start_bubble_merge(&mut self) -> Result<()> {
202 tracing::debug!("TopicActor: initializing bubble merge");
203 let bubble_merge = BubbleMerge::new(
204 self.record_publisher.clone(),
205 self.bootstrap.gossip_sender().await?,
206 self.bootstrap.gossip_receiver().await?,
207 )?;
208 self.bubble_merge = Some(bubble_merge);
209 tracing::debug!("TopicActor: bubble merge started");
210 Ok(())
211 }
212
213 pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
214 tracing::debug!("TopicActor: initializing message overlap merge");
215 let message_overlap_merge = MessageOverlapMerge::new(
216 self.record_publisher.clone(),
217 self.bootstrap.gossip_sender().await?,
218 self.bootstrap.gossip_receiver().await?,
219 )?;
220 self.message_overlap_merge = Some(message_overlap_merge);
221 tracing::debug!("TopicActor: message overlap merge started");
222 Ok(())
223 }
224}