use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension, params};
use serde::{Deserialize, Serialize};
/// Namespace value used as the serde default for `QuotaStatus::namespace` and as
/// the namespace reported by aggregate statuses.
pub const GLOBAL_NAMESPACE: &str = "_global";
/// Built-in value for `QuotaDefaults::max_memories_per_day`.
pub const DEFAULT_MAX_MEMORIES_PER_DAY: i64 = 1000;
/// Built-in value for `QuotaDefaults::max_storage_bytes` (100 MiB).
pub const DEFAULT_MAX_STORAGE_BYTES: i64 = 100 * 1024 * 1024;
/// Built-in value for `QuotaDefaults::max_links_per_day`.
pub const DEFAULT_MAX_LINKS_PER_DAY: i64 = 5000;
/// Limit values written into a quota row when that row is first created.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QuotaDefaults {
/// Cap compared against the per-day memory counter.
pub max_memories_per_day: i64,
/// Cap compared against the stored-bytes counter.
pub max_storage_bytes: i64,
/// Cap compared against the per-day link counter.
pub max_links_per_day: i64,
}
impl Default for QuotaDefaults {
fn default() -> Self {
Self {
max_memories_per_day: DEFAULT_MAX_MEMORIES_PER_DAY,
max_storage_bytes: DEFAULT_MAX_STORAGE_BYTES,
max_links_per_day: DEFAULT_MAX_LINKS_PER_DAY,
}
}
}
/// Process-wide override for the limits given to newly created quota rows.
/// The cell can be written at most once.
static QUOTA_DEFAULTS: std::sync::OnceLock<QuotaDefaults> = std::sync::OnceLock::new();

/// Installs `defaults` as the process-wide quota defaults.
///
/// Only the first call has any effect; once a value is stored, later calls
/// leave it untouched and report nothing.
pub fn set_quota_defaults(defaults: QuotaDefaults) {
    // Stores `defaults` only when the cell is still empty; the returned
    // reference to the stored value is not needed here.
    QUOTA_DEFAULTS.get_or_init(|| defaults);
}
#[must_use]
pub fn quota_defaults() -> QuotaDefaults {
QUOTA_DEFAULTS.get().copied().unwrap_or_default()
}
/// An operation that is checked against, and charged to, a quota row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuotaOp {
/// A memory operation of `bytes` bytes: counts as one against the daily
/// memory counter and adds `bytes` to the stored-bytes counter.
Memory { bytes: i64 },
/// A link operation: counts as one against the daily link counter.
Link,
}
/// Identifies which limit a `QuotaError` refers to.
/// Serialized by serde using snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuotaLimit {
/// The per-day memory count limit.
MemoriesPerDay,
/// The stored-bytes limit.
StorageBytes,
/// The per-day link count limit.
LinksPerDay,
}
impl QuotaLimit {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::MemoriesPerDay => "memories_per_day",
Self::StorageBytes => "storage_bytes",
Self::LinksPerDay => "links_per_day",
}
}
}
/// Describes a refused operation: which limit was hit, for whom, and the
/// numbers involved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuotaError {
/// Agent whose quota row was checked.
pub agent_id: String,
/// Namespace of the quota row that was checked.
pub namespace: String,
/// The limit that would have been exceeded.
pub limit: QuotaLimit,
/// Counter value before the refused operation.
pub current: i64,
/// Configured maximum for `limit`.
pub max: i64,
}
impl std::fmt::Display for QuotaError {
    /// Renders the error as a single `QUOTA_EXCEEDED: ...` line that names the
    /// agent, the namespace, the limit label and the current/max values.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            agent_id,
            namespace,
            limit,
            current,
            max,
        } = self;
        let limit_label = limit.as_str();
        write!(
            f,
            "QUOTA_EXCEEDED: agent {} namespace {} hit {} (current={}, max={})",
            agent_id, namespace, limit_label, current, max,
        )
    }
}

impl std::error::Error for QuotaError {}
/// One row of the `agent_quotas` table: configured limits plus current usage
/// for an `(agent_id, namespace)` pair.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuotaStatus {
pub agent_id: String,
// When the field is absent during deserialization it becomes `GLOBAL_NAMESPACE`.
#[serde(default = "default_namespace")]
pub namespace: String,
// Configured limits.
pub max_memories_per_day: i64,
pub max_storage_bytes: i64,
pub max_links_per_day: i64,
// Usage counters. The two `*_today` counters are treated as zero once the
// date part of `day_started_at` no longer matches the current UTC date.
pub current_memories_today: i64,
pub current_storage_bytes: i64,
pub current_links_today: i64,
// Only the first 10 characters (the date part) are compared by this module.
pub day_started_at: String,
pub created_at: String,
pub updated_at: String,
}
/// Serde default for `QuotaStatus::namespace`: an owned copy of `GLOBAL_NAMESPACE`.
fn default_namespace() -> String {
    String::from(GLOBAL_NAMESPACE)
}
fn ensure_row(conn: &Connection, agent_id: &str, namespace: &str) -> Result<QuotaStatus> {
if let Some(row) = load_row(conn, agent_id, namespace)? {
return Ok(row);
}
let now = chrono::Utc::now().to_rfc3339();
let day = day_bucket(&now);
let defaults = quota_defaults();
conn.execute(
"INSERT OR IGNORE INTO agent_quotas
(agent_id, namespace,
max_memories_per_day, max_storage_bytes, max_links_per_day,
current_memories_today, current_storage_bytes, current_links_today,
day_started_at, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, 0, 0, 0, ?6, ?7, ?7)",
params![
agent_id,
namespace,
defaults.max_memories_per_day,
defaults.max_storage_bytes,
defaults.max_links_per_day,
day,
now,
],
)
.context("failed to insert default quota row")?;
load_row(conn, agent_id, namespace)?
.context("quota row missing immediately after insert (concurrent delete?)")
}
fn load_row(conn: &Connection, agent_id: &str, namespace: &str) -> Result<Option<QuotaStatus>> {
conn.query_row(
"SELECT agent_id, namespace,
max_memories_per_day, max_storage_bytes, max_links_per_day,
current_memories_today, current_storage_bytes, current_links_today,
day_started_at, created_at, updated_at
FROM agent_quotas
WHERE agent_id = ?1 AND namespace = ?2",
params![agent_id, namespace],
|r| {
Ok(QuotaStatus {
agent_id: r.get(0)?,
namespace: r.get(1)?,
max_memories_per_day: r.get(2)?,
max_storage_bytes: r.get(3)?,
max_links_per_day: r.get(4)?,
current_memories_today: r.get(5)?,
current_storage_bytes: r.get(6)?,
current_links_today: r.get(7)?,
day_started_at: r.get(8)?,
created_at: r.get(9)?,
updated_at: r.get(10)?,
})
},
)
.optional()
.context("failed to load agent quota row")
}
/// Returns the first 10 bytes of `rfc3339` (the `YYYY-MM-DD` part of a
/// well-formed timestamp) as an owned string.
///
/// When the input is shorter than 10 bytes, or byte 10 is not a character
/// boundary, the whole input is returned unchanged.
fn day_bucket(rfc3339: &str) -> String {
    match rfc3339.get(..10) {
        Some(date_part) => date_part.to_owned(),
        None => rfc3339.to_owned(),
    }
}
/// Checks whether `op` would fit within the quota of `(agent_id, namespace)`
/// without recording any usage.
///
/// The row is created with default limits when it does not exist yet. When the
/// stored day differs from the current UTC date, the per-day counters are
/// treated as zero for the purpose of the check. The stored-bytes counter is
/// never treated as reset.
///
/// For `QuotaOp::Memory` the per-day memory limit is checked before the
/// storage limit, so the former is reported when both would be exceeded.
///
/// # Errors
/// `QuotaCheckError::Quota` when a limit would be exceeded;
/// `QuotaCheckError::Sql` when the row cannot be loaded or created.
pub fn check_quota(
    conn: &Connection,
    agent_id: &str,
    namespace: &str,
    op: QuotaOp,
) -> std::result::Result<(), QuotaCheckError> {
    let row = ensure_row(conn, agent_id, namespace).map_err(QuotaCheckError::Sql)?;

    let today = day_bucket(&chrono::Utc::now().to_rfc3339());
    let same_day = day_bucket(&row.day_started_at) == today;
    let memories_today = if same_day { row.current_memories_today } else { 0 };
    let links_today = if same_day { row.current_links_today } else { 0 };

    // (limit, current, max) of the first limit the operation would break, if any.
    let violation = match op {
        QuotaOp::Memory { bytes } => {
            if memories_today.saturating_add(1) > row.max_memories_per_day {
                Some((
                    QuotaLimit::MemoriesPerDay,
                    memories_today,
                    row.max_memories_per_day,
                ))
            } else if row.current_storage_bytes.saturating_add(bytes) > row.max_storage_bytes {
                Some((
                    QuotaLimit::StorageBytes,
                    row.current_storage_bytes,
                    row.max_storage_bytes,
                ))
            } else {
                None
            }
        }
        QuotaOp::Link => {
            if links_today.saturating_add(1) > row.max_links_per_day {
                Some((QuotaLimit::LinksPerDay, links_today, row.max_links_per_day))
            } else {
                None
            }
        }
    };

    match violation {
        None => Ok(()),
        Some((limit, current, max)) => Err(QuotaCheckError::Quota(QuotaError {
            agent_id: agent_id.to_string(),
            namespace: namespace.to_string(),
            limit,
            current,
            max,
        })),
    }
}
/// Failure of a quota check: either a limit was hit or the underlying storage
/// operation failed.
#[derive(Debug)]
pub enum QuotaCheckError {
/// The operation would exceed a configured limit.
Quota(QuotaError),
/// Loading, creating or updating the quota row failed (also used for
/// transaction begin/commit failures).
Sql(anyhow::Error),
}
impl std::fmt::Display for QuotaCheckError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Quota(q) => std::fmt::Display::fmt(q, f),
Self::Sql(e) => write!(f, "quota check substrate error: {e}"),
}
}
}
impl std::error::Error for QuotaCheckError {}
/// Wraps a failed counter update as `QuotaCheckError::Sql`, prefixing the
/// cause with `update failed: `.
fn quota_update_failed(e: impl std::fmt::Display) -> QuotaCheckError {
    let wrapped = anyhow::anyhow!("update failed: {e}");
    QuotaCheckError::Sql(wrapped)
}
/// Emits a `tracing` warning saying that a quota refund for `agent_id` failed
/// with error `e`. Only logs; it does not retry or propagate anything.
pub(crate) fn log_refund_op_failed(agent_id: &str, e: &dyn std::fmt::Display) {
tracing::warn!("quota refund_op failed for agent {agent_id}: {e}");
}
pub fn check_and_record(
conn: &Connection,
agent_id: &str,
namespace: &str,
op: QuotaOp,
) -> std::result::Result<(), QuotaCheckError> {
let _ = ensure_row(conn, agent_id, namespace).map_err(QuotaCheckError::Sql)?;
conn.execute_batch(crate::storage::connection::SQL_BEGIN_IMMEDIATE)
.map_err(|e| QuotaCheckError::Sql(anyhow::anyhow!("BEGIN IMMEDIATE failed: {e}")))?;
let result: std::result::Result<(), QuotaCheckError> = (|| {
let row = load_row(conn, agent_id, namespace)
.map_err(QuotaCheckError::Sql)?
.ok_or_else(|| {
QuotaCheckError::Sql(anyhow::anyhow!(
"quota row vanished mid-transaction for agent {agent_id} namespace {namespace}"
))
})?;
let now = chrono::Utc::now().to_rfc3339();
let today = day_bucket(&now);
let stored_day = day_bucket(&row.day_started_at);
let day_rolled = stored_day != today;
let (memories_today, links_today) = if day_rolled {
(0, 0)
} else {
(row.current_memories_today, row.current_links_today)
};
match op {
QuotaOp::Memory { bytes } => {
if memories_today.saturating_add(1) > row.max_memories_per_day {
return Err(QuotaCheckError::Quota(QuotaError {
agent_id: agent_id.to_string(),
namespace: namespace.to_string(),
limit: QuotaLimit::MemoriesPerDay,
current: memories_today,
max: row.max_memories_per_day,
}));
}
if row.current_storage_bytes.saturating_add(bytes) > row.max_storage_bytes {
return Err(QuotaCheckError::Quota(QuotaError {
agent_id: agent_id.to_string(),
namespace: namespace.to_string(),
limit: QuotaLimit::StorageBytes,
current: row.current_storage_bytes,
max: row.max_storage_bytes,
}));
}
if day_rolled {
conn.execute(
"UPDATE agent_quotas SET
current_memories_today = 1,
current_links_today = 0,
current_storage_bytes = current_storage_bytes + ?1,
day_started_at = ?2,
updated_at = ?2
WHERE agent_id = ?3 AND namespace = ?4",
params![bytes, now, agent_id, namespace],
)
.map_err(quota_update_failed)?;
} else {
conn.execute(
"UPDATE agent_quotas SET
current_memories_today = current_memories_today + 1,
current_storage_bytes = current_storage_bytes + ?1,
updated_at = ?2
WHERE agent_id = ?3 AND namespace = ?4",
params![bytes, now, agent_id, namespace],
)
.map_err(quota_update_failed)?;
}
}
QuotaOp::Link => {
if links_today.saturating_add(1) > row.max_links_per_day {
return Err(QuotaCheckError::Quota(QuotaError {
agent_id: agent_id.to_string(),
namespace: namespace.to_string(),
limit: QuotaLimit::LinksPerDay,
current: links_today,
max: row.max_links_per_day,
}));
}
if day_rolled {
conn.execute(
"UPDATE agent_quotas SET
current_memories_today = 0,
current_links_today = 1,
day_started_at = ?1,
updated_at = ?1
WHERE agent_id = ?2 AND namespace = ?3",
params![now, agent_id, namespace],
)
.map_err(quota_update_failed)?;
} else {
conn.execute(
"UPDATE agent_quotas SET
current_links_today = current_links_today + 1,
updated_at = ?1
WHERE agent_id = ?2 AND namespace = ?3",
params![now, agent_id, namespace],
)
.map_err(quota_update_failed)?;
}
}
}
Ok(())
})();
match result {
Ok(()) => {
conn.execute_batch(crate::storage::connection::SQL_COMMIT)
.map_err(|e| QuotaCheckError::Sql(anyhow::anyhow!("quota commit failed: {e}")))?;
Ok(())
}
Err(e) => {
let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
Err(e)
}
}
}
/// Gives back the usage previously charged for `op` on `(agent_id, namespace)`.
///
/// Counters are decremented in SQL and clamped at zero with `MAX(.., 0)`;
/// `updated_at` is set to the current UTC timestamp. No row is created: when
/// the row does not exist the UPDATE matches nothing and `Ok(())` is still
/// returned. The stored day is not consulted.
///
/// # Errors
/// Fails when the UPDATE statement fails.
pub fn refund_op(conn: &Connection, agent_id: &str, namespace: &str, op: QuotaOp) -> Result<()> {
    const REFUND_MEMORY: &str = "UPDATE agent_quotas SET\n\
        current_memories_today = MAX(current_memories_today - 1, 0),\n\
        current_storage_bytes = MAX(current_storage_bytes - ?1, 0),\n\
        updated_at = ?2\n\
        WHERE agent_id = ?3 AND namespace = ?4";
    const REFUND_LINK: &str = "UPDATE agent_quotas SET\n\
        current_links_today = MAX(current_links_today - 1, 0),\n\
        updated_at = ?1\n\
        WHERE agent_id = ?2 AND namespace = ?3";

    let now = chrono::Utc::now().to_rfc3339();
    let outcome = match op {
        QuotaOp::Link => conn.execute(REFUND_LINK, params![now, agent_id, namespace]),
        QuotaOp::Memory { bytes } => {
            conn.execute(REFUND_MEMORY, params![bytes, now, agent_id, namespace])
        }
    };
    outcome?;
    Ok(())
}
/// Records `op` against `(agent_id, namespace)` without checking any limit.
///
/// The row is created with default limits when missing. When the stored day
/// differs from the current UTC date, both per-day counters are reset (the one
/// for `op` is set to 1) and `day_started_at` is stamped with the current
/// timestamp; otherwise the matching counter is incremented. A memory
/// operation always adds `bytes` to the stored-bytes counter.
///
/// The day comparison uses the row as read before the UPDATE; no transaction
/// is opened here.
///
/// # Errors
/// Fails when the row cannot be loaded/created or the UPDATE fails.
pub fn record_op(conn: &Connection, agent_id: &str, namespace: &str, op: QuotaOp) -> Result<()> {
    const MEMORY_NEW_DAY: &str = "UPDATE agent_quotas SET\n\
        current_memories_today = 1,\n\
        current_links_today = 0,\n\
        current_storage_bytes = current_storage_bytes + ?1,\n\
        day_started_at = ?2,\n\
        updated_at = ?2\n\
        WHERE agent_id = ?3 AND namespace = ?4";
    const MEMORY_SAME_DAY: &str = "UPDATE agent_quotas SET\n\
        current_memories_today = current_memories_today + 1,\n\
        current_storage_bytes = current_storage_bytes + ?1,\n\
        updated_at = ?2\n\
        WHERE agent_id = ?3 AND namespace = ?4";
    const LINK_NEW_DAY: &str = "UPDATE agent_quotas SET\n\
        current_memories_today = 0,\n\
        current_links_today = 1,\n\
        day_started_at = ?1,\n\
        updated_at = ?1\n\
        WHERE agent_id = ?2 AND namespace = ?3";
    const LINK_SAME_DAY: &str = "UPDATE agent_quotas SET\n\
        current_links_today = current_links_today + 1,\n\
        updated_at = ?1\n\
        WHERE agent_id = ?2 AND namespace = ?3";

    let row = ensure_row(conn, agent_id, namespace)?;
    let now = chrono::Utc::now().to_rfc3339();
    let day_rolled = day_bucket(&row.day_started_at) != day_bucket(&now);

    let outcome = match (op, day_rolled) {
        (QuotaOp::Memory { bytes }, true) => {
            conn.execute(MEMORY_NEW_DAY, params![bytes, now, agent_id, namespace])
        }
        (QuotaOp::Memory { bytes }, false) => {
            conn.execute(MEMORY_SAME_DAY, params![bytes, now, agent_id, namespace])
        }
        (QuotaOp::Link, true) => conn.execute(LINK_NEW_DAY, params![now, agent_id, namespace]),
        (QuotaOp::Link, false) => conn.execute(LINK_SAME_DAY, params![now, agent_id, namespace]),
    };
    outcome?;
    Ok(())
}
pub fn reset_daily(conn: &Connection) -> Result<usize> {
let now = chrono::Utc::now().to_rfc3339();
let today = day_bucket(&now);
let affected = conn.execute(
"UPDATE agent_quotas SET
current_memories_today = 0,
current_links_today = 0,
day_started_at = ?1,
updated_at = ?1
WHERE substr(day_started_at, 1, 10) <> ?2",
params![now, today],
)?;
Ok(affected)
}
/// Returns the quota row for `(agent_id, namespace)`.
///
/// Delegates to `ensure_row`, so a missing row is inserted with default
/// limits as a side effect of the lookup.
///
/// # Errors
/// Propagates any error from `ensure_row`.
pub fn get_status(conn: &Connection, agent_id: &str, namespace: &str) -> Result<QuotaStatus> {
ensure_row(conn, agent_id, namespace)
}
/// Returns one combined status over every namespace row of `agent_id`.
///
/// Limits are the maximum across rows, usage counters are summed,
/// `day_started_at` and `created_at` are the minimum and `updated_at` the
/// maximum across rows. The result is labelled with `GLOBAL_NAMESPACE`.
///
/// When the agent has no rows (the aggregated `created_at` comes back empty),
/// the `(agent_id, GLOBAL_NAMESPACE)` row is returned instead, being created
/// with default limits if necessary.
///
/// # Errors
/// Fails when the aggregate query cannot be prepared or read, or when the
/// fallback row cannot be loaded/created.
pub fn get_aggregate_status(conn: &Connection, agent_id: &str) -> Result<QuotaStatus> {
    const AGGREGATE_SQL: &str = "SELECT\n\
        COALESCE(MAX(max_memories_per_day), 0),\n\
        COALESCE(MAX(max_storage_bytes), 0),\n\
        COALESCE(MAX(max_links_per_day), 0),\n\
        COALESCE(SUM(current_memories_today), 0),\n\
        COALESCE(SUM(current_storage_bytes), 0),\n\
        COALESCE(SUM(current_links_today), 0),\n\
        COALESCE(MIN(day_started_at), ''),\n\
        COALESCE(MIN(created_at), ''),\n\
        COALESCE(MAX(updated_at), '')\n\
        FROM agent_quotas WHERE agent_id = ?1";

    let mut stmt = conn
        .prepare(AGGREGATE_SQL)
        .context("failed to prepare aggregate quota query")?;
    let aggregate: Option<QuotaStatus> = stmt
        .query_row(params![agent_id], |r| {
            Ok(QuotaStatus {
                agent_id: agent_id.to_string(),
                namespace: GLOBAL_NAMESPACE.to_string(),
                max_memories_per_day: r.get(0)?,
                max_storage_bytes: r.get(1)?,
                max_links_per_day: r.get(2)?,
                current_memories_today: r.get(3)?,
                current_storage_bytes: r.get(4)?,
                current_links_today: r.get(5)?,
                day_started_at: r.get(6)?,
                created_at: r.get(7)?,
                updated_at: r.get(8)?,
            })
        })
        .optional()
        .context("failed to read aggregate quota row")?;
    // Release the statement before the fallback below may write to the table.
    drop(stmt);

    match aggregate {
        // COALESCE turns "no rows" into an empty `created_at`; anything else is real data.
        Some(status) if !status.created_at.is_empty() => Ok(status),
        _ => ensure_row(conn, agent_id, GLOBAL_NAMESPACE),
    }
}
/// Lists quota rows ordered by `agent_id`, then `namespace`.
///
/// With `namespace_filter = Some(ns)` only rows of that namespace are
/// returned; with `None` every row is returned.
///
/// # Errors
/// Fails when the query cannot be prepared or executed, or when a row cannot
/// be converted.
pub fn list_status(conn: &Connection, namespace_filter: Option<&str>) -> Result<Vec<QuotaStatus>> {
    const LIST_ONE_NAMESPACE: &str = "SELECT agent_id, namespace,\n\
        max_memories_per_day, max_storage_bytes, max_links_per_day,\n\
        current_memories_today, current_storage_bytes, current_links_today,\n\
        day_started_at, created_at, updated_at\n\
        FROM agent_quotas\n\
        WHERE namespace = ?1\n\
        ORDER BY agent_id ASC, namespace ASC";
    const LIST_ALL: &str = "SELECT agent_id, namespace,\n\
        max_memories_per_day, max_storage_bytes, max_links_per_day,\n\
        current_memories_today, current_storage_bytes, current_links_today,\n\
        day_started_at, created_at, updated_at\n\
        FROM agent_quotas\n\
        ORDER BY agent_id ASC, namespace ASC";

    // Column indices follow the SELECT lists above.
    fn status_from_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<QuotaStatus> {
        Ok(QuotaStatus {
            agent_id: r.get(0)?,
            namespace: r.get(1)?,
            max_memories_per_day: r.get(2)?,
            max_storage_bytes: r.get(3)?,
            max_links_per_day: r.get(4)?,
            current_memories_today: r.get(5)?,
            current_storage_bytes: r.get(6)?,
            current_links_today: r.get(7)?,
            day_started_at: r.get(8)?,
            created_at: r.get(9)?,
            updated_at: r.get(10)?,
        })
    }

    let statuses = match namespace_filter {
        Some(ns) => {
            let mut stmt = conn
                .prepare(LIST_ONE_NAMESPACE)
                .context("failed to prepare per-namespace quota list query")?;
            let rows = stmt
                .query_map(params![ns], status_from_row)
                .context("failed to query per-namespace quota rows")?;
            rows.map(|row| row.context("failed to materialize quota row"))
                .collect::<Result<Vec<_>>>()?
        }
        None => {
            let mut stmt = conn
                .prepare(LIST_ALL)
                .context("failed to prepare quota list query")?;
            let rows = stmt
                .query_map([], status_from_row)
                .context("failed to query quota rows")?;
            rows.map(|row| row.context("failed to materialize quota row"))
                .collect::<Result<Vec<_>>>()?
        }
    };
    Ok(statuses)
}
#[cfg(test)]
mod tests {
use super::*;
// Opens an in-memory SQLite database and applies the two quota migrations
// (0022 agent quotas, then 0042 per-namespace quota) embedded at compile time.
fn fresh_db() -> Connection {
let conn = Connection::open_in_memory().expect("open in-memory");
conn.execute_batch(include_str!(
"../migrations/sqlite/0022_v07_agent_quotas.sql"
))
.expect("apply v28 K8 migration");
conn.execute_batch(include_str!(
"../migrations/sqlite/0042_v50_per_namespace_quota.sql"
))
.expect("apply v50 per-namespace migration");
conn
}
// A brand-new agent with default limits passes a small memory check.
#[test]
fn check_quota_under_limit_returns_ok() {
let conn = fresh_db();
assert!(
check_quota(
&conn,
"agent-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 100 }
)
.is_ok()
);
}
// With the daily memory cap lowered to 1 and one memory recorded, the next
// check must fail with MemoriesPerDay.
#[test]
fn check_quota_at_memory_limit_returns_quota_exceeded() {
let conn = fresh_db();
// First call creates the row so the UPDATE below has something to change.
check_quota(
&conn,
"agent-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET max_memories_per_day = 1
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-a", GLOBAL_NAMESPACE],
)
.unwrap();
record_op(
&conn,
"agent-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
let err = check_quota(
&conn,
"agent-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap_err();
match err {
QuotaCheckError::Quota(q) => {
assert_eq!(q.limit, QuotaLimit::MemoriesPerDay);
assert_eq!(q.max, 1);
assert_eq!(q.namespace, GLOBAL_NAMESPACE);
}
QuotaCheckError::Sql(e) => panic!("expected QuotaError, got SQL: {e}"),
}
}
// With the storage cap lowered to 100 bytes, a 200-byte memory check must fail
// with StorageBytes.
#[test]
fn check_quota_storage_bytes_limit_fires() {
let conn = fresh_db();
// Creates the row.
check_quota(
&conn,
"agent-b",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET max_storage_bytes = 100
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-b", GLOBAL_NAMESPACE],
)
.unwrap();
let err = check_quota(
&conn,
"agent-b",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 200 },
)
.unwrap_err();
match err {
QuotaCheckError::Quota(q) => assert_eq!(q.limit, QuotaLimit::StorageBytes),
QuotaCheckError::Sql(e) => panic!("expected QuotaError, got SQL: {e}"),
}
}
// With the link cap and the link counter both forced to 1, a link check must
// fail with LinksPerDay.
#[test]
fn check_quota_links_per_day_limit_fires() {
let conn = fresh_db();
// Creates the row.
check_quota(&conn, "agent-c", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
conn.execute(
"UPDATE agent_quotas SET max_links_per_day = 1, current_links_today = 1
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-c", GLOBAL_NAMESPACE],
)
.unwrap();
let err = check_quota(&conn, "agent-c", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap_err();
match err {
QuotaCheckError::Quota(q) => assert_eq!(q.limit, QuotaLimit::LinksPerDay),
QuotaCheckError::Sql(e) => panic!("expected QuotaError, got SQL: {e}"),
}
}
// record_op bumps the memory count and stored bytes for a memory op, and the
// link count for a link op.
#[test]
fn record_op_increments_counters() {
let conn = fresh_db();
record_op(
&conn,
"agent-d",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 42 },
)
.unwrap();
let s = get_status(&conn, "agent-d", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 1);
assert_eq!(s.current_storage_bytes, 42);
record_op(&conn, "agent-d", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
let s2 = get_status(&conn, "agent-d", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s2.current_links_today, 1);
}
// reset_daily only touches rows whose stored day is not today: the back-dated
// row is zeroed (storage bytes kept), the fresh row is left alone.
#[test]
fn reset_daily_zeros_stale_rows_only() {
let conn = fresh_db();
record_op(
&conn,
"agent-e",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 10 },
)
.unwrap();
record_op(&conn, "agent-f", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
// Back-date agent-e so its row counts as stale.
conn.execute(
"UPDATE agent_quotas SET day_started_at = '2020-01-01T00:00:00+00:00'
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-e", GLOBAL_NAMESPACE],
)
.unwrap();
let n = reset_daily(&conn).unwrap();
assert_eq!(n, 1, "exactly one stale row should be reset");
let s_e = get_status(&conn, "agent-e", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s_e.current_memories_today, 0);
let s_f = get_status(&conn, "agent-f", GLOBAL_NAMESPACE).unwrap();
assert_eq!(
s_f.current_links_today, 1,
"fresh row must not be touched by the daily reset"
);
assert_eq!(s_e.current_storage_bytes, 10);
}
// Rows inserted out of order come back from list_status sorted by agent id.
#[test]
fn list_status_returns_all_rows_sorted() {
let conn = fresh_db();
record_op(
&conn,
"z-agent",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
record_op(
&conn,
"a-agent",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
record_op(
&conn,
"m-agent",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
let rows = list_status(&conn, None).unwrap();
let ids: Vec<&str> = rows.iter().map(|r| r.agent_id.as_str()).collect();
assert_eq!(ids, vec!["a-agent", "m-agent", "z-agent"]);
}
// get_status on an unknown agent returns a row carrying the built-in default
// limits and zero usage.
#[test]
fn get_status_auto_inserts_default_row() {
let conn = fresh_db();
let s = get_status(&conn, "fresh-agent", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.max_memories_per_day, DEFAULT_MAX_MEMORIES_PER_DAY);
assert_eq!(s.max_storage_bytes, DEFAULT_MAX_STORAGE_BYTES);
assert_eq!(s.max_links_per_day, DEFAULT_MAX_LINKS_PER_DAY);
assert_eq!(s.current_memories_today, 0);
assert_eq!(s.namespace, GLOBAL_NAMESPACE);
}
// Pins the exact label string returned for each QuotaLimit variant.
#[test]
fn quota_limit_as_str_returns_expected_canonical_form() {
assert_eq!(QuotaLimit::MemoriesPerDay.as_str(), "memories_per_day");
assert_eq!(QuotaLimit::StorageBytes.as_str(), "storage_bytes");
assert_eq!(QuotaLimit::LinksPerDay.as_str(), "links_per_day");
}
// The Display output of QuotaError contains the marker, agent, namespace,
// limit label and both numbers; the type is usable as a std error.
#[test]
fn quota_error_display_format_contract() {
let err = QuotaError {
agent_id: "alice".to_string(),
namespace: "team/policies".to_string(),
limit: QuotaLimit::StorageBytes,
current: 1024,
max: 2048,
};
let s = format!("{err}");
assert!(s.contains("QUOTA_EXCEEDED"));
assert!(s.contains("alice"));
assert!(s.contains("team/policies"));
assert!(s.contains("storage_bytes"));
assert!(s.contains("current=1024"));
assert!(s.contains("max=2048"));
// Compile-time check that QuotaError implements std::error::Error.
let _: &dyn std::error::Error = &err;
}
// QuotaCheckError::Quota displays exactly like the wrapped QuotaError.
#[test]
fn quota_check_error_display_quota_variant_delegates_to_inner() {
let err = QuotaCheckError::Quota(QuotaError {
agent_id: "bob".to_string(),
namespace: GLOBAL_NAMESPACE.to_string(),
limit: QuotaLimit::MemoriesPerDay,
current: 99,
max: 100,
});
let s = format!("{err}");
assert!(s.contains("QUOTA_EXCEEDED"));
assert!(s.contains("memories_per_day"));
// Compile-time check that QuotaCheckError implements std::error::Error.
let _: &dyn std::error::Error = &err;
}
// QuotaCheckError::Sql displays its prefix followed by the inner error text.
#[test]
fn quota_check_error_display_sql_variant_wraps_substrate_error() {
let err = QuotaCheckError::Sql(anyhow::anyhow!("boom"));
let s = format!("{err}");
assert!(s.contains("quota check substrate error"));
assert!(s.contains("boom"));
}
// check_and_record under the limits records a memory op and then a link op.
#[test]
fn check_and_record_under_limit_increments_counters() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-cr-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 50 },
)
.unwrap();
let s = get_status(&conn, "agent-cr-a", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 1);
assert_eq!(s.current_storage_bytes, 50);
check_and_record(&conn, "agent-cr-a", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
let s2 = get_status(&conn, "agent-cr-a", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s2.current_links_today, 1);
}
// When the daily memory cap is hit, check_and_record fails with
// MemoriesPerDay and the counter stays at its previous value.
#[test]
fn check_and_record_at_memories_limit_returns_quota_error_and_rolls_back() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-cr-b",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET max_memories_per_day = 1
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-cr-b", GLOBAL_NAMESPACE],
)
.unwrap();
let err = check_and_record(
&conn,
"agent-cr-b",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap_err();
match err {
QuotaCheckError::Quota(q) => {
assert_eq!(q.limit, QuotaLimit::MemoriesPerDay);
}
QuotaCheckError::Sql(e) => panic!("expected Quota, got SQL: {e}"),
}
// The refused call must not have changed the counter.
let s = get_status(&conn, "agent-cr-b", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 1);
}
// With the storage cap lowered to 100 bytes, a 1000-byte memory op is refused
// with StorageBytes.
#[test]
fn check_and_record_storage_limit_returns_quota_error() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-cr-c",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET max_storage_bytes = 100
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-cr-c", GLOBAL_NAMESPACE],
)
.unwrap();
let err = check_and_record(
&conn,
"agent-cr-c",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1000 },
)
.expect_err("storage cap should fire");
match err {
QuotaCheckError::Quota(q) => assert_eq!(q.limit, QuotaLimit::StorageBytes),
QuotaCheckError::Sql(e) => panic!("expected quota, got SQL: {e}"),
}
}
// With one link recorded and the link cap lowered to 1, the next link op is
// refused with LinksPerDay.
#[test]
fn check_and_record_links_limit_returns_quota_error() {
let conn = fresh_db();
check_and_record(&conn, "agent-cr-d", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
conn.execute(
"UPDATE agent_quotas SET max_links_per_day = 1
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-cr-d", GLOBAL_NAMESPACE],
)
.unwrap();
let err = check_and_record(&conn, "agent-cr-d", GLOBAL_NAMESPACE, QuotaOp::Link)
.expect_err("links cap should fire");
match err {
QuotaCheckError::Quota(q) => assert_eq!(q.limit, QuotaLimit::LinksPerDay),
QuotaCheckError::Sql(e) => panic!("expected quota, got SQL: {e}"),
}
}
// After the stored day is back-dated, a memory op resets the daily counters
// (memories = 1, links = 0) while stored bytes keep accumulating (10 + 5).
#[test]
fn check_and_record_day_roll_branch_for_memory_zeros_daily_counters() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-cr-e",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 10 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET day_started_at = '2020-01-01T00:00:00+00:00',
current_memories_today = 999, current_links_today = 7
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-cr-e", GLOBAL_NAMESPACE],
)
.unwrap();
check_and_record(
&conn,
"agent-cr-e",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 5 },
)
.unwrap();
let s = get_status(&conn, "agent-cr-e", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 1);
assert_eq!(s.current_links_today, 0);
assert_eq!(s.current_storage_bytes, 15);
}
// After the stored day is back-dated, a link op resets the daily counters
// (memories = 0, links = 1).
#[test]
fn check_and_record_day_roll_branch_for_link_resets_daily_counters() {
let conn = fresh_db();
check_and_record(&conn, "agent-cr-f", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
conn.execute(
"UPDATE agent_quotas SET day_started_at = '2020-01-01T00:00:00+00:00',
current_memories_today = 50, current_links_today = 8
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-cr-f", GLOBAL_NAMESPACE],
)
.unwrap();
check_and_record(&conn, "agent-cr-f", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
let s = get_status(&conn, "agent-cr-f", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 0);
assert_eq!(s.current_links_today, 1);
}
// Refunding a memory op brings both counters back to zero; a second refund
// leaves them at zero instead of going negative.
#[test]
fn refund_op_memory_decrements_counters_saturating_to_zero() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-rf-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 200 },
)
.unwrap();
refund_op(
&conn,
"agent-rf-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 200 },
)
.unwrap();
let s = get_status(&conn, "agent-rf-a", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 0);
assert_eq!(s.current_storage_bytes, 0);
refund_op(
&conn,
"agent-rf-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 200 },
)
.unwrap();
let s2 = get_status(&conn, "agent-rf-a", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s2.current_memories_today, 0);
assert_eq!(s2.current_storage_bytes, 0);
}
// Refunding a link op brings the link counter back to zero; a second refund
// leaves it at zero.
#[test]
fn refund_op_link_decrements_counter_saturating_to_zero() {
let conn = fresh_db();
check_and_record(&conn, "agent-rf-b", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
refund_op(&conn, "agent-rf-b", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
let s = get_status(&conn, "agent-rf-b", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_links_today, 0);
refund_op(&conn, "agent-rf-b", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
let s2 = get_status(&conn, "agent-rf-b", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s2.current_links_today, 0);
}
// record_op on a back-dated row resets the daily counters for a memory op
// (memories = 1, links = 0) and keeps adding to stored bytes (100 + 5).
#[test]
fn record_op_day_roll_branch_for_memory() {
let conn = fresh_db();
record_op(
&conn,
"agent-ro-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 100 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET day_started_at = '2020-01-01T00:00:00+00:00',
current_memories_today = 50, current_links_today = 4
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-ro-a", GLOBAL_NAMESPACE],
)
.unwrap();
record_op(
&conn,
"agent-ro-a",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 5 },
)
.unwrap();
let s = get_status(&conn, "agent-ro-a", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 1);
assert_eq!(s.current_links_today, 0);
assert_eq!(s.current_storage_bytes, 105);
}
// record_op on a back-dated row resets the daily counters for a link op
// (memories = 0, links = 1).
#[test]
fn record_op_day_roll_branch_for_link() {
let conn = fresh_db();
record_op(&conn, "agent-ro-b", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
conn.execute(
"UPDATE agent_quotas SET day_started_at = '2020-01-01T00:00:00+00:00',
current_memories_today = 7, current_links_today = 9
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-ro-b", GLOBAL_NAMESPACE],
)
.unwrap();
record_op(&conn, "agent-ro-b", GLOBAL_NAMESPACE, QuotaOp::Link).unwrap();
let s = get_status(&conn, "agent-ro-b", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.current_memories_today, 0);
assert_eq!(s.current_links_today, 1);
}
// A QuotaStatus for a non-global namespace survives a JSON round trip with its
// agent id, namespace and default limit intact.
#[test]
fn quota_status_serde_roundtrip_carries_namespace() {
let conn = fresh_db();
let s = get_status(&conn, "ser-agent", "team/policies").unwrap();
let json = serde_json::to_string(&s).unwrap();
let parsed: QuotaStatus = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.agent_id, "ser-agent");
assert_eq!(parsed.namespace, "team/policies");
assert_eq!(parsed.max_memories_per_day, DEFAULT_MAX_MEMORIES_PER_DAY);
}
// With a back-dated row, check_quota ignores the huge stale daily counters and
// lets both a memory op and a link op through.
#[test]
fn check_quota_day_roll_branch_treats_daily_as_zero() {
let conn = fresh_db();
check_quota(
&conn,
"agent-cq-roll",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET day_started_at = '2020-01-01T00:00:00+00:00',
current_memories_today = 99999, current_links_today = 99999
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-cq-roll", GLOBAL_NAMESPACE],
)
.unwrap();
assert!(
check_quota(
&conn,
"agent-cq-roll",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 }
)
.is_ok()
);
assert!(check_quota(&conn, "agent-cq-roll", GLOBAL_NAMESPACE, QuotaOp::Link).is_ok());
}
// Lowering the memory cap in one namespace blocks that namespace only; the
// same agent can still record memories in another namespace.
#[test]
fn per_namespace_isolation_memories() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-ns",
"alice/scratch",
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
check_and_record(
&conn,
"agent-ns",
"team/policies",
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET max_memories_per_day = 1
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-ns", "alice/scratch"],
)
.unwrap();
let err = check_and_record(
&conn,
"agent-ns",
"alice/scratch",
QuotaOp::Memory { bytes: 1 },
)
.unwrap_err();
match err {
QuotaCheckError::Quota(q) => {
assert_eq!(q.namespace, "alice/scratch");
assert_eq!(q.limit, QuotaLimit::MemoriesPerDay);
}
QuotaCheckError::Sql(e) => panic!("expected Quota, got SQL: {e}"),
}
// The other namespace is unaffected by the cap on alice/scratch.
check_and_record(
&conn,
"agent-ns",
"team/policies",
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
}
// A storage cap hit in one namespace does not stop the same agent from storing
// the same number of bytes in a different namespace.
#[test]
fn per_namespace_isolation_storage_bytes() {
let conn = fresh_db();
check_and_record(
&conn,
"agent-ns2",
"alice/scratch",
QuotaOp::Memory { bytes: 50 },
)
.unwrap();
conn.execute(
"UPDATE agent_quotas SET max_storage_bytes = 100
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-ns2", "alice/scratch"],
)
.unwrap();
// 50 already stored + 60 requested exceeds the cap of 100.
let err = check_and_record(
&conn,
"agent-ns2",
"alice/scratch",
QuotaOp::Memory { bytes: 60 },
)
.unwrap_err();
assert!(matches!(err, QuotaCheckError::Quota(q) if q.limit == QuotaLimit::StorageBytes));
check_and_record(
&conn,
"agent-ns2",
"shared/team-a",
QuotaOp::Memory { bytes: 60 },
)
.unwrap();
}
// A link cap hit in one namespace does not stop the same agent from linking in
// a different namespace.
#[test]
fn per_namespace_isolation_links() {
let conn = fresh_db();
check_and_record(&conn, "agent-ns3", "alice/scratch", QuotaOp::Link).unwrap();
conn.execute(
"UPDATE agent_quotas SET max_links_per_day = 1
WHERE agent_id = ?1 AND namespace = ?2",
params!["agent-ns3", "alice/scratch"],
)
.unwrap();
let err = check_and_record(&conn, "agent-ns3", "alice/scratch", QuotaOp::Link)
.expect_err("links cap on alice/scratch should fire");
assert!(matches!(err, QuotaCheckError::Quota(q) if q.limit == QuotaLimit::LinksPerDay));
check_and_record(&conn, "agent-ns3", "team/policies", QuotaOp::Link).unwrap();
}
// get_aggregate_status sums usage over the agent's namespaces (2 memories,
// 300 bytes, 3 links) and labels the result with GLOBAL_NAMESPACE.
#[test]
fn aggregate_status_sums_across_namespaces() {
let conn = fresh_db();
record_op(
&conn,
"agent-agg",
"alice/scratch",
QuotaOp::Memory { bytes: 100 },
)
.unwrap();
record_op(
&conn,
"agent-agg",
"team/policies",
QuotaOp::Memory { bytes: 200 },
)
.unwrap();
record_op(&conn, "agent-agg", "alice/scratch", QuotaOp::Link).unwrap();
record_op(&conn, "agent-agg", "team/policies", QuotaOp::Link).unwrap();
record_op(&conn, "agent-agg", "team/policies", QuotaOp::Link).unwrap();
let agg = get_aggregate_status(&conn, "agent-agg").unwrap();
assert_eq!(agg.agent_id, "agent-agg");
assert_eq!(agg.namespace, GLOBAL_NAMESPACE);
assert_eq!(agg.current_memories_today, 2);
assert_eq!(agg.current_storage_bytes, 300);
assert_eq!(agg.current_links_today, 3);
}
// For one agent with two namespaces, list_status returns both rows ordered by
// namespace.
#[test]
fn list_status_returns_per_namespace_rows_sorted() {
let conn = fresh_db();
record_op(&conn, "agent-ls", "z-ns", QuotaOp::Memory { bytes: 1 }).unwrap();
record_op(&conn, "agent-ls", "a-ns", QuotaOp::Memory { bytes: 1 }).unwrap();
let rows = list_status(&conn, None).unwrap();
let agent_ls_rows: Vec<&QuotaStatus> =
rows.iter().filter(|r| r.agent_id == "agent-ls").collect();
assert_eq!(agent_ls_rows.len(), 2);
assert_eq!(agent_ls_rows[0].namespace, "a-ns");
assert_eq!(agent_ls_rows[1].namespace, "z-ns");
}
// With a namespace filter, list_status returns only rows of that namespace,
// covering every agent that has one.
#[test]
fn list_status_namespace_filter() {
let conn = fresh_db();
record_op(
&conn,
"agent-lf",
"team/policies",
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
record_op(
&conn,
"agent-lf",
"alice/scratch",
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
record_op(
&conn,
"other-agent",
"team/policies",
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
let rows = list_status(&conn, Some("team/policies")).unwrap();
for r in &rows {
assert_eq!(r.namespace, "team/policies");
}
let agent_ids: std::collections::HashSet<&str> =
rows.iter().map(|r| r.agent_id.as_str()).collect();
assert!(agent_ids.contains("agent-lf"));
assert!(agent_ids.contains("other-agent"));
}
// Usage recorded under GLOBAL_NAMESPACE is stored and read back under that
// same namespace value.
#[test]
fn global_sentinel_is_backwards_compat_landing_zone() {
let conn = fresh_db();
record_op(
&conn,
"agent-bc",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 42 },
)
.unwrap();
let s = get_status(&conn, "agent-bc", GLOBAL_NAMESPACE).unwrap();
assert_eq!(s.namespace, GLOBAL_NAMESPACE);
assert_eq!(s.current_memories_today, 1);
assert_eq!(s.current_storage_bytes, 42);
}
// Regression test for #1256: a memory op with `bytes: i64::MAX` must be
// refused with StorageBytes by both check_quota (on a row that already holds
// one byte) and check_and_record (on a fresh row).
#[test]
fn issue_1256_i64_max_input_does_not_wrap_under_saturating_add() {
let conn = fresh_db();
check_quota(
&conn,
"agent-1256",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.expect("seed call must pass");
// Make the stored-bytes counter non-zero so the sum would exceed i64::MAX.
record_op(
&conn,
"agent-1256",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: 1 },
)
.unwrap();
let err = check_quota(
&conn,
"agent-1256",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: i64::MAX },
)
.expect_err("i64::MAX bytes input MUST be refused (post-#1256 saturating_add)");
match err {
QuotaCheckError::Quota(q) => {
assert_eq!(
q.limit,
QuotaLimit::StorageBytes,
"#1256: the storage-bytes cap must fire on the saturated sum, \
not any other limit"
);
assert_eq!(q.agent_id, "agent-1256");
}
QuotaCheckError::Sql(e) => {
panic!("#1256: expected QuotaError::StorageBytes, got SQL: {e}")
}
}
let err = check_and_record(
&conn,
"agent-1256-atomic",
GLOBAL_NAMESPACE,
QuotaOp::Memory { bytes: i64::MAX },
)
.expect_err("check_and_record must also refuse i64::MAX bytes (post-#1256)");
match err {
QuotaCheckError::Quota(q) => {
assert_eq!(q.limit, QuotaLimit::StorageBytes);
}
QuotaCheckError::Sql(e) => panic!("#1256: expected QuotaError, got SQL: {e}"),
}
}
}