1use std::path::Path;
41use std::sync::Arc;
42
43use aws_config::BehaviorVersion;
44use aws_sdk_s3::config::Region;
45use aws_sdk_s3::Client;
46use aws_smithy_http_client::tls::rustls_provider::CryptoMode;
47use aws_smithy_http_client::tls::Provider as TlsProvider;
48use aws_smithy_http_client::Builder as HttpClientBuilder;
49use snapdir_core::manifest::Manifest;
50use snapdir_core::merkle::{Blake3Hasher, Hasher};
51use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
52use snapdir_core::Meter;
53
54use crate::fetch::fetch_files_concurrent;
55use crate::push::{push_objects_concurrent, upload_object};
56use crate::stream::StreamStore;
57use crate::transfer::{RateLimiter, TransferConfig};
58
59use tokio::runtime::Runtime;
60
/// Attempt budget for `S3Store::fetch_verified` when a downloaded object does
/// not hash to its expected checksum.
///
/// The counter starts at this value and is decremented after every mismatch;
/// the fetch fails with an integrity error once it reaches zero. It therefore
/// bounds the total number of GET attempts (the first one included), not the
/// number of retries after the first attempt.
const MAX_FETCH_RETRIES: u32 = 5;
64
65#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct S3Location {
68 pub bucket: String,
70 pub prefix: String,
73}
74
75impl S3Location {
76 #[must_use]
86 pub fn parse(store_url: &str) -> Self {
87 let without_scheme = match store_url.find("://") {
91 Some(idx) => &store_url[idx + 3..],
92 None => store_url,
93 };
94 let mut parts = without_scheme.splitn(2, '/');
95 let bucket = parts.next().unwrap_or("").to_owned();
96 let prefix = parts.next().unwrap_or("");
97 let prefix = prefix
100 .trim_end_matches('/')
101 .trim_start_matches('/')
102 .to_owned();
103 Self { bucket, prefix }
104 }
105
106 #[must_use]
109 pub fn object_key(&self, checksum: &str) -> String {
110 self.key_for(&object_path(checksum))
111 }
112
113 #[must_use]
116 pub fn manifest_key(&self, id: &str) -> String {
117 self.key_for(&manifest_path(id))
118 }
119
120 fn key_for(&self, rel: &str) -> String {
125 let rel = rel.trim_start_matches('/');
126 if self.prefix.is_empty() {
127 rel.to_owned()
128 } else {
129 format!("{}/{rel}", self.prefix)
130 }
131 }
132}
133
/// A store backed by an S3 bucket.
///
/// Pairs an AWS SDK client with the [`S3Location`] its keys are derived from
/// and owns the Tokio runtime on which the synchronous trait methods drive
/// the async SDK calls.
pub struct S3Store {
    /// AWS SDK S3 client used for every HEAD/GET/PUT request.
    client: Client,
    /// Bucket and key prefix from which object and manifest keys are built.
    location: S3Location,
    /// Runtime the blocking methods call `block_on` on.
    runtime: Arc<Runtime>,
    /// Transfer settings handed to the concurrent fetch/push helpers; its
    /// `max_bytes_per_sec` seeds the per-call rate limiter.
    config: TransferConfig,
    /// Optional meter forwarded to the fetch/push helpers; `None` until set
    /// through `with_meter`.
    meter: Option<Arc<Meter>>,
}
149
150impl S3Store {
151 pub fn connect(store_url: &str, endpoint_url: Option<&str>) -> Result<Self, StoreError> {
164 Self::connect_with(store_url, endpoint_url, TransferConfig::default())
165 }
166
167 pub fn connect_with(
176 store_url: &str,
177 endpoint_url: Option<&str>,
178 config: TransferConfig,
179 ) -> Result<Self, StoreError> {
180 let location = S3Location::parse(store_url);
181 let runtime = build_runtime()?;
182
183 let http_client = ring_https_client();
184 let endpoint = endpoint_url.map(ToOwned::to_owned);
185 let client = runtime.block_on(async move {
186 let mut loader =
187 aws_config::defaults(BehaviorVersion::latest()).http_client(http_client.clone());
188 if let Some(ep) = endpoint.as_deref() {
189 loader = loader.endpoint_url(ep);
190 }
191 let shared = loader.load().await;
192 let mut builder = aws_sdk_s3::config::Builder::from(&shared);
193 if endpoint.is_some() {
194 builder = builder.force_path_style(true);
196 }
197 if shared.region().is_none() {
200 builder = builder.region(Region::new("us-east-1"));
201 }
202 Client::from_conf(builder.build())
203 });
204
205 Ok(Self {
206 client,
207 location,
208 runtime: Arc::new(runtime),
209 config,
210 meter: None,
211 })
212 }
213
214 pub fn from_client(client: Client, location: S3Location) -> Result<Self, StoreError> {
222 Ok(Self {
223 client,
224 location,
225 runtime: Arc::new(build_runtime()?),
226 config: TransferConfig::default(),
227 meter: None,
228 })
229 }
230
231 #[must_use]
237 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
238 self.meter = meter;
239 self
240 }
241
242 #[must_use]
244 pub fn location(&self) -> &S3Location {
245 &self.location
246 }
247
248 #[must_use]
251 pub fn transfer_config(&self) -> &TransferConfig {
252 &self.config
253 }
254
255 async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
257 match self
258 .client
259 .head_object()
260 .bucket(&self.location.bucket)
261 .key(key)
262 .send()
263 .await
264 {
265 Ok(_) => Ok(true),
266 Err(err) => {
267 let svc = err.into_service_error();
268 if svc.is_not_found() {
269 Ok(false)
270 } else {
271 Err(backend("S3 HEAD object failed", svc))
272 }
273 }
274 }
275 }
276
277 async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
279 match self
280 .client
281 .get_object()
282 .bucket(&self.location.bucket)
283 .key(key)
284 .send()
285 .await
286 {
287 Ok(resp) => {
288 let data = resp
289 .body
290 .collect()
291 .await
292 .map_err(|e| backend("reading S3 object body", e))?;
293 Ok(Some(data.into_bytes().to_vec()))
294 }
295 Err(err) => {
296 let svc = err.into_service_error();
297 if svc.is_no_such_key() {
298 Ok(None)
299 } else {
300 Err(backend("S3 GET object failed", svc))
301 }
302 }
303 }
304 }
305
306 async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
309 self.client
310 .put_object()
311 .bucket(&self.location.bucket)
312 .key(key)
313 .body(bytes.into())
314 .send()
315 .await
316 .map_err(|e| backend("S3 PUT object failed", e))?;
317 Ok(())
318 }
319
320 async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
323 let hasher = Blake3Hasher::new();
324 let mut attempts_left = MAX_FETCH_RETRIES;
325 loop {
326 match self.get_bytes(key).await? {
327 Some(bytes) => {
328 let actual = hasher.hash_hex(&bytes);
329 if actual == expected {
330 return Ok(bytes);
331 }
332 attempts_left = attempts_left.saturating_sub(1);
335 if attempts_left == 0 {
336 return Err(StoreError::Integrity {
337 address: format!("s3://{}/{key}", self.location.bucket),
338 expected: expected.to_owned(),
339 actual,
340 });
341 }
342 }
343 None => {
344 return Err(StoreError::ObjectNotFound {
346 checksum: expected.to_owned(),
347 });
348 }
349 }
350 }
351 }
352}
353
354impl Store for S3Store {
355 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
356 let key = self.location.manifest_key(id);
357 let bytes = self.runtime.block_on(async {
358 match self.get_bytes(&key).await? {
359 Some(b) => Ok(b),
360 None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
361 }
362 })?;
363
364 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
365 message: format!("manifest {id} is not valid UTF-8"),
366 source: Some(Box::new(err)),
367 })?;
368 let manifest = Manifest::parse(&text)?;
369
370 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
373 if actual != id {
374 return Err(StoreError::Integrity {
375 address: self.location.manifest_key(id),
376 expected: id.to_owned(),
377 actual,
378 });
379 }
380 Ok(manifest)
381 }
382
383 fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
384 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
390 let meter = self.meter.as_deref();
391 let meter_arc = self.meter.clone();
392 self.runtime.block_on(async {
393 fetch_files_concurrent(
394 manifest,
395 dest,
396 &self.config,
397 &limiter,
398 meter,
399 meter_arc,
400 |entry| async {
401 let key = self.location.object_key(&entry.checksum);
402 self.fetch_verified(&key, &entry.checksum).await
403 },
404 )
405 .await
406 })
407 }
408
409 fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
410 let hasher = Blake3Hasher::new();
411 let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
412
413 let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
419 let meter = self.meter.as_deref();
420 let meter_arc = self.meter.clone();
421 self.runtime.block_on(async {
422 let manifest_key = self.location.manifest_key(&id);
425 if self.key_exists(&manifest_key).await? {
426 return Ok(());
427 }
428
429 push_objects_concurrent(
430 manifest,
431 &self.config,
432 &limiter,
433 meter,
434 meter_arc,
435 |entry| {
436 let object_key = self.location.object_key(&entry.checksum);
437 upload_object(
438 entry,
439 object_key,
440 source,
441 &limiter,
442 meter,
443 |key| async move { self.key_exists(&key).await },
444 |key, bytes| async move { self.put_bytes(&key, bytes).await },
445 )
446 },
447 || async {
448 let mut text = manifest.to_string();
451 text.push('\n');
452 let manifest_actual = hasher.hash_hex(text.as_bytes());
453 if manifest_actual != id {
454 return Err(StoreError::Integrity {
455 address: manifest_key.clone(),
456 expected: id.clone(),
457 actual: manifest_actual,
458 });
459 }
460 self.put_bytes(&manifest_key, text.into_bytes()).await
461 },
462 )
463 .await
464 })
465 }
466}
467
468impl StreamStore for S3Store {
469 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
470 let key = self.location.object_key(checksum);
471 self.runtime.block_on(async { self.key_exists(&key).await })
472 }
473
474 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
475 let key = self.location.object_key(checksum);
476 let bytes = self.runtime.block_on(async {
477 self.get_bytes(&key)
478 .await?
479 .ok_or_else(|| StoreError::ObjectNotFound {
480 checksum: checksum.to_owned(),
481 })
482 })?;
483
484 let actual = Blake3Hasher::new().hash_hex(&bytes);
487 if actual != checksum {
488 return Err(StoreError::Integrity {
489 address: format!("s3://{}/{key}", self.location.bucket),
490 expected: checksum.to_owned(),
491 actual,
492 });
493 }
494 Ok(bytes)
495 }
496
497 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
498 let actual = Blake3Hasher::new().hash_hex(&bytes);
501 if actual != checksum {
502 return Err(StoreError::Integrity {
503 address: self.location.object_key(checksum),
504 expected: checksum.to_owned(),
505 actual,
506 });
507 }
508 let key = self.location.object_key(checksum);
509 self.runtime
510 .block_on(async { self.put_bytes(&key, bytes).await })
511 }
512
513 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
514 let key = self.location.manifest_key(id);
515 let mut text = manifest.to_string();
518 text.push('\n');
519 let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
520 if actual != id {
521 return Err(StoreError::Integrity {
522 address: key,
523 expected: id.to_owned(),
524 actual,
525 });
526 }
527 self.runtime
528 .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
529 }
530}
531
532fn build_runtime() -> Result<Runtime, StoreError> {
534 tokio::runtime::Builder::new_multi_thread()
535 .enable_all()
536 .build()
537 .map_err(|e| backend("creating tokio runtime for S3Store", e))
538}
539
540fn ring_https_client() -> aws_smithy_runtime_api::client::http::SharedHttpClient {
546 HttpClientBuilder::new()
547 .tls_provider(TlsProvider::Rustls(CryptoMode::Ring))
548 .build_https()
549}
550
551fn backend<E>(message: &str, source: E) -> StoreError
553where
554 E: std::error::Error + Send + Sync + 'static,
555{
556 StoreError::Backend {
557 message: message.to_owned(),
558 source: Some(Box::new(source)),
559 }
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use snapdir_core::manifest::PathType;

    /// Removes one leading `./` and one trailing `/` from a manifest path.
    fn strip_leading_dot_slash(path: &str) -> &str {
        let without_dot = match path.strip_prefix("./") {
            Some(rest) => rest,
            None => path,
        };
        match without_dot.strip_suffix('/') {
            Some(rest) => rest,
            None => without_dot,
        }
    }

    /// Parses `url` and checks both resulting fields.
    fn assert_parses_to(url: &str, bucket: &str, prefix: &str) {
        let loc = S3Location::parse(url);
        assert_eq!(loc.bucket, bucket);
        assert_eq!(loc.prefix, prefix);
    }

    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";

    #[test]
    fn s3_store_parses_bucket_and_prefix() {
        assert_parses_to(
            "s3://my-bucket/long/term/storage",
            "my-bucket",
            "long/term/storage",
        );
    }

    #[test]
    fn s3_store_parse_matches_oracle_cut_fields() {
        assert_parses_to("s3://bucket/a/b/c", "bucket", "a/b/c");
    }

    #[test]
    fn s3_store_parse_strips_trailing_slash() {
        assert_parses_to("s3://bucket/prefix/", "bucket", "prefix");
    }

    #[test]
    fn s3_store_parse_bucket_root_has_empty_prefix() {
        // With and without the trailing slash.
        assert_parses_to("s3://bucket", "bucket", "");
        assert_parses_to("s3://bucket/", "bucket", "");
    }

    #[test]
    fn s3_store_parse_accepts_bare_bucket_prefix_without_scheme() {
        assert_parses_to("bucket/some/prefix", "bucket", "some/prefix");
    }

    #[test]
    fn s3_store_object_key_matches_sharded_scheme() {
        let key = S3Location::parse("s3://b/long/term/storage").object_key(FOO_CHECKSUM);
        let expected = format!("long/term/storage/.objects/{FOO_SHARDED}");
        assert_eq!(key, expected);
    }

    #[test]
    fn s3_store_manifest_key_matches_sharded_scheme() {
        let key = S3Location::parse("s3://b/long/term/storage").manifest_key(MANIFEST_ID);
        let expected = format!("long/term/storage/.manifests/{MANIFEST_SHARDED}");
        assert_eq!(key, expected);
    }

    #[test]
    fn s3_store_keys_have_no_leading_slash_at_bucket_root() {
        let root = S3Location::parse("s3://bucket");

        let object_key = root.object_key(FOO_CHECKSUM);
        assert_eq!(object_key, format!(".objects/{FOO_SHARDED}"));

        let manifest_key = root.manifest_key(MANIFEST_ID);
        assert_eq!(manifest_key, format!(".manifests/{MANIFEST_SHARDED}"));
    }

    #[test]
    fn s3_store_object_key_uses_core_object_path() {
        // At the bucket root the key is exactly the core-relative object path.
        let root = S3Location::parse("s3://b");
        assert_eq!(root.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
    }

    #[test]
    fn s3_store_strip_leading_dot_slash() {
        let cases = [
            ("./foo", "foo"),
            ("./a/b/c", "a/b/c"),
            ("./a/", "a"),
            ("./", ""),
        ];
        for (input, expected) in cases {
            assert_eq!(strip_leading_dot_slash(input), expected);
        }
    }

    /// Push/read-back/fetch round trip against a real S3-compatible endpoint.
    ///
    /// Runs only when both `SNAPDIR_S3_TEST_ENDPOINT` and
    /// `SNAPDIR_S3_TEST_STORE` are set; otherwise it prints a notice and passes.
    #[test]
    fn s3_store_live_round_trip_when_configured() {
        use snapdir_core::manifest::ManifestEntry;

        let endpoint_var = std::env::var("SNAPDIR_S3_TEST_ENDPOINT");
        let store_var = std::env::var("SNAPDIR_S3_TEST_STORE");
        let (Ok(endpoint), Ok(store)) = (endpoint_var, store_var) else {
            eprintln!(
                "skipping s3_store live round-trip: set SNAPDIR_S3_TEST_ENDPOINT \
                 and SNAPDIR_S3_TEST_STORE (s3://bucket/prefix) to run it"
            );
            return;
        };

        let hasher = Blake3Hasher::new();
        let pid = std::process::id();

        // Source tree: a single file `foo` containing "foo\n".
        let src = std::env::temp_dir().join(format!("snapdir-s3-live-{pid}"));
        std::fs::create_dir_all(&src).unwrap();
        std::fs::write(src.join("foo"), b"foo\n").unwrap();

        let foo_sum = hasher.hash_hex(b"foo\n");
        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);

        let mut draft = Manifest::new();
        draft.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            4,
            "./",
        ));
        draft.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        let manifest = Manifest::from_entries(draft.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

        let s3 = S3Store::connect(&store, Some(&endpoint)).expect("connect");
        s3.push(&manifest, &src).expect("push");
        let read_back = s3.get_manifest(&id).expect("get_manifest");
        assert_eq!(read_back, manifest);

        let dest = std::env::temp_dir().join(format!("snapdir-s3-dest-{pid}"));
        std::fs::create_dir_all(&dest).unwrap();
        s3.fetch_files(&read_back, &dest).expect("fetch_files");
        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");

        // Best-effort cleanup of both scratch directories.
        let _ = std::fs::remove_dir_all(&src);
        let _ = std::fs::remove_dir_all(&dest);
    }
}