distributed_topic_tracker/topic/
topic.rs1use crate::{GossipSender, actor::Actor};
2use anyhow::Result;
3use sha2::Digest;
4
/// Identifier of a topic, built from a human-readable string.
///
/// Stores the source string next to a fixed-size 32-byte hash of it.
#[derive(Debug, Clone)]
pub struct TopicId {
    /// The string this id was created from.
    _raw: String,
    /// 32-byte hash; `TopicId::new` fills it with the first half of the
    /// SHA-512 digest of `_raw`.
    hash: [u8; 32],
}
10
11impl TopicId {
12 pub fn new(raw: String) -> Self {
13 let mut raw_hash = sha2::Sha512::new();
14 raw_hash.update(raw.as_bytes());
15
16 Self {
17 _raw: raw,
18 hash: raw_hash.finalize()[..32]
19 .try_into()
20 .expect("hashing 'raw' failed"),
21 }
22 }
23
24 pub fn hash(&self) -> [u8; 32] {
25 self.hash
26 }
27
28 #[allow(dead_code)]
29 pub(crate) fn raw(&self) -> &str {
30 &self._raw
31 }
32}
33
34pub struct Topic {
35 api: crate::actor::Handle<TopicActor>,
36}
37
38struct TopicActor {
39 rx: tokio::sync::mpsc::Receiver<crate::actor::Action<Self>>,
40 bootstrap: crate::topic::bootstrap::Bootstrap,
41 publisher: Option<crate::topic::publisher::Publisher>,
42 bubble_merge: Option<crate::merge::bubble::BubbleMerge>,
43 message_overlap_merge: Option<crate::merge::message_overlap::MessageOverlapMerge>,
44 record_publisher: crate::crypto::record::RecordPublisher,
45}
46
47impl Topic {
48 pub async fn new(
49 record_publisher: crate::crypto::record::RecordPublisher,
50 gossip: iroh_gossip::net::Gossip,
51 async_bootstrap: bool,
52 ) -> Result<Self> {
53 let (api, rx) = crate::actor::Handle::channel(32);
54
55 let bootstrap =
56 crate::topic::bootstrap::Bootstrap::new(record_publisher.clone(), gossip.clone())
57 .await?;
58
59 tokio::spawn({
60 let bootstrap = bootstrap.clone();
61 async move {
62 let mut actor = TopicActor {
63 rx,
64 bootstrap: bootstrap.clone(),
65 record_publisher,
66 publisher: None,
67 bubble_merge: None,
68 message_overlap_merge: None,
69 };
70 let _ = actor.run().await;
71 }
72 });
73
74 let bootstrap_done = bootstrap.bootstrap().await?;
75 if !async_bootstrap {
76 bootstrap_done.await?;
77 }
78
79 let _ = api
81 .call(move |actor| Box::pin(actor.start_publishing()))
82 .await;
83
84 let _ = api
85 .call(move |actor| Box::pin(actor.start_bubble_merge()))
86 .await;
87
88 let _ = api
89 .call(move |actor| Box::pin(actor.start_message_overlap_merge()))
90 .await;
91
92 Ok(Self { api })
93 }
94
95 pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
96 Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
97 }
98
99 pub async fn gossip_sender(&self) -> Result<GossipSender> {
100 self.api
101 .call(move |actor| Box::pin(actor.gossip_sender()))
102 .await
103 }
104
105 pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
106 self.api
107 .call(move |actor| Box::pin(actor.gossip_receiver()))
108 .await
109 }
110
111 pub async fn record_creator(&self) -> Result<crate::crypto::record::RecordPublisher> {
112 self.api
113 .call(move |actor| Box::pin(async move { Ok(actor.record_publisher.clone()) }))
114 .await
115 }
116}
117
118impl Actor for TopicActor {
119 async fn run(&mut self) -> Result<()> {
120 loop {
121 tokio::select! {
122 Some(action) = self.rx.recv() => {
123 let _ = action(self).await;
124 }
125 _ = tokio::signal::ctrl_c() => {
126 break;
127 }
128 }
129 }
130 Ok(())
131 }
132}
133
134impl TopicActor {
135 pub async fn gossip_receiver(&mut self) -> Result<crate::gossip::receiver::GossipReceiver> {
136 self.bootstrap.gossip_receiver().await
137 }
138
139 pub async fn gossip_sender(&mut self) -> Result<GossipSender> {
140 self.bootstrap.gossip_sender().await
141 }
142
143 pub async fn start_publishing(&mut self) -> Result<()> {
144 let publisher = crate::topic::publisher::Publisher::new(
145 self.record_publisher.clone(),
146 self.gossip_receiver().await?,
147 )?;
148 self.publisher = Some(publisher);
149 Ok(())
150 }
151
152 pub async fn start_bubble_merge(&mut self) -> Result<()> {
153 let bubble_merge = crate::merge::bubble::BubbleMerge::new(
154 self.record_publisher.clone(),
155 self.gossip_sender().await?,
156 self.gossip_receiver().await?,
157 )?;
158 self.bubble_merge = Some(bubble_merge);
159 Ok(())
160 }
161
162 pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
163 let message_overlap_merge = crate::merge::message_overlap::MessageOverlapMerge::new(
164 self.record_publisher.clone(),
165 self.gossip_sender().await?,
166 self.gossip_receiver().await?,
167 )?;
168 self.message_overlap_merge = Some(message_overlap_merge);
169 Ok(())
170 }
171}