hashtree_cli/
fetch.rs

//! Remote content fetching with WebRTC and Blossom fallback.
//!
//! Provides shared logic for fetching content, trying sources in this order:
//! 1. Local storage (first)
//! 2. WebRTC peers (second)
//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_core::{decode_tree_node, to_hex};
11use nostr::Keys;
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::debug;
16
17use crate::storage::HashtreeStore;
18use crate::webrtc::WebRTCState;
19
/// Timeouts used when fetching content from remote sources.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchConfig {
    /// How long to wait for WebRTC peers to answer a chunk request before
    /// falling back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom HTTP client.
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// 2 s for WebRTC requests, 10 s for Blossom requests.
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
37
38/// Fetcher for remote content
39pub struct Fetcher {
40    config: FetchConfig,
41    blossom: BlossomClient,
42}
43
44impl Fetcher {
45    /// Create a new fetcher with the given config
46    /// BlossomClient auto-loads servers from ~/.hashtree/config.toml
47    pub fn new(config: FetchConfig) -> Self {
48        // Generate ephemeral keys for downloads (no signing needed)
49        let keys = Keys::generate();
50        let blossom = BlossomClient::new(keys)
51            .with_timeout(config.blossom_timeout);
52
53        Self { config, blossom }
54    }
55
56    /// Create a new fetcher with specific keys (for authenticated uploads)
57    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
58        let blossom = BlossomClient::new(keys)
59            .with_timeout(config.blossom_timeout);
60
61        Self { config, blossom }
62    }
63
64    /// Get the underlying BlossomClient
65    pub fn blossom(&self) -> &BlossomClient {
66        &self.blossom
67    }
68
69    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
70    pub async fn fetch_chunk(
71        &self,
72        webrtc_state: Option<&Arc<WebRTCState>>,
73        hash_hex: &str,
74    ) -> Result<Vec<u8>> {
75        let short_hash = if hash_hex.len() >= 12 {
76            &hash_hex[..12]
77        } else {
78            hash_hex
79        };
80
81        // Try WebRTC first
82        if let Some(state) = webrtc_state {
83            debug!("Trying WebRTC for {}", short_hash);
84            let webrtc_result = tokio::time::timeout(
85                self.config.webrtc_timeout,
86                state.request_from_peers(hash_hex),
87            )
88            .await;
89
90            if let Ok(Some(data)) = webrtc_result {
91                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
92                return Ok(data);
93            }
94        }
95
96        // Fallback to Blossom
97        debug!("Trying Blossom for {}", short_hash);
98        match self.blossom.download(hash_hex).await {
99            Ok(data) => {
100                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
101                Ok(data)
102            }
103            Err(e) => {
104                debug!("Blossom download failed for {}: {}", short_hash, e);
105                Err(anyhow::anyhow!("Failed to fetch {} from any source: {}", short_hash, e))
106            }
107        }
108    }
109
110    /// Fetch a chunk, checking local storage first
111    pub async fn fetch_chunk_with_store(
112        &self,
113        store: &HashtreeStore,
114        webrtc_state: Option<&Arc<WebRTCState>>,
115        hash: &[u8; 32],
116    ) -> Result<Vec<u8>> {
117        // Check local storage first
118        if let Some(data) = store.get_chunk(hash)? {
119            return Ok(data);
120        }
121
122        // Fetch remotely and store
123        let hash_hex = to_hex(hash);
124        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
125        store.put_blob(&data)?;
126        Ok(data)
127    }
128
129    /// Fetch an entire tree (all chunks recursively) - sequential version
130    /// Returns (chunks_fetched, bytes_fetched)
131    pub async fn fetch_tree(
132        &self,
133        store: &HashtreeStore,
134        webrtc_state: Option<&Arc<WebRTCState>>,
135        root_hash: &[u8; 32],
136    ) -> Result<(usize, u64)> {
137        self.fetch_tree_parallel(store, webrtc_state, root_hash, 1).await
138    }
139
140    /// Fetch an entire tree with parallel downloads
141    /// Uses work-stealing: always keeps `concurrency` requests in flight
142    /// Returns (chunks_fetched, bytes_fetched)
143    pub async fn fetch_tree_parallel(
144        &self,
145        store: &HashtreeStore,
146        webrtc_state: Option<&Arc<WebRTCState>>,
147        root_hash: &[u8; 32],
148        concurrency: usize,
149    ) -> Result<(usize, u64)> {
150        use futures::stream::{FuturesUnordered, StreamExt};
151        use std::collections::HashSet;
152        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
153
154        // Check if we already have the root
155        if store.blob_exists(root_hash)? {
156            return Ok((0, 0));
157        }
158
159        let chunks_fetched = Arc::new(AtomicUsize::new(0));
160        let bytes_fetched = Arc::new(AtomicU64::new(0));
161
162        // Track what we've queued to avoid duplicates
163        let mut queued: HashSet<[u8; 32]> = HashSet::new();
164        let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
165
166        // Seed with root
167        pending.push_back(*root_hash);
168        queued.insert(*root_hash);
169
170        let mut active = FuturesUnordered::new();
171
172        loop {
173            // Fill up to concurrency limit from pending queue
174            while active.len() < concurrency {
175                if let Some(hash) = pending.pop_front() {
176                    // Skip if we already have it locally
177                    if store.blob_exists(&hash).unwrap_or(false) {
178                        continue;
179                    }
180
181                    let hash_hex = to_hex(&hash);
182                    let blossom = self.blossom.clone();
183                    let webrtc = webrtc_state.map(Arc::clone);
184                    let timeout = self.config.webrtc_timeout;
185
186                    let fut = async move {
187                        // Try WebRTC first
188                        if let Some(state) = &webrtc {
189                            if let Ok(Some(data)) = tokio::time::timeout(
190                                timeout,
191                                state.request_from_peers(&hash_hex),
192                            )
193                            .await
194                            {
195                                return (hash, Ok(data));
196                            }
197                        }
198                        // Fallback to Blossom
199                        let data = blossom.download(&hash_hex).await;
200                        (hash, data)
201                    };
202                    active.push(fut);
203                } else {
204                    break;
205                }
206            }
207
208            // If nothing active, we're done
209            if active.is_empty() {
210                break;
211            }
212
213            // Wait for any download to complete
214            if let Some((hash, result)) = active.next().await {
215                match result {
216                    Ok(data) => {
217                        // Store it
218                        store.put_blob(&data)?;
219                        chunks_fetched.fetch_add(1, Ordering::Relaxed);
220                        bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
221
222                        // Parse as tree node and queue children
223                        if let Ok(node) = decode_tree_node(&data) {
224                            for link in node.links {
225                                if !queued.contains(&link.hash) {
226                                    queued.insert(link.hash);
227                                    pending.push_back(link.hash);
228                                }
229                            }
230                        }
231                    }
232                    Err(e) => {
233                        debug!("Failed to fetch {}: {}", to_hex(&hash), e);
234                        // Continue with other chunks - don't fail the whole tree
235                    }
236                }
237            }
238        }
239
240        Ok((
241            chunks_fetched.load(Ordering::Relaxed),
242            bytes_fetched.load(Ordering::Relaxed),
243        ))
244    }
245
246    /// Fetch a file by hash, fetching all chunks if needed
247    /// Returns the complete file content
248    pub async fn fetch_file(
249        &self,
250        store: &HashtreeStore,
251        webrtc_state: Option<&Arc<WebRTCState>>,
252        hash: &[u8; 32],
253    ) -> Result<Option<Vec<u8>>> {
254        // First, try to get from local storage
255        if let Some(content) = store.get_file(hash)? {
256            return Ok(Some(content));
257        }
258
259        // Fetch the tree
260        self.fetch_tree(store, webrtc_state, hash).await?;
261
262        // Now try to read the file
263        store.get_file(hash)
264    }
265
266    /// Fetch a directory listing, fetching chunks if needed
267    pub async fn fetch_directory(
268        &self,
269        store: &HashtreeStore,
270        webrtc_state: Option<&Arc<WebRTCState>>,
271        hash: &[u8; 32],
272    ) -> Result<Option<crate::storage::DirectoryListing>> {
273        // First, try to get from local storage
274        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
275            return Ok(Some(listing));
276        }
277
278        // Fetch the tree
279        self.fetch_tree(store, webrtc_state, hash).await?;
280
281        // Now try to get the directory listing
282        store.get_directory_listing(hash)
283    }
284
285    /// Upload data to Blossom servers
286    pub async fn upload(&self, data: &[u8]) -> Result<String> {
287        self.blossom
288            .upload(data)
289            .await
290            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
291    }
292
293    /// Upload data if it doesn't already exist
294    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
295        self.blossom
296            .upload_if_missing(data)
297            .await
298            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
299    }
300}