1use anyhow::{Context, Result};
6use reqwest::{header, Client, StatusCode};
7use serde::{Deserialize, Serialize};
8use sha2::Sha256;
9use std::time::Duration;
10
11use super::blob::Blob;
12
13#[derive(Debug, Clone, Serialize, Deserialize, Default)]
15pub struct R2Config {
16 pub account_id: String,
18
19 pub bucket_name: String,
21
22 pub access_key_id: String,
24
25 pub secret_access_key: String,
27
28 pub custom_domain: Option<String>,
30}
31
32impl R2Config {
33 pub fn from_env() -> Result<Self> {
35 dotenvy::dotenv().ok();
36
37 let account_id = std::env::var("R2_ACCOUNT_ID").context("R2_ACCOUNT_ID not set in .env")?;
38 let bucket_name =
39 std::env::var("R2_BUCKET_NAME").context("R2_BUCKET_NAME not set in .env")?;
40 let access_key_id =
41 std::env::var("R2_ACCESS_KEY_ID").context("R2_ACCESS_KEY_ID not set in .env")?;
42 let secret_access_key = std::env::var("R2_SECRET_ACCESS_KEY")
43 .context("R2_SECRET_ACCESS_KEY not set in .env")?;
44 let custom_domain = std::env::var("R2_CUSTOM_DOMAIN").ok();
45
46 Ok(Self {
47 account_id,
48 bucket_name,
49 access_key_id,
50 secret_access_key,
51 custom_domain,
52 })
53 }
54
55 pub fn endpoint_url(&self) -> String {
57 if let Some(domain) = &self.custom_domain {
58 format!("https://{}", domain)
59 } else {
60 format!("https://{}.r2.cloudflarestorage.com", self.account_id)
61 }
62 }
63}
64
65pub struct R2Storage {
67 config: R2Config,
68 client: Client,
69}
70
71impl R2Storage {
72 pub fn new(config: R2Config) -> Result<Self> {
74 let client = Client::builder().timeout(Duration::from_secs(30)).build()?;
75
76 Ok(Self { config, client })
77 }
78
79 pub async fn upload_blob(&self, blob: &Blob) -> Result<String> {
81 let hash = blob.hash();
82 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
83
84 let binary = blob.to_binary()?;
85 let content_hash = compute_sha256_hex(&binary);
86 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
87
88 let url = format!(
89 "{}/{}/{}",
90 self.config.endpoint_url(),
91 self.config.bucket_name,
92 key
93 );
94
95 let authorization = self.create_auth_header("PUT", &key, &binary)?;
97
98 let response = self
99 .client
100 .put(&url)
101 .header(header::AUTHORIZATION, authorization)
102 .header(header::CONTENT_TYPE, "application/octet-stream")
103 .header("x-amz-content-sha256", content_hash)
104 .header("x-amz-date", date)
105 .body(binary)
106 .send()
107 .await?;
108
109 if !response.status().is_success() {
110 let status = response.status();
111 let body = response.text().await.unwrap_or_default();
112 anyhow::bail!("R2 upload failed: {} - {}", status, body);
113 }
114
115 Ok(key)
116 }
117
118 pub async fn download_blob(&self, hash: &str) -> Result<Blob> {
120 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
121 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
122
123 let url = format!(
124 "{}/{}/{}",
125 self.config.endpoint_url(),
126 self.config.bucket_name,
127 key
128 );
129
130 let authorization = self.create_auth_header("GET", &key, &[])?;
131
132 let response = self
133 .client
134 .get(&url)
135 .header(header::AUTHORIZATION, authorization)
136 .header("x-amz-date", date)
137 .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
138 .send()
139 .await?;
140
141 if response.status() == StatusCode::NOT_FOUND {
142 anyhow::bail!("Blob not found: {}", hash);
143 }
144
145 if !response.status().is_success() {
146 let status = response.status();
147 let body = response.text().await.unwrap_or_default();
148 anyhow::bail!("R2 download failed: {} - {}", status, body);
149 }
150
151 let binary = response.bytes().await?;
152 Blob::from_binary(&binary)
153 }
154
155 pub async fn blob_exists(&self, hash: &str) -> Result<bool> {
157 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
158 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
159
160 let url = format!(
161 "{}/{}/{}",
162 self.config.endpoint_url(),
163 self.config.bucket_name,
164 key
165 );
166
167 let authorization = self.create_auth_header("HEAD", &key, &[])?;
168
169 let response = self
170 .client
171 .head(&url)
172 .header(header::AUTHORIZATION, authorization)
173 .header("x-amz-date", date)
174 .send()
175 .await?;
176
177 Ok(response.status().is_success())
178 }
179
180 pub async fn delete_blob(&self, hash: &str) -> Result<()> {
182 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
183 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
184
185 let url = format!(
186 "{}/{}/{}",
187 self.config.endpoint_url(),
188 self.config.bucket_name,
189 key
190 );
191
192 let authorization = self.create_auth_header("DELETE", &key, &[])?;
193
194 let response = self
195 .client
196 .delete(&url)
197 .header(header::AUTHORIZATION, authorization)
198 .header("x-amz-date", date)
199 .send()
200 .await?;
201
202 if !response.status().is_success() {
203 let status = response.status();
204 let body = response.text().await.unwrap_or_default();
205 anyhow::bail!("R2 delete failed: {} - {}", status, body);
206 }
207
208 Ok(())
209 }
210
211 pub async fn download_component(
213 &self,
214 tool: &str,
215 component: &str,
216 version: Option<&str>,
217 ) -> Result<String> {
218 let version = version.unwrap_or("latest");
219 let key = format!("components/{}/{}/{}.tsx", tool, version, component);
220 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
221
222 let url = format!(
223 "{}/{}/{}",
224 self.config.endpoint_url(),
225 self.config.bucket_name,
226 key
227 );
228
229 let authorization = self.create_auth_header("GET", &key, &[])?;
230
231 let response = self
232 .client
233 .get(&url)
234 .header(header::AUTHORIZATION, authorization)
235 .header("x-amz-date", date)
236 .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
237 .send()
238 .await?;
239
240 if response.status() == StatusCode::NOT_FOUND {
241 anyhow::bail!("Component not found: {}/{} v{}", tool, component, version);
242 }
243
244 if !response.status().is_success() {
245 let status = response.status();
246 let body = response.text().await.unwrap_or_default();
247 anyhow::bail!("R2 component download failed: {} - {}", status, body);
248 }
249
250 let content = response.text().await?;
251 Ok(content)
252 }
253
254 pub async fn upload_component(
256 &self,
257 tool: &str,
258 component: &str,
259 version: &str,
260 content: &str,
261 ) -> Result<String> {
262 let key = format!("components/{}/{}/{}.tsx", tool, version, component);
263 let binary = content.as_bytes();
264 let content_hash = compute_sha256_hex(binary);
265 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
266
267 let url = format!(
268 "{}/{}/{}",
269 self.config.endpoint_url(),
270 self.config.bucket_name,
271 key
272 );
273
274 let authorization = self.create_auth_header("PUT", &key, binary)?;
275
276 let response = self
277 .client
278 .put(&url)
279 .header(header::AUTHORIZATION, authorization)
280 .header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
281 .header("x-amz-content-sha256", content_hash)
282 .header("x-amz-date", date)
283 .body(content.to_string())
284 .send()
285 .await?;
286
287 if !response.status().is_success() {
288 let status = response.status();
289 let body = response.text().await.unwrap_or_default();
290 anyhow::bail!("R2 component upload failed: {} - {}", status, body);
291 }
292
293 Ok(key)
294 }
295
296 pub async fn component_exists(
298 &self,
299 tool: &str,
300 component: &str,
301 version: Option<&str>,
302 ) -> Result<bool> {
303 let version = version.unwrap_or("latest");
304 let key = format!("components/{}/{}/{}.tsx", tool, version, component);
305 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
306
307 let url = format!(
308 "{}/{}/{}",
309 self.config.endpoint_url(),
310 self.config.bucket_name,
311 key
312 );
313
314 let authorization = self.create_auth_header("HEAD", &key, &[])?;
315
316 let response = self
317 .client
318 .head(&url)
319 .header(header::AUTHORIZATION, authorization)
320 .header("x-amz-date", date)
321 .send()
322 .await?;
323
324 Ok(response.status().is_success())
325 }
326
327 pub async fn list_components(&self, tool: &str) -> Result<Vec<String>> {
329 let prefix = format!("components/{}/", tool);
330 let url = format!(
331 "{}/{}/?list-type=2&prefix={}",
332 self.config.endpoint_url(),
333 self.config.bucket_name,
334 prefix
335 );
336
337 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
338 let authorization = self.create_auth_header("GET", &format!("?list-type=2&prefix={}", prefix), &[])?;
339
340 let response = self
341 .client
342 .get(&url)
343 .header(header::AUTHORIZATION, authorization)
344 .header("x-amz-date", date)
345 .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
346 .send()
347 .await?;
348
349 if !response.status().is_success() {
350 let status = response.status();
351 let body = response.text().await.unwrap_or_default();
352 anyhow::bail!("R2 list failed: {} - {}", status, body);
353 }
354
355 let body = response.text().await?;
357 let mut components = Vec::new();
358
359 for line in body.lines() {
360 if line.contains("<Key>") {
361 let key = line.replace("<Key>", "").replace("</Key>", "").trim().to_string();
362 if let Some(name) = key.split('/').last() {
363 if let Some(component_name) = name.strip_suffix(".tsx") {
364 components.push(component_name.to_string());
365 }
366 }
367 }
368 }
369
370 Ok(components)
371 }
372
373 pub async fn sync_components(
375 &self,
376 tool: &str,
377 local_components: &[String],
378 on_download: impl Fn(&str),
379 on_upload: impl Fn(&str)
380 ) -> Result<()> {
381 let remote_components = self.list_components(tool).await?;
383
384 let (to_download, to_upload) = self.calculate_sync_actions(&remote_components, local_components);
386
387 for remote in to_download {
389 on_download(&remote);
390 }
391
392 for local in to_upload {
393 on_upload(&local);
394 }
395
396 Ok(())
397 }
398
399 #[cfg_attr(test, allow(dead_code))]
402 pub(crate) fn calculate_sync_actions(&self, remote_components: &[String], local_components: &[String]) -> (Vec<String>, Vec<String>) {
403 let mut to_download = Vec::new();
404 let mut to_upload = Vec::new();
405
406 for remote in remote_components {
407 if !local_components.contains(remote) {
408 to_download.push(remote.clone());
409 }
410 }
411
412 for local in local_components {
413 if !remote_components.contains(local) {
414 to_upload.push(local.clone());
415 }
416 }
417
418 (to_download, to_upload)
419 }
420
421 fn create_auth_header(&self, method: &str, key: &str, body: &[u8]) -> Result<String> {
423 use hmac::{Hmac, Mac};
427 use sha2::Sha256;
428
429 type HmacSha256 = Hmac<Sha256>;
430
431 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
432 let date_short = &date[..8];
433
434 let body_hash = compute_sha256_hex(body);
435 let host = format!("{}.r2.cloudflarestorage.com", self.config.account_id);
436
437 let canonical_request = format!(
439 "{}\n/{}/{}\n\nhost:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\nhost;x-amz-content-sha256;x-amz-date\n{}",
440 method,
441 self.config.bucket_name,
442 key,
443 host,
444 body_hash,
445 date,
446 body_hash
447 );
448
449 let canonical_request_hash = compute_sha256_hex(canonical_request.as_bytes());
450
451 let string_to_sign = format!(
453 "AWS4-HMAC-SHA256\n{}\n{}/auto/s3/aws4_request\n{}",
454 date, date_short, canonical_request_hash
455 );
456
457 let mut mac = HmacSha256::new_from_slice(
459 format!("AWS4{}", self.config.secret_access_key).as_bytes(),
460 )?;
461 mac.update(date_short.as_bytes());
462 let date_key = mac.finalize().into_bytes();
463
464 let mut mac = HmacSha256::new_from_slice(&date_key)?;
465 mac.update(b"auto");
466 let region_key = mac.finalize().into_bytes();
467
468 let mut mac = HmacSha256::new_from_slice(®ion_key)?;
469 mac.update(b"s3");
470 let service_key = mac.finalize().into_bytes();
471
472 let mut mac = HmacSha256::new_from_slice(&service_key)?;
473 mac.update(b"aws4_request");
474 let signing_key = mac.finalize().into_bytes();
475
476 let mut mac = HmacSha256::new_from_slice(&signing_key)?;
478 mac.update(string_to_sign.as_bytes());
479 let signature = hex::encode(mac.finalize().into_bytes());
480
481 Ok(format!(
482 "AWS4-HMAC-SHA256 Credential={}/{}/auto/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature={}",
483 self.config.access_key_id,
484 date_short,
485 signature
486 ))
487 }
488
489 pub async fn sync_up(
491 &self,
492 local_blobs: Vec<Blob>,
493 progress_callback: Option<impl Fn(usize, usize) + Send + Sync>,
494 ) -> Result<SyncResult> {
495 use futures::stream::{self, StreamExt};
496
497 tracing::info!("🔄 Starting R2 sync up: {} local blobs", local_blobs.len());
498
499 let mut uploaded = 0;
500 let mut skipped = 0;
501 let mut errors = Vec::new();
502 let total = local_blobs.len();
503
504 let mut to_upload = Vec::new();
506 for blob in local_blobs {
507 match self.blob_exists(&blob.hash()).await {
508 Ok(exists) => {
509 if exists {
510 skipped += 1;
511 } else {
512 to_upload.push(blob);
513 }
514 }
515 Err(e) => {
516 errors.push(format!("Failed to check blob {}: {}", blob.hash(), e));
517 to_upload.push(blob); }
519 }
520 }
521
522 let mut stream = stream::iter(to_upload.into_iter().enumerate())
524 .map(|(idx, blob)| async move {
525 let hash = blob.hash();
526 match self.upload_blob(&blob).await {
527 Ok(_) => Ok::<(usize, String), String>((idx, hash.to_string())),
528 Err(e) => Err(format!("Failed to upload {}: {}", hash, e)),
529 }
530 })
531 .buffer_unordered(10);
532
533 while let Some(result) = stream.next().await {
534 match result {
535 Ok((_idx, _hash)) => {
536 uploaded += 1;
537 if let Some(cb) = &progress_callback {
538 cb(uploaded + skipped, total);
539 }
540 }
541 Err(e) => {
542 errors.push(e);
543 }
544 }
545 }
546
547 tracing::info!(
548 "✅ Sync up complete: {} uploaded, {} skipped, {} errors",
549 uploaded,
550 skipped,
551 errors.len()
552 );
553
554 Ok(SyncResult {
555 uploaded,
556 downloaded: 0,
557 skipped,
558 errors,
559 })
560 }
561
562 pub async fn sync_down(
564 &self,
565 remote_hashes: Vec<String>,
566 progress_callback: Option<impl Fn(usize, usize) + Send + Sync>,
567 ) -> Result<Vec<Blob>> {
568 use futures::stream::{self, StreamExt};
569
570 tracing::info!("🔄 Starting R2 sync down: {} remote blobs", remote_hashes.len());
571
572 let total = remote_hashes.len();
573 let mut downloaded_blobs = Vec::new();
574
575 let mut stream = stream::iter(remote_hashes.into_iter().enumerate())
577 .map(|(idx, hash)| async move {
578 match self.download_blob(&hash).await {
579 Ok(blob) => Ok::<(usize, Blob), String>((idx, blob)),
580 Err(e) => Err(format!("Failed to download {}: {}", hash, e)),
581 }
582 })
583 .buffer_unordered(10);
584
585 let mut errors = Vec::new();
586 while let Some(result) = stream.next().await {
587 match result {
588 Ok((idx, blob)) => {
589 downloaded_blobs.push(blob);
590 if let Some(cb) = &progress_callback {
591 cb(idx + 1, total);
592 }
593 }
594 Err(e) => {
595 tracing::warn!("âš ï¸ {}", e);
596 errors.push(e);
597 }
598 }
599 }
600
601 tracing::info!(
602 "✅ Sync down complete: {} downloaded, {} errors", downloaded_blobs.len(),
603 errors.len()
604 );
605
606 Ok(downloaded_blobs)
607 }
608
609 pub async fn list_blobs(&self, _prefix: Option<&str>) -> Result<Vec<String>> {
611 tracing::warn!("R2 list_blobs not fully implemented - requires S3 ListObjects API");
614 Ok(Vec::new())
615 }
616}
617
618#[derive(Debug, Clone, Serialize, Deserialize)]
620pub struct SyncResult {
621 pub uploaded: usize,
622 pub downloaded: usize,
623 pub skipped: usize,
624 pub errors: Vec<String>,
625}
626
627fn compute_sha256_hex(data: &[u8]) -> String {
629 use sha2::Digest;
630 let mut hasher = Sha256::new();
631 hasher.update(data);
632 format!("{:x}", hasher.finalize())
633}
634
635pub async fn batch_upload_blobs(
637 storage: &R2Storage,
638 blobs: Vec<Blob>,
639 progress_callback: impl Fn(usize, usize),
640) -> Result<Vec<String>> {
641 use futures::stream::{self, StreamExt};
642
643 let total = blobs.len();
644 let mut keys = Vec::with_capacity(total);
645
646 let mut stream = stream::iter(blobs.into_iter().enumerate())
648 .map(|(idx, blob)| async move {
649 let key = storage.upload_blob(&blob).await?;
650 Ok::<(usize, String), anyhow::Error>((idx, key))
651 })
652 .buffer_unordered(10);
653
654 while let Some(result) = stream.next().await {
655 let (idx, key) = result?;
656 keys.push(key);
657 progress_callback(idx + 1, total);
658 }
659
660 Ok(keys)
661}
662
663#[cfg(test)]
664mod tests {
665 use super::*;
666
667 #[test]
668 fn test_r2_config() {
669 let config = R2Config {
670 account_id: "test-account".to_string(),
671 bucket_name: "forge-blobs".to_string(),
672 access_key_id: "test-key".to_string(),
673 secret_access_key: "test-secret".to_string(),
674 custom_domain: None,
675 };
676
677 assert!(config.endpoint_url().contains("test-account"));
678 assert!(config.endpoint_url().contains("r2.cloudflarestorage.com"));
679 }
680
681 #[test]
682 fn test_sync_calculation() {
683 let config = R2Config::default();
684 let storage = R2Storage::new(config).unwrap();
685
686 let remote = vec!["comp1.tsx".to_string(), "comp2.tsx".to_string()];
687 let local = vec!["comp2.tsx".to_string(), "comp3.tsx".to_string()];
688
689 let (download, upload) = storage.calculate_sync_actions(&remote, &local);
690
691 assert_eq!(download, vec!["comp1.tsx".to_string()]);
692 assert_eq!(upload, vec!["comp3.tsx".to_string()]);
693 }
694
695 #[test]
696 fn test_sync_empty() {
697 let config = R2Config::default();
698 let storage = R2Storage::new(config).unwrap();
699
700 let remote = vec![];
701 let local = vec![];
702
703 let (download, upload) = storage.calculate_sync_actions(&remote, &local);
704
705 assert!(download.is_empty());
706 assert!(upload.is_empty());
707 }
708}