1use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{decode_tree_node, to_hex};
12use nostr::Keys;
13use std::collections::VecDeque;
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21
22#[derive(Clone)]
24pub struct FetchConfig {
25 pub webrtc_timeout: Duration,
27 pub blossom_timeout: Duration,
29}
30
31impl Default for FetchConfig {
32 fn default() -> Self {
33 Self {
34 webrtc_timeout: Duration::from_millis(2000),
35 blossom_timeout: Duration::from_millis(10000),
36 }
37 }
38}
39
40pub struct Fetcher {
42 config: FetchConfig,
43 blossom: BlossomClient,
44}
45
46impl Fetcher {
47 pub fn new(config: FetchConfig) -> Self {
50 let keys = Keys::generate();
52 let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
53 let blossom = with_local_daemon_read(blossom);
54
55 Self { config, blossom }
56 }
57
58 pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
60 let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
61 let blossom = with_local_daemon_read(blossom);
62
63 Self { config, blossom }
64 }
65
66 pub fn blossom(&self) -> &BlossomClient {
68 &self.blossom
69 }
70
71 pub async fn fetch_chunk(
73 &self,
74 webrtc_state: Option<&Arc<WebRTCState>>,
75 hash_hex: &str,
76 ) -> Result<Vec<u8>> {
77 let short_hash = if hash_hex.len() >= 12 {
78 &hash_hex[..12]
79 } else {
80 hash_hex
81 };
82
83 if let Some(state) = webrtc_state {
85 debug!("Trying WebRTC for {}", short_hash);
86 let webrtc_result = tokio::time::timeout(
87 self.config.webrtc_timeout,
88 state.request_from_peers(hash_hex),
89 )
90 .await;
91
92 if let Ok(Some(data)) = webrtc_result {
93 debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
94 return Ok(data);
95 }
96 }
97
98 debug!("Trying Blossom for {}", short_hash);
100 match self.blossom.download(hash_hex).await {
101 Ok(data) => {
102 debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
103 Ok(data)
104 }
105 Err(e) => {
106 debug!("Blossom download failed for {}: {}", short_hash, e);
107 Err(anyhow::anyhow!(
108 "Failed to fetch {} from any source: {}",
109 short_hash,
110 e
111 ))
112 }
113 }
114 }
115
116 pub async fn fetch_chunk_with_store(
118 &self,
119 store: &HashtreeStore,
120 webrtc_state: Option<&Arc<WebRTCState>>,
121 hash: &[u8; 32],
122 ) -> Result<Vec<u8>> {
123 if let Some(data) = store.get_chunk(hash)? {
125 return Ok(data);
126 }
127
128 let hash_hex = to_hex(hash);
130 let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
131 store.put_blob(&data)?;
132 Ok(data)
133 }
134
135 pub async fn fetch_tree(
138 &self,
139 store: &HashtreeStore,
140 webrtc_state: Option<&Arc<WebRTCState>>,
141 root_hash: &[u8; 32],
142 ) -> Result<(usize, u64)> {
143 self.fetch_tree_parallel(store, webrtc_state, root_hash, 1)
144 .await
145 }
146
147 pub async fn fetch_tree_parallel(
151 &self,
152 store: &HashtreeStore,
153 webrtc_state: Option<&Arc<WebRTCState>>,
154 root_hash: &[u8; 32],
155 concurrency: usize,
156 ) -> Result<(usize, u64)> {
157 use futures::stream::{FuturesUnordered, StreamExt};
158 use std::collections::HashSet;
159 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
160
161 if store.blob_exists(root_hash)? {
163 return Ok((0, 0));
164 }
165
166 let chunks_fetched = Arc::new(AtomicUsize::new(0));
167 let bytes_fetched = Arc::new(AtomicU64::new(0));
168
169 let mut queued: HashSet<[u8; 32]> = HashSet::new();
171 let mut pending: VecDeque<[u8; 32]> = VecDeque::new();
172
173 pending.push_back(*root_hash);
175 queued.insert(*root_hash);
176
177 let mut active = FuturesUnordered::new();
178
179 loop {
180 while active.len() < concurrency {
182 if let Some(hash) = pending.pop_front() {
183 if store.blob_exists(&hash).unwrap_or(false) {
185 continue;
186 }
187
188 let hash_hex = to_hex(&hash);
189 let blossom = self.blossom.clone();
190 let webrtc = webrtc_state.map(Arc::clone);
191 let timeout = self.config.webrtc_timeout;
192
193 let fut = async move {
194 if let Some(state) = &webrtc {
196 if let Ok(Some(data)) =
197 tokio::time::timeout(timeout, state.request_from_peers(&hash_hex))
198 .await
199 {
200 return (hash, Ok(data));
201 }
202 }
203 let data = blossom.download(&hash_hex).await;
205 (hash, data)
206 };
207 active.push(fut);
208 } else {
209 break;
210 }
211 }
212
213 if active.is_empty() {
215 break;
216 }
217
218 if let Some((hash, result)) = active.next().await {
220 match result {
221 Ok(data) => {
222 store.put_blob(&data)?;
224 chunks_fetched.fetch_add(1, Ordering::Relaxed);
225 bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
226
227 if let Ok(node) = decode_tree_node(&data) {
229 for link in node.links {
230 if !queued.contains(&link.hash) {
231 queued.insert(link.hash);
232 pending.push_back(link.hash);
233 }
234 }
235 }
236 }
237 Err(e) => {
238 debug!("Failed to fetch {}: {}", to_hex(&hash), e);
239 }
241 }
242 }
243 }
244
245 Ok((
246 chunks_fetched.load(Ordering::Relaxed),
247 bytes_fetched.load(Ordering::Relaxed),
248 ))
249 }
250
251 pub async fn fetch_file(
254 &self,
255 store: &HashtreeStore,
256 webrtc_state: Option<&Arc<WebRTCState>>,
257 hash: &[u8; 32],
258 ) -> Result<Option<Vec<u8>>> {
259 if let Some(content) = store.get_file(hash)? {
261 return Ok(Some(content));
262 }
263
264 self.fetch_tree(store, webrtc_state, hash).await?;
266
267 store.get_file(hash)
269 }
270
271 pub async fn fetch_directory(
273 &self,
274 store: &HashtreeStore,
275 webrtc_state: Option<&Arc<WebRTCState>>,
276 hash: &[u8; 32],
277 ) -> Result<Option<crate::storage::DirectoryListing>> {
278 if let Ok(Some(listing)) = store.get_directory_listing(hash) {
280 return Ok(Some(listing));
281 }
282
283 self.fetch_tree(store, webrtc_state, hash).await?;
285
286 store.get_directory_listing(hash)
288 }
289
290 pub async fn upload(&self, data: &[u8]) -> Result<String> {
292 self.blossom
293 .upload(data)
294 .await
295 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
296 }
297
298 pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
300 self.blossom
301 .upload_if_missing(data)
302 .await
303 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
304 }
305}
306
307fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
308 let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
309 let local_url = detect_local_daemon_url(bind_address.as_deref());
310 let Some(local_url) = local_url else {
311 return blossom;
312 };
313
314 let mut servers = blossom.read_servers().to_vec();
315 if servers.iter().any(|server| server == &local_url) {
316 return blossom;
317 }
318 servers.insert(0, local_url);
319 blossom.with_read_servers(servers)
320}