#![cfg(feature = "core")]
use std::sync::Arc;
use std::time::Duration;
use ff_backend_sqlite::{SqliteBackend, SqliteScannerConfig};
use ff_core::contracts::{CreateBudgetArgs, ReportUsageAdminArgs};
use ff_core::engine_backend::EngineBackend;
use ff_core::types::{BudgetId, TimestampMs};
use serial_test::serial;
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reads earlier than the epoch or
/// when the millisecond count does not fit in an `i64`.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; narrow it with a checked conversion.
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
/// Builds a per-call identifier from the current epoch nanoseconds and the
/// calling thread's id.
///
/// The result has the shape `<nanos>-<thread id debug text>`, with every `:`
/// and space replaced by `-`. The nanosecond part is `0` if the system clock
/// reads earlier than the Unix epoch.
fn uuid_like() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let thread_id = std::thread::current().id();

    format!("{nanos}-{thread_id:?}")
        .chars()
        .map(|c| if matches!(c, ':' | ' ') { '-' } else { c })
        .collect()
}
/// Opens a `SqliteBackend` on a freshly named shared-cache in-memory database.
///
/// `tag` is embedded in the database name next to a `uuid_like()` suffix, so
/// each call targets its own URI. `FF_DEV_MODE=1` is exported before the
/// backend is constructed.
///
/// # Panics
///
/// Panics with `"backend"` if `SqliteBackend::new` returns an error.
async fn fresh_backend(tag: &str) -> Arc<SqliteBackend> {
    // SAFETY: every test in this file that calls this helper is marked
    // `#[serial(ff_dev_mode)]`, so these writes should not race one another.
    // NOTE(review): confirm that no other thread reads or writes the process
    // environment while this runs.
    unsafe {
        std::env::set_var("FF_DEV_MODE", "1");
    }

    let suffix = uuid_like();
    let uri = format!(
        "file:rfc-023-wave9-reconciler-{tag}-{}?mode=memory&cache=shared",
        suffix
    );
    SqliteBackend::new(&uri).await.expect("backend")
}
/// Builds the `CreateBudgetArgs` fixture shared by the tests in this file.
///
/// The budget is scoped to flow `flow-1`, tracks a single `tokens` dimension
/// with a hard limit of 1000 and a soft limit of 800, and is stamped with the
/// current time. Only the budget id and `reset_interval_ms` vary per call.
fn budget_args(id: &BudgetId, reset_interval_ms: u64) -> CreateBudgetArgs {
    let created_at = TimestampMs(now_ms());

    CreateBudgetArgs {
        // Identity and schedule: the only caller-controlled parts.
        budget_id: id.clone(),
        reset_interval_ms,
        now: created_at,
        // Fixed scope.
        scope_type: "flow".into(),
        scope_id: "flow-1".into(),
        // Fixed enforcement policy.
        enforcement_mode: "strict".into(),
        on_hard_limit: "fail".into(),
        on_soft_limit: "warn".into(),
        // One dimension with its paired limits.
        dimensions: vec!["tokens".into()],
        hard_limits: vec![1000],
        soft_limits: vec![800],
    }
}
/// Builds `ReportUsageAdminArgs` that report `delta` units of usage against
/// the single dimension `dim`, stamped with the current time.
fn admin_args(dim: &str, delta: u64) -> ReportUsageAdminArgs {
    let dimensions = vec![dim.into()];
    let deltas = vec![delta];
    let reported_at = TimestampMs(now_ms());
    ReportUsageAdminArgs::new(dimensions, deltas, reported_at)
}
/// Overwrites `next_reset_at_ms` for one budget directly in the
/// `ff_budget_policy` table, bypassing the backend API.
///
/// The update targets the row with `partition_key = 0` and the given budget
/// id. Passing `None` binds SQL `NULL`.
///
/// # Panics
///
/// Panics with `"force next_reset_at_ms"` if the statement fails to execute.
async fn force_next_reset(b: &SqliteBackend, budget_id: &BudgetId, next_reset_at_ms: Option<i64>) {
    let update = sqlx::query(
        "UPDATE ff_budget_policy SET next_reset_at_ms = ? WHERE partition_key = 0 AND budget_id = ?",
    )
    .bind(next_reset_at_ms)
    .bind(budget_id.to_string());

    let db = b.pool_for_test();
    update
        .execute(db)
        .await
        .expect("force next_reset_at_ms");
}
/// A budget whose `next_reset_at_ms` lies in the past is processed by one
/// scan tick: usage returns to zero, the next reset is pushed at least one
/// interval past the tick time, and the breach markers are empty.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn budget_reset_reconciler_fires_on_due_row() {
    let backend = fresh_backend("fires").await;
    let budget = BudgetId::new();
    let interval_ms: u64 = 60_000;

    backend
        .create_budget(budget_args(&budget, interval_ms))
        .await
        .unwrap();
    backend
        .report_usage_admin(&budget, admin_args("tokens", 200))
        .await
        .unwrap();

    // Back-date the schedule by one second so the row is due at `t_now`.
    let t_now = now_ms();
    force_next_reset(&backend, &budget, Some(t_now - 1_000)).await;

    // Sanity check: the reported usage is visible before the tick.
    let before = backend.get_budget_status(&budget).await.unwrap();
    let tokens_before = before.usage.get("tokens").copied();
    assert_eq!(tokens_before, Some(200));

    let (rows_processed, error_count) = backend
        .budget_reset_scan_tick_for_test(t_now)
        .await
        .expect("scan_tick");
    assert_eq!(error_count, 0, "no reconciler errors expected");
    assert_eq!(rows_processed, 1, "one due row processed");

    let after = backend.get_budget_status(&budget).await.unwrap();
    let tokens_after = after.usage.get("tokens").copied();
    assert_eq!(tokens_after, Some(0), "usage zeroed by reconciler");

    // The status carries the next reset as text; parse it back to millis.
    let rescheduled = after
        .next_reset_at
        .as_deref()
        .expect("reconciler must reschedule next_reset_at");
    let next_post: i64 = rescheduled.parse().expect("parseable");
    let expected_floor = t_now + interval_ms as i64;
    assert!(
        next_post >= expected_floor,
        "next_reset_at advanced by at least one interval (got {next_post}, expected >= {})",
        expected_floor
    );

    assert!(after.last_breach_at.is_none(), "last_breach_at cleared");
    assert!(after.last_breach_dim.is_none(), "last_breach_dim cleared");
}
/// A budget whose next reset is still in the future is left untouched by a
/// scan tick: nothing is processed and both usage and schedule are unchanged.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn budget_reset_reconciler_skips_not_due() {
    let backend = fresh_backend("skips").await;
    let budget = BudgetId::new();
    let interval_ms: u64 = 60_000;

    backend
        .create_budget(budget_args(&budget, interval_ms))
        .await
        .unwrap();
    backend
        .report_usage_admin(&budget, admin_args("tokens", 150))
        .await
        .unwrap();

    // Schedule the reset ten intervals after the tick time.
    let t_now = now_ms();
    let future = t_now + 10 * interval_ms as i64;
    force_next_reset(&backend, &budget, Some(future)).await;

    let (rows_processed, error_count) = backend
        .budget_reset_scan_tick_for_test(t_now)
        .await
        .expect("scan_tick");
    assert_eq!(error_count, 0);
    assert_eq!(rows_processed, 0, "future-due row must not be processed");

    let after = backend.get_budget_status(&budget).await.unwrap();

    let tokens_after = after.usage.get("tokens").copied();
    assert_eq!(tokens_after, Some(150), "usage unchanged when not due");

    let scheduled_for = after
        .next_reset_at
        .as_deref()
        .map(|raw| raw.parse::<i64>().unwrap());
    assert_eq!(
        scheduled_for,
        Some(future),
        "next_reset_at unchanged when not due"
    );
}
/// A budget created with a reset interval of `0` is never picked up by a scan
/// tick, even one far in the future: usage stays put and the status reports
/// no `next_reset_at`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn budget_reset_reconciler_skips_null_schedule() {
    let backend = fresh_backend("null-schedule").await;
    let budget = BudgetId::new();

    backend.create_budget(budget_args(&budget, 0)).await.unwrap();
    backend
        .report_usage_admin(&budget, admin_args("tokens", 42))
        .await
        .unwrap();

    // Tick well past "now" so any scheduled row would count as due.
    let far_future = now_ms() + 10_000_000;
    let (rows_processed, error_count) = backend
        .budget_reset_scan_tick_for_test(far_future)
        .await
        .expect("scan_tick");
    assert_eq!(error_count, 0);
    assert_eq!(rows_processed, 0, "NULL-scheduled row never due");

    let after = backend.get_budget_status(&budget).await.unwrap();
    let tokens_after = after.usage.get("tokens").copied();
    assert_eq!(tokens_after, Some(42));
    assert!(after.next_reset_at.is_none());
}
/// The first `with_scanners` call reports an install, a second call with the
/// same settings reports `false`, and `shutdown_prepare` then completes
/// within its two-second budget.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn supervisor_starts_and_stops_cleanly() {
    let backend = fresh_backend("start-stop").await;

    // Both calls use identical settings; build a fresh value for each.
    let scanner_config = || SqliteScannerConfig {
        budget_reset_interval: Duration::from_millis(100),
        worker_registry_ttl_interval: Duration::ZERO,
    };

    let installed = backend.with_scanners(scanner_config());
    assert!(installed, "first with_scanners call installs supervisor");

    let again = backend.with_scanners(scanner_config());
    assert!(!again, "second with_scanners call must be a no-op");

    let drain_budget = Duration::from_secs(2);
    backend
        .shutdown_prepare(drain_budget)
        .await
        .expect("shutdown_prepare");
}
/// `shutdown_prepare` succeeds when called right after `with_scanners`, with
/// both scanner intervals set to one hour so no tick is due in between.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn supervisor_shutdown_drains_immediately_after_install() {
    let backend = fresh_backend("immediate-shutdown").await;

    let one_hour = Duration::from_secs(60 * 60);
    let config = SqliteScannerConfig {
        budget_reset_interval: one_hour,
        worker_registry_ttl_interval: one_hour,
    };
    backend.with_scanners(config);

    let drain_budget = Duration::from_secs(2);
    backend
        .shutdown_prepare(drain_budget)
        .await
        .expect("shutdown_prepare drains registered tasks");
}
/// Configuring both scanners with a zero interval does not panic, and
/// `shutdown_prepare` still completes within its one-second budget.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn supervisor_zero_interval_is_disabled_not_panic() {
    let backend = fresh_backend("zero-interval").await;

    let config = SqliteScannerConfig {
        budget_reset_interval: Duration::ZERO,
        worker_registry_ttl_interval: Duration::ZERO,
    };
    backend.with_scanners(config);

    let drain_budget = Duration::from_secs(1);
    backend
        .shutdown_prepare(drain_budget)
        .await
        .expect("zero-interval supervisor must shut down cleanly");
}
/// With the budget-reset scanner on a 50 ms cadence, a budget whose reset is
/// already due is observed with zero usage within three seconds, without the
/// test triggering a scan tick itself.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn supervisor_tick_advances_on_cadence() {
    use std::time::Instant;

    let backend = fresh_backend("cadence").await;
    let budget = BudgetId::new();
    let interval_ms: u64 = 60_000;

    backend
        .create_budget(budget_args(&budget, interval_ms))
        .await
        .unwrap();
    backend
        .report_usage_admin(&budget, admin_args("tokens", 300))
        .await
        .unwrap();

    // Make the row due before the scanners are installed.
    force_next_reset(&backend, &budget, Some(now_ms() - 1_000)).await;

    backend.with_scanners(SqliteScannerConfig {
        budget_reset_interval: Duration::from_millis(50),
        worker_registry_ttl_interval: Duration::ZERO,
    });

    // Poll every 100 ms until usage reads zero or the deadline passes.
    let deadline = Instant::now() + Duration::from_secs(3);
    let observed_reset = loop {
        if Instant::now() >= deadline {
            break false;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
        let status = backend.get_budget_status(&budget).await.unwrap();
        if status.usage.get("tokens").copied() == Some(0) {
            break true;
        }
    };
    assert!(
        observed_reset,
        "supervisor must reset the due budget within the cadence window"
    );

    backend
        .shutdown_prepare(Duration::from_secs(2))
        .await
        .expect("shutdown_prepare");
}