Skip to main content

hashtree_blossom/
lib.rs

1//! Blossom protocol client for hashtree
2//!
//! Provides upload/download of blobs to Blossom servers. Uploads are authorized with a signed Nostr kind 24242 event (Blossom authorization), not NIP-98 HTTP auth.
4//!
5//! # Example
6//!
7//! ```rust,no_run
8//! use hashtree_blossom::BlossomClient;
9//! use nostr::Keys;
10//!
11//! #[tokio::main]
12//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
13//!     let keys = Keys::generate();
14//!     let client = BlossomClient::new(keys)
15//!         .with_servers(vec!["https://blossom.example.com".to_string()]);
16//!
17//!     // Upload
18//!     let hash = client.upload(b"hello world").await?;
19//!     println!("Uploaded: {}", hash);
20//!
21//!     // Download
22//!     let data = client.download(&hash).await?;
23//!     assert_eq!(data, b"hello world");
24//!
25//!     Ok(())
26//! }
27//! ```
28
29use base64::Engine;
30use nostr::prelude::*;
31use sha2::{Digest, Sha256};
32use std::time::Duration;
33use thiserror::Error;
34use tracing::{debug, warn};
35
36#[derive(Error, Debug)]
37pub enum BlossomError {
38    #[error("HTTP error: {0}")]
39    Http(#[from] reqwest::Error),
40
41    #[error("No servers configured")]
42    NoServers,
43
44    #[error("Upload failed: {0}")]
45    UploadFailed(String),
46
47    #[error("Download failed on all servers: {0}")]
48    DownloadFailed(String),
49
50    #[error("Hash mismatch: expected {expected}, got {actual}")]
51    HashMismatch { expected: String, actual: String },
52
53    #[error("Signing error: {0}")]
54    Signing(String),
55}
56
57/// Blossom protocol client
58#[derive(Clone)]
59pub struct BlossomClient {
60    keys: Keys,
61    /// Servers for reading (download)
62    read_servers: Vec<String>,
63    /// Servers for writing (upload)
64    write_servers: Vec<String>,
65    http: reqwest::Client,
66    timeout: Duration,
67}
68
69impl BlossomClient {
70    /// Create a new client with the given keys
71    /// Automatically loads server config from ~/.hashtree/config.toml
72    /// and prioritizes local daemon if running
73    #[cfg(feature = "config")]
74    pub fn new(keys: Keys) -> Self {
75        let config = hashtree_config::Config::load_or_default();
76        let mut read_servers = config.blossom.all_read_servers();
77
78        // Prioritize local daemon if running
79        if let Some(local_url) =
80            hashtree_config::detect_local_daemon_url(Some(&config.server.bind_address))
81        {
82            if !read_servers.iter().any(|s| s == &local_url) {
83                debug!(
84                    "Local daemon detected at {}, prioritizing for reads",
85                    local_url
86                );
87                read_servers.insert(0, local_url);
88            }
89        }
90
91        Self {
92            keys,
93            read_servers,
94            write_servers: config.blossom.all_write_servers(),
95            http: reqwest::Client::builder()
96                .timeout(Duration::from_secs(30))
97                .build()
98                .unwrap(),
99            timeout: Duration::from_secs(30),
100        }
101    }
102
103    /// Create a new client with the given keys (no config loading)
104    #[cfg(not(feature = "config"))]
105    pub fn new(keys: Keys) -> Self {
106        Self {
107            keys,
108            read_servers: vec![],
109            write_servers: vec![],
110            http: reqwest::Client::builder()
111                .timeout(Duration::from_secs(30))
112                .build()
113                .unwrap(),
114            timeout: Duration::from_secs(30),
115        }
116    }
117
118    /// Create a new client without loading config (empty servers)
119    pub fn new_empty(keys: Keys) -> Self {
120        Self {
121            keys,
122            read_servers: vec![],
123            write_servers: vec![],
124            http: reqwest::Client::builder()
125                .timeout(Duration::from_secs(30))
126                .build()
127                .unwrap(),
128            timeout: Duration::from_secs(30),
129        }
130    }
131
132    /// Set the Blossom servers to use (for both read and write)
133    pub fn with_servers(mut self, servers: Vec<String>) -> Self {
134        self.read_servers = servers.clone();
135        self.write_servers = servers;
136        self
137    }
138
139    /// Set read-only servers (for downloads)
140    pub fn with_read_servers(mut self, servers: Vec<String>) -> Self {
141        self.read_servers = servers;
142        self
143    }
144
145    /// Set write servers (for uploads)
146    pub fn with_write_servers(mut self, servers: Vec<String>) -> Self {
147        self.write_servers = servers;
148        self
149    }
150
151    /// Set request timeout
152    pub fn with_timeout(mut self, timeout: Duration) -> Self {
153        self.timeout = timeout;
154        self.http = reqwest::Client::builder().timeout(timeout).build().unwrap();
155        self
156    }
157
158    /// Set local daemon URL (prioritized for reads)
159    /// The local daemon is prepended to read_servers if not already present
160    pub fn with_local_daemon(mut self, url: String) -> Self {
161        // Don't add if already present
162        if !self.read_servers.iter().any(|s| s == &url) {
163            self.read_servers.insert(0, url);
164        }
165        self
166    }
167
168    /// Get configured read servers
169    pub fn read_servers(&self) -> &[String] {
170        &self.read_servers
171    }
172
173    /// Get configured write servers
174    pub fn write_servers(&self) -> &[String] {
175        &self.write_servers
176    }
177
178    /// Get configured servers (returns read servers for backwards compatibility)
179    pub fn servers(&self) -> &[String] {
180        &self.read_servers
181    }
182
183    /// Upload data to Blossom servers
184    /// Returns the SHA256 hash of the uploaded data
185    pub async fn upload(&self, data: &[u8]) -> Result<String, BlossomError> {
186        if self.write_servers.is_empty() {
187            return Err(BlossomError::NoServers);
188        }
189
190        let hash = compute_sha256(data);
191        let auth_header = self.create_upload_auth(&hash).await?;
192
193        for server in &self.write_servers {
194            match self
195                .upload_to_server(server, data, &hash, &auth_header)
196                .await
197            {
198                Ok(_) => {
199                    debug!("Uploaded {} to {}", &hash[..12], server);
200                    return Ok(hash);
201                }
202                Err(e) => {
203                    warn!("Upload to {} failed: {}", server, e);
204                    continue;
205                }
206            }
207        }
208
209        Err(BlossomError::UploadFailed("all servers failed".to_string()))
210    }
211
212    /// Upload data only if it doesn't already exist
213    /// Returns (hash, was_uploaded) tuple
214    ///
215    /// For small files (<256KB), skips existence check and relies on server returning 409.
216    /// For large files (>=256KB), does HEAD check first to save bandwidth.
217    /// Retries up to 3 times with exponential backoff on transient failures.
218    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool), BlossomError> {
219        if self.write_servers.is_empty() {
220            return Err(BlossomError::NoServers);
221        }
222
223        let hash = compute_sha256(data);
224
225        // Warn if uploading empty data
226        if data.is_empty() {
227            warn!("Attempting to upload empty blob with hash {}", hash);
228        }
229
230        // For large files, check existence first to save bandwidth
231        const HEAD_CHECK_THRESHOLD: usize = 256 * 1024; // 256KB
232        if data.len() >= HEAD_CHECK_THRESHOLD && self.exists(&hash).await {
233            debug!("Large blob {} already exists (skipped upload)", &hash[..12]);
234            return Ok((hash, false));
235        }
236
237        const MAX_RETRIES: u32 = 3;
238        let mut last_error = String::new();
239
240        for attempt in 0..MAX_RETRIES {
241            if attempt > 0 {
242                // Exponential backoff: 100ms, 200ms, 400ms
243                let delay = Duration::from_millis(100 * (1 << (attempt - 1)));
244                debug!(
245                    "Retrying upload {} (attempt {}/{}), waiting {:?}",
246                    &hash[..12],
247                    attempt + 1,
248                    MAX_RETRIES,
249                    delay
250                );
251                tokio::time::sleep(delay).await;
252            }
253
254            // Regenerate auth header for each retry (in case of expiration)
255            let auth_header = self.create_upload_auth(&hash).await?;
256
257            for server in &self.write_servers {
258                match self
259                    .upload_to_server(server, data, &hash, &auth_header)
260                    .await
261                {
262                    Ok(was_new) => {
263                        if was_new {
264                            debug!("Uploaded {} to {}", &hash[..12], server);
265                        } else {
266                            debug!("Blob {} already exists on {}", &hash[..12], server);
267                        }
268                        return Ok((hash, was_new));
269                    }
270                    Err(e) => {
271                        last_error = format!("{}: {}", server, e);
272                        warn!("Upload to {} failed: {}", server, e);
273                        continue;
274                    }
275                }
276            }
277        }
278
279        Err(BlossomError::UploadFailed(format!(
280            "all servers failed after {} retries (last: {})",
281            MAX_RETRIES, last_error
282        )))
283    }
284
285    /// Check if a blob exists on any write server
286    pub async fn exists(&self, hash: &str) -> bool {
287        for server in &self.write_servers {
288            if self.exists_on_server(hash, server).await {
289                return true;
290            }
291        }
292        false
293    }
294
295    /// Check if a blob exists on a specific server
296    pub async fn exists_on_server(&self, hash: &str, server: &str) -> bool {
297        let url = format!("{}/{}.bin", server.trim_end_matches('/'), hash);
298        debug!("Checking exists: {}", url);
299        if let Ok(resp) = self.http.head(&url).send().await {
300            debug!("  -> status: {}", resp.status());
301            if resp.status().is_success() {
302                // Verify content-type is binary, not HTML error page
303                if let Some(ct) = resp.headers().get("content-type") {
304                    if let Ok(ct_str) = ct.to_str() {
305                        if ct_str.starts_with("text/html") {
306                            return false;
307                        }
308                    }
309                }
310                // Verify content-length > 0
311                if let Some(cl) = resp.headers().get("content-length") {
312                    if let Ok(cl_str) = cl.to_str() {
313                        if cl_str == "0" {
314                            return false;
315                        }
316                    }
317                }
318                return true;
319            }
320        }
321        false
322    }
323
324    /// Check if server has a tree by sampling hashes (parallel checks)
325    pub async fn server_has_tree_samples(
326        &self,
327        server: &str,
328        hashes: &[&str],
329        sample_size: usize,
330    ) -> bool {
331        use futures::future::join_all;
332        if hashes.is_empty() {
333            return false;
334        }
335        // Spread samples across the hash list
336        let step = (hashes.len() / sample_size.min(hashes.len())).max(1);
337        let samples: Vec<_> = hashes.iter().step_by(step).take(sample_size).collect();
338        let checks: Vec<_> = samples
339            .iter()
340            .map(|h| self.exists_on_server(h, server))
341            .collect();
342        join_all(checks).await.iter().all(|&exists| exists)
343    }
344
345    /// Upload to all write servers in parallel, returns (hash, success_count)
346    pub async fn upload_to_all_servers(
347        &self,
348        data: &[u8],
349    ) -> Result<(String, usize), BlossomError> {
350        use futures::future::join_all;
351        if self.write_servers.is_empty() {
352            return Err(BlossomError::NoServers);
353        }
354        let hash = compute_sha256(data);
355        let auth = self.create_upload_auth(&hash).await?;
356        let uploads: Vec<_> = self
357            .write_servers
358            .iter()
359            .map(|s| self.upload_to_server(s, data, &hash, &auth))
360            .collect();
361        let results = join_all(uploads).await;
362        let ok_count = results.iter().filter(|r| r.is_ok()).count();
363        if ok_count == 0 {
364            return Err(BlossomError::UploadFailed("all servers failed".to_string()));
365        }
366        Ok((hash, ok_count))
367    }
368
369    /// Download data from Blossom servers
370    /// Verifies the hash matches before returning
371    pub async fn download(&self, hash: &str) -> Result<Vec<u8>, BlossomError> {
372        if self.read_servers.is_empty() {
373            return Err(BlossomError::NoServers);
374        }
375
376        let mut last_error = String::new();
377
378        for server in &self.read_servers {
379            let url = format!("{}/{}.bin", server.trim_end_matches('/'), hash);
380            match self.http.get(&url).send().await {
381                Ok(resp) if resp.status().is_success() => {
382                    // Capture X-Source header before consuming body (if from local daemon)
383                    let x_source = resp
384                        .headers()
385                        .get("x-source")
386                        .and_then(|v| v.to_str().ok())
387                        .map(|s| s.to_string());
388
389                    match resp.bytes().await {
390                        Ok(bytes) => {
391                            let computed = compute_sha256(&bytes);
392                            if computed == hash {
393                                if let Some(source) = x_source {
394                                    debug!(
395                                        "Downloaded {} ({} bytes) via {} [source: {}]",
396                                        &hash[..12.min(hash.len())],
397                                        bytes.len(),
398                                        server,
399                                        source
400                                    );
401                                } else {
402                                    debug!(
403                                        "Downloaded {} ({} bytes) from {}",
404                                        &hash[..12.min(hash.len())],
405                                        bytes.len(),
406                                        server
407                                    );
408                                }
409                                return Ok(bytes.to_vec());
410                            } else {
411                                last_error = format!("hash mismatch from {}: expected {}, got {} ({} bytes received)",
412                                    server, hash, computed, bytes.len());
413                                warn!(
414                                    "Hash mismatch downloading {} from {}: got {} ({} bytes)",
415                                    hash,
416                                    server,
417                                    &computed[..12.min(computed.len())],
418                                    bytes.len()
419                                );
420                            }
421                        }
422                        Err(e) => {
423                            last_error = e.to_string();
424                        }
425                    }
426                }
427                Ok(resp) => {
428                    last_error = format!("{} returned {}", server, resp.status());
429                    debug!(
430                        "Download {} from {} returned status {}",
431                        hash,
432                        server,
433                        resp.status()
434                    );
435                }
436                Err(e) => {
437                    last_error = e.to_string();
438                }
439            }
440        }
441
442        Err(BlossomError::DownloadFailed(last_error))
443    }
444
445    /// Download if available, returns None if not found
446    pub async fn try_download(&self, hash: &str) -> Option<Vec<u8>> {
447        self.download(hash).await.ok()
448    }
449
450    /// Upload to a single server
451    /// Returns Ok(true) if uploaded, Ok(false) if already exists (409)
452    async fn upload_to_server(
453        &self,
454        server: &str,
455        data: &[u8],
456        hash: &str,
457        auth_header: &str,
458    ) -> Result<bool, BlossomError> {
459        let url = format!("{}/upload", server.trim_end_matches('/'));
460
461        let resp = self
462            .http
463            .put(&url)
464            .header("Authorization", auth_header)
465            .header("Content-Type", "application/octet-stream")
466            .header("X-SHA-256", hash)
467            .body(data.to_vec())
468            .send()
469            .await?;
470
471        let status = resp.status();
472        if status.is_success() {
473            Ok(true) // Actually uploaded
474        } else if status.as_u16() == 409 {
475            Ok(false) // Already exists
476        } else {
477            let text = resp.text().await.unwrap_or_default();
478            Err(BlossomError::UploadFailed(format!("{}: {}", status, text)))
479        }
480    }
481
482    async fn create_upload_auth(&self, hash: &str) -> Result<String, BlossomError> {
483        let now = std::time::SystemTime::now()
484            .duration_since(std::time::UNIX_EPOCH)
485            .unwrap()
486            .as_secs();
487        let expiration = now + 300; // 5 minutes
488
489        let tags = vec![
490            Tag::custom(TagKind::custom("t"), vec!["upload".to_string()]),
491            Tag::custom(TagKind::custom("x"), vec![hash.to_string()]),
492            Tag::custom(TagKind::custom("expiration"), vec![expiration.to_string()]),
493        ];
494        let event = EventBuilder::new(Kind::Custom(24242), "Upload", tags)
495            .to_event(&self.keys)
496            .map_err(|e| BlossomError::Signing(e.to_string()))?;
497
498        let json = event.as_json();
499        let encoded = base64::engine::general_purpose::STANDARD.encode(json);
500        Ok(format!("Nostr {}", encoded))
501    }
502}
503
504/// Compute SHA256 hash of data, returning hex string
505pub fn compute_sha256(data: &[u8]) -> String {
506    let mut hasher = Sha256::new();
507    hasher.update(data);
508    hex::encode(hasher.finalize())
509}
510
/// Store implementation for Blossom (read-only, fetches from servers on demand)
#[cfg(feature = "store")]
mod store_impl {
    use super::*;
    use async_trait::async_trait;
    use hashtree_core::{to_hex, Hash, Store, StoreError};
    use std::collections::hash_map::Entry;
    use std::collections::HashMap;
    use std::sync::{PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};

    /// Blossom-backed store (read-only with a local in-memory cache).
    ///
    /// `get` consults the cache, then downloads from the client's read servers
    /// and caches what it fetched. `put` and `delete` only touch the cache.
    /// Nothing is uploaded to or deleted from Blossom; upload explicitly via
    /// [`BlossomClient`].
    ///
    /// The cache has no eviction: it grows with every distinct blob put or fetched.
    pub struct BlossomStore {
        client: BlossomClient,
        /// Blob bytes keyed by hex-encoded hash.
        cache: RwLock<HashMap<String, Vec<u8>>>,
    }

    impl BlossomStore {
        /// Wrap an already-configured client.
        pub fn new(client: BlossomClient) -> Self {
            Self {
                client,
                cache: RwLock::new(HashMap::new()),
            }
        }

        /// Create with servers (convenience constructor).
        ///
        /// `servers` are used for both reads and writes. The client starts empty,
        /// so no config file is read and no local daemon is probed. `with_servers`
        /// would overwrite both lists anyway.
        pub fn with_servers(keys: nostr::Keys, servers: Vec<String>) -> Self {
            Self::new(BlossomClient::new_empty(keys).with_servers(servers))
        }

        /// Get underlying client.
        pub fn client(&self) -> &BlossomClient {
            &self.client
        }

        /// Shared access to the cache, recovering from lock poisoning.
        ///
        /// Each cache operation is a single map lookup/insert/remove, so a panic
        /// elsewhere cannot leave the map logically inconsistent.
        fn read_cache(&self) -> RwLockReadGuard<'_, HashMap<String, Vec<u8>>> {
            self.cache.read().unwrap_or_else(PoisonError::into_inner)
        }

        /// Exclusive access to the cache, recovering from lock poisoning.
        fn write_cache(&self) -> RwLockWriteGuard<'_, HashMap<String, Vec<u8>>> {
            self.cache.write().unwrap_or_else(PoisonError::into_inner)
        }
    }

    #[async_trait]
    impl Store for BlossomStore {
        /// Cache `data` locally without uploading it.
        ///
        /// Returns `true` if newly cached and `false` if the hash was already present.
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            let key = to_hex(&hash);
            let mut cache = self.write_cache();
            let inserted = match cache.entry(key) {
                Entry::Occupied(_) => false,
                Entry::Vacant(slot) => {
                    slot.insert(data);
                    true
                }
            };
            Ok(inserted)
        }

        /// Return cached bytes, or download (hash-verified) from Blossom and cache them.
        ///
        /// Any download failure is reported as `Ok(None)`.
        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            let key = to_hex(hash);

            // The guard is a temporary of this statement, so it is released before any await.
            let cached = self.read_cache().get(&key).cloned();
            if let Some(data) = cached {
                return Ok(Some(data));
            }

            match self.client.try_download(&key).await {
                Some(data) => {
                    self.write_cache().insert(key, data.clone());
                    Ok(Some(data))
                }
                None => Ok(None),
            }
        }

        /// Report whether the blob is cached or present on any configured server.
        ///
        /// Probes read servers first, because `get` downloads from them. It then
        /// probes any write servers not already in the read list.
        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            let key = to_hex(hash);

            let cached = self.read_cache().contains_key(&key);
            if cached {
                return Ok(true);
            }

            let read = self.client.read_servers();
            let write_only = self
                .client
                .write_servers()
                .iter()
                .filter(|s| !read.contains(*s));
            for server in read.iter().chain(write_only) {
                if self.client.exists_on_server(&key, server).await {
                    return Ok(true);
                }
            }
            Ok(false)
        }

        /// Remove the blob from the local cache only; Blossom is not touched.
        ///
        /// Returns `true` if an entry was removed.
        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            let key = to_hex(hash);
            let removed = self.write_cache().remove(&key).is_some();
            Ok(removed)
        }
    }
}
608
609#[cfg(feature = "store")]
610pub use store_impl::BlossomStore;
611
#[cfg(test)]
mod tests {
    use super::*;

    // Loopback ports that are presumably closed: network-touching tests then fail
    // fast with "connection refused" instead of depending on internet access.
    const UNREACHABLE_A: &str = "http://127.0.0.1:1";
    const UNREACHABLE_B: &str = "http://127.0.0.1:2";

    #[test]
    fn test_compute_sha256() {
        assert_eq!(
            compute_sha256(b"hello world"),
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
        assert_eq!(
            compute_sha256(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn test_client_builder() {
        // new_empty: don't read the developer's config file in unit tests.
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(vec!["https://example.com".to_string()])
            .with_timeout(Duration::from_secs(60));

        assert_eq!(client.servers().len(), 1);
        assert_eq!(client.read_servers(), client.write_servers());
    }

    #[test]
    fn test_separate_read_and_write_servers() {
        let client = BlossomClient::new_empty(Keys::generate())
            .with_read_servers(vec!["https://read.example".to_string()])
            .with_write_servers(vec!["https://write.example".to_string()]);

        assert_eq!(client.read_servers().len(), 1);
        assert_eq!(client.read_servers()[0], "https://read.example");
        assert_eq!(client.write_servers().len(), 1);
        assert_eq!(client.write_servers()[0], "https://write.example");
    }

    #[tokio::test]
    async fn test_upload_requires_write_servers() {
        let client = BlossomClient::new_empty(Keys::generate());

        assert!(matches!(
            client.upload(b"data").await,
            Err(BlossomError::NoServers)
        ));
        assert!(matches!(
            client.upload_if_missing(b"data").await,
            Err(BlossomError::NoServers)
        ));
        assert!(matches!(
            client.upload_to_all_servers(b"data").await,
            Err(BlossomError::NoServers)
        ));
    }

    #[tokio::test]
    async fn test_download_requires_read_servers() {
        let client = BlossomClient::new_empty(Keys::generate());

        assert!(matches!(
            client.download("abc123").await,
            Err(BlossomError::NoServers)
        ));
        assert!(client.try_download("abc123").await.is_none());
    }

    #[tokio::test]
    async fn test_exists_on_server() {
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(vec![UNREACHABLE_A.to_string()]);

        // Connection failures are reported as "does not exist".
        assert!(!client.exists_on_server("abc123", UNREACHABLE_A).await);
        assert!(!client.exists("abc123").await);
    }

    #[tokio::test]
    async fn test_server_has_tree_samples() {
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(vec![UNREACHABLE_A.to_string()]);

        let hashes = vec!["hash1", "hash2", "hash3"];
        assert!(
            !client
                .server_has_tree_samples(UNREACHABLE_A, &hashes, 3)
                .await
        );
        // An empty hash list never counts as "has the tree".
        assert!(!client.server_has_tree_samples(UNREACHABLE_A, &[], 3).await);
    }

    #[tokio::test]
    async fn test_upload_to_all_servers() {
        let client = BlossomClient::new_empty(Keys::generate()).with_servers(vec![
            UNREACHABLE_A.to_string(),
            UNREACHABLE_B.to_string(),
        ]);

        // Both servers refuse the connection, so the aggregate upload fails.
        let result = client.upload_to_all_servers(b"test data").await;
        assert!(matches!(result, Err(BlossomError::UploadFailed(_))));
    }

    #[test]
    fn test_local_daemon_priority() {
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(vec![
                "https://remote1.com".to_string(),
                "https://remote2.com".to_string(),
            ])
            .with_local_daemon("http://127.0.0.1:8080".to_string());

        // Local daemon should be first in read_servers.
        assert_eq!(client.read_servers().len(), 3);
        assert_eq!(client.read_servers()[0], "http://127.0.0.1:8080");
        assert_eq!(client.read_servers()[1], "https://remote1.com");
        assert_eq!(client.read_servers()[2], "https://remote2.com");
        // Write servers are unaffected.
        assert_eq!(client.write_servers().len(), 2);
    }

    #[test]
    fn test_local_daemon_not_duplicated() {
        // If the local daemon is already in servers, don't add it again.
        let client = BlossomClient::new_empty(Keys::generate())
            .with_servers(vec![
                "http://127.0.0.1:8080".to_string(),
                "https://remote.com".to_string(),
            ])
            .with_local_daemon("http://127.0.0.1:8080".to_string());

        assert_eq!(client.read_servers().len(), 2);
        assert_eq!(client.read_servers()[0], "http://127.0.0.1:8080");
    }
}