// dx_forge/storage/r2.rs

//! Cloudflare R2 storage backend.
//!
//! This module integrates with Cloudflare R2 (through its S3-compatible API) for blob storage.
//! R2 charges no egress fees, which makes it well suited to code-hosting workloads.
5use anyhow::{Context, Result};
6use reqwest::{header, Client, StatusCode};
7use serde::{Deserialize, Serialize};
8use sha2::Sha256;
9use std::time::Duration;
10
11use super::blob::Blob;
12
13/// R2 configuration
14#[derive(Debug, Clone, Serialize, Deserialize, Default)]
15pub struct R2Config {
16    /// R2 account ID
17    pub account_id: String,
18
19    /// R2 bucket name
20    pub bucket_name: String,
21
22    /// R2 access key ID
23    pub access_key_id: String,
24
25    /// R2 secret access key
26    pub secret_access_key: String,
27
28    /// Custom domain (optional)
29    pub custom_domain: Option<String>,
30}
31
32impl R2Config {
33    /// Load configuration from environment variables
34    pub fn from_env() -> Result<Self> {
35        dotenvy::dotenv().ok();
36
37        let account_id = std::env::var("R2_ACCOUNT_ID").context("R2_ACCOUNT_ID not set in .env")?;
38        let bucket_name =
39            std::env::var("R2_BUCKET_NAME").context("R2_BUCKET_NAME not set in .env")?;
40        let access_key_id =
41            std::env::var("R2_ACCESS_KEY_ID").context("R2_ACCESS_KEY_ID not set in .env")?;
42        let secret_access_key = std::env::var("R2_SECRET_ACCESS_KEY")
43            .context("R2_SECRET_ACCESS_KEY not set in .env")?;
44        let custom_domain = std::env::var("R2_CUSTOM_DOMAIN").ok();
45
46        Ok(Self {
47            account_id,
48            bucket_name,
49            access_key_id,
50            secret_access_key,
51            custom_domain,
52        })
53    }
54
55    /// Get R2 endpoint URL
56    pub fn endpoint_url(&self) -> String {
57        if let Some(domain) = &self.custom_domain {
58            format!("https://{}", domain)
59        } else {
60            format!("https://{}.r2.cloudflarestorage.com", self.account_id)
61        }
62    }
63}
64
65/// R2 storage client
66pub struct R2Storage {
67    config: R2Config,
68    client: Client,
69}
70
71impl R2Storage {
72    /// Create new R2 storage client
73    pub fn new(config: R2Config) -> Result<Self> {
74        let client = Client::builder().timeout(Duration::from_secs(30)).build()?;
75
76        Ok(Self { config, client })
77    }
78
79    /// Upload blob to R2
80    pub async fn upload_blob(&self, blob: &Blob) -> Result<String> {
81        let hash = blob.hash();
82        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
83
84        let binary = blob.to_binary()?;
85        let content_hash = compute_sha256_hex(&binary);
86        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
87
88        let url = format!(
89            "{}/{}/{}",
90            self.config.endpoint_url(),
91            self.config.bucket_name,
92            key
93        );
94
95        // Create AWS Signature V4 (simplified - in production use aws-sigv4 crate)
96        let authorization = self.create_auth_header("PUT", &key, &binary)?;
97
98        let response = self
99            .client
100            .put(&url)
101            .header(header::AUTHORIZATION, authorization)
102            .header(header::CONTENT_TYPE, "application/octet-stream")
103            .header("x-amz-content-sha256", content_hash)
104            .header("x-amz-date", date)
105            .body(binary)
106            .send()
107            .await?;
108
109        if !response.status().is_success() {
110            let status = response.status();
111            let body = response.text().await.unwrap_or_default();
112            anyhow::bail!("R2 upload failed: {} - {}", status, body);
113        }
114
115        Ok(key)
116    }
117
118    /// Download blob from R2
119    pub async fn download_blob(&self, hash: &str) -> Result<Blob> {
120        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
121        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
122
123        let url = format!(
124            "{}/{}/{}",
125            self.config.endpoint_url(),
126            self.config.bucket_name,
127            key
128        );
129
130        let authorization = self.create_auth_header("GET", &key, &[])?;
131
132        let response = self
133            .client
134            .get(&url)
135            .header(header::AUTHORIZATION, authorization)
136            .header("x-amz-date", date)
137            .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
138            .send()
139            .await?;
140
141        if response.status() == StatusCode::NOT_FOUND {
142            anyhow::bail!("Blob not found: {}", hash);
143        }
144
145        if !response.status().is_success() {
146            let status = response.status();
147            let body = response.text().await.unwrap_or_default();
148            anyhow::bail!("R2 download failed: {} - {}", status, body);
149        }
150
151        let binary = response.bytes().await?;
152        Blob::from_binary(&binary)
153    }
154
155    /// Check if blob exists in R2
156    pub async fn blob_exists(&self, hash: &str) -> Result<bool> {
157        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
158        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
159
160        let url = format!(
161            "{}/{}/{}",
162            self.config.endpoint_url(),
163            self.config.bucket_name,
164            key
165        );
166
167        let authorization = self.create_auth_header("HEAD", &key, &[])?;
168
169        let response = self
170            .client
171            .head(&url)
172            .header(header::AUTHORIZATION, authorization)
173            .header("x-amz-date", date)
174            .send()
175            .await?;
176
177        Ok(response.status().is_success())
178    }
179
180    /// Delete blob from R2
181    pub async fn delete_blob(&self, hash: &str) -> Result<()> {
182        let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
183        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
184
185        let url = format!(
186            "{}/{}/{}",
187            self.config.endpoint_url(),
188            self.config.bucket_name,
189            key
190        );
191
192        let authorization = self.create_auth_header("DELETE", &key, &[])?;
193
194        let response = self
195            .client
196            .delete(&url)
197            .header(header::AUTHORIZATION, authorization)
198            .header("x-amz-date", date)
199            .send()
200            .await?;
201
202        if !response.status().is_success() {
203            let status = response.status();
204            let body = response.text().await.unwrap_or_default();
205            anyhow::bail!("R2 delete failed: {} - {}", status, body);
206        }
207
208        Ok(())
209    }
210
211    /// Download component from R2
212    pub async fn download_component(
213        &self,
214        tool: &str,
215        component: &str,
216        version: Option<&str>,
217    ) -> Result<String> {
218        let version = version.unwrap_or("latest");
219        let key = format!("components/{}/{}/{}.tsx", tool, version, component);
220        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
221
222        let url = format!(
223            "{}/{}/{}",
224            self.config.endpoint_url(),
225            self.config.bucket_name,
226            key
227        );
228
229        let authorization = self.create_auth_header("GET", &key, &[])?;
230
231        let response = self
232            .client
233            .get(&url)
234            .header(header::AUTHORIZATION, authorization)
235            .header("x-amz-date", date)
236            .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
237            .send()
238            .await?;
239
240        if response.status() == StatusCode::NOT_FOUND {
241            anyhow::bail!("Component not found: {}/{} v{}", tool, component, version);
242        }
243
244        if !response.status().is_success() {
245            let status = response.status();
246            let body = response.text().await.unwrap_or_default();
247            anyhow::bail!("R2 component download failed: {} - {}", status, body);
248        }
249
250        let content = response.text().await?;
251        Ok(content)
252    }
253
254    /// Upload component to R2
255    pub async fn upload_component(
256        &self,
257        tool: &str,
258        component: &str,
259        version: &str,
260        content: &str,
261    ) -> Result<String> {
262        let key = format!("components/{}/{}/{}.tsx", tool, version, component);
263        let binary = content.as_bytes();
264        let content_hash = compute_sha256_hex(binary);
265        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
266
267        let url = format!(
268            "{}/{}/{}",
269            self.config.endpoint_url(),
270            self.config.bucket_name,
271            key
272        );
273
274        let authorization = self.create_auth_header("PUT", &key, binary)?;
275
276        let response = self
277            .client
278            .put(&url)
279            .header(header::AUTHORIZATION, authorization)
280            .header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
281            .header("x-amz-content-sha256", content_hash)
282            .header("x-amz-date", date)
283            .body(content.to_string())
284            .send()
285            .await?;
286
287        if !response.status().is_success() {
288            let status = response.status();
289            let body = response.text().await.unwrap_or_default();
290            anyhow::bail!("R2 component upload failed: {} - {}", status, body);
291        }
292
293        Ok(key)
294    }
295
296    /// Check if component exists in R2
297    pub async fn component_exists(
298        &self,
299        tool: &str,
300        component: &str,
301        version: Option<&str>,
302    ) -> Result<bool> {
303        let version = version.unwrap_or("latest");
304        let key = format!("components/{}/{}/{}.tsx", tool, version, component);
305        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
306
307        let url = format!(
308            "{}/{}/{}",
309            self.config.endpoint_url(),
310            self.config.bucket_name,
311            key
312        );
313
314        let authorization = self.create_auth_header("HEAD", &key, &[])?;
315
316        let response = self
317            .client
318            .head(&url)
319            .header(header::AUTHORIZATION, authorization)
320            .header("x-amz-date", date)
321            .send()
322            .await?;
323
324        Ok(response.status().is_success())
325    }
326
327    /// List all components in R2
328    pub async fn list_components(&self, tool: &str) -> Result<Vec<String>> {
329        let prefix = format!("components/{}/", tool);
330        let url = format!(
331            "{}/{}/?list-type=2&prefix={}",
332            self.config.endpoint_url(),
333            self.config.bucket_name,
334            prefix
335        );
336
337        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
338        let authorization = self.create_auth_header("GET", &format!("?list-type=2&prefix={}", prefix), &[])?;
339
340        let response = self
341            .client
342            .get(&url)
343            .header(header::AUTHORIZATION, authorization)
344            .header("x-amz-date", date)
345            .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
346            .send()
347            .await?;
348
349        if !response.status().is_success() {
350            let status = response.status();
351            let body = response.text().await.unwrap_or_default();
352            anyhow::bail!("R2 list failed: {} - {}", status, body);
353        }
354
355        // Parse XML response (simplified - in production use proper XML parser)
356        let body = response.text().await?;
357        let mut components = Vec::new();
358        
359        for line in body.lines() {
360            if line.contains("<Key>") {
361                let key = line.replace("<Key>", "").replace("</Key>", "").trim().to_string();
362                if let Some(name) = key.split('/').last() {
363                    if let Some(component_name) = name.strip_suffix(".tsx") {
364                        components.push(component_name.to_string());
365                    }
366                }
367            }
368        }
369
370        Ok(components)
371    }
372
373    /// Sync components (bidirectional)
374    pub async fn sync_components(
375        &self, 
376        tool: &str, 
377        local_components: &[String],
378        on_download: impl Fn(&str),
379        on_upload: impl Fn(&str)
380    ) -> Result<()> {
381        // 1. List remote components
382        let remote_components = self.list_components(tool).await?;
383        
384        // 2. Calculate sync actions
385        let (to_download, to_upload) = self.calculate_sync_actions(&remote_components, local_components);
386        
387        // 3. Execute actions
388        for remote in to_download {
389            on_download(&remote);
390        }
391        
392        for local in to_upload {
393            on_upload(&local);
394        }
395        
396        Ok(())
397    }
398
399    /// Calculate what needs to be downloaded and uploaded
400    /// Returns (to_download, to_upload)
401    #[cfg_attr(test, allow(dead_code))]
402    pub(crate) fn calculate_sync_actions(&self, remote_components: &[String], local_components: &[String]) -> (Vec<String>, Vec<String>) {
403        let mut to_download = Vec::new();
404        let mut to_upload = Vec::new();
405
406        for remote in remote_components {
407            if !local_components.contains(remote) {
408                to_download.push(remote.clone());
409            }
410        }
411
412        for local in local_components {
413            if !remote_components.contains(local) {
414                to_upload.push(local.clone());
415            }
416        }
417
418        (to_download, to_upload)
419    }
420
421    /// Create AWS Signature V4 authorization header (simplified)
422    fn create_auth_header(&self, method: &str, key: &str, body: &[u8]) -> Result<String> {
423        // Simplified auth - in production, use aws-sigv4 crate for proper signing
424        // For R2, you can also use S3-compatible libraries
425
426        use hmac::{Hmac, Mac};
427        use sha2::Sha256;
428
429        type HmacSha256 = Hmac<Sha256>;
430
431        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
432        let date_short = &date[..8];
433
434        let body_hash = compute_sha256_hex(body);
435        let host = format!("{}.r2.cloudflarestorage.com", self.config.account_id);
436
437        // Canonical request
438        let canonical_request = format!(
439            "{}\n/{}/{}\n\nhost:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\nhost;x-amz-content-sha256;x-amz-date\n{}",
440            method,
441            self.config.bucket_name,
442            key,
443            host,
444            body_hash,
445            date,
446            body_hash
447        );
448
449        let canonical_request_hash = compute_sha256_hex(canonical_request.as_bytes());
450
451        // String to sign
452        let string_to_sign = format!(
453            "AWS4-HMAC-SHA256\n{}\n{}/auto/s3/aws4_request\n{}",
454            date, date_short, canonical_request_hash
455        );
456
457        // Signing key
458        let mut mac = HmacSha256::new_from_slice(
459            format!("AWS4{}", self.config.secret_access_key).as_bytes(),
460        )?;
461        mac.update(date_short.as_bytes());
462        let date_key = mac.finalize().into_bytes();
463
464        let mut mac = HmacSha256::new_from_slice(&date_key)?;
465        mac.update(b"auto");
466        let region_key = mac.finalize().into_bytes();
467
468        let mut mac = HmacSha256::new_from_slice(&region_key)?;
469        mac.update(b"s3");
470        let service_key = mac.finalize().into_bytes();
471
472        let mut mac = HmacSha256::new_from_slice(&service_key)?;
473        mac.update(b"aws4_request");
474        let signing_key = mac.finalize().into_bytes();
475
476        // Signature
477        let mut mac = HmacSha256::new_from_slice(&signing_key)?;
478        mac.update(string_to_sign.as_bytes());
479        let signature = hex::encode(mac.finalize().into_bytes());
480
481        Ok(format!(
482            "AWS4-HMAC-SHA256 Credential={}/{}/auto/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature={}",
483            self.config.access_key_id,
484            date_short,
485            signature
486        ))
487    }
488
489    /// Sync local blobs up to R2 (upload missing blobs)
490    pub async fn sync_up(
491        &self,
492        local_blobs: Vec<Blob>,
493        progress_callback: Option<impl Fn(usize, usize) + Send + Sync>,
494    ) -> Result<SyncResult> {
495        use futures::stream::{self, StreamExt};
496        
497        tracing::info!("🔄 Starting R2 sync up: {} local blobs", local_blobs.len());
498        
499        let mut uploaded = 0;
500        let mut skipped = 0;
501        let mut errors = Vec::new();
502        let total = local_blobs.len();
503
504        // Check which blobs already exist in R2
505        let mut to_upload = Vec::new();
506        for blob in local_blobs {
507            match self.blob_exists(&blob.hash()).await {
508                Ok(exists) => {
509                    if exists {
510                        skipped += 1;
511                    } else {
512                        to_upload.push(blob);
513                    }
514                }
515                Err(e) => {
516                    errors.push(format!("Failed to check blob {}: {}", blob.hash(), e));
517                    to_upload.push(blob); // Try to upload anyway
518                }
519            }
520        }
521
522        // Upload missing blobs in parallel (max 10 concurrent)
523        let mut stream = stream::iter(to_upload.into_iter().enumerate())
524            .map(|(idx, blob)| async move {
525                let hash = blob.hash();
526                match self.upload_blob(&blob).await {
527                    Ok(_) => Ok::<(usize, String), String>((idx, hash.to_string())),
528                    Err(e) => Err(format!("Failed to upload {}: {}", hash, e)),
529                }
530            })
531            .buffer_unordered(10);
532
533        while let Some(result) = stream.next().await {
534            match result {
535                Ok((_idx, _hash)) => {
536                    uploaded += 1;
537                    if let Some(cb) = &progress_callback {
538                        cb(uploaded + skipped, total);
539                    }
540                }
541                Err(e) => {
542                    errors.push(e);
543                }
544            }
545        }
546
547        tracing::info!(
548            "✅ Sync up complete: {} uploaded, {} skipped, {} errors",
549            uploaded,
550            skipped,
551            errors.len()
552        );
553
554        Ok(SyncResult {
555            uploaded,
556            downloaded: 0,
557            skipped,
558            errors,
559        })
560    }
561
562    /// Sync remote blobs down from R2 (download missing blobs)
563    pub async fn sync_down(
564        &self,
565        remote_hashes: Vec<String>,
566        progress_callback: Option<impl Fn(usize, usize) + Send + Sync>,
567    ) -> Result<Vec<Blob>> {
568        use futures::stream::{self, StreamExt};
569        
570        tracing::info!("🔄 Starting R2 sync down: {} remote blobs", remote_hashes.len());
571        
572        let total = remote_hashes.len();
573        let mut downloaded_blobs = Vec::new();
574
575        // Download blobs in parallel (max 10 concurrent)
576        let mut stream = stream::iter(remote_hashes.into_iter().enumerate())
577            .map(|(idx, hash)| async move {
578                match self.download_blob(&hash).await {
579                    Ok(blob) => Ok::<(usize, Blob), String>((idx, blob)),
580                    Err(e) => Err(format!("Failed to download {}: {}", hash, e)),
581                }
582            })
583            .buffer_unordered(10);
584
585        let mut errors = Vec::new();
586        while let Some(result) = stream.next().await {
587            match result {
588                Ok((idx, blob)) => {
589                    downloaded_blobs.push(blob);
590                    if let Some(cb) = &progress_callback {
591                        cb(idx + 1, total);
592                    }
593                }
594                Err(e) => {
595                    tracing::warn!("⚠️ {}", e);
596                    errors.push(e);
597                }
598            }
599        }
600
601        tracing::info!(
602            "✅ Sync down complete: {} downloaded, {} errors", downloaded_blobs.len(),
603            errors.len()
604        );
605
606        Ok(downloaded_blobs)
607    }
608
609    /// List all blob hashes in R2 bucket (simplified - in production use pagination)
610    pub async fn list_blobs(&self, _prefix: Option<&str>) -> Result<Vec<String>> {
611        // This is a simplified version. In production, use S3 ListObjects API
612        // For now, return empty list as listing requires more complex S3 API integration
613        tracing::warn!("R2 list_blobs not fully implemented - requires S3 ListObjects API");
614        Ok(Vec::new())
615    }
616}
617
618/// Sync operation result
619#[derive(Debug, Clone, Serialize, Deserialize)]
620pub struct SyncResult {
621    pub uploaded: usize,
622    pub downloaded: usize,
623    pub skipped: usize,
624    pub errors: Vec<String>,
625}
626
627/// Compute SHA-256 hex string
628fn compute_sha256_hex(data: &[u8]) -> String {
629    use sha2::Digest;
630    let mut hasher = Sha256::new();
631    hasher.update(data);
632    format!("{:x}", hasher.finalize())
633}
634
635/// Batch upload blobs with progress tracking
636pub async fn batch_upload_blobs(
637    storage: &R2Storage,
638    blobs: Vec<Blob>,
639    progress_callback: impl Fn(usize, usize),
640) -> Result<Vec<String>> {
641    use futures::stream::{self, StreamExt};
642
643    let total = blobs.len();
644    let mut keys = Vec::with_capacity(total);
645
646    // Upload in parallel (max 10 concurrent)
647    let mut stream = stream::iter(blobs.into_iter().enumerate())
648        .map(|(idx, blob)| async move {
649            let key = storage.upload_blob(&blob).await?;
650            Ok::<(usize, String), anyhow::Error>((idx, key))
651        })
652        .buffer_unordered(10);
653
654    while let Some(result) = stream.next().await {
655        let (idx, key) = result?;
656        keys.push(key);
657        progress_callback(idx + 1, total);
658    }
659
660    Ok(keys)
661}
662
#[cfg(test)]
mod tests {
    use super::*;

    /// Storage built from an all-empty config; constructing it performs no network I/O.
    fn empty_storage() -> R2Storage {
        R2Storage::new(R2Config::default()).expect("building the HTTP client should succeed")
    }

    /// Owned copies of the given names.
    fn names(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    #[test]
    fn test_r2_config() {
        let config = R2Config {
            account_id: "test-account".into(),
            bucket_name: "forge-blobs".into(),
            access_key_id: "test-key".into(),
            secret_access_key: "test-secret".into(),
            custom_domain: None,
        };

        let endpoint = config.endpoint_url();
        assert!(endpoint.contains("test-account"));
        assert!(endpoint.contains("r2.cloudflarestorage.com"));
    }

    #[test]
    fn test_sync_calculation() {
        let remote = names(&["comp1.tsx", "comp2.tsx"]);
        let local = names(&["comp2.tsx", "comp3.tsx"]);

        let (download, upload) = empty_storage().calculate_sync_actions(&remote, &local);

        assert_eq!(download, names(&["comp1.tsx"]));
        assert_eq!(upload, names(&["comp3.tsx"]));
    }

    #[test]
    fn test_sync_empty() {
        let (download, upload) = empty_storage().calculate_sync_actions(&[], &[]);

        assert!(download.is_empty());
        assert!(upload.is_empty());
    }
}