Skip to main content

hashtree_cli/
blossom_push.rs

1use anyhow::{Context, Result};
2use futures::executor::block_on as sync_block_on;
3use futures::stream::{self, StreamExt};
4use std::collections::HashSet;
5use std::path::Path;
6use std::sync::Arc;
7
8use crate::config::ensure_keys_string;
9use crate::HashtreeStore;
10use hashtree_core::{Cid, HashTree, HashTreeConfig, Link};
11
12const BLOSSOM_PUSH_CONCURRENCY: usize = 16;
13const BLOSSOM_PUSH_PROGRESS_EVERY: usize = 512;
14
15fn parse_root_cid(cid_str: &str) -> Result<Cid> {
16    Cid::parse(cid_str).map_err(|e| anyhow::anyhow!("Invalid CID '{}': {}", cid_str, e))
17}
18
19fn child_cid(parent: &Cid, link: &Link) -> Cid {
20    let inherits_parent_key = link
21        .name
22        .as_deref()
23        .map(|name| {
24            name.starts_with("_chunk_")
25                || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
26        })
27        .unwrap_or(false);
28
29    Cid {
30        hash: link.hash,
31        key: link.key.or(if inherits_parent_key {
32            parent.key
33        } else {
34            None
35        }),
36    }
37}
38
39pub(crate) fn collect_cids_for_push(store: &HashtreeStore, root_cid: Cid) -> Result<Vec<Cid>> {
40    let mut cids_to_push = Vec::new();
41    let mut visited: HashSet<[u8; 32]> = HashSet::new();
42    let mut queue = vec![root_cid];
43    let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
44
45    while let Some(cid) = queue.pop() {
46        if !visited.insert(cid.hash) {
47            continue;
48        }
49
50        if store.get_blob(&cid.hash)?.is_none() {
51            anyhow::bail!("missing local blob while pushing DAG: {}", cid);
52        }
53        cids_to_push.push(cid.clone());
54
55        let node = sync_block_on(async { tree.get_node(&cid).await })
56            .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
57
58        if let Some(node) = node {
59            for link in &node.links {
60                if !visited.contains(&link.hash) {
61                    queue.push(child_cid(&cid, link));
62                }
63            }
64        }
65    }
66
67    Ok(cids_to_push)
68}
69
70async fn upload_cids_with_client(
71    store: Arc<HashtreeStore>,
72    client: hashtree_blossom::BlossomClient,
73    cids_to_push: Vec<Cid>,
74) -> Result<(usize, usize, usize, Option<String>)> {
75    let total = cids_to_push.len();
76    let mut total_uploaded = 0usize;
77    let mut total_skipped = 0usize;
78    let mut total_errors = 0usize;
79    let mut last_error = None;
80    let mut processed = 0usize;
81
82    let mut uploads = stream::iter(cids_to_push.into_iter().map(|cid| {
83        let store = Arc::clone(&store);
84        let client = client.clone();
85        async move {
86            let data = store
87                .get_blob(&cid.hash)?
88                .ok_or_else(|| anyhow::anyhow!("missing local blob while uploading {}", cid))?;
89            client
90                .upload_if_missing(&data)
91                .await
92                .map_err(|error| anyhow::anyhow!(error.to_string()))
93        }
94    }))
95    .buffer_unordered(BLOSSOM_PUSH_CONCURRENCY);
96
97    while let Some(result) = uploads.next().await {
98        processed += 1;
99        match result {
100            Ok((_hash, was_uploaded)) => {
101                if was_uploaded {
102                    total_uploaded += 1;
103                } else {
104                    total_skipped += 1;
105                }
106            }
107            Err(error) => {
108                tracing::warn!("Blossom upload failed: {}", error);
109                total_errors += 1;
110                last_error = Some(error.to_string());
111            }
112        }
113
114        if processed % BLOSSOM_PUSH_PROGRESS_EVERY == 0 || processed == total {
115            println!(
116                "  file servers: {processed}/{total} processed ({total_uploaded} uploaded, {total_skipped} already exist, {total_errors} failed)",
117            );
118        }
119    }
120
121    Ok((total_uploaded, total_skipped, total_errors, last_error))
122}
123
124/// Push content to Blossom servers.
125pub async fn push_to_blossom(
126    data_dir: &Path,
127    cid_str: &str,
128    server_override: Option<String>,
129) -> Result<()> {
130    use hashtree_blossom::BlossomClient;
131    use nostr::Keys;
132
133    let (nsec_str, _) = ensure_keys_string()?;
134    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
135
136    let client = if let Some(server) = server_override {
137        BlossomClient::new(keys).with_write_servers(vec![server])
138    } else {
139        BlossomClient::new(keys)
140    };
141
142    if client.write_servers().is_empty() {
143        anyhow::bail!(
144            "No file servers configured. Use --server or add write_servers to config.toml"
145        );
146    }
147
148    let store = Arc::new(HashtreeStore::new(data_dir)?);
149
150    println!("Collecting blocks...");
151    let root_cid = parse_root_cid(cid_str)?;
152    let cids_to_push = collect_cids_for_push(store.as_ref(), root_cid)?;
153
154    println!("Found {} blocks to push", cids_to_push.len());
155    let (uploaded, skipped, errors, last_error) =
156        upload_cids_with_client(store, client, cids_to_push).await?;
157
158    println!(
159        "\nUploaded: {}, Skipped: {}, Errors: {}",
160        uploaded, skipped, errors
161    );
162    if let Some(last_error) = last_error {
163        eprintln!("Last error: {last_error}");
164    }
165    println!("Done!");
166    Ok(())
167}
168
169/// Push tree to Blossom servers using BlossomClient.
170pub async fn background_blossom_push(
171    data_dir: &Path,
172    cid_str: &str,
173    servers: &[String],
174) -> Result<()> {
175    use hashtree_blossom::BlossomClient;
176    use nostr::Keys;
177
178    let (nsec_str, _) = ensure_keys_string()?;
179    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
180
181    let store = Arc::new(HashtreeStore::new(data_dir)?);
182    let root_cid = parse_root_cid(cid_str)?;
183    let cids_to_push = collect_cids_for_push(store.as_ref(), root_cid)?;
184
185    if cids_to_push.is_empty() {
186        return Ok(());
187    }
188
189    let client = if servers.is_empty() {
190        BlossomClient::new(keys)
191    } else {
192        BlossomClient::new(keys).with_write_servers(servers.to_vec())
193    };
194    let (_total_uploaded, _total_skipped, total_errors, last_error) =
195        upload_cids_with_client(store, client, cids_to_push).await?;
196
197    if total_errors > 0 {
198        let detail = last_error
199            .as_deref()
200            .map(|err| format!(" (last error: {err})"))
201            .unwrap_or_default();
202        anyhow::bail!(
203            "failed to upload {} blob(s) to configured file servers{}",
204            total_errors,
205            detail
206        );
207    }
208
209    Ok(())
210}
211
212#[cfg(test)]
213mod tests {
214    use super::collect_cids_for_push;
215    use crate::HashtreeStore;
216    use futures::executor::block_on as sync_block_on;
217    use hashtree_core::{HashTree, HashTreeConfig};
218    use tempfile::tempdir;
219
220    #[test]
221    fn collect_cids_for_push_fails_on_missing_descendant_blob() {
222        let tmp = tempdir().expect("tempdir");
223        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
224        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
225
226        let root = sync_block_on(async {
227            let (file_cid, _size) = tree.put_file(b"hello").await.expect("file");
228            tree.put_directory(vec![hashtree_core::DirEntry::from_cid(
229                "greeting.txt",
230                &file_cid,
231            )])
232            .await
233            .expect("dir")
234        });
235
236        let entries = store
237            .get_tree_node(&root.hash)
238            .expect("root node")
239            .expect("root node present")
240            .links;
241        let child_hash = entries[0].hash;
242        store
243            .router()
244            .delete_local_only(&child_hash)
245            .expect("delete child locally");
246
247        let err = collect_cids_for_push(&store, root).expect_err("missing child should fail");
248        assert!(err
249            .to_string()
250            .contains("missing local blob while pushing DAG"));
251    }
252}