Skip to main content

git_paw/broker/
conflict.rs

1//! Broker-internal conflict detector.
2//!
3//! Runs alongside the message-delivery pipeline when supervisor mode is
4//! active. The detector observes `agent.intent` and `agent.status` events,
5//! maintains an in-memory tracker of active intents and per-agent current
6//! file claims, and auto-emits `agent.feedback` / `agent.question` when
7//! one of three failure shapes triggers:
8//!
9//! - **Forward conflict** — two agents publish `agent.intent` messages
10//!   that overlap on at least one file. Both publishers are warned via
11//!   `agent.feedback`. Each ordered pair is warned at most once until one
12//!   intent is replaced or expires.
13//! - **In-flight conflict** — two agents' `agent.status.modified_files`
14//!   sets overlap on a file. Both branches are warned. If neither agent
15//!   stops touching the file within `[supervisor.conflict] window_seconds`
16//!   an escalation `agent.question` is published to the supervisor inbox.
17//! - **Ownership violation** — an agent's `modified_files` include a file
18//!   outside its own active `agent.intent` *and* inside another active
19//!   agent's intent. The violator gets `agent.feedback`. If
20//!   `[supervisor.conflict] escalate_on_violation = true`, an
21//!   `agent.question` also reaches the supervisor inbox.
22//!
23//! Auto-emitted messages use `from = "supervisor"` and prefix their text
24//! with `[conflict-detector]` so dashboards and humans can distinguish
25//! detector-emitted feedback from human-typed supervisor feedback.
26
27use std::collections::{HashMap, HashSet};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use super::messages::{
32    BrokerMessage, FeedbackPayload, FileIntent, IntentPayload, QuestionPayload, Region,
33    StatusPayload,
34};
35use super::{BrokerState, delivery};
36use crate::config::ConflictConfig;
37
38/// Detector-internal normalised form of one `files` entry.
39///
40/// Both wire shapes ([`FileIntent::Path`] and [`FileIntent::Detailed`])
41/// collapse into this single shape so the detector never branches on the
42/// publisher's wire form. `regions: None` means file-level intent (the
43/// v0.5.0 default and the safe fallback); `Some(..)` carries declared
44/// regions. An empty `regions` vec on the wire collapses to `None`.
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct NormalizedFileIntent {
47    /// The file path.
48    pub path: String,
49    /// Declared regions, or `None` for a file-level intent.
50    pub regions: Option<Vec<Region>>,
51}
52
53impl From<FileIntent> for NormalizedFileIntent {
54    fn from(fi: FileIntent) -> Self {
55        match fi {
56            FileIntent::Path(path) => Self {
57                path,
58                regions: None,
59            },
60            FileIntent::Detailed { path, regions } => Self {
61                path,
62                // An empty regions vec is equivalent to no regions declared.
63                regions: if regions.is_empty() {
64                    None
65                } else {
66                    Some(regions)
67                },
68            },
69        }
70    }
71}
72
73/// One file's contribution to a forward conflict between two intents.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct FileConflict {
76    /// The shared file path.
77    pub path: String,
78    /// Intersecting regions naming why the file conflicts. Empty means a
79    /// file-level conflict (at least one side declared no regions).
80    pub regions: Vec<Region>,
81    /// `true` when at least one intersecting pair was a conservative
82    /// cross-kind match (a named region vs a line range).
83    pub cross_kind: bool,
84}
85
86/// A forward conflict between the queried agent and one other agent.
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct ForwardConflict {
89    /// The other agent whose intent overlaps.
90    pub other_agent: String,
91    /// The conflicting files (each with its intersecting regions).
92    pub files: Vec<FileConflict>,
93}
94
/// Tests whether the inclusive line intervals `[s1, e1]` and `[s2, e2]`
/// share at least one line.
///
/// The intervals are disjoint exactly when one starts after the other ends;
/// anything else counts as an overlap. No check is made that `start <= end`.
fn ranges_overlap(s1: u32, e1: u32, s2: u32, e2: u32) -> bool {
    let first_starts_after_second = s1 > e2;
    let second_starts_after_first = s2 > e1;
    !(first_starts_after_second || second_starts_after_first)
}
99
100/// Computes the intersection of two region sets per the detector rules:
101///
102/// - Same kind + matching name (function / class / block) → intersect,
103///   contributing that region.
104/// - Two ranges with overlapping intervals → intersect, contributing the
105///   overlapping sub-range.
106/// - A named region vs a range (either order) → conservative intersect
107///   (we can't resolve a name to lines without source parsing); contributes
108///   both regions and sets the cross-kind flag.
109/// - Named regions of differing kinds, or same kind with differing names →
110///   no intersection.
111///
112/// Returns the de-duplicated list of intersecting regions (sorted for
113/// deterministic output) and whether any cross-kind match occurred.
114fn regions_intersect(a: &[Region], b: &[Region]) -> (Vec<Region>, bool) {
115    let mut hits: Vec<Region> = Vec::new();
116    let mut cross_kind = false;
117    let push = |r: Region, hits: &mut Vec<Region>| {
118        if !hits.contains(&r) {
119            hits.push(r);
120        }
121    };
122    for ra in a {
123        for rb in b {
124            match (ra, rb) {
125                (Region::Function { name: n1 }, Region::Function { name: n2 })
126                | (Region::Class { name: n1 }, Region::Class { name: n2 })
127                | (Region::Block { anchor: n1 }, Region::Block { anchor: n2 })
128                    if n1 == n2 =>
129                {
130                    push(ra.clone(), &mut hits);
131                }
132                (
133                    Region::Range {
134                        start_line: s1,
135                        end_line: e1,
136                    },
137                    Region::Range {
138                        start_line: s2,
139                        end_line: e2,
140                    },
141                ) if ranges_overlap(*s1, *e1, *s2, *e2) => {
142                    push(
143                        Region::Range {
144                            start_line: (*s1).max(*s2),
145                            end_line: (*e1).min(*e2),
146                        },
147                        &mut hits,
148                    );
149                }
150                // Cross-kind: a named region vs a range, in either order.
151                // We can't resolve the name to lines without source parsing,
152                // so we intersect conservatively and name both regions.
153                (
154                    Region::Range { .. },
155                    Region::Function { .. } | Region::Class { .. } | Region::Block { .. },
156                )
157                | (
158                    Region::Function { .. } | Region::Class { .. } | Region::Block { .. },
159                    Region::Range { .. },
160                ) => {
161                    cross_kind = true;
162                    push(ra.clone(), &mut hits);
163                    push(rb.clone(), &mut hits);
164                }
165                // Same-shape named regions with differing names, or differing
166                // named kinds: no intersection.
167                _ => {}
168            }
169        }
170    }
171    hits.sort_by_key(Region::to_string);
172    hits.dedup();
173    (hits, cross_kind)
174}
175
/// Identity the detector publishes under.
///
/// Detector output deliberately shares the `"supervisor"` identity with
/// human-typed supervisor feedback so recipients and the dashboard handle
/// both alike; the leading `[conflict-detector]` token in the message text
/// is what tells the two apart.
pub const CONFLICT_DETECTOR_SENDER: &str = "supervisor";
181
/// Marker placed at the start of every error / question text the detector
/// emits. Skill tests and the dashboard look for it to recognise
/// auto-generated warnings.
pub const CONFLICT_DETECTOR_TAG: &str = "[conflict-detector]";
185
186/// One agent's currently-active intent declaration.
187#[derive(Debug, Clone)]
188pub struct IntentRecord {
189    /// Publishing agent's ID.
190    pub agent_id: String,
191    /// File paths the agent intends to modify, each mapped to its declared
192    /// regions (`None` = file-level intent / no regions declared).
193    pub files: HashMap<String, Option<Vec<Region>>>,
194    /// Human-readable summary of the planned change.
195    pub summary: String,
196    /// When the intent was received.
197    pub received_at: Instant,
198    /// Relative TTL — the entry is dropped when
199    /// `now - received_at > valid_for`.
200    pub valid_for: Duration,
201}
202
203impl IntentRecord {
204    /// Returns `true` if this intent declares the given file path (at any
205    /// granularity).
206    #[must_use]
207    pub fn claims_path(&self, path: &str) -> bool {
208        self.files.contains_key(path)
209    }
210}
211
212impl IntentRecord {
213    fn is_expired(&self, now: Instant) -> bool {
214        now.saturating_duration_since(self.received_at) > self.valid_for
215    }
216}
217
/// Bookkeeping kept for one in-flight-conflict triple (two agents and the
/// file they are both modifying).
#[derive(Debug, Clone)]
struct InFlightPair {
    /// Instant at which the triple was first recorded.
    first_seen: Instant,
    /// `true` once an escalation `agent.question` has been emitted for it.
    escalated: bool,
}
226
/// Returns the two agent IDs as owned strings with the lexicographically
/// smaller one first.
///
/// The result serves as the dedup key for forward conflicts and as the
/// agent part of the in-flight triple key, so `(a, b)` and `(b, a)` map to
/// the same key.
fn ordered_pair(a: &str, b: &str) -> (String, String) {
    let (lo, hi) = if b < a { (b, a) } else { (a, b) };
    (lo.to_owned(), hi.to_owned())
}
236
237/// In-memory tracker for the detector. Owns the active-intent map, the
238/// per-agent current-files map, and dedup sets for warnings.
239#[derive(Debug, Default)]
240pub struct ConflictTracker {
241    intents: HashMap<String, IntentRecord>,
242    current_files: HashMap<String, HashSet<String>>,
243    warned_intent_pairs: HashSet<(String, String)>,
244    in_flight_pairs: HashMap<(String, String, String), InFlightPair>,
245    warned_violations: HashSet<(String, String)>,
246}
247
248impl ConflictTracker {
249    /// Returns an empty tracker.
250    #[must_use]
251    pub fn new() -> Self {
252        Self::default()
253    }
254
255    // ====================================================================
256    // Mutators
257    // ====================================================================
258
259    /// Inserts or replaces the intent record for `agent_id`. When the new
260    /// intent's file set differs from the prior intent's, any pair-level
261    /// forward-conflict dedup entries the prior intent participated in
262    /// are cleared so future overlaps with peers can re-warn. Re-publishing
263    /// an *identical* file set leaves the dedup intact, so a no-op
264    /// re-publish does not retrigger warnings.
265    pub fn insert_intent(
266        &mut self,
267        agent_id: &str,
268        files: Vec<NormalizedFileIntent>,
269        summary: String,
270        ttl: Duration,
271        now: Instant,
272    ) {
273        let normalized: HashMap<String, Option<Vec<Region>>> = files
274            .into_iter()
275            .filter_map(|nfi| {
276                let path = nfi.path.trim().to_string();
277                if path.is_empty() {
278                    None
279                } else {
280                    Some((path, nfi.regions))
281                }
282            })
283            .collect();
284        let files_changed = self
285            .intents
286            .get(agent_id)
287            .is_none_or(|prior| prior.files != normalized);
288        if files_changed {
289            self.warned_intent_pairs
290                .retain(|(a, b)| a != agent_id && b != agent_id);
291        }
292        self.intents.insert(
293            agent_id.to_string(),
294            IntentRecord {
295                agent_id: agent_id.to_string(),
296                files: normalized,
297                summary,
298                received_at: now,
299                valid_for: ttl,
300            },
301        );
302    }
303
304    /// Replaces the current modified-file set for `agent_id`.
305    /// `modified_files` is always treated as the full current set, not a
306    /// delta.
307    pub fn update_status(&mut self, agent_id: &str, modified_files: Vec<String>) {
308        let normalized: HashSet<String> = modified_files
309            .into_iter()
310            .map(|f| f.trim().to_string())
311            .filter(|f| !f.is_empty())
312            .collect();
313        self.current_files.insert(agent_id.to_string(), normalized);
314    }
315
316    /// Drops intents whose age exceeds their TTL. Forward-conflict dedup
317    /// entries involving the dropped agents are also removed so that a
318    /// future intent from the same agent can re-trigger warnings.
319    pub fn expire_stale_intents(&mut self, now: Instant) {
320        let expired: Vec<String> = self
321            .intents
322            .iter()
323            .filter(|(_, r)| r.is_expired(now))
324            .map(|(id, _)| id.clone())
325            .collect();
326        for id in &expired {
327            self.intents.remove(id);
328        }
329        self.warned_intent_pairs
330            .retain(|(a, b)| !expired.contains(a) && !expired.contains(b));
331    }
332
333    /// Removes in-flight triples whose file is no longer in the
334    /// intersection of both agents' current modified-file sets.
335    pub fn sweep_in_flight_pairs(&mut self) {
336        let keys: Vec<(String, String, String)> = self.in_flight_pairs.keys().cloned().collect();
337        for (a, b, file) in keys {
338            let a_has = self
339                .current_files
340                .get(&a)
341                .is_some_and(|files| files.contains(&file));
342            let b_has = self
343                .current_files
344                .get(&b)
345                .is_some_and(|files| files.contains(&file));
346            if !(a_has && b_has) {
347                self.in_flight_pairs.remove(&(a, b, file));
348            }
349        }
350    }
351
352    // ====================================================================
353    // Read-only queries
354    // ====================================================================
355
356    /// Returns every forward conflict between `x_id`'s intent and every other
357    /// non-expired intent in the tracker.
358    ///
359    /// For each file shared by both intents:
360    /// - If either side declared no regions (`None`), the file is a
361    ///   file-level conflict (v0.5.0 fallback) — reported with empty
362    ///   `regions`.
363    /// - If both sides declared regions, the file conflicts only when the
364    ///   region sets intersect; the intersecting regions are reported.
365    ///
366    /// A pair appears in the output only when at least one file conflicts.
367    /// Caller is responsible for dedup against `warned_intent_pairs`.
368    #[must_use]
369    pub fn forward_overlaps(&self, x_id: &str) -> Vec<ForwardConflict> {
370        let Some(x) = self.intents.get(x_id) else {
371            return Vec::new();
372        };
373        let mut out = Vec::new();
374        for (other_id, y) in &self.intents {
375            if other_id == x_id {
376                continue;
377            }
378            let mut shared: Vec<&String> = x
379                .files
380                .keys()
381                .filter(|path| y.files.contains_key(*path))
382                .collect();
383            shared.sort();
384            let mut file_conflicts = Vec::new();
385            for path in shared {
386                match (&x.files[path], &y.files[path]) {
387                    (Some(xr), Some(yr)) => {
388                        let (regions, cross_kind) = regions_intersect(xr, yr);
389                        if !regions.is_empty() {
390                            file_conflicts.push(FileConflict {
391                                path: path.clone(),
392                                regions,
393                                cross_kind,
394                            });
395                        }
396                    }
397                    // At least one side omitted regions → file-level conflict
398                    // (preserves the v0.5.0 safety net).
399                    _ => {
400                        file_conflicts.push(FileConflict {
401                            path: path.clone(),
402                            regions: Vec::new(),
403                            cross_kind: false,
404                        });
405                    }
406                }
407            }
408            if !file_conflicts.is_empty() {
409                out.push(ForwardConflict {
410                    other_agent: other_id.clone(),
411                    files: file_conflicts,
412                });
413            }
414        }
415        out.sort_by(|a, b| a.other_agent.cmp(&b.other_agent));
416        out
417    }
418
419    /// Returns every `(min_id, max_id, file)` triple currently in the
420    /// intersection of two agents' modified-file sets.
421    #[must_use]
422    pub fn in_flight_overlaps(&self) -> Vec<(String, String, String)> {
423        let ids: Vec<&String> = self.current_files.keys().collect();
424        let mut out = Vec::new();
425        for i in 0..ids.len() {
426            for j in (i + 1)..ids.len() {
427                let a = ids[i];
428                let b = ids[j];
429                let (Some(a_files), Some(b_files)) =
430                    (self.current_files.get(a), self.current_files.get(b))
431                else {
432                    continue;
433                };
434                if a_files.is_empty() || b_files.is_empty() {
435                    continue;
436                }
437                let (lo, hi) = ordered_pair(a, b);
438                let mut files: Vec<String> = a_files.intersection(b_files).cloned().collect();
439                files.sort();
440                for f in files {
441                    out.push((lo.clone(), hi.clone(), f));
442                }
443            }
444        }
445        out.sort();
446        out
447    }
448
449    /// Returns ownership violations for agent `x_id` as `(file,
450    /// owner_y_id)` tuples — files in `x_id`'s `current_files` that lie
451    /// outside `x_id`'s own intent (or `x_id` has no intent) and inside
452    /// some other agent's active non-expired intent.
453    #[must_use]
454    pub fn ownership_violations(&self, x_id: &str) -> Vec<(String, String)> {
455        let Some(x_files) = self.current_files.get(x_id) else {
456            return Vec::new();
457        };
458        let x_intent = self.intents.get(x_id);
459        let mut out = Vec::new();
460        let mut sorted_files: Vec<&String> = x_files.iter().collect();
461        sorted_files.sort();
462        for file in sorted_files {
463            if x_intent.is_some_and(|r| r.claims_path(file)) {
464                continue;
465            }
466            for (other_id, other) in &self.intents {
467                if other_id == x_id {
468                    continue;
469                }
470                if other.claims_path(file) {
471                    out.push((file.clone(), other_id.clone()));
472                }
473            }
474        }
475        // Deterministic ordering by file then owner.
476        out.sort();
477        out
478    }
479
480    // ====================================================================
481    // Dedup state
482    // ====================================================================
483
484    /// Returns `true` if the ordered pair `(min(a, b), max(a, b))` has
485    /// already been warned for a forward conflict.
486    #[must_use]
487    pub fn was_intent_pair_warned(&self, a: &str, b: &str) -> bool {
488        self.warned_intent_pairs.contains(&ordered_pair(a, b))
489    }
490
491    /// Marks the ordered pair as having been warned for a forward
492    /// conflict. Subsequent calls to [`was_intent_pair_warned`] will
493    /// return `true` until either intent is replaced or expires.
494    pub fn mark_intent_pair_warned(&mut self, a: &str, b: &str) {
495        self.warned_intent_pairs.insert(ordered_pair(a, b));
496    }
497
498    /// Inserts an initial entry for the in-flight triple `(min(a, b),
499    /// max(a, b), file)` if not already present. Returns `true` if the
500    /// entry was newly created (i.e. this is the initial warning).
501    pub fn record_in_flight_pair(&mut self, a: &str, b: &str, file: &str, now: Instant) -> bool {
502        let (lo, hi) = ordered_pair(a, b);
503        let key = (lo, hi, file.to_string());
504        if let std::collections::hash_map::Entry::Vacant(slot) = self.in_flight_pairs.entry(key) {
505            slot.insert(InFlightPair {
506                first_seen: now,
507                escalated: false,
508            });
509            true
510        } else {
511            false
512        }
513    }
514
515    /// Returns and marks-escalated every in-flight triple whose age
516    /// exceeds `window` and that has not yet been escalated. Each triple
517    /// is returned at most once across the tracker's lifetime.
518    pub fn take_due_escalations(
519        &mut self,
520        window: Duration,
521        now: Instant,
522    ) -> Vec<(String, String, String)> {
523        let mut out = Vec::new();
524        for (key, pair) in &mut self.in_flight_pairs {
525            if pair.escalated {
526                continue;
527            }
528            if now.saturating_duration_since(pair.first_seen) >= window {
529                pair.escalated = true;
530                out.push(key.clone());
531            }
532        }
533        out.sort();
534        out
535    }
536
537    /// Returns `true` if the violator/file pair has already been warned.
538    #[must_use]
539    pub fn was_ownership_warned(&self, violator: &str, file: &str) -> bool {
540        self.warned_violations
541            .contains(&(violator.to_string(), file.to_string()))
542    }
543
544    /// Marks the violator/file pair as warned. Subsequent
545    /// [`was_ownership_warned`] calls return `true`.
546    pub fn mark_ownership_warned(&mut self, violator: &str, file: &str) {
547        self.warned_violations
548            .insert((violator.to_string(), file.to_string()));
549    }
550
551    // ====================================================================
552    // Inspection (read-only access for tests and external callers)
553    // ====================================================================
554
555    /// Returns the intent record for an agent, if one is currently
556    /// tracked.
557    #[must_use]
558    pub fn intent_for(&self, agent_id: &str) -> Option<&IntentRecord> {
559        self.intents.get(agent_id)
560    }
561
562    /// Returns the current modified-file set for an agent, if known.
563    #[must_use]
564    pub fn current_files_for(&self, agent_id: &str) -> Option<&HashSet<String>> {
565        self.current_files.get(agent_id)
566    }
567
568    /// Returns the number of in-flight triples currently tracked.
569    #[must_use]
570    pub fn in_flight_pair_count(&self) -> usize {
571        self.in_flight_pairs.len()
572    }
573}
574
575// =========================================================================
576// Auto-emit helpers and detector loop
577// =========================================================================
578
/// Explanatory note added to a forward-conflict warning whenever a
/// conservative cross-kind region match (named region vs line range)
/// contributed to the conflict. It tells the reader why the warning fired
/// and how to get narrower matching.
pub const CROSS_KIND_HINT: &str = "Note: one side declared named regions and the other declared line ranges; \
     these always intersect conservatively. If you want narrower conflict matching, \
     both sides should use the same region kind.";
585
586/// Renders one conflicting file for warning text: bare path for a
587/// file-level conflict, or `path (regions: <list>)` when regions
588/// intersected.
589fn describe_file_conflict(fc: &FileConflict) -> String {
590    if fc.regions.is_empty() {
591        fc.path.clone()
592    } else {
593        let regions: Vec<String> = fc.regions.iter().map(Region::to_string).collect();
594        format!("{} (regions: {})", fc.path, regions.join(", "))
595    }
596}
597
598/// Builds a forward-conflict feedback error string addressed to one
599/// publisher of an overlapping intent pair.
600///
601/// When any conflicting file carries intersecting regions, those regions are
602/// named explicitly (kind + name, or range). When a cross-kind conservative
603/// match contributed to the conflict, the [`CROSS_KIND_HINT`] is appended.
604fn forward_conflict_error(other_agent: &str, files: &[FileConflict]) -> String {
605    let list = files
606        .iter()
607        .map(describe_file_conflict)
608        .collect::<Vec<_>>()
609        .join("; ");
610    let mut text = format!(
611        "{CONFLICT_DETECTOR_TAG} forward conflict: agent {other_agent} also intends to modify: {list}",
612    );
613    if files.iter().any(|fc| fc.cross_kind) {
614        text.push(' ');
615        text.push_str(CROSS_KIND_HINT);
616    }
617    text
618}
619
620/// Builds an in-flight-conflict feedback error string addressed to one
621/// toucher of a contested file.
622fn in_flight_conflict_error(other_agent: &str, file: &str) -> String {
623    format!(
624        "{CONFLICT_DETECTOR_TAG} in-flight conflict: file {file} is being modified by both you and {other_agent}",
625    )
626}
627
628/// Builds an ownership-violation feedback error string addressed to the
629/// violator.
630fn ownership_violation_error(file: &str, owner: &str) -> String {
631    format!(
632        "{CONFLICT_DETECTOR_TAG} ownership violation: you edited {file} but agent {owner} declared intent over it. Update your agent.intent to declare this file or back off.",
633    )
634}
635
636/// Builds the in-flight-escalation question text.
637fn in_flight_escalation_question(a: &str, b: &str, file: &str, window_secs: u64) -> String {
638    format!(
639        "{CONFLICT_DETECTOR_TAG} in-flight conflict on {file} between {a} and {b} has not resolved within {window_secs}s. Human input requested.",
640    )
641}
642
643/// Builds the ownership-violation escalation question text.
644fn ownership_escalation_question(violator: &str, file: &str, owner: &str) -> String {
645    format!(
646        "{CONFLICT_DETECTOR_TAG} ownership violation: {violator} edited {file} which is in {owner}'s intent. Human review requested.",
647    )
648}
649
650/// Publishes an `agent.feedback` message addressed to `target_id` with a
651/// single error string `error_text`. The message's `from` is
652/// [`CONFLICT_DETECTOR_SENDER`] (always `"supervisor"`).
653pub fn emit_feedback(state: &Arc<BrokerState>, target_id: &str, error_text: String) {
654    let msg = BrokerMessage::Feedback {
655        agent_id: target_id.to_string(),
656        payload: FeedbackPayload {
657            from: CONFLICT_DETECTOR_SENDER.to_string(),
658            errors: vec![error_text],
659        },
660    };
661    delivery::publish_message(state, &msg);
662}
663
664/// Publishes an `agent.question` message into the supervisor inbox. The
665/// message's `agent_id` is `"supervisor"` (the recipient by convention).
666pub fn emit_question(state: &Arc<BrokerState>, question_text: String) {
667    let msg = BrokerMessage::Question {
668        agent_id: CONFLICT_DETECTOR_SENDER.to_string(),
669        payload: QuestionPayload {
670            question: question_text,
671        },
672    };
673    delivery::publish_message(state, &msg);
674}
675
676/// Process a single message through the tracker and emit any warnings
677/// that the configured policy allows.
678///
679/// Returns the number of auto-emitted broker messages.
680///
681/// This is the per-message body of the detector loop, lifted into a
682/// standalone function so it can be unit-tested without spawning a
683/// tokio task.
684pub fn process_message(
685    state: &Arc<BrokerState>,
686    tracker: &mut ConflictTracker,
687    msg: &BrokerMessage,
688    config: &ConflictConfig,
689    now: Instant,
690) -> usize {
691    // Re-entrancy guard: ignore any message whose sender is the detector
692    // itself. The detector publishes `Feedback` (from supervisor) and
693    // `Question` (agent_id = supervisor), neither of which it should
694    // re-process.
695    if matches!(
696        msg,
697        BrokerMessage::Feedback { payload, .. } if payload.from == CONFLICT_DETECTOR_SENDER
698    ) || matches!(
699        msg,
700        BrokerMessage::Question { agent_id, .. } if agent_id == CONFLICT_DETECTOR_SENDER
701    ) {
702        return 0;
703    }
704
705    let mut emitted = 0usize;
706    // Drop expired intents up front so neither overlap check sees them.
707    tracker.expire_stale_intents(now);
708
709    match msg {
710        BrokerMessage::Intent { agent_id, payload } => {
711            let IntentPayload {
712                files,
713                summary,
714                valid_for_seconds,
715            } = payload.clone();
716            let normalized: Vec<NormalizedFileIntent> =
717                files.into_iter().map(NormalizedFileIntent::from).collect();
718            tracker.insert_intent(
719                agent_id,
720                normalized,
721                summary,
722                Duration::from_secs(valid_for_seconds),
723                now,
724            );
725            if config.warn_on_intent_overlap {
726                for conflict in tracker.forward_overlaps(agent_id) {
727                    if tracker.was_intent_pair_warned(agent_id, &conflict.other_agent) {
728                        continue;
729                    }
730                    emit_feedback(
731                        state,
732                        agent_id,
733                        forward_conflict_error(&conflict.other_agent, &conflict.files),
734                    );
735                    emit_feedback(
736                        state,
737                        &conflict.other_agent,
738                        forward_conflict_error(agent_id, &conflict.files),
739                    );
740                    tracker.mark_intent_pair_warned(agent_id, &conflict.other_agent);
741                    emitted += 2;
742                }
743            }
744        }
745        BrokerMessage::Status { agent_id, payload } => {
746            let StatusPayload { modified_files, .. } = payload.clone();
747            tracker.update_status(agent_id, modified_files);
748
749            // 1. In-flight initial warnings — look for triples involving X.
750            for (a, b, file) in tracker.in_flight_overlaps() {
751                if a.as_str() != agent_id.as_str() && b.as_str() != agent_id.as_str() {
752                    continue;
753                }
754                if tracker.record_in_flight_pair(&a, &b, &file, now) {
755                    emit_feedback(state, &a, in_flight_conflict_error(&b, &file));
756                    emit_feedback(state, &b, in_flight_conflict_error(&a, &file));
757                    emitted += 2;
758                }
759            }
760
761            // 2. In-flight resolution — drop any triple whose file
762            //    intersection no longer includes the file (one agent
763            //    stopped touching it).
764            tracker.sweep_in_flight_pairs();
765
766            // 3. Ownership violations for X.
767            for (file, owner) in tracker.ownership_violations(agent_id) {
768                if tracker.was_ownership_warned(agent_id, &file) {
769                    continue;
770                }
771                emit_feedback(state, agent_id, ownership_violation_error(&file, &owner));
772                emitted += 1;
773                if config.escalate_on_violation {
774                    emit_question(
775                        state,
776                        ownership_escalation_question(agent_id, &file, &owner),
777                    );
778                    emitted += 1;
779                }
780                tracker.mark_ownership_warned(agent_id, &file);
781            }
782        }
783        _ => {}
784    }
785
786    emitted
787}
788
789/// Run a single tick of the periodic timer-driven detector logic:
790/// expire stale intents, sweep resolved in-flight pairs, then emit any
791/// escalations whose window has elapsed.
792///
793/// Returns the number of escalation messages emitted.
794pub fn tick(
795    state: &Arc<BrokerState>,
796    tracker: &mut ConflictTracker,
797    config: &ConflictConfig,
798    now: Instant,
799) -> usize {
800    tracker.expire_stale_intents(now);
801    tracker.sweep_in_flight_pairs();
802    let window = Duration::from_secs(config.window_seconds);
803    let mut emitted = 0usize;
804    for (a, b, file) in tracker.take_due_escalations(window, now) {
805        emit_question(
806            state,
807            in_flight_escalation_question(&a, &b, &file, config.window_seconds),
808        );
809        emitted += 1;
810    }
811    emitted
812}
813
814/// Spawns a background tokio task that drives the detector loop.
815///
816/// The task tails the broker message log via a sequence cursor (matching
817/// the existing watcher-style pattern). On each poll interval it:
818///
819/// 1. Reads any new messages since the previous cursor and feeds each
820///    through [`process_message`] (which handles forward/in-flight/owner
821///    detection for that message).
822/// 2. Runs [`tick`] to expire stale intents, sweep resolved in-flight
823///    pairs, and emit escalations whose window has elapsed.
824///
825/// Exits cleanly when `shutdown` is flipped to `true`.
826pub async fn run_detector_loop(
827    state: Arc<BrokerState>,
828    config: ConflictConfig,
829    mut shutdown: tokio::sync::watch::Receiver<bool>,
830) {
831    let mut tracker = ConflictTracker::new();
832    let mut cursor: u64 = 0;
833    let mut ticker = tokio::time::interval(DETECTOR_TICK_INTERVAL);
834    ticker.tick().await; // skip the immediate first tick
835    loop {
836        tokio::select! {
837            _ = ticker.tick() => {}
838            _ = shutdown.changed() => {
839                if *shutdown.borrow() {
840                    break;
841                }
842            }
843        }
844
845        let now = Instant::now();
846
847        // Pull every new message since the last cursor under the read
848        // lock, then release before doing any further work.
849        let batch = delivery::full_log(&state, cursor);
850        for (seq, _ts, msg) in &batch {
851            process_message(&state, &mut tracker, msg, &config, now);
852            if *seq > cursor {
853                cursor = *seq;
854            }
855        }
856
857        tick(&state, &mut tracker, &config, now);
858    }
859}
860
/// How often the detector loop wakes up to poll. Kept equal to the
/// watcher's cadence so timing stays predictable across subsystems.
pub const DETECTOR_TICK_INTERVAL: Duration = Duration::from_millis(500);
864
865#[cfg(test)]
866mod tests {
867    use super::*;
868    use crate::broker::messages::{ArtifactPayload, IntentPayload, StatusPayload};
869
870    fn fresh() -> ConflictTracker {
871        ConflictTracker::new()
872    }
873
874    fn ttl_secs(s: u64) -> Duration {
875        Duration::from_secs(s)
876    }
877
878    fn files(list: &[&str]) -> Vec<String> {
879        list.iter().map(|s| (*s).to_string()).collect()
880    }
881
882    /// File-level normalised intents (no regions) from a path list — the
883    /// v0.5.0 shape, used by the tracker tests that exercise file-level
884    /// behaviour.
885    fn nfi(list: &[&str]) -> Vec<NormalizedFileIntent> {
886        list.iter()
887            .map(|s| NormalizedFileIntent {
888                path: (*s).to_string(),
889                regions: None,
890            })
891            .collect()
892    }
893
894    /// File-level `FileIntent` wire entries (plain strings) from a path list.
895    fn fi(list: &[&str]) -> Vec<FileIntent> {
896        list.iter().map(|s| FileIntent::from(*s)).collect()
897    }
898
899    fn func(name: &str) -> Region {
900        Region::Function {
901            name: name.to_string(),
902        }
903    }
904
905    // Maps to scenario `Detector stops cleanly when broker stops` from
906    // conflict-detection. Spawns the detector via its existing constructor
907    // and asserts the task exits within one poll interval + slack after
908    // the broker's shutdown signal flips. (test-coverage-v0-5-0 task 6.1)
909    #[test]
910    fn detector_stops_cleanly_on_broker_stop() {
911        use tokio::time::Duration;
912
913        let runtime = tokio::runtime::Builder::new_current_thread()
914            .enable_all()
915            .build()
916            .expect("runtime");
917        runtime.block_on(async {
918            let state = Arc::new(BrokerState::new(None));
919            let cfg = ConflictConfig::default();
920            let (tx, rx) = tokio::sync::watch::channel(false);
921            let handle = tokio::spawn(run_detector_loop(state, cfg, rx));
922
923            // Mirror the broker drop path: flip the shutdown channel to true.
924            tx.send(true).expect("shutdown send");
925
926            let timed =
927                tokio::time::timeout(DETECTOR_TICK_INTERVAL + Duration::from_millis(100), handle)
928                    .await
929                    .expect("detector task did not exit within poll interval + slack");
930            timed.expect("detector task should not panic");
931        });
932    }
933
934    fn fresh_state() -> Arc<BrokerState> {
935        Arc::new(BrokerState::new(None))
936    }
937
938    fn intent_msg(agent_id: &str, files_list: &[&str], summary: &str, ttl: u64) -> BrokerMessage {
939        BrokerMessage::Intent {
940            agent_id: agent_id.to_string(),
941            payload: IntentPayload {
942                files: fi(files_list),
943                summary: summary.to_string(),
944                valid_for_seconds: ttl,
945            },
946        }
947    }
948
949    /// Builds an intent message whose `files` carry explicit region-bearing
950    /// entries. Each `(path, regions)` pair becomes a `FileIntent::Detailed`.
951    fn intent_msg_with_regions(
952        agent_id: &str,
953        files_list: &[(&str, Vec<Region>)],
954        summary: &str,
955        ttl: u64,
956    ) -> BrokerMessage {
957        BrokerMessage::Intent {
958            agent_id: agent_id.to_string(),
959            payload: IntentPayload {
960                files: files_list
961                    .iter()
962                    .map(|(path, regions)| FileIntent::Detailed {
963                        path: (*path).to_string(),
964                        regions: regions.clone(),
965                    })
966                    .collect(),
967                summary: summary.to_string(),
968                valid_for_seconds: ttl,
969            },
970        }
971    }
972
973    fn status_msg(agent_id: &str, files_list: &[&str]) -> BrokerMessage {
974        BrokerMessage::Status {
975            agent_id: agent_id.to_string(),
976            payload: StatusPayload {
977                status: "working".to_string(),
978                modified_files: files(files_list),
979                message: None,
980                ..Default::default()
981            },
982        }
983    }
984
985    fn supervisor_feedbacks_in_inbox(state: &Arc<BrokerState>, target: &str) -> Vec<BrokerMessage> {
986        let (msgs, _) = delivery::poll_messages(state, target, 0);
987        msgs.into_iter()
988            .filter(|m| {
989                matches!(
990                    m,
991                    BrokerMessage::Feedback { payload, .. }
992                        if payload.from == CONFLICT_DETECTOR_SENDER
993                )
994            })
995            .collect()
996    }
997
998    fn supervisor_questions(state: &Arc<BrokerState>) -> Vec<BrokerMessage> {
999        let (msgs, _) = delivery::poll_messages(state, "supervisor", 0);
1000        msgs.into_iter()
1001            .filter(|m| matches!(m, BrokerMessage::Question { .. }))
1002            .collect()
1003    }
1004
1005    fn default_config() -> ConflictConfig {
1006        ConflictConfig::default()
1007    }
1008
1009    // ====================================================================
1010    // Tracker unit tests (task 2.5)
1011    // ====================================================================
1012
1013    #[test]
1014    fn tracker_insert_intent_records_files() {
1015        let mut t = fresh();
1016        let now = Instant::now();
1017        t.insert_intent(
1018            "feat-x",
1019            nfi(&["src/a.rs", "src/b.rs"]),
1020            "x".into(),
1021            ttl_secs(60),
1022            now,
1023        );
1024        let r = t.intent_for("feat-x").unwrap();
1025        assert!(r.files.contains_key("src/a.rs"));
1026        assert!(r.files.contains_key("src/b.rs"));
1027        assert_eq!(r.valid_for, ttl_secs(60));
1028    }
1029
1030    #[test]
1031    fn tracker_insert_intent_replaces_prior_intent() {
1032        let mut t = fresh();
1033        let now = Instant::now();
1034        t.insert_intent(
1035            "feat-x",
1036            nfi(&["src/a.rs"]),
1037            "old".into(),
1038            ttl_secs(60),
1039            now,
1040        );
1041        t.insert_intent(
1042            "feat-x",
1043            nfi(&["src/a.rs", "src/b.rs"]),
1044            "new".into(),
1045            ttl_secs(60),
1046            now,
1047        );
1048        let r = t.intent_for("feat-x").unwrap();
1049        assert_eq!(r.summary, "new");
1050        assert_eq!(r.files.len(), 2);
1051    }
1052
1053    #[test]
1054    fn tracker_expire_stale_intents_drops_aged_entries() {
1055        let mut t = fresh();
1056        let now = Instant::now();
1057        t.insert_intent("feat-x", nfi(&["a"]), "x".into(), ttl_secs(1), now);
1058        let later = now + Duration::from_secs(2);
1059        t.expire_stale_intents(later);
1060        assert!(t.intent_for("feat-x").is_none());
1061    }
1062
1063    #[test]
1064    fn tracker_forward_overlaps_returns_overlap_files() {
1065        let mut t = fresh();
1066        let now = Instant::now();
1067        t.insert_intent("feat-x", nfi(&["a", "b"]), "x".into(), ttl_secs(60), now);
1068        t.insert_intent("feat-y", nfi(&["b", "c"]), "y".into(), ttl_secs(60), now);
1069        let overlaps = t.forward_overlaps("feat-x");
1070        assert_eq!(overlaps.len(), 1);
1071        assert_eq!(overlaps[0].other_agent, "feat-y");
1072        // File-level conflict (no regions declared on either side): one
1073        // conflicting file, "b", with empty regions.
1074        assert_eq!(overlaps[0].files.len(), 1);
1075        assert_eq!(overlaps[0].files[0].path, "b");
1076        assert!(overlaps[0].files[0].regions.is_empty());
1077    }
1078
1079    #[test]
1080    fn tracker_intent_pair_dedupe_is_ordered() {
1081        let mut t = fresh();
1082        assert!(!t.was_intent_pair_warned("feat-y", "feat-x"));
1083        t.mark_intent_pair_warned("feat-x", "feat-y");
1084        assert!(t.was_intent_pair_warned("feat-x", "feat-y"));
1085        assert!(t.was_intent_pair_warned("feat-y", "feat-x"));
1086    }
1087
1088    #[test]
1089    fn tracker_insert_intent_clears_prior_pair_dedupe() {
1090        let mut t = fresh();
1091        let now = Instant::now();
1092        t.insert_intent("feat-x", nfi(&["a"]), "x".into(), ttl_secs(60), now);
1093        t.insert_intent("feat-y", nfi(&["a"]), "y".into(), ttl_secs(60), now);
1094        t.mark_intent_pair_warned("feat-x", "feat-y");
1095        assert!(t.was_intent_pair_warned("feat-x", "feat-y"));
1096        // New intent from x must clear the pair entry so subsequent overlaps re-warn.
1097        t.insert_intent("feat-x", nfi(&["a", "b"]), "x2".into(), ttl_secs(60), now);
1098        assert!(!t.was_intent_pair_warned("feat-x", "feat-y"));
1099    }
1100
1101    #[test]
1102    fn tracker_in_flight_overlaps_returns_intersected_files() {
1103        let mut t = fresh();
1104        t.update_status("feat-x", files(&["src/a.rs", "src/b.rs"]));
1105        t.update_status("feat-y", files(&["src/a.rs"]));
1106        let pairs = t.in_flight_overlaps();
1107        assert_eq!(pairs.len(), 1);
1108        assert_eq!(
1109            pairs[0],
1110            (
1111                "feat-x".to_string(),
1112                "feat-y".to_string(),
1113                "src/a.rs".to_string()
1114            )
1115        );
1116    }
1117
1118    #[test]
1119    fn tracker_record_in_flight_pair_returns_true_only_first_time() {
1120        let mut t = fresh();
1121        let now = Instant::now();
1122        assert!(t.record_in_flight_pair("feat-x", "feat-y", "src/a.rs", now));
1123        assert!(!t.record_in_flight_pair("feat-y", "feat-x", "src/a.rs", now));
1124        // Sweep removes triples whose file is no longer in the intersection.
1125        t.update_status("feat-x", files(&["src/b.rs"]));
1126        t.update_status("feat-y", files(&["src/a.rs"]));
1127        t.sweep_in_flight_pairs();
1128        assert!(t.record_in_flight_pair("feat-x", "feat-y", "src/a.rs", now));
1129    }
1130
1131    #[test]
1132    fn tracker_take_due_escalations_returns_aged_triples_once() {
1133        let mut t = fresh();
1134        let now = Instant::now();
1135        t.record_in_flight_pair("feat-x", "feat-y", "f", now);
1136        let window = Duration::from_mins(2);
1137        // Too soon — nothing returned.
1138        let out = t.take_due_escalations(window, now + Duration::from_secs(10));
1139        assert!(out.is_empty());
1140        let due = now + Duration::from_mins(2) + Duration::from_secs(1);
1141        let out = t.take_due_escalations(window, due);
1142        assert_eq!(out.len(), 1);
1143        // Second call after marking — escalation is sticky.
1144        let out2 = t.take_due_escalations(window, due);
1145        assert!(out2.is_empty());
1146    }
1147
1148    #[test]
1149    fn tracker_ownership_violations_file_inside_other_intent() {
1150        let mut t = fresh();
1151        let now = Instant::now();
1152        t.insert_intent("feat-x", nfi(&["src/a.rs"]), "x".into(), ttl_secs(60), now);
1153        t.update_status("feat-y", files(&["src/a.rs"]));
1154        let v = t.ownership_violations("feat-y");
1155        assert_eq!(v.len(), 1);
1156        assert_eq!(v[0], ("src/a.rs".to_string(), "feat-x".to_string()));
1157    }
1158
1159    #[test]
1160    fn tracker_ownership_violations_inside_own_intent_is_ok() {
1161        let mut t = fresh();
1162        let now = Instant::now();
1163        t.insert_intent("feat-y", nfi(&["src/a.rs"]), "y".into(), ttl_secs(60), now);
1164        t.update_status("feat-y", files(&["src/a.rs"]));
1165        assert!(t.ownership_violations("feat-y").is_empty());
1166    }
1167
1168    #[test]
1169    fn tracker_ownership_violations_unclaimed_file_is_ok() {
1170        let mut t = fresh();
1171        t.update_status("feat-y", files(&["src/orphan.rs"]));
1172        assert!(t.ownership_violations("feat-y").is_empty());
1173    }
1174
1175    // ====================================================================
1176    // Detector behavior tests (task 4)
1177    // ====================================================================
1178
1179    #[test]
1180    fn detector_forward_conflict_happy_path() {
1181        let state = fresh_state();
1182        let mut t = ConflictTracker::new();
1183        // Ensure both inboxes exist (delivery skips unregistered queues).
1184        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1185        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1186
1187        let now = Instant::now();
1188        process_message(
1189            &state,
1190            &mut t,
1191            &intent_msg("feat-x", &["src/a.rs", "src/b.rs"], "x", 600),
1192            &default_config(),
1193            now,
1194        );
1195        process_message(
1196            &state,
1197            &mut t,
1198            &intent_msg("feat-y", &["src/b.rs", "src/c.rs"], "y", 600),
1199            &default_config(),
1200            now,
1201        );
1202
1203        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
1204        let y_fb = supervisor_feedbacks_in_inbox(&state, "feat-y");
1205        assert_eq!(
1206            x_fb.len(),
1207            1,
1208            "feat-x should have one forward-conflict feedback"
1209        );
1210        assert_eq!(y_fb.len(), 1);
1211        // Text contains the tag, peer agent_id, and the overlap file.
1212        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
1213            let err = &payload.errors[0];
1214            assert!(err.starts_with(CONFLICT_DETECTOR_TAG));
1215            assert!(err.contains("forward conflict"));
1216            assert!(err.contains("feat-y"));
1217            assert!(err.contains("src/b.rs"));
1218        } else {
1219            panic!("expected Feedback");
1220        }
1221    }
1222
1223    #[test]
1224    fn detector_forward_conflict_dedupe() {
1225        let state = fresh_state();
1226        let mut t = ConflictTracker::new();
1227        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1228        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1229        let cfg = default_config();
1230        let now = Instant::now();
1231        process_message(
1232            &state,
1233            &mut t,
1234            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1235            &cfg,
1236            now,
1237        );
1238        process_message(
1239            &state,
1240            &mut t,
1241            &intent_msg("feat-y", &["src/a.rs"], "y", 600),
1242            &cfg,
1243            now,
1244        );
1245        // Re-publishing the *same* intent must not re-emit. (The
1246        // tracker's clear-on-insert behaviour clears pair dedupe on
1247        // replace; here we use a fresh duplicate of the same agent's
1248        // intent which should leave dedupe in place because the pair was
1249        // already warned via the prior pair, not via the agent itself.
1250        // To make this deterministic, simulate a no-op re-publish from a
1251        // *new* x intent that is identical to the prior one: the
1252        // dedupe is cleared, so the rule we test is at the message level
1253        // — re-publishing the same intent message body does NOT re-emit
1254        // because nothing changed.)
1255        let before_x = supervisor_feedbacks_in_inbox(&state, "feat-x").len();
1256        let before_y = supervisor_feedbacks_in_inbox(&state, "feat-y").len();
1257        // Re-publish feat-y's identical intent.
1258        process_message(
1259            &state,
1260            &mut t,
1261            &intent_msg("feat-y", &["src/a.rs"], "y", 600),
1262            &cfg,
1263            now,
1264        );
1265        let after_x = supervisor_feedbacks_in_inbox(&state, "feat-x").len();
1266        let after_y = supervisor_feedbacks_in_inbox(&state, "feat-y").len();
1267        assert_eq!(
1268            before_x, after_x,
1269            "no new feedback to x on identical re-publish"
1270        );
1271        assert_eq!(before_y, after_y);
1272    }
1273
1274    #[test]
1275    fn detector_forward_conflict_suppression_when_disabled() {
1276        let state = fresh_state();
1277        let mut t = ConflictTracker::new();
1278        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1279        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1280        let cfg = ConflictConfig {
1281            warn_on_intent_overlap: false,
1282            ..ConflictConfig::default()
1283        };
1284        let now = Instant::now();
1285        process_message(
1286            &state,
1287            &mut t,
1288            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1289            &cfg,
1290            now,
1291        );
1292        process_message(
1293            &state,
1294            &mut t,
1295            &intent_msg("feat-y", &["src/a.rs"], "y", 600),
1296            &cfg,
1297            now,
1298        );
1299        assert!(supervisor_feedbacks_in_inbox(&state, "feat-x").is_empty());
1300        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
1301        // Tracker still has both intents — needed for in-flight + ownership detection.
1302        assert!(t.intent_for("feat-x").is_some());
1303        assert!(t.intent_for("feat-y").is_some());
1304    }
1305
1306    #[test]
1307    fn detector_forward_conflict_non_overlap_no_warnings() {
1308        let state = fresh_state();
1309        let mut t = ConflictTracker::new();
1310        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1311        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1312        let now = Instant::now();
1313        let cfg = default_config();
1314        process_message(
1315            &state,
1316            &mut t,
1317            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1318            &cfg,
1319            now,
1320        );
1321        process_message(
1322            &state,
1323            &mut t,
1324            &intent_msg("feat-y", &["src/b.rs"], "y", 600),
1325            &cfg,
1326            now,
1327        );
1328        assert!(supervisor_feedbacks_in_inbox(&state, "feat-x").is_empty());
1329        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
1330    }
1331
1332    #[test]
1333    fn detector_self_replace_no_self_conflict() {
1334        let state = fresh_state();
1335        let mut t = ConflictTracker::new();
1336        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1337        let now = Instant::now();
1338        let cfg = default_config();
1339        process_message(
1340            &state,
1341            &mut t,
1342            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1343            &cfg,
1344            now,
1345        );
1346        process_message(
1347            &state,
1348            &mut t,
1349            &intent_msg("feat-x", &["src/a.rs", "src/b.rs"], "x2", 600),
1350            &cfg,
1351            now,
1352        );
1353        assert!(supervisor_feedbacks_in_inbox(&state, "feat-x").is_empty());
1354    }
1355
1356    #[test]
1357    fn detector_ttl_expired_intent_does_not_overlap() {
1358        let state = fresh_state();
1359        let mut t = ConflictTracker::new();
1360        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1361        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1362        let now = Instant::now();
1363        let cfg = default_config();
1364        process_message(
1365            &state,
1366            &mut t,
1367            &intent_msg("feat-x", &["src/a.rs"], "x", 1),
1368            &cfg,
1369            now,
1370        );
1371        // Wait past TTL.
1372        let later = now + Duration::from_secs(5);
1373        process_message(
1374            &state,
1375            &mut t,
1376            &intent_msg("feat-y", &["src/a.rs"], "y", 600),
1377            &cfg,
1378            later,
1379        );
1380        assert!(supervisor_feedbacks_in_inbox(&state, "feat-x").is_empty());
1381        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
1382    }
1383
1384    #[test]
1385    fn detector_in_flight_initial_warning() {
1386        let state = fresh_state();
1387        let mut t = ConflictTracker::new();
1388        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1389        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1390        let now = Instant::now();
1391        let cfg = default_config();
1392        process_message(
1393            &state,
1394            &mut t,
1395            &status_msg("feat-x", &["src/a.rs"]),
1396            &cfg,
1397            now,
1398        );
1399        process_message(
1400            &state,
1401            &mut t,
1402            &status_msg("feat-y", &["src/a.rs"]),
1403            &cfg,
1404            now,
1405        );
1406        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
1407        let y_fb = supervisor_feedbacks_in_inbox(&state, "feat-y");
1408        assert_eq!(x_fb.len(), 1);
1409        assert_eq!(y_fb.len(), 1);
1410        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
1411            assert!(payload.errors[0].contains("in-flight conflict"));
1412            assert!(payload.errors[0].contains("src/a.rs"));
1413            assert!(payload.errors[0].starts_with(CONFLICT_DETECTOR_TAG));
1414        }
1415    }
1416
1417    #[test]
1418    fn detector_in_flight_escalation_after_window() {
1419        let state = fresh_state();
1420        let mut t = ConflictTracker::new();
1421        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1422        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1423        let now = Instant::now();
1424        let cfg = ConflictConfig {
1425            window_seconds: 5,
1426            ..ConflictConfig::default()
1427        };
1428        process_message(
1429            &state,
1430            &mut t,
1431            &status_msg("feat-x", &["src/a.rs"]),
1432            &cfg,
1433            now,
1434        );
1435        process_message(
1436            &state,
1437            &mut t,
1438            &status_msg("feat-y", &["src/a.rs"]),
1439            &cfg,
1440            now,
1441        );
1442        // Time advances past the window — tick should emit one question.
1443        let due = now + Duration::from_secs(10);
1444        let emitted = tick(&state, &mut t, &cfg, due);
1445        assert_eq!(emitted, 1);
1446        let q = supervisor_questions(&state);
1447        assert_eq!(q.len(), 1);
1448        if let BrokerMessage::Question { payload, .. } = &q[0] {
1449            assert!(payload.question.contains(CONFLICT_DETECTOR_TAG));
1450            assert!(payload.question.contains("src/a.rs"));
1451            assert!(payload.question.contains("feat-x"));
1452            assert!(payload.question.contains("feat-y"));
1453        }
1454    }
1455
1456    #[test]
1457    fn detector_in_flight_escalation_dedupe() {
1458        let state = fresh_state();
1459        let mut t = ConflictTracker::new();
1460        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1461        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1462        let now = Instant::now();
1463        let cfg = ConflictConfig {
1464            window_seconds: 5,
1465            ..ConflictConfig::default()
1466        };
1467        process_message(
1468            &state,
1469            &mut t,
1470            &status_msg("feat-x", &["src/a.rs"]),
1471            &cfg,
1472            now,
1473        );
1474        process_message(
1475            &state,
1476            &mut t,
1477            &status_msg("feat-y", &["src/a.rs"]),
1478            &cfg,
1479            now,
1480        );
1481        let due = now + Duration::from_secs(10);
1482        tick(&state, &mut t, &cfg, due);
1483        // Subsequent tick while still overlapping must not re-emit.
1484        let later = due + Duration::from_secs(10);
1485        let emitted = tick(&state, &mut t, &cfg, later);
1486        assert_eq!(emitted, 0);
1487        let q = supervisor_questions(&state);
1488        assert_eq!(q.len(), 1);
1489    }
1490
1491    #[test]
1492    fn detector_in_flight_resolution_drops_triple() {
1493        let state = fresh_state();
1494        let mut t = ConflictTracker::new();
1495        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1496        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1497        let now = Instant::now();
1498        let cfg = ConflictConfig {
1499            window_seconds: 5,
1500            ..ConflictConfig::default()
1501        };
1502        process_message(
1503            &state,
1504            &mut t,
1505            &status_msg("feat-x", &["src/a.rs"]),
1506            &cfg,
1507            now,
1508        );
1509        process_message(
1510            &state,
1511            &mut t,
1512            &status_msg("feat-y", &["src/a.rs"]),
1513            &cfg,
1514            now,
1515        );
1516        assert_eq!(t.in_flight_pair_count(), 1);
1517        // X stops touching the file.
1518        process_message(&state, &mut t, &status_msg("feat-x", &[]), &cfg, now);
1519        assert_eq!(t.in_flight_pair_count(), 0);
1520        let due = now + Duration::from_secs(10);
1521        let emitted = tick(&state, &mut t, &cfg, due);
1522        assert_eq!(emitted, 0, "no escalation for a resolved conflict");
1523    }
1524
1525    #[test]
1526    fn detector_ownership_violation_emits_feedback_and_question() {
1527        let state = fresh_state();
1528        let mut t = ConflictTracker::new();
1529        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1530        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1531        let now = Instant::now();
1532        let cfg = ConflictConfig {
1533            // disable forward warning to isolate ownership behaviour
1534            warn_on_intent_overlap: false,
1535            ..ConflictConfig::default()
1536        };
1537        process_message(
1538            &state,
1539            &mut t,
1540            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1541            &cfg,
1542            now,
1543        );
1544        process_message(
1545            &state,
1546            &mut t,
1547            &intent_msg("feat-y", &["src/b.rs"], "y", 600),
1548            &cfg,
1549            now,
1550        );
1551        process_message(
1552            &state,
1553            &mut t,
1554            &status_msg("feat-y", &["src/a.rs"]),
1555            &cfg,
1556            now,
1557        );
1558        let y_fb = supervisor_feedbacks_in_inbox(&state, "feat-y");
1559        assert_eq!(y_fb.len(), 1);
1560        if let BrokerMessage::Feedback { payload, .. } = &y_fb[0] {
1561            assert!(payload.errors[0].contains("ownership violation"));
1562            assert!(payload.errors[0].contains("src/a.rs"));
1563            assert!(payload.errors[0].contains("feat-x"));
1564        }
1565        let q = supervisor_questions(&state);
1566        assert_eq!(q.len(), 1);
1567    }
1568
1569    #[test]
1570    fn detector_ownership_escalation_suppression() {
1571        let state = fresh_state();
1572        let mut t = ConflictTracker::new();
1573        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1574        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1575        let now = Instant::now();
1576        let cfg = ConflictConfig {
1577            warn_on_intent_overlap: false,
1578            escalate_on_violation: false,
1579            ..ConflictConfig::default()
1580        };
1581        process_message(
1582            &state,
1583            &mut t,
1584            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1585            &cfg,
1586            now,
1587        );
1588        process_message(
1589            &state,
1590            &mut t,
1591            &status_msg("feat-y", &["src/a.rs"]),
1592            &cfg,
1593            now,
1594        );
1595        // Feedback still fires.
1596        assert_eq!(supervisor_feedbacks_in_inbox(&state, "feat-y").len(), 1);
1597        // No question to supervisor.
1598        assert!(supervisor_questions(&state).is_empty());
1599    }
1600
1601    #[test]
1602    fn detector_ownership_file_inside_own_intent_no_violation() {
1603        let state = fresh_state();
1604        let mut t = ConflictTracker::new();
1605        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1606        let now = Instant::now();
1607        let cfg = default_config();
1608        process_message(
1609            &state,
1610            &mut t,
1611            &intent_msg("feat-y", &["src/a.rs"], "y", 600),
1612            &cfg,
1613            now,
1614        );
1615        process_message(
1616            &state,
1617            &mut t,
1618            &status_msg("feat-y", &["src/a.rs"]),
1619            &cfg,
1620            now,
1621        );
1622        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
1623        assert!(supervisor_questions(&state).is_empty());
1624    }
1625
1626    #[test]
1627    fn detector_ownership_unclaimed_file_no_violation() {
1628        let state = fresh_state();
1629        let mut t = ConflictTracker::new();
1630        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1631        let now = Instant::now();
1632        let cfg = default_config();
1633        // feat-y has no intent at all.
1634        process_message(
1635            &state,
1636            &mut t,
1637            &status_msg("feat-y", &["src/orphan.rs"]),
1638            &cfg,
1639            now,
1640        );
1641        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
1642        assert!(supervisor_questions(&state).is_empty());
1643    }
1644
1645    #[test]
1646    fn detector_ownership_violation_dedupe() {
1647        let state = fresh_state();
1648        let mut t = ConflictTracker::new();
1649        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1650        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1651        let now = Instant::now();
1652        let cfg = ConflictConfig {
1653            warn_on_intent_overlap: false,
1654            ..ConflictConfig::default()
1655        };
1656        process_message(
1657            &state,
1658            &mut t,
1659            &intent_msg("feat-x", &["src/a.rs"], "x", 600),
1660            &cfg,
1661            now,
1662        );
1663        process_message(
1664            &state,
1665            &mut t,
1666            &status_msg("feat-y", &["src/a.rs"]),
1667            &cfg,
1668            now,
1669        );
1670        let first = supervisor_feedbacks_in_inbox(&state, "feat-y").len();
1671        // Second status from same violator on same file.
1672        process_message(
1673            &state,
1674            &mut t,
1675            &status_msg("feat-y", &["src/a.rs"]),
1676            &cfg,
1677            now,
1678        );
1679        let second = supervisor_feedbacks_in_inbox(&state, "feat-y").len();
1680        assert_eq!(
1681            first, second,
1682            "no new ownership feedback on repeated status"
1683        );
1684    }
1685
1686    #[test]
1687    fn detector_filters_own_emissions() {
1688        // Re-entrancy guard — feedback/question emitted with from="supervisor"
1689        // (or agent_id="supervisor") must not be re-processed.
1690        let state = fresh_state();
1691        let mut t = ConflictTracker::new();
1692        let now = Instant::now();
1693        let cfg = default_config();
1694        let detector_feedback = BrokerMessage::Feedback {
1695            agent_id: "feat-x".into(),
1696            payload: FeedbackPayload {
1697                from: CONFLICT_DETECTOR_SENDER.into(),
1698                errors: vec![format!("{CONFLICT_DETECTOR_TAG} test")],
1699            },
1700        };
1701        let emitted = process_message(&state, &mut t, &detector_feedback, &cfg, now);
1702        assert_eq!(emitted, 0);
1703        let detector_question = BrokerMessage::Question {
1704            agent_id: CONFLICT_DETECTOR_SENDER.into(),
1705            payload: QuestionPayload {
1706                question: format!("{CONFLICT_DETECTOR_TAG} test"),
1707            },
1708        };
1709        let emitted = process_message(&state, &mut t, &detector_question, &cfg, now);
1710        assert_eq!(emitted, 0);
1711    }
1712
1713    #[test]
1714    fn detector_ignores_artifact_messages_for_warnings() {
1715        // Artifacts don't drive the detector (only Intent + Status do).
1716        // Confirms forward-coordination's broadcast pattern isn't
1717        // accidentally tripping warnings.
1718        let state = fresh_state();
1719        let mut t = ConflictTracker::new();
1720        let now = Instant::now();
1721        let cfg = default_config();
1722        let artifact = BrokerMessage::Artifact {
1723            agent_id: "feat-x".into(),
1724            payload: ArtifactPayload {
1725                status: "done".into(),
1726                exports: vec![],
1727                modified_files: vec!["src/a.rs".into()],
1728            },
1729        };
1730        let emitted = process_message(&state, &mut t, &artifact, &cfg, now);
1731        assert_eq!(emitted, 0);
1732    }
1733
1734    // ====================================================================
1735    // Auto-emit message conventions (task 4 / spec scenarios)
1736    // ====================================================================
1737
1738    #[test]
1739    fn auto_emitted_feedback_uses_supervisor_from_and_conflict_tag() {
1740        let state = fresh_state();
1741        // Recipient must have a registered inbox or delivery silently drops.
1742        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1743        emit_feedback(&state, "feat-x", "[conflict-detector] something".into());
1744        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
1745        let fb: Vec<&BrokerMessage> = msgs
1746            .iter()
1747            .filter(|m| matches!(m, BrokerMessage::Feedback { .. }))
1748            .collect();
1749        assert_eq!(fb.len(), 1);
1750        if let BrokerMessage::Feedback { payload, .. } = fb[0] {
1751            assert_eq!(payload.from, CONFLICT_DETECTOR_SENDER);
1752            assert!(payload.errors[0].starts_with(CONFLICT_DETECTOR_TAG));
1753        } else {
1754            panic!("expected Feedback");
1755        }
1756    }
1757
1758    #[test]
1759    fn auto_emitted_question_targets_supervisor_inbox_with_tag() {
1760        let state = fresh_state();
1761        emit_question(&state, "[conflict-detector] test".into());
1762        let (msgs, _) = delivery::poll_messages(&state, "supervisor", 0);
1763        assert_eq!(msgs.len(), 1);
1764        if let BrokerMessage::Question { agent_id, payload } = &msgs[0] {
1765            assert_eq!(agent_id, "supervisor");
1766            assert!(payload.question.contains(CONFLICT_DETECTOR_TAG));
1767        } else {
1768            panic!("expected Question");
1769        }
1770    }
1771
1772    // ====================================================================
1773    // NormalizedFileIntent conversion (task 2.2)
1774    // ====================================================================
1775
1776    #[test]
1777    fn normalized_from_path_has_no_regions() {
1778        let n: NormalizedFileIntent = FileIntent::Path("src/a.rs".to_string()).into();
1779        assert_eq!(n.path, "src/a.rs");
1780        assert_eq!(n.regions, None);
1781    }
1782
1783    #[test]
1784    fn normalized_from_detailed_with_regions_is_some() {
1785        let n: NormalizedFileIntent = FileIntent::Detailed {
1786            path: "src/a.rs".to_string(),
1787            regions: vec![func("validate_token")],
1788        }
1789        .into();
1790        assert_eq!(n.regions, Some(vec![func("validate_token")]));
1791    }
1792
1793    #[test]
1794    fn normalized_from_detailed_empty_regions_collapses_to_none() {
1795        let n: NormalizedFileIntent = FileIntent::Detailed {
1796            path: "src/a.rs".to_string(),
1797            regions: vec![],
1798        }
1799        .into();
1800        assert_eq!(
1801            n.regions, None,
1802            "an empty regions vec is equivalent to no regions"
1803        );
1804    }
1805
1806    // ====================================================================
1807    // regions_intersect unit coverage (task 3.3)
1808    // ====================================================================
1809
1810    #[test]
1811    fn regions_intersect_same_function_name() {
1812        let (hits, cross) = regions_intersect(&[func("a")], &[func("a")]);
1813        assert_eq!(hits, vec![func("a")]);
1814        assert!(!cross);
1815    }
1816
1817    #[test]
1818    fn regions_intersect_different_function_names_empty() {
1819        let (hits, cross) = regions_intersect(&[func("a")], &[func("b")]);
1820        assert!(hits.is_empty());
1821        assert!(!cross);
1822    }
1823
1824    #[test]
1825    fn regions_intersect_different_named_kinds_empty() {
1826        // function "a" vs class "a" — different kinds, no intersection.
1827        let class_a = Region::Class {
1828            name: "a".to_string(),
1829        };
1830        let (hits, cross) = regions_intersect(&[func("a")], &[class_a]);
1831        assert!(hits.is_empty());
1832        assert!(!cross);
1833    }
1834
1835    #[test]
1836    fn regions_intersect_overlapping_ranges() {
1837        let r1 = Region::Range {
1838            start_line: 10,
1839            end_line: 30,
1840        };
1841        let r2 = Region::Range {
1842            start_line: 25,
1843            end_line: 45,
1844        };
1845        let (hits, cross) = regions_intersect(&[r1], &[r2]);
1846        assert_eq!(
1847            hits,
1848            vec![Region::Range {
1849                start_line: 25,
1850                end_line: 30
1851            }]
1852        );
1853        assert!(!cross);
1854    }
1855
1856    #[test]
1857    fn regions_intersect_non_overlapping_ranges_empty() {
1858        let r1 = Region::Range {
1859            start_line: 10,
1860            end_line: 20,
1861        };
1862        let r2 = Region::Range {
1863            start_line: 30,
1864            end_line: 40,
1865        };
1866        let (hits, _) = regions_intersect(&[r1], &[r2]);
1867        assert!(hits.is_empty());
1868    }
1869
1870    #[test]
1871    fn regions_intersect_cross_kind_is_conservative() {
1872        let range = Region::Range {
1873            start_line: 10,
1874            end_line: 50,
1875        };
1876        let (hits, cross) =
1877            regions_intersect(&[func("validate_token")], std::slice::from_ref(&range));
1878        assert!(cross, "named-vs-range must flag cross_kind");
1879        assert!(hits.contains(&func("validate_token")));
1880        assert!(hits.contains(&range));
1881    }
1882
1883    // ====================================================================
1884    // Region-aware detector scenarios (task 3.6 / spec.md)
1885    // ====================================================================
1886
1887    fn run_two_intents(
1888        a: &BrokerMessage,
1889        b: &BrokerMessage,
1890    ) -> (Arc<BrokerState>, ConflictTracker) {
1891        let state = fresh_state();
1892        let mut t = ConflictTracker::new();
1893        delivery::publish_message(&state, &status_msg("feat-x", &[]));
1894        delivery::publish_message(&state, &status_msg("feat-y", &[]));
1895        let now = Instant::now();
1896        let cfg = default_config();
1897        process_message(&state, &mut t, a, &cfg, now);
1898        process_message(&state, &mut t, b, &cfg, now);
1899        (state, t)
1900    }
1901
1902    #[test]
1903    fn detector_non_overlapping_functions_no_conflict() {
1904        // Spec: Non-overlapping functions in the same file do not conflict.
1905        let a = intent_msg_with_regions(
1906            "feat-x",
1907            &[("src/auth.rs", vec![func("validate_token")])],
1908            "x",
1909            600,
1910        );
1911        let b = intent_msg_with_regions(
1912            "feat-y",
1913            &[("src/auth.rs", vec![func("refresh_session")])],
1914            "y",
1915            600,
1916        );
1917        let (state, _) = run_two_intents(&a, &b);
1918        assert!(supervisor_feedbacks_in_inbox(&state, "feat-x").is_empty());
1919        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
1920    }
1921
1922    #[test]
1923    fn detector_overlapping_functions_conflict_names_function() {
1924        // Spec: Overlapping functions in the same file conflict; warning names
1925        // the intersecting function.
1926        let a = intent_msg_with_regions(
1927            "feat-x",
1928            &[("src/auth.rs", vec![func("validate_token")])],
1929            "x",
1930            600,
1931        );
1932        let b = intent_msg_with_regions(
1933            "feat-y",
1934            &[("src/auth.rs", vec![func("validate_token")])],
1935            "y",
1936            600,
1937        );
1938        let (state, _) = run_two_intents(&a, &b);
1939        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
1940        assert_eq!(x_fb.len(), 1);
1941        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
1942            let err = &payload.errors[0];
1943            assert!(err.contains("forward conflict"));
1944            assert!(err.contains("feat-y"));
1945            assert!(err.contains("function validate_token"));
1946            assert!(err.contains("src/auth.rs"));
1947        } else {
1948            panic!("expected Feedback");
1949        }
1950        assert_eq!(supervisor_feedbacks_in_inbox(&state, "feat-y").len(), 1);
1951    }
1952
1953    #[test]
1954    fn detector_file_level_fallback_when_one_side_omits_regions() {
1955        // Spec: File-level fallback when regions omitted. A declares regions,
1956        // B is a plain string → file-level conflict.
1957        let a = intent_msg_with_regions(
1958            "feat-x",
1959            &[("src/auth.rs", vec![func("validate_token")])],
1960            "x",
1961            600,
1962        );
1963        let b = intent_msg("feat-y", &["src/auth.rs"], "y", 600);
1964        let (state, _) = run_two_intents(&a, &b);
1965        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
1966        assert_eq!(x_fb.len(), 1, "file-level fallback must still warn");
1967        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
1968            // File-level conflict: names the path, no region detail.
1969            assert!(payload.errors[0].contains("src/auth.rs"));
1970            assert!(!payload.errors[0].contains("(regions:"));
1971        }
1972    }
1973
1974    #[test]
1975    fn detector_cross_kind_conflict_includes_hint() {
1976        // Spec: Cross-kind comparison intersects conservatively + hint.
1977        let range = Region::Range {
1978            start_line: 10,
1979            end_line: 50,
1980        };
1981        let a = intent_msg_with_regions(
1982            "feat-x",
1983            &[("src/auth.rs", vec![func("validate_token")])],
1984            "x",
1985            600,
1986        );
1987        let b = intent_msg_with_regions("feat-y", &[("src/auth.rs", vec![range])], "y", 600);
1988        let (state, _) = run_two_intents(&a, &b);
1989        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
1990        assert_eq!(x_fb.len(), 1);
1991        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
1992            assert!(
1993                payload.errors[0].contains(CROSS_KIND_HINT),
1994                "cross-kind conflict must include the hint; got: {}",
1995                payload.errors[0]
1996            );
1997        }
1998    }
1999
2000    #[test]
2001    fn detector_overlapping_ranges_conflict() {
2002        // Spec: Overlapping ranges intersect.
2003        let r1 = Region::Range {
2004            start_line: 10,
2005            end_line: 30,
2006        };
2007        let r2 = Region::Range {
2008            start_line: 25,
2009            end_line: 45,
2010        };
2011        let a = intent_msg_with_regions("feat-x", &[("src/auth.rs", vec![r1])], "x", 600);
2012        let b = intent_msg_with_regions("feat-y", &[("src/auth.rs", vec![r2])], "y", 600);
2013        let (state, _) = run_two_intents(&a, &b);
2014        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
2015        assert_eq!(x_fb.len(), 1);
2016        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
2017            assert!(payload.errors[0].contains("range 25-30"));
2018        }
2019    }
2020
2021    #[test]
2022    fn detector_non_overlapping_ranges_no_conflict() {
2023        // Spec: Non-overlapping ranges do not intersect.
2024        let r1 = Region::Range {
2025            start_line: 10,
2026            end_line: 20,
2027        };
2028        let r2 = Region::Range {
2029            start_line: 30,
2030            end_line: 40,
2031        };
2032        let a = intent_msg_with_regions("feat-x", &[("src/auth.rs", vec![r1])], "x", 600);
2033        let b = intent_msg_with_regions("feat-y", &[("src/auth.rs", vec![r2])], "y", 600);
2034        let (state, _) = run_two_intents(&a, &b);
2035        assert!(supervisor_feedbacks_in_inbox(&state, "feat-x").is_empty());
2036        assert!(supervisor_feedbacks_in_inbox(&state, "feat-y").is_empty());
2037    }
2038
2039    #[test]
2040    fn detector_warning_enumerates_multiple_intersecting_regions() {
2041        // Spec: Detector warning identifies intersecting regions — both
2042        // functions named.
2043        let a = intent_msg_with_regions(
2044            "feat-x",
2045            &[(
2046                "src/auth.rs",
2047                vec![func("validate_token"), func("refresh_session")],
2048            )],
2049            "x",
2050            600,
2051        );
2052        let b = intent_msg_with_regions(
2053            "feat-y",
2054            &[(
2055                "src/auth.rs",
2056                vec![func("validate_token"), func("refresh_session")],
2057            )],
2058            "y",
2059            600,
2060        );
2061        let (state, _) = run_two_intents(&a, &b);
2062        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
2063        assert_eq!(x_fb.len(), 1);
2064        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
2065            assert!(payload.errors[0].contains("function validate_token"));
2066            assert!(payload.errors[0].contains("function refresh_session"));
2067        }
2068    }
2069
2070    #[test]
2071    fn detector_v050_string_only_intents_behave_file_level() {
2072        // Backwards compatibility: string-only intents (v0.5.0 shape) warn at
2073        // file level exactly as before regions existed.
2074        let a = intent_msg("feat-x", &["src/foo.rs", "src/bar.rs"], "x", 600);
2075        let b = intent_msg("feat-y", &["src/bar.rs"], "y", 600);
2076        let (state, _) = run_two_intents(&a, &b);
2077        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
2078        assert_eq!(x_fb.len(), 1);
2079        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
2080            assert!(payload.errors[0].contains("src/bar.rs"));
2081            assert!(!payload.errors[0].contains("(regions:"));
2082        }
2083    }
2084
2085    #[test]
2086    fn detector_region_conflict_only_on_intersecting_file() {
2087        // Two shared files: one with overlapping regions, one with disjoint
2088        // regions. Only the intersecting file appears in the warning.
2089        let a = intent_msg_with_regions(
2090            "feat-x",
2091            &[
2092                ("src/auth.rs", vec![func("validate_token")]),
2093                ("src/db.rs", vec![func("connect")]),
2094            ],
2095            "x",
2096            600,
2097        );
2098        let b = intent_msg_with_regions(
2099            "feat-y",
2100            &[
2101                ("src/auth.rs", vec![func("validate_token")]),
2102                ("src/db.rs", vec![func("migrate")]),
2103            ],
2104            "y",
2105            600,
2106        );
2107        let (state, _) = run_two_intents(&a, &b);
2108        let x_fb = supervisor_feedbacks_in_inbox(&state, "feat-x");
2109        assert_eq!(x_fb.len(), 1);
2110        if let BrokerMessage::Feedback { payload, .. } = &x_fb[0] {
2111            assert!(payload.errors[0].contains("src/auth.rs"));
2112            assert!(
2113                !payload.errors[0].contains("src/db.rs"),
2114                "db.rs has disjoint functions and must not appear: {}",
2115                payload.errors[0]
2116            );
2117        }
2118    }
2119}