distributed_topic_tracker/
dht.rs

1use std::time::Duration;
2
3use crate::actor::{Action, Actor, Handle};
4use anyhow::{Context, Result, bail};
5use futures::StreamExt;
6use iroh::PublicKey;
7use mainline::{MutableItem, SigningKey};
8
9const RETRY_DEFAULT: usize = 3;
10
11#[derive(Debug,Clone)]
12pub struct Dht {
13    api: Handle<DhtActor>,
14}
15
16#[derive(Debug)]
17struct DhtActor {
18    rx: tokio::sync::mpsc::Receiver<Action<Self>>,
19    dht: Option<mainline::async_dht::AsyncDht>,
20}
21
22impl Dht {
23    pub fn new() -> Self {
24        let (api, rx) = Handle::channel(32);
25
26        tokio::spawn(async move {
27            let mut actor = DhtActor { rx, dht: None };
28            let _ = actor.run().await;
29        });
30
31        Self { api }
32    }
33
34    pub async fn get(
35        &self,
36        node_id: PublicKey,
37        salt: Option<Vec<u8>>,
38        more_recent_then: Option<i64>,
39        timeout: Duration,
40    ) -> Result<Vec<MutableItem>> {
41        self.api
42            .call(move |actor| Box::pin(actor.get(node_id, salt, more_recent_then, timeout)))
43            .await
44    }
45
46    pub async fn put_mutable(
47        &self,
48        signing_key: SigningKey,
49        node_id: PublicKey,
50        salt: Option<Vec<u8>>,
51        data: Vec<u8>,
52        retry_count: Option<usize>,
53        timeout: Duration,
54    ) -> Result<()> {
55        self.api
56            .call(move |actor| {
57                Box::pin(actor.put_mutable(signing_key, node_id, salt, data, retry_count, timeout))
58            })
59            .await
60    }
61}
62
63impl Actor for DhtActor {
64    async fn run(&mut self) -> Result<()> {
65        loop {
66            tokio::select! {
67                Some(action) = self.rx.recv() => {
68                    action(self).await;
69                }
70                _ = tokio::signal::ctrl_c() => {
71                    break;
72                }
73            }
74        }
75        Ok(())
76    }
77}
78
79impl DhtActor {
80    pub async fn get(
81        &mut self,
82        node_id: PublicKey,
83        salt: Option<Vec<u8>>,
84        more_recent_then: Option<i64>,
85        timeout: Duration,
86    ) -> Result<Vec<MutableItem>> {
87        if self.dht.is_none() {
88            self.reset().await?;
89        }
90
91        let dht = self.dht.as_mut().context("DHT not initialized")?;
92        Ok(tokio::time::timeout(
93            timeout,
94            dht.get_mutable(node_id.as_bytes(), salt.as_deref(), more_recent_then)
95                .collect::<Vec<_>>(),
96        )
97        .await?)
98    }
99
100    pub async fn put_mutable(
101        &mut self,
102        signing_key: SigningKey,
103        node_id: PublicKey,
104        salt: Option<Vec<u8>>,
105        data: Vec<u8>,
106        retry_count: Option<usize>,
107        timeout: Duration,
108    ) -> Result<()> {
109        if self.dht.is_none() {
110            self.reset().await?;
111        }
112
113        for i in 0..retry_count.unwrap_or(RETRY_DEFAULT) {
114            let dht = self.dht.as_mut().context("DHT not initialized")?;
115
116            let most_recent_result = tokio::time::timeout(
117                timeout,
118                dht.get_mutable_most_recent(node_id.as_bytes(), salt.as_deref()),
119            )
120            .await?;
121
122            let item = if let Some(mut_item) = most_recent_result {
123                MutableItem::new(
124                    signing_key.clone(),
125                    &data,
126                    mut_item.seq() + 1,
127                    salt.as_deref(),
128                )
129            } else {
130                MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref())
131            };
132
133            let put_result = match tokio::time::timeout(
134                Duration::from_secs(10),
135                dht.put_mutable(item.clone(), Some(item.seq())),
136            )
137            .await
138            {
139                Ok(result) => result.ok(),
140                Err(_) => None,
141            };
142
143            if put_result.is_some() {
144                break;
145            } else if i == retry_count.unwrap_or(RETRY_DEFAULT) - 1 {
146                bail!("failed to publish record")
147            }
148
149            self.reset().await?;
150
151            tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
152        }
153        Ok(())
154    }
155
156    async fn reset(&mut self) -> Result<()> {
157        self.dht = Some(mainline::Dht::builder().build()?.as_async());
158        Ok(())
159    }
160}