1use crate::Error;
26use async_trait::async_trait;
27use std::collections::VecDeque;
28use std::time::Duration;
29use tokio::sync::Mutex;
30use tokio::time::Instant;
31
32const DO_CDN_API_BASE: &str = "https://api.digitalocean.com";
33const BATCH_SIZE: usize = 50;
34const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(10);
35const RATE_LIMIT_MAX: usize = 5;
36
37#[async_trait]
42pub trait PurgeApi: Send + Sync {
43 async fn purge(&self, paths: &[String]) -> Result<(), Error>;
48}
49
50#[derive(Clone)]
58pub struct DoSpacesCdnConfig {
59 pub endpoint_id: Option<String>,
61 pub api_token: String,
63 pub(crate) api_base: Option<String>,
65}
66
67impl std::fmt::Debug for DoSpacesCdnConfig {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("DoSpacesCdnConfig")
70 .field("endpoint_id", &self.endpoint_id)
71 .field("api_token", &"<redacted>")
72 .field("api_base", &self.api_base)
73 .finish()
74 }
75}
76
77impl DoSpacesCdnConfig {
78 pub fn from_env() -> Self {
83 Self {
84 endpoint_id: std::env::var("DO_SPACES_CDN_ID").ok(),
85 api_token: std::env::var("DIGITALOCEAN_ACCESS_TOKEN").unwrap_or_default(),
86 api_base: None,
87 }
88 }
89}
90
91pub struct DoSpacesCdn {
96 config: DoSpacesCdnConfig,
97 client: reqwest::Client,
98 request_times: Mutex<VecDeque<Instant>>,
99}
100
101impl DoSpacesCdn {
102 pub fn new(config: DoSpacesCdnConfig) -> Self {
104 Self {
105 config,
106 client: reqwest::Client::new(),
107 request_times: Mutex::new(VecDeque::new()),
108 }
109 }
110
111 fn api_base(&self) -> &str {
112 self.config.api_base.as_deref().unwrap_or(DO_CDN_API_BASE)
113 }
114
115 async fn throttle(&self) {
121 loop {
122 let mut times = self.request_times.lock().await;
123 let now = Instant::now();
124 while times
126 .front()
127 .map(|t| now.duration_since(*t) >= RATE_LIMIT_WINDOW)
128 .unwrap_or(false)
129 {
130 times.pop_front();
131 }
132 if times.len() < RATE_LIMIT_MAX {
133 times.push_back(Instant::now());
135 return;
136 }
137 let oldest = *times.front().unwrap();
139 let sleep_for = RATE_LIMIT_WINDOW - now.duration_since(oldest);
140 drop(times);
141 tokio::time::sleep(sleep_for).await;
142 }
144 }
145}
146
147#[async_trait]
148impl PurgeApi for DoSpacesCdn {
149 async fn purge(&self, paths: &[String]) -> Result<(), Error> {
150 if paths.is_empty() {
151 return Ok(());
152 }
153 let Some(id) = &self.config.endpoint_id else {
154 tracing::info!("DO_SPACES_CDN_ID not set — CDN purge is a no-op");
155 return Ok(());
156 };
157 if self.config.api_token.is_empty() {
158 return Err(Error::cdn(
159 "DIGITALOCEAN_ACCESS_TOKEN not set — cannot purge CDN cache",
160 ));
161 }
162 let url = format!("{}/v2/cdn/endpoints/{}/cache", self.api_base(), id);
163 let mut batches = 0usize;
164 for chunk in paths.chunks(BATCH_SIZE) {
165 self.throttle().await;
166 let resp = self
167 .client
168 .delete(&url)
169 .bearer_auth(&self.config.api_token)
170 .json(&serde_json::json!({ "files": chunk }))
171 .send()
172 .await
173 .map_err(|e| Error::cdn(e.to_string()))?;
174 if resp.status().as_u16() != 204 {
175 let status = resp.status().as_u16();
176 let body = resp.text().await.unwrap_or_default();
177 return Err(Error::cdn(format!("DO CDN purge status {status}: {body}")));
178 }
179 batches += 1;
180 }
181 tracing::info!("purged {} paths in {} request(s)", paths.len(), batches);
182 Ok(())
183 }
184}
185
186#[cfg(feature = "cdn-bunny")]
187pub mod bunny;
188#[cfg(feature = "cdn-bunny")]
189pub use bunny::{BunnyCdn, BunnyCdnConfig};
190
191#[cfg(feature = "cdn-cloudflare")]
192pub mod cloudflare;
193#[cfg(feature = "cdn-cloudflare")]
194pub use cloudflare::{CloudflareCdn, CloudflareCdnConfig};
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use wiremock::matchers::{body_json, header, method, path_regex};
200 use wiremock::{Mock, MockServer, ResponseTemplate};
201
202 fn cfg(server_uri: &str, id: Option<&str>, token: &str) -> DoSpacesCdnConfig {
204 DoSpacesCdnConfig {
205 endpoint_id: id.map(String::from),
206 api_token: token.to_string(),
207 api_base: Some(server_uri.to_string()),
208 }
209 }
210
211 #[test]
214 fn debug_does_not_contain_token() {
215 let config = DoSpacesCdnConfig {
216 endpoint_id: Some("ep-123".into()),
217 api_token: "secret-token-abc".into(),
218 api_base: None,
219 };
220 let dbg = format!("{config:?}");
221 assert!(
222 !dbg.contains("secret-token-abc"),
223 "Debug output must not contain the token: {dbg}"
224 );
225 assert!(
226 dbg.contains("<redacted>"),
227 "Debug output must show <redacted>: {dbg}"
228 );
229 }
230
231 #[tokio::test]
235 async fn do_adapter_request_shape() {
236 let server = MockServer::start().await;
237 Mock::given(method("DELETE"))
238 .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
239 .and(header("Authorization", "Bearer test-token"))
240 .and(body_json(serde_json::json!({ "files": ["index.html"] })))
241 .respond_with(ResponseTemplate::new(204))
242 .expect(1)
243 .mount(&server)
244 .await;
245
246 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"));
247 purger.purge(&["index.html".to_string()]).await.unwrap();
248 }
250
251 #[tokio::test]
253 async fn do_adapter_batches_over_50() {
254 let server = MockServer::start().await;
255 Mock::given(method("DELETE"))
256 .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
257 .and(header("Authorization", "Bearer test-token"))
258 .respond_with(ResponseTemplate::new(204))
259 .expect(2)
260 .mount(&server)
261 .await;
262
263 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"));
264 let paths: Vec<String> = (0..55).map(|i| format!("file{i}.html")).collect();
265 purger.purge(&paths).await.unwrap();
266 }
267
268 #[tokio::test]
271 async fn do_adapter_wildcard_slot() {
272 let server = MockServer::start().await;
273 Mock::given(method("DELETE"))
274 .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
275 .and(header("Authorization", "Bearer test-token"))
276 .respond_with(ResponseTemplate::new(204))
277 .expect(2)
278 .mount(&server)
279 .await;
280
281 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"));
282 let mut paths: Vec<String> = (0..50).map(|i| format!("file{i}.html")).collect();
283 paths.push("dir/*".to_string()); purger.purge(&paths).await.unwrap();
285 }
286
287 #[tokio::test]
289 async fn do_adapter_noop_missing_id() {
290 let server = MockServer::start().await;
291 Mock::given(method("DELETE"))
293 .respond_with(ResponseTemplate::new(204))
294 .expect(0)
295 .mount(&server)
296 .await;
297
298 let purger = DoSpacesCdn::new(cfg(&server.uri(), None, "test-token"));
299 purger.purge(&["a".to_string()]).await.unwrap();
300 }
301
302 #[tokio::test]
304 async fn purge_empty_noop() {
305 let server = MockServer::start().await;
306 Mock::given(method("DELETE"))
307 .respond_with(ResponseTemplate::new(204))
308 .expect(0)
309 .mount(&server)
310 .await;
311
312 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"));
313 purger.purge(&[]).await.unwrap();
314 }
315
316 #[tokio::test]
318 async fn do_adapter_error_on_non_204() {
319 let server = MockServer::start().await;
320 Mock::given(method("DELETE"))
321 .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
322 .respond_with(ResponseTemplate::new(403))
323 .mount(&server)
324 .await;
325
326 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"));
327 let err = purger.purge(&["index.html".to_string()]).await.unwrap_err();
328 assert!(
329 err.to_string().contains("403"),
330 "Error must contain status 403: {err}"
331 );
332 }
333
334 #[tokio::test]
337 async fn do_adapter_missing_token_errors() {
338 let server = MockServer::start().await;
339 Mock::given(method("DELETE"))
340 .respond_with(ResponseTemplate::new(204))
341 .expect(0)
342 .mount(&server)
343 .await;
344
345 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), ""));
346 let err = purger.purge(&["index.html".to_string()]).await.unwrap_err();
347 assert!(
348 err.to_string().contains("DIGITALOCEAN_ACCESS_TOKEN"),
349 "Error must mention the token env var: {err}"
350 );
351 }
352
353 #[tokio::test]
360 async fn do_adapter_throttle_serializes() {
361 let server = MockServer::start().await;
362 Mock::given(method("DELETE"))
363 .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
364 .and(header("Authorization", "Bearer test-token"))
365 .respond_with(ResponseTemplate::new(204))
366 .expect(6)
367 .mount(&server)
368 .await;
369
370 let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"));
371 let paths: Vec<String> = (0..300).map(|i| format!("file{i}.html")).collect();
373
374 let start = std::time::Instant::now();
375 purger.purge(&paths).await.unwrap();
376 let elapsed = start.elapsed();
377
378 assert!(
379 elapsed >= Duration::from_secs(9),
380 "Throttle should have held the 6th request for ~10s; elapsed: {elapsed:?}"
381 );
382 }
383}