distributed_topic_tracker/
dht.rs

1use std::time::Duration;
2
3use actor_helper::{Action, Actor, Handle, act};
4use anyhow::{Context, Result, bail};
5use ed25519_dalek::VerifyingKey;
6use futures_lite::StreamExt;
7use mainline::{MutableItem, SigningKey};
8
/// Number of publish attempts `DhtActor::put_mutable` makes when the caller
/// passes `None` for `retry_count`.
const RETRY_DEFAULT: usize = 3;
10
11#[derive(Debug, Clone)]
12pub struct Dht {
13    api: Handle<DhtActor>,
14}
15
16#[derive(Debug)]
17struct DhtActor {
18    rx: tokio::sync::mpsc::Receiver<Action<Self>>,
19    dht: Option<mainline::async_dht::AsyncDht>,
20}
21
22impl Dht {
23    pub fn new() -> Self {
24        let (api, rx) = Handle::channel(32);
25
26        tokio::spawn(async move {
27            let mut actor = DhtActor { rx, dht: None };
28            let _ = actor.run().await;
29        });
30
31        Self { api }
32    }
33
34    pub async fn get(
35        &self,
36        pub_key: VerifyingKey,
37        salt: Option<Vec<u8>>,
38        more_recent_then: Option<i64>,
39        timeout: Duration,
40    ) -> Result<Vec<MutableItem>> {
41        self.api
42            .call(act!(actor => actor.get(pub_key, salt, more_recent_then, timeout)))
43            .await
44    }
45
46    pub async fn put_mutable(
47        &self,
48        signing_key: SigningKey,
49        pub_key: VerifyingKey,
50        salt: Option<Vec<u8>>,
51        data: Vec<u8>,
52        retry_count: Option<usize>,
53        timeout: Duration,
54    ) -> Result<()> {
55        self.api.call(act!(actor => actor.put_mutable(signing_key, pub_key, salt, data, retry_count, timeout))).await
56    }
57}
58
59impl Actor for DhtActor {
60    async fn run(&mut self) -> Result<()> {
61        loop {
62            tokio::select! {
63                Some(action) = self.rx.recv() => {
64                    action(self).await;
65                }
66                _ = tokio::signal::ctrl_c() => {
67                    break;
68                }
69            }
70        }
71        Ok(())
72    }
73}
74
75impl DhtActor {
76    pub async fn get(
77        &mut self,
78        pub_key: VerifyingKey,
79        salt: Option<Vec<u8>>,
80        more_recent_then: Option<i64>,
81        timeout: Duration,
82    ) -> Result<Vec<MutableItem>> {
83        if self.dht.is_none() {
84            self.reset().await?;
85        }
86
87        let dht = self.dht.as_mut().context("DHT not initialized")?;
88        Ok(tokio::time::timeout(
89            timeout,
90            dht.get_mutable(pub_key.as_bytes(), salt.as_deref(), more_recent_then)
91                .collect::<Vec<_>>(),
92        )
93        .await?)
94    }
95
96    pub async fn put_mutable(
97        &mut self,
98        signing_key: SigningKey,
99        pub_key: VerifyingKey,
100        salt: Option<Vec<u8>>,
101        data: Vec<u8>,
102        retry_count: Option<usize>,
103        timeout: Duration,
104    ) -> Result<()> {
105        if self.dht.is_none() {
106            self.reset().await?;
107        }
108
109        for i in 0..retry_count.unwrap_or(RETRY_DEFAULT) {
110            let dht = self.dht.as_mut().context("DHT not initialized")?;
111
112            let most_recent_result = tokio::time::timeout(
113                timeout,
114                dht.get_mutable_most_recent(pub_key.as_bytes(), salt.as_deref()),
115            )
116            .await?;
117
118            let item = if let Some(mut_item) = most_recent_result {
119                MutableItem::new(
120                    signing_key.clone(),
121                    &data,
122                    mut_item.seq() + 1,
123                    salt.as_deref(),
124                )
125            } else {
126                MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref())
127            };
128
129            let put_result = match tokio::time::timeout(
130                Duration::from_secs(10),
131                dht.put_mutable(item.clone(), Some(item.seq())),
132            )
133            .await
134            {
135                Ok(result) => result.ok(),
136                Err(_) => None,
137            };
138
139            if put_result.is_some() {
140                break;
141            } else if i == retry_count.unwrap_or(RETRY_DEFAULT) - 1 {
142                bail!("failed to publish record")
143            }
144
145            self.reset().await?;
146
147            tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
148        }
149        Ok(())
150    }
151
152    async fn reset(&mut self) -> Result<()> {
153        self.dht = Some(mainline::Dht::builder().build()?.as_async());
154        Ok(())
155    }
156}