solid_pod_rs/quota/mod.rs
1//! Pod-level quota policy (Sprint 7 §6.4, ADR-057).
2//!
3//! Mirrors JSS `src/storage/quota.js` — each pod gets a `.quota.json`
4//! sidecar file at its storage root carrying `{used_bytes, limit_bytes}`.
5//! [`QuotaPolicy::reconcile`] re-walks the pod's directory tree and
6//! rewrites the sidecar against disk truth; this is the recovery path
7//! after a crash / manual edit / storage-backend swap.
8//!
9//! The in-memory mutation path is cooperative, not authoritative:
10//! [`FsQuotaStore::record`] updates the sidecar best-effort, but
11//! callers MUST invoke [`FsQuotaStore::check`] before accepting a write
12//! to enforce the cap atomically relative to the policy's own view.
13//!
14//! # Feature gate
15//!
16//! The FS implementation lives behind `#[cfg(feature = "quota")]`
17//! alongside the [`QuotaPolicy`] trait's sole shipped adapter. The
18//! trait itself, [`QuotaUsage`], and [`QuotaExceeded`] are always
19//! compiled so downstream crates can build their own backends without
20//! opting in to the FS one.
21
22#[cfg(feature = "quota")]
23use std::path::{Path, PathBuf};
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27
28// ---------------------------------------------------------------------------
29// Always-compiled public types
30// ---------------------------------------------------------------------------
31
/// Snapshot of a pod's quota counters.
///
/// Serialized as the sidecar's JSON body (`{used_bytes, limit_bytes}`,
/// see the module docs); `Copy` because it is two `u64`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuotaUsage {
    /// Bytes currently attributed to the pod.
    pub used_bytes: u64,
    /// Hard cap; `0` in an on-disk sidecar is treated as "unset →
    /// apply default" by [`FsQuotaStore`].
    pub limit_bytes: u64,
}
41
/// Error surfaced when a pre-write check exceeds the pod's cap.
///
/// Carried through [`crate::error::PodError::QuotaExceeded`] when the
/// `quota` feature is enabled; kept as a standalone type so other
/// backends (e.g. S3-tag quota) can reuse the shape without pulling in
/// `PodError` machinery directly.
#[derive(Debug, Clone, thiserror::Error)]
#[error("quota exceeded: pod={pod} used={used} limit={limit}")]
pub struct QuotaExceeded {
    /// Identifier of the pod whose write was rejected.
    pub pod: String,
    /// Bytes attributed to the pod at check time (pre-write).
    pub used: u64,
    /// Effective hard cap (bytes) the projected write would breach.
    pub limit: u64,
}
55
/// Per-pod quota policy. Call [`Self::check`] on write paths and
/// [`Self::record`] after a successful write; [`Self::reconcile`] is
/// the crash-recovery entry point that re-reads disk truth.
#[async_trait]
pub trait QuotaPolicy: Send + Sync {
    /// Pre-write check: would adding `delta_bytes` push the pod over
    /// its limit? A `delta_bytes == 0` check is always allowed so
    /// HEAD/GET pre-checks never trip the quota.
    async fn check(&self, pod: &str, delta_bytes: u64) -> Result<(), QuotaExceeded>;

    /// Record an actual write. `delta_bytes` is signed to accommodate
    /// DELETE as a negative delta; implementations saturate at zero.
    ///
    /// Infallible by design: recording is cooperative bookkeeping, not
    /// enforcement (see module docs) — failures are recovered by
    /// [`Self::reconcile`].
    async fn record(&self, pod: &str, delta_bytes: i64);

    /// Re-scan the pod's storage and reset counters to disk truth.
    async fn reconcile(&self, pod: &str) -> std::io::Result<QuotaUsage>;

    /// Inspect current counters. `None` when the pod has no quota
    /// sidecar yet (never written to, never reconciled).
    ///
    /// NOTE(review): the shipped FS adapter also returns `None` when a
    /// sidecar exists but cannot be read or parsed, so callers must
    /// not treat `None` as proof of zero usage.
    async fn usage(&self, pod: &str) -> Option<QuotaUsage>;
}
77
78// ---------------------------------------------------------------------------
79// FsQuotaStore — feature-gated.
80// ---------------------------------------------------------------------------
81
#[cfg(feature = "quota")]
mod fs_impl {
    use super::*;
    use tokio::fs;

    /// Sidecar filename. Lives at the pod root; excluded (at any
    /// depth) from directory-size accounting so quota bookkeeping
    /// never counts itself.
    const QUOTA_FILE: &str = ".quota.json";

    /// Filesystem-backed quota store. Each pod lives under `root/<pod>/`
    /// with a `.quota.json` sidecar at its root.
    pub struct FsQuotaStore {
        // Storage root; pod `p` is rooted at `root/p`.
        root: PathBuf,
        // Cap applied when a pod's sidecar is absent or carries
        // `limit_bytes == 0` ("unset").
        default_limit: u64,
    }

    impl FsQuotaStore {
        /// Construct a store rooted at `root` with `default_limit`
        /// applied when a pod's sidecar is absent or has
        /// `limit_bytes == 0` (parity with JSS's "uninitialised quota
        /// → seed from default" branch).
        pub fn new(root: PathBuf, default_limit: u64) -> Self {
            Self {
                root,
                default_limit,
            }
        }

        /// Filesystem path of the sidecar for a given pod.
        pub fn quota_file(&self, pod: &str) -> PathBuf {
            self.root.join(pod).join(QUOTA_FILE)
        }

        /// Root directory holding the pod's resources.
        fn pod_dir(&self, pod: &str) -> PathBuf {
            self.root.join(pod)
        }

        /// Read and parse the sidecar.
        ///
        /// `Ok(None)` when the file does not exist; `InvalidData` when
        /// it exists but is not valid [`QuotaUsage`] JSON; any other IO
        /// error is passed through.
        async fn read_sidecar(&self, pod: &str) -> std::io::Result<Option<QuotaUsage>> {
            match fs::read(self.quota_file(pod)).await {
                Ok(bytes) => {
                    let parsed: QuotaUsage = serde_json::from_slice(&bytes).map_err(|e| {
                        std::io::Error::new(std::io::ErrorKind::InvalidData, e)
                    })?;
                    Ok(Some(parsed))
                }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
                Err(e) => Err(e),
            }
        }

        /// Atomic sidecar write — mirrors JSS `saveQuota` (PR #309) which
        /// closed an intermittent-500 race on concurrent PUTs. The write
        /// goes to `.quota.json.tmp-<pid>-<nanos>` first, then is
        /// renamed onto `.quota.json`; on POSIX the rename is atomic so
        /// a concurrent reader never observes a half-written document
        /// and a crash leaves at most an orphan `.tmp-*` that
        /// [`Self::reconcile`] / startup sweep can clean up.
        async fn write_sidecar(&self, pod: &str, usage: &QuotaUsage) -> std::io::Result<()> {
            let path = self.quota_file(pod);
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent).await?;
            }
            let body = serde_json::to_vec_pretty(usage).map_err(|e| {
                std::io::Error::new(std::io::ErrorKind::InvalidData, e)
            })?;
            // pid+nanos keeps concurrent writers (and restarted
            // processes) on distinct temp names; a clock going
            // backwards only risks a name collision, not corruption.
            let tmp = {
                use std::time::{SystemTime, UNIX_EPOCH};
                let nanos = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .map(|d| d.as_nanos())
                    .unwrap_or(0);
                let pid = std::process::id();
                let mut t = path.as_os_str().to_owned();
                t.push(format!(".tmp-{pid}-{nanos}"));
                PathBuf::from(t)
            };
            match fs::write(&tmp, &body).await {
                Ok(()) => {}
                Err(e) => {
                    // Best-effort cleanup of a possibly partial temp file.
                    let _ = fs::remove_file(&tmp).await;
                    return Err(e);
                }
            }
            if let Err(e) = fs::rename(&tmp, &path).await {
                let _ = fs::remove_file(&tmp).await;
                return Err(e);
            }
            Ok(())
        }

        /// Sweep stale tempfile orphans left by crashed writers.
        ///
        /// Called by [`Self::reconcile`] to match JSS's post-#310
        /// behaviour of ignoring (and cleaning up) half-written quota
        /// sidecars. Deletes any `.quota.json.tmp-*` under the pod root
        /// (non-recursive — temp files are only ever created there).
        async fn sweep_quota_orphans(&self, pod: &str) -> std::io::Result<()> {
            let dir = match self.quota_file(pod).parent() {
                Some(p) => p.to_path_buf(),
                None => return Ok(()),
            };
            let mut rd = match fs::read_dir(&dir).await {
                Ok(r) => r,
                // Pod directory absent → nothing to sweep.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
                Err(e) => return Err(e),
            };
            while let Some(entry) = rd.next_entry().await? {
                if let Some(name) = entry.file_name().to_str() {
                    if name.starts_with(".quota.json.tmp-") {
                        // Individual unlink failures are non-fatal; a
                        // later sweep will retry.
                        let _ = fs::remove_file(entry.path()).await;
                    }
                }
            }
            Ok(())
        }

        /// Resolve the effective sidecar: on-disk value if present, else
        /// default seed (used=0, limit=self.default_limit).
        async fn effective(&self, pod: &str) -> std::io::Result<QuotaUsage> {
            match self.read_sidecar(pod).await? {
                Some(mut u) => {
                    // JSS parity: `limit == 0` means "apply default".
                    if u.limit_bytes == 0 {
                        u.limit_bytes = self.default_limit;
                    }
                    Ok(u)
                }
                None => Ok(QuotaUsage {
                    used_bytes: 0,
                    limit_bytes: self.default_limit,
                }),
            }
        }

        /// Recursively sum file sizes under `dir`, skipping `.quota.json`
        /// at any depth. Boxed future because async recursion needs an
        /// indirected, nameable return type.
        fn dir_size_boxed<'a>(
            &'a self,
            dir: &'a Path,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = std::io::Result<u64>> + Send + 'a>,
        > {
            Box::pin(async move {
                let mut total: u64 = 0;
                let mut rd = match fs::read_dir(dir).await {
                    Ok(r) => r,
                    // Missing directory counts as empty, not an error.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
                    Err(e) => return Err(e),
                };
                while let Some(entry) = rd.next_entry().await? {
                    let name = entry.file_name();
                    if name == QUOTA_FILE {
                        continue;
                    }
                    let ft = entry.file_type().await?;
                    if ft.is_dir() {
                        total = total.saturating_add(
                            self.dir_size_boxed(&entry.path()).await?,
                        );
                    } else if ft.is_file() {
                        // Symlinks and other non-regular entries are
                        // intentionally not counted.
                        let md = entry.metadata().await?;
                        total = total.saturating_add(md.len());
                    }
                }
                Ok(total)
            })
        }
    }

    #[async_trait]
    impl QuotaPolicy for FsQuotaStore {
        async fn check(&self, pod: &str, delta_bytes: u64) -> Result<(), QuotaExceeded> {
            if delta_bytes == 0 {
                // HEAD/GET-style zero-delta checks never reject; also
                // matches JSS `checkQuota` where `additionalBytes=0` is
                // structurally allowed (projected == used).
                return Ok(());
            }
            // Fail-open on sidecar read errors: an unreadable sidecar
            // falls back to the default seed rather than blocking writes.
            let u = self.effective(pod).await.unwrap_or(QuotaUsage {
                used_bytes: 0,
                limit_bytes: self.default_limit,
            });
            // `limit == 0` after applying defaults means "no cap".
            if u.limit_bytes == 0 {
                return Ok(());
            }
            let projected = u.used_bytes.saturating_add(delta_bytes);
            if projected > u.limit_bytes {
                return Err(QuotaExceeded {
                    pod: pod.to_string(),
                    used: u.used_bytes,
                    limit: u.limit_bytes,
                });
            }
            Ok(())
        }

        async fn record(&self, pod: &str, delta_bytes: i64) {
            let current = self
                .effective(pod)
                .await
                .unwrap_or(QuotaUsage {
                    used_bytes: 0,
                    limit_bytes: self.default_limit,
                });
            // Apply the signed delta with saturation at both ends.
            //
            // FIX: the previous `(-delta_bytes) as u64` formulation
            // overflowed for `delta_bytes == i64::MIN` (panic in debug
            // builds, wrap in release); `saturating_add_signed` covers
            // the full i64 range and preserves the documented
            // "saturate at zero" contract of `QuotaPolicy::record`.
            let new_used = current.used_bytes.saturating_add_signed(delta_bytes);
            let updated = QuotaUsage {
                used_bytes: new_used,
                limit_bytes: current.limit_bytes,
            };
            // Best-effort: swallow IO errors at record time — callers
            // should rely on reconcile() to recover.
            let _ = self.write_sidecar(pod, &updated).await;
        }

        async fn reconcile(&self, pod: &str) -> std::io::Result<QuotaUsage> {
            // JSS parity (post-#310): opportunistically clean up stale
            // `.quota.json.tmp-*` orphans left by crashed writers
            // BEFORE computing disk truth. Errors here are ignored —
            // reconcile is best-effort and the authoritative write path
            // at the bottom of this method will surface any IO failure
            // that actually matters.
            let _ = self.sweep_quota_orphans(pod).await;
            let actual = self.dir_size_boxed(&self.pod_dir(pod)).await?;
            // Keep a previously configured limit; re-seed from the
            // default when absent or "unset" (0).
            let limit = match self.read_sidecar(pod).await? {
                Some(u) if u.limit_bytes > 0 => u.limit_bytes,
                _ => self.default_limit,
            };
            let reconciled = QuotaUsage {
                used_bytes: actual,
                limit_bytes: limit,
            };
            self.write_sidecar(pod, &reconciled).await?;
            Ok(reconciled)
        }

        async fn usage(&self, pod: &str) -> Option<QuotaUsage> {
            match self.read_sidecar(pod).await {
                Ok(Some(mut u)) => {
                    if u.limit_bytes == 0 {
                        u.limit_bytes = self.default_limit;
                    }
                    Some(u)
                }
                // NOTE: read/parse errors are also reported as `None`;
                // callers must not infer "zero usage" from it.
                Ok(None) | Err(_) => None,
            }
        }
    }
}
336
337#[cfg(feature = "quota")]
338pub use fs_impl::FsQuotaStore;