distributed_topic_tracker/
dht.rs1use std::time::Duration;
2
3use crate::actor::{Action, Actor, Handle};
4use anyhow::{Context, Result, bail};
5use futures::StreamExt;
6use iroh::PublicKey;
7use mainline::{MutableItem, SigningKey};
8
9const RETRY_DEFAULT: usize = 3;
10
11#[derive(Debug,Clone)]
12pub struct Dht {
13 api: Handle<DhtActor>,
14}
15
16#[derive(Debug)]
17struct DhtActor {
18 rx: tokio::sync::mpsc::Receiver<Action<Self>>,
19 dht: Option<mainline::async_dht::AsyncDht>,
20}
21
22impl Dht {
23 pub fn new() -> Self {
24 let (api, rx) = Handle::channel(32);
25
26 tokio::spawn(async move {
27 let mut actor = DhtActor { rx, dht: None };
28 let _ = actor.run().await;
29 });
30
31 Self { api }
32 }
33
34 pub async fn get(
35 &self,
36 node_id: PublicKey,
37 salt: Option<Vec<u8>>,
38 more_recent_then: Option<i64>,
39 timeout: Duration,
40 ) -> Result<Vec<MutableItem>> {
41 self.api
42 .call(move |actor| Box::pin(actor.get(node_id, salt, more_recent_then, timeout)))
43 .await
44 }
45
46 pub async fn put_mutable(
47 &self,
48 signing_key: SigningKey,
49 node_id: PublicKey,
50 salt: Option<Vec<u8>>,
51 data: Vec<u8>,
52 retry_count: Option<usize>,
53 timeout: Duration,
54 ) -> Result<()> {
55 self.api
56 .call(move |actor| {
57 Box::pin(actor.put_mutable(signing_key, node_id, salt, data, retry_count, timeout))
58 })
59 .await
60 }
61}
62
63impl Actor for DhtActor {
64 async fn run(&mut self) -> Result<()> {
65 loop {
66 tokio::select! {
67 Some(action) = self.rx.recv() => {
68 action(self).await;
69 }
70 _ = tokio::signal::ctrl_c() => {
71 break;
72 }
73 }
74 }
75 Ok(())
76 }
77}
78
79impl DhtActor {
80 pub async fn get(
81 &mut self,
82 node_id: PublicKey,
83 salt: Option<Vec<u8>>,
84 more_recent_then: Option<i64>,
85 timeout: Duration,
86 ) -> Result<Vec<MutableItem>> {
87 if self.dht.is_none() {
88 self.reset().await?;
89 }
90
91 let dht = self.dht.as_mut().context("DHT not initialized")?;
92 Ok(tokio::time::timeout(
93 timeout,
94 dht.get_mutable(node_id.as_bytes(), salt.as_deref(), more_recent_then)
95 .collect::<Vec<_>>(),
96 )
97 .await?)
98 }
99
100 pub async fn put_mutable(
101 &mut self,
102 signing_key: SigningKey,
103 node_id: PublicKey,
104 salt: Option<Vec<u8>>,
105 data: Vec<u8>,
106 retry_count: Option<usize>,
107 timeout: Duration,
108 ) -> Result<()> {
109 if self.dht.is_none() {
110 self.reset().await?;
111 }
112
113 for i in 0..retry_count.unwrap_or(RETRY_DEFAULT) {
114 let dht = self.dht.as_mut().context("DHT not initialized")?;
115
116 let most_recent_result = tokio::time::timeout(
117 timeout,
118 dht.get_mutable_most_recent(node_id.as_bytes(), salt.as_deref()),
119 )
120 .await?;
121
122 let item = if let Some(mut_item) = most_recent_result {
123 MutableItem::new(
124 signing_key.clone(),
125 &data,
126 mut_item.seq() + 1,
127 salt.as_deref(),
128 )
129 } else {
130 MutableItem::new(signing_key.clone(), &data, 0, salt.as_deref())
131 };
132
133 let put_result = match tokio::time::timeout(
134 Duration::from_secs(10),
135 dht.put_mutable(item.clone(), Some(item.seq())),
136 )
137 .await
138 {
139 Ok(result) => result.ok(),
140 Err(_) => None,
141 };
142
143 if put_result.is_some() {
144 break;
145 } else if i == retry_count.unwrap_or(RETRY_DEFAULT) - 1 {
146 bail!("failed to publish record")
147 }
148
149 self.reset().await?;
150
151 tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
152 }
153 Ok(())
154 }
155
156 async fn reset(&mut self) -> Result<()> {
157 self.dht = Some(mainline::Dht::builder().build()?.as_async());
158 Ok(())
159 }
160}