distributed_topic_tracker/
dht.rs1use std::time::Duration;
2
3use actor_helper::{Action, Actor, Handle, act};
4use anyhow::{Context, Result, bail};
5use ed25519_dalek::VerifyingKey;
6use futures_lite::StreamExt;
7use mainline::{MutableItem, SigningKey};
8
9const RETRY_DEFAULT: usize = 3;
10
11#[derive(Debug, Clone)]
12pub struct Dht {
13 api: Handle<DhtActor>,
14}
15
16#[derive(Debug)]
17struct DhtActor {
18 rx: tokio::sync::mpsc::Receiver<Action<Self>>,
19 dht: Option<mainline::async_dht::AsyncDht>,
20}
21
22impl Dht {
23 pub fn new() -> Self {
24 let (api, rx) = Handle::channel(32);
25
26 tokio::spawn(async move {
27 let mut actor = DhtActor { rx, dht: None };
28 let _ = actor.run().await;
29 });
30
31 Self { api }
32 }
33
34 pub async fn get(
35 &self,
36 pub_key: VerifyingKey,
37 salt: Option<Vec<u8>>,
38 more_recent_then: Option<i64>,
39 timeout: Duration,
40 ) -> Result<Vec<MutableItem>> {
41 self.api
42 .call(act!(actor => actor.get(pub_key, salt, more_recent_then, timeout)))
43 .await
44 }
45
46 pub async fn put_mutable(
47 &self,
48 signing_key: SigningKey,
49 pub_key: VerifyingKey,
50 salt: Option<Vec<u8>>,
51 data: Vec<u8>,
52 retry_count: Option<usize>,
53 timeout: Duration,
54 ) -> Result<()> {
55 self.api.call(act!(actor => actor.put_mutable(signing_key, pub_key, salt, data, retry_count, timeout))).await
56 }
57}
58
59impl Actor for DhtActor {
60 async fn run(&mut self) -> Result<()> {
61 loop {
62 tokio::select! {
63 Some(action) = self.rx.recv() => {
64 action(self).await;
65 }
66 _ = tokio::signal::ctrl_c() => {
67 break;
68 }
69 }
70 }
71 Ok(())
72 }
73}
74
75impl DhtActor {
76 pub async fn get(
77 &mut self,
78 pub_key: VerifyingKey,
79 salt: Option<Vec<u8>>,
80 more_recent_then: Option<i64>,
81 timeout: Duration,
82 ) -> Result<Vec<MutableItem>> {
83 if self.dht.is_none() {
84 self.reset().await?;
85 }
86
87 let dht = self.dht.as_mut().context("DHT not initialized")?;
88 Ok(tokio::time::timeout(
89 timeout,
90 dht.get_mutable(pub_key.as_bytes(), salt.as_deref(), more_recent_then)
91 .collect::<Vec<_>>(),
92 )
93 .await?)
94 }
95
96 pub async fn put_mutable(
97 &mut self,
98 signing_key: SigningKey,
99 pub_key: VerifyingKey,
100 salt: Option<Vec<u8>>,
101 data: Vec<u8>,
102 retry_count: Option<usize>,
103 timeout: Duration,
104 ) -> Result<()> {
105 if self.dht.is_none() {
106 self.reset().await?;
107 }
108
109 for i in 0..retry_count.unwrap_or(RETRY_DEFAULT) {
110 let dht = self.dht.as_mut().context("DHT not initialized")?;
111
112 let most_recent_result = tokio::time::timeout(
113 timeout,
114 dht.get_mutable_most_recent(pub_key.as_bytes(), salt.as_deref()),
115 )
116 .await?;
117
118 let item = if let Some(mut_item) = most_recent_result {
119 MutableItem::new(
120 signing_key.clone(),
121 &data,
122 mut_item.seq() + 1,
123 salt.as_deref(),
124 )
125 } else {
126 MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref())
127 };
128
129 let put_result = match tokio::time::timeout(
130 Duration::from_secs(10),
131 dht.put_mutable(item.clone(), Some(item.seq())),
132 )
133 .await
134 {
135 Ok(result) => result.ok(),
136 Err(_) => None,
137 };
138
139 if put_result.is_some() {
140 break;
141 } else if i == retry_count.unwrap_or(RETRY_DEFAULT) - 1 {
142 bail!("failed to publish record")
143 }
144
145 self.reset().await?;
146
147 tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
148 }
149 Ok(())
150 }
151
152 async fn reset(&mut self) -> Result<()> {
153 self.dht = Some(mainline::Dht::builder().build()?.as_async());
154 Ok(())
155 }
156}