Skip to main content

hashtree_cli/
fetch.rs

1//! Remote content fetching with WebRTC and Blossom fallback
2//!
3//! Provides shared logic for fetching content from:
4//! 1. Local storage (first)
5//! 2. WebRTC peers (second)
6//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{decode_tree_node, to_hex};
12use nostr::Keys;
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21
/// Configuration for remote fetching.
///
/// The defaults are 2 s for WebRTC and 10 s for Blossom.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FetchConfig {
    /// Upper bound on one WebRTC lookup (`request_from_peers`) before falling
    /// back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom HTTP client via `with_timeout`.
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
39
40/// Fetcher for remote content
41pub struct Fetcher {
42    config: FetchConfig,
43    blossom: BlossomClient,
44}
45
46impl Fetcher {
47    /// Create a new fetcher with the given config
48    /// BlossomClient auto-loads servers from ~/.hashtree/config.toml
49    pub fn new(config: FetchConfig) -> Self {
50        // Generate ephemeral keys for downloads (no signing needed)
51        let keys = Keys::generate();
52        let blossom = BlossomClient::new(keys)
53            .with_timeout(config.blossom_timeout);
54        let blossom = with_local_daemon_read(blossom);
55
56        Self { config, blossom }
57    }
58
59    /// Create a new fetcher with specific keys (for authenticated uploads)
60    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
61        let blossom = BlossomClient::new(keys)
62            .with_timeout(config.blossom_timeout);
63        let blossom = with_local_daemon_read(blossom);
64
65        Self { config, blossom }
66    }
67
68    /// Get the underlying BlossomClient
69    pub fn blossom(&self) -> &BlossomClient {
70        &self.blossom
71    }
72
73    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
74    pub async fn fetch_chunk(
75        &self,
76        webrtc_state: Option<&Arc<WebRTCState>>,
77        hash_hex: &str,
78    ) -> Result<Vec<u8>> {
79        let short_hash = if hash_hex.len() >= 12 {
80            &hash_hex[..12]
81        } else {
82            hash_hex
83        };
84
85        // Try WebRTC first
86        if let Some(state) = webrtc_state {
87            debug!("Trying WebRTC for {}", short_hash);
88            let webrtc_result = tokio::time::timeout(
89                self.config.webrtc_timeout,
90                state.request_from_peers(hash_hex),
91            )
92            .await;
93
94            if let Ok(Some(data)) = webrtc_result {
95                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
96                return Ok(data);
97            }
98        }
99
100        // Fallback to Blossom
101        debug!("Trying Blossom for {}", short_hash);
102        match self.blossom.download(hash_hex).await {
103            Ok(data) => {
104                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
105                Ok(data)
106            }
107            Err(e) => {
108                debug!("Blossom download failed for {}: {}", short_hash, e);
109                Err(anyhow::anyhow!("Failed to fetch {} from any source: {}", short_hash, e))
110            }
111        }
112    }
113
114    /// Fetch a chunk, checking local storage first
115    pub async fn fetch_chunk_with_store(
116        &self,
117        store: &HashtreeStore,
118        webrtc_state: Option<&Arc<WebRTCState>>,
119        hash: &[u8; 32],
120    ) -> Result<Vec<u8>> {
121        // Check local storage first
122        if let Some(data) = store.get_chunk(hash)? {
123            return Ok(data);
124        }
125
126        // Fetch remotely and store
127        let hash_hex = to_hex(hash);
128        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
129        store.put_blob(&data)?;
130        Ok(data)
131    }
132
133    /// Fetch an entire tree (all chunks recursively) - sequential version
134    /// Returns (chunks_fetched, bytes_fetched)
135    pub async fn fetch_tree(
136        &self,
137        store: &HashtreeStore,
138        webrtc_state: Option<&Arc<WebRTCState>>,
139        root_hash: &[u8; 32],
140    ) -> Result<(usize, u64)> {
141        self.fetch_tree_parallel(store, webrtc_state, root_hash, 1).await
142    }
143
144    /// Fetch an entire tree with parallel downloads
145    /// Uses work-stealing: always keeps `concurrency` requests in flight
146    /// Returns (chunks_fetched, bytes_fetched)
147    pub async fn fetch_tree_parallel(
148        &self,
149        store: &HashtreeStore,
150        webrtc_state: Option<&Arc<WebRTCState>>,
151        root_hash: &[u8; 32],
152        concurrency: usize,
153    ) -> Result<(usize, u64)> {
154        use futures::stream::{FuturesUnordered, StreamExt};
155        use std::collections::HashSet;
156        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
157
158        // Check if we already have the root
159        if store.blob_exists(root_hash)? {
160            return Ok((0, 0));
161        }
162
163        let chunks_fetched = Arc::new(AtomicUsize::new(0));
164        let bytes_fetched = Arc::new(AtomicU64::new(0));
165
166        // Track what we've queued to avoid duplicates
167        let mut queued: HashSet<[u8; 32]> = HashSet::new();
168        let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
169
170        // Seed with root
171        pending.push_back(*root_hash);
172        queued.insert(*root_hash);
173
174        let mut active = FuturesUnordered::new();
175
176        loop {
177            // Fill up to concurrency limit from pending queue
178            while active.len() < concurrency {
179                if let Some(hash) = pending.pop_front() {
180                    // Skip if we already have it locally
181                    if store.blob_exists(&hash).unwrap_or(false) {
182                        continue;
183                    }
184
185                    let hash_hex = to_hex(&hash);
186                    let blossom = self.blossom.clone();
187                    let webrtc = webrtc_state.map(Arc::clone);
188                    let timeout = self.config.webrtc_timeout;
189
190                    let fut = async move {
191                        // Try WebRTC first
192                        if let Some(state) = &webrtc {
193                            if let Ok(Some(data)) = tokio::time::timeout(
194                                timeout,
195                                state.request_from_peers(&hash_hex),
196                            )
197                            .await
198                            {
199                                return (hash, Ok(data));
200                            }
201                        }
202                        // Fallback to Blossom
203                        let data = blossom.download(&hash_hex).await;
204                        (hash, data)
205                    };
206                    active.push(fut);
207                } else {
208                    break;
209                }
210            }
211
212            // If nothing active, we're done
213            if active.is_empty() {
214                break;
215            }
216
217            // Wait for any download to complete
218            if let Some((hash, result)) = active.next().await {
219                match result {
220                    Ok(data) => {
221                        // Store it
222                        store.put_blob(&data)?;
223                        chunks_fetched.fetch_add(1, Ordering::Relaxed);
224                        bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
225
226                        // Parse as tree node and queue children
227                        if let Ok(node) = decode_tree_node(&data) {
228                            for link in node.links {
229                                if !queued.contains(&link.hash) {
230                                    queued.insert(link.hash);
231                                    pending.push_back(link.hash);
232                                }
233                            }
234                        }
235                    }
236                    Err(e) => {
237                        debug!("Failed to fetch {}: {}", to_hex(&hash), e);
238                        // Continue with other chunks - don't fail the whole tree
239                    }
240                }
241            }
242        }
243
244        Ok((
245            chunks_fetched.load(Ordering::Relaxed),
246            bytes_fetched.load(Ordering::Relaxed),
247        ))
248    }
249
250    /// Fetch a file by hash, fetching all chunks if needed
251    /// Returns the complete file content
252    pub async fn fetch_file(
253        &self,
254        store: &HashtreeStore,
255        webrtc_state: Option<&Arc<WebRTCState>>,
256        hash: &[u8; 32],
257    ) -> Result<Option<Vec<u8>>> {
258        // First, try to get from local storage
259        if let Some(content) = store.get_file(hash)? {
260            return Ok(Some(content));
261        }
262
263        // Fetch the tree
264        self.fetch_tree(store, webrtc_state, hash).await?;
265
266        // Now try to read the file
267        store.get_file(hash)
268    }
269
270    /// Fetch a directory listing, fetching chunks if needed
271    pub async fn fetch_directory(
272        &self,
273        store: &HashtreeStore,
274        webrtc_state: Option<&Arc<WebRTCState>>,
275        hash: &[u8; 32],
276    ) -> Result<Option<crate::storage::DirectoryListing>> {
277        // First, try to get from local storage
278        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
279            return Ok(Some(listing));
280        }
281
282        // Fetch the tree
283        self.fetch_tree(store, webrtc_state, hash).await?;
284
285        // Now try to get the directory listing
286        store.get_directory_listing(hash)
287    }
288
289    /// Upload data to Blossom servers
290    pub async fn upload(&self, data: &[u8]) -> Result<String> {
291        self.blossom
292            .upload(data)
293            .await
294            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
295    }
296
297    /// Upload data if it doesn't already exist
298    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
299        self.blossom
300            .upload_if_missing(data)
301            .await
302            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
303    }
304}
305
306fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
307    let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
308    let local_url = detect_local_daemon_url(bind_address.as_deref());
309    let Some(local_url) = local_url else {
310        return blossom;
311    };
312
313    let mut servers = blossom.read_servers().to_vec();
314    if servers.iter().any(|server| server == &local_url) {
315        return blossom;
316    }
317    servers.insert(0, local_url);
318    blossom.with_read_servers(servers)
319}