#[cfg(test)]
use crate::asupersync::atp::inbox::ObjectDigest;
#[cfg(test)]
use crate::asupersync::atp::quota::{
QuotaAllocation, QuotaBucket, QuotaError, QuotaLedger, QuotaRow, QuotaUsage, RetentionClock,
RetentionPolicy, RetentionRecord,
};
#[cfg(not(test))]
use crate::atp::inbox::ObjectDigest;
#[cfg(not(test))]
use crate::atp::quota::{
QuotaAllocation, QuotaBucket, QuotaError, QuotaLedger, QuotaRow, QuotaUsage, RetentionClock,
RetentionPolicy, RetentionRecord,
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
pub const ATPD_STATE_SCHEMA_VERSION: u32 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct AtpdSchemaVersion(pub u32);
impl AtpdSchemaVersion {
pub const CURRENT: Self = Self(ATPD_STATE_SCHEMA_VERSION);
#[must_use]
pub const fn is_supported(self) -> bool {
self.0 <= Self::CURRENT.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AtpdStateCollection {
Identities,
Grants,
Transfers,
Journals,
Cache,
Inbox,
Mailbox,
ReceivePlans,
ConsentRecords,
ProofBundles,
Traces,
Diagnostics,
Settings,
Quarantine,
}
impl AtpdStateCollection {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Identities => "identities",
Self::Grants => "grants",
Self::Transfers => "transfers",
Self::Journals => "journals",
Self::Cache => "cache",
Self::Inbox => "inbox",
Self::Mailbox => "mailbox",
Self::ReceivePlans => "receive_plans",
Self::ConsentRecords => "consent_records",
Self::ProofBundles => "proof_bundles",
Self::Traces => "traces",
Self::Diagnostics => "diagnostics",
Self::Settings => "settings",
Self::Quarantine => "quarantine",
}
}
#[must_use]
pub const fn quota_bucket(self) -> QuotaBucket {
match self {
Self::Identities => QuotaBucket::Identities,
Self::Grants | Self::ConsentRecords | Self::ReceivePlans => QuotaBucket::Grants,
Self::Transfers => QuotaBucket::Transfers,
Self::Journals => QuotaBucket::PartialJournals,
Self::Cache => QuotaBucket::Cache,
Self::Inbox => QuotaBucket::Inbox,
Self::Mailbox => QuotaBucket::Mailbox,
Self::ProofBundles => QuotaBucket::ProofArtifacts,
Self::Traces => QuotaBucket::Traces,
Self::Diagnostics => QuotaBucket::Diagnostics,
Self::Settings => QuotaBucket::Settings,
Self::Quarantine => QuotaBucket::Quarantine,
}
}
}
impl fmt::Display for AtpdStateCollection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StateSensitivity {
Public,
Internal,
PeerIdentity,
GrantSecret,
KeyMaterial,
PrivateContent,
Quarantine,
}
impl StateSensitivity {
#[allow(dead_code)]
#[must_use]
pub const fn redact_by_default(self) -> bool {
!matches!(self, Self::Public)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StateExportPolicy {
Deny,
Redacted,
ExplicitPolicyRequired,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AtpdExportMode {
Redacted,
Full,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdStateRecord {
pub record_id: String,
pub collection: AtpdStateCollection,
pub payload_digest: ObjectDigest,
pub redacted_summary: String,
pub bytes: u64,
pub retention: RetentionClock,
pub sensitivity: StateSensitivity,
pub export_policy: StateExportPolicy,
pub quarantined: bool,
pub fields: BTreeSet<String>,
}
impl AtpdStateRecord {
#[must_use]
pub fn new(
record_id: impl Into<String>,
collection: AtpdStateCollection,
payload_digest: ObjectDigest,
redacted_summary: impl Into<String>,
bytes: u64,
retention: RetentionClock,
sensitivity: StateSensitivity,
) -> Self {
Self {
record_id: record_id.into(),
collection,
payload_digest,
redacted_summary: redacted_summary.into(),
bytes,
retention,
sensitivity,
export_policy: StateExportPolicy::Redacted,
quarantined: collection == AtpdStateCollection::Quarantine,
fields: BTreeSet::new(),
}
}
#[must_use]
pub const fn with_export_policy(mut self, export_policy: StateExportPolicy) -> Self {
self.export_policy = export_policy;
self
}
#[must_use]
pub fn with_field(mut self, field: impl Into<String>) -> Self {
self.fields.insert(field.into());
self
}
fn quota_allocation(&self) -> QuotaAllocation {
QuotaAllocation::one_record(self.collection.quota_bucket(), self.bytes)
}
fn retention_record(&self) -> RetentionRecord {
RetentionRecord {
record_id: scoped_record_id(self.collection, &self.record_id),
bucket: self.collection.quota_bucket(),
clock: self.retention,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdExportRecord {
pub scoped_id: String,
pub collection: AtpdStateCollection,
pub summary: String,
pub payload_digest: String,
pub bytes: u64,
pub sensitivity: StateSensitivity,
pub redacted: bool,
pub fields: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdStateExport {
pub schema_version: AtpdSchemaVersion,
pub mode: String,
pub records: Vec<AtpdExportRecord>,
pub quota_rows: Vec<QuotaRow>,
pub settings: AtpdStateSettings,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdIntegrityReport {
pub missing_collections: Vec<AtpdStateCollection>,
pub quota_mismatches: Vec<AtpdQuotaMismatch>,
}
impl AtpdIntegrityReport {
#[allow(dead_code)]
#[must_use]
pub fn is_clean(&self) -> bool {
self.missing_collections.is_empty() && self.quota_mismatches.is_empty()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdQuotaMismatch {
pub bucket: QuotaBucket,
pub expected: QuotaUsage,
pub actual: QuotaUsage,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdStateSettings {
pub storage_root_label: String,
pub require_policy_for_full_export: bool,
pub key_material_is_handle_only: bool,
pub diagnostics_redacted_by_default: bool,
}
impl Default for AtpdStateSettings {
fn default() -> Self {
Self {
storage_root_label: "cx_scoped_atpd_state".to_string(),
require_policy_for_full_export: true,
key_material_is_handle_only: true,
diagnostics_redacted_by_default: true,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AtpdPersistentState {
pub schema_version: AtpdSchemaVersion,
pub settings: AtpdStateSettings,
pub quota_ledger: QuotaLedger,
pub retention_policy: RetentionPolicy,
pub records: BTreeMap<AtpdStateCollection, BTreeMap<String, AtpdStateRecord>>,
}
impl AtpdPersistentState {
#[must_use]
pub fn new() -> Self {
Self {
schema_version: AtpdSchemaVersion::CURRENT,
settings: AtpdStateSettings::default(),
quota_ledger: QuotaLedger::daemon_defaults(),
retention_policy: RetentionPolicy::daemon_defaults(),
records: BTreeMap::new(),
}
}
#[must_use]
pub fn covers_required_collections(&self) -> bool {
required_collections()
.iter()
.all(|collection| self.records.contains_key(collection))
}
pub fn ensure_required_collections(&mut self) {
for collection in required_collections() {
self.records.entry(collection).or_default();
}
}
pub fn insert_record(&mut self, record: AtpdStateRecord) -> Result<(), AtpdStateError> {
if !self.schema_version.is_supported() {
return Err(AtpdStateError::UnsupportedSchema(self.schema_version));
}
let scoped_id = scoped_record_id(record.collection, &record.record_id);
if self
.records
.get(&record.collection)
.and_then(|records| records.get(&record.record_id))
.is_some()
{
return Err(AtpdStateError::DuplicateRecord(scoped_id));
}
self.quota_ledger
.reserve(scoped_id, record.quota_allocation())?;
self.records
.entry(record.collection)
.or_default()
.insert(record.record_id.clone(), record);
Ok(())
}
pub fn expire_record(
&mut self,
collection: AtpdStateCollection,
record_id: &str,
) -> Result<AtpdStateRecord, AtpdStateError> {
let records = self
.records
.get_mut(&collection)
.ok_or(AtpdStateError::UnknownCollection(collection))?;
let record = records.remove(record_id).ok_or_else(|| {
AtpdStateError::UnknownRecord(scoped_record_id(collection, record_id))
})?;
self.quota_ledger
.release(&scoped_record_id(collection, record_id))?;
Ok(record)
}
pub fn apply_retention(&mut self, now_epoch_secs: u64) -> Result<Vec<String>, AtpdStateError> {
let expired = self.expired_record_ids(now_epoch_secs);
for scoped_id in &expired {
let (collection, record_id) = parse_scoped_record_id(scoped_id)?;
self.expire_record(collection, record_id)?;
}
Ok(expired)
}
#[must_use]
pub fn expired_record_ids(&self, now_epoch_secs: u64) -> Vec<String> {
let records: Vec<_> = self
.records
.values()
.flat_map(|records| records.values().map(AtpdStateRecord::retention_record))
.collect();
self.retention_policy
.expired_records(&records, now_epoch_secs)
}
#[must_use]
pub fn integrity_report(&self) -> AtpdIntegrityReport {
let missing_collections = required_collections()
.into_iter()
.filter(|collection| !self.records.contains_key(collection))
.collect();
let mut expected_usage: BTreeMap<QuotaBucket, QuotaUsage> = BTreeMap::new();
for record in self.records.values().flat_map(BTreeMap::values) {
let usage = expected_usage
.entry(record.collection.quota_bucket())
.or_default();
usage.bytes = usage.bytes.saturating_add(record.bytes);
usage.records = usage.records.saturating_add(1);
}
let mut buckets: BTreeSet<_> = expected_usage.keys().copied().collect();
buckets.extend(self.quota_ledger.rows().iter().map(|row| row.bucket));
let quota_mismatches = buckets
.into_iter()
.filter_map(|bucket| {
let expected = expected_usage.get(&bucket).copied().unwrap_or_default();
let actual = self.quota_ledger.usage(bucket);
(expected != actual).then_some(AtpdQuotaMismatch {
bucket,
expected,
actual,
})
})
.collect();
AtpdIntegrityReport {
missing_collections,
quota_mismatches,
}
}
pub fn export(
&self,
mode: AtpdExportMode,
policy_authorized: bool,
) -> Result<AtpdStateExport, AtpdStateError> {
if mode == AtpdExportMode::Full
&& self.settings.require_policy_for_full_export
&& !policy_authorized
{
return Err(AtpdStateError::PolicyRequiredForFullExport);
}
let mut records = Vec::new();
for (collection, collection_records) in &self.records {
for record in collection_records.values() {
if record.export_policy == StateExportPolicy::Deny {
continue;
}
let redacted = match mode {
AtpdExportMode::Redacted => true,
AtpdExportMode::Full => {
record.export_policy == StateExportPolicy::Redacted
|| (record.sensitivity == StateSensitivity::KeyMaterial
&& self.settings.key_material_is_handle_only)
}
};
let payload_digest = if redacted {
record.payload_digest.redacted()
} else {
record.payload_digest.to_hex()
};
records.push(AtpdExportRecord {
scoped_id: scoped_record_id(*collection, &record.record_id),
collection: *collection,
summary: record.redacted_summary.clone(),
payload_digest,
bytes: record.bytes,
sensitivity: record.sensitivity,
redacted,
fields: record.fields.iter().cloned().collect(),
});
}
}
Ok(AtpdStateExport {
schema_version: self.schema_version,
mode: match mode {
AtpdExportMode::Redacted => "redacted",
AtpdExportMode::Full => "full",
}
.to_string(),
records,
quota_rows: self.quota_ledger.rows(),
settings: self.settings.clone(),
})
}
pub fn privacy_safe_diagnostics_export(&self) -> Result<AtpdStateExport, AtpdStateError> {
self.export(AtpdExportMode::Redacted, false)
}
pub fn restore(snapshot: Self) -> Result<Self, AtpdStateError> {
if !snapshot.schema_version.is_supported() {
return Err(AtpdStateError::UnsupportedSchema(snapshot.schema_version));
}
let mut restored = Self {
schema_version: snapshot.schema_version,
settings: snapshot.settings,
quota_ledger: quota_limits_only(&snapshot.quota_ledger),
retention_policy: snapshot.retention_policy,
records: BTreeMap::new(),
};
restored.ensure_required_collections();
for records in snapshot.records.into_values() {
for record in records.into_values() {
restored.insert_record(record)?;
}
}
Ok(restored)
}
pub fn deterministic_snapshot_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}
impl Default for AtpdPersistentState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AtpdStateError {
UnsupportedSchema(AtpdSchemaVersion),
UnknownCollection(AtpdStateCollection),
UnknownRecord(String),
DuplicateRecord(String),
Quota(QuotaError),
PolicyRequiredForFullExport,
MalformedScopedRecordId(String),
}
impl fmt::Display for AtpdStateError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::UnsupportedSchema(version) => {
write!(f, "unsupported atpd state schema version {}", version.0)
}
Self::UnknownCollection(collection) => {
write!(f, "unknown atpd state collection `{collection}`")
}
Self::UnknownRecord(record_id) => write!(f, "unknown atpd state record `{record_id}`"),
Self::DuplicateRecord(record_id) => {
write!(f, "duplicate atpd state record `{record_id}`")
}
Self::Quota(err) => write!(f, "{err}"),
Self::PolicyRequiredForFullExport => {
write!(f, "full atpd state export requires explicit policy")
}
Self::MalformedScopedRecordId(record_id) => {
write!(f, "malformed scoped record id `{record_id}`")
}
}
}
}
impl std::error::Error for AtpdStateError {}
impl From<QuotaError> for AtpdStateError {
fn from(value: QuotaError) -> Self {
Self::Quota(value)
}
}
#[must_use]
pub fn required_collections() -> Vec<AtpdStateCollection> {
vec![
AtpdStateCollection::Identities,
AtpdStateCollection::Grants,
AtpdStateCollection::Transfers,
AtpdStateCollection::Journals,
AtpdStateCollection::Cache,
AtpdStateCollection::Inbox,
AtpdStateCollection::Mailbox,
AtpdStateCollection::ReceivePlans,
AtpdStateCollection::ConsentRecords,
AtpdStateCollection::ProofBundles,
AtpdStateCollection::Traces,
AtpdStateCollection::Diagnostics,
AtpdStateCollection::Settings,
AtpdStateCollection::Quarantine,
]
}
fn scoped_record_id(collection: AtpdStateCollection, record_id: &str) -> String {
format!("{collection}:{record_id}")
}
fn quota_limits_only(source: &QuotaLedger) -> QuotaLedger {
let mut ledger = QuotaLedger::new();
for (bucket, limit) in source.limits() {
ledger.set_limit(bucket, limit);
}
ledger
}
fn parse_scoped_record_id(scoped_id: &str) -> Result<(AtpdStateCollection, &str), AtpdStateError> {
let Some((collection_name, record_id)) = scoped_id.split_once(':') else {
return Err(AtpdStateError::MalformedScopedRecordId(
scoped_id.to_string(),
));
};
let collection = required_collections()
.into_iter()
.find(|collection| collection.as_str() == collection_name)
.ok_or_else(|| AtpdStateError::MalformedScopedRecordId(scoped_id.to_string()))?;
if record_id.is_empty() {
return Err(AtpdStateError::MalformedScopedRecordId(
scoped_id.to_string(),
));
}
Ok((collection, record_id))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::asupersync::atp::quota::{QuotaLimit, RetentionRule};
fn digest(byte: u8) -> ObjectDigest {
ObjectDigest::new([byte; 32])
}
fn record(
record_id: &str,
collection: AtpdStateCollection,
bytes: u64,
updated_epoch_secs: u64,
sensitivity: StateSensitivity,
) -> AtpdStateRecord {
AtpdStateRecord::new(
record_id,
collection,
digest(bytes as u8),
format!("{collection}:{record_id}"),
bytes,
RetentionClock::new(0, updated_epoch_secs),
sensitivity,
)
.with_field("payload_digest")
}
#[test]
fn schema_covers_every_required_collection() {
let mut state = AtpdPersistentState::new();
state.ensure_required_collections();
assert!(state.covers_required_collections());
assert_eq!(required_collections().len(), 14);
}
#[test]
fn migration_rejects_future_schema() {
let mut snapshot = AtpdPersistentState::new();
snapshot.schema_version = AtpdSchemaVersion(ATPD_STATE_SCHEMA_VERSION + 1);
assert_eq!(
AtpdPersistentState::restore(snapshot).unwrap_err(), AtpdStateError::UnsupportedSchema(AtpdSchemaVersion(ATPD_STATE_SCHEMA_VERSION + 1))
);
}
#[test]
fn quota_exhaustion_blocks_insert_without_mutating_state() {
let mut state = AtpdPersistentState::new();
state
.quota_ledger
.set_limit(QuotaBucket::Cache, QuotaLimit::new(10, 1));
state
.insert_record(record(
"cache-a",
AtpdStateCollection::Cache,
8,
10,
StateSensitivity::PrivateContent,
))
.unwrap();
let err = state
.insert_record(record(
"cache-b",
AtpdStateCollection::Cache,
1,
11,
StateSensitivity::PrivateContent,
))
.unwrap_err();
assert!(matches!(
err,
AtpdStateError::Quota(QuotaError::Exhausted { .. })
));
assert_eq!(
state
.records
.get(&AtpdStateCollection::Cache)
.map(BTreeMap::len),
Some(1)
);
}
#[test]
fn integrity_report_detects_corrupt_quota_accounting() {
let mut state = AtpdPersistentState::new();
state.ensure_required_collections();
state
.records
.entry(AtpdStateCollection::Cache)
.or_default()
.insert(
"unaccounted".to_string(),
record(
"unaccounted",
AtpdStateCollection::Cache,
9,
10,
StateSensitivity::PrivateContent,
),
);
let report = state.integrity_report();
assert!(report.missing_collections.is_empty());
assert_eq!(
report.quota_mismatches,
vec![AtpdQuotaMismatch {
bucket: QuotaBucket::Cache,
expected: QuotaUsage {
bytes: 9,
records: 1
},
actual: QuotaUsage::default(),
}]
);
}
#[test]
fn retention_expires_old_records_and_releases_quota() {
let mut state = AtpdPersistentState::new();
state
.retention_policy
.set_rule(QuotaBucket::Traces, RetentionRule::max_age(10));
state
.insert_record(record(
"trace-old",
AtpdStateCollection::Traces,
5,
80,
StateSensitivity::Internal,
))
.unwrap(); state
.insert_record(record(
"trace-new",
AtpdStateCollection::Traces,
7,
95,
StateSensitivity::Internal,
))
.unwrap();
assert_eq!(
state.apply_retention(100).unwrap(),
vec!["traces:trace-old".to_string()]
);
assert_eq!(state.quota_ledger.usage(QuotaBucket::Traces).bytes, 7);
}
#[test]
fn redacted_export_hides_sensitive_payloads() {
let mut state = AtpdPersistentState::new();
state
.insert_record(
record(
"identity-a",
AtpdStateCollection::Identities,
32,
10,
StateSensitivity::KeyMaterial,
)
.with_export_policy(StateExportPolicy::ExplicitPolicyRequired),
)
.unwrap();
let export = state.export(AtpdExportMode::Redacted, false).unwrap(); assert_eq!(export.records.len(), 1);
assert!(export.records[0].redacted);
assert!(export.records[0].payload_digest.ends_with("..."));
}
#[test]
fn diagnostics_export_is_always_redacted_without_policy() {
let mut state = AtpdPersistentState::new();
state
.insert_record(
record(
"diagnostic-a",
AtpdStateCollection::Diagnostics,
16,
10,
StateSensitivity::Internal,
)
.with_export_policy(StateExportPolicy::ExplicitPolicyRequired),
)
.unwrap();
let export = state.privacy_safe_diagnostics_export().unwrap(); assert_eq!(export.mode, "redacted");
assert!(export.records[0].redacted);
}
#[test]
fn full_export_requires_policy_and_still_respects_record_policy() {
let mut state = AtpdPersistentState::new();
state
.insert_record(
record(
"public-setting",
AtpdStateCollection::Settings,
1,
10,
StateSensitivity::Public,
)
.with_export_policy(StateExportPolicy::ExplicitPolicyRequired),
)
.unwrap();
assert_eq!(
state.export(AtpdExportMode::Full, false).unwrap_err(), AtpdStateError::PolicyRequiredForFullExport
);
let export = state.export(AtpdExportMode::Full, true).unwrap(); assert!(!export.records[0].redacted);
assert!(!export.records[0].payload_digest.ends_with("..."));
}
#[test]
fn full_export_with_policy_can_include_private_metadata_digest() {
let mut state = AtpdPersistentState::new();
state
.insert_record(
record(
"cache-private",
AtpdStateCollection::Cache,
2,
10,
StateSensitivity::PrivateContent,
)
.with_export_policy(StateExportPolicy::ExplicitPolicyRequired),
)
.unwrap();
let export = state.export(AtpdExportMode::Full, true).unwrap(); assert!(!export.records[0].redacted);
}
#[test]
fn consent_records_restore_and_snapshot_deterministically() {
let mut state = AtpdPersistentState::new();
state.ensure_required_collections();
state
.insert_record(record(
"consent-a",
AtpdStateCollection::ConsentRecords,
9,
10,
StateSensitivity::GrantSecret,
))
.unwrap();
let restored = AtpdPersistentState::restore(state.clone()).unwrap(); assert_eq!(
state.deterministic_snapshot_json().unwrap(),
restored.deterministic_snapshot_json().unwrap()
);
}
#[test]
fn quarantine_cleanup_uses_retention_policy() {
let mut state = AtpdPersistentState::new();
state
.retention_policy
.set_rule(QuotaBucket::Quarantine, RetentionRule::max_age(1));
state
.insert_record(record(
"quarantine-a",
AtpdStateCollection::Quarantine,
64,
10,
StateSensitivity::Quarantine,
))
.unwrap();
assert_eq!(
state.expired_record_ids(12),
vec!["quarantine:quarantine-a".to_string()]
);
}
}