// hashtree_s3/lib.rs
1//! S3-backed storage for hashtree with non-blocking uploads.
2//!
3//! This crate provides an S3 storage backend that:
4//! - Stores data locally first (fast writes)
5//! - Syncs to S3 in the background (non-blocking)
6//! - Falls back to S3 if data not in local cache
7//!
8//! # Example
9//!
10//! ```ignore
11//! use hashtree_s3::{S3Store, S3Config};
12//! use hashtree_core::store::MemoryStore;
13//! use std::sync::Arc;
14//!
15//! let local_store = Arc::new(MemoryStore::new());
16//! let config = S3Config {
17//!     bucket: "my-bucket".to_string(),
18//!     prefix: Some("blobs/".to_string()),
19//!     region: None, // Uses AWS_REGION env var
20//!     endpoint: None, // For S3-compatible services
21//! };
22//!
23//! let s3_store = S3Store::new(local_store, config).await?;
24//! ```
25
26use async_trait::async_trait;
27use aws_sdk_s3::primitives::ByteStream;
28use aws_sdk_s3::Client as S3Client;
29use hashtree_core::store::{Store, StoreError};
30use hashtree_core::types::{to_hex, Hash};
31use std::sync::Arc;
32use tokio::sync::mpsc;
33use tracing::{debug, error, info, warn};
34
/// Configuration for connecting an S3 store to an S3 (or S3-compatible) bucket.
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Name of the target S3 bucket.
    pub bucket: String,
    /// Optional key prefix prepended to every blob key (e.g., "blobs/").
    pub prefix: Option<String>,
    /// AWS region; when `None`, resolved from the environment (AWS_REGION env var).
    pub region: Option<String>,
    /// Custom endpoint URL for S3-compatible services (MinIO, R2, etc.).
    pub endpoint: Option<String>,
}
47
/// Messages consumed by the background S3 sync task.
enum SyncMessage {
    /// Upload blob to S3 under its hash-derived key.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete blob from S3.
    Delete { hash: Hash },
    /// Stop the sync task's receive loop; queued messages sent earlier
    /// are still processed first (the channel is FIFO).
    Shutdown,
}
57
/// S3-backed store with local caching and background sync.
///
/// Writes go to the local store first (fast), then are synced to S3 in the background.
/// Reads check local store first, then fall back to S3.
pub struct S3Store<L: Store> {
    /// Local store consulted first on reads; written synchronously on puts.
    local: Arc<L>,
    /// S3 client used for direct reads (get/head) on local-store misses.
    s3_client: S3Client,
    /// Target S3 bucket name.
    bucket: String,
    /// Prefix prepended to every blob key (empty string when unset).
    prefix: String,
    /// Sender side of the channel feeding the background sync task
    /// (uploads, deletes, shutdown).
    sync_tx: mpsc::UnboundedSender<SyncMessage>,
}
74
75impl<L: Store + 'static> S3Store<L> {
76    /// Create a new S3 store wrapping a local store.
77    ///
78    /// Spawns a background task for non-blocking S3 uploads.
79    pub async fn new(local: Arc<L>, config: S3Config) -> Result<Self, S3StoreError> {
80        // Build AWS config
81        let mut aws_config_loader = aws_config::from_env();
82
83        if let Some(ref region) = config.region {
84            aws_config_loader =
85                aws_config_loader.region(aws_sdk_s3::config::Region::new(region.clone()));
86        }
87
88        let aws_config = aws_config_loader.load().await;
89
90        // Build S3 client
91        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
92
93        if let Some(ref endpoint) = config.endpoint {
94            s3_config_builder = s3_config_builder
95                .endpoint_url(endpoint)
96                .force_path_style(true); // Required for most S3-compatible services
97        }
98
99        let s3_client = S3Client::from_conf(s3_config_builder.build());
100
101        let prefix = config.prefix.unwrap_or_default();
102        let bucket = config.bucket.clone();
103
104        // Create channel for background sync
105        let (sync_tx, sync_rx) = mpsc::unbounded_channel();
106
107        // Spawn background sync task
108        let sync_client = s3_client.clone();
109        let sync_bucket = bucket.clone();
110        let sync_prefix = prefix.clone();
111
112        tokio::spawn(async move {
113            Self::sync_task(sync_rx, sync_client, sync_bucket, sync_prefix).await;
114        });
115
116        info!(
117            "S3Store initialized with bucket: {}, prefix: {}",
118            bucket, prefix
119        );
120
121        Ok(Self {
122            local,
123            s3_client,
124            bucket,
125            prefix,
126            sync_tx,
127        })
128    }
129
130    /// Background task that handles S3 uploads/deletes
131    async fn sync_task(
132        mut rx: mpsc::UnboundedReceiver<SyncMessage>,
133        client: S3Client,
134        bucket: String,
135        prefix: String,
136    ) {
137        info!("S3 sync task started");
138
139        while let Some(msg) = rx.recv().await {
140            match msg {
141                SyncMessage::Upload { hash, data } => {
142                    let key = format!("{}{}", prefix, to_hex(&hash));
143                    debug!(
144                        "S3 uploading {} ({} bytes)",
145                        &key[..16.min(key.len())],
146                        data.len()
147                    );
148
149                    match client
150                        .put_object()
151                        .bucket(&bucket)
152                        .key(&key)
153                        .body(ByteStream::from(data))
154                        .send()
155                        .await
156                    {
157                        Ok(_) => {
158                            debug!("S3 upload complete: {}", &key[..16.min(key.len())]);
159                        }
160                        Err(e) => {
161                            error!("S3 upload failed for {}: {}", &key[..16.min(key.len())], e);
162                        }
163                    }
164                }
165                SyncMessage::Delete { hash } => {
166                    let key = format!("{}{}", prefix, to_hex(&hash));
167                    debug!("S3 deleting {}", &key[..16.min(key.len())]);
168
169                    match client
170                        .delete_object()
171                        .bucket(&bucket)
172                        .key(&key)
173                        .send()
174                        .await
175                    {
176                        Ok(_) => {
177                            debug!("S3 delete complete: {}", &key[..16.min(key.len())]);
178                        }
179                        Err(e) => {
180                            error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
181                        }
182                    }
183                }
184                SyncMessage::Shutdown => {
185                    info!("S3 sync task shutting down");
186                    break;
187                }
188            }
189        }
190    }
191
192    /// Get the S3 key for a hash
193    fn s3_key(&self, hash: &Hash) -> String {
194        format!("{}{}", self.prefix, to_hex(hash))
195    }
196
197    /// Fetch from S3 directly (used when local miss)
198    async fn fetch_from_s3(&self, hash: &Hash) -> Result<Option<Vec<u8>>, S3StoreError> {
199        let key = self.s3_key(hash);
200
201        match self
202            .s3_client
203            .get_object()
204            .bucket(&self.bucket)
205            .key(&key)
206            .send()
207            .await
208        {
209            Ok(output) => {
210                let data = output
211                    .body
212                    .collect()
213                    .await
214                    .map_err(|e| S3StoreError::S3(format!("Failed to read body: {}", e)))?;
215                Ok(Some(data.into_bytes().to_vec()))
216            }
217            Err(e) => {
218                // Check if it's a "not found" error
219                let service_err = e.into_service_error();
220                if service_err.is_no_such_key() {
221                    Ok(None)
222                } else {
223                    Err(S3StoreError::S3(format!("S3 get failed: {}", service_err)))
224                }
225            }
226        }
227    }
228
229    /// Check if exists in S3 directly
230    async fn exists_in_s3(&self, hash: &Hash) -> Result<bool, S3StoreError> {
231        let key = self.s3_key(hash);
232
233        match self
234            .s3_client
235            .head_object()
236            .bucket(&self.bucket)
237            .key(&key)
238            .send()
239            .await
240        {
241            Ok(_) => Ok(true),
242            Err(e) => {
243                let service_err = e.into_service_error();
244                if service_err.is_not_found() {
245                    Ok(false)
246                } else {
247                    Err(S3StoreError::S3(format!("S3 head failed: {}", service_err)))
248                }
249            }
250        }
251    }
252
253    /// Queue a blob for upload to S3 (non-blocking)
254    fn queue_upload(&self, hash: Hash, data: Vec<u8>) {
255        if let Err(e) = self.sync_tx.send(SyncMessage::Upload { hash, data }) {
256            warn!("Failed to queue S3 upload: {}", e);
257        }
258    }
259
260    /// Queue a blob for deletion from S3 (non-blocking)
261    fn queue_delete(&self, hash: Hash) {
262        if let Err(e) = self.sync_tx.send(SyncMessage::Delete { hash }) {
263            warn!("Failed to queue S3 delete: {}", e);
264        }
265    }
266
267    /// Shutdown the background sync task
268    pub fn shutdown(&self) {
269        let _ = self.sync_tx.send(SyncMessage::Shutdown);
270    }
271}
272
273impl<L: Store> Drop for S3Store<L> {
274    fn drop(&mut self) {
275        let _ = self.sync_tx.send(SyncMessage::Shutdown);
276    }
277}
278
279#[async_trait]
280impl<L: Store + 'static> Store for S3Store<L> {
281    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
282        // Store locally first (fast)
283        let is_new = self.local.put(hash, data.clone()).await?;
284
285        // Queue for S3 upload in background (non-blocking)
286        if is_new {
287            self.queue_upload(hash, data);
288        }
289
290        Ok(is_new)
291    }
292
293    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
294        // Try local first
295        if let Some(data) = self.local.get(hash).await? {
296            return Ok(Some(data));
297        }
298
299        // Fall back to S3
300        match self.fetch_from_s3(hash).await {
301            Ok(Some(data)) => {
302                // Cache locally for future access
303                let _ = self.local.put(*hash, data.clone()).await;
304                Ok(Some(data))
305            }
306            Ok(None) => Ok(None),
307            Err(e) => {
308                warn!("S3 fetch failed, returning None: {}", e);
309                Ok(None)
310            }
311        }
312    }
313
314    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
315        // Check local first
316        if self.local.has(hash).await? {
317            return Ok(true);
318        }
319
320        // Check S3
321        match self.exists_in_s3(hash).await {
322            Ok(exists) => Ok(exists),
323            Err(e) => {
324                warn!("S3 exists check failed, returning false: {}", e);
325                Ok(false)
326            }
327        }
328    }
329
330    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
331        // Delete locally
332        let deleted = self.local.delete(hash).await?;
333
334        // Queue S3 deletion in background
335        self.queue_delete(*hash);
336
337        Ok(deleted)
338    }
339}
340
/// S3 store specific errors
#[derive(Debug, thiserror::Error)]
pub enum S3StoreError {
    /// A failure reported by the S3 API (get/head body read or service error).
    #[error("S3 error: {0}")]
    S3(String),
    /// Invalid or incomplete store configuration.
    #[error("Configuration error: {0}")]
    Config(String),
}
349
#[cfg(test)]
mod tests {
    use super::*;
    use hashtree_core::hash::sha256;

    /// A blob key is the configured prefix followed by the 64-char hex digest.
    #[test]
    fn test_s3_key_generation() {
        let prefix = "blobs/";
        let digest = to_hex(&sha256(b"test"));
        let key = format!("{}{}", prefix, digest);

        assert!(key.starts_with("blobs/"));
        assert_eq!(key.len(), prefix.len() + 64); // "blobs/" + 64 hex chars
    }

    /// Config fields round-trip through construction unchanged.
    #[test]
    fn test_s3_config() {
        let config = S3Config {
            bucket: "test-bucket".to_string(),
            prefix: Some("data/".to_string()),
            region: Some("us-east-1".to_string()),
            endpoint: None,
        };

        assert_eq!(config.bucket, "test-bucket");
        assert_eq!(config.prefix, Some("data/".to_string()));
    }
}
377}