#![cfg(feature = "core")]
use std::sync::Arc;
use ff_backend_sqlite::SqliteBackend;
use ff_core::contracts::{
CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
ReportUsageAdminArgs, ReportUsageResult, ResetBudgetArgs, ResetBudgetResult,
};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
use ff_core::types::{BudgetId, QuotaPolicyId, TimestampMs};
use serial_test::serial;
fn now_ms() -> i64 {
use std::time::{SystemTime, UNIX_EPOCH};
i64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or(0),
)
.unwrap_or(0)
}
fn uuid_like() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let tid = std::thread::current().id();
format!("{ns}-{tid:?}").replace([':', ' '], "-")
}
async fn fresh_backend() -> Arc<SqliteBackend> {
unsafe {
std::env::set_var("FF_DEV_MODE", "1");
}
let uri = format!(
"file:rfc-023-wave9-budget-{}?mode=memory&cache=shared",
uuid_like()
);
SqliteBackend::new(&uri).await.expect("backend")
}
fn budget_args(
id: &BudgetId,
reset_interval_ms: u64,
hard_tokens: u64,
soft_tokens: u64,
) -> CreateBudgetArgs {
CreateBudgetArgs {
budget_id: id.clone(),
scope_type: "flow".into(),
scope_id: "flow-1".into(),
enforcement_mode: "strict".into(),
on_hard_limit: "fail".into(),
on_soft_limit: "warn".into(),
reset_interval_ms,
dimensions: vec!["tokens".into()],
hard_limits: vec![hard_tokens],
soft_limits: vec![soft_tokens],
now: TimestampMs(now_ms()),
}
}
fn admin_args(dim: &str, delta: u64) -> ReportUsageAdminArgs {
ReportUsageAdminArgs::new(vec![dim.into()], vec![delta], TimestampMs(now_ms()))
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn create_budget_happy_path() {
let b = fresh_backend().await;
let bid = BudgetId::new();
let r1 = b.create_budget(budget_args(&bid, 0, 100, 80)).await.unwrap();
assert!(matches!(r1, CreateBudgetResult::Created { .. }));
let r2 = b.create_budget(budget_args(&bid, 0, 100, 80)).await.unwrap();
assert!(matches!(r2, CreateBudgetResult::AlreadySatisfied { .. }));
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn create_budget_seeds_next_reset_with_interval() {
let b = fresh_backend().await;
let bid = BudgetId::new();
let before = now_ms();
let interval_ms: u64 = 60_000;
b.create_budget(budget_args(&bid, interval_ms, 100, 80))
.await
.unwrap();
let status = b.get_budget_status(&bid).await.unwrap();
let next: i64 = status
.next_reset_at
.as_deref()
.expect("next_reset_at set when interval > 0")
.parse()
.expect("parseable");
assert!(
next >= before + interval_ms as i64,
"next_reset_at must be at least now + interval"
);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn reset_budget_clears_counters_advances_next_reset() {
let b = fresh_backend().await;
let bid = BudgetId::new();
let interval_ms: u64 = 60_000;
b.create_budget(budget_args(&bid, interval_ms, 100, 80))
.await
.unwrap();
b.report_usage_admin(&bid, admin_args("tokens", 40))
.await
.unwrap();
let pre = b.get_budget_status(&bid).await.unwrap();
assert_eq!(pre.usage.get("tokens").copied(), Some(40));
let t = now_ms();
let r = b
.reset_budget(ResetBudgetArgs {
budget_id: bid.clone(),
now: TimestampMs(t),
})
.await
.unwrap();
let next = match r {
ResetBudgetResult::Reset { next_reset_at } => next_reset_at.0,
#[allow(unreachable_patterns)]
_ => panic!("unexpected reset result"),
};
assert!(next >= t + interval_ms as i64);
let post = b.get_budget_status(&bid).await.unwrap();
assert_eq!(post.usage.get("tokens").copied(), Some(0));
assert!(post.last_breach_at.is_none());
assert!(post.last_breach_dim.is_none());
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn reset_budget_not_found_returns_notfound() {
let b = fresh_backend().await;
let bid = BudgetId::new();
let err = b
.reset_budget(ResetBudgetArgs {
budget_id: bid,
now: TimestampMs(now_ms()),
})
.await
.unwrap_err();
match err {
EngineError::NotFound { entity } => assert_eq!(entity, "budget"),
other => panic!("expected NotFound, got {other:?}"),
}
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn create_quota_policy_writes_policy_row_idempotent() {
let b = fresh_backend().await;
let qid = QuotaPolicyId::new();
let args = CreateQuotaPolicyArgs {
quota_policy_id: qid.clone(),
window_seconds: 60,
max_requests_per_window: 100,
max_concurrent: 10,
now: TimestampMs(now_ms()),
};
let r1 = b.create_quota_policy(args.clone()).await.unwrap();
assert!(matches!(r1, CreateQuotaPolicyResult::Created { .. }));
let r2 = b.create_quota_policy(args).await.unwrap();
assert!(matches!(r2, CreateQuotaPolicyResult::AlreadySatisfied { .. }));
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn get_budget_status_returns_limits_and_counters() {
let b = fresh_backend().await;
let bid = BudgetId::new();
let missing = b.get_budget_status(&bid).await.unwrap_err();
match missing {
EngineError::NotFound { entity } => assert_eq!(entity, "budget"),
other => panic!("expected NotFound, got {other:?}"),
}
b.create_budget(budget_args(&bid, 0, 100, 80)).await.unwrap();
let status = b.get_budget_status(&bid).await.unwrap();
assert_eq!(status.scope_type, "flow");
assert_eq!(status.scope_id, "flow-1");
assert_eq!(status.enforcement_mode, "strict");
assert_eq!(status.hard_limits.get("tokens").copied(), Some(100));
assert_eq!(status.soft_limits.get("tokens").copied(), Some(80));
assert_eq!(status.breach_count, 0);
assert_eq!(status.soft_breach_count, 0);
assert!(status.created_at.is_some());
assert!(status.next_reset_at.is_none(), "no reset scheduled");
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn report_usage_admin_ok_increments_usage() {
let b = fresh_backend().await;
let bid = BudgetId::new();
b.create_budget(budget_args(&bid, 0, 100, 80)).await.unwrap();
let r = b.report_usage_admin(&bid, admin_args("tokens", 30)).await.unwrap();
assert!(matches!(r, ReportUsageResult::Ok));
let status = b.get_budget_status(&bid).await.unwrap();
assert_eq!(status.usage.get("tokens").copied(), Some(30));
assert_eq!(status.breach_count, 0);
assert_eq!(status.soft_breach_count, 0);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn report_usage_admin_soft_breach_increments_soft_count() {
let b = fresh_backend().await;
let bid = BudgetId::new();
b.create_budget(budget_args(&bid, 0, 100, 50)).await.unwrap();
let r = b.report_usage_admin(&bid, admin_args("tokens", 60)).await.unwrap();
assert!(
matches!(r, ReportUsageResult::SoftBreach { .. }),
"expected SoftBreach, got {r:?}"
);
let status = b.get_budget_status(&bid).await.unwrap();
assert_eq!(status.usage.get("tokens").copied(), Some(60));
assert_eq!(status.soft_breach_count, 1);
assert_eq!(status.breach_count, 0);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn report_usage_admin_hard_breach_rejects_and_increments_hard_count() {
let b = fresh_backend().await;
let bid = BudgetId::new();
b.create_budget(budget_args(&bid, 0, 100, 50)).await.unwrap();
b.report_usage_admin(&bid, admin_args("tokens", 60))
.await
.unwrap();
let r = b.report_usage_admin(&bid, admin_args("tokens", 60)).await.unwrap();
assert!(
matches!(r, ReportUsageResult::HardBreach { .. }),
"expected HardBreach, got {r:?}"
);
let status = b.get_budget_status(&bid).await.unwrap();
assert_eq!(
status.usage.get("tokens").copied(),
Some(60),
"hard-breach must NOT apply the increment"
);
assert_eq!(status.breach_count, 1);
assert_eq!(status.last_breach_dim.as_deref(), Some("tokens"));
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn token_budget_scenario_trips_hard_breach_via_counter() {
let b = fresh_backend().await;
let bid = BudgetId::new();
let args = CreateBudgetArgs {
budget_id: bid.clone(),
scope_type: "flow".into(),
scope_id: "flow-abc".into(),
enforcement_mode: "strict".into(),
on_hard_limit: "fail".into(),
on_soft_limit: "warn".into(),
reset_interval_ms: 0,
dimensions: vec!["tokens".into(), "cost_micros".into()],
hard_limits: vec![1_200, 15_000],
soft_limits: vec![800, 10_000],
now: TimestampMs(now_ms()),
};
b.create_budget(args).await.unwrap();
let mut soft_seen = false;
let mut hard_seen = false;
for i in 0..5 {
let args = ReportUsageAdminArgs::new(
vec!["tokens".into(), "cost_micros".into()],
vec![350, 350 * 25 / 1000],
TimestampMs(now_ms()),
)
.with_dedup_key(format!("attempt-{i}"));
let r = b.report_usage_admin(&bid, args).await.unwrap();
match r {
ReportUsageResult::Ok => {}
ReportUsageResult::SoftBreach { .. } => soft_seen = true,
ReportUsageResult::HardBreach { .. } => {
hard_seen = true;
break;
}
other => panic!("unexpected outcome: {other:?}"),
}
}
assert!(soft_seen, "expected at least one SoftBreach in the batch");
assert!(hard_seen, "expected a HardBreach to trip");
let status = b.get_budget_status(&bid).await.unwrap();
assert!(status.breach_count >= 1, "breach_count must increment");
assert!(
status.soft_breach_count >= 1,
"soft_breach_count must increment before hard trip"
);
let tokens = status.usage.get("tokens").copied().unwrap_or(0);
assert!(
tokens < 1_200,
"hard breach must reject the triggering increment — got {tokens}"
);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn report_usage_admin_dedup_replay_returns_cached_outcome() {
let b = fresh_backend().await;
let bid = BudgetId::new();
b.create_budget(budget_args(&bid, 0, 100, 80)).await.unwrap();
let dk = format!("dedup-{}", uuid_like());
let args = ReportUsageAdminArgs::new(
vec!["tokens".into()],
vec![10],
TimestampMs(now_ms()),
)
.with_dedup_key(dk.clone());
let r1 = b.report_usage_admin(&bid, args.clone()).await.unwrap();
assert!(matches!(r1, ReportUsageResult::Ok));
let r2 = b.report_usage_admin(&bid, args).await.unwrap();
assert!(matches!(r2, ReportUsageResult::Ok));
let status = b.get_budget_status(&bid).await.unwrap();
assert_eq!(
status.usage.get("tokens").copied(),
Some(10),
"dedup replay must not double-count"
);
}