hashtree_cli/
fetch.rs

1//! Remote content fetching with WebRTC and Blossom fallback
2//!
3//! Provides shared logic for fetching content from:
4//! 1. Local storage (first)
5//! 2. WebRTC peers (second)
6//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_core::{decode_tree_node, to_hex};
11use nostr::Keys;
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::debug;
16
17use crate::storage::HashtreeStore;
18use crate::webrtc::WebRTCState;
19
/// Configuration for remote fetching.
///
/// Holds the per-request time budgets used by [`Fetcher`]. The type is cheap to
/// clone and implements `Debug` so it can be logged or inspected in tests.
#[derive(Clone, Debug)]
pub struct FetchConfig {
    /// Maximum time to wait for WebRTC peers to answer a single request.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom client for its HTTP requests.
    pub blossom_timeout: Duration,
}
28
29impl Default for FetchConfig {
30    fn default() -> Self {
31        Self {
32            webrtc_timeout: Duration::from_millis(2000),
33            blossom_timeout: Duration::from_millis(10000),
34        }
35    }
36}
37
/// Fetcher for remote content.
///
/// Bundles the timeout configuration with the Blossom client that the
/// `impl Fetcher` methods use for downloads and uploads.
pub struct Fetcher {
    /// Timeouts applied to remote requests (WebRTC and Blossom).
    config: FetchConfig,
    /// Blossom client, built with `config.blossom_timeout` by the constructors.
    blossom: BlossomClient,
}
43
44impl Fetcher {
45    /// Create a new fetcher with the given config
46    /// BlossomClient auto-loads servers from ~/.hashtree/config.toml
47    pub fn new(config: FetchConfig) -> Self {
48        // Generate ephemeral keys for downloads (no signing needed)
49        let keys = Keys::generate();
50        let blossom = BlossomClient::new(keys)
51            .with_timeout(config.blossom_timeout);
52
53        Self { config, blossom }
54    }
55
56    /// Create a new fetcher with specific keys (for authenticated uploads)
57    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
58        let blossom = BlossomClient::new(keys)
59            .with_timeout(config.blossom_timeout);
60
61        Self { config, blossom }
62    }
63
64    /// Get the underlying BlossomClient
65    pub fn blossom(&self) -> &BlossomClient {
66        &self.blossom
67    }
68
69    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
70    pub async fn fetch_chunk(
71        &self,
72        webrtc_state: Option<&Arc<WebRTCState>>,
73        hash_hex: &str,
74    ) -> Result<Vec<u8>> {
75        let short_hash = if hash_hex.len() >= 12 {
76            &hash_hex[..12]
77        } else {
78            hash_hex
79        };
80
81        // Try WebRTC first
82        if let Some(state) = webrtc_state {
83            debug!("Trying WebRTC for {}", short_hash);
84            let webrtc_result = tokio::time::timeout(
85                self.config.webrtc_timeout,
86                state.request_from_peers(hash_hex),
87            )
88            .await;
89
90            if let Ok(Some(data)) = webrtc_result {
91                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
92                return Ok(data);
93            }
94        }
95
96        // Fallback to Blossom
97        debug!("Trying Blossom for {}", short_hash);
98        match self.blossom.download(hash_hex).await {
99            Ok(data) => {
100                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
101                Ok(data)
102            }
103            Err(e) => {
104                debug!("Blossom download failed for {}: {}", short_hash, e);
105                Err(anyhow::anyhow!("Failed to fetch {} from any source: {}", short_hash, e))
106            }
107        }
108    }
109
110    /// Fetch a chunk, checking local storage first
111    pub async fn fetch_chunk_with_store(
112        &self,
113        store: &HashtreeStore,
114        webrtc_state: Option<&Arc<WebRTCState>>,
115        hash: &[u8; 32],
116    ) -> Result<Vec<u8>> {
117        // Check local storage first
118        if let Some(data) = store.get_chunk(hash)? {
119            return Ok(data);
120        }
121
122        // Fetch remotely and store
123        let hash_hex = to_hex(hash);
124        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
125        store.put_blob(&data)?;
126        Ok(data)
127    }
128
129    /// Fetch an entire tree (all chunks recursively) - sequential version
130    /// Returns (chunks_fetched, bytes_fetched)
131    pub async fn fetch_tree(
132        &self,
133        store: &HashtreeStore,
134        webrtc_state: Option<&Arc<WebRTCState>>,
135        root_hash: &[u8; 32],
136    ) -> Result<(usize, u64)> {
137        self.fetch_tree_parallel(store, webrtc_state, root_hash, 1).await
138    }
139
140    /// Fetch an entire tree with parallel downloads
141    /// Uses work-stealing: always keeps `concurrency` requests in flight
142    /// Returns (chunks_fetched, bytes_fetched)
143    pub async fn fetch_tree_parallel(
144        &self,
145        store: &HashtreeStore,
146        webrtc_state: Option<&Arc<WebRTCState>>,
147        root_hash: &[u8; 32],
148        concurrency: usize,
149    ) -> Result<(usize, u64)> {
150        use futures::stream::{FuturesUnordered, StreamExt};
151        use std::collections::HashSet;
152        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
153
154        // Check if we already have the root
155        if store.blob_exists(root_hash)? {
156            return Ok((0, 0));
157        }
158
159        let chunks_fetched = Arc::new(AtomicUsize::new(0));
160        let bytes_fetched = Arc::new(AtomicU64::new(0));
161
162        // Track what we've queued to avoid duplicates
163        let mut queued: HashSet<[u8; 32]> = HashSet::new();
164        let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
165
166        // Seed with root
167        pending.push_back(*root_hash);
168        queued.insert(*root_hash);
169
170        let mut active = FuturesUnordered::new();
171
172        loop {
173            // Fill up to concurrency limit from pending queue
174            while active.len() < concurrency {
175                if let Some(hash) = pending.pop_front() {
176                    // Skip if we already have it locally
177                    if store.blob_exists(&hash).unwrap_or(false) {
178                        continue;
179                    }
180
181                    let hash_hex = to_hex(&hash);
182                    let blossom = &self.blossom;
183
184                    let fut = async move {
185                        let data = blossom.download(&hash_hex).await;
186                        (hash, data)
187                    };
188                    active.push(fut);
189                } else {
190                    break;
191                }
192            }
193
194            // If nothing active, we're done
195            if active.is_empty() {
196                break;
197            }
198
199            // Wait for any download to complete
200            if let Some((hash, result)) = active.next().await {
201                match result {
202                    Ok(data) => {
203                        // Store it
204                        store.put_blob(&data)?;
205                        chunks_fetched.fetch_add(1, Ordering::Relaxed);
206                        bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
207
208                        // Parse as tree node and queue children
209                        if let Ok(node) = decode_tree_node(&data) {
210                            for link in node.links {
211                                if !queued.contains(&link.hash) {
212                                    queued.insert(link.hash);
213                                    pending.push_back(link.hash);
214                                }
215                            }
216                        }
217                    }
218                    Err(e) => {
219                        debug!("Failed to fetch {}: {}", to_hex(&hash), e);
220                        // Continue with other chunks - don't fail the whole tree
221                    }
222                }
223            }
224        }
225
226        Ok((
227            chunks_fetched.load(Ordering::Relaxed),
228            bytes_fetched.load(Ordering::Relaxed),
229        ))
230    }
231
232    /// Fetch a file by hash, fetching all chunks if needed
233    /// Returns the complete file content
234    pub async fn fetch_file(
235        &self,
236        store: &HashtreeStore,
237        webrtc_state: Option<&Arc<WebRTCState>>,
238        hash: &[u8; 32],
239    ) -> Result<Option<Vec<u8>>> {
240        // First, try to get from local storage
241        if let Some(content) = store.get_file(hash)? {
242            return Ok(Some(content));
243        }
244
245        // Fetch the tree
246        self.fetch_tree(store, webrtc_state, hash).await?;
247
248        // Now try to read the file
249        store.get_file(hash)
250    }
251
252    /// Fetch a directory listing, fetching chunks if needed
253    pub async fn fetch_directory(
254        &self,
255        store: &HashtreeStore,
256        webrtc_state: Option<&Arc<WebRTCState>>,
257        hash: &[u8; 32],
258    ) -> Result<Option<crate::storage::DirectoryListing>> {
259        // First, try to get from local storage
260        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
261            return Ok(Some(listing));
262        }
263
264        // Fetch the tree
265        self.fetch_tree(store, webrtc_state, hash).await?;
266
267        // Now try to get the directory listing
268        store.get_directory_listing(hash)
269    }
270
271    /// Upload data to Blossom servers
272    pub async fn upload(&self, data: &[u8]) -> Result<String> {
273        self.blossom
274            .upload(data)
275            .await
276            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
277    }
278
279    /// Upload data if it doesn't already exist
280    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
281        self.blossom
282            .upload_if_missing(data)
283            .await
284            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
285    }
286}