// riley_cms_core/storage.rs

1//! S3/R2 storage operations for riley_cms
2
3use crate::config::StorageConfig;
4use crate::error::{Error, Result};
5use crate::types::{Asset, AssetListOptions, AssetListResult};
6use aws_sdk_s3::Client;
7use aws_sdk_s3::primitives::ByteStream;
8use chrono::{DateTime, Utc};
9use std::path::Path;
10
/// Storage backend for assets.
///
/// Wraps an S3-compatible client (AWS S3, or Cloudflare R2 via a custom
/// endpoint) together with the configuration it operates against. Constructed
/// by [`Storage::new`], which also runs a non-fatal connectivity check.
pub struct Storage {
    // S3 API client; may target a custom endpoint when one is configured.
    client: Client,
    // Owned copy of the storage configuration: bucket, region, optional
    // endpoint, and the public URL base used to build asset URLs.
    config: StorageConfig,
}
16
17impl Storage {
18    /// Create a new storage backend
19    pub async fn new(config: &StorageConfig) -> Result<Self> {
20        let mut aws_config_builder = aws_config::from_env();
21
22        // Set custom endpoint for R2 or other S3-compatible storage
23        if let Some(endpoint) = &config.endpoint {
24            aws_config_builder = aws_config_builder.endpoint_url(endpoint);
25        }
26
27        // Set region
28        aws_config_builder =
29            aws_config_builder.region(aws_config::Region::new(config.region.clone()));
30
31        let aws_config = aws_config_builder.load().await;
32        let client = Client::new(&aws_config);
33
34        let storage = Self {
35            client,
36            config: config.clone(),
37        };
38
39        // Non-fatal connectivity check at startup
40        if let Err(e) = storage.check_connectivity().await {
41            tracing::warn!(
42                "S3 connectivity check failed for bucket '{}': {}. Asset operations may fail.",
43                config.bucket,
44                e
45            );
46        }
47
48        Ok(storage)
49    }
50
51    /// Check S3 connectivity by issuing a HeadBucket request.
52    ///
53    /// This is a lightweight check that verifies credentials and bucket access
54    /// without listing or reading any objects.
55    async fn check_connectivity(&self) -> Result<()> {
56        self.client
57            .head_bucket()
58            .bucket(&self.config.bucket)
59            .send()
60            .await
61            .map_err(|e| Error::S3(format!("HeadBucket failed: {}", e)))?;
62        Ok(())
63    }
64
65    /// Maximum assets per page
66    const MAX_PAGE_SIZE: usize = 1000;
67
68    /// List assets in the bucket with pagination.
69    ///
70    /// Uses S3's native continuation token for efficient cursor-based pagination.
71    /// Defaults to 100 assets per page, capped at 1000.
72    pub async fn list_assets(&self, opts: &AssetListOptions) -> Result<AssetListResult> {
73        let limit = opts.limit.unwrap_or(100).min(Self::MAX_PAGE_SIZE);
74
75        let mut request = self
76            .client
77            .list_objects_v2()
78            .bucket(&self.config.bucket)
79            .max_keys(limit as i32);
80
81        if let Some(ref token) = opts.continuation_token {
82            request = request.continuation_token(token);
83        }
84
85        let response = request
86            .send()
87            .await
88            .map_err(|e| Error::S3(format!("Failed to list objects: {}", e)))?;
89
90        let mut assets = Vec::new();
91        if let Some(contents) = response.contents {
92            for obj in contents {
93                let key = obj.key.unwrap_or_default();
94                let size = obj.size.unwrap_or(0) as u64;
95                let last_modified = obj
96                    .last_modified
97                    .and_then(|t| DateTime::from_timestamp(t.secs(), t.subsec_nanos()))
98                    .unwrap_or_else(Utc::now);
99
100                let url = format!(
101                    "{}/{}",
102                    self.config.public_url_base.trim_end_matches('/'),
103                    key
104                );
105
106                assets.push(Asset {
107                    key,
108                    url,
109                    size,
110                    last_modified,
111                });
112            }
113        }
114
115        let next_continuation_token = if response.is_truncated == Some(true) {
116            response.next_continuation_token
117        } else {
118            None
119        };
120
121        Ok(AssetListResult {
122            assets,
123            next_continuation_token,
124        })
125    }
126
127    /// Upload an asset to the bucket
128    pub async fn upload_asset(&self, path: &Path, dest: Option<&str>) -> Result<Asset> {
129        let file_name = path
130            .file_name()
131            .and_then(|n| n.to_str())
132            .ok_or_else(|| Error::Storage("Invalid file name".to_string()))?;
133
134        let key = match dest {
135            Some(prefix) => {
136                // Reject path traversal attempts in the destination prefix
137                let sanitized = prefix.trim_matches('/');
138                if sanitized.split('/').any(|seg| seg == "..") {
139                    return Err(Error::Storage(
140                        "Invalid destination: path traversal not allowed".to_string(),
141                    ));
142                }
143                format!("{}/{}", sanitized, file_name)
144            }
145            None => file_name.to_string(),
146        };
147
148        let body = ByteStream::from_path(path)
149            .await
150            .map_err(|e| Error::Storage(format!("Failed to read file: {}", e)))?;
151
152        // Detect content type
153        let content_type = mime_guess::from_path(path)
154            .first_or_octet_stream()
155            .to_string();
156
157        self.client
158            .put_object()
159            .bucket(&self.config.bucket)
160            .key(&key)
161            .body(body)
162            .content_type(content_type)
163            .send()
164            .await
165            .map_err(|e| Error::S3(format!("Failed to upload: {}", e)))?;
166
167        let metadata = std::fs::metadata(path)?;
168        let url = format!(
169            "{}/{}",
170            self.config.public_url_base.trim_end_matches('/'),
171            key
172        );
173
174        Ok(Asset {
175            key,
176            url,
177            size: metadata.len(),
178            last_modified: Utc::now(),
179        })
180    }
181}