Skip to main content

converge_core/
context.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Context model for Converge.
5//!
6//! Context is the shared, typed, evolving representation of a job.
7//! Types are defined in `converge-traits`; this module provides the
8//! concrete `Context` struct that the engine uses.
9
10use crate::error::ConvergeError;
11use crate::{AdmissionReceipt, AdmissionRequest};
12use std::collections::{BTreeMap, BTreeSet, HashMap};
13
14// Re-export canonical types from converge-pack
15pub use converge_pack::{
16    ContextFact, ContextKey, FactId, FactPayload, ProposalId, ProposedFact, TextPayload, Timestamp,
17    ValidationError,
18};
19
20/// Durable, verified context snapshot for storage adapters.
21///
22/// This is the supported rehydration boundary for embedders such as Helms.
23/// Storage persists this value and later calls [`ContextState::from_snapshot`].
24/// It must not reconstruct facts through promotion constructors.
25#[derive(Debug, Clone, serde::Serialize)]
26#[serde(deny_unknown_fields)]
27pub struct ContextSnapshot {
28    version: u64,
29    merkle_root: crate::integrity::MerkleRoot,
30    facts: BTreeMap<ContextKey, Vec<ContextFact>>,
31    proposals: BTreeMap<ContextKey, Vec<ProposedFact>>,
32}
33
34impl ContextSnapshot {
35    /// Build a storage snapshot from a live context.
36    #[must_use]
37    pub fn from_context(context: &ContextState) -> Self {
38        let facts = context
39            .facts
40            .iter()
41            .map(|(key, facts)| (*key, facts.clone()))
42            .collect();
43        let proposals = context
44            .proposals
45            .iter()
46            .map(|(key, proposals)| (*key, proposals.clone()))
47            .collect();
48
49        Self {
50            version: context.version,
51            merkle_root: crate::integrity::MerkleRoot::from_context(context),
52            facts,
53            proposals,
54        }
55    }
56
57    /// Returns the context version captured by the snapshot.
58    #[must_use]
59    pub fn version(&self) -> u64 {
60        self.version
61    }
62
63    /// Returns the snapshot Merkle root.
64    #[must_use]
65    pub fn merkle_root(&self) -> &crate::integrity::MerkleRoot {
66        &self.merkle_root
67    }
68
69    /// Returns fact projections grouped by context key.
70    #[must_use]
71    pub fn facts(&self) -> &BTreeMap<ContextKey, Vec<ContextFact>> {
72        &self.facts
73    }
74
75    /// Returns staged proposals grouped by context key.
76    #[must_use]
77    pub fn proposals(&self) -> &BTreeMap<ContextKey, Vec<ProposedFact>> {
78        &self.proposals
79    }
80
81    fn validate(&self) -> Result<(), ConvergeError> {
82        for (key, facts) in &self.facts {
83            let mut seen = BTreeSet::new();
84            for fact in facts {
85                if fact.key() != *key {
86                    return Err(ConvergeError::InvalidSnapshot {
87                        reason: format!(
88                            "fact '{}' stored under {:?} but declares {:?}",
89                            fact.id(),
90                            key,
91                            fact.key()
92                        ),
93                    });
94                }
95                if !seen.insert(fact.id().clone()) {
96                    return Err(ConvergeError::InvalidSnapshot {
97                        reason: format!("duplicate fact '{}' under {:?}", fact.id(), key),
98                    });
99                }
100            }
101        }
102
103        for (key, proposals) in &self.proposals {
104            let mut seen = BTreeSet::new();
105            for proposal in proposals {
106                if proposal.key() != *key {
107                    return Err(ConvergeError::InvalidSnapshot {
108                        reason: format!(
109                            "proposal '{}' stored under {:?} but declares {:?}",
110                            proposal.id(),
111                            key,
112                            proposal.key()
113                        ),
114                    });
115                }
116                if !seen.insert(proposal.id().clone()) {
117                    return Err(ConvergeError::InvalidSnapshot {
118                        reason: format!("duplicate proposal '{}' under {:?}", proposal.id(), key),
119                    });
120                }
121            }
122        }
123
124        let context = ContextState {
125            facts: self
126                .facts
127                .iter()
128                .map(|(key, facts)| (*key, facts.clone()))
129                .collect(),
130            proposals: self
131                .proposals
132                .iter()
133                .map(|(key, proposals)| (*key, proposals.clone()))
134                .collect(),
135            dirty_keys: Vec::new(),
136            version: self.version,
137        };
138        let computed_root = crate::integrity::MerkleRoot::from_context(&context);
139        if computed_root != self.merkle_root {
140            return Err(ConvergeError::InvalidSnapshot {
141                reason: "snapshot merkle root does not match restored facts".to_string(),
142            });
143        }
144
145        Ok(())
146    }
147}
148
149pub(crate) fn new_fact(
150    key: ContextKey,
151    id: impl Into<FactId>,
152    content: impl Into<String>,
153) -> ContextFact {
154    new_fact_with_promotion(
155        key,
156        id,
157        TextPayload::new(content),
158        converge_pack::FactPromotionRecord::new_projection(
159            "engine-projection",
160            converge_pack::ContentHash::zero(),
161            converge_pack::FactActor::new_projection(
162                "converge-engine",
163                converge_pack::FactActorKind::System,
164            ),
165            converge_pack::FactValidationSummary::default(),
166            Vec::new(),
167            converge_pack::FactTraceLink::Local(converge_pack::FactLocalTrace::new_projection(
168                "engine-projection",
169                "seed",
170                None,
171                true,
172            )),
173            Timestamp::epoch(),
174        ),
175        Timestamp::epoch(),
176    )
177}
178
179pub(crate) fn new_fact_with_promotion(
180    key: ContextKey,
181    id: impl Into<FactId>,
182    payload: impl FactPayload + PartialEq,
183    promotion_record: converge_pack::FactPromotionRecord,
184    created_at: impl Into<Timestamp>,
185) -> ContextFact {
186    ContextFact::new_projection(key, id, payload, promotion_record, created_at)
187}
188
189/// The shared context for a Converge job.
190///
191/// Agents receive `&dyn converge_pack::Context` (immutable) during execution.
192/// Only the engine holds `&mut Context` during the merge phase.
193#[derive(Debug, Default, Clone, serde::Serialize)]
194pub struct ContextState {
195    /// Facts stored by their key category.
196    facts: HashMap<ContextKey, Vec<ContextFact>>,
197    /// Pending proposals staged for engine validation/promotion.
198    proposals: HashMap<ContextKey, Vec<ProposedFact>>,
199    /// Tracks which keys changed in the last merge cycle.
200    dirty_keys: Vec<ContextKey>,
201    /// Monotonic version counter for convergence detection.
202    version: u64,
203}
204
205/// Implement the converge-pack Context trait for the concrete Context struct.
206/// This allows agents to use `&dyn converge_pack::Context`.
207impl converge_pack::Context for ContextState {
208    fn has(&self, key: ContextKey) -> bool {
209        self.facts.get(&key).is_some_and(|v| !v.is_empty())
210    }
211
212    fn get(&self, key: ContextKey) -> &[ContextFact] {
213        self.facts.get(&key).map_or(&[], Vec::as_slice)
214    }
215
216    fn get_proposals(&self, key: ContextKey) -> &[ProposedFact] {
217        self.proposals.get(&key).map_or(&[], Vec::as_slice)
218    }
219}
220
221impl ContextState {
222    /// Creates a new empty context.
223    #[must_use]
224    pub fn new() -> Self {
225        Self::default()
226    }
227
228    /// Captures a durable storage snapshot for later rehydration.
229    #[must_use]
230    pub fn snapshot(&self) -> ContextSnapshot {
231        ContextSnapshot::from_context(self)
232    }
233
234    /// Rehydrates a context from a verified storage snapshot.
235    ///
236    /// This restores previously promoted context state. It is not a promotion
237    /// API: malformed snapshots, key mismatches, duplicate IDs, and Merkle
238    /// mismatches are rejected before the context is returned.
239    pub fn from_snapshot(snapshot: ContextSnapshot) -> Result<Self, ConvergeError> {
240        snapshot.validate()?;
241        Ok(Self {
242            facts: snapshot.facts.into_iter().collect(),
243            proposals: snapshot.proposals.into_iter().collect(),
244            dirty_keys: Vec::new(),
245            version: snapshot.version,
246        })
247    }
248
249    /// Returns all facts for a given key.
250    #[must_use]
251    pub fn get(&self, key: ContextKey) -> &[ContextFact] {
252        self.facts.get(&key).map_or(&[], Vec::as_slice)
253    }
254
255    /// Returns true if there are any facts for the given key.
256    #[must_use]
257    pub fn has(&self, key: ContextKey) -> bool {
258        self.facts.get(&key).is_some_and(|v| !v.is_empty())
259    }
260
261    /// Returns the current version (for convergence detection).
262    #[must_use]
263    pub fn version(&self) -> u64 {
264        self.version
265    }
266
267    /// Returns keys that changed in the last merge cycle.
268    #[must_use]
269    pub fn dirty_keys(&self) -> &[ContextKey] {
270        &self.dirty_keys
271    }
272
273    /// Returns all keys that currently have facts in the context.
274    #[must_use]
275    pub fn all_keys(&self) -> Vec<ContextKey> {
276        self.facts.keys().copied().collect()
277    }
278
279    /// Returns true if any staged proposals are pending promotion.
280    #[must_use]
281    pub fn has_pending_proposals(&self) -> bool {
282        self.proposals.values().any(|items| !items.is_empty())
283    }
284
285    /// Clears the dirty key tracker (called at start of each cycle).
286    pub fn clear_dirty(&mut self) {
287        self.dirty_keys.clear();
288    }
289
290    /// Stages a proposal for engine validation/promotion.
291    ///
292    /// Returns `Ok(true)` if the proposal was new.
293    /// Returns `Ok(false)` if an identical proposal is already pending.
294    pub fn add_proposal(&mut self, proposal: ProposedFact) -> Result<bool, ConvergeError> {
295        let key = proposal.key();
296        let proposals = self.proposals.entry(key).or_default();
297
298        if let Some(existing) = proposals.iter().find(|p| p.id() == proposal.id()) {
299            if existing == &proposal {
300                return Ok(false);
301            }
302            return Err(ConvergeError::Conflict {
303                id: proposal.id().to_string(),
304                existing: format!("{existing:?}"),
305                new: format!("{proposal:?}"),
306                context: Box::new(self.clone()),
307            });
308        }
309
310        proposals.push(proposal);
311        Ok(true)
312    }
313
314    /// Stages external input as a proposal to be governed by the engine.
315    pub fn add_input(
316        &mut self,
317        key: ContextKey,
318        id: impl Into<ProposalId>,
319        content: impl Into<String>,
320    ) -> Result<bool, ConvergeError> {
321        self.add_input_with_provenance(key, id, content, "context-input")
322    }
323
324    /// Stages external input with explicit provenance.
325    pub fn add_input_with_provenance(
326        &mut self,
327        key: ContextKey,
328        id: impl Into<ProposalId>,
329        content: impl Into<String>,
330        provenance: impl Into<String>,
331    ) -> Result<bool, ConvergeError> {
332        self.add_proposal(ProposedFact::new(
333            key,
334            id,
335            TextPayload::new(content),
336            converge_pack::Provenance::new(provenance.into()),
337        ))
338    }
339
340    /// Stages a typed external observation as a proposal.
341    ///
342    /// This is the preferred boundary for systems such as Organism. It records
343    /// actor and source provenance, but it does not create authoritative facts.
344    pub fn submit_observation(
345        &mut self,
346        request: AdmissionRequest,
347    ) -> Result<AdmissionReceipt, ConvergeError> {
348        let staged = self.add_proposal(request.clone().into_proposal())?;
349        Ok(AdmissionReceipt::new(&request, staged))
350    }
351
352    /// Drains all pending proposals from the context.
353    pub(crate) fn drain_proposals(&mut self) -> Vec<ProposedFact> {
354        let mut drained = Vec::new();
355        for proposals in self.proposals.values_mut() {
356            drained.append(proposals);
357        }
358        self.proposals.retain(|_, proposals| !proposals.is_empty());
359        drained
360    }
361
362    /// Removes a specific pending proposal if it exists.
363    pub(crate) fn remove_proposal(&mut self, key: ContextKey, id: &ProposalId) {
364        if let Some(proposals) = self.proposals.get_mut(&key) {
365            proposals.retain(|proposal| proposal.id != id);
366            if proposals.is_empty() {
367                self.proposals.remove(&key);
368            }
369        }
370    }
371
372    /// Adds a fact to the context (engine-only, during merge phase).
373    ///
374    /// Returns `Ok(true)` if the fact was new (context changed).
375    /// Returns `Ok(false)` if the fact was already present and identical.
376    pub(crate) fn add_fact(&mut self, fact: ContextFact) -> Result<bool, ConvergeError> {
377        let key = fact.key();
378        let facts = self.facts.entry(key).or_default();
379
380        if let Some(existing) = facts.iter().find(|f| f.id() == fact.id()) {
381            if existing == &fact {
382                return Ok(false);
383            }
384            return Err(ConvergeError::Conflict {
385                id: fact.id().to_string(),
386                existing: format!("{existing:?}"),
387                new: format!("{fact:?}"),
388                context: Box::new(self.clone()),
389            });
390        }
391
392        facts.push(fact);
393        self.proposals.remove(&key);
394        self.dirty_keys.push(key);
395
396        self.version += 1;
397        Ok(true)
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404    use converge_pack::Context as _;
405
406    #[test]
407    fn empty_context_has_no_facts() {
408        let ctx = ContextState::new();
409        assert!(!ctx.has(ContextKey::Seeds));
410        assert_eq!(ctx.version(), 0);
411    }
412
413    #[test]
414    fn adding_fact_increments_version() {
415        let mut ctx = ContextState::new();
416        let fact = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial value");
417
418        let changed = ctx.add_fact(fact).expect("should add");
419        assert!(changed);
420        assert_eq!(ctx.version(), 1);
421        assert!(ctx.has(ContextKey::Seeds));
422    }
423
424    #[test]
425    fn duplicate_fact_does_not_change_context() {
426        let mut ctx = ContextState::new();
427        let fact = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial");
428
429        ctx.add_fact(fact.clone()).expect("should add first");
430        let changed = ctx.add_fact(fact).expect("should not error on duplicate");
431        assert!(!changed);
432        assert_eq!(ctx.version(), 1);
433    }
434
435    #[test]
436    fn dirty_keys_track_new_facts_and_clear() {
437        let mut ctx = ContextState::new();
438        let fact = crate::context::new_fact(ContextKey::Hypotheses, "hyp-1", "value");
439
440        ctx.add_fact(fact).expect("should add");
441        assert_eq!(ctx.dirty_keys(), &[ContextKey::Hypotheses]);
442
443        ctx.clear_dirty();
444        assert!(ctx.dirty_keys().is_empty());
445    }
446
447    #[test]
448    fn detects_conflict() {
449        let mut ctx = ContextState::new();
450        ctx.add_fact(crate::context::new_fact(
451            ContextKey::Seeds,
452            "fact-1",
453            "version A",
454        ))
455        .unwrap();
456
457        let result = ctx.add_fact(crate::context::new_fact(
458            ContextKey::Seeds,
459            "fact-1",
460            "version B",
461        ));
462
463        match result {
464            Err(ConvergeError::Conflict {
465                id, existing, new, ..
466            }) => {
467                assert_eq!(id, "fact-1");
468                assert!(existing.contains("ContextFact"));
469                assert!(new.contains("ContextFact"));
470            }
471            _ => panic!("Expected Conflict error, got {result:?}"),
472        }
473    }
474
475    #[test]
476    fn adding_proposal_tracks_pending_state() {
477        let mut ctx = ContextState::new();
478        let proposal = ProposedFact::new(
479            ContextKey::Hypotheses,
480            "hyp-1",
481            TextPayload::new("market is growing"),
482            "test",
483        );
484
485        assert!(ctx.add_proposal(proposal).unwrap());
486        assert!(ctx.has_pending_proposals());
487        assert_eq!(ctx.get_proposals(ContextKey::Hypotheses).len(), 1);
488    }
489
490    #[test]
491    fn conflicting_staged_inputs_are_rejected_before_promotion() {
492        let mut ctx = ContextState::new();
493
494        assert!(
495            ctx.add_input_with_provenance(ContextKey::Seeds, "seed-1", "version A", "user")
496                .unwrap()
497        );
498
499        let result =
500            ctx.add_input_with_provenance(ContextKey::Seeds, "seed-1", "version B", "user");
501
502        match result {
503            Err(ConvergeError::Conflict {
504                id, existing, new, ..
505            }) => {
506                assert_eq!(id, "seed-1");
507                assert!(existing.contains("ProposedFact"));
508                assert!(new.contains("ProposedFact"));
509            }
510            _ => panic!("Expected Conflict error, got {result:?}"),
511        }
512
513        assert!(ctx.has_pending_proposals());
514        assert_eq!(ctx.get_proposals(ContextKey::Seeds).len(), 1);
515    }
516
517    #[test]
518    fn snapshot_round_trips_facts_and_proposals() {
519        let mut ctx = ContextState::new();
520        ctx.add_fact(crate::context::new_fact(
521            ContextKey::Seeds,
522            "seed-1",
523            "persisted seed",
524        ))
525        .unwrap();
526        ctx.add_proposal(ProposedFact::new(
527            ContextKey::Hypotheses,
528            "hyp-1",
529            TextPayload::new("staged hypothesis"),
530            "test",
531        ))
532        .unwrap();
533
534        let restored = ContextState::from_snapshot(ctx.snapshot()).unwrap();
535
536        assert_eq!(restored.version(), 1);
537        assert!(restored.dirty_keys().is_empty());
538        assert_eq!(restored.get(ContextKey::Seeds)[0].id(), "seed-1");
539        assert_eq!(
540            restored.get(ContextKey::Seeds)[0].text(),
541            Some("persisted seed")
542        );
543        assert_eq!(
544            restored.get_proposals(ContextKey::Hypotheses)[0].id(),
545            "hyp-1"
546        );
547    }
548
549    #[test]
550    fn snapshot_rejects_fact_key_mismatch() {
551        let mut ctx = ContextState::new();
552        ctx.add_fact(crate::context::new_fact(
553            ContextKey::Seeds,
554            "seed-1",
555            "value",
556        ))
557        .unwrap();
558
559        let mut snapshot = ctx.snapshot();
560        let fact = snapshot
561            .facts
562            .get_mut(&ContextKey::Seeds)
563            .unwrap()
564            .pop()
565            .unwrap();
566        snapshot
567            .facts
568            .entry(ContextKey::Signals)
569            .or_default()
570            .push(fact);
571
572        let err = ContextState::from_snapshot(snapshot).unwrap_err();
573        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
574        assert!(err.to_string().contains("stored under Signals"));
575    }
576
577    #[test]
578    fn snapshot_rejects_merkle_mismatch() {
579        let mut ctx = ContextState::new();
580        ctx.add_fact(crate::context::new_fact(
581            ContextKey::Seeds,
582            "seed-1",
583            "value",
584        ))
585        .unwrap();
586
587        let mut snapshot = ctx.snapshot();
588        snapshot.merkle_root =
589            crate::integrity::MerkleRoot(crate::integrity::ContentHash::compute("tampered"));
590
591        let err = ContextState::from_snapshot(snapshot).unwrap_err();
592        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
593        assert!(err.to_string().contains("merkle root"));
594    }
595
596    #[test]
597    fn snapshot_rejects_duplicate_fact_ids() {
598        let mut ctx = ContextState::new();
599        ctx.add_fact(crate::context::new_fact(
600            ContextKey::Seeds,
601            "seed-1",
602            "value",
603        ))
604        .unwrap();
605
606        let mut snapshot = ctx.snapshot();
607        let duplicate = snapshot.facts.get(&ContextKey::Seeds).unwrap()[0].clone();
608        snapshot
609            .facts
610            .get_mut(&ContextKey::Seeds)
611            .unwrap()
612            .push(duplicate);
613
614        let err = ContextState::from_snapshot(snapshot).unwrap_err();
615        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
616        assert!(err.to_string().contains("duplicate fact"));
617    }
618
619    /// Test that Context implements the converge_pack::Context trait.
620    #[test]
621    fn context_implements_trait() {
622        let mut ctx = ContextState::new();
623        ctx.add_fact(crate::context::new_fact(ContextKey::Seeds, "s1", "hello"))
624            .unwrap();
625
626        // Use via trait object
627        let dyn_ctx: &dyn converge_pack::Context = &ctx;
628        assert!(dyn_ctx.has(ContextKey::Seeds));
629        assert_eq!(dyn_ctx.get(ContextKey::Seeds).len(), 1);
630    }
631}