// ff_backend_postgres/budget.rs
1//! Budget family — Postgres impl.
2//!
3//! **RFC-v0.7 Wave 4f.** Implements `EngineBackend::report_usage`
4//! against the Wave 3b schema (`0002_budget.sql`): `ff_budget_policy`,
5//! `ff_budget_usage`, `ff_budget_usage_dedup`.
6//!
7//! Isolation per v0.7 migration-master §Q11: READ COMMITTED + row-level
8//! `FOR UPDATE` on `ff_budget_usage` and `FOR SHARE` on
9//! `ff_budget_policy` — NOT SERIALIZABLE. Worker-A Tier A: trivial
10//! single-key atomic increment per dimension.
11//!
12//! Idempotency per RFC-012 §R7.2.3: the caller-supplied
13//! `UsageDimensions::dedup_key` is used as the key for an
14//! `INSERT … ON CONFLICT DO NOTHING`; on conflict the cached
15//! `outcome_json` row is returned verbatim (replay).
16//!
17//! `update_budget_policy` is NOT on the `EngineBackend` trait today
18//! (only `report_usage` is). Wave 4f therefore ships only
19//! `report_usage_impl`; definitional writes land via a test-only
20//! helper (`upsert_policy_for_test`) that the integration suite
21//! seeds through directly.
22
23use std::collections::BTreeMap;
24
25use ff_core::backend::UsageDimensions;
26use ff_core::contracts::ReportUsageResult;
27use ff_core::engine_error::{EngineError, ValidationKind};
28use ff_core::partition::{budget_partition, PartitionConfig};
29use ff_core::types::BudgetId;
30use serde_json::{json, Value as JsonValue};
31use sqlx::{PgPool, Row};
32
33use crate::error::map_sqlx_error;
34
/// Canonical stringification of the custom-dimensions map.
///
/// `report_usage` stores one `ff_budget_usage` row per dimension, so
/// the stored key is simply the dimension name — mirroring the Valkey
/// Hash-field shape where each dim owns its own HGET slot. Keyed
/// lookups on `ff_budget_usage(..., dimensions_key)` rely on this
/// canonical form being stable across callers; `BTreeMap` iteration
/// keeps multi-dim ordering deterministic.
fn dim_row_key(name: &str) -> String {
    String::from(name)
}
46
/// Now in unix-milliseconds. Mirrors the `now_ms_timestamp` helper in
/// the Valkey backend.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        // Truncating cast is safe in practice: u128 millis stay within
        // i64 range for roughly the next 292 million years.
        Ok(elapsed) => elapsed.as_millis() as i64,
        // System clock set before the epoch — pin to 0 rather than err.
        Err(_) => 0,
    }
}
55
56/// Serialize a `ReportUsageResult` to the jsonb shape stored in
57/// `ff_budget_usage_dedup.outcome_json`. Shape:
58///
59/// ```json
60/// {"kind":"Ok"}
61/// {"kind":"AlreadyApplied"}
62/// {"kind":"SoftBreach","dimension":"...","current_usage":123,"soft_limit":100}
63/// {"kind":"HardBreach","dimension":"...","current_usage":123,"hard_limit":100}
64/// ```
65fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
66    match r {
67        ReportUsageResult::Ok => json!({"kind": "Ok"}),
68        ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
69        ReportUsageResult::SoftBreach {
70            dimension,
71            current_usage,
72            soft_limit,
73        } => json!({
74            "kind": "SoftBreach",
75            "dimension": dimension,
76            "current_usage": current_usage,
77            "soft_limit": soft_limit,
78        }),
79        ReportUsageResult::HardBreach {
80            dimension,
81            current_usage,
82            hard_limit,
83        } => json!({
84            "kind": "HardBreach",
85            "dimension": dimension,
86            "current_usage": current_usage,
87            "hard_limit": hard_limit,
88        }),
89        // `ReportUsageResult` is `#[non_exhaustive]`; future variants
90        // land in ff-core before reaching the backend impl, so emit a
91        // safe placeholder that the replay path will reject rather
92        // than silently mis-decoding.
93        _ => json!({"kind": "Ok"}),
94    }
95}
96
97/// Inverse of [`outcome_to_json`] — used on the dedup-replay path.
98fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
99    let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
100        EngineError::Validation {
101            kind: ValidationKind::Corruption,
102            detail: "budget dedup outcome_json missing `kind`".into(),
103        }
104    })?;
105    match kind {
106        "Ok" => Ok(ReportUsageResult::Ok),
107        "AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
108        "SoftBreach" => Ok(ReportUsageResult::SoftBreach {
109            dimension: v
110                .get("dimension")
111                .and_then(|d| d.as_str())
112                .unwrap_or_default()
113                .to_string(),
114            current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
115            soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
116        }),
117        "HardBreach" => Ok(ReportUsageResult::HardBreach {
118            dimension: v
119                .get("dimension")
120                .and_then(|d| d.as_str())
121                .unwrap_or_default()
122                .to_string(),
123            current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
124            hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
125        }),
126        other => Err(EngineError::Validation {
127            kind: ValidationKind::Corruption,
128            detail: format!("budget dedup outcome_json unknown kind: {other}"),
129        }),
130    }
131}
132
133/// Extract the dimension → limit map from a `policy_json` blob, keyed
134/// by the shape documented in RFC-008 §Policy schema:
135/// `{"hard_limits": {"dim": u64}, "soft_limits": {"dim": u64}, ...}`.
136/// Missing field / non-object → empty map (= no limit on any dim).
137fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
138    policy
139        .get(key)
140        .and_then(|v| v.as_object())
141        .map(|obj| {
142            obj.iter()
143                .filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
144                .collect()
145        })
146        .unwrap_or_default()
147}
148
/// Dedup-expiry window. Matches RFC-012 §R7.2.3's "caller-supplied
/// idempotency key" retention: 24h by default.
///
/// NOTE(review): `expires_at_ms` is written onto dedup rows but nothing
/// in this module reads or enforces it — presumably an external sweeper
/// deletes expired rows; confirm that job exists before relying on the
/// TTL for replay semantics.
const DEDUP_TTL_MS: i64 = 24 * 60 * 60 * 1_000;
152
/// `EngineBackend::report_usage` — Postgres impl.
///
/// One READ COMMITTED transaction that:
/// 1. claims (or replays) the caller's idempotency key via
///    `INSERT … ON CONFLICT DO NOTHING` on `ff_budget_usage_dedup`;
/// 2. loads the budget's `policy_json` under `FOR SHARE`;
/// 3. pass one — per dimension, ensures the usage row exists, locks it
///    `FOR UPDATE`, and rejects the whole report on any hard breach
///    (no increments applied);
/// 4. pass two — applies all increments and records the first advisory
///    soft breach, then finalizes the dedup outcome and commits.
///
/// # Errors
///
/// Any sqlx failure (mapped through `map_sqlx_error`) or a corrupt
/// cached `outcome_json` on the dedup-replay path.
pub(crate) async fn report_usage_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    let partition = budget_partition(budget, partition_config);
    let partition_key: i16 = partition.index as i16;
    let budget_id_str = budget.to_string();
    // Single timestamp for the whole call so dedup + usage rows agree.
    let now = now_ms();

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // RC is Postgres's default; assert explicitly per Q11.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // ── Dedup ──
    //
    // When a dedup_key is provided, try to INSERT a placeholder dedup
    // row. If the INSERT succeeds we've taken ownership and will fill
    // the outcome_json below. If it conflicts the row already exists
    // from a prior call — read it back and replay the cached outcome.
    //
    // An empty or absent dedup_key disables idempotency entirely
    // (`filter(|k| !k.is_empty())` below).
    let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
        Some(dk) => {
            let inserted = sqlx::query(
                "INSERT INTO ff_budget_usage_dedup \
                     (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
                 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
                 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
                 RETURNING applied_at_ms",
            )
            .bind(partition_key)
            .bind(dk)
            .bind(now)
            .bind(now + DEDUP_TTL_MS)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            if inserted.is_none() {
                // Replay: fetch the cached outcome_json and return it.
                let row = sqlx::query(
                    "SELECT outcome_json FROM ff_budget_usage_dedup \
                     WHERE partition_key = $1 AND dedup_key = $2",
                )
                .bind(partition_key)
                .bind(dk)
                .fetch_one(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                let outcome: JsonValue = row.get("outcome_json");
                tx.commit().await.map_err(map_sqlx_error)?;
                // Empty placeholder means a prior in-flight caller
                // crashed before writing the outcome. Treat as
                // AlreadyApplied so the caller sees idempotent
                // semantics rather than double-counting.
                //
                // NOTE(review): insert + finalize happen in one txn, so
                // a committed `{}` placeholder shouldn't be observable;
                // this branch is defence-in-depth — confirm no other
                // writer touches this table out-of-band.
                if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
                    return Ok(ReportUsageResult::AlreadyApplied);
                }
                return outcome_from_json(&outcome);
            }
            Some(dk.to_string())
        }
        None => None,
    };

    // ── Load policy (FOR SHARE so concurrent report_usage can proceed
    //    but an in-flight policy update blocks on the FOR UPDATE it
    //    would hold — when that admin surface lands). ──
    let policy_row = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR SHARE",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let policy: JsonValue = match policy_row {
        Some(r) => r.get("policy_json"),
        // No policy row ⇒ no limits defined. Treat all reports as Ok
        // and still apply increments (the Valkey impl behaves the
        // same when the definition Hash is absent).
        None => JsonValue::Object(Default::default()),
    };
    let hard_limits = limits_from_policy(&policy, "hard_limits");
    let soft_limits = limits_from_policy(&policy, "soft_limits");

    // ── Compute admission per dimension. Hard breach on ANY dim
    //    rejects the whole report (no increments applied); soft breach
    //    is advisory (increments applied). Iterate in sorted-key order
    //    so the reported dimension is deterministic on multi-dim
    //    reports. ──
    //
    // Per-dim: SELECT current_value FOR UPDATE (locks the row for the
    // remainder of the txn). Row may not exist yet (first report for
    // that dim) — INSERT … ON CONFLICT DO NOTHING first, then re-SELECT
    // so the lock is held deterministically.
    //
    // NOTE(review): deadlock-freedom across concurrent reports relies
    // on every caller acquiring these row locks in the same order —
    // assumes `dimensions.custom` is an ordered (sorted-key) map;
    // confirm its type in ff_core.
    let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        sqlx::query(
            "INSERT INTO ff_budget_usage \
                 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
             VALUES ($1, $2, $3, 0, $4) \
             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        let row = sqlx::query(
            "SELECT current_value FROM ff_budget_usage \
             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
             FOR UPDATE",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .fetch_one(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let cur: i64 = row.get("current_value");
        // Saturating add guards against a (pathological) delta pushing
        // the projected value past u64::MAX.
        let new_val = (cur as u64).saturating_add(*delta);

        if let Some(hard) = hard_limits.get(dim)
            && *hard > 0
            && new_val > *hard
        {
            // Hard breach: no increments applied, commit dedup outcome.
            //
            // NOTE(review): HardBreach reports the PRE-delta value
            // (`cur`) while SoftBreach below reports the POST-delta
            // value (`new_val`) — confirm this asymmetry matches the
            // contract / the Valkey impl.
            let outcome = ReportUsageResult::HardBreach {
                dimension: dim.clone(),
                current_usage: cur as u64,
                hard_limit: *hard,
            };
            if let Some(dk) = dedup_owned.as_deref() {
                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
            }
            tx.commit().await.map_err(map_sqlx_error)?;
            return Ok(outcome);
        }
        per_dim_current.insert(dim.clone(), new_val);
    }

    // ── Apply increments + check soft limits. Soft breach is advisory
    //    (first-dim-wins, matching Valkey's iteration-order semantics). ──
    let mut soft_breach: Option<ReportUsageResult> = None;
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        // Populated for every dim in the first pass, so direct
        // indexing cannot panic here.
        let new_val = per_dim_current[dim];
        sqlx::query(
            "UPDATE ff_budget_usage \
             SET current_value = current_value + $1, updated_at_ms = $2 \
             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
        )
        .bind(*delta as i64)
        .bind(now)
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        if soft_breach.is_none()
            && let Some(soft) = soft_limits.get(dim)
            && *soft > 0
            && new_val > *soft
        {
            soft_breach = Some(ReportUsageResult::SoftBreach {
                dimension: dim.clone(),
                current_usage: new_val,
                soft_limit: *soft,
            });
        }
    }

    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
    if let Some(dk) = dedup_owned.as_deref() {
        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(outcome)
}
346
347/// Write the final `outcome_json` onto the dedup row we inserted at
348/// the top of [`report_usage_impl`].
349async fn finalize_dedup(
350    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
351    partition_key: i16,
352    dedup_key: &str,
353    outcome: &ReportUsageResult,
354) -> Result<(), EngineError> {
355    let json = outcome_to_json(outcome);
356    sqlx::query(
357        "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
358         WHERE partition_key = $2 AND dedup_key = $3",
359    )
360    .bind(json)
361    .bind(partition_key)
362    .bind(dedup_key)
363    .execute(&mut **tx)
364    .await
365    .map_err(map_sqlx_error)?;
366    Ok(())
367}
368
369/// Test-only helper: upsert a `ff_budget_policy` row directly. The
370/// trait has no `update_budget_policy` method in v0.7 (Wave 4f
371/// verification), so the Postgres backend exposes this as a public
372/// helper so the integration suite can seed policies without going
373/// through an op we haven't yet added to the trait surface.
374#[doc(hidden)]
375pub async fn upsert_policy_for_test(
376    pool: &PgPool,
377    partition_config: &PartitionConfig,
378    budget: &BudgetId,
379    policy_json: JsonValue,
380) -> Result<(), EngineError> {
381    let partition = budget_partition(budget, partition_config);
382    let partition_key: i16 = partition.index as i16;
383    let now = now_ms();
384    sqlx::query(
385        "INSERT INTO ff_budget_policy \
386             (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
387         VALUES ($1, $2, $3, $4, $4) \
388         ON CONFLICT (partition_key, budget_id) DO UPDATE \
389             SET policy_json = EXCLUDED.policy_json, \
390                 updated_at_ms = EXCLUDED.updated_at_ms",
391    )
392    .bind(partition_key)
393    .bind(budget.to_string())
394    .bind(policy_json)
395    .bind(now)
396    .execute(pool)
397    .await
398    .map_err(map_sqlx_error)?;
399    Ok(())
400}