1use anyhow::{Context, Result};
2use futures::stream::{self, StreamExt};
3use std::collections::HashSet;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::config::ensure_keys_string;
8use crate::fetch::{FetchConfig, Fetcher};
9use crate::HashtreeStore;
10use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
11
/// Upper bound on the number of blob uploads kept in flight at once; passed to
/// `buffer_unordered` in `upload_cids_with_client`.
const BLOSSOM_PUSH_CONCURRENCY: usize = 16;
/// `upload_cids_with_client` prints a progress line whenever the processed count is a multiple
/// of this value (and once more when the final blob has been processed).
const BLOSSOM_PUSH_PROGRESS_EVERY: usize = 512;
14
15fn parse_root_cid(cid_str: &str) -> Result<Cid> {
16 Cid::parse(cid_str).map_err(|e| anyhow::anyhow!("Invalid CID '{}': {}", cid_str, e))
17}
18
19fn child_cid(parent: &Cid, link: &Link) -> Cid {
20 let inherits_parent_key = link
21 .name
22 .as_deref()
23 .map(|name| {
24 name.starts_with("_chunk_")
25 || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
26 })
27 .unwrap_or(false);
28
29 Cid {
30 hash: link.hash,
31 key: link.key.or(if inherits_parent_key {
32 parent.key
33 } else {
34 None
35 }),
36 }
37}
38
39async fn ensure_local_blob_for_push(
40 store: &HashtreeStore,
41 fetcher: Option<&Fetcher>,
42 cid: &Cid,
43) -> Result<()> {
44 if store.get_blob(&cid.hash)?.is_some() {
45 return Ok(());
46 }
47
48 if let Some(fetcher) = fetcher {
49 let data = fetcher
50 .fetch_chunk(None, &to_hex(&cid.hash))
51 .await
52 .with_context(|| format!("failed to hydrate missing local blob {}", cid))?;
53 store
54 .put_blob(&data)
55 .with_context(|| format!("failed to persist hydrated blob {}", cid))?;
56 if store.get_blob(&cid.hash)?.is_some() {
57 return Ok(());
58 }
59 }
60
61 anyhow::bail!("missing local blob while pushing DAG: {}", cid);
62}
63
64pub(crate) async fn collect_cids_for_push(
65 store: &HashtreeStore,
66 root_cid: Cid,
67 fetcher: Option<&Fetcher>,
68) -> Result<Vec<Cid>> {
69 let mut cids_to_push = Vec::new();
70 let mut visited: HashSet<[u8; 32]> = HashSet::new();
71 let mut queue = vec![root_cid];
72 let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
73
74 while let Some(cid) = queue.pop() {
75 if !visited.insert(cid.hash) {
76 continue;
77 }
78
79 ensure_local_blob_for_push(store, fetcher, &cid).await?;
80 cids_to_push.push(cid.clone());
81
82 let node = tree
83 .get_node(&cid)
84 .await
85 .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
86
87 if let Some(node) = node {
88 for link in &node.links {
89 if !visited.contains(&link.hash) {
90 queue.push(child_cid(&cid, link));
91 }
92 }
93 }
94 }
95
96 Ok(cids_to_push)
97}
98
99fn matching_old_child<'a>(
100 old_links: &'a [Link],
101 new_index: usize,
102 new_link: &Link,
103) -> Option<&'a Link> {
104 if let Some(name) = new_link.name.as_deref() {
105 old_links
106 .iter()
107 .find(|old_link| old_link.name.as_deref() == Some(name))
108 } else {
109 old_links
110 .get(new_index)
111 .filter(|old_link| old_link.name.is_none())
112 }
113}
114
115pub(crate) async fn collect_incremental_cids_for_push(
116 store: &HashtreeStore,
117 root_cid: Cid,
118 previous_root_cid: Cid,
119 fetcher: Option<&Fetcher>,
120) -> Result<Vec<Cid>> {
121 let mut cids_to_push = Vec::new();
122 let mut visited_new: HashSet<[u8; 32]> = HashSet::new();
123 let mut queue = vec![(root_cid, Some(previous_root_cid))];
124 let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
125
126 while let Some((cid, old_cid)) = queue.pop() {
127 if old_cid.as_ref().is_some_and(|old| old.hash == cid.hash) {
128 continue;
129 }
130 if !visited_new.insert(cid.hash) {
131 continue;
132 }
133
134 ensure_local_blob_for_push(store, fetcher, &cid).await?;
135 cids_to_push.push(cid.clone());
136
137 let node = tree
138 .get_node(&cid)
139 .await
140 .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
141 let Some(node) = node else {
142 continue;
143 };
144
145 let old_node = match old_cid.as_ref() {
146 Some(old_cid) => match tree.get_node(old_cid).await {
147 Ok(old_node) => old_node,
148 Err(err) => {
149 tracing::warn!(
150 "Failed to inspect previous Blossom DAG node {}; uploading changed subtree: {}",
151 old_cid,
152 err
153 );
154 None
155 }
156 },
157 None => None,
158 };
159
160 for (index, link) in node.links.iter().enumerate() {
161 let child = child_cid(&cid, link);
162 let old_child = old_node
163 .as_ref()
164 .and_then(|old_node| matching_old_child(&old_node.links, index, link))
165 .map(|old_link| child_cid(old_cid.as_ref().expect("old node has cid"), old_link));
166
167 if old_child
168 .as_ref()
169 .is_some_and(|old_child| old_child.hash == child.hash)
170 {
171 continue;
172 }
173 queue.push((child, old_child));
174 }
175 }
176
177 Ok(cids_to_push)
178}
179
180async fn upload_cids_with_client(
181 store: Arc<HashtreeStore>,
182 fetcher: Option<Arc<Fetcher>>,
183 client: hashtree_blossom::BlossomClient,
184 cids_to_push: Vec<Cid>,
185) -> Result<(usize, usize, usize, Option<String>)> {
186 let total = cids_to_push.len();
187 let mut total_uploaded = 0usize;
188 let mut total_skipped = 0usize;
189 let mut total_errors = 0usize;
190 let mut last_error = None;
191 let mut processed = 0usize;
192
193 let mut uploads = stream::iter(cids_to_push.into_iter().map(|cid| {
194 let store = Arc::clone(&store);
195 let fetcher = fetcher.clone();
196 let client = client.clone();
197 async move {
198 ensure_local_blob_for_push(store.as_ref(), fetcher.as_deref(), &cid).await?;
199 let data = store
200 .get_blob(&cid.hash)?
201 .ok_or_else(|| anyhow::anyhow!("missing local blob while uploading {}", cid))?;
202 client
203 .upload_if_missing(&data)
204 .await
205 .map_err(|error| anyhow::anyhow!(error.to_string()))
206 }
207 }))
208 .buffer_unordered(BLOSSOM_PUSH_CONCURRENCY);
209
210 while let Some(result) = uploads.next().await {
211 processed += 1;
212 match result {
213 Ok((_hash, was_uploaded)) => {
214 if was_uploaded {
215 total_uploaded += 1;
216 } else {
217 total_skipped += 1;
218 }
219 }
220 Err(error) => {
221 tracing::warn!("Blossom upload failed: {}", error);
222 total_errors += 1;
223 last_error = Some(error.to_string());
224 }
225 }
226
227 if processed % BLOSSOM_PUSH_PROGRESS_EVERY == 0 || processed == total {
228 println!(
229 " file servers: {processed}/{total} processed ({total_uploaded} uploaded, {total_skipped} already exist, {total_errors} failed)",
230 );
231 }
232 }
233
234 Ok((total_uploaded, total_skipped, total_errors, last_error))
235}
236
237pub async fn push_to_blossom(
239 data_dir: &Path,
240 cid_str: &str,
241 server_override: Option<String>,
242) -> Result<()> {
243 use hashtree_blossom::BlossomClient;
244 use nostr::Keys;
245
246 let (nsec_str, _) = ensure_keys_string()?;
247 let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
248
249 let client = if let Some(server) = server_override {
250 BlossomClient::new(keys).with_write_servers(vec![server])
251 } else {
252 BlossomClient::new(keys)
253 };
254
255 if client.write_servers().is_empty() {
256 anyhow::bail!(
257 "No file servers configured. Use --server or add write_servers to config.toml"
258 );
259 }
260
261 let store = Arc::new(HashtreeStore::new(data_dir)?);
262 let fetcher = Arc::new(Fetcher::new(FetchConfig::default()));
263
264 println!("Collecting blocks...");
265 let root_cid = parse_root_cid(cid_str)?;
266 let cids_to_push =
267 collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?;
268
269 println!("Found {} blocks to push", cids_to_push.len());
270 let (uploaded, skipped, errors, last_error) =
271 upload_cids_with_client(store, Some(fetcher), client, cids_to_push).await?;
272
273 println!(
274 "\nUploaded: {}, Skipped: {}, Errors: {}",
275 uploaded, skipped, errors
276 );
277 if let Some(last_error) = last_error {
278 eprintln!("Last error: {last_error}");
279 }
280 println!("Done!");
281 Ok(())
282}
283
284pub async fn background_blossom_push(
286 data_dir: &Path,
287 cid_str: &str,
288 servers: &[String],
289) -> Result<()> {
290 let store = Arc::new(HashtreeStore::new(data_dir)?);
291 background_blossom_push_with_store(store, cid_str, servers).await
292}
293
294pub async fn background_blossom_push_with_store(
295 store: Arc<HashtreeStore>,
296 cid_str: &str,
297 servers: &[String],
298) -> Result<()> {
299 let root_cid = parse_root_cid(cid_str)?;
300 background_blossom_push_incremental_with_store(store, root_cid, None, servers).await
301}
302
303pub async fn background_blossom_push_incremental_with_store(
304 store: Arc<HashtreeStore>,
305 root_cid: Cid,
306 previous_root_cid: Option<Cid>,
307 servers: &[String],
308) -> Result<()> {
309 use hashtree_blossom::BlossomClient;
310 use nostr::Keys;
311
312 let (nsec_str, _) = ensure_keys_string()?;
313 let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
314
315 let fetcher = Arc::new(Fetcher::new(FetchConfig::default()));
316 let cids_to_push = if let Some(previous_root_cid) = previous_root_cid.as_ref() {
317 println!("Collecting bounded DAG diff for file-server push...");
318 match collect_incremental_cids_for_push(
319 store.as_ref(),
320 root_cid.clone(),
321 previous_root_cid.clone(),
322 Some(fetcher.as_ref()),
323 )
324 .await
325 {
326 Ok(cids) => cids,
327 Err(err) => {
328 tracing::warn!(
329 "Blossom DAG diff failed; falling back to full push: {}",
330 err
331 );
332 collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?
333 }
334 }
335 } else {
336 println!("Collecting DAG for file-server push...");
337 collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?
338 };
339
340 if cids_to_push.is_empty() {
341 return Ok(());
342 }
343
344 let client = if servers.is_empty() {
345 BlossomClient::new(keys)
346 } else {
347 BlossomClient::new(keys).with_write_servers(servers.to_vec())
348 };
349 let (_total_uploaded, _total_skipped, total_errors, last_error) =
350 upload_cids_with_client(store, Some(fetcher), client, cids_to_push).await?;
351
352 if total_errors > 0 {
353 let detail = last_error
354 .as_deref()
355 .map(|err| format!(" (last error: {err})"))
356 .unwrap_or_default();
357 anyhow::bail!(
358 "failed to upload {} blob(s) to configured file servers{}",
359 total_errors,
360 detail
361 );
362 }
363
364 Ok(())
365}
366
#[cfg(test)]
mod tests {
    use super::{collect_cids_for_push, collect_incremental_cids_for_push};
    use crate::HashtreeStore;
    use futures::executor::block_on as sync_block_on;
    use hashtree_core::{DirEntry, HashTree, HashTreeConfig, LinkType};
    use tempfile::tempdir;

    /// A full collection must abort when a blob referenced by the root has been removed locally
    /// and no fetcher is available to bring it back.
    #[tokio::test]
    async fn collect_cids_for_push_fails_on_missing_descendant_blob() {
        let workdir = tempdir().expect("tempdir");
        let store =
            HashtreeStore::with_options(workdir.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        // Build a one-entry directory: root -> greeting.txt.
        let root = sync_block_on(async {
            let (file_cid, _size) = tree.put_file(b"hello").await.expect("file");
            let entries = vec![DirEntry::from_cid("greeting.txt", &file_cid)];
            tree.put_directory(entries).await.expect("dir")
        });

        // Remove the only child from the local store so the walk hits a hole.
        let root_node = store
            .get_tree_node(&root.hash)
            .expect("root node")
            .expect("root node present");
        let child_hash = root_node.links[0].hash;
        store
            .router()
            .delete_local_only(&child_hash)
            .expect("delete child locally");

        let failure = collect_cids_for_push(&store, root, None)
            .await
            .expect_err("missing child should fail");
        let message = failure.to_string();
        assert!(message.contains("missing local blob while pushing DAG"));
    }

    /// An incremental collection must return only the nodes on the path to the changed file,
    /// even when the entry order differs between the old and the new directories.
    #[tokio::test]
    async fn incremental_push_collects_only_changed_named_subtrees() {
        let workdir = tempdir().expect("tempdir");
        let store =
            HashtreeStore::with_options(workdir.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        // Previous DAG: root { subdir { changed.txt, stable.txt }, stable-root.txt }.
        let stable_file = tree.put_blob(b"stable").await.expect("stable file");
        let old_changed_file = tree.put_blob(b"old").await.expect("old file");
        let old_subdir_entries = vec![
            DirEntry::new("changed.txt", old_changed_file).with_size(3),
            DirEntry::new("stable.txt", stable_file).with_size(6),
        ];
        let old_subdir = tree
            .put_directory(old_subdir_entries)
            .await
            .expect("old subdir");
        let old_root_entries = vec![
            DirEntry::new("subdir", old_subdir.hash).with_link_type(LinkType::Dir),
            DirEntry::new("stable-root.txt", stable_file).with_size(6),
        ];
        let old_root = tree
            .put_directory(old_root_entries)
            .await
            .expect("old root");

        // New DAG: same names, entries listed in the opposite order, changed.txt rewritten.
        let new_changed_file = tree.put_blob(b"new").await.expect("new file");
        let new_subdir_entries = vec![
            DirEntry::new("stable.txt", stable_file).with_size(6),
            DirEntry::new("changed.txt", new_changed_file).with_size(3),
        ];
        let new_subdir = tree
            .put_directory(new_subdir_entries)
            .await
            .expect("new subdir");
        let new_root_entries = vec![
            DirEntry::new("stable-root.txt", stable_file).with_size(6),
            DirEntry::new("subdir", new_subdir.hash).with_link_type(LinkType::Dir),
        ];
        let new_root = tree
            .put_directory(new_root_entries)
            .await
            .expect("new root");

        let collected = collect_incremental_cids_for_push(&store, new_root.clone(), old_root, None)
            .await
            .expect("incremental cids");
        let pushed_hashes = collected.iter().map(|cid| cid.hash).collect::<Vec<_>>();

        assert_eq!(pushed_hashes.len(), 3);
        for expected in [new_root.hash, new_subdir.hash, new_changed_file] {
            assert!(pushed_hashes.contains(&expected));
        }
        assert!(!pushed_hashes.contains(&stable_file));
    }
}