1use crate::error::ConvergeError;
11use crate::{AdmissionReceipt, AdmissionRequest};
12use std::collections::{BTreeMap, BTreeSet, HashMap};
13
14pub use converge_pack::{
16 ContextFact, ContextKey, FactId, FactPayload, PayloadError, PayloadRegistry, ProposalId,
17 ProposedFact, Provenance, ProvenanceSource, TextPayload, Timestamp, ValidationError,
18 WireContextFact, WireProposedFact,
19};
20
21#[derive(Copy, Clone, Debug)]
23pub struct ContextInput;
24
25impl ProvenanceSource for ContextInput {
26 fn as_str(&self) -> &'static str {
27 "context-input"
28 }
29}
30
31pub const CONTEXT_INPUT_PROVENANCE: ContextInput = ContextInput;
33
34#[derive(Debug, Clone, serde::Serialize)]
40#[serde(deny_unknown_fields)]
41pub struct ContextSnapshot {
42 version: u64,
43 merkle_root: crate::integrity::MerkleRoot,
44 facts: BTreeMap<ContextKey, Vec<ContextFact>>,
45 proposals: BTreeMap<ContextKey, Vec<ProposedFact>>,
46}
47
48#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
50#[serde(deny_unknown_fields)]
51pub struct WireContextSnapshot {
52 version: u64,
53 merkle_root: crate::integrity::MerkleRoot,
54 facts: BTreeMap<ContextKey, Vec<WireContextFact>>,
55 proposals: BTreeMap<ContextKey, Vec<WireProposedFact>>,
56}
57
58impl ContextSnapshot {
59 #[must_use]
61 pub fn from_context(context: &ContextState) -> Self {
62 let facts = context
63 .facts
64 .iter()
65 .map(|(key, facts)| (*key, facts.clone()))
66 .collect();
67 let proposals = context
68 .proposals
69 .iter()
70 .map(|(key, proposals)| (*key, proposals.clone()))
71 .collect();
72
73 Self {
74 version: context.version,
75 merkle_root: crate::integrity::MerkleRoot::from_context(context),
76 facts,
77 proposals,
78 }
79 }
80
81 #[must_use]
83 pub fn version(&self) -> u64 {
84 self.version
85 }
86
87 #[must_use]
89 pub fn merkle_root(&self) -> &crate::integrity::MerkleRoot {
90 &self.merkle_root
91 }
92
93 #[must_use]
95 pub fn facts(&self) -> &BTreeMap<ContextKey, Vec<ContextFact>> {
96 &self.facts
97 }
98
99 #[must_use]
101 pub fn proposals(&self) -> &BTreeMap<ContextKey, Vec<ProposedFact>> {
102 &self.proposals
103 }
104
105 pub fn to_wire(&self) -> Result<WireContextSnapshot, PayloadError> {
107 let facts = self
108 .facts
109 .iter()
110 .map(|(key, facts)| {
111 facts
112 .iter()
113 .map(ContextFact::to_wire)
114 .collect::<Result<Vec<_>, _>>()
115 .map(|facts| (*key, facts))
116 })
117 .collect::<Result<BTreeMap<_, _>, _>>()?;
118 let proposals = self
119 .proposals
120 .iter()
121 .map(|(key, proposals)| {
122 proposals
123 .iter()
124 .map(ProposedFact::to_wire)
125 .collect::<Result<Vec<_>, _>>()
126 .map(|proposals| (*key, proposals))
127 })
128 .collect::<Result<BTreeMap<_, _>, _>>()?;
129
130 Ok(WireContextSnapshot {
131 version: self.version,
132 merkle_root: self.merkle_root.clone(),
133 facts,
134 proposals,
135 })
136 }
137
138 pub fn from_wire(
140 wire: WireContextSnapshot,
141 registry: &PayloadRegistry,
142 ) -> Result<Self, PayloadError> {
143 let facts = wire
144 .facts
145 .into_iter()
146 .map(|(key, facts)| {
147 facts
148 .into_iter()
149 .map(|fact| ContextFact::from_wire(fact, registry))
150 .collect::<Result<Vec<_>, _>>()
151 .map(|facts| (key, facts))
152 })
153 .collect::<Result<BTreeMap<_, _>, _>>()?;
154 let proposals = wire
155 .proposals
156 .into_iter()
157 .map(|(key, proposals)| {
158 proposals
159 .into_iter()
160 .map(|proposal| ProposedFact::from_wire(proposal, registry))
161 .collect::<Result<Vec<_>, _>>()
162 .map(|proposals| (key, proposals))
163 })
164 .collect::<Result<BTreeMap<_, _>, _>>()?;
165
166 Ok(Self {
167 version: wire.version,
168 merkle_root: wire.merkle_root,
169 facts,
170 proposals,
171 })
172 }
173
174 fn validate(&self) -> Result<(), ConvergeError> {
175 for (key, facts) in &self.facts {
176 let mut seen = BTreeSet::new();
177 for fact in facts {
178 if fact.key() != *key {
179 return Err(ConvergeError::InvalidSnapshot {
180 reason: format!(
181 "fact '{}' stored under {:?} but declares {:?}",
182 fact.id(),
183 key,
184 fact.key()
185 ),
186 });
187 }
188 if !seen.insert(fact.id().clone()) {
189 return Err(ConvergeError::InvalidSnapshot {
190 reason: format!("duplicate fact '{}' under {:?}", fact.id(), key),
191 });
192 }
193 }
194 }
195
196 for (key, proposals) in &self.proposals {
197 let mut seen = BTreeSet::new();
198 for proposal in proposals {
199 if proposal.key() != *key {
200 return Err(ConvergeError::InvalidSnapshot {
201 reason: format!(
202 "proposal '{}' stored under {:?} but declares {:?}",
203 proposal.id(),
204 key,
205 proposal.key()
206 ),
207 });
208 }
209 if !seen.insert(proposal.id().clone()) {
210 return Err(ConvergeError::InvalidSnapshot {
211 reason: format!("duplicate proposal '{}' under {:?}", proposal.id(), key),
212 });
213 }
214 }
215 }
216
217 let context = ContextState {
218 facts: self
219 .facts
220 .iter()
221 .map(|(key, facts)| (*key, facts.clone()))
222 .collect(),
223 proposals: self
224 .proposals
225 .iter()
226 .map(|(key, proposals)| (*key, proposals.clone()))
227 .collect(),
228 dirty_keys: Vec::new(),
229 version: self.version,
230 };
231 let computed_root = crate::integrity::MerkleRoot::from_context(&context);
232 if computed_root != self.merkle_root {
233 return Err(ConvergeError::InvalidSnapshot {
234 reason: "snapshot merkle root does not match restored facts".to_string(),
235 });
236 }
237
238 Ok(())
239 }
240}
241
242pub(crate) fn new_fact(
243 key: ContextKey,
244 id: impl Into<FactId>,
245 content: impl Into<String>,
246) -> ContextFact {
247 new_fact_with_promotion(
248 key,
249 id,
250 TextPayload::new(content),
251 converge_pack::FactPromotionRecord::new_projection(
252 "engine-projection",
253 converge_pack::ContentHash::zero(),
254 converge_pack::FactActor::new_projection(
255 "converge-engine",
256 converge_pack::FactActorKind::System,
257 ),
258 converge_pack::FactValidationSummary::default(),
259 Vec::new(),
260 converge_pack::FactTraceLink::Local(converge_pack::FactLocalTrace::new_projection(
261 "engine-projection",
262 "seed",
263 None,
264 true,
265 )),
266 Timestamp::epoch(),
267 ),
268 Timestamp::epoch(),
269 )
270}
271
272pub(crate) fn new_fact_with_promotion(
273 key: ContextKey,
274 id: impl Into<FactId>,
275 payload: impl FactPayload + PartialEq,
276 promotion_record: converge_pack::FactPromotionRecord,
277 created_at: impl Into<Timestamp>,
278) -> ContextFact {
279 ContextFact::new_projection(key, id, payload, promotion_record, created_at)
280}
281
282#[derive(Debug, Default, Clone, serde::Serialize)]
287pub struct ContextState {
288 facts: HashMap<ContextKey, Vec<ContextFact>>,
290 proposals: HashMap<ContextKey, Vec<ProposedFact>>,
292 dirty_keys: Vec<ContextKey>,
294 version: u64,
296}
297
298impl converge_pack::Context for ContextState {
301 fn has(&self, key: ContextKey) -> bool {
302 self.facts.get(&key).is_some_and(|v| !v.is_empty())
303 }
304
305 fn get(&self, key: ContextKey) -> &[ContextFact] {
306 self.facts.get(&key).map_or(&[], Vec::as_slice)
307 }
308
309 fn get_proposals(&self, key: ContextKey) -> &[ProposedFact] {
310 self.proposals.get(&key).map_or(&[], Vec::as_slice)
311 }
312
313 fn version(&self) -> u64 {
314 self.version
315 }
316}
317
318impl ContextState {
319 #[must_use]
321 pub fn new() -> Self {
322 Self::default()
323 }
324
325 #[must_use]
327 pub fn snapshot(&self) -> ContextSnapshot {
328 ContextSnapshot::from_context(self)
329 }
330
331 pub fn from_snapshot(snapshot: ContextSnapshot) -> Result<Self, ConvergeError> {
337 snapshot.validate()?;
338 Ok(Self {
339 facts: snapshot.facts.into_iter().collect(),
340 proposals: snapshot.proposals.into_iter().collect(),
341 dirty_keys: Vec::new(),
342 version: snapshot.version,
343 })
344 }
345
346 #[must_use]
348 pub fn get(&self, key: ContextKey) -> &[ContextFact] {
349 self.facts.get(&key).map_or(&[], Vec::as_slice)
350 }
351
352 #[must_use]
354 pub fn has(&self, key: ContextKey) -> bool {
355 self.facts.get(&key).is_some_and(|v| !v.is_empty())
356 }
357
358 #[must_use]
360 pub fn version(&self) -> u64 {
361 self.version
362 }
363
364 #[must_use]
366 pub fn dirty_keys(&self) -> &[ContextKey] {
367 &self.dirty_keys
368 }
369
370 #[must_use]
372 pub fn all_keys(&self) -> Vec<ContextKey> {
373 self.facts.keys().copied().collect()
374 }
375
376 #[must_use]
378 pub fn has_pending_proposals(&self) -> bool {
379 self.proposals.values().any(|items| !items.is_empty())
380 }
381
382 pub fn clear_dirty(&mut self) {
384 self.dirty_keys.clear();
385 }
386
387 pub fn add_proposal(&mut self, proposal: ProposedFact) -> Result<bool, ConvergeError> {
392 let key = proposal.key();
393 let proposals = self.proposals.entry(key).or_default();
394
395 if let Some(existing) = proposals.iter().find(|p| p.id() == proposal.id()) {
396 if existing == &proposal {
397 return Ok(false);
398 }
399 return Err(ConvergeError::Conflict {
400 id: proposal.id().to_string(),
401 existing: format!("{existing:?}"),
402 new: format!("{proposal:?}"),
403 context: Box::new(self.clone()),
404 });
405 }
406
407 proposals.push(proposal);
408 Ok(true)
409 }
410
411 pub fn add_input(
413 &mut self,
414 key: ContextKey,
415 id: impl Into<ProposalId>,
416 content: impl Into<String>,
417 ) -> Result<bool, ConvergeError> {
418 self.add_input_with_provenance(key, id, content, CONTEXT_INPUT_PROVENANCE.provenance())
419 }
420
421 pub fn add_input_with_provenance(
423 &mut self,
424 key: ContextKey,
425 id: impl Into<ProposalId>,
426 content: impl Into<String>,
427 provenance: impl Into<Provenance>,
428 ) -> Result<bool, ConvergeError> {
429 self.add_proposal(ProposedFact::new(
430 key,
431 id,
432 TextPayload::new(content),
433 provenance.into(),
434 ))
435 }
436
437 pub fn submit_observation(
442 &mut self,
443 request: AdmissionRequest,
444 ) -> Result<AdmissionReceipt, ConvergeError> {
445 let staged = self.add_proposal(request.clone().into_proposal())?;
446 Ok(AdmissionReceipt::new(&request, staged))
447 }
448
449 pub(crate) fn drain_proposals(&mut self) -> Vec<ProposedFact> {
451 let mut drained = Vec::new();
452 for proposals in self.proposals.values_mut() {
453 drained.append(proposals);
454 }
455 self.proposals.retain(|_, proposals| !proposals.is_empty());
456 drained
457 }
458
459 pub(crate) fn remove_proposal(&mut self, key: ContextKey, id: &ProposalId) {
461 if let Some(proposals) = self.proposals.get_mut(&key) {
462 proposals.retain(|proposal| proposal.id != id);
463 if proposals.is_empty() {
464 self.proposals.remove(&key);
465 }
466 }
467 }
468
469 pub(crate) fn add_fact(&mut self, fact: ContextFact) -> Result<bool, ConvergeError> {
474 let key = fact.key();
475 let facts = self.facts.entry(key).or_default();
476
477 if let Some(existing) = facts.iter().find(|f| f.id() == fact.id()) {
478 if existing == &fact {
479 return Ok(false);
480 }
481 return Err(ConvergeError::Conflict {
482 id: fact.id().to_string(),
483 existing: format!("{existing:?}"),
484 new: format!("{fact:?}"),
485 context: Box::new(self.clone()),
486 });
487 }
488
489 facts.push(fact);
490 self.proposals.remove(&key);
491 self.dirty_keys.push(key);
492
493 self.version += 1;
494 Ok(true)
495 }
496}
497
498#[cfg(test)]
499mod tests {
500 use super::*;
501 use converge_pack::Context as _;
502
503 #[test]
504 fn empty_context_has_no_facts() {
505 let ctx = ContextState::new();
506 assert!(!ctx.has(ContextKey::Seeds));
507 assert_eq!(ctx.version(), 0);
508 }
509
510 #[test]
511 fn adding_fact_increments_version() {
512 let mut ctx = ContextState::new();
513 let fact = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial value");
514
515 let changed = ctx.add_fact(fact).expect("should add");
516 assert!(changed);
517 assert_eq!(ctx.version(), 1);
518 assert!(ctx.has(ContextKey::Seeds));
519 }
520
521 #[test]
522 fn duplicate_fact_does_not_change_context() {
523 let mut ctx = ContextState::new();
524 let fact = crate::context::new_fact(ContextKey::Seeds, "seed-1", "initial");
525
526 ctx.add_fact(fact.clone()).expect("should add first");
527 let changed = ctx.add_fact(fact).expect("should not error on duplicate");
528 assert!(!changed);
529 assert_eq!(ctx.version(), 1);
530 }
531
532 #[test]
533 fn dirty_keys_track_new_facts_and_clear() {
534 let mut ctx = ContextState::new();
535 let fact = crate::context::new_fact(ContextKey::Hypotheses, "hyp-1", "value");
536
537 ctx.add_fact(fact).expect("should add");
538 assert_eq!(ctx.dirty_keys(), &[ContextKey::Hypotheses]);
539
540 ctx.clear_dirty();
541 assert!(ctx.dirty_keys().is_empty());
542 }
543
544 #[test]
545 fn detects_conflict() {
546 let mut ctx = ContextState::new();
547 ctx.add_fact(crate::context::new_fact(
548 ContextKey::Seeds,
549 "fact-1",
550 "version A",
551 ))
552 .unwrap();
553
554 let result = ctx.add_fact(crate::context::new_fact(
555 ContextKey::Seeds,
556 "fact-1",
557 "version B",
558 ));
559
560 match result {
561 Err(ConvergeError::Conflict {
562 id, existing, new, ..
563 }) => {
564 assert_eq!(id, "fact-1");
565 assert!(existing.contains("ContextFact"));
566 assert!(new.contains("ContextFact"));
567 }
568 _ => panic!("Expected Conflict error, got {result:?}"),
569 }
570 }
571
572 #[test]
573 fn adding_proposal_tracks_pending_state() {
574 let mut ctx = ContextState::new();
575 let proposal = ProposedFact::new(
576 ContextKey::Hypotheses,
577 "hyp-1",
578 TextPayload::new("market is growing"),
579 CONTEXT_INPUT_PROVENANCE.provenance(),
580 );
581
582 assert!(ctx.add_proposal(proposal).unwrap());
583 assert!(ctx.has_pending_proposals());
584 assert_eq!(ctx.get_proposals(ContextKey::Hypotheses).len(), 1);
585 }
586
587 #[test]
588 fn conflicting_staged_inputs_are_rejected_before_promotion() {
589 let mut ctx = ContextState::new();
590
591 assert!(
592 ctx.add_input_with_provenance(
593 ContextKey::Seeds,
594 "seed-1",
595 "version A",
596 CONTEXT_INPUT_PROVENANCE.provenance(),
597 )
598 .unwrap()
599 );
600
601 let result = ctx.add_input_with_provenance(
602 ContextKey::Seeds,
603 "seed-1",
604 "version B",
605 CONTEXT_INPUT_PROVENANCE.provenance(),
606 );
607
608 match result {
609 Err(ConvergeError::Conflict {
610 id, existing, new, ..
611 }) => {
612 assert_eq!(id, "seed-1");
613 assert!(existing.contains("ProposedFact"));
614 assert!(new.contains("ProposedFact"));
615 }
616 _ => panic!("Expected Conflict error, got {result:?}"),
617 }
618
619 assert!(ctx.has_pending_proposals());
620 assert_eq!(ctx.get_proposals(ContextKey::Seeds).len(), 1);
621 }
622
623 #[test]
624 fn snapshot_round_trips_facts_and_proposals() {
625 let mut ctx = ContextState::new();
626 ctx.add_fact(crate::context::new_fact(
627 ContextKey::Seeds,
628 "seed-1",
629 "persisted seed",
630 ))
631 .unwrap();
632 ctx.add_proposal(ProposedFact::new(
633 ContextKey::Hypotheses,
634 "hyp-1",
635 TextPayload::new("staged hypothesis"),
636 CONTEXT_INPUT_PROVENANCE.provenance(),
637 ))
638 .unwrap();
639
640 let restored = ContextState::from_snapshot(ctx.snapshot()).unwrap();
641
642 assert_eq!(restored.version(), 1);
643 assert!(restored.dirty_keys().is_empty());
644 assert_eq!(restored.get(ContextKey::Seeds)[0].id(), "seed-1");
645 assert_eq!(
646 restored.get(ContextKey::Seeds)[0].text(),
647 Some("persisted seed")
648 );
649 assert_eq!(
650 restored.get_proposals(ContextKey::Hypotheses)[0].id(),
651 "hyp-1"
652 );
653 }
654
655 #[test]
656 fn wire_snapshot_round_trips_through_payload_registry() {
657 let mut ctx = ContextState::new();
658 ctx.add_fact(crate::context::new_fact(
659 ContextKey::Seeds,
660 "seed-1",
661 "persisted seed",
662 ))
663 .unwrap();
664 ctx.add_proposal(ProposedFact::new(
665 ContextKey::Hypotheses,
666 "hyp-1",
667 TextPayload::new("staged hypothesis"),
668 CONTEXT_INPUT_PROVENANCE.provenance(),
669 ))
670 .unwrap();
671
672 let registry = PayloadRegistry::with_pack_payloads();
673 let wire = ctx.snapshot().to_wire().unwrap();
674 let snapshot = ContextSnapshot::from_wire(wire, ®istry).unwrap();
675 let restored = ContextState::from_snapshot(snapshot).unwrap();
676
677 assert_eq!(
678 restored.get(ContextKey::Seeds)[0].text(),
679 Some("persisted seed")
680 );
681 assert_eq!(
682 restored.get_proposals(ContextKey::Hypotheses)[0].text(),
683 Some("staged hypothesis")
684 );
685 }
686
687 #[test]
688 fn snapshot_rejects_fact_key_mismatch() {
689 let mut ctx = ContextState::new();
690 ctx.add_fact(crate::context::new_fact(
691 ContextKey::Seeds,
692 "seed-1",
693 "value",
694 ))
695 .unwrap();
696
697 let mut snapshot = ctx.snapshot();
698 let fact = snapshot
699 .facts
700 .get_mut(&ContextKey::Seeds)
701 .unwrap()
702 .pop()
703 .unwrap();
704 snapshot
705 .facts
706 .entry(ContextKey::Signals)
707 .or_default()
708 .push(fact);
709
710 let err = ContextState::from_snapshot(snapshot).unwrap_err();
711 assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
712 assert!(err.to_string().contains("stored under Signals"));
713 }
714
715 #[test]
716 fn snapshot_rejects_merkle_mismatch() {
717 let mut ctx = ContextState::new();
718 ctx.add_fact(crate::context::new_fact(
719 ContextKey::Seeds,
720 "seed-1",
721 "value",
722 ))
723 .unwrap();
724
725 let mut snapshot = ctx.snapshot();
726 snapshot.merkle_root =
727 crate::integrity::MerkleRoot(crate::integrity::ContentHash::compute("tampered"));
728
729 let err = ContextState::from_snapshot(snapshot).unwrap_err();
730 assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
731 assert!(err.to_string().contains("merkle root"));
732 }
733
734 #[test]
735 fn snapshot_rejects_duplicate_fact_ids() {
736 let mut ctx = ContextState::new();
737 ctx.add_fact(crate::context::new_fact(
738 ContextKey::Seeds,
739 "seed-1",
740 "value",
741 ))
742 .unwrap();
743
744 let mut snapshot = ctx.snapshot();
745 let duplicate = snapshot.facts.get(&ContextKey::Seeds).unwrap()[0].clone();
746 snapshot
747 .facts
748 .get_mut(&ContextKey::Seeds)
749 .unwrap()
750 .push(duplicate);
751
752 let err = ContextState::from_snapshot(snapshot).unwrap_err();
753 assert!(matches!(err, ConvergeError::InvalidSnapshot { .. }));
754 assert!(err.to_string().contains("duplicate fact"));
755 }
756
757 #[test]
759 fn context_implements_trait() {
760 let mut ctx = ContextState::new();
761 ctx.add_fact(crate::context::new_fact(ContextKey::Seeds, "s1", "hello"))
762 .unwrap();
763
764 let dyn_ctx: &dyn converge_pack::Context = &ctx;
766 assert!(dyn_ctx.has(ContextKey::Seeds));
767 assert_eq!(dyn_ctx.get(ContextKey::Seeds).len(), 1);
768 }
769}