Skip to main content

git_paw/broker/
learnings.rs

1//! Broker-internal learnings aggregator.
2//!
3//! Observes the broker's publish-event stream and accumulates per-session
4//! signals — stuck durations, recovery cycles, conflict events, and
5//! permission patterns — that are flushed as human-readable bullets to
6//! `.git-paw/session-learnings.md`.
7//!
8//! The aggregator does NOT publish back into the broker; the markdown file
9//! is its only output sink. See the `learnings-mode` `OpenSpec` change for
10//! the per-signal semantics and the markdown format.
11
12use std::collections::HashMap;
13use std::fmt::Write as _;
14use std::fs::OpenOptions;
15use std::io::Write;
16use std::path::{Path, PathBuf};
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20use std::hash::Hasher as _;
21
22use chrono::{TimeZone as _, Utc};
23
24use super::messages::{BrokerMessage, LearningPayload};
25
/// Marker string that conflict-detector-originated messages prepend to their
/// `errors` / `question` text; the aggregator looks for it as a substring.
pub const CONFLICT_DETECTOR_TAG: &str = "[conflict-detector]";

/// Default minimum auto-approve `count` a permission-pattern entry must reach
/// before it is written at flush time. See the `Permission-pattern signal`
/// spec.
pub const PERMISSION_PATTERN_THRESHOLD: u64 = 5;

/// Category tag attached to learnings derived from conflict-detector events.
pub const CATEGORY_CONFLICT_EVENT: &str = "conflict_event";
/// Category tag attached to stuck-duration learnings.
pub const CATEGORY_STUCK_DURATION: &str = "stuck_duration";
/// Category tag attached to recovery-cycle learnings.
pub const CATEGORY_RECOVERY_CYCLES: &str = "recovery_cycles";
/// Category tag attached to permission-pattern learnings.
pub const CATEGORY_PERMISSION_PATTERN: &str = "permission_pattern";
42
// --- Qualitative learning categories (v0.6.0) ---
//
// The aggregator never produces these four categories itself. The supervisor
// LLM publishes them through the existing `agent.learning` wire variant. No
// wire-format change was needed, because the open `category` enum from the
// `agent-learning-variant` change already carries them. The aggregator picks
// them up on the publish path and routes each one into its own file section.
// The broker does NOT validate their bodies (consumer-side discipline, design
// D5). The body shapes documented below are what the supervisor skill teaches
// the LLM to emit, and the file renderer tolerates drift by falling back to a
// JSON dump.

/// Category tag for recurring-failure-shape learnings: the same error shape
/// showing up across several `agent.feedback` cycles on distinct branches.
///
/// Documented body shape (design D1):
/// ```json
/// { "shape": "import cycle in module X",
///   "instances": [ { "branch_id": "feat/a", "feedback_id": "...", "excerpt": "..." } ] }
/// ```
/// Primary identifier (used for within-session dedup): `shape`.
pub const CATEGORY_RECURRING_FAILURE_SHAPE: &str = "recurring_failure_shape";
/// Category tag for documentation-gap learnings: a spec relies on a
/// convention that no checked-in doc explains.
///
/// Documented body shape (design D1):
/// ```json
/// { "convention": "agents run lint before commit",
///   "evidence_paths": ["AGENTS.md"], "suggestion": "add to AGENTS.md" }
/// ```
/// Primary identifier: `convention`.
pub const CATEGORY_DOC_GAP: &str = "doc_gap";
/// Category tag for ADR / architectural-drift learnings: the code introduces
/// a decision (pattern, dependency, boundary) that the configured ADRs do not
/// reflect.
///
/// Documented body shape (design D1):
/// ```json
/// { "decision_area": "async runtime", "observed_pattern": "...",
///   "configured_adr_path": "docs/adr", "candidate_adr_title": "ADR-NNNN: ..." }
/// ```
/// Primary identifier: `decision_area`.
pub const CATEGORY_ADR_DRIFT: &str = "adr_drift";
/// Category tag for scope-mistake learnings: two or more branches had to
/// coordinate heavily because the original spec scope put the boundary in the
/// wrong place.
///
/// Documented body shape (design D1):
/// ```json
/// { "branches": ["feat/a", "feat/b"], "shared_files": ["src/foo"],
///   "coordination_events": [], "suggestion": "merge feat/a and feat/b scopes" }
/// ```
/// Primary identifier: the `branches` set.
pub const CATEGORY_SCOPE_MISTAKE: &str = "scope_mistake";
96
/// `agent_id` under which aggregator-produced learnings are published.
///
/// The aggregator lives inside the broker/supervisor process, so every record
/// it produces is attributed to the supervisor no matter which branch it is
/// about. Per-branch scoping travels in [`LearningRecord::branch_id`].
pub const LEARNINGS_AGENT_ID: &str = "supervisor";
102
103/// One structured, emittable learning record — the single in-memory
104/// representation of a learning destined for the `agent.learning` wire
105/// variant.
106///
107/// The aggregator's working-state entry types ([`StuckDurationEntry`],
108/// [`RecoveryCycleEntry`], [`ConflictEvent`], and the permission counters)
109/// are projected into `LearningRecord`s at flush time. The broker payload is
110/// then produced solely by `From<&LearningRecord> for BrokerMessage` — there
111/// is no parallel wire-record data model (the
112/// `agent-learning-variant` "Internal model serialises directly" requirement).
113#[derive(Debug, Clone, PartialEq)]
114pub struct LearningRecord {
115    /// Open category tag (see the `CATEGORY_*` constants).
116    pub category: String,
117    /// Publishing agent id (typically [`LEARNINGS_AGENT_ID`]).
118    pub agent_id: String,
119    /// Branch the record is scoped to; `None` for cross-cutting records
120    /// (permission patterns, conflict pairs).
121    pub branch_id: Option<String>,
122    /// Short human-readable summary; mirrors the markdown bullet text.
123    pub title: String,
124    /// Category-specific structured body.
125    pub body: serde_json::Value,
126    /// When the record was committed; used for the hour bucket and the
127    /// emitted ISO-8601 wire timestamp.
128    pub timestamp: SystemTime,
129}
130
131impl LearningRecord {
132    /// Computes the deterministic dedup id: a stable 16-hex-char (64-bit)
133    /// hash of a canonical serialisation of `category`, `branch_id`, the
134    /// `body` (object keys sorted), and the UTC hour bucket (`YYYY-MM-DDTHH`).
135    ///
136    /// Uses the std-library [`DefaultHasher`](std::collections::hash_map::DefaultHasher)
137    /// — no external crypto dependency. The id is not a security primitive;
138    /// it only needs to be deterministic for the same canonical input so
139    /// consumers can dedupe. Re-publishing the same logical record within one
140    /// UTC hour yields the same id; across an hour boundary the id changes so
141    /// a genuine recurrence registers.
142    #[must_use]
143    pub fn deterministic_id(&self) -> String {
144        let mut canon = String::new();
145        canon.push_str(&self.category);
146        canon.push('|');
147        canon.push_str(self.branch_id.as_deref().unwrap_or(""));
148        canon.push('|');
149        canonical_value(&self.body, &mut canon);
150        canon.push('|');
151        canon.push_str(&hour_bucket(self.timestamp));
152
153        let mut hasher = std::collections::hash_map::DefaultHasher::new();
154        hasher.write(canon.as_bytes());
155        format!("{:016x}", hasher.finish())
156    }
157}
158
159impl From<&LearningRecord> for BrokerMessage {
160    fn from(record: &LearningRecord) -> Self {
161        BrokerMessage::Learning {
162            payload: LearningPayload {
163                id: record.deterministic_id(),
164                agent_id: record.agent_id.clone(),
165                branch_id: record.branch_id.clone(),
166                category: record.category.clone(),
167                title: record.title.clone(),
168                body: record.body.clone(),
169                timestamp: format_iso8601_utc(record.timestamp),
170            },
171        }
172    }
173}
174
/// One stuck-duration observation for an agent, whether or not the block was
/// resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StuckDurationEntry {
    /// Id of the agent that was blocked.
    pub agent_id: String,
    /// Id of the agent it was waiting on (taken from `payload.from`).
    pub blocked_on: String,
    /// Wall-clock seconds from the start of the block until either the
    /// resolving artifact or the shutdown flush.
    pub duration_seconds: u64,
    /// `true` when a later artifact resolved the block; `false` when the
    /// session ended while the block was still open.
    pub resolved: bool,
}
189
/// How many recovery cycles one agent went through, recorded when it verifies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryCycleEntry {
    /// Id of the agent whose work was eventually verified.
    pub agent_id: String,
    /// Count of `agent.feedback` messages the agent received before
    /// verification. The spec guarantees this is `>= 1`.
    pub count: u32,
}
199
/// How a single conflict-detector event was classified.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConflictCategory {
    /// Forward conflict between agents of the same `SpecEntry` family.
    ForwardConflictIntraSpec {
        /// The two implicated agent ids, sorted.
        agents: Vec<String>,
        /// The spec id both agents share (e.g. `003-user-list`).
        spec_id: String,
        /// Overlapping region descriptors (e.g. `function validate_token`)
        /// when the forward conflict was detected at region granularity;
        /// empty for a file-level conflict. See
        /// `conflict-detector-fn-granularity`.
        regions: Vec<String>,
    },
    /// Forward conflict not known to be intra-spec. The agents belong to two
    /// different `SpecEntry` families, or at least one agent's spec id is
    /// still unknown.
    ForwardConflictCrossSpec {
        /// The two implicated agent ids, sorted.
        agents: Vec<String>,
        /// Each agent's spec id, index-aligned with `agents`. An entry is the
        /// empty string when that agent → spec mapping is not yet known.
        spec_ids: Vec<String>,
        /// Overlapping region descriptors when detected at region
        /// granularity; empty for a file-level conflict.
        regions: Vec<String>,
    },
    /// In-flight conflict: both agents are editing the same file right now.
    InFlightConflict {
        /// The two agent ids touching the shared file, sorted.
        agents: Vec<String>,
    },
    /// Ownership violation: `violator` edited a file that `owner` owns.
    OwnershipViolation {
        /// Agent that touched a file outside its declared ownership.
        violator: String,
        /// Agent that owns the file (according to its `agent.intent`).
        owner: String,
        /// The conflicting file path, or empty if it could not be extracted
        /// from the message.
        file: String,
    },
}
241
242/// A classified conflict event ready to be rendered as a markdown bullet.
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub struct ConflictEvent {
245    /// What kind of conflict this is.
246    pub category: ConflictCategory,
247}
248
249/// Aggregator state maintained across observed events.
250///
251/// The aggregator is normally owned by a single `Arc<Mutex<_>>` shared
252/// between the publish path (which observes events) and the periodic flush
253/// task. All input methods take `&mut self` so callers must hold the lock
254/// for the duration of the call.
255#[derive(Debug)]
256pub struct LearningsAggregator {
257    /// Outstanding blocks keyed by blocked agent id.
258    pending_blocks: HashMap<String, (SystemTime, String)>,
259    /// Running feedback counts per target agent id. Cleared once the agent
260    /// verifies (or at shutdown flush for unresolved agents).
261    feedback_counts: HashMap<String, u32>,
262    /// Completed stuck-duration observations awaiting flush.
263    stuck_events: Vec<StuckDurationEntry>,
264    /// Recovery-cycle observations awaiting flush.
265    recovery_events: Vec<RecoveryCycleEntry>,
266    /// Classified conflict events awaiting flush.
267    conflict_events: Vec<ConflictEvent>,
268    /// Per-command-class auto-approve hit counts. Persisted across flushes
269    /// so a slow burn can eventually cross the threshold.
270    permission_counts: HashMap<String, u64>,
271    /// Command classes already emitted to the markdown file this session.
272    /// Each class produces at most one entry per session.
273    permission_emitted: HashMap<String, u64>,
274    /// Cursor: number of `stuck_events` already written to the markdown.
275    stuck_flushed: usize,
276    /// Cursor: number of `recovery_events` already written to the markdown.
277    recovery_flushed: usize,
278    /// Cursor: number of `conflict_events` already written to the markdown.
279    conflict_flushed: usize,
280    /// Whether the H2 session header has been written.
281    h2_written: bool,
282    /// Session start time, used for the H2 header and shutdown durations.
283    session_start: SystemTime,
284    /// Per-flush threshold for permission patterns. Lower-count classes do
285    /// not appear in the markdown until they cross this threshold.
286    permission_threshold: u64,
287    /// Agent → spec id mapping, used to classify forward conflicts as
288    /// intra-spec vs cross-spec. May be empty.
289    spec_ids: HashMap<String, String>,
290    /// Output markdown path (typically `.git-paw/session-learnings.md`).
291    file_path: PathBuf,
292    /// Cached set of known agent ids, used to extract the "other" agent
293    /// id from conflict-detector message text.
294    known_agents: Vec<String>,
295    /// When `true`, flushed entries are also projected into
296    /// [`LearningRecord`]s and queued in `pending_publish` for the flush loop
297    /// to emit as `agent.learning` broker messages. When `false`, the
298    /// aggregator is file-only (v0.5.0 behaviour) and never queues records.
299    broker_publish: bool,
300    /// Records committed during flushes, awaiting broker publish; drained by
301    /// [`Self::take_pending_publish`]. Always empty when `broker_publish` is
302    /// `false`.
303    pending_publish: Vec<LearningRecord>,
304    /// Qualitative learning records ingested from externally-published
305    /// `agent.learning` messages (the supervisor LLM), awaiting flush. Unlike
306    /// the deterministic event vectors above, these arrive fully-formed on the
307    /// publish path rather than being accumulated from raw broker traffic, and
308    /// are NEVER re-published (they already came from the broker).
309    qualitative_events: Vec<LearningPayload>,
310    /// Cursor: number of `qualitative_events` already written to the markdown.
311    qualitative_flushed: usize,
312}
313
314impl LearningsAggregator {
315    /// Creates a new aggregator with default thresholds and an empty
316    /// agent → spec id mapping. Callers register agents via
317    /// [`Self::register_agent`] and the optional spec id via
318    /// [`Self::set_spec_id`].
319    #[must_use]
320    pub fn new(file_path: PathBuf) -> Self {
321        Self::with_threshold(file_path, PERMISSION_PATTERN_THRESHOLD)
322    }
323
324    /// Like [`Self::new`] but with a custom permission-pattern threshold.
325    #[must_use]
326    pub fn with_threshold(file_path: PathBuf, permission_threshold: u64) -> Self {
327        Self {
328            pending_blocks: HashMap::new(),
329            feedback_counts: HashMap::new(),
330            stuck_events: Vec::new(),
331            recovery_events: Vec::new(),
332            conflict_events: Vec::new(),
333            permission_counts: HashMap::new(),
334            permission_emitted: HashMap::new(),
335            stuck_flushed: 0,
336            recovery_flushed: 0,
337            conflict_flushed: 0,
338            h2_written: false,
339            session_start: SystemTime::now(),
340            permission_threshold,
341            spec_ids: HashMap::new(),
342            file_path,
343            known_agents: Vec::new(),
344            broker_publish: false,
345            pending_publish: Vec::new(),
346            qualitative_events: Vec::new(),
347            qualitative_flushed: 0,
348        }
349    }
350
351    /// Enables or disables broker publication of flushed records. Off by
352    /// default (file-only, the v0.5.0 behaviour). The dual-output path is
353    /// gated on this so `[broker] enabled = false` or
354    /// `[supervisor.learnings] broker_publish = "force_off"` produce no
355    /// broker traffic at all.
356    pub fn set_broker_publish(&mut self, enabled: bool) {
357        self.broker_publish = enabled;
358    }
359
360    /// Returns whether broker publication of flushed records is enabled.
361    #[must_use]
362    pub fn broker_publish_enabled(&self) -> bool {
363        self.broker_publish
364    }
365
366    /// Drains the queue of [`LearningRecord`]s committed since the last call
367    /// so the flush loop can emit them as `agent.learning` messages. Always
368    /// empty when broker publish is disabled.
369    pub fn take_pending_publish(&mut self) -> Vec<LearningRecord> {
370        std::mem::take(&mut self.pending_publish)
371    }
372
373    /// Registers an agent so its id can be extracted from conflict-detector
374    /// message text. Idempotent.
375    pub fn register_agent(&mut self, agent_id: &str) {
376        if !self.known_agents.iter().any(|a| a == agent_id) {
377            self.known_agents.push(agent_id.to_string());
378        }
379    }
380
381    /// Records the `agent_id` → `spec_id` mapping used by forward-conflict
382    /// intra-vs-cross classification. Replaces any previous value.
383    pub fn set_spec_id(&mut self, agent_id: &str, spec_id: &str) {
384        self.spec_ids
385            .insert(agent_id.to_string(), spec_id.to_string());
386    }
387
388    /// Returns the path the aggregator flushes to.
389    #[must_use]
390    pub fn file_path(&self) -> &Path {
391        &self.file_path
392    }
393
394    /// Records the start of a block from agent X waiting on agent Y.
395    pub fn record_blocked(&mut self, agent_id: &str, blocked_on: &str, ts: SystemTime) {
396        self.register_agent(agent_id);
397        self.pending_blocks
398            .insert(agent_id.to_string(), (ts, blocked_on.to_string()));
399    }
400
401    /// Records an artifact event from `agent_id`. If a pending block exists
402    /// for that agent, it is resolved into a stuck-duration entry.
403    pub fn record_artifact(&mut self, agent_id: &str, ts: SystemTime) {
404        self.register_agent(agent_id);
405        if let Some((start, blocked_on)) = self.pending_blocks.remove(agent_id) {
406            let duration = ts.duration_since(start).unwrap_or(Duration::ZERO).as_secs();
407            self.stuck_events.push(StuckDurationEntry {
408                agent_id: agent_id.to_string(),
409                blocked_on,
410                duration_seconds: duration,
411                resolved: true,
412            });
413        }
414    }
415
416    /// Increments the recovery-cycle counter for the agent the feedback is
417    /// addressed to.
418    pub fn record_feedback(&mut self, target_agent_id: &str) {
419        self.register_agent(target_agent_id);
420        *self
421            .feedback_counts
422            .entry(target_agent_id.to_string())
423            .or_insert(0) += 1;
424    }
425
426    /// Emits a recovery-cycle entry if the target accumulated at least one
427    /// feedback before verifying. Clears the counter either way.
428    pub fn record_verified(&mut self, target_agent_id: &str) {
429        self.register_agent(target_agent_id);
430        if let Some(count) = self.feedback_counts.remove(target_agent_id)
431            && count >= 1
432        {
433            self.recovery_events.push(RecoveryCycleEntry {
434                agent_id: target_agent_id.to_string(),
435                count,
436            });
437        }
438    }
439
440    /// Increments the per-class auto-approve counter. Threshold gating
441    /// happens at flush time.
442    pub fn record_auto_approve(&mut self, command_class: &str) {
443        let key = command_class.trim();
444        if key.is_empty() {
445            return;
446        }
447        *self.permission_counts.entry(key.to_string()).or_insert(0) += 1;
448    }
449
450    /// Classifies a `[conflict-detector]`-tagged feedback or question
451    /// message and (if it represents a new conflict event) accumulates an
452    /// entry.
453    ///
454    /// Duplicate detector messages — e.g. the symmetric forward-conflict
455    /// feedback sent to both agents in the pair — are deduplicated by
456    /// canonical agent pair and category so each conflict yields exactly
457    /// one bullet.
458    pub fn record_detector_message(&mut self, msg: &BrokerMessage) {
459        let text = match msg {
460            BrokerMessage::Feedback { payload, .. } => payload.errors.join(" "),
461            BrokerMessage::Question { payload, .. } => payload.question.clone(),
462            _ => return,
463        };
464        if !text.contains(CONFLICT_DETECTOR_TAG) {
465            return;
466        }
467
468        let target = msg.agent_id().to_string();
469        self.register_agent(&target);
470        let others = self.other_agents_in_text(&text, &target);
471        let file = extract_file_token(&text);
472
473        if text.contains("ownership violation") {
474            // The recipient is the violator; the other agent is the owner.
475            if let Some(owner) = others.first() {
476                let candidate = ConflictCategory::OwnershipViolation {
477                    violator: target.clone(),
478                    owner: owner.clone(),
479                    file: file.clone().unwrap_or_default(),
480                };
481                if !self.has_conflict_category(&candidate) {
482                    self.conflict_events.push(ConflictEvent {
483                        category: candidate,
484                    });
485                }
486            }
487            return;
488        }
489
490        if text.contains("forward conflict") {
491            if let Some(other) = others.first() {
492                let pair = sorted_pair(&target, other);
493                let regions = extract_regions(&text);
494                let category = self.classify_forward(&pair, regions);
495                if !self.has_conflict_category(&category) {
496                    self.conflict_events.push(ConflictEvent { category });
497                }
498            }
499            return;
500        }
501
502        if text.contains("in-flight conflict")
503            && let Some(other) = others.first()
504        {
505            let pair = sorted_pair(&target, other);
506            let category = ConflictCategory::InFlightConflict { agents: pair };
507            if !self.has_conflict_category(&category) {
508                self.conflict_events.push(ConflictEvent { category });
509            }
510        }
511    }
512
513    /// Convenience dispatcher: routes a `BrokerMessage` to the appropriate
514    /// `record_*` method. Returns `true` if the aggregator's state changed.
515    pub fn observe(&mut self, msg: &BrokerMessage) {
516        match msg {
517            BrokerMessage::Blocked { agent_id, payload } => {
518                self.record_blocked(agent_id, &payload.from, SystemTime::now());
519            }
520            BrokerMessage::Artifact { agent_id, .. } => {
521                self.record_artifact(agent_id, SystemTime::now());
522            }
523            BrokerMessage::Verified { agent_id, .. } => {
524                self.record_verified(agent_id);
525            }
526            BrokerMessage::Feedback {
527                agent_id, payload, ..
528            } => {
529                self.record_feedback(agent_id);
530                // The forward / in-flight / ownership detectors all send
531                // `agent.feedback`; classification is keyed on text content.
532                let text = payload.errors.join(" ");
533                if text.contains(CONFLICT_DETECTOR_TAG) {
534                    self.record_detector_message(msg);
535                }
536            }
537            BrokerMessage::Question { payload, .. } => {
538                if payload.question.contains(CONFLICT_DETECTOR_TAG) {
539                    self.record_detector_message(msg);
540                }
541            }
542            BrokerMessage::Status { agent_id, payload } => {
543                if payload.status == "auto_approved"
544                    && let Some(cls) = extract_command_class(payload.message.as_deref())
545                {
546                    self.record_auto_approve(&cls);
547                }
548                self.register_agent(agent_id);
549            }
550            BrokerMessage::Intent { agent_id, .. } => {
551                // `agent.intent` is purely informational for the aggregator —
552                // it carries the agent's planned files, not a learning signal.
553                // Register the sender so downstream `agent.blocked` /
554                // `agent.artifact` correlations can find it.
555                self.register_agent(agent_id);
556            }
557            BrokerMessage::Learning { payload } => {
558                // `agent.learning` records flow back through the publish path.
559                // The aggregator's own deterministic-category records were
560                // already rendered by `write_flush`; re-aggregating them would
561                // double-render (and recurse), so they are ignored here. Any
562                // OTHER category is an externally-published (supervisor LLM)
563                // qualitative record the aggregator must route into a file
564                // section — see [`Self::record_qualitative`].
565                self.record_qualitative(payload);
566            }
567            // `supervisor.verify-now` is a broker-emitted operational nudge and
568            // `agent.advanced-main` is a supervisor-published merge notification
569            // — both are coordination signals, not agent learnings, and are
570            // ignored.
571            BrokerMessage::VerifyNow { .. } | BrokerMessage::AdvancedMain { .. } => {}
572        }
573    }
574
575    /// Ingests an externally-published `agent.learning` record for file
576    /// rendering. Deterministic-category records (the aggregator's own,
577    /// flowing back through the publish path) are dropped to avoid
578    /// double-rendering. The remaining qualitative / unknown-category records
579    /// are accumulated for the next flush, after a within-session dedup pass
580    /// (design D3, belt-and-braces on top of the skill-level dedup): a record
581    /// whose `(category, primary identifier)` matches one already ingested
582    /// this session is suppressed.
583    ///
584    /// Ingested records are NEVER queued for publish — they already came from
585    /// the broker.
586    pub fn record_qualitative(&mut self, payload: &LearningPayload) {
587        if is_deterministic_category(&payload.category) {
588            return;
589        }
590        let key = qualitative_dedup_key(payload);
591        if self
592            .qualitative_events
593            .iter()
594            .any(|p| qualitative_dedup_key(p) == key)
595        {
596            return;
597        }
598        self.qualitative_events.push(payload.clone());
599    }
600
601    /// Appends accumulated entries (since the last flush) to the markdown
602    /// file. Empty categories are omitted from the output.
603    pub fn flush(&mut self) -> std::io::Result<()> {
604        self.write_flush(false)
605    }
606
607    /// Identical to [`Self::flush`] but additionally records any open
608    /// stuck-duration entries as unresolved with the duration measured up
609    /// to `now`. Used at broker shutdown.
610    pub fn flush_at_shutdown(&mut self) -> std::io::Result<()> {
611        let now = SystemTime::now();
612        let pending: Vec<(String, SystemTime, String)> = self
613            .pending_blocks
614            .drain()
615            .map(|(agent, (start, on))| (agent, start, on))
616            .collect();
617        for (agent, start, on) in pending {
618            let duration = now
619                .duration_since(start)
620                .unwrap_or(Duration::ZERO)
621                .as_secs();
622            self.stuck_events.push(StuckDurationEntry {
623                agent_id: agent,
624                blocked_on: on,
625                duration_seconds: duration,
626                resolved: false,
627            });
628        }
629        // Flush any recovery cycles for agents that never verified.
630        let pending_recovery: Vec<(String, u32)> = self.feedback_counts.drain().collect();
631        for (agent, count) in pending_recovery {
632            if count >= 1 {
633                self.recovery_events.push(RecoveryCycleEntry {
634                    agent_id: agent,
635                    count,
636                });
637            }
638        }
639        self.write_flush(true)
640    }
641
642    fn classify_forward(&self, pair: &[String], regions: Vec<String>) -> ConflictCategory {
643        let spec_a = self.spec_ids.get(&pair[0]);
644        let spec_b = self.spec_ids.get(&pair[1]);
645        match (spec_a, spec_b) {
646            (Some(a), Some(b)) if a == b => ConflictCategory::ForwardConflictIntraSpec {
647                agents: pair.to_vec(),
648                spec_id: a.clone(),
649                regions,
650            },
651            (Some(a), Some(b)) => ConflictCategory::ForwardConflictCrossSpec {
652                agents: pair.to_vec(),
653                spec_ids: vec![a.clone(), b.clone()],
654                regions,
655            },
656            _ => ConflictCategory::ForwardConflictCrossSpec {
657                agents: pair.to_vec(),
658                spec_ids: vec![
659                    spec_a.cloned().unwrap_or_default(),
660                    spec_b.cloned().unwrap_or_default(),
661                ],
662                regions,
663            },
664        }
665    }
666
667    fn has_conflict_category(&self, candidate: &ConflictCategory) -> bool {
668        self.conflict_events
669            .iter()
670            .any(|e| matches_category(&e.category, candidate))
671    }
672
673    fn other_agents_in_text(&self, text: &str, exclude: &str) -> Vec<String> {
674        self.known_agents
675            .iter()
676            .filter(|id| *id != exclude && text.contains(id.as_str()))
677            .cloned()
678            .collect()
679    }
680
681    fn write_flush(&mut self, _shutdown: bool) -> std::io::Result<()> {
682        let broker_publish = self.broker_publish;
683        let new_stuck = &self.stuck_events[self.stuck_flushed..];
684        let new_recovery = &self.recovery_events[self.recovery_flushed..];
685        let new_conflicts = &self.conflict_events[self.conflict_flushed..];
686
687        let permission_entries: Vec<(String, u64)> = {
688            let mut entries: Vec<(String, u64)> = self
689                .permission_counts
690                .iter()
691                .filter(|(class, count)| {
692                    **count >= self.permission_threshold
693                        && self.permission_emitted.get(*class).copied().unwrap_or(0) < **count
694                })
695                .map(|(k, v)| (k.clone(), *v))
696                .collect();
697            entries.sort_by(|a, b| a.0.cmp(&b.0));
698            entries
699        };
700
701        let new_qualitative = &self.qualitative_events[self.qualitative_flushed..];
702
703        let has_any = !new_stuck.is_empty()
704            || !new_recovery.is_empty()
705            || !new_conflicts.is_empty()
706            || !permission_entries.is_empty()
707            || !new_qualitative.is_empty();
708        if !has_any {
709            return Ok(());
710        }
711
712        let mut out = String::new();
713        if !self.h2_written {
714            let ts = format_iso8601_utc(self.session_start);
715            let _ = writeln!(out, "## Session Learnings — {ts}");
716            self.h2_written = true;
717        }
718
719        if !new_conflicts.is_empty() {
720            out.push_str("\n### Conflict events\n");
721            for ev in new_conflicts {
722                let _ = writeln!(out, "- {}", render_conflict(&ev.category));
723            }
724        }
725        if !new_stuck.is_empty() {
726            out.push_str("\n### Where agents got stuck\n");
727            for ev in new_stuck {
728                let _ = writeln!(out, "- {}", render_stuck(ev));
729            }
730        }
731        if !new_recovery.is_empty() {
732            out.push_str("\n### Recovery cycles\n");
733            for ev in new_recovery {
734                let _ = writeln!(out, "- {}", render_recovery(ev));
735            }
736        }
737        if !permission_entries.is_empty() {
738            out.push_str("\n### Permission patterns\n");
739            for (class, count) in &permission_entries {
740                let _ = writeln!(out, "- {}", render_permission(class, *count));
741            }
742        }
743
744        render_qualitative_sections(new_qualitative, &mut out);
745
746        // Project the newly-flushed entries into wire-bound `LearningRecord`s
747        // for the broker, when broker publish is active. This is purely
748        // additive: it reads the same slices used for the markdown above and
749        // does NOT alter the file output (which remains byte-for-byte the
750        // v0.5.0 shape). `now` stamps every record from this flush so they
751        // share one UTC hour bucket for id stability.
752        let records: Vec<LearningRecord> = if broker_publish {
753            let now = SystemTime::now();
754            let mut records =
755                Vec::with_capacity(new_conflicts.len() + new_stuck.len() + new_recovery.len());
756            for ev in new_conflicts {
757                records.push(record_from_conflict(&ev.category, now));
758            }
759            for ev in new_stuck {
760                records.push(record_from_stuck(ev, now));
761            }
762            for ev in new_recovery {
763                records.push(record_from_recovery(ev, now));
764            }
765            for (class, count) in &permission_entries {
766                records.push(record_from_permission(class, *count, now));
767            }
768            records
769        } else {
770            Vec::new()
771        };
772
773        append_to_file(&self.file_path, &out)?;
774
775        self.stuck_flushed = self.stuck_events.len();
776        self.recovery_flushed = self.recovery_events.len();
777        self.conflict_flushed = self.conflict_events.len();
778        self.qualitative_flushed = self.qualitative_events.len();
779        for (class, count) in &permission_entries {
780            self.permission_emitted.insert(class.clone(), *count);
781        }
782        // Queue records only after the file write succeeded, so a failed
783        // append never publishes a record that isn't also in the file.
784        self.pending_publish.extend(records);
785        Ok(())
786    }
787
788    #[cfg(test)]
789    fn stuck_events(&self) -> &[StuckDurationEntry] {
790        &self.stuck_events
791    }
792
793    #[cfg(test)]
794    fn recovery_events(&self) -> &[RecoveryCycleEntry] {
795        &self.recovery_events
796    }
797
798    #[cfg(test)]
799    fn conflict_events(&self) -> &[ConflictEvent] {
800        &self.conflict_events
801    }
802
803    #[cfg(test)]
804    fn qualitative_events(&self) -> &[LearningPayload] {
805        &self.qualitative_events
806    }
807}
808
/// Reference-counted aggregator handle shared between the broker's publish
/// path and the periodic flush task.
///
/// The lock is a `std::sync::Mutex`. NOTE(review): if either holder runs in
/// async code, the guard must not be held across an `.await`; confirm at the
/// call sites.
pub type SharedLearnings = Arc<Mutex<LearningsAggregator>>;
812
/// Appends `contents` to the file at `path`. The file is created if it does
/// not exist, along with any missing parent directories.
///
/// A bare relative file name (empty parent) skips directory creation.
///
/// # Errors
/// Returns any I/O error from creating the directories, opening the file, or
/// writing the bytes.
fn append_to_file(path: &Path, contents: &str) -> std::io::Result<()> {
    let parent_dir = path.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = parent_dir {
        std::fs::create_dir_all(dir)?;
    }
    OpenOptions::new()
        .append(true)
        .create(true)
        .open(path)?
        .write_all(contents.as_bytes())
}
822
/// The Markdown H3 header under which `/tell` routing decisions are recorded
/// (design D4).
///
/// `append_routing_record` writes this header only when the learnings file
/// does not already contain this exact text.
pub const ROUTING_SECTION_HEADER: &str = "### Supervisor routing";

/// Maximum prompt length recorded verbatim in a routing entry; longer prompts
/// are truncated with a trailing `…` (the full prompt rides the broker
/// message).
///
/// The limit is counted in `char`s (Unicode scalar values), not bytes, and is
/// applied after the prompt is trimmed.
pub const ROUTING_PROMPT_MAX_CHARS: usize = 200;
831
832/// Formats one "Supervisor routing" log line for a `/tell` invocation
833/// (design D4).
834///
835/// `ts_iso` is the ISO-8601 UTC timestamp, `target` the agent identifier,
836/// `mode` the resolved delivery-mode label (`"feedback"` / `"send-keys"`), and
837/// `prompt` the user-typed prompt — truncated to
838/// [`ROUTING_PROMPT_MAX_CHARS`] characters with a trailing `…` when longer.
839#[must_use]
840pub fn format_routing_entry(ts_iso: &str, target: &str, mode: &str, prompt: &str) -> String {
841    let trimmed = prompt.trim();
842    let shown = if trimmed.chars().count() > ROUTING_PROMPT_MAX_CHARS {
843        let mut s: String = trimmed.chars().take(ROUTING_PROMPT_MAX_CHARS).collect();
844        s.push('…');
845        s
846    } else {
847        trimmed.to_string()
848    };
849    format!("- {ts_iso} — supervisor told `{target}` via {mode}: \"{shown}\"")
850}
851
852/// Appends a `/tell` routing decision to the "Supervisor routing" section of
853/// the learnings file, gated on `enabled` (design D4 / task 7).
854///
855/// Reuses the same [`append_to_file`] helper the aggregator uses for its
856/// flushes. When `enabled` is `false` this is a strict no-op — no file is
857/// created or written, honouring the `[supervisor] learnings = false`
858/// contract. The section header is written once, the first time a routing
859/// record lands in a file that does not already contain it.
860///
861/// # Errors
862/// Propagates any I/O error from reading or appending to the file.
863pub fn append_routing_record(
864    path: &Path,
865    enabled: bool,
866    ts_iso: &str,
867    target: &str,
868    mode: &str,
869    prompt: &str,
870) -> std::io::Result<()> {
871    if !enabled {
872        return Ok(());
873    }
874    let needs_header = match std::fs::read_to_string(path) {
875        Ok(existing) => !existing.contains(ROUTING_SECTION_HEADER),
876        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
877        Err(e) => return Err(e),
878    };
879    let mut block = String::new();
880    if needs_header {
881        block.push('\n');
882        block.push_str(ROUTING_SECTION_HEADER);
883        block.push('\n');
884    }
885    block.push_str(&format_routing_entry(ts_iso, target, mode, prompt));
886    block.push('\n');
887    append_to_file(path, &block)
888}
889
890/// Appends a canonical, key-sorted serialisation of `value` to `out`.
891///
892/// Objects emit their keys in sorted order so that two logically-identical
893/// bodies (regardless of insertion order) produce byte-identical canonical
894/// strings — a precondition for the deterministic dedup id.
895fn canonical_value(value: &serde_json::Value, out: &mut String) {
896    use serde_json::Value;
897    match value {
898        Value::Object(map) => {
899            let mut keys: Vec<&String> = map.keys().collect();
900            keys.sort();
901            out.push('{');
902            for (i, key) in keys.iter().enumerate() {
903                if i > 0 {
904                    out.push(',');
905                }
906                out.push_str(key);
907                out.push(':');
908                canonical_value(&map[*key], out);
909            }
910            out.push('}');
911        }
912        Value::Array(items) => {
913            out.push('[');
914            for (i, item) in items.iter().enumerate() {
915                if i > 0 {
916                    out.push(',');
917                }
918                canonical_value(item, out);
919            }
920            out.push(']');
921        }
922        other => out.push_str(&other.to_string()),
923    }
924}
925
926/// Formats `time` as a UTC hour bucket (`YYYY-MM-DDTHH`) for id hashing.
927fn hour_bucket(time: SystemTime) -> String {
928    let secs = time.duration_since(UNIX_EPOCH).map_or(0, |d| d.as_secs());
929    Utc.timestamp_opt(i64::try_from(secs).unwrap_or(0), 0)
930        .single()
931        .map(|dt| dt.format("%Y-%m-%dT%H").to_string())
932        .unwrap_or_default()
933}
934
/// Returns `a` and `b` as owned strings in ascending (byte-lexicographic)
/// order. This gives a conflict between two agents the same key whichever
/// side reported it.
fn sorted_pair(a: &str, b: &str) -> Vec<String> {
    let (first, second) = if b < a { (b, a) } else { (a, b) };
    vec![first.to_owned(), second.to_owned()]
}
940
941fn matches_category(a: &ConflictCategory, b: &ConflictCategory) -> bool {
942    use ConflictCategory::{
943        ForwardConflictCrossSpec, ForwardConflictIntraSpec, InFlightConflict, OwnershipViolation,
944    };
945    match (a, b) {
946        (
947            ForwardConflictIntraSpec { agents: x, .. },
948            ForwardConflictIntraSpec { agents: y, .. },
949        )
950        | (
951            ForwardConflictCrossSpec { agents: x, .. },
952            ForwardConflictCrossSpec { agents: y, .. },
953        )
954        | (InFlightConflict { agents: x }, InFlightConflict { agents: y }) => x == y,
955        (
956            OwnershipViolation {
957                violator: vx,
958                owner: ox,
959                file: fx,
960            },
961            OwnershipViolation {
962                violator: vy,
963                owner: oy,
964                file: fy,
965            },
966        ) => vx == vy && ox == oy && fx == fy,
967        _ => false,
968    }
969}
970
/// Heuristically extracts the first file path mentioned in `text`.
///
/// A candidate is any whitespace-delimited token that, once surrounding
/// punctuation is stripped, contains both a `/` and a `.`
/// (e.g. `src/main.rs`).
///
/// Stripping is asymmetric. On the left, a leading `.` is kept so relative
/// paths such as `./docs/x.md` or `.github/ci.yml` survive. On the right, a
/// trailing `.` is removed along with other punctuation, so a path that ends
/// a sentence ("edited src/lib.rs.") or sits inside "(`src/lib.rs`)." is
/// returned cleanly. Returns `None` when no token qualifies.
fn extract_file_token(text: &str) -> Option<String> {
    /// Strips quotes, brackets, backticks, commas and trailing sentence
    /// punctuation from one token.
    fn clean(tok: &str) -> &str {
        tok.trim_start_matches(|c: char| !c.is_alphanumeric() && c != '/' && c != '.')
            .trim_end_matches(|c: char| !c.is_alphanumeric() && c != '/')
    }

    text.split_whitespace()
        .map(clean)
        .find(|tok| tok.contains('.') && tok.contains('/'))
        .map(str::to_string)
}
984
/// Collects the region descriptors from a region-aware forward-conflict
/// detector message.
///
/// The detector renders region-level conflicts as
/// `... path (regions: function foo, range 10-30); ...`. Every
/// `(regions: ...)` group is scanned in turn. Each group is split on `,`, and
/// the trimmed, non-empty descriptors are kept in first-seen order with
/// duplicates dropped. A group with no closing `)` ends the scan. A
/// file-level conflict has no such group and yields an empty vec; in that
/// case the body omits the `regions` field.
fn extract_regions(text: &str) -> Vec<String> {
    const MARKER: &str = "(regions: ";
    let mut regions: Vec<String> = Vec::new();
    let mut cursor = 0;
    while let Some(offset) = text[cursor..].find(MARKER) {
        let list_start = cursor + offset + MARKER.len();
        let Some(close) = text[list_start..].find(')') else {
            break;
        };
        let list_end = list_start + close;
        for descriptor in text[list_start..list_end].split(',').map(str::trim) {
            if !descriptor.is_empty() && !regions.iter().any(|seen| seen == descriptor) {
                regions.push(descriptor.to_owned());
            }
        }
        // Resume from the closing parenthesis of the group just consumed.
        cursor = list_end;
    }
    regions
}
1010
/// Extracts the auto-approved command class from a status message of the
/// form `auto_approved: matched <class>`.
///
/// Returns `None` when there is no message, the prefix is absent, or the
/// class is blank after trimming.
fn extract_command_class(message: Option<&str>) -> Option<String> {
    let remainder = message?.strip_prefix("auto_approved: matched ")?;
    let class = remainder.trim();
    if class.is_empty() {
        None
    } else {
        Some(class.to_owned())
    }
}
1017
1018/// Projects a classified conflict event into a `conflict_event`
1019/// [`LearningRecord`]. Conflicts are cross-cutting (they implicate a pair of
1020/// branches), so `branch_id` is `None` and the involved agents live in the
1021/// body.
1022fn record_from_conflict(cat: &ConflictCategory, now: SystemTime) -> LearningRecord {
1023    use serde_json::json;
1024    let body = match cat {
1025        ConflictCategory::ForwardConflictIntraSpec {
1026            agents,
1027            spec_id,
1028            regions,
1029        } => {
1030            let mut b = json!({
1031                "shape": "forward_intra_spec",
1032                "agents": agents,
1033                "spec_id": spec_id,
1034            });
1035            if !regions.is_empty() {
1036                b["regions"] = json!(regions);
1037            }
1038            b
1039        }
1040        ConflictCategory::ForwardConflictCrossSpec {
1041            agents,
1042            spec_ids,
1043            regions,
1044        } => {
1045            let mut b = json!({
1046                "shape": "forward_cross_spec",
1047                "agents": agents,
1048                "spec_ids": spec_ids,
1049            });
1050            if !regions.is_empty() {
1051                b["regions"] = json!(regions);
1052            }
1053            b
1054        }
1055        ConflictCategory::InFlightConflict { agents } => json!({
1056            "shape": "in_flight",
1057            "agents": agents,
1058        }),
1059        ConflictCategory::OwnershipViolation {
1060            violator,
1061            owner,
1062            file,
1063        } => json!({
1064            "shape": "ownership_violation",
1065            "violator": violator,
1066            "owner": owner,
1067            "file": file,
1068        }),
1069    };
1070    LearningRecord {
1071        category: CATEGORY_CONFLICT_EVENT.to_string(),
1072        agent_id: LEARNINGS_AGENT_ID.to_string(),
1073        branch_id: None,
1074        title: render_conflict(cat),
1075        body,
1076        timestamp: now,
1077    }
1078}
1079
1080/// Projects a stuck-duration entry into a `stuck_duration` [`LearningRecord`]
1081/// scoped to the blocked agent's branch.
1082fn record_from_stuck(ev: &StuckDurationEntry, now: SystemTime) -> LearningRecord {
1083    LearningRecord {
1084        category: CATEGORY_STUCK_DURATION.to_string(),
1085        agent_id: LEARNINGS_AGENT_ID.to_string(),
1086        branch_id: Some(ev.agent_id.clone()),
1087        title: render_stuck(ev),
1088        body: serde_json::json!({
1089            "agent_id": ev.agent_id,
1090            "blocked_on": ev.blocked_on,
1091            "duration_seconds": ev.duration_seconds,
1092            "resolved": ev.resolved,
1093        }),
1094        timestamp: now,
1095    }
1096}
1097
1098/// Projects a recovery-cycle entry into a `recovery_cycles`
1099/// [`LearningRecord`] scoped to the verifying agent's branch.
1100fn record_from_recovery(ev: &RecoveryCycleEntry, now: SystemTime) -> LearningRecord {
1101    LearningRecord {
1102        category: CATEGORY_RECOVERY_CYCLES.to_string(),
1103        agent_id: LEARNINGS_AGENT_ID.to_string(),
1104        branch_id: Some(ev.agent_id.clone()),
1105        title: render_recovery(ev),
1106        body: serde_json::json!({
1107            "agent_id": ev.agent_id,
1108            "count": ev.count,
1109        }),
1110        timestamp: now,
1111    }
1112}
1113
1114/// Projects a permission-pattern entry into a `permission_pattern`
1115/// [`LearningRecord`]. Permission patterns are cross-cutting (they describe
1116/// the supervisor's auto-approve behaviour), so `branch_id` is `None`.
1117fn record_from_permission(class: &str, count: u64, now: SystemTime) -> LearningRecord {
1118    LearningRecord {
1119        category: CATEGORY_PERMISSION_PATTERN.to_string(),
1120        agent_id: LEARNINGS_AGENT_ID.to_string(),
1121        branch_id: None,
1122        title: render_permission(class, count),
1123        body: serde_json::json!({
1124            "command_class": class,
1125            "count": count,
1126        }),
1127        timestamp: now,
1128    }
1129}
1130
1131fn render_conflict(cat: &ConflictCategory) -> String {
1132    match cat {
1133        ConflictCategory::ForwardConflictIntraSpec {
1134            agents, spec_id, ..
1135        } => {
1136            format!(
1137                "forward-conflict-intra-spec: {} (spec {})",
1138                agents.join(" and "),
1139                spec_id
1140            )
1141        }
1142        ConflictCategory::ForwardConflictCrossSpec {
1143            agents, spec_ids, ..
1144        } => {
1145            let specs: Vec<String> = spec_ids.iter().filter(|s| !s.is_empty()).cloned().collect();
1146            if specs.is_empty() {
1147                format!("forward-conflict-cross-spec: {}", agents.join(" and "))
1148            } else {
1149                format!(
1150                    "forward-conflict-cross-spec: {} (specs {})",
1151                    agents.join(" and "),
1152                    specs.join(", ")
1153                )
1154            }
1155        }
1156        ConflictCategory::InFlightConflict { agents } => {
1157            format!("in-flight-conflict: {}", agents.join(" and "))
1158        }
1159        ConflictCategory::OwnershipViolation {
1160            violator,
1161            owner,
1162            file,
1163        } => {
1164            if file.is_empty() {
1165                format!("ownership-violation: {violator} edited a file owned by {owner}")
1166            } else {
1167                format!("ownership-violation: {violator} edited `{file}` owned by {owner}")
1168            }
1169        }
1170    }
1171}
1172
1173fn render_stuck(ev: &StuckDurationEntry) -> String {
1174    let dur = format_duration(ev.duration_seconds);
1175    let suffix = if ev.resolved {
1176        String::new()
1177    } else {
1178        " (unresolved at session end)".to_string()
1179    };
1180    format!(
1181        "{}: blocked {dur} waiting on {}{suffix}",
1182        ev.agent_id, ev.blocked_on
1183    )
1184}
1185
1186fn render_recovery(ev: &RecoveryCycleEntry) -> String {
1187    let cycles = if ev.count == 1 { "cycle" } else { "cycles" };
1188    format!(
1189        "{}: {} feedback {cycles} before verifying",
1190        ev.agent_id, ev.count
1191    )
1192}
1193
/// Renders a permission-pattern entry, e.g.
/// `` `cargo test` auto-approved 7 times ``.
///
/// Uses the singular "time" for a count of one. The flush threshold is
/// configurable, so a count of 1 can reach the file. This matches the
/// pluralisation `render_recovery` already applies.
fn render_permission(class: &str, count: u64) -> String {
    let times = if count == 1 { "time" } else { "times" };
    format!("`{class}` auto-approved {count} {times}")
}
1197
1198// === Qualitative learnings (v0.6.0) ===
1199
/// The recognised qualitative categories paired with their file-section
/// headers, in render order. Categories absent from this table fall through
/// to the "Other learnings" section.
///
/// `render_qualitative_sections` walks this table in order to emit sections.
/// `qualitative_section` looks categories up here to decide whether a record
/// is "recognised" or belongs under "Other learnings".
const QUALITATIVE_SECTIONS: &[(&str, &str)] = &[
    (CATEGORY_RECURRING_FAILURE_SHAPE, "Recurring failure shapes"),
    (CATEGORY_DOC_GAP, "Documentation gaps"),
    (CATEGORY_ADR_DRIFT, "ADR / architectural drift"),
    (CATEGORY_SCOPE_MISTAKE, "Scope-mistake signals"),
];
1209
1210/// Returns `true` for the four v0.5.0 deterministic categories the aggregator
1211/// produces itself. Used to drop the aggregator's own records when they flow
1212/// back through the publish path, so only externally-published qualitative
1213/// records are ingested for file rendering.
1214fn is_deterministic_category(category: &str) -> bool {
1215    matches!(
1216        category,
1217        CATEGORY_CONFLICT_EVENT
1218            | CATEGORY_STUCK_DURATION
1219            | CATEGORY_RECOVERY_CYCLES
1220            | CATEGORY_PERMISSION_PATTERN
1221    )
1222}
1223
1224/// Renders the qualitative records flushed this round into `out` (design D4):
1225/// one section per recognised category in a fixed order, each emitted only
1226/// when there is a record for it, followed by an "Other learnings" fallback
1227/// that absorbs every unrecognised category so nothing is silently dropped.
1228fn render_qualitative_sections(new_qualitative: &[LearningPayload], out: &mut String) {
1229    for (category, header) in QUALITATIVE_SECTIONS {
1230        let mut wrote_header = false;
1231        for p in new_qualitative.iter().filter(|p| &p.category == category) {
1232            if !wrote_header {
1233                let _ = writeln!(out, "\n### {header}");
1234                wrote_header = true;
1235            }
1236            out.push_str(&render_qualitative(p));
1237        }
1238    }
1239    let mut wrote_other = false;
1240    for p in new_qualitative
1241        .iter()
1242        .filter(|p| qualitative_section(&p.category).is_none())
1243    {
1244        if !wrote_other {
1245            out.push_str("\n### Other learnings\n");
1246            wrote_other = true;
1247        }
1248        out.push_str(&render_qualitative(p));
1249    }
1250}
1251
1252/// Returns the file-section header for a qualitative `category`, or `None`
1253/// when the category is unrecognised (routes to "Other learnings").
1254fn qualitative_section(category: &str) -> Option<&'static str> {
1255    QUALITATIVE_SECTIONS
1256        .iter()
1257        .find(|(cat, _)| *cat == category)
1258        .map(|(_, header)| *header)
1259}
1260
1261/// Reads a string `key` from a JSON object body, if present and a string.
1262fn string_field(body: &serde_json::Value, key: &str) -> Option<String> {
1263    body.get(key).and_then(|v| v.as_str()).map(str::to_string)
1264}
1265
1266/// Reads an array `key` from a JSON object body and returns its elements
1267/// sorted and comma-joined, for use as a stable primary identifier. Non-string
1268/// elements are serialised with their JSON representation.
1269fn sorted_array_field(body: &serde_json::Value, key: &str) -> Option<String> {
1270    let arr = body.get(key)?.as_array()?;
1271    let mut items: Vec<String> = arr
1272        .iter()
1273        .map(|v| {
1274            v.as_str()
1275                .map_or_else(|| v.to_string(), std::string::ToString::to_string)
1276        })
1277        .collect();
1278    items.sort();
1279    Some(items.join(","))
1280}
1281
1282/// Computes the within-session dedup key for a qualitative record: its
1283/// category plus the category's primary identifier (design D3). When the
1284/// primary identifier is absent (malformed body) or the category is unknown,
1285/// the publisher's deterministic `id` is used instead, so only exact
1286/// duplicates are suppressed and distinct-but-malformed records survive.
1287fn qualitative_dedup_key(p: &LearningPayload) -> String {
1288    let primary = match p.category.as_str() {
1289        CATEGORY_RECURRING_FAILURE_SHAPE => string_field(&p.body, "shape"),
1290        CATEGORY_DOC_GAP => string_field(&p.body, "convention"),
1291        CATEGORY_ADR_DRIFT => string_field(&p.body, "decision_area"),
1292        CATEGORY_SCOPE_MISTAKE => sorted_array_field(&p.body, "branches"),
1293        _ => None,
1294    };
1295    match primary {
1296        Some(id) => format!("{}|{}", p.category, id),
1297        None => format!("{}|#{}", p.category, p.id),
1298    }
1299}
1300
1301/// Serialises a JSON body compactly, falling back to its `Display` form if
1302/// serialisation somehow fails (it cannot for an in-memory `Value`).
1303fn compact_json(value: &serde_json::Value) -> String {
1304    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
1305}
1306
1307/// Renders one qualitative record as a markdown bullet block (always ending
1308/// in a newline). Well-formed bodies get a structured one-line summary; bodies
1309/// that don't match the documented shape fall back to the title plus a JSON
1310/// dump (design D5, tolerant rendering) so a record is never dropped.
1311fn render_qualitative(p: &LearningPayload) -> String {
1312    render_qualitative_structured(p).unwrap_or_else(|| render_qualitative_fallback(p))
1313}
1314
1315/// Tolerant fallback: the record's `title` followed by its `body` serialised
1316/// as compact JSON on an indented continuation line.
1317fn render_qualitative_fallback(p: &LearningPayload) -> String {
1318    format!("- {}\n  {}\n", p.title, compact_json(&p.body))
1319}
1320
1321/// Structured rendering for a recognised, well-formed qualitative body.
1322/// Returns `None` when a required field is missing so the caller falls back to
1323/// [`render_qualitative_fallback`].
1324fn render_qualitative_structured(p: &LearningPayload) -> Option<String> {
1325    match p.category.as_str() {
1326        CATEGORY_RECURRING_FAILURE_SHAPE => {
1327            let shape = string_field(&p.body, "shape")?;
1328            let instances = p.body.get("instances")?.as_array()?;
1329            let branches: Vec<String> = instances
1330                .iter()
1331                .filter_map(|i| i.get("branch_id").and_then(|v| v.as_str()))
1332                .map(str::to_string)
1333                .collect();
1334            let n = instances.len();
1335            let noun = if n == 1 { "instance" } else { "instances" };
1336            let across = if branches.is_empty() {
1337                String::new()
1338            } else {
1339                format!(" across {}", branches.join(", "))
1340            };
1341            Some(format!("- {shape}: {n} {noun}{across}\n"))
1342        }
1343        CATEGORY_DOC_GAP => {
1344            let convention = string_field(&p.body, "convention")?;
1345            let suggestion = string_field(&p.body, "suggestion")?;
1346            Some(format!("- {convention} — {suggestion}\n"))
1347        }
1348        CATEGORY_ADR_DRIFT => {
1349            let area = string_field(&p.body, "decision_area")?;
1350            let observed = string_field(&p.body, "observed_pattern")?;
1351            Some(format!("- {area}: {observed}\n"))
1352        }
1353        CATEGORY_SCOPE_MISTAKE => {
1354            let branches = p.body.get("branches")?.as_array()?;
1355            let names: Vec<String> = branches
1356                .iter()
1357                .filter_map(|v| v.as_str())
1358                .map(str::to_string)
1359                .collect();
1360            if names.is_empty() {
1361                return None;
1362            }
1363            let suggestion = string_field(&p.body, "suggestion")?;
1364            Some(format!("- {} — {suggestion}\n", names.join(" and ")))
1365        }
1366        _ => None,
1367    }
1368}
1369
/// Formats a second count compactly. Durations under a minute render as
/// `"{s}s"`. Longer ones render as `"{m}m{ss}s"`, with seconds zero-padded
/// to two digits; minutes are never rolled into hours.
fn format_duration(seconds: u64) -> String {
    match (seconds / 60, seconds % 60) {
        (0, secs) => format!("{secs}s"),
        (mins, secs) => format!("{mins}m{secs:02}s"),
    }
}
1379
1380fn format_iso8601_utc(time: SystemTime) -> String {
1381    let secs = time.duration_since(UNIX_EPOCH).map_or(0, |d| d.as_secs());
1382    let (year, month, day, hour, min, sec) = secs_to_civil(secs);
1383    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{min:02}:{sec:02}Z")
1384}
1385
/// Converts seconds since the Unix epoch (UTC) into a proleptic-Gregorian
/// civil date and time `(year, month, day, hour, minute, second)`. `month` is
/// in `1..=12` and `day` is 1-based.
///
/// The date half follows Howard Hinnant's `civil_from_days` algorithm. It
/// works on a calendar shifted to start on 1 March, so the leap day falls at
/// the end of each shifted year.
///
/// Why the casts are allowed: `secs / 86400` is at most about 2.1e14, so the
/// `as i64` cannot wrap. `days` is always at least 719_468, so `doe` and `y`
/// are never negative when cast back to `u64`.
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_sign_loss)]
fn secs_to_civil(secs: u64) -> (u64, u64, u64, u64, u64, u64) {
    // Time of day is independent of the calendar arithmetic below.
    let sec_of_day = secs % 86400;
    let hour = sec_of_day / 3600;
    let min = (sec_of_day % 3600) / 60;
    let sec = sec_of_day % 60;

    let mut days = (secs / 86400) as i64;
    // Re-base day 0 from 1970-01-01 to 0000-03-01 (719_468 days earlier).
    days += 719_468;
    // 400-year era (146_097 days each) and day-of-era in [0, 146_096].
    let era = days.div_euclid(146_097);
    let doe = (days - era * 146_097) as u64;
    // Year-of-era in [0, 399], correcting for the 4/100/400-year leap rules.
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
    let y = yoe as i64 + era * 400;
    // Day-of-year in [0, 365], counted from 1 March.
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    // March-based month index in [0, 11] (0 = March, 11 = February).
    let mp = (5 * doy + 2) / 153;
    // Day of month, 1-based.
    let d = doy - (153 * mp + 2) / 5 + 1;
    // Map the March-based index back to a calendar month in [1, 12].
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the following civil year.
    let y = if m <= 2 { y + 1 } else { y };
    (y as u64, m, d, hour, min, sec)
}
1407
1408#[cfg(test)]
1409mod tests {
1410    use super::*;
1411    use crate::broker::messages::{
1412        ArtifactPayload, BlockedPayload, FeedbackPayload, QuestionPayload, StatusPayload,
1413        VerifiedPayload,
1414    };
1415    use std::time::Duration;
1416    use tempfile::TempDir;
1417
1418    fn agg(tmp: &TempDir) -> LearningsAggregator {
1419        LearningsAggregator::new(tmp.path().join("session-learnings.md"))
1420    }
1421
1422    fn read_md(path: &Path) -> String {
1423        std::fs::read_to_string(path).unwrap_or_default()
1424    }
1425
1426    fn blocked(agent: &str, from: &str) -> BrokerMessage {
1427        BrokerMessage::Blocked {
1428            agent_id: agent.to_string(),
1429            payload: BlockedPayload {
1430                needs: "x".to_string(),
1431                from: from.to_string(),
1432            },
1433        }
1434    }
1435
1436    fn artifact(agent: &str) -> BrokerMessage {
1437        BrokerMessage::Artifact {
1438            agent_id: agent.to_string(),
1439            payload: ArtifactPayload {
1440                status: "done".to_string(),
1441                exports: vec![],
1442                modified_files: vec![],
1443            },
1444        }
1445    }
1446
1447    fn feedback(target: &str, errors: &[&str]) -> BrokerMessage {
1448        BrokerMessage::Feedback {
1449            agent_id: target.to_string(),
1450            payload: FeedbackPayload {
1451                from: "supervisor".to_string(),
1452                errors: errors.iter().map(|s| (*s).to_string()).collect(),
1453            },
1454        }
1455    }
1456
1457    fn verified(target: &str) -> BrokerMessage {
1458        BrokerMessage::Verified {
1459            agent_id: target.to_string(),
1460            payload: VerifiedPayload {
1461                verified_by: "supervisor".to_string(),
1462                message: None,
1463            },
1464        }
1465    }
1466
1467    fn question(text: &str) -> BrokerMessage {
1468        BrokerMessage::Question {
1469            agent_id: "supervisor".to_string(),
1470            payload: QuestionPayload {
1471                question: text.to_string(),
1472            },
1473        }
1474    }
1475
1476    fn auto_approve_status(agent: &str, class: &str) -> BrokerMessage {
1477        BrokerMessage::Status {
1478            agent_id: agent.to_string(),
1479            payload: StatusPayload {
1480                status: "auto_approved".to_string(),
1481                modified_files: vec![],
1482                message: Some(format!("auto_approved: matched {class}")),
1483                ..Default::default()
1484            },
1485        }
1486    }
1487
1488    #[test]
1489    fn stuck_duration_resolved_on_artifact() {
1490        let tmp = TempDir::new().unwrap();
1491        let mut a = agg(&tmp);
1492        let t0 = SystemTime::now();
1493        a.record_blocked("x", "y", t0);
1494        a.record_artifact("x", t0 + Duration::from_secs(672));
1495        let events = a.stuck_events();
1496        assert_eq!(events.len(), 1);
1497        assert_eq!(events[0].agent_id, "x");
1498        assert_eq!(events[0].blocked_on, "y");
1499        assert!((670..=674).contains(&events[0].duration_seconds));
1500        assert!(events[0].resolved);
1501
1502        a.flush().unwrap();
1503        let md = read_md(a.file_path());
1504        assert!(md.contains("### Where agents got stuck"));
1505        assert!(md.contains("x: blocked"));
1506        assert!(md.contains("waiting on y"));
1507    }
1508
1509    #[test]
1510    fn stuck_duration_unresolved_at_shutdown() {
1511        let tmp = TempDir::new().unwrap();
1512        let mut a = agg(&tmp);
1513        let t0 = SystemTime::now() - Duration::from_mins(2);
1514        a.record_blocked("x", "y", t0);
1515        a.flush_at_shutdown().unwrap();
1516        let events = a.stuck_events();
1517        assert_eq!(events.len(), 1);
1518        assert!(!events[0].resolved);
1519        assert!(events[0].duration_seconds >= 119);
1520        let md = read_md(a.file_path());
1521        assert!(md.contains("unresolved at session end"));
1522    }
1523
1524    #[test]
1525    fn recovery_cycles_recorded_on_verify() {
1526        let tmp = TempDir::new().unwrap();
1527        let mut a = agg(&tmp);
1528        a.record_feedback("x");
1529        a.record_feedback("x");
1530        a.record_feedback("x");
1531        a.record_verified("x");
1532        assert_eq!(a.recovery_events().len(), 1);
1533        assert_eq!(a.recovery_events()[0].agent_id, "x");
1534        assert_eq!(a.recovery_events()[0].count, 3);
1535    }
1536
1537    #[test]
1538    fn recovery_cycles_zero_count_skipped() {
1539        let tmp = TempDir::new().unwrap();
1540        let mut a = agg(&tmp);
1541        a.record_verified("x");
1542        assert!(a.recovery_events().is_empty());
1543        a.flush().unwrap();
1544        assert_eq!(read_md(a.file_path()), "");
1545    }
1546
1547    #[test]
1548    fn forward_conflict_intra_spec_recorded_once() {
1549        let tmp = TempDir::new().unwrap();
1550        let mut a = agg(&tmp);
1551        a.register_agent("feat-x");
1552        a.register_agent("feat-y");
1553        a.set_spec_id("feat-x", "003-user-list");
1554        a.set_spec_id("feat-y", "003-user-list");
1555
1556        a.record_detector_message(&feedback(
1557            "feat-x",
1558            &["[conflict-detector] forward conflict with feat-y on src/main.rs"],
1559        ));
1560        a.record_detector_message(&feedback(
1561            "feat-y",
1562            &["[conflict-detector] forward conflict with feat-x on src/main.rs"],
1563        ));
1564
1565        let events = a.conflict_events();
1566        assert_eq!(events.len(), 1);
1567        match &events[0].category {
1568            ConflictCategory::ForwardConflictIntraSpec {
1569                agents, spec_id, ..
1570            } => {
1571                assert_eq!(agents, &vec!["feat-x".to_string(), "feat-y".to_string()]);
1572                assert_eq!(spec_id, "003-user-list");
1573            }
1574            other => panic!("expected intra-spec, got {other:?}"),
1575        }
1576    }
1577
1578    #[test]
1579    fn forward_conflict_region_aware_body_includes_regions() {
1580        // A region-aware detector message yields a conflict_event body with a
1581        // `regions` array naming the intersecting regions
1582        // (conflict-detector-fn-granularity task 6.1).
1583        let tmp = TempDir::new().unwrap();
1584        let mut a = agg(&tmp);
1585        a.register_agent("feat-x");
1586        a.register_agent("feat-y");
1587        a.record_detector_message(&feedback(
1588            "feat-y",
1589            &["[conflict-detector] forward conflict: agent feat-x also intends to modify: src/auth.rs (regions: function validate_token, function refresh_session)"],
1590        ));
1591        let events = a.conflict_events();
1592        assert_eq!(events.len(), 1);
1593        let record = record_from_conflict(&events[0].category, SystemTime::now());
1594        let regions = record.body.get("regions").expect("regions field present");
1595        assert_eq!(
1596            regions,
1597            &serde_json::json!(["function validate_token", "function refresh_session"])
1598        );
1599    }
1600
1601    #[test]
1602    fn forward_conflict_file_level_body_omits_regions() {
1603        // A file-level (regionless) forward conflict omits the `regions` field.
1604        let tmp = TempDir::new().unwrap();
1605        let mut a = agg(&tmp);
1606        a.register_agent("feat-x");
1607        a.register_agent("feat-y");
1608        a.record_detector_message(&feedback(
1609            "feat-y",
1610            &["[conflict-detector] forward conflict: agent feat-x also intends to modify: src/main.rs"],
1611        ));
1612        let events = a.conflict_events();
1613        assert_eq!(events.len(), 1);
1614        let record = record_from_conflict(&events[0].category, SystemTime::now());
1615        assert!(
1616            record.body.get("regions").is_none(),
1617            "file-level conflict must omit regions; got {:?}",
1618            record.body
1619        );
1620    }
1621
1622    #[test]
1623    fn extract_regions_parses_descriptors() {
1624        assert_eq!(
1625            extract_regions(
1626                "foo src/a.rs (regions: function f, range 10-30); src/b.rs (regions: class C)"
1627            ),
1628            vec![
1629                "function f".to_string(),
1630                "range 10-30".to_string(),
1631                "class C".to_string()
1632            ]
1633        );
1634        assert!(extract_regions("no regions here, just src/a.rs").is_empty());
1635    }
1636
1637    #[test]
1638    fn forward_conflict_cross_spec_records_specs() {
1639        let tmp = TempDir::new().unwrap();
1640        let mut a = agg(&tmp);
1641        a.register_agent("feat-x");
1642        a.register_agent("feat-y");
1643        a.set_spec_id("feat-x", "003-user-list");
1644        a.set_spec_id("feat-y", "004-error-handling");
1645
1646        a.record_detector_message(&feedback(
1647            "feat-x",
1648            &["[conflict-detector] forward conflict with feat-y on src/main.rs"],
1649        ));
1650        a.record_detector_message(&feedback(
1651            "feat-y",
1652            &["[conflict-detector] forward conflict with feat-x on src/main.rs"],
1653        ));
1654
1655        let events = a.conflict_events();
1656        assert_eq!(events.len(), 1);
1657        match &events[0].category {
1658            ConflictCategory::ForwardConflictCrossSpec {
1659                agents, spec_ids, ..
1660            } => {
1661                assert_eq!(agents, &vec!["feat-x".to_string(), "feat-y".to_string()]);
1662                assert!(spec_ids.iter().any(|s| s == "003-user-list"));
1663                assert!(spec_ids.iter().any(|s| s == "004-error-handling"));
1664            }
1665            other => panic!("expected cross-spec, got {other:?}"),
1666        }
1667    }
1668
1669    #[test]
1670    fn in_flight_conflict_classified() {
1671        let tmp = TempDir::new().unwrap();
1672        let mut a = agg(&tmp);
1673        a.register_agent("feat-x");
1674        a.register_agent("feat-y");
1675        a.record_detector_message(&feedback(
1676            "feat-x",
1677            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
1678        ));
1679        a.record_detector_message(&feedback(
1680            "feat-y",
1681            &["[conflict-detector] in-flight conflict with feat-x on src/a.rs"],
1682        ));
1683        let events = a.conflict_events();
1684        assert_eq!(events.len(), 1);
1685        assert!(matches!(
1686            events[0].category,
1687            ConflictCategory::InFlightConflict { .. }
1688        ));
1689    }
1690
1691    #[test]
1692    fn ownership_violation_classified() {
1693        let tmp = TempDir::new().unwrap();
1694        let mut a = agg(&tmp);
1695        a.register_agent("feat-x");
1696        a.register_agent("feat-y");
1697        a.record_detector_message(&feedback(
1698            "feat-y",
1699            &["[conflict-detector] ownership violation on src/a.rs claimed by feat-x"],
1700        ));
1701        let events = a.conflict_events();
1702        assert_eq!(events.len(), 1);
1703        match &events[0].category {
1704            ConflictCategory::OwnershipViolation {
1705                violator,
1706                owner,
1707                file,
1708            } => {
1709                assert_eq!(violator, "feat-y");
1710                assert_eq!(owner, "feat-x");
1711                assert_eq!(file, "src/a.rs");
1712            }
1713            other => panic!("expected ownership-violation, got {other:?}"),
1714        }
1715    }
1716
1717    #[test]
1718    fn detector_question_to_supervisor_is_classified() {
1719        let tmp = TempDir::new().unwrap();
1720        let mut a = agg(&tmp);
1721        a.register_agent("feat-x");
1722        a.register_agent("feat-y");
1723        a.record_detector_message(&question(
1724            "[conflict-detector] in-flight conflict between feat-x and feat-y on src/a.rs",
1725        ));
1726        // Question target is "supervisor" which isn't in known_agents,
1727        // so the classifier looks at the two real agents both mentioned.
1728        let events = a.conflict_events();
1729        assert_eq!(events.len(), 1);
1730        assert!(matches!(
1731            events[0].category,
1732            ConflictCategory::InFlightConflict { .. }
1733        ));
1734    }
1735
1736    #[test]
1737    fn permission_pattern_above_threshold_emits_entry() {
1738        let tmp = TempDir::new().unwrap();
1739        let mut a = agg(&tmp);
1740        for _ in 0..23 {
1741            a.record_auto_approve("cargo check");
1742        }
1743        a.flush().unwrap();
1744        let md = read_md(a.file_path());
1745        assert!(md.contains("### Permission patterns"));
1746        assert!(md.contains("`cargo check` auto-approved 23 times"));
1747    }
1748
1749    #[test]
1750    fn permission_pattern_below_threshold_omitted_then_emitted_later() {
1751        let tmp = TempDir::new().unwrap();
1752        let mut a = agg(&tmp);
1753        a.record_auto_approve("git status");
1754        a.record_auto_approve("git status");
1755        // Need at least one signal to flush — give it an artifact.
1756        a.flush().unwrap();
1757        let md1 = read_md(a.file_path());
1758        assert!(!md1.contains("git status"));
1759
1760        // A later burst pushes the count over the default 5.
1761        for _ in 0..5 {
1762            a.record_auto_approve("git status");
1763        }
1764        a.flush().unwrap();
1765        let md2 = read_md(a.file_path());
1766        assert!(md2.contains("`git status` auto-approved 7 times"));
1767    }
1768
1769    #[test]
1770    fn no_learnings_session_writes_nothing() {
1771        let tmp = TempDir::new().unwrap();
1772        let mut a = agg(&tmp);
1773        a.flush().unwrap();
1774        a.flush_at_shutdown().unwrap();
1775        assert_eq!(read_md(a.file_path()), "");
1776        assert!(!a.file_path().exists() || read_md(a.file_path()).is_empty());
1777    }
1778
1779    #[test]
1780    fn flush_writes_h2_header_once_per_session() {
1781        let tmp = TempDir::new().unwrap();
1782        let mut a = agg(&tmp);
1783        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
1784            a.record_auto_approve("cargo check");
1785        }
1786        a.flush().unwrap();
1787        // Add another signal and flush again — should NOT add a second H2.
1788        a.record_feedback("alpha");
1789        a.record_verified("alpha");
1790        a.flush().unwrap();
1791
1792        let md = read_md(a.file_path());
1793        let h2_count = md.matches("## Session Learnings — ").count();
1794        assert_eq!(h2_count, 1, "expected exactly one H2, got\n{md}");
1795        // ISO timestamp on the H2 line.
1796        let h2_line = md
1797            .lines()
1798            .find(|l| l.starts_with("## Session Learnings — "))
1799            .unwrap();
1800        let ts = h2_line.trim_start_matches("## Session Learnings — ").trim();
1801        assert!(
1802            regex_like_iso(ts),
1803            "H2 timestamp did not match ISO regex: {ts:?}"
1804        );
1805    }
1806
1807    fn regex_like_iso(s: &str) -> bool {
1808        // Equivalent of ^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$
1809        let bytes = s.as_bytes();
1810        if bytes.len() != 20 {
1811            return false;
1812        }
1813        for (i, b) in bytes.iter().enumerate() {
1814            let ok = match i {
1815                4 | 7 => *b == b'-',
1816                10 => *b == b'T',
1817                13 | 16 => *b == b':',
1818                19 => *b == b'Z',
1819                _ => b.is_ascii_digit(),
1820            };
1821            if !ok {
1822                return false;
1823            }
1824        }
1825        true
1826    }
1827
1828    #[test]
1829    fn second_session_appends_new_h2_preserves_prior_content() {
1830        let tmp = TempDir::new().unwrap();
1831        let path = tmp.path().join("session-learnings.md");
1832        let mut a1 = LearningsAggregator::new(path.clone());
1833        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
1834            a1.record_auto_approve("cargo check");
1835        }
1836        a1.flush().unwrap();
1837        let after_first = read_md(&path);
1838        assert!(after_first.contains("`cargo check`"));
1839
1840        // Wait a second so the ISO timestamps differ.
1841        std::thread::sleep(Duration::from_secs(1));
1842
1843        let mut a2 = LearningsAggregator::new(path.clone());
1844        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
1845            a2.record_auto_approve("cargo fmt");
1846        }
1847        a2.flush().unwrap();
1848        let after_second = read_md(&path);
1849        // Prior content unchanged at the start of the file.
1850        assert!(after_second.starts_with(after_first.trim_end()));
1851        // Two H2 headers in the file.
1852        let h2_count = after_second.matches("## Session Learnings — ").count();
1853        assert_eq!(h2_count, 2);
1854        assert!(after_second.contains("`cargo fmt`"));
1855    }
1856
1857    #[test]
1858    fn observe_routes_blocked_and_artifact() {
1859        let tmp = TempDir::new().unwrap();
1860        let mut a = agg(&tmp);
1861        a.observe(&blocked("x", "y"));
1862        a.observe(&artifact("x"));
1863        assert_eq!(a.stuck_events().len(), 1);
1864    }
1865
1866    #[test]
1867    fn observe_increments_feedback_then_records_recovery() {
1868        let tmp = TempDir::new().unwrap();
1869        let mut a = agg(&tmp);
1870        for _ in 0..3 {
1871            a.observe(&feedback("x", &["test failed"]));
1872        }
1873        a.observe(&verified("x"));
1874        assert_eq!(a.recovery_events().len(), 1);
1875        assert_eq!(a.recovery_events()[0].count, 3);
1876    }
1877
1878    #[test]
1879    fn observe_auto_approve_increments_counter() {
1880        let tmp = TempDir::new().unwrap();
1881        let mut a = agg(&tmp);
1882        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
1883            a.observe(&auto_approve_status("feat-x", "cargo check"));
1884        }
1885        a.flush().unwrap();
1886        assert!(read_md(a.file_path()).contains("`cargo check` auto-approved"));
1887    }
1888
1889    #[test]
1890    fn extract_command_class_parses_matched_entry() {
1891        assert_eq!(
1892            extract_command_class(Some("auto_approved: matched cargo check")),
1893            Some("cargo check".to_string())
1894        );
1895        assert_eq!(extract_command_class(Some("auto_approved")), None);
1896        assert_eq!(extract_command_class(None), None);
1897    }
1898
1899    /// Spec scenario `Markdown file output / Empty categories are omitted`:
1900    /// a session with conflict events but no stuck-duration events must
1901    /// produce a `### Conflict events` heading and MUST NOT produce a
1902    /// `### Where agents got stuck` heading.
1903    #[test]
1904    fn empty_categories_are_omitted_from_markdown() {
1905        let tmp = TempDir::new().unwrap();
1906        let mut a = agg(&tmp);
1907        a.register_agent("feat-x");
1908        a.register_agent("feat-y");
1909        a.record_detector_message(&feedback(
1910            "feat-x",
1911            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
1912        ));
1913        a.flush().unwrap();
1914
1915        let md = read_md(a.file_path());
1916        assert!(md.contains("### Conflict events"));
1917        assert!(
1918            !md.contains("### Where agents got stuck"),
1919            "stuck heading should be omitted when there are no stuck events:\n{md}"
1920        );
1921        assert!(
1922            !md.contains("### Recovery cycles"),
1923            "recovery heading should be omitted when there are no recovery events:\n{md}"
1924        );
1925        assert!(
1926            !md.contains("### Permission patterns"),
1927            "permission heading should be omitted when there are no permission entries:\n{md}"
1928        );
1929    }
1930
1931    /// Spec scenario `Periodic flush + shutdown flush / Burst of events
1932    /// does not trigger eager flush`: observing 5 events in quick
1933    /// succession SHALL NOT write anything until `flush()` is invoked.
1934    #[test]
1935    fn burst_of_events_does_not_write_until_flush() {
1936        let tmp = TempDir::new().unwrap();
1937        let mut a = agg(&tmp);
1938        a.register_agent("feat-x");
1939        a.register_agent("feat-y");
1940        for _ in 0..5 {
1941            a.record_detector_message(&feedback(
1942                "feat-x",
1943                &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
1944            ));
1945        }
1946        // Five back-to-back observes — the file must remain unwritten.
1947        assert!(
1948            !a.file_path().exists() || read_md(a.file_path()).is_empty(),
1949            "aggregator wrote eagerly without a flush call"
1950        );
1951        // One explicit flush captures all events together.
1952        a.flush().unwrap();
1953        let md = read_md(a.file_path());
1954        // Despite five `observe` calls, deduping keeps exactly one bullet
1955        // for the same canonical pair — the assertion the spec actually
1956        // cares about is "no flush until the timer fires".
1957        assert!(md.contains("### Conflict events"));
1958    }
1959
1960    /// Spec scenario (agent-learning-variant) `agent.learning broker message
1961    /// variant`: the variant deferred in v0.5.0 now exists and round-trips.
1962    /// This supersedes the v0.5.0 `broker_message_has_no_agent_learning_variant`
1963    /// negative test, which asserted the variant's *absence*.
1964    #[test]
1965    fn broker_message_has_agent_learning_variant() {
1966        use crate::broker::messages::LearningPayload;
1967
1968        // A well-formed `agent.learning` envelope now deserialises cleanly.
1969        let probe = r#"{"type":"agent.learning","payload":{"id":"abc123def456abcd","agent_id":"supervisor","branch_id":"feat/x","category":"conflict_event","title":"forward conflict","body":{"shape":"forward"},"timestamp":"2026-05-28T12:01:01Z"}}"#;
1970        let msg = BrokerMessage::from_json(probe)
1971            .expect("a well-formed agent.learning envelope must deserialise");
1972        let BrokerMessage::Learning { payload } = &msg else {
1973            panic!("expected Learning, got {msg:?}");
1974        };
1975        assert_eq!(payload.category, "conflict_event");
1976        assert_eq!(payload.agent_id, "supervisor");
1977        assert_eq!(payload.branch_id.as_deref(), Some("feat/x"));
1978        assert_eq!(msg.status_label(), "learning");
1979
1980        // And it re-serialises under the documented wire tag.
1981        let round = BrokerMessage::Learning {
1982            payload: LearningPayload {
1983                id: "abc123def456abcd".to_string(),
1984                agent_id: "supervisor".to_string(),
1985                branch_id: None,
1986                category: "permission_pattern".to_string(),
1987                title: "`cargo check` auto-approved 23 times".to_string(),
1988                body: serde_json::json!({"command_class": "cargo check", "count": 23}),
1989                timestamp: "2026-05-28T12:01:01Z".to_string(),
1990            },
1991        };
1992        let json = serde_json::to_string(&round).unwrap();
1993        assert!(json.contains("\"type\":\"agent.learning\""));
1994    }
1995
1996    /// Spec scenario `Aggregator does not start when learnings flag is
1997    /// false` and `Aggregator does not start when supervisor is disabled`:
1998    /// the wiring decision keys on `supervisor.enabled && supervisor.learnings`.
1999    #[test]
2000    fn wiring_predicate_only_enables_when_supervisor_and_learnings_both_true() {
2001        use crate::config::{LearningsConfig, SupervisorConfig};
2002
2003        // The predicate used in `cmd_dashboard` to decide whether to
2004        // attach a learnings aggregator. Mirroring it in a test pins down
2005        // the lifecycle requirement: any change to the gating logic
2006        // breaks this test.
2007        fn should_attach(s: Option<&SupervisorConfig>) -> bool {
2008            s.is_some_and(|s| s.enabled && s.learnings)
2009        }
2010
2011        // Section absent → no aggregator.
2012        assert!(!should_attach(None));
2013
2014        // Supervisor disabled, learnings true → no aggregator.
2015        assert!(!should_attach(Some(&SupervisorConfig {
2016            enabled: false,
2017            learnings: true,
2018            learnings_config: LearningsConfig::default(),
2019            ..SupervisorConfig::default()
2020        })));
2021
2022        // Supervisor enabled, learnings false → no aggregator.
2023        assert!(!should_attach(Some(&SupervisorConfig {
2024            enabled: true,
2025            learnings: false,
2026            learnings_config: LearningsConfig::default(),
2027            ..SupervisorConfig::default()
2028        })));
2029
2030        // Both enabled → aggregator attached.
2031        assert!(should_attach(Some(&SupervisorConfig {
2032            enabled: true,
2033            learnings: true,
2034            learnings_config: LearningsConfig::default(),
2035            ..SupervisorConfig::default()
2036        })));
2037    }
2038
2039    // Maps to scenario `Default flush interval is 60 seconds` from
2040    // learnings-mode. (test-coverage-v0-5-0 task 5.1)
2041    #[test]
2042    fn default_flush_interval_is_60_seconds() {
2043        use crate::config::LearningsConfig;
2044        let cfg = LearningsConfig::default();
2045        assert_eq!(
2046            cfg.flush_interval_seconds, 60,
2047            "LearningsConfig::default().flush_interval_seconds must be 60"
2048        );
2049    }
2050
2051    // === agent-learning-variant: deterministic id + record conversion ===
2052
2053    /// A timestamp at `YYYY-MM-DDTHH:MM` UTC for a fixed reference day so the
2054    /// hour-bucket boundary behaviour can be exercised deterministically.
2055    fn ts_at(hour: u64, minute: u64) -> SystemTime {
2056        // 2026-05-28T00:00:00Z = 1_779_926_400 (days since epoch * 86400).
2057        const DAY_START: u64 = 1_779_926_400;
2058        UNIX_EPOCH + Duration::from_secs(DAY_START + hour * 3600 + minute * 60)
2059    }
2060
2061    fn sample_record(category: &str, branch: Option<&str>, ts: SystemTime) -> LearningRecord {
2062        LearningRecord {
2063            category: category.to_string(),
2064            agent_id: LEARNINGS_AGENT_ID.to_string(),
2065            branch_id: branch.map(str::to_string),
2066            title: "title".to_string(),
2067            body: serde_json::json!({"agents": ["feat-x", "feat-y"], "files": ["src/a.rs"]}),
2068            timestamp: ts,
2069        }
2070    }
2071
2072    // Task 2.3: same record at 13:30 and 13:59 produces the same id.
2073    #[test]
2074    fn same_record_within_the_hour_gets_same_id() {
2075        let a = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(13, 30));
2076        let b = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(13, 59));
2077        assert_eq!(a.deterministic_id(), b.deterministic_id());
2078        // The id is a 16-hex-char prefix.
2079        assert_eq!(a.deterministic_id().len(), 16);
2080        assert!(a.deterministic_id().chars().all(|c| c.is_ascii_hexdigit()));
2081    }
2082
2083    // Task 2.4: same record at 13:59 and 14:01 produces different ids.
2084    #[test]
2085    fn same_record_across_hour_boundary_gets_different_ids() {
2086        let a = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(13, 59));
2087        let b = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(14, 1));
2088        assert_ne!(a.deterministic_id(), b.deterministic_id());
2089    }
2090
2091    // Task 2.5: different categories produce different ids even when the body
2092    // is identical.
2093    #[test]
2094    fn different_categories_get_different_ids_with_identical_body() {
2095        let ts = ts_at(13, 30);
2096        let a = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts);
2097        let b = sample_record(CATEGORY_STUCK_DURATION, Some("feat-x"), ts);
2098        assert_ne!(a.deterministic_id(), b.deterministic_id());
2099    }
2100
2101    #[test]
2102    fn id_is_independent_of_body_key_insertion_order() {
2103        let ts = ts_at(13, 30);
2104        let mut a = sample_record(CATEGORY_CONFLICT_EVENT, None, ts);
2105        a.body = serde_json::json!({"alpha": 1, "beta": 2});
2106        let mut b = sample_record(CATEGORY_CONFLICT_EVENT, None, ts);
2107        b.body = serde_json::json!({"beta": 2, "alpha": 1});
2108        assert_eq!(a.deterministic_id(), b.deterministic_id());
2109    }
2110
2111    #[test]
2112    fn branch_id_distinguishes_otherwise_identical_records() {
2113        let ts = ts_at(13, 30);
2114        let a = sample_record(CATEGORY_STUCK_DURATION, Some("feat-x"), ts);
2115        let b = sample_record(CATEGORY_STUCK_DURATION, Some("feat-y"), ts);
2116        assert_ne!(a.deterministic_id(), b.deterministic_id());
2117    }
2118
2119    // Task 1.4: serialise + deserialise each of the four categories' records
2120    // through the From<&LearningRecord> conversion and the wire envelope.
2121    #[test]
2122    fn all_four_categories_round_trip_through_broker_message() {
2123        let now = ts_at(12, 1);
2124        let records = [
2125            record_from_conflict(
2126                &ConflictCategory::InFlightConflict {
2127                    agents: vec!["feat-x".to_string(), "feat-y".to_string()],
2128                },
2129                now,
2130            ),
2131            record_from_stuck(
2132                &StuckDurationEntry {
2133                    agent_id: "feat-x".to_string(),
2134                    blocked_on: "feat-y".to_string(),
2135                    duration_seconds: 672,
2136                    resolved: true,
2137                },
2138                now,
2139            ),
2140            record_from_recovery(
2141                &RecoveryCycleEntry {
2142                    agent_id: "feat-x".to_string(),
2143                    count: 3,
2144                },
2145                now,
2146            ),
2147            record_from_permission("cargo check", 23, now),
2148        ];
2149        let expected_categories = [
2150            CATEGORY_CONFLICT_EVENT,
2151            CATEGORY_STUCK_DURATION,
2152            CATEGORY_RECOVERY_CYCLES,
2153            CATEGORY_PERMISSION_PATTERN,
2154        ];
2155        for (record, expected_category) in records.iter().zip(expected_categories) {
2156            let msg = BrokerMessage::from(record);
2157            let json = serde_json::to_string(&msg).unwrap();
2158            let back = BrokerMessage::from_json(&json)
2159                .unwrap_or_else(|e| panic!("{expected_category} must round-trip: {e}"));
2160            let BrokerMessage::Learning { payload } = back else {
2161                panic!("expected Learning variant for {expected_category}");
2162            };
2163            assert_eq!(payload.category, expected_category);
2164            assert_eq!(payload.id, record.deterministic_id());
2165            assert_eq!(payload.agent_id, LEARNINGS_AGENT_ID);
2166            assert!(!payload.title.is_empty());
2167        }
2168    }
2169
2170    #[test]
2171    fn conflict_and_permission_records_are_cross_cutting_no_branch() {
2172        let now = ts_at(12, 1);
2173        let conflict = record_from_conflict(
2174            &ConflictCategory::InFlightConflict {
2175                agents: vec!["feat-x".to_string(), "feat-y".to_string()],
2176            },
2177            now,
2178        );
2179        let permission = record_from_permission("cargo check", 9, now);
2180        assert_eq!(conflict.branch_id, None);
2181        assert_eq!(permission.branch_id, None);
2182    }
2183
2184    #[test]
2185    fn stuck_and_recovery_records_are_branch_scoped() {
2186        let now = ts_at(12, 1);
2187        let stuck = record_from_stuck(
2188            &StuckDurationEntry {
2189                agent_id: "feat-x".to_string(),
2190                blocked_on: "feat-y".to_string(),
2191                duration_seconds: 60,
2192                resolved: false,
2193            },
2194            now,
2195        );
2196        let recovery = record_from_recovery(
2197            &RecoveryCycleEntry {
2198                agent_id: "feat-z".to_string(),
2199                count: 2,
2200            },
2201            now,
2202        );
2203        assert_eq!(stuck.branch_id.as_deref(), Some("feat-x"));
2204        assert_eq!(recovery.branch_id.as_deref(), Some("feat-z"));
2205    }
2206
2207    // === agent-learning-variant: dual-output gating ===
2208
2209    // Spec scenario `File-only output when broker is disabled`: with broker
2210    // publish off (the default), a flush appends to the file and queues NO
2211    // broker records.
2212    #[test]
2213    fn broker_publish_off_queues_no_records() {
2214        let tmp = TempDir::new().unwrap();
2215        let mut a = agg(&tmp);
2216        assert!(!a.broker_publish_enabled());
2217        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
2218            a.record_auto_approve("cargo check");
2219        }
2220        a.flush().unwrap();
2221        assert!(read_md(a.file_path()).contains("`cargo check`"));
2222        assert!(
2223            a.take_pending_publish().is_empty(),
2224            "no records should be queued when broker publish is disabled"
2225        );
2226    }
2227
2228    // Spec scenario `Both outputs when broker is enabled`: with broker publish
2229    // on, a flush appends to the file AND queues a matching broker record.
2230    #[test]
2231    fn broker_publish_on_queues_records_matching_file() {
2232        let tmp = TempDir::new().unwrap();
2233        let mut a = agg(&tmp);
2234        a.set_broker_publish(true);
2235        a.register_agent("feat-x");
2236        a.register_agent("feat-y");
2237        a.record_detector_message(&feedback(
2238            "feat-x",
2239            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
2240        ));
2241        a.flush().unwrap();
2242
2243        let md = read_md(a.file_path());
2244        assert!(md.contains("### Conflict events"));
2245        let records = a.take_pending_publish();
2246        assert_eq!(records.len(), 1, "one conflict record should be queued");
2247        assert_eq!(records[0].category, CATEGORY_CONFLICT_EVENT);
2248        // The record's title mirrors the markdown bullet text.
2249        assert!(md.contains(&records[0].title));
2250        // Draining empties the queue.
2251        assert!(a.take_pending_publish().is_empty());
2252    }
2253
2254    // === qualitative-learnings: ingestion, routing, tolerant rendering ===
2255
2256    use crate::broker::messages::LearningPayload;
2257
2258    /// Builds an externally-published `agent.learning` envelope with the given
2259    /// category, title, and body — the shape the supervisor LLM publishes.
2260    fn learning(category: &str, title: &str, body: serde_json::Value) -> BrokerMessage {
2261        BrokerMessage::Learning {
2262            payload: LearningPayload {
2263                id: format!("id-{category}-{title}"),
2264                agent_id: LEARNINGS_AGENT_ID.to_string(),
2265                branch_id: None,
2266                category: category.to_string(),
2267                title: title.to_string(),
2268                body,
2269                timestamp: "2026-06-05T12:00:00Z".to_string(),
2270            },
2271        }
2272    }
2273
2274    // Task 2.4: each new category routes to its own section header.
2275    #[test]
2276    fn each_qualitative_category_routes_to_its_section() {
2277        let tmp = TempDir::new().unwrap();
2278        let mut a = agg(&tmp);
2279        a.observe(&learning(
2280            CATEGORY_RECURRING_FAILURE_SHAPE,
2281            "import cycle recurs",
2282            serde_json::json!({
2283                "shape": "import cycle in payments module",
2284                "instances": [
2285                    {"branch_id": "feat/a", "feedback_id": "f1", "excerpt": "..."},
2286                    {"branch_id": "feat/b", "feedback_id": "f2", "excerpt": "..."}
2287                ]
2288            }),
2289        ));
2290        a.observe(&learning(
2291            CATEGORY_DOC_GAP,
2292            "lint-before-commit undocumented",
2293            serde_json::json!({
2294                "convention": "agents run lint before commit",
2295                "evidence_paths": ["AGENTS.md"],
2296                "suggestion": "add a Conventions section to AGENTS.md"
2297            }),
2298        ));
2299        a.observe(&learning(
2300            CATEGORY_ADR_DRIFT,
2301            "async runtime undocumented",
2302            serde_json::json!({
2303                "decision_area": "async runtime",
2304                "observed_pattern": "a background runtime added in the broker server",
2305                "configured_adr_path": "docs/adr",
2306                "candidate_adr_title": "ADR-0007: Adopt an async runtime"
2307            }),
2308        ));
2309        a.observe(&learning(
2310            CATEGORY_SCOPE_MISTAKE,
2311            "two branches over-coordinated",
2312            serde_json::json!({
2313                "branches": ["feat/a", "feat/b"],
2314                "shared_files": ["src/router"],
2315                "coordination_events": [],
2316                "suggestion": "merge the feat/a and feat/b scopes"
2317            }),
2318        ));
2319        a.flush().unwrap();
2320
2321        let md = read_md(a.file_path());
2322        assert!(md.contains("### Recurring failure shapes"), "{md}");
2323        assert!(md.contains("import cycle in payments module: 2 instances across feat/a, feat/b"));
2324        assert!(md.contains("### Documentation gaps"), "{md}");
2325        assert!(
2326            md.contains("- agents run lint before commit — add a Conventions section to AGENTS.md")
2327        );
2328        assert!(md.contains("### ADR / architectural drift"), "{md}");
2329        assert!(md.contains("- async runtime: a background runtime added in the broker server"));
2330        assert!(md.contains("### Scope-mistake signals"), "{md}");
2331        assert!(md.contains("- feat/a and feat/b — merge the feat/a and feat/b scopes"));
2332    }
2333
2334    // Task 2.4: a body that lacks a documented field renders as the title plus
2335    // a JSON dump under the category's section (design D5).
2336    #[test]
2337    fn malformed_qualitative_body_renders_as_title_plus_json() {
2338        let tmp = TempDir::new().unwrap();
2339        let mut a = agg(&tmp);
2340        // Lacks the documented `instances` field.
2341        a.observe(&learning(
2342            CATEGORY_RECURRING_FAILURE_SHAPE,
2343            "vague shape with no instances",
2344            serde_json::json!({"shape": "something fuzzy"}),
2345        ));
2346        a.flush().unwrap();
2347
2348        let md = read_md(a.file_path());
2349        // Still under the recurring-failure-shape section.
2350        assert!(md.contains("### Recurring failure shapes"), "{md}");
2351        // Title line present.
2352        assert!(md.contains("- vague shape with no instances"), "{md}");
2353        // Body serialised as JSON present (not dropped).
2354        assert!(md.contains(r#"{"shape":"something fuzzy"}"#), "{md}");
2355    }
2356
2357    // Task 2.4: an unrecognised category lands under "Other learnings" and is
2358    // not silently dropped.
2359    #[test]
2360    fn unknown_category_falls_through_to_other_learnings() {
2361        let tmp = TempDir::new().unwrap();
2362        let mut a = agg(&tmp);
2363        a.observe(&learning(
2364            "some_future_category",
2365            "a future learning shape",
2366            serde_json::json!({"note": "from a later version"}),
2367        ));
2368        a.flush().unwrap();
2369
2370        let md = read_md(a.file_path());
2371        assert!(md.contains("### Other learnings"), "{md}");
2372        assert!(md.contains("- a future learning shape"), "{md}");
2373        assert!(md.contains(r#"{"note":"from a later version"}"#), "{md}");
2374    }
2375
2376    // Task 2.4: ingested deterministic-category records (the aggregator's own,
2377    // flowing back through the publish path) are ignored — no double render,
2378    // no "Other learnings" leak.
2379    #[test]
2380    fn ingested_deterministic_learning_is_ignored() {
2381        let tmp = TempDir::new().unwrap();
2382        let mut a = agg(&tmp);
2383        a.observe(&learning(
2384            CATEGORY_CONFLICT_EVENT,
2385            "forward conflict feat-x and feat-y",
2386            serde_json::json!({"shape": "forward", "agents": ["feat-x", "feat-y"]}),
2387        ));
2388        assert!(a.qualitative_events().is_empty());
2389        a.flush().unwrap();
2390        // Nothing written: the record was dropped and there is no other signal.
2391        assert_eq!(read_md(a.file_path()), "");
2392    }
2393
2394    // Task 2.4: the v0.5.0 deterministic sections still render in their v0.5.0
2395    // shape, even when qualitative records are present in the same flush.
2396    #[test]
2397    fn v0_5_0_sections_unchanged_alongside_qualitative() {
2398        let tmp = TempDir::new().unwrap();
2399        let mut a = agg(&tmp);
2400        // A v0.5.0 deterministic signal.
2401        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
2402            a.record_auto_approve("git status");
2403        }
2404        // A qualitative signal in the same flush.
2405        a.observe(&learning(
2406            CATEGORY_DOC_GAP,
2407            "doc gap",
2408            serde_json::json!({"convention": "c", "suggestion": "s"}),
2409        ));
2410        a.flush().unwrap();
2411
2412        let md = read_md(a.file_path());
2413        // v0.5.0 permission-pattern section + bullet, byte-for-byte shape.
2414        assert!(md.contains("### Permission patterns"));
2415        assert!(md.contains("- `git status` auto-approved 5 times"));
2416        // Qualitative section also present.
2417        assert!(md.contains("### Documentation gaps"));
2418    }
2419
2420    // Task 4 / 6.2: the same recurring_failure_shape published twice in a
2421    // session is rendered once (skill-level dedup reinforced code-side).
2422    #[test]
2423    fn qualitative_dedup_suppresses_same_primary_identifier() {
2424        let tmp = TempDir::new().unwrap();
2425        let mut a = agg(&tmp);
2426        let body = serde_json::json!({
2427            "shape": "import cycle in payments module",
2428            "instances": [{"branch_id": "feat/a"}, {"branch_id": "feat/b"}]
2429        });
2430        a.observe(&learning(
2431            CATEGORY_RECURRING_FAILURE_SHAPE,
2432            "first sighting",
2433            body.clone(),
2434        ));
2435        // Same shape, different title/wording — must be suppressed.
2436        a.observe(&learning(
2437            CATEGORY_RECURRING_FAILURE_SHAPE,
2438            "second sighting, reworded",
2439            body,
2440        ));
2441        assert_eq!(
2442            a.qualitative_events().len(),
2443            1,
2444            "near-duplicate not deduped"
2445        );
2446        a.flush().unwrap();
2447        let md = read_md(a.file_path());
2448        let occurrences = md.matches("import cycle in payments module").count();
2449        assert_eq!(occurrences, 1, "shape rendered more than once:\n{md}");
2450    }
2451
2452    // Distinct primary identifiers within the same category are NOT deduped.
2453    #[test]
2454    fn qualitative_dedup_keeps_distinct_identifiers() {
2455        let tmp = TempDir::new().unwrap();
2456        let mut a = agg(&tmp);
2457        a.observe(&learning(
2458            CATEGORY_DOC_GAP,
2459            "gap one",
2460            serde_json::json!({"convention": "lint before commit", "suggestion": "s1"}),
2461        ));
2462        a.observe(&learning(
2463            CATEGORY_DOC_GAP,
2464            "gap two",
2465            serde_json::json!({"convention": "sign your commits", "suggestion": "s2"}),
2466        ));
2467        assert_eq!(a.qualitative_events().len(), 2);
2468    }
2469
2470    // Two malformed records with the same category but no primary identifier
2471    // are kept distinct via the publisher's deterministic id.
2472    #[test]
2473    fn qualitative_dedup_distinguishes_malformed_by_id() {
2474        let tmp = TempDir::new().unwrap();
2475        let mut a = agg(&tmp);
2476        a.observe(&learning(
2477            CATEGORY_SCOPE_MISTAKE,
2478            "malformed one",
2479            serde_json::json!({"note": "no branches a"}),
2480        ));
2481        a.observe(&learning(
2482            CATEGORY_SCOPE_MISTAKE,
2483            "malformed two",
2484            serde_json::json!({"note": "no branches b"}),
2485        ));
2486        assert_eq!(a.qualitative_events().len(), 2);
2487    }
2488
2489    // Spec scenario `Hour-bucket id collisions are independently handled`:
2490    // two qualitative records with identical canonical input within the same
2491    // UTC hour produce the same deterministic id, so a broker consumer can
2492    // dedupe exact re-emissions even if the skill-level dedup misses.
2493    #[test]
2494    fn qualitative_records_get_identical_ids_within_the_hour() {
2495        let body = serde_json::json!({
2496            "shape": "import cycle in payments module",
2497            "instances": [{"branch_id": "feat/a"}, {"branch_id": "feat/b"}]
2498        });
2499        let a = LearningRecord {
2500            category: CATEGORY_RECURRING_FAILURE_SHAPE.to_string(),
2501            agent_id: LEARNINGS_AGENT_ID.to_string(),
2502            branch_id: None,
2503            title: "first".to_string(),
2504            body: body.clone(),
2505            timestamp: ts_at(13, 5),
2506        };
2507        let b = LearningRecord {
2508            timestamp: ts_at(13, 55),
2509            title: "reworded".to_string(),
2510            ..a.clone()
2511        };
2512        assert_eq!(a.deterministic_id(), b.deterministic_id());
2513        assert_eq!(a.deterministic_id().len(), 16);
2514    }
2515
2516    // Qualitative ingestion never queues a broker publish (the record already
2517    // came from the broker) even when broker publish is enabled.
2518    #[test]
2519    fn qualitative_ingestion_does_not_republish() {
2520        let tmp = TempDir::new().unwrap();
2521        let mut a = agg(&tmp);
2522        a.set_broker_publish(true);
2523        a.observe(&learning(
2524            CATEGORY_DOC_GAP,
2525            "doc gap",
2526            serde_json::json!({"convention": "c", "suggestion": "s"}),
2527        ));
2528        a.flush().unwrap();
2529        assert!(read_md(a.file_path()).contains("### Documentation gaps"));
2530        assert!(
2531            a.take_pending_publish().is_empty(),
2532            "ingested qualitative records must not be re-published"
2533        );
2534    }
2535
2536    // Task 7.4 (idempotency, unit level): two aggregators replaying the same
2537    // input events within the same hour produce records with identical ids,
2538    // so a consumer dedupes them to one.
2539    #[test]
2540    fn replayed_events_within_hour_produce_identical_ids() {
2541        fn run() -> String {
2542            let tmp = TempDir::new().unwrap();
2543            let mut a = agg(&tmp);
2544            a.set_broker_publish(true);
2545            a.register_agent("feat-x");
2546            a.register_agent("feat-y");
2547            a.record_detector_message(&feedback(
2548                "feat-x",
2549                &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
2550            ));
2551            a.flush().unwrap();
2552            a.take_pending_publish()[0].deterministic_id()
2553        }
2554        // Both runs fall in the same wall-clock hour (the test runs in well
2555        // under an hour), so the ids match.
2556        assert_eq!(run(), run());
2557    }
2558
2559    // --- Supervisor routing records (supervisor-tell change, design D4) ---
2560
2561    #[test]
2562    fn format_routing_entry_shape() {
2563        let line = format_routing_entry(
2564            "2026-05-28T14:35:09Z",
2565            "feat/x",
2566            "feedback",
2567            "rebase onto main before continuing",
2568        );
2569        assert_eq!(
2570            line,
2571            "- 2026-05-28T14:35:09Z — supervisor told `feat/x` via feedback: \"rebase onto main before continuing\""
2572        );
2573    }
2574
2575    #[test]
2576    fn format_routing_entry_truncates_long_prompt() {
2577        let prompt = "x".repeat(300);
2578        let line = format_routing_entry("T", "feat/x", "send-keys", &prompt);
2579        assert!(
2580            line.ends_with("…\""),
2581            "long prompt should end with …: {line}"
2582        );
2583        // 200 retained chars + the ellipsis.
2584        assert_eq!(prompt.chars().take(ROUTING_PROMPT_MAX_CHARS).count(), 200);
2585        assert!(line.contains(&"x".repeat(ROUTING_PROMPT_MAX_CHARS)));
2586    }
2587
2588    #[test]
2589    fn routing_record_with_learnings_enabled_writes_section() {
2590        let tmp = tempfile::tempdir().unwrap();
2591        let path = tmp.path().join("session-learnings.md");
2592        append_routing_record(
2593            &path,
2594            true,
2595            "2026-05-28T14:35:09Z",
2596            "feat/auth",
2597            "feedback",
2598            "rebase onto main",
2599        )
2600        .unwrap();
2601        let body = std::fs::read_to_string(&path).unwrap();
2602        assert!(body.contains(ROUTING_SECTION_HEADER));
2603        assert!(body.contains("feat/auth"));
2604        assert!(body.contains("via feedback"));
2605        assert!(body.contains("rebase onto main"));
2606
2607        // A second record reuses the existing section header (written once).
2608        append_routing_record(&path, true, "T2", "feat/api", "send-keys", "run it").unwrap();
2609        let body = std::fs::read_to_string(&path).unwrap();
2610        assert_eq!(
2611            body.matches(ROUTING_SECTION_HEADER).count(),
2612            1,
2613            "section header must be written exactly once"
2614        );
2615        assert!(body.contains("feat/api"));
2616    }
2617
2618    #[test]
2619    fn routing_record_with_learnings_disabled_writes_nothing() {
2620        let tmp = tempfile::tempdir().unwrap();
2621        let path = tmp.path().join("session-learnings.md");
2622        append_routing_record(&path, false, "T", "feat/auth", "feedback", "noop").unwrap();
2623        assert!(
2624            !path.exists(),
2625            "learnings = false must not create or write the file"
2626        );
2627    }
2628}