distributed_topic_tracker/
dht.rs

1//! Mainline BitTorrent DHT client for mutable record operations.
2//!
3//! Provides async interface for DHT get/put operations with automatic
4//! retry logic and connection management.
5
6use std::time::Duration;
7
8use actor_helper::{Action, Actor, Handle, Receiver, act};
9use anyhow::{Context, Result, bail};
10use ed25519_dalek::VerifyingKey;
11use futures_lite::StreamExt;
12use mainline::{MutableItem, SigningKey};
13
14const RETRY_DEFAULT: usize = 3;
15
16/// DHT client wrapper with actor-based concurrency.
17///
18/// Manages connections to the mainline DHT and handles
19/// mutable record get/put operations with automatic retries.
20#[derive(Debug, Clone)]
21pub struct Dht {
22    api: Handle<DhtActor, anyhow::Error>,
23}
24
25#[derive(Debug)]
26struct DhtActor {
27    rx: Receiver<Action<Self>>,
28    dht: Option<mainline::async_dht::AsyncDht>,
29}
30
31impl Dht {
32    /// Create a new DHT client.
33    ///
34    /// Spawns a background actor for handling DHT operations.
35    pub fn new() -> Self {
36        let (api, rx) = Handle::channel();
37
38        tokio::spawn(async move {
39            let mut actor = DhtActor { rx, dht: None };
40            let _ = actor.run().await;
41        });
42
43        Self { api }
44    }
45
46    /// Retrieve mutable records from the DHT.
47    ///
48    /// # Arguments
49    ///
50    /// * `pub_key` - Ed25519 public key for the record
51    /// * `salt` - Optional salt for record lookup
52    /// * `more_recent_than` - Sequence number filter (get records newer than this)
53    /// * `timeout` - Maximum time to wait for results
54    pub async fn get(
55        &self,
56        pub_key: VerifyingKey,
57        salt: Option<Vec<u8>>,
58        more_recent_than: Option<i64>,
59        timeout: Duration,
60    ) -> Result<Vec<MutableItem>> {
61        self.api
62            .call(act!(actor => actor.get(pub_key, salt, more_recent_than, timeout)))
63            .await
64    }
65
66    /// Publish a mutable record to the DHT.
67    ///
68    /// # Arguments
69    ///
70    /// * `signing_key` - Ed25519 secret key for signing
71    /// * `pub_key` - Ed25519 public key (used for routing)
72    /// * `salt` - Optional salt for record slot
73    /// * `data` - Record value to publish
74    /// * `retry_count` - Number of retry attempts (default: 3)
75    /// * `timeout` - Per-request timeout
76    pub async fn put_mutable(
77        &self,
78        signing_key: SigningKey,
79        pub_key: VerifyingKey,
80        salt: Option<Vec<u8>>,
81        data: Vec<u8>,
82        retry_count: Option<usize>,
83        timeout: Duration,
84    ) -> Result<()> {
85        self.api.call(act!(actor => actor.put_mutable(signing_key, pub_key, salt, data, retry_count, timeout))).await
86    }
87}
88
89impl Default for Dht {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl Actor<anyhow::Error> for DhtActor {
96    async fn run(&mut self) -> Result<()> {
97        loop {
98            tokio::select! {
99                Ok(action) = self.rx.recv_async() => {
100                    action(self).await;
101                }
102                _ = tokio::signal::ctrl_c() => {
103                    break;
104                }
105            }
106        }
107        Err(anyhow::anyhow!("actor stopped"))
108    }
109}
110
111impl DhtActor {
112    pub async fn get(
113        &mut self,
114        pub_key: VerifyingKey,
115        salt: Option<Vec<u8>>,
116        more_recent_than: Option<i64>,
117        timeout: Duration,
118    ) -> Result<Vec<MutableItem>> {
119        if self.dht.is_none() {
120            self.reset().await?;
121        }
122
123        let dht = self.dht.as_mut().context("DHT not initialized")?;
124        Ok(tokio::time::timeout(
125            timeout,
126            dht.get_mutable(pub_key.as_bytes(), salt.as_deref(), more_recent_than)
127                .collect::<Vec<_>>(),
128        )
129        .await?)
130    }
131
132    pub async fn put_mutable(
133        &mut self,
134        signing_key: SigningKey,
135        pub_key: VerifyingKey,
136        salt: Option<Vec<u8>>,
137        data: Vec<u8>,
138        retry_count: Option<usize>,
139        timeout: Duration,
140    ) -> Result<()> {
141        if self.dht.is_none() {
142            self.reset().await?;
143        }
144
145        for i in 0..retry_count.unwrap_or(RETRY_DEFAULT) {
146            let dht = self.dht.as_mut().context("DHT not initialized")?;
147
148            let most_recent_result = tokio::time::timeout(
149                timeout,
150                dht.get_mutable_most_recent(pub_key.as_bytes(), salt.as_deref()),
151            )
152            .await?;
153
154            let item = if let Some(mut_item) = most_recent_result {
155                MutableItem::new(
156                    signing_key.clone(),
157                    &data,
158                    mut_item.seq() + 1,
159                    salt.as_deref(),
160                )
161            } else {
162                MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref())
163            };
164
165            let put_result = match tokio::time::timeout(
166                Duration::from_secs(10),
167                dht.put_mutable(item.clone(), Some(item.seq())),
168            )
169            .await
170            {
171                Ok(result) => result.ok(),
172                Err(_) => None,
173            };
174
175            if put_result.is_some() {
176                break;
177            } else if i == retry_count.unwrap_or(RETRY_DEFAULT) - 1 {
178                bail!("failed to publish record")
179            }
180
181            self.reset().await?;
182
183            tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
184        }
185        Ok(())
186    }
187
188    async fn reset(&mut self) -> Result<()> {
189        self.dht = Some(mainline::Dht::builder().build()?.as_async());
190        Ok(())
191    }
192}