1use std::collections::HashMap;
13use std::fmt::Write as _;
14use std::fs::OpenOptions;
15use std::io::Write;
16use std::path::{Path, PathBuf};
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20use std::hash::Hasher as _;
21
22use chrono::{TimeZone as _, Utc};
23
24use super::messages::{BrokerMessage, LearningPayload};
25
/// Marker substring that `record_detector_message` requires in a feedback or
/// question text before it treats the message as a conflict-detector report.
pub const CONFLICT_DETECTOR_TAG: &str = "[conflict-detector]";

/// Default permission threshold used by `LearningsAggregator::new`: a command
/// class is reported once its auto-approve count reaches this value.
pub const PERMISSION_PATTERN_THRESHOLD: u64 = 5;

/// Category string stamped on records built from conflict events.
pub const CATEGORY_CONFLICT_EVENT: &str = "conflict_event";
/// Category string stamped on records built from stuck-duration entries.
pub const CATEGORY_STUCK_DURATION: &str = "stuck_duration";
/// Category string stamped on records built from recovery-cycle entries.
pub const CATEGORY_RECOVERY_CYCLES: &str = "recovery_cycles";
/// Category string stamped on records built from permission patterns.
pub const CATEGORY_PERMISSION_PATTERN: &str = "permission_pattern";

/// Qualitative category rendered under "Recurring failure shapes".
pub const CATEGORY_RECURRING_FAILURE_SHAPE: &str = "recurring_failure_shape";
/// Qualitative category rendered under "Documentation gaps".
pub const CATEGORY_DOC_GAP: &str = "doc_gap";
/// Qualitative category rendered under "ADR / architectural drift".
pub const CATEGORY_ADR_DRIFT: &str = "adr_drift";
/// Qualitative category rendered under "Scope-mistake signals".
pub const CATEGORY_SCOPE_MISTAKE: &str = "scope_mistake";

/// `agent_id` stamped on every record this module builds itself.
pub const LEARNINGS_AGENT_ID: &str = "supervisor";
102
/// One learning queued for publication; converted into
/// `BrokerMessage::Learning` via the `From<&LearningRecord>` impl.
#[derive(Debug, Clone, PartialEq)]
pub struct LearningRecord {
    /// Category string (one of the `CATEGORY_*` constants for records built here).
    pub category: String,
    /// Id of the agent the record is attributed to.
    pub agent_id: String,
    /// Optional branch identifier; participates in `deterministic_id`.
    pub branch_id: Option<String>,
    /// Human-readable one-line summary.
    pub title: String,
    /// Structured JSON details; participates in `deterministic_id`.
    pub body: serde_json::Value,
    /// Time attached to the record; only its UTC hour feeds `deterministic_id`.
    pub timestamp: SystemTime,
}
130
131impl LearningRecord {
132 #[must_use]
143 pub fn deterministic_id(&self) -> String {
144 let mut canon = String::new();
145 canon.push_str(&self.category);
146 canon.push('|');
147 canon.push_str(self.branch_id.as_deref().unwrap_or(""));
148 canon.push('|');
149 canonical_value(&self.body, &mut canon);
150 canon.push('|');
151 canon.push_str(&hour_bucket(self.timestamp));
152
153 let mut hasher = std::collections::hash_map::DefaultHasher::new();
154 hasher.write(canon.as_bytes());
155 format!("{:016x}", hasher.finish())
156 }
157}
158
159impl From<&LearningRecord> for BrokerMessage {
160 fn from(record: &LearningRecord) -> Self {
161 BrokerMessage::Learning {
162 payload: LearningPayload {
163 id: record.deterministic_id(),
164 agent_id: record.agent_id.clone(),
165 branch_id: record.branch_id.clone(),
166 category: record.category.clone(),
167 title: record.title.clone(),
168 body: record.body.clone(),
169 timestamp: format_iso8601_utc(record.timestamp),
170 },
171 }
172 }
173}
174
/// One completed (or abandoned) wait recorded for an agent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StuckDurationEntry {
    /// Agent that was blocked.
    pub agent_id: String,
    /// What the agent reported it was waiting on.
    pub blocked_on: String,
    /// Whole seconds between the block and its resolution (or shutdown).
    pub duration_seconds: u64,
    /// `true` when an artifact ended the wait; `false` when the entry was
    /// synthesized at shutdown while the agent was still blocked.
    pub resolved: bool,
}
189
/// Number of feedback messages an agent received before it was verified
/// (or before shutdown, for agents never verified).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryCycleEntry {
    /// Agent the feedback was addressed to.
    pub agent_id: String,
    /// Feedback messages counted for that agent.
    pub count: u32,
}
199
/// Classification of a conflict-detector report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConflictCategory {
    /// Forward conflict between two agents registered under the same spec id.
    ForwardConflictIntraSpec {
        /// The two agent ids, sorted.
        agents: Vec<String>,
        /// Spec id shared by both agents.
        spec_id: String,
        /// Region descriptors parsed from `(regions: ...)` groups in the text.
        regions: Vec<String>,
    },
    /// Forward conflict between agents whose spec ids differ or are unknown.
    ForwardConflictCrossSpec {
        /// The two agent ids, sorted.
        agents: Vec<String>,
        /// Spec id per agent, in `agents` order; empty string when unknown.
        spec_ids: Vec<String>,
        /// Region descriptors parsed from `(regions: ...)` groups in the text.
        regions: Vec<String>,
    },
    /// In-flight conflict between two agents.
    InFlightConflict {
        /// The two agent ids, sorted.
        agents: Vec<String>,
    },
    /// An agent touched a file attributed to another agent.
    OwnershipViolation {
        /// Agent the detector message was addressed to.
        violator: String,
        /// First other known agent mentioned in the message text.
        owner: String,
        /// Path-like token extracted from the text; empty when none was found.
        file: String,
    },
}
241
/// A recorded conflict; currently carries only its classification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConflictEvent {
    /// What kind of conflict was reported and who was involved.
    pub category: ConflictCategory,
}
248
/// Accumulates learnings observed on the broker and appends them, section by
/// section, to a markdown file.
#[derive(Debug)]
pub struct LearningsAggregator {
    /// Agent id -> (time the block was recorded, what it is blocked on).
    pending_blocks: HashMap<String, (SystemTime, String)>,
    /// Agent id -> feedback messages counted since the last verification.
    feedback_counts: HashMap<String, u32>,
    /// Every stuck-duration entry recorded so far, flushed or not.
    stuck_events: Vec<StuckDurationEntry>,
    /// Every recovery-cycle entry recorded so far, flushed or not.
    recovery_events: Vec<RecoveryCycleEntry>,
    /// Every conflict event recorded so far, flushed or not.
    conflict_events: Vec<ConflictEvent>,
    /// Command class -> number of auto-approvals observed.
    permission_counts: HashMap<String, u64>,
    /// Command class -> count at the time it was last written; a class is
    /// written again only after its count has grown past this value.
    permission_emitted: HashMap<String, u64>,
    /// Number of leading `stuck_events` already written to the file.
    stuck_flushed: usize,
    /// Number of leading `recovery_events` already written to the file.
    recovery_flushed: usize,
    /// Number of leading `conflict_events` already written to the file.
    conflict_flushed: usize,
    /// Whether the `## Session Learnings` heading has been emitted.
    h2_written: bool,
    /// Construction time; rendered in the session heading.
    session_start: SystemTime,
    /// Minimum auto-approve count before a command class is reported.
    permission_threshold: u64,
    /// Agent id -> spec id, used to classify forward conflicts.
    spec_ids: HashMap<String, String>,
    /// Markdown file that flushes append to.
    file_path: PathBuf,
    /// Agent ids seen so far, in first-seen order.
    known_agents: Vec<String>,
    /// When `true`, flushes also queue `LearningRecord`s in `pending_publish`.
    broker_publish: bool,
    /// Records queued by flushes until `take_pending_publish` drains them.
    pending_publish: Vec<LearningRecord>,
    /// Deduplicated qualitative learnings received from the broker.
    qualitative_events: Vec<LearningPayload>,
    /// Number of leading `qualitative_events` already written to the file.
    qualitative_flushed: usize,
}
313
314impl LearningsAggregator {
    /// Creates an aggregator that appends to `file_path`, using
    /// `PERMISSION_PATTERN_THRESHOLD` as the permission threshold.
    #[must_use]
    pub fn new(file_path: PathBuf) -> Self {
        Self::with_threshold(file_path, PERMISSION_PATTERN_THRESHOLD)
    }
323
    /// Creates an empty aggregator with an explicit permission threshold.
    ///
    /// Nothing is written at construction; `session_start` is captured now
    /// and broker publishing starts disabled.
    #[must_use]
    pub fn with_threshold(file_path: PathBuf, permission_threshold: u64) -> Self {
        Self {
            pending_blocks: HashMap::new(),
            feedback_counts: HashMap::new(),
            stuck_events: Vec::new(),
            recovery_events: Vec::new(),
            conflict_events: Vec::new(),
            permission_counts: HashMap::new(),
            permission_emitted: HashMap::new(),
            stuck_flushed: 0,
            recovery_flushed: 0,
            conflict_flushed: 0,
            h2_written: false,
            session_start: SystemTime::now(),
            permission_threshold,
            spec_ids: HashMap::new(),
            file_path,
            known_agents: Vec::new(),
            broker_publish: false,
            pending_publish: Vec::new(),
            qualitative_events: Vec::new(),
            qualitative_flushed: 0,
        }
    }
350
    /// Enables or disables queuing of `LearningRecord`s during flushes.
    pub fn set_broker_publish(&mut self, enabled: bool) {
        self.broker_publish = enabled;
    }
359
    /// Returns whether flushes currently queue records for publication.
    #[must_use]
    pub fn broker_publish_enabled(&self) -> bool {
        self.broker_publish
    }
365
    /// Returns all queued records and leaves the queue empty.
    pub fn take_pending_publish(&mut self) -> Vec<LearningRecord> {
        std::mem::take(&mut self.pending_publish)
    }
372
373 pub fn register_agent(&mut self, agent_id: &str) {
376 if !self.known_agents.iter().any(|a| a == agent_id) {
377 self.known_agents.push(agent_id.to_string());
378 }
379 }
380
    /// Associates `agent_id` with `spec_id`, replacing any earlier mapping.
    pub fn set_spec_id(&mut self, agent_id: &str, spec_id: &str) {
        self.spec_ids
            .insert(agent_id.to_string(), spec_id.to_string());
    }
387
    /// Path of the markdown file that flushes append to.
    #[must_use]
    pub fn file_path(&self) -> &Path {
        &self.file_path
    }
393
    /// Notes that `agent_id` became blocked on `blocked_on` at `ts`.
    ///
    /// A later block for the same agent overwrites the earlier pending one.
    pub fn record_blocked(&mut self, agent_id: &str, blocked_on: &str, ts: SystemTime) {
        self.register_agent(agent_id);
        self.pending_blocks
            .insert(agent_id.to_string(), (ts, blocked_on.to_string()));
    }
400
401 pub fn record_artifact(&mut self, agent_id: &str, ts: SystemTime) {
404 self.register_agent(agent_id);
405 if let Some((start, blocked_on)) = self.pending_blocks.remove(agent_id) {
406 let duration = ts.duration_since(start).unwrap_or(Duration::ZERO).as_secs();
407 self.stuck_events.push(StuckDurationEntry {
408 agent_id: agent_id.to_string(),
409 blocked_on,
410 duration_seconds: duration,
411 resolved: true,
412 });
413 }
414 }
415
416 pub fn record_feedback(&mut self, target_agent_id: &str) {
419 self.register_agent(target_agent_id);
420 *self
421 .feedback_counts
422 .entry(target_agent_id.to_string())
423 .or_insert(0) += 1;
424 }
425
426 pub fn record_verified(&mut self, target_agent_id: &str) {
429 self.register_agent(target_agent_id);
430 if let Some(count) = self.feedback_counts.remove(target_agent_id)
431 && count >= 1
432 {
433 self.recovery_events.push(RecoveryCycleEntry {
434 agent_id: target_agent_id.to_string(),
435 count,
436 });
437 }
438 }
439
440 pub fn record_auto_approve(&mut self, command_class: &str) {
443 let key = command_class.trim();
444 if key.is_empty() {
445 return;
446 }
447 *self.permission_counts.entry(key.to_string()).or_insert(0) += 1;
448 }
449
450 pub fn record_detector_message(&mut self, msg: &BrokerMessage) {
459 let text = match msg {
460 BrokerMessage::Feedback { payload, .. } => payload.errors.join(" "),
461 BrokerMessage::Question { payload, .. } => payload.question.clone(),
462 _ => return,
463 };
464 if !text.contains(CONFLICT_DETECTOR_TAG) {
465 return;
466 }
467
468 let target = msg.agent_id().to_string();
469 self.register_agent(&target);
470 let others = self.other_agents_in_text(&text, &target);
471 let file = extract_file_token(&text);
472
473 if text.contains("ownership violation") {
474 if let Some(owner) = others.first() {
476 let candidate = ConflictCategory::OwnershipViolation {
477 violator: target.clone(),
478 owner: owner.clone(),
479 file: file.clone().unwrap_or_default(),
480 };
481 if !self.has_conflict_category(&candidate) {
482 self.conflict_events.push(ConflictEvent {
483 category: candidate,
484 });
485 }
486 }
487 return;
488 }
489
490 if text.contains("forward conflict") {
491 if let Some(other) = others.first() {
492 let pair = sorted_pair(&target, other);
493 let regions = extract_regions(&text);
494 let category = self.classify_forward(&pair, regions);
495 if !self.has_conflict_category(&category) {
496 self.conflict_events.push(ConflictEvent { category });
497 }
498 }
499 return;
500 }
501
502 if text.contains("in-flight conflict")
503 && let Some(other) = others.first()
504 {
505 let pair = sorted_pair(&target, other);
506 let category = ConflictCategory::InFlightConflict { agents: pair };
507 if !self.has_conflict_category(&category) {
508 self.conflict_events.push(ConflictEvent { category });
509 }
510 }
511 }
512
513 pub fn observe(&mut self, msg: &BrokerMessage) {
516 match msg {
517 BrokerMessage::Blocked { agent_id, payload } => {
518 self.record_blocked(agent_id, &payload.from, SystemTime::now());
519 }
520 BrokerMessage::Artifact { agent_id, .. } => {
521 self.record_artifact(agent_id, SystemTime::now());
522 }
523 BrokerMessage::Verified { agent_id, .. } => {
524 self.record_verified(agent_id);
525 }
526 BrokerMessage::Feedback {
527 agent_id, payload, ..
528 } => {
529 self.record_feedback(agent_id);
530 let text = payload.errors.join(" ");
533 if text.contains(CONFLICT_DETECTOR_TAG) {
534 self.record_detector_message(msg);
535 }
536 }
537 BrokerMessage::Question { payload, .. } => {
538 if payload.question.contains(CONFLICT_DETECTOR_TAG) {
539 self.record_detector_message(msg);
540 }
541 }
542 BrokerMessage::Status { agent_id, payload } => {
543 if payload.status == "auto_approved"
544 && let Some(cls) = extract_command_class(payload.message.as_deref())
545 {
546 self.record_auto_approve(&cls);
547 }
548 self.register_agent(agent_id);
549 }
550 BrokerMessage::Intent { agent_id, .. } => {
551 self.register_agent(agent_id);
556 }
557 BrokerMessage::Learning { payload } => {
558 self.record_qualitative(payload);
566 }
567 BrokerMessage::VerifyNow { .. } | BrokerMessage::AdvancedMain { .. } => {}
572 }
573 }
574
575 pub fn record_qualitative(&mut self, payload: &LearningPayload) {
587 if is_deterministic_category(&payload.category) {
588 return;
589 }
590 let key = qualitative_dedup_key(payload);
591 if self
592 .qualitative_events
593 .iter()
594 .any(|p| qualitative_dedup_key(p) == key)
595 {
596 return;
597 }
598 self.qualitative_events.push(payload.clone());
599 }
600
    /// Appends everything recorded since the previous flush to the file.
    ///
    /// # Errors
    /// Returns the I/O error raised while creating or appending to the file.
    pub fn flush(&mut self) -> std::io::Result<()> {
        self.write_flush(false)
    }
606
607 pub fn flush_at_shutdown(&mut self) -> std::io::Result<()> {
611 let now = SystemTime::now();
612 let pending: Vec<(String, SystemTime, String)> = self
613 .pending_blocks
614 .drain()
615 .map(|(agent, (start, on))| (agent, start, on))
616 .collect();
617 for (agent, start, on) in pending {
618 let duration = now
619 .duration_since(start)
620 .unwrap_or(Duration::ZERO)
621 .as_secs();
622 self.stuck_events.push(StuckDurationEntry {
623 agent_id: agent,
624 blocked_on: on,
625 duration_seconds: duration,
626 resolved: false,
627 });
628 }
629 let pending_recovery: Vec<(String, u32)> = self.feedback_counts.drain().collect();
631 for (agent, count) in pending_recovery {
632 if count >= 1 {
633 self.recovery_events.push(RecoveryCycleEntry {
634 agent_id: agent,
635 count,
636 });
637 }
638 }
639 self.write_flush(true)
640 }
641
642 fn classify_forward(&self, pair: &[String], regions: Vec<String>) -> ConflictCategory {
643 let spec_a = self.spec_ids.get(&pair[0]);
644 let spec_b = self.spec_ids.get(&pair[1]);
645 match (spec_a, spec_b) {
646 (Some(a), Some(b)) if a == b => ConflictCategory::ForwardConflictIntraSpec {
647 agents: pair.to_vec(),
648 spec_id: a.clone(),
649 regions,
650 },
651 (Some(a), Some(b)) => ConflictCategory::ForwardConflictCrossSpec {
652 agents: pair.to_vec(),
653 spec_ids: vec![a.clone(), b.clone()],
654 regions,
655 },
656 _ => ConflictCategory::ForwardConflictCrossSpec {
657 agents: pair.to_vec(),
658 spec_ids: vec![
659 spec_a.cloned().unwrap_or_default(),
660 spec_b.cloned().unwrap_or_default(),
661 ],
662 regions,
663 },
664 }
665 }
666
    /// Returns `true` when a stored conflict event matches `candidate`
    /// according to `matches_category`.
    fn has_conflict_category(&self, candidate: &ConflictCategory) -> bool {
        self.conflict_events
            .iter()
            .any(|e| matches_category(&e.category, candidate))
    }
672
673 fn other_agents_in_text(&self, text: &str, exclude: &str) -> Vec<String> {
674 self.known_agents
675 .iter()
676 .filter(|id| *id != exclude && text.contains(id.as_str()))
677 .cloned()
678 .collect()
679 }
680
681 fn write_flush(&mut self, _shutdown: bool) -> std::io::Result<()> {
682 let broker_publish = self.broker_publish;
683 let new_stuck = &self.stuck_events[self.stuck_flushed..];
684 let new_recovery = &self.recovery_events[self.recovery_flushed..];
685 let new_conflicts = &self.conflict_events[self.conflict_flushed..];
686
687 let permission_entries: Vec<(String, u64)> = {
688 let mut entries: Vec<(String, u64)> = self
689 .permission_counts
690 .iter()
691 .filter(|(class, count)| {
692 **count >= self.permission_threshold
693 && self.permission_emitted.get(*class).copied().unwrap_or(0) < **count
694 })
695 .map(|(k, v)| (k.clone(), *v))
696 .collect();
697 entries.sort_by(|a, b| a.0.cmp(&b.0));
698 entries
699 };
700
701 let new_qualitative = &self.qualitative_events[self.qualitative_flushed..];
702
703 let has_any = !new_stuck.is_empty()
704 || !new_recovery.is_empty()
705 || !new_conflicts.is_empty()
706 || !permission_entries.is_empty()
707 || !new_qualitative.is_empty();
708 if !has_any {
709 return Ok(());
710 }
711
712 let mut out = String::new();
713 if !self.h2_written {
714 let ts = format_iso8601_utc(self.session_start);
715 let _ = writeln!(out, "## Session Learnings — {ts}");
716 self.h2_written = true;
717 }
718
719 if !new_conflicts.is_empty() {
720 out.push_str("\n### Conflict events\n");
721 for ev in new_conflicts {
722 let _ = writeln!(out, "- {}", render_conflict(&ev.category));
723 }
724 }
725 if !new_stuck.is_empty() {
726 out.push_str("\n### Where agents got stuck\n");
727 for ev in new_stuck {
728 let _ = writeln!(out, "- {}", render_stuck(ev));
729 }
730 }
731 if !new_recovery.is_empty() {
732 out.push_str("\n### Recovery cycles\n");
733 for ev in new_recovery {
734 let _ = writeln!(out, "- {}", render_recovery(ev));
735 }
736 }
737 if !permission_entries.is_empty() {
738 out.push_str("\n### Permission patterns\n");
739 for (class, count) in &permission_entries {
740 let _ = writeln!(out, "- {}", render_permission(class, *count));
741 }
742 }
743
744 render_qualitative_sections(new_qualitative, &mut out);
745
746 let records: Vec<LearningRecord> = if broker_publish {
753 let now = SystemTime::now();
754 let mut records =
755 Vec::with_capacity(new_conflicts.len() + new_stuck.len() + new_recovery.len());
756 for ev in new_conflicts {
757 records.push(record_from_conflict(&ev.category, now));
758 }
759 for ev in new_stuck {
760 records.push(record_from_stuck(ev, now));
761 }
762 for ev in new_recovery {
763 records.push(record_from_recovery(ev, now));
764 }
765 for (class, count) in &permission_entries {
766 records.push(record_from_permission(class, *count, now));
767 }
768 records
769 } else {
770 Vec::new()
771 };
772
773 append_to_file(&self.file_path, &out)?;
774
775 self.stuck_flushed = self.stuck_events.len();
776 self.recovery_flushed = self.recovery_events.len();
777 self.conflict_flushed = self.conflict_events.len();
778 self.qualitative_flushed = self.qualitative_events.len();
779 for (class, count) in &permission_entries {
780 self.permission_emitted.insert(class.clone(), *count);
781 }
782 self.pending_publish.extend(records);
785 Ok(())
786 }
787
    /// Test-only view of every recorded stuck entry.
    #[cfg(test)]
    fn stuck_events(&self) -> &[StuckDurationEntry] {
        &self.stuck_events
    }
792
    /// Test-only view of every recorded recovery-cycle entry.
    #[cfg(test)]
    fn recovery_events(&self) -> &[RecoveryCycleEntry] {
        &self.recovery_events
    }
797
    /// Test-only view of every recorded conflict event.
    #[cfg(test)]
    fn conflict_events(&self) -> &[ConflictEvent] {
        &self.conflict_events
    }
802
    /// Test-only view of every stored qualitative learning.
    #[cfg(test)]
    fn qualitative_events(&self) -> &[LearningPayload] {
        &self.qualitative_events
    }
807}
808
/// A `LearningsAggregator` behind `Arc<Mutex<..>>` so it can be shared.
pub type SharedLearnings = Arc<Mutex<LearningsAggregator>>;
812
/// Appends `contents` to `path`, creating the file and any missing parent
/// directories first.
///
/// # Errors
/// Propagates any I/O error from directory creation, opening or writing.
fn append_to_file(path: &Path, contents: &str) -> std::io::Result<()> {
    // A bare file name has an empty parent; there is nothing to create then.
    let parent_dir = path.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(dir) = parent_dir {
        std::fs::create_dir_all(dir)?;
    }

    let mut options = OpenOptions::new();
    options.create(true).append(true);
    let mut handle = options.open(path)?;
    handle.write_all(contents.as_bytes())
}
822
/// Markdown heading that `append_routing_record` writes once per file, ahead
/// of the first routing entry.
pub const ROUTING_SECTION_HEADER: &str = "### Supervisor routing";

/// Maximum number of characters of a prompt shown by `format_routing_entry`
/// before it is cut and suffixed with an ellipsis.
pub const ROUTING_PROMPT_MAX_CHARS: usize = 200;
831
832#[must_use]
840pub fn format_routing_entry(ts_iso: &str, target: &str, mode: &str, prompt: &str) -> String {
841 let trimmed = prompt.trim();
842 let shown = if trimmed.chars().count() > ROUTING_PROMPT_MAX_CHARS {
843 let mut s: String = trimmed.chars().take(ROUTING_PROMPT_MAX_CHARS).collect();
844 s.push('…');
845 s
846 } else {
847 trimmed.to_string()
848 };
849 format!("- {ts_iso} — supervisor told `{target}` via {mode}: \"{shown}\"")
850}
851
852pub fn append_routing_record(
864 path: &Path,
865 enabled: bool,
866 ts_iso: &str,
867 target: &str,
868 mode: &str,
869 prompt: &str,
870) -> std::io::Result<()> {
871 if !enabled {
872 return Ok(());
873 }
874 let needs_header = match std::fs::read_to_string(path) {
875 Ok(existing) => !existing.contains(ROUTING_SECTION_HEADER),
876 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
877 Err(e) => return Err(e),
878 };
879 let mut block = String::new();
880 if needs_header {
881 block.push('\n');
882 block.push_str(ROUTING_SECTION_HEADER);
883 block.push('\n');
884 }
885 block.push_str(&format_routing_entry(ts_iso, target, mode, prompt));
886 block.push('\n');
887 append_to_file(path, &block)
888}
889
890fn canonical_value(value: &serde_json::Value, out: &mut String) {
896 use serde_json::Value;
897 match value {
898 Value::Object(map) => {
899 let mut keys: Vec<&String> = map.keys().collect();
900 keys.sort();
901 out.push('{');
902 for (i, key) in keys.iter().enumerate() {
903 if i > 0 {
904 out.push(',');
905 }
906 out.push_str(key);
907 out.push(':');
908 canonical_value(&map[*key], out);
909 }
910 out.push('}');
911 }
912 Value::Array(items) => {
913 out.push('[');
914 for (i, item) in items.iter().enumerate() {
915 if i > 0 {
916 out.push(',');
917 }
918 canonical_value(item, out);
919 }
920 out.push(']');
921 }
922 other => out.push_str(&other.to_string()),
923 }
924}
925
926fn hour_bucket(time: SystemTime) -> String {
928 let secs = time.duration_since(UNIX_EPOCH).map_or(0, |d| d.as_secs());
929 Utc.timestamp_opt(i64::try_from(secs).unwrap_or(0), 0)
930 .single()
931 .map(|dt| dt.format("%Y-%m-%dT%H").to_string())
932 .unwrap_or_default()
933}
934
/// Returns the two ids as an owned two-element vector in ascending order.
fn sorted_pair(a: &str, b: &str) -> Vec<String> {
    let (low, high) = if a <= b { (a, b) } else { (b, a) };
    vec![low.to_owned(), high.to_owned()]
}
940
941fn matches_category(a: &ConflictCategory, b: &ConflictCategory) -> bool {
942 use ConflictCategory::{
943 ForwardConflictCrossSpec, ForwardConflictIntraSpec, InFlightConflict, OwnershipViolation,
944 };
945 match (a, b) {
946 (
947 ForwardConflictIntraSpec { agents: x, .. },
948 ForwardConflictIntraSpec { agents: y, .. },
949 )
950 | (
951 ForwardConflictCrossSpec { agents: x, .. },
952 ForwardConflictCrossSpec { agents: y, .. },
953 )
954 | (InFlightConflict { agents: x }, InFlightConflict { agents: y }) => x == y,
955 (
956 OwnershipViolation {
957 violator: vx,
958 owner: ox,
959 file: fx,
960 },
961 OwnershipViolation {
962 violator: vy,
963 owner: oy,
964 file: fy,
965 },
966 ) => vx == vy && ox == oy && fx == fy,
967 _ => false,
968 }
969}
970
/// Finds the first whitespace-separated token that looks like a file path.
///
/// Each token is stripped of leading/trailing characters other than
/// alphanumerics, `/` and `.`; the first stripped token containing both a
/// `.` and a `/` is returned.
fn extract_file_token(text: &str) -> Option<String> {
    // Characters that may surround a path (quotes, backticks, commas, ...).
    fn is_wrapper(c: char) -> bool {
        !(c.is_alphanumeric() || c == '/' || c == '.')
    }

    text.split_whitespace().find_map(|token| {
        let candidate = token.trim_matches(is_wrapper);
        if candidate.contains('.') && candidate.contains('/') {
            Some(candidate.to_string())
        } else {
            None
        }
    })
}
984
/// Collects the comma-separated descriptors from every `(regions: ...)`
/// group in `text`.
///
/// Descriptors are trimmed, blanks are skipped, and duplicates keep only
/// their first occurrence. Scanning stops at a group with no closing `)`.
fn extract_regions(text: &str) -> Vec<String> {
    const MARKER: &str = "(regions: ";

    let mut found: Vec<String> = Vec::new();
    let mut cursor = text;
    while let Some((_, after_marker)) = cursor.split_once(MARKER) {
        let Some((inside, tail)) = after_marker.split_once(')') else {
            break;
        };
        let descriptors = inside.split(',').map(str::trim).filter(|d| !d.is_empty());
        for descriptor in descriptors {
            if !found.iter().any(|seen| seen == descriptor) {
                found.push(descriptor.to_string());
            }
        }
        cursor = tail;
    }
    found
}
1010
/// Pulls the command class out of an `auto_approved: matched <class>` message.
///
/// Returns `None` when there is no message, the prefix is absent, or the
/// remainder is blank after trimming.
fn extract_command_class(message: Option<&str>) -> Option<String> {
    let class = message?.strip_prefix("auto_approved: matched ")?.trim();
    if class.is_empty() {
        None
    } else {
        Some(class.to_string())
    }
}
1017
1018fn record_from_conflict(cat: &ConflictCategory, now: SystemTime) -> LearningRecord {
1023 use serde_json::json;
1024 let body = match cat {
1025 ConflictCategory::ForwardConflictIntraSpec {
1026 agents,
1027 spec_id,
1028 regions,
1029 } => {
1030 let mut b = json!({
1031 "shape": "forward_intra_spec",
1032 "agents": agents,
1033 "spec_id": spec_id,
1034 });
1035 if !regions.is_empty() {
1036 b["regions"] = json!(regions);
1037 }
1038 b
1039 }
1040 ConflictCategory::ForwardConflictCrossSpec {
1041 agents,
1042 spec_ids,
1043 regions,
1044 } => {
1045 let mut b = json!({
1046 "shape": "forward_cross_spec",
1047 "agents": agents,
1048 "spec_ids": spec_ids,
1049 });
1050 if !regions.is_empty() {
1051 b["regions"] = json!(regions);
1052 }
1053 b
1054 }
1055 ConflictCategory::InFlightConflict { agents } => json!({
1056 "shape": "in_flight",
1057 "agents": agents,
1058 }),
1059 ConflictCategory::OwnershipViolation {
1060 violator,
1061 owner,
1062 file,
1063 } => json!({
1064 "shape": "ownership_violation",
1065 "violator": violator,
1066 "owner": owner,
1067 "file": file,
1068 }),
1069 };
1070 LearningRecord {
1071 category: CATEGORY_CONFLICT_EVENT.to_string(),
1072 agent_id: LEARNINGS_AGENT_ID.to_string(),
1073 branch_id: None,
1074 title: render_conflict(cat),
1075 body,
1076 timestamp: now,
1077 }
1078}
1079
1080fn record_from_stuck(ev: &StuckDurationEntry, now: SystemTime) -> LearningRecord {
1083 LearningRecord {
1084 category: CATEGORY_STUCK_DURATION.to_string(),
1085 agent_id: LEARNINGS_AGENT_ID.to_string(),
1086 branch_id: Some(ev.agent_id.clone()),
1087 title: render_stuck(ev),
1088 body: serde_json::json!({
1089 "agent_id": ev.agent_id,
1090 "blocked_on": ev.blocked_on,
1091 "duration_seconds": ev.duration_seconds,
1092 "resolved": ev.resolved,
1093 }),
1094 timestamp: now,
1095 }
1096}
1097
1098fn record_from_recovery(ev: &RecoveryCycleEntry, now: SystemTime) -> LearningRecord {
1101 LearningRecord {
1102 category: CATEGORY_RECOVERY_CYCLES.to_string(),
1103 agent_id: LEARNINGS_AGENT_ID.to_string(),
1104 branch_id: Some(ev.agent_id.clone()),
1105 title: render_recovery(ev),
1106 body: serde_json::json!({
1107 "agent_id": ev.agent_id,
1108 "count": ev.count,
1109 }),
1110 timestamp: now,
1111 }
1112}
1113
1114fn record_from_permission(class: &str, count: u64, now: SystemTime) -> LearningRecord {
1118 LearningRecord {
1119 category: CATEGORY_PERMISSION_PATTERN.to_string(),
1120 agent_id: LEARNINGS_AGENT_ID.to_string(),
1121 branch_id: None,
1122 title: render_permission(class, count),
1123 body: serde_json::json!({
1124 "command_class": class,
1125 "count": count,
1126 }),
1127 timestamp: now,
1128 }
1129}
1130
1131fn render_conflict(cat: &ConflictCategory) -> String {
1132 match cat {
1133 ConflictCategory::ForwardConflictIntraSpec {
1134 agents, spec_id, ..
1135 } => {
1136 format!(
1137 "forward-conflict-intra-spec: {} (spec {})",
1138 agents.join(" and "),
1139 spec_id
1140 )
1141 }
1142 ConflictCategory::ForwardConflictCrossSpec {
1143 agents, spec_ids, ..
1144 } => {
1145 let specs: Vec<String> = spec_ids.iter().filter(|s| !s.is_empty()).cloned().collect();
1146 if specs.is_empty() {
1147 format!("forward-conflict-cross-spec: {}", agents.join(" and "))
1148 } else {
1149 format!(
1150 "forward-conflict-cross-spec: {} (specs {})",
1151 agents.join(" and "),
1152 specs.join(", ")
1153 )
1154 }
1155 }
1156 ConflictCategory::InFlightConflict { agents } => {
1157 format!("in-flight-conflict: {}", agents.join(" and "))
1158 }
1159 ConflictCategory::OwnershipViolation {
1160 violator,
1161 owner,
1162 file,
1163 } => {
1164 if file.is_empty() {
1165 format!("ownership-violation: {violator} edited a file owned by {owner}")
1166 } else {
1167 format!("ownership-violation: {violator} edited `{file}` owned by {owner}")
1168 }
1169 }
1170 }
1171}
1172
1173fn render_stuck(ev: &StuckDurationEntry) -> String {
1174 let dur = format_duration(ev.duration_seconds);
1175 let suffix = if ev.resolved {
1176 String::new()
1177 } else {
1178 " (unresolved at session end)".to_string()
1179 };
1180 format!(
1181 "{}: blocked {dur} waiting on {}{suffix}",
1182 ev.agent_id, ev.blocked_on
1183 )
1184}
1185
1186fn render_recovery(ev: &RecoveryCycleEntry) -> String {
1187 let cycles = if ev.count == 1 { "cycle" } else { "cycles" };
1188 format!(
1189 "{}: {} feedback {cycles} before verifying",
1190 ev.agent_id, ev.count
1191 )
1192}
1193
/// Renders a permission pattern line: the class in backticks followed by how
/// many times it was auto-approved.
fn render_permission(class: &str, count: u64) -> String {
    let mut line = String::with_capacity(class.len() + 32);
    line.push('`');
    line.push_str(class);
    line.push_str("` auto-approved ");
    line.push_str(&count.to_string());
    line.push_str(" times");
    line
}
1197
/// Qualitative categories paired with the markdown heading each is rendered
/// under, in output order. Categories not listed here fall under
/// "Other learnings".
const QUALITATIVE_SECTIONS: &[(&str, &str)] = &[
    (CATEGORY_RECURRING_FAILURE_SHAPE, "Recurring failure shapes"),
    (CATEGORY_DOC_GAP, "Documentation gaps"),
    (CATEGORY_ADR_DRIFT, "ADR / architectural drift"),
    (CATEGORY_SCOPE_MISTAKE, "Scope-mistake signals"),
];
1209
1210fn is_deterministic_category(category: &str) -> bool {
1215 matches!(
1216 category,
1217 CATEGORY_CONFLICT_EVENT
1218 | CATEGORY_STUCK_DURATION
1219 | CATEGORY_RECOVERY_CYCLES
1220 | CATEGORY_PERMISSION_PATTERN
1221 )
1222}
1223
1224fn render_qualitative_sections(new_qualitative: &[LearningPayload], out: &mut String) {
1229 for (category, header) in QUALITATIVE_SECTIONS {
1230 let mut wrote_header = false;
1231 for p in new_qualitative.iter().filter(|p| &p.category == category) {
1232 if !wrote_header {
1233 let _ = writeln!(out, "\n### {header}");
1234 wrote_header = true;
1235 }
1236 out.push_str(&render_qualitative(p));
1237 }
1238 }
1239 let mut wrote_other = false;
1240 for p in new_qualitative
1241 .iter()
1242 .filter(|p| qualitative_section(&p.category).is_none())
1243 {
1244 if !wrote_other {
1245 out.push_str("\n### Other learnings\n");
1246 wrote_other = true;
1247 }
1248 out.push_str(&render_qualitative(p));
1249 }
1250}
1251
1252fn qualitative_section(category: &str) -> Option<&'static str> {
1255 QUALITATIVE_SECTIONS
1256 .iter()
1257 .find(|(cat, _)| *cat == category)
1258 .map(|(_, header)| *header)
1259}
1260
1261fn string_field(body: &serde_json::Value, key: &str) -> Option<String> {
1263 body.get(key).and_then(|v| v.as_str()).map(str::to_string)
1264}
1265
1266fn sorted_array_field(body: &serde_json::Value, key: &str) -> Option<String> {
1270 let arr = body.get(key)?.as_array()?;
1271 let mut items: Vec<String> = arr
1272 .iter()
1273 .map(|v| {
1274 v.as_str()
1275 .map_or_else(|| v.to_string(), std::string::ToString::to_string)
1276 })
1277 .collect();
1278 items.sort();
1279 Some(items.join(","))
1280}
1281
1282fn qualitative_dedup_key(p: &LearningPayload) -> String {
1288 let primary = match p.category.as_str() {
1289 CATEGORY_RECURRING_FAILURE_SHAPE => string_field(&p.body, "shape"),
1290 CATEGORY_DOC_GAP => string_field(&p.body, "convention"),
1291 CATEGORY_ADR_DRIFT => string_field(&p.body, "decision_area"),
1292 CATEGORY_SCOPE_MISTAKE => sorted_array_field(&p.body, "branches"),
1293 _ => None,
1294 };
1295 match primary {
1296 Some(id) => format!("{}|{}", p.category, id),
1297 None => format!("{}|#{}", p.category, p.id),
1298 }
1299}
1300
/// Serializes `value` with `serde_json::to_string`, falling back to the
/// value's `to_string()` form if serialization fails.
fn compact_json(value: &serde_json::Value) -> String {
    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
}
1306
/// Renders one qualitative learning: the structured form when the body has
/// the fields its category expects, otherwise the generic fallback.
fn render_qualitative(p: &LearningPayload) -> String {
    render_qualitative_structured(p).unwrap_or_else(|| render_qualitative_fallback(p))
}
1314
/// Generic rendering: the title as a bullet, then the body as JSON on the
/// following line.
fn render_qualitative_fallback(p: &LearningPayload) -> String {
    format!("- {}\n {}\n", p.title, compact_json(&p.body))
}
1320
1321fn render_qualitative_structured(p: &LearningPayload) -> Option<String> {
1325 match p.category.as_str() {
1326 CATEGORY_RECURRING_FAILURE_SHAPE => {
1327 let shape = string_field(&p.body, "shape")?;
1328 let instances = p.body.get("instances")?.as_array()?;
1329 let branches: Vec<String> = instances
1330 .iter()
1331 .filter_map(|i| i.get("branch_id").and_then(|v| v.as_str()))
1332 .map(str::to_string)
1333 .collect();
1334 let n = instances.len();
1335 let noun = if n == 1 { "instance" } else { "instances" };
1336 let across = if branches.is_empty() {
1337 String::new()
1338 } else {
1339 format!(" across {}", branches.join(", "))
1340 };
1341 Some(format!("- {shape}: {n} {noun}{across}\n"))
1342 }
1343 CATEGORY_DOC_GAP => {
1344 let convention = string_field(&p.body, "convention")?;
1345 let suggestion = string_field(&p.body, "suggestion")?;
1346 Some(format!("- {convention} — {suggestion}\n"))
1347 }
1348 CATEGORY_ADR_DRIFT => {
1349 let area = string_field(&p.body, "decision_area")?;
1350 let observed = string_field(&p.body, "observed_pattern")?;
1351 Some(format!("- {area}: {observed}\n"))
1352 }
1353 CATEGORY_SCOPE_MISTAKE => {
1354 let branches = p.body.get("branches")?.as_array()?;
1355 let names: Vec<String> = branches
1356 .iter()
1357 .filter_map(|v| v.as_str())
1358 .map(str::to_string)
1359 .collect();
1360 if names.is_empty() {
1361 return None;
1362 }
1363 let suggestion = string_field(&p.body, "suggestion")?;
1364 Some(format!("- {} — {suggestion}\n", names.join(" and ")))
1365 }
1366 _ => None,
1367 }
1368}
1369
/// Formats a number of seconds as `Ns` below one minute and `MmSSs` from
/// one minute up (minutes are not rolled over into hours).
fn format_duration(seconds: u64) -> String {
    let (minutes, secs) = (seconds / 60, seconds % 60);
    match minutes {
        0 => format!("{secs}s"),
        _ => format!("{minutes}m{secs:02}s"),
    }
}
1379
/// Formats `time` as `YYYY-MM-DDTHH:MM:SSZ` (UTC, whole seconds).
///
/// Times before the Unix epoch are rendered as the epoch itself.
fn format_iso8601_utc(time: SystemTime) -> String {
    let secs = time.duration_since(UNIX_EPOCH).map_or(0, |d| d.as_secs());
    let (year, month, day, hour, min, sec) = secs_to_civil(secs);
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{min:02}:{sec:02}Z")
}
1385
/// Converts seconds since the Unix epoch into
/// `(year, month, day, hour, minute, second)` in UTC.
///
/// The date part counts days from 0000-03-01 in 400-year eras of 146 097
/// days, using a March-based year so the leap day falls at the end of the
/// year; January and February are then moved into the following year.
// The casts are between day/year counts that are far inside the i64/u64
// ranges for any u64 second count, and the results are non-negative.
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_sign_loss)]
fn secs_to_civil(secs: u64) -> (u64, u64, u64, u64, u64, u64) {
    const SECS_PER_DAY: u64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097;
    // Days between 0000-03-01 and 1970-01-01.
    const EPOCH_SHIFT_DAYS: i64 = 719_468;

    let time_of_day = secs % SECS_PER_DAY;
    let hour = time_of_day / 3600;
    let minute = (time_of_day % 3600) / 60;
    let second = time_of_day % 60;

    let shifted_days = (secs / SECS_PER_DAY) as i64 + EPOCH_SHIFT_DAYS;
    let era = shifted_days.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted_days.rem_euclid(DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March ... 11 = February).
    let march_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_month + 2) / 5 + 1;
    let month = if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    };

    let march_based_year = year_of_era as i64 + era * 400;
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };
    (year as u64, month, day, hour, minute, second)
}
1407
1408#[cfg(test)]
1409mod tests {
1410 use super::*;
1411 use crate::broker::messages::{
1412 ArtifactPayload, BlockedPayload, FeedbackPayload, QuestionPayload, StatusPayload,
1413 VerifiedPayload,
1414 };
1415 use std::time::Duration;
1416 use tempfile::TempDir;
1417
1418 fn agg(tmp: &TempDir) -> LearningsAggregator {
1419 LearningsAggregator::new(tmp.path().join("session-learnings.md"))
1420 }
1421
1422 fn read_md(path: &Path) -> String {
1423 std::fs::read_to_string(path).unwrap_or_default()
1424 }
1425
1426 fn blocked(agent: &str, from: &str) -> BrokerMessage {
1427 BrokerMessage::Blocked {
1428 agent_id: agent.to_string(),
1429 payload: BlockedPayload {
1430 needs: "x".to_string(),
1431 from: from.to_string(),
1432 },
1433 }
1434 }
1435
1436 fn artifact(agent: &str) -> BrokerMessage {
1437 BrokerMessage::Artifact {
1438 agent_id: agent.to_string(),
1439 payload: ArtifactPayload {
1440 status: "done".to_string(),
1441 exports: vec![],
1442 modified_files: vec![],
1443 },
1444 }
1445 }
1446
1447 fn feedback(target: &str, errors: &[&str]) -> BrokerMessage {
1448 BrokerMessage::Feedback {
1449 agent_id: target.to_string(),
1450 payload: FeedbackPayload {
1451 from: "supervisor".to_string(),
1452 errors: errors.iter().map(|s| (*s).to_string()).collect(),
1453 },
1454 }
1455 }
1456
1457 fn verified(target: &str) -> BrokerMessage {
1458 BrokerMessage::Verified {
1459 agent_id: target.to_string(),
1460 payload: VerifiedPayload {
1461 verified_by: "supervisor".to_string(),
1462 message: None,
1463 },
1464 }
1465 }
1466
1467 fn question(text: &str) -> BrokerMessage {
1468 BrokerMessage::Question {
1469 agent_id: "supervisor".to_string(),
1470 payload: QuestionPayload {
1471 question: text.to_string(),
1472 },
1473 }
1474 }
1475
1476 fn auto_approve_status(agent: &str, class: &str) -> BrokerMessage {
1477 BrokerMessage::Status {
1478 agent_id: agent.to_string(),
1479 payload: StatusPayload {
1480 status: "auto_approved".to_string(),
1481 modified_files: vec![],
1482 message: Some(format!("auto_approved: matched {class}")),
1483 ..Default::default()
1484 },
1485 }
1486 }
1487
    #[test]
    /// A block of `x` on `y` followed by an artifact from `x` 672 seconds
    /// later must yield exactly one resolved stuck event, and the flushed
    /// markdown must contain the "Where agents got stuck" section.
    fn stuck_duration_resolved_on_artifact() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        let t0 = SystemTime::now();
        a.record_blocked("x", "y", t0);
        a.record_artifact("x", t0 + Duration::from_secs(672));
        let events = a.stuck_events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].agent_id, "x");
        assert_eq!(events[0].blocked_on, "y");
        // The accepted window is deliberately a little wider than the exact 672 s.
        assert!((670..=674).contains(&events[0].duration_seconds));
        assert!(events[0].resolved);

        // The rendered file must mention the agent, the block and what it waited on.
        a.flush().unwrap();
        let md = read_md(a.file_path());
        assert!(md.contains("### Where agents got stuck"));
        assert!(md.contains("x: blocked"));
        assert!(md.contains("waiting on y"));
    }
1508
    #[test]
    /// An agent still blocked when `flush_at_shutdown` runs must be reported
    /// as an unresolved stuck event, and the markdown must say so.
    fn stuck_duration_unresolved_at_shutdown() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        // Back-date the block by two minutes so a measurable duration exists.
        let t0 = SystemTime::now() - Duration::from_mins(2);
        a.record_blocked("x", "y", t0);
        a.flush_at_shutdown().unwrap();
        let events = a.stuck_events();
        assert_eq!(events.len(), 1);
        assert!(!events[0].resolved);
        // One second of slack below the 120 s that were back-dated.
        assert!(events[0].duration_seconds >= 119);
        let md = read_md(a.file_path());
        assert!(md.contains("unresolved at session end"));
    }
1523
1524 #[test]
1525 fn recovery_cycles_recorded_on_verify() {
1526 let tmp = TempDir::new().unwrap();
1527 let mut a = agg(&tmp);
1528 a.record_feedback("x");
1529 a.record_feedback("x");
1530 a.record_feedback("x");
1531 a.record_verified("x");
1532 assert_eq!(a.recovery_events().len(), 1);
1533 assert_eq!(a.recovery_events()[0].agent_id, "x");
1534 assert_eq!(a.recovery_events()[0].count, 3);
1535 }
1536
    #[test]
    /// A verification with no preceding feedback must record no recovery
    /// event and leave the learnings file empty after a flush.
    fn recovery_cycles_zero_count_skipped() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.record_verified("x");
        assert!(a.recovery_events().is_empty());
        a.flush().unwrap();
        // `read_md` yields "" for a missing file as well as for an empty one.
        assert_eq!(read_md(a.file_path()), "");
    }
1546
    #[test]
    /// Two mirrored forward-conflict feedback messages between agents that
    /// share a spec id must collapse into one intra-spec conflict event.
    fn forward_conflict_intra_spec_recorded_once() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        // Both agents are registered under the same spec.
        a.set_spec_id("feat-x", "003-user-list");
        a.set_spec_id("feat-y", "003-user-list");

        // The same conflict is reported once to each side.
        a.record_detector_message(&feedback(
            "feat-x",
            &["[conflict-detector] forward conflict with feat-y on src/main.rs"],
        ));
        a.record_detector_message(&feedback(
            "feat-y",
            &["[conflict-detector] forward conflict with feat-x on src/main.rs"],
        ));

        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        match &events[0].category {
            ConflictCategory::ForwardConflictIntraSpec {
                agents, spec_id, ..
            } => {
                assert_eq!(agents, &vec!["feat-x".to_string(), "feat-y".to_string()]);
                assert_eq!(spec_id, "003-user-list");
            }
            other => panic!("expected intra-spec, got {other:?}"),
        }
    }
1577
    #[test]
    /// A forward-conflict message that lists `(regions: ...)` must produce a
    /// learning record whose body has a `regions` array with those entries.
    fn forward_conflict_region_aware_body_includes_regions() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&feedback(
            "feat-y",
            &["[conflict-detector] forward conflict: agent feat-x also intends to modify: src/auth.rs (regions: function validate_token, function refresh_session)"],
        ));
        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        // Convert the classified event into the record form and inspect its JSON body.
        let record = record_from_conflict(&events[0].category, SystemTime::now());
        let regions = record.body.get("regions").expect("regions field present");
        assert_eq!(
            regions,
            &serde_json::json!(["function validate_token", "function refresh_session"])
        );
    }
1600
    #[test]
    /// A forward-conflict message without a `(regions: ...)` suffix must
    /// produce a learning record whose body has no `regions` key at all.
    fn forward_conflict_file_level_body_omits_regions() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&feedback(
            "feat-y",
            &["[conflict-detector] forward conflict: agent feat-x also intends to modify: src/main.rs"],
        ));
        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        let record = record_from_conflict(&events[0].category, SystemTime::now());
        // The key must be absent, not merely empty.
        assert!(
            record.body.get("regions").is_none(),
            "file-level conflict must omit regions; got {:?}",
            record.body
        );
    }
1621
    #[test]
    /// `extract_regions` must collect the descriptors of every
    /// `(regions: ...)` group in the text, in order, and return nothing when
    /// the text has no such group.
    fn extract_regions_parses_descriptors() {
        assert_eq!(
            extract_regions(
                "foo src/a.rs (regions: function f, range 10-30); src/b.rs (regions: class C)"
            ),
            vec![
                "function f".to_string(),
                "range 10-30".to_string(),
                "class C".to_string()
            ]
        );
        assert!(extract_regions("no regions here, just src/a.rs").is_empty());
    }
1636
    #[test]
    /// Two mirrored forward-conflict messages between agents registered
    /// under different spec ids must yield one cross-spec conflict event that
    /// names both specs.
    fn forward_conflict_cross_spec_records_specs() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        // The two agents belong to different specs.
        a.set_spec_id("feat-x", "003-user-list");
        a.set_spec_id("feat-y", "004-error-handling");

        a.record_detector_message(&feedback(
            "feat-x",
            &["[conflict-detector] forward conflict with feat-y on src/main.rs"],
        ));
        a.record_detector_message(&feedback(
            "feat-y",
            &["[conflict-detector] forward conflict with feat-x on src/main.rs"],
        ));

        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        match &events[0].category {
            ConflictCategory::ForwardConflictCrossSpec {
                agents, spec_ids, ..
            } => {
                assert_eq!(agents, &vec!["feat-x".to_string(), "feat-y".to_string()]);
                // Only membership is checked; the order of `spec_ids` is not asserted.
                assert!(spec_ids.iter().any(|s| s == "003-user-list"));
                assert!(spec_ids.iter().any(|s| s == "004-error-handling"));
            }
            other => panic!("expected cross-spec, got {other:?}"),
        }
    }
1668
    #[test]
    /// Two mirrored in-flight conflict messages must be recorded as a single
    /// `InFlightConflict` event.
    fn in_flight_conflict_classified() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&feedback(
            "feat-x",
            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
        ));
        a.record_detector_message(&feedback(
            "feat-y",
            &["[conflict-detector] in-flight conflict with feat-x on src/a.rs"],
        ));
        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            events[0].category,
            ConflictCategory::InFlightConflict { .. }
        ));
    }
1690
    #[test]
    /// An ownership-violation message sent to `feat-y` about a file claimed
    /// by `feat-x` must be classified with `feat-y` as violator, `feat-x` as
    /// owner, and the named file.
    fn ownership_violation_classified() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&feedback(
            "feat-y",
            &["[conflict-detector] ownership violation on src/a.rs claimed by feat-x"],
        ));
        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        match &events[0].category {
            ConflictCategory::OwnershipViolation {
                violator,
                owner,
                file,
            } => {
                // The feedback recipient is the violator; the claimant in the text is the owner.
                assert_eq!(violator, "feat-y");
                assert_eq!(owner, "feat-x");
                assert_eq!(file, "src/a.rs");
            }
            other => panic!("expected ownership-violation, got {other:?}"),
        }
    }
1716
    #[test]
    /// A detector message delivered as a `Question` (rather than `Feedback`)
    /// that names both agents must still be classified as one in-flight
    /// conflict.
    fn detector_question_to_supervisor_is_classified() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&question(
            "[conflict-detector] in-flight conflict between feat-x and feat-y on src/a.rs",
        ));
        let events = a.conflict_events();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            events[0].category,
            ConflictCategory::InFlightConflict { .. }
        ));
    }
1735
    #[test]
    /// Twenty-three auto-approvals of one command class (well above
    /// `PERMISSION_PATTERN_THRESHOLD`) must render a "Permission patterns"
    /// entry that states the exact count.
    fn permission_pattern_above_threshold_emits_entry() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        for _ in 0..23 {
            a.record_auto_approve("cargo check");
        }
        a.flush().unwrap();
        let md = read_md(a.file_path());
        assert!(md.contains("### Permission patterns"));
        assert!(md.contains("`cargo check` auto-approved 23 times"));
    }
1748
    #[test]
    /// Two approvals must stay out of the file; once five more arrive the
    /// next flush must report the cumulative total of seven.
    fn permission_pattern_below_threshold_omitted_then_emitted_later() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.record_auto_approve("git status");
        a.record_auto_approve("git status");
        a.flush().unwrap();
        // Two approvals: nothing about this command may be written yet.
        let md1 = read_md(a.file_path());
        assert!(!md1.contains("git status"));

        // Five more approvals bring the running total to seven.
        for _ in 0..5 {
            a.record_auto_approve("git status");
        }
        a.flush().unwrap();
        let md2 = read_md(a.file_path());
        assert!(md2.contains("`git status` auto-approved 7 times"));
    }
1768
    #[test]
    /// With nothing recorded, neither a regular flush nor a shutdown flush
    /// may put any content into the learnings file.
    fn no_learnings_session_writes_nothing() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.flush().unwrap();
        a.flush_at_shutdown().unwrap();
        assert_eq!(read_md(a.file_path()), "");
        // Either the file was never created or it exists but is empty.
        assert!(!a.file_path().exists() || read_md(a.file_path()).is_empty());
    }
1778
    #[test]
    /// Two flushes from the same aggregator must leave exactly one
    /// `## Session Learnings — <timestamp>` header, and its timestamp must
    /// have the `YYYY-MM-DDTHH:MM:SSZ` shape.
    fn flush_writes_h2_header_once_per_session() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        // First flush: a permission pattern at exactly the threshold.
        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
            a.record_auto_approve("cargo check");
        }
        a.flush().unwrap();
        // Second flush: new content of a different kind (a recovery cycle).
        a.record_feedback("alpha");
        a.record_verified("alpha");
        a.flush().unwrap();

        let md = read_md(a.file_path());
        let h2_count = md.matches("## Session Learnings — ").count();
        assert_eq!(h2_count, 1, "expected exactly one H2, got\n{md}");
        let h2_line = md
            .lines()
            .find(|l| l.starts_with("## Session Learnings — "))
            .unwrap();
        // Whatever follows the header prefix is the session timestamp.
        let ts = h2_line.trim_start_matches("## Session Learnings — ").trim();
        assert!(
            regex_like_iso(ts),
            "H2 timestamp did not match ISO regex: {ts:?}"
        );
    }
1806
1807 fn regex_like_iso(s: &str) -> bool {
1808 let bytes = s.as_bytes();
1810 if bytes.len() != 20 {
1811 return false;
1812 }
1813 for (i, b) in bytes.iter().enumerate() {
1814 let ok = match i {
1815 4 | 7 => *b == b'-',
1816 10 => *b == b'T',
1817 13 | 16 => *b == b':',
1818 19 => *b == b'Z',
1819 _ => b.is_ascii_digit(),
1820 };
1821 if !ok {
1822 return false;
1823 }
1824 }
1825 true
1826 }
1827
    #[test]
    /// A second aggregator writing to the same path must append its own
    /// session header and entries while leaving the first session's output
    /// intact at the start of the file.
    fn second_session_appends_new_h2_preserves_prior_content() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("session-learnings.md");
        // First session.
        let mut a1 = LearningsAggregator::new(path.clone());
        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
            a1.record_auto_approve("cargo check");
        }
        a1.flush().unwrap();
        let after_first = read_md(&path);
        assert!(after_first.contains("`cargo check`"));

        // NOTE(review): presumably keeps the two sessions' second-precision
        // header timestamps apart — confirm the pause is still required.
        std::thread::sleep(Duration::from_secs(1));

        // Second session on the same file.
        let mut a2 = LearningsAggregator::new(path.clone());
        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
            a2.record_auto_approve("cargo fmt");
        }
        a2.flush().unwrap();
        let after_second = read_md(&path);
        // Prior content is compared without its trailing whitespace.
        assert!(after_second.starts_with(after_first.trim_end()));
        let h2_count = after_second.matches("## Session Learnings — ").count();
        assert_eq!(h2_count, 2);
        assert!(after_second.contains("`cargo fmt`"));
    }
1856
    #[test]
    /// Feeding a `Blocked` and then an `Artifact` message for the same agent
    /// through `observe` must result in one stuck event.
    fn observe_routes_blocked_and_artifact() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.observe(&blocked("x", "y"));
        a.observe(&artifact("x"));
        assert_eq!(a.stuck_events().len(), 1);
    }
1865
1866 #[test]
1867 fn observe_increments_feedback_then_records_recovery() {
1868 let tmp = TempDir::new().unwrap();
1869 let mut a = agg(&tmp);
1870 for _ in 0..3 {
1871 a.observe(&feedback("x", &["test failed"]));
1872 }
1873 a.observe(&verified("x"));
1874 assert_eq!(a.recovery_events().len(), 1);
1875 assert_eq!(a.recovery_events()[0].count, 3);
1876 }
1877
    #[test]
    /// Auto-approve `Status` messages fed through `observe` must count
    /// towards the permission pattern: at the threshold, the flushed file
    /// mentions the command class.
    fn observe_auto_approve_increments_counter() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
            a.observe(&auto_approve_status("feat-x", "cargo check"));
        }
        a.flush().unwrap();
        assert!(read_md(a.file_path()).contains("`cargo check` auto-approved"));
    }
1888
    #[test]
    /// `extract_command_class` must return the text after
    /// `auto_approved: matched `, and `None` when that part or the whole
    /// message is missing.
    fn extract_command_class_parses_matched_entry() {
        assert_eq!(
            extract_command_class(Some("auto_approved: matched cargo check")),
            Some("cargo check".to_string())
        );
        assert_eq!(extract_command_class(Some("auto_approved")), None);
        assert_eq!(extract_command_class(None), None);
    }
1898
    #[test]
    /// When only a conflict event exists, the flushed markdown must contain
    /// the "Conflict events" heading and none of the stuck, recovery or
    /// permission headings.
    fn empty_categories_are_omitted_from_markdown() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&feedback(
            "feat-x",
            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
        ));
        a.flush().unwrap();

        let md = read_md(a.file_path());
        assert!(md.contains("### Conflict events"));
        // Every other category has no entries, so its heading must not appear.
        assert!(
            !md.contains("### Where agents got stuck"),
            "stuck heading should be omitted when there are no stuck events:\n{md}"
        );
        assert!(
            !md.contains("### Recovery cycles"),
            "recovery heading should be omitted when there are no recovery events:\n{md}"
        );
        assert!(
            !md.contains("### Permission patterns"),
            "permission heading should be omitted when there are no permission entries:\n{md}"
        );
    }
1930
    #[test]
    /// Recording several detector messages must not touch the file; content
    /// may only appear once `flush` is called.
    fn burst_of_events_does_not_write_until_flush() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        for _ in 0..5 {
            a.record_detector_message(&feedback(
                "feat-x",
                &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
            ));
        }
        // Before any flush the file is either absent or empty.
        assert!(
            !a.file_path().exists() || read_md(a.file_path()).is_empty(),
            "aggregator wrote eagerly without a flush call"
        );
        a.flush().unwrap();
        let md = read_md(a.file_path());
        assert!(md.contains("### Conflict events"));
    }
1959
    #[test]
    /// An `agent.learning` JSON envelope must deserialise into
    /// `BrokerMessage::Learning`, and a `Learning` message must serialise
    /// back with the `agent.learning` type tag.
    fn broker_message_has_agent_learning_variant() {
        use crate::broker::messages::LearningPayload;

        // Parsing direction: a hand-written envelope.
        let probe = r#"{"type":"agent.learning","payload":{"id":"abc123def456abcd","agent_id":"supervisor","branch_id":"feat/x","category":"conflict_event","title":"forward conflict","body":{"shape":"forward"},"timestamp":"2026-05-28T12:01:01Z"}}"#;
        let msg = BrokerMessage::from_json(probe)
            .expect("a well-formed agent.learning envelope must deserialise");
        let BrokerMessage::Learning { payload } = &msg else {
            panic!("expected Learning, got {msg:?}");
        };
        assert_eq!(payload.category, "conflict_event");
        assert_eq!(payload.agent_id, "supervisor");
        assert_eq!(payload.branch_id.as_deref(), Some("feat/x"));
        assert_eq!(msg.status_label(), "learning");

        // Serialising direction: only the type tag is checked.
        let round = BrokerMessage::Learning {
            payload: LearningPayload {
                id: "abc123def456abcd".to_string(),
                agent_id: "supervisor".to_string(),
                branch_id: None,
                category: "permission_pattern".to_string(),
                title: "`cargo check` auto-approved 23 times".to_string(),
                body: serde_json::json!({"command_class": "cargo check", "count": 23}),
                timestamp: "2026-05-28T12:01:01Z".to_string(),
            },
        };
        let json = serde_json::to_string(&round).unwrap();
        assert!(json.contains("\"type\":\"agent.learning\""));
    }
1995
    #[test]
    /// Walks the truth table of the attach predicate: it must hold only when
    /// a supervisor config is present with both `enabled` and `learnings`
    /// set.
    ///
    /// NOTE(review): `should_attach` is defined locally in this test, so the
    /// test pins the intended predicate rather than calling production
    /// wiring — confirm the real call site uses the same condition.
    fn wiring_predicate_only_enables_when_supervisor_and_learnings_both_true() {
        use crate::config::{LearningsConfig, SupervisorConfig};

        fn should_attach(s: Option<&SupervisorConfig>) -> bool {
            s.is_some_and(|s| s.enabled && s.learnings)
        }

        // No supervisor config at all.
        assert!(!should_attach(None));

        // Supervisor disabled, learnings requested.
        assert!(!should_attach(Some(&SupervisorConfig {
            enabled: false,
            learnings: true,
            learnings_config: LearningsConfig::default(),
            ..SupervisorConfig::default()
        })));

        // Supervisor enabled, learnings switched off.
        assert!(!should_attach(Some(&SupervisorConfig {
            enabled: true,
            learnings: false,
            learnings_config: LearningsConfig::default(),
            ..SupervisorConfig::default()
        })));

        // Both switches on: the only combination that attaches.
        assert!(should_attach(Some(&SupervisorConfig {
            enabled: true,
            learnings: true,
            learnings_config: LearningsConfig::default(),
            ..SupervisorConfig::default()
        })));
    }
2038
2039 #[test]
2042 fn default_flush_interval_is_60_seconds() {
2043 use crate::config::LearningsConfig;
2044 let cfg = LearningsConfig::default();
2045 assert_eq!(
2046 cfg.flush_interval_seconds, 60,
2047 "LearningsConfig::default().flush_interval_seconds must be 60"
2048 );
2049 }
2050
2051 fn ts_at(hour: u64, minute: u64) -> SystemTime {
2056 const DAY_START: u64 = 1_779_926_400;
2058 UNIX_EPOCH + Duration::from_secs(DAY_START + hour * 3600 + minute * 60)
2059 }
2060
2061 fn sample_record(category: &str, branch: Option<&str>, ts: SystemTime) -> LearningRecord {
2062 LearningRecord {
2063 category: category.to_string(),
2064 agent_id: LEARNINGS_AGENT_ID.to_string(),
2065 branch_id: branch.map(str::to_string),
2066 title: "title".to_string(),
2067 body: serde_json::json!({"agents": ["feat-x", "feat-y"], "files": ["src/a.rs"]}),
2068 timestamp: ts,
2069 }
2070 }
2071
2072 #[test]
2074 fn same_record_within_the_hour_gets_same_id() {
2075 let a = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(13, 30));
2076 let b = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(13, 59));
2077 assert_eq!(a.deterministic_id(), b.deterministic_id());
2078 assert_eq!(a.deterministic_id().len(), 16);
2080 assert!(a.deterministic_id().chars().all(|c| c.is_ascii_hexdigit()));
2081 }
2082
2083 #[test]
2085 fn same_record_across_hour_boundary_gets_different_ids() {
2086 let a = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(13, 59));
2087 let b = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts_at(14, 1));
2088 assert_ne!(a.deterministic_id(), b.deterministic_id());
2089 }
2090
2091 #[test]
2094 fn different_categories_get_different_ids_with_identical_body() {
2095 let ts = ts_at(13, 30);
2096 let a = sample_record(CATEGORY_CONFLICT_EVENT, Some("feat-x"), ts);
2097 let b = sample_record(CATEGORY_STUCK_DURATION, Some("feat-x"), ts);
2098 assert_ne!(a.deterministic_id(), b.deterministic_id());
2099 }
2100
2101 #[test]
2102 fn id_is_independent_of_body_key_insertion_order() {
2103 let ts = ts_at(13, 30);
2104 let mut a = sample_record(CATEGORY_CONFLICT_EVENT, None, ts);
2105 a.body = serde_json::json!({"alpha": 1, "beta": 2});
2106 let mut b = sample_record(CATEGORY_CONFLICT_EVENT, None, ts);
2107 b.body = serde_json::json!({"beta": 2, "alpha": 1});
2108 assert_eq!(a.deterministic_id(), b.deterministic_id());
2109 }
2110
2111 #[test]
2112 fn branch_id_distinguishes_otherwise_identical_records() {
2113 let ts = ts_at(13, 30);
2114 let a = sample_record(CATEGORY_STUCK_DURATION, Some("feat-x"), ts);
2115 let b = sample_record(CATEGORY_STUCK_DURATION, Some("feat-y"), ts);
2116 assert_ne!(a.deterministic_id(), b.deterministic_id());
2117 }
2118
    #[test]
    /// One record of each of the four deterministic categories must survive
    /// conversion to `BrokerMessage`, JSON serialisation and parsing, with
    /// its category, id and agent id preserved.
    fn all_four_categories_round_trip_through_broker_message() {
        let now = ts_at(12, 1);
        // One record per category, in the same order as `expected_categories`.
        let records = [
            record_from_conflict(
                &ConflictCategory::InFlightConflict {
                    agents: vec!["feat-x".to_string(), "feat-y".to_string()],
                },
                now,
            ),
            record_from_stuck(
                &StuckDurationEntry {
                    agent_id: "feat-x".to_string(),
                    blocked_on: "feat-y".to_string(),
                    duration_seconds: 672,
                    resolved: true,
                },
                now,
            ),
            record_from_recovery(
                &RecoveryCycleEntry {
                    agent_id: "feat-x".to_string(),
                    count: 3,
                },
                now,
            ),
            record_from_permission("cargo check", 23, now),
        ];
        let expected_categories = [
            CATEGORY_CONFLICT_EVENT,
            CATEGORY_STUCK_DURATION,
            CATEGORY_RECOVERY_CYCLES,
            CATEGORY_PERMISSION_PATTERN,
        ];
        for (record, expected_category) in records.iter().zip(expected_categories) {
            // record -> message -> JSON -> message
            let msg = BrokerMessage::from(record);
            let json = serde_json::to_string(&msg).unwrap();
            let back = BrokerMessage::from_json(&json)
                .unwrap_or_else(|e| panic!("{expected_category} must round-trip: {e}"));
            let BrokerMessage::Learning { payload } = back else {
                panic!("expected Learning variant for {expected_category}");
            };
            assert_eq!(payload.category, expected_category);
            // The id on the wire must equal the id recomputed from the record.
            assert_eq!(payload.id, record.deterministic_id());
            assert_eq!(payload.agent_id, LEARNINGS_AGENT_ID);
            assert!(!payload.title.is_empty());
        }
    }
2169
    #[test]
    /// Records built from a conflict or from a permission pattern must carry
    /// no branch id.
    fn conflict_and_permission_records_are_cross_cutting_no_branch() {
        let now = ts_at(12, 1);
        let conflict = record_from_conflict(
            &ConflictCategory::InFlightConflict {
                agents: vec!["feat-x".to_string(), "feat-y".to_string()],
            },
            now,
        );
        let permission = record_from_permission("cargo check", 9, now);
        assert_eq!(conflict.branch_id, None);
        assert_eq!(permission.branch_id, None);
    }
2183
    #[test]
    /// Records built from a stuck entry or a recovery entry must use that
    /// entry's agent id as their branch id.
    fn stuck_and_recovery_records_are_branch_scoped() {
        let now = ts_at(12, 1);
        let stuck = record_from_stuck(
            &StuckDurationEntry {
                agent_id: "feat-x".to_string(),
                blocked_on: "feat-y".to_string(),
                duration_seconds: 60,
                resolved: false,
            },
            now,
        );
        let recovery = record_from_recovery(
            &RecoveryCycleEntry {
                agent_id: "feat-z".to_string(),
                count: 2,
            },
            now,
        );
        // The branch is the affected agent, not the agent it was blocked on.
        assert_eq!(stuck.branch_id.as_deref(), Some("feat-x"));
        assert_eq!(recovery.branch_id.as_deref(), Some("feat-z"));
    }
2206
    #[test]
    /// A freshly built aggregator has broker publishing off: a flush must
    /// still write the markdown file but queue nothing for publishing.
    fn broker_publish_off_queues_no_records() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        // Publishing is off unless explicitly switched on.
        assert!(!a.broker_publish_enabled());
        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
            a.record_auto_approve("cargo check");
        }
        a.flush().unwrap();
        assert!(read_md(a.file_path()).contains("`cargo check`"));
        assert!(
            a.take_pending_publish().is_empty(),
            "no records should be queued when broker publish is disabled"
        );
    }
2227
    #[test]
    /// With broker publishing on, a flush must queue one record per written
    /// entry, the record's title must appear in the file, and taking the
    /// queue must empty it.
    fn broker_publish_on_queues_records_matching_file() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.set_broker_publish(true);
        a.register_agent("feat-x");
        a.register_agent("feat-y");
        a.record_detector_message(&feedback(
            "feat-x",
            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
        ));
        a.flush().unwrap();

        let md = read_md(a.file_path());
        assert!(md.contains("### Conflict events"));
        let records = a.take_pending_publish();
        assert_eq!(records.len(), 1, "one conflict record should be queued");
        assert_eq!(records[0].category, CATEGORY_CONFLICT_EVENT);
        // The queued record and the markdown entry describe the same thing.
        assert!(md.contains(&records[0].title));
        // A second take finds nothing left in the queue.
        assert!(a.take_pending_publish().is_empty());
    }
2253
2254 use crate::broker::messages::LearningPayload;
2257
2258 fn learning(category: &str, title: &str, body: serde_json::Value) -> BrokerMessage {
2261 BrokerMessage::Learning {
2262 payload: LearningPayload {
2263 id: format!("id-{category}-{title}"),
2264 agent_id: LEARNINGS_AGENT_ID.to_string(),
2265 branch_id: None,
2266 category: category.to_string(),
2267 title: title.to_string(),
2268 body,
2269 timestamp: "2026-06-05T12:00:00Z".to_string(),
2270 },
2271 }
2272 }
2273
    #[test]
    /// One well-formed learning of each qualitative category must be
    /// rendered under that category's own heading, in its category-specific
    /// line format.
    fn each_qualitative_category_routes_to_its_section() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        // Recurring failure shape: a shape plus two instances on different branches.
        a.observe(&learning(
            CATEGORY_RECURRING_FAILURE_SHAPE,
            "import cycle recurs",
            serde_json::json!({
                "shape": "import cycle in payments module",
                "instances": [
                    {"branch_id": "feat/a", "feedback_id": "f1", "excerpt": "..."},
                    {"branch_id": "feat/b", "feedback_id": "f2", "excerpt": "..."}
                ]
            }),
        ));
        // Documentation gap: a convention and a suggestion.
        a.observe(&learning(
            CATEGORY_DOC_GAP,
            "lint-before-commit undocumented",
            serde_json::json!({
                "convention": "agents run lint before commit",
                "evidence_paths": ["AGENTS.md"],
                "suggestion": "add a Conventions section to AGENTS.md"
            }),
        ));
        // ADR drift: a decision area and the observed pattern.
        a.observe(&learning(
            CATEGORY_ADR_DRIFT,
            "async runtime undocumented",
            serde_json::json!({
                "decision_area": "async runtime",
                "observed_pattern": "a background runtime added in the broker server",
                "configured_adr_path": "docs/adr",
                "candidate_adr_title": "ADR-0007: Adopt an async runtime"
            }),
        ));
        // Scope mistake: the branches involved and a suggestion.
        a.observe(&learning(
            CATEGORY_SCOPE_MISTAKE,
            "two branches over-coordinated",
            serde_json::json!({
                "branches": ["feat/a", "feat/b"],
                "shared_files": ["src/router"],
                "coordination_events": [],
                "suggestion": "merge the feat/a and feat/b scopes"
            }),
        ));
        a.flush().unwrap();

        // Each heading must be present together with its formatted entry.
        let md = read_md(a.file_path());
        assert!(md.contains("### Recurring failure shapes"), "{md}");
        assert!(md.contains("import cycle in payments module: 2 instances across feat/a, feat/b"));
        assert!(md.contains("### Documentation gaps"), "{md}");
        assert!(
            md.contains("- agents run lint before commit — add a Conventions section to AGENTS.md")
        );
        assert!(md.contains("### ADR / architectural drift"), "{md}");
        assert!(md.contains("- async runtime: a background runtime added in the broker server"));
        assert!(md.contains("### Scope-mistake signals"), "{md}");
        assert!(md.contains("- feat/a and feat/b — merge the feat/a and feat/b scopes"));
    }
2333
    #[test]
    /// A recurring-failure learning whose body lacks `instances` must still
    /// appear under its heading, rendered as the title together with the
    /// compact JSON of the body.
    fn malformed_qualitative_body_renders_as_title_plus_json() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.observe(&learning(
            CATEGORY_RECURRING_FAILURE_SHAPE,
            "vague shape with no instances",
            serde_json::json!({"shape": "something fuzzy"}),
        ));
        a.flush().unwrap();

        let md = read_md(a.file_path());
        assert!(md.contains("### Recurring failure shapes"), "{md}");
        assert!(md.contains("- vague shape with no instances"), "{md}");
        // The body is expected in compact form, without spaces after separators.
        assert!(md.contains(r#"{"shape":"something fuzzy"}"#), "{md}");
    }
2356
    #[test]
    /// A learning with an unrecognised category must be rendered under
    /// "Other learnings" as its title together with the compact JSON body.
    fn unknown_category_falls_through_to_other_learnings() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.observe(&learning(
            "some_future_category",
            "a future learning shape",
            serde_json::json!({"note": "from a later version"}),
        ));
        a.flush().unwrap();

        let md = read_md(a.file_path());
        assert!(md.contains("### Other learnings"), "{md}");
        assert!(md.contains("- a future learning shape"), "{md}");
        assert!(md.contains(r#"{"note":"from a later version"}"#), "{md}");
    }
2375
    #[test]
    /// A `Learning` message in the `conflict_event` category arriving via
    /// `observe` must not be stored as a qualitative event and must not
    /// cause anything to be written.
    fn ingested_deterministic_learning_is_ignored() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.observe(&learning(
            CATEGORY_CONFLICT_EVENT,
            "forward conflict feat-x and feat-y",
            serde_json::json!({"shape": "forward", "agents": ["feat-x", "feat-y"]}),
        ));
        assert!(a.qualitative_events().is_empty());
        a.flush().unwrap();
        assert_eq!(read_md(a.file_path()), "");
    }
2393
    #[test]
    /// A permission-pattern entry must keep its heading and line format when
    /// a qualitative section is written in the same flush.
    fn v0_5_0_sections_unchanged_alongside_qualitative() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        // Exactly at the threshold, so the entry reports the threshold value.
        for _ in 0..PERMISSION_PATTERN_THRESHOLD {
            a.record_auto_approve("git status");
        }
        a.observe(&learning(
            CATEGORY_DOC_GAP,
            "doc gap",
            serde_json::json!({"convention": "c", "suggestion": "s"}),
        ));
        a.flush().unwrap();

        let md = read_md(a.file_path());
        assert!(md.contains("### Permission patterns"));
        assert!(md.contains("- `git status` auto-approved 5 times"));
        assert!(md.contains("### Documentation gaps"));
    }
2419
    #[test]
    /// Two recurring-failure learnings with the same body but different
    /// titles must be kept as one qualitative event and rendered once.
    fn qualitative_dedup_suppresses_same_primary_identifier() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        let body = serde_json::json!({
            "shape": "import cycle in payments module",
            "instances": [{"branch_id": "feat/a"}, {"branch_id": "feat/b"}]
        });
        a.observe(&learning(
            CATEGORY_RECURRING_FAILURE_SHAPE,
            "first sighting",
            body.clone(),
        ));
        // Same body, reworded title: must not count as a new event.
        a.observe(&learning(
            CATEGORY_RECURRING_FAILURE_SHAPE,
            "second sighting, reworded",
            body,
        ));
        assert_eq!(
            a.qualitative_events().len(),
            1,
            "near-duplicate not deduped"
        );
        a.flush().unwrap();
        let md = read_md(a.file_path());
        let occurrences = md.matches("import cycle in payments module").count();
        assert_eq!(occurrences, 1, "shape rendered more than once:\n{md}");
    }
2451
    #[test]
    /// Two doc-gap learnings with different `convention` values must both be
    /// kept.
    fn qualitative_dedup_keeps_distinct_identifiers() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.observe(&learning(
            CATEGORY_DOC_GAP,
            "gap one",
            serde_json::json!({"convention": "lint before commit", "suggestion": "s1"}),
        ));
        a.observe(&learning(
            CATEGORY_DOC_GAP,
            "gap two",
            serde_json::json!({"convention": "sign your commits", "suggestion": "s2"}),
        ));
        assert_eq!(a.qualitative_events().len(), 2);
    }
2469
    #[test]
    /// Two scope-mistake learnings whose bodies lack the `branches` field
    /// must both be kept rather than collapsed into one.
    ///
    /// The `learning` helper derives each message id from its title, so the
    /// two messages here carry different ids.
    fn qualitative_dedup_distinguishes_malformed_by_id() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.observe(&learning(
            CATEGORY_SCOPE_MISTAKE,
            "malformed one",
            serde_json::json!({"note": "no branches a"}),
        ));
        a.observe(&learning(
            CATEGORY_SCOPE_MISTAKE,
            "malformed two",
            serde_json::json!({"note": "no branches b"}),
        ));
        assert_eq!(a.qualitative_events().len(), 2);
    }
2488
2489 #[test]
2494 fn qualitative_records_get_identical_ids_within_the_hour() {
2495 let body = serde_json::json!({
2496 "shape": "import cycle in payments module",
2497 "instances": [{"branch_id": "feat/a"}, {"branch_id": "feat/b"}]
2498 });
2499 let a = LearningRecord {
2500 category: CATEGORY_RECURRING_FAILURE_SHAPE.to_string(),
2501 agent_id: LEARNINGS_AGENT_ID.to_string(),
2502 branch_id: None,
2503 title: "first".to_string(),
2504 body: body.clone(),
2505 timestamp: ts_at(13, 5),
2506 };
2507 let b = LearningRecord {
2508 timestamp: ts_at(13, 55),
2509 title: "reworded".to_string(),
2510 ..a.clone()
2511 };
2512 assert_eq!(a.deterministic_id(), b.deterministic_id());
2513 assert_eq!(a.deterministic_id().len(), 16);
2514 }
2515
    #[test]
    /// Even with broker publishing on, a qualitative learning received via
    /// `observe` must be written to the file without being queued for
    /// publishing again.
    fn qualitative_ingestion_does_not_republish() {
        let tmp = TempDir::new().unwrap();
        let mut a = agg(&tmp);
        a.set_broker_publish(true);
        a.observe(&learning(
            CATEGORY_DOC_GAP,
            "doc gap",
            serde_json::json!({"convention": "c", "suggestion": "s"}),
        ));
        a.flush().unwrap();
        assert!(read_md(a.file_path()).contains("### Documentation gaps"));
        assert!(
            a.take_pending_publish().is_empty(),
            "ingested qualitative records must not be re-published"
        );
    }
2535
#[test]
/// Replaying the same detector message through two independent aggregators
/// must yield the same deterministic ID for the first published record.
///
/// NOTE(review): if the aggregator stamps records with the wall clock, two
/// replays that straddle an hour boundary would land in different hour
/// buckets and this comparison could fail spuriously — confirm.
fn replayed_events_within_hour_produce_identical_ids() {
    // Runs one complete observe/flush cycle on a fresh aggregator and returns
    // the ID of the first record queued for publication.
    let replay_once = || -> String {
        let scratch = TempDir::new().unwrap();
        let mut aggregator = agg(&scratch);
        aggregator.set_broker_publish(true);
        for branch in ["feat-x", "feat-y"] {
            aggregator.register_agent(branch);
        }
        let detector_message = feedback(
            "feat-x",
            &["[conflict-detector] in-flight conflict with feat-y on src/a.rs"],
        );
        aggregator.record_detector_message(&detector_message);
        aggregator.flush().unwrap();
        let published = aggregator.take_pending_publish();
        published[0].deterministic_id()
    };

    let first_id = replay_once();
    let second_id = replay_once();
    assert_eq!(first_id, second_id);
}
2558
#[test]
/// Pins the exact one-line rendering of a routing entry: timestamp, target
/// branch in backticks, delivery channel, and the prompt in double quotes.
fn format_routing_entry_shape() {
    let expected = "- 2026-05-28T14:35:09Z — supervisor told `feat/x` via feedback: \"rebase onto main before continuing\"";

    let rendered = format_routing_entry(
        "2026-05-28T14:35:09Z",
        "feat/x",
        "feedback",
        "rebase onto main before continuing",
    );

    assert_eq!(rendered, expected);
}
2574
#[test]
/// A prompt longer than `ROUTING_PROMPT_MAX_CHARS` must be rendered with
/// exactly that many prompt characters followed by an ellipsis and the
/// closing quote.
fn format_routing_entry_truncates_long_prompt() {
    let prompt = "x".repeat(300);
    let line = format_routing_entry("T", "feat/x", "send-keys", &prompt);
    assert!(
        line.ends_with("…\""),
        "long prompt should end with …: {line}"
    );
    // Pins the truncation width: taking MAX chars from the 300-char prompt
    // yields 200 only while the constant stays at 200.
    assert_eq!(prompt.chars().take(ROUTING_PROMPT_MAX_CHARS).count(), 200);
    // Lower bound: at least MAX prompt characters are kept.
    assert!(line.contains(&"x".repeat(ROUTING_PROMPT_MAX_CHARS)));
    // Upper bound: without this check the test would still pass if the whole
    // prompt were kept and an ellipsis merely appended.
    assert!(
        !line.contains(&"x".repeat(ROUTING_PROMPT_MAX_CHARS + 1)),
        "prompt was not truncated to ROUTING_PROMPT_MAX_CHARS: {line}"
    );
}
2587
#[test]
/// With learnings enabled, the first routing record creates the file with the
/// routing section header and the entry; a second record is appended under
/// the same header without duplicating it or discarding the first entry.
fn routing_record_with_learnings_enabled_writes_section() {
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("session-learnings.md");
    append_routing_record(
        &path,
        true,
        "2026-05-28T14:35:09Z",
        "feat/auth",
        "feedback",
        "rebase onto main",
    )
    .unwrap();
    let body = std::fs::read_to_string(&path).unwrap();
    assert!(body.contains(ROUTING_SECTION_HEADER));
    assert!(body.contains("feat/auth"));
    assert!(body.contains("via feedback"));
    assert!(body.contains("rebase onto main"));

    append_routing_record(&path, true, "T2", "feat/api", "send-keys", "run it").unwrap();
    let body = std::fs::read_to_string(&path).unwrap();
    assert_eq!(
        body.matches(ROUTING_SECTION_HEADER).count(),
        1,
        "section header must be written exactly once"
    );
    assert!(body.contains("feat/api"));
    // Guards against an overwrite: a truncating write would still leave one
    // header and the new entry, so the first entry must be checked explicitly.
    assert!(
        body.contains("feat/auth"),
        "second record must be appended, not replace the first:\n{body}"
    );
}
2617
#[test]
/// With learnings disabled, recording a routing entry must succeed without
/// creating the learnings file.
fn routing_record_with_learnings_disabled_writes_nothing() {
    let scratch = tempfile::tempdir().unwrap();
    let learnings_file = scratch.path().join("session-learnings.md");

    let outcome =
        append_routing_record(&learnings_file, false, "T", "feat/auth", "feedback", "noop");
    outcome.unwrap();

    let file_created = learnings_file.exists();
    assert!(
        !file_created,
        "learnings = false must not create or write the file"
    );
}
2628}