1use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
12use nostr::Keys;
13use std::collections::{HashSet, VecDeque};
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22
23fn child_cid(parent: &Cid, link: &Link) -> Cid {
24 let inherits_parent_key = link
25 .name
26 .as_deref()
27 .map(|name| {
28 name.starts_with("_chunk_")
29 || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
30 })
31 .unwrap_or(false);
32
33 Cid {
34 hash: link.hash,
35 key: link.key.or(if inherits_parent_key {
36 parent.key
37 } else {
38 None
39 }),
40 }
41}
42
/// Shared counters that report how much a tree fetch has downloaded so far.
///
/// Pass a reference into a fetch (e.g. `Fetcher::fetch_cid_tree_with_progress`)
/// and call [`FetchProgress::snapshot`] to read the running totals.
#[derive(Debug, Default)]
pub struct FetchProgress {
    chunks_fetched: AtomicUsize,
    bytes_fetched: AtomicU64,
}

/// Point-in-time copy of the counters held by a [`FetchProgress`].
#[derive(Debug, Clone, Copy, Default)]
pub struct FetchProgressSnapshot {
    /// Number of chunks recorded so far.
    pub chunks_fetched: usize,
    /// Sum of the byte lengths of those chunks.
    pub bytes_fetched: u64,
}

impl FetchProgress {
    /// Creates a tracker with both counters at zero.
    pub fn new() -> Self {
        Self {
            chunks_fetched: AtomicUsize::new(0),
            bytes_fetched: AtomicU64::new(0),
        }
    }

    /// Reads the current counter values.
    ///
    /// The two counters are loaded independently with relaxed ordering, so a
    /// snapshot taken while a chunk is being recorded may briefly show the two
    /// values out of step with each other.
    pub fn snapshot(&self) -> FetchProgressSnapshot {
        let chunks_fetched = self.chunks_fetched.load(Ordering::Relaxed);
        let bytes_fetched = self.bytes_fetched.load(Ordering::Relaxed);
        FetchProgressSnapshot {
            chunks_fetched,
            bytes_fetched,
        }
    }

    /// Counts one more downloaded chunk of `byte_len` bytes.
    fn record_chunk(&self, byte_len: usize) {
        let byte_len = byte_len as u64;
        self.chunks_fetched.fetch_add(1, Ordering::Relaxed);
        self.bytes_fetched.fetch_add(byte_len, Ordering::Relaxed);
    }
}
73
/// Timeouts used by [`Fetcher`] when requesting chunks from remote sources.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchConfig {
    /// How long to wait for connected WebRTC peers to answer a single chunk
    /// request before falling back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom client (`BlossomClient::with_timeout`).
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// 2 s for WebRTC peers, 10 s for Blossom.
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
91
92pub struct Fetcher {
94 config: FetchConfig,
95 blossom: BlossomClient,
96}
97
98impl Fetcher {
99 pub fn new(config: FetchConfig) -> Self {
102 let keys = Keys::generate();
104 let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
105 let blossom = with_local_daemon_read(blossom);
106
107 Self { config, blossom }
108 }
109
110 pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
112 let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
113 let blossom = with_local_daemon_read(blossom);
114
115 Self { config, blossom }
116 }
117
118 pub fn blossom(&self) -> &BlossomClient {
120 &self.blossom
121 }
122
123 pub async fn fetch_chunk(
125 &self,
126 webrtc_state: Option<&Arc<WebRTCState>>,
127 hash_hex: &str,
128 ) -> Result<Vec<u8>> {
129 let short_hash = if hash_hex.len() >= 12 {
130 &hash_hex[..12]
131 } else {
132 hash_hex
133 };
134
135 if let Some(state) = webrtc_state {
137 debug!("Trying WebRTC for {}", short_hash);
138 let webrtc_result = tokio::time::timeout(
139 self.config.webrtc_timeout,
140 state.request_from_peers(hash_hex),
141 )
142 .await;
143
144 if let Ok(Some(data)) = webrtc_result {
145 debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
146 return Ok(data);
147 }
148 }
149
150 debug!("Trying Blossom for {}", short_hash);
152 match self.blossom.download(hash_hex).await {
153 Ok(data) => {
154 debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
155 Ok(data)
156 }
157 Err(e) => {
158 debug!("Blossom download failed for {}: {}", short_hash, e);
159 Err(anyhow::anyhow!(
160 "Failed to fetch {} from any source: {}",
161 short_hash,
162 e
163 ))
164 }
165 }
166 }
167
168 pub async fn fetch_chunk_with_store(
170 &self,
171 store: &HashtreeStore,
172 webrtc_state: Option<&Arc<WebRTCState>>,
173 hash: &[u8; 32],
174 ) -> Result<Vec<u8>> {
175 if let Some(data) = store.get_chunk(hash)? {
177 return Ok(data);
178 }
179
180 let hash_hex = to_hex(hash);
182 let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
183 store.put_cached_blob(&data)?;
184 Ok(data)
185 }
186
187 pub async fn fetch_tree(
190 &self,
191 store: &HashtreeStore,
192 webrtc_state: Option<&Arc<WebRTCState>>,
193 root_hash: &[u8; 32],
194 ) -> Result<(usize, u64)> {
195 self.fetch_cid_tree(store, webrtc_state, &Cid::public(*root_hash))
196 .await
197 }
198
199 pub async fn fetch_cid_tree(
201 &self,
202 store: &HashtreeStore,
203 webrtc_state: Option<&Arc<WebRTCState>>,
204 root_cid: &Cid,
205 ) -> Result<(usize, u64)> {
206 self.fetch_cid_tree_with_progress(store, webrtc_state, root_cid, None)
207 .await
208 }
209
210 pub async fn fetch_cid_tree_with_progress(
212 &self,
213 store: &HashtreeStore,
214 webrtc_state: Option<&Arc<WebRTCState>>,
215 root_cid: &Cid,
216 progress: Option<&FetchProgress>,
217 ) -> Result<(usize, u64)> {
218 self.fetch_cid_tree_parallel_with_progress(store, webrtc_state, root_cid, 1, progress)
219 .await
220 }
221
222 pub async fn fetch_tree_parallel(
226 &self,
227 store: &HashtreeStore,
228 webrtc_state: Option<&Arc<WebRTCState>>,
229 root_hash: &[u8; 32],
230 concurrency: usize,
231 ) -> Result<(usize, u64)> {
232 self.fetch_cid_tree_parallel(store, webrtc_state, &Cid::public(*root_hash), concurrency)
233 .await
234 }
235
236 pub async fn fetch_cid_tree_parallel(
238 &self,
239 store: &HashtreeStore,
240 webrtc_state: Option<&Arc<WebRTCState>>,
241 root_cid: &Cid,
242 concurrency: usize,
243 ) -> Result<(usize, u64)> {
244 self.fetch_cid_tree_parallel_with_progress(store, webrtc_state, root_cid, concurrency, None)
245 .await
246 }
247
248 pub async fn fetch_cid_tree_parallel_with_progress(
250 &self,
251 store: &HashtreeStore,
252 webrtc_state: Option<&Arc<WebRTCState>>,
253 root_cid: &Cid,
254 concurrency: usize,
255 progress: Option<&FetchProgress>,
256 ) -> Result<(usize, u64)> {
257 use futures::stream::{FuturesUnordered, StreamExt};
258
259 let chunks_fetched = Arc::new(AtomicUsize::new(0));
260 let bytes_fetched = Arc::new(AtomicU64::new(0));
261 let mut queued: HashSet<[u8; 32]> = HashSet::new();
262 let mut pending: VecDeque<Cid> = VecDeque::new();
263
264 pending.push_back(root_cid.clone());
265 queued.insert(root_cid.hash);
266
267 let mut active = FuturesUnordered::new();
268 let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
269
270 loop {
271 while active.len() < concurrency {
273 if let Some(cid) = pending.pop_front() {
274 if store.blob_exists(&cid.hash).unwrap_or(false) {
275 if let Some(node) = tree.get_node(&cid).await? {
276 for link in node.links {
277 let child = child_cid(&cid, &link);
278 if queued.insert(child.hash) {
279 pending.push_back(child);
280 }
281 }
282 }
283 continue;
284 }
285
286 let hash_hex = to_hex(&cid.hash);
287 let blossom = self.blossom.clone();
288 let webrtc = webrtc_state.map(Arc::clone);
289 let timeout = self.config.webrtc_timeout;
290
291 let fut = async move {
292 if let Some(state) = &webrtc {
294 if let Ok(Some(data)) =
295 tokio::time::timeout(timeout, state.request_from_peers(&hash_hex))
296 .await
297 {
298 return (cid, Ok(data));
299 }
300 }
301 let data = blossom.download(&hash_hex).await;
303 (cid, data)
304 };
305 active.push(fut);
306 } else {
307 break;
308 }
309 }
310
311 if active.is_empty() {
313 break;
314 }
315
316 if let Some((cid, result)) = active.next().await {
318 match result {
319 Ok(data) => {
320 store.put_cached_blob(&data)?;
322 if let Some(progress) = progress {
323 progress.record_chunk(data.len());
324 }
325 chunks_fetched.fetch_add(1, Ordering::Relaxed);
326 bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
327
328 if let Some(node) = tree.get_node(&cid).await? {
329 for link in node.links {
330 let child = child_cid(&cid, &link);
331 if queued.insert(child.hash) {
332 pending.push_back(child);
333 }
334 }
335 }
336 }
337 Err(e) => {
338 debug!("Failed to fetch {}: {}", to_hex(&cid.hash), e);
339 }
341 }
342 }
343 }
344
345 Ok((
346 chunks_fetched.load(Ordering::Relaxed),
347 bytes_fetched.load(Ordering::Relaxed),
348 ))
349 }
350
351 pub async fn fetch_file(
354 &self,
355 store: &HashtreeStore,
356 webrtc_state: Option<&Arc<WebRTCState>>,
357 hash: &[u8; 32],
358 ) -> Result<Option<Vec<u8>>> {
359 if let Some(content) = store.get_file(hash)? {
361 return Ok(Some(content));
362 }
363
364 self.fetch_tree(store, webrtc_state, hash).await?;
366
367 store.get_file(hash)
369 }
370
371 pub async fn fetch_directory(
373 &self,
374 store: &HashtreeStore,
375 webrtc_state: Option<&Arc<WebRTCState>>,
376 hash: &[u8; 32],
377 ) -> Result<Option<crate::storage::DirectoryListing>> {
378 if let Ok(Some(listing)) = store.get_directory_listing(hash) {
380 return Ok(Some(listing));
381 }
382
383 self.fetch_tree(store, webrtc_state, hash).await?;
385
386 store.get_directory_listing(hash)
388 }
389
390 pub async fn upload(&self, data: &[u8]) -> Result<String> {
392 self.blossom
393 .upload(data)
394 .await
395 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
396 }
397
398 pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
400 self.blossom
401 .upload_if_missing(data)
402 .await
403 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
404 }
405}
406
407fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
408 let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
409 let local_url = detect_local_daemon_url(bind_address.as_deref());
410 let Some(local_url) = local_url else {
411 return blossom;
412 };
413
414 let mut servers = blossom.read_servers().to_vec();
415 if servers.iter().any(|server| server == &local_url) {
416 return blossom;
417 }
418 servers.insert(0, local_url);
419 blossom.with_read_servers(servers)
420}