1use std::collections::BTreeMap;
24
25use ff_core::backend::UsageDimensions;
26use ff_core::contracts::ReportUsageResult;
27use ff_core::engine_error::{EngineError, ValidationKind};
28use ff_core::partition::{budget_partition, PartitionConfig};
29use ff_core::types::BudgetId;
30use serde_json::{json, Value as JsonValue};
31use sqlx::{PgPool, Row};
32
33use crate::error::map_sqlx_error;
34
/// Returns the `dimensions_key` column value used for a usage dimension.
///
/// The dimension name is stored verbatim, so this is simply an owned copy of
/// `name`.
fn dim_row_key(name: &str) -> String {
    String::from(name)
}
46
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
55
56fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
66 match r {
67 ReportUsageResult::Ok => json!({"kind": "Ok"}),
68 ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
69 ReportUsageResult::SoftBreach {
70 dimension,
71 current_usage,
72 soft_limit,
73 } => json!({
74 "kind": "SoftBreach",
75 "dimension": dimension,
76 "current_usage": current_usage,
77 "soft_limit": soft_limit,
78 }),
79 ReportUsageResult::HardBreach {
80 dimension,
81 current_usage,
82 hard_limit,
83 } => json!({
84 "kind": "HardBreach",
85 "dimension": dimension,
86 "current_usage": current_usage,
87 "hard_limit": hard_limit,
88 }),
89 _ => json!({"kind": "Ok"}),
94 }
95}
96
97fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
99 let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
100 EngineError::Validation {
101 kind: ValidationKind::Corruption,
102 detail: "budget dedup outcome_json missing `kind`".into(),
103 }
104 })?;
105 match kind {
106 "Ok" => Ok(ReportUsageResult::Ok),
107 "AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
108 "SoftBreach" => Ok(ReportUsageResult::SoftBreach {
109 dimension: v
110 .get("dimension")
111 .and_then(|d| d.as_str())
112 .unwrap_or_default()
113 .to_string(),
114 current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
115 soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
116 }),
117 "HardBreach" => Ok(ReportUsageResult::HardBreach {
118 dimension: v
119 .get("dimension")
120 .and_then(|d| d.as_str())
121 .unwrap_or_default()
122 .to_string(),
123 current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
124 hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
125 }),
126 other => Err(EngineError::Validation {
127 kind: ValidationKind::Corruption,
128 detail: format!("budget dedup outcome_json unknown kind: {other}"),
129 }),
130 }
131}
132
133fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
138 policy
139 .get(key)
140 .and_then(|v| v.as_object())
141 .map(|obj| {
142 obj.iter()
143 .filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
144 .collect()
145 })
146 .unwrap_or_default()
147}
148
/// Lifetime of a dedup row in milliseconds (24 hours). It is added to the
/// insertion timestamp to compute the row's `expires_at_ms`.
const DEDUP_TTL_MS: i64 = 1_000 * 60 * 60 * 24;
152
153pub(crate) async fn report_usage_impl(
155 pool: &PgPool,
156 partition_config: &PartitionConfig,
157 budget: &BudgetId,
158 dimensions: UsageDimensions,
159) -> Result<ReportUsageResult, EngineError> {
160 let partition = budget_partition(budget, partition_config);
161 let partition_key: i16 = partition.index as i16;
162 let budget_id_str = budget.to_string();
163 let now = now_ms();
164
165 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
166
167 sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
169 .execute(&mut *tx)
170 .await
171 .map_err(map_sqlx_error)?;
172
173 let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
180 Some(dk) => {
181 let inserted = sqlx::query(
182 "INSERT INTO ff_budget_usage_dedup \
183 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
184 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
185 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
186 RETURNING applied_at_ms",
187 )
188 .bind(partition_key)
189 .bind(dk)
190 .bind(now)
191 .bind(now + DEDUP_TTL_MS)
192 .fetch_optional(&mut *tx)
193 .await
194 .map_err(map_sqlx_error)?;
195
196 if inserted.is_none() {
197 let row = sqlx::query(
199 "SELECT outcome_json FROM ff_budget_usage_dedup \
200 WHERE partition_key = $1 AND dedup_key = $2",
201 )
202 .bind(partition_key)
203 .bind(dk)
204 .fetch_one(&mut *tx)
205 .await
206 .map_err(map_sqlx_error)?;
207 let outcome: JsonValue = row.get("outcome_json");
208 tx.commit().await.map_err(map_sqlx_error)?;
209 if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
214 return Ok(ReportUsageResult::AlreadyApplied);
215 }
216 return outcome_from_json(&outcome);
217 }
218 Some(dk.to_string())
219 }
220 None => None,
221 };
222
223 let policy_row = sqlx::query(
227 "SELECT policy_json FROM ff_budget_policy \
228 WHERE partition_key = $1 AND budget_id = $2 FOR SHARE",
229 )
230 .bind(partition_key)
231 .bind(&budget_id_str)
232 .fetch_optional(&mut *tx)
233 .await
234 .map_err(map_sqlx_error)?;
235
236 let policy: JsonValue = match policy_row {
237 Some(r) => r.get("policy_json"),
238 None => JsonValue::Object(Default::default()),
242 };
243 let hard_limits = limits_from_policy(&policy, "hard_limits");
244 let soft_limits = limits_from_policy(&policy, "soft_limits");
245
246 let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
257 for (dim, delta) in dimensions.custom.iter() {
258 let dim_key = dim_row_key(dim);
259 sqlx::query(
260 "INSERT INTO ff_budget_usage \
261 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
262 VALUES ($1, $2, $3, 0, $4) \
263 ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
264 )
265 .bind(partition_key)
266 .bind(&budget_id_str)
267 .bind(&dim_key)
268 .bind(now)
269 .execute(&mut *tx)
270 .await
271 .map_err(map_sqlx_error)?;
272
273 let row = sqlx::query(
274 "SELECT current_value FROM ff_budget_usage \
275 WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
276 FOR UPDATE",
277 )
278 .bind(partition_key)
279 .bind(&budget_id_str)
280 .bind(&dim_key)
281 .fetch_one(&mut *tx)
282 .await
283 .map_err(map_sqlx_error)?;
284 let cur: i64 = row.get("current_value");
285 let new_val = (cur as u64).saturating_add(*delta);
286
287 if let Some(hard) = hard_limits.get(dim)
288 && *hard > 0
289 && new_val > *hard
290 {
291 let outcome = ReportUsageResult::HardBreach {
293 dimension: dim.clone(),
294 current_usage: cur as u64,
295 hard_limit: *hard,
296 };
297 if let Some(dk) = dedup_owned.as_deref() {
298 finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
299 }
300 tx.commit().await.map_err(map_sqlx_error)?;
301 return Ok(outcome);
302 }
303 per_dim_current.insert(dim.clone(), new_val);
304 }
305
306 let mut soft_breach: Option<ReportUsageResult> = None;
309 for (dim, delta) in dimensions.custom.iter() {
310 let dim_key = dim_row_key(dim);
311 let new_val = per_dim_current[dim];
312 sqlx::query(
313 "UPDATE ff_budget_usage \
314 SET current_value = current_value + $1, updated_at_ms = $2 \
315 WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
316 )
317 .bind(*delta as i64)
318 .bind(now)
319 .bind(partition_key)
320 .bind(&budget_id_str)
321 .bind(&dim_key)
322 .execute(&mut *tx)
323 .await
324 .map_err(map_sqlx_error)?;
325
326 if soft_breach.is_none()
327 && let Some(soft) = soft_limits.get(dim)
328 && *soft > 0
329 && new_val > *soft
330 {
331 soft_breach = Some(ReportUsageResult::SoftBreach {
332 dimension: dim.clone(),
333 current_usage: new_val,
334 soft_limit: *soft,
335 });
336 }
337 }
338
339 let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
340 if let Some(dk) = dedup_owned.as_deref() {
341 finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
342 }
343 tx.commit().await.map_err(map_sqlx_error)?;
344 Ok(outcome)
345}
346
347async fn finalize_dedup(
350 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
351 partition_key: i16,
352 dedup_key: &str,
353 outcome: &ReportUsageResult,
354) -> Result<(), EngineError> {
355 let json = outcome_to_json(outcome);
356 sqlx::query(
357 "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
358 WHERE partition_key = $2 AND dedup_key = $3",
359 )
360 .bind(json)
361 .bind(partition_key)
362 .bind(dedup_key)
363 .execute(&mut **tx)
364 .await
365 .map_err(map_sqlx_error)?;
366 Ok(())
367}
368
369#[doc(hidden)]
375pub async fn upsert_policy_for_test(
376 pool: &PgPool,
377 partition_config: &PartitionConfig,
378 budget: &BudgetId,
379 policy_json: JsonValue,
380) -> Result<(), EngineError> {
381 let partition = budget_partition(budget, partition_config);
382 let partition_key: i16 = partition.index as i16;
383 let now = now_ms();
384 sqlx::query(
385 "INSERT INTO ff_budget_policy \
386 (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
387 VALUES ($1, $2, $3, $4, $4) \
388 ON CONFLICT (partition_key, budget_id) DO UPDATE \
389 SET policy_json = EXCLUDED.policy_json, \
390 updated_at_ms = EXCLUDED.updated_at_ms",
391 )
392 .bind(partition_key)
393 .bind(budget.to_string())
394 .bind(policy_json)
395 .bind(now)
396 .execute(pool)
397 .await
398 .map_err(map_sqlx_error)?;
399 Ok(())
400}