1use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_core::{decode_tree_node, to_hex};
11use nostr::Keys;
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::debug;
16
17use crate::storage::HashtreeStore;
18use crate::webrtc::WebRTCState;
19
/// Timeouts that bound how long a [`Fetcher`] waits on each remote source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchConfig {
    /// Maximum time to wait for WebRTC peers to answer a request before
    /// falling back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom client when it is constructed.
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// Returns a configuration with a 2000 ms WebRTC timeout and a 10000 ms
    /// Blossom timeout.
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
37
38pub struct Fetcher {
40 config: FetchConfig,
41 blossom: BlossomClient,
42}
43
44impl Fetcher {
45 pub fn new(config: FetchConfig) -> Self {
48 let keys = Keys::generate();
50 let blossom = BlossomClient::new(keys)
51 .with_timeout(config.blossom_timeout);
52
53 Self { config, blossom }
54 }
55
56 pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
58 let blossom = BlossomClient::new(keys)
59 .with_timeout(config.blossom_timeout);
60
61 Self { config, blossom }
62 }
63
64 pub fn blossom(&self) -> &BlossomClient {
66 &self.blossom
67 }
68
69 pub async fn fetch_chunk(
71 &self,
72 webrtc_state: Option<&Arc<WebRTCState>>,
73 hash_hex: &str,
74 ) -> Result<Vec<u8>> {
75 let short_hash = if hash_hex.len() >= 12 {
76 &hash_hex[..12]
77 } else {
78 hash_hex
79 };
80
81 if let Some(state) = webrtc_state {
83 debug!("Trying WebRTC for {}", short_hash);
84 let webrtc_result = tokio::time::timeout(
85 self.config.webrtc_timeout,
86 state.request_from_peers(hash_hex),
87 )
88 .await;
89
90 if let Ok(Some(data)) = webrtc_result {
91 debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
92 return Ok(data);
93 }
94 }
95
96 debug!("Trying Blossom for {}", short_hash);
98 match self.blossom.download(hash_hex).await {
99 Ok(data) => {
100 debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
101 Ok(data)
102 }
103 Err(e) => {
104 debug!("Blossom download failed for {}: {}", short_hash, e);
105 Err(anyhow::anyhow!("Failed to fetch {} from any source: {}", short_hash, e))
106 }
107 }
108 }
109
110 pub async fn fetch_chunk_with_store(
112 &self,
113 store: &HashtreeStore,
114 webrtc_state: Option<&Arc<WebRTCState>>,
115 hash: &[u8; 32],
116 ) -> Result<Vec<u8>> {
117 if let Some(data) = store.get_chunk(hash)? {
119 return Ok(data);
120 }
121
122 let hash_hex = to_hex(hash);
124 let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
125 store.put_blob(&data)?;
126 Ok(data)
127 }
128
129 pub async fn fetch_tree(
132 &self,
133 store: &HashtreeStore,
134 webrtc_state: Option<&Arc<WebRTCState>>,
135 root_hash: &[u8; 32],
136 ) -> Result<(usize, u64)> {
137 self.fetch_tree_parallel(store, webrtc_state, root_hash, 1).await
138 }
139
140 pub async fn fetch_tree_parallel(
144 &self,
145 store: &HashtreeStore,
146 webrtc_state: Option<&Arc<WebRTCState>>,
147 root_hash: &[u8; 32],
148 concurrency: usize,
149 ) -> Result<(usize, u64)> {
150 use futures::stream::{FuturesUnordered, StreamExt};
151 use std::collections::HashSet;
152 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
153
154 if store.blob_exists(root_hash)? {
156 return Ok((0, 0));
157 }
158
159 let chunks_fetched = Arc::new(AtomicUsize::new(0));
160 let bytes_fetched = Arc::new(AtomicU64::new(0));
161
162 let mut queued: HashSet<[u8; 32]> = HashSet::new();
164 let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
165
166 pending.push_back(*root_hash);
168 queued.insert(*root_hash);
169
170 let mut active = FuturesUnordered::new();
171
172 loop {
173 while active.len() < concurrency {
175 if let Some(hash) = pending.pop_front() {
176 if store.blob_exists(&hash).unwrap_or(false) {
178 continue;
179 }
180
181 let hash_hex = to_hex(&hash);
182 let blossom = self.blossom.clone();
183 let webrtc = webrtc_state.map(Arc::clone);
184 let timeout = self.config.webrtc_timeout;
185
186 let fut = async move {
187 if let Some(state) = &webrtc {
189 if let Ok(Some(data)) = tokio::time::timeout(
190 timeout,
191 state.request_from_peers(&hash_hex),
192 )
193 .await
194 {
195 return (hash, Ok(data));
196 }
197 }
198 let data = blossom.download(&hash_hex).await;
200 (hash, data)
201 };
202 active.push(fut);
203 } else {
204 break;
205 }
206 }
207
208 if active.is_empty() {
210 break;
211 }
212
213 if let Some((hash, result)) = active.next().await {
215 match result {
216 Ok(data) => {
217 store.put_blob(&data)?;
219 chunks_fetched.fetch_add(1, Ordering::Relaxed);
220 bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
221
222 if let Ok(node) = decode_tree_node(&data) {
224 for link in node.links {
225 if !queued.contains(&link.hash) {
226 queued.insert(link.hash);
227 pending.push_back(link.hash);
228 }
229 }
230 }
231 }
232 Err(e) => {
233 debug!("Failed to fetch {}: {}", to_hex(&hash), e);
234 }
236 }
237 }
238 }
239
240 Ok((
241 chunks_fetched.load(Ordering::Relaxed),
242 bytes_fetched.load(Ordering::Relaxed),
243 ))
244 }
245
246 pub async fn fetch_file(
249 &self,
250 store: &HashtreeStore,
251 webrtc_state: Option<&Arc<WebRTCState>>,
252 hash: &[u8; 32],
253 ) -> Result<Option<Vec<u8>>> {
254 if let Some(content) = store.get_file(hash)? {
256 return Ok(Some(content));
257 }
258
259 self.fetch_tree(store, webrtc_state, hash).await?;
261
262 store.get_file(hash)
264 }
265
266 pub async fn fetch_directory(
268 &self,
269 store: &HashtreeStore,
270 webrtc_state: Option<&Arc<WebRTCState>>,
271 hash: &[u8; 32],
272 ) -> Result<Option<crate::storage::DirectoryListing>> {
273 if let Ok(Some(listing)) = store.get_directory_listing(hash) {
275 return Ok(Some(listing));
276 }
277
278 self.fetch_tree(store, webrtc_state, hash).await?;
280
281 store.get_directory_listing(hash)
283 }
284
285 pub async fn upload(&self, data: &[u8]) -> Result<String> {
287 self.blossom
288 .upload(data)
289 .await
290 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
291 }
292
293 pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
295 self.blossom
296 .upload_if_missing(data)
297 .await
298 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
299 }
300}