// hashtree_s3/lib.rs
1//! S3-backed storage for hashtree with non-blocking uploads.
2//!
3//! This crate provides an S3 storage backend that:
4//! - Stores data locally first (fast writes)
5//! - Syncs to S3 in the background (non-blocking)
6//! - Falls back to S3 if data not in local cache
7//!
8//! # Example
9//!
10//! ```ignore
11//! use hashtree_s3::{S3Store, S3Config};
12//! use hashtree_core::store::MemoryStore;
13//! use std::sync::Arc;
14//!
15//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
16//! let local_store = Arc::new(MemoryStore::new());
17//! let config = S3Config {
18//!     bucket: "my-bucket".to_string(),
19//!     prefix: Some("blobs/".to_string()),
20//!     region: None, // Uses AWS_REGION env var
21//!     endpoint: None, // For S3-compatible services
22//! };
23//!
24//! let _s3_store = S3Store::new(local_store, config).await?;
25//! # Ok(())
26//! # }
27//! ```
28
29use async_trait::async_trait;
30use aws_sdk_s3::primitives::ByteStream;
31use aws_sdk_s3::Client as S3Client;
32use hashtree_core::store::{Store, StoreError};
33use hashtree_core::types::{to_hex, Hash};
34use std::sync::Arc;
35use tokio::sync::mpsc;
36use tracing::{debug, error, info, warn};
37
/// S3 configuration.
///
/// All fields map directly onto the AWS SDK client setup performed in
/// [`S3Store::new`]. `region` and `endpoint` are optional overrides; when
/// `None`, the AWS SDK's environment-based defaults apply.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Config {
    /// S3 bucket name
    pub bucket: String,
    /// Optional prefix for all keys (e.g., "blobs/")
    pub prefix: Option<String>,
    /// AWS region (defaults to AWS_REGION env var)
    pub region: Option<String>,
    /// Custom endpoint URL (for S3-compatible services like MinIO, R2, etc.)
    pub endpoint: Option<String>,
}
50
/// Background sync task message.
///
/// Commands sent over the unbounded channel from the `S3Store` front-end
/// to the background task spawned in `S3Store::new`.
enum SyncMessage {
    /// Upload blob to S3 (carries an owned copy of the blob bytes)
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete blob from S3
    Delete { hash: Hash },
    /// Shutdown the sync task (sent by `shutdown()` and on `Drop`)
    Shutdown,
}
60
/// S3-backed store with local caching and background sync.
///
/// Writes go to the local store first (fast), then are synced to S3 in the background.
/// Reads check local store first, then fall back to S3.
///
/// S3 failures on the read path are logged and treated as misses rather
/// than surfaced to callers (see the `Store` impl below).
pub struct S3Store<L: Store> {
    /// Local store for fast access
    local: Arc<L>,
    /// S3 client (used directly for synchronous reads/HEADs)
    s3_client: S3Client,
    /// S3 bucket name
    bucket: String,
    /// Key prefix prepended to every hex-encoded hash (may be empty)
    prefix: String,
    /// Channel to send sync messages to background task; unbounded, so
    /// sends never block but queued uploads hold their data in memory
    sync_tx: mpsc::UnboundedSender<SyncMessage>,
}
77
impl<L: Store + 'static> S3Store<L> {
    /// Create a new S3 store wrapping a local store.
    ///
    /// Loads AWS configuration from the environment, applying the optional
    /// `region`/`endpoint` overrides from `config`, then spawns a background
    /// task for non-blocking S3 uploads and deletes.
    ///
    /// # Errors
    ///
    /// No failure path is exercised in the current implementation (the AWS
    /// config loader is infallible); the `Result` reserves room for future
    /// config validation.
    pub async fn new(local: Arc<L>, config: S3Config) -> Result<Self, S3StoreError> {
        // Build AWS config
        let mut aws_config_loader = aws_config::from_env();

        if let Some(ref region) = config.region {
            aws_config_loader =
                aws_config_loader.region(aws_sdk_s3::config::Region::new(region.clone()));
        }

        let aws_config = aws_config_loader.load().await;

        // Build S3 client
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);

        if let Some(ref endpoint) = config.endpoint {
            s3_config_builder = s3_config_builder
                .endpoint_url(endpoint)
                .force_path_style(true); // Required for most S3-compatible services
        }

        let s3_client = S3Client::from_conf(s3_config_builder.build());

        let prefix = config.prefix.unwrap_or_default();
        let bucket = config.bucket.clone();

        // Create channel for background sync. Unbounded: producers never
        // block, at the cost of unbounded memory if S3 falls far behind.
        let (sync_tx, sync_rx) = mpsc::unbounded_channel();

        // Spawn background sync task. The task owns clones of the client,
        // bucket and prefix, and exits when the channel closes or a
        // Shutdown message arrives.
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            Self::sync_task(sync_rx, sync_client, sync_bucket, sync_prefix).await;
        });

        info!(
            "S3Store initialized with bucket: {}, prefix: {}",
            bucket, prefix
        );

        Ok(Self {
            local,
            s3_client,
            bucket,
            prefix,
            sync_tx,
        })
    }

    /// Background task that handles S3 uploads/deletes.
    ///
    /// Processes messages sequentially; failures are logged and dropped
    /// (no retry queue is visible here — NOTE(review): failed uploads are
    /// lost from S3's perspective until the blob is written again).
    async fn sync_task(
        mut rx: mpsc::UnboundedReceiver<SyncMessage>,
        client: S3Client,
        bucket: String,
        prefix: String,
    ) {
        info!("S3 sync task started");

        // Loop ends when all senders drop (recv() yields None) or on Shutdown.
        while let Some(msg) = rx.recv().await {
            match msg {
                SyncMessage::Upload { hash, data } => {
                    let key = format!("{}{}", prefix, to_hex(&hash));
                    // Log only a short prefix of the key to keep lines readable.
                    debug!(
                        "S3 uploading {} ({} bytes)",
                        &key[..16.min(key.len())],
                        data.len()
                    );

                    match client
                        .put_object()
                        .bucket(&bucket)
                        .key(&key)
                        .body(ByteStream::from(data))
                        .send()
                        .await
                    {
                        Ok(_) => {
                            debug!("S3 upload complete: {}", &key[..16.min(key.len())]);
                        }
                        Err(e) => {
                            error!("S3 upload failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
                SyncMessage::Delete { hash } => {
                    let key = format!("{}{}", prefix, to_hex(&hash));
                    debug!("S3 deleting {}", &key[..16.min(key.len())]);

                    match client
                        .delete_object()
                        .bucket(&bucket)
                        .key(&key)
                        .send()
                        .await
                    {
                        Ok(_) => {
                            debug!("S3 delete complete: {}", &key[..16.min(key.len())]);
                        }
                        Err(e) => {
                            error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
                SyncMessage::Shutdown => {
                    info!("S3 sync task shutting down");
                    break;
                }
            }
        }
    }

    /// Get the S3 key for a hash: `<prefix><hex-encoded hash>`.
    fn s3_key(&self, hash: &Hash) -> String {
        format!("{}{}", self.prefix, to_hex(hash))
    }

    /// Fetch from S3 directly (used when local miss).
    ///
    /// Returns `Ok(None)` when the object does not exist (NoSuchKey);
    /// any other S3 failure becomes `S3StoreError::S3`.
    async fn fetch_from_s3(&self, hash: &Hash) -> Result<Option<Vec<u8>>, S3StoreError> {
        let key = self.s3_key(hash);

        match self
            .s3_client
            .get_object()
            .bucket(&self.bucket)
            .key(&key)
            .send()
            .await
        {
            Ok(output) => {
                // Buffer the whole body into memory before returning.
                let data = output
                    .body
                    .collect()
                    .await
                    .map_err(|e| S3StoreError::S3(format!("Failed to read body: {}", e)))?;
                Ok(Some(data.into_bytes().to_vec()))
            }
            Err(e) => {
                // Check if it's a "not found" error
                let service_err = e.into_service_error();
                if service_err.is_no_such_key() {
                    Ok(None)
                } else {
                    Err(S3StoreError::S3(format!("S3 get failed: {}", service_err)))
                }
            }
        }
    }

    /// Check if exists in S3 directly via a HEAD request (no body transfer).
    async fn exists_in_s3(&self, hash: &Hash) -> Result<bool, S3StoreError> {
        let key = self.s3_key(hash);

        match self
            .s3_client
            .head_object()
            .bucket(&self.bucket)
            .key(&key)
            .send()
            .await
        {
            Ok(_) => Ok(true),
            Err(e) => {
                let service_err = e.into_service_error();
                if service_err.is_not_found() {
                    Ok(false)
                } else {
                    Err(S3StoreError::S3(format!("S3 head failed: {}", service_err)))
                }
            }
        }
    }

    /// Queue a blob for upload to S3 (non-blocking).
    ///
    /// A send error means the background task has already exited; the
    /// upload is logged and dropped.
    fn queue_upload(&self, hash: Hash, data: Vec<u8>) {
        if let Err(e) = self.sync_tx.send(SyncMessage::Upload { hash, data }) {
            warn!("Failed to queue S3 upload: {}", e);
        }
    }

    /// Queue a blob for deletion from S3 (non-blocking).
    fn queue_delete(&self, hash: Hash) {
        if let Err(e) = self.sync_tx.send(SyncMessage::Delete { hash }) {
            warn!("Failed to queue S3 delete: {}", e);
        }
    }

    /// Shutdown the background sync task.
    ///
    /// Idempotent: sending after the task has exited is a no-op.
    pub fn shutdown(&self) {
        let _ = self.sync_tx.send(SyncMessage::Shutdown);
    }
}
275
276impl<L: Store> Drop for S3Store<L> {
277    fn drop(&mut self) {
278        let _ = self.sync_tx.send(SyncMessage::Shutdown);
279    }
280}
281
282#[async_trait]
283impl<L: Store + 'static> Store for S3Store<L> {
284    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
285        // Store locally first (fast)
286        let is_new = self.local.put(hash, data.clone()).await?;
287
288        // Queue for S3 upload in background (non-blocking)
289        if is_new {
290            self.queue_upload(hash, data);
291        }
292
293        Ok(is_new)
294    }
295
296    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
297        // Try local first
298        if let Some(data) = self.local.get(hash).await? {
299            return Ok(Some(data));
300        }
301
302        // Fall back to S3
303        match self.fetch_from_s3(hash).await {
304            Ok(Some(data)) => {
305                // Cache locally for future access
306                let _ = self.local.put(*hash, data.clone()).await;
307                Ok(Some(data))
308            }
309            Ok(None) => Ok(None),
310            Err(e) => {
311                warn!("S3 fetch failed, returning None: {}", e);
312                Ok(None)
313            }
314        }
315    }
316
317    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
318        // Check local first
319        if self.local.has(hash).await? {
320            return Ok(true);
321        }
322
323        // Check S3
324        match self.exists_in_s3(hash).await {
325            Ok(exists) => Ok(exists),
326            Err(e) => {
327                warn!("S3 exists check failed, returning false: {}", e);
328                Ok(false)
329            }
330        }
331    }
332
333    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
334        // Delete locally
335        let deleted = self.local.delete(hash).await?;
336
337        // Queue S3 deletion in background
338        self.queue_delete(*hash);
339
340        Ok(deleted)
341    }
342}
343
/// S3 store specific errors.
///
/// AWS SDK failures are flattened to strings rather than held as SDK
/// error types, keeping this enum free of SDK generics.
#[derive(Debug, thiserror::Error)]
pub enum S3StoreError {
    /// Any S3 API failure (get/put/head/delete or body read).
    #[error("S3 error: {0}")]
    S3(String),
    /// Invalid configuration (not currently constructed in this file).
    #[error("Configuration error: {0}")]
    Config(String),
}
352
#[cfg(test)]
mod tests {
    use super::*;
    use hashtree_core::hash::sha256;

    /// Keys have the layout `<prefix><hex-hash>`; check prefix and length.
    #[test]
    fn test_s3_key_generation() {
        let hash = sha256(b"test");
        let prefix = "blobs/";
        let key = format!("{}{}", prefix, to_hex(&hash));
        assert!(key.starts_with("blobs/"));
        // SHA-256 digests render as 64 hex characters.
        assert_eq!(key.len(), prefix.len() + 64);
    }

    /// Sanity-check that config fields round-trip through construction.
    #[test]
    fn test_s3_config() {
        let cfg = S3Config {
            bucket: String::from("test-bucket"),
            prefix: Some(String::from("data/")),
            region: Some(String::from("us-east-1")),
            endpoint: None,
        };

        assert_eq!(cfg.bucket, "test-bucket");
        assert_eq!(cfg.prefix.as_deref(), Some("data/"));
    }
}