hashtree_cli/
fetch.rs

1//! Remote content fetching with WebRTC and Blossom fallback
2//!
3//! Provides shared logic for fetching content from:
4//! 1. Local storage (first)
5//! 2. WebRTC peers (second)
6//! 3. Blossom HTTP servers (fallback)
7
8use anyhow::Result;
9use hashtree_core::{decode_tree_node, to_hex};
10use std::collections::VecDeque;
11use std::sync::Arc;
12use std::time::Duration;
13use tracing::debug;
14
15use crate::storage::HashtreeStore;
16use crate::webrtc::WebRTCState;
17
/// Configuration for remote fetching.
///
/// Controls which Blossom servers are used as the HTTP fallback and how long
/// each remote source is given before the next one is tried.
#[derive(Debug, Clone)]
pub struct FetchConfig {
    /// Base URLs of the Blossom servers used as fallback, tried in order.
    pub blossom_servers: Vec<String>,
    /// Timeout for WebRTC requests.
    pub webrtc_timeout: Duration,
    /// Timeout for Blossom requests (applied to each server separately).
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// Default configuration: a single Blossom server
    /// (`https://blossom.iris.to`), a 2 second WebRTC timeout and a
    /// 10 second Blossom timeout.
    fn default() -> Self {
        Self {
            blossom_servers: vec!["https://blossom.iris.to".to_string()],
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
38
39/// Fetcher for remote content
40pub struct Fetcher {
41    config: FetchConfig,
42    http_client: reqwest::Client,
43}
44
45impl Fetcher {
46    /// Create a new fetcher with the given config
47    pub fn new(config: FetchConfig) -> Self {
48        Self {
49            config,
50            http_client: reqwest::Client::builder()
51                .timeout(Duration::from_secs(30))
52                .build()
53                .unwrap(),
54        }
55    }
56
57    /// Fetch a single chunk by hash, trying WebRTC first then Blossom
58    pub async fn fetch_chunk(
59        &self,
60        webrtc_state: Option<&Arc<WebRTCState>>,
61        hash_hex: &str,
62    ) -> Result<Vec<u8>> {
63        let short_hash = if hash_hex.len() >= 12 {
64            &hash_hex[..12]
65        } else {
66            hash_hex
67        };
68
69        // Try WebRTC first
70        if let Some(state) = webrtc_state {
71            debug!("Trying WebRTC for {}", short_hash);
72            let webrtc_result = tokio::time::timeout(
73                self.config.webrtc_timeout,
74                state.request_from_peers(hash_hex),
75            )
76            .await;
77
78            if let Ok(Some(data)) = webrtc_result {
79                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
80                return Ok(data);
81            }
82        }
83
84        // Fallback to Blossom
85        for server in &self.config.blossom_servers {
86            let url = format!("{}/{}", server.trim_end_matches('/'), hash_hex);
87            debug!("Trying Blossom {} for {}", server, short_hash);
88
89            let result = tokio::time::timeout(
90                self.config.blossom_timeout,
91                self.http_client.get(&url).send(),
92            )
93            .await;
94
95            match result {
96                Ok(Ok(response)) if response.status().is_success() => {
97                    if let Ok(data) = response.bytes().await {
98                        debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
99                        return Ok(data.to_vec());
100                    }
101                }
102                Ok(Ok(response)) => {
103                    debug!("Blossom {} returned {} for {}", server, response.status(), short_hash);
104                }
105                Ok(Err(e)) => {
106                    debug!("Blossom {} error for {}: {}", server, short_hash, e);
107                }
108                Err(_) => {
109                    debug!("Blossom {} timeout for {}", server, short_hash);
110                }
111            }
112        }
113
114        Err(anyhow::anyhow!("Failed to fetch {} from any source", short_hash))
115    }
116
117    /// Fetch a chunk, checking local storage first
118    pub async fn fetch_chunk_with_store(
119        &self,
120        store: &HashtreeStore,
121        webrtc_state: Option<&Arc<WebRTCState>>,
122        hash_hex: &str,
123    ) -> Result<Vec<u8>> {
124        // Check local storage first
125        if let Some(data) = store.get_chunk(hash_hex)? {
126            return Ok(data);
127        }
128
129        // Fetch remotely and store
130        let data = self.fetch_chunk(webrtc_state, hash_hex).await?;
131        store.put_blob(&data)?;
132        Ok(data)
133    }
134
135    /// Fetch an entire tree (all chunks recursively)
136    /// Returns (chunks_fetched, bytes_fetched)
137    pub async fn fetch_tree(
138        &self,
139        store: &HashtreeStore,
140        webrtc_state: Option<&Arc<WebRTCState>>,
141        root_hash: &[u8; 32],
142    ) -> Result<(usize, u64)> {
143        let mut chunks_fetched = 0usize;
144        let mut bytes_fetched = 0u64;
145
146        let root_hex = to_hex(root_hash);
147
148        // Check if we already have the root
149        if store.blob_exists(&root_hex)? {
150            return Ok((0, 0));
151        }
152
153        // BFS to fetch all chunks
154        let mut queue: VecDeque<[u8; 32]> = VecDeque::new();
155        queue.push_back(*root_hash);
156
157        while let Some(hash) = queue.pop_front() {
158            let hash_hex = to_hex(&hash);
159
160            // Check if we already have it
161            if store.blob_exists(&hash_hex)? {
162                continue;
163            }
164
165            // Fetch it
166            let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
167
168            // Store it
169            store.put_blob(&data)?;
170            chunks_fetched += 1;
171            bytes_fetched += data.len() as u64;
172
173            // Parse as tree node and queue children
174            if let Ok(node) = decode_tree_node(&data) {
175                for link in node.links {
176                    queue.push_back(link.hash);
177                }
178            }
179        }
180
181        Ok((chunks_fetched, bytes_fetched))
182    }
183
184    /// Fetch a file by hash, fetching all chunks if needed
185    /// Returns the complete file content
186    pub async fn fetch_file(
187        &self,
188        store: &HashtreeStore,
189        webrtc_state: Option<&Arc<WebRTCState>>,
190        hash_hex: &str,
191    ) -> Result<Option<Vec<u8>>> {
192        // First, try to get from local storage
193        if let Some(content) = store.get_file(hash_hex)? {
194            return Ok(Some(content));
195        }
196
197        // Parse hash
198        let hash = hashtree_core::from_hex(hash_hex)
199            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
200
201        // Fetch the tree
202        self.fetch_tree(store, webrtc_state, &hash).await?;
203
204        // Now try to read the file
205        store.get_file(hash_hex)
206    }
207
208    /// Fetch a directory listing, fetching chunks if needed
209    pub async fn fetch_directory(
210        &self,
211        store: &HashtreeStore,
212        webrtc_state: Option<&Arc<WebRTCState>>,
213        hash_hex: &str,
214    ) -> Result<Option<crate::storage::DirectoryListing>> {
215        // First, try to get from local storage
216        if let Ok(Some(listing)) = store.get_directory_listing(hash_hex) {
217            return Ok(Some(listing));
218        }
219
220        // Parse hash
221        let hash = hashtree_core::from_hex(hash_hex)
222            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
223
224        // Fetch the tree
225        self.fetch_tree(store, webrtc_state, &hash).await?;
226
227        // Now try to get the directory listing
228        store.get_directory_listing(hash_hex)
229    }
230}