Skip to main content

hashtree_cli/
fetch.rs

1//! Remote content fetching with WebRTC and Blossom fallback
2//!
3//! Provides shared logic for fetching content from:
4//! 1. Local storage (first)
5//! 2. WebRTC peers (second)
6//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
12use nostr::Keys;
13use std::collections::{HashSet, VecDeque};
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21
22fn child_cid(parent: &Cid, link: &Link) -> Cid {
23    let inherits_parent_key = link
24        .name
25        .as_deref()
26        .map(|name| {
27            name.starts_with("_chunk_")
28                || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
29        })
30        .unwrap_or(false);
31
32    Cid {
33        hash: link.hash,
34        key: link.key.or(if inherits_parent_key {
35            parent.key
36        } else {
37            None
38        }),
39    }
40}
41
/// Configuration for remote fetching.
///
/// Holds the two timeouts used by [`Fetcher`]: one bounding each WebRTC peer request and one
/// handed to the Blossom client when the fetcher is constructed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchConfig {
    /// Timeout for WebRTC requests
    pub webrtc_timeout: Duration,
    /// Timeout for Blossom requests
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// Defaults to 2000 ms for WebRTC requests and 10000 ms for Blossom requests.
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
59
/// Fetcher for remote content.
///
/// Pairs the fetch timeouts with the Blossom client that the fetch and upload methods use.
pub struct Fetcher {
    /// Timeout settings; `webrtc_timeout` bounds each WebRTC peer request.
    config: FetchConfig,
    /// Blossom client used for downloads and uploads.
    blossom: BlossomClient,
}
65
66impl Fetcher {
67    /// Create a new fetcher with the given config
68    /// BlossomClient auto-loads servers from ~/.hashtree/config.toml
69    pub fn new(config: FetchConfig) -> Self {
70        // Generate ephemeral keys for downloads (no signing needed)
71        let keys = Keys::generate();
72        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
73        let blossom = with_local_daemon_read(blossom);
74
75        Self { config, blossom }
76    }
77
78    /// Create a new fetcher with specific keys (for authenticated uploads)
79    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
80        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
81        let blossom = with_local_daemon_read(blossom);
82
83        Self { config, blossom }
84    }
85
    /// Get the underlying BlossomClient
    ///
    /// Returns a shared reference to the client this fetcher uses for Blossom requests.
    pub fn blossom(&self) -> &BlossomClient {
        &self.blossom
    }
90
91    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
92    pub async fn fetch_chunk(
93        &self,
94        webrtc_state: Option<&Arc<WebRTCState>>,
95        hash_hex: &str,
96    ) -> Result<Vec<u8>> {
97        let short_hash = if hash_hex.len() >= 12 {
98            &hash_hex[..12]
99        } else {
100            hash_hex
101        };
102
103        // Try WebRTC first
104        if let Some(state) = webrtc_state {
105            debug!("Trying WebRTC for {}", short_hash);
106            let webrtc_result = tokio::time::timeout(
107                self.config.webrtc_timeout,
108                state.request_from_peers(hash_hex),
109            )
110            .await;
111
112            if let Ok(Some(data)) = webrtc_result {
113                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
114                return Ok(data);
115            }
116        }
117
118        // Fallback to Blossom
119        debug!("Trying Blossom for {}", short_hash);
120        match self.blossom.download(hash_hex).await {
121            Ok(data) => {
122                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
123                Ok(data)
124            }
125            Err(e) => {
126                debug!("Blossom download failed for {}: {}", short_hash, e);
127                Err(anyhow::anyhow!(
128                    "Failed to fetch {} from any source: {}",
129                    short_hash,
130                    e
131                ))
132            }
133        }
134    }
135
136    /// Fetch a chunk, checking local storage first
137    pub async fn fetch_chunk_with_store(
138        &self,
139        store: &HashtreeStore,
140        webrtc_state: Option<&Arc<WebRTCState>>,
141        hash: &[u8; 32],
142    ) -> Result<Vec<u8>> {
143        // Check local storage first
144        if let Some(data) = store.get_chunk(hash)? {
145            return Ok(data);
146        }
147
148        // Fetch remotely and store
149        let hash_hex = to_hex(hash);
150        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
151        store.put_blob(&data)?;
152        Ok(data)
153    }
154
155    /// Fetch an entire tree (all chunks recursively) - sequential version
156    /// Returns (chunks_fetched, bytes_fetched)
157    pub async fn fetch_tree(
158        &self,
159        store: &HashtreeStore,
160        webrtc_state: Option<&Arc<WebRTCState>>,
161        root_hash: &[u8; 32],
162    ) -> Result<(usize, u64)> {
163        self.fetch_cid_tree(store, webrtc_state, &Cid::public(*root_hash))
164            .await
165    }
166
167    /// Fetch an entire tree from a CID, preserving decryption keys for encrypted trees.
168    pub async fn fetch_cid_tree(
169        &self,
170        store: &HashtreeStore,
171        webrtc_state: Option<&Arc<WebRTCState>>,
172        root_cid: &Cid,
173    ) -> Result<(usize, u64)> {
174        self.fetch_cid_tree_parallel(store, webrtc_state, root_cid, 1)
175            .await
176    }
177
178    /// Fetch an entire tree with parallel downloads
179    /// Uses work-stealing: always keeps `concurrency` requests in flight
180    /// Returns (chunks_fetched, bytes_fetched)
181    pub async fn fetch_tree_parallel(
182        &self,
183        store: &HashtreeStore,
184        webrtc_state: Option<&Arc<WebRTCState>>,
185        root_hash: &[u8; 32],
186        concurrency: usize,
187    ) -> Result<(usize, u64)> {
188        self.fetch_cid_tree_parallel(store, webrtc_state, &Cid::public(*root_hash), concurrency)
189            .await
190    }
191
192    /// Fetch an entire tree with parallel downloads, preserving decryption keys.
193    pub async fn fetch_cid_tree_parallel(
194        &self,
195        store: &HashtreeStore,
196        webrtc_state: Option<&Arc<WebRTCState>>,
197        root_cid: &Cid,
198        concurrency: usize,
199    ) -> Result<(usize, u64)> {
200        use futures::stream::{FuturesUnordered, StreamExt};
201        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
202
203        let chunks_fetched = Arc::new(AtomicUsize::new(0));
204        let bytes_fetched = Arc::new(AtomicU64::new(0));
205        let mut queued: HashSet<[u8; 32]> = HashSet::new();
206        let mut pending: VecDeque<Cid> = VecDeque::new();
207
208        pending.push_back(root_cid.clone());
209        queued.insert(root_cid.hash);
210
211        let mut active = FuturesUnordered::new();
212        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
213
214        loop {
215            // Fill up to concurrency limit from pending queue
216            while active.len() < concurrency {
217                if let Some(cid) = pending.pop_front() {
218                    if store.blob_exists(&cid.hash).unwrap_or(false) {
219                        if let Some(node) = tree.get_node(&cid).await? {
220                            for link in node.links {
221                                let child = child_cid(&cid, &link);
222                                if queued.insert(child.hash) {
223                                    pending.push_back(child);
224                                }
225                            }
226                        }
227                        continue;
228                    }
229
230                    let hash_hex = to_hex(&cid.hash);
231                    let blossom = self.blossom.clone();
232                    let webrtc = webrtc_state.map(Arc::clone);
233                    let timeout = self.config.webrtc_timeout;
234
235                    let fut = async move {
236                        // Try WebRTC first
237                        if let Some(state) = &webrtc {
238                            if let Ok(Some(data)) =
239                                tokio::time::timeout(timeout, state.request_from_peers(&hash_hex))
240                                    .await
241                            {
242                                return (cid, Ok(data));
243                            }
244                        }
245                        // Fallback to Blossom
246                        let data = blossom.download(&hash_hex).await;
247                        (cid, data)
248                    };
249                    active.push(fut);
250                } else {
251                    break;
252                }
253            }
254
255            // If nothing active, we're done
256            if active.is_empty() {
257                break;
258            }
259
260            // Wait for any download to complete
261            if let Some((cid, result)) = active.next().await {
262                match result {
263                    Ok(data) => {
264                        // Store it
265                        store.put_blob(&data)?;
266                        chunks_fetched.fetch_add(1, Ordering::Relaxed);
267                        bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
268
269                        if let Some(node) = tree.get_node(&cid).await? {
270                            for link in node.links {
271                                let child = child_cid(&cid, &link);
272                                if queued.insert(child.hash) {
273                                    pending.push_back(child);
274                                }
275                            }
276                        }
277                    }
278                    Err(e) => {
279                        debug!("Failed to fetch {}: {}", to_hex(&cid.hash), e);
280                        // Continue with other chunks - don't fail the whole tree
281                    }
282                }
283            }
284        }
285
286        Ok((
287            chunks_fetched.load(Ordering::Relaxed),
288            bytes_fetched.load(Ordering::Relaxed),
289        ))
290    }
291
292    /// Fetch a file by hash, fetching all chunks if needed
293    /// Returns the complete file content
294    pub async fn fetch_file(
295        &self,
296        store: &HashtreeStore,
297        webrtc_state: Option<&Arc<WebRTCState>>,
298        hash: &[u8; 32],
299    ) -> Result<Option<Vec<u8>>> {
300        // First, try to get from local storage
301        if let Some(content) = store.get_file(hash)? {
302            return Ok(Some(content));
303        }
304
305        // Fetch the tree
306        self.fetch_tree(store, webrtc_state, hash).await?;
307
308        // Now try to read the file
309        store.get_file(hash)
310    }
311
312    /// Fetch a directory listing, fetching chunks if needed
313    pub async fn fetch_directory(
314        &self,
315        store: &HashtreeStore,
316        webrtc_state: Option<&Arc<WebRTCState>>,
317        hash: &[u8; 32],
318    ) -> Result<Option<crate::storage::DirectoryListing>> {
319        // First, try to get from local storage
320        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
321            return Ok(Some(listing));
322        }
323
324        // Fetch the tree
325        self.fetch_tree(store, webrtc_state, hash).await?;
326
327        // Now try to get the directory listing
328        store.get_directory_listing(hash)
329    }
330
331    /// Upload data to Blossom servers
332    pub async fn upload(&self, data: &[u8]) -> Result<String> {
333        self.blossom
334            .upload(data)
335            .await
336            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
337    }
338
339    /// Upload data if it doesn't already exist
340    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
341        self.blossom
342            .upload_if_missing(data)
343            .await
344            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
345    }
346}
347
348fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
349    let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
350    let local_url = detect_local_daemon_url(bind_address.as_deref());
351    let Some(local_url) = local_url else {
352        return blossom;
353    };
354
355    let mut servers = blossom.read_servers().to_vec();
356    if servers.iter().any(|server| server == &local_url) {
357        return blossom;
358    }
359    servers.insert(0, local_url);
360    blossom.with_read_servers(servers)
361}