Skip to main content

chio_kernel/
budget_store.rs

1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4#[derive(Debug, thiserror::Error)]
5pub enum BudgetStoreError {
6    #[error("sqlite error: {0}")]
7    Sqlite(#[from] rusqlite::Error),
8
9    #[error("failed to prepare budget store directory: {0}")]
10    Io(#[from] std::io::Error),
11
12    #[error("budget arithmetic overflow: {0}")]
13    Overflow(String),
14
15    #[error("budget state invariant violated: {0}")]
16    Invariant(String),
17}
18
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct BudgetUsageRecord {
21    pub capability_id: String,
22    pub grant_index: u32,
23    pub invocation_count: u32,
24    pub updated_at: i64,
25    pub seq: u64,
26    pub total_cost_exposed: u64,
27    pub total_cost_realized_spend: u64,
28}
29
30impl BudgetUsageRecord {
31    pub fn committed_cost_units(&self) -> Result<u64, BudgetStoreError> {
32        checked_committed_cost_units(self.total_cost_exposed, self.total_cost_realized_spend)
33    }
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum BudgetMutationKind {
38    IncrementInvocation,
39    AuthorizeExposure,
40    ReverseExposure,
41    ReleaseExposure,
42    ReconcileSpend,
43}
44
45impl BudgetMutationKind {
46    pub fn as_str(self) -> &'static str {
47        match self {
48            Self::IncrementInvocation => "increment_invocation",
49            Self::AuthorizeExposure => "authorize_exposure",
50            Self::ReverseExposure => "reverse_exposure",
51            Self::ReleaseExposure => "release_exposure",
52            Self::ReconcileSpend => "reconcile_spend",
53        }
54    }
55
56    pub fn parse(value: &str) -> Option<Self> {
57        match value {
58            "increment_invocation" => Some(Self::IncrementInvocation),
59            "authorize_exposure" => Some(Self::AuthorizeExposure),
60            "reverse_exposure" => Some(Self::ReverseExposure),
61            "release_exposure" => Some(Self::ReleaseExposure),
62            "reconcile_spend" => Some(Self::ReconcileSpend),
63            _ => None,
64        }
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct BudgetEventAuthority {
70    pub authority_id: String,
71    pub lease_id: String,
72    pub lease_epoch: u64,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct BudgetMutationRecord {
77    pub event_id: String,
78    pub hold_id: Option<String>,
79    pub capability_id: String,
80    pub grant_index: u32,
81    pub kind: BudgetMutationKind,
82    pub allowed: Option<bool>,
83    pub recorded_at: i64,
84    pub event_seq: u64,
85    pub usage_seq: Option<u64>,
86    pub exposure_units: u64,
87    pub realized_spend_units: u64,
88    pub max_invocations: Option<u32>,
89    pub max_cost_per_invocation: Option<u64>,
90    pub max_total_cost_units: Option<u64>,
91    pub invocation_count_after: u32,
92    pub total_cost_exposed_after: u64,
93    pub total_cost_realized_spend_after: u64,
94    pub authority: Option<BudgetEventAuthority>,
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum BudgetGuaranteeLevel {
99    SingleNodeAtomic,
100    HaLinearizable,
101    PartitionEscrowed,
102    AdvisoryPosthoc,
103}
104
105impl BudgetGuaranteeLevel {
106    pub fn as_str(self) -> &'static str {
107        match self {
108            Self::SingleNodeAtomic => "single_node_atomic",
109            Self::HaLinearizable => "ha_linearizable",
110            Self::PartitionEscrowed => "partition_escrowed",
111            Self::AdvisoryPosthoc => "advisory_posthoc",
112        }
113    }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq)]
117pub enum BudgetAuthorityProfile {
118    AuthoritativeHoldEvent,
119}
120
121impl BudgetAuthorityProfile {
122    pub fn as_str(self) -> &'static str {
123        match self {
124            Self::AuthoritativeHoldEvent => "authoritative_hold_event",
125        }
126    }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub enum BudgetMeteringProfile {
131    MaxCostPreauthorizeThenReconcileActual,
132}
133
134impl BudgetMeteringProfile {
135    pub fn as_str(self) -> &'static str {
136        match self {
137            Self::MaxCostPreauthorizeThenReconcileActual => {
138                "max_cost_preauthorize_then_reconcile_actual"
139            }
140        }
141    }
142}
143
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct BudgetCommitMetadata {
146    pub authority: Option<BudgetEventAuthority>,
147    pub guarantee_level: BudgetGuaranteeLevel,
148    pub budget_profile: BudgetAuthorityProfile,
149    pub metering_profile: BudgetMeteringProfile,
150    pub budget_commit_index: Option<u64>,
151    pub event_id: Option<String>,
152}
153
154impl BudgetCommitMetadata {
155    pub fn budget_term(&self) -> Option<String> {
156        self.authority
157            .as_ref()
158            .map(|authority| format!("{}:{}", authority.authority_id, authority.lease_epoch))
159    }
160}
161
162fn budget_commit_metadata<T: BudgetStore + ?Sized>(
163    store: &T,
164    authority: Option<BudgetEventAuthority>,
165    budget_commit_index: Option<u64>,
166    event_id: Option<String>,
167) -> BudgetCommitMetadata {
168    BudgetCommitMetadata {
169        authority,
170        guarantee_level: store.budget_guarantee_level(),
171        budget_profile: store.budget_authority_profile(),
172        metering_profile: store.budget_metering_profile(),
173        budget_commit_index,
174        event_id,
175    }
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct BudgetAuthorizeHoldRequest {
180    pub capability_id: String,
181    pub grant_index: usize,
182    pub max_invocations: Option<u32>,
183    pub requested_exposure_units: u64,
184    pub max_cost_per_invocation: Option<u64>,
185    pub max_total_cost_units: Option<u64>,
186    pub hold_id: Option<String>,
187    pub event_id: Option<String>,
188    pub authority: Option<BudgetEventAuthority>,
189}
190
191#[derive(Debug, Clone, PartialEq, Eq)]
192pub struct BudgetReleaseHoldRequest {
193    pub capability_id: String,
194    pub grant_index: usize,
195    pub released_exposure_units: u64,
196    pub hold_id: Option<String>,
197    pub event_id: Option<String>,
198    pub authority: Option<BudgetEventAuthority>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct BudgetReverseHoldRequest {
203    pub capability_id: String,
204    pub grant_index: usize,
205    pub reversed_exposure_units: u64,
206    pub hold_id: Option<String>,
207    pub event_id: Option<String>,
208    pub authority: Option<BudgetEventAuthority>,
209}
210
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct BudgetReconcileHoldRequest {
213    pub capability_id: String,
214    pub grant_index: usize,
215    pub exposed_cost_units: u64,
216    pub realized_spend_units: u64,
217    pub hold_id: Option<String>,
218    pub event_id: Option<String>,
219    pub authority: Option<BudgetEventAuthority>,
220}
221
222pub type BudgetCaptureHoldRequest = BudgetReconcileHoldRequest;
223
224#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct AuthorizedBudgetHold {
226    pub hold_id: Option<String>,
227    pub authorized_exposure_units: u64,
228    pub committed_cost_units_after: u64,
229    pub invocation_count_after: u32,
230    pub metadata: BudgetCommitMetadata,
231}
232
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct DeniedBudgetHold {
235    pub hold_id: Option<String>,
236    pub attempted_exposure_units: u64,
237    pub committed_cost_units_after: u64,
238    pub invocation_count_after: u32,
239    pub metadata: BudgetCommitMetadata,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub enum BudgetAuthorizeHoldDecision {
244    Authorized(AuthorizedBudgetHold),
245    Denied(DeniedBudgetHold),
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct BudgetHoldMutationDecision {
250    pub hold_id: Option<String>,
251    pub exposure_units: u64,
252    pub realized_spend_units: u64,
253    pub committed_cost_units_after: u64,
254    pub invocation_count_after: u32,
255    pub metadata: BudgetCommitMetadata,
256}
257
258pub type BudgetReleaseHoldDecision = BudgetHoldMutationDecision;
259pub type BudgetReverseHoldDecision = BudgetHoldMutationDecision;
260pub type BudgetReconcileHoldDecision = BudgetHoldMutationDecision;
261pub type BudgetCaptureHoldDecision = BudgetHoldMutationDecision;
262
263pub trait BudgetStore: Send {
264    fn try_increment(
265        &mut self,
266        capability_id: &str,
267        grant_index: usize,
268        max_invocations: Option<u32>,
269    ) -> Result<bool, BudgetStoreError>;
270
271    /// Atomically check monetary budget limits and record provisional exposure if within bounds.
272    ///
273    /// Checks:
274    /// 1. `invocation_count < max_invocations` (if set)
275    /// 2. `cost_units <= max_cost_per_invocation` (if set)
276    /// 3. `(total_cost_exposed + total_cost_realized_spend + cost_units)
277    ///    <= max_total_cost_units` (if set)
278    ///
279    /// On pass: increments `invocation_count` by 1 and `total_cost_exposed` by
280    /// `cost_units`, allocates a new replication seq, returns `Ok(true)`.
281    /// On any limit exceeded: rolls back, returns `Ok(false)`.
282    ///
283    // SAFETY: HA overrun bound = max_cost_per_invocation x node_count
284    // In a split-brain scenario, each HA node may independently approve up to
285    // one invocation at the full per-invocation cap before the LWW merge
286    // propagates the updated total. The maximum possible overrun is therefore
287    // bounded by max_cost_per_invocation multiplied by the number of active
288    // nodes in the HA cluster.
289    fn try_charge_cost(
290        &mut self,
291        capability_id: &str,
292        grant_index: usize,
293        max_invocations: Option<u32>,
294        cost_units: u64,
295        max_cost_per_invocation: Option<u64>,
296        max_total_cost_units: Option<u64>,
297    ) -> Result<bool, BudgetStoreError>;
298
299    #[allow(clippy::too_many_arguments)]
300    fn try_charge_cost_with_ids(
301        &mut self,
302        capability_id: &str,
303        grant_index: usize,
304        max_invocations: Option<u32>,
305        cost_units: u64,
306        max_cost_per_invocation: Option<u64>,
307        max_total_cost_units: Option<u64>,
308        hold_id: Option<&str>,
309        event_id: Option<&str>,
310    ) -> Result<bool, BudgetStoreError> {
311        let _ = hold_id;
312        let _ = event_id;
313        self.try_charge_cost(
314            capability_id,
315            grant_index,
316            max_invocations,
317            cost_units,
318            max_cost_per_invocation,
319            max_total_cost_units,
320        )
321    }
322
323    #[allow(clippy::too_many_arguments)]
324    fn try_charge_cost_with_ids_and_authority(
325        &mut self,
326        capability_id: &str,
327        grant_index: usize,
328        max_invocations: Option<u32>,
329        cost_units: u64,
330        max_cost_per_invocation: Option<u64>,
331        max_total_cost_units: Option<u64>,
332        hold_id: Option<&str>,
333        event_id: Option<&str>,
334        authority: Option<&BudgetEventAuthority>,
335    ) -> Result<bool, BudgetStoreError> {
336        let _ = authority;
337        self.try_charge_cost_with_ids(
338            capability_id,
339            grant_index,
340            max_invocations,
341            cost_units,
342            max_cost_per_invocation,
343            max_total_cost_units,
344            hold_id,
345            event_id,
346        )
347    }
348
349    /// Reverse a previously applied provisional exposure for a pre-execution denial path.
350    fn reverse_charge_cost(
351        &mut self,
352        capability_id: &str,
353        grant_index: usize,
354        cost_units: u64,
355    ) -> Result<(), BudgetStoreError>;
356
357    fn reverse_charge_cost_with_ids(
358        &mut self,
359        capability_id: &str,
360        grant_index: usize,
361        cost_units: u64,
362        hold_id: Option<&str>,
363        event_id: Option<&str>,
364    ) -> Result<(), BudgetStoreError> {
365        let _ = hold_id;
366        let _ = event_id;
367        self.reverse_charge_cost(capability_id, grant_index, cost_units)
368    }
369
370    fn reverse_charge_cost_with_ids_and_authority(
371        &mut self,
372        capability_id: &str,
373        grant_index: usize,
374        cost_units: u64,
375        hold_id: Option<&str>,
376        event_id: Option<&str>,
377        authority: Option<&BudgetEventAuthority>,
378    ) -> Result<(), BudgetStoreError> {
379        let _ = authority;
380        self.reverse_charge_cost_with_ids(capability_id, grant_index, cost_units, hold_id, event_id)
381    }
382
383    /// Release a previously exposed monetary amount without changing invocation count.
384    ///
385    /// This is used when the kernel needs to release provisional exposure without
386    /// realizing any spend in the budget store itself.
387    fn reduce_charge_cost(
388        &mut self,
389        capability_id: &str,
390        grant_index: usize,
391        cost_units: u64,
392    ) -> Result<(), BudgetStoreError>;
393
394    fn reduce_charge_cost_with_ids(
395        &mut self,
396        capability_id: &str,
397        grant_index: usize,
398        cost_units: u64,
399        hold_id: Option<&str>,
400        event_id: Option<&str>,
401    ) -> Result<(), BudgetStoreError> {
402        let _ = hold_id;
403        let _ = event_id;
404        self.reduce_charge_cost(capability_id, grant_index, cost_units)
405    }
406
407    fn reduce_charge_cost_with_ids_and_authority(
408        &mut self,
409        capability_id: &str,
410        grant_index: usize,
411        cost_units: u64,
412        hold_id: Option<&str>,
413        event_id: Option<&str>,
414        authority: Option<&BudgetEventAuthority>,
415    ) -> Result<(), BudgetStoreError> {
416        let _ = authority;
417        self.reduce_charge_cost_with_ids(capability_id, grant_index, cost_units, hold_id, event_id)
418    }
419
420    /// Atomically release provisional exposure and record realized spend.
421    ///
422    /// This removes `exposed_cost_units` from `total_cost_exposed` and adds
423    /// `realized_cost_units` to `total_cost_realized_spend` without changing
424    /// invocation count. `realized_cost_units` must not exceed
425    /// `exposed_cost_units`.
426    fn settle_charge_cost(
427        &mut self,
428        capability_id: &str,
429        grant_index: usize,
430        exposed_cost_units: u64,
431        realized_cost_units: u64,
432    ) -> Result<(), BudgetStoreError>;
433
434    fn settle_charge_cost_with_ids(
435        &mut self,
436        capability_id: &str,
437        grant_index: usize,
438        exposed_cost_units: u64,
439        realized_cost_units: u64,
440        hold_id: Option<&str>,
441        event_id: Option<&str>,
442    ) -> Result<(), BudgetStoreError> {
443        let _ = hold_id;
444        let _ = event_id;
445        self.settle_charge_cost(
446            capability_id,
447            grant_index,
448            exposed_cost_units,
449            realized_cost_units,
450        )
451    }
452
453    #[allow(clippy::too_many_arguments)]
454    fn settle_charge_cost_with_ids_and_authority(
455        &mut self,
456        capability_id: &str,
457        grant_index: usize,
458        exposed_cost_units: u64,
459        realized_cost_units: u64,
460        hold_id: Option<&str>,
461        event_id: Option<&str>,
462        authority: Option<&BudgetEventAuthority>,
463    ) -> Result<(), BudgetStoreError> {
464        let _ = authority;
465        self.settle_charge_cost_with_ids(
466            capability_id,
467            grant_index,
468            exposed_cost_units,
469            realized_cost_units,
470            hold_id,
471            event_id,
472        )
473    }
474
475    fn list_usages(
476        &self,
477        limit: usize,
478        capability_id: Option<&str>,
479    ) -> Result<Vec<BudgetUsageRecord>, BudgetStoreError>;
480
481    fn get_usage(
482        &self,
483        capability_id: &str,
484        grant_index: usize,
485    ) -> Result<Option<BudgetUsageRecord>, BudgetStoreError>;
486
487    fn list_mutation_events(
488        &self,
489        _limit: usize,
490        _capability_id: Option<&str>,
491        _grant_index: Option<usize>,
492    ) -> Result<Vec<BudgetMutationRecord>, BudgetStoreError> {
493        Err(BudgetStoreError::Invariant(
494            "budget mutation events unavailable for this backend".to_string(),
495        ))
496    }
497
498    fn budget_guarantee_level(&self) -> BudgetGuaranteeLevel {
499        BudgetGuaranteeLevel::SingleNodeAtomic
500    }
501
502    fn budget_authority_profile(&self) -> BudgetAuthorityProfile {
503        BudgetAuthorityProfile::AuthoritativeHoldEvent
504    }
505
506    fn budget_metering_profile(&self) -> BudgetMeteringProfile {
507        BudgetMeteringProfile::MaxCostPreauthorizeThenReconcileActual
508    }
509
510    fn authorize_budget_hold(
511        &mut self,
512        request: BudgetAuthorizeHoldRequest,
513    ) -> Result<BudgetAuthorizeHoldDecision, BudgetStoreError> {
514        let allowed = self.try_charge_cost_with_ids_and_authority(
515            &request.capability_id,
516            request.grant_index,
517            request.max_invocations,
518            request.requested_exposure_units,
519            request.max_cost_per_invocation,
520            request.max_total_cost_units,
521            request.hold_id.as_deref(),
522            request.event_id.as_deref(),
523            request.authority.as_ref(),
524        )?;
525        let usage = self.get_usage(&request.capability_id, request.grant_index)?;
526        let committed_cost_units_after = usage
527            .as_ref()
528            .map(BudgetUsageRecord::committed_cost_units)
529            .transpose()?
530            .unwrap_or(0);
531        let invocation_count_after = usage.as_ref().map_or(0, |usage| usage.invocation_count);
532        let metadata = budget_commit_metadata(
533            self,
534            request.authority,
535            allowed
536                .then(|| usage.as_ref().map(|usage| usage.seq))
537                .flatten(),
538            request.event_id,
539        );
540
541        if allowed {
542            Ok(BudgetAuthorizeHoldDecision::Authorized(
543                AuthorizedBudgetHold {
544                    hold_id: request.hold_id,
545                    authorized_exposure_units: request.requested_exposure_units,
546                    committed_cost_units_after,
547                    invocation_count_after,
548                    metadata,
549                },
550            ))
551        } else {
552            Ok(BudgetAuthorizeHoldDecision::Denied(DeniedBudgetHold {
553                hold_id: request.hold_id,
554                attempted_exposure_units: request.requested_exposure_units,
555                committed_cost_units_after,
556                invocation_count_after,
557                metadata,
558            }))
559        }
560    }
561
562    fn reverse_budget_hold(
563        &mut self,
564        request: BudgetReverseHoldRequest,
565    ) -> Result<BudgetReverseHoldDecision, BudgetStoreError> {
566        self.reverse_charge_cost_with_ids_and_authority(
567            &request.capability_id,
568            request.grant_index,
569            request.reversed_exposure_units,
570            request.hold_id.as_deref(),
571            request.event_id.as_deref(),
572            request.authority.as_ref(),
573        )?;
574        let usage = self.get_usage(&request.capability_id, request.grant_index)?;
575        Ok(BudgetHoldMutationDecision {
576            hold_id: request.hold_id,
577            exposure_units: request.reversed_exposure_units,
578            realized_spend_units: 0,
579            committed_cost_units_after: usage
580                .as_ref()
581                .map(BudgetUsageRecord::committed_cost_units)
582                .transpose()?
583                .unwrap_or(0),
584            invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
585            metadata: budget_commit_metadata(
586                self,
587                request.authority,
588                usage.as_ref().map(|usage| usage.seq),
589                request.event_id,
590            ),
591        })
592    }
593
594    fn release_budget_hold(
595        &mut self,
596        request: BudgetReleaseHoldRequest,
597    ) -> Result<BudgetReleaseHoldDecision, BudgetStoreError> {
598        self.reduce_charge_cost_with_ids_and_authority(
599            &request.capability_id,
600            request.grant_index,
601            request.released_exposure_units,
602            request.hold_id.as_deref(),
603            request.event_id.as_deref(),
604            request.authority.as_ref(),
605        )?;
606        let usage = self.get_usage(&request.capability_id, request.grant_index)?;
607        Ok(BudgetHoldMutationDecision {
608            hold_id: request.hold_id,
609            exposure_units: request.released_exposure_units,
610            realized_spend_units: 0,
611            committed_cost_units_after: usage
612                .as_ref()
613                .map(BudgetUsageRecord::committed_cost_units)
614                .transpose()?
615                .unwrap_or(0),
616            invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
617            metadata: budget_commit_metadata(
618                self,
619                request.authority,
620                usage.as_ref().map(|usage| usage.seq),
621                request.event_id,
622            ),
623        })
624    }
625
626    fn reconcile_budget_hold(
627        &mut self,
628        request: BudgetReconcileHoldRequest,
629    ) -> Result<BudgetReconcileHoldDecision, BudgetStoreError> {
630        self.settle_charge_cost_with_ids_and_authority(
631            &request.capability_id,
632            request.grant_index,
633            request.exposed_cost_units,
634            request.realized_spend_units,
635            request.hold_id.as_deref(),
636            request.event_id.as_deref(),
637            request.authority.as_ref(),
638        )?;
639        let usage = self.get_usage(&request.capability_id, request.grant_index)?;
640        Ok(BudgetHoldMutationDecision {
641            hold_id: request.hold_id,
642            exposure_units: request.exposed_cost_units,
643            realized_spend_units: request.realized_spend_units,
644            committed_cost_units_after: usage
645                .as_ref()
646                .map(BudgetUsageRecord::committed_cost_units)
647                .transpose()?
648                .unwrap_or(0),
649            invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
650            metadata: budget_commit_metadata(
651                self,
652                request.authority,
653                usage.as_ref().map(|usage| usage.seq),
654                request.event_id,
655            ),
656        })
657    }
658
659    fn capture_budget_hold(
660        &mut self,
661        request: BudgetCaptureHoldRequest,
662    ) -> Result<BudgetCaptureHoldDecision, BudgetStoreError> {
663        self.reconcile_budget_hold(request)
664    }
665}
666
667#[derive(Debug, Clone, PartialEq, Eq)]
668enum BudgetHoldDisposition {
669    Open,
670    Released,
671    Reversed,
672    Reconciled,
673}
674
675#[derive(Debug, Clone, PartialEq, Eq)]
676struct BudgetHoldState {
677    capability_id: String,
678    grant_index: usize,
679    authorized_exposure_units: u64,
680    remaining_exposure_units: u64,
681    invocation_count_debited: bool,
682    disposition: BudgetHoldDisposition,
683    authority: Option<BudgetEventAuthority>,
684}
685
686#[derive(Debug, Clone, PartialEq, Eq)]
687enum BudgetMutationRequest {
688    Increment {
689        capability_id: String,
690        grant_index: usize,
691        max_invocations: Option<u32>,
692    },
693    Authorize {
694        capability_id: String,
695        grant_index: usize,
696        hold_id: Option<String>,
697        authority: Option<BudgetEventAuthority>,
698        cost_units: u64,
699        max_invocations: Option<u32>,
700        max_cost_per_invocation: Option<u64>,
701        max_total_cost_units: Option<u64>,
702    },
703    Reverse {
704        capability_id: String,
705        grant_index: usize,
706        hold_id: Option<String>,
707        authority: Option<BudgetEventAuthority>,
708        cost_units: u64,
709    },
710    Release {
711        capability_id: String,
712        grant_index: usize,
713        hold_id: Option<String>,
714        authority: Option<BudgetEventAuthority>,
715        cost_units: u64,
716    },
717    Reconcile {
718        capability_id: String,
719        grant_index: usize,
720        hold_id: Option<String>,
721        authority: Option<BudgetEventAuthority>,
722        exposed_cost_units: u64,
723        realized_cost_units: u64,
724    },
725}
726
727#[derive(Debug, Clone)]
728struct RecordedBudgetMutation {
729    request: BudgetMutationRequest,
730    record: BudgetMutationRecord,
731}
732
733#[derive(Default)]
734pub struct InMemoryBudgetStore {
735    counts: HashMap<(String, usize), BudgetUsageRecord>,
736    events: Vec<BudgetMutationRecord>,
737    explicit_events: HashMap<String, RecordedBudgetMutation>,
738    holds: HashMap<String, BudgetHoldState>,
739    next_seq: u64,
740    next_event_ordinal: u64,
741}
742
743impl InMemoryBudgetStore {
744    pub fn new() -> Self {
745        Self::default()
746    }
747
748    fn next_event_id(&mut self) -> String {
749        self.next_event_ordinal = self.next_event_ordinal.saturating_add(1);
750        format!("local-budget-event-{}", self.next_event_ordinal)
751    }
752
753    fn duplicate_mutation(
754        &self,
755        event_id: Option<&str>,
756        request: &BudgetMutationRequest,
757    ) -> Result<Option<RecordedBudgetMutation>, BudgetStoreError> {
758        let Some(event_id) = event_id else {
759            return Ok(None);
760        };
761        let Some(existing) = self.explicit_events.get(event_id) else {
762            return Ok(None);
763        };
764        if &existing.request != request {
765            return Err(BudgetStoreError::Invariant(format!(
766                "budget event_id `{event_id}` was reused for a different mutation"
767            )));
768        }
769        Ok(Some(existing.clone()))
770    }
771
772    fn append_mutation(
773        &mut self,
774        explicit_event_id: Option<&str>,
775        request: BudgetMutationRequest,
776        mut record: BudgetMutationRecord,
777    ) {
778        let event_id = explicit_event_id
779            .map(ToOwned::to_owned)
780            .unwrap_or_else(|| self.next_event_id());
781        record.event_id = event_id.clone();
782        self.events.push(record.clone());
783        if explicit_event_id.is_some() {
784            self.explicit_events
785                .insert(event_id, RecordedBudgetMutation { request, record });
786        }
787    }
788
789    fn validate_open_hold(
790        &self,
791        hold_id: &str,
792        capability_id: &str,
793        grant_index: usize,
794    ) -> Result<&BudgetHoldState, BudgetStoreError> {
795        let hold = self.holds.get(hold_id).ok_or_else(|| {
796            BudgetStoreError::Invariant(format!("missing budget hold `{hold_id}`"))
797        })?;
798        if hold.capability_id != capability_id || hold.grant_index != grant_index {
799            return Err(BudgetStoreError::Invariant(format!(
800                "budget hold `{hold_id}` does not match capability/grant"
801            )));
802        }
803        if hold.disposition != BudgetHoldDisposition::Open {
804            return Err(BudgetStoreError::Invariant(format!(
805                "budget hold `{hold_id}` is no longer open"
806            )));
807        }
808        Ok(hold)
809    }
810
811    fn validate_hold_authority(
812        hold_id: &str,
813        current: Option<&BudgetEventAuthority>,
814        requested: Option<&BudgetEventAuthority>,
815    ) -> Result<Option<BudgetEventAuthority>, BudgetStoreError> {
816        match (current, requested) {
817            (None, None) => Ok(None),
818            (None, Some(_)) => Err(BudgetStoreError::Invariant(format!(
819                "budget hold `{hold_id}` was created without authority lease metadata"
820            ))),
821            (Some(_), None) => Err(BudgetStoreError::Invariant(format!(
822                "budget hold `{hold_id}` requires authority lease metadata"
823            ))),
824            (Some(current), Some(requested)) => {
825                if current.authority_id != requested.authority_id {
826                    return Err(BudgetStoreError::Invariant(format!(
827                        "budget hold `{hold_id}` authority_id does not match the open lease"
828                    )));
829                }
830                if requested.lease_id != current.lease_id {
831                    return Err(BudgetStoreError::Invariant(format!(
832                        "budget hold `{hold_id}` lease_id does not match the open lease epoch"
833                    )));
834                }
835                if requested.lease_epoch < current.lease_epoch {
836                    return Err(BudgetStoreError::Invariant(format!(
837                        "budget hold `{hold_id}` authority lease epoch regressed"
838                    )));
839                }
840                if requested.lease_epoch > current.lease_epoch {
841                    return Err(BudgetStoreError::Invariant(format!(
842                        "budget hold `{hold_id}` authority lease epoch advanced beyond the open lease"
843                    )));
844                }
845                Ok(Some(requested.clone()))
846            }
847        }
848    }
849
850    fn default_usage_record(capability_id: &str, grant_index: usize) -> BudgetUsageRecord {
851        BudgetUsageRecord {
852            capability_id: capability_id.to_string(),
853            grant_index: grant_index as u32,
854            invocation_count: 0,
855            updated_at: unix_now(),
856            seq: 0,
857            total_cost_exposed: 0,
858            total_cost_realized_spend: 0,
859        }
860    }
861}
862
863fn checked_committed_cost_units(
864    total_cost_exposed: u64,
865    total_cost_realized_spend: u64,
866) -> Result<u64, BudgetStoreError> {
867    total_cost_exposed
868        .checked_add(total_cost_realized_spend)
869        .ok_or_else(|| {
870            BudgetStoreError::Overflow(
871                "total_cost_exposed + total_cost_realized_spend overflowed u64".to_string(),
872            )
873        })
874}
875
876impl BudgetStore for InMemoryBudgetStore {
877    fn try_increment(
878        &mut self,
879        capability_id: &str,
880        grant_index: usize,
881        max_invocations: Option<u32>,
882    ) -> Result<bool, BudgetStoreError> {
883        let request = BudgetMutationRequest::Increment {
884            capability_id: capability_id.to_string(),
885            grant_index,
886            max_invocations,
887        };
888        let key = (capability_id.to_string(), grant_index);
889        let current = self
890            .counts
891            .get(&key)
892            .cloned()
893            .unwrap_or_else(|| Self::default_usage_record(capability_id, grant_index));
894        let allowed = max_invocations.is_none_or(|max| current.invocation_count < max);
895        let recorded_at = unix_now();
896        let event_seq = self.next_seq.saturating_add(1);
897        self.next_seq = event_seq;
898        let usage_seq = if allowed {
899            let entry = self
900                .counts
901                .entry(key)
902                .or_insert_with(|| Self::default_usage_record(capability_id, grant_index));
903            entry.invocation_count = current.invocation_count.saturating_add(1);
904            entry.updated_at = recorded_at;
905            entry.seq = event_seq;
906            Some(event_seq)
907        } else {
908            None
909        };
910        self.append_mutation(
911            None,
912            request,
913            BudgetMutationRecord {
914                event_id: String::new(),
915                hold_id: None,
916                capability_id: capability_id.to_string(),
917                grant_index: grant_index as u32,
918                kind: BudgetMutationKind::IncrementInvocation,
919                allowed: Some(allowed),
920                recorded_at,
921                event_seq,
922                usage_seq,
923                exposure_units: 0,
924                realized_spend_units: 0,
925                max_invocations,
926                max_cost_per_invocation: None,
927                max_total_cost_units: None,
928                invocation_count_after: if allowed {
929                    current.invocation_count.saturating_add(1)
930                } else {
931                    current.invocation_count
932                },
933                total_cost_exposed_after: current.total_cost_exposed,
934                total_cost_realized_spend_after: current.total_cost_realized_spend,
935                authority: None,
936            },
937        );
938        Ok(allowed)
939    }
940
941    fn try_charge_cost(
942        &mut self,
943        capability_id: &str,
944        grant_index: usize,
945        max_invocations: Option<u32>,
946        cost_units: u64,
947        max_cost_per_invocation: Option<u64>,
948        max_total_cost_units: Option<u64>,
949    ) -> Result<bool, BudgetStoreError> {
950        self.try_charge_cost_with_ids(
951            capability_id,
952            grant_index,
953            max_invocations,
954            cost_units,
955            max_cost_per_invocation,
956            max_total_cost_units,
957            None,
958            None,
959        )
960    }
961
962    fn try_charge_cost_with_ids(
963        &mut self,
964        capability_id: &str,
965        grant_index: usize,
966        max_invocations: Option<u32>,
967        cost_units: u64,
968        max_cost_per_invocation: Option<u64>,
969        max_total_cost_units: Option<u64>,
970        hold_id: Option<&str>,
971        event_id: Option<&str>,
972    ) -> Result<bool, BudgetStoreError> {
973        self.try_charge_cost_with_ids_and_authority(
974            capability_id,
975            grant_index,
976            max_invocations,
977            cost_units,
978            max_cost_per_invocation,
979            max_total_cost_units,
980            hold_id,
981            event_id,
982            None,
983        )
984    }
985
986    fn try_charge_cost_with_ids_and_authority(
987        &mut self,
988        capability_id: &str,
989        grant_index: usize,
990        max_invocations: Option<u32>,
991        cost_units: u64,
992        max_cost_per_invocation: Option<u64>,
993        max_total_cost_units: Option<u64>,
994        hold_id: Option<&str>,
995        event_id: Option<&str>,
996        authority: Option<&BudgetEventAuthority>,
997    ) -> Result<bool, BudgetStoreError> {
998        let request = BudgetMutationRequest::Authorize {
999            capability_id: capability_id.to_string(),
1000            grant_index,
1001            hold_id: hold_id.map(ToOwned::to_owned),
1002            authority: authority.cloned(),
1003            cost_units,
1004            max_invocations,
1005            max_cost_per_invocation,
1006            max_total_cost_units,
1007        };
1008        if let Some(existing) = self.duplicate_mutation(event_id, &request)? {
1009            return Ok(existing.record.allowed.unwrap_or(false));
1010        }
1011
1012        let key = (capability_id.to_string(), grant_index);
1013        let current = self
1014            .counts
1015            .get(&key)
1016            .cloned()
1017            .unwrap_or_else(|| Self::default_usage_record(capability_id, grant_index));
1018
1019        let mut allowed = true;
1020        if let Some(max) = max_invocations {
1021            if current.invocation_count >= max {
1022                allowed = false;
1023            }
1024        }
1025        if let Some(max_per) = max_cost_per_invocation {
1026            if cost_units > max_per {
1027                allowed = false;
1028            }
1029        }
1030        if let Some(max_total) = max_total_cost_units {
1031            let current_total = checked_committed_cost_units(
1032                current.total_cost_exposed,
1033                current.total_cost_realized_spend,
1034            )?;
1035            let new_total = current_total.checked_add(cost_units).ok_or_else(|| {
1036                BudgetStoreError::Overflow(
1037                    "authorized exposure + cost_units overflowed u64".to_string(),
1038                )
1039            })?;
1040            if new_total > max_total {
1041                allowed = false;
1042            }
1043        }
1044
1045        let recorded_at = unix_now();
1046        let (invocation_count_after, total_cost_exposed_after, total_cost_realized_spend_after);
1047        let event_seq;
1048        let mut usage_seq = None;
1049
1050        if allowed {
1051            if let Some(hold_id) = hold_id {
1052                if self.holds.contains_key(hold_id) {
1053                    return Err(BudgetStoreError::Invariant(format!(
1054                        "budget hold `{hold_id}` already exists"
1055                    )));
1056                }
1057            }
1058            let new_total_cost_exposed = current
1059                .total_cost_exposed
1060                .checked_add(cost_units)
1061                .ok_or_else(|| {
1062                    BudgetStoreError::Overflow(
1063                        "total_cost_exposed + cost_units overflowed u64".to_string(),
1064                    )
1065                })?;
1066            event_seq = self.next_seq.saturating_add(1);
1067            self.next_seq = event_seq;
1068            let entry = self
1069                .counts
1070                .entry(key)
1071                .or_insert_with(|| Self::default_usage_record(capability_id, grant_index));
1072            entry.invocation_count = current.invocation_count.saturating_add(1);
1073            entry.total_cost_exposed = new_total_cost_exposed;
1074            entry.updated_at = recorded_at;
1075            entry.seq = event_seq;
1076            if let Some(hold_id) = hold_id {
1077                self.holds.insert(
1078                    hold_id.to_string(),
1079                    BudgetHoldState {
1080                        capability_id: capability_id.to_string(),
1081                        grant_index,
1082                        authorized_exposure_units: cost_units,
1083                        remaining_exposure_units: cost_units,
1084                        invocation_count_debited: true,
1085                        disposition: BudgetHoldDisposition::Open,
1086                        authority: authority.cloned(),
1087                    },
1088                );
1089            }
1090            invocation_count_after = entry.invocation_count;
1091            total_cost_exposed_after = entry.total_cost_exposed;
1092            total_cost_realized_spend_after = entry.total_cost_realized_spend;
1093            usage_seq = Some(event_seq);
1094        } else {
1095            event_seq = self.next_seq.saturating_add(1);
1096            self.next_seq = event_seq;
1097            invocation_count_after = current.invocation_count;
1098            total_cost_exposed_after = current.total_cost_exposed;
1099            total_cost_realized_spend_after = current.total_cost_realized_spend;
1100        }
1101
1102        self.append_mutation(
1103            event_id,
1104            request,
1105            BudgetMutationRecord {
1106                event_id: String::new(),
1107                hold_id: hold_id.map(ToOwned::to_owned),
1108                capability_id: capability_id.to_string(),
1109                grant_index: grant_index as u32,
1110                kind: BudgetMutationKind::AuthorizeExposure,
1111                allowed: Some(allowed),
1112                recorded_at,
1113                event_seq,
1114                usage_seq,
1115                exposure_units: cost_units,
1116                realized_spend_units: 0,
1117                max_invocations,
1118                max_cost_per_invocation,
1119                max_total_cost_units,
1120                invocation_count_after,
1121                total_cost_exposed_after,
1122                total_cost_realized_spend_after,
1123                authority: authority.cloned(),
1124            },
1125        );
1126
1127        Ok(allowed)
1128    }
1129
1130    fn reverse_charge_cost(
1131        &mut self,
1132        capability_id: &str,
1133        grant_index: usize,
1134        cost_units: u64,
1135    ) -> Result<(), BudgetStoreError> {
1136        self.reverse_charge_cost_with_ids(capability_id, grant_index, cost_units, None, None)
1137    }
1138
1139    fn reverse_charge_cost_with_ids(
1140        &mut self,
1141        capability_id: &str,
1142        grant_index: usize,
1143        cost_units: u64,
1144        hold_id: Option<&str>,
1145        event_id: Option<&str>,
1146    ) -> Result<(), BudgetStoreError> {
1147        self.reverse_charge_cost_with_ids_and_authority(
1148            capability_id,
1149            grant_index,
1150            cost_units,
1151            hold_id,
1152            event_id,
1153            None,
1154        )
1155    }
1156
1157    fn reverse_charge_cost_with_ids_and_authority(
1158        &mut self,
1159        capability_id: &str,
1160        grant_index: usize,
1161        cost_units: u64,
1162        hold_id: Option<&str>,
1163        event_id: Option<&str>,
1164        authority: Option<&BudgetEventAuthority>,
1165    ) -> Result<(), BudgetStoreError> {
1166        let request = BudgetMutationRequest::Reverse {
1167            capability_id: capability_id.to_string(),
1168            grant_index,
1169            hold_id: hold_id.map(ToOwned::to_owned),
1170            authority: authority.cloned(),
1171            cost_units,
1172        };
1173        if self.duplicate_mutation(event_id, &request)?.is_some() {
1174            return Ok(());
1175        }
1176        if let Some(hold_id) = hold_id {
1177            let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
1178            if hold.remaining_exposure_units != cost_units || !hold.invocation_count_debited {
1179                return Err(BudgetStoreError::Invariant(format!(
1180                    "budget hold `{hold_id}` does not match reverse amount"
1181                )));
1182            }
1183            Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
1184        }
1185
1186        let key = (capability_id.to_string(), grant_index);
1187        let (
1188            invocation_count_after,
1189            total_cost_exposed_after,
1190            total_cost_realized_spend_after,
1191            seq,
1192        );
1193        {
1194            let entry = self.counts.get_mut(&key).ok_or_else(|| {
1195                BudgetStoreError::Invariant("missing charged budget row".to_string())
1196            })?;
1197            if entry.invocation_count == 0 {
1198                return Err(BudgetStoreError::Invariant(
1199                    "cannot reverse charge with zero invocation_count".to_string(),
1200                ));
1201            }
1202            if entry.total_cost_exposed < cost_units {
1203                return Err(BudgetStoreError::Invariant(
1204                    "cannot reverse charge larger than total_cost_exposed".to_string(),
1205                ));
1206            }
1207            let next_seq = self.next_seq.saturating_add(1);
1208            self.next_seq = next_seq;
1209            entry.invocation_count -= 1;
1210            entry.total_cost_exposed -= cost_units;
1211            entry.updated_at = unix_now();
1212            entry.seq = next_seq;
1213            invocation_count_after = entry.invocation_count;
1214            total_cost_exposed_after = entry.total_cost_exposed;
1215            total_cost_realized_spend_after = entry.total_cost_realized_spend;
1216            seq = entry.seq;
1217        }
1218        if let Some(hold_id) = hold_id {
1219            let Some(hold) = self.holds.get_mut(hold_id) else {
1220                return Err(BudgetStoreError::Invariant(
1221                    "validated hold missing during reverse_charge_cost".to_string(),
1222                ));
1223            };
1224            hold.remaining_exposure_units = 0;
1225            hold.disposition = BudgetHoldDisposition::Reversed;
1226            hold.authority = authority.cloned().or_else(|| hold.authority.clone());
1227        }
1228        self.append_mutation(
1229            event_id,
1230            request,
1231            BudgetMutationRecord {
1232                event_id: String::new(),
1233                hold_id: hold_id.map(ToOwned::to_owned),
1234                capability_id: capability_id.to_string(),
1235                grant_index: grant_index as u32,
1236                kind: BudgetMutationKind::ReverseExposure,
1237                allowed: None,
1238                recorded_at: unix_now(),
1239                event_seq: seq,
1240                usage_seq: Some(seq),
1241                exposure_units: cost_units,
1242                realized_spend_units: 0,
1243                max_invocations: None,
1244                max_cost_per_invocation: None,
1245                max_total_cost_units: None,
1246                invocation_count_after,
1247                total_cost_exposed_after,
1248                total_cost_realized_spend_after,
1249                authority: authority.cloned(),
1250            },
1251        );
1252        Ok(())
1253    }
1254
1255    fn reduce_charge_cost(
1256        &mut self,
1257        capability_id: &str,
1258        grant_index: usize,
1259        cost_units: u64,
1260    ) -> Result<(), BudgetStoreError> {
1261        self.reduce_charge_cost_with_ids(capability_id, grant_index, cost_units, None, None)
1262    }
1263
1264    fn reduce_charge_cost_with_ids(
1265        &mut self,
1266        capability_id: &str,
1267        grant_index: usize,
1268        cost_units: u64,
1269        hold_id: Option<&str>,
1270        event_id: Option<&str>,
1271    ) -> Result<(), BudgetStoreError> {
1272        self.reduce_charge_cost_with_ids_and_authority(
1273            capability_id,
1274            grant_index,
1275            cost_units,
1276            hold_id,
1277            event_id,
1278            None,
1279        )
1280    }
1281
1282    fn reduce_charge_cost_with_ids_and_authority(
1283        &mut self,
1284        capability_id: &str,
1285        grant_index: usize,
1286        cost_units: u64,
1287        hold_id: Option<&str>,
1288        event_id: Option<&str>,
1289        authority: Option<&BudgetEventAuthority>,
1290    ) -> Result<(), BudgetStoreError> {
1291        let request = BudgetMutationRequest::Release {
1292            capability_id: capability_id.to_string(),
1293            grant_index,
1294            hold_id: hold_id.map(ToOwned::to_owned),
1295            authority: authority.cloned(),
1296            cost_units,
1297        };
1298        if self.duplicate_mutation(event_id, &request)?.is_some() {
1299            return Ok(());
1300        }
1301        if let Some(hold_id) = hold_id {
1302            let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
1303            if hold.remaining_exposure_units < cost_units {
1304                return Err(BudgetStoreError::Invariant(format!(
1305                    "budget hold `{hold_id}` cannot release more than remaining exposure"
1306                )));
1307            }
1308            Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
1309        }
1310
1311        let key = (capability_id.to_string(), grant_index);
1312        let (
1313            invocation_count_after,
1314            total_cost_exposed_after,
1315            total_cost_realized_spend_after,
1316            seq,
1317        );
1318        {
1319            let entry = self.counts.get_mut(&key).ok_or_else(|| {
1320                BudgetStoreError::Invariant("missing charged budget row".to_string())
1321            })?;
1322
1323            if entry.total_cost_exposed < cost_units {
1324                return Err(BudgetStoreError::Invariant(
1325                    "cannot reduce charge larger than total_cost_exposed".to_string(),
1326                ));
1327            }
1328
1329            let next_seq = self.next_seq.saturating_add(1);
1330            self.next_seq = next_seq;
1331            entry.total_cost_exposed -= cost_units;
1332            entry.updated_at = unix_now();
1333            entry.seq = next_seq;
1334            invocation_count_after = entry.invocation_count;
1335            total_cost_exposed_after = entry.total_cost_exposed;
1336            total_cost_realized_spend_after = entry.total_cost_realized_spend;
1337            seq = entry.seq;
1338        }
1339        if let Some(hold_id) = hold_id {
1340            let Some(hold) = self.holds.get_mut(hold_id) else {
1341                return Err(BudgetStoreError::Invariant(
1342                    "validated hold missing during release_charge_cost".to_string(),
1343                ));
1344            };
1345            hold.remaining_exposure_units -= cost_units;
1346            if hold.remaining_exposure_units == 0 {
1347                hold.disposition = BudgetHoldDisposition::Released;
1348            }
1349            hold.authority = authority.cloned().or_else(|| hold.authority.clone());
1350        }
1351        self.append_mutation(
1352            event_id,
1353            request,
1354            BudgetMutationRecord {
1355                event_id: String::new(),
1356                hold_id: hold_id.map(ToOwned::to_owned),
1357                capability_id: capability_id.to_string(),
1358                grant_index: grant_index as u32,
1359                kind: BudgetMutationKind::ReleaseExposure,
1360                allowed: None,
1361                recorded_at: unix_now(),
1362                event_seq: seq,
1363                usage_seq: Some(seq),
1364                exposure_units: cost_units,
1365                realized_spend_units: 0,
1366                max_invocations: None,
1367                max_cost_per_invocation: None,
1368                max_total_cost_units: None,
1369                invocation_count_after,
1370                total_cost_exposed_after,
1371                total_cost_realized_spend_after,
1372                authority: authority.cloned(),
1373            },
1374        );
1375        Ok(())
1376    }
1377
1378    fn settle_charge_cost(
1379        &mut self,
1380        capability_id: &str,
1381        grant_index: usize,
1382        exposed_cost_units: u64,
1383        realized_cost_units: u64,
1384    ) -> Result<(), BudgetStoreError> {
1385        self.settle_charge_cost_with_ids(
1386            capability_id,
1387            grant_index,
1388            exposed_cost_units,
1389            realized_cost_units,
1390            None,
1391            None,
1392        )
1393    }
1394
1395    fn settle_charge_cost_with_ids(
1396        &mut self,
1397        capability_id: &str,
1398        grant_index: usize,
1399        exposed_cost_units: u64,
1400        realized_cost_units: u64,
1401        hold_id: Option<&str>,
1402        event_id: Option<&str>,
1403    ) -> Result<(), BudgetStoreError> {
1404        self.settle_charge_cost_with_ids_and_authority(
1405            capability_id,
1406            grant_index,
1407            exposed_cost_units,
1408            realized_cost_units,
1409            hold_id,
1410            event_id,
1411            None,
1412        )
1413    }
1414
1415    fn settle_charge_cost_with_ids_and_authority(
1416        &mut self,
1417        capability_id: &str,
1418        grant_index: usize,
1419        exposed_cost_units: u64,
1420        realized_cost_units: u64,
1421        hold_id: Option<&str>,
1422        event_id: Option<&str>,
1423        authority: Option<&BudgetEventAuthority>,
1424    ) -> Result<(), BudgetStoreError> {
1425        if realized_cost_units > exposed_cost_units {
1426            return Err(BudgetStoreError::Invariant(
1427                "cannot realize spend larger than exposed cost".to_string(),
1428            ));
1429        }
1430        let request = BudgetMutationRequest::Reconcile {
1431            capability_id: capability_id.to_string(),
1432            grant_index,
1433            hold_id: hold_id.map(ToOwned::to_owned),
1434            authority: authority.cloned(),
1435            exposed_cost_units,
1436            realized_cost_units,
1437        };
1438        if self.duplicate_mutation(event_id, &request)?.is_some() {
1439            return Ok(());
1440        }
1441        if let Some(hold_id) = hold_id {
1442            let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
1443            if hold.remaining_exposure_units != exposed_cost_units {
1444                return Err(BudgetStoreError::Invariant(format!(
1445                    "budget hold `{hold_id}` does not match reconciled exposure"
1446                )));
1447            }
1448            Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
1449        }
1450
1451        let key = (capability_id.to_string(), grant_index);
1452        let (
1453            invocation_count_after,
1454            total_cost_exposed_after,
1455            total_cost_realized_spend_after,
1456            seq,
1457        );
1458        {
1459            let entry = self.counts.get_mut(&key).ok_or_else(|| {
1460                BudgetStoreError::Invariant("missing charged budget row".to_string())
1461            })?;
1462
1463            if entry.invocation_count == 0 {
1464                return Err(BudgetStoreError::Invariant(
1465                    "cannot settle charge with zero invocation_count".to_string(),
1466                ));
1467            }
1468            if entry.total_cost_exposed < exposed_cost_units {
1469                return Err(BudgetStoreError::Invariant(
1470                    "cannot settle more exposure than total_cost_exposed".to_string(),
1471                ));
1472            }
1473
1474            entry.total_cost_realized_spend = entry
1475                .total_cost_realized_spend
1476                .checked_add(realized_cost_units)
1477                .ok_or_else(|| {
1478                    BudgetStoreError::Overflow(
1479                        "total_cost_realized_spend + realized_cost_units overflowed u64"
1480                            .to_string(),
1481                    )
1482                })?;
1483            entry.total_cost_exposed -= exposed_cost_units;
1484
1485            let next_seq = self.next_seq.saturating_add(1);
1486            self.next_seq = next_seq;
1487            entry.updated_at = unix_now();
1488            entry.seq = next_seq;
1489            invocation_count_after = entry.invocation_count;
1490            total_cost_exposed_after = entry.total_cost_exposed;
1491            total_cost_realized_spend_after = entry.total_cost_realized_spend;
1492            seq = entry.seq;
1493        }
1494        if let Some(hold_id) = hold_id {
1495            let Some(hold) = self.holds.get_mut(hold_id) else {
1496                return Err(BudgetStoreError::Invariant(
1497                    "validated hold missing during settle_charge_cost".to_string(),
1498                ));
1499            };
1500            hold.remaining_exposure_units = 0;
1501            hold.disposition = BudgetHoldDisposition::Reconciled;
1502            hold.authority = authority.cloned().or_else(|| hold.authority.clone());
1503        }
1504        self.append_mutation(
1505            event_id,
1506            request,
1507            BudgetMutationRecord {
1508                event_id: String::new(),
1509                hold_id: hold_id.map(ToOwned::to_owned),
1510                capability_id: capability_id.to_string(),
1511                grant_index: grant_index as u32,
1512                kind: BudgetMutationKind::ReconcileSpend,
1513                allowed: None,
1514                recorded_at: unix_now(),
1515                event_seq: seq,
1516                usage_seq: Some(seq),
1517                exposure_units: exposed_cost_units,
1518                realized_spend_units: realized_cost_units,
1519                max_invocations: None,
1520                max_cost_per_invocation: None,
1521                max_total_cost_units: None,
1522                invocation_count_after,
1523                total_cost_exposed_after,
1524                total_cost_realized_spend_after,
1525                authority: authority.cloned(),
1526            },
1527        );
1528        Ok(())
1529    }
1530
1531    fn list_usages(
1532        &self,
1533        limit: usize,
1534        capability_id: Option<&str>,
1535    ) -> Result<Vec<BudgetUsageRecord>, BudgetStoreError> {
1536        let mut records = self
1537            .counts
1538            .values()
1539            .filter(|record| capability_id.is_none_or(|value| record.capability_id == value))
1540            .cloned()
1541            .collect::<Vec<_>>();
1542        records.sort_by(|left, right| {
1543            right
1544                .updated_at
1545                .cmp(&left.updated_at)
1546                .then_with(|| left.capability_id.cmp(&right.capability_id))
1547                .then_with(|| left.grant_index.cmp(&right.grant_index))
1548        });
1549        records.truncate(limit);
1550        Ok(records)
1551    }
1552
1553    fn get_usage(
1554        &self,
1555        capability_id: &str,
1556        grant_index: usize,
1557    ) -> Result<Option<BudgetUsageRecord>, BudgetStoreError> {
1558        Ok(self
1559            .counts
1560            .get(&(capability_id.to_string(), grant_index))
1561            .cloned())
1562    }
1563
1564    fn list_mutation_events(
1565        &self,
1566        limit: usize,
1567        capability_id: Option<&str>,
1568        grant_index: Option<usize>,
1569    ) -> Result<Vec<BudgetMutationRecord>, BudgetStoreError> {
1570        let mut events = self
1571            .events
1572            .iter()
1573            .filter(|record| capability_id.is_none_or(|value| record.capability_id == value))
1574            .filter(|record| grant_index.is_none_or(|value| record.grant_index == value as u32))
1575            .cloned()
1576            .collect::<Vec<_>>();
1577        events.truncate(limit);
1578        Ok(events)
1579    }
1580}
1581
1582fn unix_now() -> i64 {
1583    SystemTime::now()
1584        .duration_since(UNIX_EPOCH)
1585        .map(|duration| duration.as_secs() as i64)
1586        .unwrap_or(0)
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591    use super::*;
1592
1593    #[test]
1594    fn authorize_and_reconcile_hold_preserve_authority_metadata() {
1595        let mut store = InMemoryBudgetStore::new();
1596        let authority = BudgetEventAuthority {
1597            authority_id: "kernel:test-authority".to_string(),
1598            lease_id: "single-node".to_string(),
1599            lease_epoch: 0,
1600        };
1601
1602        let decision = store
1603            .authorize_budget_hold(BudgetAuthorizeHoldRequest {
1604                capability_id: "cap-budget-1".to_string(),
1605                grant_index: 0,
1606                max_invocations: Some(4),
1607                requested_exposure_units: 100,
1608                max_cost_per_invocation: Some(100),
1609                max_total_cost_units: Some(1_000),
1610                hold_id: Some("hold-budget-1".to_string()),
1611                event_id: Some("hold-budget-1:authorize".to_string()),
1612                authority: Some(authority.clone()),
1613            })
1614            .unwrap();
1615        let BudgetAuthorizeHoldDecision::Authorized(authorized) = decision else {
1616            panic!("budget hold should be authorized");
1617        };
1618        assert_eq!(authorized.committed_cost_units_after, 100);
1619        assert_eq!(
1620            authorized.metadata.event_id.as_deref(),
1621            Some("hold-budget-1:authorize")
1622        );
1623        assert_eq!(authorized.metadata.budget_commit_index, Some(1));
1624        assert_eq!(
1625            authorized.metadata.budget_term().as_deref(),
1626            Some("kernel:test-authority:0")
1627        );
1628
1629        let reconcile = store
1630            .reconcile_budget_hold(BudgetReconcileHoldRequest {
1631                capability_id: "cap-budget-1".to_string(),
1632                grant_index: 0,
1633                exposed_cost_units: 100,
1634                realized_spend_units: 75,
1635                hold_id: Some("hold-budget-1".to_string()),
1636                event_id: Some("hold-budget-1:reconcile".to_string()),
1637                authority: Some(authority.clone()),
1638            })
1639            .unwrap();
1640        assert_eq!(reconcile.committed_cost_units_after, 75);
1641        assert_eq!(reconcile.realized_spend_units, 75);
1642        assert_eq!(
1643            reconcile.metadata.event_id.as_deref(),
1644            Some("hold-budget-1:reconcile")
1645        );
1646        assert_eq!(reconcile.metadata.budget_commit_index, Some(2));
1647        assert_eq!(reconcile.metadata.authority.as_ref(), Some(&authority));
1648
1649        let usage = store.get_usage("cap-budget-1", 0).unwrap().unwrap();
1650        assert_eq!(usage.total_cost_exposed, 0);
1651        assert_eq!(usage.total_cost_realized_spend, 75);
1652        assert_eq!(usage.committed_cost_units().unwrap(), 75);
1653
1654        let events = store
1655            .list_mutation_events(10, Some("cap-budget-1"), Some(0))
1656            .unwrap();
1657        assert_eq!(events.len(), 2);
1658        assert_eq!(events[0].kind, BudgetMutationKind::AuthorizeExposure);
1659        assert_eq!(events[0].authority.as_ref(), Some(&authority));
1660        assert_eq!(events[1].kind, BudgetMutationKind::ReconcileSpend);
1661        assert_eq!(events[1].authority.as_ref(), Some(&authority));
1662        assert_eq!(events[1].realized_spend_units, 75);
1663    }
1664
1665    #[test]
1666    fn denied_authorize_hold_reports_guarantee_metadata_without_commit_index() {
1667        let mut store = InMemoryBudgetStore::new();
1668        let authority = BudgetEventAuthority {
1669            authority_id: "kernel:test-authority".to_string(),
1670            lease_id: "single-node".to_string(),
1671            lease_epoch: 0,
1672        };
1673
1674        let decision = store
1675            .authorize_budget_hold(BudgetAuthorizeHoldRequest {
1676                capability_id: "cap-budget-deny".to_string(),
1677                grant_index: 0,
1678                max_invocations: Some(1),
1679                requested_exposure_units: 150,
1680                max_cost_per_invocation: Some(100),
1681                max_total_cost_units: Some(1_000),
1682                hold_id: Some("hold-budget-deny".to_string()),
1683                event_id: Some("hold-budget-deny:authorize".to_string()),
1684                authority: Some(authority.clone()),
1685            })
1686            .unwrap();
1687        let BudgetAuthorizeHoldDecision::Denied(denied) = decision else {
1688            panic!("budget hold should be denied");
1689        };
1690        assert_eq!(denied.committed_cost_units_after, 0);
1691        assert_eq!(denied.invocation_count_after, 0);
1692        assert_eq!(
1693            denied.metadata.event_id.as_deref(),
1694            Some("hold-budget-deny:authorize")
1695        );
1696        assert_eq!(denied.metadata.budget_commit_index, None);
1697        assert_eq!(
1698            denied.metadata.guarantee_level,
1699            BudgetGuaranteeLevel::SingleNodeAtomic
1700        );
1701        assert_eq!(denied.metadata.authority.as_ref(), Some(&authority));
1702
1703        let events = store
1704            .list_mutation_events(10, Some("cap-budget-deny"), Some(0))
1705            .unwrap();
1706        assert_eq!(events.len(), 1);
1707        assert_eq!(events[0].allowed, Some(false));
1708        assert_eq!(events[0].authority.as_ref(), Some(&authority));
1709        assert!(store.get_usage("cap-budget-deny", 0).unwrap().is_none());
1710    }
1711}