use std::collections::BTreeMap;
use ff_core::backend::UsageDimensions;
use ff_core::contracts::ReportUsageResult;
use ff_core::engine_error::{EngineError, ValidationKind};
use ff_core::partition::{budget_partition, PartitionConfig};
use ff_core::types::BudgetId;
use serde_json::{json, Value as JsonValue};
use sqlx::{PgPool, Row};
use crate::error::map_sqlx_error;
/// Returns the `dimensions_key` column value for a usage dimension.
///
/// The key is the dimension name itself, copied into an owned `String`.
fn dim_row_key(name: &str) -> String {
    String::from(name)
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // Clock is set before 1970: fall back to zero rather than failing.
        Err(_) => 0,
    }
}
fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
match r {
ReportUsageResult::Ok => json!({"kind": "Ok"}),
ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
ReportUsageResult::SoftBreach {
dimension,
current_usage,
soft_limit,
} => json!({
"kind": "SoftBreach",
"dimension": dimension,
"current_usage": current_usage,
"soft_limit": soft_limit,
}),
ReportUsageResult::HardBreach {
dimension,
current_usage,
hard_limit,
} => json!({
"kind": "HardBreach",
"dimension": dimension,
"current_usage": current_usage,
"hard_limit": hard_limit,
}),
_ => json!({"kind": "Ok"}),
}
}
fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
EngineError::Validation {
kind: ValidationKind::Corruption,
detail: "budget dedup outcome_json missing `kind`".into(),
}
})?;
match kind {
"Ok" => Ok(ReportUsageResult::Ok),
"AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
"SoftBreach" => Ok(ReportUsageResult::SoftBreach {
dimension: v
.get("dimension")
.and_then(|d| d.as_str())
.unwrap_or_default()
.to_string(),
current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
}),
"HardBreach" => Ok(ReportUsageResult::HardBreach {
dimension: v
.get("dimension")
.and_then(|d| d.as_str())
.unwrap_or_default()
.to_string(),
current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
}),
other => Err(EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("budget dedup outcome_json unknown kind: {other}"),
}),
}
}
fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
policy
.get(key)
.and_then(|v| v.as_object())
.map(|obj| {
obj.iter()
.filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
.collect()
})
.unwrap_or_default()
}
/// Lifetime of a dedup row in milliseconds (24 hours). `report_usage_impl`
/// adds this to the insert timestamp to fill the row's `expires_at_ms`.
const DEDUP_TTL_MS: i64 = 24 * 60 * 60 * 1_000;
/// Applies one usage report to a budget inside a single Postgres transaction.
///
/// The steps run in this order:
///
/// 1. **Dedup claim.** When `dimensions.dedup_key` is present and non-empty, a
///    placeholder row (`outcome_json = '{}'`) is inserted into
///    `ff_budget_usage_dedup`. If the key already exists, the stored outcome is
///    replayed and nothing else is touched; a stored placeholder (`{}`) is
///    reported as `AlreadyApplied`.
/// 2. **Policy load.** `hard_limits` / `soft_limits` are read from the budget's
///    `policy_json` (row locked `FOR SHARE`). A missing policy row means no
///    limits. A limit of `0` is treated as "no limit".
/// 3. **Hard-limit pass.** For every entry of `dimensions.custom`, the counter
///    row is created if absent, locked `FOR UPDATE`, and `current + delta`
///    (saturating) is compared with the hard limit. The first breach ends the
///    call with `HardBreach` carrying the *pre-increment* usage; no counter is
///    incremented.
/// 4. **Increment pass.** Every counter is incremented by its delta. The first
///    dimension whose new value exceeds its soft limit produces `SoftBreach`;
///    otherwise the result is `Ok`.
///
/// When this call owns the dedup key, the final outcome is written back to the
/// dedup row before the transaction commits.
///
/// # Errors
///
/// Database errors are converted through `map_sqlx_error`; on those paths the
/// transaction is dropped without being committed. A replayed dedup outcome
/// that cannot be decoded yields the error from `outcome_from_json`.
pub(crate) async fn report_usage_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    let partition = budget_partition(budget, partition_config);
    let partition_key: i16 = partition.index as i16;
    let budget_id_str = budget.to_string();
    let now = now_ms();

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Step 1: claim the dedup key, or replay the outcome of whoever claimed it.
    let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
        Some(dk) => {
            let inserted = sqlx::query(
                "INSERT INTO ff_budget_usage_dedup \
                 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
                 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
                 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
                 RETURNING applied_at_ms",
            )
            .bind(partition_key)
            .bind(dk)
            .bind(now)
            .bind(now + DEDUP_TTL_MS)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            // No row returned => the key already existed; this is a replay.
            if inserted.is_none() {
                let row = sqlx::query(
                    "SELECT outcome_json FROM ff_budget_usage_dedup \
                     WHERE partition_key = $1 AND dedup_key = $2",
                )
                .bind(partition_key)
                .bind(dk)
                .fetch_one(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                let outcome: JsonValue = row.get("outcome_json");
                tx.commit().await.map_err(map_sqlx_error)?;
                // An empty object is the placeholder written at claim time.
                if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
                    return Ok(ReportUsageResult::AlreadyApplied);
                }
                return outcome_from_json(&outcome);
            }
            Some(dk.to_string())
        }
        None => None,
    };

    // Step 2: load limits; an absent policy row behaves like an empty policy.
    let policy_row = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR SHARE",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let policy: JsonValue = match policy_row {
        Some(r) => r.get("policy_json"),
        None => JsonValue::Object(Default::default()),
    };
    let hard_limits = limits_from_policy(&policy, "hard_limits");
    let soft_limits = limits_from_policy(&policy, "soft_limits");

    // Step 3: lock every counter row and check hard limits before mutating.
    let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        // Make sure the row exists so the `FOR UPDATE` select below finds it.
        sqlx::query(
            "INSERT INTO ff_budget_usage \
             (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
             VALUES ($1, $2, $3, 0, $4) \
             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let row = sqlx::query(
            "SELECT current_value FROM ff_budget_usage \
             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
             FOR UPDATE",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .fetch_one(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let cur: i64 = row.get("current_value");
        // NOTE(review): `cur as u64` turns a negative stored value into a very
        // large count; kept as-is because callers may rely on it — confirm.
        let new_val = (cur as u64).saturating_add(*delta);
        if let Some(hard) = hard_limits.get(dim)
            && *hard > 0
            && new_val > *hard
        {
            // Reported usage is the value *before* the rejected increment.
            let outcome = ReportUsageResult::HardBreach {
                dimension: dim.clone(),
                current_usage: cur as u64,
                hard_limit: *hard,
            };
            if let Some(dk) = dedup_owned.as_deref() {
                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
            }
            tx.commit().await.map_err(map_sqlx_error)?;
            return Ok(outcome);
        }
        per_dim_current.insert(dim.clone(), new_val);
    }

    // Step 4: apply the increments and look for the first soft breach.
    let mut soft_breach: Option<ReportUsageResult> = None;
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        let new_val = per_dim_current[dim];
        // A `u64` delta above `i64::MAX` would wrap to a negative number under
        // a plain `as i64` cast and silently *decrease* the counter. Clamp it
        // instead: the counter then either lands on the maximum or Postgres
        // rejects the addition as out of range, which is returned as an error.
        let delta_i64 = i64::try_from(*delta).unwrap_or(i64::MAX);
        sqlx::query(
            "UPDATE ff_budget_usage \
             SET current_value = current_value + $1, updated_at_ms = $2 \
             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
        )
        .bind(delta_i64)
        .bind(now)
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        if soft_breach.is_none()
            && let Some(soft) = soft_limits.get(dim)
            && *soft > 0
            && new_val > *soft
        {
            soft_breach = Some(ReportUsageResult::SoftBreach {
                dimension: dim.clone(),
                current_usage: new_val,
                soft_limit: *soft,
            });
        }
    }

    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
    // Persist the real outcome over the placeholder so replays return it.
    if let Some(dk) = dedup_owned.as_deref() {
        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(outcome)
}
/// Overwrites the `outcome_json` of an existing dedup row with the encoded
/// `outcome`, using the caller's open transaction.
///
/// The row is addressed by `(partition_key, dedup_key)`. The number of rows
/// affected is not inspected.
///
/// # Errors
///
/// Database errors are converted through `map_sqlx_error`.
async fn finalize_dedup(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    partition_key: i16,
    dedup_key: &str,
    outcome: &ReportUsageResult,
) -> Result<(), EngineError> {
    const STORE_OUTCOME_SQL: &str = "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
         WHERE partition_key = $2 AND dedup_key = $3";

    sqlx::query(STORE_OUTCOME_SQL)
        .bind(outcome_to_json(outcome))
        .bind(partition_key)
        .bind(dedup_key)
        .execute(&mut **tx)
        .await
        .map_err(map_sqlx_error)?;
    Ok(())
}
/// Inserts the policy row for `budget`, or replaces `policy_json` and
/// `updated_at_ms` if the row already exists.
///
/// On first insert, `created_at_ms` and `updated_at_ms` are both set to the
/// current time. Runs directly against the pool, outside any transaction
/// opened by the caller.
///
/// # Errors
///
/// Database errors are converted through `map_sqlx_error`.
#[doc(hidden)]
pub async fn upsert_policy_for_test(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    policy_json: JsonValue,
) -> Result<(), EngineError> {
    const UPSERT_POLICY_SQL: &str = "INSERT INTO ff_budget_policy \
         (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
         VALUES ($1, $2, $3, $4, $4) \
         ON CONFLICT (partition_key, budget_id) DO UPDATE \
         SET policy_json = EXCLUDED.policy_json, \
         updated_at_ms = EXCLUDED.updated_at_ms";

    let shard: i16 = budget_partition(budget, partition_config).index as i16;
    let stamp = now_ms();

    sqlx::query(UPSERT_POLICY_SQL)
        .bind(shard)
        .bind(budget.to_string())
        .bind(policy_json)
        // `$4` feeds both timestamp columns.
        .bind(stamp)
        .execute(pool)
        .await
        .map_err(map_sqlx_error)?;
    Ok(())
}