// dx_forge/storage/r2.rs

//! Cloudflare R2 storage backend.
//!
//! This module provides integration with Cloudflare R2 for blob storage.
//! R2 charges no egress fees, which makes it well suited to code hosting platforms.
5use anyhow::{Context, Result};
6use reqwest::{header, Client, StatusCode};
7use serde::{Deserialize, Serialize};
8use sha2::Sha256;
9use std::time::Duration;
10
11use super::blob::Blob;
12
13/// R2 configuration
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct R2Config {
16    /// R2 account ID
17    pub account_id: String,
18
19    /// R2 bucket name
20    pub bucket_name: String,
21
22    /// R2 access key ID
23    pub access_key_id: String,
24
25    /// R2 secret access key
26    pub secret_access_key: String,
27
28    /// Custom domain (optional)
29    pub custom_domain: Option<String>,
30}
31
32impl R2Config {
33    /// Load configuration from environment variables
34    pub fn from_env() -> Result<Self> {
35        dotenvy::dotenv().ok();
36
37        let account_id = std::env::var("R2_ACCOUNT_ID").context("R2_ACCOUNT_ID not set in .env")?;
38        let bucket_name =
39            std::env::var("R2_BUCKET_NAME").context("R2_BUCKET_NAME not set in .env")?;
40        let access_key_id =
41            std::env::var("R2_ACCESS_KEY_ID").context("R2_ACCESS_KEY_ID not set in .env")?;
42        let secret_access_key = std::env::var("R2_SECRET_ACCESS_KEY")
43            .context("R2_SECRET_ACCESS_KEY not set in .env")?;
44        let custom_domain = std::env::var("R2_CUSTOM_DOMAIN").ok();
45
46        Ok(Self {
47            account_id,
48            bucket_name,
49            access_key_id,
50            secret_access_key,
51            custom_domain,
52        })
53    }
54
55    /// Get R2 endpoint URL
56    pub fn endpoint_url(&self) -> String {
57        if let Some(domain) = &self.custom_domain {
58            format!("https://{}", domain)
59        } else {
60            format!("https://{}.r2.cloudflarestorage.com", self.account_id)
61        }
62    }
63}
64
65/// R2 storage client
66pub struct R2Storage {
67    config: R2Config,
68    client: Client,
69}
70
71impl R2Storage {
72    /// Create new R2 storage client
73    pub fn new(config: R2Config) -> Result<Self> {
74        let client = Client::builder().timeout(Duration::from_secs(30)).build()?;
75
76        Ok(Self { config, client })
77    }
78
79    /// Upload blob to R2
80    pub async fn upload_blob(&self, blob: &Blob) -> Result<String> {
81        let hash = blob.hash();
82        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
83
84        let binary = blob.to_binary()?;
85        let content_hash = compute_sha256_hex(&binary);
86        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
87
88        let url = format!(
89            "{}/{}/{}",
90            self.config.endpoint_url(),
91            self.config.bucket_name,
92            key
93        );
94
95        // Create AWS Signature V4 (simplified - in production use aws-sigv4 crate)
96        let authorization = self.create_auth_header("PUT", &key, &binary)?;
97
98        let response = self
99            .client
100            .put(&url)
101            .header(header::AUTHORIZATION, authorization)
102            .header(header::CONTENT_TYPE, "application/octet-stream")
103            .header("x-amz-content-sha256", content_hash)
104            .header("x-amz-date", date)
105            .body(binary)
106            .send()
107            .await?;
108
109        if !response.status().is_success() {
110            let status = response.status();
111            let body = response.text().await.unwrap_or_default();
112            anyhow::bail!("R2 upload failed: {} - {}", status, body);
113        }
114
115        Ok(key)
116    }
117
118    /// Download blob from R2
119    pub async fn download_blob(&self, hash: &str) -> Result<Blob> {
120        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
121        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
122
123        let url = format!(
124            "{}/{}/{}",
125            self.config.endpoint_url(),
126            self.config.bucket_name,
127            key
128        );
129
130        let authorization = self.create_auth_header("GET", &key, &[])?;
131
132        let response = self
133            .client
134            .get(&url)
135            .header(header::AUTHORIZATION, authorization)
136            .header("x-amz-date", date)
137            .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
138            .send()
139            .await?;
140
141        if response.status() == StatusCode::NOT_FOUND {
142            anyhow::bail!("Blob not found: {}", hash);
143        }
144
145        if !response.status().is_success() {
146            let status = response.status();
147            let body = response.text().await.unwrap_or_default();
148            anyhow::bail!("R2 download failed: {} - {}", status, body);
149        }
150
151        let binary = response.bytes().await?;
152        Blob::from_binary(&binary)
153    }
154
155    /// Check if blob exists in R2
156    pub async fn blob_exists(&self, hash: &str) -> Result<bool> {
157        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
158        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
159
160        let url = format!(
161            "{}/{}/{}",
162            self.config.endpoint_url(),
163            self.config.bucket_name,
164            key
165        );
166
167        let authorization = self.create_auth_header("HEAD", &key, &[])?;
168
169        let response = self
170            .client
171            .head(&url)
172            .header(header::AUTHORIZATION, authorization)
173            .header("x-amz-date", date)
174            .send()
175            .await?;
176
177        Ok(response.status().is_success())
178    }
179
180    /// Delete blob from R2
181    pub async fn delete_blob(&self, hash: &str) -> Result<()> {
182        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
183        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
184
185        let url = format!(
186            "{}/{}/{}",
187            self.config.endpoint_url(),
188            self.config.bucket_name,
189            key
190        );
191
192        let authorization = self.create_auth_header("DELETE", &key, &[])?;
193
194        let response = self
195            .client
196            .delete(&url)
197            .header(header::AUTHORIZATION, authorization)
198            .header("x-amz-date", date)
199            .send()
200            .await?;
201
202        if !response.status().is_success() {
203            let status = response.status();
204            let body = response.text().await.unwrap_or_default();
205            anyhow::bail!("R2 delete failed: {} - {}", status, body);
206        }
207
208        Ok(())
209    }
210
211    /// Create AWS Signature V4 authorization header (simplified)
212    fn create_auth_header(&self, method: &str, key: &str, body: &[u8]) -> Result<String> {
213        // Simplified auth - in production, use aws-sigv4 crate for proper signing
214        // For R2, you can also use S3-compatible libraries
215
216        use hmac::{Hmac, Mac};
217
218        type HmacSha256 = Hmac<Sha256>;
219
220        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
221        let date_short = &date[..8];
222
223        let body_hash = compute_sha256_hex(body);
224        let host = format!("{}.r2.cloudflarestorage.com", self.config.account_id);
225
226        // Canonical request
227        let canonical_request = format!(
228            "{}\n/{}/{}\n\nhost:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\nhost;x-amz-content-sha256;x-amz-date\n{}",
229            method,
230            self.config.bucket_name,
231            key,
232            host,
233            body_hash,
234            date,
235            body_hash
236        );
237
238        let canonical_request_hash = compute_sha256_hex(canonical_request.as_bytes());
239
240        // String to sign
241        let string_to_sign = format!(
242            "AWS4-HMAC-SHA256\n{}\n{}/auto/s3/aws4_request\n{}",
243            date, date_short, canonical_request_hash
244        );
245
246        // Signing key
247        let mut mac = HmacSha256::new_from_slice(
248            format!("AWS4{}", self.config.secret_access_key).as_bytes(),
249        )?;
250        mac.update(date_short.as_bytes());
251        let date_key = mac.finalize().into_bytes();
252
253        let mut mac = HmacSha256::new_from_slice(&date_key)?;
254        mac.update(b"auto");
255        let region_key = mac.finalize().into_bytes();
256
257        let mut mac = HmacSha256::new_from_slice(&region_key)?;
258        mac.update(b"s3");
259        let service_key = mac.finalize().into_bytes();
260
261        let mut mac = HmacSha256::new_from_slice(&service_key)?;
262        mac.update(b"aws4_request");
263        let signing_key = mac.finalize().into_bytes();
264
265        // Signature
266        let mut mac = HmacSha256::new_from_slice(&signing_key)?;
267        mac.update(string_to_sign.as_bytes());
268        let signature = hex::encode(mac.finalize().into_bytes());
269
270        Ok(format!(
271            "AWS4-HMAC-SHA256 Credential={}/{}/auto/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature={}",
272            self.config.access_key_id,
273            date_short,
274            signature
275        ))
276    }
277}
278
279/// Compute SHA-256 hex string
280fn compute_sha256_hex(data: &[u8]) -> String {
281    use sha2::Digest;
282    let mut hasher = Sha256::new();
283    hasher.update(data);
284    format!("{:x}", hasher.finalize())
285}
286
287/// Batch upload blobs with progress tracking
288pub async fn batch_upload_blobs(
289    storage: &R2Storage,
290    blobs: Vec<Blob>,
291    progress_callback: impl Fn(usize, usize),
292) -> Result<Vec<String>> {
293    use futures::stream::{self, StreamExt};
294
295    let total = blobs.len();
296    let mut keys = Vec::with_capacity(total);
297
298    // Upload in parallel (max 10 concurrent)
299    let mut stream = stream::iter(blobs.into_iter().enumerate())
300        .map(|(idx, blob)| async move {
301            let key = storage.upload_blob(&blob).await?;
302            Ok::<(usize, String), anyhow::Error>((idx, key))
303        })
304        .buffer_unordered(10);
305
306    while let Some(result) = stream.next().await {
307        let (idx, key) = result?;
308        keys.push(key);
309        progress_callback(idx + 1, total);
310    }
311
312    Ok(keys)
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a config with dummy credentials and an optional custom domain.
    fn test_config(custom_domain: Option<&str>) -> R2Config {
        R2Config {
            account_id: "test-account".to_string(),
            bucket_name: "forge-blobs".to_string(),
            access_key_id: "test-key".to_string(),
            secret_access_key: "test-secret".to_string(),
            custom_domain: custom_domain.map(str::to_string),
        }
    }

    #[test]
    fn test_r2_config() {
        let config = test_config(None);

        assert_eq!(
            config.endpoint_url(),
            "https://test-account.r2.cloudflarestorage.com"
        );
    }

    #[test]
    fn test_r2_config_custom_domain() {
        let config = test_config(Some("blobs.example.com"));

        assert_eq!(config.endpoint_url(), "https://blobs.example.com");
    }

    #[test]
    fn test_compute_sha256_hex_known_vectors() {
        assert_eq!(
            compute_sha256_hex(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(
            compute_sha256_hex(b"abc"),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }
}