Skip to main content

ff_backend_postgres/
budget.rs

1//! Budget family — Postgres impl.
2//!
3//! **RFC-v0.7 Wave 4f.** Ships `EngineBackend::report_usage` against
4//! the Wave 3b schema (`0002_budget.sql`): `ff_budget_policy`,
5//! `ff_budget_usage`, `ff_budget_usage_dedup`.
6//!
7//! **RFC-020 Wave 9 Standalone-1 (Revision 6).** Extends this module
8//! with the 5 budget/quota admin methods — `create_budget`,
9//! `reset_budget`, `create_quota_policy`, `get_budget_status`,
10//! `report_usage_admin` — writing against 0012 (`ff_quota_policy` +
11//! window + admitted set) and 0013 (additive breach / scope / reset
12//! columns on `ff_budget_policy`). `report_usage_impl` is narrowly
13//! extended to maintain the new breach counters per §4.4.6 and to
14//! hold the policy row under `FOR NO KEY UPDATE` (replacing the
15//! previous `FOR SHARE` — reviewer-finding deadlock fix on the
16//! breach-UPDATE path).
17//!
18//! Isolation per v0.7 migration-master §Q11: READ COMMITTED + row-
19//! level locking on `report_usage_impl` (hot path). `reset_budget`
20//! runs SERIALIZABLE to match Valkey's Lua atomicity — the zero-all
21//! pattern must not interleave with a concurrent `report_usage`
22//! mid-flight.
23//!
24//! Idempotency per RFC-012 §R7.2.3: the caller-supplied `dedup_key`
25//! keys an `INSERT ... ON CONFLICT DO NOTHING`; on conflict the cached
26//! `outcome_json` row is returned verbatim (replay).
27
28use std::collections::{BTreeMap, HashMap};
29
30use ff_core::backend::UsageDimensions;
31use ff_core::contracts::{
32    BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
33    CreateQuotaPolicyResult, RecordSpendArgs, ReleaseBudgetArgs, ReportUsageAdminArgs,
34    ReportUsageResult, ResetBudgetArgs, ResetBudgetResult,
35};
36use ff_core::engine_error::{backend_context, EngineError, ValidationKind};
37use ff_core::partition::{budget_partition, quota_partition, PartitionConfig};
38use ff_core::types::{BudgetId, ExecutionId, TimestampMs};
39use serde_json::{json, Value as JsonValue};
40use sqlx::{PgPool, Row};
41use uuid::Uuid;
42
43use crate::error::map_sqlx_error;
44
45/// Canonical stringification of the custom-dimensions map.
46///
47/// Keyed lookups on `ff_budget_usage(..., dimensions_key)` rely on the
48/// canonical form being stable across callers. A `BTreeMap` iterates in
49/// sorted-key order, so the serialised JSON object is deterministic.
50/// For `report_usage` we key per-dimension (one row per dimension) so
51/// the stored key is simply the dimension name; this mirrors the Valkey
52/// Hash-field shape where each dim has its own HGET slot.
53fn dim_row_key(name: &str) -> String {
54    name.to_string()
55}
56
57/// Now in unix-milliseconds. Mirrors the `now_ms_timestamp` helper in
58/// the Valkey backend.
59fn now_ms() -> i64 {
60    std::time::SystemTime::now()
61        .duration_since(std::time::UNIX_EPOCH)
62        .map(|d| d.as_millis() as i64)
63        .unwrap_or(0)
64}
65
66/// Serialize a `ReportUsageResult` to the jsonb shape stored in
67/// `ff_budget_usage_dedup.outcome_json`. Shape:
68///
69/// ```json
70/// {"kind":"Ok"}
71/// {"kind":"AlreadyApplied"}
72/// {"kind":"SoftBreach","dimension":"...","current_usage":123,"soft_limit":100}
73/// {"kind":"HardBreach","dimension":"...","current_usage":123,"hard_limit":100}
74/// ```
75fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
76    match r {
77        ReportUsageResult::Ok => json!({"kind": "Ok"}),
78        ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
79        ReportUsageResult::SoftBreach {
80            dimension,
81            current_usage,
82            soft_limit,
83        } => json!({
84            "kind": "SoftBreach",
85            "dimension": dimension,
86            "current_usage": current_usage,
87            "soft_limit": soft_limit,
88        }),
89        ReportUsageResult::HardBreach {
90            dimension,
91            current_usage,
92            hard_limit,
93        } => json!({
94            "kind": "HardBreach",
95            "dimension": dimension,
96            "current_usage": current_usage,
97            "hard_limit": hard_limit,
98        }),
99        // `ReportUsageResult` is `#[non_exhaustive]`; future variants
100        // land in ff-core before reaching the backend impl, so emit a
101        // safe placeholder that the replay path will reject rather
102        // than silently mis-decoding.
103        _ => json!({"kind": "Ok"}),
104    }
105}
106
107/// Inverse of [`outcome_to_json`] — used on the dedup-replay path.
108fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
109    let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
110        EngineError::Validation {
111            kind: ValidationKind::Corruption,
112            detail: "budget dedup outcome_json missing `kind`".into(),
113        }
114    })?;
115    match kind {
116        "Ok" => Ok(ReportUsageResult::Ok),
117        "AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
118        "SoftBreach" => Ok(ReportUsageResult::SoftBreach {
119            dimension: v
120                .get("dimension")
121                .and_then(|d| d.as_str())
122                .unwrap_or_default()
123                .to_string(),
124            current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
125            soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
126        }),
127        "HardBreach" => Ok(ReportUsageResult::HardBreach {
128            dimension: v
129                .get("dimension")
130                .and_then(|d| d.as_str())
131                .unwrap_or_default()
132                .to_string(),
133            current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
134            hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
135        }),
136        other => Err(EngineError::Validation {
137            kind: ValidationKind::Corruption,
138            detail: format!("budget dedup outcome_json unknown kind: {other}"),
139        }),
140    }
141}
142
143/// Extract the dimension → limit map from a `policy_json` blob, keyed
144/// by the shape documented in RFC-008 §Policy schema:
145/// `{"hard_limits": {"dim": u64}, "soft_limits": {"dim": u64}, ...}`.
146/// Missing field / non-object → empty map (= no limit on any dim).
147fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
148    policy
149        .get(key)
150        .and_then(|v| v.as_object())
151        .map(|obj| {
152            obj.iter()
153                .filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
154                .collect()
155        })
156        .unwrap_or_default()
157}
158
159/// Dedup-expiry window. Matches RFC-012 §R7.2.3's "caller-supplied
160/// idempotency key" retention: 24h by default.
161const DEDUP_TTL_MS: i64 = 24 * 60 * 60 * 1_000;
162
163// ───────────────────────────────────────────────────────────────────
164// §4.4.6 — `report_usage_impl` / `report_usage_admin` shared core
165// ───────────────────────────────────────────────────────────────────
166
167/// `EngineBackend::report_usage` — Postgres impl (worker-path). Thin
168/// wrapper around [`report_usage_and_check_core`].
169pub(crate) async fn report_usage_impl(
170    pool: &PgPool,
171    partition_config: &PartitionConfig,
172    budget: &BudgetId,
173    dimensions: UsageDimensions,
174) -> Result<ReportUsageResult, EngineError> {
175    report_usage_and_check_core(pool, partition_config, budget, dimensions).await
176}
177
178/// Admin-path `report_usage_admin`. Translates the admin-shape
179/// `ReportUsageAdminArgs` (parallel `dimensions` / `deltas` vectors)
180/// to `UsageDimensions` and delegates to the shared core. See RFC-020
181/// §4.4.6 "report_usage_admin entry point".
182pub(crate) async fn report_usage_admin_impl(
183    pool: &PgPool,
184    partition_config: &PartitionConfig,
185    budget: &BudgetId,
186    args: ReportUsageAdminArgs,
187) -> Result<ReportUsageResult, EngineError> {
188    if args.dimensions.len() != args.deltas.len() {
189        return Err(EngineError::Validation {
190            kind: ValidationKind::InvalidInput,
191            detail: "report_usage_admin: dimensions and deltas length mismatch".into(),
192        });
193    }
194    let mut custom: BTreeMap<String, u64> = BTreeMap::new();
195    for (d, v) in args.dimensions.into_iter().zip(args.deltas) {
196        custom.insert(d, v);
197    }
198    // `UsageDimensions` is `#[non_exhaustive]`; build via `new()` +
199    // mutate the fields we have direct access to (`custom`,
200    // `dedup_key`). `input_tokens` / `output_tokens` / `wall_ms` stay
201    // at their `Default` values — the shared core only inspects
202    // `custom` + `dedup_key`.
203    let mut ud = UsageDimensions::new();
204    ud.custom = custom;
205    ud.dedup_key = args.dedup_key;
206    report_usage_and_check_core(pool, partition_config, budget, ud).await
207}
208
209/// Shared body of `report_usage` + `report_usage_admin`. §4.4.6 lifts
210/// the Revision 5 §7.2 pin narrowly: this function now (1) loads the
211/// policy row with `FOR NO KEY UPDATE` rather than `FOR SHARE`
212/// (deadlock fix for the breach-UPDATE path) and (2) maintains the
213/// `breach_count` / `soft_breach_count` columns + `last_breach_*`
214/// metadata on the 0013 columns.
215async fn report_usage_and_check_core(
216    pool: &PgPool,
217    partition_config: &PartitionConfig,
218    budget: &BudgetId,
219    dimensions: UsageDimensions,
220) -> Result<ReportUsageResult, EngineError> {
221    let partition = budget_partition(budget, partition_config);
222    let partition_key: i16 = partition.index as i16;
223    let budget_id_str = budget.to_string();
224    let now = now_ms();
225
226    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
227
228    // RC is Postgres's default; assert explicitly per Q11.
229    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
230        .execute(&mut *tx)
231        .await
232        .map_err(map_sqlx_error)?;
233
234    // ── Dedup ──
235    //
236    // When a dedup_key is provided, try to INSERT a placeholder dedup
237    // row. If the INSERT succeeds we've taken ownership and will fill
238    // the outcome_json below. If it conflicts the row already exists
239    // from a prior call — read it back and replay the cached outcome.
240    let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
241        Some(dk) => {
242            let inserted = sqlx::query(
243                "INSERT INTO ff_budget_usage_dedup \
244                     (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
245                 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
246                 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
247                 RETURNING applied_at_ms",
248            )
249            .bind(partition_key)
250            .bind(dk)
251            .bind(now)
252            .bind(now + DEDUP_TTL_MS)
253            .fetch_optional(&mut *tx)
254            .await
255            .map_err(map_sqlx_error)?;
256
257            if inserted.is_none() {
258                // Replay: fetch the cached outcome_json and return it.
259                let row = sqlx::query(
260                    "SELECT outcome_json FROM ff_budget_usage_dedup \
261                     WHERE partition_key = $1 AND dedup_key = $2",
262                )
263                .bind(partition_key)
264                .bind(dk)
265                .fetch_one(&mut *tx)
266                .await
267                .map_err(map_sqlx_error)?;
268                let outcome: JsonValue = row.get("outcome_json");
269                tx.commit().await.map_err(map_sqlx_error)?;
270                // Empty placeholder means a prior in-flight caller
271                // crashed before writing the outcome. Treat as
272                // AlreadyApplied so the caller sees idempotent
273                // semantics rather than double-counting.
274                if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
275                    return Ok(ReportUsageResult::AlreadyApplied);
276                }
277                return outcome_from_json(&outcome);
278            }
279            Some(dk.to_string())
280        }
281        None => None,
282    };
283
284    // ── Load policy row ──
285    //
286    // §4.4.6 lock-mode correction: `FOR NO KEY UPDATE` serialises the
287    // breach path deterministically (two concurrent callers both
288    // hitting hard-breach now serialise on this SELECT, not deadlock
289    // on the breach UPDATE that would otherwise need NO-KEY-UPDATE
290    // while each tx holds SHARE).
291    let policy_row = sqlx::query(
292        "SELECT policy_json FROM ff_budget_policy \
293         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
294    )
295    .bind(partition_key)
296    .bind(&budget_id_str)
297    .fetch_optional(&mut *tx)
298    .await
299    .map_err(map_sqlx_error)?;
300
301    let policy: JsonValue = match policy_row {
302        Some(r) => r.get("policy_json"),
303        // No policy row ⇒ no limits defined. Treat all reports as Ok
304        // and still apply increments (Valkey parity).
305        None => JsonValue::Object(Default::default()),
306    };
307    let hard_limits = limits_from_policy(&policy, "hard_limits");
308    let soft_limits = limits_from_policy(&policy, "soft_limits");
309
310    // ── Compute admission per dimension. Hard breach on ANY dim
311    //    rejects the whole report (no increments applied); soft breach
312    //    is advisory (increments applied). Sorted-key iteration keeps
313    //    the reported dimension deterministic on multi-dim reports. ──
314    let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
315    for (dim, delta) in dimensions.custom.iter() {
316        let dim_key = dim_row_key(dim);
317        sqlx::query(
318            "INSERT INTO ff_budget_usage \
319                 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
320             VALUES ($1, $2, $3, 0, $4) \
321             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
322        )
323        .bind(partition_key)
324        .bind(&budget_id_str)
325        .bind(&dim_key)
326        .bind(now)
327        .execute(&mut *tx)
328        .await
329        .map_err(map_sqlx_error)?;
330
331        let row = sqlx::query(
332            "SELECT current_value FROM ff_budget_usage \
333             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
334             FOR UPDATE",
335        )
336        .bind(partition_key)
337        .bind(&budget_id_str)
338        .bind(&dim_key)
339        .fetch_one(&mut *tx)
340        .await
341        .map_err(map_sqlx_error)?;
342        let cur: i64 = row.get("current_value");
343        let new_val = (cur as u64).saturating_add(*delta);
344
345        if let Some(hard) = hard_limits.get(dim)
346            && *hard > 0
347            && new_val > *hard
348        {
349            // §4.4.6 — Hard breach: no increments applied, maintain
350            // `breach_count` + `last_breach_*` on the policy row
351            // (mirrors Valkey `HINCRBY breach_count 1` + `HSET
352            // last_breach_at/last_breach_dim` at
353            // flowfabric.lua:6576-6580), commit dedup outcome.
354            sqlx::query(
355                "UPDATE ff_budget_policy \
356                 SET breach_count      = breach_count + 1, \
357                     last_breach_at_ms = $3, \
358                     last_breach_dim   = $4, \
359                     updated_at_ms     = $3 \
360                 WHERE partition_key = $1 AND budget_id = $2",
361            )
362            .bind(partition_key)
363            .bind(&budget_id_str)
364            .bind(now)
365            .bind(dim)
366            .execute(&mut *tx)
367            .await
368            .map_err(map_sqlx_error)?;
369
370            let outcome = ReportUsageResult::HardBreach {
371                dimension: dim.clone(),
372                current_usage: cur as u64,
373                hard_limit: *hard,
374            };
375            if let Some(dk) = dedup_owned.as_deref() {
376                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
377            }
378            tx.commit().await.map_err(map_sqlx_error)?;
379            return Ok(outcome);
380        }
381        per_dim_current.insert(dim.clone(), new_val);
382    }
383
384    // ── Apply increments + check soft limits. Soft breach is advisory
385    //    (first-dim-wins, matching Valkey's iteration-order semantics). ──
386    let mut soft_breach: Option<ReportUsageResult> = None;
387    for (dim, delta) in dimensions.custom.iter() {
388        let dim_key = dim_row_key(dim);
389        let new_val = per_dim_current[dim];
390        sqlx::query(
391            "UPDATE ff_budget_usage \
392             SET current_value = current_value + $1, updated_at_ms = $2 \
393             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
394        )
395        .bind(*delta as i64)
396        .bind(now)
397        .bind(partition_key)
398        .bind(&budget_id_str)
399        .bind(&dim_key)
400        .execute(&mut *tx)
401        .await
402        .map_err(map_sqlx_error)?;
403
404        if soft_breach.is_none()
405            && let Some(soft) = soft_limits.get(dim)
406            && *soft > 0
407            && new_val > *soft
408        {
409            soft_breach = Some(ReportUsageResult::SoftBreach {
410                dimension: dim.clone(),
411                current_usage: new_val,
412                soft_limit: *soft,
413            });
414        }
415    }
416
417    // §4.4.6 — on soft breach, HINCRBY soft_breach_count (Valkey
418    // parity at flowfabric.lua:6614). Hard-breach path returned early
419    // above; this block only fires on the non-hard-breach outcome.
420    if soft_breach.is_some() {
421        sqlx::query(
422            "UPDATE ff_budget_policy \
423             SET soft_breach_count = soft_breach_count + 1, \
424                 updated_at_ms     = $3 \
425             WHERE partition_key = $1 AND budget_id = $2",
426        )
427        .bind(partition_key)
428        .bind(&budget_id_str)
429        .bind(now)
430        .execute(&mut *tx)
431        .await
432        .map_err(map_sqlx_error)?;
433    }
434
435    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
436    if let Some(dk) = dedup_owned.as_deref() {
437        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
438    }
439    tx.commit().await.map_err(map_sqlx_error)?;
440    Ok(outcome)
441}
442
443/// Write the final `outcome_json` onto the dedup row we inserted at
444/// the top of [`report_usage_and_check_core`].
445async fn finalize_dedup(
446    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
447    partition_key: i16,
448    dedup_key: &str,
449    outcome: &ReportUsageResult,
450) -> Result<(), EngineError> {
451    let json = outcome_to_json(outcome);
452    sqlx::query(
453        "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
454         WHERE partition_key = $2 AND dedup_key = $3",
455    )
456    .bind(json)
457    .bind(partition_key)
458    .bind(dedup_key)
459    .execute(&mut **tx)
460    .await
461    .map_err(map_sqlx_error)?;
462    Ok(())
463}
464
465/// Test-only helper: upsert a `ff_budget_policy` row directly. Pre-
466/// dates the trait-lifted `create_budget` (RFC-020 §4.4.2); retained
467/// for tests that still seed a raw `policy_json` blob + exercise the
468/// policy-row-missing branch of `report_usage_impl`.
469#[doc(hidden)]
470pub async fn upsert_policy_for_test(
471    pool: &PgPool,
472    partition_config: &PartitionConfig,
473    budget: &BudgetId,
474    policy_json: JsonValue,
475) -> Result<(), EngineError> {
476    let partition = budget_partition(budget, partition_config);
477    let partition_key: i16 = partition.index as i16;
478    let now = now_ms();
479    sqlx::query(
480        "INSERT INTO ff_budget_policy \
481             (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
482         VALUES ($1, $2, $3, $4, $4) \
483         ON CONFLICT (partition_key, budget_id) DO UPDATE \
484             SET policy_json = EXCLUDED.policy_json, \
485                 updated_at_ms = EXCLUDED.updated_at_ms",
486    )
487    .bind(partition_key)
488    .bind(budget.to_string())
489    .bind(policy_json)
490    .bind(now)
491    .execute(pool)
492    .await
493    .map_err(map_sqlx_error)?;
494    Ok(())
495}
496
497// ───────────────────────────────────────────────────────────────────
498// §4.4.2 — `create_budget`
499// ───────────────────────────────────────────────────────────────────
500
501/// Pack a [`CreateBudgetArgs`] into the `policy_json` blob shape
502/// `report_usage_and_check_core` + `get_budget_status` consume. Writes
503/// parallel `dimensions` / `hard_limits` / `soft_limits` vectors into
504/// `{hard_limits: {dim: u64, ...}, soft_limits: {dim: u64, ...},
505/// reset_interval_ms, on_hard_limit, on_soft_limit}` — the shape
506/// already used by `upsert_policy_for_test` + `limits_from_policy`.
507fn build_policy_json(args: &CreateBudgetArgs) -> JsonValue {
508    let mut hard = serde_json::Map::new();
509    let mut soft = serde_json::Map::new();
510    for (i, dim) in args.dimensions.iter().enumerate() {
511        if let Some(h) = args.hard_limits.get(i).copied() {
512            hard.insert(dim.clone(), json!(h));
513        }
514        if let Some(s) = args.soft_limits.get(i).copied() {
515            soft.insert(dim.clone(), json!(s));
516        }
517    }
518    json!({
519        "hard_limits": hard,
520        "soft_limits": soft,
521        "reset_interval_ms": args.reset_interval_ms,
522        "on_hard_limit": args.on_hard_limit,
523        "on_soft_limit": args.on_soft_limit,
524    })
525}
526
527pub(crate) async fn create_budget_impl(
528    pool: &PgPool,
529    partition_config: &PartitionConfig,
530    args: CreateBudgetArgs,
531) -> Result<CreateBudgetResult, EngineError> {
532    if args.dimensions.len() != args.hard_limits.len()
533        || args.dimensions.len() != args.soft_limits.len()
534    {
535        return Err(EngineError::Validation {
536            kind: ValidationKind::InvalidInput,
537            detail: "create_budget: dimensions / hard_limits / soft_limits length mismatch"
538                .into(),
539        });
540    }
541
542    let partition = budget_partition(&args.budget_id, partition_config);
543    let partition_key: i16 = partition.index as i16;
544    let budget_id = args.budget_id.clone();
545    let now: i64 = args.now.0;
546    let policy_json = build_policy_json(&args);
547    let reset_interval_ms = args.reset_interval_ms as i64;
548
549    // `next_reset_at_ms` seeded to `now + interval` when interval > 0
550    // so the `budget_reset` reconciler picks it up (RFC §4.4.3).
551    // Matches Valkey's `ff_create_budget` `ZADD resets_zset` scheduling
552    // at flowfabric.lua:6522-6526.
553    let row = sqlx::query(
554        "INSERT INTO ff_budget_policy \
555             (partition_key, budget_id, policy_json, scope_type, scope_id, \
556              enforcement_mode, breach_count, soft_breach_count, \
557              last_breach_at_ms, last_breach_dim, next_reset_at_ms, \
558              created_at_ms, updated_at_ms) \
559         VALUES ($1, $2, $3, $4, $5, $6, 0, 0, NULL, NULL, \
560                 CASE WHEN $7::bigint > 0 THEN $8::bigint + $7::bigint ELSE NULL END, \
561                 $8, $8) \
562         ON CONFLICT (partition_key, budget_id) DO NOTHING \
563         RETURNING created_at_ms",
564    )
565    .bind(partition_key)
566    .bind(budget_id.to_string())
567    .bind(policy_json)
568    .bind(&args.scope_type)
569    .bind(&args.scope_id)
570    .bind(&args.enforcement_mode)
571    .bind(reset_interval_ms)
572    .bind(now)
573    .fetch_optional(pool)
574    .await
575    .map_err(map_sqlx_error)?;
576
577    match row {
578        Some(_) => Ok(CreateBudgetResult::Created { budget_id }),
579        None => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
580    }
581}
582
583// ───────────────────────────────────────────────────────────────────
584// §4.4.3 — `reset_budget`
585// ───────────────────────────────────────────────────────────────────
586
587pub(crate) async fn reset_budget_impl(
588    pool: &PgPool,
589    partition_config: &PartitionConfig,
590    args: ResetBudgetArgs,
591) -> Result<ResetBudgetResult, EngineError> {
592    let partition = budget_partition(&args.budget_id, partition_config);
593    let partition_key: i16 = partition.index as i16;
594    let budget_id_str = args.budget_id.to_string();
595    let now: i64 = args.now.0;
596
597    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
598    // SERIALIZABLE matches Valkey's Lua atomicity — the zero-all
599    // pattern must not see mid-flight increments from concurrent
600    // `report_usage` / `report_usage_admin`.
601    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
602        .execute(&mut *tx)
603        .await
604        .map_err(map_sqlx_error)?;
605
606    // Lock the policy row with `FOR NO KEY UPDATE` before zeroing —
607    // serialises against a concurrent breach-UPDATE on the same row
608    // (§4.4.6 lock-mode discipline).
609    let policy_row = sqlx::query(
610        "SELECT policy_json FROM ff_budget_policy \
611         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
612    )
613    .bind(partition_key)
614    .bind(&budget_id_str)
615    .fetch_optional(&mut *tx)
616    .await
617    .map_err(map_sqlx_error)?;
618    let Some(policy_row) = policy_row else {
619        tx.rollback().await.map_err(map_sqlx_error)?;
620        return Err(backend_context(
621            EngineError::NotFound { entity: "budget" },
622            format!("reset_budget: {}", args.budget_id),
623        ));
624    };
625    let policy_json: JsonValue = policy_row.get("policy_json");
626    let reset_interval_ms: i64 = policy_json
627        .get("reset_interval_ms")
628        .and_then(|v| v.as_i64())
629        .unwrap_or(0);
630
631    // Zero all usage rows for this budget.
632    sqlx::query(
633        "UPDATE ff_budget_usage \
634         SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
635         WHERE partition_key = $1 AND budget_id = $2",
636    )
637    .bind(partition_key)
638    .bind(&budget_id_str)
639    .bind(now)
640    .execute(&mut *tx)
641    .await
642    .map_err(map_sqlx_error)?;
643
644    // Reset policy metadata + reschedule.
645    let row = sqlx::query(
646        "UPDATE ff_budget_policy \
647         SET last_breach_at_ms = NULL, \
648             last_breach_dim   = NULL, \
649             updated_at_ms     = $3, \
650             next_reset_at_ms  = CASE \
651                 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
652                 ELSE NULL \
653             END \
654         WHERE partition_key = $1 AND budget_id = $2 \
655         RETURNING next_reset_at_ms",
656    )
657    .bind(partition_key)
658    .bind(&budget_id_str)
659    .bind(now)
660    .bind(reset_interval_ms)
661    .fetch_one(&mut *tx)
662    .await
663    .map_err(map_sqlx_error)?;
664
665    let next_reset: Option<i64> = row
666        .try_get::<Option<i64>, _>("next_reset_at_ms")
667        .map_err(map_sqlx_error)?;
668
669    tx.commit().await.map_err(map_sqlx_error)?;
670
671    Ok(ResetBudgetResult::Reset {
672        // `ResetBudgetResult::Reset` carries a non-optional
673        // `TimestampMs`; when no interval is configured the budget
674        // has no scheduled-reset, so report `0` (matches Valkey's
675        // zero-score-when-unset behaviour for ZADD resets_zset).
676        next_reset_at: TimestampMs(next_reset.unwrap_or(0)),
677    })
678}
679
680// ───────────────────────────────────────────────────────────────────
681// §4.4.1 — `create_quota_policy`
682// ───────────────────────────────────────────────────────────────────
683
684pub(crate) async fn create_quota_policy_impl(
685    pool: &PgPool,
686    partition_config: &PartitionConfig,
687    args: CreateQuotaPolicyArgs,
688) -> Result<CreateQuotaPolicyResult, EngineError> {
689    let partition = quota_partition(&args.quota_policy_id, partition_config);
690    let partition_key: i16 = partition.index as i16;
691    let qid = args.quota_policy_id.clone();
692    let now: i64 = args.now.0;
693
694    let row = sqlx::query(
695        "INSERT INTO ff_quota_policy \
696             (partition_key, quota_policy_id, requests_per_window_seconds, \
697              max_requests_per_window, active_concurrency_cap, \
698              active_concurrency, created_at_ms, updated_at_ms) \
699         VALUES ($1, $2, $3, $4, $5, 0, $6, $6) \
700         ON CONFLICT (partition_key, quota_policy_id) DO NOTHING \
701         RETURNING created_at_ms",
702    )
703    .bind(partition_key)
704    .bind(qid.to_string())
705    .bind(args.window_seconds as i64)
706    .bind(args.max_requests_per_window as i64)
707    .bind(args.max_concurrent as i64)
708    .bind(now)
709    .fetch_optional(pool)
710    .await
711    .map_err(map_sqlx_error)?;
712
713    match row {
714        Some(_) => Ok(CreateQuotaPolicyResult::Created {
715            quota_policy_id: qid,
716        }),
717        None => Ok(CreateQuotaPolicyResult::AlreadySatisfied {
718            quota_policy_id: qid,
719        }),
720    }
721}
722
723// ───────────────────────────────────────────────────────────────────
724// §4.4.7 — `get_budget_status`
725// ───────────────────────────────────────────────────────────────────
726
727pub(crate) async fn get_budget_status_impl(
728    pool: &PgPool,
729    partition_config: &PartitionConfig,
730    budget: &BudgetId,
731) -> Result<BudgetStatus, EngineError> {
732    let partition = budget_partition(budget, partition_config);
733    let partition_key: i16 = partition.index as i16;
734    let budget_id_str = budget.to_string();
735
736    // SELECT 1: policy row (definitional + counters + scheduler).
737    let policy_row = sqlx::query(
738        "SELECT scope_type, scope_id, enforcement_mode, \
739                breach_count, soft_breach_count, \
740                last_breach_at_ms, last_breach_dim, \
741                next_reset_at_ms, created_at_ms, \
742                policy_json \
743         FROM ff_budget_policy \
744         WHERE partition_key = $1 AND budget_id = $2",
745    )
746    .bind(partition_key)
747    .bind(&budget_id_str)
748    .fetch_optional(pool)
749    .await
750    .map_err(map_sqlx_error)?;
751    let Some(policy_row) = policy_row else {
752        return Err(backend_context(
753            EngineError::NotFound { entity: "budget" },
754            format!("get_budget_status: {budget}"),
755        ));
756    };
757
758    let scope_type: String = policy_row.get("scope_type");
759    let scope_id: String = policy_row.get("scope_id");
760    let enforcement_mode: String = policy_row.get("enforcement_mode");
761    let breach_count: i64 = policy_row.get("breach_count");
762    let soft_breach_count: i64 = policy_row.get("soft_breach_count");
763    let last_breach_at_ms: Option<i64> = policy_row
764        .try_get::<Option<i64>, _>("last_breach_at_ms")
765        .map_err(map_sqlx_error)?;
766    let last_breach_dim: Option<String> = policy_row
767        .try_get::<Option<String>, _>("last_breach_dim")
768        .map_err(map_sqlx_error)?;
769    let next_reset_at_ms: Option<i64> = policy_row
770        .try_get::<Option<i64>, _>("next_reset_at_ms")
771        .map_err(map_sqlx_error)?;
772    let created_at_ms: i64 = policy_row.get("created_at_ms");
773    let policy_json: JsonValue = policy_row.get("policy_json");
774
775    // SELECT 2: usage rows (one per dimension).
776    let usage_rows = sqlx::query(
777        "SELECT dimensions_key, current_value \
778         FROM ff_budget_usage \
779         WHERE partition_key = $1 AND budget_id = $2",
780    )
781    .bind(partition_key)
782    .bind(&budget_id_str)
783    .fetch_all(pool)
784    .await
785    .map_err(map_sqlx_error)?;
786
787    let mut usage: HashMap<String, u64> = HashMap::new();
788    for r in &usage_rows {
789        let key: String = r.get("dimensions_key");
790        let val: i64 = r.get("current_value");
791        if key == "_init" {
792            continue;
793        }
794        usage.insert(key, val.max(0) as u64);
795    }
796
797    // Limits parse from policy_json (shape documented on
798    // `build_policy_json` above; matches Valkey's 3× HGETALL shape
799    // byte-for-byte post-stringification).
800    let hard_limits: HashMap<String, u64> =
801        limits_from_policy(&policy_json, "hard_limits").into_iter().collect();
802    let soft_limits: HashMap<String, u64> =
803        limits_from_policy(&policy_json, "soft_limits").into_iter().collect();
804
805    // Valkey stringifies i64 timestamps; mirror byte-for-byte so the
806    // contract shape is identical across backends.
807    let fmt_opt = |v: Option<i64>| -> Option<String> { v.map(|n| n.to_string()) };
808
809    Ok(BudgetStatus {
810        budget_id: budget.to_string(),
811        scope_type,
812        scope_id,
813        enforcement_mode,
814        usage,
815        hard_limits,
816        soft_limits,
817        breach_count: breach_count.max(0) as u64,
818        soft_breach_count: soft_breach_count.max(0) as u64,
819        last_breach_at: fmt_opt(last_breach_at_ms),
820        last_breach_dim,
821        next_reset_at: fmt_opt(next_reset_at_ms),
822        created_at: Some(created_at_ms.to_string()),
823    })
824}
825
826// ───────────────────────────────────────────────────────────────────
827// §4.4.3 — `budget_reset` reconciler entry point
828// ───────────────────────────────────────────────────────────────────
829
830/// Called per row returned by the `budget_reset` reconciler scan.
831/// Looks up the policy's `reset_interval_ms` from `policy_json`,
832/// zeroes usage rows + resets breach metadata + reschedules
833/// `next_reset_at_ms` under SERIALIZABLE. Same tx shape as
834/// `reset_budget_impl` but keyed off the scanner-supplied
835/// `(partition_key, budget_id)` pair rather than a caller-supplied
836/// `BudgetId`.
837pub(crate) async fn budget_reset_reconciler_apply(
838    pool: &PgPool,
839    partition_key: i16,
840    budget_id: &str,
841    now: i64,
842) -> Result<(), EngineError> {
843    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
844    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
845        .execute(&mut *tx)
846        .await
847        .map_err(map_sqlx_error)?;
848
849    let policy_row = sqlx::query(
850        "SELECT policy_json, next_reset_at_ms FROM ff_budget_policy \
851         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
852    )
853    .bind(partition_key)
854    .bind(budget_id)
855    .fetch_optional(&mut *tx)
856    .await
857    .map_err(map_sqlx_error)?;
858    let Some(policy_row) = policy_row else {
859        tx.rollback().await.map_err(map_sqlx_error)?;
860        return Ok(());
861    };
862    // Defensive re-check: a concurrent `reset_budget` may have already
863    // advanced `next_reset_at_ms` past `now`. If so this scanner tick
864    // is a no-op.
865    let next_reset: Option<i64> = policy_row
866        .try_get::<Option<i64>, _>("next_reset_at_ms")
867        .map_err(map_sqlx_error)?;
868    if !matches!(next_reset, Some(n) if n <= now) {
869        tx.rollback().await.map_err(map_sqlx_error)?;
870        return Ok(());
871    }
872    let policy_json: JsonValue = policy_row.get("policy_json");
873    let reset_interval_ms: i64 = policy_json
874        .get("reset_interval_ms")
875        .and_then(|v| v.as_i64())
876        .unwrap_or(0);
877
878    sqlx::query(
879        "UPDATE ff_budget_usage \
880         SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
881         WHERE partition_key = $1 AND budget_id = $2",
882    )
883    .bind(partition_key)
884    .bind(budget_id)
885    .bind(now)
886    .execute(&mut *tx)
887    .await
888    .map_err(map_sqlx_error)?;
889
890    sqlx::query(
891        "UPDATE ff_budget_policy \
892         SET last_breach_at_ms = NULL, \
893             last_breach_dim   = NULL, \
894             updated_at_ms     = $3, \
895             next_reset_at_ms  = CASE \
896                 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
897                 ELSE NULL \
898             END \
899         WHERE partition_key = $1 AND budget_id = $2",
900    )
901    .bind(partition_key)
902    .bind(budget_id)
903    .bind(now)
904    .bind(reset_interval_ms)
905    .execute(&mut *tx)
906    .await
907    .map_err(map_sqlx_error)?;
908
909    tx.commit().await.map_err(map_sqlx_error)?;
910    Ok(())
911}
912
913// ───────────────────────────────────────────────────────────────────
914// cairn #454 Phase 4a — `record_spend` + `release_budget`
915// (per-execution ledger, option A; parity with Valkey PR #464).
916// ───────────────────────────────────────────────────────────────────
917
918/// Extract the bare UUID from an `ExecutionId` (formatted
919/// `{fp:N}:<uuid>`). `ff_budget_usage_by_exec.execution_id` is typed
920/// as `uuid`, not the wrapped hash-tag string, so we need the inner
921/// bytes. Mirrors `attempt::split_exec_id` but returns only the UUID.
922fn exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
923    let s = eid.as_str();
924    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
925        kind: ValidationKind::InvalidInput,
926        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
927    })?;
928    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
929        kind: ValidationKind::InvalidInput,
930        detail: format!("execution_id missing `}}:`: {s}"),
931    })?;
932    Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
933        kind: ValidationKind::InvalidInput,
934        detail: format!("execution_id UUID invalid: {s}"),
935    })
936}
937
938/// cairn #454 Phase 4a — `EngineBackend::record_spend`.
939///
940/// Per-execution budget spend with open-set dimensions. Structurally
941/// identical to [`report_usage_and_check_core`] (dedup INSERT → policy
942/// lock → per-dim admission → apply increments → soft-breach book-
943/// keeping), with two deltas from option A:
944///
945/// 1. The idempotency key is `args.idempotency_key` (caller-computed
946///    SHA-256 hex per RFC cairn #454 Q1) instead of
947///    `UsageDimensions::dedup_key`.
948/// 2. After the aggregate UPSERT the same deltas land in the
949///    `ff_budget_usage_by_exec` per-execution ledger (new 0020 table)
950///    so `release_budget` can reverse just this execution's
951///    attribution. Matches Valkey's `HINCRBY` into
952///    `ff:budget:...:by_exec:<execution_id>`.
953pub(crate) async fn record_spend_impl(
954    pool: &PgPool,
955    partition_config: &PartitionConfig,
956    args: RecordSpendArgs,
957) -> Result<ReportUsageResult, EngineError> {
958    let partition = budget_partition(&args.budget_id, partition_config);
959    let partition_key: i16 = partition.index as i16;
960    let budget_id_str = args.budget_id.to_string();
961    let exec_uuid = exec_uuid(&args.execution_id)?;
962    let now = now_ms();
963
964    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
965    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
966        .execute(&mut *tx)
967        .await
968        .map_err(map_sqlx_error)?;
969
970    // ── Dedup (reuses the existing `ff_budget_usage_dedup` infra) ──
971    let dedup_owned: Option<String> = if args.idempotency_key.is_empty() {
972        None
973    } else {
974        let inserted = sqlx::query(
975            "INSERT INTO ff_budget_usage_dedup \
976                 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
977             VALUES ($1, $2, '{}'::jsonb, $3, $4) \
978             ON CONFLICT (partition_key, dedup_key) DO NOTHING \
979             RETURNING applied_at_ms",
980        )
981        .bind(partition_key)
982        .bind(&args.idempotency_key)
983        .bind(now)
984        .bind(now + DEDUP_TTL_MS)
985        .fetch_optional(&mut *tx)
986        .await
987        .map_err(map_sqlx_error)?;
988
989        if inserted.is_none() {
990            let row = sqlx::query(
991                "SELECT outcome_json FROM ff_budget_usage_dedup \
992                 WHERE partition_key = $1 AND dedup_key = $2",
993            )
994            .bind(partition_key)
995            .bind(&args.idempotency_key)
996            .fetch_one(&mut *tx)
997            .await
998            .map_err(map_sqlx_error)?;
999            let outcome: JsonValue = row.get("outcome_json");
1000            tx.commit().await.map_err(map_sqlx_error)?;
1001            // cairn #454 Phase 3 parity with Valkey `ff_record_spend`:
1002            // dedup-hit always returns `AlreadyApplied` regardless of
1003            // the original outcome (SET `dedup_key "1"` in Lua —
1004            // outcome is not replayed). We still consult the stored
1005            // outcome only to verify the row isn't an orphaned empty
1006            // placeholder (in which case a prior in-flight caller
1007            // crashed; we treat it the same — AlreadyApplied).
1008            let _ = outcome; // placeholder read, semantics match Valkey
1009            return Ok(ReportUsageResult::AlreadyApplied);
1010        }
1011        Some(args.idempotency_key.clone())
1012    };
1013
1014    // ── Load policy row with `FOR NO KEY UPDATE` (same lock-mode
1015    //    discipline as `report_usage_and_check_core`). ──
1016    let policy_row = sqlx::query(
1017        "SELECT policy_json FROM ff_budget_policy \
1018         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
1019    )
1020    .bind(partition_key)
1021    .bind(&budget_id_str)
1022    .fetch_optional(&mut *tx)
1023    .await
1024    .map_err(map_sqlx_error)?;
1025
1026    let policy: JsonValue = match policy_row {
1027        Some(r) => r.get("policy_json"),
1028        None => JsonValue::Object(Default::default()),
1029    };
1030    let hard_limits = limits_from_policy(&policy, "hard_limits");
1031    let soft_limits = limits_from_policy(&policy, "soft_limits");
1032
1033    // ── Hard-breach pre-check: ANY dim over hard-limit rejects the
1034    //    whole call (no increments applied, ledger untouched). ──
1035    let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
1036    for (dim, delta) in args.deltas.iter() {
1037        let dim_key = dim_row_key(dim);
1038        sqlx::query(
1039            "INSERT INTO ff_budget_usage \
1040                 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
1041             VALUES ($1, $2, $3, 0, $4) \
1042             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
1043        )
1044        .bind(partition_key)
1045        .bind(&budget_id_str)
1046        .bind(&dim_key)
1047        .bind(now)
1048        .execute(&mut *tx)
1049        .await
1050        .map_err(map_sqlx_error)?;
1051
1052        let row = sqlx::query(
1053            "SELECT current_value FROM ff_budget_usage \
1054             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
1055             FOR UPDATE",
1056        )
1057        .bind(partition_key)
1058        .bind(&budget_id_str)
1059        .bind(&dim_key)
1060        .fetch_one(&mut *tx)
1061        .await
1062        .map_err(map_sqlx_error)?;
1063        let cur: i64 = row.get("current_value");
1064        let new_val = (cur as u64).saturating_add(*delta);
1065
1066        if let Some(hard) = hard_limits.get(dim)
1067            && *hard > 0
1068            && new_val > *hard
1069        {
1070            sqlx::query(
1071                "UPDATE ff_budget_policy \
1072                 SET breach_count      = breach_count + 1, \
1073                     last_breach_at_ms = $3, \
1074                     last_breach_dim   = $4, \
1075                     updated_at_ms     = $3 \
1076                 WHERE partition_key = $1 AND budget_id = $2",
1077            )
1078            .bind(partition_key)
1079            .bind(&budget_id_str)
1080            .bind(now)
1081            .bind(dim)
1082            .execute(&mut *tx)
1083            .await
1084            .map_err(map_sqlx_error)?;
1085
1086            let outcome = ReportUsageResult::HardBreach {
1087                dimension: dim.clone(),
1088                current_usage: cur as u64,
1089                hard_limit: *hard,
1090            };
1091            if let Some(dk) = dedup_owned.as_deref() {
1092                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
1093            }
1094            tx.commit().await.map_err(map_sqlx_error)?;
1095            return Ok(outcome);
1096        }
1097        per_dim_current.insert(dim.clone(), new_val);
1098    }
1099
1100    // ── Apply increments + soft-breach detection + mirror into the
1101    //    per-execution ledger (new 0020 table). ──
1102    let mut soft_breach: Option<ReportUsageResult> = None;
1103    for (dim, delta) in args.deltas.iter() {
1104        let dim_key = dim_row_key(dim);
1105        let new_val = per_dim_current[dim];
1106        sqlx::query(
1107            "UPDATE ff_budget_usage \
1108             SET current_value = current_value + $1, updated_at_ms = $2 \
1109             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
1110        )
1111        .bind(*delta as i64)
1112        .bind(now)
1113        .bind(partition_key)
1114        .bind(&budget_id_str)
1115        .bind(&dim_key)
1116        .execute(&mut *tx)
1117        .await
1118        .map_err(map_sqlx_error)?;
1119
1120        // Per-execution ledger UPSERT — additive on repeat
1121        // `record_spend` for the same (budget, exec, dim).
1122        sqlx::query(
1123            "INSERT INTO ff_budget_usage_by_exec \
1124                 (partition_key, budget_id, execution_id, dimensions_key, \
1125                  delta_total, updated_at_ms) \
1126             VALUES ($1, $2, $3, $4, $5, $6) \
1127             ON CONFLICT (partition_key, budget_id, execution_id, dimensions_key) \
1128             DO UPDATE SET delta_total = ff_budget_usage_by_exec.delta_total + EXCLUDED.delta_total, \
1129                           updated_at_ms = EXCLUDED.updated_at_ms",
1130        )
1131        .bind(partition_key)
1132        .bind(&budget_id_str)
1133        .bind(exec_uuid)
1134        .bind(&dim_key)
1135        .bind(*delta as i64)
1136        .bind(now)
1137        .execute(&mut *tx)
1138        .await
1139        .map_err(map_sqlx_error)?;
1140
1141        if soft_breach.is_none()
1142            && let Some(soft) = soft_limits.get(dim)
1143            && *soft > 0
1144            && new_val > *soft
1145        {
1146            soft_breach = Some(ReportUsageResult::SoftBreach {
1147                dimension: dim.clone(),
1148                current_usage: new_val,
1149                soft_limit: *soft,
1150            });
1151        }
1152    }
1153
1154    if soft_breach.is_some() {
1155        sqlx::query(
1156            "UPDATE ff_budget_policy \
1157             SET soft_breach_count = soft_breach_count + 1, \
1158                 updated_at_ms     = $3 \
1159             WHERE partition_key = $1 AND budget_id = $2",
1160        )
1161        .bind(partition_key)
1162        .bind(&budget_id_str)
1163        .bind(now)
1164        .execute(&mut *tx)
1165        .await
1166        .map_err(map_sqlx_error)?;
1167    }
1168
1169    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
1170    if let Some(dk) = dedup_owned.as_deref() {
1171        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
1172    }
1173    tx.commit().await.map_err(map_sqlx_error)?;
1174    Ok(outcome)
1175}
1176
1177/// cairn #454 Phase 4a — `EngineBackend::release_budget`.
1178///
1179/// Reverses a single execution's contribution to a budget aggregate
1180/// using the per-exec ledger from 0020. Scans all
1181/// `ff_budget_usage_by_exec` rows for (budget, exec), subtracts each
1182/// `delta_total` from the matching aggregate (clamped at 0 — matches
1183/// the Valkey `math.max(0, ...)` semantics), then DELETEs the ledger
1184/// rows. Idempotent: empty ledger ⇒ no-op `Ok(())`.
1185pub(crate) async fn release_budget_impl(
1186    pool: &PgPool,
1187    partition_config: &PartitionConfig,
1188    args: ReleaseBudgetArgs,
1189) -> Result<(), EngineError> {
1190    let partition = budget_partition(&args.budget_id, partition_config);
1191    let partition_key: i16 = partition.index as i16;
1192    let budget_id_str = args.budget_id.to_string();
1193    let exec_uuid = exec_uuid(&args.execution_id)?;
1194    let now = now_ms();
1195
1196    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
1197    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
1198        .execute(&mut *tx)
1199        .await
1200        .map_err(map_sqlx_error)?;
1201
1202    // Scan the ledger under `FOR UPDATE` so concurrent
1203    // `record_spend` on the same (budget, exec, dim) serialises
1204    // on the row lock rather than racing the DELETE.
1205    let rows = sqlx::query(
1206        "SELECT dimensions_key, delta_total \
1207         FROM ff_budget_usage_by_exec \
1208         WHERE partition_key = $1 AND budget_id = $2 AND execution_id = $3 \
1209         FOR UPDATE",
1210    )
1211    .bind(partition_key)
1212    .bind(&budget_id_str)
1213    .bind(exec_uuid)
1214    .fetch_all(&mut *tx)
1215    .await
1216    .map_err(map_sqlx_error)?;
1217
1218    if rows.is_empty() {
1219        // No prior record_spend for this execution ⇒ nothing to
1220        // reverse. Idempotent no-op, matching Valkey's behaviour when
1221        // `ff:budget:...:by_exec:<exec>` doesn't exist.
1222        tx.commit().await.map_err(map_sqlx_error)?;
1223        return Ok(());
1224    }
1225
1226    for row in &rows {
1227        let dim: String = row.get("dimensions_key");
1228        let delta: i64 = row.get("delta_total");
1229        sqlx::query(
1230            "UPDATE ff_budget_usage \
1231             SET current_value = GREATEST(0::bigint, current_value - $1), \
1232                 updated_at_ms = $2 \
1233             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
1234        )
1235        .bind(delta)
1236        .bind(now)
1237        .bind(partition_key)
1238        .bind(&budget_id_str)
1239        .bind(&dim)
1240        .execute(&mut *tx)
1241        .await
1242        .map_err(map_sqlx_error)?;
1243    }
1244
1245    sqlx::query(
1246        "DELETE FROM ff_budget_usage_by_exec \
1247         WHERE partition_key = $1 AND budget_id = $2 AND execution_id = $3",
1248    )
1249    .bind(partition_key)
1250    .bind(&budget_id_str)
1251    .bind(exec_uuid)
1252    .execute(&mut *tx)
1253    .await
1254    .map_err(map_sqlx_error)?;
1255
1256    tx.commit().await.map_err(map_sqlx_error)?;
1257    Ok(())
1258}