Skip to main content

ferro_storage/cdn/
mod.rs

1//! CDN cache-invalidation primitives for ferro-storage.
2//!
3//! This module provides the [`PurgeApi`] trait abstracting CDN cache invalidation, and the
4//! batteries-included [`DoSpacesCdn`] adapter for DigitalOcean Spaces CDN.
5//!
6//! # Relative paths
7//!
8//! The [`PurgeApi`] trait uses relative paths (e.g. `"index.html"`, `"assets/*"`).
9//! Implementations handle batching and rate limiting internally. Do NOT pass full CDN
10//! URLs — paths are relative to the CDN endpoint's origin.
11//!
12//! # DigitalOcean Spaces CDN adapter
13//!
14//! [`DoSpacesCdn`] encapsulates the operationally fiddly parts of the DO CDN purge API:
15//! - Batching: ≤50 files per `DELETE /v2/cdn/endpoints/{id}/cache` request.
16//! - Rate limiting: an internal sliding-window throttle enforces ≤5 requests per 10 s.
17//! - Wildcard slot accounting: a wildcard path (e.g. `"dir/*"`) counts as 1 file slot.
18//! - Missing endpoint id: `purge()` is a logged no-op returning `Ok(())` (no HTTP request).
19//!
20//! # Security
21//!
22//! The `DIGITALOCEAN_ACCESS_TOKEN` is never logged or printed. [`DoSpacesCdnConfig`]
23//! implements a hand-written `Debug` that redacts the token field.
24
25use crate::Error;
26use async_trait::async_trait;
27use std::collections::VecDeque;
28use std::time::Duration;
29use tokio::sync::Mutex;
30use tokio::time::Instant;
31
/// Production DigitalOcean API origin; used whenever the config carries no `api_base` override.
const DO_CDN_API_BASE: &str = "https://api.digitalocean.com";

/// Upper bound on the number of paths placed in a single purge request body.
const BATCH_SIZE: usize = 50;

/// At most this many purge requests may start within one [`RATE_LIMIT_WINDOW`].
const RATE_LIMIT_MAX: usize = 5;

/// Length of the sliding window the internal throttle measures requests against.
const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(10);
36
37/// Cache invalidation abstraction for CDN backends.
38///
39/// Paths are relative (e.g. `"index.html"`, `"assets/*"`); implementations handle
40/// batching and rate limiting internally.
41#[async_trait]
42pub trait PurgeApi: Send + Sync {
43    /// Purge cached content at the given relative paths.
44    ///
45    /// - An empty slice returns `Ok(())` with zero HTTP requests.
46    /// - Implementations handle batching, rate limiting, and wildcard slots internally.
47    async fn purge(&self, paths: &[String]) -> Result<(), Error>;
48}
49
/// Configuration for the DigitalOcean Spaces CDN adapter.
///
/// Usually built from the process environment via [`DoSpacesCdnConfig::from_env()`].
///
/// # Token security
///
/// `api_token` is never logged. `Debug` is implemented by hand (not derived) so that
/// the token is printed as `<redacted>`.
#[derive(Clone)]
pub struct DoSpacesCdnConfig {
    /// DO CDN endpoint id (`DO_SPACES_CDN_ID`). `None` → `purge()` is a logged no-op.
    pub endpoint_id: Option<String>,
    /// DO API token (`DIGITALOCEAN_ACCESS_TOKEN`). Never logged.
    pub api_token: String,
    /// API base URL override for tests only. Production uses the DO API base constant.
    pub(crate) api_base: Option<String>,
}

impl std::fmt::Debug for DoSpacesCdnConfig {
    /// Formats every field except `api_token`, which is replaced by `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DoSpacesCdnConfig")
            .field("endpoint_id", &self.endpoint_id)
            .field("api_token", &"<redacted>")
            .field("api_base", &self.api_base)
            .finish()
    }
}

impl DoSpacesCdnConfig {
    /// Read config from environment.
    ///
    /// - `DO_SPACES_CDN_ID` — optional. When absent **or blank**, `endpoint_id` is `None`
    ///   and `purge()` is a logged no-op.
    /// - `DIGITALOCEAN_ACCESS_TOKEN` — required when endpoint id is set; otherwise unused.
    ///   When absent or blank, `api_token` is the empty string.
    ///
    /// Both values are trimmed of surrounding whitespace, so a trailing newline or an
    /// empty `KEY=` line in an env file does not leak into the request URL or the
    /// `Authorization` header.
    pub fn from_env() -> Self {
        Self {
            endpoint_id: Self::non_blank_env("DO_SPACES_CDN_ID"),
            api_token: Self::non_blank_env("DIGITALOCEAN_ACCESS_TOKEN").unwrap_or_default(),
            api_base: None,
        }
    }

    /// Returns the trimmed value of `key`, or `None` if it is unset, not valid Unicode,
    /// or contains only whitespace.
    fn non_blank_env(key: &str) -> Option<String> {
        std::env::var(key)
            .ok()
            .map(|raw| raw.trim().to_owned())
            .filter(|value| !value.is_empty())
    }
}
90
91/// DigitalOcean Spaces CDN adapter implementing [`PurgeApi`].
92///
93/// Encapsulates the DO CDN purge API: `DELETE /v2/cdn/endpoints/{id}/cache` with
94/// `{"files": [...]}` body, ≤50-file batching, and a 5-req/10s internal throttle.
95pub struct DoSpacesCdn {
96    config: DoSpacesCdnConfig,
97    client: reqwest::Client,
98    request_times: Mutex<VecDeque<Instant>>,
99}
100
101impl DoSpacesCdn {
102    /// Construct from config. Builds a single `reqwest::Client` shared across all requests.
103    pub fn new(config: DoSpacesCdnConfig) -> Self {
104        Self {
105            config,
106            client: reqwest::Client::new(),
107            request_times: Mutex::new(VecDeque::new()),
108        }
109    }
110
111    fn api_base(&self) -> &str {
112        self.config.api_base.as_deref().unwrap_or(DO_CDN_API_BASE)
113    }
114
115    /// Sliding-window rate limiter: ensures ≤ RATE_LIMIT_MAX requests per RATE_LIMIT_WINDOW.
116    ///
117    /// Loops until a free slot is confirmed while holding the lock, so concurrent callers
118    /// cannot both observe `len < RATE_LIMIT_MAX` and both proceed. The lock is never held
119    /// across the `.await` sleep to avoid deadlocking other waiters.
120    async fn throttle(&self) {
121        loop {
122            let mut times = self.request_times.lock().await;
123            let now = Instant::now();
124            // Evict entries older than the window.
125            while times
126                .front()
127                .map(|t| now.duration_since(*t) >= RATE_LIMIT_WINDOW)
128                .unwrap_or(false)
129            {
130                times.pop_front();
131            }
132            if times.len() < RATE_LIMIT_MAX {
133                // Slot available — record this request and proceed.
134                times.push_back(Instant::now());
135                return;
136            }
137            // Window full: compute sleep duration, drop the lock, then re-check.
138            let oldest = *times.front().unwrap();
139            let sleep_for = RATE_LIMIT_WINDOW - now.duration_since(oldest);
140            drop(times);
141            tokio::time::sleep(sleep_for).await;
142            // Loop back to re-acquire and re-check under the lock.
143        }
144    }
145}
146
147#[async_trait]
148impl PurgeApi for DoSpacesCdn {
149    async fn purge(&self, paths: &[String]) -> Result<(), Error> {
150        if paths.is_empty() {
151            return Ok(());
152        }
153        let Some(id) = &self.config.endpoint_id else {
154            tracing::info!("DO_SPACES_CDN_ID not set — CDN purge is a no-op");
155            return Ok(());
156        };
157        if self.config.api_token.is_empty() {
158            return Err(Error::cdn(
159                "DIGITALOCEAN_ACCESS_TOKEN not set — cannot purge CDN cache",
160            ));
161        }
162        let url = format!("{}/v2/cdn/endpoints/{}/cache", self.api_base(), id);
163        let mut batches = 0usize;
164        for chunk in paths.chunks(BATCH_SIZE) {
165            self.throttle().await;
166            let resp = self
167                .client
168                .delete(&url)
169                .bearer_auth(&self.config.api_token)
170                .json(&serde_json::json!({ "files": chunk }))
171                .send()
172                .await
173                .map_err(|e| Error::cdn(e.to_string()))?;
174            if resp.status().as_u16() != 204 {
175                let status = resp.status().as_u16();
176                let body = resp.text().await.unwrap_or_default();
177                return Err(Error::cdn(format!("DO CDN purge status {status}: {body}")));
178            }
179            batches += 1;
180        }
181        tracing::info!("purged {} paths in {} request(s)", paths.len(), batches);
182        Ok(())
183    }
184}
185
186#[cfg(feature = "cdn-bunny")]
187pub mod bunny;
188#[cfg(feature = "cdn-bunny")]
189pub use bunny::{BunnyCdn, BunnyCdnConfig};
190
191#[cfg(feature = "cdn-cloudflare")]
192pub mod cloudflare;
193#[cfg(feature = "cdn-cloudflare")]
194pub use cloudflare::{CloudflareCdn, CloudflareCdnConfig};
195
#[cfg(test)]
mod tests {
    use super::*;
    use wiremock::matchers::{body_json, header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Exact request path the adapter must hit for endpoint id `test-id`.
    ///
    /// Matched with `path` (exact) rather than an unanchored regex, so a request to any
    /// other URL that merely contains this substring does not satisfy the mocks.
    const PURGE_PATH: &str = "/v2/cdn/endpoints/test-id/cache";

    /// Build a DoSpacesCdnConfig pointing at the wiremock server URI.
    fn cfg(server_uri: &str, id: Option<&str>, token: &str) -> DoSpacesCdnConfig {
        DoSpacesCdnConfig {
            endpoint_id: id.map(String::from),
            api_token: token.to_string(),
            api_base: Some(server_uri.to_string()),
        }
    }

    /// Adapter wired to `server` with endpoint id `test-id` and token `test-token`.
    fn test_purger(server: &MockServer) -> DoSpacesCdn {
        DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), "test-token"))
    }

    /// `n` distinct plain paths: `file0.html`, `file1.html`, ...
    fn numbered_paths(n: usize) -> Vec<String> {
        (0..n).map(|i| format!("file{i}.html")).collect()
    }

    /// Mount a mock that answers authenticated purge DELETEs with 204 and expects
    /// exactly `times` of them (verified by wiremock when the server is dropped).
    async fn mount_purge_ok(server: &MockServer, times: u64) {
        Mock::given(method("DELETE"))
            .and(path(PURGE_PATH))
            .and(header("Authorization", "Bearer test-token"))
            .respond_with(ResponseTemplate::new(204))
            .expect(times)
            .mount(server)
            .await;
    }

    /// Mount a catch-all DELETE mock that fails the test if any request arrives.
    async fn mount_expect_no_requests(server: &MockServer) {
        Mock::given(method("DELETE"))
            .respond_with(ResponseTemplate::new(204))
            .expect(0)
            .mount(server)
            .await;
    }

    /// The `files` array of every request the server received, in arrival order.
    async fn recorded_batches(server: &MockServer) -> Vec<Vec<String>> {
        server
            .received_requests()
            .await
            .expect("wiremock records requests by default")
            .iter()
            .map(|req| {
                let body: serde_json::Value =
                    serde_json::from_slice(&req.body).expect("purge body must be valid JSON");
                body["files"]
                    .as_array()
                    .expect("purge body must carry a `files` array")
                    .iter()
                    .map(|f| f.as_str().expect("file entries must be strings").to_owned())
                    .collect()
            })
            .collect()
    }

    // --- Structural / no-HTTP tests ---

    #[test]
    fn debug_does_not_contain_token() {
        let config = DoSpacesCdnConfig {
            endpoint_id: Some("ep-123".into()),
            api_token: "secret-token-abc".into(),
            api_base: None,
        };
        let dbg = format!("{config:?}");
        assert!(
            !dbg.contains("secret-token-abc"),
            "Debug output must not contain the token: {dbg}"
        );
        assert!(
            dbg.contains("<redacted>"),
            "Debug output must show <redacted>: {dbg}"
        );
    }

    // --- wiremock-backed tests ---

    /// 1 path → exactly 1 DELETE to /v2/cdn/endpoints/test-id/cache, correct auth, correct body.
    #[tokio::test]
    async fn do_adapter_request_shape() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path(PURGE_PATH))
            .and(header("Authorization", "Bearer test-token"))
            .and(body_json(serde_json::json!({ "files": ["index.html"] })))
            .respond_with(ResponseTemplate::new(204))
            .expect(1)
            .mount(&server)
            .await;

        test_purger(&server)
            .purge(&["index.html".to_string()])
            .await
            .unwrap();
        // wiremock verifies .expect(1) on drop
    }

    /// 55 paths → exactly 2 DELETE requests carrying 50 and then 5 paths, in input order.
    #[tokio::test]
    async fn do_adapter_batches_over_50() {
        let server = MockServer::start().await;
        mount_purge_ok(&server, 2).await;

        let paths = numbered_paths(55);
        test_purger(&server).purge(&paths).await.unwrap();

        // Counting requests alone would also pass for e.g. a 54 + 1 split.
        let batches = recorded_batches(&server).await;
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].as_slice(), &paths[..50]);
        assert_eq!(batches[1].as_slice(), &paths[50..]);
    }

    /// 50 plain paths + 1 wildcard "dir/*" = 51 elements → 2 requests.
    /// Wildcard counts as 1 slot and is sent verbatim, not expanded.
    #[tokio::test]
    async fn do_adapter_wildcard_slot() {
        let server = MockServer::start().await;
        mount_purge_ok(&server, 2).await;

        let mut paths = numbered_paths(50);
        paths.push("dir/*".to_string()); // 51 total → 2 requests
        test_purger(&server).purge(&paths).await.unwrap();

        let batches = recorded_batches(&server).await;
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 50);
        assert_eq!(batches[1], vec!["dir/*".to_string()]);
    }

    /// Missing endpoint id → purge() returns Ok(()), zero HTTP requests.
    #[tokio::test]
    async fn do_adapter_noop_missing_id() {
        let server = MockServer::start().await;
        // Expect zero requests — wiremock will fail the test if any arrive
        mount_expect_no_requests(&server).await;

        let purger = DoSpacesCdn::new(cfg(&server.uri(), None, "test-token"));
        purger.purge(&["a".to_string()]).await.unwrap();
    }

    /// purge(&[]) → Ok(()), zero HTTP requests.
    #[tokio::test]
    async fn purge_empty_noop() {
        let server = MockServer::start().await;
        mount_expect_no_requests(&server).await;

        test_purger(&server).purge(&[]).await.unwrap();
    }

    /// Non-204 response → Err(Error::Cdn(...)) whose message contains the status code.
    #[tokio::test]
    async fn do_adapter_error_on_non_204() {
        let server = MockServer::start().await;
        Mock::given(method("DELETE"))
            .and(path(PURGE_PATH))
            .respond_with(ResponseTemplate::new(403))
            .mount(&server)
            .await;

        let err = test_purger(&server)
            .purge(&["index.html".to_string()])
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("403"),
            "Error must contain status 403: {err}"
        );
    }

    /// Empty token with id set → Err(Error::Cdn) mentioning DIGITALOCEAN_ACCESS_TOKEN.
    /// Zero HTTP requests are made.
    #[tokio::test]
    async fn do_adapter_missing_token_errors() {
        let server = MockServer::start().await;
        mount_expect_no_requests(&server).await;

        let purger = DoSpacesCdn::new(cfg(&server.uri(), Some("test-id"), ""));
        let err = purger.purge(&["index.html".to_string()]).await.unwrap_err();
        assert!(
            err.to_string().contains("DIGITALOCEAN_ACCESS_TOKEN"),
            "Error must mention the token env var: {err}"
        );
    }

    /// 300 paths → 6 chunks. The 6th request must wait for the rate-limit window.
    /// Total elapsed must be >= ~9 s (RATE_LIMIT_WINDOW - 1s tolerance).
    ///
    /// NOTE: This test takes ~10 s due to the real rate-limit sleep. This is intentional —
    /// it asserts that the throttle actually serializes under the 5-req/10s window without
    /// relying on tokio::time::pause (which could give a false pass).
    #[tokio::test]
    async fn do_adapter_throttle_serializes() {
        let server = MockServer::start().await;
        mount_purge_ok(&server, 6).await;

        let purger = test_purger(&server);
        // 300 paths → 6 chunks of 50 — the 6th must wait for the window
        let paths = numbered_paths(300);

        let start = std::time::Instant::now();
        purger.purge(&paths).await.unwrap();
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_secs(9),
            "Throttle should have held the 6th request for ~10s; elapsed: {elapsed:?}"
        );
    }
}
383}