Skip to main content

sui_cache/storage/
s3.rs

1//! S3-compatible object storage backend.
2//!
3//! Uses `object_store` crate — works with AWS S3, CloudFlare R2, MinIO,
4//! RustFS, Backblaze B2, and any S3-compatible endpoint.
5//!
//! Designed to scale elastically: storage capacity grows with the bucket
//! rather than with local disk. Intended to be paired with a redb database
//! that holds an ephemeral local metadata index.
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use object_store::aws::AmazonS3Builder;
12use object_store::path::Path;
13use object_store::ObjectStore;
14use tracing::{debug, warn};
15
16use super::StorageBackend;
17use crate::CacheError;
18
19/// S3-compatible object storage backend.
20pub struct S3Storage {
21    store: Box<dyn ObjectStore>,
22    bucket: String,
23    region: String,
24    endpoint: Option<String>,
25}
26
27impl S3Storage {
28    /// Create a new S3 storage backend.
29    ///
30    /// Uses AWS default credential chain (IRSA, env vars, instance profile).
31    /// Set `endpoint` for non-AWS S3-compatible services (MinIO, RustFS, R2).
32    pub fn new(bucket: String, region: String, endpoint: Option<String>) -> Result<Self, CacheError> {
33        let mut builder = AmazonS3Builder::new()
34            .with_bucket_name(&bucket)
35            .with_region(&region);
36
37        if let Some(ep) = &endpoint {
38            builder = builder.with_endpoint(ep).with_allow_http(true);
39        }
40
41        let store = builder
42            .build()
43            .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 init failed: {e}"))))?;
44
45        Ok(Self {
46            store: Box::new(store),
47            bucket,
48            region,
49            endpoint,
50        })
51    }
52
53    /// Return the bucket name.
54    #[must_use]
55    pub fn bucket(&self) -> &str {
56        &self.bucket
57    }
58
59    /// Return the region.
60    #[must_use]
61    pub fn region(&self) -> &str {
62        &self.region
63    }
64
65    /// Return the custom endpoint, if any.
66    #[must_use]
67    pub fn endpoint(&self) -> Option<&str> {
68        self.endpoint.as_deref()
69    }
70}
71
72#[async_trait]
73impl StorageBackend for S3Storage {
74    async fn get_narinfo(&self, hash: &str) -> Result<Option<String>, CacheError> {
75        let path = Path::from(format!("{hash}.narinfo"));
76        match self.store.get(&path).await {
77            Ok(result) => {
78                let bytes = result
79                    .bytes()
80                    .await
81                    .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 read: {e}"))))?;
82                Ok(Some(
83                    String::from_utf8(bytes.to_vec())
84                        .map_err(|e| CacheError::NarInfo(format!("Invalid UTF-8: {e}")))?,
85                ))
86            }
87            Err(object_store::Error::NotFound { .. }) => Ok(None),
88            Err(e) => Err(CacheError::Io(std::io::Error::other(format!("S3 get: {e}")))),
89        }
90    }
91
92    async fn put_narinfo(&self, hash: &str, content: &str) -> Result<(), CacheError> {
93        let path = Path::from(format!("{hash}.narinfo"));
94        self.store
95            .put(&path, Bytes::from(content.to_string()).into())
96            .await
97            .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 put: {e}"))))?;
98        debug!(hash = %hash, "Stored narinfo in S3");
99        Ok(())
100    }
101
102    async fn get_nar(&self, nar_path: &str) -> Result<Option<Vec<u8>>, CacheError> {
103        let path = Path::from(nar_path);
104        match self.store.get(&path).await {
105            Ok(result) => {
106                let bytes = result
107                    .bytes()
108                    .await
109                    .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 read: {e}"))))?;
110                Ok(Some(bytes.to_vec()))
111            }
112            Err(object_store::Error::NotFound { .. }) => Ok(None),
113            Err(e) => Err(CacheError::Io(std::io::Error::other(format!("S3 get: {e}")))),
114        }
115    }
116
117    async fn put_nar(&self, nar_path: &str, data: &[u8]) -> Result<(), CacheError> {
118        let path = Path::from(nar_path);
119        self.store
120            .put(&path, Bytes::from(data.to_vec()).into())
121            .await
122            .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 put: {e}"))))?;
123        debug!(path = %nar_path, size = data.len(), "Stored NAR in S3");
124        Ok(())
125    }
126
127    async fn delete(&self, hash: &str) -> Result<(), CacheError> {
128        // Delete narinfo
129        let narinfo_path = Path::from(format!("{hash}.narinfo"));
130        if let Err(e) = self.store.delete(&narinfo_path).await {
131            warn!(hash = %hash, error = %e, "Failed to delete narinfo from S3");
132        }
133
134        // Try to delete NAR blob (common path patterns)
135        for ext in &["nar.xz", "nar.zst", "nar"] {
136            let nar_path = Path::from(format!("nar/{hash}.{ext}"));
137            let _ = self.store.delete(&nar_path).await;
138        }
139
140        debug!(hash = %hash, "Deleted from S3");
141        Ok(())
142    }
143
144    async fn list_narinfos(&self) -> Result<Vec<String>, CacheError> {
145        use futures::TryStreamExt;
146
147        let prefix = Path::from("");
148        let mut hashes = Vec::new();
149
150        let mut list_stream = self.store.list(Some(&prefix));
151
152        while let Some(meta) = list_stream
153            .try_next()
154            .await
155            .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 list: {e}"))))?
156        {
157            let key = meta.location.to_string();
158            if let Some(hash) = key.strip_suffix(".narinfo") {
159                hashes.push(hash.to_string());
160            }
161        }
162
163        debug!(count = hashes.len(), "Listed narinfos from S3");
164        Ok(hashes)
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    /// A custom endpoint is stored and reported back unchanged.
    #[test]
    fn s3_storage_accessors() {
        let storage = S3Storage::new(
            "my-bucket".to_string(),
            "us-east-1".to_string(),
            Some("http://localhost:9000".to_string()),
        )
        .expect("configuring a client for a local endpoint should succeed");
        assert_eq!(storage.bucket(), "my-bucket");
        assert_eq!(storage.region(), "us-east-1");
        assert_eq!(storage.endpoint(), Some("http://localhost:9000"));
    }

    /// Without a custom endpoint, construction still succeeds and `endpoint()` is `None`.
    #[test]
    fn s3_storage_no_endpoint() {
        // Building the client only assembles configuration and sends no
        // request, so this does not need reachable AWS credentials.
        let storage = S3Storage::new("bucket".to_string(), "eu-west-1".to_string(), None)
            .expect("configuring a client without a custom endpoint should succeed");
        assert_eq!(storage.bucket(), "bucket");
        assert_eq!(storage.region(), "eu-west-1");
        assert_eq!(storage.endpoint(), None);
    }
}