distributed_topic_tracker/
dht.rs1use std::time::Duration;
7
8use actor_helper::{Action, Actor, Handle, Receiver, act};
9use anyhow::{Context, Result, bail};
10use ed25519_dalek::VerifyingKey;
11use futures_lite::StreamExt;
12use mainline::{MutableItem, SigningKey};
13
14const RETRY_DEFAULT: usize = 3;
15
16#[derive(Debug, Clone)]
21pub struct Dht {
22 api: Handle<DhtActor, anyhow::Error>,
23}
24
25#[derive(Debug)]
26struct DhtActor {
27 rx: Receiver<Action<Self>>,
28 dht: Option<mainline::async_dht::AsyncDht>,
29}
30
31impl Dht {
32 pub fn new() -> Self {
36 let (api, rx) = Handle::channel();
37
38 tokio::spawn(async move {
39 let mut actor = DhtActor { rx, dht: None };
40 let _ = actor.run().await;
41 });
42
43 Self { api }
44 }
45
46 pub async fn get(
55 &self,
56 pub_key: VerifyingKey,
57 salt: Option<Vec<u8>>,
58 more_recent_than: Option<i64>,
59 timeout: Duration,
60 ) -> Result<Vec<MutableItem>> {
61 self.api
62 .call(act!(actor => actor.get(pub_key, salt, more_recent_than, timeout)))
63 .await
64 }
65
66 pub async fn put_mutable(
77 &self,
78 signing_key: SigningKey,
79 pub_key: VerifyingKey,
80 salt: Option<Vec<u8>>,
81 data: Vec<u8>,
82 retry_count: Option<usize>,
83 timeout: Duration,
84 ) -> Result<()> {
85 self.api.call(act!(actor => actor.put_mutable(signing_key, pub_key, salt, data, retry_count, timeout))).await
86 }
87}
88
89impl Default for Dht {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl Actor<anyhow::Error> for DhtActor {
96 async fn run(&mut self) -> Result<()> {
97 loop {
98 tokio::select! {
99 Ok(action) = self.rx.recv_async() => {
100 action(self).await;
101 }
102 _ = tokio::signal::ctrl_c() => {
103 break;
104 }
105 }
106 }
107 Err(anyhow::anyhow!("actor stopped"))
108 }
109}
110
111impl DhtActor {
112 pub async fn get(
113 &mut self,
114 pub_key: VerifyingKey,
115 salt: Option<Vec<u8>>,
116 more_recent_than: Option<i64>,
117 timeout: Duration,
118 ) -> Result<Vec<MutableItem>> {
119 if self.dht.is_none() {
120 self.reset().await?;
121 }
122
123 let dht = self.dht.as_mut().context("DHT not initialized")?;
124 Ok(tokio::time::timeout(
125 timeout,
126 dht.get_mutable(pub_key.as_bytes(), salt.as_deref(), more_recent_than)
127 .collect::<Vec<_>>(),
128 )
129 .await?)
130 }
131
132 pub async fn put_mutable(
133 &mut self,
134 signing_key: SigningKey,
135 pub_key: VerifyingKey,
136 salt: Option<Vec<u8>>,
137 data: Vec<u8>,
138 retry_count: Option<usize>,
139 timeout: Duration,
140 ) -> Result<()> {
141 if self.dht.is_none() {
142 self.reset().await?;
143 }
144
145 for i in 0..retry_count.unwrap_or(RETRY_DEFAULT) {
146 let dht = self.dht.as_mut().context("DHT not initialized")?;
147
148 let most_recent_result = tokio::time::timeout(
149 timeout,
150 dht.get_mutable_most_recent(pub_key.as_bytes(), salt.as_deref()),
151 )
152 .await?;
153
154 let item = if let Some(mut_item) = most_recent_result {
155 MutableItem::new(
156 signing_key.clone(),
157 &data,
158 mut_item.seq() + 1,
159 salt.as_deref(),
160 )
161 } else {
162 MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref())
163 };
164
165 let put_result = match tokio::time::timeout(
166 Duration::from_secs(10),
167 dht.put_mutable(item.clone(), Some(item.seq())),
168 )
169 .await
170 {
171 Ok(result) => result.ok(),
172 Err(_) => None,
173 };
174
175 if put_result.is_some() {
176 break;
177 } else if i == retry_count.unwrap_or(RETRY_DEFAULT) - 1 {
178 bail!("failed to publish record")
179 }
180
181 self.reset().await?;
182
183 tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
184 }
185 Ok(())
186 }
187
188 async fn reset(&mut self) -> Result<()> {
189 self.dht = Some(mainline::Dht::builder().build()?.as_async());
190 Ok(())
191 }
192}