use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, thiserror::Error)]
pub enum BudgetStoreError {
#[error("sqlite error: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("failed to prepare budget store directory: {0}")]
Io(#[from] std::io::Error),
#[error("budget arithmetic overflow: {0}")]
Overflow(String),
#[error("budget state invariant violated: {0}")]
Invariant(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetUsageRecord {
pub capability_id: String,
pub grant_index: u32,
pub invocation_count: u32,
pub updated_at: i64,
pub seq: u64,
pub total_cost_exposed: u64,
pub total_cost_realized_spend: u64,
}
impl BudgetUsageRecord {
pub fn committed_cost_units(&self) -> Result<u64, BudgetStoreError> {
checked_committed_cost_units(self.total_cost_exposed, self.total_cost_realized_spend)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BudgetMutationKind {
IncrementInvocation,
AuthorizeExposure,
ReverseExposure,
ReleaseExposure,
ReconcileSpend,
}
impl BudgetMutationKind {
pub fn as_str(self) -> &'static str {
match self {
Self::IncrementInvocation => "increment_invocation",
Self::AuthorizeExposure => "authorize_exposure",
Self::ReverseExposure => "reverse_exposure",
Self::ReleaseExposure => "release_exposure",
Self::ReconcileSpend => "reconcile_spend",
}
}
pub fn parse(value: &str) -> Option<Self> {
match value {
"increment_invocation" => Some(Self::IncrementInvocation),
"authorize_exposure" => Some(Self::AuthorizeExposure),
"reverse_exposure" => Some(Self::ReverseExposure),
"release_exposure" => Some(Self::ReleaseExposure),
"reconcile_spend" => Some(Self::ReconcileSpend),
_ => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetEventAuthority {
pub authority_id: String,
pub lease_id: String,
pub lease_epoch: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetMutationRecord {
pub event_id: String,
pub hold_id: Option<String>,
pub capability_id: String,
pub grant_index: u32,
pub kind: BudgetMutationKind,
pub allowed: Option<bool>,
pub recorded_at: i64,
pub event_seq: u64,
pub usage_seq: Option<u64>,
pub exposure_units: u64,
pub realized_spend_units: u64,
pub max_invocations: Option<u32>,
pub max_cost_per_invocation: Option<u64>,
pub max_total_cost_units: Option<u64>,
pub invocation_count_after: u32,
pub total_cost_exposed_after: u64,
pub total_cost_realized_spend_after: u64,
pub authority: Option<BudgetEventAuthority>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BudgetGuaranteeLevel {
SingleNodeAtomic,
HaLinearizable,
PartitionEscrowed,
AdvisoryPosthoc,
}
impl BudgetGuaranteeLevel {
pub fn as_str(self) -> &'static str {
match self {
Self::SingleNodeAtomic => "single_node_atomic",
Self::HaLinearizable => "ha_linearizable",
Self::PartitionEscrowed => "partition_escrowed",
Self::AdvisoryPosthoc => "advisory_posthoc",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BudgetAuthorityProfile {
AuthoritativeHoldEvent,
}
impl BudgetAuthorityProfile {
pub fn as_str(self) -> &'static str {
match self {
Self::AuthoritativeHoldEvent => "authoritative_hold_event",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BudgetMeteringProfile {
MaxCostPreauthorizeThenReconcileActual,
}
impl BudgetMeteringProfile {
pub fn as_str(self) -> &'static str {
match self {
Self::MaxCostPreauthorizeThenReconcileActual => {
"max_cost_preauthorize_then_reconcile_actual"
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetCommitMetadata {
pub authority: Option<BudgetEventAuthority>,
pub guarantee_level: BudgetGuaranteeLevel,
pub budget_profile: BudgetAuthorityProfile,
pub metering_profile: BudgetMeteringProfile,
pub budget_commit_index: Option<u64>,
pub event_id: Option<String>,
}
impl BudgetCommitMetadata {
pub fn budget_term(&self) -> Option<String> {
self.authority
.as_ref()
.map(|authority| format!("{}:{}", authority.authority_id, authority.lease_epoch))
}
}
fn budget_commit_metadata<T: BudgetStore + ?Sized>(
store: &T,
authority: Option<BudgetEventAuthority>,
budget_commit_index: Option<u64>,
event_id: Option<String>,
) -> BudgetCommitMetadata {
BudgetCommitMetadata {
authority,
guarantee_level: store.budget_guarantee_level(),
budget_profile: store.budget_authority_profile(),
metering_profile: store.budget_metering_profile(),
budget_commit_index,
event_id,
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetAuthorizeHoldRequest {
pub capability_id: String,
pub grant_index: usize,
pub max_invocations: Option<u32>,
pub requested_exposure_units: u64,
pub max_cost_per_invocation: Option<u64>,
pub max_total_cost_units: Option<u64>,
pub hold_id: Option<String>,
pub event_id: Option<String>,
pub authority: Option<BudgetEventAuthority>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetReleaseHoldRequest {
pub capability_id: String,
pub grant_index: usize,
pub released_exposure_units: u64,
pub hold_id: Option<String>,
pub event_id: Option<String>,
pub authority: Option<BudgetEventAuthority>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetReverseHoldRequest {
pub capability_id: String,
pub grant_index: usize,
pub reversed_exposure_units: u64,
pub hold_id: Option<String>,
pub event_id: Option<String>,
pub authority: Option<BudgetEventAuthority>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetReconcileHoldRequest {
pub capability_id: String,
pub grant_index: usize,
pub exposed_cost_units: u64,
pub realized_spend_units: u64,
pub hold_id: Option<String>,
pub event_id: Option<String>,
pub authority: Option<BudgetEventAuthority>,
}
pub type BudgetCaptureHoldRequest = BudgetReconcileHoldRequest;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthorizedBudgetHold {
pub hold_id: Option<String>,
pub authorized_exposure_units: u64,
pub committed_cost_units_after: u64,
pub invocation_count_after: u32,
pub metadata: BudgetCommitMetadata,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeniedBudgetHold {
pub hold_id: Option<String>,
pub attempted_exposure_units: u64,
pub committed_cost_units_after: u64,
pub invocation_count_after: u32,
pub metadata: BudgetCommitMetadata,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BudgetAuthorizeHoldDecision {
Authorized(AuthorizedBudgetHold),
Denied(DeniedBudgetHold),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetHoldMutationDecision {
pub hold_id: Option<String>,
pub exposure_units: u64,
pub realized_spend_units: u64,
pub committed_cost_units_after: u64,
pub invocation_count_after: u32,
pub metadata: BudgetCommitMetadata,
}
pub type BudgetReleaseHoldDecision = BudgetHoldMutationDecision;
pub type BudgetReverseHoldDecision = BudgetHoldMutationDecision;
pub type BudgetReconcileHoldDecision = BudgetHoldMutationDecision;
pub type BudgetCaptureHoldDecision = BudgetHoldMutationDecision;
pub trait BudgetStore: Send {
fn try_increment(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
) -> Result<bool, BudgetStoreError>;
fn try_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
cost_units: u64,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
) -> Result<bool, BudgetStoreError>;
#[allow(clippy::too_many_arguments)]
fn try_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
cost_units: u64,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<bool, BudgetStoreError> {
let _ = hold_id;
let _ = event_id;
self.try_charge_cost(
capability_id,
grant_index,
max_invocations,
cost_units,
max_cost_per_invocation,
max_total_cost_units,
)
}
#[allow(clippy::too_many_arguments)]
fn try_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
cost_units: u64,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<bool, BudgetStoreError> {
let _ = authority;
self.try_charge_cost_with_ids(
capability_id,
grant_index,
max_invocations,
cost_units,
max_cost_per_invocation,
max_total_cost_units,
hold_id,
event_id,
)
}
fn reverse_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
) -> Result<(), BudgetStoreError>;
fn reverse_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<(), BudgetStoreError> {
let _ = hold_id;
let _ = event_id;
self.reverse_charge_cost(capability_id, grant_index, cost_units)
}
fn reverse_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<(), BudgetStoreError> {
let _ = authority;
self.reverse_charge_cost_with_ids(capability_id, grant_index, cost_units, hold_id, event_id)
}
fn reduce_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
) -> Result<(), BudgetStoreError>;
fn reduce_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<(), BudgetStoreError> {
let _ = hold_id;
let _ = event_id;
self.reduce_charge_cost(capability_id, grant_index, cost_units)
}
fn reduce_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<(), BudgetStoreError> {
let _ = authority;
self.reduce_charge_cost_with_ids(capability_id, grant_index, cost_units, hold_id, event_id)
}
fn settle_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
exposed_cost_units: u64,
realized_cost_units: u64,
) -> Result<(), BudgetStoreError>;
fn settle_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
exposed_cost_units: u64,
realized_cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<(), BudgetStoreError> {
let _ = hold_id;
let _ = event_id;
self.settle_charge_cost(
capability_id,
grant_index,
exposed_cost_units,
realized_cost_units,
)
}
#[allow(clippy::too_many_arguments)]
fn settle_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
exposed_cost_units: u64,
realized_cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<(), BudgetStoreError> {
let _ = authority;
self.settle_charge_cost_with_ids(
capability_id,
grant_index,
exposed_cost_units,
realized_cost_units,
hold_id,
event_id,
)
}
fn list_usages(
&self,
limit: usize,
capability_id: Option<&str>,
) -> Result<Vec<BudgetUsageRecord>, BudgetStoreError>;
fn get_usage(
&self,
capability_id: &str,
grant_index: usize,
) -> Result<Option<BudgetUsageRecord>, BudgetStoreError>;
fn list_mutation_events(
&self,
_limit: usize,
_capability_id: Option<&str>,
_grant_index: Option<usize>,
) -> Result<Vec<BudgetMutationRecord>, BudgetStoreError> {
Err(BudgetStoreError::Invariant(
"budget mutation events unavailable for this backend".to_string(),
))
}
fn budget_guarantee_level(&self) -> BudgetGuaranteeLevel {
BudgetGuaranteeLevel::SingleNodeAtomic
}
fn budget_authority_profile(&self) -> BudgetAuthorityProfile {
BudgetAuthorityProfile::AuthoritativeHoldEvent
}
fn budget_metering_profile(&self) -> BudgetMeteringProfile {
BudgetMeteringProfile::MaxCostPreauthorizeThenReconcileActual
}
fn authorize_budget_hold(
&mut self,
request: BudgetAuthorizeHoldRequest,
) -> Result<BudgetAuthorizeHoldDecision, BudgetStoreError> {
let allowed = self.try_charge_cost_with_ids_and_authority(
&request.capability_id,
request.grant_index,
request.max_invocations,
request.requested_exposure_units,
request.max_cost_per_invocation,
request.max_total_cost_units,
request.hold_id.as_deref(),
request.event_id.as_deref(),
request.authority.as_ref(),
)?;
let usage = self.get_usage(&request.capability_id, request.grant_index)?;
let committed_cost_units_after = usage
.as_ref()
.map(BudgetUsageRecord::committed_cost_units)
.transpose()?
.unwrap_or(0);
let invocation_count_after = usage.as_ref().map_or(0, |usage| usage.invocation_count);
let metadata = budget_commit_metadata(
self,
request.authority,
allowed
.then(|| usage.as_ref().map(|usage| usage.seq))
.flatten(),
request.event_id,
);
if allowed {
Ok(BudgetAuthorizeHoldDecision::Authorized(
AuthorizedBudgetHold {
hold_id: request.hold_id,
authorized_exposure_units: request.requested_exposure_units,
committed_cost_units_after,
invocation_count_after,
metadata,
},
))
} else {
Ok(BudgetAuthorizeHoldDecision::Denied(DeniedBudgetHold {
hold_id: request.hold_id,
attempted_exposure_units: request.requested_exposure_units,
committed_cost_units_after,
invocation_count_after,
metadata,
}))
}
}
fn reverse_budget_hold(
&mut self,
request: BudgetReverseHoldRequest,
) -> Result<BudgetReverseHoldDecision, BudgetStoreError> {
self.reverse_charge_cost_with_ids_and_authority(
&request.capability_id,
request.grant_index,
request.reversed_exposure_units,
request.hold_id.as_deref(),
request.event_id.as_deref(),
request.authority.as_ref(),
)?;
let usage = self.get_usage(&request.capability_id, request.grant_index)?;
Ok(BudgetHoldMutationDecision {
hold_id: request.hold_id,
exposure_units: request.reversed_exposure_units,
realized_spend_units: 0,
committed_cost_units_after: usage
.as_ref()
.map(BudgetUsageRecord::committed_cost_units)
.transpose()?
.unwrap_or(0),
invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
metadata: budget_commit_metadata(
self,
request.authority,
usage.as_ref().map(|usage| usage.seq),
request.event_id,
),
})
}
fn release_budget_hold(
&mut self,
request: BudgetReleaseHoldRequest,
) -> Result<BudgetReleaseHoldDecision, BudgetStoreError> {
self.reduce_charge_cost_with_ids_and_authority(
&request.capability_id,
request.grant_index,
request.released_exposure_units,
request.hold_id.as_deref(),
request.event_id.as_deref(),
request.authority.as_ref(),
)?;
let usage = self.get_usage(&request.capability_id, request.grant_index)?;
Ok(BudgetHoldMutationDecision {
hold_id: request.hold_id,
exposure_units: request.released_exposure_units,
realized_spend_units: 0,
committed_cost_units_after: usage
.as_ref()
.map(BudgetUsageRecord::committed_cost_units)
.transpose()?
.unwrap_or(0),
invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
metadata: budget_commit_metadata(
self,
request.authority,
usage.as_ref().map(|usage| usage.seq),
request.event_id,
),
})
}
fn reconcile_budget_hold(
&mut self,
request: BudgetReconcileHoldRequest,
) -> Result<BudgetReconcileHoldDecision, BudgetStoreError> {
self.settle_charge_cost_with_ids_and_authority(
&request.capability_id,
request.grant_index,
request.exposed_cost_units,
request.realized_spend_units,
request.hold_id.as_deref(),
request.event_id.as_deref(),
request.authority.as_ref(),
)?;
let usage = self.get_usage(&request.capability_id, request.grant_index)?;
Ok(BudgetHoldMutationDecision {
hold_id: request.hold_id,
exposure_units: request.exposed_cost_units,
realized_spend_units: request.realized_spend_units,
committed_cost_units_after: usage
.as_ref()
.map(BudgetUsageRecord::committed_cost_units)
.transpose()?
.unwrap_or(0),
invocation_count_after: usage.as_ref().map_or(0, |usage| usage.invocation_count),
metadata: budget_commit_metadata(
self,
request.authority,
usage.as_ref().map(|usage| usage.seq),
request.event_id,
),
})
}
fn capture_budget_hold(
&mut self,
request: BudgetCaptureHoldRequest,
) -> Result<BudgetCaptureHoldDecision, BudgetStoreError> {
self.reconcile_budget_hold(request)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum BudgetHoldDisposition {
Open,
Released,
Reversed,
Reconciled,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BudgetHoldState {
capability_id: String,
grant_index: usize,
authorized_exposure_units: u64,
remaining_exposure_units: u64,
invocation_count_debited: bool,
disposition: BudgetHoldDisposition,
authority: Option<BudgetEventAuthority>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum BudgetMutationRequest {
Increment {
capability_id: String,
grant_index: usize,
max_invocations: Option<u32>,
},
Authorize {
capability_id: String,
grant_index: usize,
hold_id: Option<String>,
authority: Option<BudgetEventAuthority>,
cost_units: u64,
max_invocations: Option<u32>,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
},
Reverse {
capability_id: String,
grant_index: usize,
hold_id: Option<String>,
authority: Option<BudgetEventAuthority>,
cost_units: u64,
},
Release {
capability_id: String,
grant_index: usize,
hold_id: Option<String>,
authority: Option<BudgetEventAuthority>,
cost_units: u64,
},
Reconcile {
capability_id: String,
grant_index: usize,
hold_id: Option<String>,
authority: Option<BudgetEventAuthority>,
exposed_cost_units: u64,
realized_cost_units: u64,
},
}
#[derive(Debug, Clone)]
struct RecordedBudgetMutation {
request: BudgetMutationRequest,
record: BudgetMutationRecord,
}
#[derive(Default)]
pub struct InMemoryBudgetStore {
counts: HashMap<(String, usize), BudgetUsageRecord>,
events: Vec<BudgetMutationRecord>,
explicit_events: HashMap<String, RecordedBudgetMutation>,
holds: HashMap<String, BudgetHoldState>,
next_seq: u64,
next_event_ordinal: u64,
}
impl InMemoryBudgetStore {
pub fn new() -> Self {
Self::default()
}
fn next_event_id(&mut self) -> String {
self.next_event_ordinal = self.next_event_ordinal.saturating_add(1);
format!("local-budget-event-{}", self.next_event_ordinal)
}
fn duplicate_mutation(
&self,
event_id: Option<&str>,
request: &BudgetMutationRequest,
) -> Result<Option<RecordedBudgetMutation>, BudgetStoreError> {
let Some(event_id) = event_id else {
return Ok(None);
};
let Some(existing) = self.explicit_events.get(event_id) else {
return Ok(None);
};
if &existing.request != request {
return Err(BudgetStoreError::Invariant(format!(
"budget event_id `{event_id}` was reused for a different mutation"
)));
}
Ok(Some(existing.clone()))
}
fn append_mutation(
&mut self,
explicit_event_id: Option<&str>,
request: BudgetMutationRequest,
mut record: BudgetMutationRecord,
) {
let event_id = explicit_event_id
.map(ToOwned::to_owned)
.unwrap_or_else(|| self.next_event_id());
record.event_id = event_id.clone();
self.events.push(record.clone());
if explicit_event_id.is_some() {
self.explicit_events
.insert(event_id, RecordedBudgetMutation { request, record });
}
}
fn validate_open_hold(
&self,
hold_id: &str,
capability_id: &str,
grant_index: usize,
) -> Result<&BudgetHoldState, BudgetStoreError> {
let hold = self.holds.get(hold_id).ok_or_else(|| {
BudgetStoreError::Invariant(format!("missing budget hold `{hold_id}`"))
})?;
if hold.capability_id != capability_id || hold.grant_index != grant_index {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` does not match capability/grant"
)));
}
if hold.disposition != BudgetHoldDisposition::Open {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` is no longer open"
)));
}
Ok(hold)
}
fn validate_hold_authority(
hold_id: &str,
current: Option<&BudgetEventAuthority>,
requested: Option<&BudgetEventAuthority>,
) -> Result<Option<BudgetEventAuthority>, BudgetStoreError> {
match (current, requested) {
(None, None) => Ok(None),
(None, Some(_)) => Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` was created without authority lease metadata"
))),
(Some(_), None) => Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` requires authority lease metadata"
))),
(Some(current), Some(requested)) => {
if current.authority_id != requested.authority_id {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` authority_id does not match the open lease"
)));
}
if requested.lease_id != current.lease_id {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` lease_id does not match the open lease epoch"
)));
}
if requested.lease_epoch < current.lease_epoch {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` authority lease epoch regressed"
)));
}
if requested.lease_epoch > current.lease_epoch {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` authority lease epoch advanced beyond the open lease"
)));
}
Ok(Some(requested.clone()))
}
}
}
fn default_usage_record(capability_id: &str, grant_index: usize) -> BudgetUsageRecord {
BudgetUsageRecord {
capability_id: capability_id.to_string(),
grant_index: grant_index as u32,
invocation_count: 0,
updated_at: unix_now(),
seq: 0,
total_cost_exposed: 0,
total_cost_realized_spend: 0,
}
}
}
fn checked_committed_cost_units(
total_cost_exposed: u64,
total_cost_realized_spend: u64,
) -> Result<u64, BudgetStoreError> {
total_cost_exposed
.checked_add(total_cost_realized_spend)
.ok_or_else(|| {
BudgetStoreError::Overflow(
"total_cost_exposed + total_cost_realized_spend overflowed u64".to_string(),
)
})
}
impl BudgetStore for InMemoryBudgetStore {
fn try_increment(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
) -> Result<bool, BudgetStoreError> {
let request = BudgetMutationRequest::Increment {
capability_id: capability_id.to_string(),
grant_index,
max_invocations,
};
let key = (capability_id.to_string(), grant_index);
let current = self
.counts
.get(&key)
.cloned()
.unwrap_or_else(|| Self::default_usage_record(capability_id, grant_index));
let allowed = max_invocations.is_none_or(|max| current.invocation_count < max);
let recorded_at = unix_now();
let event_seq = self.next_seq.saturating_add(1);
self.next_seq = event_seq;
let usage_seq = if allowed {
let entry = self
.counts
.entry(key)
.or_insert_with(|| Self::default_usage_record(capability_id, grant_index));
entry.invocation_count = current.invocation_count.saturating_add(1);
entry.updated_at = recorded_at;
entry.seq = event_seq;
Some(event_seq)
} else {
None
};
self.append_mutation(
None,
request,
BudgetMutationRecord {
event_id: String::new(),
hold_id: None,
capability_id: capability_id.to_string(),
grant_index: grant_index as u32,
kind: BudgetMutationKind::IncrementInvocation,
allowed: Some(allowed),
recorded_at,
event_seq,
usage_seq,
exposure_units: 0,
realized_spend_units: 0,
max_invocations,
max_cost_per_invocation: None,
max_total_cost_units: None,
invocation_count_after: if allowed {
current.invocation_count.saturating_add(1)
} else {
current.invocation_count
},
total_cost_exposed_after: current.total_cost_exposed,
total_cost_realized_spend_after: current.total_cost_realized_spend,
authority: None,
},
);
Ok(allowed)
}
fn try_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
cost_units: u64,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
) -> Result<bool, BudgetStoreError> {
self.try_charge_cost_with_ids(
capability_id,
grant_index,
max_invocations,
cost_units,
max_cost_per_invocation,
max_total_cost_units,
None,
None,
)
}
fn try_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
cost_units: u64,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<bool, BudgetStoreError> {
self.try_charge_cost_with_ids_and_authority(
capability_id,
grant_index,
max_invocations,
cost_units,
max_cost_per_invocation,
max_total_cost_units,
hold_id,
event_id,
None,
)
}
fn try_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
max_invocations: Option<u32>,
cost_units: u64,
max_cost_per_invocation: Option<u64>,
max_total_cost_units: Option<u64>,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<bool, BudgetStoreError> {
let request = BudgetMutationRequest::Authorize {
capability_id: capability_id.to_string(),
grant_index,
hold_id: hold_id.map(ToOwned::to_owned),
authority: authority.cloned(),
cost_units,
max_invocations,
max_cost_per_invocation,
max_total_cost_units,
};
if let Some(existing) = self.duplicate_mutation(event_id, &request)? {
return Ok(existing.record.allowed.unwrap_or(false));
}
let key = (capability_id.to_string(), grant_index);
let current = self
.counts
.get(&key)
.cloned()
.unwrap_or_else(|| Self::default_usage_record(capability_id, grant_index));
let mut allowed = true;
if let Some(max) = max_invocations {
if current.invocation_count >= max {
allowed = false;
}
}
if let Some(max_per) = max_cost_per_invocation {
if cost_units > max_per {
allowed = false;
}
}
if let Some(max_total) = max_total_cost_units {
let current_total = checked_committed_cost_units(
current.total_cost_exposed,
current.total_cost_realized_spend,
)?;
let new_total = current_total.checked_add(cost_units).ok_or_else(|| {
BudgetStoreError::Overflow(
"authorized exposure + cost_units overflowed u64".to_string(),
)
})?;
if new_total > max_total {
allowed = false;
}
}
let recorded_at = unix_now();
let (invocation_count_after, total_cost_exposed_after, total_cost_realized_spend_after);
let event_seq;
let mut usage_seq = None;
if allowed {
if let Some(hold_id) = hold_id {
if self.holds.contains_key(hold_id) {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` already exists"
)));
}
}
let new_total_cost_exposed = current
.total_cost_exposed
.checked_add(cost_units)
.ok_or_else(|| {
BudgetStoreError::Overflow(
"total_cost_exposed + cost_units overflowed u64".to_string(),
)
})?;
event_seq = self.next_seq.saturating_add(1);
self.next_seq = event_seq;
let entry = self
.counts
.entry(key)
.or_insert_with(|| Self::default_usage_record(capability_id, grant_index));
entry.invocation_count = current.invocation_count.saturating_add(1);
entry.total_cost_exposed = new_total_cost_exposed;
entry.updated_at = recorded_at;
entry.seq = event_seq;
if let Some(hold_id) = hold_id {
self.holds.insert(
hold_id.to_string(),
BudgetHoldState {
capability_id: capability_id.to_string(),
grant_index,
authorized_exposure_units: cost_units,
remaining_exposure_units: cost_units,
invocation_count_debited: true,
disposition: BudgetHoldDisposition::Open,
authority: authority.cloned(),
},
);
}
invocation_count_after = entry.invocation_count;
total_cost_exposed_after = entry.total_cost_exposed;
total_cost_realized_spend_after = entry.total_cost_realized_spend;
usage_seq = Some(event_seq);
} else {
event_seq = self.next_seq.saturating_add(1);
self.next_seq = event_seq;
invocation_count_after = current.invocation_count;
total_cost_exposed_after = current.total_cost_exposed;
total_cost_realized_spend_after = current.total_cost_realized_spend;
}
self.append_mutation(
event_id,
request,
BudgetMutationRecord {
event_id: String::new(),
hold_id: hold_id.map(ToOwned::to_owned),
capability_id: capability_id.to_string(),
grant_index: grant_index as u32,
kind: BudgetMutationKind::AuthorizeExposure,
allowed: Some(allowed),
recorded_at,
event_seq,
usage_seq,
exposure_units: cost_units,
realized_spend_units: 0,
max_invocations,
max_cost_per_invocation,
max_total_cost_units,
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
authority: authority.cloned(),
},
);
Ok(allowed)
}
fn reverse_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
) -> Result<(), BudgetStoreError> {
self.reverse_charge_cost_with_ids(capability_id, grant_index, cost_units, None, None)
}
fn reverse_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<(), BudgetStoreError> {
self.reverse_charge_cost_with_ids_and_authority(
capability_id,
grant_index,
cost_units,
hold_id,
event_id,
None,
)
}
fn reverse_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<(), BudgetStoreError> {
let request = BudgetMutationRequest::Reverse {
capability_id: capability_id.to_string(),
grant_index,
hold_id: hold_id.map(ToOwned::to_owned),
authority: authority.cloned(),
cost_units,
};
if self.duplicate_mutation(event_id, &request)?.is_some() {
return Ok(());
}
if let Some(hold_id) = hold_id {
let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
if hold.remaining_exposure_units != cost_units || !hold.invocation_count_debited {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` does not match reverse amount"
)));
}
Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
}
let key = (capability_id.to_string(), grant_index);
let (
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
seq,
);
{
let entry = self.counts.get_mut(&key).ok_or_else(|| {
BudgetStoreError::Invariant("missing charged budget row".to_string())
})?;
if entry.invocation_count == 0 {
return Err(BudgetStoreError::Invariant(
"cannot reverse charge with zero invocation_count".to_string(),
));
}
if entry.total_cost_exposed < cost_units {
return Err(BudgetStoreError::Invariant(
"cannot reverse charge larger than total_cost_exposed".to_string(),
));
}
let next_seq = self.next_seq.saturating_add(1);
self.next_seq = next_seq;
entry.invocation_count -= 1;
entry.total_cost_exposed -= cost_units;
entry.updated_at = unix_now();
entry.seq = next_seq;
invocation_count_after = entry.invocation_count;
total_cost_exposed_after = entry.total_cost_exposed;
total_cost_realized_spend_after = entry.total_cost_realized_spend;
seq = entry.seq;
}
if let Some(hold_id) = hold_id {
let Some(hold) = self.holds.get_mut(hold_id) else {
return Err(BudgetStoreError::Invariant(
"validated hold missing during reverse_charge_cost".to_string(),
));
};
hold.remaining_exposure_units = 0;
hold.disposition = BudgetHoldDisposition::Reversed;
hold.authority = authority.cloned().or_else(|| hold.authority.clone());
}
self.append_mutation(
event_id,
request,
BudgetMutationRecord {
event_id: String::new(),
hold_id: hold_id.map(ToOwned::to_owned),
capability_id: capability_id.to_string(),
grant_index: grant_index as u32,
kind: BudgetMutationKind::ReverseExposure,
allowed: None,
recorded_at: unix_now(),
event_seq: seq,
usage_seq: Some(seq),
exposure_units: cost_units,
realized_spend_units: 0,
max_invocations: None,
max_cost_per_invocation: None,
max_total_cost_units: None,
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
authority: authority.cloned(),
},
);
Ok(())
}
fn reduce_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
) -> Result<(), BudgetStoreError> {
self.reduce_charge_cost_with_ids(capability_id, grant_index, cost_units, None, None)
}
fn reduce_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<(), BudgetStoreError> {
self.reduce_charge_cost_with_ids_and_authority(
capability_id,
grant_index,
cost_units,
hold_id,
event_id,
None,
)
}
fn reduce_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<(), BudgetStoreError> {
let request = BudgetMutationRequest::Release {
capability_id: capability_id.to_string(),
grant_index,
hold_id: hold_id.map(ToOwned::to_owned),
authority: authority.cloned(),
cost_units,
};
if self.duplicate_mutation(event_id, &request)?.is_some() {
return Ok(());
}
if let Some(hold_id) = hold_id {
let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
if hold.remaining_exposure_units < cost_units {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` cannot release more than remaining exposure"
)));
}
Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
}
let key = (capability_id.to_string(), grant_index);
let (
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
seq,
);
{
let entry = self.counts.get_mut(&key).ok_or_else(|| {
BudgetStoreError::Invariant("missing charged budget row".to_string())
})?;
if entry.total_cost_exposed < cost_units {
return Err(BudgetStoreError::Invariant(
"cannot reduce charge larger than total_cost_exposed".to_string(),
));
}
let next_seq = self.next_seq.saturating_add(1);
self.next_seq = next_seq;
entry.total_cost_exposed -= cost_units;
entry.updated_at = unix_now();
entry.seq = next_seq;
invocation_count_after = entry.invocation_count;
total_cost_exposed_after = entry.total_cost_exposed;
total_cost_realized_spend_after = entry.total_cost_realized_spend;
seq = entry.seq;
}
if let Some(hold_id) = hold_id {
let Some(hold) = self.holds.get_mut(hold_id) else {
return Err(BudgetStoreError::Invariant(
"validated hold missing during release_charge_cost".to_string(),
));
};
hold.remaining_exposure_units -= cost_units;
if hold.remaining_exposure_units == 0 {
hold.disposition = BudgetHoldDisposition::Released;
}
hold.authority = authority.cloned().or_else(|| hold.authority.clone());
}
self.append_mutation(
event_id,
request,
BudgetMutationRecord {
event_id: String::new(),
hold_id: hold_id.map(ToOwned::to_owned),
capability_id: capability_id.to_string(),
grant_index: grant_index as u32,
kind: BudgetMutationKind::ReleaseExposure,
allowed: None,
recorded_at: unix_now(),
event_seq: seq,
usage_seq: Some(seq),
exposure_units: cost_units,
realized_spend_units: 0,
max_invocations: None,
max_cost_per_invocation: None,
max_total_cost_units: None,
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
authority: authority.cloned(),
},
);
Ok(())
}
fn settle_charge_cost(
&mut self,
capability_id: &str,
grant_index: usize,
exposed_cost_units: u64,
realized_cost_units: u64,
) -> Result<(), BudgetStoreError> {
self.settle_charge_cost_with_ids(
capability_id,
grant_index,
exposed_cost_units,
realized_cost_units,
None,
None,
)
}
fn settle_charge_cost_with_ids(
&mut self,
capability_id: &str,
grant_index: usize,
exposed_cost_units: u64,
realized_cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
) -> Result<(), BudgetStoreError> {
self.settle_charge_cost_with_ids_and_authority(
capability_id,
grant_index,
exposed_cost_units,
realized_cost_units,
hold_id,
event_id,
None,
)
}
fn settle_charge_cost_with_ids_and_authority(
&mut self,
capability_id: &str,
grant_index: usize,
exposed_cost_units: u64,
realized_cost_units: u64,
hold_id: Option<&str>,
event_id: Option<&str>,
authority: Option<&BudgetEventAuthority>,
) -> Result<(), BudgetStoreError> {
if realized_cost_units > exposed_cost_units {
return Err(BudgetStoreError::Invariant(
"cannot realize spend larger than exposed cost".to_string(),
));
}
let request = BudgetMutationRequest::Reconcile {
capability_id: capability_id.to_string(),
grant_index,
hold_id: hold_id.map(ToOwned::to_owned),
authority: authority.cloned(),
exposed_cost_units,
realized_cost_units,
};
if self.duplicate_mutation(event_id, &request)?.is_some() {
return Ok(());
}
if let Some(hold_id) = hold_id {
let hold = self.validate_open_hold(hold_id, capability_id, grant_index)?;
if hold.remaining_exposure_units != exposed_cost_units {
return Err(BudgetStoreError::Invariant(format!(
"budget hold `{hold_id}` does not match reconciled exposure"
)));
}
Self::validate_hold_authority(hold_id, hold.authority.as_ref(), authority)?;
}
let key = (capability_id.to_string(), grant_index);
let (
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
seq,
);
{
let entry = self.counts.get_mut(&key).ok_or_else(|| {
BudgetStoreError::Invariant("missing charged budget row".to_string())
})?;
if entry.invocation_count == 0 {
return Err(BudgetStoreError::Invariant(
"cannot settle charge with zero invocation_count".to_string(),
));
}
if entry.total_cost_exposed < exposed_cost_units {
return Err(BudgetStoreError::Invariant(
"cannot settle more exposure than total_cost_exposed".to_string(),
));
}
entry.total_cost_realized_spend = entry
.total_cost_realized_spend
.checked_add(realized_cost_units)
.ok_or_else(|| {
BudgetStoreError::Overflow(
"total_cost_realized_spend + realized_cost_units overflowed u64"
.to_string(),
)
})?;
entry.total_cost_exposed -= exposed_cost_units;
let next_seq = self.next_seq.saturating_add(1);
self.next_seq = next_seq;
entry.updated_at = unix_now();
entry.seq = next_seq;
invocation_count_after = entry.invocation_count;
total_cost_exposed_after = entry.total_cost_exposed;
total_cost_realized_spend_after = entry.total_cost_realized_spend;
seq = entry.seq;
}
if let Some(hold_id) = hold_id {
let Some(hold) = self.holds.get_mut(hold_id) else {
return Err(BudgetStoreError::Invariant(
"validated hold missing during settle_charge_cost".to_string(),
));
};
hold.remaining_exposure_units = 0;
hold.disposition = BudgetHoldDisposition::Reconciled;
hold.authority = authority.cloned().or_else(|| hold.authority.clone());
}
self.append_mutation(
event_id,
request,
BudgetMutationRecord {
event_id: String::new(),
hold_id: hold_id.map(ToOwned::to_owned),
capability_id: capability_id.to_string(),
grant_index: grant_index as u32,
kind: BudgetMutationKind::ReconcileSpend,
allowed: None,
recorded_at: unix_now(),
event_seq: seq,
usage_seq: Some(seq),
exposure_units: exposed_cost_units,
realized_spend_units: realized_cost_units,
max_invocations: None,
max_cost_per_invocation: None,
max_total_cost_units: None,
invocation_count_after,
total_cost_exposed_after,
total_cost_realized_spend_after,
authority: authority.cloned(),
},
);
Ok(())
}
fn list_usages(
&self,
limit: usize,
capability_id: Option<&str>,
) -> Result<Vec<BudgetUsageRecord>, BudgetStoreError> {
let mut records = self
.counts
.values()
.filter(|record| capability_id.is_none_or(|value| record.capability_id == value))
.cloned()
.collect::<Vec<_>>();
records.sort_by(|left, right| {
right
.updated_at
.cmp(&left.updated_at)
.then_with(|| left.capability_id.cmp(&right.capability_id))
.then_with(|| left.grant_index.cmp(&right.grant_index))
});
records.truncate(limit);
Ok(records)
}
fn get_usage(
&self,
capability_id: &str,
grant_index: usize,
) -> Result<Option<BudgetUsageRecord>, BudgetStoreError> {
Ok(self
.counts
.get(&(capability_id.to_string(), grant_index))
.cloned())
}
fn list_mutation_events(
&self,
limit: usize,
capability_id: Option<&str>,
grant_index: Option<usize>,
) -> Result<Vec<BudgetMutationRecord>, BudgetStoreError> {
let mut events = self
.events
.iter()
.filter(|record| capability_id.is_none_or(|value| record.capability_id == value))
.filter(|record| grant_index.is_none_or(|value| record.grant_index == value as u32))
.cloned()
.collect::<Vec<_>>();
events.truncate(limit);
Ok(events)
}
}
fn unix_now() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs() as i64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn authorize_and_reconcile_hold_preserve_authority_metadata() {
let mut store = InMemoryBudgetStore::new();
let authority = BudgetEventAuthority {
authority_id: "kernel:test-authority".to_string(),
lease_id: "single-node".to_string(),
lease_epoch: 0,
};
let decision = store
.authorize_budget_hold(BudgetAuthorizeHoldRequest {
capability_id: "cap-budget-1".to_string(),
grant_index: 0,
max_invocations: Some(4),
requested_exposure_units: 100,
max_cost_per_invocation: Some(100),
max_total_cost_units: Some(1_000),
hold_id: Some("hold-budget-1".to_string()),
event_id: Some("hold-budget-1:authorize".to_string()),
authority: Some(authority.clone()),
})
.unwrap();
let BudgetAuthorizeHoldDecision::Authorized(authorized) = decision else {
panic!("budget hold should be authorized");
};
assert_eq!(authorized.committed_cost_units_after, 100);
assert_eq!(
authorized.metadata.event_id.as_deref(),
Some("hold-budget-1:authorize")
);
assert_eq!(authorized.metadata.budget_commit_index, Some(1));
assert_eq!(
authorized.metadata.budget_term().as_deref(),
Some("kernel:test-authority:0")
);
let reconcile = store
.reconcile_budget_hold(BudgetReconcileHoldRequest {
capability_id: "cap-budget-1".to_string(),
grant_index: 0,
exposed_cost_units: 100,
realized_spend_units: 75,
hold_id: Some("hold-budget-1".to_string()),
event_id: Some("hold-budget-1:reconcile".to_string()),
authority: Some(authority.clone()),
})
.unwrap();
assert_eq!(reconcile.committed_cost_units_after, 75);
assert_eq!(reconcile.realized_spend_units, 75);
assert_eq!(
reconcile.metadata.event_id.as_deref(),
Some("hold-budget-1:reconcile")
);
assert_eq!(reconcile.metadata.budget_commit_index, Some(2));
assert_eq!(reconcile.metadata.authority.as_ref(), Some(&authority));
let usage = store.get_usage("cap-budget-1", 0).unwrap().unwrap();
assert_eq!(usage.total_cost_exposed, 0);
assert_eq!(usage.total_cost_realized_spend, 75);
assert_eq!(usage.committed_cost_units().unwrap(), 75);
let events = store
.list_mutation_events(10, Some("cap-budget-1"), Some(0))
.unwrap();
assert_eq!(events.len(), 2);
assert_eq!(events[0].kind, BudgetMutationKind::AuthorizeExposure);
assert_eq!(events[0].authority.as_ref(), Some(&authority));
assert_eq!(events[1].kind, BudgetMutationKind::ReconcileSpend);
assert_eq!(events[1].authority.as_ref(), Some(&authority));
assert_eq!(events[1].realized_spend_units, 75);
}
#[test]
fn denied_authorize_hold_reports_guarantee_metadata_without_commit_index() {
let mut store = InMemoryBudgetStore::new();
let authority = BudgetEventAuthority {
authority_id: "kernel:test-authority".to_string(),
lease_id: "single-node".to_string(),
lease_epoch: 0,
};
let decision = store
.authorize_budget_hold(BudgetAuthorizeHoldRequest {
capability_id: "cap-budget-deny".to_string(),
grant_index: 0,
max_invocations: Some(1),
requested_exposure_units: 150,
max_cost_per_invocation: Some(100),
max_total_cost_units: Some(1_000),
hold_id: Some("hold-budget-deny".to_string()),
event_id: Some("hold-budget-deny:authorize".to_string()),
authority: Some(authority.clone()),
})
.unwrap();
let BudgetAuthorizeHoldDecision::Denied(denied) = decision else {
panic!("budget hold should be denied");
};
assert_eq!(denied.committed_cost_units_after, 0);
assert_eq!(denied.invocation_count_after, 0);
assert_eq!(
denied.metadata.event_id.as_deref(),
Some("hold-budget-deny:authorize")
);
assert_eq!(denied.metadata.budget_commit_index, None);
assert_eq!(
denied.metadata.guarantee_level,
BudgetGuaranteeLevel::SingleNodeAtomic
);
assert_eq!(denied.metadata.authority.as_ref(), Some(&authority));
let events = store
.list_mutation_events(10, Some("cap-budget-deny"), Some(0))
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].allowed, Some(false));
assert_eq!(events[0].authority.as_ref(), Some(&authority));
assert!(store.get_usage("cap-budget-deny", 0).unwrap().is_none());
}
}