Skip to main content

hashtree_cli/
blossom_push.rs

1use anyhow::{Context, Result};
2use futures::stream::{self, StreamExt};
3use std::collections::HashSet;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::config::ensure_keys_string;
8use crate::fetch::{FetchConfig, Fetcher};
9use crate::HashtreeStore;
10use hashtree_core::{Cid, HashTree, HashTreeConfig, Link};
11
/// Upper bound on the number of blob uploads kept in flight at the same time.
const BLOSSOM_PUSH_CONCURRENCY: usize = 16;

/// A progress line is printed each time this many blobs have been processed.
const BLOSSOM_PUSH_PROGRESS_EVERY: usize = 512;
14
15fn parse_root_cid(cid_str: &str) -> Result<Cid> {
16    Cid::parse(cid_str).map_err(|e| anyhow::anyhow!("Invalid CID '{}': {}", cid_str, e))
17}
18
19fn child_cid(parent: &Cid, link: &Link) -> Cid {
20    let inherits_parent_key = link
21        .name
22        .as_deref()
23        .map(|name| {
24            name.starts_with("_chunk_")
25                || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
26        })
27        .unwrap_or(false);
28
29    Cid {
30        hash: link.hash,
31        key: link.key.or(if inherits_parent_key {
32            parent.key
33        } else {
34            None
35        }),
36    }
37}
38
39async fn ensure_local_blob_for_push(
40    store: &HashtreeStore,
41    fetcher: Option<&Fetcher>,
42    cid: &Cid,
43) -> Result<()> {
44    if store.get_blob(&cid.hash)?.is_some() {
45        return Ok(());
46    }
47
48    if let Some(fetcher) = fetcher {
49        fetcher
50            .fetch_chunk_with_store(store, None, &cid.hash)
51            .await
52            .with_context(|| format!("failed to hydrate missing local blob {}", cid))?;
53        if store.get_blob(&cid.hash)?.is_some() {
54            return Ok(());
55        }
56    }
57
58    anyhow::bail!("missing local blob while pushing DAG: {}", cid);
59}
60
61pub(crate) async fn collect_cids_for_push(
62    store: &HashtreeStore,
63    root_cid: Cid,
64    fetcher: Option<&Fetcher>,
65) -> Result<Vec<Cid>> {
66    let mut cids_to_push = Vec::new();
67    let mut visited: HashSet<[u8; 32]> = HashSet::new();
68    let mut queue = vec![root_cid];
69    let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
70
71    while let Some(cid) = queue.pop() {
72        if !visited.insert(cid.hash) {
73            continue;
74        }
75
76        ensure_local_blob_for_push(store, fetcher, &cid).await?;
77        cids_to_push.push(cid.clone());
78
79        let node = tree
80            .get_node(&cid)
81            .await
82            .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
83
84        if let Some(node) = node {
85            for link in &node.links {
86                if !visited.contains(&link.hash) {
87                    queue.push(child_cid(&cid, link));
88                }
89            }
90        }
91    }
92
93    Ok(cids_to_push)
94}
95
96async fn upload_cids_with_client(
97    store: Arc<HashtreeStore>,
98    client: hashtree_blossom::BlossomClient,
99    cids_to_push: Vec<Cid>,
100) -> Result<(usize, usize, usize, Option<String>)> {
101    let total = cids_to_push.len();
102    let mut total_uploaded = 0usize;
103    let mut total_skipped = 0usize;
104    let mut total_errors = 0usize;
105    let mut last_error = None;
106    let mut processed = 0usize;
107
108    let mut uploads = stream::iter(cids_to_push.into_iter().map(|cid| {
109        let store = Arc::clone(&store);
110        let client = client.clone();
111        async move {
112            let data = store
113                .get_blob(&cid.hash)?
114                .ok_or_else(|| anyhow::anyhow!("missing local blob while uploading {}", cid))?;
115            client
116                .upload_if_missing(&data)
117                .await
118                .map_err(|error| anyhow::anyhow!(error.to_string()))
119        }
120    }))
121    .buffer_unordered(BLOSSOM_PUSH_CONCURRENCY);
122
123    while let Some(result) = uploads.next().await {
124        processed += 1;
125        match result {
126            Ok((_hash, was_uploaded)) => {
127                if was_uploaded {
128                    total_uploaded += 1;
129                } else {
130                    total_skipped += 1;
131                }
132            }
133            Err(error) => {
134                tracing::warn!("Blossom upload failed: {}", error);
135                total_errors += 1;
136                last_error = Some(error.to_string());
137            }
138        }
139
140        if processed % BLOSSOM_PUSH_PROGRESS_EVERY == 0 || processed == total {
141            println!(
142                "  file servers: {processed}/{total} processed ({total_uploaded} uploaded, {total_skipped} already exist, {total_errors} failed)",
143            );
144        }
145    }
146
147    Ok((total_uploaded, total_skipped, total_errors, last_error))
148}
149
150/// Push content to Blossom servers.
151pub async fn push_to_blossom(
152    data_dir: &Path,
153    cid_str: &str,
154    server_override: Option<String>,
155) -> Result<()> {
156    use hashtree_blossom::BlossomClient;
157    use nostr::Keys;
158
159    let (nsec_str, _) = ensure_keys_string()?;
160    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
161
162    let client = if let Some(server) = server_override {
163        BlossomClient::new(keys).with_write_servers(vec![server])
164    } else {
165        BlossomClient::new(keys)
166    };
167
168    if client.write_servers().is_empty() {
169        anyhow::bail!(
170            "No file servers configured. Use --server or add write_servers to config.toml"
171        );
172    }
173
174    let store = Arc::new(HashtreeStore::new(data_dir)?);
175    let fetcher = Fetcher::new(FetchConfig::default());
176
177    println!("Collecting blocks...");
178    let root_cid = parse_root_cid(cid_str)?;
179    let cids_to_push = collect_cids_for_push(store.as_ref(), root_cid, Some(&fetcher)).await?;
180
181    println!("Found {} blocks to push", cids_to_push.len());
182    let (uploaded, skipped, errors, last_error) =
183        upload_cids_with_client(store, client, cids_to_push).await?;
184
185    println!(
186        "\nUploaded: {}, Skipped: {}, Errors: {}",
187        uploaded, skipped, errors
188    );
189    if let Some(last_error) = last_error {
190        eprintln!("Last error: {last_error}");
191    }
192    println!("Done!");
193    Ok(())
194}
195
196/// Push tree to Blossom servers using BlossomClient.
197pub async fn background_blossom_push(
198    data_dir: &Path,
199    cid_str: &str,
200    servers: &[String],
201) -> Result<()> {
202    let store = Arc::new(HashtreeStore::new(data_dir)?);
203    background_blossom_push_with_store(store, cid_str, servers).await
204}
205
206pub async fn background_blossom_push_with_store(
207    store: Arc<HashtreeStore>,
208    cid_str: &str,
209    servers: &[String],
210) -> Result<()> {
211    use hashtree_blossom::BlossomClient;
212    use nostr::Keys;
213
214    let (nsec_str, _) = ensure_keys_string()?;
215    let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
216
217    let root_cid = parse_root_cid(cid_str)?;
218    let fetcher = Fetcher::new(FetchConfig::default());
219    println!("Collecting DAG for file-server push...");
220    let cids_to_push = collect_cids_for_push(store.as_ref(), root_cid, Some(&fetcher)).await?;
221
222    if cids_to_push.is_empty() {
223        return Ok(());
224    }
225
226    let client = if servers.is_empty() {
227        BlossomClient::new(keys)
228    } else {
229        BlossomClient::new(keys).with_write_servers(servers.to_vec())
230    };
231    let (_total_uploaded, _total_skipped, total_errors, last_error) =
232        upload_cids_with_client(store, client, cids_to_push).await?;
233
234    if total_errors > 0 {
235        let detail = last_error
236            .as_deref()
237            .map(|err| format!(" (last error: {err})"))
238            .unwrap_or_default();
239        anyhow::bail!(
240            "failed to upload {} blob(s) to configured file servers{}",
241            total_errors,
242            detail
243        );
244    }
245
246    Ok(())
247}
248
#[cfg(test)]
mod tests {
    use super::collect_cids_for_push;
    use crate::HashtreeStore;
    use futures::executor::block_on as sync_block_on;
    use hashtree_core::{DirEntry, HashTree, HashTreeConfig};
    use tempfile::tempdir;

    /// Collecting a DAG must fail when a descendant blob is gone and no fetcher is available.
    #[tokio::test]
    async fn collect_cids_for_push_fails_on_missing_descendant_blob() {
        let scratch = tempdir().expect("tempdir");
        let store =
            HashtreeStore::with_options(scratch.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        // Build a directory holding a single small file.
        let root = sync_block_on(async {
            let (file_cid, _size) = tree.put_file(b"hello").await.expect("file");
            let listing = vec![DirEntry::from_cid("greeting.txt", &file_cid)];
            tree.put_directory(listing).await.expect("dir")
        });

        // Drop the first child of the root from local storage only.
        let root_node = store
            .get_tree_node(&root.hash)
            .expect("root node")
            .expect("root node present");
        let child_hash = root_node.links[0].hash;
        store
            .router()
            .delete_local_only(&child_hash)
            .expect("delete child locally");

        let err = collect_cids_for_push(&store, root, None)
            .await
            .expect_err("missing child should fail");
        let message = err.to_string();
        assert!(message.contains("missing local blob while pushing DAG"));
    }
}
291}