Skip to main content

converge_core/
context.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Context model for Converge.
5//!
6//! Context is the shared, typed, evolving representation of a job.
7//! Types are defined in `converge-traits`; this module provides the
8//! concrete `Context` struct that the engine uses.
9
10use crate::error::ConvergeError;
11use crate::{AdmissionReceipt, AdmissionRequest};
12use std::collections::{BTreeMap, BTreeSet, HashMap};
13
14// Re-export canonical types from converge-pack
15pub use converge_pack::{
16    ContextFact, ContextKey, FactId, ProposalId, ProposedFact, Timestamp, ValidationError,
17};
18
19/// Durable, verified context snapshot for storage adapters.
20///
21/// This is the supported rehydration boundary for embedders such as Helms.
22/// Storage persists this value and later calls [`ContextState::from_snapshot`].
23/// It must not reconstruct facts through promotion constructors.
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
25#[serde(deny_unknown_fields)]
26pub struct ContextSnapshot {
27    version: u64,
28    merkle_root: crate::integrity::MerkleRoot,
29    facts: BTreeMap<ContextKey, Vec<ContextFact>>,
30    proposals: BTreeMap<ContextKey, Vec<ProposedFact>>,
31}
32
33impl ContextSnapshot {
34    /// Build a storage snapshot from a live context.
35    #[must_use]
36    pub fn from_context(context: &ContextState) -> Self {
37        let facts = context
38            .facts
39            .iter()
40            .map(|(key, facts)| (*key, facts.clone()))
41            .collect();
42        let proposals = context
43            .proposals
44            .iter()
45            .map(|(key, proposals)| (*key, proposals.clone()))
46            .collect();
47
48        Self {
49            version: context.version,
50            merkle_root: crate::integrity::MerkleRoot::from_context(context),
51            facts,
52            proposals,
53        }
54    }
55
56    /// Returns the context version captured by the snapshot.
57    #[must_use]
58    pub fn version(&self) -> u64 {
59        self.version
60    }
61
62    /// Returns the snapshot Merkle root.
63    #[must_use]
64    pub fn merkle_root(&self) -> &crate::integrity::MerkleRoot {
65        &self.merkle_root
66    }
67
68    /// Returns fact projections grouped by context key.
69    #[must_use]
70    pub fn facts(&self) -> &BTreeMap<ContextKey, Vec<ContextFact>> {
71        &self.facts
72    }
73
74    /// Returns staged proposals grouped by context key.
75    #[must_use]
76    pub fn proposals(&self) -> &BTreeMap<ContextKey, Vec<ProposedFact>> {
77        &self.proposals
78    }
79
80    fn validate(&self) -> Result<(), ConvergeError> {
81        for (key, facts) in &self.facts {
82            let mut seen = BTreeSet::new();
83            for fact in facts {
84                if fact.key() != *key {
85                    return Err(ConvergeError::InvalidSnapshot {
86                        reason: format!(
87                            "fact '{}' stored under {:?} but declares {:?}",
88                            fact.id(),
89                            key,
90                            fact.key()
91                        ),
92                    });
93                }
94                if !seen.insert(fact.id().clone()) {
95                    return Err(ConvergeError::InvalidSnapshot {
96                        reason: format!("duplicate fact '{}' under {:?}", fact.id(), key),
97                    });
98                }
99            }
100        }
101
102        for (key, proposals) in &self.proposals {
103            let mut seen = BTreeSet::new();
104            for proposal in proposals {
105                if proposal.key() != *key {
106                    return Err(ConvergeError::InvalidSnapshot {
107                        reason: format!(
108                            "proposal '{}' stored under {:?} but declares {:?}",
109                            proposal.id(),
110                            key,
111                            proposal.key()
112                        ),
113                    });
114                }
115                if !seen.insert(proposal.id().clone()) {
116                    return Err(ConvergeError::InvalidSnapshot {
117                        reason: format!("duplicate proposal '{}' under {:?}", proposal.id(), key),
118                    });
119                }
120            }
121        }
122
123        let context = ContextState {
124            facts: self
125                .facts
126                .iter()
127                .map(|(key, facts)| (*key, facts.clone()))
128                .collect(),
129            proposals: self
130                .proposals
131                .iter()
132                .map(|(key, proposals)| (*key, proposals.clone()))
133                .collect(),
134            dirty_keys: Vec::new(),
135            version: self.version,
136        };
137        let computed_root = crate::integrity::MerkleRoot::from_context(&context);
138        if computed_root != self.merkle_root {
139            return Err(ConvergeError::InvalidSnapshot {
140                reason: "snapshot merkle root does not match restored facts".to_string(),
141            });
142        }
143
144        Ok(())
145    }
146}
147
148pub(crate) fn new_fact(
149    key: ContextKey,
150    id: impl Into<FactId>,
151    content: impl Into<String>,
152) -> ContextFact {
153    new_fact_with_promotion(
154        key,
155        id,
156        content,
157        converge_pack::FactPromotionRecord::new_projection(
158            "engine-projection",
159            converge_pack::ContentHash::zero(),
160            converge_pack::FactActor::new_projection(
161                "converge-engine",
162                converge_pack::FactActorKind::System,
163            ),
164            converge_pack::FactValidationSummary::default(),
165            Vec::new(),
166            converge_pack::FactTraceLink::Local(converge_pack::FactLocalTrace::new_projection(
167                "engine-projection",
168                "seed",
169                None,
170                true,
171            )),
172            Timestamp::epoch(),
173        ),
174        Timestamp::epoch(),
175    )
176}
177
178pub(crate) fn new_fact_with_promotion(
179    key: ContextKey,
180    id: impl Into<FactId>,
181    content: impl Into<String>,
182    promotion_record: converge_pack::FactPromotionRecord,
183    created_at: impl Into<Timestamp>,
184) -> ContextFact {
185    ContextFact::new_projection(key, id, content, promotion_record, created_at)
186}
187
188/// The shared context for a Converge job.
189///
190/// Agents receive `&dyn converge_pack::Context` (immutable) during execution.
191/// Only the engine holds `&mut Context` during the merge phase.
192#[derive(Debug, Default, Clone, serde::Serialize)]
193pub struct ContextState {
194    /// Facts stored by their key category.
195    facts: HashMap<ContextKey, Vec<ContextFact>>,
196    /// Pending proposals staged for engine validation/promotion.
197    proposals: HashMap<ContextKey, Vec<ProposedFact>>,
198    /// Tracks which keys changed in the last merge cycle.
199    dirty_keys: Vec<ContextKey>,
200    /// Monotonic version counter for convergence detection.
201    version: u64,
202}
203
204/// Implement the converge-pack Context trait for the concrete Context struct.
205/// This allows agents to use `&dyn converge_pack::Context`.
206impl converge_pack::Context for ContextState {
207    fn has(&self, key: ContextKey) -> bool {
208        self.facts.get(&key).is_some_and(|v| !v.is_empty())
209    }
210
211    fn get(&self, key: ContextKey) -> &[ContextFact] {
212        self.facts.get(&key).map_or(&[], Vec::as_slice)
213    }
214
215    fn get_proposals(&self, key: ContextKey) -> &[ProposedFact] {
216        self.proposals.get(&key).map_or(&[], Vec::as_slice)
217    }
218}
219
220impl ContextState {
221    /// Creates a new empty context.
222    #[must_use]
223    pub fn new() -> Self {
224        Self::default()
225    }
226
227    /// Captures a durable storage snapshot for later rehydration.
228    #[must_use]
229    pub fn snapshot(&self) -> ContextSnapshot {
230        ContextSnapshot::from_context(self)
231    }
232
233    /// Rehydrates a context from a verified storage snapshot.
234    ///
235    /// This restores previously promoted context state. It is not a promotion
236    /// API: malformed snapshots, key mismatches, duplicate IDs, and Merkle
237    /// mismatches are rejected before the context is returned.
238    pub fn from_snapshot(snapshot: ContextSnapshot) -> Result<Self, ConvergeError> {
239        snapshot.validate()?;
240        Ok(Self {
241            facts: snapshot.facts.into_iter().collect(),
242            proposals: snapshot.proposals.into_iter().collect(),
243            dirty_keys: Vec::new(),
244            version: snapshot.version,
245        })
246    }
247
248    /// Returns all facts for a given key.
249    #[must_use]
250    pub fn get(&self, key: ContextKey) -> &[ContextFact] {
251        self.facts.get(&key).map_or(&[], Vec::as_slice)
252    }
253
254    /// Returns true if there are any facts for the given key.
255    #[must_use]
256    pub fn has(&self, key: ContextKey) -> bool {
257        self.facts.get(&key).is_some_and(|v| !v.is_empty())
258    }
259
260    /// Returns the current version (for convergence detection).
261    #[must_use]
262    pub fn version(&self) -> u64 {
263        self.version
264    }
265
266    /// Returns keys that changed in the last merge cycle.
267    #[must_use]
268    pub fn dirty_keys(&self) -> &[ContextKey] {
269        &self.dirty_keys
270    }
271
272    /// Returns all keys that currently have facts in the context.
273    #[must_use]
274    pub fn all_keys(&self) -> Vec<ContextKey> {
275        self.facts.keys().copied().collect()
276    }
277
278    /// Returns true if any staged proposals are pending promotion.
279    #[must_use]
280    pub fn has_pending_proposals(&self) -> bool {
281        self.proposals.values().any(|items| !items.is_empty())
282    }
283
284    /// Clears the dirty key tracker (called at start of each cycle).
285    pub fn clear_dirty(&mut self) {
286        self.dirty_keys.clear();
287    }
288
289    /// Stages a proposal for engine validation/promotion.
290    ///
291    /// Returns `Ok(true)` if the proposal was new.
292    /// Returns `Ok(false)` if an identical proposal is already pending.
293    pub fn add_proposal(&mut self, proposal: ProposedFact) -> Result<bool, ConvergeError> {
294        let key = proposal.key;
295        let proposals = self.proposals.entry(key).or_default();
296
297        if let Some(existing) = proposals.iter().find(|p| p.id == proposal.id) {
298            if existing.content == proposal.content
299                && existing.confidence() == proposal.confidence()
300                && existing.provenance == proposal.provenance
301            {
302                return Ok(false);
303            }
304            return Err(ConvergeError::Conflict {
305                id: proposal.id.to_string(),
306                existing: existing.content.clone(),
307                new: proposal.content,
308                context: Box::new(self.clone()),
309            });
310        }
311
312        proposals.push(proposal);
313        Ok(true)
314    }
315
316    /// Stages external input as a proposal to be governed by the engine.
317    pub fn add_input(
318        &mut self,
319        key: ContextKey,
320        id: impl Into<ProposalId>,
321        content: impl Into<String>,
322    ) -> Result<bool, ConvergeError> {
323        self.add_input_with_provenance(key, id, content, "context-input")
324    }
325
326    /// Stages external input with explicit provenance.
327    pub fn add_input_with_provenance(
328        &mut self,
329        key: ContextKey,
330        id: impl Into<ProposalId>,
331        content: impl Into<String>,
332        provenance: impl Into<String>,
333    ) -> Result<bool, ConvergeError> {
334        self.add_proposal(ProposedFact::new(key, id, content, provenance))
335    }
336
337    /// Stages a typed external observation as a proposal.
338    ///
339    /// This is the preferred boundary for systems such as Organism. It records
340    /// actor and source provenance, but it does not create authoritative facts.
341    pub fn submit_observation(
342        &mut self,
343        request: AdmissionRequest,
344    ) -> Result<AdmissionReceipt, ConvergeError> {
345        let staged = self.add_proposal(request.clone().into_proposal())?;
346        Ok(AdmissionReceipt::new(&request, staged))
347    }
348
349    /// Drains all pending proposals from the context.
350    pub(crate) fn drain_proposals(&mut self) -> Vec<ProposedFact> {
351        let mut drained = Vec::new();
352        for proposals in self.proposals.values_mut() {
353            drained.append(proposals);
354        }
355        self.proposals.retain(|_, proposals| !proposals.is_empty());
356        drained
357    }
358
359    /// Removes a specific pending proposal if it exists.
360    pub(crate) fn remove_proposal(&mut self, key: ContextKey, id: &ProposalId) {
361        if let Some(proposals) = self.proposals.get_mut(&key) {
362            proposals.retain(|proposal| proposal.id != id);
363            if proposals.is_empty() {
364                self.proposals.remove(&key);
365            }
366        }
367    }
368
369    /// Adds a fact to the context (engine-only, during merge phase).
370    ///
371    /// Returns `Ok(true)` if the fact was new (context changed).
372    /// Returns `Ok(false)` if the fact was already present and identical.
373    pub(crate) fn add_fact(&mut self, fact: ContextFact) -> Result<bool, ConvergeError> {
374        let key = fact.key();
375        let facts = self.facts.entry(key).or_default();
376
377        if let Some(existing) = facts.iter().find(|f| f.id() == fact.id()) {
378            if existing.content() == fact.content() {
379                return Ok(false);
380            }
381            return Err(ConvergeError::Conflict {
382                id: fact.id().to_string(),
383                existing: existing.content().to_string(),
384                new: fact.content().to_string(),
385                context: Box::new(self.clone()),
386            });
387        }
388
389        facts.push(fact);
390        self.proposals.remove(&key);
391        self.dirty_keys.push(key);
392
393        self.version += 1;
394        Ok(true)
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use converge_pack::Context as _;
402
403    #[test]
404    fn empty_context_has_no_facts() {
405        let ctx = ContextState::new();
406        assert!(!ctx.has(ContextKey::Seeds));
407        assert_eq!(ctx.version(), 0);
408    }
409
410    #[test]
411    fn adding_fact_increments_version() {
412        let mut ctx = ContextState::new();
413        let fact = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial value");
414
415        let changed = ctx.add_fact(fact).expect("should add");
416        assert!(changed);
417        assert_eq!(ctx.version(), 1);
418        assert!(ctx.has(ContextKey::Seeds));
419    }
420
421    #[test]
422    fn duplicate_fact_does_not_change_context() {
423        let mut ctx = ContextState::new();
424        let fact = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial");
425
426        ctx.add_fact(fact.clone()).expect("should add first");
427        let changed = ctx.add_fact(fact).expect("should not error on duplicate");
428        assert!(!changed);
429        assert_eq!(ctx.version(), 1);
430    }
431
432    #[test]
433    fn dirty_keys_track_new_facts_and_clear() {
434        let mut ctx = ContextState::new();
435        let fact = crate::context::new_fact(ContextKey::Hypotheses, "hyp-1", "value");
436
437        ctx.add_fact(fact).expect("should add");
438        assert_eq!(ctx.dirty_keys(), &[ContextKey::Hypotheses]);
439
440        ctx.clear_dirty();
441        assert!(ctx.dirty_keys().is_empty());
442    }
443
444    #[test]
445    fn detects_conflict() {
446        let mut ctx = ContextState::new();
447        ctx.add_fact(crate::context::new_fact(
448            ContextKey::Seeds,
449            "fact-1",
450            "version A",
451        ))
452        .unwrap();
453
454        let result = ctx.add_fact(crate::context::new_fact(
455            ContextKey::Seeds,
456            "fact-1",
457            "version B",
458        ));
459
460        match result {
461            Err(ConvergeError::Conflict {
462                id, existing, new, ..
463            }) => {
464                assert_eq!(id, "fact-1");
465                assert_eq!(existing, "version A");
466                assert_eq!(new, "version B");
467            }
468            _ => panic!("Expected Conflict error, got {result:?}"),
469        }
470    }
471
472    #[test]
473    fn adding_proposal_tracks_pending_state() {
474        let mut ctx = ContextState::new();
475        let proposal =
476            ProposedFact::new(ContextKey::Hypotheses, "hyp-1", "market is growing", "test");
477
478        assert!(ctx.add_proposal(proposal).unwrap());
479        assert!(ctx.has_pending_proposals());
480        assert_eq!(ctx.get_proposals(ContextKey::Hypotheses).len(), 1);
481    }
482
483    #[test]
484    fn conflicting_staged_inputs_are_rejected_before_promotion() {
485        let mut ctx = ContextState::new();
486
487        assert!(
488            ctx.add_input_with_provenance(ContextKey::Seeds, "seed-1", "version A", "user")
489                .unwrap()
490        );
491
492        let result =
493            ctx.add_input_with_provenance(ContextKey::Seeds, "seed-1", "version B", "user");
494
495        match result {
496            Err(ConvergeError::Conflict {
497                id, existing, new, ..
498            }) => {
499                assert_eq!(id, "seed-1");
500                assert_eq!(existing, "version A");
501                assert_eq!(new, "version B");
502            }
503            _ => panic!("Expected Conflict error, got {result:?}"),
504        }
505
506        assert!(ctx.has_pending_proposals());
507        assert_eq!(ctx.get_proposals(ContextKey::Seeds).len(), 1);
508    }
509
510    #[test]
511    fn snapshot_round_trips_facts_and_proposals() {
512        let mut ctx = ContextState::new();
513        ctx.add_fact(crate::context::new_fact(
514            ContextKey::Seeds,
515            "seed-1",
516            "persisted seed",
517        ))
518        .unwrap();
519        ctx.add_proposal(ProposedFact::new(
520            ContextKey::Hypotheses,
521            "hyp-1",
522            "staged hypothesis",
523            "test",
524        ))
525        .unwrap();
526
527        let restored = ContextState::from_snapshot(ctx.snapshot()).unwrap();
528
529        assert_eq!(restored.version(), 1);
530        assert!(restored.dirty_keys().is_empty());
531        assert_eq!(restored.get(ContextKey::Seeds)[0].id(), "seed-1");
532        assert_eq!(
533            restored.get(ContextKey::Seeds)[0].content(),
534            "persisted seed"
535        );
536        assert_eq!(
537            restored.get_proposals(ContextKey::Hypotheses)[0].id(),
538            "hyp-1"
539        );
540    }
541
542    #[test]
543    fn snapshot_rejects_fact_key_mismatch() {
544        let mut ctx = ContextState::new();
545        ctx.add_fact(crate::context::new_fact(
546            ContextKey::Seeds,
547            "seed-1",
548            "value",
549        ))
550        .unwrap();
551
552        let mut snapshot = ctx.snapshot();
553        let fact = snapshot
554            .facts
555            .get_mut(&ContextKey::Seeds)
556            .unwrap()
557            .pop()
558            .unwrap();
559        snapshot
560            .facts
561            .entry(ContextKey::Signals)
562            .or_default()
563            .push(fact);
564
565        let err = ContextState::from_snapshot(snapshot).unwrap_err();
566        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
567        assert!(err.to_string().contains("stored under Signals"));
568    }
569
570    #[test]
571    fn snapshot_rejects_merkle_mismatch() {
572        let mut ctx = ContextState::new();
573        ctx.add_fact(crate::context::new_fact(
574            ContextKey::Seeds,
575            "seed-1",
576            "value",
577        ))
578        .unwrap();
579
580        let mut snapshot = ctx.snapshot();
581        snapshot.merkle_root =
582            crate::integrity::MerkleRoot(crate::integrity::ContentHash::compute("tampered"));
583
584        let err = ContextState::from_snapshot(snapshot).unwrap_err();
585        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
586        assert!(err.to_string().contains("merkle root"));
587    }
588
589    #[test]
590    fn snapshot_rejects_duplicate_fact_ids() {
591        let mut ctx = ContextState::new();
592        ctx.add_fact(crate::context::new_fact(
593            ContextKey::Seeds,
594            "seed-1",
595            "value",
596        ))
597        .unwrap();
598
599        let mut snapshot = ctx.snapshot();
600        let duplicate = snapshot.facts.get(&ContextKey::Seeds).unwrap()[0].clone();
601        snapshot
602            .facts
603            .get_mut(&ContextKey::Seeds)
604            .unwrap()
605            .push(duplicate);
606
607        let err = ContextState::from_snapshot(snapshot).unwrap_err();
608        assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
609        assert!(err.to_string().contains("duplicate fact"));
610    }
611
612    /// Test that Context implements the converge_pack::Context trait.
613    #[test]
614    fn context_implements_trait() {
615        let mut ctx = ContextState::new();
616        ctx.add_fact(crate::context::new_fact(ContextKey::Seeds, "s1", "hello"))
617            .unwrap();
618
619        // Use via trait object
620        let dyn_ctx: &dyn converge_pack::Context = &ctx;
621        assert!(dyn_ctx.has(ContextKey::Seeds));
622        assert_eq!(dyn_ctx.get(ContextKey::Seeds).len(), 1);
623    }
624}