hashtree_cli/
blossom_push.rs1use anyhow::{Context, Result};
2use futures::stream::{self, StreamExt};
3use std::collections::HashSet;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::config::ensure_keys_string;
8use crate::fetch::{FetchConfig, Fetcher};
9use crate::HashtreeStore;
10use hashtree_core::{Cid, HashTree, HashTreeConfig, Link};
11
/// Maximum number of blob uploads kept in flight at once by the upload stream.
const BLOSSOM_PUSH_CONCURRENCY: usize = 16;
/// A progress line is printed each time this many blobs have been processed
/// (and once more when the final blob completes).
const BLOSSOM_PUSH_PROGRESS_EVERY: usize = 512;
14
15fn parse_root_cid(cid_str: &str) -> Result<Cid> {
16 Cid::parse(cid_str).map_err(|e| anyhow::anyhow!("Invalid CID '{}': {}", cid_str, e))
17}
18
19fn child_cid(parent: &Cid, link: &Link) -> Cid {
20 let inherits_parent_key = link
21 .name
22 .as_deref()
23 .map(|name| {
24 name.starts_with("_chunk_")
25 || (name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree())
26 })
27 .unwrap_or(false);
28
29 Cid {
30 hash: link.hash,
31 key: link.key.or(if inherits_parent_key {
32 parent.key
33 } else {
34 None
35 }),
36 }
37}
38
39async fn ensure_local_blob_for_push(
40 store: &HashtreeStore,
41 fetcher: Option<&Fetcher>,
42 cid: &Cid,
43) -> Result<()> {
44 if store.get_blob(&cid.hash)?.is_some() {
45 return Ok(());
46 }
47
48 if let Some(fetcher) = fetcher {
49 fetcher
50 .fetch_chunk_with_store(store, None, &cid.hash)
51 .await
52 .with_context(|| format!("failed to hydrate missing local blob {}", cid))?;
53 if store.get_blob(&cid.hash)?.is_some() {
54 return Ok(());
55 }
56 }
57
58 anyhow::bail!("missing local blob while pushing DAG: {}", cid);
59}
60
61pub(crate) async fn collect_cids_for_push(
62 store: &HashtreeStore,
63 root_cid: Cid,
64 fetcher: Option<&Fetcher>,
65) -> Result<Vec<Cid>> {
66 let mut cids_to_push = Vec::new();
67 let mut visited: HashSet<[u8; 32]> = HashSet::new();
68 let mut queue = vec![root_cid];
69 let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());
70
71 while let Some(cid) = queue.pop() {
72 if !visited.insert(cid.hash) {
73 continue;
74 }
75
76 ensure_local_blob_for_push(store, fetcher, &cid).await?;
77 cids_to_push.push(cid.clone());
78
79 let node = tree
80 .get_node(&cid)
81 .await
82 .map_err(|e| anyhow::anyhow!("Failed to inspect {}: {}", cid, e))?;
83
84 if let Some(node) = node {
85 for link in &node.links {
86 if !visited.contains(&link.hash) {
87 queue.push(child_cid(&cid, link));
88 }
89 }
90 }
91 }
92
93 Ok(cids_to_push)
94}
95
96async fn upload_cids_with_client(
97 store: Arc<HashtreeStore>,
98 client: hashtree_blossom::BlossomClient,
99 cids_to_push: Vec<Cid>,
100) -> Result<(usize, usize, usize, Option<String>)> {
101 let total = cids_to_push.len();
102 let mut total_uploaded = 0usize;
103 let mut total_skipped = 0usize;
104 let mut total_errors = 0usize;
105 let mut last_error = None;
106 let mut processed = 0usize;
107
108 let mut uploads = stream::iter(cids_to_push.into_iter().map(|cid| {
109 let store = Arc::clone(&store);
110 let client = client.clone();
111 async move {
112 let data = store
113 .get_blob(&cid.hash)?
114 .ok_or_else(|| anyhow::anyhow!("missing local blob while uploading {}", cid))?;
115 client
116 .upload_if_missing(&data)
117 .await
118 .map_err(|error| anyhow::anyhow!(error.to_string()))
119 }
120 }))
121 .buffer_unordered(BLOSSOM_PUSH_CONCURRENCY);
122
123 while let Some(result) = uploads.next().await {
124 processed += 1;
125 match result {
126 Ok((_hash, was_uploaded)) => {
127 if was_uploaded {
128 total_uploaded += 1;
129 } else {
130 total_skipped += 1;
131 }
132 }
133 Err(error) => {
134 tracing::warn!("Blossom upload failed: {}", error);
135 total_errors += 1;
136 last_error = Some(error.to_string());
137 }
138 }
139
140 if processed % BLOSSOM_PUSH_PROGRESS_EVERY == 0 || processed == total {
141 println!(
142 " file servers: {processed}/{total} processed ({total_uploaded} uploaded, {total_skipped} already exist, {total_errors} failed)",
143 );
144 }
145 }
146
147 Ok((total_uploaded, total_skipped, total_errors, last_error))
148}
149
150pub async fn push_to_blossom(
152 data_dir: &Path,
153 cid_str: &str,
154 server_override: Option<String>,
155) -> Result<()> {
156 use hashtree_blossom::BlossomClient;
157 use nostr::Keys;
158
159 let (nsec_str, _) = ensure_keys_string()?;
160 let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
161
162 let client = if let Some(server) = server_override {
163 BlossomClient::new(keys).with_write_servers(vec![server])
164 } else {
165 BlossomClient::new(keys)
166 };
167
168 if client.write_servers().is_empty() {
169 anyhow::bail!(
170 "No file servers configured. Use --server or add write_servers to config.toml"
171 );
172 }
173
174 let store = Arc::new(HashtreeStore::new(data_dir)?);
175 let fetcher = Fetcher::new(FetchConfig::default());
176
177 println!("Collecting blocks...");
178 let root_cid = parse_root_cid(cid_str)?;
179 let cids_to_push = collect_cids_for_push(store.as_ref(), root_cid, Some(&fetcher)).await?;
180
181 println!("Found {} blocks to push", cids_to_push.len());
182 let (uploaded, skipped, errors, last_error) =
183 upload_cids_with_client(store, client, cids_to_push).await?;
184
185 println!(
186 "\nUploaded: {}, Skipped: {}, Errors: {}",
187 uploaded, skipped, errors
188 );
189 if let Some(last_error) = last_error {
190 eprintln!("Last error: {last_error}");
191 }
192 println!("Done!");
193 Ok(())
194}
195
196pub async fn background_blossom_push(
198 data_dir: &Path,
199 cid_str: &str,
200 servers: &[String],
201) -> Result<()> {
202 let store = Arc::new(HashtreeStore::new(data_dir)?);
203 background_blossom_push_with_store(store, cid_str, servers).await
204}
205
206pub async fn background_blossom_push_with_store(
207 store: Arc<HashtreeStore>,
208 cid_str: &str,
209 servers: &[String],
210) -> Result<()> {
211 use hashtree_blossom::BlossomClient;
212 use nostr::Keys;
213
214 let (nsec_str, _) = ensure_keys_string()?;
215 let keys = Keys::parse(&nsec_str).context("Failed to parse nsec")?;
216
217 let root_cid = parse_root_cid(cid_str)?;
218 let fetcher = Fetcher::new(FetchConfig::default());
219 println!("Collecting DAG for file-server push...");
220 let cids_to_push = collect_cids_for_push(store.as_ref(), root_cid, Some(&fetcher)).await?;
221
222 if cids_to_push.is_empty() {
223 return Ok(());
224 }
225
226 let client = if servers.is_empty() {
227 BlossomClient::new(keys)
228 } else {
229 BlossomClient::new(keys).with_write_servers(servers.to_vec())
230 };
231 let (_total_uploaded, _total_skipped, total_errors, last_error) =
232 upload_cids_with_client(store, client, cids_to_push).await?;
233
234 if total_errors > 0 {
235 let detail = last_error
236 .as_deref()
237 .map(|err| format!(" (last error: {err})"))
238 .unwrap_or_default();
239 anyhow::bail!(
240 "failed to upload {} blob(s) to configured file servers{}",
241 total_errors,
242 detail
243 );
244 }
245
246 Ok(())
247}
248
#[cfg(test)]
mod tests {
    use super::collect_cids_for_push;
    use crate::HashtreeStore;
    use futures::executor::block_on as sync_block_on;
    use hashtree_core::{HashTree, HashTreeConfig};
    use tempfile::tempdir;

    /// A directory whose only child blob has been removed locally must make
    /// collection fail when no fetcher is available to re-hydrate it.
    #[tokio::test]
    async fn collect_cids_for_push_fails_on_missing_descendant_blob() {
        let scratch = tempdir().expect("tempdir");
        let store =
            HashtreeStore::with_options(scratch.path(), None, 32 * 1024 * 1024).expect("store");
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        // Build a one-file directory and keep the directory's CID as the root.
        let root = sync_block_on(async {
            let (file_cid, _size) = tree.put_file(b"hello").await.expect("file");
            let listing = vec![hashtree_core::DirEntry::from_cid(
                "greeting.txt",
                &file_cid,
            )];
            tree.put_directory(listing).await.expect("dir")
        });

        // Remove the root's first linked blob from local storage only.
        let root_node = store
            .get_tree_node(&root.hash)
            .expect("root node")
            .expect("root node present");
        let child_hash = root_node.links[0].hash;
        store
            .router()
            .delete_local_only(&child_hash)
            .expect("delete child locally");

        let outcome = collect_cids_for_push(&store, root, None).await;
        let err = outcome.expect_err("missing child should fail");
        let message = err.to_string();
        assert!(message.contains("missing local blob while pushing DAG"));
    }
}