1use async_trait::async_trait;
10use bytes::Bytes;
11use object_store::aws::AmazonS3Builder;
12use object_store::path::Path;
13use object_store::ObjectStore;
14use tracing::{debug, warn};
15
16use super::StorageBackend;
17use crate::CacheError;
18
19pub struct S3Storage {
21 store: Box<dyn ObjectStore>,
22 bucket: String,
23 region: String,
24 endpoint: Option<String>,
25}
26
27impl S3Storage {
28 pub fn new(bucket: String, region: String, endpoint: Option<String>) -> Result<Self, CacheError> {
33 let mut builder = AmazonS3Builder::new()
34 .with_bucket_name(&bucket)
35 .with_region(®ion);
36
37 if let Some(ep) = &endpoint {
38 builder = builder.with_endpoint(ep).with_allow_http(true);
39 }
40
41 let store = builder
42 .build()
43 .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 init failed: {e}"))))?;
44
45 Ok(Self {
46 store: Box::new(store),
47 bucket,
48 region,
49 endpoint,
50 })
51 }
52
53 #[must_use]
55 pub fn bucket(&self) -> &str {
56 &self.bucket
57 }
58
59 #[must_use]
61 pub fn region(&self) -> &str {
62 &self.region
63 }
64
65 #[must_use]
67 pub fn endpoint(&self) -> Option<&str> {
68 self.endpoint.as_deref()
69 }
70}
71
72#[async_trait]
73impl StorageBackend for S3Storage {
74 async fn get_narinfo(&self, hash: &str) -> Result<Option<String>, CacheError> {
75 let path = Path::from(format!("{hash}.narinfo"));
76 match self.store.get(&path).await {
77 Ok(result) => {
78 let bytes = result
79 .bytes()
80 .await
81 .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 read: {e}"))))?;
82 Ok(Some(
83 String::from_utf8(bytes.to_vec())
84 .map_err(|e| CacheError::NarInfo(format!("Invalid UTF-8: {e}")))?,
85 ))
86 }
87 Err(object_store::Error::NotFound { .. }) => Ok(None),
88 Err(e) => Err(CacheError::Io(std::io::Error::other(format!("S3 get: {e}")))),
89 }
90 }
91
92 async fn put_narinfo(&self, hash: &str, content: &str) -> Result<(), CacheError> {
93 let path = Path::from(format!("{hash}.narinfo"));
94 self.store
95 .put(&path, Bytes::from(content.to_string()).into())
96 .await
97 .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 put: {e}"))))?;
98 debug!(hash = %hash, "Stored narinfo in S3");
99 Ok(())
100 }
101
102 async fn get_nar(&self, nar_path: &str) -> Result<Option<Vec<u8>>, CacheError> {
103 let path = Path::from(nar_path);
104 match self.store.get(&path).await {
105 Ok(result) => {
106 let bytes = result
107 .bytes()
108 .await
109 .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 read: {e}"))))?;
110 Ok(Some(bytes.to_vec()))
111 }
112 Err(object_store::Error::NotFound { .. }) => Ok(None),
113 Err(e) => Err(CacheError::Io(std::io::Error::other(format!("S3 get: {e}")))),
114 }
115 }
116
117 async fn put_nar(&self, nar_path: &str, data: &[u8]) -> Result<(), CacheError> {
118 let path = Path::from(nar_path);
119 self.store
120 .put(&path, Bytes::from(data.to_vec()).into())
121 .await
122 .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 put: {e}"))))?;
123 debug!(path = %nar_path, size = data.len(), "Stored NAR in S3");
124 Ok(())
125 }
126
127 async fn delete(&self, hash: &str) -> Result<(), CacheError> {
128 let narinfo_path = Path::from(format!("{hash}.narinfo"));
130 if let Err(e) = self.store.delete(&narinfo_path).await {
131 warn!(hash = %hash, error = %e, "Failed to delete narinfo from S3");
132 }
133
134 for ext in &["nar.xz", "nar.zst", "nar"] {
136 let nar_path = Path::from(format!("nar/{hash}.{ext}"));
137 let _ = self.store.delete(&nar_path).await;
138 }
139
140 debug!(hash = %hash, "Deleted from S3");
141 Ok(())
142 }
143
144 async fn list_narinfos(&self) -> Result<Vec<String>, CacheError> {
145 use futures::TryStreamExt;
146
147 let prefix = Path::from("");
148 let mut hashes = Vec::new();
149
150 let mut list_stream = self.store.list(Some(&prefix));
151
152 while let Some(meta) = list_stream
153 .try_next()
154 .await
155 .map_err(|e| CacheError::Io(std::io::Error::other(format!("S3 list: {e}"))))?
156 {
157 let key = meta.location.to_string();
158 if let Some(hash) = key.strip_suffix(".narinfo") {
159 hashes.push(hash.to_string());
160 }
161 }
162
163 debug!(count = hashes.len(), "Listed narinfos from S3");
164 Ok(hashes)
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171
172 #[test]
173 fn s3_storage_accessors() {
174 let storage = S3Storage::new(
175 "my-bucket".to_string(),
176 "us-east-1".to_string(),
177 Some("http://localhost:9000".to_string()),
178 )
179 .unwrap();
180 assert_eq!(storage.bucket(), "my-bucket");
181 assert_eq!(storage.region(), "us-east-1");
182 assert_eq!(storage.endpoint(), Some("http://localhost:9000"));
183 }
184
185 #[test]
186 fn s3_storage_no_endpoint() {
187 let result = S3Storage::new("bucket".to_string(), "eu-west-1".to_string(), None);
189 assert!(result.is_ok());
191 }
192}