1use async_trait::async_trait;
27use aws_sdk_s3::primitives::ByteStream;
28use aws_sdk_s3::Client as S3Client;
29use hashtree_core::store::{Store, StoreError};
30use hashtree_core::types::{to_hex, Hash};
31use std::sync::Arc;
32use tokio::sync::mpsc;
33use tracing::{debug, error, info, warn};
34
/// Connection and layout settings for an [`S3Store`].
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Name of the S3 bucket blobs are mirrored to.
    pub bucket: String,
    /// Optional key prefix prepended to every object key (e.g. "blobs/");
    /// `None` means objects are stored at the bucket root.
    pub prefix: Option<String>,
    /// AWS region override; when `None` the region is taken from the
    /// environment during config loading.
    pub region: Option<String>,
    /// Custom endpoint URL (e.g. MinIO/LocalStack). When set, path-style
    /// addressing is forced on the client.
    pub endpoint: Option<String>,
}
47
/// Work items consumed by the background S3 sync task.
enum SyncMessage {
    /// Upload `data` to S3 under the key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the S3 object whose key is derived from `hash`.
    Delete { hash: Hash },
    /// Stop the sync task's receive loop.
    Shutdown,
}
57
/// Write-through blob store: reads and writes hit a local [`Store`] first,
/// and new/deleted blobs are mirrored to S3 asynchronously by a background
/// task fed through `sync_tx`.
pub struct S3Store<L: Store> {
    /// Authoritative local store; all reads try this first.
    local: Arc<L>,
    /// Client used for synchronous reads (GET/HEAD) against S3.
    s3_client: S3Client,
    /// Target S3 bucket name.
    bucket: String,
    /// Key prefix prepended to each blob's hex hash (may be empty).
    prefix: String,
    /// Channel feeding upload/delete work to the background sync task.
    sync_tx: mpsc::UnboundedSender<SyncMessage>,
}
74
75impl<L: Store + 'static> S3Store<L> {
76 pub async fn new(local: Arc<L>, config: S3Config) -> Result<Self, S3StoreError> {
80 let mut aws_config_loader = aws_config::from_env();
82
83 if let Some(ref region) = config.region {
84 aws_config_loader =
85 aws_config_loader.region(aws_sdk_s3::config::Region::new(region.clone()));
86 }
87
88 let aws_config = aws_config_loader.load().await;
89
90 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
92
93 if let Some(ref endpoint) = config.endpoint {
94 s3_config_builder = s3_config_builder
95 .endpoint_url(endpoint)
96 .force_path_style(true); }
98
99 let s3_client = S3Client::from_conf(s3_config_builder.build());
100
101 let prefix = config.prefix.unwrap_or_default();
102 let bucket = config.bucket.clone();
103
104 let (sync_tx, sync_rx) = mpsc::unbounded_channel();
106
107 let sync_client = s3_client.clone();
109 let sync_bucket = bucket.clone();
110 let sync_prefix = prefix.clone();
111
112 tokio::spawn(async move {
113 Self::sync_task(sync_rx, sync_client, sync_bucket, sync_prefix).await;
114 });
115
116 info!(
117 "S3Store initialized with bucket: {}, prefix: {}",
118 bucket, prefix
119 );
120
121 Ok(Self {
122 local,
123 s3_client,
124 bucket,
125 prefix,
126 sync_tx,
127 })
128 }
129
130 async fn sync_task(
132 mut rx: mpsc::UnboundedReceiver<SyncMessage>,
133 client: S3Client,
134 bucket: String,
135 prefix: String,
136 ) {
137 info!("S3 sync task started");
138
139 while let Some(msg) = rx.recv().await {
140 match msg {
141 SyncMessage::Upload { hash, data } => {
142 let key = format!("{}{}", prefix, to_hex(&hash));
143 debug!(
144 "S3 uploading {} ({} bytes)",
145 &key[..16.min(key.len())],
146 data.len()
147 );
148
149 match client
150 .put_object()
151 .bucket(&bucket)
152 .key(&key)
153 .body(ByteStream::from(data))
154 .send()
155 .await
156 {
157 Ok(_) => {
158 debug!("S3 upload complete: {}", &key[..16.min(key.len())]);
159 }
160 Err(e) => {
161 error!("S3 upload failed for {}: {}", &key[..16.min(key.len())], e);
162 }
163 }
164 }
165 SyncMessage::Delete { hash } => {
166 let key = format!("{}{}", prefix, to_hex(&hash));
167 debug!("S3 deleting {}", &key[..16.min(key.len())]);
168
169 match client
170 .delete_object()
171 .bucket(&bucket)
172 .key(&key)
173 .send()
174 .await
175 {
176 Ok(_) => {
177 debug!("S3 delete complete: {}", &key[..16.min(key.len())]);
178 }
179 Err(e) => {
180 error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
181 }
182 }
183 }
184 SyncMessage::Shutdown => {
185 info!("S3 sync task shutting down");
186 break;
187 }
188 }
189 }
190 }
191
192 fn s3_key(&self, hash: &Hash) -> String {
194 format!("{}{}", self.prefix, to_hex(hash))
195 }
196
197 async fn fetch_from_s3(&self, hash: &Hash) -> Result<Option<Vec<u8>>, S3StoreError> {
199 let key = self.s3_key(hash);
200
201 match self
202 .s3_client
203 .get_object()
204 .bucket(&self.bucket)
205 .key(&key)
206 .send()
207 .await
208 {
209 Ok(output) => {
210 let data = output
211 .body
212 .collect()
213 .await
214 .map_err(|e| S3StoreError::S3(format!("Failed to read body: {}", e)))?;
215 Ok(Some(data.into_bytes().to_vec()))
216 }
217 Err(e) => {
218 let service_err = e.into_service_error();
220 if service_err.is_no_such_key() {
221 Ok(None)
222 } else {
223 Err(S3StoreError::S3(format!("S3 get failed: {}", service_err)))
224 }
225 }
226 }
227 }
228
229 async fn exists_in_s3(&self, hash: &Hash) -> Result<bool, S3StoreError> {
231 let key = self.s3_key(hash);
232
233 match self
234 .s3_client
235 .head_object()
236 .bucket(&self.bucket)
237 .key(&key)
238 .send()
239 .await
240 {
241 Ok(_) => Ok(true),
242 Err(e) => {
243 let service_err = e.into_service_error();
244 if service_err.is_not_found() {
245 Ok(false)
246 } else {
247 Err(S3StoreError::S3(format!("S3 head failed: {}", service_err)))
248 }
249 }
250 }
251 }
252
253 fn queue_upload(&self, hash: Hash, data: Vec<u8>) {
255 if let Err(e) = self.sync_tx.send(SyncMessage::Upload { hash, data }) {
256 warn!("Failed to queue S3 upload: {}", e);
257 }
258 }
259
260 fn queue_delete(&self, hash: Hash) {
262 if let Err(e) = self.sync_tx.send(SyncMessage::Delete { hash }) {
263 warn!("Failed to queue S3 delete: {}", e);
264 }
265 }
266
267 pub fn shutdown(&self) {
269 let _ = self.sync_tx.send(SyncMessage::Shutdown);
270 }
271}
272
273impl<L: Store> Drop for S3Store<L> {
274 fn drop(&mut self) {
275 let _ = self.sync_tx.send(SyncMessage::Shutdown);
276 }
277}
278
279#[async_trait]
280impl<L: Store + 'static> Store for S3Store<L> {
281 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
282 let is_new = self.local.put(hash, data.clone()).await?;
284
285 if is_new {
287 self.queue_upload(hash, data);
288 }
289
290 Ok(is_new)
291 }
292
293 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
294 if let Some(data) = self.local.get(hash).await? {
296 return Ok(Some(data));
297 }
298
299 match self.fetch_from_s3(hash).await {
301 Ok(Some(data)) => {
302 let _ = self.local.put(*hash, data.clone()).await;
304 Ok(Some(data))
305 }
306 Ok(None) => Ok(None),
307 Err(e) => {
308 warn!("S3 fetch failed, returning None: {}", e);
309 Ok(None)
310 }
311 }
312 }
313
314 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
315 if self.local.has(hash).await? {
317 return Ok(true);
318 }
319
320 match self.exists_in_s3(hash).await {
322 Ok(exists) => Ok(exists),
323 Err(e) => {
324 warn!("S3 exists check failed, returning false: {}", e);
325 Ok(false)
326 }
327 }
328 }
329
330 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
331 let deleted = self.local.delete(hash).await?;
333
334 self.queue_delete(*hash);
336
337 Ok(deleted)
338 }
339}
340
/// Errors produced by [`S3Store`] operations.
#[derive(Debug, thiserror::Error)]
pub enum S3StoreError {
    /// An S3 request failed; the payload carries the formatted SDK error.
    #[error("S3 error: {0}")]
    S3(String),
    /// Invalid or unusable configuration.
    #[error("Configuration error: {0}")]
    Config(String),
}
349
#[cfg(test)]
mod tests {
    use super::*;
    use hashtree_core::hash::sha256;

    /// Object keys have the shape `<prefix><64-char hex digest>`.
    #[test]
    fn test_s3_key_generation() {
        let prefix = "blobs/";
        let digest = sha256(b"test");
        let key = format!("{}{}", prefix, to_hex(&digest));

        assert!(key.starts_with("blobs/"));
        // 6 prefix bytes + 64 hex characters of SHA-256 digest.
        assert_eq!(key.len(), prefix.len() + 64);
    }

    /// Plain field construction round-trips unchanged.
    #[test]
    fn test_s3_config() {
        let config = S3Config {
            bucket: String::from("test-bucket"),
            prefix: Some(String::from("data/")),
            region: Some(String::from("us-east-1")),
            endpoint: None,
        };

        assert_eq!(config.bucket, "test-bucket");
        assert_eq!(config.prefix.as_deref(), Some("data/"));
    }
}