1use std::path::Path;
41use std::sync::Arc;
42
43use aws_config::BehaviorVersion;
44use aws_sdk_s3::config::Region;
45use aws_sdk_s3::Client;
46use aws_smithy_http_client::tls::rustls_provider::CryptoMode;
47use aws_smithy_http_client::tls::Provider as TlsProvider;
48use aws_smithy_http_client::Builder as HttpClientBuilder;
49use snapdir_core::manifest::Manifest;
50use snapdir_core::merkle::{Blake3Hasher, Hasher};
51use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
52use snapdir_core::Meter;
53
54use crate::fetch::fetch_files_concurrent;
55use crate::push::{push_objects_concurrent, upload_object};
56use crate::stream::StreamStore;
57use crate::transfer::{RateLimiter, TransferConfig};
58
59use tokio::runtime::Runtime;
60
/// Total number of GET attempts `S3Store::fetch_verified` makes for one object whose
/// downloaded bytes do not hash to the expected checksum. With `5`, that is the initial
/// attempt plus four re-fetches, after which it gives up with `StoreError::Integrity`.
const MAX_FETCH_RETRIES: u32 = 5;
64
65#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct S3Location {
68 pub bucket: String,
70 pub prefix: String,
73}
74
75impl S3Location {
76 #[must_use]
86 pub fn parse(store_url: &str) -> Self {
87 let without_scheme = match store_url.find("://") {
91 Some(idx) => &store_url[idx + 3..],
92 None => store_url,
93 };
94 let mut parts = without_scheme.splitn(2, '/');
95 let bucket = parts.next().unwrap_or("").to_owned();
96 let prefix = parts.next().unwrap_or("");
97 let prefix = prefix
100 .trim_end_matches('/')
101 .trim_start_matches('/')
102 .to_owned();
103 Self { bucket, prefix }
104 }
105
106 #[must_use]
109 pub fn object_key(&self, checksum: &str) -> String {
110 self.key_for(&object_path(checksum))
111 }
112
113 #[must_use]
116 pub fn manifest_key(&self, id: &str) -> String {
117 self.key_for(&manifest_path(id))
118 }
119
120 fn key_for(&self, rel: &str) -> String {
125 let rel = rel.trim_start_matches('/');
126 if self.prefix.is_empty() {
127 rel.to_owned()
128 } else {
129 format!("{}/{rel}", self.prefix)
130 }
131 }
132}
133
/// [`Store`] / [`StreamStore`] backend that reads and writes objects and manifests in an
/// S3 bucket (or an S3-compatible endpoint) under the prefix given by an [`S3Location`].
///
/// The AWS SDK is async. The store owns a tokio [`Runtime`] and `block_on`s SDK futures
/// inside its synchronous trait methods.
///
/// Fields are dropped in declaration order, so `client` is dropped before `runtime`.
/// NOTE(review): presumably intentional — keep this order when editing.
pub struct S3Store {
    /// AWS SDK client used for every HEAD / GET / PUT request.
    client: Client,
    /// Bucket and key prefix this store operates under.
    location: S3Location,
    /// Runtime used to drive SDK futures from synchronous callers.
    runtime: Arc<Runtime>,
    /// Transfer settings (e.g. the `max_bytes_per_sec` rate limit) forwarded to the
    /// concurrent fetch / push helpers.
    config: TransferConfig,
    /// Optional meter forwarded to the fetch / push helpers; set via `with_meter`.
    meter: Option<Arc<Meter>>,
}
149
150impl S3Store {
151 pub fn connect(store_url: &str, endpoint_url: Option<&str>) -> Result<Self, StoreError> {
164 Self::connect_with(store_url, endpoint_url, TransferConfig::default())
165 }
166
167 pub fn connect_with(
176 store_url: &str,
177 endpoint_url: Option<&str>,
178 config: TransferConfig,
179 ) -> Result<Self, StoreError> {
180 let location = S3Location::parse(store_url);
181 let runtime = build_runtime()?;
182
183 let http_client = ring_https_client();
184 let endpoint = endpoint_url.map(ToOwned::to_owned);
185 let client = runtime.block_on(async move {
186 let mut loader =
187 aws_config::defaults(BehaviorVersion::latest()).http_client(http_client.clone());
188 if let Some(ep) = endpoint.as_deref() {
189 loader = loader.endpoint_url(ep);
190 }
191 let shared = loader.load().await;
192 let mut builder = aws_sdk_s3::config::Builder::from(&shared);
193 if endpoint.is_some() {
194 builder = builder.force_path_style(true);
196 }
197 if shared.region().is_none() {
200 builder = builder.region(Region::new("us-east-1"));
201 }
202 Client::from_conf(builder.build())
203 });
204
205 Ok(Self {
206 client,
207 location,
208 runtime: Arc::new(runtime),
209 config,
210 meter: None,
211 })
212 }
213
214 pub fn from_client(client: Client, location: S3Location) -> Result<Self, StoreError> {
222 Ok(Self {
223 client,
224 location,
225 runtime: Arc::new(build_runtime()?),
226 config: TransferConfig::default(),
227 meter: None,
228 })
229 }
230
231 #[must_use]
237 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
238 self.meter = meter;
239 self
240 }
241
242 #[must_use]
244 pub fn location(&self) -> &S3Location {
245 &self.location
246 }
247
248 #[must_use]
251 pub fn transfer_config(&self) -> &TransferConfig {
252 &self.config
253 }
254
255 async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
257 match self
258 .client
259 .head_object()
260 .bucket(&self.location.bucket)
261 .key(key)
262 .send()
263 .await
264 {
265 Ok(_) => Ok(true),
266 Err(err) => {
267 let svc = err.into_service_error();
268 if svc.is_not_found() {
269 Ok(false)
270 } else {
271 Err(backend("S3 HEAD object failed", svc))
272 }
273 }
274 }
275 }
276
277 async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
279 match self
280 .client
281 .get_object()
282 .bucket(&self.location.bucket)
283 .key(key)
284 .send()
285 .await
286 {
287 Ok(resp) => {
288 let data = resp
289 .body
290 .collect()
291 .await
292 .map_err(|e| backend("reading S3 object body", e))?;
293 Ok(Some(data.into_bytes().to_vec()))
294 }
295 Err(err) => {
296 let svc = err.into_service_error();
297 if svc.is_no_such_key() {
298 Ok(None)
299 } else {
300 Err(backend("S3 GET object failed", svc))
301 }
302 }
303 }
304 }
305
306 async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
309 self.client
310 .put_object()
311 .bucket(&self.location.bucket)
312 .key(key)
313 .body(bytes.into())
314 .send()
315 .await
316 .map_err(|e| backend("S3 PUT object failed", e))?;
317 Ok(())
318 }
319
320 async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
323 let hasher = Blake3Hasher::new();
324 let mut attempts_left = MAX_FETCH_RETRIES;
325 loop {
326 match self.get_bytes(key).await? {
327 Some(bytes) => {
328 let actual = hasher.hash_hex(&bytes);
329 if actual == expected {
330 return Ok(bytes);
331 }
332 attempts_left = attempts_left.saturating_sub(1);
335 if attempts_left == 0 {
336 return Err(StoreError::Integrity {
337 address: format!("s3://{}/{key}", self.location.bucket),
338 expected: expected.to_owned(),
339 actual,
340 });
341 }
342 }
343 None => {
344 return Err(StoreError::ObjectNotFound {
346 checksum: expected.to_owned(),
347 });
348 }
349 }
350 }
351 }
352}
353
354impl Store for S3Store {
355 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
356 let key = self.location.manifest_key(id);
357 let bytes = self.runtime.block_on(async {
358 match self.get_bytes(&key).await? {
359 Some(b) => Ok(b),
360 None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
361 }
362 })?;
363
364 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
365 message: format!("manifest {id} is not valid UTF-8"),
366 source: Some(Box::new(err)),
367 })?;
368 let manifest = Manifest::parse(&text)?;
369
370 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
373 if actual != id {
374 return Err(StoreError::Integrity {
375 address: self.location.manifest_key(id),
376 expected: id.to_owned(),
377 actual,
378 });
379 }
380 Ok(manifest)
381 }
382
383 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
384 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
390 let meter = self.meter.as_deref();
391 self.runtime.block_on(async {
392 fetch_files_concurrent(
393 manifest,
394 dest,
395 &self.config,
396 &limiter,
397 meter,
398 |entry| async {
399 let key = self.location.object_key(&entry.checksum);
400 self.fetch_verified(&key, &entry.checksum).await
401 },
402 )
403 .await
404 })
405 }
406
407 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
408 let hasher = Blake3Hasher::new();
409 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
410
411 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
417 let meter = self.meter.as_deref();
418 self.runtime.block_on(async {
419 let manifest_key = self.location.manifest_key(&id);
422 if self.key_exists(&manifest_key).await? {
423 return Ok(());
424 }
425
426 push_objects_concurrent(
427 manifest,
428 &self.config,
429 meter,
430 |entry| {
431 let object_key = self.location.object_key(&entry.checksum);
432 upload_object(
433 entry,
434 object_key,
435 source,
436 &limiter,
437 meter,
438 |key| async move { self.key_exists(&key).await },
439 |key, bytes| async move { self.put_bytes(&key, bytes).await },
440 )
441 },
442 || async {
443 let mut text = manifest.to_string();
446 text.push('\n');
447 let manifest_actual = hasher.hash_hex(text.as_bytes());
448 if manifest_actual != id {
449 return Err(StoreError::Integrity {
450 address: manifest_key.clone(),
451 expected: id.clone(),
452 actual: manifest_actual,
453 });
454 }
455 self.put_bytes(&manifest_key, text.into_bytes()).await
456 },
457 )
458 .await
459 })
460 }
461}
462
463impl StreamStore for S3Store {
464 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
465 let key = self.location.object_key(checksum);
466 self.runtime.block_on(async { self.key_exists(&key).await })
467 }
468
469 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
470 let key = self.location.object_key(checksum);
471 let bytes = self.runtime.block_on(async {
472 self.get_bytes(&key)
473 .await?
474 .ok_or_else(|| StoreError::ObjectNotFound {
475 checksum: checksum.to_owned(),
476 })
477 })?;
478
479 let actual = Blake3Hasher::new().hash_hex(&bytes);
482 if actual != checksum {
483 return Err(StoreError::Integrity {
484 address: format!("s3://{}/{key}", self.location.bucket),
485 expected: checksum.to_owned(),
486 actual,
487 });
488 }
489 Ok(bytes)
490 }
491
492 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
493 let actual = Blake3Hasher::new().hash_hex(&bytes);
496 if actual != checksum {
497 return Err(StoreError::Integrity {
498 address: self.location.object_key(checksum),
499 expected: checksum.to_owned(),
500 actual,
501 });
502 }
503 let key = self.location.object_key(checksum);
504 self.runtime
505 .block_on(async { self.put_bytes(&key, bytes).await })
506 }
507
508 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
509 let key = self.location.manifest_key(id);
510 let mut text = manifest.to_string();
513 text.push('\n');
514 let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
515 if actual != id {
516 return Err(StoreError::Integrity {
517 address: key,
518 expected: id.to_owned(),
519 actual,
520 });
521 }
522 self.runtime
523 .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
524 }
525}
526
527fn build_runtime() -> Result<Runtime, StoreError> {
529 tokio::runtime::Builder::new_multi_thread()
530 .enable_all()
531 .build()
532 .map_err(|e| backend("creating tokio runtime for S3Store", e))
533}
534
535fn ring_https_client() -> aws_smithy_runtime_api::client::http::SharedHttpClient {
541 HttpClientBuilder::new()
542 .tls_provider(TlsProvider::Rustls(CryptoMode::Ring))
543 .build_https()
544}
545
546fn backend<E>(message: &str, source: E) -> StoreError
548where
549 E: std::error::Error + Send + Sync + 'static,
550{
551 StoreError::Backend {
552 message: message.to_owned(),
553 source: Some(Box::new(source)),
554 }
555}
556
// Unit tests for URL parsing and key layout. There is also an opt-in live round trip
// against a real S3-compatible endpoint.
#[cfg(test)]
mod tests {
    use super::*;
    use snapdir_core::manifest::PathType;

    /// Test-local helper: strips one leading `./`, then one trailing `/`
    /// (`./a/` -> `a`, `./` -> empty).
    fn strip_leading_dot_slash(path: &str) -> &str {
        let trimmed = path.strip_prefix("./").unwrap_or(path);
        trimmed.strip_suffix('/').unwrap_or(trimmed)
    }

    // Object checksum fixture and the sharded path it is expected to map to
    // (three 3-character directories, then the remaining characters).
    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    // Manifest id fixture and its expected sharded path (same layout).
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";

    #[test]
    fn s3_store_parses_bucket_and_prefix() {
        let loc = S3Location::parse("s3://my-bucket/long/term/storage");
        assert_eq!(loc.bucket, "my-bucket");
        assert_eq!(loc.prefix, "long/term/storage");
    }

    #[test]
    fn s3_store_parse_matches_oracle_cut_fields() {
        let loc = S3Location::parse("s3://bucket/a/b/c");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "a/b/c");
    }

    #[test]
    fn s3_store_parse_strips_trailing_slash() {
        let loc = S3Location::parse("s3://bucket/prefix/");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "prefix");
    }

    #[test]
    fn s3_store_parse_bucket_root_has_empty_prefix() {
        let loc = S3Location::parse("s3://bucket");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "");

        let loc_slash = S3Location::parse("s3://bucket/");
        assert_eq!(loc_slash.bucket, "bucket");
        assert_eq!(loc_slash.prefix, "");
    }

    #[test]
    fn s3_store_parse_accepts_bare_bucket_prefix_without_scheme() {
        let loc = S3Location::parse("bucket/some/prefix");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "some/prefix");
    }

    #[test]
    fn s3_store_object_key_matches_sharded_scheme() {
        let loc = S3Location::parse("s3://b/long/term/storage");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!("long/term/storage/.objects/{FOO_SHARDED}")
        );
    }

    #[test]
    fn s3_store_manifest_key_matches_sharded_scheme() {
        let loc = S3Location::parse("s3://b/long/term/storage");
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn s3_store_keys_have_no_leading_slash_at_bucket_root() {
        // With an empty prefix, keys must be relative (no leading '/').
        let loc = S3Location::parse("s3://bucket");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!(".objects/{FOO_SHARDED}")
        );
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!(".manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn s3_store_object_key_uses_core_object_path() {
        // At the bucket root the object key is exactly the core crate's object path.
        let loc = S3Location::parse("s3://b");
        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
    }

    #[test]
    fn s3_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
    }

    /// Opt-in end-to-end test: push a one-file snapshot, read its manifest back, and
    /// fetch the files into a fresh directory.
    ///
    /// It runs only when both `SNAPDIR_S3_TEST_ENDPOINT` and `SNAPDIR_S3_TEST_STORE` are
    /// set; otherwise it prints a note and returns (passing). Temporary directories are
    /// keyed by process id and are removed only if every assertion passes.
    #[test]
    fn s3_store_live_round_trip_when_configured() {
        use snapdir_core::manifest::ManifestEntry;

        let (Ok(endpoint), Ok(store)) = (
            std::env::var("SNAPDIR_S3_TEST_ENDPOINT"),
            std::env::var("SNAPDIR_S3_TEST_STORE"),
        ) else {
            eprintln!(
                "skipping s3_store live round-trip: set SNAPDIR_S3_TEST_ENDPOINT \
                 and SNAPDIR_S3_TEST_STORE (s3://bucket/prefix) to run it"
            );
            return;
        };

        let hasher = Blake3Hasher::new();

        // Source tree: a single file `foo` containing "foo\n".
        let src = std::env::temp_dir().join(format!("snapdir-s3-live-{}", std::process::id()));
        std::fs::create_dir_all(&src).unwrap();
        std::fs::write(src.join("foo"), b"foo\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
        // Hand-built manifest: the root directory entry plus the file entry.
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            4,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        // Rebuilt through `from_entries`. NOTE(review): presumably this normalizes the
        // entries the same way a parsed manifest would be — confirm in snapdir_core.
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

        let s3 = S3Store::connect(&store, Some(&endpoint)).expect("connect");
        s3.push(&manifest, &src).expect("push");
        let read_back = s3.get_manifest(&id).expect("get_manifest");
        assert_eq!(read_back, manifest);

        let dest = std::env::temp_dir().join(format!("snapdir-s3-dest-{}", std::process::id()));
        std::fs::create_dir_all(&dest).unwrap();
        s3.fetch_files(&read_back, &dest).expect("fetch_files");
        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");

        // Best-effort cleanup; failures here are deliberately ignored.
        let _ = std::fs::remove_dir_all(&src);
        let _ = std::fs::remove_dir_all(&dest);
    }
}