1use anyhow::{Context, Result};
6use reqwest::{header, Client, StatusCode};
7use serde::{Deserialize, Serialize};
8use sha2::Sha256;
9use std::time::Duration;
10
11use super::blob::Blob;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct R2Config {
16 pub account_id: String,
18
19 pub bucket_name: String,
21
22 pub access_key_id: String,
24
25 pub secret_access_key: String,
27
28 pub custom_domain: Option<String>,
30}
31
32impl R2Config {
33 pub fn from_env() -> Result<Self> {
35 dotenvy::dotenv().ok();
36
37 let account_id = std::env::var("R2_ACCOUNT_ID").context("R2_ACCOUNT_ID not set in .env")?;
38 let bucket_name =
39 std::env::var("R2_BUCKET_NAME").context("R2_BUCKET_NAME not set in .env")?;
40 let access_key_id =
41 std::env::var("R2_ACCESS_KEY_ID").context("R2_ACCESS_KEY_ID not set in .env")?;
42 let secret_access_key = std::env::var("R2_SECRET_ACCESS_KEY")
43 .context("R2_SECRET_ACCESS_KEY not set in .env")?;
44 let custom_domain = std::env::var("R2_CUSTOM_DOMAIN").ok();
45
46 Ok(Self {
47 account_id,
48 bucket_name,
49 access_key_id,
50 secret_access_key,
51 custom_domain,
52 })
53 }
54
55 pub fn endpoint_url(&self) -> String {
57 if let Some(domain) = &self.custom_domain {
58 format!("https://{}", domain)
59 } else {
60 format!("https://{}.r2.cloudflarestorage.com", self.account_id)
61 }
62 }
63}
64
/// Client for storing and retrieving blobs in the bucket described by an [`R2Config`].
pub struct R2Storage {
    /// Bucket location and credentials used to address and sign requests.
    config: R2Config,
    /// HTTP client used for every request made through this value.
    client: Client,
}
70
71impl R2Storage {
72 pub fn new(config: R2Config) -> Result<Self> {
74 let client = Client::builder().timeout(Duration::from_secs(30)).build()?;
75
76 Ok(Self { config, client })
77 }
78
79 pub async fn upload_blob(&self, blob: &Blob) -> Result<String> {
81 let hash = blob.hash();
82 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
83
84 let binary = blob.to_binary()?;
85 let content_hash = compute_sha256_hex(&binary);
86 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
87
88 let url = format!(
89 "{}/{}/{}",
90 self.config.endpoint_url(),
91 self.config.bucket_name,
92 key
93 );
94
95 let authorization = self.create_auth_header("PUT", &key, &binary)?;
97
98 let response = self
99 .client
100 .put(&url)
101 .header(header::AUTHORIZATION, authorization)
102 .header(header::CONTENT_TYPE, "application/octet-stream")
103 .header("x-amz-content-sha256", content_hash)
104 .header("x-amz-date", date)
105 .body(binary)
106 .send()
107 .await?;
108
109 if !response.status().is_success() {
110 let status = response.status();
111 let body = response.text().await.unwrap_or_default();
112 anyhow::bail!("R2 upload failed: {} - {}", status, body);
113 }
114
115 Ok(key)
116 }
117
118 pub async fn download_blob(&self, hash: &str) -> Result<Blob> {
120 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
121 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
122
123 let url = format!(
124 "{}/{}/{}",
125 self.config.endpoint_url(),
126 self.config.bucket_name,
127 key
128 );
129
130 let authorization = self.create_auth_header("GET", &key, &[])?;
131
132 let response = self
133 .client
134 .get(&url)
135 .header(header::AUTHORIZATION, authorization)
136 .header("x-amz-date", date)
137 .header("x-amz-content-sha256", "UNSIGNED-PAYLOAD")
138 .send()
139 .await?;
140
141 if response.status() == StatusCode::NOT_FOUND {
142 anyhow::bail!("Blob not found: {}", hash);
143 }
144
145 if !response.status().is_success() {
146 let status = response.status();
147 let body = response.text().await.unwrap_or_default();
148 anyhow::bail!("R2 download failed: {} - {}", status, body);
149 }
150
151 let binary = response.bytes().await?;
152 Blob::from_binary(&binary)
153 }
154
155 pub async fn blob_exists(&self, hash: &str) -> Result<bool> {
157 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
158 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
159
160 let url = format!(
161 "{}/{}/{}",
162 self.config.endpoint_url(),
163 self.config.bucket_name,
164 key
165 );
166
167 let authorization = self.create_auth_header("HEAD", &key, &[])?;
168
169 let response = self
170 .client
171 .head(&url)
172 .header(header::AUTHORIZATION, authorization)
173 .header("x-amz-date", date)
174 .send()
175 .await?;
176
177 Ok(response.status().is_success())
178 }
179
180 pub async fn delete_blob(&self, hash: &str) -> Result<()> {
182 let key = format!("blobs/{}/{}", &hash[..2], &hash[2..]);
183 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
184
185 let url = format!(
186 "{}/{}/{}",
187 self.config.endpoint_url(),
188 self.config.bucket_name,
189 key
190 );
191
192 let authorization = self.create_auth_header("DELETE", &key, &[])?;
193
194 let response = self
195 .client
196 .delete(&url)
197 .header(header::AUTHORIZATION, authorization)
198 .header("x-amz-date", date)
199 .send()
200 .await?;
201
202 if !response.status().is_success() {
203 let status = response.status();
204 let body = response.text().await.unwrap_or_default();
205 anyhow::bail!("R2 delete failed: {} - {}", status, body);
206 }
207
208 Ok(())
209 }
210
211 fn create_auth_header(&self, method: &str, key: &str, body: &[u8]) -> Result<String> {
213 use hmac::{Hmac, Mac};
217
218 type HmacSha256 = Hmac<Sha256>;
219
220 let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
221 let date_short = &date[..8];
222
223 let body_hash = compute_sha256_hex(body);
224 let host = format!("{}.r2.cloudflarestorage.com", self.config.account_id);
225
226 let canonical_request = format!(
228 "{}\n/{}/{}\n\nhost:{}\nx-amz-content-sha256:{}\nx-amz-date:{}\n\nhost;x-amz-content-sha256;x-amz-date\n{}",
229 method,
230 self.config.bucket_name,
231 key,
232 host,
233 body_hash,
234 date,
235 body_hash
236 );
237
238 let canonical_request_hash = compute_sha256_hex(canonical_request.as_bytes());
239
240 let string_to_sign = format!(
242 "AWS4-HMAC-SHA256\n{}\n{}/auto/s3/aws4_request\n{}",
243 date, date_short, canonical_request_hash
244 );
245
246 let mut mac = HmacSha256::new_from_slice(
248 format!("AWS4{}", self.config.secret_access_key).as_bytes(),
249 )?;
250 mac.update(date_short.as_bytes());
251 let date_key = mac.finalize().into_bytes();
252
253 let mut mac = HmacSha256::new_from_slice(&date_key)?;
254 mac.update(b"auto");
255 let region_key = mac.finalize().into_bytes();
256
257 let mut mac = HmacSha256::new_from_slice(®ion_key)?;
258 mac.update(b"s3");
259 let service_key = mac.finalize().into_bytes();
260
261 let mut mac = HmacSha256::new_from_slice(&service_key)?;
262 mac.update(b"aws4_request");
263 let signing_key = mac.finalize().into_bytes();
264
265 let mut mac = HmacSha256::new_from_slice(&signing_key)?;
267 mac.update(string_to_sign.as_bytes());
268 let signature = hex::encode(mac.finalize().into_bytes());
269
270 Ok(format!(
271 "AWS4-HMAC-SHA256 Credential={}/{}/auto/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature={}",
272 self.config.access_key_id,
273 date_short,
274 signature
275 ))
276 }
277}
278
279fn compute_sha256_hex(data: &[u8]) -> String {
281 use sha2::Digest;
282 let mut hasher = Sha256::new();
283 hasher.update(data);
284 format!("{:x}", hasher.finalize())
285}
286
287pub async fn batch_upload_blobs(
289 storage: &R2Storage,
290 blobs: Vec<Blob>,
291 progress_callback: impl Fn(usize, usize),
292) -> Result<Vec<String>> {
293 use futures::stream::{self, StreamExt};
294
295 let total = blobs.len();
296 let mut keys = Vec::with_capacity(total);
297
298 let mut stream = stream::iter(blobs.into_iter().enumerate())
300 .map(|(idx, blob)| async move {
301 let key = storage.upload_blob(&blob).await?;
302 Ok::<(usize, String), anyhow::Error>((idx, key))
303 })
304 .buffer_unordered(10);
305
306 while let Some(result) = stream.next().await {
307 let (idx, key) = result?;
308 keys.push(key);
309 progress_callback(idx + 1, total);
310 }
311
312 Ok(keys)
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Without a custom domain the endpoint is derived from the account id.
    #[test]
    fn test_r2_config() {
        let endpoint = R2Config {
            account_id: String::from("test-account"),
            bucket_name: String::from("forge-blobs"),
            access_key_id: String::from("test-key"),
            secret_access_key: String::from("test-secret"),
            custom_domain: None,
        }
        .endpoint_url();

        assert!(endpoint.contains("test-account"));
        assert!(endpoint.contains("r2.cloudflarestorage.com"));
    }
}