use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::db;
use crate::memory::entries::StructuredMemoryEntry;
use crate::paths::state::StateLayout;
use crate::state::runtime::RuntimeMemoryEntry;
const WORKSPACE_MEMORY_SESSION_CAP: u64 = 6;
const WORK_STREAM_MEMORY_SESSION_CAP: u64 = 4;
const PROJECT_MEMORY_SESSION_CAP: u64 = 3;
const PROFILE_MEMORY_SESSION_CAP: u64 = 2;
const POD_MEMORY_SESSION_CAP: u64 = 2;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum GovernanceAction {
Discard,
AdmitWorkspace,
AdmitWorkStream,
StageProjectPromotion,
StageProfilePromotion,
StagePodPromotion,
DraftProjectTruth,
AutoCompactDuplicate,
StageSupersede,
StageNarrowing,
StageEvictionReview,
Escalate,
}
impl GovernanceAction {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Discard => "discard",
Self::AdmitWorkspace => "admit_workspace",
Self::AdmitWorkStream => "admit_work_stream",
Self::StageProjectPromotion => "stage_project_promotion",
Self::StageProfilePromotion => "stage_profile_promotion",
Self::StagePodPromotion => "stage_pod_promotion",
Self::DraftProjectTruth => "draft_project_truth",
Self::AutoCompactDuplicate => "auto_compact_duplicate",
Self::StageSupersede => "stage_supersede",
Self::StageNarrowing => "stage_narrowing",
Self::StageEvictionReview => "stage_eviction_review",
Self::Escalate => "escalate",
}
}
pub(crate) fn requires_reconciler(self) -> bool {
matches!(
self,
Self::StageProjectPromotion
| Self::StageProfilePromotion
| Self::StagePodPromotion
| Self::StageSupersede
| Self::StageNarrowing
| Self::StageEvictionReview
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum GovernanceStatus {
Allowed,
Discarded,
Escalated,
RateLimited,
}
impl GovernanceStatus {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Allowed => "allowed",
Self::Discarded => "discarded",
Self::Escalated => "escalated",
Self::RateLimited => "rate_limited",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GovernancePressureSignalView {
pub(crate) signal: String,
pub(crate) detail: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GovernanceRateLimitView {
pub(crate) scope: String,
pub(crate) count: u64,
pub(crate) cap: u64,
pub(crate) remaining: u64,
pub(crate) status: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GovernanceRollbackView {
pub(crate) mutation_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) actor_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_id: Option<String>,
pub(crate) target_scope: String,
pub(crate) prior_entry_ids: Vec<String>,
pub(crate) prior_target_digest: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) prior_source_digest: Option<String>,
pub(crate) applied_action: GovernanceAction,
pub(crate) rationale: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub(crate) evidence_summary: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GovernanceDecisionView {
pub(crate) action: GovernanceAction,
pub(crate) status: GovernanceStatus,
pub(crate) target_scope: String,
pub(crate) entry_type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) source_scope: Option<String>,
pub(crate) requires_reconciler: bool,
pub(crate) rationale: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub(crate) evidence_summary: Vec<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub(crate) pressure_signals: Vec<GovernancePressureSignalView>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) duplicate_digest: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) rate_limit: Option<GovernanceRateLimitView>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) rollback: Option<GovernanceRollbackView>,
}
pub(crate) struct GovernanceDecisionInit {
pub(crate) action: GovernanceAction,
pub(crate) status: GovernanceStatus,
pub(crate) target_scope: String,
pub(crate) entry_type: String,
pub(crate) source_scope: Option<String>,
pub(crate) rationale: String,
pub(crate) evidence_summary: Vec<String>,
pub(crate) pressure_signals: Vec<GovernancePressureSignalView>,
pub(crate) duplicate_digest: Option<String>,
pub(crate) rate_limit: Option<GovernanceRateLimitView>,
}
impl GovernanceDecisionView {
pub(crate) fn new(init: GovernanceDecisionInit) -> Self {
Self {
action: init.action,
status: init.status,
target_scope: init.target_scope,
entry_type: init.entry_type,
source_scope: init.source_scope,
requires_reconciler: init.action.requires_reconciler(),
rationale: init.rationale,
evidence_summary: init.evidence_summary,
pressure_signals: init.pressure_signals,
duplicate_digest: init.duplicate_digest,
rate_limit: init.rate_limit,
rollback: None,
}
}
pub(crate) fn blocked_for_write(&self, autonomous: bool) -> bool {
match self.status {
GovernanceStatus::Allowed => false,
GovernanceStatus::Discarded | GovernanceStatus::RateLimited => true,
GovernanceStatus::Escalated => autonomous,
}
}
pub(crate) fn blocked_write_message(&self) -> String {
match self.status {
GovernanceStatus::Discarded => {
format!(
"governance discarded this memory mutation: {}",
self.rationale
)
}
GovernanceStatus::RateLimited => {
if let Some(rate_limit) = &self.rate_limit {
format!(
"governance rate limit reached for {} ({} / {} this session): {}",
rate_limit.scope, rate_limit.count, rate_limit.cap, self.rationale
)
} else {
format!("governance rate limit reached: {}", self.rationale)
}
}
GovernanceStatus::Escalated => {
format!(
"governance escalated this memory mutation: {}",
self.rationale
)
}
GovernanceStatus::Allowed => self.rationale.clone(),
}
}
pub(crate) fn with_rollback(
mut self,
mutation_id: &str,
actor_id: Option<&str>,
session_id: Option<&str>,
prior_entry_ids: Vec<String>,
target_before: &str,
source_before: Option<&str>,
) -> Self {
self.rollback = Some(GovernanceRollbackView {
mutation_id: mutation_id.to_owned(),
actor_id: actor_id.map(str::to_owned),
session_id: session_id.map(str::to_owned),
target_scope: self.target_scope.clone(),
prior_entry_ids,
prior_target_digest: hex_digest(target_before.as_bytes()),
prior_source_digest: source_before.map(|contents| hex_digest(contents.as_bytes())),
applied_action: self.action,
rationale: self.rationale.clone(),
evidence_summary: self.evidence_summary.clone(),
});
self
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct DuplicateMatch {
pub(crate) entry_id: String,
pub(crate) digest: String,
}
pub(crate) fn signal(
signal: impl Into<String>,
detail: impl Into<String>,
) -> GovernancePressureSignalView {
GovernancePressureSignalView {
signal: signal.into(),
detail: detail.into(),
}
}
pub(crate) fn normalized_duplicate_digest(entry: &StructuredMemoryEntry) -> String {
let key = format!(
"{}\n{}",
entry.entry_type,
entry
.content
.split_whitespace()
.collect::<Vec<_>>()
.join(" ")
.to_lowercase()
);
hex_digest(key.as_bytes())
}
pub(crate) fn find_runtime_duplicate(
entries: &[RuntimeMemoryEntry],
selected_entry: &StructuredMemoryEntry,
) -> Option<DuplicateMatch> {
let selected_digest = normalized_duplicate_digest(selected_entry);
entries.iter().find_map(|entry| {
let structured = entry.as_structured_entry()?;
if structured.id == selected_entry.id {
return None;
}
if normalized_duplicate_digest(&structured) == selected_digest {
Some(DuplicateMatch {
entry_id: structured.id,
digest: selected_digest.clone(),
})
} else {
None
}
})
}
pub(crate) fn session_rate_limit(
layout: &StateLayout,
session_id: Option<&str>,
target_scope: &str,
current_op_id: Option<&str>,
) -> Result<Option<GovernanceRateLimitView>> {
let Some(cap) = scope_cap(target_scope) else {
return Ok(None);
};
let Some(session_id) = session_id else {
return Ok(None);
};
if !layout.state_db_path().exists() {
return Ok(Some(rate_limit_view(target_scope, 0, cap)));
}
let db = db::StateDb::open(&layout.state_db_path())?;
let records = db::memory_ops::list_by_session(db.conn(), session_id)?;
let mut count = 0u64;
for record in records {
if current_op_id.is_some_and(|id| id == record.id) {
continue;
}
if !matches!(
record.command.as_str(),
"memory-candidate-admit" | "memory-promote"
) {
continue;
}
if matches!(
record.outcome.as_deref(),
Some("ownership_conflict")
| Some("stale_session")
| Some("unsupported_multiwriter")
| Some("duplicate_id_conflict")
) {
continue;
}
let plan: RateLimitPlan = serde_json::from_str(&record.plan_json).with_context(|| {
format!(
"failed to decode queued memory-op plan for rate-limit accounting `{}`",
record.id
)
})?;
if plan.target.scope == target_scope {
count += 1;
}
}
Ok(Some(rate_limit_view(target_scope, count, cap)))
}
#[derive(Deserialize)]
struct RateLimitPlan {
target: RateLimitTarget,
}
#[derive(Deserialize)]
struct RateLimitTarget {
scope: String,
}
fn scope_cap(target_scope: &str) -> Option<u64> {
match target_scope {
"workspace_memory" => Some(WORKSPACE_MEMORY_SESSION_CAP),
"work_stream_memory" => Some(WORK_STREAM_MEMORY_SESSION_CAP),
"project_memory" => Some(PROJECT_MEMORY_SESSION_CAP),
"profile_memory" => Some(PROFILE_MEMORY_SESSION_CAP),
"pod_memory" => Some(POD_MEMORY_SESSION_CAP),
_ => None,
}
}
fn rate_limit_view(target_scope: &str, count: u64, cap: u64) -> GovernanceRateLimitView {
let remaining = cap.saturating_sub(count);
GovernanceRateLimitView {
scope: target_scope.to_owned(),
count,
cap,
remaining,
status: if count < cap {
"within_limit".to_owned()
} else {
"reached".to_owned()
},
}
}
fn hex_digest(bytes: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(bytes);
format!("{:x}", hasher.finalize())
}