Skip to main content

organism_runtime/
huddle.rs

1//! Huddle primitives — platform-level deliberation patterns.
2//!
3//! Suggestors here consume governance facts ([`Vote`], [`Disagreement`]) and
4//! produce [`ConsensusOutcome`] facts. They power any flow that needs
5//! collective sign-off: research huddles, vendor-selection panels, approval
6//! gates, multi-agent reviews.
7//!
8//! This module is the upstream home for patterns Wolfgang previously kept in
9//! its `deep_research_runtime`. Domain packs (Wolfgang, Monterro, etc.) compose
10//! these primitives through `Formation` rather than reinventing them.
11
12use std::collections::{HashMap, HashSet};
13use std::sync::Mutex;
14
15use converge_pack::{
16    AgentEffect, ConsensusOutcome, ConsensusRule, Context, ContextFact, ContextKey, Disagreement,
17    ProposedFact, Provenance, ProvenanceSource, Suggestor, TextPayload, Vote, VoteTopicId,
18};
19use serde::{Deserialize, Serialize};
20
21use crate::provenance::ORGANISM_RUNTIME_PROVENANCE;
22
23fn proposed_text_fact(
24    key: ContextKey,
25    id: impl Into<converge_pack::ProposalId>,
26    content: impl Into<String>,
27) -> ProposedFact {
28    ORGANISM_RUNTIME_PROVENANCE.proposed_fact(key, id, TextPayload::new(content))
29}
30
31fn fact_text(fact: &ContextFact) -> &str {
32    fact.text().unwrap_or_default()
33}
34
35/// Fact-id and key conventions for round-based deliberation.
36///
37/// Wolfgang's research huddles, Monterro's diligence loops, and any other
38/// pack that wants round-by-round turn-taking shares this naming so upstream
39/// suggestors can read and write the same facts.
40#[derive(Debug, Clone, Copy)]
41pub struct RoundConventions {
42    /// Where round-start signals live. Default [`ContextKey::Signals`].
43    pub round_signal_key: ContextKey,
44    /// Fact-id prefix for round-start signals. Default `"round:start:"`.
45    pub round_signal_prefix: &'static str,
46    /// Where "this round may continue" markers live. Default
47    /// [`ContextKey::Constraints`].
48    pub continue_key: ContextKey,
49    /// Fact-id prefix for continue markers. Default `"round:continue:"`.
50    pub continue_prefix: &'static str,
51    /// Where per-round contributor notes live. Default
52    /// [`ContextKey::Hypotheses`]. Note ids are expected to end with `:N` for
53    /// round N (matching Wolfgang's `note:{participant}:{round}` shape).
54    pub note_key: ContextKey,
55    /// Where round syntheses live. Default [`ContextKey::Strategies`].
56    pub synthesis_key: ContextKey,
57    /// Fact-id prefix for syntheses. Default `"synthesis:"`.
58    pub synthesis_prefix: &'static str,
59}
60
61impl RoundConventions {
62    #[must_use]
63    pub const fn default_const() -> Self {
64        Self {
65            round_signal_key: ContextKey::Signals,
66            round_signal_prefix: "round:start:",
67            continue_key: ContextKey::Constraints,
68            continue_prefix: "round:continue:",
69            note_key: ContextKey::Hypotheses,
70            synthesis_key: ContextKey::Strategies,
71            synthesis_prefix: "synthesis:",
72        }
73    }
74
75    fn round_signal_id(&self, round: u8) -> String {
76        format!("{}{round}", self.round_signal_prefix)
77    }
78
79    fn continue_id(&self, round: u8) -> String {
80        format!("{}{round}", self.continue_prefix)
81    }
82
83    fn synthesis_id(&self, round: u8) -> String {
84        format!("{}{round}", self.synthesis_prefix)
85    }
86
87    fn note_belongs_to_round(fact_id: &str, round: u8) -> bool {
88        fact_id.ends_with(&format!(":{round}"))
89    }
90}
91
92impl Default for RoundConventions {
93    fn default() -> Self {
94        Self::default_const()
95    }
96}
97
98/// Boxed terminal-state predicate: returns true to halt round emission.
99pub type TerminalPredicate = Box<dyn Fn(&dyn Context) -> bool + Send + Sync>;
100
101fn never_terminal() -> TerminalPredicate {
102    Box::new(|_ctx| false)
103}
104
105fn has_fact(ctx: &dyn Context, key: ContextKey, id: &str) -> bool {
106    ctx.get(key).iter().any(|fact| fact.id().as_str() == id)
107}
108
109/// Drives round-by-round deliberation by emitting `round:start:N` signals.
110///
111/// Round 1 fires when no round has started yet. Round N+1 fires when the
112/// previous round has been marked continue (a fact under
113/// [`RoundConventions::continue_key`] with id `round:continue:N`). Stops when
114/// the configured terminal predicate returns true or `max_rounds` is reached.
115///
116/// Domain packs supply the terminal predicate to express research-specific
117/// completion markers (e.g. Wolfgang's `research:complete` /
118/// `research:max_rounds_reached` facts). The platform stays agnostic.
119pub struct RoundStarter {
120    max_rounds: u8,
121    conventions: RoundConventions,
122    is_terminal: TerminalPredicate,
123}
124
125impl RoundStarter {
126    #[must_use]
127    pub fn new(max_rounds: u8) -> Self {
128        Self {
129            max_rounds,
130            conventions: RoundConventions::default(),
131            is_terminal: never_terminal(),
132        }
133    }
134
135    #[must_use]
136    pub fn with_conventions(mut self, conventions: RoundConventions) -> Self {
137        self.conventions = conventions;
138        self
139    }
140
141    /// Provide a domain-specific terminal-state predicate. Returns `true` to
142    /// stop round emission (e.g. when a research-complete fact is present).
143    #[must_use]
144    pub fn with_terminal_predicate<F>(mut self, predicate: F) -> Self
145    where
146        F: Fn(&dyn Context) -> bool + Send + Sync + 'static,
147    {
148        self.is_terminal = Box::new(predicate);
149        self
150    }
151
152    fn next_round_to_emit(&self, ctx: &dyn Context) -> Option<u8> {
153        if !has_fact(
154            ctx,
155            self.conventions.round_signal_key,
156            &self.conventions.round_signal_id(1),
157        ) {
158            return Some(1);
159        }
160        for round in 1..self.max_rounds {
161            if has_fact(
162                ctx,
163                self.conventions.continue_key,
164                &self.conventions.continue_id(round),
165            ) && !has_fact(
166                ctx,
167                self.conventions.round_signal_key,
168                &self.conventions.round_signal_id(round + 1),
169            ) {
170                return Some(round + 1);
171            }
172        }
173        None
174    }
175}
176
177#[async_trait::async_trait]
178impl Suggestor for RoundStarter {
179    fn name(&self) -> &'static str {
180        "organism-round-starter"
181    }
182
183    fn dependencies(&self) -> &[ContextKey] {
184        &[]
185    }
186
187    fn provenance(&self) -> Provenance {
188        ORGANISM_RUNTIME_PROVENANCE.provenance()
189    }
190
191    fn accepts(&self, ctx: &dyn Context) -> bool {
192        if (self.is_terminal)(ctx) {
193            return false;
194        }
195        self.next_round_to_emit(ctx).is_some()
196    }
197
198    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
199        let Some(round) = self.next_round_to_emit(ctx) else {
200            return AgentEffect::empty();
201        };
202        AgentEffect::with_proposal(proposed_text_fact(
203            self.conventions.round_signal_key,
204            self.conventions.round_signal_id(round),
205            format!("start round {round}"),
206        ))
207    }
208}
209
210/// Produces a synthesis string from a slice of round notes.
211///
212/// Implementations own the content production — calling LLMs, applying
213/// templates, running rule-based summarizers — while [`RoundSynthesizer`]
214/// owns the orchestration (which round to synthesize, when notes are
215/// complete, where to write the result).
216#[async_trait::async_trait]
217pub trait SynthesisProducer: Send + Sync {
218    /// Synthesize the given round's notes into a single content payload.
219    ///
220    /// `notes` are facts from [`RoundConventions::note_key`] whose ids end
221    /// with `:N` for round `N`, pre-filtered by [`RoundSynthesizer`] for
222    /// convenience. `ctx` exposes the full convergence context so producers
223    /// can pull supplementary signals (external evidence, prior decisions,
224    /// charter constraints) that don't fit in the note slice.
225    ///
226    /// Return the content the engine should publish under the synthesis
227    /// fact, or an error message that the orchestrator will route to
228    /// [`ContextKey::Diagnostic`].
229    async fn synthesize(
230        &self,
231        round: u8,
232        notes: &[ContextFact],
233        ctx: &dyn Context,
234    ) -> Result<String, String>;
235}
236
237/// Drives round-by-round synthesis.
238///
239/// Watches [`RoundConventions::round_signal_key`] for started rounds, counts
240/// notes for each round under [`RoundConventions::note_key`], and once a
241/// round has at least `expected_note_count` notes (and no synthesis yet),
242/// invokes the [`SynthesisProducer`] and emits a synthesis fact under
243/// [`RoundConventions::synthesis_key`].
244///
245/// Errors from the producer are routed to [`ContextKey::Diagnostic`] with a
246/// `runtime:error:synthesis:{round}` id; the same round is then skipped on
247/// subsequent cycles so a flaky producer does not loop. A terminal predicate
248/// can also halt all synthesis (e.g. when research has been judged complete).
249pub struct RoundSynthesizer<P: SynthesisProducer> {
250    expected_note_count: usize,
251    conventions: RoundConventions,
252    producer: P,
253    is_terminal: TerminalPredicate,
254}
255
256impl<P: SynthesisProducer> RoundSynthesizer<P> {
257    #[must_use]
258    pub fn new(expected_note_count: usize, producer: P) -> Self {
259        Self {
260            expected_note_count,
261            conventions: RoundConventions::default(),
262            producer,
263            is_terminal: never_terminal(),
264        }
265    }
266
267    #[must_use]
268    pub fn with_conventions(mut self, conventions: RoundConventions) -> Self {
269        self.conventions = conventions;
270        self
271    }
272
273    /// Provide a domain-specific terminal-state predicate. Returns `true` to
274    /// stop all synthesis (mirrors [`RoundStarter::with_terminal_predicate`]).
275    #[must_use]
276    pub fn with_terminal_predicate<F>(mut self, predicate: F) -> Self
277    where
278        F: Fn(&dyn Context) -> bool + Send + Sync + 'static,
279    {
280        self.is_terminal = Box::new(predicate);
281        self
282    }
283
284    fn started_rounds(&self, ctx: &dyn Context) -> Vec<u8> {
285        ctx.get(self.conventions.round_signal_key)
286            .iter()
287            .filter_map(|fact| {
288                fact.id()
289                    .as_str()
290                    .strip_prefix(self.conventions.round_signal_prefix)
291                    .and_then(|n| n.parse::<u8>().ok())
292            })
293            .collect()
294    }
295
296    fn count_notes_for_round(&self, ctx: &dyn Context, round: u8) -> usize {
297        ctx.get(self.conventions.note_key)
298            .iter()
299            .filter(|fact| RoundConventions::note_belongs_to_round(fact.id().as_str(), round))
300            .count()
301    }
302
303    fn notes_for_round<'a>(&self, ctx: &'a dyn Context, round: u8) -> Vec<&'a ContextFact> {
304        ctx.get(self.conventions.note_key)
305            .iter()
306            .filter(|fact| RoundConventions::note_belongs_to_round(fact.id().as_str(), round))
307            .collect()
308    }
309
310    fn next_round_needing_synthesis(&self, ctx: &dyn Context) -> Option<u8> {
311        self.started_rounds(ctx).into_iter().find(|round| {
312            !has_fact(
313                ctx,
314                self.conventions.synthesis_key,
315                &self.conventions.synthesis_id(*round),
316            ) && !has_fact(
317                ctx,
318                ContextKey::Diagnostic,
319                &format!("runtime:error:synthesis:{round}"),
320            ) && self.count_notes_for_round(ctx, *round) >= self.expected_note_count
321        })
322    }
323}
324
325#[async_trait::async_trait]
326impl<P: SynthesisProducer> Suggestor for RoundSynthesizer<P> {
327    fn name(&self) -> &'static str {
328        "organism-round-synthesizer"
329    }
330
331    fn dependencies(&self) -> &[ContextKey] {
332        &[ContextKey::Hypotheses]
333    }
334
335    fn provenance(&self) -> Provenance {
336        ORGANISM_RUNTIME_PROVENANCE.provenance()
337    }
338
339    fn accepts(&self, ctx: &dyn Context) -> bool {
340        if (self.is_terminal)(ctx) {
341            return false;
342        }
343        self.next_round_needing_synthesis(ctx).is_some()
344    }
345
346    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
347        let Some(round) = self.next_round_needing_synthesis(ctx) else {
348            return AgentEffect::empty();
349        };
350
351        let notes: Vec<ContextFact> = self
352            .notes_for_round(ctx, round)
353            .into_iter()
354            .cloned()
355            .collect();
356
357        match self.producer.synthesize(round, &notes, ctx).await {
358            Ok(content) => AgentEffect::with_proposal(proposed_text_fact(
359                self.conventions.synthesis_key,
360                self.conventions.synthesis_id(round),
361                content,
362            )),
363            Err(err) => AgentEffect::with_proposal(proposed_text_fact(
364                ContextKey::Diagnostic,
365                format!("runtime:error:synthesis:{round}"),
366                err,
367            )),
368        }
369    }
370}
371
372/// Aggregated dissent payload emitted by [`DisagreementMapper`].
373#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
374#[serde(rename_all = "camelCase")]
375pub struct DisagreementMap {
376    pub topic: VoteTopicId,
377    pub entries: Vec<Disagreement>,
378}
379
380/// Aggregates [`Disagreement`] facts under [`ContextKey::Disagreements`] into
381/// per-topic [`DisagreementMap`] payloads.
382///
383/// Output goes under a configurable key (default [`ContextKey::Diagnostic`])
384/// with id `disagreement_map:{topic}`. Maps are emitted at most once per
385/// topic — the same once-per-topic discipline as [`ConsensusEvaluator`].
386pub struct DisagreementMapper {
387    output_key: ContextKey,
388    mapped_topics: Mutex<HashSet<VoteTopicId>>,
389}
390
391impl DisagreementMapper {
392    #[must_use]
393    pub fn new() -> Self {
394        Self {
395            output_key: ContextKey::Diagnostic,
396            mapped_topics: Mutex::new(HashSet::new()),
397        }
398    }
399
400    #[must_use]
401    pub fn with_output_key(mut self, key: ContextKey) -> Self {
402        self.output_key = key;
403        self
404    }
405
406    #[must_use]
407    pub const fn output_key(&self) -> ContextKey {
408        self.output_key
409    }
410
411    fn map_id(topic: &VoteTopicId) -> String {
412        format!("disagreement_map:{}", topic.as_str())
413    }
414}
415
416impl Default for DisagreementMapper {
417    fn default() -> Self {
418        Self::new()
419    }
420}
421
422#[async_trait::async_trait]
423impl Suggestor for DisagreementMapper {
424    fn name(&self) -> &'static str {
425        "organism-disagreement-mapper"
426    }
427
428    fn dependencies(&self) -> &[ContextKey] {
429        &[ContextKey::Disagreements]
430    }
431
432    fn provenance(&self) -> Provenance {
433        ORGANISM_RUNTIME_PROVENANCE.provenance()
434    }
435
436    fn accepts(&self, ctx: &dyn Context) -> bool {
437        if !ctx.has(ContextKey::Disagreements) {
438            return false;
439        }
440        let mapped = self.mapped_topics.lock().unwrap();
441        ctx.get(ContextKey::Disagreements)
442            .iter()
443            .filter_map(|fact| serde_json::from_str::<Disagreement>(fact_text(fact)).ok())
444            .any(|d| !mapped.contains(&d.topic))
445    }
446
447    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
448        let mut mapped = self.mapped_topics.lock().unwrap();
449
450        let mut by_topic: HashMap<VoteTopicId, Vec<Disagreement>> = HashMap::new();
451        for fact in ctx.get(ContextKey::Disagreements) {
452            let Ok(d) = serde_json::from_str::<Disagreement>(fact_text(fact)) else {
453                continue;
454            };
455            if mapped.contains(&d.topic) {
456                continue;
457            }
458            by_topic.entry(d.topic.clone()).or_default().push(d);
459        }
460
461        let mut topics: Vec<VoteTopicId> = by_topic.keys().cloned().collect();
462        topics.sort();
463
464        let mut proposals = Vec::with_capacity(topics.len());
465        for topic in topics {
466            let entries = by_topic.remove(&topic).unwrap_or_default();
467            let map = DisagreementMap {
468                topic: topic.clone(),
469                entries,
470            };
471            let Ok(content) = serde_json::to_string(&map) else {
472                continue;
473            };
474            proposals.push(proposed_text_fact(
475                self.output_key,
476                Self::map_id(&topic),
477                content,
478            ));
479            mapped.insert(topic);
480        }
481
482        AgentEffect::with_proposals(proposals)
483    }
484}
485
486/// Tallies [`Vote`] facts under [`ContextKey::Votes`] against a
487/// [`ConsensusRule`] and emits [`ConsensusOutcome`] facts under
488/// [`ContextKey::ConsensusOutcomes`].
489///
490/// Vote payloads are read as JSON-serialized [`Vote`] structs from each fact's
491/// content. Outcomes are emitted at most once per topic — once a topic has an
492/// outcome, additional votes on that topic are ignored. This matches the
493/// round-based deliberation pattern: each round has its own topic id, so
494/// re-tallying happens by issuing a new topic, not by amending an old one.
495pub struct ConsensusEvaluator {
496    rule: ConsensusRule,
497    total_voters: usize,
498    decided_topics: Mutex<HashSet<VoteTopicId>>,
499}
500
501impl ConsensusEvaluator {
502    #[must_use]
503    pub fn new(rule: ConsensusRule, total_voters: usize) -> Self {
504        Self {
505            rule,
506            total_voters,
507            decided_topics: Mutex::new(HashSet::new()),
508        }
509    }
510
511    #[must_use]
512    pub const fn rule(&self) -> ConsensusRule {
513        self.rule
514    }
515
516    #[must_use]
517    pub const fn total_voters(&self) -> usize {
518        self.total_voters
519    }
520}
521
522#[async_trait::async_trait]
523impl Suggestor for ConsensusEvaluator {
524    fn name(&self) -> &'static str {
525        "organism-consensus-evaluator"
526    }
527
528    fn dependencies(&self) -> &[ContextKey] {
529        &[ContextKey::Votes]
530    }
531
532    fn provenance(&self) -> Provenance {
533        ORGANISM_RUNTIME_PROVENANCE.provenance()
534    }
535
536    fn accepts(&self, ctx: &dyn Context) -> bool {
537        if !ctx.has(ContextKey::Votes) {
538            return false;
539        }
540        let decided = self.decided_topics.lock().unwrap();
541        ctx.get(ContextKey::Votes)
542            .iter()
543            .filter_map(|fact| serde_json::from_str::<Vote>(fact_text(fact)).ok())
544            .any(|vote| !decided.contains(&vote.topic))
545    }
546
547    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
548        let mut decided = self.decided_topics.lock().unwrap();
549
550        let mut by_topic: HashMap<VoteTopicId, Vec<Vote>> = HashMap::new();
551        for fact in ctx.get(ContextKey::Votes) {
552            let Ok(vote) = serde_json::from_str::<Vote>(fact_text(fact)) else {
553                continue;
554            };
555            if decided.contains(&vote.topic) {
556                continue;
557            }
558            by_topic.entry(vote.topic.clone()).or_default().push(vote);
559        }
560
561        let mut topics: Vec<VoteTopicId> = by_topic.keys().cloned().collect();
562        topics.sort();
563
564        let mut proposals = Vec::with_capacity(topics.len());
565        let Ok(eligible) = converge_pack::EligibleVoters::new(self.total_voters) else {
566            return AgentEffect::with_proposals(proposals);
567        };
568        for topic in topics {
569            let votes = by_topic.remove(&topic).unwrap_or_default();
570            let Ok(outcome) =
571                ConsensusOutcome::evaluate(topic.clone(), self.rule, &votes, eligible)
572            else {
573                continue;
574            };
575            let Ok(content) = serde_json::to_string(&outcome) else {
576                continue;
577            };
578            proposals.push(proposed_text_fact(
579                ContextKey::ConsensusOutcomes,
580                format!("outcome:{}", topic.as_str()),
581                content,
582            ));
583            decided.insert(topic);
584        }
585
586        AgentEffect::with_proposals(proposals)
587    }
588}
589
590#[cfg(test)]
591mod tests {
592    use super::*;
593    use crate::formation::Formation;
594    use converge_pack::{ActorId, VoteDecision};
595
596    fn vote(topic: &str, voter: &str, decision: VoteDecision) -> Vote {
597        Vote {
598            topic: VoteTopicId::new(topic),
599            voter: ActorId::new(voter),
600            decision,
601            reason: None,
602        }
603    }
604
605    fn formation_with_votes(
606        label: &str,
607        rule: ConsensusRule,
608        total_voters: usize,
609        votes: &[Vote],
610    ) -> Formation {
611        let mut formation =
612            Formation::new(label).agent(ConsensusEvaluator::new(rule, total_voters));
613        for (i, v) in votes.iter().enumerate() {
614            let content = serde_json::to_string(v).unwrap();
615            formation = formation.seed(
616                ContextKey::Votes,
617                format!("vote-{i}"),
618                content,
619                "test-author",
620            );
621        }
622        formation
623    }
624
625    #[tokio::test]
626    async fn emits_outcome_per_topic_under_consensus_outcomes_key() {
627        let votes = [
628            vote("done-r1", "alice", VoteDecision::Yes),
629            vote("done-r1", "bob", VoteDecision::Yes),
630            vote("done-r1", "carol", VoteDecision::No),
631        ];
632        let result = formation_with_votes("majority-pass", ConsensusRule::Majority, 3, &votes)
633            .run()
634            .await
635            .expect("formation should converge");
636
637        let outcomes = result
638            .converge_result
639            .context
640            .get(ContextKey::ConsensusOutcomes);
641        assert_eq!(outcomes.len(), 1);
642        assert_eq!(outcomes[0].id().as_str(), "outcome:done-r1");
643
644        let outcome: ConsensusOutcome = serde_json::from_str(fact_text(&outcomes[0])).unwrap();
645        assert_eq!(outcome.tally().yes_votes(), 2);
646        assert_eq!(outcome.tally().no_votes(), 1);
647        assert_eq!(outcome.total_voters().get(), 3);
648        assert!(outcome.passes());
649    }
650
651    #[tokio::test]
652    async fn evaluates_each_topic_independently() {
653        let votes = [
654            vote("a", "alice", VoteDecision::Yes),
655            vote("a", "bob", VoteDecision::Yes),
656            vote("b", "alice", VoteDecision::No),
657            vote("b", "bob", VoteDecision::No),
658        ];
659        let result = formation_with_votes("split-topics", ConsensusRule::Majority, 2, &votes)
660            .run()
661            .await
662            .expect("formation should converge");
663
664        let outcomes = result
665            .converge_result
666            .context
667            .get(ContextKey::ConsensusOutcomes);
668        assert_eq!(outcomes.len(), 2);
669
670        let mut decisions: std::collections::HashMap<String, ConsensusOutcome> =
671            std::collections::HashMap::new();
672        for fact in outcomes {
673            decisions.insert(
674                fact.id().as_str().to_string(),
675                serde_json::from_str(fact_text(fact)).unwrap(),
676            );
677        }
678        assert!(decisions["outcome:a"].passes());
679        assert!(!decisions["outcome:b"].passes());
680    }
681
682    #[tokio::test]
683    async fn unanimous_rule_blocks_when_any_voter_dissents() {
684        let votes = [
685            vote("ship", "a", VoteDecision::Yes),
686            vote("ship", "b", VoteDecision::No),
687        ];
688        let result = formation_with_votes("unanimous-fail", ConsensusRule::Unanimous, 2, &votes)
689            .run()
690            .await
691            .expect("formation should converge");
692
693        let outcomes = result
694            .converge_result
695            .context
696            .get(ContextKey::ConsensusOutcomes);
697        assert_eq!(outcomes.len(), 1);
698        let outcome: ConsensusOutcome = serde_json::from_str(fact_text(&outcomes[0])).unwrap();
699        assert!(!outcome.passes());
700    }
701
702    #[tokio::test]
703    async fn does_not_emit_when_no_votes_seeded() {
704        let result = Formation::new("no-votes")
705            .agent(ConsensusEvaluator::new(ConsensusRule::Majority, 1))
706            .run()
707            .await
708            .expect("formation should converge");
709
710        assert!(
711            !result
712                .converge_result
713                .context
714                .has(ContextKey::ConsensusOutcomes)
715        );
716    }
717
718    // ── RoundStarter ──────────────────────────────────────────────
719
720    #[tokio::test]
721    async fn round_starter_emits_round_one_when_no_round_has_started() {
722        let result = Formation::new("round-1")
723            .agent(RoundStarter::new(3))
724            .run()
725            .await
726            .expect("formation should converge");
727
728        let signals = result.converge_result.context.get(ContextKey::Signals);
729        assert_eq!(signals.len(), 1);
730        assert_eq!(signals[0].id().as_str(), "round:start:1");
731    }
732
733    #[tokio::test]
734    async fn round_starter_advances_when_continue_marker_lands() {
735        let result = Formation::new("round-2")
736            .agent(RoundStarter::new(3))
737            .seed(
738                ContextKey::Constraints,
739                "round:continue:1",
740                "round 1 voted to continue",
741                "test-author",
742            )
743            .run()
744            .await
745            .expect("formation should converge");
746
747        let mut signal_ids: Vec<&str> = result
748            .converge_result
749            .context
750            .get(ContextKey::Signals)
751            .iter()
752            .map(|f| f.id().as_str())
753            .collect();
754        signal_ids.sort_unstable();
755        assert_eq!(signal_ids, vec!["round:start:1", "round:start:2"]);
756    }
757
758    #[tokio::test]
759    async fn round_starter_stops_at_max_rounds() {
760        let mut formation = Formation::new("max-cap").agent(RoundStarter::new(2));
761        for round in 1..=2 {
762            formation = formation.seed(
763                ContextKey::Constraints,
764                format!("round:continue:{round}"),
765                format!("continue {round}"),
766                "test-author",
767            );
768        }
769        let result = formation.run().await.expect("formation should converge");
770
771        let signals = result.converge_result.context.get(ContextKey::Signals);
772        assert_eq!(signals.len(), 2);
773        assert!(!signals.iter().any(|f| f.id().as_str() == "round:start:3"));
774    }
775
776    #[tokio::test]
777    async fn round_starter_respects_terminal_predicate() {
778        const TERMINAL_ID: &str = "research:complete";
779        let result = Formation::new("terminal-block")
780            .agent(RoundStarter::new(3).with_terminal_predicate(|ctx| {
781                ctx.get(ContextKey::Strategies)
782                    .iter()
783                    .any(|f| f.id().as_str() == TERMINAL_ID)
784            }))
785            .seed(
786                ContextKey::Strategies,
787                TERMINAL_ID,
788                "research is done",
789                "test-author",
790            )
791            .run()
792            .await
793            .expect("formation should converge");
794
795        assert!(!result.converge_result.context.has(ContextKey::Signals));
796    }
797
798    #[tokio::test]
799    async fn round_starter_honors_custom_conventions() {
800        let conventions = RoundConventions {
801            round_signal_key: ContextKey::Hypotheses,
802            round_signal_prefix: "phase:",
803            continue_key: ContextKey::Strategies,
804            continue_prefix: "phase:next:",
805            note_key: ContextKey::Hypotheses,
806            synthesis_key: ContextKey::Strategies,
807            synthesis_prefix: "phase:synthesis:",
808        };
809        let result = Formation::new("custom-conv")
810            .agent(RoundStarter::new(3).with_conventions(conventions))
811            .seed(
812                ContextKey::Strategies,
813                "phase:next:1",
814                "advance",
815                "test-author",
816            )
817            .run()
818            .await
819            .expect("formation should converge");
820
821        let mut ids: Vec<&str> = result
822            .converge_result
823            .context
824            .get(ContextKey::Hypotheses)
825            .iter()
826            .map(|f| f.id().as_str())
827            .collect();
828        ids.sort_unstable();
829        assert_eq!(ids, vec!["phase:1", "phase:2"]);
830    }
831
832    // ── RoundSynthesizer ──────────────────────────────────────────
833
834    struct StaticProducer(&'static str);
835
836    #[async_trait::async_trait]
837    impl SynthesisProducer for StaticProducer {
838        async fn synthesize(
839            &self,
840            _round: u8,
841            _notes: &[ContextFact],
842            _ctx: &dyn Context,
843        ) -> Result<String, String> {
844            Ok(self.0.to_string())
845        }
846    }
847
848    struct CountingProducer;
849
850    #[async_trait::async_trait]
851    impl SynthesisProducer for CountingProducer {
852        async fn synthesize(
853            &self,
854            round: u8,
855            notes: &[ContextFact],
856            _ctx: &dyn Context,
857        ) -> Result<String, String> {
858            Ok(format!("round {round} from {} notes", notes.len()))
859        }
860    }
861
862    struct FailingProducer(&'static str);
863
864    #[async_trait::async_trait]
865    impl SynthesisProducer for FailingProducer {
866        async fn synthesize(
867            &self,
868            _round: u8,
869            _notes: &[ContextFact],
870            _ctx: &dyn Context,
871        ) -> Result<String, String> {
872            Err(self.0.to_string())
873        }
874    }
875
876    fn formation_with_round_one_started(label: &str) -> Formation {
877        Formation::new(label).seed(
878            ContextKey::Signals,
879            "round:start:1",
880            "start round 1",
881            "test-author",
882        )
883    }
884
885    #[tokio::test]
886    async fn round_synthesizer_emits_when_notes_complete() {
887        let result = formation_with_round_one_started("synth-complete")
888            .agent(RoundSynthesizer::new(2, CountingProducer))
889            .seed(
890                ContextKey::Hypotheses,
891                "note:alice:1",
892                "alice note",
893                "test-author",
894            )
895            .seed(
896                ContextKey::Hypotheses,
897                "note:bob:1",
898                "bob note",
899                "test-author",
900            )
901            .run()
902            .await
903            .expect("formation should converge");
904
905        let strategies = result.converge_result.context.get(ContextKey::Strategies);
906        assert_eq!(strategies.len(), 1);
907        assert_eq!(strategies[0].id().as_str(), "synthesis:1");
908        assert_eq!(strategies[0].text(), Some("round 1 from 2 notes"));
909    }
910
911    #[tokio::test]
912    async fn round_synthesizer_waits_for_complete_note_count() {
913        let result = formation_with_round_one_started("synth-incomplete")
914            .agent(RoundSynthesizer::new(
915                3,
916                StaticProducer("should-not-appear"),
917            ))
918            .seed(
919                ContextKey::Hypotheses,
920                "note:alice:1",
921                "alice note",
922                "test-author",
923            )
924            .seed(
925                ContextKey::Hypotheses,
926                "note:bob:1",
927                "bob note",
928                "test-author",
929            )
930            .run()
931            .await
932            .expect("formation should converge");
933
934        assert!(!result.converge_result.context.has(ContextKey::Strategies));
935    }
936
937    #[tokio::test]
938    async fn round_synthesizer_routes_producer_errors_to_diagnostic() {
939        let result = formation_with_round_one_started("synth-err")
940            .agent(RoundSynthesizer::new(
941                1,
942                FailingProducer("upstream timeout"),
943            ))
944            .seed(
945                ContextKey::Hypotheses,
946                "note:alice:1",
947                "alice note",
948                "test-author",
949            )
950            .run()
951            .await
952            .expect("formation should converge");
953
954        assert!(!result.converge_result.context.has(ContextKey::Strategies));
955        let diagnostic = result.converge_result.context.get(ContextKey::Diagnostic);
956        assert_eq!(diagnostic.len(), 1);
957        assert_eq!(diagnostic[0].id().as_str(), "runtime:error:synthesis:1");
958        assert_eq!(diagnostic[0].text(), Some("upstream timeout"));
959    }
960
961    #[tokio::test]
962    async fn round_synthesizer_skips_round_with_existing_diagnostic_error() {
963        let result = formation_with_round_one_started("synth-prior-error")
964            .agent(RoundSynthesizer::new(1, StaticProducer("must-not-emit")))
965            .seed(
966                ContextKey::Hypotheses,
967                "note:alice:1",
968                "alice note",
969                "test-author",
970            )
971            .seed(
972                ContextKey::Diagnostic,
973                "runtime:error:synthesis:1",
974                "earlier failure recorded",
975                "test-author",
976            )
977            .run()
978            .await
979            .expect("formation should converge");
980
981        assert!(!result.converge_result.context.has(ContextKey::Strategies));
982    }
983
984    #[tokio::test]
985    async fn round_synthesizer_respects_terminal_predicate() {
986        const TERMINAL_ID: &str = "research:complete";
987        let result = formation_with_round_one_started("synth-terminal")
988            .agent(
989                RoundSynthesizer::new(1, StaticProducer("should-not-appear"))
990                    .with_terminal_predicate(|ctx| {
991                        ctx.get(ContextKey::Strategies)
992                            .iter()
993                            .any(|f| f.id().as_str() == TERMINAL_ID)
994                    }),
995            )
996            .seed(
997                ContextKey::Hypotheses,
998                "note:alice:1",
999                "alice note",
1000                "test-author",
1001            )
1002            .seed(
1003                ContextKey::Strategies,
1004                TERMINAL_ID,
1005                "research is done",
1006                "test-author",
1007            )
1008            .run()
1009            .await
1010            .expect("formation should converge");
1011
1012        let strategies = result.converge_result.context.get(ContextKey::Strategies);
1013        assert_eq!(strategies.len(), 1);
1014        assert_eq!(strategies[0].id().as_str(), TERMINAL_ID);
1015    }
1016
1017    #[tokio::test]
1018    async fn round_synthesizer_only_synthesizes_started_rounds() {
1019        let result = formation_with_round_one_started("synth-pending-round-2")
1020            .agent(RoundSynthesizer::new(1, StaticProducer("done")))
1021            .seed(
1022                ContextKey::Hypotheses,
1023                "note:alice:1",
1024                "round 1 note",
1025                "test-author",
1026            )
1027            .seed(
1028                ContextKey::Hypotheses,
1029                "note:alice:2",
1030                "round 2 note (not yet started)",
1031                "test-author",
1032            )
1033            .run()
1034            .await
1035            .expect("formation should converge");
1036
1037        let strategies = result.converge_result.context.get(ContextKey::Strategies);
1038        assert_eq!(strategies.len(), 1);
1039        assert_eq!(strategies[0].id().as_str(), "synthesis:1");
1040    }
1041
1042    // ── DisagreementMapper ────────────────────────────────────────
1043
1044    fn disagreement(topic: &str, dissenter: &str, reason: &str) -> Disagreement {
1045        Disagreement {
1046            topic: VoteTopicId::new(topic),
1047            dissenter: ActorId::new(dissenter),
1048            reason: reason.to_string(),
1049        }
1050    }
1051
1052    fn seed_disagreement(formation: Formation, slot: usize, d: &Disagreement) -> Formation {
1053        let content = serde_json::to_string(d).unwrap();
1054        formation.seed(
1055            ContextKey::Disagreements,
1056            format!("disagreement-{slot}"),
1057            content,
1058            "test-author",
1059        )
1060    }
1061
1062    #[tokio::test]
1063    async fn disagreement_mapper_aggregates_per_topic() {
1064        let alice_on_a = disagreement("topic-a", "alice", "too risky");
1065        let bob_on_a = disagreement("topic-a", "bob", "missing context");
1066        let carol_on_b = disagreement("topic-b", "carol", "missing data");
1067
1068        let mut formation = Formation::new("dmap").agent(DisagreementMapper::new());
1069        formation = seed_disagreement(formation, 0, &alice_on_a);
1070        formation = seed_disagreement(formation, 1, &bob_on_a);
1071        formation = seed_disagreement(formation, 2, &carol_on_b);
1072
1073        let result = formation.run().await.expect("formation should converge");
1074
1075        let maps = result.converge_result.context.get(ContextKey::Diagnostic);
1076        assert_eq!(maps.len(), 2);
1077
1078        let mut by_id: std::collections::HashMap<String, DisagreementMap> =
1079            std::collections::HashMap::new();
1080        for fact in maps {
1081            let parsed: DisagreementMap = serde_json::from_str(fact_text(fact)).unwrap();
1082            by_id.insert(fact.id().as_str().to_string(), parsed);
1083        }
1084        let map_a = &by_id["disagreement_map:topic-a"];
1085        assert_eq!(map_a.entries.len(), 2);
1086        let map_b = &by_id["disagreement_map:topic-b"];
1087        assert_eq!(map_b.entries.len(), 1);
1088        assert_eq!(map_b.entries[0].dissenter.as_str(), "carol");
1089    }
1090
1091    #[tokio::test]
1092    async fn disagreement_mapper_does_nothing_without_disagreements() {
1093        let result = Formation::new("dmap-empty")
1094            .agent(DisagreementMapper::new())
1095            .run()
1096            .await
1097            .expect("formation should converge");
1098
1099        assert!(!result.converge_result.context.has(ContextKey::Diagnostic));
1100    }
1101
1102    #[tokio::test]
1103    async fn disagreement_mapper_honors_custom_output_key() {
1104        let d = disagreement("topic-x", "alice", "too rushed");
1105        let mut formation = Formation::new("dmap-custom")
1106            .agent(DisagreementMapper::new().with_output_key(ContextKey::Strategies));
1107        formation = seed_disagreement(formation, 0, &d);
1108
1109        let result = formation.run().await.expect("formation should converge");
1110
1111        assert!(!result.converge_result.context.has(ContextKey::Diagnostic));
1112        let strategies = result.converge_result.context.get(ContextKey::Strategies);
1113        assert_eq!(strategies.len(), 1);
1114        assert_eq!(strategies[0].id().as_str(), "disagreement_map:topic-x");
1115    }
1116}