1use async_trait::async_trait;
30use aws_sdk_s3::primitives::ByteStream;
31use aws_sdk_s3::Client as S3Client;
32use hashtree_core::store::{Store, StoreError};
33use hashtree_core::types::{to_hex, Hash};
34use std::sync::Arc;
35use tokio::sync::mpsc;
36use tracing::{debug, error, info, warn};
37
/// Configuration for connecting an [`S3Store`] to a bucket.
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Name of the S3 bucket blobs are written to and read from.
    pub bucket: String,
    /// Optional key prefix prepended to every object key (e.g. "blobs/").
    /// Treated as "" when `None`.
    pub prefix: Option<String>,
    /// AWS region override; when `None` the region comes from the
    /// environment-based AWS configuration.
    pub region: Option<String>,
    /// Custom endpoint URL (e.g. MinIO/localstack); when set, path-style
    /// addressing is forced on the client.
    pub endpoint: Option<String>,
}
50
/// Commands consumed by the background S3 sync task.
enum SyncMessage {
    /// Upload `data` to S3 under the key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the S3 object for `hash`.
    Delete { hash: Hash },
    /// Stop the sync task's receive loop.
    Shutdown,
}
60
/// A store layering an S3 bucket behind a local [`Store`].
///
/// Reads consult `local` first and fall back to S3 (writing fetched blobs
/// back into the local store); puts and deletes are applied locally and
/// mirrored to S3 asynchronously via a channel drained by a spawned task.
pub struct S3Store<L: Store> {
    /// Local store consulted before S3 on every read.
    local: Arc<L>,
    /// S3 client used for synchronous reads (get_object / head_object).
    s3_client: S3Client,
    /// Target bucket name.
    bucket: String,
    /// Key prefix prepended to every object key ("" when unconfigured).
    prefix: String,
    /// Sender feeding upload/delete work to the background sync task.
    sync_tx: mpsc::UnboundedSender<SyncMessage>,
}
77
78impl<L: Store + 'static> S3Store<L> {
79 pub async fn new(local: Arc<L>, config: S3Config) -> Result<Self, S3StoreError> {
83 let mut aws_config_loader = aws_config::from_env();
85
86 if let Some(ref region) = config.region {
87 aws_config_loader =
88 aws_config_loader.region(aws_sdk_s3::config::Region::new(region.clone()));
89 }
90
91 let aws_config = aws_config_loader.load().await;
92
93 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
95
96 if let Some(ref endpoint) = config.endpoint {
97 s3_config_builder = s3_config_builder
98 .endpoint_url(endpoint)
99 .force_path_style(true); }
101
102 let s3_client = S3Client::from_conf(s3_config_builder.build());
103
104 let prefix = config.prefix.unwrap_or_default();
105 let bucket = config.bucket.clone();
106
107 let (sync_tx, sync_rx) = mpsc::unbounded_channel();
109
110 let sync_client = s3_client.clone();
112 let sync_bucket = bucket.clone();
113 let sync_prefix = prefix.clone();
114
115 tokio::spawn(async move {
116 Self::sync_task(sync_rx, sync_client, sync_bucket, sync_prefix).await;
117 });
118
119 info!(
120 "S3Store initialized with bucket: {}, prefix: {}",
121 bucket, prefix
122 );
123
124 Ok(Self {
125 local,
126 s3_client,
127 bucket,
128 prefix,
129 sync_tx,
130 })
131 }
132
133 async fn sync_task(
135 mut rx: mpsc::UnboundedReceiver<SyncMessage>,
136 client: S3Client,
137 bucket: String,
138 prefix: String,
139 ) {
140 info!("S3 sync task started");
141
142 while let Some(msg) = rx.recv().await {
143 match msg {
144 SyncMessage::Upload { hash, data } => {
145 let key = format!("{}{}", prefix, to_hex(&hash));
146 debug!(
147 "S3 uploading {} ({} bytes)",
148 &key[..16.min(key.len())],
149 data.len()
150 );
151
152 match client
153 .put_object()
154 .bucket(&bucket)
155 .key(&key)
156 .body(ByteStream::from(data))
157 .send()
158 .await
159 {
160 Ok(_) => {
161 debug!("S3 upload complete: {}", &key[..16.min(key.len())]);
162 }
163 Err(e) => {
164 error!("S3 upload failed for {}: {}", &key[..16.min(key.len())], e);
165 }
166 }
167 }
168 SyncMessage::Delete { hash } => {
169 let key = format!("{}{}", prefix, to_hex(&hash));
170 debug!("S3 deleting {}", &key[..16.min(key.len())]);
171
172 match client
173 .delete_object()
174 .bucket(&bucket)
175 .key(&key)
176 .send()
177 .await
178 {
179 Ok(_) => {
180 debug!("S3 delete complete: {}", &key[..16.min(key.len())]);
181 }
182 Err(e) => {
183 error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
184 }
185 }
186 }
187 SyncMessage::Shutdown => {
188 info!("S3 sync task shutting down");
189 break;
190 }
191 }
192 }
193 }
194
195 fn s3_key(&self, hash: &Hash) -> String {
197 format!("{}{}", self.prefix, to_hex(hash))
198 }
199
200 async fn fetch_from_s3(&self, hash: &Hash) -> Result<Option<Vec<u8>>, S3StoreError> {
202 let key = self.s3_key(hash);
203
204 match self
205 .s3_client
206 .get_object()
207 .bucket(&self.bucket)
208 .key(&key)
209 .send()
210 .await
211 {
212 Ok(output) => {
213 let data = output
214 .body
215 .collect()
216 .await
217 .map_err(|e| S3StoreError::S3(format!("Failed to read body: {}", e)))?;
218 Ok(Some(data.into_bytes().to_vec()))
219 }
220 Err(e) => {
221 let service_err = e.into_service_error();
223 if service_err.is_no_such_key() {
224 Ok(None)
225 } else {
226 Err(S3StoreError::S3(format!("S3 get failed: {}", service_err)))
227 }
228 }
229 }
230 }
231
232 async fn exists_in_s3(&self, hash: &Hash) -> Result<bool, S3StoreError> {
234 let key = self.s3_key(hash);
235
236 match self
237 .s3_client
238 .head_object()
239 .bucket(&self.bucket)
240 .key(&key)
241 .send()
242 .await
243 {
244 Ok(_) => Ok(true),
245 Err(e) => {
246 let service_err = e.into_service_error();
247 if service_err.is_not_found() {
248 Ok(false)
249 } else {
250 Err(S3StoreError::S3(format!("S3 head failed: {}", service_err)))
251 }
252 }
253 }
254 }
255
256 fn queue_upload(&self, hash: Hash, data: Vec<u8>) {
258 if let Err(e) = self.sync_tx.send(SyncMessage::Upload { hash, data }) {
259 warn!("Failed to queue S3 upload: {}", e);
260 }
261 }
262
263 fn queue_delete(&self, hash: Hash) {
265 if let Err(e) = self.sync_tx.send(SyncMessage::Delete { hash }) {
266 warn!("Failed to queue S3 delete: {}", e);
267 }
268 }
269
270 pub fn shutdown(&self) {
272 let _ = self.sync_tx.send(SyncMessage::Shutdown);
273 }
274}
275
276impl<L: Store> Drop for S3Store<L> {
277 fn drop(&mut self) {
278 let _ = self.sync_tx.send(SyncMessage::Shutdown);
279 }
280}
281
282#[async_trait]
283impl<L: Store + 'static> Store for S3Store<L> {
284 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
285 let is_new = self.local.put(hash, data.clone()).await?;
287
288 if is_new {
290 self.queue_upload(hash, data);
291 }
292
293 Ok(is_new)
294 }
295
296 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
297 if let Some(data) = self.local.get(hash).await? {
299 return Ok(Some(data));
300 }
301
302 match self.fetch_from_s3(hash).await {
304 Ok(Some(data)) => {
305 let _ = self.local.put(*hash, data.clone()).await;
307 Ok(Some(data))
308 }
309 Ok(None) => Ok(None),
310 Err(e) => {
311 warn!("S3 fetch failed, returning None: {}", e);
312 Ok(None)
313 }
314 }
315 }
316
317 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
318 if self.local.has(hash).await? {
320 return Ok(true);
321 }
322
323 match self.exists_in_s3(hash).await {
325 Ok(exists) => Ok(exists),
326 Err(e) => {
327 warn!("S3 exists check failed, returning false: {}", e);
328 Ok(false)
329 }
330 }
331 }
332
333 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
334 let deleted = self.local.delete(hash).await?;
336
337 self.queue_delete(*hash);
339
340 Ok(deleted)
341 }
342}
343
/// Errors produced by [`S3Store`] operations.
#[derive(Debug, thiserror::Error)]
pub enum S3StoreError {
    /// An S3 request or body read failed (stringified underlying error).
    #[error("S3 error: {0}")]
    S3(String),
    /// Invalid or incomplete store configuration.
    #[error("Configuration error: {0}")]
    Config(String),
}
352
#[cfg(test)]
mod tests {
    use super::*;
    use hashtree_core::hash::sha256;

    /// Object keys are `prefix + hex(hash)`: prefixed, with a 64-char digest.
    #[test]
    fn test_s3_key_generation() {
        let prefix = "blobs/";
        let digest = sha256(b"test");
        let key = format!("{prefix}{}", to_hex(&digest));
        assert!(key.starts_with(prefix));
        assert_eq!(key.len(), prefix.len() + 64);
    }

    /// `S3Config` fields round-trip the values they were built with.
    #[test]
    fn test_s3_config() {
        let config = S3Config {
            bucket: "test-bucket".to_string(),
            prefix: Some("data/".to_string()),
            region: Some("us-east-1".to_string()),
            endpoint: None,
        };

        assert_eq!(config.bucket, "test-bucket");
        assert_eq!(config.prefix, Some("data/".to_string()));
    }
}
380}