#[cfg(feature = "quota")]
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
/// Byte usage and byte limit recorded for a single pod.
///
/// The type is `Copy` and serde-serializable. When the `quota` feature is
/// enabled, `FsQuotaStore` reads and writes it as the JSON body of the pod's
/// `.quota.json` sidecar file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuotaUsage {
/// Number of bytes currently accounted to the pod.
pub used_bytes: u64,
/// Maximum number of bytes allowed for the pod. `FsQuotaStore` replaces a
/// stored value of `0` with its own default limit when it loads the sidecar.
pub limit_bytes: u64,
}
/// Error returned by [`QuotaPolicy::check`] when a write is rejected.
///
/// Its `Display` output (generated by `thiserror`) has the form
/// `quota exceeded: pod=<pod> used=<used> limit=<limit>`.
#[derive(Debug, Clone, thiserror::Error)]
#[error("quota exceeded: pod={pod} used={used} limit={limit}")]
pub struct QuotaExceeded {
/// Pod identifier the rejected request was made for.
pub pod: String,
/// Usage in bytes reported with the rejection. `FsQuotaStore` fills in the
/// usage recorded *before* the requested delta, not the projected total.
pub used: u64,
/// Limit in bytes that the request would have exceeded.
pub limit: u64,
}
/// Async interface for per-pod storage quota accounting.
///
/// Implementors must be `Send + Sync`. The method descriptions below state
/// what the signatures allow; the behavior of the bundled `FsQuotaStore`
/// (available with the `quota` feature) is mentioned where it adds detail.
#[async_trait]
pub trait QuotaPolicy: Send + Sync {
/// Decides whether `pod` may grow by `delta_bytes`.
///
/// Returns `Ok(())` when the growth is allowed and `Err(QuotaExceeded)` when
/// it is rejected. `FsQuotaStore` rejects when recorded usage plus
/// `delta_bytes` is greater than the pod's limit.
async fn check(&self, pod: &str, delta_bytes: u64) -> Result<(), QuotaExceeded>;
/// Applies a signed change to the usage recorded for `pod`: positive values
/// add bytes, negative values release them.
///
/// The method returns nothing, so an implementation has no way to report a
/// failure to the caller.
async fn record(&self, pod: &str, delta_bytes: i64);
/// Recomputes the usage of `pod` and returns the result.
///
/// `FsQuotaStore` measures the pod directory on disk and rewrites the
/// sidecar with the measured value; I/O failures are returned as `Err`.
async fn reconcile(&self, pod: &str) -> std::io::Result<QuotaUsage>;
/// Returns the usage currently recorded for `pod`, or `None` when nothing is
/// available. `FsQuotaStore` returns `None` both when no sidecar exists and
/// when the sidecar cannot be read or parsed.
async fn usage(&self, pod: &str) -> Option<QuotaUsage>;
}
#[cfg(feature = "quota")]
mod fs_impl {
use super::*;
use tokio::fs;
/// File name of the per-pod quota sidecar. It is stored directly inside the
/// pod directory, and entries with this name are skipped when directory sizes
/// are summed.
const QUOTA_FILE: &str = ".quota.json";
/// Filesystem-backed quota store.
///
/// Each pod maps to the directory `root/<pod>`, and its usage is persisted in
/// a `.quota.json` sidecar inside that directory.
pub struct FsQuotaStore {
/// Base directory that pod names are joined onto.
root: PathBuf,
/// Limit used when a pod has no sidecar or its stored limit is `0`.
default_limit: u64,
}
impl FsQuotaStore {
    /// Text placed between the sidecar file name and the unique part of a
    /// temp-file name (`.quota.json` + `.tmp-` + `<pid>-<nanos>`).
    ///
    /// `write_sidecar` produces these names and `sweep_quota_orphans` matches
    /// them, so both derive the pattern from here and from `QUOTA_FILE`.
    const TMP_INFIX: &'static str = ".tmp-";

    /// Creates a store rooted at `root`.
    ///
    /// `default_limit` is the byte limit used for pods that have no sidecar
    /// or whose stored limit is `0`.
    pub fn new(root: PathBuf, default_limit: u64) -> Self {
        Self { root, default_limit }
    }

    /// Returns the path of the quota sidecar for `pod`
    /// (`root/<pod>/.quota.json`).
    ///
    /// `pod` is joined onto `root` without any validation. `Path::join`
    /// replaces the base when given an absolute path and does not resolve
    /// `..`, so callers must only pass trusted, relative pod names.
    pub fn quota_file(&self, pod: &str) -> PathBuf {
        self.pod_dir(pod).join(QUOTA_FILE)
    }

    /// Returns the directory that holds the content of `pod` (`root/<pod>`).
    /// The same caveat about unvalidated `pod` values applies.
    fn pod_dir(&self, pod: &str) -> PathBuf {
        self.root.join(pod)
    }

    /// Loads and parses the sidecar of `pod`.
    ///
    /// Returns `Ok(None)` when the file does not exist. A file that cannot be
    /// parsed as JSON yields an `InvalidData` error; every other I/O error is
    /// returned unchanged.
    async fn read_sidecar(&self, pod: &str) -> std::io::Result<Option<QuotaUsage>> {
        let bytes = match fs::read(self.quota_file(pod)).await {
            Ok(bytes) => bytes,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(e),
        };
        serde_json::from_slice::<QuotaUsage>(&bytes)
            .map(Some)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    /// Persists `usage` as the sidecar of `pod`.
    ///
    /// The pod directory is created if needed. The JSON body is first written
    /// to a uniquely named temp file next to the sidecar and then renamed over
    /// it, so the sidecar path is swapped in one step instead of being
    /// rewritten in place. If the write or the rename fails, the temp file is
    /// removed on a best-effort basis and the original error is returned.
    async fn write_sidecar(&self, pod: &str, usage: &QuotaUsage) -> std::io::Result<()> {
        use std::time::{SystemTime, UNIX_EPOCH};

        let path = self.quota_file(pod);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let body = serde_json::to_vec_pretty(usage)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

        // Process id plus wall-clock nanoseconds keeps temp names distinct
        // between writers; a clock before the epoch degrades to `0`.
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let pid = std::process::id();
        let mut tmp_name = path.as_os_str().to_owned();
        tmp_name.push(format!("{}{pid}-{nanos}", Self::TMP_INFIX));
        let tmp = PathBuf::from(tmp_name);

        let outcome = match fs::write(&tmp, &body).await {
            Ok(()) => fs::rename(&tmp, &path).await,
            Err(e) => Err(e),
        };
        if outcome.is_err() {
            // Best-effort cleanup; the original failure is what gets reported.
            let _ = fs::remove_file(&tmp).await;
        }
        outcome
    }

    /// Deletes leftover sidecar temp files from the directory of `pod`.
    ///
    /// A missing pod directory is not an error. Failures to delete individual
    /// files are ignored; failures to list the directory are returned.
    ///
    /// NOTE(review): every matching temp file is removed, including one that
    /// belongs to a `write_sidecar` call still in progress — confirm sweeps
    /// are never run concurrently with writes for the same pod.
    async fn sweep_quota_orphans(&self, pod: &str) -> std::io::Result<()> {
        let mut entries = match fs::read_dir(self.pod_dir(pod)).await {
            Ok(rd) => rd,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e),
        };
        while let Some(entry) = entries.next_entry().await? {
            let name = entry.file_name();
            // Names that are not valid UTF-8 cannot be ours and are skipped.
            let is_orphan = name
                .to_str()
                .and_then(|n| n.strip_prefix(QUOTA_FILE))
                .map_or(false, |rest| rest.starts_with(Self::TMP_INFIX));
            if is_orphan {
                let _ = fs::remove_file(entry.path()).await;
            }
        }
        Ok(())
    }

    /// Returns the recorded usage of `pod` with the default limit applied.
    ///
    /// A missing sidecar counts as zero usage, and a stored limit of `0` is
    /// replaced by `default_limit`. Read and parse errors are propagated.
    async fn effective(&self, pod: &str) -> std::io::Result<QuotaUsage> {
        let mut usage = self.read_sidecar(pod).await?.unwrap_or(QuotaUsage {
            used_bytes: 0,
            limit_bytes: 0,
        });
        if usage.limit_bytes == 0 {
            usage.limit_bytes = self.default_limit;
        }
        Ok(usage)
    }

    /// Recursively sums the lengths of the regular files below `dir`.
    ///
    /// The future is boxed because an `async fn` cannot call itself directly.
    /// Entries named like the quota sidecar are skipped at every depth, and
    /// entries that are neither directories nor regular files contribute
    /// nothing. A directory or file that disappears while the walk is running
    /// is treated as empty instead of failing the whole walk. The sum
    /// saturates at `u64::MAX`.
    fn dir_size_boxed<'a>(
        &'a self,
        dir: &'a Path,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = std::io::Result<u64>> + Send + 'a>,
    > {
        Box::pin(async move {
            let mut entries = match fs::read_dir(dir).await {
                Ok(rd) => rd,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
                Err(e) => return Err(e),
            };
            let mut total: u64 = 0;
            while let Some(entry) = entries.next_entry().await? {
                if entry.file_name() == QUOTA_FILE {
                    continue;
                }
                let file_type = match entry.file_type().await {
                    Ok(ft) => ft,
                    // Removed between listing and inspection: nothing to count.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                    Err(e) => return Err(e),
                };
                if file_type.is_dir() {
                    let child = entry.path();
                    total = total.saturating_add(self.dir_size_boxed(&child).await?);
                } else if file_type.is_file() {
                    match entry.metadata().await {
                        Ok(md) => total = total.saturating_add(md.len()),
                        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                        Err(e) => return Err(e),
                    }
                }
            }
            Ok(total)
        })
    }
}
#[async_trait]
impl QuotaPolicy for FsQuotaStore {
    /// Checks whether `pod` may grow by `delta_bytes`.
    ///
    /// A zero delta is always allowed without touching the filesystem. If the
    /// sidecar cannot be read or parsed, the check falls back to zero usage
    /// with the default limit (it fails open). An effective limit of `0`
    /// disables enforcement. Otherwise the request is rejected when recorded
    /// usage plus `delta_bytes` (saturating) is greater than the limit; the
    /// error carries the recorded usage, not the projected total.
    async fn check(&self, pod: &str, delta_bytes: u64) -> Result<(), QuotaExceeded> {
        if delta_bytes == 0 {
            return Ok(());
        }
        let fallback = QuotaUsage {
            used_bytes: 0,
            limit_bytes: self.default_limit,
        };
        let current = self.effective(pod).await.unwrap_or(fallback);
        if current.limit_bytes == 0 {
            return Ok(());
        }
        if current.used_bytes.saturating_add(delta_bytes) > current.limit_bytes {
            return Err(QuotaExceeded {
                pod: pod.to_string(),
                used: current.used_bytes,
                limit: current.limit_bytes,
            });
        }
        Ok(())
    }

    /// Adds `delta_bytes` to the recorded usage of `pod` (negative values
    /// subtract) and writes the sidecar back.
    ///
    /// The result is clamped to `0..=u64::MAX`. If the sidecar cannot be read
    /// or parsed, counting restarts from zero with the default limit. A
    /// failure to write the sidecar is ignored because the trait method has
    /// no way to report it.
    async fn record(&self, pod: &str, delta_bytes: i64) {
        let fallback = QuotaUsage {
            used_bytes: 0,
            limit_bytes: self.default_limit,
        };
        let current = self.effective(pod).await.unwrap_or(fallback);

        // `unsigned_abs` is defined for every `i64`, including `i64::MIN`,
        // whereas negating `i64::MIN` overflows.
        let magnitude = delta_bytes.unsigned_abs();
        let used_bytes = if delta_bytes >= 0 {
            current.used_bytes.saturating_add(magnitude)
        } else {
            current.used_bytes.saturating_sub(magnitude)
        };

        let updated = QuotaUsage {
            used_bytes,
            limit_bytes: current.limit_bytes,
        };
        let _ = self.write_sidecar(pod, &updated).await;
    }

    /// Measures the pod directory on disk and rewrites the sidecar with the
    /// measured usage.
    ///
    /// Leftover sidecar temp files are swept first (best effort). The limit is
    /// taken from the existing sidecar when it is non-zero and from
    /// `default_limit` otherwise.
    ///
    /// # Errors
    ///
    /// Returns any error from walking the directory, from reading or parsing
    /// the existing sidecar, or from writing the new one.
    async fn reconcile(&self, pod: &str) -> std::io::Result<QuotaUsage> {
        let _ = self.sweep_quota_orphans(pod).await;

        let pod_dir = self.pod_dir(pod);
        let used_bytes = self.dir_size_boxed(&pod_dir).await?;
        let limit_bytes = match self.read_sidecar(pod).await? {
            Some(stored) if stored.limit_bytes > 0 => stored.limit_bytes,
            _ => self.default_limit,
        };

        let reconciled = QuotaUsage {
            used_bytes,
            limit_bytes,
        };
        self.write_sidecar(pod, &reconciled).await?;
        Ok(reconciled)
    }

    /// Returns the usage stored in the sidecar of `pod`, with a stored limit
    /// of `0` replaced by `default_limit`.
    ///
    /// Returns `None` when the sidecar does not exist or cannot be read or
    /// parsed.
    async fn usage(&self, pod: &str) -> Option<QuotaUsage> {
        self.read_sidecar(pod).await.ok().flatten().map(|mut stored| {
            if stored.limit_bytes == 0 {
                stored.limit_bytes = self.default_limit;
            }
            stored
        })
    }
}
}
#[cfg(feature = "quota")]
pub use fs_impl::FsQuotaStore;