1use crate::core::service::ServiceError;
4use async_trait::async_trait;
5#[allow(unused_imports)] use futures::StreamExt;
7use std::sync::Arc;
8#[cfg(feature = "registry-publish")]
9use tracing::info;
10
11#[async_trait]
13pub trait BlobStorage: Send + Sync {
14 async fn upload(&self, path: &str, data: &[u8]) -> Result<String, ServiceError>;
16
17 async fn download(&self, path: &str) -> Result<Vec<u8>, ServiceError>;
19
20 async fn exists(&self, path: &str) -> Result<bool, ServiceError>;
22
23 async fn delete(&self, path: &str) -> Result<(), ServiceError>;
25
26 fn base_url(&self) -> Option<&str>;
28}
29
30pub struct LocalBlobStorage {
32 base_path: std::path::PathBuf,
33 base_url: Option<String>,
34}
35
36impl LocalBlobStorage {
37 pub fn new(base_path: std::path::PathBuf, base_url: Option<String>) -> Self {
38 Self {
39 base_path,
40 base_url,
41 }
42 }
43}
44
45#[async_trait]
46impl BlobStorage for LocalBlobStorage {
47 async fn upload(&self, path: &str, data: &[u8]) -> Result<String, ServiceError> {
48 let full_path = self.base_path.join(path);
49
50 if let Some(parent) = full_path.parent() {
52 tokio::fs::create_dir_all(parent)
53 .await
54 .map_err(ServiceError::Io)?;
55 }
56
57 tokio::fs::write(&full_path, data)
58 .await
59 .map_err(ServiceError::Io)?;
60
61 Ok(path.to_string())
62 }
63
64 async fn download(&self, path: &str) -> Result<Vec<u8>, ServiceError> {
65 let full_path = self.base_path.join(path);
66 let data = tokio::fs::read(&full_path)
67 .await
68 .map_err(ServiceError::Io)?;
69 Ok(data)
70 }
71
72 async fn exists(&self, path: &str) -> Result<bool, ServiceError> {
73 let full_path = self.base_path.join(path);
74 Ok(full_path.exists())
75 }
76
77 async fn delete(&self, path: &str) -> Result<(), ServiceError> {
78 let full_path = self.base_path.join(path);
79 if full_path.exists() {
80 if full_path.is_dir() {
81 tokio::fs::remove_dir_all(&full_path)
82 .await
83 .map_err(ServiceError::Io)?;
84 } else {
85 tokio::fs::remove_file(&full_path)
86 .await
87 .map_err(ServiceError::Io)?;
88 }
89 }
90 Ok(())
91 }
92
93 fn base_url(&self) -> Option<&str> {
94 self.base_url.as_deref()
95 }
96}
97
98#[cfg(feature = "registry-publish")]
100pub struct S3BlobStorage {
101 client: aws_sdk_s3::Client,
102 bucket: String,
103 base_url: Option<String>,
104}
105
106#[cfg(feature = "registry-publish")]
107impl S3BlobStorage {
108 pub async fn new(
109 bucket: String,
110 region: String,
111 endpoint: Option<String>,
112 access_key: String,
113 secret_key: String,
114 base_url: Option<String>,
115 ) -> Result<Self, ServiceError> {
116 use aws_config::meta::region::RegionProviderChain;
117 use aws_config::Region;
118 use aws_sdk_s3::config::Credentials;
119
120 let mut config_builder = aws_config::defaults(aws_config::BehaviorVersion::latest())
122 .region(RegionProviderChain::first_try(if region.is_empty() {
123 Region::new("us-east-1")
124 } else {
125 Region::new(region.clone())
126 }));
127
128 if let Some(endpoint_url) = endpoint {
130 config_builder = config_builder.endpoint_url(endpoint_url);
131 }
132
133 if !access_key.is_empty() && !secret_key.is_empty() {
135 let credentials = Credentials::new(access_key, secret_key, None, None, "fastskill");
136 config_builder = config_builder.credentials_provider(credentials);
137 }
138
139 let config = config_builder.load().await;
140 let client = aws_sdk_s3::Client::new(&config);
141
142 Ok(Self {
143 client,
144 bucket,
145 base_url,
146 })
147 }
148}
149
150#[cfg(feature = "registry-publish")]
151#[async_trait]
152impl BlobStorage for S3BlobStorage {
153 async fn upload(&self, path: &str, data: &[u8]) -> Result<String, ServiceError> {
154 use aws_sdk_s3::primitives::ByteStream;
155
156 let body = ByteStream::from(data.to_vec());
157
158 let request = self
159 .client
160 .put_object()
161 .bucket(&self.bucket)
162 .key(path)
163 .body(body)
164 .content_type("application/zip");
165
166 let _result = request.send().await.map_err(|e| {
167 ServiceError::Custom(format!(
168 "Failed to upload to S3: {}",
169 map_s3_error(&e as &dyn std::error::Error)
170 ))
171 })?;
172
173 Ok(path.to_string())
175 }
176
177 async fn download(&self, path: &str) -> Result<Vec<u8>, ServiceError> {
178 let result = self
179 .client
180 .get_object()
181 .bucket(&self.bucket)
182 .key(path)
183 .send()
184 .await
185 .map_err(|e| {
186 let error_msg = map_s3_error(&e as &dyn std::error::Error);
187 if error_msg.contains("NoSuchKey") || error_msg.contains("not found") {
188 ServiceError::Custom(format!("Object not found: {}", path))
189 } else {
190 ServiceError::Custom(format!("Failed to download from S3: {}", error_msg))
191 }
192 })?;
193
194 let mut body = result.body;
195 let mut data = Vec::new();
196 while let Some(chunk) = body.next().await {
197 let chunk = chunk
198 .map_err(|e| ServiceError::Custom(format!("Failed to read S3 response: {}", e)))?;
199 data.extend_from_slice(&chunk);
200 }
201
202 Ok(data)
203 }
204
205 async fn exists(&self, path: &str) -> Result<bool, ServiceError> {
206 let result = self
207 .client
208 .head_object()
209 .bucket(&self.bucket)
210 .key(path)
211 .send()
212 .await;
213
214 match result {
215 Ok(_) => Ok(true),
216 Err(e) => {
217 let error_msg = map_s3_error(&e as &dyn std::error::Error);
218 if error_msg.contains("NoSuchKey")
219 || error_msg.contains("404")
220 || error_msg.contains("not found")
221 {
222 Ok(false)
223 } else {
224 Err(ServiceError::Custom(format!(
225 "Failed to check object existence in S3: {}",
226 error_msg
227 )))
228 }
229 }
230 }
231 }
232
233 async fn delete(&self, path: &str) -> Result<(), ServiceError> {
234 let result = self
235 .client
236 .delete_object()
237 .bucket(&self.bucket)
238 .key(path)
239 .send()
240 .await;
241
242 match result {
243 Ok(_) => Ok(()),
244 Err(e) => {
245 let error_msg = map_s3_error(&e as &dyn std::error::Error);
246 if error_msg.contains("NoSuchKey") || error_msg.contains("not found") {
248 Ok(())
249 } else {
250 Err(ServiceError::Custom(format!(
251 "Failed to delete from S3: {}",
252 error_msg
253 )))
254 }
255 }
256 }
257 }
258
259 fn base_url(&self) -> Option<&str> {
260 self.base_url.as_deref()
261 }
262}
263
264#[cfg(feature = "registry-publish")]
266fn map_s3_error(err: &dyn std::error::Error) -> String {
267 let err_str = err.to_string();
268
269 if err_str.contains("NoSuchKey") || err_str.contains("not found") {
271 "Object not found".to_string()
272 } else if err_str.contains("NoSuchBucket") {
273 "Bucket not found".to_string()
274 } else if err_str.contains("AccessDenied") || err_str.contains("access denied") {
275 "Access denied".to_string()
276 } else if err_str.contains("timeout") {
277 "Request timeout".to_string()
278 } else {
279 format!("S3 error: {}", err_str)
280 }
281}
282
283pub async fn create_blob_storage(
285 storage_type: &str,
286 config: &BlobStorageConfig,
287) -> Result<Arc<dyn BlobStorage>, ServiceError> {
288 match storage_type {
289 "local" => {
290 let base_path = std::path::PathBuf::from(&config.base_path);
291 Ok(Arc::new(LocalBlobStorage::new(
292 base_path,
293 config.base_url.clone(),
294 )))
295 }
296 #[cfg(feature = "registry-publish")]
297 "s3" => {
298 info!(
301 "Creating S3 blob storage: bucket='{}', region='{}', endpoint='{}', base_url='{}'",
302 config.bucket,
303 config.region,
304 config.endpoint.as_deref().unwrap_or("<none>"),
305 config.base_url.as_deref().unwrap_or("<none>"),
306 );
307
308 let storage = S3BlobStorage::new(
310 config.bucket.clone(),
311 config.region.clone(),
312 config.endpoint.clone(),
313 config.access_key.clone(),
314 config.secret_key.clone(),
315 config.base_url.clone(),
316 )
317 .await?;
318 Ok(Arc::new(storage))
319 }
320 #[cfg(not(feature = "registry-publish"))]
321 "s3" => Err(ServiceError::Custom(
322 "S3 storage requires the 'registry-publish' feature to be enabled".to_string(),
323 )),
324 _ => Err(ServiceError::Custom(format!(
325 "Unsupported storage type: {}",
326 storage_type
327 ))),
328 }
329}
330
331#[derive(Debug, Clone)]
333pub struct BlobStorageConfig {
334 pub storage_type: String,
335 pub base_path: String,
336 pub bucket: String,
337 pub region: String,
338 pub endpoint: Option<String>,
339 pub access_key: String,
340 pub secret_key: String,
341 pub base_url: Option<String>,
342}
343
344impl Default for BlobStorageConfig {
345 fn default() -> Self {
346 Self {
347 storage_type: "local".to_string(),
348 base_path: "./artifacts".to_string(),
349 bucket: String::new(),
350 region: String::new(),
351 endpoint: None,
352 access_key: String::new(),
353 secret_key: String::new(),
354 base_url: None,
355 }
356 }
357}