1use anyhow::Result;
9use hashtree_blossom::BlossomClient;
10use hashtree_config::detect_local_daemon_url;
11use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
12use nostr::Keys;
13use std::collections::{HashSet, VecDeque};
14use std::sync::Arc;
15use std::time::Duration;
16use tracing::debug;
17
18use crate::config::Config as CliConfig;
19use crate::storage::HashtreeStore;
20use crate::webrtc::WebRTCState;
21
22fn child_cid(parent: &Cid, link: &Link) -> Cid {
23 let inherits_parent_key = link
24 .name
25 .as_deref()
26 .map(|name| {
27 name.starts_with("_chunk_")
28 || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
29 })
30 .unwrap_or(false);
31
32 Cid {
33 hash: link.hash,
34 key: link.key.or(if inherits_parent_key {
35 parent.key
36 } else {
37 None
38 }),
39 }
40}
41
42#[derive(Clone)]
44pub struct FetchConfig {
45 pub webrtc_timeout: Duration,
47 pub blossom_timeout: Duration,
49}
50
51impl Default for FetchConfig {
52 fn default() -> Self {
53 Self {
54 webrtc_timeout: Duration::from_millis(2000),
55 blossom_timeout: Duration::from_millis(10000),
56 }
57 }
58}
59
60pub struct Fetcher {
62 config: FetchConfig,
63 blossom: BlossomClient,
64}
65
66impl Fetcher {
67 pub fn new(config: FetchConfig) -> Self {
70 let keys = Keys::generate();
72 let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
73 let blossom = with_local_daemon_read(blossom);
74
75 Self { config, blossom }
76 }
77
78 pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
80 let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
81 let blossom = with_local_daemon_read(blossom);
82
83 Self { config, blossom }
84 }
85
86 pub fn blossom(&self) -> &BlossomClient {
88 &self.blossom
89 }
90
91 pub async fn fetch_chunk(
93 &self,
94 webrtc_state: Option<&Arc<WebRTCState>>,
95 hash_hex: &str,
96 ) -> Result<Vec<u8>> {
97 let short_hash = if hash_hex.len() >= 12 {
98 &hash_hex[..12]
99 } else {
100 hash_hex
101 };
102
103 if let Some(state) = webrtc_state {
105 debug!("Trying WebRTC for {}", short_hash);
106 let webrtc_result = tokio::time::timeout(
107 self.config.webrtc_timeout,
108 state.request_from_peers(hash_hex),
109 )
110 .await;
111
112 if let Ok(Some(data)) = webrtc_result {
113 debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
114 return Ok(data);
115 }
116 }
117
118 debug!("Trying Blossom for {}", short_hash);
120 match self.blossom.download(hash_hex).await {
121 Ok(data) => {
122 debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
123 Ok(data)
124 }
125 Err(e) => {
126 debug!("Blossom download failed for {}: {}", short_hash, e);
127 Err(anyhow::anyhow!(
128 "Failed to fetch {} from any source: {}",
129 short_hash,
130 e
131 ))
132 }
133 }
134 }
135
136 pub async fn fetch_chunk_with_store(
138 &self,
139 store: &HashtreeStore,
140 webrtc_state: Option<&Arc<WebRTCState>>,
141 hash: &[u8; 32],
142 ) -> Result<Vec<u8>> {
143 if let Some(data) = store.get_chunk(hash)? {
145 return Ok(data);
146 }
147
148 let hash_hex = to_hex(hash);
150 let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
151 store.put_blob(&data)?;
152 Ok(data)
153 }
154
155 pub async fn fetch_tree(
158 &self,
159 store: &HashtreeStore,
160 webrtc_state: Option<&Arc<WebRTCState>>,
161 root_hash: &[u8; 32],
162 ) -> Result<(usize, u64)> {
163 self.fetch_cid_tree(store, webrtc_state, &Cid::public(*root_hash))
164 .await
165 }
166
167 pub async fn fetch_cid_tree(
169 &self,
170 store: &HashtreeStore,
171 webrtc_state: Option<&Arc<WebRTCState>>,
172 root_cid: &Cid,
173 ) -> Result<(usize, u64)> {
174 self.fetch_cid_tree_parallel(store, webrtc_state, root_cid, 1)
175 .await
176 }
177
178 pub async fn fetch_tree_parallel(
182 &self,
183 store: &HashtreeStore,
184 webrtc_state: Option<&Arc<WebRTCState>>,
185 root_hash: &[u8; 32],
186 concurrency: usize,
187 ) -> Result<(usize, u64)> {
188 self.fetch_cid_tree_parallel(store, webrtc_state, &Cid::public(*root_hash), concurrency)
189 .await
190 }
191
192 pub async fn fetch_cid_tree_parallel(
194 &self,
195 store: &HashtreeStore,
196 webrtc_state: Option<&Arc<WebRTCState>>,
197 root_cid: &Cid,
198 concurrency: usize,
199 ) -> Result<(usize, u64)> {
200 use futures::stream::{FuturesUnordered, StreamExt};
201 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
202
203 let chunks_fetched = Arc::new(AtomicUsize::new(0));
204 let bytes_fetched = Arc::new(AtomicU64::new(0));
205 let mut queued: HashSet<[u8; 32]> = HashSet::new();
206 let mut pending: VecDeque<Cid> = VecDeque::new();
207
208 pending.push_back(root_cid.clone());
209 queued.insert(root_cid.hash);
210
211 let mut active = FuturesUnordered::new();
212 let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
213
214 loop {
215 while active.len() < concurrency {
217 if let Some(cid) = pending.pop_front() {
218 if store.blob_exists(&cid.hash).unwrap_or(false) {
219 if let Some(node) = tree.get_node(&cid).await? {
220 for link in node.links {
221 let child = child_cid(&cid, &link);
222 if queued.insert(child.hash) {
223 pending.push_back(child);
224 }
225 }
226 }
227 continue;
228 }
229
230 let hash_hex = to_hex(&cid.hash);
231 let blossom = self.blossom.clone();
232 let webrtc = webrtc_state.map(Arc::clone);
233 let timeout = self.config.webrtc_timeout;
234
235 let fut = async move {
236 if let Some(state) = &webrtc {
238 if let Ok(Some(data)) =
239 tokio::time::timeout(timeout, state.request_from_peers(&hash_hex))
240 .await
241 {
242 return (cid, Ok(data));
243 }
244 }
245 let data = blossom.download(&hash_hex).await;
247 (cid, data)
248 };
249 active.push(fut);
250 } else {
251 break;
252 }
253 }
254
255 if active.is_empty() {
257 break;
258 }
259
260 if let Some((cid, result)) = active.next().await {
262 match result {
263 Ok(data) => {
264 store.put_blob(&data)?;
266 chunks_fetched.fetch_add(1, Ordering::Relaxed);
267 bytes_fetched.fetch_add(data.len() as u64, Ordering::Relaxed);
268
269 if let Some(node) = tree.get_node(&cid).await? {
270 for link in node.links {
271 let child = child_cid(&cid, &link);
272 if queued.insert(child.hash) {
273 pending.push_back(child);
274 }
275 }
276 }
277 }
278 Err(e) => {
279 debug!("Failed to fetch {}: {}", to_hex(&cid.hash), e);
280 }
282 }
283 }
284 }
285
286 Ok((
287 chunks_fetched.load(Ordering::Relaxed),
288 bytes_fetched.load(Ordering::Relaxed),
289 ))
290 }
291
292 pub async fn fetch_file(
295 &self,
296 store: &HashtreeStore,
297 webrtc_state: Option<&Arc<WebRTCState>>,
298 hash: &[u8; 32],
299 ) -> Result<Option<Vec<u8>>> {
300 if let Some(content) = store.get_file(hash)? {
302 return Ok(Some(content));
303 }
304
305 self.fetch_tree(store, webrtc_state, hash).await?;
307
308 store.get_file(hash)
310 }
311
312 pub async fn fetch_directory(
314 &self,
315 store: &HashtreeStore,
316 webrtc_state: Option<&Arc<WebRTCState>>,
317 hash: &[u8; 32],
318 ) -> Result<Option<crate::storage::DirectoryListing>> {
319 if let Ok(Some(listing)) = store.get_directory_listing(hash) {
321 return Ok(Some(listing));
322 }
323
324 self.fetch_tree(store, webrtc_state, hash).await?;
326
327 store.get_directory_listing(hash)
329 }
330
331 pub async fn upload(&self, data: &[u8]) -> Result<String> {
333 self.blossom
334 .upload(data)
335 .await
336 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
337 }
338
339 pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
341 self.blossom
342 .upload_if_missing(data)
343 .await
344 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
345 }
346}
347
348fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
349 let bind_address = CliConfig::load().ok().map(|cfg| cfg.server.bind_address);
350 let local_url = detect_local_daemon_url(bind_address.as_deref());
351 let Some(local_url) = local_url else {
352 return blossom;
353 };
354
355 let mut servers = blossom.read_servers().to_vec();
356 if servers.iter().any(|server| server == &local_url) {
357 return blossom;
358 }
359 servers.insert(0, local_url);
360 blossom.with_read_servers(servers)
361}