use std::collections::BTreeMap;
use std::sync::Arc;
use ff_backend_postgres::PostgresBackend;
use ff_core::contracts::{CreateBudgetArgs, RecordSpendArgs, ReleaseBudgetArgs};
use ff_core::engine_backend::EngineBackend;
use ff_core::partition::PartitionConfig;
use ff_core::types::{BudgetId, ExecutionId, LaneId, TimestampMs};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use sqlx::Row;
const LANE: &str = "release-budget-pg-test-lane";
fn now_ms() -> i64 {
i64::try_from(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis(),
)
.unwrap()
}
async fn setup_or_skip() -> Option<PgPool> {
let url = std::env::var("FF_PG_TEST_URL").ok()?;
let pool = PgPoolOptions::new()
.max_connections(4)
.connect(&url)
.await
.expect("connect to FF_PG_TEST_URL");
ff_backend_postgres::apply_migrations(&pool)
.await
.expect("apply_migrations clean");
Some(pool)
}
fn backend(pool: PgPool) -> Arc<PostgresBackend> {
PostgresBackend::from_pool(pool, PartitionConfig::default())
}
fn partition_config() -> PartitionConfig {
PartitionConfig::default()
}
fn budget_args(bid: &BudgetId) -> CreateBudgetArgs {
CreateBudgetArgs {
budget_id: bid.clone(),
scope_type: "flow".into(),
scope_id: "flow-1".into(),
enforcement_mode: "strict".into(),
on_hard_limit: "fail".into(),
on_soft_limit: "warn".into(),
reset_interval_ms: 0,
dimensions: vec!["tokens".into()],
hard_limits: vec![1_000_000],
soft_limits: vec![999_999],
now: TimestampMs(now_ms()),
}
}
fn spend(
bid: &BudgetId,
exec: &ExecutionId,
tokens: u64,
idem: &str,
) -> RecordSpendArgs {
let mut m = BTreeMap::new();
m.insert("tokens".to_string(), tokens);
RecordSpendArgs::new(bid.clone(), exec.clone(), m, idem.to_owned())
}
async fn aggregate_tokens(pool: &PgPool, bid: &BudgetId) -> i64 {
use ff_core::partition::budget_partition;
let partition_key: i16 = budget_partition(bid, &partition_config()).index as i16;
let row = sqlx::query(
"SELECT current_value FROM ff_budget_usage \
WHERE partition_key=$1 AND budget_id=$2 AND dimensions_key='tokens'",
)
.bind(partition_key)
.bind(bid.to_string())
.fetch_optional(pool)
.await
.unwrap();
row.map(|r| r.get::<i64, _>("current_value")).unwrap_or(0)
}
async fn ledger_row_count(pool: &PgPool, bid: &BudgetId, exec: &ExecutionId) -> i64 {
use ff_core::partition::budget_partition;
let partition_key: i16 = budget_partition(bid, &partition_config()).index as i16;
let s = exec.as_str();
let uuid_str = &s[s.rfind("}:").unwrap() + 2..];
let exec_uuid = uuid::Uuid::parse_str(uuid_str).unwrap();
let row = sqlx::query(
"SELECT COUNT(*) AS c FROM ff_budget_usage_by_exec \
WHERE partition_key=$1 AND budget_id=$2 AND execution_id=$3",
)
.bind(partition_key)
.bind(bid.to_string())
.bind(exec_uuid)
.fetch_one(pool)
.await
.unwrap();
row.get::<i64, _>("c")
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn release_after_record_zeros_aggregate() {
let Some(pool) = setup_or_skip().await else {
return;
};
let be = backend(pool.clone());
let bid = BudgetId::new();
let _ = be.create_budget(budget_args(&bid)).await.expect("create_budget");
let exec = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
let _ = be
.record_spend(spend(&bid, &exec, 5, "r-1"))
.await
.expect("record");
assert_eq!(aggregate_tokens(&pool, &bid).await, 5);
assert_eq!(ledger_row_count(&pool, &bid, &exec).await, 1);
be.release_budget(ReleaseBudgetArgs::new(bid.clone(), exec.clone()))
.await
.expect("release");
assert_eq!(aggregate_tokens(&pool, &bid).await, 0);
assert_eq!(ledger_row_count(&pool, &bid, &exec).await, 0);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn release_reverses_accumulated_spends() {
let Some(pool) = setup_or_skip().await else {
return;
};
let be = backend(pool.clone());
let bid = BudgetId::new();
let _ = be.create_budget(budget_args(&bid)).await.expect("create_budget");
let exec = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
let _ = be.record_spend(spend(&bid, &exec, 3, "a")).await.expect("a");
let _ = be.record_spend(spend(&bid, &exec, 7, "b")).await.expect("b");
let _ = be.record_spend(spend(&bid, &exec, 2, "c")).await.expect("c");
assert_eq!(aggregate_tokens(&pool, &bid).await, 12);
be.release_budget(ReleaseBudgetArgs::new(bid.clone(), exec.clone()))
.await
.expect("release");
assert_eq!(aggregate_tokens(&pool, &bid).await, 0);
assert_eq!(ledger_row_count(&pool, &bid, &exec).await, 0);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn release_without_prior_record_is_noop() {
let Some(pool) = setup_or_skip().await else {
return;
};
let be = backend(pool.clone());
let bid = BudgetId::new();
let _ = be.create_budget(budget_args(&bid)).await.expect("create_budget");
let seeded = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
let _ = be
.record_spend(spend(&bid, &seeded, 9, "seed"))
.await
.expect("seed");
assert_eq!(aggregate_tokens(&pool, &bid).await, 9);
let ghost = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
be.release_budget(ReleaseBudgetArgs::new(bid.clone(), ghost))
.await
.expect("noop release");
assert_eq!(aggregate_tokens(&pool, &bid).await, 9);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn release_is_idempotent() {
let Some(pool) = setup_or_skip().await else {
return;
};
let be = backend(pool.clone());
let bid = BudgetId::new();
let _ = be.create_budget(budget_args(&bid)).await.expect("create_budget");
let exec = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
let _ = be
.record_spend(spend(&bid, &exec, 4, "x"))
.await
.expect("record");
be.release_budget(ReleaseBudgetArgs::new(bid.clone(), exec.clone()))
.await
.expect("release 1");
assert_eq!(aggregate_tokens(&pool, &bid).await, 0);
be.release_budget(ReleaseBudgetArgs::new(bid.clone(), exec.clone()))
.await
.expect("release 2 (noop)");
assert_eq!(aggregate_tokens(&pool, &bid).await, 0);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn release_isolated_between_executions() {
let Some(pool) = setup_or_skip().await else {
return;
};
let be = backend(pool.clone());
let bid = BudgetId::new();
let _ = be.create_budget(budget_args(&bid)).await.expect("create_budget");
let e1 = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
let e2 = ExecutionId::solo(&LaneId::new(LANE), &partition_config());
let _ = be.record_spend(spend(&bid, &e1, 5, "e1")).await.expect("e1");
let _ = be.record_spend(spend(&bid, &e2, 8, "e2")).await.expect("e2");
assert_eq!(aggregate_tokens(&pool, &bid).await, 13);
be.release_budget(ReleaseBudgetArgs::new(bid.clone(), e1.clone()))
.await
.expect("release e1");
assert_eq!(aggregate_tokens(&pool, &bid).await, 8);
assert_eq!(ledger_row_count(&pool, &bid, &e1).await, 0);
assert_eq!(ledger_row_count(&pool, &bid, &e2).await, 1);
}