freighter_storage/
s3_client.rs

1//! Storage backend implementation for working with bucketing solutions compatible with the S3 API.
2//!
3//! This is currently built on the [`aws_sdk_s3`] crate.
4//!
//! This client should do connection pooling; however, the HTTP connection pool parameters are
//! not well tuned at the moment.
7//!
8//! One performance limitation of this implementation is that the current simplistic API of this
9//! crate ([`StorageProvider`]) does not allow for streamed uploads or downloads, increasing both
10//! memory usage and latency, as the entire body of data must be received before transmission can
11//! start.
12//! This means that when uploading, the server must wait for and store the entirety of the HTTP
13//! request before it can begin uploading the body to the bucket, and, when downloading, the
//! entirety of the response from the bucket must be received before the crate bytes can be
//! transmitted to the eyeball.
16//! It is perfectly possible to perform both streaming uploads and streaming downloads, however
17//! doing so has been left to the future.
18
19use anyhow::{bail, Context};
20use async_trait::async_trait;
21use aws_credential_types::Credentials;
22use aws_sdk_s3::config::{AppName, BehaviorVersion, Config, Region};
23use aws_sdk_s3::error::SdkError;
24use aws_sdk_s3::primitives::ByteStream;
25use bytes::Bytes;
26use freighter_api_types::storage::{
27    Metadata, MetadataStorageProvider, StorageError, StorageProvider, StorageResult,
28};
29use std::collections::HashMap;
30
31/// Storage client for working with S3-compatible APIs.
32///
33/// See [the module-level docs](super::s3_client) for more information.
34#[derive(Clone)]
35pub struct S3StorageProvider {
36    client: aws_sdk_s3::Client,
37    bucket_name: String,
38}
39
40impl S3StorageProvider {
41    /// Construct a new client, returning an error if the information could not be used to
42    /// communicate with a valid bucket.
43    #[must_use]
44    pub fn new(
45        bucket_name: &str,
46        endpoint_url: &str,
47        region: &str,
48        access_key: &str,
49        secret_key: &str,
50    ) -> Self {
51        let config = Config::builder()
52            .behavior_version(BehaviorVersion::v2024_03_28())
53            .region(Region::new(region.to_string()))
54            .endpoint_url(endpoint_url)
55            .credentials_provider(Credentials::from_keys(access_key, secret_key, None))
56            .app_name(AppName::new("freighter".to_string()).unwrap())
57            .build();
58
59        let bucket_name = bucket_name.to_string();
60        let client = aws_sdk_s3::Client::from_conf(config);
61
62        Self {
63            client,
64            bucket_name,
65        }
66    }
67
68    async fn pull_object(&self, path: String) -> StorageResult<Bytes> {
69        let resp = self
70            .client
71            .get_object()
72            .bucket(self.bucket_name.clone())
73            .key(path)
74            .send()
75            .await;
76
77        // on 404, we return a different error variant
78        if let Err(SdkError::ServiceError(e)) = &resp {
79            if e.err().is_no_such_key() {
80                return Err(StorageError::NotFound);
81            }
82        }
83
84        let data = resp.context("Storage response error")?;
85
86        let crate_bytes = data
87            .body
88            .collect()
89            .await
90            .context("Error while retrieving body")?
91            .into_bytes();
92
93        Ok(crate_bytes)
94    }
95
96    async fn put_object(
97        &self,
98        path: String,
99        file_bytes: ByteStream,
100        meta: Metadata,
101    ) -> StorageResult<()> {
102        let mut obj = self
103            .client
104            .put_object()
105            .bucket(self.bucket_name.clone())
106            .key(path)
107            .body(file_bytes);
108        if let Some(len) = meta.content_length {
109            obj = obj.content_length(len as _);
110        }
111        if let Some(ty) = meta.content_type {
112            obj = obj.content_type(ty);
113        }
114        if let Some(cc) = meta.cache_control {
115            obj = obj.cache_control(cc);
116        }
117        if let Some(sha) = meta.sha256 {
118            use base64::{engine, Engine as _};
119            obj = obj.checksum_sha256(engine::general_purpose::STANDARD.encode(sha));
120        }
121        for (k, v) in meta.kv {
122            obj = obj.metadata(k, v);
123        }
124        obj.send().await.context("Failed to put file")?;
125        Ok(())
126    }
127
128    async fn delete_object(&self, path: String) -> StorageResult<()> {
129        self.client
130            .delete_object()
131            .bucket(self.bucket_name.clone())
132            .key(path)
133            .send()
134            .await
135            .context("Failed to delete file")?;
136        Ok(())
137    }
138
139    // check that we can actually contact the bucket
140    async fn healthcheck(&self, path: String) -> Result<(), anyhow::Error> {
141        for _ in 0..3 {
142            // try and pull the object initially to make sure the health file is there
143            match self.pull_object(path.clone()).await {
144                Ok(obj) => {
145                    if obj.as_ref() == b"ok" {
146                        return Ok(());
147                    }
148
149                    // this case will not attempt to repair the data - if corruption is occurring
150                    // healthchecks should continue to fail until manual intervention occurs
151                    bail!("wrong data");
152                }
153                Err(e) => {
154                    if matches!(e, StorageError::NotFound) {
155                        // if the key isn't there (because you just stood the service up), put it
156                        // there and retry the loop
157                        self.put_object(
158                            path.clone(),
159                            Bytes::from_static(b"ok").into(),
160                            Metadata {
161                                content_type: Some("text/plain"),
162                                ..Metadata::default()
163                            },
164                        )
165                        .await?;
166
167                        continue;
168                    }
169
170                    // if we failed to contact the bucket or anything else happened other than not
171                    // seeing the specific object, fail the check
172                    bail!(e);
173                }
174            }
175        }
176
177        // this case should never reasonably happen with most buckets, and should be extremely
178        // transient and only happen briefly when initially standing up the service with EC stores
179        bail!("successfully put object but saw NotFound on pull 3 times");
180    }
181}
182
183#[async_trait]
184impl MetadataStorageProvider for S3StorageProvider {
185    async fn pull_file(&self, path: &str) -> StorageResult<Bytes> {
186        self.pull_object(path.into()).await
187    }
188
189    async fn put_file(&self, path: &str, file_bytes: Bytes, meta: Metadata) -> StorageResult<()> {
190        self.put_object(path.into(), file_bytes.into(), meta).await
191    }
192
193    async fn create_or_append_file(
194        &self,
195        path: &str,
196        file_bytes: Bytes,
197        meta: Metadata,
198    ) -> StorageResult<()> {
199        let mut all_data = match self.pull_object(path.into()).await {
200            Ok(data) => Vec::from(data),
201            Err(StorageError::NotFound) => Vec::new(),
202            Err(e) => return Err(e),
203        };
204        all_data.append(&mut Vec::from(file_bytes));
205        self.put_object(path.into(), all_data.into(), meta).await
206    }
207
208    async fn delete_file(&self, path: &str) -> StorageResult<()> {
209        self.delete_object(path.into()).await
210    }
211
212    async fn healthcheck(&self) -> anyhow::Result<()> {
213        self.healthcheck(".healthcheck-meta".into()).await
214    }
215}
216
217#[async_trait]
218impl StorageProvider for S3StorageProvider {
219    async fn pull_crate(&self, name: &str, version: &str) -> StorageResult<Bytes> {
220        let path = construct_path(name, version);
221        self.pull_object(path).await
222    }
223
224    async fn put_crate(
225        &self,
226        name: &str,
227        version: &str,
228        crate_bytes: Bytes,
229        sha256: [u8; 32],
230    ) -> StorageResult<()> {
231        let len = crate_bytes.len();
232        let path = construct_path(name, version);
233        self.put_object(
234            path,
235            crate_bytes.into(),
236            Metadata {
237                content_type: Some("application/x-tar"),
238                content_length: Some(len),
239                cache_control: Some("public,immutable".into()),
240                content_encoding: None,
241                sha256: Some(sha256),
242                kv: HashMap::new(),
243            },
244        )
245        .await
246    }
247
248    async fn delete_crate(&self, name: &str, version: &str) -> StorageResult<()> {
249        let path = construct_path(name, version);
250        self.delete_object(path).await
251    }
252
253    async fn healthcheck(&self) -> anyhow::Result<()> {
254        self.healthcheck(".healthcheck-data".into()).await
255    }
256}
257
/// Object key under which the `.crate` archive for `name` at `version` is stored,
/// e.g. `crates/serde-1.0.0.crate`.
///
/// No `#[inline(always)]`: the cost is dominated by the `format!` allocation, and the compiler
/// is free to inline this private helper on its own.
fn construct_path(name: &str, version: &str) -> String {
    format!("crates/{name}-{version}.crate")
}