1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4#[derive(Debug, thiserror::Error)]
5pub enum BudgetStoreError {
6 #[error("sqlite error: {0}")]
7 Sqlite(#[from] rusqlite::Error),
8
9 #[error("failed to prepare budget store directory: {0}")]
10 Io(#[from] std::io::Error),
11
12 #[error("budget arithmetic overflow: {0}")]
13 Overflow(String),
14
15 #[error("budget state invariant violated: {0}")]
16 Invariant(String),
17}
18
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct BudgetUsageRecord {
21 pub capability_id: String,
22 pub grant_index: u32,
23 pub invocation_count: u32,
24 pub updated_at: i64,
25 pub seq: u64,
26 pub total_cost_exposed: u64,
27 pub total_cost_realized_spend: u64,
28}
29
30impl BudgetUsageRecord {
31 pub fn committed_cost_units(&self) -> Result<u64, BudgetStoreError> {
32 checked_committed_cost_units(self.total_cost_exposed, self.total_cost_realized_spend)
33 }
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum BudgetMutationKind {
38 IncrementInvocation,
39 AuthorizeExposure,
40 ReverseExposure,
41 ReleaseExposure,
42 ReconcileSpend,
43}
44
45impl BudgetMutationKind {
46 pub fn as_str(self) -> &'static str {
47 match self {
48 Self::IncrementInvocation => "increment_invocation",
49 Self::AuthorizeExposure => "authorize_exposure",
50 Self::ReverseExposure => "reverse_exposure",
51 Self::ReleaseExposure => "release_exposure",
52 Self::ReconcileSpend => "reconcile_spend",
53 }
54 }
55
56 pub fn parse(value: &str) -> Option<Self> {
57 match value {
58 "increment_invocation" => Some(Self::IncrementInvocation),
59 "authorize_exposure" => Some(Self::AuthorizeExposure),
60 "reverse_exposure" => Some(Self::ReverseExposure),
61 "release_exposure" => Some(Self::ReleaseExposure),
62 "reconcile_spend" => Some(Self::ReconcileSpend),
63 _ => None,
64 }
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct BudgetEventAuthority {
70 pub authority_id: String,
71 pub lease_id: String,
72 pub lease_epoch: u64,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct BudgetMutationRecord {
77 pub event_id: String,
78 pub hold_id: Option<String>,
79 pub capability_id: String,
80 pub grant_index: u32,
81 pub kind: BudgetMutationKind,
82 pub allowed: Option<bool>,
83 pub recorded_at: i64,
84 pub event_seq: u64,
85 pub usage_seq: Option<u64>,
86 pub exposure_units: u64,
87 pub realized_spend_units: u64,
88 pub max_invocations: Option<u32>,
89 pub max_cost_per_invocation: Option<u64>,
90 pub max_total_cost_units: Option<u64>,
91 pub invocation_count_after: u32,
92 pub total_cost_exposed_after: u64,
93 pub total_cost_realized_spend_after: u64,
94 pub authority: Option<BudgetEventAuthority>,
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum BudgetGuaranteeLevel {
99 SingleNodeAtomic,
100 HaLinearizable,
101 PartitionEscrowed,
102 AdvisoryPosthoc,
103}
104
105impl BudgetGuaranteeLevel {
106 pub fn as_str(self) -> &'static str {
107 match self {
108 Self::SingleNodeAtomic => "single_node_atomic",
109 Self::HaLinearizable => "ha_linearizable",
110 Self::PartitionEscrowed => "partition_escrowed",
111 Self::AdvisoryPosthoc => "advisory_posthoc",
112 }
113 }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq)]
117pub enum BudgetAuthorityProfile {
118 AuthoritativeHoldEvent,
119}
120
121impl BudgetAuthorityProfile {
122 pub fn as_str(self) -> &'static str {
123 match self {
124 Self::AuthoritativeHoldEvent => "authoritative_hold_event",
125 }
126 }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub enum BudgetMeteringProfile {
131 MaxCostPreauthorizeThenReconcileActual,
132}
133
134impl BudgetMeteringProfile {
135 pub fn as_str(self) -> &'static str {
136 match self {
137 Self::MaxCostPreauthorizeThenReconcileActual => {
138 "max_cost_preauthorize_then_reconcile_actual"
139 }
140 }
141 }
142}
143
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct BudgetCommitMetadata {
146 pub authority: Option<BudgetEventAuthority>,
147 pub guarantee_level: BudgetGuaranteeLevel,
148 pub budget_profile: BudgetAuthorityProfile,
149 pub metering_profile: BudgetMeteringProfile,
150 pub budget_commit_index: Option<u64>,
151 pub event_id: Option<String>,
152}
153
154impl BudgetCommitMetadata {
155 pub fn budget_term(&self) -> Option<String> {
156 self.authority
157 .as_ref()
158 .map(|authority| format!("{}:{}", authority.authority_id, authority.lease_epoch))
159 }
160}
161
162fn budget_commit_metadata<T: BudgetStore + ?Sized>(
163 store: &T,
164 authority: Option<BudgetEventAuthority>,
165 budget_commit_index: Option<u64>,
166 event_id: Option<String>,
167) -> BudgetCommitMetadata {
168 BudgetCommitMetadata {
169 authority,
170 guarantee_level: store.budget_guarantee_level(),
171 budget_profile: store.budget_authority_profile(),
172 metering_profile: store.budget_metering_profile(),
173 budget_commit_index,
174 event_id,
175 }
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct BudgetAuthorizeHoldRequest {
180 pub capability_id: String,
181 pub grant_index: usize,
182 pub max_invocations: Option<u32>,
183 pub requested_exposure_units: u64,
184 pub max_cost_per_invocation: Option<u64>,
185 pub max_total_cost_units: Option<u64>,
186 pub hold_id: Option<String>,
187 pub event_id: Option<String>,
188 pub authority: Option<BudgetEventAuthority>,
189}
190
191#[derive(Debug, Clone, PartialEq, Eq)]
192pub struct BudgetReleaseHoldRequest {
193 pub capability_id: String,
194 pub grant_index: usize,
195 pub released_exposure_units: u64,
196 pub hold_id: Option<String>,
197 pub event_id: Option<String>,
198 pub authority: Option<BudgetEventAuthority>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct BudgetReverseHoldRequest {
203 pub capability_id: String,
204 pub grant_index: usize,
205 pub reversed_exposure_units: u64,
206 pub hold_id: Option<String>,
207 pub event_id: Option<String>,
208 pub authority: Option<BudgetEventAuthority>,
209}
210
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct BudgetReconcileHoldRequest {
213 pub capability_id: String,
214 pub grant_index: usize,
215 pub exposed_cost_units: u64,
216 pub realized_spend_units: u64,
217 pub hold_id: Option<String>,
218 pub event_id: Option<String>,
219 pub authority: Option<BudgetEventAuthority>,
220}
221
222pub type BudgetCaptureHoldRequest = BudgetReconcileHoldRequest;
223
224#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct AuthorizedBudgetHold {
226 pub hold_id: Option<String>,
227 pub authorized_exposure_units: u64,
228 pub committed_cost_units_after: u64,
229 pub invocation_count_after: u32,
230 pub metadata: BudgetCommitMetadata,
231}
232
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct DeniedBudgetHold {
235 pub hold_id: Option<String>,
236 pub attempted_exposure_units: u64,
237 pub committed_cost_units_after: u64,
238 pub invocation_count_after: u32,
239 pub metadata: BudgetCommitMetadata,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub enum BudgetAuthorizeHoldDecision {
244 Authorized(AuthorizedBudgetHold),
245 Denied(DeniedBudgetHold),
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct BudgetHoldMutationDecision {
250 pub hold_id: Option<String>,
251 pub exposure_units: u64,
252 pub realized_spend_units: u64,
253 pub committed_cost_units_after: u64,
254 pub invocation_count_after: u32,
255 pub metadata: BudgetCommitMetadata,
256}
257
258pub type BudgetReleaseHoldDecision = BudgetHoldMutationDecision;
259pub type BudgetReverseHoldDecision = BudgetHoldMutationDecision;
260pub type BudgetReconcileHoldDecision = BudgetHoldMutationDecision;
261pub type BudgetCaptureHoldDecision = BudgetHoldMutationDecision;
262
263pub trait BudgetStore: Send {
264 fn try_increment(
265 &mut self,
266 capability_id: &str,
267 grant_index: usize,
268 max_invocations: Option<u32>,
269 ) -> Result<bool, BudgetStoreError>;
270
271 fn try_charge_cost(
290 &mut self,
291 capability_id: &str,
292 grant_index: usize,
293 max_invocations: Option<u32>,
294 cost_units: u64,
295 max_cost_per_invocation: Option<u64>,
296 max_total_cost_units: Option<u64>,
297 ) -> Result<bool, BudgetStoreError>;
298
299 #[allow(clippy::too_many_arguments)]
300 fn try_charge_cost_with_ids(
301 &mut self,
302 capability_id: &str,
303 grant_index: usize,
304 max_invocations: Option<u32>,
305 cost_units: u64,
306 max_cost_per_invocation: Option<u64>,
307 max_total_cost_units: Option<u64>,
308 hold_id: Option<&str>,
309 event_id: Option<&str>,
310 ) -> Result<bool, BudgetStoreError> {
311 let _ = hold_id;
312 let _ = event_id;
313 self.try_charge_cost(
314 capability_id,
315 grant_index,
316 max_invocations,
317 cost_units,
318 max_cost_per_invocation,
319 max_total_cost_units,
320 )
321 }
322
323 #[allow(clippy::too_many_arguments)]
324 fn try_charge_cost_with_ids_and_authority(
325 &mut self,
326 capability_id: &str,
327 grant_index: usize,
328 max_invocations: Option<u32>,
329 cost_units: u64,
330 max_cost_per_invocation: Option<u64>,
331 max_total_cost_units: Option<u64>,
332 hold_id: Option<&str>,
333 event_id: Option<&str>,
334 authority: Option<&BudgetEventAuthority>,
335 ) -> Result<bool, BudgetStoreError> {
336 let _ = authority;
337 self.try_charge_cost_with_ids(
338 capability_id,
339 grant_index,
340 max_invocations,
341 cost_units,
342 max_cost_per_invocation,
343 max_total_cost_units,
344 hold_id,
345 event_id,
346 )
347 }
348
349 fn reverse_charge_cost(
351 &mut self,
352 capability_id: &str,
353 grant_index: usize,
354 cost_units: u64,
355 ) -> Result<(), BudgetStoreError>;
356
357 fn reverse_charge_cost_with_ids(
358 &mut self,
359 capability_id: &str,
360 grant_index: usize,
361 cost_units: u64,
362 hold_id: Option<&str>,
363 event_id: Option<&str>,
364 ) -> Result<(), BudgetStoreError> {
365 let _ = hold_id;
366 let _ = event_id;
367 self.reverse_charge_cost(capability_id, grant_index, cost_units)
368 }
369
370 fn reverse_charge_cost_with_ids_and_authority(
371 &mut self,
372 capability_id: &str,
373 grant_index: usize,
374 cost_units: u64,
375 hold_id: Option<&str>,
376 event_id: Option<&str>,
377 authority: Option<&BudgetEventAuthority>,
378 ) -> Result<(), BudgetStoreError> {
379 let _ = authority;
380 self.reverse_charge_cost_with_ids(capability_id, grant_index, cost_units, hold_id, event_id)
381 }
382
383 fn reduce_charge_cost(
388 &mut self,
389 capability_id: &str,
390 grant_index: usize,
391 cost_units: u64,
392 ) -> Result<(), BudgetStoreError>;
393
394 fn reduce_charge_cost_with_ids(
395 &mut self,
396 capability_id: &str,
397 grant_index: usize,
398 cost_units: u64,
399 hold_id: Option<&str>,
400 event_id: Option<&str>,
401 ) -> Result<(), BudgetStoreError> {
402 let _ = hold_id;
403 let _ = event_id;
404 self.reduce_charge_cost(capability_id, grant_index, cost_units)
405 }
406
407 fn reduce_charge_cost_with_ids_and_authority(
408 &mut self,
409 capability_id: &str,
410 grant_index: usize,
411 cost_units: u64,
412 hold_id: Option<&str>,
413 event_id: Option<&str>,
414 authority: Option<&BudgetEventAuthority>,
415 ) -> Result<(), BudgetStoreError> {
416 let _ = authority;
417 self.reduce_charge_cost_with_ids(capability_id, grant_index, cost_units, hold_id, event_id)
418 }
419
420 fn settle_charge_cost(
427 &mut self,
428 capability_id: &str,
429 grant_index: usize,
430 exposed_cost_units: u64,
431 realized_cost_units: u64,
432 ) -> Result<(), BudgetStoreError>;
433
434 fn settle_charge_cost_with_ids(
435 &mut self,
436 capability_id: &str,
437 grant_index: usize,
438 exposed_cost_units: u64,
439 realized_cost_units: u64,
440 hold_id: Option<&str>,
441 event_id: Option<&str>,
442 ) -> Result<(), BudgetStoreError> {
443 let _ = hold_id;
444 let _ = event_id;
445 self.settle_charge_cost(
446 capability_id,
447 grant_index,
448 exposed_cost_units,
449 realized_cost_units,
450 )
451 }
452
453 #[allow(clippy::too_many_arguments)]
454 fn settle_charge_cost_with_ids_and_authority(
455 &mut self,
456 capability_id: &str,
457 grant_index: usize,
458 exposed_cost_units: u64,
459 realized_cost_units: u64,
460 hold_id: Option<&str>,
461 event_id: Option<&str>,
462 authority: Option<&BudgetEventAuthority>,
463 ) -> Result<(), BudgetStoreError> {
464 let _ = authority;
465 self.settle_charge_cost_with_ids(
466 capability_id,
467 grant_index,
468 exposed_cost_units,
469 realized_cost_units,
470 hold_id,
471 event_id,
472 )
473 }
474
475 fn list_usages(
476 &self,
477 limit: usize,
478 capability_id: Option<&str>,
479 ) -> Result<Vec<BudgetUsageRecord>, BudgetStoreError>;
480
481 fn get_usage(
482 &self,
483 capability_id: &str,
484 grant_index: usize,
485 ) -> Result<Option<BudgetUsageRecord>, BudgetStoreError>;
486
487 fn list_mutation_events(
488 &self,
489 _limit: usize,
490 _capability_id: Option<&str>,
491 _grant_index: Option<usize>,
492 ) -> Result<Vec<BudgetMutationRecord>, BudgetStoreError> {
493 Err(BudgetStoreError::Invariant(
494 "budget mutation events unavailable for this backend".to_string(),
495 ))
496 }
497
498 fn budget_guarantee_level(&self) -> BudgetGuaranteeLevel {
499 BudgetGuaranteeLevel::SingleNodeAtomic
500 }
501
502 fn budget_authority_profile(&self) -> BudgetAuthorityProfile {
503 BudgetAuthorityProfile::AuthoritativeHoldEvent
504 }
505
506 fn budget_metering_profile(&self) -> BudgetMeteringProfile {
507 BudgetMeteringProfile::MaxCostPreauthorizeThenReconcileActual
508 }
509
510 fn authorize_budget_hold(
511 &mut self,
512 request: BudgetAuthorizeHoldRequest,
513 ) -> Result<BudgetAuthorizeHoldDecision, BudgetStoreError> {
514 let allowed = self.try_charge_cost_with_ids_and_authority(
515 &request.capability_id,
516 request.grant_index,
517 request.max_invocations,
518 request.requested_exposure_units,
519 request.max_cost_per_invocation,
520 request.max_total_cost_units,
521 request.hold_id.as_deref(),
522 request.event_id.as_deref(),
523 request.authority.as_ref(),
524 )?;
525 let usage = self.get_usage(&request.capability_id, request.grant_index)?;
526 let committed_cost_units_after = usage
527 .as_ref()
528 .map(BudgetUsageRecord::committed_cost_units)
529 .transpose()?
530 .unwrap_or(0);
531 let invocation_count_after = usage.as_ref().map_or(0, |usage| usage.invocation_count);
532 let metadata = budget_commit_metadata(
533 self,
534 request.authority,
535 allowed
536 .then(|| usage.as_ref().map(|usage| usage.seq))
537 .flatten(),
538 request.event_id,
539 );
540
541 if allowed {
542 Ok(BudgetAuthorizeHoldDecision::Authorized(
543 AuthorizedBudgetHold {
544 hold_id: request.hold_id,
545 authorized_exposure_units: request.requested_exposure_units,
546 committed_cost_units_after,
547 invocation_count_after,
548 metadata,
549 },
550 ))
551 } else {
552 Ok(BudgetAuthorizeHoldDecision::Denied(DeniedBudgetHold {
553 hold_id: request.hold_id,
554 attempted_exposure_units: request.requested_exposure_units,
555 committed_cost_units_after,
556 invocation_count_after,
557 metadata,
558 }))
559 }
560 }
561
562 fn reverse_budget_hold(
563 &mut self,
564 request: BudgetReverseHoldRequest,
565 ) -> Result<BudgetReverseHoldDecision, BudgetStoreError> {
566 self.reverse_charge_cost_with_ids_and_authority(
567 &request.capability_id,
568 request.grant_index,
569 request.reversed_exposure_units,
570 request.hold_id.as_deref(),
571 request.event_id.as_deref(),
572 request.authority.as_ref(),
573 )?;
574 let usage = self.get_usage(&request.capability_id, request.grant_index)?;
575 Ok(BudgetHoldMutationDecision {
576 hold_id: request.hold_id,
577 exposure_units: request.reversed_exposure_units,
578 realized_spend_units: 0,
579 committed_cost_units_after: usage
580 .as_ref()
581 .map(BudgetUsageRecord::committed_cost_units)
582 .transpose()?
583 .unwrap_or(0),
584 invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
585 metadata: budget_commit_metadata(
586 self,
587 request.authority,
588 usage.as_ref().map(|usage| usage.seq),
589 request.event_id,
590 ),
591 })
592 }
593
594 fn release_budget_hold(
595 &mut self,
596 request: BudgetReleaseHoldRequest,
597 ) -> Result<BudgetReleaseHoldDecision, BudgetStoreError> {
598 self.reduce_charge_cost_with_ids_and_authority(
599 &request.capability_id,
600 request.grant_index,
601 request.released_exposure_units,
602 request.hold_id.as_deref(),
603 request.event_id.as_deref(),
604 request.authority.as_ref(),
605 )?;
606 let usage = self.get_usage(&request.capability_id, request.grant_index)?;
607 Ok(BudgetHoldMutationDecision {
608 hold_id: request.hold_id,
609 exposure_units: request.released_exposure_units,
610 realized_spend_units: 0,
611 committed_cost_units_after: usage
612 .as_ref()
613 .map(BudgetUsageRecord::committed_cost_units)
614 .transpose()?
615 .unwrap_or(0),
616 invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
617 metadata: budget_commit_metadata(
618 self,
619 request.authority,
620 usage.as_ref().map(|usage| usage.seq),
621 request.event_id,
622 ),
623 })
624 }
625
626 fn reconcile_budget_hold(
627 &mut self,
628 request: BudgetReconcileHoldRequest,
629 ) -> Result<BudgetReconcileHoldDecision, BudgetStoreError> {
630 self.settle_charge_cost_with_ids_and_authority(
631 &request.capability_id,
632 request.grant_index,
633 request.exposed_cost_units,
634 request.realized_spend_units,
635 request.hold_id.as_deref(),
636 request.event_id.as_deref(),
637 request.authority.as_ref(),
638 )?;
639 let usage = self.get_usage(&request.capability_id, request.grant_index)?;
640 Ok(BudgetHoldMutationDecision {
641 hold_id: request.hold_id,
642 exposure_units: request.exposed_cost_units,
643 realized_spend_units: request.realized_spend_units,
644 committed_cost_units_after: usage
645 .as_ref()
646 .map(BudgetUsageRecord::committed_cost_units)
647 .transpose()?
648 .unwrap_or(0),
649 invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
650 metadata: budget_commit_metadata(
651 self,
652 request.authority,
653 usage.as_ref().map(|usage| usage.seq),
654 request.event_id,
655 ),
656 })
657 }
658
659 fn capture_budget_hold(
660 &mut self,
661 request: BudgetCaptureHoldRequest,
662 ) -> Result<BudgetCaptureHoldDecision, BudgetStoreError> {
663 self.reconcile_budget_hold(request)
664 }
665}
666
667#[derive(Debug, Clone, PartialEq, Eq)]
668enum BudgetHoldDisposition {
669 Open,
670 Released,
671 Reversed,
672 Reconciled,
673}
674
675#[derive(Debug, Clone, PartialEq, Eq)]
676struct BudgetHoldState {
677 capability_id: String,
678 grant_index: usize,
679 authorized_exposure_units: u64,
680 remaining_exposure_units: u64,
681 invocation_count_debited: bool,
682 disposition: BudgetHoldDisposition,
683 authority: Option<BudgetEventAuthority>,
684}
685
686#[derive(Debug, Clone, PartialEq, Eq)]
687enum BudgetMutationRequest {
688 Increment {
689 capability_id: String,
690 grant_index: usize,
691 max_invocations: Option<u32>,
692 },
693 Authorize {
694 capability_id: String,
695 grant_index: usize,
696 hold_id: Option<String>,
697 authority: Option<BudgetEventAuthority>,
698 cost_units: u64,
699 max_invocations: Option<u32>,
700 max_cost_per_invocation: Option<u64>,
701 max_total_cost_units: Option<u64>,
702 },
703 Reverse {
704 capability_id: String,
705 grant_index: usize,
706 hold_id: Option<String>,
707 authority: Option<BudgetEventAuthority>,
708 cost_units: u64,
709 },
710 Release {
711 capability_id: String,
712 grant_index: usize,
713 hold_id: Option<String>,
714 authority: Option<BudgetEventAuthority>,
715 cost_units: u64,
716 },
717 Reconcile {
718 capability_id: String,
719 grant_index: usize,
720 hold_id: Option<String>,
721 authority: Option<BudgetEventAuthority>,
722 exposed_cost_units: u64,
723 realized_cost_units: u64,
724 },
725}
726
727#[derive(Debug, Clone)]
728struct RecordedBudgetMutation {
729 request: BudgetMutationRequest,
730 record: BudgetMutationRecord,
731}
732
733#[derive(Default)]
734pub struct InMemoryBudgetStore {
735 counts: HashMap<(String, usize), BudgetUsageRecord>,
736 events: Vec<BudgetMutationRecord>,
737 explicit_events: HashMap<String, RecordedBudgetMutation>,
738 holds: HashMap<String, BudgetHoldState>,
739 next_seq: u64,
740 next_event_ordinal: u64,
741}
742
743impl InMemoryBudgetStore {
744 pub fn new() -> Self {
745 Self::default()
746 }
747
748 fn next_event_id(&mut self) -> String {
749 self.next_event_ordinal = self.next_event_ordinal.saturating_add(1);
750 format!("local-budget-event-{}", self.next_event_ordinal)
751 }
752
753 fn duplicate_mutation(
754 &self,
755 event_id: Option<&str>,
756 request: &BudgetMutationRequest,
757 ) -> Result<Option<RecordedBudgetMutation>, BudgetStoreError> {
758 let Some(event_id) = event_id else {
759 return Ok(None);
760 };
761 let Some(existing) = self.explicit_events.get(event_id) else {
762 return Ok(None);
763 };
764 if &existing.request != request {
765 return Err(BudgetStoreError::Invariant(format!(
766 "budget event_id `{event_id}` was reused for a different mutation"
767 )));
768 }
769 Ok(Some(existing.clone()))
770 }
771
772 fn append_mutation(
773 &mut self,
774 explicit_event_id: Option<&str>,
775 request: BudgetMutationRequest,
776 mut record: BudgetMutationRecord,
777 ) {
778 let event_id = explicit_event_id
779 .map(ToOwned::to_owned)
780 .unwrap_or_else(|| self.next_event_id());
781 record.event_id = event_id.clone();
782 self.events.push(record.clone());
783 if explicit_event_id.is_some() {
784 self.explicit_events
785 .insert(event_id, RecordedBudgetMutation { request, record });
786 }
787 }
788
789 fn validate_open_hold(
790 &self,
791 hold_id: &str,
792 capability_id: &str,
793 grant_index: usize,
794 ) -> Result<&BudgetHoldState, BudgetStoreError> {
795 let hold = self.holds.get(hold_id).ok_or_else(|| {
796 BudgetStoreError::Invariant(format!("missing budget hold `{hold_id}`"))
797 })?;
798 if hold.capability_id != capability_id || hold.grant_index != grant_index {
799 return Err(BudgetStoreError::Invariant(format!(
800 "budget hold `{hold_id}` does not match capability/grant"
801 )));
802 }
803 if hold.disposition != BudgetHoldDisposition::Open {
804 return Err(BudgetStoreError::Invariant(format!(
805 "budget hold `{hold_id}` is no longer open"
806 )));
807 }
808 Ok(hold)
809 }
810
811 fn validate_hold_authority(
812 hold_id: &str,
813 current: Option<&BudgetEventAuthority>,
814 requested: Option<&BudgetEventAuthority>,
815 ) -> Result<Option<BudgetEventAuthority>, BudgetStoreError> {
816 match (current, requested) {
817 (None, None) => Ok(None),
818 (None, Some(_)) => Err(BudgetStoreError::Invariant(format!(
819 "budget hold `{hold_id}` was created without authority lease metadata"
820 ))),
821 (Some(_), None) => Err(BudgetStoreError::Invariant(format!(
822 "budget hold `{hold_id}` requires authority lease metadata"
823 ))),
824 (Some(current), Some(requested)) => {
825 if current.authority_id != requested.authority_id {
826 return Err(BudgetStoreError::Invariant(format!(
827 "budget hold `{hold_id}` authority_id does not match the open lease"
828 )));
829 }
830 if requested.lease_id != current.lease_id {
831 return Err(BudgetStoreError::Invariant(format!(
832 "budget hold `{hold_id}` lease_id does not match the open lease epoch"
833 )));
834 }
835 if requested.lease_epoch < current.lease_epoch {
836 return Err(BudgetStoreError::Invariant(format!(
837 "budget hold `{hold_id}` authority lease epoch regressed"
838 )));
839 }
840 if requested.lease_epoch > current.lease_epoch {
841 return Err(BudgetStoreError::Invariant(format!(
842 "budget hold `{hold_id}` authority lease epoch advanced beyond the open lease"
843 )));
844 }
845 Ok(Some(requested.clone()))
846 }
847 }
848 }
849
850 fn default_usage_record(capability_id: &str, grant_index: usize) -> BudgetUsageRecord {
851 BudgetUsageRecord {
852 capability_id: capability_id.to_string(),
853 grant_index: grant_index as u32,
854 invocation_count: 0,
855 updated_at: unix_now(),
856 seq: 0,
857 total_cost_exposed: 0,
858 total_cost_realized_spend: 0,
859 }
860 }
861}
862
863fn checked_committed_cost_units(
864 total_cost_exposed: u64,
865 total_cost_realized_spend: u64,
866) -> Result<u64, BudgetStoreError> {
867 total_cost_exposed
868 .checked_add(total_cost_realized_spend)
869 .ok_or_else(|| {
870 BudgetStoreError::Overflow(
871 "total_cost_exposed + total_cost_realized_spend overflowed u64".to_string(),
872 )
873 })
874}
875
876impl BudgetStore for InMemoryBudgetStore {
877 fn try_increment(
878 &mut self,
879 capability_id: &str,
880 grant_index: usize,
881 max_invocations: Option<u32>,
882 ) -> Result<bool, BudgetStoreError> {
883 let request = BudgetMutationRequest::Increment {
884 capability_id: capability_id.to_string(),
885 grant_index,
886 max_invocations,
887 };
888 let key = (capability_id.to_string(), grant_index);
889 let current = self
890 .counts
891 .get(&key)
892 .cloned()
893 .unwrap_or_else(|| Self::default_usage_record(capability_id, grant_index));
894 let allowed = max_invocations.is_none_or(|max| current.invocation_count < max);
895 let recorded_at = unix_now();
896 let event_seq = self.next_seq.saturating_add(1);
897 self.next_seq = event_seq;
898 let usage_seq = if allowed {
899 let entry = self
900 .counts
901 .entry(key)
902 .or_insert_with(|| Self::default_usage_record(capability_id, grant_index));
903 entry.invocation_count = current.invocation_count.saturating_add(1);
904 entry.updated_at = recorded_at;
905 entry.seq = event_seq;
906 Some(event_seq)
907 } else {
908 None
909 };
910 self.append_mutation(
911 None,
912 request,
913 BudgetMutationRecord {
914 event_id: String::new(),
915 hold_id: None,
916 capability_id: capability_id.to_string(),
917 grant_index: grant_index as u32,
918 kind: BudgetMutationKind::IncrementInvocation,
919 allowed: Some(allowed),
920 recorded_at,
921 event_seq,
922 usage_seq,
923 exposure_units: 0,
924 realized_spend_units: 0,
925 max_invocations,
926 max_cost_per_invocation: None,
927 max_total_cost_units: None,
928 invocation_count_after: if allowed {
929 current.invocation_count.saturating_add(1)
930 } else {
931 current.invocation_count
932 },
933 total_cost_exposed_after: current.total_cost_exposed,
934 total_cost_realized_spend_after: current.total_cost_realized_spend,
935 authority: None,
936 },
937 );
938 Ok(allowed)
939 }
940
941 fn try_charge_cost(
942 &mut self,
943 capability_id: &str,
944 grant_index: usize,
945 max_invocations: Option<u32>,
946 cost_units: u64,
947 max_cost_per_invocation: Option<u64>,
948 max_total_cost_units: Option<u64>,
949 ) -> Result<bool, BudgetStoreError> {
950 self.try_charge_cost_with_ids(
951 capability_id,
952 grant_index,
953 max_invocations,
954 cost_units,
955 max_cost_per_invocation,
956 max_total_cost_units,
957 None,
958 None,
959 )
960 }
961
962 fn try_charge_cost_with_ids(
963 &mut self,
964 capability_id: &str,
965 grant_index: usize,
966 max_invocations: Option<u32>,
967 cost_units: u64,
968 max_cost_per_invocation: Option<u64>,
969 max_total_cost_units: Option<u64>,
970 hold_id: Option<&str>,
971 event_id: Option<&str>,
972 ) -> Result<bool, BudgetStoreError> {
973 self.try_charge_cost_with_ids_and_authority(
974 capability_id,
975 grant_index,
976 max_invocations,
977 cost_units,
978 max_cost_per_invocation,
979 max_total_cost_units,
980 hold_id,
981 event_id,
982 None,
983 )
984 }
985
986 fn try_charge_cost_with_ids_and_authority(
987 &mut self,
988 capability_id: &str,
989 grant_index: usize,
990 max_invocations: Option<u32>,
991 cost_units: u64,
992 max_cost_per_invocation: Option<u64>,
993 max_total_cost_units: Option<u64>,
994 hold_id: Option<&str>,
995 event_id: Option<&str>,
996 authority: Option<&BudgetEventAuthority>,
997 ) -> Result<bool, BudgetStoreError> {
998 let request = BudgetMutationRequest::Authorize {
999 capability_id: capability_id.to_string(),
1000 grant_index,
1001 hold_id: hold_id.map(ToOwned::to_owned),
1002 authority: authority.cloned(),
1003 cost_units,
1004 max_invocations,
1005 max_cost_per_invocation,
1006 max_total_cost_units,
1007 };
1008 if let Some(existing) = self.duplicate_mutation(event_id, &request)? {
1009 return Ok(existing.record.allowed.unwrap_or(false));
1010 }
1011
1012 let key = (capability_id.to_string(), grant_index);
1013 let current = self
1014 .counts
1015 .get(&key)
1016 .cloned()
1017 .unwrap_or_else(|| Self::default_usage_record(capability_id, grant_index));
1018
1019 let mut allowed = true;
1020 if let Some(max) = max_invocations {
1021 if current.invocation_count >= max {
1022 allowed = false;
1023 }
1024 }
1025 if let Some(max_per) = max_cost_per_invocation {
1026 if cost_units > max_per {
1027 allowed = false;
1028 }
1029 }
1030 if let Some(max_total) = max_total_cost_units {
1031 let current_total = checked_committed_cost_units(
1032 current.total_cost_exposed,
1033 current.total_cost_realized_spend,
1034 )?;
1035 let new_total = current_total.checked_add(cost_units).ok_or_else(|| {
1036 BudgetStoreError::Overflow(
1037 "authorized exposure + cost_units overflowed u64".to_string(),
1038 )
1039 })?;
1040 if new_total > max_total {
1041 allowed = false;
1042 }
1043 }
1044
1045 let recorded_at = unix_now();
1046 let (invocation_count_after, total_cost_exposed_after, total_cost_realized_spend_after);
1047 let event_seq;
1048 let mut usage_seq = None;
1049
1050 if allowed {
1051 if let Some(hold_id) = hold_id {
1052 if self.holds.contains_key(hold_id) {
1053 return Err(BudgetStoreError::Invariant(format!(
1054 "budget hold `{hold_id}` already exists"
1055 )));
1056 }
1057 }
1058 let new_total_cost_exposed = current
1059 .total_cost_exposed
1060 .checked_add(cost_units)
1061 .ok_or_else(|| {
1062 BudgetStoreError::Overflow(
1063 "total_cost_exposed + cost_units overflowed u64".to_string(),
1064 )
1065 })?;
1066 event_seq = self.next_seq.saturating_add(1);
1067 self.next_seq = event_seq;
1068 let entry = self
1069 .counts
1070 .entry(key)
1071 .or_insert_with(|| Self::default_usage_record(capability_id, grant_index));
1072 entry.invocation_count = current.invocation_count.saturating_add(1);
1073 entry.total_cost_exposed = new_total_cost_exposed;
1074 entry.updated_at = recorded_at;
1075 entry.seq = event_seq;
1076 if let Some(hold_id) = hold_id {
1077 self.holds.insert(
1078 hold_id.to_string(),
1079 BudgetHoldState {
1080 capability_id: capability_id.to_string(),
1081 grant_index,
1082 authorized_exposure_units: cost_units,
1083 remaining_exposure_units: cost_units,
1084 invocation_count_debited: true,
1085 disposition: BudgetHoldDisposition::Open,
1086 authority: authority.cloned(),
1087 },
1088 );
1089 }
1090 invocation_count_after = entry.invocation_count;
1091 total_cost_exposed_after = entry.total_cost_exposed;
1092 total_cost_realized_spend_after = entry.total_cost_realized_spend;
1093 usage_seq = Some(event_seq);
1094 } else {
1095 event_seq = self.next_seq.saturating_add(1);
1096 self.next_seq = event_seq;
1097 invocation_count_after = current.invocation_count;
1098 total_cost_exposed_after = current.total_cost_exposed;
1099 total_cost_realized_spend_after = current.total_cost_realized_spend;
1100 }
1101
1102 self.append_mutation(
1103 event_id,
1104 request,
1105 BudgetMutationRecord {
1106 event_id: String::new(),
1107 hold_id: hold_id.map(ToOwned::to_owned),
1108 capability_id: capability_id.to_string(),
1109 grant_index: grant_index as u32,
1110 kind: BudgetMutationKind::AuthorizeExposure,
1111 allowed: Some(allowed),
1112 recorded_at,
1113 event_seq,
1114 usage_seq,
1115 exposure_units: cost_units,
1116 realized_spend_units: 0,
1117 max_invocations,
1118 max_cost_per_invocation,
1119 max_total_cost_units,
1120 invocation_count_after,
1121 total_cost_exposed_after,
1122 total_cost_realized_spend_after,
1123 authority: authority.cloned(),
1124 },
1125 );
1126
1127 Ok(allowed)
1128 }
1129
1130 fn reverse_charge_cost(
1131 &mut self,
1132 capability_id: &str,
1133 grant_index: usize,
1134 cost_units: u64,
1135 ) -> Result<(), BudgetStoreError> {
1136 self.reverse_charge_cost_with_ids(capability_id, grant_index, cost_units, None, None)
1137 }
1138
1139 fn reverse_charge_cost_with_ids(
1140 &mut self,
1141 capability_id: &str,
1142 grant_index: usize,
1143 cost_units: u64,
1144 hold_id: Option<&str>,
1145 event_id: Option<&str>,
1146 ) -> Result<(), BudgetStoreError> {
1147 self.reverse_charge_cost_with_ids_and_authority(
1148 capability_id,
1149 grant_index,
1150 cost_units,
1151 hold_id,
1152 event_id,
1153 None,
1154 )
1155 }
1156
1157 fn reverse_charge_cost_with_ids_and_authority(
1158 &mut self,
1159 capability_id: &str,
1160 grant_index: usize,
1161 cost_units: u64,
1162 hold_id: Option<&str>,
1163 event_id: Option<&str>,
1164 authority: Option<&BudgetEventAuthority>,
1165 ) -> Result<(), BudgetStoreError> {
1166 let request = BudgetMutationRequest::Reverse {
1167 capability_id: capability_id.to_string(),
1168 grant_index,
1169 hold_id: hold_id.map(ToOwned::to_owned),
1170 authority: authority.cloned(),
1171 cost_units,
1172 };
1173 if self.duplicate_mutation(event_id, &request)?.is_some() {
1174 return Ok(());
1175 }
1176 if let Some(hold_id) = hold_id {
1177 let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
1178 if hold.remaining_exposure_units != cost_units || !hold.invocation_count_debited {
1179 return Err(BudgetStoreError::Invariant(format!(
1180 "budget hold `{hold_id}` does not match reverse amount"
1181 )));
1182 }
1183 Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
1184 }
1185
1186 let key = (capability_id.to_string(), grant_index);
1187 let (
1188 invocation_count_after,
1189 total_cost_exposed_after,
1190 total_cost_realized_spend_after,
1191 seq,
1192 );
1193 {
1194 let entry = self.counts.get_mut(&key).ok_or_else(|| {
1195 BudgetStoreError::Invariant("missing charged budget row".to_string())
1196 })?;
1197 if entry.invocation_count == 0 {
1198 return Err(BudgetStoreError::Invariant(
1199 "cannot reverse charge with zero invocation_count".to_string(),
1200 ));
1201 }
1202 if entry.total_cost_exposed < cost_units {
1203 return Err(BudgetStoreError::Invariant(
1204 "cannot reverse charge larger than total_cost_exposed".to_string(),
1205 ));
1206 }
1207 let next_seq = self.next_seq.saturating_add(1);
1208 self.next_seq = next_seq;
1209 entry.invocation_count -= 1;
1210 entry.total_cost_exposed -= cost_units;
1211 entry.updated_at = unix_now();
1212 entry.seq = next_seq;
1213 invocation_count_after = entry.invocation_count;
1214 total_cost_exposed_after = entry.total_cost_exposed;
1215 total_cost_realized_spend_after = entry.total_cost_realized_spend;
1216 seq = entry.seq;
1217 }
1218 if let Some(hold_id) = hold_id {
1219 let Some(hold) = self.holds.get_mut(hold_id) else {
1220 return Err(BudgetStoreError::Invariant(
1221 "validated hold missing during reverse_charge_cost".to_string(),
1222 ));
1223 };
1224 hold.remaining_exposure_units = 0;
1225 hold.disposition = BudgetHoldDisposition::Reversed;
1226 hold.authority = authority.cloned().or_else(|| hold.authority.clone());
1227 }
1228 self.append_mutation(
1229 event_id,
1230 request,
1231 BudgetMutationRecord {
1232 event_id: String::new(),
1233 hold_id: hold_id.map(ToOwned::to_owned),
1234 capability_id: capability_id.to_string(),
1235 grant_index: grant_index as u32,
1236 kind: BudgetMutationKind::ReverseExposure,
1237 allowed: None,
1238 recorded_at: unix_now(),
1239 event_seq: seq,
1240 usage_seq: Some(seq),
1241 exposure_units: cost_units,
1242 realized_spend_units: 0,
1243 max_invocations: None,
1244 max_cost_per_invocation: None,
1245 max_total_cost_units: None,
1246 invocation_count_after,
1247 total_cost_exposed_after,
1248 total_cost_realized_spend_after,
1249 authority: authority.cloned(),
1250 },
1251 );
1252 Ok(())
1253 }
1254
1255 fn reduce_charge_cost(
1256 &mut self,
1257 capability_id: &str,
1258 grant_index: usize,
1259 cost_units: u64,
1260 ) -> Result<(), BudgetStoreError> {
1261 self.reduce_charge_cost_with_ids(capability_id, grant_index, cost_units, None, None)
1262 }
1263
1264 fn reduce_charge_cost_with_ids(
1265 &mut self,
1266 capability_id: &str,
1267 grant_index: usize,
1268 cost_units: u64,
1269 hold_id: Option<&str>,
1270 event_id: Option<&str>,
1271 ) -> Result<(), BudgetStoreError> {
1272 self.reduce_charge_cost_with_ids_and_authority(
1273 capability_id,
1274 grant_index,
1275 cost_units,
1276 hold_id,
1277 event_id,
1278 None,
1279 )
1280 }
1281
1282 fn reduce_charge_cost_with_ids_and_authority(
1283 &mut self,
1284 capability_id: &str,
1285 grant_index: usize,
1286 cost_units: u64,
1287 hold_id: Option<&str>,
1288 event_id: Option<&str>,
1289 authority: Option<&BudgetEventAuthority>,
1290 ) -> Result<(), BudgetStoreError> {
1291 let request = BudgetMutationRequest::Release {
1292 capability_id: capability_id.to_string(),
1293 grant_index,
1294 hold_id: hold_id.map(ToOwned::to_owned),
1295 authority: authority.cloned(),
1296 cost_units,
1297 };
1298 if self.duplicate_mutation(event_id, &request)?.is_some() {
1299 return Ok(());
1300 }
1301 if let Some(hold_id) = hold_id {
1302 let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
1303 if hold.remaining_exposure_units < cost_units {
1304 return Err(BudgetStoreError::Invariant(format!(
1305 "budget hold `{hold_id}` cannot release more than remaining exposure"
1306 )));
1307 }
1308 Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
1309 }
1310
1311 let key = (capability_id.to_string(), grant_index);
1312 let (
1313 invocation_count_after,
1314 total_cost_exposed_after,
1315 total_cost_realized_spend_after,
1316 seq,
1317 );
1318 {
1319 let entry = self.counts.get_mut(&key).ok_or_else(|| {
1320 BudgetStoreError::Invariant("missing charged budget row".to_string())
1321 })?;
1322
1323 if entry.total_cost_exposed < cost_units {
1324 return Err(BudgetStoreError::Invariant(
1325 "cannot reduce charge larger than total_cost_exposed".to_string(),
1326 ));
1327 }
1328
1329 let next_seq = self.next_seq.saturating_add(1);
1330 self.next_seq = next_seq;
1331 entry.total_cost_exposed -= cost_units;
1332 entry.updated_at = unix_now();
1333 entry.seq = next_seq;
1334 invocation_count_after = entry.invocation_count;
1335 total_cost_exposed_after = entry.total_cost_exposed;
1336 total_cost_realized_spend_after = entry.total_cost_realized_spend;
1337 seq = entry.seq;
1338 }
1339 if let Some(hold_id) = hold_id {
1340 let Some(hold) = self.holds.get_mut(hold_id) else {
1341 return Err(BudgetStoreError::Invariant(
1342 "validated hold missing during release_charge_cost".to_string(),
1343 ));
1344 };
1345 hold.remaining_exposure_units -= cost_units;
1346 if hold.remaining_exposure_units == 0 {
1347 hold.disposition = BudgetHoldDisposition::Released;
1348 }
1349 hold.authority = authority.cloned().or_else(|| hold.authority.clone());
1350 }
1351 self.append_mutation(
1352 event_id,
1353 request,
1354 BudgetMutationRecord {
1355 event_id: String::new(),
1356 hold_id: hold_id.map(ToOwned::to_owned),
1357 capability_id: capability_id.to_string(),
1358 grant_index: grant_index as u32,
1359 kind: BudgetMutationKind::ReleaseExposure,
1360 allowed: None,
1361 recorded_at: unix_now(),
1362 event_seq: seq,
1363 usage_seq: Some(seq),
1364 exposure_units: cost_units,
1365 realized_spend_units: 0,
1366 max_invocations: None,
1367 max_cost_per_invocation: None,
1368 max_total_cost_units: None,
1369 invocation_count_after,
1370 total_cost_exposed_after,
1371 total_cost_realized_spend_after,
1372 authority: authority.cloned(),
1373 },
1374 );
1375 Ok(())
1376 }
1377
1378 fn settle_charge_cost(
1379 &mut self,
1380 capability_id: &str,
1381 grant_index: usize,
1382 exposed_cost_units: u64,
1383 realized_cost_units: u64,
1384 ) -> Result<(), BudgetStoreError> {
1385 self.settle_charge_cost_with_ids(
1386 capability_id,
1387 grant_index,
1388 exposed_cost_units,
1389 realized_cost_units,
1390 None,
1391 None,
1392 )
1393 }
1394
1395 fn settle_charge_cost_with_ids(
1396 &mut self,
1397 capability_id: &str,
1398 grant_index: usize,
1399 exposed_cost_units: u64,
1400 realized_cost_units: u64,
1401 hold_id: Option<&str>,
1402 event_id: Option<&str>,
1403 ) -> Result<(), BudgetStoreError> {
1404 self.settle_charge_cost_with_ids_and_authority(
1405 capability_id,
1406 grant_index,
1407 exposed_cost_units,
1408 realized_cost_units,
1409 hold_id,
1410 event_id,
1411 None,
1412 )
1413 }
1414
1415 fn settle_charge_cost_with_ids_and_authority(
1416 &mut self,
1417 capability_id: &str,
1418 grant_index: usize,
1419 exposed_cost_units: u64,
1420 realized_cost_units: u64,
1421 hold_id: Option<&str>,
1422 event_id: Option<&str>,
1423 authority: Option<&BudgetEventAuthority>,
1424 ) -> Result<(), BudgetStoreError> {
1425 if realized_cost_units > exposed_cost_units {
1426 return Err(BudgetStoreError::Invariant(
1427 "cannot realize spend larger than exposed cost".to_string(),
1428 ));
1429 }
1430 let request = BudgetMutationRequest::Reconcile {
1431 capability_id: capability_id.to_string(),
1432 grant_index,
1433 hold_id: hold_id.map(ToOwned::to_owned),
1434 authority: authority.cloned(),
1435 exposed_cost_units,
1436 realized_cost_units,
1437 };
1438 if self.duplicate_mutation(event_id, &request)?.is_some() {
1439 return Ok(());
1440 }
1441 if let Some(hold_id) = hold_id {
1442 let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
1443 if hold.remaining_exposure_units != exposed_cost_units {
1444 return Err(BudgetStoreError::Invariant(format!(
1445 "budget hold `{hold_id}` does not match reconciled exposure"
1446 )));
1447 }
1448 Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
1449 }
1450
1451 let key = (capability_id.to_string(), grant_index);
1452 let (
1453 invocation_count_after,
1454 total_cost_exposed_after,
1455 total_cost_realized_spend_after,
1456 seq,
1457 );
1458 {
1459 let entry = self.counts.get_mut(&key).ok_or_else(|| {
1460 BudgetStoreError::Invariant("missing charged budget row".to_string())
1461 })?;
1462
1463 if entry.invocation_count == 0 {
1464 return Err(BudgetStoreError::Invariant(
1465 "cannot settle charge with zero invocation_count".to_string(),
1466 ));
1467 }
1468 if entry.total_cost_exposed < exposed_cost_units {
1469 return Err(BudgetStoreError::Invariant(
1470 "cannot settle more exposure than total_cost_exposed".to_string(),
1471 ));
1472 }
1473
1474 entry.total_cost_realized_spend = entry
1475 .total_cost_realized_spend
1476 .checked_add(realized_cost_units)
1477 .ok_or_else(|| {
1478 BudgetStoreError::Overflow(
1479 "total_cost_realized_spend + realized_cost_units overflowed u64"
1480 .to_string(),
1481 )
1482 })?;
1483 entry.total_cost_exposed -= exposed_cost_units;
1484
1485 let next_seq = self.next_seq.saturating_add(1);
1486 self.next_seq = next_seq;
1487 entry.updated_at = unix_now();
1488 entry.seq = next_seq;
1489 invocation_count_after = entry.invocation_count;
1490 total_cost_exposed_after = entry.total_cost_exposed;
1491 total_cost_realized_spend_after = entry.total_cost_realized_spend;
1492 seq = entry.seq;
1493 }
1494 if let Some(hold_id) = hold_id {
1495 let Some(hold) = self.holds.get_mut(hold_id) else {
1496 return Err(BudgetStoreError::Invariant(
1497 "validated hold missing during settle_charge_cost".to_string(),
1498 ));
1499 };
1500 hold.remaining_exposure_units = 0;
1501 hold.disposition = BudgetHoldDisposition::Reconciled;
1502 hold.authority = authority.cloned().or_else(|| hold.authority.clone());
1503 }
1504 self.append_mutation(
1505 event_id,
1506 request,
1507 BudgetMutationRecord {
1508 event_id: String::new(),
1509 hold_id: hold_id.map(ToOwned::to_owned),
1510 capability_id: capability_id.to_string(),
1511 grant_index: grant_index as u32,
1512 kind: BudgetMutationKind::ReconcileSpend,
1513 allowed: None,
1514 recorded_at: unix_now(),
1515 event_seq: seq,
1516 usage_seq: Some(seq),
1517 exposure_units: exposed_cost_units,
1518 realized_spend_units: realized_cost_units,
1519 max_invocations: None,
1520 max_cost_per_invocation: None,
1521 max_total_cost_units: None,
1522 invocation_count_after,
1523 total_cost_exposed_after,
1524 total_cost_realized_spend_after,
1525 authority: authority.cloned(),
1526 },
1527 );
1528 Ok(())
1529 }
1530
1531 fn list_usages(
1532 &self,
1533 limit: usize,
1534 capability_id: Option<&str>,
1535 ) -> Result<Vec<BudgetUsageRecord>, BudgetStoreError> {
1536 let mut records = self
1537 .counts
1538 .values()
1539 .filter(|record| capability_id.is_none_or(|value| record.capability_id == value))
1540 .cloned()
1541 .collect::<Vec<_>>();
1542 records.sort_by(|left, right| {
1543 right
1544 .updated_at
1545 .cmp(&left.updated_at)
1546 .then_with(|| left.capability_id.cmp(&right.capability_id))
1547 .then_with(|| left.grant_index.cmp(&right.grant_index))
1548 });
1549 records.truncate(limit);
1550 Ok(records)
1551 }
1552
1553 fn get_usage(
1554 &self,
1555 capability_id: &str,
1556 grant_index: usize,
1557 ) -> Result<Option<BudgetUsageRecord>, BudgetStoreError> {
1558 Ok(self
1559 .counts
1560 .get(&(capability_id.to_string(), grant_index))
1561 .cloned())
1562 }
1563
1564 fn list_mutation_events(
1565 &self,
1566 limit: usize,
1567 capability_id: Option<&str>,
1568 grant_index: Option<usize>,
1569 ) -> Result<Vec<BudgetMutationRecord>, BudgetStoreError> {
1570 let mut events = self
1571 .events
1572 .iter()
1573 .filter(|record| capability_id.is_none_or(|value| record.capability_id == value))
1574 .filter(|record| grant_index.is_none_or(|value| record.grant_index == value as u32))
1575 .cloned()
1576 .collect::<Vec<_>>();
1577 events.truncate(limit);
1578 Ok(events)
1579 }
1580}
1581
1582fn unix_now() -> i64 {
1583 SystemTime::now()
1584 .duration_since(UNIX_EPOCH)
1585 .map(|duration| duration.as_secs() as i64)
1586 .unwrap_or(0)
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591 use super::*;
1592
1593 #[test]
1594 fn authorize_and_reconcile_hold_preserve_authority_metadata() {
1595 let mut store = InMemoryBudgetStore::new();
1596 let authority = BudgetEventAuthority {
1597 authority_id: "kernel:test-authority".to_string(),
1598 lease_id: "single-node".to_string(),
1599 lease_epoch: 0,
1600 };
1601
1602 let decision = store
1603 .authorize_budget_hold(BudgetAuthorizeHoldRequest {
1604 capability_id: "cap-budget-1".to_string(),
1605 grant_index: 0,
1606 max_invocations: Some(4),
1607 requested_exposure_units: 100,
1608 max_cost_per_invocation: Some(100),
1609 max_total_cost_units: Some(1_000),
1610 hold_id: Some("hold-budget-1".to_string()),
1611 event_id: Some("hold-budget-1:authorize".to_string()),
1612 authority: Some(authority.clone()),
1613 })
1614 .unwrap();
1615 let BudgetAuthorizeHoldDecision::Authorized(authorized) = decision else {
1616 panic!("budget hold should be authorized");
1617 };
1618 assert_eq!(authorized.committed_cost_units_after, 100);
1619 assert_eq!(
1620 authorized.metadata.event_id.as_deref(),
1621 Some("hold-budget-1:authorize")
1622 );
1623 assert_eq!(authorized.metadata.budget_commit_index, Some(1));
1624 assert_eq!(
1625 authorized.metadata.budget_term().as_deref(),
1626 Some("kernel:test-authority:0")
1627 );
1628
1629 let reconcile = store
1630 .reconcile_budget_hold(BudgetReconcileHoldRequest {
1631 capability_id: "cap-budget-1".to_string(),
1632 grant_index: 0,
1633 exposed_cost_units: 100,
1634 realized_spend_units: 75,
1635 hold_id: Some("hold-budget-1".to_string()),
1636 event_id: Some("hold-budget-1:reconcile".to_string()),
1637 authority: Some(authority.clone()),
1638 })
1639 .unwrap();
1640 assert_eq!(reconcile.committed_cost_units_after, 75);
1641 assert_eq!(reconcile.realized_spend_units, 75);
1642 assert_eq!(
1643 reconcile.metadata.event_id.as_deref(),
1644 Some("hold-budget-1:reconcile")
1645 );
1646 assert_eq!(reconcile.metadata.budget_commit_index, Some(2));
1647 assert_eq!(reconcile.metadata.authority.as_ref(), Some(&authority));
1648
1649 let usage = store.get_usage("cap-budget-1", 0).unwrap().unwrap();
1650 assert_eq!(usage.total_cost_exposed, 0);
1651 assert_eq!(usage.total_cost_realized_spend, 75);
1652 assert_eq!(usage.committed_cost_units().unwrap(), 75);
1653
1654 let events = store
1655 .list_mutation_events(10, Some("cap-budget-1"), Some(0))
1656 .unwrap();
1657 assert_eq!(events.len(), 2);
1658 assert_eq!(events[0].kind, BudgetMutationKind::AuthorizeExposure);
1659 assert_eq!(events[0].authority.as_ref(), Some(&authority));
1660 assert_eq!(events[1].kind, BudgetMutationKind::ReconcileSpend);
1661 assert_eq!(events[1].authority.as_ref(), Some(&authority));
1662 assert_eq!(events[1].realized_spend_units, 75);
1663 }
1664
1665 #[test]
1666 fn denied_authorize_hold_reports_guarantee_metadata_without_commit_index() {
1667 let mut store = InMemoryBudgetStore::new();
1668 let authority = BudgetEventAuthority {
1669 authority_id: "kernel:test-authority".to_string(),
1670 lease_id: "single-node".to_string(),
1671 lease_epoch: 0,
1672 };
1673
1674 let decision = store
1675 .authorize_budget_hold(BudgetAuthorizeHoldRequest {
1676 capability_id: "cap-budget-deny".to_string(),
1677 grant_index: 0,
1678 max_invocations: Some(1),
1679 requested_exposure_units: 150,
1680 max_cost_per_invocation: Some(100),
1681 max_total_cost_units: Some(1_000),
1682 hold_id: Some("hold-budget-deny".to_string()),
1683 event_id: Some("hold-budget-deny:authorize".to_string()),
1684 authority: Some(authority.clone()),
1685 })
1686 .unwrap();
1687 let BudgetAuthorizeHoldDecision::Denied(denied) = decision else {
1688 panic!("budget hold should be denied");
1689 };
1690 assert_eq!(denied.committed_cost_units_after, 0);
1691 assert_eq!(denied.invocation_count_after, 0);
1692 assert_eq!(
1693 denied.metadata.event_id.as_deref(),
1694 Some("hold-budget-deny:authorize")
1695 );
1696 assert_eq!(denied.metadata.budget_commit_index, None);
1697 assert_eq!(
1698 denied.metadata.guarantee_level,
1699 BudgetGuaranteeLevel::SingleNodeAtomic
1700 );
1701 assert_eq!(denied.metadata.authority.as_ref(), Some(&authority));
1702
1703 let events = store
1704 .list_mutation_events(10, Some("cap-budget-deny"), Some(0))
1705 .unwrap();
1706 assert_eq!(events.len(), 1);
1707 assert_eq!(events[0].allowed, Some(false));
1708 assert_eq!(events[0].authority.as_ref(), Some(&authority));
1709 assert!(store.get_usage("cap-budget-deny", 0).unwrap().is_none());
1710 }
1711}