Skip to main content

hashtree_cli/
fetch.rs

//! Remote content fetching with WebRTC and Blossom fallback
//!
//! Provides shared logic for fetching content from:
//! 1. Local storage (first)
//! 2. WebRTC peers (second)
//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{decode_tree_node, to_hex};
12use nostr::Keys;
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21
/// Configuration for remote fetching.
///
/// Holds the per-source time limits used by [`Fetcher`]. `Debug` is derived so
/// the configuration can be included in logs and assertion output.
#[derive(Clone, Debug)]
pub struct FetchConfig {
    /// Timeout for WebRTC requests; once it elapses the fetch falls back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout for Blossom requests, handed to the Blossom client at construction.
    pub blossom_timeout: Duration,
}
30
31impl Default for FetchConfig {
32    fn default() -> Self {
33        Self {
34            webrtc_timeout: Duration::from_millis(2000),
35            blossom_timeout: Duration::from_millis(10000),
36        }
37    }
38}
39
40/// Fetcher for remote content
41pub struct Fetcher {
42    config: FetchConfig,
43    blossom: BlossomClient,
44}
45
46impl Fetcher {
47    /// Create a new fetcher with the given config
48    /// BlossomClient auto-loads servers from ~/.hashtree/config.toml
49    pub fn new(config: FetchConfig) -> Self {
50        // Generate ephemeral keys for downloads (no signing needed)
51        let keys = Keys::generate();
52        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
53        let blossom = with_local_daemon_read(blossom);
54
55        Self { config, blossom }
56    }
57
58    /// Create a new fetcher with specific keys (for authenticated uploads)
59    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
60        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
61        let blossom = with_local_daemon_read(blossom);
62
63        Self { config, blossom }
64    }
65
66    /// Get the underlying BlossomClient
67    pub fn blossom(&self) -> &BlossomClient {
68        &self.blossom
69    }
70
71    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
72    pub async fn fetch_chunk(
73        &self,
74        webrtc_state: Option<&Arc<WebRTCState>>,
75        hash_hex: &str,
76    ) -> Result<Vec<u8>> {
77        let short_hash = if hash_hex.len() >= 12 {
78            &hash_hex[..12]
79        } else {
80            hash_hex
81        };
82
83        // Try WebRTC first
84        if let Some(state) = webrtc_state {
85            debug!("Trying WebRTC for {}", short_hash);
86            let webrtc_result = tokio::time::timeout(
87                self.config.webrtc_timeout,
88                state.request_from_peers(hash_hex),
89            )
90            .await;
91
92            if let Ok(Some(data)) = webrtc_result {
93                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
94                return Ok(data);
95            }
96        }
97
98        // Fallback to Blossom
99        debug!("Trying Blossom for {}", short_hash);
100        match self.blossom.download(hash_hex).await {
101            Ok(data) => {
102                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
103                Ok(data)
104            }
105            Err(e) => {
106                debug!("Blossom download failed for {}: {}", short_hash, e);
107                Err(anyhow::anyhow!(
108                    "Failed to fetch {} from any source: {}",
109                    short_hash,
110                    e
111                ))
112            }
113        }
114    }
115
116    /// Fetch a chunk, checking local storage first
117    pub async fn fetch_chunk_with_store(
118        &self,
119        store: &HashtreeStore,
120        webrtc_state: Option<&Arc<WebRTCState>>,
121        hash: &[u8; 32],
122    ) -> Result<Vec<u8>> {
123        // Check local storage first
124        if let Some(data) = store.get_chunk(hash)? {
125            return Ok(data);
126        }
127
128        // Fetch remotely and store
129        let hash_hex = to_hex(hash);
130        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
131        store.put_blob(&data)?;
132        Ok(data)
133    }
134
135    /// Fetch an entire tree (all chunks recursively) - sequential version
136    /// Returns (chunks_fetched, bytes_fetched)
137    pub async fn fetch_tree(
138        &self,
139        store: &HashtreeStore,
140        webrtc_state: Option<&Arc<WebRTCState>>,
141        root_hash: &[u8; 32],
142    ) -> Result<(usize, u64)> {
143        self.fetch_tree_parallel(store, webrtc_state, root_hash, 1)
144            .await
145    }
146
147    /// Fetch an entire tree with parallel downloads
148    /// Uses work-stealing: always keeps `concurrency` requests in flight
149    /// Returns (chunks_fetched, bytes_fetched)
150    pub async fn fetch_tree_parallel(
151        &self,
152        store: &HashtreeStore,
153        webrtc_state: Option<&Arc<WebRTCState>>,
154        root_hash: &[u8; 32],
155        concurrency: usize,
156    ) -> Result<(usize, u64)> {
157        use futures::stream::{FuturesUnordered, StreamExt};
158        use std::collections::HashSet;
159        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
160
161        // Check if we already have the root
162        if store.blob_exists(root_hash)? {
163            return Ok((0, 0));
164        }
165
166        let chunks_fetched = Arc::new(AtomicUsize::new(0));
167        let bytes_fetched = Arc::new(AtomicU64::new(0));
168
169        // Track what we've queued to avoid duplicates
170        let mut queued: HashSet<[u8; 32]> = HashSet::new();
171        let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
172
173        // Seed with root
174        pending.push_back(*root_hash);
175        queued.insert(*root_hash);
176
177        let mut active = FuturesUnordered::new();
178
179        loop {
180            // Fill up to concurrency limit from pending queue
181            while active.len() < concurrency {
182                if let Some(hash) = pending.pop_front() {
183                    // Skip if we already have it locally
184                    if store.blob_exists(&hash).unwrap_or(false) {
185                        continue;
186                    }
187
188                    let hash_hex = to_hex(&hash);
189                    let blossom = self.blossom.clone();
190                    let webrtc = webrtc_state.map(Arc::clone);
191                    let timeout = self.config.webrtc_timeout;
192
193                    let fut = async move {
194                        // Try WebRTC first
195                        if let Some(state) = &webrtc {
196                            if let Ok(Some(data)) =
197                                tokio::time::timeout(timeout, state.request_from_peers(&hash_hex))
198                                    .await
199                            {
200                                return (hash, Ok(data));
201                            }
202                        }
203                        // Fallback to Blossom
204                        let data = blossom.download(&hash_hex).await;
205                        (hash, data)
206                    };
207                    active.push(fut);
208                } else {
209                    break;
210                }
211            }
212
213            // If nothing active, we're done
214            if active.is_empty() {
215                break;
216            }
217
218            // Wait for any download to complete
219            if let Some((hash, result)) = active.next().await {
220                match result {
221                    Ok(data) => {
222                        // Store it
223                        store.put_blob(&data)?;
224                        chunks_fetched.fetch_add(1, Ordering::Relaxed);
225                        bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
226
227                        // Parse as tree node and queue children
228                        if let Ok(node) = decode_tree_node(&data) {
229                            for link in node.links {
230                                if !queued.contains(&link.hash) {
231                                    queued.insert(link.hash);
232                                    pending.push_back(link.hash);
233                                }
234                            }
235                        }
236                    }
237                    Err(e) => {
238                        debug!("Failed to fetch {}: {}", to_hex(&hash), e);
239                        // Continue with other chunks - don't fail the whole tree
240                    }
241                }
242            }
243        }
244
245        Ok((
246            chunks_fetched.load(Ordering::Relaxed),
247            bytes_fetched.load(Ordering::Relaxed),
248        ))
249    }
250
251    /// Fetch a file by hash, fetching all chunks if needed
252    /// Returns the complete file content
253    pub async fn fetch_file(
254        &self,
255        store: &HashtreeStore,
256        webrtc_state: Option<&Arc<WebRTCState>>,
257        hash: &[u8; 32],
258    ) -> Result<Option<Vec<u8>>> {
259        // First, try to get from local storage
260        if let Some(content) = store.get_file(hash)? {
261            return Ok(Some(content));
262        }
263
264        // Fetch the tree
265        self.fetch_tree(store, webrtc_state, hash).await?;
266
267        // Now try to read the file
268        store.get_file(hash)
269    }
270
271    /// Fetch a directory listing, fetching chunks if needed
272    pub async fn fetch_directory(
273        &self,
274        store: &HashtreeStore,
275        webrtc_state: Option<&Arc<WebRTCState>>,
276        hash: &[u8; 32],
277    ) -> Result<Option<crate::storage::DirectoryListing>> {
278        // First, try to get from local storage
279        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
280            return Ok(Some(listing));
281        }
282
283        // Fetch the tree
284        self.fetch_tree(store, webrtc_state, hash).await?;
285
286        // Now try to get the directory listing
287        store.get_directory_listing(hash)
288    }
289
290    /// Upload data to Blossom servers
291    pub async fn upload(&self, data: &[u8]) -> Result<String> {
292        self.blossom
293            .upload(data)
294            .await
295            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
296    }
297
298    /// Upload data if it doesn't already exist
299    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
300        self.blossom
301            .upload_if_missing(data)
302            .await
303            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
304    }
305}
306
307fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
308    let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
309    let local_url = detect_local_daemon_url(bind_address.as_deref());
310    let Some(local_url) = local_url else {
311        return blossom;
312    };
313
314    let mut servers = blossom.read_servers().to_vec();
315    if servers.iter().any(|server| server == &local_url) {
316        return blossom;
317    }
318    servers.insert(0, local_url);
319    blossom.with_read_servers(servers)
320}