Skip to main content

hashtree_cli/
fetch.rs

1//! Remote content fetching with WebRTC and Blossom fallback
2//!
3//! Provides shared logic for fetching content from:
4//! 1. Local storage (first)
5//! 2. WebRTC peers (second)
6//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
12use nostr::Keys;
13use std::collections::{HashSet, VecDeque};
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22
23fn child_cid(parent: &Cid, link: &Link) -> Cid {
24    let inherits_parent_key = link
25        .name
26        .as_deref()
27        .map(|name| {
28            name.starts_with("_chunk_")
29                || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
30        })
31        .unwrap_or(false);
32
33    Cid {
34        hash: link.hash,
35        key: link.key.or(if inherits_parent_key {
36            parent.key
37        } else {
38            None
39        }),
40    }
41}
42
/// Live counters for an in-progress tree fetch.
///
/// Counters use relaxed atomics. A snapshot reads each counter independently, so the chunk and
/// byte totals are not guaranteed to be captured at the same instant.
#[derive(Debug, Default)]
pub struct FetchProgress {
    /// Number of chunks downloaded so far.
    chunks_fetched: AtomicUsize,
    /// Total bytes across all downloaded chunks.
    bytes_fetched: AtomicU64,
}

/// Point-in-time copy of the counters in a [`FetchProgress`].
#[derive(Debug, Clone, Copy, Default)]
pub struct FetchProgressSnapshot {
    /// Number of chunks downloaded when the snapshot was taken.
    pub chunks_fetched: usize,
    /// Total bytes downloaded when the snapshot was taken.
    pub bytes_fetched: u64,
}

impl FetchProgress {
    /// Create a tracker with both counters at zero.
    pub fn new() -> Self {
        FetchProgress::default()
    }

    /// Read the current counter values.
    pub fn snapshot(&self) -> FetchProgressSnapshot {
        let chunks_fetched = self.chunks_fetched.load(Ordering::Relaxed);
        let bytes_fetched = self.bytes_fetched.load(Ordering::Relaxed);
        FetchProgressSnapshot {
            chunks_fetched,
            bytes_fetched,
        }
    }

    /// Count one more downloaded chunk of `byte_len` bytes.
    fn record_chunk(&self, byte_len: usize) {
        let added_bytes = byte_len as u64;
        self.chunks_fetched.fetch_add(1, Ordering::Relaxed);
        self.bytes_fetched.fetch_add(added_bytes, Ordering::Relaxed);
    }
}
73
/// Configuration for remote fetching.
///
/// Derives `Debug` (like the other public types in this module) so configs can be logged and
/// inspected in assertions.
#[derive(Debug, Clone)]
pub struct FetchConfig {
    /// Timeout for a single WebRTC peer request before falling back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout applied to the Blossom HTTP client.
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// Defaults: 2 s for WebRTC, 10 s for Blossom.
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
91
92/// Fetcher for remote content
93pub struct Fetcher {
94    config: FetchConfig,
95    blossom: BlossomClient,
96}
97
98impl Fetcher {
99    /// Create a new fetcher with the given config
100    /// BlossomClient auto-loads servers from ~/.hashtree/config.toml
101    pub fn new(config: FetchConfig) -> Self {
102        // Generate ephemeral keys for downloads (no signing needed)
103        let keys = Keys::generate();
104        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
105        let blossom = with_local_daemon_read(blossom);
106
107        Self { config, blossom }
108    }
109
110    /// Create a new fetcher with specific keys (for authenticated uploads)
111    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
112        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
113        let blossom = with_local_daemon_read(blossom);
114
115        Self { config, blossom }
116    }
117
118    /// Get the underlying BlossomClient
119    pub fn blossom(&self) -> &BlossomClient {
120        &self.blossom
121    }
122
123    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
124    pub async fn fetch_chunk(
125        &self,
126        webrtc_state: Option<&Arc<WebRTCState>>,
127        hash_hex: &str,
128    ) -> Result<Vec<u8>> {
129        let short_hash = if hash_hex.len() >= 12 {
130            &hash_hex[..12]
131        } else {
132            hash_hex
133        };
134
135        // Try WebRTC first
136        if let Some(state) = webrtc_state {
137            debug!("Trying WebRTC for {}", short_hash);
138            let webrtc_result = tokio::time::timeout(
139                self.config.webrtc_timeout,
140                state.request_from_peers(hash_hex),
141            )
142            .await;
143
144            if let Ok(Some(data)) = webrtc_result {
145                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
146                return Ok(data);
147            }
148        }
149
150        // Fallback to Blossom
151        debug!("Trying Blossom for {}", short_hash);
152        match self.blossom.download(hash_hex).await {
153            Ok(data) => {
154                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
155                Ok(data)
156            }
157            Err(e) => {
158                debug!("Blossom download failed for {}: {}", short_hash, e);
159                Err(anyhow::anyhow!(
160                    "Failed to fetch {} from any source: {}",
161                    short_hash,
162                    e
163                ))
164            }
165        }
166    }
167
168    /// Fetch a chunk, checking local storage first
169    pub async fn fetch_chunk_with_store(
170        &self,
171        store: &HashtreeStore,
172        webrtc_state: Option<&Arc<WebRTCState>>,
173        hash: &[u8; 32],
174    ) -> Result<Vec<u8>> {
175        // Check local storage first
176        if let Some(data) = store.get_chunk(hash)? {
177            return Ok(data);
178        }
179
180        // Fetch remotely and store
181        let hash_hex = to_hex(hash);
182        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
183        store.put_cached_blob(&data)?;
184        Ok(data)
185    }
186
187    /// Fetch an entire tree (all chunks recursively) - sequential version
188    /// Returns (chunks_fetched, bytes_fetched)
189    pub async fn fetch_tree(
190        &self,
191        store: &HashtreeStore,
192        webrtc_state: Option<&Arc<WebRTCState>>,
193        root_hash: &[u8; 32],
194    ) -> Result<(usize, u64)> {
195        self.fetch_cid_tree(store, webrtc_state, &Cid::public(*root_hash))
196            .await
197    }
198
199    /// Fetch an entire tree from a CID, preserving decryption keys for encrypted trees.
200    pub async fn fetch_cid_tree(
201        &self,
202        store: &HashtreeStore,
203        webrtc_state: Option<&Arc<WebRTCState>>,
204        root_cid: &Cid,
205    ) -> Result<(usize, u64)> {
206        self.fetch_cid_tree_with_progress(store, webrtc_state, root_cid, None)
207            .await
208    }
209
210    /// Fetch an entire tree from a CID, preserving decryption keys, with optional progress updates.
211    pub async fn fetch_cid_tree_with_progress(
212        &self,
213        store: &HashtreeStore,
214        webrtc_state: Option<&Arc<WebRTCState>>,
215        root_cid: &Cid,
216        progress: Option<&FetchProgress>,
217    ) -> Result<(usize, u64)> {
218        self.fetch_cid_tree_parallel_with_progress(store, webrtc_state, root_cid, 1, progress)
219            .await
220    }
221
222    /// Fetch an entire tree with parallel downloads
223    /// Uses work-stealing: always keeps `concurrency` requests in flight
224    /// Returns (chunks_fetched, bytes_fetched)
225    pub async fn fetch_tree_parallel(
226        &self,
227        store: &HashtreeStore,
228        webrtc_state: Option<&Arc<WebRTCState>>,
229        root_hash: &[u8; 32],
230        concurrency: usize,
231    ) -> Result<(usize, u64)> {
232        self.fetch_cid_tree_parallel(store, webrtc_state, &Cid::public(*root_hash), concurrency)
233            .await
234    }
235
236    /// Fetch an entire tree with parallel downloads, preserving decryption keys.
237    pub async fn fetch_cid_tree_parallel(
238        &self,
239        store: &HashtreeStore,
240        webrtc_state: Option<&Arc<WebRTCState>>,
241        root_cid: &Cid,
242        concurrency: usize,
243    ) -> Result<(usize, u64)> {
244        self.fetch_cid_tree_parallel_with_progress(store, webrtc_state, root_cid, concurrency, None)
245            .await
246    }
247
248    /// Fetch an entire tree with parallel downloads, preserving decryption keys, with optional progress updates.
249    pub async fn fetch_cid_tree_parallel_with_progress(
250        &self,
251        store: &HashtreeStore,
252        webrtc_state: Option<&Arc<WebRTCState>>,
253        root_cid: &Cid,
254        concurrency: usize,
255        progress: Option<&FetchProgress>,
256    ) -> Result<(usize, u64)> {
257        use futures::stream::{FuturesUnordered, StreamExt};
258
259        let chunks_fetched = Arc::new(AtomicUsize::new(0));
260        let bytes_fetched = Arc::new(AtomicU64::new(0));
261        let mut queued: HashSet<[u8; 32]> = HashSet::new();
262        let mut pending: VecDeque<Cid> = VecDeque::new();
263
264        pending.push_back(root_cid.clone());
265        queued.insert(root_cid.hash);
266
267        let mut active = FuturesUnordered::new();
268        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
269
270        loop {
271            // Fill up to concurrency limit from pending queue
272            while active.len() < concurrency {
273                if let Some(cid) = pending.pop_front() {
274                    if store.blob_exists(&cid.hash).unwrap_or(false) {
275                        if let Some(node) = tree.get_node(&cid).await? {
276                            for link in node.links {
277                                let child = child_cid(&cid, &link);
278                                if queued.insert(child.hash) {
279                                    pending.push_back(child);
280                                }
281                            }
282                        }
283                        continue;
284                    }
285
286                    let hash_hex = to_hex(&cid.hash);
287                    let blossom = self.blossom.clone();
288                    let webrtc = webrtc_state.map(Arc::clone);
289                    let timeout = self.config.webrtc_timeout;
290
291                    let fut = async move {
292                        // Try WebRTC first
293                        if let Some(state) = &webrtc {
294                            if let Ok(Some(data)) =
295                                tokio::time::timeout(timeout, state.request_from_peers(&hash_hex))
296                                    .await
297                            {
298                                return (cid, Ok(data));
299                            }
300                        }
301                        // Fallback to Blossom
302                        let data = blossom.download(&hash_hex).await;
303                        (cid, data)
304                    };
305                    active.push(fut);
306                } else {
307                    break;
308                }
309            }
310
311            // If nothing active, we're done
312            if active.is_empty() {
313                break;
314            }
315
316            // Wait for any download to complete
317            if let Some((cid, result)) = active.next().await {
318                match result {
319                    Ok(data) => {
320                        // Store it
321                        store.put_cached_blob(&data)?;
322                        if let Some(progress) = progress {
323                            progress.record_chunk(data.len());
324                        }
325                        chunks_fetched.fetch_add(1, Ordering::Relaxed);
326                        bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
327
328                        if let Some(node) = tree.get_node(&cid).await? {
329                            for link in node.links {
330                                let child = child_cid(&cid, &link);
331                                if queued.insert(child.hash) {
332                                    pending.push_back(child);
333                                }
334                            }
335                        }
336                    }
337                    Err(e) => {
338                        debug!("Failed to fetch {}: {}", to_hex(&cid.hash), e);
339                        // Continue with other chunks - don't fail the whole tree
340                    }
341                }
342            }
343        }
344
345        Ok((
346            chunks_fetched.load(Ordering::Relaxed),
347            bytes_fetched.load(Ordering::Relaxed),
348        ))
349    }
350
351    /// Fetch a file by hash, fetching all chunks if needed
352    /// Returns the complete file content
353    pub async fn fetch_file(
354        &self,
355        store: &HashtreeStore,
356        webrtc_state: Option<&Arc<WebRTCState>>,
357        hash: &[u8; 32],
358    ) -> Result<Option<Vec<u8>>> {
359        // First, try to get from local storage
360        if let Some(content) = store.get_file(hash)? {
361            return Ok(Some(content));
362        }
363
364        // Fetch the tree
365        self.fetch_tree(store, webrtc_state, hash).await?;
366
367        // Now try to read the file
368        store.get_file(hash)
369    }
370
371    /// Fetch a directory listing, fetching chunks if needed
372    pub async fn fetch_directory(
373        &self,
374        store: &HashtreeStore,
375        webrtc_state: Option<&Arc<WebRTCState>>,
376        hash: &[u8; 32],
377    ) -> Result<Option<crate::storage::DirectoryListing>> {
378        // First, try to get from local storage
379        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
380            return Ok(Some(listing));
381        }
382
383        // Fetch the tree
384        self.fetch_tree(store, webrtc_state, hash).await?;
385
386        // Now try to get the directory listing
387        store.get_directory_listing(hash)
388    }
389
390    /// Upload data to Blossom servers
391    pub async fn upload(&self, data: &[u8]) -> Result<String> {
392        self.blossom
393            .upload(data)
394            .await
395            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
396    }
397
398    /// Upload data if it doesn't already exist
399    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
400        self.blossom
401            .upload_if_missing(data)
402            .await
403            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
404    }
405}
406
407fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
408    let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
409    let local_url = detect_local_daemon_url(bind_address.as_deref());
410    let Some(local_url) = local_url else {
411        return blossom;
412    };
413
414    let mut servers = blossom.read_servers().to_vec();
415    if servers.iter().any(|server| server == &local_url) {
416        return blossom;
417    }
418    servers.insert(0, local_url);
419    blossom.with_read_servers(servers)
420}