1use std::collections::{HashMap, HashSet};
13use std::sync::Mutex;
14
15use converge_pack::{
16 AgentEffect, ConsensusOutcome, ConsensusRule, Context, ContextFact, ContextKey, Disagreement,
17 ProposedFact, Provenance, ProvenanceSource, Suggestor, TextPayload, Vote, VoteTopicId,
18};
19use serde::{Deserialize, Serialize};
20
21use crate::provenance::ORGANISM_RUNTIME_PROVENANCE;
22
23fn proposed_text_fact(
24 key: ContextKey,
25 id: impl Into<converge_pack::ProposalId>,
26 content: impl Into<String>,
27) -> ProposedFact {
28 ORGANISM_RUNTIME_PROVENANCE.proposed_fact(key, id, TextPayload::new(content))
29}
30
31fn fact_text(fact: &ContextFact) -> &str {
32 fact.text().unwrap_or_default()
33}
34
/// Context keys and fact-id prefixes used by the round suggestors in this file.
///
/// Round-scoped ids are built by appending the round number to the matching
/// prefix (for example `"round:start:"` followed by `1`).
#[derive(Debug, Clone, Copy)]
pub struct RoundConventions {
    /// Key under which round-start signal facts are looked up and proposed.
    pub round_signal_key: ContextKey,
    /// Id prefix of a round-start signal fact.
    pub round_signal_prefix: &'static str,
    /// Key under which "continue to the next round" marker facts are looked up.
    pub continue_key: ContextKey,
    /// Id prefix of a continue marker fact.
    pub continue_prefix: &'static str,
    /// Key under which per-round note facts are read by the synthesizer.
    pub note_key: ContextKey,
    /// Key under which synthesis facts are looked up and proposed.
    pub synthesis_key: ContextKey,
    /// Id prefix of a synthesis fact.
    pub synthesis_prefix: &'static str,
}
60
61impl RoundConventions {
62 #[must_use]
63 pub const fn default_const() -> Self {
64 Self {
65 round_signal_key: ContextKey::Signals,
66 round_signal_prefix: "round:start:",
67 continue_key: ContextKey::Constraints,
68 continue_prefix: "round:continue:",
69 note_key: ContextKey::Hypotheses,
70 synthesis_key: ContextKey::Strategies,
71 synthesis_prefix: "synthesis:",
72 }
73 }
74
75 fn round_signal_id(&self, round: u8) -> String {
76 format!("{}{round}", self.round_signal_prefix)
77 }
78
79 fn continue_id(&self, round: u8) -> String {
80 format!("{}{round}", self.continue_prefix)
81 }
82
83 fn synthesis_id(&self, round: u8) -> String {
84 format!("{}{round}", self.synthesis_prefix)
85 }
86
87 fn note_belongs_to_round(fact_id: &str, round: u8) -> bool {
88 fact_id.ends_with(&format!(":{round}"))
89 }
90}
91
92impl Default for RoundConventions {
93 fn default() -> Self {
94 Self::default_const()
95 }
96}
97
/// Boxed, `Send + Sync` predicate over a [`Context`].
///
/// The round suggestors in this file return `false` from `accepts` whenever
/// their predicate returns `true`.
pub type TerminalPredicate = Box<dyn Fn(&dyn Context) -> bool + Send + Sync>;
100
101fn never_terminal() -> TerminalPredicate {
102 Box::new(|_ctx| false)
103}
104
105fn has_fact(ctx: &dyn Context, key: ContextKey, id: &str) -> bool {
106 ctx.get(key).iter().any(|fact| fact.id().as_str() == id)
107}
108
/// Suggestor that proposes round-start signal facts, one round per execution.
///
/// Round 1 is proposed when its signal is absent; round `n + 1` is proposed
/// once the continue marker for round `n` exists and its own signal does not.
pub struct RoundStarter {
    /// Upper bound on the round number proposed when advancing past round 1.
    max_rounds: u8,
    /// Keys and id prefixes used to find and emit round facts.
    conventions: RoundConventions,
    /// When this returns `true`, `accepts` returns `false`.
    is_terminal: TerminalPredicate,
}
124
125impl RoundStarter {
126 #[must_use]
127 pub fn new(max_rounds: u8) -> Self {
128 Self {
129 max_rounds,
130 conventions: RoundConventions::default(),
131 is_terminal: never_terminal(),
132 }
133 }
134
135 #[must_use]
136 pub fn with_conventions(mut self, conventions: RoundConventions) -> Self {
137 self.conventions = conventions;
138 self
139 }
140
141 #[must_use]
144 pub fn with_terminal_predicate<F>(mut self, predicate: F) -> Self
145 where
146 F: Fn(&dyn Context) -> bool + Send + Sync + 'static,
147 {
148 self.is_terminal = Box::new(predicate);
149 self
150 }
151
152 fn next_round_to_emit(&self, ctx: &dyn Context) -> Option<u8> {
153 if !has_fact(
154 ctx,
155 self.conventions.round_signal_key,
156 &self.conventions.round_signal_id(1),
157 ) {
158 return Some(1);
159 }
160 for round in 1..self.max_rounds {
161 if has_fact(
162 ctx,
163 self.conventions.continue_key,
164 &self.conventions.continue_id(round),
165 ) && !has_fact(
166 ctx,
167 self.conventions.round_signal_key,
168 &self.conventions.round_signal_id(round + 1),
169 ) {
170 return Some(round + 1);
171 }
172 }
173 None
174 }
175}
176
177#[async_trait::async_trait]
178impl Suggestor for RoundStarter {
179 fn name(&self) -> &'static str {
180 "organism-round-starter"
181 }
182
183 fn dependencies(&self) -> &[ContextKey] {
184 &[]
185 }
186
187 fn provenance(&self) -> Provenance {
188 ORGANISM_RUNTIME_PROVENANCE.provenance()
189 }
190
191 fn accepts(&self, ctx: &dyn Context) -> bool {
192 if (self.is_terminal)(ctx) {
193 return false;
194 }
195 self.next_round_to_emit(ctx).is_some()
196 }
197
198 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
199 let Some(round) = self.next_round_to_emit(ctx) else {
200 return AgentEffect::empty();
201 };
202 AgentEffect::with_proposal(proposed_text_fact(
203 self.conventions.round_signal_key,
204 self.conventions.round_signal_id(round),
205 format!("start round {round}"),
206 ))
207 }
208}
209
/// Produces the synthesis text for one round from that round's notes.
#[async_trait::async_trait]
pub trait SynthesisProducer: Send + Sync {
    /// Synthesizes `notes` for `round`, with read access to the whole context.
    ///
    /// # Errors
    ///
    /// Returns an error message when no synthesis could be produced.
    async fn synthesize(
        &self,
        round: u8,
        notes: &[ContextFact],
        ctx: &dyn Context,
    ) -> Result<String, String>;
}
236
/// Suggestor that asks a [`SynthesisProducer`] for a round's synthesis once
/// enough notes for that round are present.
pub struct RoundSynthesizer<P: SynthesisProducer> {
    /// Minimum number of notes a round needs before it is synthesized.
    expected_note_count: usize,
    /// Keys and id prefixes used to find round facts and emit syntheses.
    conventions: RoundConventions,
    /// Produces the synthesis content.
    producer: P,
    /// When this returns `true`, `accepts` returns `false`.
    is_terminal: TerminalPredicate,
}
255
256impl<P: SynthesisProducer> RoundSynthesizer<P> {
257 #[must_use]
258 pub fn new(expected_note_count: usize, producer: P) -> Self {
259 Self {
260 expected_note_count,
261 conventions: RoundConventions::default(),
262 producer,
263 is_terminal: never_terminal(),
264 }
265 }
266
267 #[must_use]
268 pub fn with_conventions(mut self, conventions: RoundConventions) -> Self {
269 self.conventions = conventions;
270 self
271 }
272
273 #[must_use]
276 pub fn with_terminal_predicate<F>(mut self, predicate: F) -> Self
277 where
278 F: Fn(&dyn Context) -> bool + Send + Sync + 'static,
279 {
280 self.is_terminal = Box::new(predicate);
281 self
282 }
283
284 fn started_rounds(&self, ctx: &dyn Context) -> Vec<u8> {
285 ctx.get(self.conventions.round_signal_key)
286 .iter()
287 .filter_map(|fact| {
288 fact.id()
289 .as_str()
290 .strip_prefix(self.conventions.round_signal_prefix)
291 .and_then(|n| n.parse::<u8>().ok())
292 })
293 .collect()
294 }
295
296 fn count_notes_for_round(&self, ctx: &dyn Context, round: u8) -> usize {
297 ctx.get(self.conventions.note_key)
298 .iter()
299 .filter(|fact| RoundConventions::note_belongs_to_round(fact.id().as_str(), round))
300 .count()
301 }
302
303 fn notes_for_round<'a>(&self, ctx: &'a dyn Context, round: u8) -> Vec<&'a ContextFact> {
304 ctx.get(self.conventions.note_key)
305 .iter()
306 .filter(|fact| RoundConventions::note_belongs_to_round(fact.id().as_str(), round))
307 .collect()
308 }
309
310 fn next_round_needing_synthesis(&self, ctx: &dyn Context) -> Option<u8> {
311 self.started_rounds(ctx).into_iter().find(|round| {
312 !has_fact(
313 ctx,
314 self.conventions.synthesis_key,
315 &self.conventions.synthesis_id(*round),
316 ) && !has_fact(
317 ctx,
318 ContextKey::Diagnostic,
319 &format!("runtime:error:synthesis:{round}"),
320 ) && self.count_notes_for_round(ctx, *round) >= self.expected_note_count
321 })
322 }
323}
324
325#[async_trait::async_trait]
326impl<P: SynthesisProducer> Suggestor for RoundSynthesizer<P> {
327 fn name(&self) -> &'static str {
328 "organism-round-synthesizer"
329 }
330
331 fn dependencies(&self) -> &[ContextKey] {
332 &[ContextKey::Hypotheses]
333 }
334
335 fn provenance(&self) -> Provenance {
336 ORGANISM_RUNTIME_PROVENANCE.provenance()
337 }
338
339 fn accepts(&self, ctx: &dyn Context) -> bool {
340 if (self.is_terminal)(ctx) {
341 return false;
342 }
343 self.next_round_needing_synthesis(ctx).is_some()
344 }
345
346 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
347 let Some(round) = self.next_round_needing_synthesis(ctx) else {
348 return AgentEffect::empty();
349 };
350
351 let notes: Vec<ContextFact> = self
352 .notes_for_round(ctx, round)
353 .into_iter()
354 .cloned()
355 .collect();
356
357 match self.producer.synthesize(round, ¬es, ctx).await {
358 Ok(content) => AgentEffect::with_proposal(proposed_text_fact(
359 self.conventions.synthesis_key,
360 self.conventions.synthesis_id(round),
361 content,
362 )),
363 Err(err) => AgentEffect::with_proposal(proposed_text_fact(
364 ContextKey::Diagnostic,
365 format!("runtime:error:synthesis:{round}"),
366 err,
367 )),
368 }
369 }
370}
371
/// All disagreements recorded for a single vote topic.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DisagreementMap {
    /// Topic the entries refer to.
    pub topic: VoteTopicId,
    /// Disagreements collected for `topic`.
    pub entries: Vec<Disagreement>,
}
379
/// Suggestor that groups `ContextKey::Disagreements` facts by topic and
/// proposes one [`DisagreementMap`] fact per topic.
pub struct DisagreementMapper {
    /// Key under which the map facts are proposed.
    output_key: ContextKey,
    /// Topics for which this instance has already proposed a map.
    mapped_topics: Mutex<HashSet<VoteTopicId>>,
}
390
391impl DisagreementMapper {
392 #[must_use]
393 pub fn new() -> Self {
394 Self {
395 output_key: ContextKey::Diagnostic,
396 mapped_topics: Mutex::new(HashSet::new()),
397 }
398 }
399
400 #[must_use]
401 pub fn with_output_key(mut self, key: ContextKey) -> Self {
402 self.output_key = key;
403 self
404 }
405
406 #[must_use]
407 pub const fn output_key(&self) -> ContextKey {
408 self.output_key
409 }
410
411 fn map_id(topic: &VoteTopicId) -> String {
412 format!("disagreement_map:{}", topic.as_str())
413 }
414}
415
416impl Default for DisagreementMapper {
417 fn default() -> Self {
418 Self::new()
419 }
420}
421
422#[async_trait::async_trait]
423impl Suggestor for DisagreementMapper {
424 fn name(&self) -> &'static str {
425 "organism-disagreement-mapper"
426 }
427
428 fn dependencies(&self) -> &[ContextKey] {
429 &[ContextKey::Disagreements]
430 }
431
432 fn provenance(&self) -> Provenance {
433 ORGANISM_RUNTIME_PROVENANCE.provenance()
434 }
435
436 fn accepts(&self, ctx: &dyn Context) -> bool {
437 if !ctx.has(ContextKey::Disagreements) {
438 return false;
439 }
440 let mapped = self.mapped_topics.lock().unwrap();
441 ctx.get(ContextKey::Disagreements)
442 .iter()
443 .filter_map(|fact| serde_json::from_str::<Disagreement>(fact_text(fact)).ok())
444 .any(|d| !mapped.contains(&d.topic))
445 }
446
447 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
448 let mut mapped = self.mapped_topics.lock().unwrap();
449
450 let mut by_topic: HashMap<VoteTopicId, Vec<Disagreement>> = HashMap::new();
451 for fact in ctx.get(ContextKey::Disagreements) {
452 let Ok(d) = serde_json::from_str::<Disagreement>(fact_text(fact)) else {
453 continue;
454 };
455 if mapped.contains(&d.topic) {
456 continue;
457 }
458 by_topic.entry(d.topic.clone()).or_default().push(d);
459 }
460
461 let mut topics: Vec<VoteTopicId> = by_topic.keys().cloned().collect();
462 topics.sort();
463
464 let mut proposals = Vec::with_capacity(topics.len());
465 for topic in topics {
466 let entries = by_topic.remove(&topic).unwrap_or_default();
467 let map = DisagreementMap {
468 topic: topic.clone(),
469 entries,
470 };
471 let Ok(content) = serde_json::to_string(&map) else {
472 continue;
473 };
474 proposals.push(proposed_text_fact(
475 self.output_key,
476 Self::map_id(&topic),
477 content,
478 ));
479 mapped.insert(topic);
480 }
481
482 AgentEffect::with_proposals(proposals)
483 }
484}
485
/// Suggestor that evaluates the votes under `ContextKey::Votes` per topic and
/// proposes one consensus outcome fact per topic.
pub struct ConsensusEvaluator {
    /// Rule passed to `ConsensusOutcome::evaluate`.
    rule: ConsensusRule,
    /// Voter count used to build the `EligibleVoters` value for evaluation.
    total_voters: usize,
    /// Topics for which this instance has already proposed an outcome.
    decided_topics: Mutex<HashSet<VoteTopicId>>,
}
500
501impl ConsensusEvaluator {
502 #[must_use]
503 pub fn new(rule: ConsensusRule, total_voters: usize) -> Self {
504 Self {
505 rule,
506 total_voters,
507 decided_topics: Mutex::new(HashSet::new()),
508 }
509 }
510
511 #[must_use]
512 pub const fn rule(&self) -> ConsensusRule {
513 self.rule
514 }
515
516 #[must_use]
517 pub const fn total_voters(&self) -> usize {
518 self.total_voters
519 }
520}
521
522#[async_trait::async_trait]
523impl Suggestor for ConsensusEvaluator {
524 fn name(&self) -> &'static str {
525 "organism-consensus-evaluator"
526 }
527
528 fn dependencies(&self) -> &[ContextKey] {
529 &[ContextKey::Votes]
530 }
531
532 fn provenance(&self) -> Provenance {
533 ORGANISM_RUNTIME_PROVENANCE.provenance()
534 }
535
536 fn accepts(&self, ctx: &dyn Context) -> bool {
537 if !ctx.has(ContextKey::Votes) {
538 return false;
539 }
540 let decided = self.decided_topics.lock().unwrap();
541 ctx.get(ContextKey::Votes)
542 .iter()
543 .filter_map(|fact| serde_json::from_str::<Vote>(fact_text(fact)).ok())
544 .any(|vote| !decided.contains(&vote.topic))
545 }
546
547 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
548 let mut decided = self.decided_topics.lock().unwrap();
549
550 let mut by_topic: HashMap<VoteTopicId, Vec<Vote>> = HashMap::new();
551 for fact in ctx.get(ContextKey::Votes) {
552 let Ok(vote) = serde_json::from_str::<Vote>(fact_text(fact)) else {
553 continue;
554 };
555 if decided.contains(&vote.topic) {
556 continue;
557 }
558 by_topic.entry(vote.topic.clone()).or_default().push(vote);
559 }
560
561 let mut topics: Vec<VoteTopicId> = by_topic.keys().cloned().collect();
562 topics.sort();
563
564 let mut proposals = Vec::with_capacity(topics.len());
565 let Ok(eligible) = converge_pack::EligibleVoters::new(self.total_voters) else {
566 return AgentEffect::with_proposals(proposals);
567 };
568 for topic in topics {
569 let votes = by_topic.remove(&topic).unwrap_or_default();
570 let Ok(outcome) =
571 ConsensusOutcome::evaluate(topic.clone(), self.rule, &votes, eligible)
572 else {
573 continue;
574 };
575 let Ok(content) = serde_json::to_string(&outcome) else {
576 continue;
577 };
578 proposals.push(proposed_text_fact(
579 ContextKey::ConsensusOutcomes,
580 format!("outcome:{}", topic.as_str()),
581 content,
582 ));
583 decided.insert(topic);
584 }
585
586 AgentEffect::with_proposals(proposals)
587 }
588}
589
590#[cfg(test)]
591mod tests {
592 use super::*;
593 use crate::formation::Formation;
594 use converge_pack::{ActorId, VoteDecision};
595
596 fn vote(topic: &str, voter: &str, decision: VoteDecision) -> Vote {
597 Vote {
598 topic: VoteTopicId::new(topic),
599 voter: ActorId::new(voter),
600 decision,
601 reason: None,
602 }
603 }
604
605 fn formation_with_votes(
606 label: &str,
607 rule: ConsensusRule,
608 total_voters: usize,
609 votes: &[Vote],
610 ) -> Formation {
611 let mut formation =
612 Formation::new(label).agent(ConsensusEvaluator::new(rule, total_voters));
613 for (i, v) in votes.iter().enumerate() {
614 let content = serde_json::to_string(v).unwrap();
615 formation = formation.seed(
616 ContextKey::Votes,
617 format!("vote-{i}"),
618 content,
619 "test-author",
620 );
621 }
622 formation
623 }
624
625 #[tokio::test]
626 async fn emits_outcome_per_topic_under_consensus_outcomes_key() {
627 let votes = [
628 vote("done-r1", "alice", VoteDecision::Yes),
629 vote("done-r1", "bob", VoteDecision::Yes),
630 vote("done-r1", "carol", VoteDecision::No),
631 ];
632 let result = formation_with_votes("majority-pass", ConsensusRule::Majority, 3, &votes)
633 .run()
634 .await
635 .expect("formation should converge");
636
637 let outcomes = result
638 .converge_result
639 .context
640 .get(ContextKey::ConsensusOutcomes);
641 assert_eq!(outcomes.len(), 1);
642 assert_eq!(outcomes[0].id().as_str(), "outcome:done-r1");
643
644 let outcome: ConsensusOutcome = serde_json::from_str(fact_text(&outcomes[0])).unwrap();
645 assert_eq!(outcome.tally().yes_votes(), 2);
646 assert_eq!(outcome.tally().no_votes(), 1);
647 assert_eq!(outcome.total_voters().get(), 3);
648 assert!(outcome.passes());
649 }
650
651 #[tokio::test]
652 async fn evaluates_each_topic_independently() {
653 let votes = [
654 vote("a", "alice", VoteDecision::Yes),
655 vote("a", "bob", VoteDecision::Yes),
656 vote("b", "alice", VoteDecision::No),
657 vote("b", "bob", VoteDecision::No),
658 ];
659 let result = formation_with_votes("split-topics", ConsensusRule::Majority, 2, &votes)
660 .run()
661 .await
662 .expect("formation should converge");
663
664 let outcomes = result
665 .converge_result
666 .context
667 .get(ContextKey::ConsensusOutcomes);
668 assert_eq!(outcomes.len(), 2);
669
670 let mut decisions: std::collections::HashMap<String, ConsensusOutcome> =
671 std::collections::HashMap::new();
672 for fact in outcomes {
673 decisions.insert(
674 fact.id().as_str().to_string(),
675 serde_json::from_str(fact_text(fact)).unwrap(),
676 );
677 }
678 assert!(decisions["outcome:a"].passes());
679 assert!(!decisions["outcome:b"].passes());
680 }
681
682 #[tokio::test]
683 async fn unanimous_rule_blocks_when_any_voter_dissents() {
684 let votes = [
685 vote("ship", "a", VoteDecision::Yes),
686 vote("ship", "b", VoteDecision::No),
687 ];
688 let result = formation_with_votes("unanimous-fail", ConsensusRule::Unanimous, 2, &votes)
689 .run()
690 .await
691 .expect("formation should converge");
692
693 let outcomes = result
694 .converge_result
695 .context
696 .get(ContextKey::ConsensusOutcomes);
697 assert_eq!(outcomes.len(), 1);
698 let outcome: ConsensusOutcome = serde_json::from_str(fact_text(&outcomes[0])).unwrap();
699 assert!(!outcome.passes());
700 }
701
702 #[tokio::test]
703 async fn does_not_emit_when_no_votes_seeded() {
704 let result = Formation::new("no-votes")
705 .agent(ConsensusEvaluator::new(ConsensusRule::Majority, 1))
706 .run()
707 .await
708 .expect("formation should converge");
709
710 assert!(
711 !result
712 .converge_result
713 .context
714 .has(ContextKey::ConsensusOutcomes)
715 );
716 }
717
718 #[tokio::test]
721 async fn round_starter_emits_round_one_when_no_round_has_started() {
722 let result = Formation::new("round-1")
723 .agent(RoundStarter::new(3))
724 .run()
725 .await
726 .expect("formation should converge");
727
728 let signals = result.converge_result.context.get(ContextKey::Signals);
729 assert_eq!(signals.len(), 1);
730 assert_eq!(signals[0].id().as_str(), "round:start:1");
731 }
732
733 #[tokio::test]
734 async fn round_starter_advances_when_continue_marker_lands() {
735 let result = Formation::new("round-2")
736 .agent(RoundStarter::new(3))
737 .seed(
738 ContextKey::Constraints,
739 "round:continue:1",
740 "round 1 voted to continue",
741 "test-author",
742 )
743 .run()
744 .await
745 .expect("formation should converge");
746
747 let mut signal_ids: Vec<&str> = result
748 .converge_result
749 .context
750 .get(ContextKey::Signals)
751 .iter()
752 .map(|f| f.id().as_str())
753 .collect();
754 signal_ids.sort_unstable();
755 assert_eq!(signal_ids, vec!["round:start:1", "round:start:2"]);
756 }
757
758 #[tokio::test]
759 async fn round_starter_stops_at_max_rounds() {
760 let mut formation = Formation::new("max-cap").agent(RoundStarter::new(2));
761 for round in 1..=2 {
762 formation = formation.seed(
763 ContextKey::Constraints,
764 format!("round:continue:{round}"),
765 format!("continue {round}"),
766 "test-author",
767 );
768 }
769 let result = formation.run().await.expect("formation should converge");
770
771 let signals = result.converge_result.context.get(ContextKey::Signals);
772 assert_eq!(signals.len(), 2);
773 assert!(!signals.iter().any(|f| f.id().as_str() == "round:start:3"));
774 }
775
776 #[tokio::test]
777 async fn round_starter_respects_terminal_predicate() {
778 const TERMINAL_ID: &str = "research:complete";
779 let result = Formation::new("terminal-block")
780 .agent(RoundStarter::new(3).with_terminal_predicate(|ctx| {
781 ctx.get(ContextKey::Strategies)
782 .iter()
783 .any(|f| f.id().as_str() == TERMINAL_ID)
784 }))
785 .seed(
786 ContextKey::Strategies,
787 TERMINAL_ID,
788 "research is done",
789 "test-author",
790 )
791 .run()
792 .await
793 .expect("formation should converge");
794
795 assert!(!result.converge_result.context.has(ContextKey::Signals));
796 }
797
798 #[tokio::test]
799 async fn round_starter_honors_custom_conventions() {
800 let conventions = RoundConventions {
801 round_signal_key: ContextKey::Hypotheses,
802 round_signal_prefix: "phase:",
803 continue_key: ContextKey::Strategies,
804 continue_prefix: "phase:next:",
805 note_key: ContextKey::Hypotheses,
806 synthesis_key: ContextKey::Strategies,
807 synthesis_prefix: "phase:synthesis:",
808 };
809 let result = Formation::new("custom-conv")
810 .agent(RoundStarter::new(3).with_conventions(conventions))
811 .seed(
812 ContextKey::Strategies,
813 "phase:next:1",
814 "advance",
815 "test-author",
816 )
817 .run()
818 .await
819 .expect("formation should converge");
820
821 let mut ids: Vec<&str> = result
822 .converge_result
823 .context
824 .get(ContextKey::Hypotheses)
825 .iter()
826 .map(|f| f.id().as_str())
827 .collect();
828 ids.sort_unstable();
829 assert_eq!(ids, vec!["phase:1", "phase:2"]);
830 }
831
832 struct StaticProducer(&'static str);
835
836 #[async_trait::async_trait]
837 impl SynthesisProducer for StaticProducer {
838 async fn synthesize(
839 &self,
840 _round: u8,
841 _notes: &[ContextFact],
842 _ctx: &dyn Context,
843 ) -> Result<String, String> {
844 Ok(self.0.to_string())
845 }
846 }
847
848 struct CountingProducer;
849
850 #[async_trait::async_trait]
851 impl SynthesisProducer for CountingProducer {
852 async fn synthesize(
853 &self,
854 round: u8,
855 notes: &[ContextFact],
856 _ctx: &dyn Context,
857 ) -> Result<String, String> {
858 Ok(format!("round {round} from {} notes", notes.len()))
859 }
860 }
861
862 struct FailingProducer(&'static str);
863
864 #[async_trait::async_trait]
865 impl SynthesisProducer for FailingProducer {
866 async fn synthesize(
867 &self,
868 _round: u8,
869 _notes: &[ContextFact],
870 _ctx: &dyn Context,
871 ) -> Result<String, String> {
872 Err(self.0.to_string())
873 }
874 }
875
876 fn formation_with_round_one_started(label: &str) -> Formation {
877 Formation::new(label).seed(
878 ContextKey::Signals,
879 "round:start:1",
880 "start round 1",
881 "test-author",
882 )
883 }
884
885 #[tokio::test]
886 async fn round_synthesizer_emits_when_notes_complete() {
887 let result = formation_with_round_one_started("synth-complete")
888 .agent(RoundSynthesizer::new(2, CountingProducer))
889 .seed(
890 ContextKey::Hypotheses,
891 "note:alice:1",
892 "alice note",
893 "test-author",
894 )
895 .seed(
896 ContextKey::Hypotheses,
897 "note:bob:1",
898 "bob note",
899 "test-author",
900 )
901 .run()
902 .await
903 .expect("formation should converge");
904
905 let strategies = result.converge_result.context.get(ContextKey::Strategies);
906 assert_eq!(strategies.len(), 1);
907 assert_eq!(strategies[0].id().as_str(), "synthesis:1");
908 assert_eq!(strategies[0].text(), Some("round 1 from 2 notes"));
909 }
910
911 #[tokio::test]
912 async fn round_synthesizer_waits_for_complete_note_count() {
913 let result = formation_with_round_one_started("synth-incomplete")
914 .agent(RoundSynthesizer::new(
915 3,
916 StaticProducer("should-not-appear"),
917 ))
918 .seed(
919 ContextKey::Hypotheses,
920 "note:alice:1",
921 "alice note",
922 "test-author",
923 )
924 .seed(
925 ContextKey::Hypotheses,
926 "note:bob:1",
927 "bob note",
928 "test-author",
929 )
930 .run()
931 .await
932 .expect("formation should converge");
933
934 assert!(!result.converge_result.context.has(ContextKey::Strategies));
935 }
936
937 #[tokio::test]
938 async fn round_synthesizer_routes_producer_errors_to_diagnostic() {
939 let result = formation_with_round_one_started("synth-err")
940 .agent(RoundSynthesizer::new(
941 1,
942 FailingProducer("upstream timeout"),
943 ))
944 .seed(
945 ContextKey::Hypotheses,
946 "note:alice:1",
947 "alice note",
948 "test-author",
949 )
950 .run()
951 .await
952 .expect("formation should converge");
953
954 assert!(!result.converge_result.context.has(ContextKey::Strategies));
955 let diagnostic = result.converge_result.context.get(ContextKey::Diagnostic);
956 assert_eq!(diagnostic.len(), 1);
957 assert_eq!(diagnostic[0].id().as_str(), "runtime:error:synthesis:1");
958 assert_eq!(diagnostic[0].text(), Some("upstream timeout"));
959 }
960
961 #[tokio::test]
962 async fn round_synthesizer_skips_round_with_existing_diagnostic_error() {
963 let result = formation_with_round_one_started("synth-prior-error")
964 .agent(RoundSynthesizer::new(1, StaticProducer("must-not-emit")))
965 .seed(
966 ContextKey::Hypotheses,
967 "note:alice:1",
968 "alice note",
969 "test-author",
970 )
971 .seed(
972 ContextKey::Diagnostic,
973 "runtime:error:synthesis:1",
974 "earlier failure recorded",
975 "test-author",
976 )
977 .run()
978 .await
979 .expect("formation should converge");
980
981 assert!(!result.converge_result.context.has(ContextKey::Strategies));
982 }
983
984 #[tokio::test]
985 async fn round_synthesizer_respects_terminal_predicate() {
986 const TERMINAL_ID: &str = "research:complete";
987 let result = formation_with_round_one_started("synth-terminal")
988 .agent(
989 RoundSynthesizer::new(1, StaticProducer("should-not-appear"))
990 .with_terminal_predicate(|ctx| {
991 ctx.get(ContextKey::Strategies)
992 .iter()
993 .any(|f| f.id().as_str() == TERMINAL_ID)
994 }),
995 )
996 .seed(
997 ContextKey::Hypotheses,
998 "note:alice:1",
999 "alice note",
1000 "test-author",
1001 )
1002 .seed(
1003 ContextKey::Strategies,
1004 TERMINAL_ID,
1005 "research is done",
1006 "test-author",
1007 )
1008 .run()
1009 .await
1010 .expect("formation should converge");
1011
1012 let strategies = result.converge_result.context.get(ContextKey::Strategies);
1013 assert_eq!(strategies.len(), 1);
1014 assert_eq!(strategies[0].id().as_str(), TERMINAL_ID);
1015 }
1016
1017 #[tokio::test]
1018 async fn round_synthesizer_only_synthesizes_started_rounds() {
1019 let result = formation_with_round_one_started("synth-pending-round-2")
1020 .agent(RoundSynthesizer::new(1, StaticProducer("done")))
1021 .seed(
1022 ContextKey::Hypotheses,
1023 "note:alice:1",
1024 "round 1 note",
1025 "test-author",
1026 )
1027 .seed(
1028 ContextKey::Hypotheses,
1029 "note:alice:2",
1030 "round 2 note (not yet started)",
1031 "test-author",
1032 )
1033 .run()
1034 .await
1035 .expect("formation should converge");
1036
1037 let strategies = result.converge_result.context.get(ContextKey::Strategies);
1038 assert_eq!(strategies.len(), 1);
1039 assert_eq!(strategies[0].id().as_str(), "synthesis:1");
1040 }
1041
1042 fn disagreement(topic: &str, dissenter: &str, reason: &str) -> Disagreement {
1045 Disagreement {
1046 topic: VoteTopicId::new(topic),
1047 dissenter: ActorId::new(dissenter),
1048 reason: reason.to_string(),
1049 }
1050 }
1051
1052 fn seed_disagreement(formation: Formation, slot: usize, d: &Disagreement) -> Formation {
1053 let content = serde_json::to_string(d).unwrap();
1054 formation.seed(
1055 ContextKey::Disagreements,
1056 format!("disagreement-{slot}"),
1057 content,
1058 "test-author",
1059 )
1060 }
1061
1062 #[tokio::test]
1063 async fn disagreement_mapper_aggregates_per_topic() {
1064 let alice_on_a = disagreement("topic-a", "alice", "too risky");
1065 let bob_on_a = disagreement("topic-a", "bob", "missing context");
1066 let carol_on_b = disagreement("topic-b", "carol", "missing data");
1067
1068 let mut formation = Formation::new("dmap").agent(DisagreementMapper::new());
1069 formation = seed_disagreement(formation, 0, &alice_on_a);
1070 formation = seed_disagreement(formation, 1, &bob_on_a);
1071 formation = seed_disagreement(formation, 2, &carol_on_b);
1072
1073 let result = formation.run().await.expect("formation should converge");
1074
1075 let maps = result.converge_result.context.get(ContextKey::Diagnostic);
1076 assert_eq!(maps.len(), 2);
1077
1078 let mut by_id: std::collections::HashMap<String, DisagreementMap> =
1079 std::collections::HashMap::new();
1080 for fact in maps {
1081 let parsed: DisagreementMap = serde_json::from_str(fact_text(fact)).unwrap();
1082 by_id.insert(fact.id().as_str().to_string(), parsed);
1083 }
1084 let map_a = &by_id["disagreement_map:topic-a"];
1085 assert_eq!(map_a.entries.len(), 2);
1086 let map_b = &by_id["disagreement_map:topic-b"];
1087 assert_eq!(map_b.entries.len(), 1);
1088 assert_eq!(map_b.entries[0].dissenter.as_str(), "carol");
1089 }
1090
1091 #[tokio::test]
1092 async fn disagreement_mapper_does_nothing_without_disagreements() {
1093 let result = Formation::new("dmap-empty")
1094 .agent(DisagreementMapper::new())
1095 .run()
1096 .await
1097 .expect("formation should converge");
1098
1099 assert!(!result.converge_result.context.has(ContextKey::Diagnostic));
1100 }
1101
1102 #[tokio::test]
1103 async fn disagreement_mapper_honors_custom_output_key() {
1104 let d = disagreement("topic-x", "alice", "too rushed");
1105 let mut formation = Formation::new("dmap-custom")
1106 .agent(DisagreementMapper::new().with_output_key(ContextKey::Strategies));
1107 formation = seed_disagreement(formation, 0, &d);
1108
1109 let result = formation.run().await.expect("formation should converge");
1110
1111 assert!(!result.converge_result.context.has(ContextKey::Diagnostic));
1112 let strategies = result.converge_result.context.get(ContextKey::Strategies);
1113 assert_eq!(strategies.len(), 1);
1114 assert_eq!(strategies[0].id().as_str(), "disagreement_map:topic-x");
1115 }
1116}