use super::{
EntityRef, GovernanceError, Hash, LineageError, LineageId, LineageRecord, Operation,
PolicyBundle, PolicyBundleId, PolicyBundleStatus, PolicyError, Timestamp, WitnessError,
WitnessId, WitnessRecord,
};
use std::collections::HashMap;
use std::sync::Arc;
pub type RepositoryResult<T> = Result<T, GovernanceError>;
#[derive(Clone, Debug, Default)]
pub struct QueryOptions {
pub limit: Option<usize>,
pub offset: Option<usize>,
pub ascending: bool,
}
impl QueryOptions {
#[must_use]
pub const fn with_limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
#[must_use]
pub const fn with_offset(mut self, offset: usize) -> Self {
self.offset = Some(offset);
self
}
#[must_use]
pub const fn descending(mut self) -> Self {
self.ascending = false;
self
}
}
#[derive(Clone, Debug)]
pub struct TimeRange {
pub start: Timestamp,
pub end: Timestamp,
}
impl TimeRange {
#[must_use]
pub const fn new(start: Timestamp, end: Timestamp) -> Self {
Self { start, end }
}
#[must_use]
pub fn contains(&self, ts: Timestamp) -> bool {
ts >= self.start && ts < self.end
}
}
pub trait PolicyRepository: Send + Sync {
fn save(&self, bundle: &PolicyBundle) -> RepositoryResult<()>;
fn get(&self, id: PolicyBundleId) -> RepositoryResult<Option<PolicyBundle>>;
fn update(&self, bundle: &PolicyBundle) -> RepositoryResult<()>;
fn delete(&self, id: PolicyBundleId) -> RepositoryResult<()>;
fn list(
&self,
status: Option<PolicyBundleStatus>,
options: QueryOptions,
) -> RepositoryResult<Vec<PolicyBundle>>;
fn get_active(&self) -> RepositoryResult<Option<PolicyBundle>>;
fn find_by_name(
&self,
pattern: &str,
options: QueryOptions,
) -> RepositoryResult<Vec<PolicyBundle>>;
fn get_history(&self, name: &str) -> RepositoryResult<Vec<PolicyBundle>>;
fn exists(&self, id: PolicyBundleId) -> RepositoryResult<bool>;
}
pub trait WitnessRepository: Send + Sync {
fn save(&self, witness: &WitnessRecord) -> RepositoryResult<()>;
fn get(&self, id: WitnessId) -> RepositoryResult<Option<WitnessRecord>>;
fn get_head(&self) -> RepositoryResult<Option<WitnessRecord>>;
fn get_by_sequence(&self, sequence: u64) -> RepositoryResult<Option<WitnessRecord>>;
fn get_range(&self, start_seq: u64, end_seq: u64) -> RepositoryResult<Vec<WitnessRecord>>;
fn get_by_time_range(
&self,
range: TimeRange,
options: QueryOptions,
) -> RepositoryResult<Vec<WitnessRecord>>;
fn get_by_action(&self, action_hash: Hash) -> RepositoryResult<Vec<WitnessRecord>>;
fn get_by_policy(
&self,
policy_id: PolicyBundleId,
options: QueryOptions,
) -> RepositoryResult<Vec<WitnessRecord>>;
fn get_denials(&self, options: QueryOptions) -> RepositoryResult<Vec<WitnessRecord>>;
fn get_by_correlation(&self, correlation_id: &str) -> RepositoryResult<Vec<WitnessRecord>>;
fn count(&self) -> RepositoryResult<u64>;
fn verify_chain(&self, from_sequence: u64) -> RepositoryResult<bool>;
}
pub trait LineageRepository: Send + Sync {
fn save(&self, lineage: &LineageRecord) -> RepositoryResult<()>;
fn get(&self, id: LineageId) -> RepositoryResult<Option<LineageRecord>>;
fn get_for_entity(
&self,
entity_ref: &EntityRef,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>>;
fn get_latest_for_entity(
&self,
entity_ref: &EntityRef,
) -> RepositoryResult<Option<LineageRecord>>;
fn get_by_actor(
&self,
actor: &str,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>>;
fn get_by_operation(
&self,
operation: Operation,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>>;
fn get_by_witness(&self, witness_id: WitnessId) -> RepositoryResult<Vec<LineageRecord>>;
fn get_by_time_range(
&self,
range: TimeRange,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>>;
fn get_all_dependencies(&self, id: LineageId) -> RepositoryResult<Vec<LineageRecord>>;
fn get_dependents(&self, id: LineageId) -> RepositoryResult<Vec<LineageRecord>>;
fn count(&self) -> RepositoryResult<u64>;
fn verify_no_cycles(&self) -> RepositoryResult<bool>;
}
#[derive(Default)]
pub struct InMemoryPolicyRepository {
bundles: parking_lot::RwLock<HashMap<PolicyBundleId, PolicyBundle>>,
}
impl InMemoryPolicyRepository {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl PolicyRepository for InMemoryPolicyRepository {
fn save(&self, bundle: &PolicyBundle) -> RepositoryResult<()> {
let mut bundles = self.bundles.write();
if bundles.contains_key(&bundle.id) {
return Err(GovernanceError::Policy(PolicyError::AlreadyExists(
bundle.id,
)));
}
bundles.insert(bundle.id, bundle.clone());
Ok(())
}
fn get(&self, id: PolicyBundleId) -> RepositoryResult<Option<PolicyBundle>> {
Ok(self.bundles.read().get(&id).cloned())
}
fn update(&self, bundle: &PolicyBundle) -> RepositoryResult<()> {
let mut bundles = self.bundles.write();
if !bundles.contains_key(&bundle.id) {
return Err(GovernanceError::Policy(PolicyError::ScopeNotFound(
bundle.id.to_string(),
)));
}
bundles.insert(bundle.id, bundle.clone());
Ok(())
}
fn delete(&self, id: PolicyBundleId) -> RepositoryResult<()> {
let mut bundles = self.bundles.write();
if let Some(bundle) = bundles.get(&id) {
if bundle.status != PolicyBundleStatus::Draft {
return Err(GovernanceError::Policy(PolicyError::NotEditable(
bundle.status,
)));
}
}
bundles.remove(&id);
Ok(())
}
fn list(
&self,
status: Option<PolicyBundleStatus>,
options: QueryOptions,
) -> RepositoryResult<Vec<PolicyBundle>> {
let bundles = self.bundles.read();
let mut result: Vec<_> = bundles
.values()
.filter(|b| status.map_or(true, |s| b.status == s))
.cloned()
.collect();
result.sort_by(|a, b| {
if options.ascending {
a.created_at.cmp(&b.created_at)
} else {
b.created_at.cmp(&a.created_at)
}
});
if let Some(offset) = options.offset {
result = result.into_iter().skip(offset).collect();
}
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_active(&self) -> RepositoryResult<Option<PolicyBundle>> {
Ok(self
.bundles
.read()
.values()
.find(|b| b.status == PolicyBundleStatus::Active)
.cloned())
}
fn find_by_name(
&self,
pattern: &str,
options: QueryOptions,
) -> RepositoryResult<Vec<PolicyBundle>> {
let bundles = self.bundles.read();
let mut result: Vec<_> = bundles
.values()
.filter(|b| b.name.contains(pattern))
.cloned()
.collect();
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_history(&self, name: &str) -> RepositoryResult<Vec<PolicyBundle>> {
let bundles = self.bundles.read();
let mut result: Vec<_> = bundles
.values()
.filter(|b| b.name == name)
.cloned()
.collect();
result.sort_by(|a, b| a.version.cmp(&b.version));
Ok(result)
}
fn exists(&self, id: PolicyBundleId) -> RepositoryResult<bool> {
Ok(self.bundles.read().contains_key(&id))
}
}
#[derive(Default)]
pub struct InMemoryWitnessRepository {
witnesses: parking_lot::RwLock<HashMap<WitnessId, WitnessRecord>>,
by_sequence: parking_lot::RwLock<HashMap<u64, WitnessId>>,
}
impl InMemoryWitnessRepository {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl WitnessRepository for InMemoryWitnessRepository {
fn save(&self, witness: &WitnessRecord) -> RepositoryResult<()> {
let mut witnesses = self.witnesses.write();
let mut by_sequence = self.by_sequence.write();
if witnesses.contains_key(&witness.id) {
return Err(GovernanceError::Witness(WitnessError::AlreadyExists(
witness.id,
)));
}
if let Some(prev_id) = witness.previous_witness {
if !witnesses.contains_key(&prev_id) {
return Err(GovernanceError::Witness(WitnessError::ChainError(
super::WitnessChainError::PreviousNotFound(prev_id),
)));
}
}
witnesses.insert(witness.id, witness.clone());
by_sequence.insert(witness.sequence, witness.id);
Ok(())
}
fn get(&self, id: WitnessId) -> RepositoryResult<Option<WitnessRecord>> {
Ok(self.witnesses.read().get(&id).cloned())
}
fn get_head(&self) -> RepositoryResult<Option<WitnessRecord>> {
let by_sequence = self.by_sequence.read();
let witnesses = self.witnesses.read();
if let Some(max_seq) = by_sequence.keys().max() {
if let Some(id) = by_sequence.get(max_seq) {
return Ok(witnesses.get(id).cloned());
}
}
Ok(None)
}
fn get_by_sequence(&self, sequence: u64) -> RepositoryResult<Option<WitnessRecord>> {
let by_sequence = self.by_sequence.read();
let witnesses = self.witnesses.read();
if let Some(id) = by_sequence.get(&sequence) {
return Ok(witnesses.get(id).cloned());
}
Ok(None)
}
fn get_range(&self, start_seq: u64, end_seq: u64) -> RepositoryResult<Vec<WitnessRecord>> {
let by_sequence = self.by_sequence.read();
let witnesses = self.witnesses.read();
let mut result = Vec::new();
for seq in start_seq..=end_seq {
if let Some(id) = by_sequence.get(&seq) {
if let Some(w) = witnesses.get(id) {
result.push(w.clone());
}
}
}
Ok(result)
}
fn get_by_time_range(
&self,
range: TimeRange,
options: QueryOptions,
) -> RepositoryResult<Vec<WitnessRecord>> {
let witnesses = self.witnesses.read();
let mut result: Vec<_> = witnesses
.values()
.filter(|w| range.contains(w.timestamp))
.cloned()
.collect();
result.sort_by(|a, b| a.sequence.cmp(&b.sequence));
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_by_action(&self, action_hash: Hash) -> RepositoryResult<Vec<WitnessRecord>> {
let witnesses = self.witnesses.read();
Ok(witnesses
.values()
.filter(|w| w.action_hash == action_hash)
.cloned()
.collect())
}
fn get_by_policy(
&self,
policy_id: PolicyBundleId,
options: QueryOptions,
) -> RepositoryResult<Vec<WitnessRecord>> {
let witnesses = self.witnesses.read();
let mut result: Vec<_> = witnesses
.values()
.filter(|w| w.policy_bundle_ref.id == policy_id)
.cloned()
.collect();
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_denials(&self, options: QueryOptions) -> RepositoryResult<Vec<WitnessRecord>> {
let witnesses = self.witnesses.read();
let mut result: Vec<_> = witnesses
.values()
.filter(|w| !w.decision.allow)
.cloned()
.collect();
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_by_correlation(&self, correlation_id: &str) -> RepositoryResult<Vec<WitnessRecord>> {
let witnesses = self.witnesses.read();
Ok(witnesses
.values()
.filter(|w| w.correlation_id.as_deref() == Some(correlation_id))
.cloned()
.collect())
}
fn count(&self) -> RepositoryResult<u64> {
Ok(self.witnesses.read().len() as u64)
}
fn verify_chain(&self, from_sequence: u64) -> RepositoryResult<bool> {
let witnesses = self.witnesses.read();
let by_sequence = self.by_sequence.read();
let max_seq = by_sequence.keys().max().copied().unwrap_or(0);
for seq in from_sequence..=max_seq {
let Some(id) = by_sequence.get(&seq) else {
return Ok(false); };
let Some(witness) = witnesses.get(id) else {
return Ok(false);
};
if !witness.verify_content_hash() {
return Ok(false);
}
if seq > from_sequence {
if let Some(prev_id) = witness.previous_witness {
if let Some(prev) = witnesses.get(&prev_id) {
if witness.verify_chain_link(prev).is_err() {
return Ok(false);
}
} else {
return Ok(false);
}
}
}
}
Ok(true)
}
}
#[derive(Default)]
pub struct InMemoryLineageRepository {
lineages: parking_lot::RwLock<HashMap<LineageId, LineageRecord>>,
by_entity: parking_lot::RwLock<HashMap<String, Vec<LineageId>>>,
}
impl InMemoryLineageRepository {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl LineageRepository for InMemoryLineageRepository {
fn save(&self, lineage: &LineageRecord) -> RepositoryResult<()> {
let mut lineages = self.lineages.write();
let mut by_entity = self.by_entity.write();
if lineages.contains_key(&lineage.id) {
return Err(GovernanceError::Lineage(LineageError::AlreadyExists(
lineage.id,
)));
}
for dep_id in &lineage.dependencies {
if !lineages.contains_key(dep_id) {
return Err(GovernanceError::Lineage(LineageError::DependencyNotFound(
*dep_id,
)));
}
}
lineages.insert(lineage.id, lineage.clone());
let entity_key = lineage.entity_ref.canonical();
by_entity.entry(entity_key).or_default().push(lineage.id);
Ok(())
}
fn get(&self, id: LineageId) -> RepositoryResult<Option<LineageRecord>> {
Ok(self.lineages.read().get(&id).cloned())
}
fn get_for_entity(
&self,
entity_ref: &EntityRef,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
let by_entity = self.by_entity.read();
let entity_key = entity_ref.canonical();
let mut result: Vec<_> = by_entity
.get(&entity_key)
.map(|ids| {
ids.iter()
.filter_map(|id| lineages.get(id).cloned())
.collect()
})
.unwrap_or_default();
result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_latest_for_entity(
&self,
entity_ref: &EntityRef,
) -> RepositoryResult<Option<LineageRecord>> {
let lineages = self.lineages.read();
let by_entity = self.by_entity.read();
let entity_key = entity_ref.canonical();
Ok(by_entity.get(&entity_key).and_then(|ids| {
ids.iter()
.filter_map(|id| lineages.get(id))
.max_by_key(|l| l.timestamp)
.cloned()
}))
}
fn get_by_actor(
&self,
actor: &str,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
let mut result: Vec<_> = lineages
.values()
.filter(|l| l.actor == actor)
.cloned()
.collect();
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_by_operation(
&self,
operation: Operation,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
let mut result: Vec<_> = lineages
.values()
.filter(|l| l.operation == operation)
.cloned()
.collect();
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_by_witness(&self, witness_id: WitnessId) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
Ok(lineages
.values()
.filter(|l| l.authorizing_witness == witness_id)
.cloned()
.collect())
}
fn get_by_time_range(
&self,
range: TimeRange,
options: QueryOptions,
) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
let mut result: Vec<_> = lineages
.values()
.filter(|l| range.contains(l.timestamp))
.cloned()
.collect();
result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
if let Some(limit) = options.limit {
result.truncate(limit);
}
Ok(result)
}
fn get_all_dependencies(&self, id: LineageId) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
let mut visited = std::collections::HashSet::new();
let mut result = Vec::new();
let mut stack = vec![id];
while let Some(current_id) = stack.pop() {
if !visited.insert(current_id) {
continue;
}
if let Some(lineage) = lineages.get(¤t_id) {
if current_id != id {
result.push(lineage.clone());
}
for dep_id in &lineage.dependencies {
if !visited.contains(dep_id) {
stack.push(*dep_id);
}
}
}
}
Ok(result)
}
fn get_dependents(&self, id: LineageId) -> RepositoryResult<Vec<LineageRecord>> {
let lineages = self.lineages.read();
Ok(lineages
.values()
.filter(|l| l.dependencies.contains(&id))
.cloned()
.collect())
}
fn count(&self) -> RepositoryResult<u64> {
Ok(self.lineages.read().len() as u64)
}
fn verify_no_cycles(&self) -> RepositoryResult<bool> {
let lineages = self.lineages.read();
let mut in_degree: HashMap<LineageId, usize> = HashMap::new();
let mut graph: HashMap<LineageId, Vec<LineageId>> = HashMap::new();
for (id, lineage) in lineages.iter() {
in_degree.entry(*id).or_insert(0);
for dep_id in &lineage.dependencies {
graph.entry(*dep_id).or_default().push(*id);
*in_degree.entry(*id).or_insert(0) += 1;
}
}
let mut queue: Vec<_> = in_degree
.iter()
.filter(|(_, °)| deg == 0)
.map(|(id, _)| *id)
.collect();
let mut visited = 0;
while let Some(id) = queue.pop() {
visited += 1;
if let Some(dependents) = graph.get(&id) {
for dep_id in dependents {
if let Some(deg) = in_degree.get_mut(dep_id) {
*deg -= 1;
if *deg == 0 {
queue.push(*dep_id);
}
}
}
}
}
Ok(visited == lineages.len())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::governance::{
EnergySnapshot, GateDecision, PolicyBundleRef, ThresholdConfig,
WitnessComputeLane as ComputeLane,
};
fn test_policy() -> PolicyBundle {
let mut policy = PolicyBundle::new("test-policy");
let _ = policy.add_threshold("default", ThresholdConfig::default());
policy
}
fn test_witness(policy_ref: PolicyBundleRef, prev: Option<&WitnessRecord>) -> WitnessRecord {
WitnessRecord::new(
Hash::from_bytes([1u8; 32]),
EnergySnapshot::new(0.5, 0.3, "test"),
GateDecision::allow(ComputeLane::Reflex),
policy_ref,
prev,
)
}
fn test_lineage(witness_id: WitnessId, deps: Vec<LineageId>) -> LineageRecord {
LineageRecord::new(
EntityRef::node("test-node"),
Operation::Create,
deps,
witness_id,
"test-actor",
)
}
#[test]
fn test_policy_repository() -> RepositoryResult<()> {
let repo = InMemoryPolicyRepository::new();
let policy = test_policy();
repo.save(&policy)?;
let retrieved = repo.get(policy.id)?;
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().name, "test-policy");
assert!(repo.exists(policy.id)?);
Ok(())
}
#[test]
fn test_witness_repository_chain() -> RepositoryResult<()> {
let repo = InMemoryWitnessRepository::new();
let policy_ref = test_policy().reference();
let genesis = test_witness(policy_ref.clone(), None);
repo.save(&genesis)?;
let second = test_witness(policy_ref, Some(&genesis));
repo.save(&second)?;
assert_eq!(repo.count()?, 2);
let head = repo.get_head()?;
assert!(head.is_some());
assert_eq!(head.unwrap().sequence, 1);
assert!(repo.verify_chain(0)?);
Ok(())
}
#[test]
fn test_lineage_repository_dependencies() -> RepositoryResult<()> {
let repo = InMemoryLineageRepository::new();
let witness_id = super::super::WitnessId::new();
let root = test_lineage(witness_id, vec![]);
repo.save(&root)?;
let dependent = test_lineage(witness_id, vec![root.id]);
repo.save(&dependent)?;
let deps = repo.get_all_dependencies(dependent.id)?;
assert_eq!(deps.len(), 1);
assert_eq!(deps[0].id, root.id);
let dependents = repo.get_dependents(root.id)?;
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].id, dependent.id);
assert!(repo.verify_no_cycles()?);
Ok(())
}
#[test]
fn test_query_options() {
let options = QueryOptions::default()
.with_limit(10)
.with_offset(5)
.descending();
assert_eq!(options.limit, Some(10));
assert_eq!(options.offset, Some(5));
assert!(!options.ascending);
}
}