Skip to main content

hashtree_cli/
blossom_push.rs

1use anyhow::{Context, Result};
2use futures::stream::{self, StreamExt};
3use std::collections::HashSet;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::config::ensure_keys_string;
8use crate::fetch::{FetchConfig, Fetcher};
9use crate::HashtreeStore;
10use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
11
/// Maximum number of Blossom uploads polled concurrently (passed to `buffer_unordered`).
const BLOSSOM_PUSH_CONCURRENCY: usize = 16;
/// A progress line is printed every time this many upload results have been processed,
/// and once more after the final result.
const BLOSSOM_PUSH_PROGRESS_EVERY: usize = 512;
14
15fn parse_root_cid(cid_str: &str) -> Result<Cid> {
16    Cid::parse(cid_str).map_err(|e| anyhow::anyhow!("Invalid CID '{}': {}", cid_str, e))
17}
18
19fn child_cid(parent: &Cid, link: &Link) -> Cid {
20    let inherits_parent_key = link
21        .name
22        .as_deref()
23        .map(|name| {
24            name.starts_with("_chunk_")
25                || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
26        })
27        .unwrap_or(false);
28
29    Cid {
30        hash: link.hash,
31        key: link.key.or(if inherits_parent_key {
32            parent.key
33        } else {
34            None
35        }),
36    }
37}
38
39async fn ensure_local_blob_for_push(
40    store: &HashtreeStore,
41    fetcher: Option<&Fetcher>,
42    cid: &Cid,
43) -> Result<()> {
44    if store.get_blob(&cid.hash)?.is_some() {
45        return Ok(());
46    }
47
48    if let Some(fetcher) = fetcher {
49        let data = fetcher
50            .fetch_chunk(None, &to_hex(&cid.hash))
51            .await
52            .with_context(|| format!("failed to hydrate missing local blob {}", cid))?;
53        store
54            .put_blob(&data)
55            .with_context(|| format!("failed to persist hydrated blob {}", cid))?;
56        if store.get_blob(&cid.hash)?.is_some() {
57            return Ok(());
58        }
59    }
60
61    anyhow::bail!("missing local blob while pushing DAG: {}", cid);
62}
63
64pub(crate) async fn collect_cids_for_push(
65    store: &HashtreeStore,
66    root_cid: Cid,
67    fetcher: Option<&Fetcher>,
68) -> Result<Vec<Cid>> {
69    let mut cids_to_push = Vec::new();
70    let mut visited: HashSet<[u8; 32]> = HashSet::new();
71    let mut queue = vec![root_cid];
72    let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
73
74    while let Some(cid) = queue.pop() {
75        if !visited.insert(cid.hash) {
76            continue;
77        }
78
79        ensure_local_blob_for_push(store, fetcher, &cid).await?;
80        cids_to_push.push(cid.clone());
81
82        let node = tree
83            .get_node(&cid)
84            .await
85            .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
86
87        if let Some(node) = node {
88            for link in &node.links {
89                if !visited.contains(&link.hash) {
90                    queue.push(child_cid(&cid, link));
91                }
92            }
93        }
94    }
95
96    Ok(cids_to_push)
97}
98
99async fn upload_cids_with_client(
100    store: Arc<HashtreeStore>,
101    fetcher: Option<Arc<Fetcher>>,
102    client: hashtree_blossom::BlossomClient,
103    cids_to_push: Vec<Cid>,
104) -> Result<(usize, usize, usize, Option<String>)> {
105    let total = cids_to_push.len();
106    let mut total_uploaded = 0usize;
107    let mut total_skipped = 0usize;
108    let mut total_errors = 0usize;
109    let mut last_error = None;
110    let mut processed = 0usize;
111
112    let mut uploads = stream::iter(cids_to_push.into_iter().map(|cid| {
113        let store = Arc::clone(&store);
114        let fetcher = fetcher.clone();
115        let client = client.clone();
116        async move {
117            ensure_local_blob_for_push(store.as_ref(), fetcher.as_deref(), &cid).await?;
118            let data = store
119                .get_blob(&cid.hash)?
120                .ok_or_else(|| anyhow::anyhow!("missing local blob while uploading {}", cid))?;
121            client
122                .upload_if_missing(&data)
123                .await
124                .map_err(|error| anyhow::anyhow!(error.to_string()))
125        }
126    }))
127    .buffer_unordered(BLOSSOM_PUSH_CONCURRENCY);
128
129    while let Some(result) = uploads.next().await {
130        processed += 1;
131        match result {
132            Ok((_hash, was_uploaded)) => {
133                if was_uploaded {
134                    total_uploaded += 1;
135                } else {
136                    total_skipped += 1;
137                }
138            }
139            Err(error) => {
140                tracing::warn!("Blossom upload failed: {}", error);
141                total_errors += 1;
142                last_error = Some(error.to_string());
143            }
144        }
145
146        if processed % BLOSSOM_PUSH_PROGRESS_EVERY == 0 || processed == total {
147            println!(
148                "  file servers: {processed}/{total} processed ({total_uploaded} uploaded, {total_skipped} already exist, {total_errors} failed)",
149            );
150        }
151    }
152
153    Ok((total_uploaded, total_skipped, total_errors, last_error))
154}
155
156/// Push content to Blossom servers.
157pub async fn push_to_blossom(
158    data_dir: &Path,
159    cid_str: &str,
160    server_override: Option<String>,
161) -> Result<()> {
162    use hashtree_blossom::BlossomClient;
163    use nostr::Keys;
164
165    let (nsec_str, _) = ensure_keys_string()?;
166    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
167
168    let client = if let Some(server) = server_override {
169        BlossomClient::new(keys).with_write_servers(vec![server])
170    } else {
171        BlossomClient::new(keys)
172    };
173
174    if client.write_servers().is_empty() {
175        anyhow::bail!(
176            "No file servers configured. Use --server or add write_servers to config.toml"
177        );
178    }
179
180    let store = Arc::new(HashtreeStore::new(data_dir)?);
181    let fetcher = Arc::new(Fetcher::new(FetchConfig::default()));
182
183    println!("Collecting blocks...");
184    let root_cid = parse_root_cid(cid_str)?;
185    let cids_to_push =
186        collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?;
187
188    println!("Found {} blocks to push", cids_to_push.len());
189    let (uploaded, skipped, errors, last_error) =
190        upload_cids_with_client(store, Some(fetcher), client, cids_to_push).await?;
191
192    println!(
193        "\nUploaded: {}, Skipped: {}, Errors: {}",
194        uploaded, skipped, errors
195    );
196    if let Some(last_error) = last_error {
197        eprintln!("Last error: {last_error}");
198    }
199    println!("Done!");
200    Ok(())
201}
202
203/// Push tree to Blossom servers using BlossomClient.
204pub async fn background_blossom_push(
205    data_dir: &Path,
206    cid_str: &str,
207    servers: &[String],
208) -> Result<()> {
209    let store = Arc::new(HashtreeStore::new(data_dir)?);
210    background_blossom_push_with_store(store, cid_str, servers).await
211}
212
213pub async fn background_blossom_push_with_store(
214    store: Arc<HashtreeStore>,
215    cid_str: &str,
216    servers: &[String],
217) -> Result<()> {
218    use hashtree_blossom::BlossomClient;
219    use nostr::Keys;
220
221    let (nsec_str, _) = ensure_keys_string()?;
222    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
223
224    let root_cid = parse_root_cid(cid_str)?;
225    let fetcher = Arc::new(Fetcher::new(FetchConfig::default()));
226    println!("Collecting DAG for file-server push...");
227    let cids_to_push =
228        collect_cids_for_push(store.as_ref(), root_cid, Some(fetcher.as_ref())).await?;
229
230    if cids_to_push.is_empty() {
231        return Ok(());
232    }
233
234    let client = if servers.is_empty() {
235        BlossomClient::new(keys)
236    } else {
237        BlossomClient::new(keys).with_write_servers(servers.to_vec())
238    };
239    let (_total_uploaded, _total_skipped, total_errors, last_error) =
240        upload_cids_with_client(store, Some(fetcher), client, cids_to_push).await?;
241
242    if total_errors > 0 {
243        let detail = last_error
244            .as_deref()
245            .map(|err| format!(" (last error: {err})"))
246            .unwrap_or_default();
247        anyhow::bail!(
248            "failed to upload {} blob(s) to configured file servers{}",
249            total_errors,
250            detail
251        );
252    }
253
254    Ok(())
255}
256
#[cfg(test)]
mod tests {
    use super::collect_cids_for_push;
    use crate::HashtreeStore;
    use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig};
    use tempfile::tempdir;

    /// Store a one-file directory (`greeting.txt` = "hello") in `store`.
    ///
    /// Returns `(directory_cid, file_cid)`. The fixture is awaited directly on the test's
    /// tokio runtime instead of being driven by a nested `futures::executor::block_on`,
    /// which would block the runtime thread.
    async fn put_single_file_dir(store: &HashtreeStore) -> (Cid, Cid) {
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
        let (file_cid, _size) = tree.put_file(b"hello").await.expect("file");
        let root = tree
            .put_directory(vec![DirEntry::from_cid("greeting.txt", &file_cid)])
            .await
            .expect("dir");
        (root, file_cid)
    }

    #[tokio::test]
    async fn collect_cids_for_push_fails_on_missing_descendant_blob() {
        let tmp = tempdir().expect("tempdir");
        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
        let (root, _file_cid) = put_single_file_dir(&store).await;

        let child_hash = store
            .get_tree_node(&root.hash)
            .expect("root node")
            .expect("root node present")
            .links
            .first()
            .expect("root directory has a child link")
            .hash;
        store
            .router()
            .delete_local_only(&child_hash)
            .expect("delete child locally");

        let err = collect_cids_for_push(&store, root, None)
            .await
            .expect_err("missing child should fail");
        assert!(err
            .to_string()
            .contains("missing local blob while pushing DAG"));
    }

    #[tokio::test]
    async fn collect_cids_for_push_returns_root_first_and_includes_children() {
        let tmp = tempdir().expect("tempdir");
        let store = HashtreeStore::with_options(tmp.path(), None, 32 * 1024 * 1024).expect("store");
        let (root, file_cid) = put_single_file_dir(&store).await;
        let root_hash = root.hash;

        let cids = collect_cids_for_push(&store, root, None)
            .await
            .expect("all blobs are local");

        assert_eq!(cids.first().map(|cid| cid.hash), Some(root_hash));
        assert!(cids.iter().any(|cid| cid.hash == file_cid.hash));
    }
}