1use async_trait::async_trait;
27use aws_sdk_s3::Client as S3Client;
28use aws_sdk_s3::primitives::ByteStream;
29use hashtree_core::store::{Store, StoreError};
30use hashtree_core::types::{to_hex, Hash};
31use std::sync::Arc;
32use tokio::sync::mpsc;
33use tracing::{debug, error, info, warn};
34
/// Configuration for connecting an [`S3Store`] to an S3-compatible backend.
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Name of the target S3 bucket.
    pub bucket: String,
    /// Optional key prefix prepended to every object key (e.g. "blobs/").
    pub prefix: Option<String>,
    /// Optional AWS region override; the environment default is used when `None`.
    pub region: Option<String>,
    /// Optional custom endpoint URL (e.g. MinIO/localstack); also enables path-style addressing.
    pub endpoint: Option<String>,
}
47
/// Messages consumed by the background S3 sync task.
enum SyncMessage {
    /// Upload `data` to S3 under the key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the S3 object whose key derives from `hash`.
    Delete { hash: Hash },
    /// Stop the sync task; messages queued earlier are processed first (FIFO channel).
    Shutdown,
}
57
/// A write-through store layering an S3 bucket behind a local [`Store`].
///
/// Reads consult the local store first and fall back to S3 (caching hits
/// locally); writes land in the local store synchronously and are replicated
/// to S3 asynchronously by a background task.
pub struct S3Store<L: Store> {
    // Fast local store consulted before any S3 round-trip.
    local: Arc<L>,
    // AWS SDK client; cloned into the sync task at construction.
    s3_client: S3Client,
    // Target bucket name.
    bucket: String,
    // Key prefix ("" when unset) prepended to hex-encoded hashes.
    prefix: String,
    // Channel feeding the background sync task; unbounded, so sends never block.
    sync_tx: mpsc::UnboundedSender<SyncMessage>,
}
74
75impl<L: Store + 'static> S3Store<L> {
76 pub async fn new(local: Arc<L>, config: S3Config) -> Result<Self, S3StoreError> {
80 let mut aws_config_loader = aws_config::from_env();
82
83 if let Some(ref region) = config.region {
84 aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(region.clone()));
85 }
86
87 let aws_config = aws_config_loader.load().await;
88
89 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
91
92 if let Some(ref endpoint) = config.endpoint {
93 s3_config_builder = s3_config_builder
94 .endpoint_url(endpoint)
95 .force_path_style(true); }
97
98 let s3_client = S3Client::from_conf(s3_config_builder.build());
99
100 let prefix = config.prefix.unwrap_or_default();
101 let bucket = config.bucket.clone();
102
103 let (sync_tx, sync_rx) = mpsc::unbounded_channel();
105
106 let sync_client = s3_client.clone();
108 let sync_bucket = bucket.clone();
109 let sync_prefix = prefix.clone();
110
111 tokio::spawn(async move {
112 Self::sync_task(sync_rx, sync_client, sync_bucket, sync_prefix).await;
113 });
114
115 info!("S3Store initialized with bucket: {}, prefix: {}", bucket, prefix);
116
117 Ok(Self {
118 local,
119 s3_client,
120 bucket,
121 prefix,
122 sync_tx,
123 })
124 }
125
126 async fn sync_task(
128 mut rx: mpsc::UnboundedReceiver<SyncMessage>,
129 client: S3Client,
130 bucket: String,
131 prefix: String,
132 ) {
133 info!("S3 sync task started");
134
135 while let Some(msg) = rx.recv().await {
136 match msg {
137 SyncMessage::Upload { hash, data } => {
138 let key = format!("{}{}", prefix, to_hex(&hash));
139 debug!("S3 uploading {} ({} bytes)", &key[..16.min(key.len())], data.len());
140
141 match client
142 .put_object()
143 .bucket(&bucket)
144 .key(&key)
145 .body(ByteStream::from(data))
146 .send()
147 .await
148 {
149 Ok(_) => {
150 debug!("S3 upload complete: {}", &key[..16.min(key.len())]);
151 }
152 Err(e) => {
153 error!("S3 upload failed for {}: {}", &key[..16.min(key.len())], e);
154 }
155 }
156 }
157 SyncMessage::Delete { hash } => {
158 let key = format!("{}{}", prefix, to_hex(&hash));
159 debug!("S3 deleting {}", &key[..16.min(key.len())]);
160
161 match client
162 .delete_object()
163 .bucket(&bucket)
164 .key(&key)
165 .send()
166 .await
167 {
168 Ok(_) => {
169 debug!("S3 delete complete: {}", &key[..16.min(key.len())]);
170 }
171 Err(e) => {
172 error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
173 }
174 }
175 }
176 SyncMessage::Shutdown => {
177 info!("S3 sync task shutting down");
178 break;
179 }
180 }
181 }
182 }
183
184 fn s3_key(&self, hash: &Hash) -> String {
186 format!("{}{}", self.prefix, to_hex(hash))
187 }
188
189 async fn fetch_from_s3(&self, hash: &Hash) -> Result<Option<Vec<u8>>, S3StoreError> {
191 let key = self.s3_key(hash);
192
193 match self.s3_client
194 .get_object()
195 .bucket(&self.bucket)
196 .key(&key)
197 .send()
198 .await
199 {
200 Ok(output) => {
201 let data = output.body.collect().await
202 .map_err(|e| S3StoreError::S3(format!("Failed to read body: {}", e)))?;
203 Ok(Some(data.into_bytes().to_vec()))
204 }
205 Err(e) => {
206 let service_err = e.into_service_error();
208 if service_err.is_no_such_key() {
209 Ok(None)
210 } else {
211 Err(S3StoreError::S3(format!("S3 get failed: {}", service_err)))
212 }
213 }
214 }
215 }
216
217 async fn exists_in_s3(&self, hash: &Hash) -> Result<bool, S3StoreError> {
219 let key = self.s3_key(hash);
220
221 match self.s3_client
222 .head_object()
223 .bucket(&self.bucket)
224 .key(&key)
225 .send()
226 .await
227 {
228 Ok(_) => Ok(true),
229 Err(e) => {
230 let service_err = e.into_service_error();
231 if service_err.is_not_found() {
232 Ok(false)
233 } else {
234 Err(S3StoreError::S3(format!("S3 head failed: {}", service_err)))
235 }
236 }
237 }
238 }
239
240 fn queue_upload(&self, hash: Hash, data: Vec<u8>) {
242 if let Err(e) = self.sync_tx.send(SyncMessage::Upload { hash, data }) {
243 warn!("Failed to queue S3 upload: {}", e);
244 }
245 }
246
247 fn queue_delete(&self, hash: Hash) {
249 if let Err(e) = self.sync_tx.send(SyncMessage::Delete { hash }) {
250 warn!("Failed to queue S3 delete: {}", e);
251 }
252 }
253
254 pub fn shutdown(&self) {
256 let _ = self.sync_tx.send(SyncMessage::Shutdown);
257 }
258}
259
impl<L: Store> Drop for S3Store<L> {
    fn drop(&mut self) {
        // Best-effort shutdown signal; ignore the error if the sync task has
        // already exited. NOTE(review): this does not wait for queued
        // uploads/deletes to complete — the detached task drains them on its
        // own, and may be cut short if the runtime shuts down first.
        let _ = self.sync_tx.send(SyncMessage::Shutdown);
    }
}
265
266#[async_trait]
267impl<L: Store + 'static> Store for S3Store<L> {
268 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
269 let is_new = self.local.put(hash, data.clone()).await?;
271
272 if is_new {
274 self.queue_upload(hash, data);
275 }
276
277 Ok(is_new)
278 }
279
280 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
281 if let Some(data) = self.local.get(hash).await? {
283 return Ok(Some(data));
284 }
285
286 match self.fetch_from_s3(hash).await {
288 Ok(Some(data)) => {
289 let _ = self.local.put(*hash, data.clone()).await;
291 Ok(Some(data))
292 }
293 Ok(None) => Ok(None),
294 Err(e) => {
295 warn!("S3 fetch failed, returning None: {}", e);
296 Ok(None)
297 }
298 }
299 }
300
301 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
302 if self.local.has(hash).await? {
304 return Ok(true);
305 }
306
307 match self.exists_in_s3(hash).await {
309 Ok(exists) => Ok(exists),
310 Err(e) => {
311 warn!("S3 exists check failed, returning false: {}", e);
312 Ok(false)
313 }
314 }
315 }
316
    // Deletes locally and queues the remote delete.
    //
    // The S3 delete is queued unconditionally — even when the blob was absent
    // locally — presumably because it may still exist remotely; the return
    // value reflects only the local deletion. TODO(review): confirm callers
    // expect the remote delete regardless of local presence.
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete(hash).await?;

        self.queue_delete(*hash);

        Ok(deleted)
    }
326}
327
/// Errors produced by [`S3Store`]'s S3 interactions.
#[derive(Debug, thiserror::Error)]
pub enum S3StoreError {
    /// An S3 request or body read failed; the message carries SDK detail.
    #[error("S3 error: {0}")]
    S3(String),
    /// Invalid or unusable configuration.
    /// NOTE(review): not constructed anywhere in this file — presumably
    /// reserved for callers or future validation; verify before removing.
    #[error("Configuration error: {0}")]
    Config(String),
}
336
#[cfg(test)]
mod tests {
    use super::*;
    // `MemoryStore` was imported but unused here, triggering a compiler
    // warning; removed.
    use hashtree_core::hash::sha256;

    /// Keys have the shape `<prefix><64-char hex digest>`.
    #[test]
    fn test_s3_key_generation() {
        let prefix = "blobs/";
        let hash = sha256(b"test");
        let key = format!("{}{}", prefix, to_hex(&hash));
        assert!(key.starts_with("blobs/"));
        // "blobs/" (6 bytes) + SHA-256 hex digest (64 bytes).
        assert_eq!(key.len(), 6 + 64);
    }

    /// `S3Config` fields round-trip exactly as constructed.
    #[test]
    fn test_s3_config() {
        let config = S3Config {
            bucket: "test-bucket".to_string(),
            prefix: Some("data/".to_string()),
            region: Some("us-east-1".to_string()),
            endpoint: None,
        };

        assert_eq!(config.bucket, "test-bucket");
        assert_eq!(config.prefix, Some("data/".to_string()));
    }
}
365}