Skip to main content

distributed_topic_tracker/
dht.rs

1//! Mainline BitTorrent DHT client for mutable record operations.
2//!
3//! Provides async interface for DHT get/put operations with automatic
4//! retry logic and connection management.
5
6use std::time::Duration;
7
8use actor_helper::{Handle, act};
9use anyhow::{Context, Result, bail};
10use ed25519_dalek::VerifyingKey;
11use futures_lite::StreamExt;
12use mainline::{MutableItem, SigningKey};
13
14use crate::config::DhtConfig;
15
16/// DHT client wrapper with actor-based concurrency.
17///
18/// Manages connections to the mainline DHT and handles
19/// mutable record get/put operations with automatic retries.
20#[derive(Debug, Clone)]
21pub struct Dht {
22    api: Handle<DhtActor, anyhow::Error>,
23}
24
25#[derive(Debug, Default)]
26struct DhtActor {
27    dht: Option<mainline::async_dht::AsyncDht>,
28    config: DhtConfig,
29}
30
31impl Dht {
32    /// Create a new DHT client.
33    ///
34    /// Spawns a background actor for handling DHT operations.
35    pub fn new(dht_config: &DhtConfig) -> Self {
36        Self {
37            api: Handle::spawn(DhtActor {
38                dht: None,
39                config: dht_config.clone(),
40            })
41            .0,
42        }
43    }
44
45    /// Retrieve mutable records from the DHT.
46    ///
47    /// # Arguments
48    ///
49    /// * `pub_key` - Ed25519 public key for the record
50    /// * `salt` - Optional salt for record lookup
51    /// * `more_recent_than` - Sequence number filter (get records newer than this)
52    pub async fn get(
53        &self,
54        pub_key: VerifyingKey,
55        salt: Option<Vec<u8>>,
56        more_recent_than: Option<i64>,
57    ) -> Result<Vec<MutableItem>> {
58        self.api
59            .call(act!(actor => actor.get(pub_key, salt, more_recent_than)))
60            .await
61    }
62
63    /// Publish a mutable record to the DHT.
64    ///
65    /// # Arguments
66    ///
67    /// * `signing_key` - Ed25519 secret key for signing
68    /// * `salt` - Optional salt for record slot
69    /// * `data` - Record value to publish
70    /// * `next_unix_minute_seq` - Sequence number for the record (per unix_minute)
71    pub async fn put_mutable(
72        &self,
73        signing_key: SigningKey,
74        salt: Option<Vec<u8>>,
75        data: Vec<u8>,
76        next_unix_minute_seq: i64,
77    ) -> Result<()> {
78        self.api
79            .call(act!(actor => actor.put_mutable(signing_key, salt, data, next_unix_minute_seq)))
80            .await
81    }
82}
83
84impl DhtActor {
85    pub async fn get(
86        &mut self,
87        pub_key: VerifyingKey,
88        salt: Option<Vec<u8>>,
89        more_recent_than: Option<i64>,
90    ) -> Result<Vec<MutableItem>> {
91        if self.dht.is_none() {
92            self.reset().await?;
93        }
94
95        let dht = self.dht.as_mut().context("DHT not initialized")?;
96        match tokio::time::timeout(
97            self.config.get_timeout(),
98            dht.get_mutable(pub_key.as_bytes(), salt.as_deref(), more_recent_than)
99                .collect::<Vec<_>>(),
100        )
101        .await
102        {
103            Ok(items) => Ok(items),
104            Err(_) => {
105                tracing::warn!("DHT get operation timed out");
106                bail!("DHT get operation timed out")
107            }
108        }
109    }
110
111    pub async fn put_mutable(
112        &mut self,
113        signing_key: SigningKey,
114        salt: Option<Vec<u8>>,
115        data: Vec<u8>,
116        next_unix_minute_seq: i64,
117    ) -> Result<()> {
118        if self.dht.is_none() {
119            self.reset().await?;
120        }
121
122        for i in 0..1 + self.config.retries() {
123            let dht = self.dht.as_mut().context("DHT not initialized")?;
124
125            let item = MutableItem::new(
126                signing_key.clone(),
127                &data,
128                next_unix_minute_seq,
129                salt.as_deref(),
130            );
131
132            let put_result = match tokio::time::timeout(
133                self.config.put_timeout(),
134                dht.put_mutable(item.clone(), Some(item.seq())),
135            )
136            .await
137            {
138                Ok(result) => match result {
139                    Ok(id) => Some(id),
140                    Err(err) => {
141                        tracing::warn!("DHT put_mutable operation failed: {err:?}");
142                        None
143                    }
144                },
145                Err(_) => None,
146            };
147
148            if put_result.is_some() {
149                break;
150            } else if i == self.config.retries() {
151                bail!("failed to publish record")
152            }
153
154            self.reset().await?;
155
156            let jitter = if self.config.max_retry_jitter() > Duration::ZERO {
157                Duration::from_nanos(
158                    (rand::random::<u128>() % self.config.max_retry_jitter().as_nanos()) as u64,
159                )
160            } else {
161                Duration::ZERO
162            };
163            let retry_interval = self.config.base_retry_interval() + jitter;
164
165            tracing::debug!(
166                "DHTActor: put_mutable attempt {}/{} failed, retrying in {}ms",
167                i + 1,
168                1 + self.config.retries(),
169                retry_interval.as_millis()
170            );
171            tokio::time::sleep(retry_interval).await;
172        }
173        Ok(())
174    }
175
176    async fn reset(&mut self) -> Result<()> {
177        self.dht = Some(mainline::Dht::builder().build()?.as_async());
178        Ok(())
179    }
180}