use crate::error::ConvergeError;
use crate::{AdmissionReceipt, AdmissionRequest};
use std::collections::{BTreeMap, BTreeSet, HashMap};
pub use converge_pack::{
ContextFact, ContextKey, FactId, ProposalId, ProposedFact, Timestamp, ValidationError,
};
/// Serializable capture of a [`ContextState`]: its version, a merkle root,
/// and its facts and proposals grouped by [`ContextKey`].
///
/// Deserialization rejects input containing unknown fields
/// (`deny_unknown_fields`). Contents are checked for internal consistency
/// when the snapshot is turned back into a [`ContextState`] through
/// [`ContextState::from_snapshot`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ContextSnapshot {
/// Version of the context at the time the snapshot was taken.
version: u64,
/// Merkle root computed from the source context when the snapshot was taken.
merkle_root: crate::integrity::MerkleRoot,
/// Facts grouped by the key they are stored under.
facts: BTreeMap<ContextKey, Vec<ContextFact>>,
/// Pending proposals grouped by the key they are stored under.
proposals: BTreeMap<ContextKey, Vec<ProposedFact>>,
}
impl ContextSnapshot {
#[must_use]
pub fn from_context(context: &ContextState) -> Self {
let facts = context
.facts
.iter()
.map(|(key, facts)| (*key, facts.clone()))
.collect();
let proposals = context
.proposals
.iter()
.map(|(key, proposals)| (*key, proposals.clone()))
.collect();
Self {
version: context.version,
merkle_root: crate::integrity::MerkleRoot::from_context(context),
facts,
proposals,
}
}
#[must_use]
pub fn version(&self) -> u64 {
self.version
}
#[must_use]
pub fn merkle_root(&self) -> &crate::integrity::MerkleRoot {
&self.merkle_root
}
#[must_use]
pub fn facts(&self) -> &BTreeMap<ContextKey, Vec<ContextFact>> {
&self.facts
}
#[must_use]
pub fn proposals(&self) -> &BTreeMap<ContextKey, Vec<ProposedFact>> {
&self.proposals
}
fn validate(&self) -> Result<(), ConvergeError> {
for (key, facts) in &self.facts {
let mut seen = BTreeSet::new();
for fact in facts {
if fact.key() != *key {
return Err(ConvergeError::InvalidSnapshot {
reason: format!(
"fact '{}' stored under {:?} but declares {:?}",
fact.id(),
key,
fact.key()
),
});
}
if !seen.insert(fact.id().clone()) {
return Err(ConvergeError::InvalidSnapshot {
reason: format!("duplicate fact '{}' under {:?}", fact.id(), key),
});
}
}
}
for (key, proposals) in &self.proposals {
let mut seen = BTreeSet::new();
for proposal in proposals {
if proposal.key() != *key {
return Err(ConvergeError::InvalidSnapshot {
reason: format!(
"proposal '{}' stored under {:?} but declares {:?}",
proposal.id(),
key,
proposal.key()
),
});
}
if !seen.insert(proposal.id().clone()) {
return Err(ConvergeError::InvalidSnapshot {
reason: format!("duplicate proposal '{}' under {:?}", proposal.id(), key),
});
}
}
}
let context = ContextState {
facts: self
.facts
.iter()
.map(|(key, facts)| (*key, facts.clone()))
.collect(),
proposals: self
.proposals
.iter()
.map(|(key, proposals)| (*key, proposals.clone()))
.collect(),
dirty_keys: Vec::new(),
version: self.version,
};
let computed_root = crate::integrity::MerkleRoot::from_context(&context);
if computed_root != self.merkle_root {
return Err(ConvergeError::InvalidSnapshot {
reason: "snapshot merkle root does not match restored facts".to_string(),
});
}
Ok(())
}
}
/// Builds a [`ContextFact`] with a fixed, engine-authored promotion record.
///
/// The record names the `"converge-engine"` system actor, uses the zero
/// content hash, a default validation summary, a local `"engine-projection"` /
/// `"seed"` trace, and the epoch timestamp. The fact itself is also created
/// at the epoch timestamp.
pub(crate) fn new_fact(
    key: ContextKey,
    id: impl Into<FactId>,
    content: impl Into<String>,
) -> ContextFact {
    let actor = converge_pack::FactActor::new_projection(
        "converge-engine",
        converge_pack::FactActorKind::System,
    );
    let local_trace =
        converge_pack::FactLocalTrace::new_projection("engine-projection", "seed", None, true);
    let trace_link = converge_pack::FactTraceLink::Local(local_trace);

    let promotion = converge_pack::FactPromotionRecord::new_projection(
        "engine-projection",
        converge_pack::ContentHash::zero(),
        actor,
        converge_pack::FactValidationSummary::default(),
        Vec::new(),
        trace_link,
        Timestamp::epoch(),
    );

    new_fact_with_promotion(key, id, content, promotion, Timestamp::epoch())
}
/// Builds a [`ContextFact`] from an explicit promotion record and creation time.
///
/// All arguments are forwarded unchanged to `ContextFact::new_projection`.
pub(crate) fn new_fact_with_promotion(
key: ContextKey,
id: impl Into<FactId>,
content: impl Into<String>,
promotion_record: converge_pack::FactPromotionRecord,
created_at: impl Into<Timestamp>,
) -> ContextFact {
ContextFact::new_projection(key, id, content, promotion_record, created_at)
}
/// In-memory context: stored facts, pending proposals, change tracking, and a version counter.
///
/// The type derives `Serialize` only; restoring a context goes through
/// [`ContextSnapshot`] and [`ContextState::from_snapshot`].
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct ContextState {
/// Stored facts grouped by key.
facts: HashMap<ContextKey, Vec<ContextFact>>,
/// Pending proposals grouped by key.
proposals: HashMap<ContextKey, Vec<ProposedFact>>,
/// One entry per fact stored by `add_fact` since the last `clear_dirty`.
dirty_keys: Vec<ContextKey>,
/// Incremented by one each time `add_fact` stores a new fact.
version: u64,
}
impl converge_pack::Context for ContextState {
    /// Returns `true` when at least one fact is stored under `key`.
    fn has(&self, key: ContextKey) -> bool {
        match self.facts.get(&key) {
            Some(stored) => !stored.is_empty(),
            None => false,
        }
    }

    /// Returns the facts stored under `key`, or an empty slice when there are none.
    fn get(&self, key: ContextKey) -> &[ContextFact] {
        match self.facts.get(&key) {
            Some(stored) => stored.as_slice(),
            None => &[],
        }
    }

    /// Returns the proposals pending under `key`, or an empty slice when there are none.
    fn get_proposals(&self, key: ContextKey) -> &[ProposedFact] {
        match self.proposals.get(&key) {
            Some(staged) => staged.as_slice(),
            None => &[],
        }
    }
}
impl ContextState {
    /// Creates an empty context: no facts, no proposals, no dirty keys, version `0`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Captures the current facts, proposals, and version as a [`ContextSnapshot`].
    #[must_use]
    pub fn snapshot(&self) -> ContextSnapshot {
        ContextSnapshot::from_context(self)
    }

    /// Restores a context from `snapshot` after validating it.
    ///
    /// The restored context keeps the snapshot's version and starts with no
    /// dirty keys.
    ///
    /// # Errors
    ///
    /// Returns [`ConvergeError::InvalidSnapshot`] when the snapshot fails
    /// validation (key mismatch, duplicate id under a key, or merkle root
    /// mismatch).
    pub fn from_snapshot(snapshot: ContextSnapshot) -> Result<Self, ConvergeError> {
        snapshot.validate()?;
        Ok(Self {
            facts: snapshot.facts.into_iter().collect(),
            proposals: snapshot.proposals.into_iter().collect(),
            dirty_keys: Vec::new(),
            version: snapshot.version,
        })
    }

    /// Returns the facts stored under `key`, or an empty slice when there are none.
    #[must_use]
    pub fn get(&self, key: ContextKey) -> &[ContextFact] {
        self.facts.get(&key).map_or(&[], Vec::as_slice)
    }

    /// Returns `true` when at least one fact is stored under `key`.
    #[must_use]
    pub fn has(&self, key: ContextKey) -> bool {
        self.facts.get(&key).is_some_and(|v| !v.is_empty())
    }

    /// Current version; incremented by one each time a new fact is stored.
    #[must_use]
    pub fn version(&self) -> u64 {
        self.version
    }

    /// Keys recorded by `add_fact` since the last [`clear_dirty`](Self::clear_dirty).
    ///
    /// A key appears once per stored fact, so it can occur more than once.
    #[must_use]
    pub fn dirty_keys(&self) -> &[ContextKey] {
        &self.dirty_keys
    }

    /// Every key that has an entry in the fact map, in unspecified order.
    #[must_use]
    pub fn all_keys(&self) -> Vec<ContextKey> {
        self.facts.keys().copied().collect()
    }

    /// Returns `true` when at least one proposal is pending under any key.
    #[must_use]
    pub fn has_pending_proposals(&self) -> bool {
        self.proposals.values().any(|items| !items.is_empty())
    }

    /// Forgets all recorded dirty keys.
    pub fn clear_dirty(&mut self) {
        self.dirty_keys.clear();
    }

    /// Stages `proposal` under its key.
    ///
    /// Returns `Ok(true)` when the proposal was newly staged and `Ok(false)`
    /// when a proposal with the same id, content, confidence, and provenance
    /// is already pending under that key.
    ///
    /// # Errors
    ///
    /// Returns [`ConvergeError::Conflict`] when a proposal with the same id is
    /// already pending under that key but differs in content, confidence, or
    /// provenance. The error carries a boxed clone of this context.
    pub fn add_proposal(&mut self, proposal: ProposedFact) -> Result<bool, ConvergeError> {
        let key = proposal.key;
        let proposals = self.proposals.entry(key).or_default();
        if let Some(existing) = proposals.iter().find(|p| p.id == proposal.id) {
            if existing.content == proposal.content
                && existing.confidence() == proposal.confidence()
                && existing.provenance == proposal.provenance
            {
                return Ok(false);
            }
            return Err(ConvergeError::Conflict {
                id: proposal.id.to_string(),
                existing: existing.content.clone(),
                new: proposal.content,
                context: Box::new(self.clone()),
            });
        }
        proposals.push(proposal);
        Ok(true)
    }

    /// Stages an input as a proposal with the provenance `"context-input"`.
    ///
    /// # Errors
    ///
    /// Same as [`add_proposal`](Self::add_proposal).
    pub fn add_input(
        &mut self,
        key: ContextKey,
        id: impl Into<ProposalId>,
        content: impl Into<String>,
    ) -> Result<bool, ConvergeError> {
        self.add_input_with_provenance(key, id, content, "context-input")
    }

    /// Stages an input as a proposal with the given `provenance`.
    ///
    /// # Errors
    ///
    /// Same as [`add_proposal`](Self::add_proposal).
    pub fn add_input_with_provenance(
        &mut self,
        key: ContextKey,
        id: impl Into<ProposalId>,
        content: impl Into<String>,
        provenance: impl Into<String>,
    ) -> Result<bool, ConvergeError> {
        self.add_proposal(ProposedFact::new(key, id, content, provenance))
    }

    /// Stages the proposal derived from `request` and returns a receipt for it.
    ///
    /// The receipt is built from the request and from whether the proposal
    /// was newly staged.
    ///
    /// # Errors
    ///
    /// Same as [`add_proposal`](Self::add_proposal).
    pub fn submit_observation(
        &mut self,
        request: AdmissionRequest,
    ) -> Result<AdmissionReceipt, ConvergeError> {
        // `into_proposal` consumes its receiver, and the request is still
        // needed afterwards to build the receipt, hence the clone.
        let staged = self.add_proposal(request.clone().into_proposal())?;
        Ok(AdmissionReceipt::new(&request, staged))
    }

    /// Removes and returns every pending proposal, leaving none staged.
    ///
    /// Proposals under one key keep their relative order; the order across
    /// keys follows map iteration and is unspecified.
    pub(crate) fn drain_proposals(&mut self) -> Vec<ProposedFact> {
        let mut drained = Vec::new();
        for proposals in self.proposals.values_mut() {
            drained.append(proposals);
        }
        // Every list is empty after `append`, so this drops the emptied entries.
        self.proposals.retain(|_, proposals| !proposals.is_empty());
        drained
    }

    /// Removes the pending proposal `id` under `key`, if present.
    ///
    /// The key's entry is dropped once its last proposal is removed.
    pub(crate) fn remove_proposal(&mut self, key: ContextKey, id: &ProposalId) {
        if let Some(proposals) = self.proposals.get_mut(&key) {
            proposals.retain(|proposal| proposal.id != id);
            if proposals.is_empty() {
                self.proposals.remove(&key);
            }
        }
    }

    /// Stores `fact` under its key.
    ///
    /// Returns `Ok(true)` when the fact was stored; the key is then recorded
    /// as dirty and the version is incremented. Returns `Ok(false)`, changing
    /// nothing, when a fact with the same id and content already exists under
    /// that key.
    ///
    /// Pending proposals are left untouched: storing one fact must not discard
    /// other proposals staged under the same key. A promoted proposal is
    /// retired individually through [`remove_proposal`](Self::remove_proposal).
    ///
    /// # Errors
    ///
    /// Returns [`ConvergeError::Conflict`] when a fact with the same id but
    /// different content already exists under that key. The error carries a
    /// boxed clone of this context.
    pub(crate) fn add_fact(&mut self, fact: ContextFact) -> Result<bool, ConvergeError> {
        let key = fact.key();
        let facts = self.facts.entry(key).or_default();
        if let Some(existing) = facts.iter().find(|f| f.id() == fact.id()) {
            if existing.content() == fact.content() {
                return Ok(false);
            }
            return Err(ConvergeError::Conflict {
                id: fact.id().to_string(),
                existing: existing.content().to_string(),
                new: fact.content().to_string(),
                context: Box::new(self.clone()),
            });
        }
        facts.push(fact);
        self.dirty_keys.push(key);
        self.version += 1;
        Ok(true)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use converge_pack::Context as _;

    /// Asserts that `result` is a `Conflict` with the expected id and contents.
    fn assert_conflict(
        result: Result<bool, ConvergeError>,
        want_id: &str,
        want_existing: &str,
        want_new: &str,
    ) {
        match result {
            Err(ConvergeError::Conflict {
                id, existing, new, ..
            }) => {
                assert_eq!(id, want_id);
                assert_eq!(existing, want_existing);
                assert_eq!(new, want_new);
            }
            _ => panic!("Expected Conflict error, got {result:?}"),
        }
    }

    /// Builds a context holding the single seed fact `seed-1` with content `value`.
    fn context_with_seed_value() -> ContextState {
        let mut state = ContextState::new();
        state
            .add_fact(crate::context::new_fact(
                ContextKey::Seeds,
                "seed-1",
                "value",
            ))
            .unwrap();
        state
    }

    #[test]
    fn empty_context_has_no_facts() {
        let state = ContextState::new();

        assert!(!state.has(ContextKey::Seeds));
        assert_eq!(state.version(), 0);
    }

    #[test]
    fn adding_fact_increments_version() {
        let mut state = ContextState::new();
        let seed = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial value");

        let stored = state.add_fact(seed).expect("should add");

        assert!(stored);
        assert_eq!(state.version(), 1);
        assert!(state.has(ContextKey::Seeds));
    }

    #[test]
    fn duplicate_fact_does_not_change_context() {
        let mut state = ContextState::new();
        let seed = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial");
        state.add_fact(seed.clone()).expect("should add first");

        let stored_again = state
            .add_fact(seed)
            .expect("should not error on duplicate");

        assert!(!stored_again);
        assert_eq!(state.version(), 1);
    }

    #[test]
    fn dirty_keys_track_new_facts_and_clear() {
        let mut state = ContextState::new();
        let hypothesis = crate::context::new_fact(ContextKey::Hypotheses, "hyp-1", "value");
        state.add_fact(hypothesis).expect("should add");
        assert_eq!(state.dirty_keys(), &[ContextKey::Hypotheses]);

        state.clear_dirty();

        assert!(state.dirty_keys().is_empty());
    }

    #[test]
    fn detects_conflict() {
        let mut state = ContextState::new();
        let first = crate::context::new_fact(ContextKey::Seeds, "fact-1", "version A");
        state.add_fact(first).unwrap();

        let second = crate::context::new_fact(ContextKey::Seeds, "fact-1", "version B");
        let outcome = state.add_fact(second);

        assert_conflict(outcome, "fact-1", "version A", "version B");
    }

    #[test]
    fn adding_proposal_tracks_pending_state() {
        let mut state = ContextState::new();
        let hypothesis =
            ProposedFact::new(ContextKey::Hypotheses, "hyp-1", "market is growing", "test");

        let staged = state.add_proposal(hypothesis).unwrap();

        assert!(staged);
        assert!(state.has_pending_proposals());
        assert_eq!(state.get_proposals(ContextKey::Hypotheses).len(), 1);
    }

    #[test]
    fn conflicting_staged_inputs_are_rejected_before_promotion() {
        let mut state = ContextState::new();
        let staged = state
            .add_input_with_provenance(ContextKey::Seeds, "seed-1", "version A", "user")
            .unwrap();
        assert!(staged);

        let outcome =
            state.add_input_with_provenance(ContextKey::Seeds, "seed-1", "version B", "user");

        assert_conflict(outcome, "seed-1", "version A", "version B");
        // The rejected input must not disturb the proposal staged first.
        assert!(state.has_pending_proposals());
        assert_eq!(state.get_proposals(ContextKey::Seeds).len(), 1);
    }

    #[test]
    fn snapshot_round_trips_facts_and_proposals() {
        let mut state = ContextState::new();
        state
            .add_fact(crate::context::new_fact(
                ContextKey::Seeds,
                "seed-1",
                "persisted seed",
            ))
            .unwrap();
        state
            .add_proposal(ProposedFact::new(
                ContextKey::Hypotheses,
                "hyp-1",
                "staged hypothesis",
                "test",
            ))
            .unwrap();

        let restored = ContextState::from_snapshot(state.snapshot()).unwrap();

        assert_eq!(restored.version(), 1);
        assert!(restored.dirty_keys().is_empty());
        let seeds = restored.get(ContextKey::Seeds);
        assert_eq!(seeds[0].id(), "seed-1");
        assert_eq!(seeds[0].content(), "persisted seed");
        let staged = restored.get_proposals(ContextKey::Hypotheses);
        assert_eq!(staged[0].id(), "hyp-1");
    }

    #[test]
    fn snapshot_rejects_fact_key_mismatch() {
        let mut snapshot = context_with_seed_value().snapshot();
        // Move the seed fact under a key it does not declare.
        let misplaced = snapshot
            .facts
            .get_mut(&ContextKey::Seeds)
            .unwrap()
            .pop()
            .unwrap();
        snapshot
            .facts
            .entry(ContextKey::Signals)
            .or_default()
            .push(misplaced);

        let err = ContextState::from_snapshot(snapshot).unwrap_err();

        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
        assert!(err.to_string().contains("stored under Signals"));
    }

    #[test]
    fn snapshot_rejects_merkle_mismatch() {
        let mut snapshot = context_with_seed_value().snapshot();
        snapshot.merkle_root =
            crate::integrity::MerkleRoot(crate::integrity::ContentHash::compute("tampered"));

        let err = ContextState::from_snapshot(snapshot).unwrap_err();

        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
        assert!(err.to_string().contains("merkle root"));
    }

    #[test]
    fn snapshot_rejects_duplicate_fact_ids() {
        let mut snapshot = context_with_seed_value().snapshot();
        let seeds = snapshot.facts.get_mut(&ContextKey::Seeds).unwrap();
        let duplicate = seeds[0].clone();
        seeds.push(duplicate);

        let err = ContextState::from_snapshot(snapshot).unwrap_err();

        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
        assert!(err.to_string().contains("duplicate fact"));
    }

    #[test]
    fn context_implements_trait() {
        let mut state = ContextState::new();
        state
            .add_fact(crate::context::new_fact(ContextKey::Seeds, "s1", "hello"))
            .unwrap();

        let dyn_ctx: &dyn converge_pack::Context = &state;

        assert!(dyn_ctx.has(ContextKey::Seeds));
        assert_eq!(dyn_ctx.get(ContextKey::Seeds).len(), 1);
    }
}