Skip to main content

icechunk_s3/
lib.rs

1//! Native S3 client implementation of [`Storage`](icechunk_storage::Storage).
2
3// Re-export AWS SDK types needed by consumers (e.g., icechunk's virtual_chunks)
4pub use aws_sdk_s3;
5
6use std::{
7    collections::HashMap, fmt, future::ready, ops::Range, path::PathBuf, pin::Pin,
8    sync::Arc, time::Duration,
9};
10
11use async_trait::async_trait;
12use aws_config::{
13    AppName, BehaviorVersion, meta::region::RegionProviderChain, retry::RetryConfig,
14    timeout::TimeoutConfig,
15};
16use aws_credential_types::provider::error::CredentialsError;
17use aws_sdk_s3::{
18    Client,
19    config::{
20        Builder, ConfigBag, IdentityCache, Intercept, ProvideCredentials, Region,
21        RuntimeComponents, StalledStreamProtectionConfig,
22        interceptors::BeforeTransmitInterceptorContextMut,
23    },
24    error::{BoxError, SdkError},
25    operation::{copy_object::CopyObjectError, put_object::PutObjectError},
26    primitives::ByteStream,
27    types::{CompletedMultipartUpload, CompletedPart, Delete, Object, ObjectIdentifier},
28};
29use aws_smithy_runtime::client::retries::classifiers::HttpStatusCodeClassifier;
30use aws_smithy_types_convert::{
31    date_time::DateTimeExt as _, stream::PaginationStreamExt as _,
32};
33use bytes::Bytes;
34use chrono::{DateTime, Utc};
35use futures::{
36    Stream, StreamExt as _, TryStreamExt as _,
37    stream::{self, BoxStream, FuturesOrdered},
38};
39pub use icechunk_storage::s3_config::{
40    S3Credentials, S3CredentialsFetcher, S3Options, S3StaticCredentials,
41};
42use icechunk_storage::{
43    DeleteObjectsResult, GetModifiedResult, ListInfo, Settings, Storage, StorageError,
44    StorageErrorKind, StorageResult, VersionInfo, VersionedUpdateResult,
45    obj_not_found_res, obj_store_error, obj_store_error_res, other_error, sealed,
46    split_in_multiple_equal_requests, strip_quotes,
47};
48use icechunk_types::ICResultExt as _;
49use serde::{Deserialize, Serialize};
50use tokio::sync::OnceCell;
51use tokio_util::io::StreamReader;
52use tracing::{error, instrument};
53use typed_path::Utf8UnixPath;
54
/// S3-backed implementation of [`Storage`].
///
/// The struct is (de)serializable via serde/typetag; only configuration is
/// persisted — the live SDK client is rebuilt lazily after deserialization.
#[derive(Debug, Serialize, Deserialize)]
pub struct S3Storage {
    // config and credentials are stored so we are able to serialize and deserialize the struct
    config: S3Options,
    credentials: S3Credentials,
    // target bucket name
    bucket: String,
    // key prefix under which all objects live; stored without a trailing slash
    prefix: String,
    // cached writability flag, reported through `Storage::can_write`
    can_write: bool,
    // extra HTTP headers added to read-like requests (GET/HEAD/OPTIONS/TRACE)
    extra_read_headers: Vec<(String, String)>,
    // extra HTTP headers added to all other (write-like) requests
    extra_write_headers: Vec<(String, String)>,
    #[serde(skip)]
    /// We need to use `OnceCell` to allow async initialization, because serde
    /// does not support async function calls from deserialization. This gives
    /// us a way to lazily initialize the client.
    client: OnceCell<Arc<Client>>,
}
71
72impl fmt::Display for S3Storage {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        write!(
75            f,
76            "S3Storage(bucket={}, prefix={}, config={})",
77            self.bucket, self.prefix, self.config,
78        )
79    }
80}
/// SDK interceptor that injects user-supplied HTTP headers into every
/// request, choosing the read or write header set by HTTP method.
#[derive(Debug)]
struct ExtraHeadersInterceptor {
    // headers for read-like methods (GET/HEAD/OPTIONS/TRACE)
    extra_read_headers: Vec<(String, String)>,
    // headers for every other method (PUT/POST/DELETE/...)
    extra_write_headers: Vec<(String, String)>,
}
86
87impl Intercept for ExtraHeadersInterceptor {
88    fn name(&self) -> &'static str {
89        "ExtraHeaders"
90    }
91
92    fn modify_before_retry_loop(
93        &self,
94        context: &mut BeforeTransmitInterceptorContextMut<'_>,
95        _runtime_components: &RuntimeComponents,
96        _cfg: &mut ConfigBag,
97    ) -> Result<(), BoxError> {
98        let request = context.request_mut();
99        match request.method() {
100            "GET" | "HEAD" | "OPTIONS" | "TRACE" => {
101                for (k, v) in self.extra_read_headers.iter() {
102                    request.headers_mut().insert(k.clone(), v.clone());
103                }
104            }
105            _ => {
106                for (k, v) in self.extra_write_headers.iter() {
107                    request.headers_mut().insert(k.clone(), v.clone());
108                }
109            }
110        }
111        Ok(())
112    }
113}
114
/// Build a fully configured AWS S3 [`Client`] from Icechunk options,
/// credentials, retry/timeout `settings`, and optional extra headers.
///
/// Notable behaviors (all established below):
/// - When `endpoint_url` is set, a placeholder region is supplied because the
///   SDK demands one even for S3-compatible stores (GH793).
/// - Stalled-stream protection is disabled when
///   `network_stream_timeout_seconds == Some(0)`, otherwise enabled with the
///   configured (default 10s) grace period.
/// - HTTP 408/429/499 are added to the retryable status codes on top of the
///   SDK defaults.
#[instrument(skip(credentials))]
pub async fn mk_client(
    config: &S3Options,
    credentials: S3Credentials,
    extra_read_headers: Vec<(String, String)>,
    extra_write_headers: Vec<(String, String)>,
    settings: &Settings,
) -> Client {
    // Prefer the explicitly configured region; fall back to the SDK's
    // default provider chain (env vars, profile, IMDS, ...).
    let region = config
        .region
        .as_ref()
        .map(|r| RegionProviderChain::first_try(Some(Region::new(r.clone()))))
        .unwrap_or_else(RegionProviderChain::default_provider);

    let endpoint = config.endpoint_url.clone();
    let region = if endpoint.is_some() {
        // GH793, the S3 SDK requires a region even though it may not make sense
        // for S3-compatible object stores like Tigris or Ceph.
        // So we set a fake region, using the `endpoint_url` as a sign that
        // we are not talking to real S3
        region.or_else(Region::new("region-was-not-set"))
    } else {
        region
    };

    #[expect(clippy::unwrap_used)]
    let app_name = AppName::new(icechunk_types::user_agent()).unwrap();
    let mut aws_config = aws_config::defaults(BehaviorVersion::v2026_01_12())
        .region(region)
        .app_name(app_name);

    if let Some(endpoint) = endpoint {
        aws_config = aws_config.endpoint_url(endpoint);
    }

    // Some(0) is the user's explicit opt-out of stalled-stream protection.
    let stalled_stream = if config.network_stream_timeout_seconds == Some(0) {
        StalledStreamProtectionConfig::disabled()
    } else {
        StalledStreamProtectionConfig::enabled()
            .grace_period(Duration::from_secs(
                config.network_stream_timeout_seconds.unwrap_or(10) as u64,
            ))
            .build()
    };
    aws_config = aws_config.stalled_stream_protection(stalled_stream);

    match credentials {
        // FromEnv: leave the SDK's default credential chain in place.
        S3Credentials::FromEnv => {}
        S3Credentials::Anonymous => aws_config = aws_config.no_credentials(),
        S3Credentials::Static(credentials) => {
            aws_config =
                aws_config.credentials_provider(aws_credential_types::Credentials::new(
                    credentials.access_key_id,
                    credentials.secret_access_key,
                    credentials.session_token,
                    credentials.expires_after.map(|e| e.into()),
                    "user",
                ));
        }
        S3Credentials::Refreshable(fetcher) => {
            aws_config =
                aws_config.credentials_provider(ProvideRefreshableCredentials(fetcher));
        }
    }

    // Translate Icechunk's retry settings into the SDK's standard retry mode.
    let retry_config = RetryConfig::standard()
        .with_max_attempts(settings.retries().max_tries().get() as u32)
        .with_initial_backoff(Duration::from_millis(
            settings.retries().initial_backoff_ms() as u64,
        ))
        .with_max_backoff(Duration::from_millis(
            settings.retries().max_backoff_ms() as u64
        ));

    // Only override timeouts the user actually configured.
    if let Some(timeouts) = settings.timeouts() {
        let mut timeout_builder = TimeoutConfig::builder();
        if let Some(ms) = timeouts.connect_timeout_ms {
            timeout_builder =
                timeout_builder.connect_timeout(Duration::from_millis(ms as u64));
        }
        if let Some(ms) = timeouts.read_timeout_ms {
            timeout_builder =
                timeout_builder.read_timeout(Duration::from_millis(ms as u64));
        }
        if let Some(ms) = timeouts.operation_timeout_ms {
            timeout_builder =
                timeout_builder.operation_timeout(Duration::from_millis(ms as u64));
        }
        if let Some(ms) = timeouts.operation_attempt_timeout_ms {
            timeout_builder = timeout_builder
                .operation_attempt_timeout(Duration::from_millis(ms as u64));
        }
        aws_config = aws_config.timeout_config(timeout_builder.build());
    }

    let mut s3_builder = Builder::from(&aws_config.load().await)
        .force_path_style(config.force_path_style)
        .retry_config(retry_config);

    // credentials may take a while to refresh, defaults are too strict
    let id_cache = IdentityCache::lazy()
        .load_timeout(Duration::from_secs(120))
        .buffer_time(Duration::from_secs(120))
        .build();

    s3_builder = s3_builder.identity_cache(id_cache);

    // Add retry classifier for HTTP 408 (Request Timeout) and 429 (Too Many Requests).
    // The default HttpStatusCodeClassifier only retries on 500, 502, 503, 504
    // Note R2 sends 429 for "slowdown" while S3 sends 503.
    //   - R2: https://developers.cloudflare.com/r2/api/error-codes/
    //   - S3: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
    // Tigris can occasionally respond with 499: "Client Closed Request"
    static RETRY_CODES: &[u16] = &[408, 429, 499];
    // This confusingly named `retry_classifier` method ends up calling
    // `push_retry_classifier` after wrapping our custom classifier in `SharedRetryClassifier`.
    // Ultimately, this is a push on to a `Vec<SharedRetryClassifier>`, and is thus additive
    // to the existing default retry configuration.
    // https://github.com/smithy-lang/smithy-rs/blob/cfcc39cf4b5bea665bba684b64bfca2b89e4bc73/rust-runtime/aws-smithy-runtime-api/src/client/runtime_components.rs#L755
    // https://github.com/smithy-lang/smithy-rs/blob/cfcc39cf4b5bea665bba684b64bfca2b89e4bc73/rust-runtime/aws-smithy-runtime-api/src/client/runtime_components.rs#L370
    s3_builder = s3_builder
        .retry_classifier(HttpStatusCodeClassifier::new_from_codes(RETRY_CODES));

    // Only pay the interceptor cost when there are headers to inject.
    if !extra_read_headers.is_empty() || !extra_write_headers.is_empty() {
        s3_builder = s3_builder.interceptor(ExtraHeadersInterceptor {
            extra_read_headers,
            extra_write_headers,
        });
    }

    let config = s3_builder.build();

    Client::from_conf(config)
}
249
250fn stream2stream(
251    s: ByteStream,
252) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> {
253    let res = stream::try_unfold(s, move |mut stream| async move {
254        let next = stream.try_next().await?;
255        Ok(next.map(|bytes| (bytes, stream)))
256    });
257    Box::pin(res)
258}
259
impl S3Storage {
    /// Create a new `S3Storage` for `bucket`/`prefix`.
    ///
    /// No I/O happens here: the SDK client is built lazily on first use via
    /// [`S3Storage::get_client`]. A trailing `/` on `prefix` is stripped so
    /// key joining is uniform.
    pub fn new(
        config: S3Options,
        bucket: String,
        prefix: Option<String>,
        credentials: S3Credentials,
        can_write: bool,
        extra_read_headers: Vec<(String, String)>,
        extra_write_headers: Vec<(String, String)>,
    ) -> Result<S3Storage, StorageError> {
        let client = OnceCell::new();
        let prefix = prefix.unwrap_or_default();
        let prefix = prefix.strip_suffix("/").unwrap_or(prefix.as_str()).to_string();
        Ok(S3Storage {
            client,
            config,
            bucket,
            prefix,
            credentials,
            can_write,
            extra_read_headers,
            extra_write_headers,
        })
    }

    /// Get the client, initializing it if it hasn't been initialized yet. This is necessary because the
    /// client is not serializeable and must be initialized after deserialization. Under normal construction
    /// the original client is returned immediately.
    #[instrument(skip_all)]
    pub async fn get_client(&self, settings: &Settings) -> &Arc<Client> {
        self.client
            .get_or_init(|| async {
                Arc::new(
                    mk_client(
                        &self.config,
                        self.credentials.clone(),
                        self.extra_read_headers.clone(),
                        self.extra_write_headers.clone(),
                        settings,
                    )
                    .await,
                )
            })
            .await
    }

    /// Join `self.prefix`, `file_prefix` and `id` into an S3 key string.
    ///
    /// Uses `PathBuf` for joining and then normalizes `\` to `/` so the
    /// result is a valid S3 key even when built on Windows. Fails with
    /// `BadPrefix` if the joined path is not valid UTF-8.
    pub fn get_path_str(&self, file_prefix: &str, id: &str) -> StorageResult<String> {
        let path = PathBuf::from_iter([self.prefix.as_str(), file_prefix, id]);
        let path_str = path
            .into_os_string()
            .into_string()
            .map_err(|s| StorageError::capture(StorageErrorKind::BadPrefix(s)))?;

        Ok(path_str.replace("\\", "/"))
    }

    // Prepend the storage prefix to a repo-relative path.
    fn prefixed_path(&self, path: &str) -> String {
        format!("{}/{path}", self.prefix)
    }

    /// Upload `bytes` to `key` with a single `PutObject` request.
    ///
    /// When `previous_version` is given, the write is guarded conditionally
    /// (subject to the corresponding settings flags):
    /// - no etag + conditional-create enabled -> `If-None-Match: *`
    /// - etag + conditional-update enabled    -> `If-Match: <etag>`
    ///
    /// Returns `NotOnLatestVersion` when the store rejects the condition,
    /// mapping both service error codes (MinIO/Ceph style) and raw
    /// 409/412 response errors (documented S3 behavior).
    async fn put_object_single<
        I: IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    >(
        &self,
        settings: &Settings,
        key: &str,
        bytes: Bytes,
        content_type: Option<impl Into<String>>,
        metadata: I,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<VersionedUpdateResult> {
        let mut req = self
            .get_client(settings)
            .await
            .put_object()
            .bucket(self.bucket.clone())
            .key(key)
            .body(bytes.into());

        // content-type/metadata can break some S3-compatible stores, hence
        // the "unsafe_use_metadata" opt-in.
        if settings.unsafe_use_metadata() {
            if let Some(ct) = content_type {
                req = req.content_type(ct);
            };

            for (k, v) in metadata {
                req = req.metadata(k, v);
            }
        }

        if let Some(klass) = settings.storage_class() {
            let klass = klass.as_str().into();
            req = req.storage_class(klass);
        }

        if let Some(previous_version) = previous_version.as_ref() {
            match (
                previous_version.etag(),
                settings.unsafe_use_conditional_create(),
                settings.unsafe_use_conditional_update(),
            ) {
                // creating a new object: fail if it already exists
                (None, true, _) => req = req.if_none_match("*"),
                // updating: fail if somebody else wrote in between
                (Some(etag), _, true) => req = req.if_match(strip_quotes(etag)),
                (_, _, _) => {}
            }
        }

        match req.send().await {
            Ok(out) => {
                let new_etag = out
                    .e_tag()
                    .ok_or(other_error("Object should have an etag".to_string()))?
                    .to_string();
                let new_version = VersionInfo::from_etag_only(new_etag);
                Ok(VersionedUpdateResult::Updated { new_version })
            }
            // minio returns this
            Err(SdkError::ServiceError(err)) => {
                let code = err.err().meta().code().unwrap_or_default();
                if code == "PreconditionFailed"
                    || code == "ConditionalRequestConflict"
                    // ConcurrentModification sent by Ceph Object Gateway
                    || code == "ConcurrentModification"
                {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<PutObjectError>::ServiceError(err))
                }
            }
            // S3 API documents this
            Err(SdkError::ResponseError(err)) => {
                let status = err.raw().status().as_u16();
                // see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
                if status == 409 || status == 412 {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<PutObjectError>::ResponseError(err))
                }
            }
            Err(err) => obj_store_error_res(err),
        }
    }

    /// Upload `bytes` to `key` via S3 multipart upload.
    ///
    /// Parts are uploaded concurrently (ordered via `FuturesOrdered`) and
    /// sized so all parts except the last are equal, which R2 requires.
    /// Conditional headers (same scheme as [`Self::put_object_single`]) are
    /// applied to the final `CompleteMultipartUpload` call.
    ///
    /// NOTE(review): if a part upload or the completion fails, the multipart
    /// upload is not aborted, so incomplete parts may linger (and be billed)
    /// unless a bucket lifecycle rule cleans them up — confirm whether
    /// AbortMultipartUpload-on-error is intentionally omitted.
    async fn put_object_multipart<
        I: IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    >(
        &self,
        settings: &Settings,
        key: &str,
        bytes: &Bytes,
        content_type: Option<impl Into<String>>,
        metadata: I,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<VersionedUpdateResult> {
        let mut multi = self
            .get_client(settings)
            .await
            .create_multipart_upload()
            // We would like this, but it fails in MinIO
            //.checksum_type(aws_sdk_s3::types::ChecksumType::FullObject)
            //.checksum_algorithm(aws_sdk_s3::types::ChecksumAlgorithm::Crc64Nvme)
            .bucket(self.bucket.clone())
            .key(key);

        if settings.unsafe_use_metadata() {
            if let Some(ct) = content_type {
                multi = multi.content_type(ct);
            };
            for (k, v) in metadata {
                multi = multi.metadata(k, v);
            }
        }

        if let Some(klass) = settings.storage_class() {
            let klass = klass.as_str().into();
            multi = multi.storage_class(klass);
        }

        let create_res = multi.send().await.capture_box()?;
        let upload_id = create_res.upload_id().ok_or(other_error(
            "No upload_id in create multipart upload result".to_string(),
        ))?;

        // We need to ensure all requests are the same size except for the last one, which can be
        // smaller. This is a requirement for R2 compatibility
        let parts = split_in_multiple_equal_requests(
            &(0..bytes.len() as u64),
            settings.concurrency().ideal_concurrent_request_size().get(),
            settings.concurrency().max_concurrent_requests_for_object().get(),
        )
        .collect::<Vec<_>>();

        let results = parts
            .into_iter()
            .enumerate()
            .map(|(part_idx, range)| async move {
                let body = bytes.slice(range.start as usize..range.end as usize).into();
                // S3 part numbers are 1-based
                let idx = part_idx as i32 + 1;
                let req = self
                    .get_client(settings)
                    .await
                    .upload_part()
                    .upload_id(upload_id)
                    .bucket(self.bucket.clone())
                    .key(key)
                    .part_number(idx)
                    .body(body);

                req.send().await.map(|res| (idx, res))
            })
            .collect::<FuturesOrdered<_>>();

        let completed_parts = results
            .map_ok(|(idx, res)| {
                let etag = res.e_tag().unwrap_or("");
                CompletedPart::builder()
                    .e_tag(strip_quotes(etag))
                    .part_number(idx)
                    .build()
            })
            .try_collect::<Vec<_>>()
            .await
            .capture_box()?;

        let completed_parts =
            CompletedMultipartUpload::builder().set_parts(Some(completed_parts)).build();

        let mut req = self
            .get_client(settings)
            .await
            .complete_multipart_upload()
            .bucket(self.bucket.clone())
            .key(key)
            .upload_id(upload_id)
            //.checksum_type(aws_sdk_s3::types::ChecksumType::FullObject)
            .multipart_upload(completed_parts);

        if let Some(previous_version) = previous_version.as_ref() {
            match (
                previous_version.etag(),
                settings.unsafe_use_conditional_create(),
                settings.unsafe_use_conditional_update(),
            ) {
                (None, true, _) => req = req.if_none_match("*"),
                (Some(etag), _, true) => req = req.if_match(strip_quotes(etag)),
                (_, _, _) => {}
            }
        }

        match req.send().await {
            Ok(out) => {
                let new_etag = out
                    .e_tag()
                    .ok_or(other_error("Object should have an etag".to_string()))?
                    .to_string();
                let new_version = VersionInfo::from_etag_only(new_etag);
                Ok(VersionedUpdateResult::Updated { new_version })
            }
            // minio returns this
            Err(SdkError::ServiceError(err)) => {
                let code = err.err().meta().code().unwrap_or_default();
                if code == "PreconditionFailed"
                    || code == "ConditionalRequestConflict"
                    // ConcurrentModification sent by Ceph Object Gateway
                    || code == "ConcurrentModification"
                {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::ServiceError(err))
                }
            }
            // S3 API documents this
            Err(SdkError::ResponseError(err)) => {
                let status = err.raw().status().as_u16();
                // see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
                if status == 409 || status == 412 {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    // NOTE: ResponseError carries no service error payload, so
                    // the PutObjectError parameter is only there to pin the type.
                    obj_store_error_res(SdkError::<PutObjectError>::ResponseError(err))
                }
            }
            Err(err) => obj_store_error_res(err),
        }
    }
}
544
/// Render a half-open byte range as an HTTP `Range` header value.
/// HTTP byte ranges are inclusive on both ends, hence the `- 1` on `end`.
pub fn range_to_header(range: &Range<u64>) -> String {
    let last_byte = range.end - 1;
    format!("bytes={}-{last_byte}", range.start)
}
548
549impl sealed::Sealed for S3Storage {}
550
#[async_trait]
#[typetag::serde]
impl Storage for S3Storage {
    /// Report the writability flag captured at construction time.
    async fn can_write(&self) -> StorageResult<bool> {
        Ok(self.can_write)
    }

    /// Write `bytes` to `path`, choosing multipart vs. single-shot upload by
    /// the configured size threshold. Conditional-write semantics come from
    /// `previous_version` (see the two helpers).
    async fn put_object(
        &self,
        settings: &Settings,
        path: &str,
        bytes: Bytes,
        content_type: Option<&str>,
        metadata: Vec<(String, String)>,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<VersionedUpdateResult> {
        let path = self.prefixed_path(path);
        if bytes.len() >= settings.minimum_size_for_multipart_upload() as usize {
            self.put_object_multipart(
                settings,
                path.as_str(),
                &bytes,
                content_type,
                metadata,
                previous_version,
            )
            .await
        } else {
            self.put_object_single(
                settings,
                path.as_str(),
                bytes,
                content_type,
                metadata,
                previous_version,
            )
            .await
        }
    }

    /// Server-side copy `from` -> `to` within the bucket, optionally guarded
    /// by `copy-source-if-match` on the source's etag.
    ///
    /// NOTE(review): on success this reports `version.clone()` (the source's
    /// version) as the new version rather than the etag from the copy result —
    /// presumably relying on the copy having the same etag as the source;
    /// verify this assumption holds for multipart-uploaded sources.
    async fn copy_object(
        &self,
        settings: &Settings,
        from: &str,
        to: &str,
        content_type: Option<&str>,
        version: &VersionInfo,
    ) -> StorageResult<VersionedUpdateResult> {
        // CopySource must include the bucket name
        let from = format!("{}/{}", self.bucket, self.prefixed_path(from));
        let to = self.prefixed_path(to);
        let mut req = self
            .get_client(settings)
            .await
            .copy_object()
            .bucket(self.bucket.clone())
            .key(to)
            .copy_source(from);
        if settings.unsafe_use_conditional_update()
            && let Some(etag) = version.etag()
        {
            req = req.copy_source_if_match(strip_quotes(etag));
        }
        if let Some(klass) = settings.storage_class() {
            let klass = klass.as_str().into();
            req = req.storage_class(klass);
        }
        if let Some(ct) = content_type {
            req = req.content_type(ct);
        }
        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }
        match req.send().await {
            Ok(_) => Ok(VersionedUpdateResult::Updated { new_version: version.clone() }),
            Err(SdkError::ServiceError(err)) => {
                let code = err.err().meta().code().unwrap_or_default();
                if code == "PreconditionFailed"
                    || code == "ConditionalRequestConflict"
                    // ConcurrentModification sent by Ceph Object Gateway
                    || code == "ConcurrentModification"
                {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    obj_store_error_res(SdkError::<CopyObjectError>::ServiceError(err))
                }
            }
            // S3 API documents this
            Err(SdkError::ResponseError(err)) => {
                let status = err.raw().status().as_u16();
                // see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
                if status == 409 || status == 412 {
                    Ok(VersionedUpdateResult::NotOnLatestVersion)
                } else {
                    // NOTE: ResponseError carries no service error payload; the
                    // PutObjectError parameter (vs CopyObjectError above) only
                    // pins the generic type and does not affect behavior.
                    obj_store_error_res(SdkError::<PutObjectError>::ResponseError(err))
                }
            }
            Err(sdk_err) => match sdk_err.as_service_error() {
                Some(_)
                    if sdk_err
                        .raw_response()
                        .is_some_and(|x| x.status().as_u16() == 404) =>
                {
                    // needed for Cloudflare R2 public bucket URLs
                    // if object doesn't exist we get a 404 that isn't parsed by the AWS SDK
                    // into anything useful. So we need to parse the raw response, and match
                    // the status code.
                    obj_not_found_res()
                }
                _ => obj_store_error_res(sdk_err),
            },
        }
    }

    /// Stream listing metadata for every object under `prefix`, transparently
    /// following ListObjectsV2 pagination.
    #[instrument(skip(self, settings))]
    async fn list_objects<'a>(
        &'a self,
        settings: &Settings,
        prefix: &str,
    ) -> StorageResult<BoxStream<'a, StorageResult<ListInfo<String>>>> {
        // collapse doubled slashes from joining the two prefixes
        let prefix = format!("{}/{}", self.prefix, prefix).replace("//", "/");
        let mut req = self
            .get_client(settings)
            .await
            .list_objects_v2()
            .bucket(self.bucket.clone())
            .prefix(prefix.clone());

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        let stream = req
            .into_paginator()
            .send()
            .into_stream_03x()
            .map_err(obj_store_error)
            // flatten each page's (optional) contents into a single stream
            .try_filter_map(|page| {
                let contents = page.contents.map(|cont| stream::iter(cont).map(Ok));
                ready(Ok(contents))
            })
            .try_flatten()
            .and_then(move |object| {
                let prefix = prefix.clone();
                ready(object_to_list_info(prefix.as_str(), &object))
            });
        Ok(stream.boxed())
    }

    /// Bulk-delete a batch of `(id, size)` pairs under `prefix` with a single
    /// DeleteObjects request, returning how many objects/bytes were deleted.
    /// Per-object delete errors are logged (first one only) but not returned.
    #[instrument(skip(self, batch))]
    async fn delete_batch(
        &self,
        settings: &Settings,
        prefix: &str,
        batch: Vec<(String, u64)>,
    ) -> StorageResult<DeleteObjectsResult> {
        // remember each key's size so we can account deleted bytes afterwards
        let mut sizes = HashMap::new();
        let mut ids = Vec::new();
        for (id, size) in batch.into_iter() {
            // silently skip entries whose key cannot be built; they simply
            // won't be deleted or counted
            if let Ok(key) = self.get_path_str(prefix, id.as_str())
                && let Ok(ident) = ObjectIdentifier::builder().key(key.clone()).build()
            {
                ids.push(ident);
                sizes.insert(key, size);
            }
        }

        let delete = Delete::builder()
            .set_objects(Some(ids))
            .build()
            .map_err(|e| other_error(e.to_string()))?;

        let mut req = self
            .get_client(settings)
            .await
            .delete_objects()
            .bucket(self.bucket.clone())
            .delete(delete);

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        let res = req.send().await.capture_box()?;

        if let Some(err) = res.errors.as_ref().and_then(|e| e.first()) {
            tracing::error!(
                error = ?err,
                "Errors deleting objects",
            );
        }

        let mut result = DeleteObjectsResult::default();
        for deleted in res.deleted() {
            if let Some(key) = deleted.key() {
                let size = sizes.get(key).unwrap_or(&0);
                result.deleted_bytes += *size;
                result.deleted_objects += 1;
            } else {
                tracing::error!("Deleted object without key");
            }
        }
        Ok(result)
    }

    /// HEAD the object at `path` and return its last-modified timestamp as a
    /// chrono UTC datetime. Errors if the field is absent or unconvertible.
    #[instrument(skip(self, settings))]
    async fn get_object_last_modified(
        &self,
        path: &str,
        settings: &Settings,
    ) -> StorageResult<DateTime<Utc>> {
        let key = self.prefixed_path(path);
        let mut req = self
            .get_client(settings)
            .await
            .head_object()
            .bucket(self.bucket.clone())
            .key(key);

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        let res = req.send().await.capture_box()?;

        let res = res
            .last_modified
            .ok_or(other_error("Object has no last_modified field".to_string()))?;
        let res = res
            .to_chrono_utc()
            .map_err(|_| other_error("Invalid metadata timestamp".to_string()))?;

        Ok(res)
    }

    /// Fetch the whole object at `path` only if it changed relative to
    /// `previous_version` (etag-based `If-None-Match`); otherwise report
    /// `OnLatestVersion`.
    #[instrument(skip(self, settings))]
    async fn get_object_conditional(
        &self,
        settings: &Settings,
        path: &str,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<GetModifiedResult> {
        match self
            .get_object_range_conditional(settings, path, None, previous_version)
            .await
        {
            Ok(Some((stream, new_version))) => {
                // adapt the byte stream into an AsyncRead for the caller
                let reader = StreamReader::new(stream.map_err(std::io::Error::other));
                Ok(GetModifiedResult::Modified { data: Box::pin(reader), new_version })
            }
            Ok(None) => Ok(GetModifiedResult::OnLatestVersion),
            Err(e) => Err(e),
        }
    }

    /// Fetch (a byte range of) the object at `path` unconditionally,
    /// returning the body stream and the object's version.
    async fn get_object_range(
        &self,
        settings: &Settings,
        path: &str,
        range: Option<&Range<u64>>,
    ) -> StorageResult<(
        Pin<Box<dyn Stream<Item = Result<Bytes, StorageError>> + Send>>,
        VersionInfo,
    )> {
        self.get_object_range_conditional(settings, path, range, None).await.map(|v| {
            // If we got a result, then we can unwrap safely here:
            // Errors would be in the other branch, and None is only expected
            // if previous_version was passed in function call, but we set it to None
            #[expect(clippy::expect_used)]
            v.expect("Logic bug in get_object_range_conditional, should not get None")
        })
    }
}
823
impl S3Storage {
    /// Fetch an object, optionally restricted to a byte `range` and optionally
    /// conditional on `previous_version` (via `If-None-Match` on its etag).
    ///
    /// Returns:
    /// - `Ok(Some((stream, version)))`: the body stream plus the etag-based
    ///   version of the fetched object.
    /// - `Ok(None)`: `previous_version` was supplied and the object is
    ///   unmodified (HTTP 304).
    /// - `Err(..)`: a not-found error when the key does not exist, or any
    ///   other storage error.
    async fn get_object_range_conditional(
        &self,
        settings: &Settings,
        path: &str,
        range: Option<&Range<u64>>,
        previous_version: Option<&VersionInfo>,
    ) -> StorageResult<
        Option<(
            Pin<Box<dyn Stream<Item = Result<Bytes, StorageError>> + Send>>,
            VersionInfo,
        )>,
    > {
        let client = self.get_client(settings).await;
        let bucket = self.bucket.clone();
        let key = self.prefixed_path(path);

        let mut req = client.get_object().bucket(bucket).key(key);

        // Restrict the download to the requested byte range, if any.
        if let Some(range) = range {
            req = req.range(range_to_header(range));
        }

        if self.config.requester_pays {
            req = req.request_payer(aws_sdk_s3::types::RequestPayer::Requester);
        }

        // If the caller knows a previous version with an etag, ask S3 to
        // answer 304 (Not Modified) when the stored etag still matches.
        if let Some(previous_version) = previous_version.as_ref()
            && let Some(etag) = previous_version.etag()
        {
            req = req.if_none_match(strip_quotes(etag));
        };

        match req.send().await {
            Ok(output) => match output.e_tag {
                Some(etag) => {
                    let stream = stream2stream(output.body)
                        .map_err(|e| StorageError::capture(e.into()));
                    Ok(Some((Box::pin(stream), VersionInfo::from_etag_only(etag))))
                }
                // We rely on the etag as the object's version; a response
                // without one cannot be represented.
                None => Err(other_error("Object should have an etag".to_string())),
            },
            Err(sdk_err) => {
                // aws_sdk_s3 on R2 can return a response error (checksum mismatch)
                // when status 304 (Not Modified) happens, so we need to check
                // whether that happened here, before the other regular checks
                // that assume a well-formed response.
                if let SdkError::ResponseError(ref e) = sdk_err
                    && e.raw().status().as_u16() == 304
                {
                    return Ok(None);
                };

                match sdk_err.as_service_error() {
                    Some(e) if e.is_no_such_key() => {
                        obj_not_found_res()
                    }
                    Some(_)
                        if sdk_err
                            .raw_response()
                            .is_some_and(|x| x.status().as_u16() == 404) =>
                    {
                        // needed for Cloudflare R2 public bucket URLs:
                        // if the object doesn't exist we get a 404 that isn't parsed
                        // by the AWS SDK into anything useful. So we need to inspect
                        // the raw response and match on the status code.
                        obj_not_found_res()
                    }
                    Some(_)
                        // aws_sdk_s3 doesn't surface status 304 (Not Modified)
                        // as a dedicated error, so check the raw http status
                        // code here and return None to make the "unmodified"
                        // case easy to catch downstream
                        if sdk_err
                            .raw_response()
                            .is_some_and(|x| x.status().as_u16() == 304) =>
                    {
                        Ok(None)
                    }
                    // Anything else is an opaque object-store error.
                    _ => obj_store_error_res(sdk_err),
                }
            }
        }
    }
}
910
911fn object_to_list_info(prefix: &str, object: &Object) -> StorageResult<ListInfo<String>> {
912    let inner = || {
913        let key = object.key()?;
914        let last_modified = object.last_modified()?;
915        let created_at = last_modified.to_chrono_utc().ok()?;
916        let prefix = Utf8UnixPath::new(prefix);
917        let id = Utf8UnixPath::new(key).strip_prefix(prefix).ok()?.to_string();
918        let size_bytes = object.size.unwrap_or(0) as u64;
919        Some(ListInfo { id, created_at, size_bytes })
920    };
921    inner()
922        .ok_or_else(|| StorageError::capture(StorageErrorKind::BadPrefix(prefix.into())))
923}
924
/// Adapter exposing a user-provided [`S3CredentialsFetcher`] as an AWS SDK
/// credentials provider, so credentials can be refreshed on demand.
#[derive(Debug)]
struct ProvideRefreshableCredentials(Arc<dyn S3CredentialsFetcher>);
927
impl ProvideCredentials for ProvideRefreshableCredentials {
    /// AWS SDK entry point: wraps [`ProvideRefreshableCredentials::provide`]'s
    /// future in the SDK's `ProvideCredentials` future type.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(self.provide())
    }
}
938
939impl ProvideRefreshableCredentials {
940    async fn provide(
941        &self,
942    ) -> Result<aws_credential_types::Credentials, CredentialsError> {
943        let creds = self
944            .0
945            .get()
946            .await
947            .inspect_err(|err| error!(error = err, "Cannot load credentials"))
948            .map_err(CredentialsError::not_loaded)?;
949        let creds = aws_credential_types::Credentials::new(
950            creds.access_key_id,
951            creds.secret_access_key,
952            creds.session_token,
953            creds.expires_after.map(|e| e.into()),
954            "user",
955        );
956        Ok(creds)
957    }
958}
959
960// Factory functions
961
962pub fn new_s3_storage(
963    config: S3Options,
964    bucket: String,
965    prefix: Option<String>,
966    credentials: Option<S3Credentials>,
967) -> StorageResult<Arc<dyn Storage + Send + Sync>> {
968    if let Some(endpoint) = &config.endpoint_url
969        && (endpoint.contains("fly.storage.tigris.dev")
970            || endpoint.contains("t3.storage.dev"))
971    {
972        return Err(other_error(
973            "Tigris Storage is not S3 compatible, use the Tigris specific constructor instead"
974                .to_string(),
975        ));
976    }
977
978    let st = S3Storage::new(
979        config,
980        bucket,
981        prefix,
982        credentials.unwrap_or(S3Credentials::FromEnv),
983        true,
984        Vec::new(),
985        Vec::new(),
986    )?;
987    Ok(Arc::new(st))
988}
989
990pub fn new_r2_storage(
991    config: S3Options,
992    bucket: Option<String>,
993    prefix: Option<String>,
994    account_id: Option<String>,
995    credentials: Option<S3Credentials>,
996) -> StorageResult<Arc<dyn Storage + Send + Sync>> {
997    let (bucket, prefix) = match (bucket, prefix) {
998        (Some(bucket), Some(prefix)) => (bucket, Some(prefix)),
999        (None, Some(prefix)) => match prefix.split_once("/") {
1000            Some((bucket, prefix)) => (bucket.to_string(), Some(prefix.to_string())),
1001            None => (prefix, None),
1002        },
1003        (Some(bucket), None) => (bucket, None),
1004        (None, None) => {
1005            return Err(StorageErrorKind::R2ConfigurationError(
1006                "Either bucket or prefix must be provided.".to_string(),
1007            ))
1008            .capture();
1009        }
1010    };
1011
1012    if config.endpoint_url.is_none() && account_id.is_none() {
1013        return Err(StorageErrorKind::R2ConfigurationError(
1014            "Either endpoint_url or account_id must be provided.".to_string(),
1015        ))
1016        .capture();
1017    }
1018
1019    let config = S3Options {
1020        region: config.region.or(Some("auto".to_string())),
1021        endpoint_url: config
1022            .endpoint_url
1023            .or(account_id.map(|x| format!("https://{x}.r2.cloudflarestorage.com"))),
1024        force_path_style: true,
1025        ..config
1026    };
1027    let st = S3Storage::new(
1028        config,
1029        bucket,
1030        prefix,
1031        credentials.unwrap_or(S3Credentials::FromEnv),
1032        true,
1033        Vec::new(),
1034        Vec::new(),
1035    )?;
1036    Ok(Arc::new(st))
1037}
1038
1039pub fn new_tigris_storage(
1040    config: S3Options,
1041    bucket: String,
1042    prefix: Option<String>,
1043    credentials: Option<S3Credentials>,
1044    use_weak_consistency: bool,
1045) -> StorageResult<Arc<dyn Storage + Send + Sync>> {
1046    let config = S3Options {
1047        endpoint_url: Some(
1048            config.endpoint_url.unwrap_or("https://t3.storage.dev".to_string()),
1049        ),
1050        ..config
1051    };
1052    let mut extra_write_headers = Vec::with_capacity(2);
1053    let mut extra_read_headers = Vec::with_capacity(3);
1054
1055    if !use_weak_consistency {
1056        // TODO: Tigris will need more than this to offer good eventually consistent behavior
1057        // For example: we should use no-cache for branches and config file
1058        if let Some(region) = config.region.as_ref() {
1059            extra_write_headers.push(("X-Tigris-Regions".to_string(), region.clone()));
1060            extra_write_headers
1061                .push(("X-Tigris-Consistent".to_string(), "true".to_string()));
1062
1063            extra_read_headers.push(("X-Tigris-Regions".to_string(), region.clone()));
1064            extra_read_headers
1065                .push(("Cache-Control".to_string(), "no-cache".to_string()));
1066            extra_read_headers
1067                .push(("X-Tigris-Consistent".to_string(), "true".to_string()));
1068        } else {
1069            return Err(other_error("Tigris storage requires a region to provide full consistency. Either set the region for the bucket or use the read-only, eventually consistent storage by passing `use_weak_consistency=True` (experts only)".to_string()));
1070        }
1071    }
1072    let st = S3Storage::new(
1073        config,
1074        bucket,
1075        prefix,
1076        credentials.unwrap_or(S3Credentials::FromEnv),
1077        !use_weak_consistency, // notice eventually consistent storage can't do writes
1078        extra_read_headers,
1079        extra_write_headers,
1080    )?;
1081    Ok(Arc::new(st))
1082}
1083
#[cfg(test)]
mod tests {
    use icechunk_macros::tokio_test;

    use super::*;

    /// Round-trips an `S3Storage` through serde_json, pinning the exact wire
    /// format and checking the config survives deserialization.
    #[tokio_test]
    async fn test_serialize_s3_storage() {
        let options = S3Options {
            region: Some("us-west-2".to_string()),
            endpoint_url: Some("http://localhost:9000".to_string()),
            allow_http: true,
            anonymous: false,
            force_path_style: false,
            network_stream_timeout_seconds: None,
            requester_pays: false,
        };
        let creds = S3Credentials::Static(S3StaticCredentials {
            access_key_id: "access_key_id".to_string(),
            secret_access_key: "secret_access_key".to_string(),
            session_token: Some("session_token".to_string()),
            expires_after: None,
        });
        let storage = S3Storage::new(
            options,
            "bucket".to_string(),
            Some("prefix".to_string()),
            creds,
            true,
            Vec::new(),
            Vec::new(),
        )
        .unwrap();

        let json = serde_json::to_string(&storage).unwrap();
        assert_eq!(
            json,
            r#"{"config":{"region":"us-west-2","endpoint_url":"http://localhost:9000","anonymous":false,"allow_http":true,"force_path_style":false,"network_stream_timeout_seconds":null,"requester_pays":false},"credentials":{"s3_credential_type":"static","access_key_id":"access_key_id","secret_access_key":"secret_access_key","session_token":"session_token","expires_after":null},"bucket":"bucket","prefix":"prefix","can_write":true,"extra_read_headers":[],"extra_write_headers":[]}"#
        );

        let roundtripped: S3Storage = serde_json::from_str(&json).unwrap();
        assert_eq!(storage.config, roundtripped.config);
    }
}
1128}