use crate::Error;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
/// Default DigitalOcean API base URL, used when the config carries no `api_base` override.
const DO_CDN_API_BASE: &str = "https://api.digitalocean.com";
/// Maximum number of paths sent in a single purge request.
const BATCH_SIZE: usize = 50;
/// Length of the sliding window over which purge requests are counted.
const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(10);
/// Maximum number of purge requests admitted within one `RATE_LIMIT_WINDOW`.
const RATE_LIMIT_MAX: usize = 5;
/// Abstraction over a CDN cache-purge backend.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait PurgeApi: Send + Sync {
/// Purges the given `paths` from the CDN cache.
///
/// # Errors
///
/// Returns an [`Error`] when the purge cannot be carried out; the exact
/// conditions depend on the implementation.
async fn purge(&self, paths: &[String]) -> Result<(), Error>;
}
/// Configuration for the DigitalOcean Spaces CDN purge adapter.
///
/// The `Debug` implementation is hand-written so the API token never ends up
/// in logs or panic messages.
#[derive(Clone)]
pub struct DoSpacesCdnConfig {
    /// Identifier of the CDN endpoint to purge; `None` means "not configured".
    pub endpoint_id: Option<String>,
    /// DigitalOcean API token sent as a bearer credential; empty means "not set".
    pub api_token: String,
    /// Override for the API base URL; `None` selects the built-in default.
    pub(crate) api_base: Option<String>,
}

impl std::fmt::Debug for DoSpacesCdnConfig {
    /// Formats the config with the token replaced by `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DoSpacesCdnConfig")
            .field("endpoint_id", &self.endpoint_id)
            .field("api_token", &"<redacted>")
            .field("api_base", &self.api_base)
            .finish()
    }
}

impl DoSpacesCdnConfig {
    /// Builds a config from the process environment.
    ///
    /// * `DO_SPACES_CDN_ID` → `endpoint_id`. Surrounding whitespace is
    ///   trimmed, and a missing, empty or whitespace-only value yields `None`
    ///   so that a blank variable is treated as "not configured" instead of
    ///   producing an endpoint URL with an empty id segment.
    /// * `DIGITALOCEAN_ACCESS_TOKEN` → `api_token`. Surrounding whitespace is
    ///   trimmed; a missing value yields an empty string.
    ///
    /// `api_base` is always `None`.
    pub fn from_env() -> Self {
        let endpoint_id = std::env::var("DO_SPACES_CDN_ID")
            .ok()
            .map(|id| id.trim().to_owned())
            // A blank id is as good as no id at all.
            .filter(|id| !id.is_empty());
        let api_token = std::env::var("DIGITALOCEAN_ACCESS_TOKEN")
            .map(|token| token.trim().to_owned())
            .unwrap_or_default();
        Self {
            endpoint_id,
            api_token,
            api_base: None,
        }
    }
}
/// Cache-purge client for a DigitalOcean Spaces CDN endpoint.
pub struct DoSpacesCdn {
/// Endpoint id, API token and optional API base override.
config: DoSpacesCdnConfig,
/// HTTP client used to send purge requests.
client: reqwest::Client,
/// Admission times of recent requests, consulted by the rate limiter.
request_times: Mutex<VecDeque<Instant>>,
}
impl DoSpacesCdn {
    /// Creates an adapter for `config` with a fresh HTTP client and an empty
    /// request history.
    pub fn new(config: DoSpacesCdnConfig) -> Self {
        let client = reqwest::Client::new();
        let request_times = Mutex::new(VecDeque::new());
        Self {
            config,
            client,
            request_times,
        }
    }

    /// Returns the API base URL: the configured override if present,
    /// otherwise `DO_CDN_API_BASE`.
    fn api_base(&self) -> &str {
        match &self.config.api_base {
            Some(base) => base.as_str(),
            None => DO_CDN_API_BASE,
        }
    }

    /// Waits until a request may be sent without exceeding `RATE_LIMIT_MAX`
    /// requests per `RATE_LIMIT_WINDOW`, then records the admission time.
    ///
    /// The lock on the history is released before sleeping, and the window is
    /// re-evaluated from scratch after every wake-up.
    async fn throttle(&self) {
        loop {
            let wait = {
                let mut window = self.request_times.lock().await;
                let now = Instant::now();

                // Evict entries that have aged out of the window.
                while let Some(&stamp) = window.front() {
                    if now.duration_since(stamp) < RATE_LIMIT_WINDOW {
                        break;
                    }
                    window.pop_front();
                }

                if window.len() < RATE_LIMIT_MAX {
                    window.push_back(Instant::now());
                    return;
                }

                // The window is full, so a front element exists and is
                // younger than the window: the subtraction cannot underflow.
                let oldest = *window.front().unwrap();
                RATE_LIMIT_WINDOW - now.duration_since(oldest)
            };
            tokio::time::sleep(wait).await;
        }
    }
}
#[async_trait]
impl PurgeApi for DoSpacesCdn {
    /// Purges `paths` from the configured CDN endpoint.
    ///
    /// Paths are sent in batches of at most `BATCH_SIZE` per DELETE request,
    /// and each request first passes through the rate limiter. An empty
    /// `paths` slice or a missing endpoint id makes the call a successful
    /// no-op.
    ///
    /// # Errors
    ///
    /// Returns a CDN error when the API token is empty, when a request fails
    /// to send, or when a response status is anything other than 204. The
    /// first failing batch aborts the remaining ones.
    async fn purge(&self, paths: &[String]) -> Result<(), Error> {
        if paths.is_empty() {
            return Ok(());
        }

        let endpoint_id = match self.config.endpoint_id.as_ref() {
            Some(endpoint_id) => endpoint_id,
            None => {
                tracing::info!("DO_SPACES_CDN_ID not set — CDN purge is a no-op");
                return Ok(());
            }
        };

        let token = &self.config.api_token;
        if token.is_empty() {
            return Err(Error::cdn(
                "DIGITALOCEAN_ACCESS_TOKEN not set — cannot purge CDN cache",
            ));
        }

        let url = format!("{}/v2/cdn/endpoints/{}/cache", self.api_base(), endpoint_id);
        let mut sent = 0usize;
        for batch in paths.chunks(BATCH_SIZE) {
            self.throttle().await;

            let payload = serde_json::json!({ "files": batch });
            let request = self.client.delete(&url).bearer_auth(token).json(&payload);
            let response = request
                .send()
                .await
                .map_err(|e| Error::cdn(e.to_string()))?;

            let status = response.status().as_u16();
            if status != 204 {
                let body = response.text().await.unwrap_or_default();
                return Err(Error::cdn(format!("DO CDN purge status {status}: {body}")));
            }
            sent += 1;
        }

        tracing::info!("purged {} paths in {} request(s)", paths.len(), sent);
        Ok(())
    }
}
// Optional `bunny` submodule, compiled only when the `cdn-bunny` feature is
// enabled; its `BunnyCdn` and `BunnyCdnConfig` items are re-exported here.
#[cfg(feature = "cdn-bunny")]
pub mod bunny;
#[cfg(feature = "cdn-bunny")]
pub use bunny::{BunnyCdn, BunnyCdnConfig};
// Optional `cloudflare` submodule, compiled only when the `cdn-cloudflare`
// feature is enabled; its `CloudflareCdn` and `CloudflareCdnConfig` items are
// re-exported here.
#[cfg(feature = "cdn-cloudflare")]
pub mod cloudflare;
#[cfg(feature = "cdn-cloudflare")]
pub use cloudflare::{CloudflareCdn, CloudflareCdnConfig};
#[cfg(test)]
mod tests {
    use super::*;
    use wiremock::matchers::{body_json, header, method, path_regex};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds a config whose API base points at the given mock server URI.
    fn cfg(server_uri: &str, id: Option<&str>, token: &str) -> DoSpacesCdnConfig {
        DoSpacesCdnConfig {
            endpoint_id: id.map(str::to_owned),
            api_token: token.to_owned(),
            api_base: Some(server_uri.to_owned()),
        }
    }

    /// Adapter for endpoint `test-id`, authenticated with `test-token`.
    fn default_purger(server: &MockServer) -> DoSpacesCdn {
        DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"))
    }

    /// Mounts an authenticated purge mock answering 204 that must be hit
    /// exactly `calls` times.
    async fn expect_purges(server: &MockServer, calls: u64) {
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .and(header("Authorization", "Bearer test-token"))
            .respond_with(ResponseTemplate::new(204))
            .expect(calls)
            .mount(server)
            .await;
    }

    /// Mounts a catch-all DELETE mock that must never be hit.
    async fn expect_no_requests(server: &MockServer) {
        Mock::given(method("DELETE"))
            .respond_with(ResponseTemplate::new(204))
            .expect(0)
            .mount(server)
            .await;
    }

    /// The hand-written `Debug` impl must hide the API token.
    #[test]
    fn debug_does_not_contain_token() {
        let config = DoSpacesCdnConfig {
            endpoint_id: Some("ep-123".into()),
            api_token: "secret-token-abc".into(),
            api_base: None,
        };
        let dbg = format!("{config:?}");
        let leaks_token = dbg.contains("secret-token-abc");
        assert!(
            !leaks_token,
            "Debug output must not contain the token: {dbg}"
        );
        let shows_placeholder = dbg.contains("<redacted>");
        assert!(
            shows_placeholder,
            "Debug output must show <redacted>: {dbg}"
        );
    }

    /// A single-path purge sends one DELETE with the expected URL, auth
    /// header and JSON body.
    #[tokio::test]
    async fn do_adapter_request_shape() {
        let server = MockServer::start().await;
        let expected_body = serde_json::json!({ "files": ["index.html"] });
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .and(header("Authorization", "Bearer test-token"))
            .and(body_json(expected_body))
            .respond_with(ResponseTemplate::new(204))
            .expect(1)
            .mount(&server)
            .await;

        let purger = default_purger(&server);
        let paths = ["index.html".to_string()];
        purger.purge(&paths).await.unwrap();
    }

    /// 55 paths exceed one batch and therefore need two requests.
    #[tokio::test]
    async fn do_adapter_batches_over_50() {
        let server = MockServer::start().await;
        expect_purges(&server, 2).await;

        let purger = default_purger(&server);
        let paths: Vec<String> = (0..55).map(|i| format!("file{i}.html")).collect();
        purger.purge(&paths).await.unwrap();
    }

    /// A wildcard entry counts as one path: 50 files plus `dir/*` need two
    /// requests.
    #[tokio::test]
    async fn do_adapter_wildcard_slot() {
        let server = MockServer::start().await;
        expect_purges(&server, 2).await;

        let purger = default_purger(&server);
        let mut paths: Vec<String> = (0..50).map(|i| format!("file{i}.html")).collect();
        paths.push("dir/*".to_string());
        purger.purge(&paths).await.unwrap();
    }

    /// Without an endpoint id the purge succeeds without any request.
    #[tokio::test]
    async fn do_adapter_noop_missing_id() {
        let server = MockServer::start().await;
        expect_no_requests(&server).await;

        let purger = DoSpacesCdn::new(cfg(&server.uri(), None, "test-token"));
        let paths = ["a".to_string()];
        purger.purge(&paths).await.unwrap();
    }

    /// An empty path list succeeds without any request.
    #[tokio::test]
    async fn purge_empty_noop() {
        let server = MockServer::start().await;
        expect_no_requests(&server).await;

        let purger = default_purger(&server);
        purger.purge(&[]).await.unwrap();
    }

    /// Any status other than 204 is reported as an error carrying the status.
    #[tokio::test]
    async fn do_adapter_error_on_non_204() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path_regex(r"/v2/cdn/endpoints/test-id/cache"))
            .respond_with(ResponseTemplate::new(403))
            .mount(&server)
            .await;

        let purger = default_purger(&server);
        let paths = ["index.html".to_string()];
        let err = purger.purge(&paths).await.unwrap_err();
        let mentions_status = err.to_string().contains("403");
        assert!(mentions_status, "Error must contain status 403: {err}");
    }

    /// An empty token is rejected before any request is sent.
    #[tokio::test]
    async fn do_adapter_missing_token_errors() {
        let server = MockServer::start().await;
        expect_no_requests(&server).await;

        let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), ""));
        let paths = ["index.html".to_string()];
        let err = purger.purge(&paths).await.unwrap_err();
        let mentions_env_var = err.to_string().contains("DIGITALOCEAN_ACCESS_TOKEN");
        assert!(
            mentions_env_var,
            "Error must mention the token env var: {err}"
        );
    }

    /// 300 paths need six requests; the sixth must wait for the rate-limit
    /// window. This test measures wall-clock time and takes about ten seconds.
    #[tokio::test]
    async fn do_adapter_throttle_serializes() {
        let server = MockServer::start().await;
        expect_purges(&server, 6).await;

        let purger = default_purger(&server);
        let paths: Vec<String> = (0..300).map(|i| format!("file{i}.html")).collect();
        let start = std::time::Instant::now();
        purger.purge(&paths).await.unwrap();
        let elapsed = start.elapsed();
        let held_long_enough = elapsed >= Duration::from_secs(9);
        assert!(
            held_long_enough,
            "Throttle should have held the 6th request for ~10s; elapsed: {elapsed:?}"
        );
    }
}