use std::collections::{BTreeMap, HashMap};
use ff_core::backend::UsageDimensions;
use ff_core::contracts::{
BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
CreateQuotaPolicyResult, ReportUsageAdminArgs, ReportUsageResult, ResetBudgetArgs,
ResetBudgetResult,
};
use ff_core::engine_error::{backend_context, EngineError, ValidationKind};
use ff_core::partition::{budget_partition, quota_partition, PartitionConfig};
use ff_core::types::{BudgetId, TimestampMs};
use serde_json::{json, Value as JsonValue};
use sqlx::{PgPool, Row};
use crate::error::map_sqlx_error;
/// Builds the per-dimension row key for a usage dimension.
///
/// The key is simply an owned copy of the dimension name.
fn dim_row_key(name: &str) -> String {
    String::from(name)
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Serializes a usage-report outcome into the JSON shape stored alongside a dedup key.
///
/// Every object carries a `kind` discriminator; the breach variants additionally
/// carry `dimension`, `current_usage` and the relevant limit.
fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
    match r {
        ReportUsageResult::HardBreach {
            dimension: dim,
            current_usage: used,
            hard_limit: limit,
        } => json!({
            "kind": "HardBreach",
            "dimension": dim,
            "current_usage": used,
            "hard_limit": limit,
        }),
        ReportUsageResult::SoftBreach {
            dimension: dim,
            current_usage: used,
            soft_limit: limit,
        } => json!({
            "kind": "SoftBreach",
            "dimension": dim,
            "current_usage": used,
            "soft_limit": limit,
        }),
        ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
        ReportUsageResult::Ok => json!({"kind": "Ok"}),
        // NOTE(review): any variant not listed above is persisted as "Ok", so a
        // replay would report success for it — confirm this is intended.
        _ => json!({"kind": "Ok"}),
    }
}
fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
EngineError::Validation {
kind: ValidationKind::Corruption,
detail: "budget dedup outcome_json missing `kind`".into(),
}
})?;
match kind {
"Ok" => Ok(ReportUsageResult::Ok),
"AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
"SoftBreach" => Ok(ReportUsageResult::SoftBreach {
dimension: v
.get("dimension")
.and_then(|d| d.as_str())
.unwrap_or_default()
.to_string(),
current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
}),
"HardBreach" => Ok(ReportUsageResult::HardBreach {
dimension: v
.get("dimension")
.and_then(|d| d.as_str())
.unwrap_or_default()
.to_string(),
current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
}),
other => Err(EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("budget dedup outcome_json unknown kind: {other}"),
}),
}
}
fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
policy
.get(key)
.and_then(|v| v.as_object())
.map(|obj| {
obj.iter()
.filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
.collect()
})
.unwrap_or_default()
}
/// Offset in milliseconds (24 hours) added to the insertion time to compute
/// `expires_at_ms` for new `ff_budget_usage_dedup` rows.
const DEDUP_TTL_MS: i64 = 24 * 60 * 60 * 1_000;
/// Reports usage against `budget` and returns the resulting limit-check outcome.
///
/// Thin entry point: forwards all arguments unchanged to
/// `report_usage_and_check_core`, which performs the work.
///
/// # Errors
///
/// Propagates whatever `report_usage_and_check_core` returns.
pub(crate) async fn report_usage_impl(
pool: &PgPool,
partition_config: &PartitionConfig,
budget: &BudgetId,
dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
report_usage_and_check_core(pool, partition_config, budget, dimensions).await
}
/// Admin variant of usage reporting that takes parallel `dimensions` / `deltas` lists.
///
/// The two lists are paired positionally into a custom-dimension map (a dimension
/// that appears more than once keeps the delta of its last occurrence), the
/// caller's `dedup_key` is carried over, and the request is handed to
/// `report_usage_and_check_core`.
///
/// # Errors
///
/// Returns a `Validation` error of kind `InvalidInput` when the two lists differ
/// in length; otherwise propagates the core routine's result.
pub(crate) async fn report_usage_admin_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    let lengths_match = args.dimensions.len() == args.deltas.len();
    if !lengths_match {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "report_usage_admin: dimensions and deltas length mismatch".into(),
        });
    }

    // Insert pair by pair so a repeated dimension is overwritten by its later delta.
    let mut per_dimension: BTreeMap<String, u64> = BTreeMap::new();
    per_dimension.extend(args.dimensions.into_iter().zip(args.deltas));

    let mut usage = UsageDimensions::new();
    usage.custom = per_dimension;
    usage.dedup_key = args.dedup_key;

    report_usage_and_check_core(pool, partition_config, budget, usage).await
}
/// Converts an unsigned usage delta into the signed value bound to the SQL increment.
///
/// A plain `as i64` cast would wrap values above `i64::MAX` to negative numbers and
/// silently *decrease* the stored counter, so such deltas are rejected instead.
fn usage_delta_to_i64(dim: &str, delta: u64) -> Result<i64, EngineError> {
    i64::try_from(delta).map_err(|_| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail: format!("report_usage: delta {delta} for dimension `{dim}` exceeds i64::MAX"),
    })
}

/// Applies a usage report to a budget inside one transaction and checks its limits.
///
/// Steps, in order:
/// 1. Every delta is validated to fit in `i64` before any database work starts.
/// 2. If a non-empty dedup key is supplied, a placeholder row is inserted into
///    `ff_budget_usage_dedup`. When the key already exists, the stored outcome is
///    returned (`AlreadyApplied` if the stored JSON object is empty) and nothing
///    else is touched.
/// 3. The policy row is locked and its `hard_limits` / `soft_limits` are read; a
///    missing policy row means no limits.
/// 4. For each dimension the usage row is created if needed and locked. If adding
///    the delta would exceed a positive hard limit, the policy's breach fields are
///    updated, the outcome is recorded under the dedup key, the transaction is
///    committed and `HardBreach` is returned with the usage as it was *before*
///    this report; no counter is incremented.
/// 5. Otherwise every counter is incremented. The first dimension whose new value
///    exceeds a positive soft limit yields `SoftBreach` (and bumps
///    `soft_breach_count`); if none does, the outcome is `Ok`.
///
/// # Errors
///
/// Returns a `Validation` error of kind `InvalidInput` when a delta exceeds
/// `i64::MAX`, a `Corruption` validation error when a stored dedup outcome cannot
/// be decoded, and mapped sqlx errors for any database failure. On every error
/// path the transaction is dropped without being committed.
async fn report_usage_and_check_core(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    // Reject out-of-range deltas up front so an invalid request never opens a
    // transaction. Each entry is (dimension, unsigned delta, signed delta).
    let deltas = dimensions
        .custom
        .iter()
        .map(|(dim, delta)| usage_delta_to_i64(dim, *delta).map(|signed| (dim, *delta, signed)))
        .collect::<Result<Vec<_>, EngineError>>()?;

    let partition = budget_partition(budget, partition_config);
    let partition_key: i16 = partition.index as i16;
    let budget_id_str = budget.to_string();
    let now = now_ms();

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Claim the dedup key, or replay the outcome recorded by whoever claimed it first.
    // An empty key is treated the same as no key.
    let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
        Some(dk) => {
            let inserted = sqlx::query(
                "INSERT INTO ff_budget_usage_dedup \
                 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
                 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
                 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
                 RETURNING applied_at_ms",
            )
            .bind(partition_key)
            .bind(dk)
            .bind(now)
            .bind(now + DEDUP_TTL_MS)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
            if inserted.is_none() {
                let row = sqlx::query(
                    "SELECT outcome_json FROM ff_budget_usage_dedup \
                     WHERE partition_key = $1 AND dedup_key = $2",
                )
                .bind(partition_key)
                .bind(dk)
                .fetch_one(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                let outcome: JsonValue = row.get("outcome_json");
                tx.commit().await.map_err(map_sqlx_error)?;
                // An empty object is the placeholder written at claim time.
                if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
                    return Ok(ReportUsageResult::AlreadyApplied);
                }
                return outcome_from_json(&outcome);
            }
            Some(dk.to_string())
        }
        None => None,
    };

    // Lock the policy row for the rest of the transaction and read its limits.
    let policy_row = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let policy: JsonValue = match policy_row {
        Some(r) => r.get("policy_json"),
        None => JsonValue::Object(Default::default()),
    };
    let hard_limits = limits_from_policy(&policy, "hard_limits");
    let soft_limits = limits_from_policy(&policy, "soft_limits");

    // Phase 1: lock every usage row and check hard limits before mutating anything.
    // Each staged entry is (dimension, signed delta, projected new value).
    let mut staged: Vec<(&String, i64, u64)> = Vec::with_capacity(deltas.len());
    for &(dim, delta, signed_delta) in &deltas {
        let dim_key = dim_row_key(dim);
        // Make sure the row exists so the FOR UPDATE below always has something to lock.
        sqlx::query(
            "INSERT INTO ff_budget_usage \
             (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
             VALUES ($1, $2, $3, 0, $4) \
             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let row = sqlx::query(
            "SELECT current_value FROM ff_budget_usage \
             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
             FOR UPDATE",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .fetch_one(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let cur: i64 = row.get("current_value");
        // Clamp a negative stored counter to zero (the same policy the status read
        // uses) instead of letting `as u64` turn it into a huge bogus usage.
        let cur_usage = u64::try_from(cur).unwrap_or(0);
        let new_val = cur_usage.saturating_add(delta);
        if let Some(hard) = hard_limits.get(dim)
            && *hard > 0
            && new_val > *hard
        {
            sqlx::query(
                "UPDATE ff_budget_policy \
                 SET breach_count = breach_count + 1, \
                 last_breach_at_ms = $3, \
                 last_breach_dim = $4, \
                 updated_at_ms = $3 \
                 WHERE partition_key = $1 AND budget_id = $2",
            )
            .bind(partition_key)
            .bind(&budget_id_str)
            .bind(now)
            .bind(dim)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
            // The report is rejected, so the usage reported back is the pre-report value.
            let outcome = ReportUsageResult::HardBreach {
                dimension: dim.clone(),
                current_usage: cur_usage,
                hard_limit: *hard,
            };
            if let Some(dk) = dedup_owned.as_deref() {
                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
            }
            tx.commit().await.map_err(map_sqlx_error)?;
            return Ok(outcome);
        }
        staged.push((dim, signed_delta, new_val));
    }

    // Phase 2: no hard limit was hit, so apply every increment and note the first
    // soft-limit breach (in dimension iteration order).
    let mut soft_breach: Option<ReportUsageResult> = None;
    for &(dim, signed_delta, new_val) in &staged {
        let dim_key = dim_row_key(dim);
        sqlx::query(
            "UPDATE ff_budget_usage \
             SET current_value = current_value + $1, updated_at_ms = $2 \
             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
        )
        .bind(signed_delta)
        .bind(now)
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        if soft_breach.is_none()
            && let Some(soft) = soft_limits.get(dim)
            && *soft > 0
            && new_val > *soft
        {
            soft_breach = Some(ReportUsageResult::SoftBreach {
                dimension: dim.clone(),
                current_usage: new_val,
                soft_limit: *soft,
            });
        }
    }
    if soft_breach.is_some() {
        sqlx::query(
            "UPDATE ff_budget_policy \
             SET soft_breach_count = soft_breach_count + 1, \
             updated_at_ms = $3 \
             WHERE partition_key = $1 AND budget_id = $2",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    // Record the final outcome under the dedup key so replays return the same answer.
    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
    if let Some(dk) = dedup_owned.as_deref() {
        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(outcome)
}
async fn finalize_dedup(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
partition_key: i16,
dedup_key: &str,
outcome: &ReportUsageResult,
) -> Result<(), EngineError> {
let json = outcome_to_json(outcome);
sqlx::query(
"UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
WHERE partition_key = $2 AND dedup_key = $3",
)
.bind(json)
.bind(partition_key)
.bind(dedup_key)
.execute(&mut **tx)
.await
.map_err(map_sqlx_error)?;
Ok(())
}
#[doc(hidden)]
pub async fn upsert_policy_for_test(
pool: &PgPool,
partition_config: &PartitionConfig,
budget: &BudgetId,
policy_json: JsonValue,
) -> Result<(), EngineError> {
let partition = budget_partition(budget, partition_config);
let partition_key: i16 = partition.index as i16;
let now = now_ms();
sqlx::query(
"INSERT INTO ff_budget_policy \
(partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
VALUES ($1, $2, $3, $4, $4) \
ON CONFLICT (partition_key, budget_id) DO UPDATE \
SET policy_json = EXCLUDED.policy_json, \
updated_at_ms = EXCLUDED.updated_at_ms",
)
.bind(partition_key)
.bind(budget.to_string())
.bind(policy_json)
.bind(now)
.execute(pool)
.await
.map_err(map_sqlx_error)?;
Ok(())
}
/// Assembles the `policy_json` document for a new budget.
///
/// Limits are matched to dimensions by position: the limit at index `i` belongs to
/// the dimension at index `i`. A dimension with no limit at its index is simply
/// left out of that limit map. The reset interval and both limit actions are
/// copied through from `args`.
fn build_policy_json(args: &CreateBudgetArgs) -> JsonValue {
    let mut hard_by_dim = serde_json::Map::new();
    let mut soft_by_dim = serde_json::Map::new();

    for (idx, name) in args.dimensions.iter().enumerate() {
        let hard = args.hard_limits.get(idx);
        let soft = args.soft_limits.get(idx);
        if let Some(limit) = hard {
            hard_by_dim.insert(name.clone(), json!(limit));
        }
        if let Some(limit) = soft {
            soft_by_dim.insert(name.clone(), json!(limit));
        }
    }

    json!({
        "hard_limits": hard_by_dim,
        "soft_limits": soft_by_dim,
        "reset_interval_ms": args.reset_interval_ms,
        "on_hard_limit": args.on_hard_limit,
        "on_soft_limit": args.on_soft_limit,
    })
}
/// Creates the policy row for a budget if it does not exist yet.
///
/// The insert is a no-op on conflict, so calling this for an existing budget leaves
/// the stored policy untouched and reports `AlreadySatisfied`. Breach counters start
/// at zero; `next_reset_at_ms` is `now + reset_interval_ms` when the interval is
/// positive and NULL otherwise.
///
/// # Errors
///
/// Returns a `Validation` error of kind `InvalidInput` when `dimensions`,
/// `hard_limits` and `soft_limits` are not all the same length, and the mapped sqlx
/// error when the insert fails.
pub(crate) async fn create_budget_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    let dim_count = args.dimensions.len();
    let lengths_agree = dim_count == args.hard_limits.len() && dim_count == args.soft_limits.len();
    if !lengths_agree {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "create_budget: dimensions / hard_limits / soft_limits length mismatch"
                .into(),
        });
    }

    let partition_key: i16 = budget_partition(&args.budget_id, partition_config).index as i16;
    let budget_id = args.budget_id.clone();
    let created_at: i64 = args.now.0;
    let interval_ms = args.reset_interval_ms as i64;
    let policy_doc = build_policy_json(&args);

    // RETURNING only yields a row when the insert actually happened.
    let inserted = sqlx::query(
        "INSERT INTO ff_budget_policy \
         (partition_key, budget_id, policy_json, scope_type, scope_id, \
         enforcement_mode, breach_count, soft_breach_count, \
         last_breach_at_ms, last_breach_dim, next_reset_at_ms, \
         created_at_ms, updated_at_ms) \
         VALUES ($1, $2, $3, $4, $5, $6, 0, 0, NULL, NULL, \
         CASE WHEN $7::bigint > 0 THEN $8::bigint + $7::bigint ELSE NULL END, \
         $8, $8) \
         ON CONFLICT (partition_key, budget_id) DO NOTHING \
         RETURNING created_at_ms",
    )
    .bind(partition_key)
    .bind(budget_id.to_string())
    .bind(policy_doc)
    .bind(&args.scope_type)
    .bind(&args.scope_id)
    .bind(&args.enforcement_mode)
    .bind(interval_ms)
    .bind(created_at)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;

    let result = if inserted.is_some() {
        CreateBudgetResult::Created { budget_id }
    } else {
        CreateBudgetResult::AlreadySatisfied { budget_id }
    };
    Ok(result)
}
/// Resets every usage counter of a budget to zero and schedules its next reset.
///
/// Runs in one SERIALIZABLE transaction: the policy row is locked, all usage rows
/// for the budget are zeroed and stamped with `args.now`, the policy's last-breach
/// fields are cleared, and `next_reset_at_ms` becomes `now + reset_interval_ms`
/// when the policy's interval is positive, or NULL otherwise. The returned
/// `next_reset_at` is `0` when no next reset is scheduled.
///
/// # Errors
///
/// Returns `NotFound` (entity `"budget"`) when no policy row exists, and mapped
/// sqlx errors for database failures.
pub(crate) async fn reset_budget_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    let partition_key: i16 = budget_partition(&args.budget_id, partition_config).index as i16;
    let budget_key = args.budget_id.to_string();
    let reset_at: i64 = args.now.0;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Lock the policy row; a missing row means the budget does not exist.
    let locked_policy = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
    )
    .bind(partition_key)
    .bind(&budget_key)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let policy_row = match locked_policy {
        Some(found) => found,
        None => {
            tx.rollback().await.map_err(map_sqlx_error)?;
            return Err(backend_context(
                EngineError::NotFound { entity: "budget" },
                format!("reset_budget: {}", args.budget_id),
            ));
        }
    };

    // A missing or non-integer interval is treated as "no automatic reset".
    let policy_doc: JsonValue = policy_row.get("policy_json");
    let interval_ms: i64 = match policy_doc.get("reset_interval_ms") {
        Some(raw) => raw.as_i64().unwrap_or(0),
        None => 0,
    };

    sqlx::query(
        "UPDATE ff_budget_usage \
         SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
         WHERE partition_key = $1 AND budget_id = $2",
    )
    .bind(partition_key)
    .bind(&budget_key)
    .bind(reset_at)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let updated_policy = sqlx::query(
        "UPDATE ff_budget_policy \
         SET last_breach_at_ms = NULL, \
         last_breach_dim = NULL, \
         updated_at_ms = $3, \
         next_reset_at_ms = CASE \
         WHEN $4::bigint > 0 THEN $3 + $4::bigint \
         ELSE NULL \
         END \
         WHERE partition_key = $1 AND budget_id = $2 \
         RETURNING next_reset_at_ms",
    )
    .bind(partition_key)
    .bind(&budget_key)
    .bind(reset_at)
    .bind(interval_ms)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let scheduled: Option<i64> = updated_policy
        .try_get::<Option<i64>, _>("next_reset_at_ms")
        .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    let next_reset_at = TimestampMs(scheduled.unwrap_or(0));
    Ok(ResetBudgetResult::Reset { next_reset_at })
}
/// Creates a quota policy row if one does not already exist for the given id.
///
/// The insert is a no-op on conflict: an existing policy is left unchanged and
/// reported as `AlreadySatisfied`. New rows start with `active_concurrency` at 0.
///
/// # Errors
///
/// Returns the mapped sqlx error when the insert fails.
pub(crate) async fn create_quota_policy_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    let partition_key: i16 = quota_partition(&args.quota_policy_id, partition_config).index as i16;
    let quota_policy_id = args.quota_policy_id.clone();
    let created_at: i64 = args.now.0;

    // RETURNING only yields a row when the insert actually happened.
    let inserted = sqlx::query(
        "INSERT INTO ff_quota_policy \
         (partition_key, quota_policy_id, requests_per_window_seconds, \
         max_requests_per_window, active_concurrency_cap, \
         active_concurrency, created_at_ms, updated_at_ms) \
         VALUES ($1, $2, $3, $4, $5, 0, $6, $6) \
         ON CONFLICT (partition_key, quota_policy_id) DO NOTHING \
         RETURNING created_at_ms",
    )
    .bind(partition_key)
    .bind(quota_policy_id.to_string())
    .bind(args.window_seconds as i64)
    .bind(args.max_requests_per_window as i64)
    .bind(args.max_concurrent as i64)
    .bind(created_at)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;

    let result = if inserted.is_some() {
        CreateQuotaPolicyResult::Created { quota_policy_id }
    } else {
        CreateQuotaPolicyResult::AlreadySatisfied { quota_policy_id }
    };
    Ok(result)
}
/// Loads a read-only snapshot of a budget: policy metadata, limits and current usage.
///
/// The policy row and the usage rows are read with two separate queries directly on
/// the pool (no transaction). Usage rows keyed `_init` are skipped, and negative
/// counters are reported as `0`. Timestamps are rendered as decimal strings.
///
/// # Errors
///
/// Returns `NotFound` (entity `"budget"`) when no policy row exists, and mapped
/// sqlx errors for query failures or for the nullable columns that fail to decode.
pub(crate) async fn get_budget_status_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    let partition_key: i16 = budget_partition(budget, partition_config).index as i16;
    let budget_key = budget.to_string();

    let fetched_policy = sqlx::query(
        "SELECT scope_type, scope_id, enforcement_mode, \
         breach_count, soft_breach_count, \
         last_breach_at_ms, last_breach_dim, \
         next_reset_at_ms, created_at_ms, \
         policy_json \
         FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2",
    )
    .bind(partition_key)
    .bind(&budget_key)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    let policy = match fetched_policy {
        Some(found) => found,
        None => {
            return Err(backend_context(
                EngineError::NotFound { entity: "budget" },
                format!("get_budget_status: {budget}"),
            ));
        }
    };

    // Non-nullable columns use `get`; nullable ones go through `try_get`.
    let scope_type: String = policy.get("scope_type");
    let scope_id: String = policy.get("scope_id");
    let enforcement_mode: String = policy.get("enforcement_mode");
    let hard_breaches: i64 = policy.get("breach_count");
    let soft_breaches: i64 = policy.get("soft_breach_count");
    let last_breach_at: Option<i64> = policy
        .try_get::<Option<i64>, _>("last_breach_at_ms")
        .map_err(map_sqlx_error)?;
    let last_breach_dim: Option<String> = policy
        .try_get::<Option<String>, _>("last_breach_dim")
        .map_err(map_sqlx_error)?;
    let next_reset_at: Option<i64> = policy
        .try_get::<Option<i64>, _>("next_reset_at_ms")
        .map_err(map_sqlx_error)?;
    let created_at: i64 = policy.get("created_at_ms");
    let policy_doc: JsonValue = policy.get("policy_json");

    let usage_rows = sqlx::query(
        "SELECT dimensions_key, current_value \
         FROM ff_budget_usage \
         WHERE partition_key = $1 AND budget_id = $2",
    )
    .bind(partition_key)
    .bind(&budget_key)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;

    let usage: HashMap<String, u64> = usage_rows
        .iter()
        .filter_map(|row| {
            let dim: String = row.get("dimensions_key");
            let value: i64 = row.get("current_value");
            // `_init` rows are not reported as a dimension.
            (dim != "_init").then(|| (dim, value.max(0) as u64))
        })
        .collect();

    let hard_limits: HashMap<String, u64> = limits_from_policy(&policy_doc, "hard_limits")
        .into_iter()
        .collect();
    let soft_limits: HashMap<String, u64> = limits_from_policy(&policy_doc, "soft_limits")
        .into_iter()
        .collect();

    Ok(BudgetStatus {
        budget_id: budget.to_string(),
        scope_type,
        scope_id,
        enforcement_mode,
        usage,
        hard_limits,
        soft_limits,
        breach_count: hard_breaches.max(0) as u64,
        soft_breach_count: soft_breaches.max(0) as u64,
        last_breach_at: last_breach_at.map(|ms| ms.to_string()),
        last_breach_dim,
        next_reset_at: next_reset_at.map(|ms| ms.to_string()),
        created_at: Some(created_at.to_string()),
    })
}
/// Performs a scheduled reset of one budget if that reset is due at `now`.
///
/// Inside a SERIALIZABLE transaction the policy row is locked and its
/// `next_reset_at_ms` inspected. When the row is missing, has no scheduled reset,
/// or the scheduled time is still in the future, the transaction is rolled back and
/// the call succeeds without changing anything. Otherwise all usage counters are
/// zeroed, the last-breach fields are cleared, and the next reset is scheduled at
/// `now + reset_interval_ms` (NULL when the interval is not positive).
///
/// # Errors
///
/// Returns mapped sqlx errors for database failures.
pub(crate) async fn budget_reset_reconciler_apply(
    pool: &PgPool,
    partition_key: i16,
    budget_id: &str,
    now: i64,
) -> Result<(), EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    let locked_policy = sqlx::query(
        "SELECT policy_json, next_reset_at_ms FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
    )
    .bind(partition_key)
    .bind(budget_id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let policy = match locked_policy {
        Some(found) => found,
        None => {
            // Nothing to reset for a budget that no longer exists.
            tx.rollback().await.map_err(map_sqlx_error)?;
            return Ok(());
        }
    };

    // Re-check the schedule under the lock; skip when not yet due or unscheduled.
    let scheduled: Option<i64> = policy
        .try_get::<Option<i64>, _>("next_reset_at_ms")
        .map_err(map_sqlx_error)?;
    let is_due = match scheduled {
        Some(at) => at <= now,
        None => false,
    };
    if !is_due {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(());
    }

    // A missing or non-integer interval is treated as "no further automatic reset".
    let policy_doc: JsonValue = policy.get("policy_json");
    let interval_ms: i64 = match policy_doc.get("reset_interval_ms") {
        Some(raw) => raw.as_i64().unwrap_or(0),
        None => 0,
    };

    sqlx::query(
        "UPDATE ff_budget_usage \
         SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
         WHERE partition_key = $1 AND budget_id = $2",
    )
    .bind(partition_key)
    .bind(budget_id)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    sqlx::query(
        "UPDATE ff_budget_policy \
         SET last_breach_at_ms = NULL, \
         last_breach_dim = NULL, \
         updated_at_ms = $3, \
         next_reset_at_ms = CASE \
         WHEN $4::bigint > 0 THEN $3 + $4::bigint \
         ELSE NULL \
         END \
         WHERE partition_key = $1 AND budget_id = $2",
    )
    .bind(partition_key)
    .bind(budget_id)
    .bind(now)
    .bind(interval_ms)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)
}