1use crate::env_helpers::env_with_fallback;
26use crate::Error;
27use async_trait::async_trait;
28use std::collections::VecDeque;
29use std::time::Duration;
30use tokio::sync::Mutex;
31use tokio::time::Instant;
32
/// Base URL of the DigitalOcean API; used whenever the config carries no override.
const DO_CDN_API_BASE: &str = "https://api.digitalocean.com";

/// Maximum number of paths sent in a single purge request.
const BATCH_SIZE: usize = 50;

/// Width of the sliding window used by the client-side request throttle.
const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(10);

/// Maximum number of purge requests dispatched within one `RATE_LIMIT_WINDOW`.
const RATE_LIMIT_MAX: usize = 5;
37
38#[async_trait]
43pub trait PurgeApi: Send + Sync {
44 async fn purge(&self, paths: &[String]) -> Result<(), Error>;
49}
50
/// Settings for the DigitalOcean Spaces CDN purge adapter.
///
/// `Debug` is implemented by hand rather than derived so that `api_token`
/// never appears in formatted output.
#[derive(Clone)]
pub struct DoSpacesCdnConfig {
    /// CDN endpoint ID; when `None`, purging is skipped as a no-op.
    pub endpoint_id: Option<String>,
    /// API bearer token; an empty string is treated as "not configured".
    pub api_token: String,
    /// Optional API base URL override (the tests point it at a mock server);
    /// `None` selects the production API.
    pub(crate) api_base: Option<String>,
}
67
68impl std::fmt::Debug for DoSpacesCdnConfig {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("DoSpacesCdnConfig")
71 .field("endpoint_id", &self.endpoint_id)
72 .field("api_token", &"<redacted>")
73 .field("api_base", &self.api_base)
74 .finish()
75 }
76}
77
78impl DoSpacesCdnConfig {
79 pub fn from_env() -> Self {
89 Self {
90 endpoint_id: crate::env_helpers::env_with_fallback(
91 "CDN_PURGE_ZONE",
92 &["DO_SPACES_CDN_ID"],
93 ),
94 api_token: crate::env_helpers::env_with_fallback(
95 "CDN_PURGE_TOKEN",
96 &["DIGITALOCEAN_ACCESS_TOKEN"],
97 )
98 .unwrap_or_default(),
99 api_base: None,
100 }
101 }
102}
103
104pub struct DoSpacesCdn {
109 config: DoSpacesCdnConfig,
110 client: reqwest::Client,
111 request_times: Mutex<VecDeque<Instant>>,
112}
113
114impl DoSpacesCdn {
115 pub fn new(config: DoSpacesCdnConfig) -> Self {
117 Self {
118 config,
119 client: reqwest::Client::new(),
120 request_times: Mutex::new(VecDeque::new()),
121 }
122 }
123
124 fn api_base(&self) -> &str {
125 self.config.api_base.as_deref().unwrap_or(DO_CDN_API_BASE)
126 }
127
128 async fn throttle(&self) {
134 loop {
135 let mut times = self.request_times.lock().await;
136 let now = Instant::now();
137 while times
139 .front()
140 .map(|t| now.duration_since(*t) >= RATE_LIMIT_WINDOW)
141 .unwrap_or(false)
142 {
143 times.pop_front();
144 }
145 if times.len() < RATE_LIMIT_MAX {
146 times.push_back(Instant::now());
148 return;
149 }
150 let oldest = *times.front().unwrap();
152 let sleep_for = RATE_LIMIT_WINDOW - now.duration_since(oldest);
153 drop(times);
154 tokio::time::sleep(sleep_for).await;
155 }
157 }
158}
159
160#[async_trait]
161impl PurgeApi for DoSpacesCdn {
162 async fn purge(&self, paths: &[String]) -> Result<(), Error> {
163 if paths.is_empty() {
164 return Ok(());
165 }
166 let Some(id) = &self.config.endpoint_id else {
167 tracing::info!("DO_SPACES_CDN_ID not set — CDN purge is a no-op");
168 return Ok(());
169 };
170 if self.config.api_token.is_empty() {
171 return Err(Error::cdn(
172 "DIGITALOCEAN_ACCESS_TOKEN not set — cannot purge CDN cache",
173 ));
174 }
175 let url = format!("{}/v2/cdn/endpoints/{}/cache", self.api_base(), id);
176 let mut batches = 0usize;
177 for chunk in paths.chunks(BATCH_SIZE) {
178 self.throttle().await;
179 let resp = self
180 .client
181 .delete(&url)
182 .bearer_auth(&self.config.api_token)
183 .json(&serde_json::json!({ "files": chunk }))
184 .send()
185 .await
186 .map_err(|e| Error::cdn(e.to_string()))?;
187 if resp.status().as_u16() != 204 {
188 let status = resp.status().as_u16();
189 let body = resp.text().await.unwrap_or_default();
190 return Err(Error::cdn(format!("DO CDN purge status {status}: {body}")));
191 }
192 batches += 1;
193 }
194 tracing::info!("purged {} paths in {} request(s)", paths.len(), batches);
195 Ok(())
196 }
197}
198
/// CDN providers whose purge API can be selected through configuration.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum CdnProvider {
    /// No provider selected; purging is a no-op. This is the default.
    #[default]
    None,
    /// DigitalOcean Spaces CDN.
    DigitalOcean,
    /// Bunny CDN.
    Bunny,
    /// Cloudflare.
    Cloudflare,
}
212
213impl CdnProvider {
214 pub fn from_str_ci(s: &str) -> Result<Self, Error> {
216 match s.trim().to_ascii_lowercase().as_str() {
217 "none" => Ok(Self::None),
218 "digitalocean" => Ok(Self::DigitalOcean),
219 "bunny" => Ok(Self::Bunny),
220 "cloudflare" => Ok(Self::Cloudflare),
221 other => Err(Error::cdn_invalid_provider(other)),
222 }
223 }
224}
225
226#[derive(Clone, Default)]
232pub struct Config {
233 pub url: Option<String>,
235 pub provider: CdnProvider,
237 pub purge_token: Option<String>,
239 pub purge_zone: Option<String>,
241 pub provider_error: Option<String>,
244}
245
246impl std::fmt::Debug for Config {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 f.debug_struct("cdn::Config")
249 .field("url", &self.url)
250 .field("provider", &self.provider)
251 .field("purge_token", &"<redacted>")
252 .field("purge_zone", &self.purge_zone)
253 .field("provider_error", &self.provider_error)
254 .finish()
255 }
256}
257
258impl Config {
259 pub fn from_env() -> Self {
269 let url = env_with_fallback("CDN_URL", &["AWS_CDN_URL", "CF_CDN_URL", "BUNNY_CDN_URL"]);
271
272 let mut provider_error: Option<String> = None;
274 let provider = if let Ok(val) = std::env::var("CDN_PROVIDER") {
275 match CdnProvider::from_str_ci(&val) {
276 Ok(p) => p,
277 Err(e) => {
278 tracing::error!("{e}; set CDN_PROVIDER to a valid value to enable purging");
279 provider_error = Some(val);
280 CdnProvider::None
281 }
282 }
283 } else if std::env::var("DO_SPACES_CDN_ID").is_ok() {
284 tracing::warn!("CDN_PROVIDER unset; inferred digitalocean from DO_SPACES_CDN_ID. Set CDN_PROVIDER=digitalocean to silence.");
285 CdnProvider::DigitalOcean
286 } else if std::env::var("CF_ZONE_ID").is_ok() {
287 tracing::warn!("CDN_PROVIDER unset; inferred cloudflare from CF_ZONE_ID. Set CDN_PROVIDER=cloudflare to silence.");
288 CdnProvider::Cloudflare
289 } else if std::env::var("BUNNY_CDN_URL").is_ok()
290 || std::env::var("BUNNY_ACCESS_KEY").is_ok()
291 {
292 tracing::warn!("CDN_PROVIDER unset; inferred bunny from BUNNY_CDN_URL/BUNNY_ACCESS_KEY. Set CDN_PROVIDER=bunny to silence.");
293 CdnProvider::Bunny
294 } else {
295 CdnProvider::None
296 };
297
298 let (purge_token, purge_zone) = match &provider {
301 CdnProvider::DigitalOcean => (
302 env_with_fallback("CDN_PURGE_TOKEN", &["DIGITALOCEAN_ACCESS_TOKEN"]),
303 env_with_fallback("CDN_PURGE_ZONE", &["DO_SPACES_CDN_ID"]),
304 ),
305 CdnProvider::Cloudflare => (
306 env_with_fallback("CDN_PURGE_TOKEN", &["CF_API_TOKEN"]),
307 env_with_fallback("CDN_PURGE_ZONE", &["CF_ZONE_ID"]),
308 ),
309 CdnProvider::Bunny => (
310 env_with_fallback("CDN_PURGE_TOKEN", &["BUNNY_ACCESS_KEY"]),
311 None,
312 ),
313 CdnProvider::None => (
314 env_with_fallback("CDN_PURGE_TOKEN", &[]),
315 env_with_fallback("CDN_PURGE_ZONE", &[]),
316 ),
317 };
318
319 Self {
320 url,
321 provider,
322 purge_token,
323 purge_zone,
324 provider_error,
325 }
326 }
327
328 pub fn build_purge_api(&self) -> Result<Option<Box<dyn PurgeApi>>, Error> {
334 if let Some(bad) = &self.provider_error {
336 return Err(Error::cdn_invalid_provider(bad));
337 }
338
339 match &self.provider {
340 CdnProvider::None => {
341 tracing::info!("CDN_PROVIDER=none — purge is a no-op");
342 Ok(None)
343 }
344 CdnProvider::DigitalOcean => {
345 let cfg = DoSpacesCdnConfig {
346 endpoint_id: self.purge_zone.clone(),
347 api_token: self.purge_token.clone().unwrap_or_default(),
348 api_base: None,
349 };
350 Ok(Some(Box::new(DoSpacesCdn::new(cfg))))
351 }
352 CdnProvider::Bunny => {
353 #[cfg(feature = "cdn-bunny")]
354 {
355 let cfg = BunnyCdnConfig {
356 cdn_base_url: self.url.clone().unwrap_or_default(),
357 access_key: self.purge_token.clone().unwrap_or_default(),
358 };
359 Ok(Some(Box::new(BunnyCdn::new(cfg))))
360 }
361 #[cfg(not(feature = "cdn-bunny"))]
362 {
363 Err(Error::cdn_feature_required("bunny", "cdn-bunny"))
364 }
365 }
366 CdnProvider::Cloudflare => {
367 #[cfg(feature = "cdn-cloudflare")]
368 {
369 let cfg = CloudflareCdnConfig {
370 zone_id: self.purge_zone.clone().unwrap_or_default(),
371 api_token: self.purge_token.clone().unwrap_or_default(),
372 cdn_base_url: self.url.clone().unwrap_or_default(),
373 };
374 Ok(Some(Box::new(CloudflareCdn::new(cfg))))
375 }
376 #[cfg(not(feature = "cdn-cloudflare"))]
377 {
378 Err(Error::cdn_feature_required("cloudflare", "cdn-cloudflare"))
379 }
380 }
381 }
382 }
383}
384
385#[cfg(feature = "cdn-bunny")]
386pub mod bunny;
387#[cfg(feature = "cdn-bunny")]
388pub use bunny::{BunnyCdn, BunnyCdnConfig};
389
390#[cfg(feature = "cdn-cloudflare")]
391pub mod cloudflare;
392#[cfg(feature = "cdn-cloudflare")]
393pub use cloudflare::{CloudflareCdn, CloudflareCdnConfig};
394
#[cfg(test)]
mod tests {
    use super::*;
    use wiremock::matchers::{body_json, header, method, path_regex};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Path pattern of the purge endpoint for the `test-id` CDN endpoint.
    const PURGE_PATH: &str = r"/v2/cdn/endpoints/test-id/cache";

    /// Adapter config aimed at the mock server instead of the real API.
    fn cfg(server_uri: &str, id: Option<&str>, token: &str) -> DoSpacesCdnConfig {
        DoSpacesCdnConfig {
            endpoint_id: id.map(str::to_owned),
            api_token: token.to_owned(),
            api_base: Some(server_uri.to_owned()),
        }
    }

    /// Adapter for endpoint `test-id` with token `test-token`, talking to `server`.
    fn test_purger(server: &MockServer) -> DoSpacesCdn {
        DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"))
    }

    /// Mock answering 204 to authenticated `DELETE`s on the purge path.
    fn authed_purge_mock() -> Mock {
        Mock::given(method("DELETE"))
            .and(path_regex(PURGE_PATH))
            .and(header("Authorization", "Bearer test-token"))
            .respond_with(ResponseTemplate::new(204))
    }

    /// Mock answering 204 to any `DELETE`; mounted with `expect(0)` to prove
    /// that no request is sent.
    fn any_delete_mock() -> Mock {
        Mock::given(method("DELETE")).respond_with(ResponseTemplate::new(204))
    }

    /// `count` distinct paths: `file0.html`, `file1.html`, …
    fn numbered_paths(count: usize) -> Vec<String> {
        (0..count).map(|i| format!("file{i}.html")).collect()
    }

    /// The adapter config's `Debug` output must hide the API token.
    #[test]
    fn debug_does_not_contain_token() {
        let config = DoSpacesCdnConfig {
            endpoint_id: Some("ep-123".into()),
            api_token: "secret-token-abc".into(),
            api_base: None,
        };
        let rendered = format!("{config:?}");
        assert!(
            !rendered.contains("secret-token-abc"),
            "Debug output must not contain the token: {rendered}"
        );
        assert!(
            rendered.contains("<redacted>"),
            "Debug output must show <redacted>: {rendered}"
        );
    }

    /// A single-path purge is one authenticated DELETE with a `files` JSON body.
    #[tokio::test]
    async fn do_adapter_request_shape() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(PURGE_PATH))
            .and(header("Authorization", "Bearer test-token"))
            .and(body_json(serde_json::json!({ "files": ["index.html"] })))
            .respond_with(ResponseTemplate::new(204))
            .expect(1)
            .mount(&server)
            .await;

        let purger = test_purger(&server);
        purger.purge(&["index.html".to_string()]).await.unwrap();
    }

    /// 55 paths exceed one batch of 50 and therefore need two requests.
    #[tokio::test]
    async fn do_adapter_batches_over_50() {
        let server = MockServer::start().await;
        authed_purge_mock().expect(2).mount(&server).await;

        let purger = test_purger(&server);
        let paths = numbered_paths(55);
        purger.purge(&paths).await.unwrap();
    }

    /// A wildcard path counts as one entry: 50 files plus `dir/*` need two requests.
    #[tokio::test]
    async fn do_adapter_wildcard_slot() {
        let server = MockServer::start().await;
        authed_purge_mock().expect(2).mount(&server).await;

        let purger = test_purger(&server);
        let mut paths = numbered_paths(50);
        paths.push("dir/*".to_string());
        purger.purge(&paths).await.unwrap();
    }

    /// Without an endpoint ID the purge succeeds without sending anything.
    #[tokio::test]
    async fn do_adapter_noop_missing_id() {
        let server = MockServer::start().await;
        any_delete_mock().expect(0).mount(&server).await;

        let purger = DoSpacesCdn::new(cfg(&server.uri(), None, "test-token"));
        purger.purge(&["a".to_string()]).await.unwrap();
    }

    /// An empty path list succeeds without sending anything.
    #[tokio::test]
    async fn purge_empty_noop() {
        let server = MockServer::start().await;
        any_delete_mock().expect(0).mount(&server).await;

        let purger = test_purger(&server);
        purger.purge(&[]).await.unwrap();
    }

    /// Any status other than 204 is surfaced as an error naming the status.
    #[tokio::test]
    async fn do_adapter_error_on_non_204() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(PURGE_PATH))
            .respond_with(ResponseTemplate::new(403))
            .mount(&server)
            .await;

        let purger = test_purger(&server);
        let err = purger.purge(&["index.html".to_string()]).await.unwrap_err();
        assert!(
            err.to_string().contains("403"),
            "Error must contain status 403: {err}"
        );
    }

    /// The provider-agnostic config's `Debug` output must hide the purge token.
    #[test]
    fn cdn_config_debug_redacts_token() {
        let config = Config {
            purge_token: Some("secret-xyz".into()),
            ..Default::default()
        };
        let rendered = format!("{config:?}");
        assert!(
            !rendered.contains("secret-xyz"),
            "Debug output must not contain the token: {rendered}"
        );
        assert!(
            rendered.contains("<redacted>"),
            "Debug output must show <redacted>: {rendered}"
        );
    }

    /// Unknown provider names are rejected with a message listing the valid ones.
    #[test]
    fn cdn_invalid_provider() {
        let Err(err) = CdnProvider::from_str_ci("Fastly") else {
            panic!("Expected Err for unknown provider");
        };
        let msg = err.to_string();
        assert!(
            msg.contains("none, digitalocean, bunny, cloudflare"),
            "Error must list valid values: {msg}"
        );
    }

    /// A recorded provider error makes `build_purge_api` fail.
    #[test]
    fn cdn_invalid_provider_from_env_errors() {
        let config = Config {
            provider_error: Some("Fastly".into()),
            ..Default::default()
        };
        let Err(err) = config.build_purge_api() else {
            panic!("build_purge_api must return Err when provider_error is set");
        };
        let msg = err.to_string();
        assert!(
            msg.contains("none, digitalocean, bunny, cloudflare"),
            "Error must list valid values: {msg}"
        );
    }

    /// The default config (provider `None`) builds no purge client.
    #[test]
    fn cdn_provider_none_no_op() {
        let config = Config::default();
        assert!(
            matches!(config.build_purge_api(), Ok(None)),
            "provider=None must return Ok(None)"
        );
    }

    /// All four primary variables are picked up by `Config::from_env`.
    #[test]
    #[serial_test::serial]
    fn cdn_config_from_env_quartet() {
        let vars = [
            ("CDN_URL", "https://q.example.com"),
            ("CDN_PROVIDER", "digitalocean"),
            ("CDN_PURGE_TOKEN", "tok"),
            ("CDN_PURGE_ZONE", "ep-9"),
        ];
        for (key, value) in vars {
            std::env::set_var(key, value);
        }

        let config = Config::from_env();

        // Clean up before asserting so a failure cannot leak env state.
        for (key, _) in vars {
            std::env::remove_var(key);
        }

        assert_eq!(config.url, Some("https://q.example.com".into()));
        assert_eq!(config.provider, CdnProvider::DigitalOcean);
        assert_eq!(config.purge_token, Some("tok".into()));
        assert_eq!(config.purge_zone, Some("ep-9".into()));
    }

    /// `AWS_CDN_URL` is honoured when `CDN_URL` is absent.
    #[test]
    #[serial_test::serial]
    fn cdn_fallback_aws_cdn_url() {
        std::env::remove_var("CDN_URL");
        std::env::set_var("AWS_CDN_URL", "https://legacy");

        let config = Config::from_env();

        std::env::remove_var("AWS_CDN_URL");

        assert_eq!(config.url, Some("https://legacy".into()));
    }

    /// Legacy DigitalOcean variables supply zone and token and imply the provider.
    #[test]
    #[serial_test::serial]
    fn cdn_fallback_do_zone_and_token() {
        for key in ["CDN_PURGE_ZONE", "CDN_PURGE_TOKEN", "CDN_PROVIDER"] {
            std::env::remove_var(key);
        }
        let legacy = [("DO_SPACES_CDN_ID", "ep-1"), ("DIGITALOCEAN_ACCESS_TOKEN", "t")];
        for (key, value) in legacy {
            std::env::set_var(key, value);
        }

        let config = Config::from_env();

        // Clean up before asserting so a failure cannot leak env state.
        for (key, _) in legacy {
            std::env::remove_var(key);
        }

        assert_eq!(config.purge_zone, Some("ep-1".into()));
        assert_eq!(config.purge_token, Some("t".into()));
        assert_eq!(config.provider, CdnProvider::DigitalOcean);
    }

    /// Selecting Bunny without the `cdn-bunny` feature is an error naming the feature.
    #[cfg(not(feature = "cdn-bunny"))]
    #[test]
    fn cdn_feature_required_bunny() {
        let config = Config {
            provider: CdnProvider::Bunny,
            ..Default::default()
        };
        let Err(err) = config.build_purge_api() else {
            panic!("Expected Err when cdn-bunny feature is off");
        };
        let msg = err.to_string();
        assert!(
            msg.contains("cdn-bunny"),
            "Error must mention the cdn-bunny feature: {msg}"
        );
    }

    /// An empty token fails before any request is sent.
    #[tokio::test]
    async fn do_adapter_missing_token_errors() {
        let server = MockServer::start().await;
        any_delete_mock().expect(0).mount(&server).await;

        let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), ""));
        let err = purger.purge(&["index.html".to_string()]).await.unwrap_err();
        assert!(
            err.to_string().contains("DIGITALOCEAN_ACCESS_TOKEN"),
            "Error must mention the token env var: {err}"
        );
    }

    /// 300 paths need six requests; the sixth must wait for the rate-limit window.
    #[tokio::test]
    async fn do_adapter_throttle_serializes() {
        let server = MockServer::start().await;
        authed_purge_mock().expect(6).mount(&server).await;

        let purger = test_purger(&server);
        let paths = numbered_paths(300);

        let start = std::time::Instant::now();
        purger.purge(&paths).await.unwrap();
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_secs(9),
            "Throttle should have held the 6th request for ~10s; elapsed: {elapsed:?}"
        );
    }
}