1use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{decode_tree_node, to_hex};
12use nostr::Keys;
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21
/// Timeouts applied by [`Fetcher`] to each transport it can use.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchConfig {
    /// Upper bound on a single WebRTC peer request before falling back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom client when the fetcher is constructed.
    pub blossom_timeout: Duration,
}
30
31impl Default for FetchConfig {
32 fn default() -> Self {
33 Self {
34 webrtc_timeout: Duration::from_millis(2000),
35 blossom_timeout: Duration::from_millis(10000),
36 }
37 }
38}
39
40pub struct Fetcher {
42 config: FetchConfig,
43 blossom: BlossomClient,
44}
45
46impl Fetcher {
47 pub fn new(config: FetchConfig) -> Self {
50 let keys = Keys::generate();
52 let blossom = BlossomClient::new(keys)
53 .with_timeout(config.blossom_timeout);
54 let blossom = with_local_daemon_read(blossom);
55
56 Self { config, blossom }
57 }
58
59 pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
61 let blossom = BlossomClient::new(keys)
62 .with_timeout(config.blossom_timeout);
63 let blossom = with_local_daemon_read(blossom);
64
65 Self { config, blossom }
66 }
67
68 pub fn blossom(&self) -> &BlossomClient {
70 &self.blossom
71 }
72
73 pub async fn fetch_chunk(
75 &self,
76 webrtc_state: Option<&Arc<WebRTCState>>,
77 hash_hex: &str,
78 ) -> Result<Vec<u8>> {
79 let short_hash = if hash_hex.len() >= 12 {
80 &hash_hex[..12]
81 } else {
82 hash_hex
83 };
84
85 if let Some(state) = webrtc_state {
87 debug!("Trying WebRTC for {}", short_hash);
88 let webrtc_result = tokio::time::timeout(
89 self.config.webrtc_timeout,
90 state.request_from_peers(hash_hex),
91 )
92 .await;
93
94 if let Ok(Some(data)) = webrtc_result {
95 debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
96 return Ok(data);
97 }
98 }
99
100 debug!("Trying Blossom for {}", short_hash);
102 match self.blossom.download(hash_hex).await {
103 Ok(data) => {
104 debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
105 Ok(data)
106 }
107 Err(e) => {
108 debug!("Blossom download failed for {}: {}", short_hash, e);
109 Err(anyhow::anyhow!("Failed to fetch {} from any source: {}", short_hash, e))
110 }
111 }
112 }
113
114 pub async fn fetch_chunk_with_store(
116 &self,
117 store: &HashtreeStore,
118 webrtc_state: Option<&Arc<WebRTCState>>,
119 hash: &[u8; 32],
120 ) -> Result<Vec<u8>> {
121 if let Some(data) = store.get_chunk(hash)? {
123 return Ok(data);
124 }
125
126 let hash_hex = to_hex(hash);
128 let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
129 store.put_blob(&data)?;
130 Ok(data)
131 }
132
133 pub async fn fetch_tree(
136 &self,
137 store: &HashtreeStore,
138 webrtc_state: Option<&Arc<WebRTCState>>,
139 root_hash: &[u8; 32],
140 ) -> Result<(usize, u64)> {
141 self.fetch_tree_parallel(store, webrtc_state, root_hash, 1).await
142 }
143
144 pub async fn fetch_tree_parallel(
148 &self,
149 store: &HashtreeStore,
150 webrtc_state: Option<&Arc<WebRTCState>>,
151 root_hash: &[u8; 32],
152 concurrency: usize,
153 ) -> Result<(usize, u64)> {
154 use futures::stream::{FuturesUnordered, StreamExt};
155 use std::collections::HashSet;
156 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
157
158 if store.blob_exists(root_hash)? {
160 return Ok((0, 0));
161 }
162
163 let chunks_fetched = Arc::new(AtomicUsize::new(0));
164 let bytes_fetched = Arc::new(AtomicU64::new(0));
165
166 let mut queued: HashSet<[u8; 32]> = HashSet::new();
168 let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
169
170 pending.push_back(*root_hash);
172 queued.insert(*root_hash);
173
174 let mut active = FuturesUnordered::new();
175
176 loop {
177 while active.len() < concurrency {
179 if let Some(hash) = pending.pop_front() {
180 if store.blob_exists(&hash).unwrap_or(false) {
182 continue;
183 }
184
185 let hash_hex = to_hex(&hash);
186 let blossom = self.blossom.clone();
187 let webrtc = webrtc_state.map(Arc::clone);
188 let timeout = self.config.webrtc_timeout;
189
190 let fut = async move {
191 if let Some(state) = &webrtc {
193 if let Ok(Some(data)) = tokio::time::timeout(
194 timeout,
195 state.request_from_peers(&hash_hex),
196 )
197 .await
198 {
199 return (hash, Ok(data));
200 }
201 }
202 let data = blossom.download(&hash_hex).await;
204 (hash, data)
205 };
206 active.push(fut);
207 } else {
208 break;
209 }
210 }
211
212 if active.is_empty() {
214 break;
215 }
216
217 if let Some((hash, result)) = active.next().await {
219 match result {
220 Ok(data) => {
221 store.put_blob(&data)?;
223 chunks_fetched.fetch_add(1, Ordering::Relaxed);
224 bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
225
226 if let Ok(node) = decode_tree_node(&data) {
228 for link in node.links {
229 if !queued.contains(&link.hash) {
230 queued.insert(link.hash);
231 pending.push_back(link.hash);
232 }
233 }
234 }
235 }
236 Err(e) => {
237 debug!("Failed to fetch {}: {}", to_hex(&hash), e);
238 }
240 }
241 }
242 }
243
244 Ok((
245 chunks_fetched.load(Ordering::Relaxed),
246 bytes_fetched.load(Ordering::Relaxed),
247 ))
248 }
249
250 pub async fn fetch_file(
253 &self,
254 store: &HashtreeStore,
255 webrtc_state: Option<&Arc<WebRTCState>>,
256 hash: &[u8; 32],
257 ) -> Result<Option<Vec<u8>>> {
258 if let Some(content) = store.get_file(hash)? {
260 return Ok(Some(content));
261 }
262
263 self.fetch_tree(store, webrtc_state, hash).await?;
265
266 store.get_file(hash)
268 }
269
270 pub async fn fetch_directory(
272 &self,
273 store: &HashtreeStore,
274 webrtc_state: Option<&Arc<WebRTCState>>,
275 hash: &[u8; 32],
276 ) -> Result<Option<crate::storage::DirectoryListing>> {
277 if let Ok(Some(listing)) = store.get_directory_listing(hash) {
279 return Ok(Some(listing));
280 }
281
282 self.fetch_tree(store, webrtc_state, hash).await?;
284
285 store.get_directory_listing(hash)
287 }
288
289 pub async fn upload(&self, data: &[u8]) -> Result<String> {
291 self.blossom
292 .upload(data)
293 .await
294 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
295 }
296
297 pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
299 self.blossom
300 .upload_if_missing(data)
301 .await
302 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
303 }
304}
305
306fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
307 let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
308 let local_url = detect_local_daemon_url(bind_address.as_deref());
309 let Some(local_url) = local_url else {
310 return blossom;
311 };
312
313 let mut servers = blossom.read_servers().to_vec();
314 if servers.iter().any(|server| server == &local_url) {
315 return blossom;
316 }
317 servers.insert(0, local_url);
318 blossom.with_read_servers(servers)
319}