Skip to main content

solid_pod_rs/quota/
mod.rs

1//! Pod-level quota policy (Sprint 7 §6.4, ADR-057).
2//!
3//! Mirrors JSS `src/storage/quota.js` — each pod gets a `.quota.json`
4//! sidecar file at its storage root carrying `{used_bytes, limit_bytes}`.
5//! [`QuotaPolicy::reconcile`] re-walks the pod's directory tree and
6//! rewrites the sidecar against disk truth; this is the recovery path
7//! after a crash / manual edit / storage-backend swap.
8//!
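//! The incremental update path is cooperative, not authoritative:
//! [`QuotaPolicy::record`] updates the sidecar best-effort, and callers
//! MUST invoke [`QuotaPolicy::check`] before accepting a write. The two
//! calls are separate, unsynchronised steps, so concurrent writers can
//! each pass `check` and jointly overshoot the cap; the next
//! [`QuotaPolicy::reconcile`] brings the counters back in line with disk
//! truth.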
13//!
14//! # Feature gate
15//!
//! The filesystem implementation, `FsQuotaStore` (the only
//! [`QuotaPolicy`] adapter shipped in this crate), lives behind
//! `#[cfg(feature = "quota")]`. The trait itself, [`QuotaUsage`], and
//! [`QuotaExceeded`] are always compiled so downstream crates can build
//! their own backends without opting in to the FS one.
21
22#[cfg(feature = "quota")]
23use std::path::{Path, PathBuf};
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27
28// ---------------------------------------------------------------------------
29// Always-compiled public types
30// ---------------------------------------------------------------------------
31
32/// Snapshot of a pod's quota counters.
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub struct QuotaUsage {
35    /// Bytes currently attributed to the pod.
36    pub used_bytes: u64,
37    /// Hard cap; `0` in an on-disk sidecar is treated as "unset →
38    /// apply default" by [`FsQuotaStore`].
39    pub limit_bytes: u64,
40}
41
42/// Error surfaced when a pre-write check exceeds the pod's cap.
43///
44/// Carried through [`crate::error::PodError::QuotaExceeded`] when the
45/// `quota` feature is enabled; kept as a standalone type so other
46/// backends (e.g. S3-tag quota) can reuse the shape without pulling in
47/// `PodError` machinery directly.
48#[derive(Debug, Clone, thiserror::Error)]
49#[error("quota exceeded: pod={pod} used={used} limit={limit}")]
50pub struct QuotaExceeded {
51    pub pod: String,
52    pub used: u64,
53    pub limit: u64,
54}
55
56/// Per-pod quota policy. Call [`Self::check`] on write paths and
57/// [`Self::record`] after a successful write; [`Self::reconcile`] is
58/// the crash-recovery entry point that re-reads disk truth.
59#[async_trait]
60pub trait QuotaPolicy: Send + Sync {
61    /// Pre-write check: would adding `delta_bytes` push the pod over
62    /// its limit? A `delta_bytes == 0` check is always allowed so
63    /// HEAD/GET pre-checks never trip the quota.
64    async fn check(&self, pod: &str, delta_bytes: u64) -> Result<(), QuotaExceeded>;
65
66    /// Record an actual write. `delta_bytes` is signed to accommodate
67    /// DELETE as a negative delta; implementations saturate at zero.
68    async fn record(&self, pod: &str, delta_bytes: i64);
69
70    /// Re-scan the pod's storage and reset counters to disk truth.
71    async fn reconcile(&self, pod: &str) -> std::io::Result<QuotaUsage>;
72
73    /// Inspect current counters. `None` when the pod has no quota
74    /// sidecar yet (never written to, never reconciled).
75    async fn usage(&self, pod: &str) -> Option<QuotaUsage>;
76}
77
78// ---------------------------------------------------------------------------
79// FsQuotaStore — feature-gated.
80// ---------------------------------------------------------------------------
81
#[cfg(feature = "quota")]
mod fs_impl {
    use super::*;
    use tokio::fs;

    /// File name of the per-pod quota sidecar, stored at the pod root.
    const QUOTA_FILE: &str = ".quota.json";

    /// Name prefix of the temporary files `write_sidecar` creates before
    /// renaming them onto `QUOTA_FILE`. `sweep_quota_orphans` and the
    /// disk walk used by `reconcile` key off the same prefix, so writer
    /// and cleaners cannot drift apart.
    const QUOTA_TMP_PREFIX: &str = ".quota.json.tmp-";

    /// Does `name` look like a sidecar temp file written by `write_sidecar`?
    fn is_sidecar_tmp(name: &std::ffi::OsStr) -> bool {
        matches!(name.to_str(), Some(n) if n.starts_with(QUOTA_TMP_PREFIX))
    }

    /// `true` when an IO error only means "the entry is gone".
    fn is_not_found(e: &std::io::Error) -> bool {
        e.kind() == std::io::ErrorKind::NotFound
    }

    /// Apply a signed byte delta to a usage counter, saturating at `0`
    /// and `u64::MAX`.
    ///
    /// Uses `unsigned_abs` so `delta == i64::MIN` is handled: the naive
    /// `(-delta) as u64` overflows on that value (panicking in debug
    /// builds).
    fn apply_delta(used: u64, delta: i64) -> u64 {
        if delta >= 0 {
            used.saturating_add(delta.unsigned_abs())
        } else {
            used.saturating_sub(delta.unsigned_abs())
        }
    }

    /// Filesystem-backed quota store. Each pod lives under `root/<pod>/`
    /// with a `.quota.json` sidecar at its root.
    ///
    /// The store holds no counters in memory: every operation reads and/or
    /// rewrites the sidecar, and read-modify-write cycles are not
    /// serialised against each other.
    #[derive(Debug)]
    pub struct FsQuotaStore {
        root: PathBuf,
        default_limit: u64,
    }

    impl FsQuotaStore {
        /// Construct a store rooted at `root`.
        ///
        /// `default_limit` is applied whenever a pod's sidecar is absent or
        /// has `limit_bytes == 0` (parity with JSS's "uninitialised quota
        /// → seed from default" branch). A `default_limit` of `0` leaves
        /// such pods uncapped.
        pub fn new(root: PathBuf, default_limit: u64) -> Self {
            Self {
                root,
                default_limit,
            }
        }

        /// Filesystem path of the sidecar for a given pod
        /// (`root/<pod>/.quota.json`).
        ///
        /// NOTE(review): `pod` is joined verbatim, so a value containing
        /// `..` or an absolute path escapes `root`; callers presumably
        /// pass validated pod names — confirm.
        pub fn quota_file(&self, pod: &str) -> PathBuf {
            self.pod_dir(pod).join(QUOTA_FILE)
        }

        /// Storage root of a given pod (`root/<pod>`).
        fn pod_dir(&self, pod: &str) -> PathBuf {
            self.root.join(pod)
        }

        /// Counters for a pod with no usable sidecar: nothing used, the
        /// configured default as the cap.
        fn default_seed(&self) -> QuotaUsage {
            QuotaUsage {
                used_bytes: 0,
                limit_bytes: self.default_limit,
            }
        }

        /// Apply the JSS rule "`limit_bytes == 0` means unset → use the
        /// default limit".
        fn with_default_limit(&self, mut usage: QuotaUsage) -> QuotaUsage {
            if usage.limit_bytes == 0 {
                usage.limit_bytes = self.default_limit;
            }
            usage
        }

        /// Read and parse the pod's sidecar.
        ///
        /// Returns `Ok(None)` when the sidecar does not exist, and an
        /// `InvalidData` error when it exists but is not a valid
        /// [`QuotaUsage`] JSON document.
        async fn read_sidecar(&self, pod: &str) -> std::io::Result<Option<QuotaUsage>> {
            let bytes = match fs::read(self.quota_file(pod)).await {
                Ok(bytes) => bytes,
                Err(e) if is_not_found(&e) => return Ok(None),
                Err(e) => return Err(e),
            };
            serde_json::from_slice(&bytes)
                .map(Some)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
        }

        /// Atomic sidecar write — mirrors JSS `saveQuota` (PR #309), which
        /// closed an intermittent-500 race on concurrent PUTs.
        ///
        /// The document is first written to a uniquely named
        /// `.quota.json.tmp-<pid>-<nanos>-<seq>` file, then renamed onto
        /// `.quota.json`. On POSIX the rename is atomic, so a concurrent
        /// reader never observes a half-written document, and a crash
        /// leaves at most an orphan temp file for `sweep_quota_orphans`.
        /// If either step fails, the temp file is removed (best-effort)
        /// and the error is returned.
        async fn write_sidecar(&self, pod: &str, usage: &QuotaUsage) -> std::io::Result<()> {
            use std::sync::atomic::{AtomicU64, Ordering};
            use std::time::{SystemTime, UNIX_EPOCH};

            // Per-process sequence number. `pid` + `nanos` alone can collide
            // when two tasks write within one clock tick (coarse clocks),
            // which would make them share — and clobber — a single temp
            // file. Only uniqueness matters here, so `Relaxed` suffices.
            static TMP_SEQ: AtomicU64 = AtomicU64::new(0);

            let dir = self.pod_dir(pod);
            fs::create_dir_all(&dir).await?;

            let body = serde_json::to_vec_pretty(usage)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);
            let pid = std::process::id();
            let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
            let tmp = dir.join(format!("{QUOTA_TMP_PREFIX}{pid}-{nanos}-{seq}"));
            let path = dir.join(QUOTA_FILE);

            let result = match fs::write(&tmp, &body).await {
                Ok(()) => fs::rename(&tmp, &path).await,
                Err(e) => Err(e),
            };
            if result.is_err() {
                // Cleanup is best-effort; the write/rename error is what
                // the caller needs to see.
                let _ = fs::remove_file(&tmp).await;
            }
            result
        }

        /// Sweep temp-file orphans left by crashed sidecar writers.
        ///
        /// Called by `reconcile` to match JSS's post-#310 behaviour of
        /// ignoring (and cleaning up) half-written quota sidecars. Deletes
        /// every `.quota.json.tmp-*` file directly under the pod root;
        /// individual delete failures are ignored, and a missing pod
        /// directory is not an error.
        ///
        /// NOTE(review): there is no age check, so the temp file of a
        /// concurrent, still-running `write_sidecar` is removed as well;
        /// that writer's rename then fails and its update is lost.
        async fn sweep_quota_orphans(&self, pod: &str) -> std::io::Result<()> {
            let mut entries = match fs::read_dir(self.pod_dir(pod)).await {
                Ok(rd) => rd,
                Err(e) if is_not_found(&e) => return Ok(()),
                Err(e) => return Err(e),
            };
            while let Some(entry) = entries.next_entry().await? {
                if is_sidecar_tmp(&entry.file_name()) {
                    let _ = fs::remove_file(entry.path()).await;
                }
            }
            Ok(())
        }

        /// Resolve the effective counters. Uses the on-disk sidecar, with
        /// the default limit substituted for `limit_bytes == 0`, when one
        /// exists. Otherwise uses the default seed. Read and parse errors
        /// are propagated.
        async fn effective(&self, pod: &str) -> std::io::Result<QuotaUsage> {
            Ok(match self.read_sidecar(pod).await? {
                Some(u) => self.with_default_limit(u),
                None => self.default_seed(),
            })
        }

        /// Sum the sizes of all regular files under `root`.
        ///
        /// Skips `.quota.json` at any depth, plus `.quota.json.tmp-*`
        /// files directly under `root`: both are sidecar bookkeeping
        /// rather than pod data, and temp orphans can survive a failed
        /// sweep. Symlinks and other non-regular entries are neither
        /// counted nor followed.
        ///
        /// Entries that disappear mid-walk (e.g. removed by a concurrent
        /// DELETE) are skipped instead of failing the whole scan, and a
        /// missing `root` yields `0`. The walk uses an explicit stack, so
        /// deep trees need neither recursion nor a boxed future per
        /// directory.
        ///
        /// NOTE(review): skipping `.quota.json` below the pod root means a
        /// user resource with that name is never charged — confirm that is
        /// intended.
        async fn dir_size(root: &Path) -> std::io::Result<u64> {
            let mut total: u64 = 0;
            // Directories still to scan, each tagged with "is this `root`?".
            let mut pending: Vec<(PathBuf, bool)> = vec![(root.to_path_buf(), true)];
            while let Some((dir, is_root)) = pending.pop() {
                let mut entries = match fs::read_dir(&dir).await {
                    Ok(rd) => rd,
                    Err(e) if is_not_found(&e) => continue,
                    Err(e) => return Err(e),
                };
                while let Some(entry) = entries.next_entry().await? {
                    let name = entry.file_name();
                    if name == QUOTA_FILE || (is_root && is_sidecar_tmp(&name)) {
                        continue;
                    }
                    let file_type = match entry.file_type().await {
                        Ok(ft) => ft,
                        Err(e) if is_not_found(&e) => continue,
                        Err(e) => return Err(e),
                    };
                    if file_type.is_dir() {
                        pending.push((entry.path(), false));
                    } else if file_type.is_file() {
                        match entry.metadata().await {
                            Ok(md) => total = total.saturating_add(md.len()),
                            Err(e) if is_not_found(&e) => {}
                            Err(e) => return Err(e),
                        }
                    }
                }
            }
            Ok(total)
        }
    }

    #[async_trait]
    impl QuotaPolicy for FsQuotaStore {
        async fn check(&self, pod: &str, delta_bytes: u64) -> Result<(), QuotaExceeded> {
            if delta_bytes == 0 {
                // HEAD/GET-style zero-delta checks never reject; also
                // matches JSS `checkQuota` where `additionalBytes=0` is
                // structurally allowed (projected == used).
                return Ok(());
            }
            // Deliberately fail-open: an unreadable or corrupt sidecar is
            // treated as the default seed so a bookkeeping fault never
            // blocks writes. `record` rebuilds such a sidecar from disk
            // after the next successful write.
            let u = self
                .effective(pod)
                .await
                .unwrap_or_else(|_| self.default_seed());
            // `limit == 0` after applying defaults means "no cap".
            if u.limit_bytes == 0 {
                return Ok(());
            }
            if u.used_bytes.saturating_add(delta_bytes) > u.limit_bytes {
                return Err(QuotaExceeded {
                    pod: pod.to_string(),
                    used: u.used_bytes,
                    limit: u.limit_bytes,
                });
            }
            Ok(())
        }

        async fn record(&self, pod: &str, delta_bytes: i64) {
            let current = match self.effective(pod).await {
                Ok(u) => u,
                Err(_) => {
                    // The stored counters can't be trusted. Per the trait
                    // contract, `record` runs after the write completed, so
                    // disk truth already reflects it. Rebuild from that
                    // rather than fabricating `used = delta` and discarding
                    // the stored limit. Best-effort, like the path below.
                    let _ = self.reconcile(pod).await;
                    return;
                }
            };
            let updated = QuotaUsage {
                used_bytes: apply_delta(current.used_bytes, delta_bytes),
                ..current
            };
            // Best-effort: swallow IO errors at record time — callers
            // should rely on reconcile() to recover.
            let _ = self.write_sidecar(pod, &updated).await;
        }

        async fn reconcile(&self, pod: &str) -> std::io::Result<QuotaUsage> {
            // JSS parity (post-#310): opportunistically clean up stale
            // `.quota.json.tmp-*` orphans left by crashed writers BEFORE
            // computing disk truth. Errors here are ignored; the walk
            // below skips any orphans that survive, and the authoritative
            // sidecar write at the end surfaces IO failures that matter.
            let _ = self.sweep_quota_orphans(pod).await;
            let used_bytes = Self::dir_size(&self.pod_dir(pod)).await?;
            let limit_bytes = match self.read_sidecar(pod).await {
                Ok(Some(u)) => self.with_default_limit(u).limit_bytes,
                Ok(None) => self.default_limit,
                // A corrupt sidecar (crash / manual edit) is exactly what
                // reconcile exists to repair: fall back to the default
                // limit and overwrite it below instead of failing.
                Err(e) if e.kind() == std::io::ErrorKind::InvalidData => self.default_limit,
                Err(e) => return Err(e),
            };
            let reconciled = QuotaUsage {
                used_bytes,
                limit_bytes,
            };
            self.write_sidecar(pod, &reconciled).await?;
            Ok(reconciled)
        }

        async fn usage(&self, pod: &str) -> Option<QuotaUsage> {
            // A missing sidecar and an unreadable/corrupt one both report
            // `None`; the latter is repaired by `reconcile`.
            self.read_sidecar(pod)
                .await
                .ok()
                .flatten()
                .map(|u| self.with_default_limit(u))
        }
    }
}
336
337#[cfg(feature = "quota")]
338pub use fs_impl::FsQuotaStore;