freighter_storage/
s3_client.rs1use anyhow::{bail, Context};
20use async_trait::async_trait;
21use aws_credential_types::Credentials;
22use aws_sdk_s3::config::{AppName, BehaviorVersion, Config, Region};
23use aws_sdk_s3::error::SdkError;
24use aws_sdk_s3::primitives::ByteStream;
25use bytes::Bytes;
26use freighter_api_types::storage::{
27 Metadata, MetadataStorageProvider, StorageError, StorageProvider, StorageResult,
28};
29use std::collections::HashMap;
30
31#[derive(Clone)]
35pub struct S3StorageProvider {
36 client: aws_sdk_s3::Client,
37 bucket_name: String,
38}
39
40impl S3StorageProvider {
41 #[must_use]
44 pub fn new(
45 bucket_name: &str,
46 endpoint_url: &str,
47 region: &str,
48 access_key: &str,
49 secret_key: &str,
50 ) -> Self {
51 let config = Config::builder()
52 .behavior_version(BehaviorVersion::v2024_03_28())
53 .region(Region::new(region.to_string()))
54 .endpoint_url(endpoint_url)
55 .credentials_provider(Credentials::from_keys(access_key, secret_key, None))
56 .app_name(AppName::new("freighter".to_string()).unwrap())
57 .build();
58
59 let bucket_name = bucket_name.to_string();
60 let client = aws_sdk_s3::Client::from_conf(config);
61
62 Self {
63 client,
64 bucket_name,
65 }
66 }
67
68 async fn pull_object(&self, path: String) -> StorageResult<Bytes> {
69 let resp = self
70 .client
71 .get_object()
72 .bucket(self.bucket_name.clone())
73 .key(path)
74 .send()
75 .await;
76
77 if let Err(SdkError::ServiceError(e)) = &resp {
79 if e.err().is_no_such_key() {
80 return Err(StorageError::NotFound);
81 }
82 }
83
84 let data = resp.context("Storage response error")?;
85
86 let crate_bytes = data
87 .body
88 .collect()
89 .await
90 .context("Error while retrieving body")?
91 .into_bytes();
92
93 Ok(crate_bytes)
94 }
95
96 async fn put_object(
97 &self,
98 path: String,
99 file_bytes: ByteStream,
100 meta: Metadata,
101 ) -> StorageResult<()> {
102 let mut obj = self
103 .client
104 .put_object()
105 .bucket(self.bucket_name.clone())
106 .key(path)
107 .body(file_bytes);
108 if let Some(len) = meta.content_length {
109 obj = obj.content_length(len as _);
110 }
111 if let Some(ty) = meta.content_type {
112 obj = obj.content_type(ty);
113 }
114 if let Some(cc) = meta.cache_control {
115 obj = obj.cache_control(cc);
116 }
117 if let Some(sha) = meta.sha256 {
118 use base64::{engine, Engine as _};
119 obj = obj.checksum_sha256(engine::general_purpose::STANDARD.encode(sha));
120 }
121 for (k, v) in meta.kv {
122 obj = obj.metadata(k, v);
123 }
124 obj.send().await.context("Failed to put file")?;
125 Ok(())
126 }
127
128 async fn delete_object(&self, path: String) -> StorageResult<()> {
129 self.client
130 .delete_object()
131 .bucket(self.bucket_name.clone())
132 .key(path)
133 .send()
134 .await
135 .context("Failed to delete file")?;
136 Ok(())
137 }
138
139 async fn healthcheck(&self, path: String) -> Result<(), anyhow::Error> {
141 for _ in 0..3 {
142 match self.pull_object(path.clone()).await {
144 Ok(obj) => {
145 if obj.as_ref() == b"ok" {
146 return Ok(());
147 }
148
149 bail!("wrong data");
152 }
153 Err(e) => {
154 if matches!(e, StorageError::NotFound) {
155 self.put_object(
158 path.clone(),
159 Bytes::from_static(b"ok").into(),
160 Metadata {
161 content_type: Some("text/plain"),
162 ..Metadata::default()
163 },
164 )
165 .await?;
166
167 continue;
168 }
169
170 bail!(e);
173 }
174 }
175 }
176
177 bail!("successfully put object but saw NotFound on pull 3 times");
180 }
181}
182
183#[async_trait]
184impl MetadataStorageProvider for S3StorageProvider {
185 async fn pull_file(&self, path: &str) -> StorageResult<Bytes> {
186 self.pull_object(path.into()).await
187 }
188
189 async fn put_file(&self, path: &str, file_bytes: Bytes, meta: Metadata) -> StorageResult<()> {
190 self.put_object(path.into(), file_bytes.into(), meta).await
191 }
192
193 async fn create_or_append_file(
194 &self,
195 path: &str,
196 file_bytes: Bytes,
197 meta: Metadata,
198 ) -> StorageResult<()> {
199 let mut all_data = match self.pull_object(path.into()).await {
200 Ok(data) => Vec::from(data),
201 Err(StorageError::NotFound) => Vec::new(),
202 Err(e) => return Err(e),
203 };
204 all_data.append(&mut Vec::from(file_bytes));
205 self.put_object(path.into(), all_data.into(), meta).await
206 }
207
208 async fn delete_file(&self, path: &str) -> StorageResult<()> {
209 self.delete_object(path.into()).await
210 }
211
212 async fn healthcheck(&self) -> anyhow::Result<()> {
213 self.healthcheck(".healthcheck-meta".into()).await
214 }
215}
216
217#[async_trait]
218impl StorageProvider for S3StorageProvider {
219 async fn pull_crate(&self, name: &str, version: &str) -> StorageResult<Bytes> {
220 let path = construct_path(name, version);
221 self.pull_object(path).await
222 }
223
224 async fn put_crate(
225 &self,
226 name: &str,
227 version: &str,
228 crate_bytes: Bytes,
229 sha256: [u8; 32],
230 ) -> StorageResult<()> {
231 let len = crate_bytes.len();
232 let path = construct_path(name, version);
233 self.put_object(
234 path,
235 crate_bytes.into(),
236 Metadata {
237 content_type: Some("application/x-tar"),
238 content_length: Some(len),
239 cache_control: Some("public,immutable".into()),
240 content_encoding: None,
241 sha256: Some(sha256),
242 kv: HashMap::new(),
243 },
244 )
245 .await
246 }
247
248 async fn delete_crate(&self, name: &str, version: &str) -> StorageResult<()> {
249 let path = construct_path(name, version);
250 self.delete_object(path).await
251 }
252
253 async fn healthcheck(&self) -> anyhow::Result<()> {
254 self.healthcheck(".healthcheck-data".into()).await
255 }
256}
257
/// Builds the object key for a crate archive: `crates/{name}-{version}.crate`.
///
/// `name` and `version` are inserted verbatim; no validation or escaping is performed.
fn construct_path(name: &str, version: &str) -> String {
    format!("crates/{name}-{version}.crate")
}