// ff_backend_postgres/budget.rs
1//! Budget family — Postgres impl.
2//!
3//! **RFC-v0.7 Wave 4f.** Ships `EngineBackend::report_usage` against
4//! the Wave 3b schema (`0002_budget.sql`): `ff_budget_policy`,
5//! `ff_budget_usage`, `ff_budget_usage_dedup`.
6//!
7//! **RFC-020 Wave 9 Standalone-1 (Revision 6).** Extends this module
8//! with the 5 budget/quota admin methods — `create_budget`,
9//! `reset_budget`, `create_quota_policy`, `get_budget_status`,
10//! `report_usage_admin` — writing against 0012 (`ff_quota_policy` +
11//! window + admitted set) and 0013 (additive breach / scope / reset
12//! columns on `ff_budget_policy`). `report_usage_impl` is narrowly
13//! extended to maintain the new breach counters per §4.4.6 and to
14//! hold the policy row under `FOR NO KEY UPDATE` (replacing the
15//! previous `FOR SHARE` — reviewer-finding deadlock fix on the
16//! breach-UPDATE path).
17//!
18//! Isolation per v0.7 migration-master §Q11: READ COMMITTED + row-
19//! level locking on `report_usage_impl` (hot path). `reset_budget`
20//! runs SERIALIZABLE to match Valkey's Lua atomicity — the zero-all
21//! pattern must not interleave with a concurrent `report_usage`
22//! mid-flight.
23//!
24//! Idempotency per RFC-012 §R7.2.3: the caller-supplied `dedup_key`
25//! keys an `INSERT ... ON CONFLICT DO NOTHING`; on conflict the cached
26//! `outcome_json` row is returned verbatim (replay).
27
28use std::collections::{BTreeMap, HashMap};
29
30use ff_core::backend::UsageDimensions;
31use ff_core::contracts::{
32    BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
33    CreateQuotaPolicyResult, ReportUsageAdminArgs, ReportUsageResult, ResetBudgetArgs,
34    ResetBudgetResult,
35};
36use ff_core::engine_error::{backend_context, EngineError, ValidationKind};
37use ff_core::partition::{budget_partition, quota_partition, PartitionConfig};
38use ff_core::types::{BudgetId, TimestampMs};
39use serde_json::{json, Value as JsonValue};
40use sqlx::{PgPool, Row};
41
42use crate::error::map_sqlx_error;
43
/// Canonical stringification of the custom-dimensions map.
///
/// Keyed lookups on `ff_budget_usage(..., dimensions_key)` require the
/// canonical form to be stable across callers. A `BTreeMap` iterates
/// in sorted-key order, so the serialised JSON object is
/// deterministic. `report_usage` keys per-dimension (one row per
/// dimension), so the stored key is just the dimension name — the
/// same shape as the Valkey Hash, where each dim has its own HGET
/// slot.
fn dim_row_key(name: &str) -> String {
    String::from(name)
}
55
/// Current wall-clock time in unix-milliseconds (clamped to 0 if the
/// system clock reads earlier than the epoch). Mirrors the
/// `now_ms_timestamp` helper in the Valkey backend.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
64
65/// Serialize a `ReportUsageResult` to the jsonb shape stored in
66/// `ff_budget_usage_dedup.outcome_json`. Shape:
67///
68/// ```json
69/// {"kind":"Ok"}
70/// {"kind":"AlreadyApplied"}
71/// {"kind":"SoftBreach","dimension":"...","current_usage":123,"soft_limit":100}
72/// {"kind":"HardBreach","dimension":"...","current_usage":123,"hard_limit":100}
73/// ```
74fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
75    match r {
76        ReportUsageResult::Ok => json!({"kind": "Ok"}),
77        ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
78        ReportUsageResult::SoftBreach {
79            dimension,
80            current_usage,
81            soft_limit,
82        } => json!({
83            "kind": "SoftBreach",
84            "dimension": dimension,
85            "current_usage": current_usage,
86            "soft_limit": soft_limit,
87        }),
88        ReportUsageResult::HardBreach {
89            dimension,
90            current_usage,
91            hard_limit,
92        } => json!({
93            "kind": "HardBreach",
94            "dimension": dimension,
95            "current_usage": current_usage,
96            "hard_limit": hard_limit,
97        }),
98        // `ReportUsageResult` is `#[non_exhaustive]`; future variants
99        // land in ff-core before reaching the backend impl, so emit a
100        // safe placeholder that the replay path will reject rather
101        // than silently mis-decoding.
102        _ => json!({"kind": "Ok"}),
103    }
104}
105
106/// Inverse of [`outcome_to_json`] — used on the dedup-replay path.
107fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
108    let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
109        EngineError::Validation {
110            kind: ValidationKind::Corruption,
111            detail: "budget dedup outcome_json missing `kind`".into(),
112        }
113    })?;
114    match kind {
115        "Ok" => Ok(ReportUsageResult::Ok),
116        "AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
117        "SoftBreach" => Ok(ReportUsageResult::SoftBreach {
118            dimension: v
119                .get("dimension")
120                .and_then(|d| d.as_str())
121                .unwrap_or_default()
122                .to_string(),
123            current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
124            soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
125        }),
126        "HardBreach" => Ok(ReportUsageResult::HardBreach {
127            dimension: v
128                .get("dimension")
129                .and_then(|d| d.as_str())
130                .unwrap_or_default()
131                .to_string(),
132            current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
133            hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
134        }),
135        other => Err(EngineError::Validation {
136            kind: ValidationKind::Corruption,
137            detail: format!("budget dedup outcome_json unknown kind: {other}"),
138        }),
139    }
140}
141
142/// Extract the dimension → limit map from a `policy_json` blob, keyed
143/// by the shape documented in RFC-008 §Policy schema:
144/// `{"hard_limits": {"dim": u64}, "soft_limits": {"dim": u64}, ...}`.
145/// Missing field / non-object → empty map (= no limit on any dim).
146fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
147    policy
148        .get(key)
149        .and_then(|v| v.as_object())
150        .map(|obj| {
151            obj.iter()
152                .filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
153                .collect()
154        })
155        .unwrap_or_default()
156}
157
/// Dedup-expiry window. Matches RFC-012 §R7.2.3's "caller-supplied
/// idempotency key" retention: 24h by default.
///
/// Only consumed when stamping `expires_at_ms` on new dedup rows
/// (`now + DEDUP_TTL_MS`); nothing in this module deletes expired
/// rows — presumably a sweeper elsewhere does (TODO confirm).
const DEDUP_TTL_MS: i64 = 24 * 60 * 60 * 1_000;
161
162// ───────────────────────────────────────────────────────────────────
163// §4.4.6 — `report_usage_impl` / `report_usage_admin` shared core
164// ───────────────────────────────────────────────────────────────────
165
166/// `EngineBackend::report_usage` — Postgres impl (worker-path). Thin
167/// wrapper around [`report_usage_and_check_core`].
168pub(crate) async fn report_usage_impl(
169    pool: &PgPool,
170    partition_config: &PartitionConfig,
171    budget: &BudgetId,
172    dimensions: UsageDimensions,
173) -> Result<ReportUsageResult, EngineError> {
174    report_usage_and_check_core(pool, partition_config, budget, dimensions).await
175}
176
177/// Admin-path `report_usage_admin`. Translates the admin-shape
178/// `ReportUsageAdminArgs` (parallel `dimensions` / `deltas` vectors)
179/// to `UsageDimensions` and delegates to the shared core. See RFC-020
180/// §4.4.6 "report_usage_admin entry point".
181pub(crate) async fn report_usage_admin_impl(
182    pool: &PgPool,
183    partition_config: &PartitionConfig,
184    budget: &BudgetId,
185    args: ReportUsageAdminArgs,
186) -> Result<ReportUsageResult, EngineError> {
187    if args.dimensions.len() != args.deltas.len() {
188        return Err(EngineError::Validation {
189            kind: ValidationKind::InvalidInput,
190            detail: "report_usage_admin: dimensions and deltas length mismatch".into(),
191        });
192    }
193    let mut custom: BTreeMap<String, u64> = BTreeMap::new();
194    for (d, v) in args.dimensions.into_iter().zip(args.deltas) {
195        custom.insert(d, v);
196    }
197    // `UsageDimensions` is `#[non_exhaustive]`; build via `new()` +
198    // mutate the fields we have direct access to (`custom`,
199    // `dedup_key`). `input_tokens` / `output_tokens` / `wall_ms` stay
200    // at their `Default` values — the shared core only inspects
201    // `custom` + `dedup_key`.
202    let mut ud = UsageDimensions::new();
203    ud.custom = custom;
204    ud.dedup_key = args.dedup_key;
205    report_usage_and_check_core(pool, partition_config, budget, ud).await
206}
207
/// Shared body of `report_usage` + `report_usage_admin`. §4.4.6 lifts
/// the Revision 5 §7.2 pin narrowly: this function now (1) loads the
/// policy row with `FOR NO KEY UPDATE` rather than `FOR SHARE`
/// (deadlock fix for the breach-UPDATE path) and (2) maintains the
/// `breach_count` / `soft_breach_count` columns + `last_breach_*`
/// metadata on the 0013 columns.
///
/// Flow, all inside one READ COMMITTED transaction:
/// 1. dedup claim — INSERT a placeholder row, or replay the cached
///    outcome on conflict;
/// 2. load + lock the policy row (`FOR NO KEY UPDATE`);
/// 3. pass 1 — per-dimension hard-limit check, locking each usage row
///    `FOR UPDATE` (locks held until commit). Hard breach on ANY
///    dimension returns early, with NO increments applied;
/// 4. pass 2 — apply increments + advisory soft-limit check
///    (first-dim-wins);
/// 5. breach-counter maintenance, dedup finalize, commit.
async fn report_usage_and_check_core(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    let partition = budget_partition(budget, partition_config);
    let partition_key: i16 = partition.index as i16;
    let budget_id_str = budget.to_string();
    let now = now_ms();

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // RC is Postgres's default; assert explicitly per Q11.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // ── Dedup ──
    //
    // When a dedup_key is provided, try to INSERT a placeholder dedup
    // row. If the INSERT succeeds we've taken ownership and will fill
    // the outcome_json below. If it conflicts the row already exists
    // from a prior call — read it back and replay the cached outcome.
    // An empty-string dedup_key is treated the same as no key at all.
    let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
        Some(dk) => {
            let inserted = sqlx::query(
                "INSERT INTO ff_budget_usage_dedup \
                     (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
                 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
                 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
                 RETURNING applied_at_ms",
            )
            .bind(partition_key)
            .bind(dk)
            .bind(now)
            .bind(now + DEDUP_TTL_MS)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            if inserted.is_none() {
                // Replay: fetch the cached outcome_json and return it.
                let row = sqlx::query(
                    "SELECT outcome_json FROM ff_budget_usage_dedup \
                     WHERE partition_key = $1 AND dedup_key = $2",
                )
                .bind(partition_key)
                .bind(dk)
                .fetch_one(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                let outcome: JsonValue = row.get("outcome_json");
                tx.commit().await.map_err(map_sqlx_error)?;
                // Empty placeholder means a prior in-flight caller
                // crashed before writing the outcome. Treat as
                // AlreadyApplied so the caller sees idempotent
                // semantics rather than double-counting.
                if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
                    return Ok(ReportUsageResult::AlreadyApplied);
                }
                return outcome_from_json(&outcome);
            }
            Some(dk.to_string())
        }
        None => None,
    };

    // ── Load policy row ──
    //
    // §4.4.6 lock-mode correction: `FOR NO KEY UPDATE` serialises the
    // breach path deterministically (two concurrent callers both
    // hitting hard-breach now serialise on this SELECT, not deadlock
    // on the breach UPDATE that would otherwise need NO-KEY-UPDATE
    // while each tx holds SHARE).
    let policy_row = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let policy: JsonValue = match policy_row {
        Some(r) => r.get("policy_json"),
        // No policy row ⇒ no limits defined. Treat all reports as Ok
        // and still apply increments (Valkey parity).
        None => JsonValue::Object(Default::default()),
    };
    let hard_limits = limits_from_policy(&policy, "hard_limits");
    let soft_limits = limits_from_policy(&policy, "soft_limits");

    // ── Compute admission per dimension. Hard breach on ANY dim
    //    rejects the whole report (no increments applied); soft breach
    //    is advisory (increments applied). Sorted-key iteration keeps
    //    the reported dimension deterministic on multi-dim reports. ──
    let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        // Seed the usage row at 0 if absent, so the locking SELECT
        // below always finds a row to lock.
        sqlx::query(
            "INSERT INTO ff_budget_usage \
                 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
             VALUES ($1, $2, $3, 0, $4) \
             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        let row = sqlx::query(
            "SELECT current_value FROM ff_budget_usage \
             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
             FOR UPDATE",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .fetch_one(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let cur: i64 = row.get("current_value");
        // Saturating add: a pathological delta cannot wrap past
        // u64::MAX.
        let new_val = (cur as u64).saturating_add(*delta);

        // A hard limit of 0 means "no limit" (the `*hard > 0` guard).
        if let Some(hard) = hard_limits.get(dim)
            && *hard > 0
            && new_val > *hard
        {
            // §4.4.6 — Hard breach: no increments applied, maintain
            // `breach_count` + `last_breach_*` on the policy row
            // (mirrors Valkey `HINCRBY breach_count 1` + `HSET
            // last_breach_at/last_breach_dim` at
            // flowfabric.lua:6576-6580), commit dedup outcome.
            sqlx::query(
                "UPDATE ff_budget_policy \
                 SET breach_count      = breach_count + 1, \
                     last_breach_at_ms = $3, \
                     last_breach_dim   = $4, \
                     updated_at_ms     = $3 \
                 WHERE partition_key = $1 AND budget_id = $2",
            )
            .bind(partition_key)
            .bind(&budget_id_str)
            .bind(now)
            .bind(dim)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            let outcome = ReportUsageResult::HardBreach {
                dimension: dim.clone(),
                // Pre-delta value: the increment was NOT applied.
                current_usage: cur as u64,
                hard_limit: *hard,
            };
            if let Some(dk) = dedup_owned.as_deref() {
                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
            }
            tx.commit().await.map_err(map_sqlx_error)?;
            return Ok(outcome);
        }
        per_dim_current.insert(dim.clone(), new_val);
    }

    // ── Apply increments + check soft limits. Soft breach is advisory
    //    (first-dim-wins, matching Valkey's iteration-order semantics). ──
    let mut soft_breach: Option<ReportUsageResult> = None;
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        // Computed in pass 1 under the row lock (held until commit);
        // direct indexing is safe — pass 1 inserted every dim key.
        let new_val = per_dim_current[dim];
        sqlx::query(
            "UPDATE ff_budget_usage \
             SET current_value = current_value + $1, updated_at_ms = $2 \
             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
        )
        .bind(*delta as i64)
        .bind(now)
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Soft limit of 0 means "no limit", same as the hard path.
        if soft_breach.is_none()
            && let Some(soft) = soft_limits.get(dim)
            && *soft > 0
            && new_val > *soft
        {
            soft_breach = Some(ReportUsageResult::SoftBreach {
                dimension: dim.clone(),
                current_usage: new_val,
                soft_limit: *soft,
            });
        }
    }

    // §4.4.6 — on soft breach, HINCRBY soft_breach_count (Valkey
    // parity at flowfabric.lua:6614). Hard-breach path returned early
    // above; this block only fires on the non-hard-breach outcome.
    if soft_breach.is_some() {
        sqlx::query(
            "UPDATE ff_budget_policy \
             SET soft_breach_count = soft_breach_count + 1, \
                 updated_at_ms     = $3 \
             WHERE partition_key = $1 AND budget_id = $2",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
    if let Some(dk) = dedup_owned.as_deref() {
        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(outcome)
}
441
442/// Write the final `outcome_json` onto the dedup row we inserted at
443/// the top of [`report_usage_and_check_core`].
444async fn finalize_dedup(
445    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
446    partition_key: i16,
447    dedup_key: &str,
448    outcome: &ReportUsageResult,
449) -> Result<(), EngineError> {
450    let json = outcome_to_json(outcome);
451    sqlx::query(
452        "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
453         WHERE partition_key = $2 AND dedup_key = $3",
454    )
455    .bind(json)
456    .bind(partition_key)
457    .bind(dedup_key)
458    .execute(&mut **tx)
459    .await
460    .map_err(map_sqlx_error)?;
461    Ok(())
462}
463
464/// Test-only helper: upsert a `ff_budget_policy` row directly. Pre-
465/// dates the trait-lifted `create_budget` (RFC-020 §4.4.2); retained
466/// for tests that still seed a raw `policy_json` blob + exercise the
467/// policy-row-missing branch of `report_usage_impl`.
468#[doc(hidden)]
469pub async fn upsert_policy_for_test(
470    pool: &PgPool,
471    partition_config: &PartitionConfig,
472    budget: &BudgetId,
473    policy_json: JsonValue,
474) -> Result<(), EngineError> {
475    let partition = budget_partition(budget, partition_config);
476    let partition_key: i16 = partition.index as i16;
477    let now = now_ms();
478    sqlx::query(
479        "INSERT INTO ff_budget_policy \
480             (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
481         VALUES ($1, $2, $3, $4, $4) \
482         ON CONFLICT (partition_key, budget_id) DO UPDATE \
483             SET policy_json = EXCLUDED.policy_json, \
484                 updated_at_ms = EXCLUDED.updated_at_ms",
485    )
486    .bind(partition_key)
487    .bind(budget.to_string())
488    .bind(policy_json)
489    .bind(now)
490    .execute(pool)
491    .await
492    .map_err(map_sqlx_error)?;
493    Ok(())
494}
495
496// ───────────────────────────────────────────────────────────────────
497// §4.4.2 — `create_budget`
498// ───────────────────────────────────────────────────────────────────
499
500/// Pack a [`CreateBudgetArgs`] into the `policy_json` blob shape
501/// `report_usage_and_check_core` + `get_budget_status` consume. Writes
502/// parallel `dimensions` / `hard_limits` / `soft_limits` vectors into
503/// `{hard_limits: {dim: u64, ...}, soft_limits: {dim: u64, ...},
504/// reset_interval_ms, on_hard_limit, on_soft_limit}` — the shape
505/// already used by `upsert_policy_for_test` + `limits_from_policy`.
506fn build_policy_json(args: &CreateBudgetArgs) -> JsonValue {
507    let mut hard = serde_json::Map::new();
508    let mut soft = serde_json::Map::new();
509    for (i, dim) in args.dimensions.iter().enumerate() {
510        if let Some(h) = args.hard_limits.get(i).copied() {
511            hard.insert(dim.clone(), json!(h));
512        }
513        if let Some(s) = args.soft_limits.get(i).copied() {
514            soft.insert(dim.clone(), json!(s));
515        }
516    }
517    json!({
518        "hard_limits": hard,
519        "soft_limits": soft,
520        "reset_interval_ms": args.reset_interval_ms,
521        "on_hard_limit": args.on_hard_limit,
522        "on_soft_limit": args.on_soft_limit,
523    })
524}
525
526pub(crate) async fn create_budget_impl(
527    pool: &PgPool,
528    partition_config: &PartitionConfig,
529    args: CreateBudgetArgs,
530) -> Result<CreateBudgetResult, EngineError> {
531    if args.dimensions.len() != args.hard_limits.len()
532        || args.dimensions.len() != args.soft_limits.len()
533    {
534        return Err(EngineError::Validation {
535            kind: ValidationKind::InvalidInput,
536            detail: "create_budget: dimensions / hard_limits / soft_limits length mismatch"
537                .into(),
538        });
539    }
540
541    let partition = budget_partition(&args.budget_id, partition_config);
542    let partition_key: i16 = partition.index as i16;
543    let budget_id = args.budget_id.clone();
544    let now: i64 = args.now.0;
545    let policy_json = build_policy_json(&args);
546    let reset_interval_ms = args.reset_interval_ms as i64;
547
548    // `next_reset_at_ms` seeded to `now + interval` when interval > 0
549    // so the `budget_reset` reconciler picks it up (RFC §4.4.3).
550    // Matches Valkey's `ff_create_budget` `ZADD resets_zset` scheduling
551    // at flowfabric.lua:6522-6526.
552    let row = sqlx::query(
553        "INSERT INTO ff_budget_policy \
554             (partition_key, budget_id, policy_json, scope_type, scope_id, \
555              enforcement_mode, breach_count, soft_breach_count, \
556              last_breach_at_ms, last_breach_dim, next_reset_at_ms, \
557              created_at_ms, updated_at_ms) \
558         VALUES ($1, $2, $3, $4, $5, $6, 0, 0, NULL, NULL, \
559                 CASE WHEN $7::bigint > 0 THEN $8::bigint + $7::bigint ELSE NULL END, \
560                 $8, $8) \
561         ON CONFLICT (partition_key, budget_id) DO NOTHING \
562         RETURNING created_at_ms",
563    )
564    .bind(partition_key)
565    .bind(budget_id.to_string())
566    .bind(policy_json)
567    .bind(&args.scope_type)
568    .bind(&args.scope_id)
569    .bind(&args.enforcement_mode)
570    .bind(reset_interval_ms)
571    .bind(now)
572    .fetch_optional(pool)
573    .await
574    .map_err(map_sqlx_error)?;
575
576    match row {
577        Some(_) => Ok(CreateBudgetResult::Created { budget_id }),
578        None => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
579    }
580}
581
582// ───────────────────────────────────────────────────────────────────
583// §4.4.3 — `reset_budget`
584// ───────────────────────────────────────────────────────────────────
585
/// `EngineBackend::reset_budget` — Postgres impl (§4.4.3).
///
/// Zeroes every `ff_budget_usage` row for the budget, clears the
/// `last_breach_*` metadata and reschedules `next_reset_at_ms` from
/// the policy's `reset_interval_ms` (interval ≤ 0 or absent ⇒ NULL
/// schedule, reported back as `TimestampMs(0)`). Missing policy row ⇒
/// `NotFound`.
pub(crate) async fn reset_budget_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    let partition = budget_partition(&args.budget_id, partition_config);
    let partition_key: i16 = partition.index as i16;
    let budget_id_str = args.budget_id.to_string();
    let now: i64 = args.now.0;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    // SERIALIZABLE matches Valkey's Lua atomicity — the zero-all
    // pattern must not see mid-flight increments from concurrent
    // `report_usage` / `report_usage_admin`.
    // NOTE(review): a serialization failure (SQLSTATE 40001) would
    // surface through `map_sqlx_error`; presumably the caller retries
    // — confirm against the engine's retry policy.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Lock the policy row with `FOR NO KEY UPDATE` before zeroing —
    // serialises against a concurrent breach-UPDATE on the same row
    // (§4.4.6 lock-mode discipline).
    let policy_row = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(policy_row) = policy_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(backend_context(
            EngineError::NotFound { entity: "budget" },
            format!("reset_budget: {}", args.budget_id),
        ));
    };
    // The reschedule interval lives inside the policy_json blob, not
    // in a dedicated column; missing / non-integer ⇒ 0 ⇒ no schedule.
    let policy_json: JsonValue = policy_row.get("policy_json");
    let reset_interval_ms: i64 = policy_json
        .get("reset_interval_ms")
        .and_then(|v| v.as_i64())
        .unwrap_or(0);

    // Zero all usage rows for this budget.
    sqlx::query(
        "UPDATE ff_budget_usage \
         SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
         WHERE partition_key = $1 AND budget_id = $2",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Reset policy metadata + reschedule. Note breach counters are
    // intentionally NOT zeroed here — only the last-breach metadata.
    let row = sqlx::query(
        "UPDATE ff_budget_policy \
         SET last_breach_at_ms = NULL, \
             last_breach_dim   = NULL, \
             updated_at_ms     = $3, \
             next_reset_at_ms  = CASE \
                 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
                 ELSE NULL \
             END \
         WHERE partition_key = $1 AND budget_id = $2 \
         RETURNING next_reset_at_ms",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .bind(now)
    .bind(reset_interval_ms)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let next_reset: Option<i64> = row
        .try_get::<Option<i64>, _>("next_reset_at_ms")
        .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(ResetBudgetResult::Reset {
        // `ResetBudgetResult::Reset` carries a non-optional
        // `TimestampMs`; when no interval is configured the budget
        // has no scheduled-reset, so report `0` (matches Valkey's
        // zero-score-when-unset behaviour for ZADD resets_zset).
        next_reset_at: TimestampMs(next_reset.unwrap_or(0)),
    })
}
678
679// ───────────────────────────────────────────────────────────────────
680// §4.4.1 — `create_quota_policy`
681// ───────────────────────────────────────────────────────────────────
682
683pub(crate) async fn create_quota_policy_impl(
684    pool: &PgPool,
685    partition_config: &PartitionConfig,
686    args: CreateQuotaPolicyArgs,
687) -> Result<CreateQuotaPolicyResult, EngineError> {
688    let partition = quota_partition(&args.quota_policy_id, partition_config);
689    let partition_key: i16 = partition.index as i16;
690    let qid = args.quota_policy_id.clone();
691    let now: i64 = args.now.0;
692
693    let row = sqlx::query(
694        "INSERT INTO ff_quota_policy \
695             (partition_key, quota_policy_id, requests_per_window_seconds, \
696              max_requests_per_window, active_concurrency_cap, \
697              active_concurrency, created_at_ms, updated_at_ms) \
698         VALUES ($1, $2, $3, $4, $5, 0, $6, $6) \
699         ON CONFLICT (partition_key, quota_policy_id) DO NOTHING \
700         RETURNING created_at_ms",
701    )
702    .bind(partition_key)
703    .bind(qid.to_string())
704    .bind(args.window_seconds as i64)
705    .bind(args.max_requests_per_window as i64)
706    .bind(args.max_concurrent as i64)
707    .bind(now)
708    .fetch_optional(pool)
709    .await
710    .map_err(map_sqlx_error)?;
711
712    match row {
713        Some(_) => Ok(CreateQuotaPolicyResult::Created {
714            quota_policy_id: qid,
715        }),
716        None => Ok(CreateQuotaPolicyResult::AlreadySatisfied {
717            quota_policy_id: qid,
718        }),
719    }
720}
721
722// ───────────────────────────────────────────────────────────────────
723// §4.4.7 — `get_budget_status`
724// ───────────────────────────────────────────────────────────────────
725
726pub(crate) async fn get_budget_status_impl(
727    pool: &PgPool,
728    partition_config: &PartitionConfig,
729    budget: &BudgetId,
730) -> Result<BudgetStatus, EngineError> {
731    let partition = budget_partition(budget, partition_config);
732    let partition_key: i16 = partition.index as i16;
733    let budget_id_str = budget.to_string();
734
735    // SELECT 1: policy row (definitional + counters + scheduler).
736    let policy_row = sqlx::query(
737        "SELECT scope_type, scope_id, enforcement_mode, \
738                breach_count, soft_breach_count, \
739                last_breach_at_ms, last_breach_dim, \
740                next_reset_at_ms, created_at_ms, \
741                policy_json \
742         FROM ff_budget_policy \
743         WHERE partition_key = $1 AND budget_id = $2",
744    )
745    .bind(partition_key)
746    .bind(&budget_id_str)
747    .fetch_optional(pool)
748    .await
749    .map_err(map_sqlx_error)?;
750    let Some(policy_row) = policy_row else {
751        return Err(backend_context(
752            EngineError::NotFound { entity: "budget" },
753            format!("get_budget_status: {budget}"),
754        ));
755    };
756
757    let scope_type: String = policy_row.get("scope_type");
758    let scope_id: String = policy_row.get("scope_id");
759    let enforcement_mode: String = policy_row.get("enforcement_mode");
760    let breach_count: i64 = policy_row.get("breach_count");
761    let soft_breach_count: i64 = policy_row.get("soft_breach_count");
762    let last_breach_at_ms: Option<i64> = policy_row
763        .try_get::<Option<i64>, _>("last_breach_at_ms")
764        .map_err(map_sqlx_error)?;
765    let last_breach_dim: Option<String> = policy_row
766        .try_get::<Option<String>, _>("last_breach_dim")
767        .map_err(map_sqlx_error)?;
768    let next_reset_at_ms: Option<i64> = policy_row
769        .try_get::<Option<i64>, _>("next_reset_at_ms")
770        .map_err(map_sqlx_error)?;
771    let created_at_ms: i64 = policy_row.get("created_at_ms");
772    let policy_json: JsonValue = policy_row.get("policy_json");
773
774    // SELECT 2: usage rows (one per dimension).
775    let usage_rows = sqlx::query(
776        "SELECT dimensions_key, current_value \
777         FROM ff_budget_usage \
778         WHERE partition_key = $1 AND budget_id = $2",
779    )
780    .bind(partition_key)
781    .bind(&budget_id_str)
782    .fetch_all(pool)
783    .await
784    .map_err(map_sqlx_error)?;
785
786    let mut usage: HashMap<String, u64> = HashMap::new();
787    for r in &usage_rows {
788        let key: String = r.get("dimensions_key");
789        let val: i64 = r.get("current_value");
790        if key == "_init" {
791            continue;
792        }
793        usage.insert(key, val.max(0) as u64);
794    }
795
796    // Limits parse from policy_json (shape documented on
797    // `build_policy_json` above; matches Valkey's 3× HGETALL shape
798    // byte-for-byte post-stringification).
799    let hard_limits: HashMap<String, u64> =
800        limits_from_policy(&policy_json, "hard_limits").into_iter().collect();
801    let soft_limits: HashMap<String, u64> =
802        limits_from_policy(&policy_json, "soft_limits").into_iter().collect();
803
804    // Valkey stringifies i64 timestamps; mirror byte-for-byte so the
805    // contract shape is identical across backends.
806    let fmt_opt = |v: Option<i64>| -> Option<String> { v.map(|n| n.to_string()) };
807
808    Ok(BudgetStatus {
809        budget_id: budget.to_string(),
810        scope_type,
811        scope_id,
812        enforcement_mode,
813        usage,
814        hard_limits,
815        soft_limits,
816        breach_count: breach_count.max(0) as u64,
817        soft_breach_count: soft_breach_count.max(0) as u64,
818        last_breach_at: fmt_opt(last_breach_at_ms),
819        last_breach_dim,
820        next_reset_at: fmt_opt(next_reset_at_ms),
821        created_at: Some(created_at_ms.to_string()),
822    })
823}
824
825// ───────────────────────────────────────────────────────────────────
826// §4.4.3 — `budget_reset` reconciler entry point
827// ───────────────────────────────────────────────────────────────────
828
829/// Called per row returned by the `budget_reset` reconciler scan.
830/// Looks up the policy's `reset_interval_ms` from `policy_json`,
831/// zeroes usage rows + resets breach metadata + reschedules
832/// `next_reset_at_ms` under SERIALIZABLE. Same tx shape as
833/// `reset_budget_impl` but keyed off the scanner-supplied
834/// `(partition_key, budget_id)` pair rather than a caller-supplied
835/// `BudgetId`.
836pub(crate) async fn budget_reset_reconciler_apply(
837    pool: &PgPool,
838    partition_key: i16,
839    budget_id: &str,
840    now: i64,
841) -> Result<(), EngineError> {
842    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
843    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
844        .execute(&mut *tx)
845        .await
846        .map_err(map_sqlx_error)?;
847
848    let policy_row = sqlx::query(
849        "SELECT policy_json, next_reset_at_ms FROM ff_budget_policy \
850         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
851    )
852    .bind(partition_key)
853    .bind(budget_id)
854    .fetch_optional(&mut *tx)
855    .await
856    .map_err(map_sqlx_error)?;
857    let Some(policy_row) = policy_row else {
858        tx.rollback().await.map_err(map_sqlx_error)?;
859        return Ok(());
860    };
861    // Defensive re-check: a concurrent `reset_budget` may have already
862    // advanced `next_reset_at_ms` past `now`. If so this scanner tick
863    // is a no-op.
864    let next_reset: Option<i64> = policy_row
865        .try_get::<Option<i64>, _>("next_reset_at_ms")
866        .map_err(map_sqlx_error)?;
867    if !matches!(next_reset, Some(n) if n <= now) {
868        tx.rollback().await.map_err(map_sqlx_error)?;
869        return Ok(());
870    }
871    let policy_json: JsonValue = policy_row.get("policy_json");
872    let reset_interval_ms: i64 = policy_json
873        .get("reset_interval_ms")
874        .and_then(|v| v.as_i64())
875        .unwrap_or(0);
876
877    sqlx::query(
878        "UPDATE ff_budget_usage \
879         SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
880         WHERE partition_key = $1 AND budget_id = $2",
881    )
882    .bind(partition_key)
883    .bind(budget_id)
884    .bind(now)
885    .execute(&mut *tx)
886    .await
887    .map_err(map_sqlx_error)?;
888
889    sqlx::query(
890        "UPDATE ff_budget_policy \
891         SET last_breach_at_ms = NULL, \
892             last_breach_dim   = NULL, \
893             updated_at_ms     = $3, \
894             next_reset_at_ms  = CASE \
895                 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
896                 ELSE NULL \
897             END \
898         WHERE partition_key = $1 AND budget_id = $2",
899    )
900    .bind(partition_key)
901    .bind(budget_id)
902    .bind(now)
903    .bind(reset_interval_ms)
904    .execute(&mut *tx)
905    .await
906    .map_err(map_sqlx_error)?;
907
908    tx.commit().await.map_err(map_sqlx_error)?;
909    Ok(())
910}