1use std::collections::{BTreeMap, HashMap};
29
30use ff_core::backend::UsageDimensions;
31use ff_core::contracts::{
32 BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
33 CreateQuotaPolicyResult, RecordSpendArgs, ReleaseBudgetArgs, ReportUsageAdminArgs,
34 ReportUsageResult, ResetBudgetArgs, ResetBudgetResult,
35};
36use ff_core::engine_error::{backend_context, EngineError, ValidationKind};
37use ff_core::partition::{budget_partition, quota_partition, PartitionConfig};
38use ff_core::types::{BudgetId, ExecutionId, TimestampMs};
39use serde_json::{json, Value as JsonValue};
40use sqlx::{PgPool, Row};
41use uuid::Uuid;
42
43use crate::error::map_sqlx_error;
44
/// Returns the row key under which a usage dimension is stored.
///
/// The key is an owned copy of the dimension name, with no transformation.
fn dim_row_key(name: &str) -> String {
    String::from(name)
}
56
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
65
66fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
76 match r {
77 ReportUsageResult::Ok => json!({"kind": "Ok"}),
78 ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
79 ReportUsageResult::SoftBreach {
80 dimension,
81 current_usage,
82 soft_limit,
83 } => json!({
84 "kind": "SoftBreach",
85 "dimension": dimension,
86 "current_usage": current_usage,
87 "soft_limit": soft_limit,
88 }),
89 ReportUsageResult::HardBreach {
90 dimension,
91 current_usage,
92 hard_limit,
93 } => json!({
94 "kind": "HardBreach",
95 "dimension": dimension,
96 "current_usage": current_usage,
97 "hard_limit": hard_limit,
98 }),
99 _ => json!({"kind": "Ok"}),
104 }
105}
106
107fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
109 let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
110 EngineError::Validation {
111 kind: ValidationKind::Corruption,
112 detail: "budget dedup outcome_json missing `kind`".into(),
113 }
114 })?;
115 match kind {
116 "Ok" => Ok(ReportUsageResult::Ok),
117 "AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
118 "SoftBreach" => Ok(ReportUsageResult::SoftBreach {
119 dimension: v
120 .get("dimension")
121 .and_then(|d| d.as_str())
122 .unwrap_or_default()
123 .to_string(),
124 current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
125 soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
126 }),
127 "HardBreach" => Ok(ReportUsageResult::HardBreach {
128 dimension: v
129 .get("dimension")
130 .and_then(|d| d.as_str())
131 .unwrap_or_default()
132 .to_string(),
133 current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
134 hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
135 }),
136 other => Err(EngineError::Validation {
137 kind: ValidationKind::Corruption,
138 detail: format!("budget dedup outcome_json unknown kind: {other}"),
139 }),
140 }
141}
142
143fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
148 policy
149 .get(key)
150 .and_then(|v| v.as_object())
151 .map(|obj| {
152 obj.iter()
153 .filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
154 .collect()
155 })
156 .unwrap_or_default()
157}
158
/// Lifetime of a usage-dedup record, in milliseconds (24 hours).
const DEDUP_TTL_MS: i64 = 1_000 * 60 * 60 * 24;
162
/// Reports usage against `budget` and returns the resulting limit-check outcome.
///
/// Thin entry point: every argument is forwarded unchanged to
/// `report_usage_and_check_core`, whose result (or error) is returned as-is.
pub(crate) async fn report_usage_impl(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    report_usage_and_check_core(pool, partition_config, budget, dimensions).await
}
177
178pub(crate) async fn report_usage_admin_impl(
183 pool: &PgPool,
184 partition_config: &PartitionConfig,
185 budget: &BudgetId,
186 args: ReportUsageAdminArgs,
187) -> Result<ReportUsageResult, EngineError> {
188 if args.dimensions.len() != args.deltas.len() {
189 return Err(EngineError::Validation {
190 kind: ValidationKind::InvalidInput,
191 detail: "report_usage_admin: dimensions and deltas length mismatch".into(),
192 });
193 }
194 let mut custom: BTreeMap<String, u64> = BTreeMap::new();
195 for (d, v) in args.dimensions.into_iter().zip(args.deltas) {
196 custom.insert(d, v);
197 }
198 let mut ud = UsageDimensions::new();
204 ud.custom = custom;
205 ud.dedup_key = args.dedup_key;
206 report_usage_and_check_core(pool, partition_config, budget, ud).await
207}
208
/// Shared implementation behind the usage-reporting entry points.
///
/// Runs in a single READ COMMITTED transaction:
/// 1. If a non-empty dedup key is supplied, claim it by inserting a placeholder
///    row; when the key already exists, commit and replay the stored outcome.
/// 2. Lock the budget's policy row and read its hard / soft limits (a missing
///    policy row yields no limits).
/// 3. For each reported dimension, ensure a usage row exists, lock it, and test
///    the prospective value against the hard limit. The first hard breach
///    updates the policy's breach bookkeeping and returns without applying any
///    increment.
/// 4. Otherwise apply every increment, note the first soft breach, bump the
///    soft-breach counter once if there was one, store the outcome on the dedup
///    row, and commit.
///
/// # Errors
/// Database failures are mapped through `map_sqlx_error`. A replayed dedup row
/// whose stored outcome cannot be decoded surfaces the error from
/// `outcome_from_json`.
async fn report_usage_and_check_core(
    pool: &PgPool,
    partition_config: &PartitionConfig,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    let partition = budget_partition(budget, partition_config);
    let partition_key: i16 = partition.index as i16;
    let budget_id_str = budget.to_string();
    let now = now_ms();

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // `dedup_owned` is `Some(key)` only when this call inserted the dedup row
    // and therefore must write the final outcome into it. An absent or empty
    // key disables dedup entirely.
    let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
        Some(dk) => {
            // Claim the key with an empty-object placeholder; `RETURNING`
            // yields a row only if the insert actually happened.
            let inserted = sqlx::query(
                "INSERT INTO ff_budget_usage_dedup \
                 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
                 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
                 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
                 RETURNING applied_at_ms",
            )
            .bind(partition_key)
            .bind(dk)
            .bind(now)
            .bind(now + DEDUP_TTL_MS)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            if inserted.is_none() {
                // The key already exists: replay what was recorded for it.
                let row = sqlx::query(
                    "SELECT outcome_json FROM ff_budget_usage_dedup \
                     WHERE partition_key = $1 AND dedup_key = $2",
                )
                .bind(partition_key)
                .bind(dk)
                .fetch_one(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                let outcome: JsonValue = row.get("outcome_json");
                tx.commit().await.map_err(map_sqlx_error)?;
                // An empty object is the placeholder written at claim time, so
                // there is no concrete outcome to replay.
                if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
                    return Ok(ReportUsageResult::AlreadyApplied);
                }
                return outcome_from_json(&outcome);
            }
            Some(dk.to_string())
        }
        None => None,
    };

    // Lock the policy row for the rest of the transaction.
    let policy_row = sqlx::query(
        "SELECT policy_json FROM ff_budget_policy \
         WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
    )
    .bind(partition_key)
    .bind(&budget_id_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // No policy row means an empty policy, i.e. no limits to enforce.
    let policy: JsonValue = match policy_row {
        Some(r) => r.get("policy_json"),
        None => JsonValue::Object(Default::default()),
    };
    let hard_limits = limits_from_policy(&policy, "hard_limits");
    let soft_limits = limits_from_policy(&policy, "soft_limits");

    // Pass 1: lock every usage row and check hard limits before writing any
    // increment. `per_dim_current` holds the prospective post-increment value.
    let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        // Make sure the row exists so the `FOR UPDATE` select below finds it.
        sqlx::query(
            "INSERT INTO ff_budget_usage \
             (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
             VALUES ($1, $2, $3, 0, $4) \
             ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        let row = sqlx::query(
            "SELECT current_value FROM ff_budget_usage \
             WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
             FOR UPDATE",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .fetch_one(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        let cur: i64 = row.get("current_value");
        let new_val = (cur as u64).saturating_add(*delta);

        // A hard limit of 0 is not enforced.
        if let Some(hard) = hard_limits.get(dim)
            && *hard > 0
            && new_val > *hard
        {
            // Record the breach on the policy row.
            sqlx::query(
                "UPDATE ff_budget_policy \
                 SET breach_count = breach_count + 1, \
                 last_breach_at_ms = $3, \
                 last_breach_dim = $4, \
                 updated_at_ms = $3 \
                 WHERE partition_key = $1 AND budget_id = $2",
            )
            .bind(partition_key)
            .bind(&budget_id_str)
            .bind(now)
            .bind(dim)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            // `current_usage` is the stored value before this report; no
            // increment has been applied at this point.
            let outcome = ReportUsageResult::HardBreach {
                dimension: dim.clone(),
                current_usage: cur as u64,
                hard_limit: *hard,
            };
            if let Some(dk) = dedup_owned.as_deref() {
                finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
            }
            tx.commit().await.map_err(map_sqlx_error)?;
            return Ok(outcome);
        }
        per_dim_current.insert(dim.clone(), new_val);
    }

    // Pass 2: no hard breach, so apply every increment and note the first
    // dimension whose new value exceeds its soft limit.
    let mut soft_breach: Option<ReportUsageResult> = None;
    for (dim, delta) in dimensions.custom.iter() {
        let dim_key = dim_row_key(dim);
        // Pass 1 inserted an entry for every dimension, so this index is present.
        let new_val = per_dim_current[dim];
        sqlx::query(
            "UPDATE ff_budget_usage \
             SET current_value = current_value + $1, updated_at_ms = $2 \
             WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
        )
        .bind(*delta as i64)
        .bind(now)
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(&dim_key)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // A soft limit of 0 is not enforced; only the first breach is kept.
        if soft_breach.is_none()
            && let Some(soft) = soft_limits.get(dim)
            && *soft > 0
            && new_val > *soft
        {
            soft_breach = Some(ReportUsageResult::SoftBreach {
                dimension: dim.clone(),
                current_usage: new_val,
                soft_limit: *soft,
            });
        }
    }

    // The soft-breach counter is bumped once per call, however many dimensions
    // crossed their soft limit.
    if soft_breach.is_some() {
        sqlx::query(
            "UPDATE ff_budget_policy \
             SET soft_breach_count = soft_breach_count + 1, \
             updated_at_ms = $3 \
             WHERE partition_key = $1 AND budget_id = $2",
        )
        .bind(partition_key)
        .bind(&budget_id_str)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
    // Replace the dedup placeholder with the real outcome before committing.
    if let Some(dk) = dedup_owned.as_deref() {
        finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(outcome)
}
442
443async fn finalize_dedup(
446 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
447 partition_key: i16,
448 dedup_key: &str,
449 outcome: &ReportUsageResult,
450) -> Result<(), EngineError> {
451 let json = outcome_to_json(outcome);
452 sqlx::query(
453 "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
454 WHERE partition_key = $2 AND dedup_key = $3",
455 )
456 .bind(json)
457 .bind(partition_key)
458 .bind(dedup_key)
459 .execute(&mut **tx)
460 .await
461 .map_err(map_sqlx_error)?;
462 Ok(())
463}
464
465#[doc(hidden)]
470pub async fn upsert_policy_for_test(
471 pool: &PgPool,
472 partition_config: &PartitionConfig,
473 budget: &BudgetId,
474 policy_json: JsonValue,
475) -> Result<(), EngineError> {
476 let partition = budget_partition(budget, partition_config);
477 let partition_key: i16 = partition.index as i16;
478 let now = now_ms();
479 sqlx::query(
480 "INSERT INTO ff_budget_policy \
481 (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
482 VALUES ($1, $2, $3, $4, $4) \
483 ON CONFLICT (partition_key, budget_id) DO UPDATE \
484 SET policy_json = EXCLUDED.policy_json, \
485 updated_at_ms = EXCLUDED.updated_at_ms",
486 )
487 .bind(partition_key)
488 .bind(budget.to_string())
489 .bind(policy_json)
490 .bind(now)
491 .execute(pool)
492 .await
493 .map_err(map_sqlx_error)?;
494 Ok(())
495}
496
497fn build_policy_json(args: &CreateBudgetArgs) -> JsonValue {
508 let mut hard = serde_json::Map::new();
509 let mut soft = serde_json::Map::new();
510 for (i, dim) in args.dimensions.iter().enumerate() {
511 if let Some(h) = args.hard_limits.get(i).copied() {
512 hard.insert(dim.clone(), json!(h));
513 }
514 if let Some(s) = args.soft_limits.get(i).copied() {
515 soft.insert(dim.clone(), json!(s));
516 }
517 }
518 json!({
519 "hard_limits": hard,
520 "soft_limits": soft,
521 "reset_interval_ms": args.reset_interval_ms,
522 "on_hard_limit": args.on_hard_limit,
523 "on_soft_limit": args.on_soft_limit,
524 })
525}
526
527pub(crate) async fn create_budget_impl(
528 pool: &PgPool,
529 partition_config: &PartitionConfig,
530 args: CreateBudgetArgs,
531) -> Result<CreateBudgetResult, EngineError> {
532 if args.dimensions.len() != args.hard_limits.len()
533 || args.dimensions.len() != args.soft_limits.len()
534 {
535 return Err(EngineError::Validation {
536 kind: ValidationKind::InvalidInput,
537 detail: "create_budget: dimensions / hard_limits / soft_limits length mismatch"
538 .into(),
539 });
540 }
541
542 let partition = budget_partition(&args.budget_id, partition_config);
543 let partition_key: i16 = partition.index as i16;
544 let budget_id = args.budget_id.clone();
545 let now: i64 = args.now.0;
546 let policy_json = build_policy_json(&args);
547 let reset_interval_ms = args.reset_interval_ms as i64;
548
549 let row = sqlx::query(
554 "INSERT INTO ff_budget_policy \
555 (partition_key, budget_id, policy_json, scope_type, scope_id, \
556 enforcement_mode, breach_count, soft_breach_count, \
557 last_breach_at_ms, last_breach_dim, next_reset_at_ms, \
558 created_at_ms, updated_at_ms) \
559 VALUES ($1, $2, $3, $4, $5, $6, 0, 0, NULL, NULL, \
560 CASE WHEN $7::bigint > 0 THEN $8::bigint + $7::bigint ELSE NULL END, \
561 $8, $8) \
562 ON CONFLICT (partition_key, budget_id) DO NOTHING \
563 RETURNING created_at_ms",
564 )
565 .bind(partition_key)
566 .bind(budget_id.to_string())
567 .bind(policy_json)
568 .bind(&args.scope_type)
569 .bind(&args.scope_id)
570 .bind(&args.enforcement_mode)
571 .bind(reset_interval_ms)
572 .bind(now)
573 .fetch_optional(pool)
574 .await
575 .map_err(map_sqlx_error)?;
576
577 match row {
578 Some(_) => Ok(CreateBudgetResult::Created { budget_id }),
579 None => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
580 }
581}
582
583pub(crate) async fn reset_budget_impl(
588 pool: &PgPool,
589 partition_config: &PartitionConfig,
590 args: ResetBudgetArgs,
591) -> Result<ResetBudgetResult, EngineError> {
592 let partition = budget_partition(&args.budget_id, partition_config);
593 let partition_key: i16 = partition.index as i16;
594 let budget_id_str = args.budget_id.to_string();
595 let now: i64 = args.now.0;
596
597 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
598 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
602 .execute(&mut *tx)
603 .await
604 .map_err(map_sqlx_error)?;
605
606 let policy_row = sqlx::query(
610 "SELECT policy_json FROM ff_budget_policy \
611 WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
612 )
613 .bind(partition_key)
614 .bind(&budget_id_str)
615 .fetch_optional(&mut *tx)
616 .await
617 .map_err(map_sqlx_error)?;
618 let Some(policy_row) = policy_row else {
619 tx.rollback().await.map_err(map_sqlx_error)?;
620 return Err(backend_context(
621 EngineError::NotFound { entity: "budget" },
622 format!("reset_budget: {}", args.budget_id),
623 ));
624 };
625 let policy_json: JsonValue = policy_row.get("policy_json");
626 let reset_interval_ms: i64 = policy_json
627 .get("reset_interval_ms")
628 .and_then(|v| v.as_i64())
629 .unwrap_or(0);
630
631 sqlx::query(
633 "UPDATE ff_budget_usage \
634 SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
635 WHERE partition_key = $1 AND budget_id = $2",
636 )
637 .bind(partition_key)
638 .bind(&budget_id_str)
639 .bind(now)
640 .execute(&mut *tx)
641 .await
642 .map_err(map_sqlx_error)?;
643
644 let row = sqlx::query(
646 "UPDATE ff_budget_policy \
647 SET last_breach_at_ms = NULL, \
648 last_breach_dim = NULL, \
649 updated_at_ms = $3, \
650 next_reset_at_ms = CASE \
651 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
652 ELSE NULL \
653 END \
654 WHERE partition_key = $1 AND budget_id = $2 \
655 RETURNING next_reset_at_ms",
656 )
657 .bind(partition_key)
658 .bind(&budget_id_str)
659 .bind(now)
660 .bind(reset_interval_ms)
661 .fetch_one(&mut *tx)
662 .await
663 .map_err(map_sqlx_error)?;
664
665 let next_reset: Option<i64> = row
666 .try_get::<Option<i64>, _>("next_reset_at_ms")
667 .map_err(map_sqlx_error)?;
668
669 tx.commit().await.map_err(map_sqlx_error)?;
670
671 Ok(ResetBudgetResult::Reset {
672 next_reset_at: TimestampMs(next_reset.unwrap_or(0)),
677 })
678}
679
680pub(crate) async fn create_quota_policy_impl(
685 pool: &PgPool,
686 partition_config: &PartitionConfig,
687 args: CreateQuotaPolicyArgs,
688) -> Result<CreateQuotaPolicyResult, EngineError> {
689 let partition = quota_partition(&args.quota_policy_id, partition_config);
690 let partition_key: i16 = partition.index as i16;
691 let qid = args.quota_policy_id.clone();
692 let now: i64 = args.now.0;
693
694 let row = sqlx::query(
695 "INSERT INTO ff_quota_policy \
696 (partition_key, quota_policy_id, requests_per_window_seconds, \
697 max_requests_per_window, active_concurrency_cap, \
698 active_concurrency, created_at_ms, updated_at_ms) \
699 VALUES ($1, $2, $3, $4, $5, 0, $6, $6) \
700 ON CONFLICT (partition_key, quota_policy_id) DO NOTHING \
701 RETURNING created_at_ms",
702 )
703 .bind(partition_key)
704 .bind(qid.to_string())
705 .bind(args.window_seconds as i64)
706 .bind(args.max_requests_per_window as i64)
707 .bind(args.max_concurrent as i64)
708 .bind(now)
709 .fetch_optional(pool)
710 .await
711 .map_err(map_sqlx_error)?;
712
713 match row {
714 Some(_) => Ok(CreateQuotaPolicyResult::Created {
715 quota_policy_id: qid,
716 }),
717 None => Ok(CreateQuotaPolicyResult::AlreadySatisfied {
718 quota_policy_id: qid,
719 }),
720 }
721}
722
723pub(crate) async fn get_budget_status_impl(
728 pool: &PgPool,
729 partition_config: &PartitionConfig,
730 budget: &BudgetId,
731) -> Result<BudgetStatus, EngineError> {
732 let partition = budget_partition(budget, partition_config);
733 let partition_key: i16 = partition.index as i16;
734 let budget_id_str = budget.to_string();
735
736 let policy_row = sqlx::query(
738 "SELECT scope_type, scope_id, enforcement_mode, \
739 breach_count, soft_breach_count, \
740 last_breach_at_ms, last_breach_dim, \
741 next_reset_at_ms, created_at_ms, \
742 policy_json \
743 FROM ff_budget_policy \
744 WHERE partition_key = $1 AND budget_id = $2",
745 )
746 .bind(partition_key)
747 .bind(&budget_id_str)
748 .fetch_optional(pool)
749 .await
750 .map_err(map_sqlx_error)?;
751 let Some(policy_row) = policy_row else {
752 return Err(backend_context(
753 EngineError::NotFound { entity: "budget" },
754 format!("get_budget_status: {budget}"),
755 ));
756 };
757
758 let scope_type: String = policy_row.get("scope_type");
759 let scope_id: String = policy_row.get("scope_id");
760 let enforcement_mode: String = policy_row.get("enforcement_mode");
761 let breach_count: i64 = policy_row.get("breach_count");
762 let soft_breach_count: i64 = policy_row.get("soft_breach_count");
763 let last_breach_at_ms: Option<i64> = policy_row
764 .try_get::<Option<i64>, _>("last_breach_at_ms")
765 .map_err(map_sqlx_error)?;
766 let last_breach_dim: Option<String> = policy_row
767 .try_get::<Option<String>, _>("last_breach_dim")
768 .map_err(map_sqlx_error)?;
769 let next_reset_at_ms: Option<i64> = policy_row
770 .try_get::<Option<i64>, _>("next_reset_at_ms")
771 .map_err(map_sqlx_error)?;
772 let created_at_ms: i64 = policy_row.get("created_at_ms");
773 let policy_json: JsonValue = policy_row.get("policy_json");
774
775 let usage_rows = sqlx::query(
777 "SELECT dimensions_key, current_value \
778 FROM ff_budget_usage \
779 WHERE partition_key = $1 AND budget_id = $2",
780 )
781 .bind(partition_key)
782 .bind(&budget_id_str)
783 .fetch_all(pool)
784 .await
785 .map_err(map_sqlx_error)?;
786
787 let mut usage: HashMap<String, u64> = HashMap::new();
788 for r in &usage_rows {
789 let key: String = r.get("dimensions_key");
790 let val: i64 = r.get("current_value");
791 if key == "_init" {
792 continue;
793 }
794 usage.insert(key, val.max(0) as u64);
795 }
796
797 let hard_limits: HashMap<String, u64> =
801 limits_from_policy(&policy_json, "hard_limits").into_iter().collect();
802 let soft_limits: HashMap<String, u64> =
803 limits_from_policy(&policy_json, "soft_limits").into_iter().collect();
804
805 let fmt_opt = |v: Option<i64>| -> Option<String> { v.map(|n| n.to_string()) };
808
809 Ok(BudgetStatus {
810 budget_id: budget.to_string(),
811 scope_type,
812 scope_id,
813 enforcement_mode,
814 usage,
815 hard_limits,
816 soft_limits,
817 breach_count: breach_count.max(0) as u64,
818 soft_breach_count: soft_breach_count.max(0) as u64,
819 last_breach_at: fmt_opt(last_breach_at_ms),
820 last_breach_dim,
821 next_reset_at: fmt_opt(next_reset_at_ms),
822 created_at: Some(created_at_ms.to_string()),
823 })
824}
825
826pub(crate) async fn budget_reset_reconciler_apply(
838 pool: &PgPool,
839 partition_key: i16,
840 budget_id: &str,
841 now: i64,
842) -> Result<(), EngineError> {
843 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
844 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
845 .execute(&mut *tx)
846 .await
847 .map_err(map_sqlx_error)?;
848
849 let policy_row = sqlx::query(
850 "SELECT policy_json, next_reset_at_ms FROM ff_budget_policy \
851 WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
852 )
853 .bind(partition_key)
854 .bind(budget_id)
855 .fetch_optional(&mut *tx)
856 .await
857 .map_err(map_sqlx_error)?;
858 let Some(policy_row) = policy_row else {
859 tx.rollback().await.map_err(map_sqlx_error)?;
860 return Ok(());
861 };
862 let next_reset: Option<i64> = policy_row
866 .try_get::<Option<i64>, _>("next_reset_at_ms")
867 .map_err(map_sqlx_error)?;
868 if !matches!(next_reset, Some(n) if n <= now) {
869 tx.rollback().await.map_err(map_sqlx_error)?;
870 return Ok(());
871 }
872 let policy_json: JsonValue = policy_row.get("policy_json");
873 let reset_interval_ms: i64 = policy_json
874 .get("reset_interval_ms")
875 .and_then(|v| v.as_i64())
876 .unwrap_or(0);
877
878 sqlx::query(
879 "UPDATE ff_budget_usage \
880 SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
881 WHERE partition_key = $1 AND budget_id = $2",
882 )
883 .bind(partition_key)
884 .bind(budget_id)
885 .bind(now)
886 .execute(&mut *tx)
887 .await
888 .map_err(map_sqlx_error)?;
889
890 sqlx::query(
891 "UPDATE ff_budget_policy \
892 SET last_breach_at_ms = NULL, \
893 last_breach_dim = NULL, \
894 updated_at_ms = $3, \
895 next_reset_at_ms = CASE \
896 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
897 ELSE NULL \
898 END \
899 WHERE partition_key = $1 AND budget_id = $2",
900 )
901 .bind(partition_key)
902 .bind(budget_id)
903 .bind(now)
904 .bind(reset_interval_ms)
905 .execute(&mut *tx)
906 .await
907 .map_err(map_sqlx_error)?;
908
909 tx.commit().await.map_err(map_sqlx_error)?;
910 Ok(())
911}
912
913fn exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
923 let s = eid.as_str();
924 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
925 kind: ValidationKind::InvalidInput,
926 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
927 })?;
928 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
929 kind: ValidationKind::InvalidInput,
930 detail: format!("execution_id missing `}}:`: {s}"),
931 })?;
932 Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
933 kind: ValidationKind::InvalidInput,
934 detail: format!("execution_id UUID invalid: {s}"),
935 })
936}
937
938pub(crate) async fn record_spend_impl(
954 pool: &PgPool,
955 partition_config: &PartitionConfig,
956 args: RecordSpendArgs,
957) -> Result<ReportUsageResult, EngineError> {
958 let partition = budget_partition(&args.budget_id, partition_config);
959 let partition_key: i16 = partition.index as i16;
960 let budget_id_str = args.budget_id.to_string();
961 let exec_uuid = exec_uuid(&args.execution_id)?;
962 let now = now_ms();
963
964 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
965 sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
966 .execute(&mut *tx)
967 .await
968 .map_err(map_sqlx_error)?;
969
970 let dedup_owned: Option<String> = if args.idempotency_key.is_empty() {
972 None
973 } else {
974 let inserted = sqlx::query(
975 "INSERT INTO ff_budget_usage_dedup \
976 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
977 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
978 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
979 RETURNING applied_at_ms",
980 )
981 .bind(partition_key)
982 .bind(&args.idempotency_key)
983 .bind(now)
984 .bind(now + DEDUP_TTL_MS)
985 .fetch_optional(&mut *tx)
986 .await
987 .map_err(map_sqlx_error)?;
988
989 if inserted.is_none() {
990 let row = sqlx::query(
991 "SELECT outcome_json FROM ff_budget_usage_dedup \
992 WHERE partition_key = $1 AND dedup_key = $2",
993 )
994 .bind(partition_key)
995 .bind(&args.idempotency_key)
996 .fetch_one(&mut *tx)
997 .await
998 .map_err(map_sqlx_error)?;
999 let outcome: JsonValue = row.get("outcome_json");
1000 tx.commit().await.map_err(map_sqlx_error)?;
1001 let _ = outcome; return Ok(ReportUsageResult::AlreadyApplied);
1010 }
1011 Some(args.idempotency_key.clone())
1012 };
1013
1014 let policy_row = sqlx::query(
1017 "SELECT policy_json FROM ff_budget_policy \
1018 WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
1019 )
1020 .bind(partition_key)
1021 .bind(&budget_id_str)
1022 .fetch_optional(&mut *tx)
1023 .await
1024 .map_err(map_sqlx_error)?;
1025
1026 let policy: JsonValue = match policy_row {
1027 Some(r) => r.get("policy_json"),
1028 None => JsonValue::Object(Default::default()),
1029 };
1030 let hard_limits = limits_from_policy(&policy, "hard_limits");
1031 let soft_limits = limits_from_policy(&policy, "soft_limits");
1032
1033 let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
1036 for (dim, delta) in args.deltas.iter() {
1037 let dim_key = dim_row_key(dim);
1038 sqlx::query(
1039 "INSERT INTO ff_budget_usage \
1040 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
1041 VALUES ($1, $2, $3, 0, $4) \
1042 ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
1043 )
1044 .bind(partition_key)
1045 .bind(&budget_id_str)
1046 .bind(&dim_key)
1047 .bind(now)
1048 .execute(&mut *tx)
1049 .await
1050 .map_err(map_sqlx_error)?;
1051
1052 let row = sqlx::query(
1053 "SELECT current_value FROM ff_budget_usage \
1054 WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
1055 FOR UPDATE",
1056 )
1057 .bind(partition_key)
1058 .bind(&budget_id_str)
1059 .bind(&dim_key)
1060 .fetch_one(&mut *tx)
1061 .await
1062 .map_err(map_sqlx_error)?;
1063 let cur: i64 = row.get("current_value");
1064 let new_val = (cur as u64).saturating_add(*delta);
1065
1066 if let Some(hard) = hard_limits.get(dim)
1067 && *hard > 0
1068 && new_val > *hard
1069 {
1070 sqlx::query(
1071 "UPDATE ff_budget_policy \
1072 SET breach_count = breach_count + 1, \
1073 last_breach_at_ms = $3, \
1074 last_breach_dim = $4, \
1075 updated_at_ms = $3 \
1076 WHERE partition_key = $1 AND budget_id = $2",
1077 )
1078 .bind(partition_key)
1079 .bind(&budget_id_str)
1080 .bind(now)
1081 .bind(dim)
1082 .execute(&mut *tx)
1083 .await
1084 .map_err(map_sqlx_error)?;
1085
1086 let outcome = ReportUsageResult::HardBreach {
1087 dimension: dim.clone(),
1088 current_usage: cur as u64,
1089 hard_limit: *hard,
1090 };
1091 if let Some(dk) = dedup_owned.as_deref() {
1092 finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
1093 }
1094 tx.commit().await.map_err(map_sqlx_error)?;
1095 return Ok(outcome);
1096 }
1097 per_dim_current.insert(dim.clone(), new_val);
1098 }
1099
1100 let mut soft_breach: Option<ReportUsageResult> = None;
1103 for (dim, delta) in args.deltas.iter() {
1104 let dim_key = dim_row_key(dim);
1105 let new_val = per_dim_current[dim];
1106 sqlx::query(
1107 "UPDATE ff_budget_usage \
1108 SET current_value = current_value + $1, updated_at_ms = $2 \
1109 WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
1110 )
1111 .bind(*delta as i64)
1112 .bind(now)
1113 .bind(partition_key)
1114 .bind(&budget_id_str)
1115 .bind(&dim_key)
1116 .execute(&mut *tx)
1117 .await
1118 .map_err(map_sqlx_error)?;
1119
1120 sqlx::query(
1123 "INSERT INTO ff_budget_usage_by_exec \
1124 (partition_key, budget_id, execution_id, dimensions_key, \
1125 delta_total, updated_at_ms) \
1126 VALUES ($1, $2, $3, $4, $5, $6) \
1127 ON CONFLICT (partition_key, budget_id, execution_id, dimensions_key) \
1128 DO UPDATE SET delta_total = ff_budget_usage_by_exec.delta_total + EXCLUDED.delta_total, \
1129 updated_at_ms = EXCLUDED.updated_at_ms",
1130 )
1131 .bind(partition_key)
1132 .bind(&budget_id_str)
1133 .bind(exec_uuid)
1134 .bind(&dim_key)
1135 .bind(*delta as i64)
1136 .bind(now)
1137 .execute(&mut *tx)
1138 .await
1139 .map_err(map_sqlx_error)?;
1140
1141 if soft_breach.is_none()
1142 && let Some(soft) = soft_limits.get(dim)
1143 && *soft > 0
1144 && new_val > *soft
1145 {
1146 soft_breach = Some(ReportUsageResult::SoftBreach {
1147 dimension: dim.clone(),
1148 current_usage: new_val,
1149 soft_limit: *soft,
1150 });
1151 }
1152 }
1153
1154 if soft_breach.is_some() {
1155 sqlx::query(
1156 "UPDATE ff_budget_policy \
1157 SET soft_breach_count = soft_breach_count + 1, \
1158 updated_at_ms = $3 \
1159 WHERE partition_key = $1 AND budget_id = $2",
1160 )
1161 .bind(partition_key)
1162 .bind(&budget_id_str)
1163 .bind(now)
1164 .execute(&mut *tx)
1165 .await
1166 .map_err(map_sqlx_error)?;
1167 }
1168
1169 let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
1170 if let Some(dk) = dedup_owned.as_deref() {
1171 finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
1172 }
1173 tx.commit().await.map_err(map_sqlx_error)?;
1174 Ok(outcome)
1175}
1176
1177pub(crate) async fn release_budget_impl(
1186 pool: &PgPool,
1187 partition_config: &PartitionConfig,
1188 args: ReleaseBudgetArgs,
1189) -> Result<(), EngineError> {
1190 let partition = budget_partition(&args.budget_id, partition_config);
1191 let partition_key: i16 = partition.index as i16;
1192 let budget_id_str = args.budget_id.to_string();
1193 let exec_uuid = exec_uuid(&args.execution_id)?;
1194 let now = now_ms();
1195
1196 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
1197 sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
1198 .execute(&mut *tx)
1199 .await
1200 .map_err(map_sqlx_error)?;
1201
1202 let rows = sqlx::query(
1206 "SELECT dimensions_key, delta_total \
1207 FROM ff_budget_usage_by_exec \
1208 WHERE partition_key = $1 AND budget_id = $2 AND execution_id = $3 \
1209 FOR UPDATE",
1210 )
1211 .bind(partition_key)
1212 .bind(&budget_id_str)
1213 .bind(exec_uuid)
1214 .fetch_all(&mut *tx)
1215 .await
1216 .map_err(map_sqlx_error)?;
1217
1218 if rows.is_empty() {
1219 tx.commit().await.map_err(map_sqlx_error)?;
1223 return Ok(());
1224 }
1225
1226 for row in &rows {
1227 let dim: String = row.get("dimensions_key");
1228 let delta: i64 = row.get("delta_total");
1229 sqlx::query(
1230 "UPDATE ff_budget_usage \
1231 SET current_value = GREATEST(0::bigint, current_value - $1), \
1232 updated_at_ms = $2 \
1233 WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
1234 )
1235 .bind(delta)
1236 .bind(now)
1237 .bind(partition_key)
1238 .bind(&budget_id_str)
1239 .bind(&dim)
1240 .execute(&mut *tx)
1241 .await
1242 .map_err(map_sqlx_error)?;
1243 }
1244
1245 sqlx::query(
1246 "DELETE FROM ff_budget_usage_by_exec \
1247 WHERE partition_key = $1 AND budget_id = $2 AND execution_id = $3",
1248 )
1249 .bind(partition_key)
1250 .bind(&budget_id_str)
1251 .bind(exec_uuid)
1252 .execute(&mut *tx)
1253 .await
1254 .map_err(map_sqlx_error)?;
1255
1256 tx.commit().await.map_err(map_sqlx_error)?;
1257 Ok(())
1258}