1use std::collections::{BTreeMap, HashMap};
29
30use ff_core::backend::UsageDimensions;
31use ff_core::contracts::{
32 BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
33 CreateQuotaPolicyResult, ReportUsageAdminArgs, ReportUsageResult, ResetBudgetArgs,
34 ResetBudgetResult,
35};
36use ff_core::engine_error::{backend_context, EngineError, ValidationKind};
37use ff_core::partition::{budget_partition, quota_partition, PartitionConfig};
38use ff_core::types::{BudgetId, TimestampMs};
39use serde_json::{json, Value as JsonValue};
40use sqlx::{PgPool, Row};
41
42use crate::error::map_sqlx_error;
43
44fn dim_row_key(name: &str) -> String {
53 name.to_string()
54}
55
56fn now_ms() -> i64 {
59 std::time::SystemTime::now()
60 .duration_since(std::time::UNIX_EPOCH)
61 .map(|d| d.as_millis() as i64)
62 .unwrap_or(0)
63}
64
65fn outcome_to_json(r: &ReportUsageResult) -> JsonValue {
75 match r {
76 ReportUsageResult::Ok => json!({"kind": "Ok"}),
77 ReportUsageResult::AlreadyApplied => json!({"kind": "AlreadyApplied"}),
78 ReportUsageResult::SoftBreach {
79 dimension,
80 current_usage,
81 soft_limit,
82 } => json!({
83 "kind": "SoftBreach",
84 "dimension": dimension,
85 "current_usage": current_usage,
86 "soft_limit": soft_limit,
87 }),
88 ReportUsageResult::HardBreach {
89 dimension,
90 current_usage,
91 hard_limit,
92 } => json!({
93 "kind": "HardBreach",
94 "dimension": dimension,
95 "current_usage": current_usage,
96 "hard_limit": hard_limit,
97 }),
98 _ => json!({"kind": "Ok"}),
103 }
104}
105
106fn outcome_from_json(v: &JsonValue) -> Result<ReportUsageResult, EngineError> {
108 let kind = v.get("kind").and_then(|k| k.as_str()).ok_or_else(|| {
109 EngineError::Validation {
110 kind: ValidationKind::Corruption,
111 detail: "budget dedup outcome_json missing `kind`".into(),
112 }
113 })?;
114 match kind {
115 "Ok" => Ok(ReportUsageResult::Ok),
116 "AlreadyApplied" => Ok(ReportUsageResult::AlreadyApplied),
117 "SoftBreach" => Ok(ReportUsageResult::SoftBreach {
118 dimension: v
119 .get("dimension")
120 .and_then(|d| d.as_str())
121 .unwrap_or_default()
122 .to_string(),
123 current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
124 soft_limit: v.get("soft_limit").and_then(|d| d.as_u64()).unwrap_or(0),
125 }),
126 "HardBreach" => Ok(ReportUsageResult::HardBreach {
127 dimension: v
128 .get("dimension")
129 .and_then(|d| d.as_str())
130 .unwrap_or_default()
131 .to_string(),
132 current_usage: v.get("current_usage").and_then(|d| d.as_u64()).unwrap_or(0),
133 hard_limit: v.get("hard_limit").and_then(|d| d.as_u64()).unwrap_or(0),
134 }),
135 other => Err(EngineError::Validation {
136 kind: ValidationKind::Corruption,
137 detail: format!("budget dedup outcome_json unknown kind: {other}"),
138 }),
139 }
140}
141
142fn limits_from_policy(policy: &JsonValue, key: &str) -> BTreeMap<String, u64> {
147 policy
148 .get(key)
149 .and_then(|v| v.as_object())
150 .map(|obj| {
151 obj.iter()
152 .filter_map(|(k, v)| v.as_u64().map(|n| (k.clone(), n)))
153 .collect()
154 })
155 .unwrap_or_default()
156}
157
158const DEDUP_TTL_MS: i64 = 24 * 60 * 60 * 1_000;
161
162pub(crate) async fn report_usage_impl(
169 pool: &PgPool,
170 partition_config: &PartitionConfig,
171 budget: &BudgetId,
172 dimensions: UsageDimensions,
173) -> Result<ReportUsageResult, EngineError> {
174 report_usage_and_check_core(pool, partition_config, budget, dimensions).await
175}
176
177pub(crate) async fn report_usage_admin_impl(
182 pool: &PgPool,
183 partition_config: &PartitionConfig,
184 budget: &BudgetId,
185 args: ReportUsageAdminArgs,
186) -> Result<ReportUsageResult, EngineError> {
187 if args.dimensions.len() != args.deltas.len() {
188 return Err(EngineError::Validation {
189 kind: ValidationKind::InvalidInput,
190 detail: "report_usage_admin: dimensions and deltas length mismatch".into(),
191 });
192 }
193 let mut custom: BTreeMap<String, u64> = BTreeMap::new();
194 for (d, v) in args.dimensions.into_iter().zip(args.deltas) {
195 custom.insert(d, v);
196 }
197 let mut ud = UsageDimensions::new();
203 ud.custom = custom;
204 ud.dedup_key = args.dedup_key;
205 report_usage_and_check_core(pool, partition_config, budget, ud).await
206}
207
208async fn report_usage_and_check_core(
215 pool: &PgPool,
216 partition_config: &PartitionConfig,
217 budget: &BudgetId,
218 dimensions: UsageDimensions,
219) -> Result<ReportUsageResult, EngineError> {
220 let partition = budget_partition(budget, partition_config);
221 let partition_key: i16 = partition.index as i16;
222 let budget_id_str = budget.to_string();
223 let now = now_ms();
224
225 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
226
227 sqlx::query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
229 .execute(&mut *tx)
230 .await
231 .map_err(map_sqlx_error)?;
232
233 let dedup_owned = match dimensions.dedup_key.as_deref().filter(|k| !k.is_empty()) {
240 Some(dk) => {
241 let inserted = sqlx::query(
242 "INSERT INTO ff_budget_usage_dedup \
243 (partition_key, dedup_key, outcome_json, applied_at_ms, expires_at_ms) \
244 VALUES ($1, $2, '{}'::jsonb, $3, $4) \
245 ON CONFLICT (partition_key, dedup_key) DO NOTHING \
246 RETURNING applied_at_ms",
247 )
248 .bind(partition_key)
249 .bind(dk)
250 .bind(now)
251 .bind(now + DEDUP_TTL_MS)
252 .fetch_optional(&mut *tx)
253 .await
254 .map_err(map_sqlx_error)?;
255
256 if inserted.is_none() {
257 let row = sqlx::query(
259 "SELECT outcome_json FROM ff_budget_usage_dedup \
260 WHERE partition_key = $1 AND dedup_key = $2",
261 )
262 .bind(partition_key)
263 .bind(dk)
264 .fetch_one(&mut *tx)
265 .await
266 .map_err(map_sqlx_error)?;
267 let outcome: JsonValue = row.get("outcome_json");
268 tx.commit().await.map_err(map_sqlx_error)?;
269 if outcome.as_object().map(|o| o.is_empty()).unwrap_or(false) {
274 return Ok(ReportUsageResult::AlreadyApplied);
275 }
276 return outcome_from_json(&outcome);
277 }
278 Some(dk.to_string())
279 }
280 None => None,
281 };
282
283 let policy_row = sqlx::query(
291 "SELECT policy_json FROM ff_budget_policy \
292 WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
293 )
294 .bind(partition_key)
295 .bind(&budget_id_str)
296 .fetch_optional(&mut *tx)
297 .await
298 .map_err(map_sqlx_error)?;
299
300 let policy: JsonValue = match policy_row {
301 Some(r) => r.get("policy_json"),
302 None => JsonValue::Object(Default::default()),
305 };
306 let hard_limits = limits_from_policy(&policy, "hard_limits");
307 let soft_limits = limits_from_policy(&policy, "soft_limits");
308
309 let mut per_dim_current: BTreeMap<String, u64> = BTreeMap::new();
314 for (dim, delta) in dimensions.custom.iter() {
315 let dim_key = dim_row_key(dim);
316 sqlx::query(
317 "INSERT INTO ff_budget_usage \
318 (partition_key, budget_id, dimensions_key, current_value, updated_at_ms) \
319 VALUES ($1, $2, $3, 0, $4) \
320 ON CONFLICT (partition_key, budget_id, dimensions_key) DO NOTHING",
321 )
322 .bind(partition_key)
323 .bind(&budget_id_str)
324 .bind(&dim_key)
325 .bind(now)
326 .execute(&mut *tx)
327 .await
328 .map_err(map_sqlx_error)?;
329
330 let row = sqlx::query(
331 "SELECT current_value FROM ff_budget_usage \
332 WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3 \
333 FOR UPDATE",
334 )
335 .bind(partition_key)
336 .bind(&budget_id_str)
337 .bind(&dim_key)
338 .fetch_one(&mut *tx)
339 .await
340 .map_err(map_sqlx_error)?;
341 let cur: i64 = row.get("current_value");
342 let new_val = (cur as u64).saturating_add(*delta);
343
344 if let Some(hard) = hard_limits.get(dim)
345 && *hard > 0
346 && new_val > *hard
347 {
348 sqlx::query(
354 "UPDATE ff_budget_policy \
355 SET breach_count = breach_count + 1, \
356 last_breach_at_ms = $3, \
357 last_breach_dim = $4, \
358 updated_at_ms = $3 \
359 WHERE partition_key = $1 AND budget_id = $2",
360 )
361 .bind(partition_key)
362 .bind(&budget_id_str)
363 .bind(now)
364 .bind(dim)
365 .execute(&mut *tx)
366 .await
367 .map_err(map_sqlx_error)?;
368
369 let outcome = ReportUsageResult::HardBreach {
370 dimension: dim.clone(),
371 current_usage: cur as u64,
372 hard_limit: *hard,
373 };
374 if let Some(dk) = dedup_owned.as_deref() {
375 finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
376 }
377 tx.commit().await.map_err(map_sqlx_error)?;
378 return Ok(outcome);
379 }
380 per_dim_current.insert(dim.clone(), new_val);
381 }
382
383 let mut soft_breach: Option<ReportUsageResult> = None;
386 for (dim, delta) in dimensions.custom.iter() {
387 let dim_key = dim_row_key(dim);
388 let new_val = per_dim_current[dim];
389 sqlx::query(
390 "UPDATE ff_budget_usage \
391 SET current_value = current_value + $1, updated_at_ms = $2 \
392 WHERE partition_key = $3 AND budget_id = $4 AND dimensions_key = $5",
393 )
394 .bind(*delta as i64)
395 .bind(now)
396 .bind(partition_key)
397 .bind(&budget_id_str)
398 .bind(&dim_key)
399 .execute(&mut *tx)
400 .await
401 .map_err(map_sqlx_error)?;
402
403 if soft_breach.is_none()
404 && let Some(soft) = soft_limits.get(dim)
405 && *soft > 0
406 && new_val > *soft
407 {
408 soft_breach = Some(ReportUsageResult::SoftBreach {
409 dimension: dim.clone(),
410 current_usage: new_val,
411 soft_limit: *soft,
412 });
413 }
414 }
415
416 if soft_breach.is_some() {
420 sqlx::query(
421 "UPDATE ff_budget_policy \
422 SET soft_breach_count = soft_breach_count + 1, \
423 updated_at_ms = $3 \
424 WHERE partition_key = $1 AND budget_id = $2",
425 )
426 .bind(partition_key)
427 .bind(&budget_id_str)
428 .bind(now)
429 .execute(&mut *tx)
430 .await
431 .map_err(map_sqlx_error)?;
432 }
433
434 let outcome = soft_breach.unwrap_or(ReportUsageResult::Ok);
435 if let Some(dk) = dedup_owned.as_deref() {
436 finalize_dedup(&mut tx, partition_key, dk, &outcome).await?;
437 }
438 tx.commit().await.map_err(map_sqlx_error)?;
439 Ok(outcome)
440}
441
442async fn finalize_dedup(
445 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
446 partition_key: i16,
447 dedup_key: &str,
448 outcome: &ReportUsageResult,
449) -> Result<(), EngineError> {
450 let json = outcome_to_json(outcome);
451 sqlx::query(
452 "UPDATE ff_budget_usage_dedup SET outcome_json = $1 \
453 WHERE partition_key = $2 AND dedup_key = $3",
454 )
455 .bind(json)
456 .bind(partition_key)
457 .bind(dedup_key)
458 .execute(&mut **tx)
459 .await
460 .map_err(map_sqlx_error)?;
461 Ok(())
462}
463
464#[doc(hidden)]
469pub async fn upsert_policy_for_test(
470 pool: &PgPool,
471 partition_config: &PartitionConfig,
472 budget: &BudgetId,
473 policy_json: JsonValue,
474) -> Result<(), EngineError> {
475 let partition = budget_partition(budget, partition_config);
476 let partition_key: i16 = partition.index as i16;
477 let now = now_ms();
478 sqlx::query(
479 "INSERT INTO ff_budget_policy \
480 (partition_key, budget_id, policy_json, created_at_ms, updated_at_ms) \
481 VALUES ($1, $2, $3, $4, $4) \
482 ON CONFLICT (partition_key, budget_id) DO UPDATE \
483 SET policy_json = EXCLUDED.policy_json, \
484 updated_at_ms = EXCLUDED.updated_at_ms",
485 )
486 .bind(partition_key)
487 .bind(budget.to_string())
488 .bind(policy_json)
489 .bind(now)
490 .execute(pool)
491 .await
492 .map_err(map_sqlx_error)?;
493 Ok(())
494}
495
496fn build_policy_json(args: &CreateBudgetArgs) -> JsonValue {
507 let mut hard = serde_json::Map::new();
508 let mut soft = serde_json::Map::new();
509 for (i, dim) in args.dimensions.iter().enumerate() {
510 if let Some(h) = args.hard_limits.get(i).copied() {
511 hard.insert(dim.clone(), json!(h));
512 }
513 if let Some(s) = args.soft_limits.get(i).copied() {
514 soft.insert(dim.clone(), json!(s));
515 }
516 }
517 json!({
518 "hard_limits": hard,
519 "soft_limits": soft,
520 "reset_interval_ms": args.reset_interval_ms,
521 "on_hard_limit": args.on_hard_limit,
522 "on_soft_limit": args.on_soft_limit,
523 })
524}
525
526pub(crate) async fn create_budget_impl(
527 pool: &PgPool,
528 partition_config: &PartitionConfig,
529 args: CreateBudgetArgs,
530) -> Result<CreateBudgetResult, EngineError> {
531 if args.dimensions.len() != args.hard_limits.len()
532 || args.dimensions.len() != args.soft_limits.len()
533 {
534 return Err(EngineError::Validation {
535 kind: ValidationKind::InvalidInput,
536 detail: "create_budget: dimensions / hard_limits / soft_limits length mismatch"
537 .into(),
538 });
539 }
540
541 let partition = budget_partition(&args.budget_id, partition_config);
542 let partition_key: i16 = partition.index as i16;
543 let budget_id = args.budget_id.clone();
544 let now: i64 = args.now.0;
545 let policy_json = build_policy_json(&args);
546 let reset_interval_ms = args.reset_interval_ms as i64;
547
548 let row = sqlx::query(
553 "INSERT INTO ff_budget_policy \
554 (partition_key, budget_id, policy_json, scope_type, scope_id, \
555 enforcement_mode, breach_count, soft_breach_count, \
556 last_breach_at_ms, last_breach_dim, next_reset_at_ms, \
557 created_at_ms, updated_at_ms) \
558 VALUES ($1, $2, $3, $4, $5, $6, 0, 0, NULL, NULL, \
559 CASE WHEN $7::bigint > 0 THEN $8::bigint + $7::bigint ELSE NULL END, \
560 $8, $8) \
561 ON CONFLICT (partition_key, budget_id) DO NOTHING \
562 RETURNING created_at_ms",
563 )
564 .bind(partition_key)
565 .bind(budget_id.to_string())
566 .bind(policy_json)
567 .bind(&args.scope_type)
568 .bind(&args.scope_id)
569 .bind(&args.enforcement_mode)
570 .bind(reset_interval_ms)
571 .bind(now)
572 .fetch_optional(pool)
573 .await
574 .map_err(map_sqlx_error)?;
575
576 match row {
577 Some(_) => Ok(CreateBudgetResult::Created { budget_id }),
578 None => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
579 }
580}
581
582pub(crate) async fn reset_budget_impl(
587 pool: &PgPool,
588 partition_config: &PartitionConfig,
589 args: ResetBudgetArgs,
590) -> Result<ResetBudgetResult, EngineError> {
591 let partition = budget_partition(&args.budget_id, partition_config);
592 let partition_key: i16 = partition.index as i16;
593 let budget_id_str = args.budget_id.to_string();
594 let now: i64 = args.now.0;
595
596 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
597 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
601 .execute(&mut *tx)
602 .await
603 .map_err(map_sqlx_error)?;
604
605 let policy_row = sqlx::query(
609 "SELECT policy_json FROM ff_budget_policy \
610 WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
611 )
612 .bind(partition_key)
613 .bind(&budget_id_str)
614 .fetch_optional(&mut *tx)
615 .await
616 .map_err(map_sqlx_error)?;
617 let Some(policy_row) = policy_row else {
618 tx.rollback().await.map_err(map_sqlx_error)?;
619 return Err(backend_context(
620 EngineError::NotFound { entity: "budget" },
621 format!("reset_budget: {}", args.budget_id),
622 ));
623 };
624 let policy_json: JsonValue = policy_row.get("policy_json");
625 let reset_interval_ms: i64 = policy_json
626 .get("reset_interval_ms")
627 .and_then(|v| v.as_i64())
628 .unwrap_or(0);
629
630 sqlx::query(
632 "UPDATE ff_budget_usage \
633 SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
634 WHERE partition_key = $1 AND budget_id = $2",
635 )
636 .bind(partition_key)
637 .bind(&budget_id_str)
638 .bind(now)
639 .execute(&mut *tx)
640 .await
641 .map_err(map_sqlx_error)?;
642
643 let row = sqlx::query(
645 "UPDATE ff_budget_policy \
646 SET last_breach_at_ms = NULL, \
647 last_breach_dim = NULL, \
648 updated_at_ms = $3, \
649 next_reset_at_ms = CASE \
650 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
651 ELSE NULL \
652 END \
653 WHERE partition_key = $1 AND budget_id = $2 \
654 RETURNING next_reset_at_ms",
655 )
656 .bind(partition_key)
657 .bind(&budget_id_str)
658 .bind(now)
659 .bind(reset_interval_ms)
660 .fetch_one(&mut *tx)
661 .await
662 .map_err(map_sqlx_error)?;
663
664 let next_reset: Option<i64> = row
665 .try_get::<Option<i64>, _>("next_reset_at_ms")
666 .map_err(map_sqlx_error)?;
667
668 tx.commit().await.map_err(map_sqlx_error)?;
669
670 Ok(ResetBudgetResult::Reset {
671 next_reset_at: TimestampMs(next_reset.unwrap_or(0)),
676 })
677}
678
679pub(crate) async fn create_quota_policy_impl(
684 pool: &PgPool,
685 partition_config: &PartitionConfig,
686 args: CreateQuotaPolicyArgs,
687) -> Result<CreateQuotaPolicyResult, EngineError> {
688 let partition = quota_partition(&args.quota_policy_id, partition_config);
689 let partition_key: i16 = partition.index as i16;
690 let qid = args.quota_policy_id.clone();
691 let now: i64 = args.now.0;
692
693 let row = sqlx::query(
694 "INSERT INTO ff_quota_policy \
695 (partition_key, quota_policy_id, requests_per_window_seconds, \
696 max_requests_per_window, active_concurrency_cap, \
697 active_concurrency, created_at_ms, updated_at_ms) \
698 VALUES ($1, $2, $3, $4, $5, 0, $6, $6) \
699 ON CONFLICT (partition_key, quota_policy_id) DO NOTHING \
700 RETURNING created_at_ms",
701 )
702 .bind(partition_key)
703 .bind(qid.to_string())
704 .bind(args.window_seconds as i64)
705 .bind(args.max_requests_per_window as i64)
706 .bind(args.max_concurrent as i64)
707 .bind(now)
708 .fetch_optional(pool)
709 .await
710 .map_err(map_sqlx_error)?;
711
712 match row {
713 Some(_) => Ok(CreateQuotaPolicyResult::Created {
714 quota_policy_id: qid,
715 }),
716 None => Ok(CreateQuotaPolicyResult::AlreadySatisfied {
717 quota_policy_id: qid,
718 }),
719 }
720}
721
722pub(crate) async fn get_budget_status_impl(
727 pool: &PgPool,
728 partition_config: &PartitionConfig,
729 budget: &BudgetId,
730) -> Result<BudgetStatus, EngineError> {
731 let partition = budget_partition(budget, partition_config);
732 let partition_key: i16 = partition.index as i16;
733 let budget_id_str = budget.to_string();
734
735 let policy_row = sqlx::query(
737 "SELECT scope_type, scope_id, enforcement_mode, \
738 breach_count, soft_breach_count, \
739 last_breach_at_ms, last_breach_dim, \
740 next_reset_at_ms, created_at_ms, \
741 policy_json \
742 FROM ff_budget_policy \
743 WHERE partition_key = $1 AND budget_id = $2",
744 )
745 .bind(partition_key)
746 .bind(&budget_id_str)
747 .fetch_optional(pool)
748 .await
749 .map_err(map_sqlx_error)?;
750 let Some(policy_row) = policy_row else {
751 return Err(backend_context(
752 EngineError::NotFound { entity: "budget" },
753 format!("get_budget_status: {budget}"),
754 ));
755 };
756
757 let scope_type: String = policy_row.get("scope_type");
758 let scope_id: String = policy_row.get("scope_id");
759 let enforcement_mode: String = policy_row.get("enforcement_mode");
760 let breach_count: i64 = policy_row.get("breach_count");
761 let soft_breach_count: i64 = policy_row.get("soft_breach_count");
762 let last_breach_at_ms: Option<i64> = policy_row
763 .try_get::<Option<i64>, _>("last_breach_at_ms")
764 .map_err(map_sqlx_error)?;
765 let last_breach_dim: Option<String> = policy_row
766 .try_get::<Option<String>, _>("last_breach_dim")
767 .map_err(map_sqlx_error)?;
768 let next_reset_at_ms: Option<i64> = policy_row
769 .try_get::<Option<i64>, _>("next_reset_at_ms")
770 .map_err(map_sqlx_error)?;
771 let created_at_ms: i64 = policy_row.get("created_at_ms");
772 let policy_json: JsonValue = policy_row.get("policy_json");
773
774 let usage_rows = sqlx::query(
776 "SELECT dimensions_key, current_value \
777 FROM ff_budget_usage \
778 WHERE partition_key = $1 AND budget_id = $2",
779 )
780 .bind(partition_key)
781 .bind(&budget_id_str)
782 .fetch_all(pool)
783 .await
784 .map_err(map_sqlx_error)?;
785
786 let mut usage: HashMap<String, u64> = HashMap::new();
787 for r in &usage_rows {
788 let key: String = r.get("dimensions_key");
789 let val: i64 = r.get("current_value");
790 if key == "_init" {
791 continue;
792 }
793 usage.insert(key, val.max(0) as u64);
794 }
795
796 let hard_limits: HashMap<String, u64> =
800 limits_from_policy(&policy_json, "hard_limits").into_iter().collect();
801 let soft_limits: HashMap<String, u64> =
802 limits_from_policy(&policy_json, "soft_limits").into_iter().collect();
803
804 let fmt_opt = |v: Option<i64>| -> Option<String> { v.map(|n| n.to_string()) };
807
808 Ok(BudgetStatus {
809 budget_id: budget.to_string(),
810 scope_type,
811 scope_id,
812 enforcement_mode,
813 usage,
814 hard_limits,
815 soft_limits,
816 breach_count: breach_count.max(0) as u64,
817 soft_breach_count: soft_breach_count.max(0) as u64,
818 last_breach_at: fmt_opt(last_breach_at_ms),
819 last_breach_dim,
820 next_reset_at: fmt_opt(next_reset_at_ms),
821 created_at: Some(created_at_ms.to_string()),
822 })
823}
824
825pub(crate) async fn budget_reset_reconciler_apply(
837 pool: &PgPool,
838 partition_key: i16,
839 budget_id: &str,
840 now: i64,
841) -> Result<(), EngineError> {
842 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
843 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
844 .execute(&mut *tx)
845 .await
846 .map_err(map_sqlx_error)?;
847
848 let policy_row = sqlx::query(
849 "SELECT policy_json, next_reset_at_ms FROM ff_budget_policy \
850 WHERE partition_key = $1 AND budget_id = $2 FOR NO KEY UPDATE",
851 )
852 .bind(partition_key)
853 .bind(budget_id)
854 .fetch_optional(&mut *tx)
855 .await
856 .map_err(map_sqlx_error)?;
857 let Some(policy_row) = policy_row else {
858 tx.rollback().await.map_err(map_sqlx_error)?;
859 return Ok(());
860 };
861 let next_reset: Option<i64> = policy_row
865 .try_get::<Option<i64>, _>("next_reset_at_ms")
866 .map_err(map_sqlx_error)?;
867 if !matches!(next_reset, Some(n) if n <= now) {
868 tx.rollback().await.map_err(map_sqlx_error)?;
869 return Ok(());
870 }
871 let policy_json: JsonValue = policy_row.get("policy_json");
872 let reset_interval_ms: i64 = policy_json
873 .get("reset_interval_ms")
874 .and_then(|v| v.as_i64())
875 .unwrap_or(0);
876
877 sqlx::query(
878 "UPDATE ff_budget_usage \
879 SET current_value = 0, last_reset_at_ms = $3, updated_at_ms = $3 \
880 WHERE partition_key = $1 AND budget_id = $2",
881 )
882 .bind(partition_key)
883 .bind(budget_id)
884 .bind(now)
885 .execute(&mut *tx)
886 .await
887 .map_err(map_sqlx_error)?;
888
889 sqlx::query(
890 "UPDATE ff_budget_policy \
891 SET last_breach_at_ms = NULL, \
892 last_breach_dim = NULL, \
893 updated_at_ms = $3, \
894 next_reset_at_ms = CASE \
895 WHEN $4::bigint > 0 THEN $3 + $4::bigint \
896 ELSE NULL \
897 END \
898 WHERE partition_key = $1 AND budget_id = $2",
899 )
900 .bind(partition_key)
901 .bind(budget_id)
902 .bind(now)
903 .bind(reset_interval_ms)
904 .execute(&mut *tx)
905 .await
906 .map_err(map_sqlx_error)?;
907
908 tx.commit().await.map_err(map_sqlx_error)?;
909 Ok(())
910}