Skip to main content

fastskill_core/core/
blob_storage.rs

1//! Blob storage abstraction for artifact publishing
2
3use crate::core::service::ServiceError;
4use async_trait::async_trait;
5#[allow(unused_imports)] // StreamExt is used in download() method
6use futures::StreamExt;
7use std::sync::Arc;
8#[cfg(feature = "registry-publish")]
9use tracing::info;
10
11/// Trait for blob storage backends
12#[async_trait]
13pub trait BlobStorage: Send + Sync {
14    /// Upload data to blob storage
15    async fn upload(&self, path: &str, data: &[u8]) -> Result<String, ServiceError>;
16
17    /// Download data from blob storage
18    async fn download(&self, path: &str) -> Result<Vec<u8>, ServiceError>;
19
20    /// Check if a path exists in blob storage
21    async fn exists(&self, path: &str) -> Result<bool, ServiceError>;
22
23    /// Delete a path from blob storage
24    async fn delete(&self, path: &str) -> Result<(), ServiceError>;
25
26    /// Get the base URL for downloads
27    fn base_url(&self) -> Option<&str>;
28}
29
/// Local filesystem blob storage (for testing).
///
/// Blobs are plain files located at `base_path` joined with the blob path.
#[derive(Debug, Clone)]
pub struct LocalBlobStorage {
    /// Directory that every blob path is joined onto.
    base_path: std::path::PathBuf,
    /// Optional public URL prefix reported by `base_url()`.
    base_url: Option<String>,
}

impl LocalBlobStorage {
    /// Create a storage rooted at `base_path`.
    ///
    /// The directory is not created or checked here; `base_url` is stored
    /// as given and only handed back through `base_url()`.
    #[must_use]
    pub fn new(base_path: std::path::PathBuf, base_url: Option<String>) -> Self {
        Self {
            base_path,
            base_url,
        }
    }
}
44
45#[async_trait]
46impl BlobStorage for LocalBlobStorage {
47    async fn upload(&self, path: &str, data: &[u8]) -> Result<String, ServiceError> {
48        let full_path = self.base_path.join(path);
49
50        // Create parent directory if needed
51        if let Some(parent) = full_path.parent() {
52            tokio::fs::create_dir_all(parent)
53                .await
54                .map_err(ServiceError::Io)?;
55        }
56
57        tokio::fs::write(&full_path, data)
58            .await
59            .map_err(ServiceError::Io)?;
60
61        Ok(path.to_string())
62    }
63
64    async fn download(&self, path: &str) -> Result<Vec<u8>, ServiceError> {
65        let full_path = self.base_path.join(path);
66        let data = tokio::fs::read(&full_path)
67            .await
68            .map_err(ServiceError::Io)?;
69        Ok(data)
70    }
71
72    async fn exists(&self, path: &str) -> Result<bool, ServiceError> {
73        let full_path = self.base_path.join(path);
74        Ok(full_path.exists())
75    }
76
77    async fn delete(&self, path: &str) -> Result<(), ServiceError> {
78        let full_path = self.base_path.join(path);
79        if full_path.exists() {
80            if full_path.is_dir() {
81                tokio::fs::remove_dir_all(&full_path)
82                    .await
83                    .map_err(ServiceError::Io)?;
84            } else {
85                tokio::fs::remove_file(&full_path)
86                    .await
87                    .map_err(ServiceError::Io)?;
88            }
89        }
90        Ok(())
91    }
92
93    fn base_url(&self) -> Option<&str> {
94        self.base_url.as_deref()
95    }
96}
97
/// Blob storage backed by an S3 bucket.
///
/// Works against AWS S3 itself or any S3-compatible service when a custom
/// endpoint is supplied at construction time.
#[cfg(feature = "registry-publish")]
pub struct S3BlobStorage {
    /// SDK client used for every request.
    client: aws_sdk_s3::Client,
    /// Name of the bucket that holds all blobs.
    bucket: String,
    /// Optional public URL prefix reported by `base_url()`.
    base_url: Option<String>,
}
105
#[cfg(feature = "registry-publish")]
impl S3BlobStorage {
    /// Build an S3-backed storage for `bucket`.
    ///
    /// * `region` - AWS region; an empty string selects `us-east-1`.
    /// * `endpoint` - custom endpoint URL for S3-compatible services.
    /// * `access_key` / `secret_key` - static credentials. They are used
    ///   only when BOTH are non-empty; otherwise no provider is set here
    ///   and the SDK resolves credentials on its own.
    /// * `base_url` - stored as-is and returned by `base_url()`.
    ///
    /// No request is sent to S3 here, and every path through this function
    /// as written returns `Ok`.
    pub async fn new(
        bucket: String,
        region: String,
        endpoint: Option<String>,
        access_key: String,
        secret_key: String,
        base_url: Option<String>,
    ) -> Result<Self, ServiceError> {
        use aws_config::meta::region::RegionProviderChain;
        use aws_config::Region;
        use aws_sdk_s3::config::Credentials;

        // Fall back to a fixed region when none was configured.
        let effective_region = if region.is_empty() {
            Region::new("us-east-1")
        } else {
            Region::new(region)
        };

        let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
            .region(RegionProviderChain::first_try(effective_region));

        // A custom endpoint redirects requests to an S3-compatible service.
        if let Some(endpoint_url) = endpoint {
            loader = loader.endpoint_url(endpoint_url);
        }

        // Static credentials are opt-in and require both halves.
        let has_static_credentials = !(access_key.is_empty() || secret_key.is_empty());
        if has_static_credentials {
            loader = loader.credentials_provider(Credentials::new(
                access_key,
                secret_key,
                None,
                None,
                "fastskill",
            ));
        }

        let sdk_config = loader.load().await;

        Ok(Self {
            client: aws_sdk_s3::Client::new(&sdk_config),
            bucket,
            base_url,
        })
    }
}
149
/// S3-backed implementation: `path` is used verbatim as the object key
/// inside the configured bucket.
///
/// "Object is missing" is detected from the SDK's typed service error where
/// the operation models one, and otherwise from an exact match on the
/// message produced by `map_s3_error`. A substring test for "not found"
/// would also match "Bucket not found" and silently turn a missing bucket
/// into a successful `delete` or a `false` from `exists`.
#[cfg(feature = "registry-publish")]
#[async_trait]
impl BlobStorage for S3BlobStorage {
    /// Upload `data` as the object `path`, always tagged with content type
    /// `application/zip`. Returns `path` unchanged.
    async fn upload(&self, path: &str, data: &[u8]) -> Result<String, ServiceError> {
        use aws_sdk_s3::primitives::ByteStream;

        // The request body must own its bytes, hence the copy.
        let body = ByteStream::from(data.to_vec());

        let request = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(path)
            .body(body)
            .content_type("application/zip");

        let _result = request.send().await.map_err(|e| {
            ServiceError::Custom(format!(
                "Failed to upload to S3: {}",
                map_s3_error(&e as &dyn std::error::Error)
            ))
        })?;

        // Return the path that was uploaded
        Ok(path.to_string())
    }

    /// Fetch the object `path` and buffer its whole body in memory.
    ///
    /// A missing key is reported as `Object not found: <path>`; every other
    /// failure as `Failed to download from S3: ...`.
    async fn download(&self, path: &str) -> Result<Vec<u8>, ServiceError> {
        let result = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|e| {
                let missing_key = e
                    .as_service_error()
                    .is_some_and(|service_err| service_err.is_no_such_key());
                let error_msg = map_s3_error(&e as &dyn std::error::Error);
                if missing_key || error_msg == "Object not found" {
                    ServiceError::Custom(format!("Object not found: {}", path))
                } else {
                    ServiceError::Custom(format!("Failed to download from S3: {}", error_msg))
                }
            })?;

        let mut body = result.body;
        let mut data = Vec::new();
        while let Some(chunk) = body.next().await {
            let chunk = chunk
                .map_err(|e| ServiceError::Custom(format!("Failed to read S3 response: {}", e)))?;
            data.extend_from_slice(&chunk);
        }

        Ok(data)
    }

    /// Check for the object `path` with a HEAD request.
    ///
    /// Returns `Ok(false)` only when the object itself is missing; other
    /// failures (missing bucket, access denied, ...) are returned as errors.
    async fn exists(&self, path: &str) -> Result<bool, ServiceError> {
        let result = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await;

        match result {
            Ok(_) => Ok(true),
            Err(e) => {
                let missing_object = e
                    .as_service_error()
                    .is_some_and(|service_err| service_err.is_not_found());
                let error_msg = map_s3_error(&e as &dyn std::error::Error);
                if missing_object
                    || error_msg == "Object not found"
                    || error_msg.contains("404")
                {
                    Ok(false)
                } else {
                    Err(ServiceError::Custom(format!(
                        "Failed to check object existence in S3: {}",
                        error_msg
                    )))
                }
            }
        }
    }

    /// Delete the object `path`; deleting a missing object succeeds.
    async fn delete(&self, path: &str) -> Result<(), ServiceError> {
        let result = self
            .client
            .delete_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await;

        match result {
            Ok(_) => Ok(()),
            Err(e) => {
                let error_msg = map_s3_error(&e as &dyn std::error::Error);
                // Delete is idempotent - if object doesn't exist, that's OK.
                // Compare the whole message: "Bucket not found" must stay an error.
                if error_msg == "Object not found" {
                    Ok(())
                } else {
                    Err(ServiceError::Custom(format!(
                        "Failed to delete from S3: {}",
                        error_msg
                    )))
                }
            }
        }
    }

    /// Configured download base URL, if any.
    fn base_url(&self) -> Option<&str> {
        self.base_url.as_deref()
    }
}
263
264/// Map AWS SDK errors to user-friendly error messages
265#[cfg(feature = "registry-publish")]
fn map_s3_error(err: &dyn std::error::Error) -> String {
    // Wrapper errors commonly print only a generic summary and keep the real
    // reason (the service error code) in their `source()` chain, so collect
    // the whole chain before classifying.
    let mut detail = err.to_string();
    let mut cause = err.source();
    while let Some(inner) = cause {
        let text = inner.to_string();
        // Some errors already embed their cause's message; don't repeat it.
        if !text.is_empty() && !detail.contains(&text) {
            detail.push_str(": ");
            detail.push_str(&text);
        }
        cause = inner.source();
    }

    // Match case-insensitively so code-style ("NotFound") and prose-style
    // ("not found") spellings are classified the same way.
    let haystack = detail.to_lowercase();

    // The bucket check comes first: it is the more specific condition and
    // must not be reported as a missing object.
    if haystack.contains("nosuchbucket") {
        "Bucket not found".to_string()
    } else if haystack.contains("nosuchkey")
        || haystack.contains("notfound")
        || haystack.contains("not found")
    {
        "Object not found".to_string()
    } else if haystack.contains("accessdenied") || haystack.contains("access denied") {
        "Access denied".to_string()
    } else if haystack.contains("timeout") {
        "Request timeout".to_string()
    } else {
        format!("S3 error: {}", detail)
    }
}
282
283/// Create blob storage from configuration
284pub async fn create_blob_storage(
285    storage_type: &str,
286    config: &BlobStorageConfig,
287) -> Result<Arc<dyn BlobStorage>, ServiceError> {
288    match storage_type {
289        "local" => {
290            let base_path = std::path::PathBuf::from(&config.base_path);
291            Ok(Arc::new(LocalBlobStorage::new(
292                base_path,
293                config.base_url.clone(),
294            )))
295        }
296        #[cfg(feature = "registry-publish")]
297        "s3" => {
298            // S3 storage supports both AWS S3 and S3-compatible services
299            // When endpoint is set, it points to an S3-compatible service
300            info!(
301                "Creating S3 blob storage: bucket='{}', region='{}', endpoint='{}', base_url='{}'",
302                config.bucket,
303                config.region,
304                config.endpoint.as_deref().unwrap_or("<none>"),
305                config.base_url.as_deref().unwrap_or("<none>"),
306            );
307
308            // Note: access_key / secret_key are intentionally NOT logged for security.
309            let storage = S3BlobStorage::new(
310                config.bucket.clone(),
311                config.region.clone(),
312                config.endpoint.clone(),
313                config.access_key.clone(),
314                config.secret_key.clone(),
315                config.base_url.clone(),
316            )
317            .await?;
318            Ok(Arc::new(storage))
319        }
320        #[cfg(not(feature = "registry-publish"))]
321        "s3" => Err(ServiceError::Custom(
322            "S3 storage requires the 'registry-publish' feature to be enabled".to_string(),
323        )),
324        _ => Err(ServiceError::Custom(format!(
325            "Unsupported storage type: {}",
326            storage_type
327        ))),
328    }
329}
330
/// Blob storage configuration.
///
/// `Debug` is implemented by hand so that `access_key` and `secret_key`
/// never appear in logs or panic messages: a non-empty credential prints as
/// `"<redacted>"`, an empty one as `""`.
#[derive(Clone)]
pub struct BlobStorageConfig {
    /// Backend selector, e.g. `"local"` or `"s3"`.
    pub storage_type: String,
    /// Root directory for the local backend.
    pub base_path: String,
    /// Bucket name for the S3 backend.
    pub bucket: String,
    /// Region for the S3 backend.
    pub region: String,
    /// Custom endpoint URL for S3-compatible services.
    pub endpoint: Option<String>,
    /// S3 access key; redacted in `Debug` output.
    pub access_key: String,
    /// S3 secret key; redacted in `Debug` output.
    pub secret_key: String,
    /// Public URL prefix under which blobs can be downloaded.
    pub base_url: Option<String>,
}

impl std::fmt::Debug for BlobStorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Keep "is it set?" visible without revealing the value.
        fn mask(secret: &str) -> &'static str {
            if secret.is_empty() {
                ""
            } else {
                "<redacted>"
            }
        }

        f.debug_struct("BlobStorageConfig")
            .field("storage_type", &self.storage_type)
            .field("base_path", &self.base_path)
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("access_key", &mask(&self.access_key))
            .field("secret_key", &mask(&self.secret_key))
            .field("base_url", &self.base_url)
            .finish()
    }
}
343
344impl Default for BlobStorageConfig {
345    fn default() -> Self {
346        Self {
347            storage_type: "local".to_string(),
348            base_path: "./artifacts".to_string(),
349            bucket: String::new(),
350            region: String::new(),
351            endpoint: None,
352            access_key: String::new(),
353            secret_key: String::new(),
354            base_url: None,
355        }
356    }
357}