// hashtree_s3/lib.rs

//! S3-backed storage for hashtree with non-blocking uploads.
//!
//! This crate provides an S3 storage backend that:
//! - Stores data locally first (fast writes)
//! - Syncs to S3 in the background (non-blocking)
//! - Falls back to S3 if data not in local cache
//!
//! # Example
//!
//! ```ignore
//! use hashtree_s3::{S3Store, S3Config};
//! use hashtree_core::store::MemoryStore;
//! use std::sync::Arc;
//!
//! let local_store = Arc::new(MemoryStore::new());
//! let config = S3Config {
//!     bucket: "my-bucket".to_string(),
//!     prefix: Some("blobs/".to_string()),
//!     region: None, // Uses AWS_REGION env var
//!     endpoint: None, // For S3-compatible services
//! };
//!
//! let s3_store = S3Store::new(local_store, config).await?;
//! ```

26use async_trait::async_trait;
27use aws_sdk_s3::Client as S3Client;
28use aws_sdk_s3::primitives::ByteStream;
29use hashtree_core::store::{Store, StoreError};
30use hashtree_core::types::{to_hex, Hash};
31use std::sync::Arc;
32use tokio::sync::mpsc;
33use tracing::{debug, error, info, warn};
34
/// Configuration for connecting an S3-backed store to a bucket.
///
/// All optional fields fall back to environment-driven defaults, so
/// `S3Config { bucket, ..Default::default() }` is a valid minimal setup.
#[derive(Debug, Clone, Default)]
pub struct S3Config {
    /// S3 bucket name.
    pub bucket: String,
    /// Optional prefix prepended to every object key (e.g., "blobs/").
    pub prefix: Option<String>,
    /// AWS region; when `None`, resolved from the environment
    /// (e.g. the `AWS_REGION` env var).
    pub region: Option<String>,
    /// Custom endpoint URL for S3-compatible services (MinIO, R2, etc.).
    /// When set, path-style addressing is forced.
    pub endpoint: Option<String>,
}
47
48/// Background sync task message
49enum SyncMessage {
50    /// Upload blob to S3
51    Upload { hash: Hash, data: Vec<u8> },
52    /// Delete blob from S3
53    Delete { hash: Hash },
54    /// Shutdown the sync task
55    Shutdown,
56}
57
58/// S3-backed store with local caching and background sync.
59///
60/// Writes go to the local store first (fast), then are synced to S3 in the background.
61/// Reads check local store first, then fall back to S3.
62pub struct S3Store<L: Store> {
63    /// Local store for fast access
64    local: Arc<L>,
65    /// S3 client
66    s3_client: S3Client,
67    /// S3 bucket name
68    bucket: String,
69    /// Key prefix
70    prefix: String,
71    /// Channel to send sync messages to background task
72    sync_tx: mpsc::UnboundedSender<SyncMessage>,
73}
74
75impl<L: Store + 'static> S3Store<L> {
76    /// Create a new S3 store wrapping a local store.
77    ///
78    /// Spawns a background task for non-blocking S3 uploads.
79    pub async fn new(local: Arc<L>, config: S3Config) -> Result<Self, S3StoreError> {
80        // Build AWS config
81        let mut aws_config_loader = aws_config::from_env();
82
83        if let Some(ref region) = config.region {
84            aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(region.clone()));
85        }
86
87        let aws_config = aws_config_loader.load().await;
88
89        // Build S3 client
90        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
91
92        if let Some(ref endpoint) = config.endpoint {
93            s3_config_builder = s3_config_builder
94                .endpoint_url(endpoint)
95                .force_path_style(true); // Required for most S3-compatible services
96        }
97
98        let s3_client = S3Client::from_conf(s3_config_builder.build());
99
100        let prefix = config.prefix.unwrap_or_default();
101        let bucket = config.bucket.clone();
102
103        // Create channel for background sync
104        let (sync_tx, sync_rx) = mpsc::unbounded_channel();
105
106        // Spawn background sync task
107        let sync_client = s3_client.clone();
108        let sync_bucket = bucket.clone();
109        let sync_prefix = prefix.clone();
110
111        tokio::spawn(async move {
112            Self::sync_task(sync_rx, sync_client, sync_bucket, sync_prefix).await;
113        });
114
115        info!("S3Store initialized with bucket: {}, prefix: {}", bucket, prefix);
116
117        Ok(Self {
118            local,
119            s3_client,
120            bucket,
121            prefix,
122            sync_tx,
123        })
124    }
125
126    /// Background task that handles S3 uploads/deletes
127    async fn sync_task(
128        mut rx: mpsc::UnboundedReceiver<SyncMessage>,
129        client: S3Client,
130        bucket: String,
131        prefix: String,
132    ) {
133        info!("S3 sync task started");
134
135        while let Some(msg) = rx.recv().await {
136            match msg {
137                SyncMessage::Upload { hash, data } => {
138                    let key = format!("{}{}", prefix, to_hex(&hash));
139                    debug!("S3 uploading {} ({} bytes)", &key[..16.min(key.len())], data.len());
140
141                    match client
142                        .put_object()
143                        .bucket(&bucket)
144                        .key(&key)
145                        .body(ByteStream::from(data))
146                        .send()
147                        .await
148                    {
149                        Ok(_) => {
150                            debug!("S3 upload complete: {}", &key[..16.min(key.len())]);
151                        }
152                        Err(e) => {
153                            error!("S3 upload failed for {}: {}", &key[..16.min(key.len())], e);
154                        }
155                    }
156                }
157                SyncMessage::Delete { hash } => {
158                    let key = format!("{}{}", prefix, to_hex(&hash));
159                    debug!("S3 deleting {}", &key[..16.min(key.len())]);
160
161                    match client
162                        .delete_object()
163                        .bucket(&bucket)
164                        .key(&key)
165                        .send()
166                        .await
167                    {
168                        Ok(_) => {
169                            debug!("S3 delete complete: {}", &key[..16.min(key.len())]);
170                        }
171                        Err(e) => {
172                            error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
173                        }
174                    }
175                }
176                SyncMessage::Shutdown => {
177                    info!("S3 sync task shutting down");
178                    break;
179                }
180            }
181        }
182    }
183
184    /// Get the S3 key for a hash
185    fn s3_key(&self, hash: &Hash) -> String {
186        format!("{}{}", self.prefix, to_hex(hash))
187    }
188
189    /// Fetch from S3 directly (used when local miss)
190    async fn fetch_from_s3(&self, hash: &Hash) -> Result<Option<Vec<u8>>, S3StoreError> {
191        let key = self.s3_key(hash);
192
193        match self.s3_client
194            .get_object()
195            .bucket(&self.bucket)
196            .key(&key)
197            .send()
198            .await
199        {
200            Ok(output) => {
201                let data = output.body.collect().await
202                    .map_err(|e| S3StoreError::S3(format!("Failed to read body: {}", e)))?;
203                Ok(Some(data.into_bytes().to_vec()))
204            }
205            Err(e) => {
206                // Check if it's a "not found" error
207                let service_err = e.into_service_error();
208                if service_err.is_no_such_key() {
209                    Ok(None)
210                } else {
211                    Err(S3StoreError::S3(format!("S3 get failed: {}", service_err)))
212                }
213            }
214        }
215    }
216
217    /// Check if exists in S3 directly
218    async fn exists_in_s3(&self, hash: &Hash) -> Result<bool, S3StoreError> {
219        let key = self.s3_key(hash);
220
221        match self.s3_client
222            .head_object()
223            .bucket(&self.bucket)
224            .key(&key)
225            .send()
226            .await
227        {
228            Ok(_) => Ok(true),
229            Err(e) => {
230                let service_err = e.into_service_error();
231                if service_err.is_not_found() {
232                    Ok(false)
233                } else {
234                    Err(S3StoreError::S3(format!("S3 head failed: {}", service_err)))
235                }
236            }
237        }
238    }
239
240    /// Queue a blob for upload to S3 (non-blocking)
241    fn queue_upload(&self, hash: Hash, data: Vec<u8>) {
242        if let Err(e) = self.sync_tx.send(SyncMessage::Upload { hash, data }) {
243            warn!("Failed to queue S3 upload: {}", e);
244        }
245    }
246
247    /// Queue a blob for deletion from S3 (non-blocking)
248    fn queue_delete(&self, hash: Hash) {
249        if let Err(e) = self.sync_tx.send(SyncMessage::Delete { hash }) {
250            warn!("Failed to queue S3 delete: {}", e);
251        }
252    }
253
254    /// Shutdown the background sync task
255    pub fn shutdown(&self) {
256        let _ = self.sync_tx.send(SyncMessage::Shutdown);
257    }
258}
259
260impl<L: Store> Drop for S3Store<L> {
261    fn drop(&mut self) {
262        let _ = self.sync_tx.send(SyncMessage::Shutdown);
263    }
264}
265
266#[async_trait]
267impl<L: Store + 'static> Store for S3Store<L> {
268    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
269        // Store locally first (fast)
270        let is_new = self.local.put(hash, data.clone()).await?;
271
272        // Queue for S3 upload in background (non-blocking)
273        if is_new {
274            self.queue_upload(hash, data);
275        }
276
277        Ok(is_new)
278    }
279
280    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
281        // Try local first
282        if let Some(data) = self.local.get(hash).await? {
283            return Ok(Some(data));
284        }
285
286        // Fall back to S3
287        match self.fetch_from_s3(hash).await {
288            Ok(Some(data)) => {
289                // Cache locally for future access
290                let _ = self.local.put(*hash, data.clone()).await;
291                Ok(Some(data))
292            }
293            Ok(None) => Ok(None),
294            Err(e) => {
295                warn!("S3 fetch failed, returning None: {}", e);
296                Ok(None)
297            }
298        }
299    }
300
301    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
302        // Check local first
303        if self.local.has(hash).await? {
304            return Ok(true);
305        }
306
307        // Check S3
308        match self.exists_in_s3(hash).await {
309            Ok(exists) => Ok(exists),
310            Err(e) => {
311                warn!("S3 exists check failed, returning false: {}", e);
312                Ok(false)
313            }
314        }
315    }
316
317    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
318        // Delete locally
319        let deleted = self.local.delete(hash).await?;
320
321        // Queue S3 deletion in background
322        self.queue_delete(*hash);
323
324        Ok(deleted)
325    }
326}
327
328/// S3 store specific errors
329#[derive(Debug, thiserror::Error)]
330pub enum S3StoreError {
331    #[error("S3 error: {0}")]
332    S3(String),
333    #[error("Configuration error: {0}")]
334    Config(String),
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    // NOTE: `MemoryStore` was imported but never used here; the unused
    // import has been removed to silence the compiler warning.
    use hashtree_core::hash::sha256;

    /// The configured prefix is applied verbatim in front of the
    /// 64-character hex digest.
    #[test]
    fn test_s3_key_generation() {
        let prefix = "blobs/";
        let hash = sha256(b"test");
        let key = format!("{}{}", prefix, to_hex(&hash));
        assert!(key.starts_with("blobs/"));
        assert_eq!(key.len(), prefix.len() + 64); // "blobs/" + 64 hex chars
    }

    /// Plain construction of `S3Config` preserves the supplied fields.
    #[test]
    fn test_s3_config() {
        let config = S3Config {
            bucket: "test-bucket".to_string(),
            prefix: Some("data/".to_string()),
            region: Some("us-east-1".to_string()),
            endpoint: None,
        };

        assert_eq!(config.bucket, "test-bucket");
        assert_eq!(config.prefix, Some("data/".to_string()));
    }
}