distributed_topic_tracker/topic/
topic.rs1use crate::{GossipSender, actor::Actor};
2use anyhow::Result;
3use sha2::Digest;
4
5#[derive(Debug, Clone)]
6pub struct TopicId {
7 _raw: String,
8 hash: [u8; 32], }
10
11impl TopicId {
12 pub fn new(raw: String) -> Self {
13 let mut raw_hash = sha2::Sha512::new();
14 raw_hash.update(raw.as_bytes());
15
16 Self {
17 _raw: raw,
18 hash: raw_hash.finalize()[..32]
19 .try_into()
20 .expect("hashing 'raw' failed"),
21 }
22 }
23
24 pub fn hash(&self) -> [u8; 32] {
25 self.hash
26 }
27
28 #[allow(dead_code)]
29 pub(crate) fn raw(&self) -> &str {
30 &self._raw
31 }
32}
33
34#[derive(Debug, Clone)]
35pub struct Topic {
36 api: crate::actor::Handle<TopicActor>,
37}
38
39#[derive(Debug)]
40struct TopicActor {
41 rx: tokio::sync::mpsc::Receiver<crate::actor::Action<Self>>,
42 bootstrap: crate::topic::bootstrap::Bootstrap,
43 publisher: Option<crate::topic::publisher::Publisher>,
44 bubble_merge: Option<crate::merge::bubble::BubbleMerge>,
45 message_overlap_merge: Option<crate::merge::message_overlap::MessageOverlapMerge>,
46 record_publisher: crate::crypto::record::RecordPublisher,
47}
48
49impl Topic {
50 pub async fn new(
51 record_publisher: crate::crypto::record::RecordPublisher,
52 gossip: iroh_gossip::net::Gossip,
53 async_bootstrap: bool,
54 ) -> Result<Self> {
55 let (api, rx) = crate::actor::Handle::channel(32);
56
57 let bootstrap =
58 crate::topic::bootstrap::Bootstrap::new(record_publisher.clone(), gossip.clone())
59 .await?;
60
61 tokio::spawn({
62 let bootstrap = bootstrap.clone();
63 async move {
64 let mut actor = TopicActor {
65 rx,
66 bootstrap: bootstrap.clone(),
67 record_publisher,
68 publisher: None,
69 bubble_merge: None,
70 message_overlap_merge: None,
71 };
72 let _ = actor.run().await;
73 }
74 });
75
76 let bootstrap_done = bootstrap.bootstrap().await?;
77 if !async_bootstrap {
78 bootstrap_done.await?;
79 }
80
81 let _ = api
83 .call(move |actor| Box::pin(actor.start_publishing()))
84 .await;
85
86 let _ = api
87 .call(move |actor| Box::pin(actor.start_bubble_merge()))
88 .await;
89
90 let _ = api
91 .call(move |actor| Box::pin(actor.start_message_overlap_merge()))
92 .await;
93
94 Ok(Self { api })
95 }
96
97 pub async fn split(&self) -> Result<(GossipSender, crate::gossip::receiver::GossipReceiver)> {
98 Ok((self.gossip_sender().await?, self.gossip_receiver().await?))
99 }
100
101 pub async fn gossip_sender(&self) -> Result<GossipSender> {
102 self.api
103 .call(move |actor| Box::pin(actor.gossip_sender()))
104 .await
105 }
106
107 pub async fn gossip_receiver(&self) -> Result<crate::gossip::receiver::GossipReceiver> {
108 self.api
109 .call(move |actor| Box::pin(actor.gossip_receiver()))
110 .await
111 }
112
113 pub async fn record_creator(&self) -> Result<crate::crypto::record::RecordPublisher> {
114 self.api
115 .call(move |actor| Box::pin(async move { Ok(actor.record_publisher.clone()) }))
116 .await
117 }
118}
119
120impl Actor for TopicActor {
121 async fn run(&mut self) -> Result<()> {
122 loop {
123 tokio::select! {
124 Some(action) = self.rx.recv() => {
125 let _ = action(self).await;
126 }
127 _ = tokio::signal::ctrl_c() => {
128 break;
129 }
130 }
131 }
132 Ok(())
133 }
134}
135
136impl TopicActor {
137 pub async fn gossip_receiver(&mut self) -> Result<crate::gossip::receiver::GossipReceiver> {
138 self.bootstrap.gossip_receiver().await
139 }
140
141 pub async fn gossip_sender(&mut self) -> Result<GossipSender> {
142 self.bootstrap.gossip_sender().await
143 }
144
145 pub async fn start_publishing(&mut self) -> Result<()> {
146 let publisher = crate::topic::publisher::Publisher::new(
147 self.record_publisher.clone(),
148 self.gossip_receiver().await?,
149 )?;
150 self.publisher = Some(publisher);
151 Ok(())
152 }
153
154 pub async fn start_bubble_merge(&mut self) -> Result<()> {
155 let bubble_merge = crate::merge::bubble::BubbleMerge::new(
156 self.record_publisher.clone(),
157 self.gossip_sender().await?,
158 self.gossip_receiver().await?,
159 )?;
160 self.bubble_merge = Some(bubble_merge);
161 Ok(())
162 }
163
164 pub async fn start_message_overlap_merge(&mut self) -> Result<()> {
165 let message_overlap_merge = crate::merge::message_overlap::MessageOverlapMerge::new(
166 self.record_publisher.clone(),
167 self.gossip_sender().await?,
168 self.gossip_receiver().await?,
169 )?;
170 self.message_overlap_merge = Some(message_overlap_merge);
171 Ok(())
172 }
173}