1pub use aws_sdk_s3;
5
6use std::{
7 collections::HashMap, fmt, future::ready, ops::Range, path::PathBuf, pin::Pin,
8 sync::Arc, time::Duration,
9};
10
11use async_trait::async_trait;
12use aws_config::{
13 AppName, BehaviorVersion, meta::region::RegionProviderChain, retry::RetryConfig,
14 timeout::TimeoutConfig,
15};
16use aws_credential_types::provider::error::CredentialsError;
17use aws_sdk_s3::{
18 Client,
19 config::{
20 Builder, ConfigBag, IdentityCache, Intercept, ProvideCredentials, Region,
21 RuntimeComponents, StalledStreamProtectionConfig,
22 interceptors::BeforeTransmitInterceptorContextMut,
23 },
24 error::{BoxError, SdkError},
25 operation::{copy_object::CopyObjectError, put_object::PutObjectError},
26 primitives::ByteStream,
27 types::{CompletedMultipartUpload, CompletedPart, Delete, Object, ObjectIdentifier},
28};
29use aws_smithy_runtime::client::retries::classifiers::HttpStatusCodeClassifier;
30use aws_smithy_types_convert::{
31 date_time::DateTimeExt as _, stream::PaginationStreamExt as _,
32};
33use bytes::Bytes;
34use chrono::{DateTime, Utc};
35use futures::{
36 Stream, StreamExt as _, TryStreamExt as _,
37 stream::{self, BoxStream, FuturesOrdered},
38};
39pub use icechunk_storage::s3_config::{
40 S3Credentials, S3CredentialsFetcher, S3Options, S3StaticCredentials,
41};
42use icechunk_storage::{
43 DeleteObjectsResult, GetModifiedResult, ListInfo, Settings, Storage, StorageError,
44 StorageErrorKind, StorageResult, VersionInfo, VersionedUpdateResult,
45 obj_not_found_res, obj_store_error, obj_store_error_res, other_error, sealed,
46 split_in_multiple_equal_requests, strip_quotes,
47};
48use icechunk_types::ICResultExt as _;
49use serde::{Deserialize, Serialize};
50use tokio::sync::OnceCell;
51use tokio_util::io::StreamReader;
52use tracing::{error, instrument};
53use typed_path::Utf8UnixPath;
54
/// `Storage` implementation backed by an S3-compatible object store.
///
/// The AWS SDK client is built lazily from `config`/`credentials` on first
/// use and cached; it is deliberately not serialized (`#[serde(skip)]`), so
/// a deserialized instance rebuilds its client on demand.
#[derive(Debug, Serialize, Deserialize)]
pub struct S3Storage {
    // Connection options (region, endpoint, path style, ...).
    config: S3Options,
    credentials: S3Credentials,
    bucket: String,
    // Key prefix, normalized without a trailing slash (see `S3Storage::new`).
    prefix: String,
    can_write: bool,
    // Extra headers attached to read requests (GET/HEAD/OPTIONS/TRACE).
    extra_read_headers: Vec<(String, String)>,
    // Extra headers attached to all other (write) requests.
    extra_write_headers: Vec<(String, String)>,
    // Lazily-initialized SDK client; skipped by serde, rebuilt after load.
    #[serde(skip)]
    client: OnceCell<Arc<Client>>,
}
71
72impl fmt::Display for S3Storage {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 write!(
75 f,
76 "S3Storage(bucket={}, prefix={}, config={})",
77 self.bucket, self.prefix, self.config,
78 )
79 }
80}
/// SDK interceptor that stamps user-supplied headers onto every outgoing
/// request, choosing the read or write set based on the HTTP method.
#[derive(Debug)]
struct ExtraHeadersInterceptor {
    // Headers for read-only methods (GET/HEAD/OPTIONS/TRACE).
    extra_read_headers: Vec<(String, String)>,
    // Headers for all mutating methods.
    extra_write_headers: Vec<(String, String)>,
}
86
87impl Intercept for ExtraHeadersInterceptor {
88 fn name(&self) -> &'static str {
89 "ExtraHeaders"
90 }
91
92 fn modify_before_retry_loop(
93 &self,
94 context: &mut BeforeTransmitInterceptorContextMut<'_>,
95 _runtime_components: &RuntimeComponents,
96 _cfg: &mut ConfigBag,
97 ) -> Result<(), BoxError> {
98 let request = context.request_mut();
99 match request.method() {
100 "GET" | "HEAD" | "OPTIONS" | "TRACE" => {
101 for (k, v) in self.extra_read_headers.iter() {
102 request.headers_mut().insert(k.clone(), v.clone());
103 }
104 }
105 _ => {
106 for (k, v) in self.extra_write_headers.iter() {
107 request.headers_mut().insert(k.clone(), v.clone());
108 }
109 }
110 }
111 Ok(())
112 }
113}
114
/// Build an AWS S3 [`Client`] from the given options, credentials, extra
/// headers and runtime settings.
///
/// Region resolution falls back to the AWS default provider chain. When a
/// custom endpoint is configured a placeholder region is used so the SDK
/// does not fail if no region can be resolved. Retry behavior, timeouts and
/// stalled-stream protection are derived from `settings` and `config`.
#[instrument(skip(credentials))]
pub async fn mk_client(
    config: &S3Options,
    credentials: S3Credentials,
    extra_read_headers: Vec<(String, String)>,
    extra_write_headers: Vec<(String, String)>,
    settings: &Settings,
) -> Client {
    let region = config
        .region
        .as_ref()
        .map(|r| RegionProviderChain::first_try(Some(Region::new(r.clone()))))
        .unwrap_or_else(RegionProviderChain::default_provider);

    let endpoint = config.endpoint_url.clone();
    // With a custom endpoint the region is mostly irrelevant, but the SDK
    // still requires one; fall back to a placeholder value.
    let region = if endpoint.is_some() {
        region.or_else(Region::new("region-was-not-set"))
    } else {
        region
    };

    // user_agent() is a valid app name by construction, so this cannot fail.
    #[expect(clippy::unwrap_used)]
    let app_name = AppName::new(icechunk_types::user_agent()).unwrap();
    let mut aws_config = aws_config::defaults(BehaviorVersion::v2026_01_12())
        .region(region)
        .app_name(app_name);

    if let Some(endpoint) = endpoint {
        aws_config = aws_config.endpoint_url(endpoint);
    }

    // A configured timeout of 0 explicitly disables stalled-stream
    // protection; otherwise use the configured grace period (default 10s).
    let stalled_stream = if config.network_stream_timeout_seconds == Some(0) {
        StalledStreamProtectionConfig::disabled()
    } else {
        StalledStreamProtectionConfig::enabled()
            .grace_period(Duration::from_secs(
                config.network_stream_timeout_seconds.unwrap_or(10) as u64,
            ))
            .build()
    };
    aws_config = aws_config.stalled_stream_protection(stalled_stream);

    match credentials {
        // FromEnv: let the SDK's default credential provider chain resolve.
        S3Credentials::FromEnv => {}
        S3Credentials::Anonymous => aws_config = aws_config.no_credentials(),
        S3Credentials::Static(credentials) => {
            aws_config =
                aws_config.credentials_provider(aws_credential_types::Credentials::new(
                    credentials.access_key_id,
                    credentials.secret_access_key,
                    credentials.session_token,
                    credentials.expires_after.map(|e| e.into()),
                    "user",
                ));
        }
        S3Credentials::Refreshable(fetcher) => {
            aws_config =
                aws_config.credentials_provider(ProvideRefreshableCredentials(fetcher));
        }
    }

    let retry_config = RetryConfig::standard()
        .with_max_attempts(settings.retries().max_tries().get() as u32)
        .with_initial_backoff(Duration::from_millis(
            settings.retries().initial_backoff_ms() as u64,
        ))
        .with_max_backoff(Duration::from_millis(
            settings.retries().max_backoff_ms() as u64
        ));

    // Only install a TimeoutConfig when the user configured any timeouts;
    // unset fields keep the SDK defaults.
    if let Some(timeouts) = settings.timeouts() {
        let mut timeout_builder = TimeoutConfig::builder();
        if let Some(ms) = timeouts.connect_timeout_ms {
            timeout_builder =
                timeout_builder.connect_timeout(Duration::from_millis(ms as u64));
        }
        if let Some(ms) = timeouts.read_timeout_ms {
            timeout_builder =
                timeout_builder.read_timeout(Duration::from_millis(ms as u64));
        }
        if let Some(ms) = timeouts.operation_timeout_ms {
            timeout_builder =
                timeout_builder.operation_timeout(Duration::from_millis(ms as u64));
        }
        if let Some(ms) = timeouts.operation_attempt_timeout_ms {
            timeout_builder = timeout_builder
                .operation_attempt_timeout(Duration::from_millis(ms as u64));
        }
        aws_config = aws_config.timeout_config(timeout_builder.build());
    }

    let mut s3_builder = Builder::from(&aws_config.load().await)
        .force_path_style(config.force_path_style)
        .retry_config(retry_config);

    // Cache resolved identities so credentials are not refetched on every
    // request; refresh up to 120s before expiry.
    let id_cache = IdentityCache::lazy()
        .load_timeout(Duration::from_secs(120))
        .buffer_time(Duration::from_secs(120))
        .build();

    s3_builder = s3_builder.identity_cache(id_cache);

    // Additionally retry on status codes the default classifier does not
    // cover: 408 request timeout, 429 too many requests, 499 client closed.
    static RETRY_CODES: &[u16] = &[408, 429, 499];
    s3_builder = s3_builder
        .retry_classifier(HttpStatusCodeClassifier::new_from_codes(RETRY_CODES));

    if !extra_read_headers.is_empty() || !extra_write_headers.is_empty() {
        s3_builder = s3_builder.interceptor(ExtraHeadersInterceptor {
            extra_read_headers,
            extra_write_headers,
        });
    }

    let config = s3_builder.build();

    Client::from_conf(config)
}
249
250fn stream2stream(
251 s: ByteStream,
252) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> {
253 let res = stream::try_unfold(s, move |mut stream| async move {
254 let next = stream.try_next().await?;
255 Ok(next.map(|bytes| (bytes, stream)))
256 });
257 Box::pin(res)
258}
259
impl S3Storage {
    /// Create a new storage instance. The prefix (if any) is normalized by
    /// stripping a trailing slash; the SDK client is built lazily on first
    /// use.
    pub fn new(
        config: S3Options,
        bucket: String,
        prefix: Option<String>,
        credentials: S3Credentials,
        can_write: bool,
        extra_read_headers: Vec<(String, String)>,
        extra_write_headers: Vec<(String, String)>,
    ) -> Result<S3Storage, StorageError> {
        let client = OnceCell::new();
        let prefix = prefix.unwrap_or_default();
        // Keys are later built as "{prefix}/{path}", so drop any trailing
        // slash to avoid producing double slashes.
        let prefix = prefix.strip_suffix("/").unwrap_or(prefix.as_str()).to_string();
        Ok(S3Storage {
            client,
            config,
            bucket,
            prefix,
            credentials,
            can_write,
            extra_read_headers,
            extra_write_headers,
        })
    }

    /// Return the cached SDK client, creating it on first call.
    // NOTE: `settings` only influence the client built by the first caller;
    // later calls reuse that cached client regardless of their settings.
    #[instrument(skip_all)]
    pub async fn get_client(&self, settings: &Settings) -> &Arc<Client> {
        self.client
            .get_or_init(|| async {
                Arc::new(
                    mk_client(
                        &self.config,
                        self.credentials.clone(),
                        self.extra_read_headers.clone(),
                        self.extra_write_headers.clone(),
                        settings,
                    )
                    .await,
                )
            })
            .await
    }

    /// Build the full object key "{prefix}/{file_prefix}/{id}".
    ///
    /// # Errors
    /// Returns `BadPrefix` if the joined path is not valid UTF-8.
    pub fn get_path_str(&self, file_prefix: &str, id: &str) -> StorageResult<String> {
        let path = PathBuf::from_iter([self.prefix.as_str(), file_prefix, id]);
        let path_str = path
            .into_os_string()
            .into_string()
            .map_err(|s| StorageError::capture(StorageErrorKind::BadPrefix(s)))?;

        // PathBuf uses backslashes on Windows; S3 keys always use '/'.
        Ok(path_str.replace("\\", "/"))
    }

    // Prepend the configured repository prefix to an object path.
    fn prefixed_path(&self, path: &str) -> String {
        format!("{}/{path}", self.prefix)
    }

    /// Upload `bytes` with a single PutObject request.
    ///
    /// Applies content type/metadata (only when metadata use is enabled in
    /// settings), the configured storage class, and conditional-write
    /// headers derived from `previous_version`. Precondition failures are
    /// mapped to `VersionedUpdateResult::NotOnLatestVersion`.
    async fn put_object_single<
        I: IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    >(
        &self,
        settings: &Settings,
        key: &str,
        bytes: Bytes,
        content_type: Option<impl Into<String>>,
        metadata: I,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<VersionedUpdateResult> {
        let mut req = self
            .get_client(settings)
            .await
            .put_object()
            .bucket(self.bucket.clone())
            .key(key)
            .body(bytes.into());

        if settings.unsafe_use_metadata() {
            if let Some(ct) = content_type {
                req = req.content_type(ct);
            };

            for (k, v) in metadata {
                req = req.metadata(k, v);
            }
        }

        if let Some(klass) = settings.storage_class() {
            let klass = klass.as_str().into();
            req = req.storage_class(klass);
        }

        // Conditional write: no previous etag + conditional create enabled
        // => create-only (If-None-Match: *); known etag + conditional update
        // enabled => overwrite-only-if-unchanged (If-Match).
        if let Some(previous_version) = previous_version.as_ref() {
            match (
                previous_version.etag(),
                settings.unsafe_use_conditional_create(),
                settings.unsafe_use_conditional_update(),
            ) {
                (None, true, _) => req = req.if_none_match("*"),
                (Some(etag), _, true) => req = req.if_match(strip_quotes(etag)),
                (_, _, _) => {}
            }
        }

        match req.send().await {
            Ok(out) => {
                let new_etag = out
                    .e_tag()
                    .ok_or(other_error("Object should have an etag".to_string()))?
                    .to_string();
                let new_version = VersionInfo::from_etag_only(new_etag);
                Ok(VersionedUpdateResult::Updated { new_version })
            }
            Err(SdkError::ServiceError(err)) => {
                // Different S3-compatible stores report a failed precondition
                // under different error codes; treat them all as a lost race.
                let code = err.err().meta().code().unwrap_or_default();
                if code == "PreconditionFailed"
                    || code == "ConditionalRequestConflict"
                    || code == "ConcurrentModification"
                {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<PutObjectError>::ServiceError(err))
                }
            }
            Err(SdkError::ResponseError(err)) => {
                // Some stores return a bare 409/412 without a parseable
                // service error body.
                let status = err.raw().status().as_u16();
                if status == 409 || status == 412 {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<PutObjectError>::ResponseError(err))
                }
            }
            Err(err) => obj_store_error_res(err),
        }
    }

    /// Upload `bytes` as a multipart upload with concurrent part uploads.
    ///
    /// Parts are sized from the concurrency settings, uploaded via
    /// `FuturesOrdered` (results kept in part order), then completed with a
    /// single CompleteMultipartUpload carrying the conditional-write headers.
    /// Precondition failures map to `NotOnLatestVersion`, as in
    /// `put_object_single`.
    async fn put_object_multipart<
        I: IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    >(
        &self,
        settings: &Settings,
        key: &str,
        bytes: &Bytes,
        content_type: Option<impl Into<String>>,
        metadata: I,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<VersionedUpdateResult> {
        let mut multi = self
            .get_client(settings)
            .await
            .create_multipart_upload()
            .bucket(self.bucket.clone())
            .key(key);

        if settings.unsafe_use_metadata() {
            if let Some(ct) = content_type {
                multi = multi.content_type(ct);
            };
            for (k, v) in metadata {
                multi = multi.metadata(k, v);
            }
        }

        if let Some(klass) = settings.storage_class() {
            let klass = klass.as_str().into();
            multi = multi.storage_class(klass);
        }

        let create_res = multi.send().await.capture_box()?;
        let upload_id = create_res.upload_id().ok_or(other_error(
            "No upload_id in create multipart upload result".to_string(),
        ))?;

        // Split the byte range into roughly equal parts, bounded by the
        // configured ideal request size and max per-object concurrency.
        let parts = split_in_multiple_equal_requests(
            &(0..bytes.len() as u64),
            settings.concurrency().ideal_concurrent_request_size().get(),
            settings.concurrency().max_concurrent_requests_for_object().get(),
        )
        .collect::<Vec<_>>();

        let results = parts
            .into_iter()
            .enumerate()
            .map(|(part_idx, range)| async move {
                // Bytes::slice is cheap (refcounted), no copy per part.
                let body = bytes.slice(range.start as usize..range.end as usize).into();
                // S3 part numbers are 1-based.
                let idx = part_idx as i32 + 1;
                let req = self
                    .get_client(settings)
                    .await
                    .upload_part()
                    .upload_id(upload_id)
                    .bucket(self.bucket.clone())
                    .key(key)
                    .part_number(idx)
                    .body(body);

                req.send().await.map(|res| (idx, res))
            })
            .collect::<FuturesOrdered<_>>();

        let completed_parts = results
            .map_ok(|(idx, res)| {
                let etag = res.e_tag().unwrap_or("");
                CompletedPart::builder()
                    .e_tag(strip_quotes(etag))
                    .part_number(idx)
                    .build()
            })
            .try_collect::<Vec<_>>()
            .await
            .capture_box()?;

        let completed_parts =
            CompletedMultipartUpload::builder().set_parts(Some(completed_parts)).build();

        let mut req = self
            .get_client(settings)
            .await
            .complete_multipart_upload()
            .bucket(self.bucket.clone())
            .key(key)
            .upload_id(upload_id)
            .multipart_upload(completed_parts);

        // Same conditional-write mapping as `put_object_single`, applied to
        // the completion request (which atomically materializes the object).
        if let Some(previous_version) = previous_version.as_ref() {
            match (
                previous_version.etag(),
                settings.unsafe_use_conditional_create(),
                settings.unsafe_use_conditional_update(),
            ) {
                (None, true, _) => req = req.if_none_match("*"),
                (Some(etag), _, true) => req = req.if_match(strip_quotes(etag)),
                (_, _, _) => {}
            }
        }

        match req.send().await {
            Ok(out) => {
                let new_etag = out
                    .e_tag()
                    .ok_or(other_error("Object should have an etag".to_string()))?
                    .to_string();
                let new_version = VersionInfo::from_etag_only(new_etag);
                Ok(VersionedUpdateResult::Updated { new_version })
            }
            Err(SdkError::ServiceError(err)) => {
                // See `put_object_single` for why several codes are accepted.
                let code = err.err().meta().code().unwrap_or_default();
                if code == "PreconditionFailed"
                    || code == "ConditionalRequestConflict"
                    || code == "ConcurrentModification"
                {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::ServiceError(err))
                }
            }
            Err(SdkError::ResponseError(err)) => {
                let status = err.raw().status().as_u16();
                if status == 409 || status == 412 {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<PutObjectError>::ResponseError(err))
                }
            }
            Err(err) => obj_store_error_res(err),
        }
    }
}
544
/// Render a half-open byte range as an HTTP `Range` header value.
///
/// HTTP byte ranges are inclusive on both ends (RFC 9110), so the exclusive
/// `range.end` becomes `end - 1`; e.g. `0..10` -> `"bytes=0-9"`.
/// Callers must pass a non-empty range (`end > start`).
pub fn range_to_header(range: &Range<u64>) -> String {
    let last_byte = range.end - 1;
    format!("bytes={}-{last_byte}", range.start)
}
548
549impl sealed::Sealed for S3Storage {}
550
#[async_trait]
#[typetag::serde]
impl Storage for S3Storage {
    /// Whether this instance was created with write access.
    async fn can_write(&self) -> StorageResult<bool> {
        Ok(self.can_write)
    }

    /// Store an object, choosing single-shot vs multipart upload based on
    /// the configured size threshold.
    async fn put_object(
        &self,
        settings: &Settings,
        path: &str,
        bytes: Bytes,
        content_type: Option<&str>,
        metadata: Vec<(String, String)>,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<VersionedUpdateResult> {
        let path = self.prefixed_path(path);
        if bytes.len() >= settings.minimum_size_for_multipart_upload() as usize {
            self.put_object_multipart(
                settings,
                path.as_str(),
                &bytes,
                content_type,
                metadata,
                previous_version,
            )
            .await
        } else {
            self.put_object_single(
                settings,
                path.as_str(),
                bytes,
                content_type,
                metadata,
                previous_version,
            )
            .await
        }
    }

    /// Server-side copy within the bucket, optionally conditional on the
    /// source object's etag. Precondition failures map to
    /// `NotOnLatestVersion`; a 404 maps to "object not found".
    async fn copy_object(
        &self,
        settings: &Settings,
        from: &str,
        to: &str,
        content_type: Option<&str>,
        version: &VersionInfo,
    ) -> StorageResult<VersionedUpdateResult> {
        // CopySource must be "{bucket}/{key}".
        let from = format!("{}/{}", self.bucket, self.prefixed_path(from));
        let to = self.prefixed_path(to);
        let mut req = self
            .get_client(settings)
            .await
            .copy_object()
            .bucket(self.bucket.clone())
            .key(to)
            .copy_source(from);
        if settings.unsafe_use_conditional_update()
            && let Some(etag) = version.etag()
        {
            req = req.copy_source_if_match(strip_quotes(etag));
        }
        if let Some(klass) = settings.storage_class() {
            let klass = klass.as_str().into();
            req = req.storage_class(klass);
        }
        if let Some(ct) = content_type {
            req = req.content_type(ct);
        }
        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }
        match req.send().await {
            // The copy preserves content, so the source version stays valid.
            Ok(_) => Ok(VersionedUpdateResult::Updated { new_version: version.clone() }),
            Err(SdkError::ServiceError(err)) => {
                // See `put_object_single`: different stores use different
                // codes for a failed precondition.
                let code = err.err().meta().code().unwrap_or_default();
                if code == "PreconditionFailed"
                    || code == "ConditionalRequestConflict"
                    || code == "ConcurrentModification"
                {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<CopyObjectError>::ServiceError(err))
                }
            }
            Err(SdkError::ResponseError(err)) => {
                let status = err.raw().status().as_u16();
                if status == 409 || status == 412 {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<PutObjectError>::ResponseError(err))
                }
            }
            // Any other service error with a raw 404 means the source object
            // does not exist.
            Err(sdk_err) => match sdk_err.as_service_error() {
                Some(_)
                    if sdk_err
                        .raw_response()
                        .is_some_and(|x| x.status().as_u16() == 404) =>
                {
                    obj_not_found_res()
                }
                _ => obj_store_error_res(sdk_err),
            },
        }
    }

    /// Stream object listings under `prefix`, paginating lazily and
    /// stripping the listing prefix from each key to recover the object id.
    #[instrument(skip(self, settings))]
    async fn list_objects<'a>(
        &'a self,
        settings: &Settings,
        prefix: &str,
    ) -> StorageResult<BoxStream<'a, StorageResult<ListInfo<String>>>> {
        // Collapse accidental double slashes from joining the two prefixes.
        let prefix = format!("{}/{}", self.prefix, prefix).replace("//", "/");
        let mut req = self
            .get_client(settings)
            .await
            .list_objects_v2()
            .bucket(self.bucket.clone())
            .prefix(prefix.clone());

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        let stream = req
            .into_paginator()
            .send()
            .into_stream_03x()
            .map_err(obj_store_error)
            // Flatten each page's (optional) contents into a stream of
            // objects; empty pages are filtered out.
            .try_filter_map(|page| {
                let contents = page.contents.map(|cont| stream::iter(cont).map(Ok));
                ready(Ok(contents))
            })
            .try_flatten()
            .and_then(move |object| {
                let prefix = prefix.clone();
                ready(object_to_list_info(prefix.as_str(), &object))
            });
        Ok(stream.boxed())
    }

    /// Delete a batch of `(id, size)` pairs in one DeleteObjects request,
    /// returning how many objects/bytes were actually deleted. Per-object
    /// delete errors are logged but do not fail the call.
    #[instrument(skip(self, batch))]
    async fn delete_batch(
        &self,
        settings: &Settings,
        prefix: &str,
        batch: Vec<(String, u64)>,
    ) -> StorageResult<DeleteObjectsResult> {
        // Track sizes by key so deleted bytes can be summed from the
        // response; ids with unbuildable keys are silently skipped.
        let mut sizes = HashMap::new();
        let mut ids = Vec::new();
        for (id, size) in batch.into_iter() {
            if let Ok(key) = self.get_path_str(prefix, id.as_str())
                && let Ok(ident) = ObjectIdentifier::builder().key(key.clone()).build()
            {
                ids.push(ident);
                sizes.insert(key, size);
            }
        }

        let delete = Delete::builder()
            .set_objects(Some(ids))
            .build()
            .map_err(|e| other_error(e.to_string()))?;

        let mut req = self
            .get_client(settings)
            .await
            .delete_objects()
            .bucket(self.bucket.clone())
            .delete(delete);

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        let res = req.send().await.capture_box()?;

        // Best-effort semantics: log the first per-object error, keep going.
        if let Some(err) = res.errors.as_ref().and_then(|e| e.first()) {
            tracing::error!(
                error = ?err,
                "Errors deleting objects",
            );
        }

        let mut result = DeleteObjectsResult::default();
        for deleted in res.deleted() {
            if let Some(key) = deleted.key() {
                let size = sizes.get(key).unwrap_or(&0);
                result.deleted_bytes += *size;
                result.deleted_objects += 1;
            } else {
                tracing::error!("Deleted object without key");
            }
        }
        Ok(result)
    }

    /// HEAD the object and return its last-modified timestamp as UTC.
    #[instrument(skip(self, settings))]
    async fn get_object_last_modified(
        &self,
        path: &str,
        settings: &Settings,
    ) -> StorageResult<DateTime<Utc>> {
        let key = self.prefixed_path(path);
        let mut req = self
            .get_client(settings)
            .await
            .head_object()
            .bucket(self.bucket.clone())
            .key(key);

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        let res = req.send().await.capture_box()?;

        let res = res
            .last_modified
            .ok_or(other_error("Object has no last_modified field".to_string()))?;
        let res = res
            .to_chrono_utc()
            .map_err(|_| other_error("Invalid metadata timestamp".to_string()))?;

        Ok(res)
    }

    /// Fetch the whole object only if it changed since `previous_version`
    /// (conditional GET); returns `OnLatestVersion` when unchanged.
    #[instrument(skip(self, settings))]
    async fn get_object_conditional(
        &self,
        settings: &Settings,
        path: &str,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<GetModifiedResult> {
        match self
            .get_object_range_conditional(settings, path, None, previous_version)
            .await
        {
            Ok(Some((stream, new_version))) => {
                let reader = StreamReader::new(stream.map_err(std::io::Error::other));
                Ok(GetModifiedResult::Modified { data: Box::pin(reader), new_version })
            }
            // None means the conditional GET returned 304 Not Modified.
            Ok(None) => Ok(GetModifiedResult::OnLatestVersion),
            Err(e) => Err(e),
        }
    }

    /// Fetch an object (optionally a byte range) unconditionally.
    async fn get_object_range(
        &self,
        settings: &Settings,
        path: &str,
        range: Option<&Range<u64>>,
    ) -> StorageResult<(
        Pin<Box<dyn Stream<Item = Result<Bytes, StorageError>> + Send>>,
        VersionInfo,
    )> {
        self.get_object_range_conditional(settings, path, range, None).await.map(|v| {
            // With no previous_version a 304 is impossible, so the
            // conditional helper can never legitimately return None here.
            #[expect(clippy::expect_used)]
            v.expect("Logic bug in get_object_range_conditional, should not get None")
        })
    }
}
823
impl S3Storage {
    /// Core GET: fetch an object, optionally restricted to a byte `range`
    /// and optionally conditional on `previous_version` via If-None-Match.
    ///
    /// Returns `Ok(None)` when the server answers 304 Not Modified (only
    /// possible when `previous_version` carried an etag), and the body
    /// stream plus the object's current version otherwise. A 404 or
    /// NoSuchKey maps to "object not found".
    async fn get_object_range_conditional(
        &self,
        settings: &Settings,
        path: &str,
        range: Option<&Range<u64>>,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<
        Option<(
            Pin<Box<dyn Stream<Item = Result<Bytes, StorageError>> + Send>>,
            VersionInfo,
        )>,
    > {
        let client = self.get_client(settings).await;
        let bucket = self.bucket.clone();
        let key = self.prefixed_path(path);

        let mut req = client.get_object().bucket(bucket).key(key);

        if let Some(range) = range {
            req = req.range(range_to_header(range));
        }

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        // Conditional fetch: skip the download if the stored etag matches.
        if let Some(previous_version) = previous_version.as_ref()
            && let Some(etag) = previous_version.etag()
        {
            req = req.if_none_match(strip_quotes(etag));
        };

        match req.send().await {
            Ok(output) => match output.e_tag {
                Some(etag) => {
                    let stream = stream2stream(output.body)
                        .map_err(|e| StorageError::capture(e.into()));
                    Ok(Some((Box::pin(stream), VersionInfo::from_etag_only(etag))))
                }
                None => Err(other_error("Object should have an etag".to_string())),
            },
            Err(sdk_err) => {
                // A 304 can surface as a plain response error (no parsed
                // service error); treat it as "not modified".
                if let SdkError::ResponseError(ref e) = sdk_err
                    && e.raw().status().as_u16() == 304
                {
                    return Ok(None);
                };

                match sdk_err.as_service_error() {
                    Some(e) if e.is_no_such_key() => {
                        obj_not_found_res()
                    }
                    // Some stores report a missing key as a generic service
                    // error with a raw 404 status.
                    Some(_)
                        if sdk_err
                            .raw_response()
                            .is_some_and(|x| x.status().as_u16() == 404) =>
                    {
                        obj_not_found_res()
                    }
                    // ...and a 304 may also arrive wrapped in a service
                    // error, depending on the implementation.
                    Some(_)
                        if sdk_err
                            .raw_response()
                            .is_some_and(|x| x.status().as_u16() == 304) =>
                    {
                        Ok(None)
                    }
                    _ => obj_store_error_res(sdk_err),
                }
            }
        }
    }
}
910
911fn object_to_list_info(prefix: &str, object: &Object) -> StorageResult<ListInfo<String>> {
912 let inner = || {
913 let key = object.key()?;
914 let last_modified = object.last_modified()?;
915 let created_at = last_modified.to_chrono_utc().ok()?;
916 let prefix = Utf8UnixPath::new(prefix);
917 let id = Utf8UnixPath::new(key).strip_prefix(prefix).ok()?.to_string();
918 let size_bytes = object.size.unwrap_or(0) as u64;
919 Some(ListInfo { id, created_at, size_bytes })
920 };
921 inner()
922 .ok_or_else(|| StorageError::capture(StorageErrorKind::BadPrefix(prefix.into())))
923}
924
/// Adapter that exposes a user-provided `S3CredentialsFetcher` as an AWS SDK
/// `ProvideCredentials` implementation (backs `S3Credentials::Refreshable`).
#[derive(Debug)]
struct ProvideRefreshableCredentials(Arc<dyn S3CredentialsFetcher>);
927
impl ProvideCredentials for ProvideRefreshableCredentials {
    // Bridge the SDK's credential-future type to the async `provide` helper.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(self.provide())
    }
}
938
impl ProvideRefreshableCredentials {
    /// Fetch fresh credentials from the user callback and convert them to
    /// the AWS credential type. Fetch failures are logged and surfaced as
    /// `CredentialsError::not_loaded`.
    async fn provide(
        &self,
    ) -> Result<aws_credential_types::Credentials, CredentialsError> {
        let creds = self
            .0
            .get()
            .await
            .inspect_err(|err| error!(error = err, "Cannot load credentials"))
            .map_err(CredentialsError::not_loaded)?;
        let creds = aws_credential_types::Credentials::new(
            creds.access_key_id,
            creds.secret_access_key,
            creds.session_token,
            // A missing expiry means the credentials do not expire.
            creds.expires_after.map(|e| e.into()),
            "user",
        );
        Ok(creds)
    }
}
959
960pub fn new_s3_storage(
963 config: S3Options,
964 bucket: String,
965 prefix: Option<String>,
966 credentials: Option<S3Credentials>,
967) -> StorageResult<Arc<dyn Storage + Send + Sync>> {
968 if let Some(endpoint) = &config.endpoint_url
969 && (endpoint.contains("fly.storage.tigris.dev")
970 || endpoint.contains("t3.storage.dev"))
971 {
972 return Err(other_error(
973 "Tigris Storage is not S3 compatible, use the Tigris specific constructor instead"
974 .to_string(),
975 ));
976 }
977
978 let st = S3Storage::new(
979 config,
980 bucket,
981 prefix,
982 credentials.unwrap_or(S3Credentials::FromEnv),
983 true,
984 Vec::new(),
985 Vec::new(),
986 )?;
987 Ok(Arc::new(st))
988}
989
990pub fn new_r2_storage(
991 config: S3Options,
992 bucket: Option<String>,
993 prefix: Option<String>,
994 account_id: Option<String>,
995 credentials: Option<S3Credentials>,
996) -> StorageResult<Arc<dyn Storage + Send + Sync>> {
997 let (bucket, prefix) = match (bucket, prefix) {
998 (Some(bucket), Some(prefix)) => (bucket, Some(prefix)),
999 (None, Some(prefix)) => match prefix.split_once("/") {
1000 Some((bucket, prefix)) => (bucket.to_string(), Some(prefix.to_string())),
1001 None => (prefix, None),
1002 },
1003 (Some(bucket), None) => (bucket, None),
1004 (None, None) => {
1005 return Err(StorageErrorKind::R2ConfigurationError(
1006 "Either bucket or prefix must be provided.".to_string(),
1007 ))
1008 .capture();
1009 }
1010 };
1011
1012 if config.endpoint_url.is_none() && account_id.is_none() {
1013 return Err(StorageErrorKind::R2ConfigurationError(
1014 "Either endpoint_url or account_id must be provided.".to_string(),
1015 ))
1016 .capture();
1017 }
1018
1019 let config = S3Options {
1020 region: config.region.or(Some("auto".to_string())),
1021 endpoint_url: config
1022 .endpoint_url
1023 .or(account_id.map(|x| format!("https://{x}.r2.cloudflarestorage.com"))),
1024 force_path_style: true,
1025 ..config
1026 };
1027 let st = S3Storage::new(
1028 config,
1029 bucket,
1030 prefix,
1031 credentials.unwrap_or(S3Credentials::FromEnv),
1032 true,
1033 Vec::new(),
1034 Vec::new(),
1035 )?;
1036 Ok(Arc::new(st))
1037}
1038
1039pub fn new_tigris_storage(
1040 config: S3Options,
1041 bucket: String,
1042 prefix: Option<String>,
1043 credentials: Option<S3Credentials>,
1044 use_weak_consistency: bool,
1045) -> StorageResult<Arc<dyn Storage + Send + Sync>> {
1046 let config = S3Options {
1047 endpoint_url: Some(
1048 config.endpoint_url.unwrap_or("https://t3.storage.dev".to_string()),
1049 ),
1050 ..config
1051 };
1052 let mut extra_write_headers = Vec::with_capacity(2);
1053 let mut extra_read_headers = Vec::with_capacity(3);
1054
1055 if !use_weak_consistency {
1056 if let Some(region) = config.region.as_ref() {
1059 extra_write_headers.push(("X-Tigris-Regions".to_string(), region.clone()));
1060 extra_write_headers
1061 .push(("X-Tigris-Consistent".to_string(), "true".to_string()));
1062
1063 extra_read_headers.push(("X-Tigris-Regions".to_string(), region.clone()));
1064 extra_read_headers
1065 .push(("Cache-Control".to_string(), "no-cache".to_string()));
1066 extra_read_headers
1067 .push(("X-Tigris-Consistent".to_string(), "true".to_string()));
1068 } else {
1069 return Err(other_error("Tigris storage requires a region to provide full consistency. Either set the region for the bucket or use the read-only, eventually consistent storage by passing `use_weak_consistency=True` (experts only)".to_string()));
1070 }
1071 }
1072 let st = S3Storage::new(
1073 config,
1074 bucket,
1075 prefix,
1076 credentials.unwrap_or(S3Credentials::FromEnv),
1077 !use_weak_consistency, extra_read_headers,
1079 extra_write_headers,
1080 )?;
1081 Ok(Arc::new(st))
1082}
1083
#[cfg(test)]
mod tests {
    use icechunk_macros::tokio_test;

    use super::*;

    // Pins the serde wire format of S3Storage: the exact JSON layout matters
    // because serialized storages may be persisted/exchanged, so field order
    // and names are part of the contract. Also checks a roundtrip restores
    // the config.
    #[tokio_test]
    async fn test_serialize_s3_storage() {
        let config = S3Options {
            region: Some("us-west-2".to_string()),
            endpoint_url: Some("http://localhost:9000".to_string()),
            allow_http: true,
            anonymous: false,
            force_path_style: false,
            network_stream_timeout_seconds: None,
            requester_pays: false,
        };
        let credentials = S3Credentials::Static(S3StaticCredentials {
            access_key_id: "access_key_id".to_string(),
            secret_access_key: "secret_access_key".to_string(),
            session_token: Some("session_token".to_string()),
            expires_after: None,
        });
        let storage = S3Storage::new(
            config,
            "bucket".to_string(),
            Some("prefix".to_string()),
            credentials,
            true,
            Vec::new(),
            Vec::new(),
        )
        .unwrap();

        let serialized = serde_json::to_string(&storage).unwrap();

        assert_eq!(
            serialized,
            r#"{"config":{"region":"us-west-2","endpoint_url":"http://localhost:9000","anonymous":false,"allow_http":true,"force_path_style":false,"network_stream_timeout_seconds":null,"requester_pays":false},"credentials":{"s3_credential_type":"static","access_key_id":"access_key_id","secret_access_key":"secret_access_key","session_token":"session_token","expires_after":null},"bucket":"bucket","prefix":"prefix","can_write":true,"extra_read_headers":[],"extra_write_headers":[]}"#
        );

        // The client is #[serde(skip)], so only the configuration roundtrips.
        let deserialized: S3Storage = serde_json::from_str(&serialized).unwrap();
        assert_eq!(storage.config, deserialized.config);
    }
}
1128}