1use base64::Engine;
30use nostr::prelude::*;
31use sha2::{Digest, Sha256};
32use std::time::Duration;
33use thiserror::Error;
34use tracing::{debug, warn};
35
36#[derive(Error, Debug)]
37pub enum BlossomError {
38 #[error("HTTP error: {0}")]
39 Http(#[from] reqwest::Error),
40
41 #[error("No servers configured")]
42 NoServers,
43
44 #[error("Upload failed: {0}")]
45 UploadFailed(String),
46
47 #[error("Download failed on all servers: {0}")]
48 DownloadFailed(String),
49
50 #[error("Hash mismatch: expected {expected}, got {actual}")]
51 HashMismatch { expected: String, actual: String },
52
53 #[error("Signing error: {0}")]
54 Signing(String),
55}
56
57#[derive(Clone)]
59pub struct BlossomClient {
60 keys: Keys,
61 read_servers: Vec<String>,
63 write_servers: Vec<String>,
65 http: reqwest::Client,
66 timeout: Duration,
67}
68
69impl BlossomClient {
/// Creates a client from the configuration returned by
/// `hashtree_config::Config::load_or_default()`.
///
/// Read and write servers come from the `blossom` section of that config. If a
/// local daemon URL is detected and is not already in the read list, it is
/// inserted at the front so it is tried first. Requests time out after 30 seconds.
///
/// # Panics
///
/// Panics if the underlying HTTP client cannot be built.
#[cfg(feature = "config")]
pub fn new(keys: Keys) -> Self {
    let timeout = Duration::from_secs(30);
    let config = hashtree_config::Config::load_or_default();
    let mut read_servers = config.blossom.all_read_servers();

    let detected = hashtree_config::detect_local_daemon_url(Some(&config.server.bind_address));
    match detected {
        Some(local_url) if !read_servers.contains(&local_url) => {
            debug!(
                "Local daemon detected at {}, prioritizing for reads",
                local_url
            );
            read_servers.insert(0, local_url);
        }
        // Either no daemon was detected or it is already listed.
        _ => {}
    }

    let write_servers = config.blossom.all_write_servers();
    let http = reqwest::Client::builder()
        .timeout(timeout)
        .build()
        .unwrap();

    Self {
        keys,
        read_servers,
        write_servers,
        http,
        timeout,
    }
}
102
103 #[cfg(not(feature = "config"))]
105 pub fn new(keys: Keys) -> Self {
106 Self {
107 keys,
108 read_servers: vec![],
109 write_servers: vec![],
110 http: reqwest::Client::builder()
111 .timeout(Duration::from_secs(30))
112 .build()
113 .unwrap(),
114 timeout: Duration::from_secs(30),
115 }
116 }
117
118 pub fn new_empty(keys: Keys) -> Self {
120 Self {
121 keys,
122 read_servers: vec![],
123 write_servers: vec![],
124 http: reqwest::Client::builder()
125 .timeout(Duration::from_secs(30))
126 .build()
127 .unwrap(),
128 timeout: Duration::from_secs(30),
129 }
130 }
131
132 pub fn with_servers(mut self, servers: Vec<String>) -> Self {
134 self.read_servers = servers.clone();
135 self.write_servers = servers;
136 self
137 }
138
139 pub fn with_read_servers(mut self, servers: Vec<String>) -> Self {
141 self.read_servers = servers;
142 self
143 }
144
145 pub fn with_write_servers(mut self, servers: Vec<String>) -> Self {
147 self.write_servers = servers;
148 self
149 }
150
151 pub fn with_timeout(mut self, timeout: Duration) -> Self {
153 self.timeout = timeout;
154 self.http = reqwest::Client::builder().timeout(timeout).build().unwrap();
155 self
156 }
157
158 pub fn with_local_daemon(mut self, url: String) -> Self {
161 if !self.read_servers.iter().any(|s| s == &url) {
163 self.read_servers.insert(0, url);
164 }
165 self
166 }
167
168 pub fn read_servers(&self) -> &[String] {
170 &self.read_servers
171 }
172
173 pub fn write_servers(&self) -> &[String] {
175 &self.write_servers
176 }
177
178 pub fn servers(&self) -> &[String] {
180 &self.read_servers
181 }
182
183 pub async fn upload(&self, data: &[u8]) -> Result<String, BlossomError> {
186 if self.write_servers.is_empty() {
187 return Err(BlossomError::NoServers);
188 }
189
190 let hash = compute_sha256(data);
191 let auth_header = self.create_upload_auth(&hash).await?;
192
193 for server in &self.write_servers {
194 match self
195 .upload_to_server(server, data, &hash, &auth_header)
196 .await
197 {
198 Ok(_) => {
199 debug!("Uploaded {} to {}", &hash[..12], server);
200 return Ok(hash);
201 }
202 Err(e) => {
203 warn!("Upload to {} failed: {}", server, e);
204 continue;
205 }
206 }
207 }
208
209 Err(BlossomError::UploadFailed("all servers failed".to_string()))
210 }
211
212 pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool), BlossomError> {
219 if self.write_servers.is_empty() {
220 return Err(BlossomError::NoServers);
221 }
222
223 let hash = compute_sha256(data);
224
225 if data.is_empty() {
227 warn!("Attempting to upload empty blob with hash {}", hash);
228 }
229
230 const HEAD_CHECK_THRESHOLD: usize = 256 * 1024; if data.len() >= HEAD_CHECK_THRESHOLD && self.exists(&hash).await {
233 debug!("Large blob {} already exists (skipped upload)", &hash[..12]);
234 return Ok((hash, false));
235 }
236
237 const MAX_RETRIES: u32 = 3;
238 let mut last_error = String::new();
239
240 for attempt in 0..MAX_RETRIES {
241 if attempt > 0 {
242 let delay = Duration::from_millis(100 * (1 << (attempt - 1)));
244 debug!(
245 "Retrying upload {} (attempt {}/{}), waiting {:?}",
246 &hash[..12],
247 attempt + 1,
248 MAX_RETRIES,
249 delay
250 );
251 tokio::time::sleep(delay).await;
252 }
253
254 let auth_header = self.create_upload_auth(&hash).await?;
256
257 for server in &self.write_servers {
258 match self
259 .upload_to_server(server, data, &hash, &auth_header)
260 .await
261 {
262 Ok(was_new) => {
263 if was_new {
264 debug!("Uploaded {} to {}", &hash[..12], server);
265 } else {
266 debug!("Blob {} already exists on {}", &hash[..12], server);
267 }
268 return Ok((hash, was_new));
269 }
270 Err(e) => {
271 last_error = format!("{}: {}", server, e);
272 warn!("Upload to {} failed: {}", server, e);
273 continue;
274 }
275 }
276 }
277 }
278
279 Err(BlossomError::UploadFailed(format!(
280 "all servers failed after {} retries (last: {})",
281 MAX_RETRIES, last_error
282 )))
283 }
284
285 pub async fn exists(&self, hash: &str) -> bool {
287 for server in &self.write_servers {
288 if self.exists_on_server(hash, server).await {
289 return true;
290 }
291 }
292 false
293 }
294
295 pub async fn exists_on_server(&self, hash: &str, server: &str) -> bool {
297 let url = format!("{}/{}.bin", server.trim_end_matches('/'), hash);
298 debug!("Checking exists: {}", url);
299 if let Ok(resp) = self.http.head(&url).send().await {
300 debug!(" -> status: {}", resp.status());
301 if resp.status().is_success() {
302 if let Some(ct) = resp.headers().get("content-type") {
304 if let Ok(ct_str) = ct.to_str() {
305 if ct_str.starts_with("text/html") {
306 return false;
307 }
308 }
309 }
310 if let Some(cl) = resp.headers().get("content-length") {
312 if let Ok(cl_str) = cl.to_str() {
313 if cl_str == "0" {
314 return false;
315 }
316 }
317 }
318 return true;
319 }
320 }
321 false
322 }
323
324 pub async fn server_has_tree_samples(
326 &self,
327 server: &str,
328 hashes: &[&str],
329 sample_size: usize,
330 ) -> bool {
331 use futures::future::join_all;
332 if hashes.is_empty() {
333 return false;
334 }
335 let step = (hashes.len() / sample_size.min(hashes.len())).max(1);
337 let samples: Vec<_> = hashes.iter().step_by(step).take(sample_size).collect();
338 let checks: Vec<_> = samples
339 .iter()
340 .map(|h| self.exists_on_server(h, server))
341 .collect();
342 join_all(checks).await.iter().all(|&exists| exists)
343 }
344
345 pub async fn upload_to_all_servers(
347 &self,
348 data: &[u8],
349 ) -> Result<(String, usize), BlossomError> {
350 use futures::future::join_all;
351 if self.write_servers.is_empty() {
352 return Err(BlossomError::NoServers);
353 }
354 let hash = compute_sha256(data);
355 let auth = self.create_upload_auth(&hash).await?;
356 let uploads: Vec<_> = self
357 .write_servers
358 .iter()
359 .map(|s| self.upload_to_server(s, data, &hash, &auth))
360 .collect();
361 let results = join_all(uploads).await;
362 let ok_count = results.iter().filter(|r| r.is_ok()).count();
363 if ok_count == 0 {
364 return Err(BlossomError::UploadFailed("all servers failed".to_string()));
365 }
366 Ok((hash, ok_count))
367 }
368
369 pub async fn download(&self, hash: &str) -> Result<Vec<u8>, BlossomError> {
372 if self.read_servers.is_empty() {
373 return Err(BlossomError::NoServers);
374 }
375
376 let mut last_error = String::new();
377
378 for server in &self.read_servers {
379 let url = format!("{}/{}.bin", server.trim_end_matches('/'), hash);
380 match self.http.get(&url).send().await {
381 Ok(resp) if resp.status().is_success() => {
382 let x_source = resp
384 .headers()
385 .get("x-source")
386 .and_then(|v| v.to_str().ok())
387 .map(|s| s.to_string());
388
389 match resp.bytes().await {
390 Ok(bytes) => {
391 let computed = compute_sha256(&bytes);
392 if computed == hash {
393 if let Some(source) = x_source {
394 debug!(
395 "Downloaded {} ({} bytes) via {} [source: {}]",
396 &hash[..12.min(hash.len())],
397 bytes.len(),
398 server,
399 source
400 );
401 } else {
402 debug!(
403 "Downloaded {} ({} bytes) from {}",
404 &hash[..12.min(hash.len())],
405 bytes.len(),
406 server
407 );
408 }
409 return Ok(bytes.to_vec());
410 } else {
411 last_error = format!("hash mismatch from {}: expected {}, got {} ({} bytes received)",
412 server, hash, computed, bytes.len());
413 warn!(
414 "Hash mismatch downloading {} from {}: got {} ({} bytes)",
415 hash,
416 server,
417 &computed[..12.min(computed.len())],
418 bytes.len()
419 );
420 }
421 }
422 Err(e) => {
423 last_error = e.to_string();
424 }
425 }
426 }
427 Ok(resp) => {
428 last_error = format!("{} returned {}", server, resp.status());
429 debug!(
430 "Download {} from {} returned status {}",
431 hash,
432 server,
433 resp.status()
434 );
435 }
436 Err(e) => {
437 last_error = e.to_string();
438 }
439 }
440 }
441
442 Err(BlossomError::DownloadFailed(last_error))
443 }
444
445 pub async fn try_download(&self, hash: &str) -> Option<Vec<u8>> {
447 self.download(hash).await.ok()
448 }
449
450 async fn upload_to_server(
453 &self,
454 server: &str,
455 data: &[u8],
456 hash: &str,
457 auth_header: &str,
458 ) -> Result<bool, BlossomError> {
459 let url = format!("{}/upload", server.trim_end_matches('/'));
460
461 let resp = self
462 .http
463 .put(&url)
464 .header("Authorization", auth_header)
465 .header("Content-Type", "application/octet-stream")
466 .header("X-SHA-256", hash)
467 .body(data.to_vec())
468 .send()
469 .await?;
470
471 let status = resp.status();
472 if status.is_success() {
473 Ok(true) } else if status.as_u16() == 409 {
475 Ok(false) } else {
477 let text = resp.text().await.unwrap_or_default();
478 Err(BlossomError::UploadFailed(format!("{}: {}", status, text)))
479 }
480 }
481
482 async fn create_upload_auth(&self, hash: &str) -> Result<String, BlossomError> {
483 let now = std::time::SystemTime::now()
484 .duration_since(std::time::UNIX_EPOCH)
485 .unwrap()
486 .as_secs();
487 let expiration = now + 300; let tags = vec![
490 Tag::custom(TagKind::custom("t"), vec!["upload".to_string()]),
491 Tag::custom(TagKind::custom("x"), vec![hash.to_string()]),
492 Tag::custom(TagKind::custom("expiration"), vec![expiration.to_string()]),
493 ];
494 let event = EventBuilder::new(Kind::Custom(24242), "Upload", tags)
495 .to_event(&self.keys)
496 .map_err(|e| BlossomError::Signing(e.to_string()))?;
497
498 let json = event.as_json();
499 let encoded = base64::engine::general_purpose::STANDARD.encode(json);
500 Ok(format!("Nostr {}", encoded))
501 }
502}
503
504pub fn compute_sha256(data: &[u8]) -> String {
506 let mut hasher = Sha256::new();
507 hasher.update(data);
508 hex::encode(hasher.finalize())
509}
510
/// `Store` adapter backed by a [`BlossomClient`] and an in-memory cache.
#[cfg(feature = "store")]
mod store_impl {
    use super::*;
    use async_trait::async_trait;
    use hashtree_core::{to_hex, Hash, Store, StoreError};
    use std::collections::HashMap;
    use std::sync::RwLock;

    /// Store that answers from a local cache and falls back to Blossom servers for reads.
    pub struct BlossomStore {
        /// Client used for downloads and existence checks.
        client: BlossomClient,
        /// Blobs held locally, keyed by the hex form of their hash.
        cache: RwLock<HashMap<String, Vec<u8>>>,
    }

    impl BlossomStore {
        /// Wraps `client` with an empty cache.
        pub fn new(client: BlossomClient) -> Self {
            let cache = RwLock::new(HashMap::new());
            Self { client, cache }
        }

        /// Builds a store whose client uses `servers` for both reads and writes.
        pub fn with_servers(keys: nostr::Keys, servers: Vec<String>) -> Self {
            Self::new(BlossomClient::new(keys).with_servers(servers))
        }

        /// Returns the wrapped client.
        pub fn client(&self) -> &BlossomClient {
            &self.client
        }
    }

    #[async_trait]
    impl Store for BlossomStore {
        /// Inserts `data` into the cache; returns `false` if the key was already cached.
        ///
        /// Nothing is sent to a server here.
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            use std::collections::hash_map::Entry;

            let key = to_hex(&hash);
            let mut cache = self.cache.write().unwrap();
            match cache.entry(key) {
                Entry::Occupied(_) => Ok(false),
                Entry::Vacant(slot) => {
                    slot.insert(data);
                    Ok(true)
                }
            }
        }

        /// Returns the blob from the cache, or downloads and caches it.
        ///
        /// A failed download yields `Ok(None)`.
        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            let key = to_hex(hash);

            // The read guard is a temporary, released before any `.await` below.
            let cached = self.cache.read().unwrap().get(&key).cloned();
            if cached.is_some() {
                return Ok(cached);
            }

            let fetched = self.client.try_download(&key).await;
            if let Some(data) = &fetched {
                self.cache.write().unwrap().insert(key, data.clone());
            }
            Ok(fetched)
        }

        /// Reports whether the blob is cached or, failing that, found by the client.
        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            let key = to_hex(hash);

            // The read guard is a temporary, released before the `.await` below.
            let cached = self.cache.read().unwrap().contains_key(&key);
            if cached {
                return Ok(true);
            }

            Ok(self.client.exists(&key).await)
        }

        /// Removes the blob from the cache only; returns whether it was cached.
        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            let key = to_hex(hash);
            let removed = self.cache.write().unwrap().remove(&key);
            Ok(removed.is_some())
        }
    }
}
608
609#[cfg(feature = "store")]
610pub use store_impl::BlossomStore;
611
#[cfg(test)]
mod tests {
    use super::*;

    const EXAMPLE: &str = "https://example.com";
    const LOCAL_DAEMON: &str = "http://127.0.0.1:8080";

    /// Turns string literals into the owned server list the builders expect.
    fn urls(list: &[&str]) -> Vec<String> {
        list.iter().map(|url| url.to_string()).collect()
    }

    /// Client built with `new` and fresh keys, using `servers` for reads and writes.
    fn client_for(servers: &[&str]) -> BlossomClient {
        BlossomClient::new(Keys::generate()).with_servers(urls(servers))
    }

    #[test]
    fn test_compute_sha256() {
        let expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9";
        assert_eq!(compute_sha256(b"hello world"), expected);
    }

    #[test]
    fn test_client_builder() {
        let client = client_for(&[EXAMPLE]).with_timeout(Duration::from_secs(60));

        assert_eq!(client.servers().len(), 1);
    }

    #[tokio::test]
    async fn test_exists_on_server() {
        let client = client_for(&[EXAMPLE]);

        // The placeholder host is not expected to serve this blob.
        let found = client.exists_on_server("abc123", EXAMPLE).await;
        assert!(!found);
    }

    #[tokio::test]
    async fn test_server_has_tree_samples() {
        let client = client_for(&[EXAMPLE]);
        let hashes = ["hash1", "hash2", "hash3"];

        // The placeholder host is not expected to serve any of these blobs.
        let all_present = client
            .server_has_tree_samples(EXAMPLE, &hashes, 3)
            .await;
        assert!(!all_present);
    }

    #[tokio::test]
    async fn test_upload_to_all_servers() {
        let client = client_for(&["https://example1.com", "https://example2.com"]);

        // Neither placeholder host is expected to accept the upload.
        let outcome = client.upload_to_all_servers(b"test data").await;
        assert!(outcome.is_err());
    }

    #[test]
    fn test_local_daemon_priority() {
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(urls(&["https://remote1.com", "https://remote2.com"]))
            .with_local_daemon(LOCAL_DAEMON.to_string());

        // The daemon is inserted ahead of the remotes, which keep their order.
        let reads = client.read_servers();
        assert_eq!(reads.len(), 3);
        assert_eq!(reads[0], "http://127.0.0.1:8080");
        assert_eq!(reads[1], "https://remote1.com");
        assert_eq!(reads[2], "https://remote2.com");
    }

    #[test]
    fn test_local_daemon_not_duplicated() {
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(urls(&[LOCAL_DAEMON, "https://remote.com"]))
            .with_local_daemon(LOCAL_DAEMON.to_string());

        // An already-listed daemon URL must not be added a second time.
        let reads = client.read_servers();
        assert_eq!(reads.len(), 2);
        assert_eq!(reads[0], "http://127.0.0.1:8080");
    }
}