1use crate::env_helpers::env_with_fallback;
26use crate::Error;
27use async_trait::async_trait;
28use std::collections::VecDeque;
29use std::time::Duration;
30use tokio::sync::Mutex;
31use tokio::time::Instant;
32
/// Base URL of the DigitalOcean API, used when the config carries no override.
const DO_CDN_API_BASE: &str = "https://api.digitalocean.com";

/// Maximum number of paths placed in a single purge request.
const BATCH_SIZE: usize = 50;

/// Length of the sliding window over which purge requests are counted.
const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(10);

/// Maximum number of purge requests started within one [`RATE_LIMIT_WINDOW`].
const RATE_LIMIT_MAX: usize = 5;
37
38#[async_trait]
43pub trait PurgeApi: Send + Sync {
44 async fn purge(&self, paths: &[String]) -> Result<(), Error>;
49}
50
/// Settings for the DigitalOcean Spaces CDN purge adapter.
///
/// `Debug` is implemented by hand so the API token never reaches logs.
#[derive(Clone)]
pub struct DoSpacesCdnConfig {
    /// CDN endpoint identifier; `None` means no endpoint is configured.
    pub endpoint_id: Option<String>,
    /// DigitalOcean API token; an empty string means "not configured".
    pub api_token: String,
    /// Optional override of the API base URL (tests point it at a mock server).
    pub(crate) api_base: Option<String>,
}

impl std::fmt::Debug for DoSpacesCdnConfig {
    /// Formats the config with the token replaced by `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that adding a field forces a decision about how it is printed.
        let Self {
            endpoint_id,
            api_token: _,
            api_base,
        } = self;
        f.debug_struct("DoSpacesCdnConfig")
            .field("endpoint_id", endpoint_id)
            .field("api_token", &"<redacted>")
            .field("api_base", api_base)
            .finish()
    }
}

impl DoSpacesCdnConfig {
    /// Builds the config from `DO_SPACES_CDN_ID` and `DIGITALOCEAN_ACCESS_TOKEN`.
    ///
    /// Both values are trimmed. A missing, empty or whitespace-only
    /// `DO_SPACES_CDN_ID` yields `endpoint_id == None`, so a blank variable is
    /// never formatted into a request URL as an empty path segment. A missing
    /// token yields an empty string. `api_base` is always `None`.
    pub fn from_env() -> Self {
        let endpoint_id = std::env::var("DO_SPACES_CDN_ID")
            .ok()
            .map(|raw| raw.trim().to_owned())
            .filter(|id| !id.is_empty());
        // Stray whitespace or a trailing newline would otherwise end up in the
        // Authorization header.
        let api_token = std::env::var("DIGITALOCEAN_ACCESS_TOKEN")
            .map(|raw| raw.trim().to_owned())
            .unwrap_or_default();

        Self {
            endpoint_id,
            api_token,
            api_base: None,
        }
    }
}
91
92pub struct DoSpacesCdn {
97 config: DoSpacesCdnConfig,
98 client: reqwest::Client,
99 request_times: Mutex<VecDeque<Instant>>,
100}
101
102impl DoSpacesCdn {
103 pub fn new(config: DoSpacesCdnConfig) -> Self {
105 Self {
106 config,
107 client: reqwest::Client::new(),
108 request_times: Mutex::new(VecDeque::new()),
109 }
110 }
111
112 fn api_base(&self) -> &str {
113 self.config.api_base.as_deref().unwrap_or(DO_CDN_API_BASE)
114 }
115
116 async fn throttle(&self) {
122 loop {
123 let mut times = self.request_times.lock().await;
124 let now = Instant::now();
125 while times
127 .front()
128 .map(|t| now.duration_since(*t) >= RATE_LIMIT_WINDOW)
129 .unwrap_or(false)
130 {
131 times.pop_front();
132 }
133 if times.len() < RATE_LIMIT_MAX {
134 times.push_back(Instant::now());
136 return;
137 }
138 let oldest = *times.front().unwrap();
140 let sleep_for = RATE_LIMIT_WINDOW - now.duration_since(oldest);
141 drop(times);
142 tokio::time::sleep(sleep_for).await;
143 }
145 }
146}
147
148#[async_trait]
149impl PurgeApi for DoSpacesCdn {
150 async fn purge(&self, paths: &[String]) -> Result<(), Error> {
151 if paths.is_empty() {
152 return Ok(());
153 }
154 let Some(id) = &self.config.endpoint_id else {
155 tracing::info!("DO_SPACES_CDN_ID not set — CDN purge is a no-op");
156 return Ok(());
157 };
158 if self.config.api_token.is_empty() {
159 return Err(Error::cdn(
160 "DIGITALOCEAN_ACCESS_TOKEN not set — cannot purge CDN cache",
161 ));
162 }
163 let url = format!("{}/v2/cdn/endpoints/{}/cache", self.api_base(), id);
164 let mut batches = 0usize;
165 for chunk in paths.chunks(BATCH_SIZE) {
166 self.throttle().await;
167 let resp = self
168 .client
169 .delete(&url)
170 .bearer_auth(&self.config.api_token)
171 .json(&serde_json::json!({ "files": chunk }))
172 .send()
173 .await
174 .map_err(|e| Error::cdn(e.to_string()))?;
175 if resp.status().as_u16() != 204 {
176 let status = resp.status().as_u16();
177 let body = resp.text().await.unwrap_or_default();
178 return Err(Error::cdn(format!("DO CDN purge status {status}: {body}")));
179 }
180 batches += 1;
181 }
182 tracing::info!("purged {} paths in {} request(s)", paths.len(), batches);
183 Ok(())
184 }
185}
186
187#[derive(Debug, Clone, PartialEq, Eq, Default)]
189pub enum CdnProvider {
190 #[default]
192 None,
193 DigitalOcean,
195 Bunny,
197 Cloudflare,
199}
200
201impl CdnProvider {
202 pub fn from_str_ci(s: &str) -> Result<Self, Error> {
204 match s.trim().to_ascii_lowercase().as_str() {
205 "none" => Ok(Self::None),
206 "digitalocean" => Ok(Self::DigitalOcean),
207 "bunny" => Ok(Self::Bunny),
208 "cloudflare" => Ok(Self::Cloudflare),
209 other => Err(Error::cdn_invalid_provider(other)),
210 }
211 }
212}
213
214#[derive(Clone, Default)]
220pub struct Config {
221 pub url: Option<String>,
223 pub provider: CdnProvider,
225 pub purge_token: Option<String>,
227 pub purge_zone: Option<String>,
229 pub provider_error: Option<String>,
232}
233
234impl std::fmt::Debug for Config {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 f.debug_struct("cdn::Config")
237 .field("url", &self.url)
238 .field("provider", &self.provider)
239 .field("purge_token", &"<redacted>")
240 .field("purge_zone", &self.purge_zone)
241 .field("provider_error", &self.provider_error)
242 .finish()
243 }
244}
245
246impl Config {
247 pub fn from_env() -> Self {
257 let url = env_with_fallback("CDN_URL", &["AWS_CDN_URL", "CF_CDN_URL", "BUNNY_CDN_URL"]);
259
260 let mut provider_error: Option<String> = None;
262 let provider = if let Ok(val) = std::env::var("CDN_PROVIDER") {
263 match CdnProvider::from_str_ci(&val) {
264 Ok(p) => p,
265 Err(e) => {
266 tracing::error!("{e}; set CDN_PROVIDER to a valid value to enable purging");
267 provider_error = Some(val);
268 CdnProvider::None
269 }
270 }
271 } else if std::env::var("DO_SPACES_CDN_ID").is_ok() {
272 tracing::warn!("CDN_PROVIDER unset; inferred digitalocean from DO_SPACES_CDN_ID. Set CDN_PROVIDER=digitalocean to silence.");
273 CdnProvider::DigitalOcean
274 } else if std::env::var("CF_ZONE_ID").is_ok() {
275 tracing::warn!("CDN_PROVIDER unset; inferred cloudflare from CF_ZONE_ID. Set CDN_PROVIDER=cloudflare to silence.");
276 CdnProvider::Cloudflare
277 } else if std::env::var("BUNNY_CDN_URL").is_ok()
278 || std::env::var("BUNNY_ACCESS_KEY").is_ok()
279 {
280 tracing::warn!("CDN_PROVIDER unset; inferred bunny from BUNNY_CDN_URL/BUNNY_ACCESS_KEY. Set CDN_PROVIDER=bunny to silence.");
281 CdnProvider::Bunny
282 } else {
283 CdnProvider::None
284 };
285
286 let (purge_token, purge_zone) = match &provider {
289 CdnProvider::DigitalOcean => (
290 env_with_fallback("CDN_PURGE_TOKEN", &["DIGITALOCEAN_ACCESS_TOKEN"]),
291 env_with_fallback("CDN_PURGE_ZONE", &["DO_SPACES_CDN_ID"]),
292 ),
293 CdnProvider::Cloudflare => (
294 env_with_fallback("CDN_PURGE_TOKEN", &["CF_API_TOKEN"]),
295 env_with_fallback("CDN_PURGE_ZONE", &["CF_ZONE_ID"]),
296 ),
297 CdnProvider::Bunny => (
298 env_with_fallback("CDN_PURGE_TOKEN", &["BUNNY_ACCESS_KEY"]),
299 None,
300 ),
301 CdnProvider::None => (
302 env_with_fallback("CDN_PURGE_TOKEN", &[]),
303 env_with_fallback("CDN_PURGE_ZONE", &[]),
304 ),
305 };
306
307 Self {
308 url,
309 provider,
310 purge_token,
311 purge_zone,
312 provider_error,
313 }
314 }
315
316 pub fn build_purge_api(&self) -> Result<Option<Box<dyn PurgeApi>>, Error> {
322 if let Some(bad) = &self.provider_error {
324 return Err(Error::cdn_invalid_provider(bad));
325 }
326
327 match &self.provider {
328 CdnProvider::None => {
329 tracing::info!("CDN_PROVIDER=none — purge is a no-op");
330 Ok(None)
331 }
332 CdnProvider::DigitalOcean => {
333 let cfg = DoSpacesCdnConfig {
334 endpoint_id: self.purge_zone.clone(),
335 api_token: self.purge_token.clone().unwrap_or_default(),
336 api_base: None,
337 };
338 Ok(Some(Box::new(DoSpacesCdn::new(cfg))))
339 }
340 CdnProvider::Bunny => {
341 #[cfg(feature = "cdn-bunny")]
342 {
343 let cfg = BunnyCdnConfig {
344 cdn_base_url: self.url.clone().unwrap_or_default(),
345 access_key: self.purge_token.clone().unwrap_or_default(),
346 };
347 Ok(Some(Box::new(BunnyCdn::new(cfg))))
348 }
349 #[cfg(not(feature = "cdn-bunny"))]
350 {
351 Err(Error::cdn_feature_required("bunny", "cdn-bunny"))
352 }
353 }
354 CdnProvider::Cloudflare => {
355 #[cfg(feature = "cdn-cloudflare")]
356 {
357 let cfg = CloudflareCdnConfig {
358 zone_id: self.purge_zone.clone().unwrap_or_default(),
359 api_token: self.purge_token.clone().unwrap_or_default(),
360 cdn_base_url: self.url.clone().unwrap_or_default(),
361 };
362 Ok(Some(Box::new(CloudflareCdn::new(cfg))))
363 }
364 #[cfg(not(feature = "cdn-cloudflare"))]
365 {
366 Err(Error::cdn_feature_required("cloudflare", "cdn-cloudflare"))
367 }
368 }
369 }
370 }
371}
372
373#[cfg(feature = "cdn-bunny")]
374pub mod bunny;
375#[cfg(feature = "cdn-bunny")]
376pub use bunny::{BunnyCdn, BunnyCdnConfig};
377
378#[cfg(feature = "cdn-cloudflare")]
379pub mod cloudflare;
380#[cfg(feature = "cdn-cloudflare")]
381pub use cloudflare::{CloudflareCdn, CloudflareCdnConfig};
382
#[cfg(test)]
mod tests {
    use super::*;
    use wiremock::matchers::{body_json, header, method, path_regex};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds an adapter config whose API base points at the mock server.
    fn cfg(server_uri: &str, id: Option<&str>, token: &str) -> DoSpacesCdnConfig {
        DoSpacesCdnConfig {
            endpoint_id: id.map(str::to_owned),
            api_token: token.to_owned(),
            api_base: Some(server_uri.to_owned()),
        }
    }

    /// Builds an adapter that talks to `server`.
    fn purger_for(server: &MockServer, id: Option<&str>, token: &str) -> DoSpacesCdn {
        DoSpacesCdn::new(cfg(&server.uri(), id, token))
    }

    /// The adapter config's Debug output hides the API token.
    #[test]
    fn debug_does_not_contain_token() {
        let config = DoSpacesCdnConfig {
            endpoint_id: Some(String::from("ep-123")),
            api_token: String::from("secret-token-abc"),
            api_base: None,
        };
        let dbg = format!("{config:?}");
        assert!(
            !dbg.contains("secret-token-abc"),
            "Debug output must not contain the token: {dbg}"
        );
        assert!(
            dbg.contains("<redacted>"),
            "Debug output must show <redacted>: {dbg}"
        );
    }

    /// A single-path purge sends one DELETE with bearer auth and a `files` body.
    #[tokio::test]
    async fn do_adapter_request_shape() {
        let server = MockServer::start().await;
        let expected_body = serde_json::json!({ "files": ["index.html"] });
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .and(header("Authorization", "Bearer test-token"))
            .and(body_json(expected_body))
            .respond_with(ResponseTemplate::new(204))
            .expect(1)
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "test-token");
        let paths = [String::from("index.html")];
        purger.purge(&paths).await.unwrap();
    }

    /// 55 paths are split into two requests.
    #[tokio::test]
    async fn do_adapter_batches_over_50() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .and(header("Authorization", "Bearer test-token"))
            .respond_with(ResponseTemplate::new(204))
            .expect(2)
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "test-token");
        let paths: Vec<String> = (0..55).map(|i| format!("file{i}.html")).collect();
        purger.purge(&paths).await.unwrap();
    }

    /// 50 plain paths plus one wildcard path need two requests.
    #[tokio::test]
    async fn do_adapter_wildcard_slot() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .and(header("Authorization", "Bearer test-token"))
            .respond_with(ResponseTemplate::new(204))
            .expect(2)
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "test-token");
        let mut paths: Vec<String> = (0..50).map(|i| format!("file{i}.html")).collect();
        paths.push(String::from("dir/*"));
        purger.purge(&paths).await.unwrap();
    }

    /// Without an endpoint id, purge succeeds and sends nothing.
    #[tokio::test]
    async fn do_adapter_noop_missing_id() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .respond_with(ResponseTemplate::new(204))
            .expect(0)
            .mount(&server)
            .await;

        let purger = purger_for(&server, None, "test-token");
        let paths = [String::from("a")];
        purger.purge(&paths).await.unwrap();
    }

    /// An empty path list succeeds and sends nothing.
    #[tokio::test]
    async fn purge_empty_noop() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .respond_with(ResponseTemplate::new(204))
            .expect(0)
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "test-token");
        purger.purge(&[]).await.unwrap();
    }

    /// A non-204 response becomes an error that mentions the status code.
    #[tokio::test]
    async fn do_adapter_error_on_non_204() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .respond_with(ResponseTemplate::new(403))
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "test-token");
        let paths = [String::from("index.html")];
        let err = purger.purge(&paths).await.unwrap_err();
        assert!(
            err.to_string().contains("403"),
            "Error must contain status 403: {err}"
        );
    }

    /// The generic config's Debug output hides the purge token.
    #[test]
    fn cdn_config_debug_redacts_token() {
        let config = Config {
            purge_token: Some(String::from("secret-xyz")),
            ..Default::default()
        };
        let dbg = format!("{config:?}");
        assert!(
            !dbg.contains("secret-xyz"),
            "Debug output must not contain the token: {dbg}"
        );
        assert!(
            dbg.contains("<redacted>"),
            "Debug output must show <redacted>: {dbg}"
        );
    }

    /// Unknown provider names are rejected with a message listing valid values.
    #[test]
    fn cdn_invalid_provider() {
        let result = CdnProvider::from_str_ci("Fastly");
        assert!(result.is_err(), "Expected Err for unknown provider");
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("none, digitalocean, bunny, cloudflare"),
            "Error must list valid values: {msg}"
        );
    }

    /// A recorded provider error makes `build_purge_api` fail.
    #[test]
    fn cdn_invalid_provider_from_env_errors() {
        let config = Config {
            provider_error: Some(String::from("Fastly")),
            ..Default::default()
        };
        // The Ok type is not Debug, so `unwrap_err` is unavailable here.
        let Err(err) = config.build_purge_api() else {
            panic!("build_purge_api must return Err when provider_error is set");
        };
        let msg = err.to_string();
        assert!(
            msg.contains("none, digitalocean, bunny, cloudflare"),
            "Error must list valid values: {msg}"
        );
    }

    /// The default config builds no adapter.
    #[test]
    fn cdn_provider_none_no_op() {
        let config = Config::default();
        let built = config.build_purge_api();
        assert!(
            matches!(built, Ok(None)),
            "provider=None must return Ok(None)"
        );
    }

    /// All four generic variables are honoured.
    #[test]
    #[serial_test::serial]
    fn cdn_config_from_env_quartet() {
        let vars = [
            ("CDN_URL", "https://q.example.com"),
            ("CDN_PROVIDER", "digitalocean"),
            ("CDN_PURGE_TOKEN", "tok"),
            ("CDN_PURGE_ZONE", "ep-9"),
        ];
        for (name, value) in vars {
            std::env::set_var(name, value);
        }

        let config = Config::from_env();

        // Clean up before asserting so a failure does not leak state into other tests.
        for (name, _) in vars {
            std::env::remove_var(name);
        }

        assert_eq!(config.url, Some("https://q.example.com".into()));
        assert_eq!(config.provider, CdnProvider::DigitalOcean);
        assert_eq!(config.purge_token, Some("tok".into()));
        assert_eq!(config.purge_zone, Some("ep-9".into()));
    }

    /// `AWS_CDN_URL` is used when `CDN_URL` is absent.
    #[test]
    #[serial_test::serial]
    fn cdn_fallback_aws_cdn_url() {
        std::env::remove_var("CDN_URL");
        std::env::set_var("AWS_CDN_URL", "https://legacy");

        let config = Config::from_env();

        std::env::remove_var("AWS_CDN_URL");

        assert_eq!(config.url, Some("https://legacy".into()));
    }

    /// Legacy DigitalOcean variables supply zone and token and imply the provider.
    #[test]
    #[serial_test::serial]
    fn cdn_fallback_do_zone_and_token() {
        for name in ["CDN_PURGE_ZONE", "CDN_PURGE_TOKEN", "CDN_PROVIDER"] {
            std::env::remove_var(name);
        }
        std::env::set_var("DO_SPACES_CDN_ID", "ep-1");
        std::env::set_var("DIGITALOCEAN_ACCESS_TOKEN", "t");

        let config = Config::from_env();

        std::env::remove_var("DO_SPACES_CDN_ID");
        std::env::remove_var("DIGITALOCEAN_ACCESS_TOKEN");

        assert_eq!(config.purge_zone, Some("ep-1".into()));
        assert_eq!(config.purge_token, Some("t".into()));
        assert_eq!(config.provider, CdnProvider::DigitalOcean);
    }

    /// Selecting Bunny without the `cdn-bunny` feature reports which feature is missing.
    #[cfg(not(feature = "cdn-bunny"))]
    #[test]
    fn cdn_feature_required_bunny() {
        let config = Config {
            provider: CdnProvider::Bunny,
            ..Default::default()
        };
        let result = config.build_purge_api();
        assert!(
            result.is_err(),
            "Expected Err when cdn-bunny feature is off"
        );
        let Err(err) = result else { unreachable!() };
        let msg = err.to_string();
        assert!(
            msg.contains("cdn-bunny"),
            "Error must mention the cdn-bunny feature: {msg}"
        );
    }

    /// An empty token is an error and nothing is sent.
    #[tokio::test]
    async fn do_adapter_missing_token_errors() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .respond_with(ResponseTemplate::new(204))
            .expect(0)
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "");
        let paths = [String::from("index.html")];
        let err = purger.purge(&paths).await.unwrap_err();
        assert!(
            err.to_string().contains("DIGITALOCEAN_ACCESS_TOKEN"),
            "Error must mention the token env var: {err}"
        );
    }

    /// 300 paths need six requests; the sixth has to wait for the rate-limit window.
    ///
    /// This test runs in real time and takes roughly ten seconds.
    #[tokio::test]
    async fn do_adapter_throttle_serializes() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .and(header("Authorization", "Bearer test-token"))
            .respond_with(ResponseTemplate::new(204))
            .expect(6)
            .mount(&server)
            .await;

        let purger = purger_for(&server, Some("test-id"), "test-token");
        let paths: Vec<String> = (0..300).map(|i| format!("file{i}.html")).collect();

        let start = std::time::Instant::now();
        purger.purge(&paths).await.unwrap();
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_secs(9),
            "Throttle should have held the 6th request for ~10s; elapsed: {elapsed:?}"
        );
    }
}