distributed_topic_tracker/
dht.rs1use std::time::Duration;
7
8use actor_helper::{Handle, act};
9use anyhow::{Context, Result, bail};
10use ed25519_dalek::VerifyingKey;
11use futures_lite::StreamExt;
12use mainline::{MutableItem, SigningKey};
13
14use crate::config::DhtConfig;
15
16#[derive(Debug, Clone)]
21pub struct Dht {
22 api: Handle<DhtActor, anyhow::Error>,
23}
24
25#[derive(Debug, Default)]
26struct DhtActor {
27 dht: Option<mainline::async_dht::AsyncDht>,
28 config: DhtConfig,
29}
30
31impl Dht {
32 pub fn new(dht_config: &DhtConfig) -> Self {
36 Self {
37 api: Handle::spawn(DhtActor {
38 dht: None,
39 config: dht_config.clone(),
40 })
41 .0,
42 }
43 }
44
45 pub async fn get(
53 &self,
54 pub_key: VerifyingKey,
55 salt: Option<Vec<u8>>,
56 more_recent_than: Option<i64>,
57 ) -> Result<Vec<MutableItem>> {
58 self.api
59 .call(act!(actor => actor.get(pub_key, salt, more_recent_than)))
60 .await
61 }
62
63 pub async fn put_mutable(
72 &self,
73 signing_key: SigningKey,
74 salt: Option<Vec<u8>>,
75 data: Vec<u8>,
76 next_unix_minute_seq: i64,
77 ) -> Result<()> {
78 self.api
79 .call(act!(actor => actor.put_mutable(signing_key, salt, data, next_unix_minute_seq)))
80 .await
81 }
82}
83
84impl DhtActor {
85 pub async fn get(
86 &mut self,
87 pub_key: VerifyingKey,
88 salt: Option<Vec<u8>>,
89 more_recent_than: Option<i64>,
90 ) -> Result<Vec<MutableItem>> {
91 if self.dht.is_none() {
92 self.reset().await?;
93 }
94
95 let dht = self.dht.as_mut().context("DHT not initialized")?;
96 match tokio::time::timeout(
97 self.config.get_timeout(),
98 dht.get_mutable(pub_key.as_bytes(), salt.as_deref(), more_recent_than)
99 .collect::<Vec<_>>(),
100 )
101 .await
102 {
103 Ok(items) => Ok(items),
104 Err(_) => {
105 tracing::warn!("DHT get operation timed out");
106 bail!("DHT get operation timed out")
107 }
108 }
109 }
110
111 pub async fn put_mutable(
112 &mut self,
113 signing_key: SigningKey,
114 salt: Option<Vec<u8>>,
115 data: Vec<u8>,
116 next_unix_minute_seq: i64,
117 ) -> Result<()> {
118 if self.dht.is_none() {
119 self.reset().await?;
120 }
121
122 for i in 0..1 + self.config.retries() {
123 let dht = self.dht.as_mut().context("DHT not initialized")?;
124
125 let item = MutableItem::new(
126 signing_key.clone(),
127 &data,
128 next_unix_minute_seq,
129 salt.as_deref(),
130 );
131
132 let put_result = match tokio::time::timeout(
133 self.config.put_timeout(),
134 dht.put_mutable(item.clone(), Some(item.seq())),
135 )
136 .await
137 {
138 Ok(result) => match result {
139 Ok(id) => Some(id),
140 Err(err) => {
141 tracing::warn!("DHT put_mutable operation failed: {err:?}");
142 None
143 }
144 },
145 Err(_) => None,
146 };
147
148 if put_result.is_some() {
149 break;
150 } else if i == self.config.retries() {
151 bail!("failed to publish record")
152 }
153
154 self.reset().await?;
155
156 let jitter = if self.config.max_retry_jitter() > Duration::ZERO {
157 Duration::from_nanos(
158 (rand::random::<u128>() % self.config.max_retry_jitter().as_nanos()) as u64,
159 )
160 } else {
161 Duration::ZERO
162 };
163 let retry_interval = self.config.base_retry_interval() + jitter;
164
165 tracing::debug!(
166 "DHTActor: put_mutable attempt {}/{} failed, retrying in {}ms",
167 i + 1,
168 1 + self.config.retries(),
169 retry_interval.as_millis()
170 );
171 tokio::time::sleep(retry_interval).await;
172 }
173 Ok(())
174 }
175
176 async fn reset(&mut self) -> Result<()> {
177 self.dht = Some(mainline::Dht::builder().build()?.as_async());
178 Ok(())
179 }
180}