Skip to main content

hashtree_cli/
blossom_push.rs

1use anyhow::{Context, Result};
2use futures::stream::{self, StreamExt};
3use std::collections::HashSet;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::config::ensure_keys_string;
8use crate::fetch::{FetchConfig, Fetcher};
9use crate::HashtreeStore;
10use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
11
/// Maximum number of blob uploads kept in flight at once (see `buffer_unordered` use below).
const BLOSSOM_PUSH_CONCURRENCY: usize = 16;
/// A progress line is printed every time this many uploads have completed.
const BLOSSOM_PUSH_PROGRESS_EVERY: usize = 512;
14
15fn parse_root_cid(cid_str: &str) -> Result<Cid> {
16    Cid::parse(cid_str).map_err(|e| anyhow::anyhow!("Invalid CID '{}': {}", cid_str, e))
17}
18
19fn child_cid(parent: &Cid, link: &Link) -> Cid {
20    let inherits_parent_key = link
21        .name
22        .as_deref()
23        .map(|name| {
24            name.starts_with("_chunk_")
25                || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
26        })
27        .unwrap_or(false);
28
29    Cid {
30        hash: link.hash,
31        key: link.key.or(if inherits_parent_key {
32            parent.key
33        } else {
34            None
35        }),
36    }
37}
38
39async fn ensure_local_blob_for_push(
40    store: &HashtreeStore,
41    fetcher: Option<&Fetcher>,
42    cid: &Cid,
43) -> Result<()> {
44    if store.get_blob(&cid.hash)?.is_some() {
45        return Ok(());
46    }
47
48    if let Some(fetcher) = fetcher {
49        let data = fetcher
50            .fetch_chunk(None, &to_hex(&cid.hash))
51            .await
52            .with_context(|| format!("failed to hydrate missing local blob {}", cid))?;
53        store
54            .put_blob(&data)
55            .with_context(|| format!("failed to persist hydrated blob {}", cid))?;
56        if store.get_blob(&cid.hash)?.is_some() {
57            return Ok(());
58        }
59    }
60
61    anyhow::bail!("missing local blob while pushing DAG: {}", cid);
62}
63
64pub(crate) async fn collect_cids_for_push(
65    store: &HashtreeStore,
66    root_cid: Cid,
67    fetcher: Option<&Fetcher>,
68) -> Result<Vec<Cid>> {
69    let mut cids_to_push = Vec::new();
70    let mut visited: HashSet<[u8; 32]> = HashSet::new();
71    let mut queue = vec![root_cid];
72    let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
73
74    while let Some(cid) = queue.pop() {
75        if !visited.insert(cid.hash) {
76            continue;
77        }
78
79        ensure_local_blob_for_push(store, fetcher, &cid).await?;
80        cids_to_push.push(cid.clone());
81
82        let node = tree
83            .get_node(&cid)
84            .await
85            .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
86
87        if let Some(node) = node {
88            for link in &node.links {
89                if !visited.contains(&link.hash) {
90                    queue.push(child_cid(&cid, link));
91                }
92            }
93        }
94    }
95
96    Ok(cids_to_push)
97}
98
99fn matching_old_child<'a>(
100    old_links: &'a [Link],
101    new_index: usize,
102    new_link: &Link,
103) -> Option<&'a Link> {
104    if let Some(name) = new_link.name.as_deref() {
105        old_links
106            .iter()
107            .find(|old_link| old_link.name.as_deref() == Some(name))
108    } else {
109        old_links
110            .get(new_index)
111            .filter(|old_link| old_link.name.is_none())
112    }
113}
114
115pub(crate) async fn collect_incremental_cids_for_push(
116    store: &HashtreeStore,
117    root_cid: Cid,
118    previous_root_cid: Cid,
119    fetcher: Option<&Fetcher>,
120) -> Result<Vec<Cid>> {
121    let mut cids_to_push = Vec::new();
122    let mut visited_new: HashSet<[u8; 32]> = HashSet::new();
123    let mut queue = vec![(root_cid, Some(previous_root_cid))];
124    let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
125
126    while let Some((cid, old_cid)) = queue.pop() {
127        if old_cid.as_ref().is_some_and(|old| old.hash == cid.hash) {
128            continue;
129        }
130        if !visited_new.insert(cid.hash) {
131            continue;
132        }
133
134        ensure_local_blob_for_push(store, fetcher, &cid).await?;
135        cids_to_push.push(cid.clone());
136
137        let node = tree
138            .get_node(&cid)
139            .await
140            .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
141        let Some(node) = node else {
142            continue;
143        };
144
145        let old_node = match old_cid.as_ref() {
146            Some(old_cid) => match tree.get_node(old_cid).await {
147                Ok(old_node) => old_node,
148                Err(err) => {
149                    tracing::warn!(
150                        "Failed to inspect previous Blossom DAG node {}; uploading changed subtree: {}",
151                        old_cid,
152                        err
153                    );
154                    None
155                }
156            },
157            None => None,
158        };
159
160        for (index, link) in node.links.iter().enumerate() {
161            let child = child_cid(&cid, link);
162            let old_child = old_node
163                .as_ref()
164                .and_then(|old_node| matching_old_child(&old_node.links, index, link))
165                .map(|old_link| child_cid(old_cid.as_ref().expect("old node has cid"), old_link));
166
167            if old_child
168                .as_ref()
169                .is_some_and(|old_child| old_child.hash == child.hash)
170            {
171                continue;
172            }
173            queue.push((child, old_child));
174        }
175    }
176
177    Ok(cids_to_push)
178}
179
180async fn upload_cids_with_client(
181    store: Arc<HashtreeStore>,
182    fetcher: Option<Arc<Fetcher>>,
183    client: hashtree_blossom::BlossomClient,
184    cids_to_push: Vec<Cid>,
185) -> Result<(usize, usize, usize, Option<String>)> {
186    let total = cids_to_push.len();
187    let mut total_uploaded = 0usize;
188    let mut total_skipped = 0usize;
189    let mut total_errors = 0usize;
190    let mut last_error = None;
191    let mut processed = 0usize;
192
193    let mut uploads = stream::iter(cids_to_push.into_iter().map(|cid| {
194        let store = Arc::clone(&store);
195        let fetcher = fetcher.clone();
196        let client = client.clone();
197        async move {
198            ensure_local_blob_for_push(store.as_ref(), fetcher.as_deref(), &cid).await?;
199            let data = store
200                .get_blob(&cid.hash)?
201                .ok_or_else(|| anyhow::anyhow!("missing local blob while uploading {}", cid))?;
202            client
203                .upload_if_missing(&data)
204                .await
205                .map_err(|error| anyhow::anyhow!(error.to_string()))
206        }
207    }))
208    .buffer_unordered(BLOSSOM_PUSH_CONCURRENCY);
209
210    while let Some(result) = uploads.next().await {
211        processed += 1;
212        match result {
213            Ok((_hash, was_uploaded)) => {
214                if was_uploaded {
215                    total_uploaded += 1;
216                } else {
217                    total_skipped += 1;
218                }
219            }
220            Err(error) => {
221                tracing::warn!("Blossom upload failed: {}", error);
222                total_errors += 1;
223                last_error = Some(error.to_string());
224            }
225        }
226
227        if processed % BLOSSOM_PUSH_PROGRESS_EVERY == 0 || processed == total {
228            println!(
229                "  file servers: {processed}/{total} processed ({total_uploaded} uploaded, {total_skipped} already exist, {total_errors} failed)",
230            );
231        }
232    }
233
234    Ok((total_uploaded, total_skipped, total_errors, last_error))
235}
236
237/// Push content to Blossom servers.
238pub async fn push_to_blossom(
239    data_dir: &Path,
240    cid_str: &str,
241    server_override: Option<String>,
242) -> Result<()> {
243    use hashtree_blossom::BlossomClient;
244    use nostr::Keys;
245
246    let (nsec_str, _) = ensure_keys_string()?;
247    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
248
249    let client = if let Some(server) = server_override {
250        BlossomClient::new(keys).with_write_servers(vec![server])
251    } else {
252        BlossomClient::new(keys)
253    };
254
255    if client.write_servers().is_empty() {
256        anyhow::bail!(
257            "No file servers configured. Use --server or add write_servers to config.toml"
258        );
259    }
260
261    let store = Arc::new(HashtreeStore::new(data_dir)?);
262    let fetcher = Arc::new(Fetcher::new(FetchConfig::default()));
263
264    println!("Collecting blocks...");
265    let root_cid = parse_root_cid(cid_str)?;
266    let cids_to_push =
267        collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?;
268
269    println!("Found {} blocks to push", cids_to_push.len());
270    let (uploaded, skipped, errors, last_error) =
271        upload_cids_with_client(store, Some(fetcher), client, cids_to_push).await?;
272
273    println!(
274        "\nUploaded: {}, Skipped: {}, Errors: {}",
275        uploaded, skipped, errors
276    );
277    if let Some(last_error) = last_error {
278        eprintln!("Last error: {last_error}");
279    }
280    println!("Done!");
281    Ok(())
282}
283
284/// Push tree to Blossom servers using BlossomClient.
285pub async fn background_blossom_push(
286    data_dir: &Path,
287    cid_str: &str,
288    servers: &[String],
289) -> Result<()> {
290    let store = Arc::new(HashtreeStore::new(data_dir)?);
291    background_blossom_push_with_store(store, cid_str, servers).await
292}
293
294pub async fn background_blossom_push_with_store(
295    store: Arc<HashtreeStore>,
296    cid_str: &str,
297    servers: &[String],
298) -> Result<()> {
299    let root_cid = parse_root_cid(cid_str)?;
300    background_blossom_push_incremental_with_store(store, root_cid, None, servers).await
301}
302
303pub async fn background_blossom_push_incremental_with_store(
304    store: Arc<HashtreeStore>,
305    root_cid: Cid,
306    previous_root_cid: Option<Cid>,
307    servers: &[String],
308) -> Result<()> {
309    use hashtree_blossom::BlossomClient;
310    use nostr::Keys;
311
312    let (nsec_str, _) = ensure_keys_string()?;
313    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
314
315    let fetcher = Arc::new(Fetcher::new(FetchConfig::default()));
316    let cids_to_push = if let Some(previous_root_cid) = previous_root_cid.as_ref() {
317        println!("Collecting bounded DAG diff for file-server push...");
318        match collect_incremental_cids_for_push(
319            store.as_ref(),
320            root_cid.clone(),
321            previous_root_cid.clone(),
322            Some(fetcher.as_ref()),
323        )
324        .await
325        {
326            Ok(cids) => cids,
327            Err(err) => {
328                tracing::warn!(
329                    "Blossom DAG diff failed; falling back to full push: {}",
330                    err
331                );
332                collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?
333            }
334        }
335    } else {
336        println!("Collecting DAG for file-server push...");
337        collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?
338    };
339
340    if cids_to_push.is_empty() {
341        return Ok(());
342    }
343
344    let client = if servers.is_empty() {
345        BlossomClient::new(keys)
346    } else {
347        BlossomClient::new(keys).with_write_servers(servers.to_vec())
348    };
349    let (_total_uploaded, _total_skipped, total_errors, last_error) =
350        upload_cids_with_client(store, Some(fetcher), client, cids_to_push).await?;
351
352    if total_errors > 0 {
353        let detail = last_error
354            .as_deref()
355            .map(|err| format!(" (last error: {err})"))
356            .unwrap_or_default();
357        anyhow::bail!(
358            "failed to upload {} blob(s) to configured file servers{}",
359            total_errors,
360            detail
361        );
362    }
363
364    Ok(())
365}
366
#[cfg(test)]
mod tests {
    //! Tests for DAG collection used by Blossom pushes.

    use super::{collect_cids_for_push, collect_incremental_cids_for_push};
    use crate::HashtreeStore;
    use hashtree_core::{DirEntry, HashTree, HashTreeConfig, LinkType};
    use tempfile::tempdir;

    /// Full collection must fail (not silently skip) when a descendant blob is missing and
    /// no fetcher is available to hydrate it.
    #[tokio::test]
    async fn collect_cids_for_push_fails_on_missing_descendant_blob() {
        let tmp = tempdir().expect("tempdir");
        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        // The test is already async: await directly rather than nesting a blocking
        // executor inside the tokio runtime.
        let (file_cid, _size) = tree.put_file(b"hello").await.expect("file");
        let root = tree
            .put_directory(vec![DirEntry::from_cid("greeting.txt", &file_cid)])
            .await
            .expect("dir");

        let entries = store
            .get_tree_node(&root.hash)
            .expect("root node")
            .expect("root node present")
            .links;
        let child_hash = entries[0].hash;
        store
            .router()
            .delete_local_only(&child_hash)
            .expect("delete child locally");

        let err = collect_cids_for_push(&store, root, None)
            .await
            .expect_err("missing child should fail");
        assert!(err
            .to_string()
            .contains("missing local blob while pushing DAG"));
    }

    /// A blob referenced from several entries is collected only once, after its parent.
    #[tokio::test]
    async fn collect_cids_for_push_deduplicates_shared_blobs() {
        let tmp = tempdir().expect("tempdir");
        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        let shared = tree.put_blob(b"shared").await.expect("shared file");
        let root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", shared).with_size(6),
                DirEntry::new("b.txt", shared).with_size(6),
            ])
            .await
            .expect("root");

        let cids = collect_cids_for_push(&store, root.clone(), None)
            .await
            .expect("cids");
        let hashes = cids.iter().map(|cid| cid.hash).collect::<Vec<_>>();

        assert_eq!(hashes.len(), 2);
        assert_eq!(hashes[0], root.hash);
        assert!(hashes.contains(&shared));
    }

    /// Only the nodes on the path to a changed leaf (plus the leaf) are collected.
    #[tokio::test]
    async fn incremental_push_collects_only_changed_named_subtrees() {
        let tmp = tempdir().expect("tempdir");
        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        let stable_file = tree.put_blob(b"stable").await.expect("stable file");
        let old_changed_file = tree.put_blob(b"old").await.expect("old file");
        let old_subdir = tree
            .put_directory(vec![
                DirEntry::new("changed.txt", old_changed_file).with_size(3),
                DirEntry::new("stable.txt", stable_file).with_size(6),
            ])
            .await
            .expect("old subdir");
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("subdir", old_subdir.hash).with_link_type(LinkType::Dir),
                DirEntry::new("stable-root.txt", stable_file).with_size(6),
            ])
            .await
            .expect("old root");

        let new_changed_file = tree.put_blob(b"new").await.expect("new file");
        let new_subdir = tree
            .put_directory(vec![
                DirEntry::new("stable.txt", stable_file).with_size(6),
                DirEntry::new("changed.txt", new_changed_file).with_size(3),
            ])
            .await
            .expect("new subdir");
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("stable-root.txt", stable_file).with_size(6),
                DirEntry::new("subdir", new_subdir.hash).with_link_type(LinkType::Dir),
            ])
            .await
            .expect("new root");

        let cids = collect_incremental_cids_for_push(&store, new_root.clone(), old_root, None)
            .await
            .expect("incremental cids");
        let hashes = cids.iter().map(|cid| cid.hash).collect::<Vec<_>>();

        assert_eq!(hashes.len(), 3);
        assert!(hashes.contains(&new_root.hash));
        assert!(hashes.contains(&new_subdir.hash));
        assert!(hashes.contains(&new_changed_file));
        assert!(!hashes.contains(&stable_file));
    }

    /// Diffing a root against itself yields nothing to push.
    #[tokio::test]
    async fn incremental_push_skips_unchanged_root() {
        let tmp = tempdir().expect("tempdir");
        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        let file = tree.put_blob(b"same").await.expect("file");
        let root = tree
            .put_directory(vec![DirEntry::new("same.txt", file).with_size(4)])
            .await
            .expect("root");

        let cids = collect_incremental_cids_for_push(&store, root.clone(), root, None)
            .await
            .expect("incremental cids");
        assert!(cids.is_empty());
    }
}