Skip to main content

scud_weave/
coordinator.rs

1//! Coordinator — evaluates b-thread rules against proposed events.
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use std::collections::hash_map::DefaultHasher;
6use std::collections::HashMap;
7use std::hash::{Hash, Hasher};
8
9use crate::bthread::{BThread, BThreadRule, PartitionDef, PartitionStrategy, Role};
10use crate::event::Event;
11use crate::log::TimestampedEvent;
12
13/// An active mutex lock held by an agent.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct ActiveLock {
16    pub lock_key: String,
17    pub holder_agent: String,
18    pub task_id: Option<String>,
19    pub acquired_at: String,
20    pub ttl_secs: u64,
21}
22
23impl ActiveLock {
24    /// Check if this lock has expired based on current time.
25    pub fn is_expired(&self) -> bool {
26        if self.ttl_secs == 0 {
27            return false;
28        }
29        let Ok(acquired) = chrono::DateTime::parse_from_rfc3339(&self.acquired_at) else {
30            return false;
31        };
32        let elapsed = Utc::now().signed_duration_since(acquired);
33        elapsed.num_seconds() as u64 >= self.ttl_secs
34    }
35}
36
37/// The coordinator's decision for a proposed event.
38#[derive(Debug, Clone)]
39pub enum Decision {
40    /// Event is allowed to proceed.
41    Proceed,
42    /// Event should wait (prerequisite not yet satisfied).
43    Wait {
44        reason: String,
45        thread_id: String,
46    },
47    /// Event is blocked (unconditional or resource conflict).
48    Blocked {
49        reason: String,
50        thread_id: String,
51    },
52}
53
54impl Decision {
55    pub fn is_proceed(&self) -> bool {
56        matches!(self, Decision::Proceed)
57    }
58
59    pub fn is_blocked(&self) -> bool {
60        matches!(self, Decision::Blocked { .. })
61    }
62
63    pub fn is_wait(&self) -> bool {
64        matches!(self, Decision::Wait { .. })
65    }
66}
67
68impl std::fmt::Display for Decision {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            Decision::Proceed => write!(f, "PROCEED"),
72            Decision::Wait { reason, thread_id } => {
73                write!(f, "WAIT for {} \"{}\"", thread_id, reason)
74            }
75            Decision::Blocked { reason, thread_id } => {
76                write!(f, "BLOCKED by {} \"{}\"", thread_id, reason)
77            }
78        }
79    }
80}
81
82/// The b-thread coordinator.
83pub struct Coordinator {
84    pub threads: Vec<BThread>,
85    pub roles: Vec<Role>,
86    pub partitions: Vec<PartitionDef>,
87    pub active_locks: HashMap<String, ActiveLock>,
88    pub event_log: Vec<TimestampedEvent>,
89}
90
91impl Coordinator {
92    /// Create an empty coordinator.
93    pub fn new() -> Self {
94        Coordinator {
95            threads: Vec::new(),
96            roles: Vec::new(),
97            partitions: Vec::new(),
98            active_locks: HashMap::new(),
99            event_log: Vec::new(),
100        }
101    }
102
103    /// Expire any locks that have exceeded their TTL.
104    pub fn expire_locks(&mut self) {
105        self.active_locks.retain(|_, lock| !lock.is_expired());
106    }
107
108    /// Core evaluation: given a proposed event, check all b-threads.
109    ///
110    /// Evaluation order (by b-thread priority, then rule type):
111    /// BlockAlways -> Role check -> Partition -> Mutex -> Require -> BlockUntil -> RateLimit -> Timeout
112    ///
113    /// Returns the first blocking/waiting decision found.
114    pub fn evaluate(&self, event: &Event) -> Decision {
115        // Sort threads by priority (lower = higher priority)
116        let mut sorted: Vec<&BThread> = self.threads.iter().filter(|t| t.enabled).collect();
117        sorted.sort_by_key(|t| t.priority);
118
119        for thread in &sorted {
120            for rule in &thread.rules {
121                let decision = self.evaluate_rule(rule, event, &thread.id, &thread.name);
122                if !decision.is_proceed() {
123                    return decision;
124                }
125            }
126        }
127
128        Decision::Proceed
129    }
130
131    /// Evaluate a single rule against an event.
132    fn evaluate_rule(
133        &self,
134        rule: &BThreadRule,
135        event: &Event,
136        thread_id: &str,
137        thread_name: &str,
138    ) -> Decision {
139        match rule {
140            BThreadRule::BlockAlways { scope } => {
141                if scope.matches_event(event) {
142                    return Decision::Blocked {
143                        reason: format!("{}: unconditionally blocked", thread_name),
144                        thread_id: thread_id.to_string(),
145                    };
146                }
147            }
148            BThreadRule::Mutex {
149                scope,
150                key,
151                ttl_secs: _,
152            } => {
153                if scope.matches_event(event) {
154                    let expanded_key = expand_key_template(key, event);
155                    if let Some(lock) = self.active_locks.get(&expanded_key) {
156                        if !lock.is_expired() {
157                            // Blocked if different agent holds lock
158                            let event_agent = event.agent.as_deref().unwrap_or("");
159                            if lock.holder_agent != event_agent {
160                                return Decision::Blocked {
161                                    reason: format!(
162                                        "{}: {} holds {} ({})",
163                                        thread_name,
164                                        lock.holder_agent,
165                                        expanded_key,
166                                        lock.task_id.as_deref().unwrap_or("no task"),
167                                    ),
168                                    thread_id: thread_id.to_string(),
169                                };
170                            }
171                        }
172                    }
173                }
174            }
175            BThreadRule::Require {
176                trigger,
177                prerequisite,
178                reset,
179            } => {
180                if trigger.matches_event(event) {
181                    // Find the last reset event (if reset pattern specified)
182                    let reset_idx = reset.as_ref().and_then(|rp| {
183                        self.event_log
184                            .iter()
185                            .rposition(|te| rp.matches_event(&te.event))
186                    });
187
188                    // Check if prerequisite exists after the last reset
189                    let search_from = reset_idx.map(|i| i + 1).unwrap_or(0);
190                    let has_prereq = self.event_log[search_from..]
191                        .iter()
192                        .any(|te| prerequisite.matches_event(&te.event));
193
194                    if !has_prereq {
195                        return Decision::Wait {
196                            reason: format!(
197                                "{}: {} required",
198                                thread_name,
199                                prerequisite
200                                    .kind
201                                    .as_ref()
202                                    .map(|k| k.as_str())
203                                    .unwrap_or("prerequisite"),
204                            ),
205                            thread_id: thread_id.to_string(),
206                        };
207                    }
208                }
209            }
210            BThreadRule::BlockUntil {
211                trigger,
212                block,
213                until,
214                ..
215            } => {
216                // Check if trigger has been seen
217                let trigger_seen = self
218                    .event_log
219                    .iter()
220                    .rposition(|te| trigger.matches_event(&te.event));
221
222                if let Some(trigger_idx) = trigger_seen {
223                    // Check if "until" event has been seen after trigger
224                    let until_seen = self.event_log[trigger_idx..]
225                        .iter()
226                        .any(|te| until.matches_event(&te.event));
227
228                    if !until_seen {
229                        // We're in the "triggered but not yet resolved" state
230                        // Check if the current event matches any of the block patterns
231                        for bp in block {
232                            if bp.matches_event(event) {
233                                return Decision::Blocked {
234                                    reason: format!(
235                                        "{}: blocked until {}",
236                                        thread_name,
237                                        until
238                                            .kind
239                                            .as_ref()
240                                            .map(|k| k.as_str())
241                                            .unwrap_or("condition met"),
242                                    ),
243                                    thread_id: thread_id.to_string(),
244                                };
245                            }
246                        }
247                    }
248                }
249            }
250            BThreadRule::RateLimit {
251                scope,
252                max,
253                window_secs,
254            } => {
255                if scope.matches_event(event) {
256                    let now = Utc::now();
257                    let count = self
258                        .event_log
259                        .iter()
260                        .filter(|te| {
261                            if !scope.matches_event(&te.event) {
262                                return false;
263                            }
264                            if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&te.timestamp) {
265                                let elapsed = now.signed_duration_since(ts);
266                                elapsed.num_seconds() < *window_secs as i64
267                            } else {
268                                false
269                            }
270                        })
271                        .count() as u32;
272
273                    if count >= *max {
274                        return Decision::Blocked {
275                            reason: format!(
276                                "{}: rate limit exceeded ({}/{} in {}s window)",
277                                thread_name, count, max, window_secs,
278                            ),
279                            thread_id: thread_id.to_string(),
280                        };
281                    }
282                }
283            }
284            BThreadRule::Timeout {
285                scope,
286                max_duration_secs,
287                action: _,
288            } => {
289                if scope.matches_event(event) {
290                    // Check if there's a matching event that started too long ago
291                    let now = Utc::now();
292                    for te in self.event_log.iter().rev() {
293                        if scope.matches_event(&te.event) {
294                            if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&te.timestamp) {
295                                let elapsed = now.signed_duration_since(ts);
296                                if elapsed.num_seconds() as u64 > *max_duration_secs {
297                                    return Decision::Blocked {
298                                        reason: format!(
299                                            "{}: timeout exceeded ({}s > {}s max)",
300                                            thread_name,
301                                            elapsed.num_seconds(),
302                                            max_duration_secs,
303                                        ),
304                                        thread_id: thread_id.to_string(),
305                                    };
306                                }
307                            }
308                            break;
309                        }
310                    }
311                }
312            }
313            BThreadRule::Partition {
314                scope,
315                strategy,
316                agent_count,
317            } => {
318                if scope.matches_event(event) {
319                    if let (Some(agent), Some(target)) = (&event.agent, &event.target) {
320                        let agent_slot = parse_agent_slot(agent);
321                        let target_slot = compute_partition_slot(target, *strategy, *agent_count);
322                        if agent_slot != target_slot {
323                            return Decision::Blocked {
324                                reason: format!(
325                                    "{}: {} assigned to slot {}, target in slot {}",
326                                    thread_name, agent, agent_slot, target_slot,
327                                ),
328                                thread_id: thread_id.to_string(),
329                            };
330                        }
331                    }
332                }
333            }
334        }
335
336        // Also check role constraints (outside b-threads)
337        // Role checks are done in evaluate() directly
338        Decision::Proceed
339    }
340
341    /// Check role constraints for an event.
342    fn check_roles(&self, event: &Event) -> Decision {
343        // Role checks require an agent and a target
344        let Some(_agent) = &event.agent else {
345            return Decision::Proceed;
346        };
347        let Some(target) = &event.target else {
348            return Decision::Proceed;
349        };
350
351        // Find agent's role from task node annotations (stored in metadata)
352        let role_id = event.metadata.get("role");
353        let Some(role_id) = role_id else {
354            return Decision::Proceed;
355        };
356
357        // Find the role definition
358        let Some(role) = self.roles.iter().find(|r| r.id == *role_id) else {
359            return Decision::Proceed;
360        };
361
362        // Check deny patterns first
363        for deny in &role.deny_patterns {
364            if deny.matches(target) {
365                return Decision::Blocked {
366                    reason: format!(
367                        "role \"{}\": {} denied for {} role",
368                        role.name, target, role.name,
369                    ),
370                    thread_id: format!("role:{}", role.id),
371                };
372            }
373        }
374
375        // If allow patterns exist and are non-empty, target must match at least one
376        if !role.allow_patterns.is_empty() {
377            let allowed = role.allow_patterns.iter().any(|p| p.matches(target));
378            if !allowed {
379                return Decision::Blocked {
380                    reason: format!(
381                        "role \"{}\": {} not in allowed scope for {} role",
382                        role.name, target, role.name,
383                    ),
384                    thread_id: format!("role:{}", role.id),
385                };
386            }
387        }
388
389        Decision::Proceed
390    }
391
392    /// Full evaluation including role checks.
393    pub fn evaluate_full(&self, event: &Event) -> Decision {
394        // Check roles first
395        let role_decision = self.check_roles(event);
396        if !role_decision.is_proceed() {
397            return role_decision;
398        }
399
400        self.evaluate(event)
401    }
402
403    /// Record that an event occurred. Updates locks, appends to event log.
404    pub fn record(&mut self, event: &Event) -> anyhow::Result<()> {
405        let now = Utc::now().to_rfc3339();
406
407        // Append to event log
408        self.event_log.push(TimestampedEvent {
409            timestamp: now.clone(),
410            event: event.clone(),
411        });
412
413        // Check for mutex rules — create locks for matching events
414        for thread in &self.threads {
415            if !thread.enabled {
416                continue;
417            }
418            for rule in &thread.rules {
419                if let BThreadRule::Mutex {
420                    scope,
421                    key,
422                    ttl_secs,
423                } = rule
424                {
425                    if scope.matches_event(event) {
426                        let expanded_key = expand_key_template(key, event);
427                        if let Some(agent) = &event.agent {
428                            self.active_locks.insert(
429                                expanded_key.clone(),
430                                ActiveLock {
431                                    lock_key: expanded_key,
432                                    holder_agent: agent.clone(),
433                                    task_id: event.task_id.clone(),
434                                    acquired_at: now.clone(),
435                                    ttl_secs: ttl_secs.unwrap_or(3600),
436                                },
437                            );
438                        }
439                    }
440                }
441            }
442        }
443
444        Ok(())
445    }
446
447    /// Release a specific mutex lock.
448    pub fn release(&mut self, lock_key: &str, _agent: &str) -> anyhow::Result<()> {
449        self.active_locks.remove(lock_key);
450        Ok(())
451    }
452
453    /// Release all locks held by an agent, optionally filtered by task.
454    pub fn release_all(&mut self, agent: &str, task_id: Option<&str>) -> anyhow::Result<()> {
455        self.active_locks.retain(|_, lock| {
456            if lock.holder_agent != agent {
457                return true;
458            }
459            if let Some(tid) = task_id {
460                lock.task_id.as_deref() != Some(tid)
461            } else {
462                false
463            }
464        });
465        Ok(())
466    }
467}
468
469impl Default for Coordinator {
470    fn default() -> Self {
471        Self::new()
472    }
473}
474
475/// Expand a key template by replacing `{target}` and `{agent}` with event values.
476fn expand_key_template(template: &str, event: &Event) -> String {
477    let mut result = template.to_string();
478    if let Some(ref target) = event.target {
479        result = result.replace("{target}", target);
480    }
481    if let Some(ref agent) = event.agent {
482        result = result.replace("{agent}", agent);
483    }
484    result
485}
486
487/// Parse an agent slot number from an agent name like "agent-0", "agent-1".
488fn parse_agent_slot(agent: &str) -> u32 {
489    agent
490        .rsplit('-')
491        .next()
492        .and_then(|s| s.parse::<u32>().ok())
493        .unwrap_or(0)
494}
495
496/// Compute which partition slot a target belongs to.
497fn compute_partition_slot(target: &str, strategy: PartitionStrategy, agent_count: u32) -> u32 {
498    if agent_count == 0 {
499        return 0;
500    }
501    match strategy {
502        PartitionStrategy::Hash => {
503            let mut hasher = DefaultHasher::new();
504            target.hash(&mut hasher);
505            (hasher.finish() % agent_count as u64) as u32
506        }
507        PartitionStrategy::RoundRobin => {
508            // Sort-order based: hash for determinism
509            let mut hasher = DefaultHasher::new();
510            target.hash(&mut hasher);
511            (hasher.finish() % agent_count as u64) as u32
512        }
513        PartitionStrategy::Directory => {
514            // Use top-level directory
515            let dir = target.split('/').next().unwrap_or(target);
516            let mut hasher = DefaultHasher::new();
517            dir.hash(&mut hasher);
518            (hasher.finish() % agent_count as u64) as u32
519        }
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526    use crate::event::{EventKind, EventPattern};
527    use crate::matcher::GlobPattern;
528
529    fn make_event(kind: EventKind, agent: &str, target: &str) -> Event {
530        Event {
531            kind,
532            agent: Some(agent.to_string()),
533            target: Some(target.to_string()),
534            task_id: None,
535            metadata: std::collections::HashMap::new(),
536        }
537    }
538
539    fn make_timestamped(event: Event, timestamp: &str) -> TimestampedEvent {
540        TimestampedEvent {
541            timestamp: timestamp.to_string(),
542            event,
543        }
544    }
545
546    #[test]
547    fn test_block_always() {
548        let mut coord = Coordinator::new();
549        coord.threads.push(BThread {
550            id: "w:1".to_string(),
551            name: "No dangerous".to_string(),
552            priority: 1,
553            enabled: true,
554            rules: vec![BThreadRule::BlockAlways {
555                scope: EventPattern::kind(EventKind::DangerousCommand),
556            }],
557        });
558
559        let event = make_event(EventKind::DangerousCommand, "agent-1", "pkill bash");
560        assert!(coord.evaluate(&event).is_blocked());
561
562        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
563        assert!(coord.evaluate(&event).is_proceed());
564    }
565
566    #[test]
567    fn test_mutex_different_agents() {
568        let mut coord = Coordinator::new();
569        coord.threads.push(BThread {
570            id: "w:1".to_string(),
571            name: "File mutex".to_string(),
572            priority: 5,
573            enabled: true,
574            rules: vec![BThreadRule::Mutex {
575                scope: EventPattern::kind(EventKind::FileWrite),
576                key: "file:{target}".to_string(),
577                ttl_secs: Some(3600),
578            }],
579        });
580
581        // agent-1 holds lock
582        coord.active_locks.insert(
583            "file:src/main.rs".to_string(),
584            ActiveLock {
585                lock_key: "file:src/main.rs".to_string(),
586                holder_agent: "agent-1".to_string(),
587                task_id: Some("auth:1".to_string()),
588                acquired_at: Utc::now().to_rfc3339(),
589                ttl_secs: 3600,
590            },
591        );
592
593        // agent-2 tries to write same file -> BLOCKED
594        let event = make_event(EventKind::FileWrite, "agent-2", "src/main.rs");
595        assert!(coord.evaluate(&event).is_blocked());
596
597        // agent-1 writes same file -> PROCEED (same agent)
598        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
599        assert!(coord.evaluate(&event).is_proceed());
600
601        // agent-2 writes different file -> PROCEED
602        let event = make_event(EventKind::FileWrite, "agent-2", "src/lib.rs");
603        assert!(coord.evaluate(&event).is_proceed());
604    }
605
606    #[test]
607    fn test_require_no_prereq() {
608        let mut coord = Coordinator::new();
609        coord.threads.push(BThread {
610            id: "w:2".to_string(),
611            name: "Test gate".to_string(),
612            priority: 10,
613            enabled: true,
614            rules: vec![BThreadRule::Require {
615                trigger: EventPattern::kind(EventKind::Commit),
616                prerequisite: EventPattern::kind(EventKind::TestPass),
617                reset: Some(EventPattern::kind(EventKind::FileWrite)),
618            }],
619        });
620
621        // No events logged yet, try to commit -> WAIT
622        let event = make_event(EventKind::Commit, "agent-1", "");
623        assert!(coord.evaluate(&event).is_wait());
624    }
625
626    #[test]
627    fn test_require_with_prereq() {
628        let mut coord = Coordinator::new();
629        coord.threads.push(BThread {
630            id: "w:2".to_string(),
631            name: "Test gate".to_string(),
632            priority: 10,
633            enabled: true,
634            rules: vec![BThreadRule::Require {
635                trigger: EventPattern::kind(EventKind::Commit),
636                prerequisite: EventPattern::kind(EventKind::TestPass),
637                reset: Some(EventPattern::kind(EventKind::FileWrite)),
638            }],
639        });
640
641        // TestPass logged
642        coord.event_log.push(make_timestamped(
643            make_event(EventKind::TestPass, "agent-1", ""),
644            &Utc::now().to_rfc3339(),
645        ));
646
647        // Commit -> PROCEED
648        let event = make_event(EventKind::Commit, "agent-1", "");
649        assert!(coord.evaluate(&event).is_proceed());
650    }
651
652    #[test]
653    fn test_require_reset() {
654        let mut coord = Coordinator::new();
655        coord.threads.push(BThread {
656            id: "w:2".to_string(),
657            name: "Test gate".to_string(),
658            priority: 10,
659            enabled: true,
660            rules: vec![BThreadRule::Require {
661                trigger: EventPattern::kind(EventKind::Commit),
662                prerequisite: EventPattern::kind(EventKind::TestPass),
663                reset: Some(EventPattern::kind(EventKind::FileWrite)),
664            }],
665        });
666
667        let now = Utc::now();
668        // TestPass
669        coord.event_log.push(make_timestamped(
670            make_event(EventKind::TestPass, "agent-1", ""),
671            &now.to_rfc3339(),
672        ));
673        // FileWrite after TestPass -> resets requirement
674        coord.event_log.push(make_timestamped(
675            make_event(EventKind::FileWrite, "agent-1", "src/main.rs"),
676            &(now + chrono::Duration::seconds(1)).to_rfc3339(),
677        ));
678
679        // Commit -> WAIT (FileWrite reset the prerequisite)
680        let event = make_event(EventKind::Commit, "agent-1", "");
681        assert!(coord.evaluate(&event).is_wait());
682    }
683
684    #[test]
685    fn test_disabled_thread_ignored() {
686        let mut coord = Coordinator::new();
687        coord.threads.push(BThread {
688            id: "w:1".to_string(),
689            name: "Disabled".to_string(),
690            priority: 1,
691            enabled: false,
692            rules: vec![BThreadRule::BlockAlways {
693                scope: EventPattern::kind(EventKind::DangerousCommand),
694            }],
695        });
696
697        let event = make_event(EventKind::DangerousCommand, "agent-1", "pkill");
698        assert!(coord.evaluate(&event).is_proceed());
699    }
700
701    #[test]
702    fn test_record_creates_locks() {
703        let mut coord = Coordinator::new();
704        coord.threads.push(BThread {
705            id: "w:1".to_string(),
706            name: "File mutex".to_string(),
707            priority: 5,
708            enabled: true,
709            rules: vec![BThreadRule::Mutex {
710                scope: EventPattern::kind(EventKind::FileWrite),
711                key: "file:{target}".to_string(),
712                ttl_secs: Some(3600),
713            }],
714        });
715
716        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
717        coord.record(&event).unwrap();
718
719        assert!(coord.active_locks.contains_key("file:src/main.rs"));
720        assert_eq!(
721            coord.active_locks["file:src/main.rs"].holder_agent,
722            "agent-1"
723        );
724        assert_eq!(coord.event_log.len(), 1);
725    }
726
727    #[test]
728    fn test_role_deny() {
729        let mut coord = Coordinator::new();
730        coord.roles.push(Role {
731            id: "r:impl".to_string(),
732            name: "Implementer".to_string(),
733            allow_patterns: vec![GlobPattern::new("src/**")],
734            deny_patterns: vec![GlobPattern::new("docs/**")],
735        });
736
737        let mut event = make_event(EventKind::FileWrite, "agent-1", "docs/readme.md");
738        event.metadata.insert("role".to_string(), "r:impl".to_string());
739
740        assert!(coord.check_roles(&event).is_blocked());
741    }
742
743    #[test]
744    fn test_role_allow() {
745        let mut coord = Coordinator::new();
746        coord.roles.push(Role {
747            id: "r:impl".to_string(),
748            name: "Implementer".to_string(),
749            allow_patterns: vec![GlobPattern::new("src/**")],
750            deny_patterns: vec![GlobPattern::new("docs/**")],
751        });
752
753        let mut event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
754        event.metadata.insert("role".to_string(), "r:impl".to_string());
755
756        assert!(coord.check_roles(&event).is_proceed());
757    }
758
759    #[test]
760    fn test_priority_ordering() {
761        let mut coord = Coordinator::new();
762        // Lower priority (higher number) should not be checked if higher priority blocks
763        coord.threads.push(BThread {
764            id: "w:1".to_string(),
765            name: "High priority block".to_string(),
766            priority: 1,
767            enabled: true,
768            rules: vec![BThreadRule::BlockAlways {
769                scope: EventPattern::kind(EventKind::DangerousCommand),
770            }],
771        });
772        coord.threads.push(BThread {
773            id: "w:2".to_string(),
774            name: "Lower priority".to_string(),
775            priority: 50,
776            enabled: true,
777            rules: vec![BThreadRule::BlockAlways {
778                scope: EventPattern::kind(EventKind::DangerousCommand),
779            }],
780        });
781
782        let event = make_event(EventKind::DangerousCommand, "agent-1", "pkill");
783        let decision = coord.evaluate(&event);
784        match &decision {
785            Decision::Blocked { thread_id, .. } => assert_eq!(thread_id, "w:1"),
786            _ => panic!("Expected blocked"),
787        }
788    }
789
790    #[test]
791    fn test_expand_key_template() {
792        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
793        assert_eq!(expand_key_template("file:{target}", &event), "file:src/main.rs");
794        assert_eq!(expand_key_template("{agent}-lock", &event), "agent-1-lock");
795        assert_eq!(expand_key_template("schema-global", &event), "schema-global");
796    }
797
798    #[test]
799    fn test_rate_limit_under_limit_proceeds() {
800        let mut coord = Coordinator::new();
801        coord.threads.push(BThread {
802            id: "w:10".to_string(),
803            name: "Commit rate".to_string(),
804            priority: 50,
805            enabled: true,
806            rules: vec![BThreadRule::RateLimit {
807                scope: EventPattern::kind(EventKind::Commit),
808                max: 3,
809                window_secs: 300,
810            }],
811        });
812
813        // Log 2 events (under limit of 3)
814        let now = Utc::now();
815        coord.event_log.push(make_timestamped(
816            make_event(EventKind::Commit, "agent-1", ""),
817            &now.to_rfc3339(),
818        ));
819        coord.event_log.push(make_timestamped(
820            make_event(EventKind::Commit, "agent-1", ""),
821            &now.to_rfc3339(),
822        ));
823
824        let event = make_event(EventKind::Commit, "agent-1", "");
825        assert!(coord.evaluate(&event).is_proceed());
826    }
827
828    #[test]
829    fn test_rate_limit_over_limit_blocked() {
830        let mut coord = Coordinator::new();
831        coord.threads.push(BThread {
832            id: "w:10".to_string(),
833            name: "Commit rate".to_string(),
834            priority: 50,
835            enabled: true,
836            rules: vec![BThreadRule::RateLimit {
837                scope: EventPattern::kind(EventKind::Commit),
838                max: 2,
839                window_secs: 300,
840            }],
841        });
842
843        // Log 2 events (at limit)
844        let now = Utc::now();
845        for _ in 0..2 {
846            coord.event_log.push(make_timestamped(
847                make_event(EventKind::Commit, "agent-1", ""),
848                &now.to_rfc3339(),
849            ));
850        }
851
852        let event = make_event(EventKind::Commit, "agent-1", "");
853        let decision = coord.evaluate(&event);
854        assert!(decision.is_blocked());
855        match decision {
856            Decision::Blocked { reason, .. } => {
857                assert!(reason.contains("rate limit"));
858            }
859            _ => panic!("Expected blocked"),
860        }
861    }
862
863    #[test]
864    fn test_rate_limit_old_events_not_counted() {
865        let mut coord = Coordinator::new();
866        coord.threads.push(BThread {
867            id: "w:10".to_string(),
868            name: "Commit rate".to_string(),
869            priority: 50,
870            enabled: true,
871            rules: vec![BThreadRule::RateLimit {
872                scope: EventPattern::kind(EventKind::Commit),
873                max: 2,
874                window_secs: 60,
875            }],
876        });
877
878        // Log events from 2 hours ago (outside 60s window)
879        let old = Utc::now() - chrono::Duration::hours(2);
880        for _ in 0..5 {
881            coord.event_log.push(make_timestamped(
882                make_event(EventKind::Commit, "agent-1", ""),
883                &old.to_rfc3339(),
884            ));
885        }
886
887        let event = make_event(EventKind::Commit, "agent-1", "");
888        assert!(coord.evaluate(&event).is_proceed());
889    }
890
891    #[test]
892    fn test_partition_in_slot_proceeds() {
893        let mut coord = Coordinator::new();
894        coord.threads.push(BThread {
895            id: "w:11".to_string(),
896            name: "File partition".to_string(),
897            priority: 20,
898            enabled: true,
899            rules: vec![BThreadRule::Partition {
900                scope: EventPattern::kind(EventKind::FileWrite),
901                strategy: PartitionStrategy::Hash,
902                agent_count: 2,
903            }],
904        });
905
906        // We need to find a target that hashes to agent-0's slot (slot 0)
907        // Try different targets until we find one in slot 0
908        let target = find_target_for_slot(0, PartitionStrategy::Hash, 2);
909        let event = make_event(EventKind::FileWrite, "agent-0", &target);
910        assert!(coord.evaluate(&event).is_proceed());
911    }
912
913    #[test]
914    fn test_partition_out_of_slot_blocked() {
915        let mut coord = Coordinator::new();
916        coord.threads.push(BThread {
917            id: "w:11".to_string(),
918            name: "File partition".to_string(),
919            priority: 20,
920            enabled: true,
921            rules: vec![BThreadRule::Partition {
922                scope: EventPattern::kind(EventKind::FileWrite),
923                strategy: PartitionStrategy::Hash,
924                agent_count: 2,
925            }],
926        });
927
928        // Find a target for slot 1, then try with agent-0 (slot 0) -> blocked
929        let target = find_target_for_slot(1, PartitionStrategy::Hash, 2);
930        let event = make_event(EventKind::FileWrite, "agent-0", &target);
931        assert!(coord.evaluate(&event).is_blocked());
932    }
933
934    // Helper: find a target string whose hash maps to a specific slot
935    fn find_target_for_slot(slot: u32, strategy: PartitionStrategy, count: u32) -> String {
936        for i in 0..1000 {
937            let target = format!("src/file{}.rs", i);
938            if compute_partition_slot(&target, strategy, count) == slot {
939                return target;
940            }
941        }
942        panic!("Could not find target for slot {}", slot);
943    }
944
945    #[test]
946    fn test_timeout_expired_operation() {
947        let mut coord = Coordinator::new();
948        coord.threads.push(BThread {
949            id: "w:12".to_string(),
950            name: "Build timeout".to_string(),
951            priority: 30,
952            enabled: true,
953            rules: vec![BThreadRule::Timeout {
954                scope: EventPattern::kind(EventKind::Build),
955                max_duration_secs: 60,
956                action: super::super::bthread::TimeoutAction::Kill,
957            }],
958        });
959
960        // Log a build event from 2 minutes ago (exceeds 60s timeout)
961        let old = Utc::now() - chrono::Duration::minutes(2);
962        coord.event_log.push(make_timestamped(
963            make_event(EventKind::Build, "agent-1", ""),
964            &old.to_rfc3339(),
965        ));
966
967        let event = make_event(EventKind::Build, "agent-1", "");
968        let decision = coord.evaluate(&event);
969        assert!(decision.is_blocked());
970        match decision {
971            Decision::Blocked { reason, .. } => {
972                assert!(reason.contains("timeout"));
973            }
974            _ => panic!("Expected blocked"),
975        }
976    }
977
978    #[test]
979    fn test_timeout_not_yet_expired() {
980        let mut coord = Coordinator::new();
981        coord.threads.push(BThread {
982            id: "w:12".to_string(),
983            name: "Build timeout".to_string(),
984            priority: 30,
985            enabled: true,
986            rules: vec![BThreadRule::Timeout {
987                scope: EventPattern::kind(EventKind::Build),
988                max_duration_secs: 3600,
989                action: super::super::bthread::TimeoutAction::Kill,
990            }],
991        });
992
993        // Log a build event from just now (well within 3600s)
994        coord.event_log.push(make_timestamped(
995            make_event(EventKind::Build, "agent-1", ""),
996            &Utc::now().to_rfc3339(),
997        ));
998
999        let event = make_event(EventKind::Build, "agent-1", "");
1000        assert!(coord.evaluate(&event).is_proceed());
1001    }
1002
1003    #[test]
1004    fn test_ttl_expiry_auto_releases() {
1005        let mut coord = Coordinator::new();
1006        // Insert a lock that expired 2 hours ago (ttl=1s, acquired 2h ago)
1007        let old = Utc::now() - chrono::Duration::hours(2);
1008        coord.active_locks.insert(
1009            "file:src/main.rs".to_string(),
1010            ActiveLock {
1011                lock_key: "file:src/main.rs".to_string(),
1012                holder_agent: "agent-1".to_string(),
1013                task_id: None,
1014                acquired_at: old.to_rfc3339(),
1015                ttl_secs: 1,
1016            },
1017        );
1018
1019        coord.expire_locks();
1020        assert!(coord.active_locks.is_empty());
1021    }
1022
1023    #[test]
1024    fn test_ttl_zero_never_expires() {
1025        let mut coord = Coordinator::new();
1026        let old = Utc::now() - chrono::Duration::hours(2);
1027        coord.active_locks.insert(
1028            "global".to_string(),
1029            ActiveLock {
1030                lock_key: "global".to_string(),
1031                holder_agent: "agent-1".to_string(),
1032                task_id: None,
1033                acquired_at: old.to_rfc3339(),
1034                ttl_secs: 0,
1035            },
1036        );
1037
1038        coord.expire_locks();
1039        assert_eq!(coord.active_locks.len(), 1);
1040    }
1041
1042    #[test]
1043    fn test_release_specific_lock() {
1044        let mut coord = Coordinator::new();
1045        coord.active_locks.insert(
1046            "file:a.rs".to_string(),
1047            ActiveLock {
1048                lock_key: "file:a.rs".to_string(),
1049                holder_agent: "agent-1".to_string(),
1050                task_id: None,
1051                acquired_at: Utc::now().to_rfc3339(),
1052                ttl_secs: 3600,
1053            },
1054        );
1055        coord.active_locks.insert(
1056            "file:b.rs".to_string(),
1057            ActiveLock {
1058                lock_key: "file:b.rs".to_string(),
1059                holder_agent: "agent-1".to_string(),
1060                task_id: None,
1061                acquired_at: Utc::now().to_rfc3339(),
1062                ttl_secs: 3600,
1063            },
1064        );
1065
1066        coord.release("file:a.rs", "agent-1").unwrap();
1067        assert!(!coord.active_locks.contains_key("file:a.rs"));
1068        assert!(coord.active_locks.contains_key("file:b.rs"));
1069    }
1070
1071    #[test]
1072    fn test_release_all_by_agent() {
1073        let mut coord = Coordinator::new();
1074        for key in ["file:a.rs", "file:b.rs"] {
1075            coord.active_locks.insert(
1076                key.to_string(),
1077                ActiveLock {
1078                    lock_key: key.to_string(),
1079                    holder_agent: "agent-1".to_string(),
1080                    task_id: None,
1081                    acquired_at: Utc::now().to_rfc3339(),
1082                    ttl_secs: 3600,
1083                },
1084            );
1085        }
1086        coord.active_locks.insert(
1087            "file:c.rs".to_string(),
1088            ActiveLock {
1089                lock_key: "file:c.rs".to_string(),
1090                holder_agent: "agent-2".to_string(),
1091                task_id: None,
1092                acquired_at: Utc::now().to_rfc3339(),
1093                ttl_secs: 3600,
1094            },
1095        );
1096
1097        coord.release_all("agent-1", None).unwrap();
1098        assert_eq!(coord.active_locks.len(), 1);
1099        assert!(coord.active_locks.contains_key("file:c.rs"));
1100    }
1101
1102    #[test]
1103    fn test_release_all_by_agent_and_task() {
1104        let mut coord = Coordinator::new();
1105        coord.active_locks.insert(
1106            "file:a.rs".to_string(),
1107            ActiveLock {
1108                lock_key: "file:a.rs".to_string(),
1109                holder_agent: "agent-1".to_string(),
1110                task_id: Some("task-1".to_string()),
1111                acquired_at: Utc::now().to_rfc3339(),
1112                ttl_secs: 3600,
1113            },
1114        );
1115        coord.active_locks.insert(
1116            "file:b.rs".to_string(),
1117            ActiveLock {
1118                lock_key: "file:b.rs".to_string(),
1119                holder_agent: "agent-1".to_string(),
1120                task_id: Some("task-2".to_string()),
1121                acquired_at: Utc::now().to_rfc3339(),
1122                ttl_secs: 3600,
1123            },
1124        );
1125
1126        // Only release locks for task-1
1127        coord.release_all("agent-1", Some("task-1")).unwrap();
1128        assert_eq!(coord.active_locks.len(), 1);
1129        assert!(coord.active_locks.contains_key("file:b.rs"));
1130    }
1131
1132    #[test]
1133    fn test_multiple_threads_first_block_wins() {
1134        let mut coord = Coordinator::new();
1135        // Thread with priority 5 blocks FileWrite
1136        coord.threads.push(BThread {
1137            id: "w:1".to_string(),
1138            name: "File block".to_string(),
1139            priority: 5,
1140            enabled: true,
1141            rules: vec![BThreadRule::BlockAlways {
1142                scope: EventPattern::kind(EventKind::FileWrite),
1143            }],
1144        });
1145        // Thread with priority 10 also blocks FileWrite (but lower priority)
1146        coord.threads.push(BThread {
1147            id: "w:2".to_string(),
1148            name: "Also file block".to_string(),
1149            priority: 10,
1150            enabled: true,
1151            rules: vec![BThreadRule::BlockAlways {
1152                scope: EventPattern::kind(EventKind::FileWrite),
1153            }],
1154        });
1155
1156        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1157        let decision = coord.evaluate(&event);
1158        match &decision {
1159            Decision::Blocked { thread_id, .. } => {
1160                assert_eq!(thread_id, "w:1", "Higher priority (lower number) thread should block first");
1161            }
1162            _ => panic!("Expected blocked"),
1163        }
1164    }
1165
1166    #[test]
1167    fn test_evaluate_full_role_check_before_threads() {
1168        let mut coord = Coordinator::new();
1169        coord.roles.push(Role {
1170            id: "r:test".to_string(),
1171            name: "Tester".to_string(),
1172            allow_patterns: vec![GlobPattern::new("tests/**")],
1173            deny_patterns: vec![],
1174        });
1175
1176        // Also add a thread that would block
1177        coord.threads.push(BThread {
1178            id: "w:1".to_string(),
1179            name: "Always block".to_string(),
1180            priority: 1,
1181            enabled: true,
1182            rules: vec![BThreadRule::BlockAlways {
1183                scope: EventPattern::kind(EventKind::FileWrite),
1184            }],
1185        });
1186
1187        let mut event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1188        event.metadata.insert("role".to_string(), "r:test".to_string());
1189
1190        // Role check should block before b-thread evaluation
1191        let decision = coord.evaluate_full(&event);
1192        assert!(decision.is_blocked());
1193        match decision {
1194            Decision::Blocked { thread_id, .. } => {
1195                assert!(thread_id.starts_with("role:"), "Role should block, not thread");
1196            }
1197            _ => panic!("Expected blocked"),
1198        }
1199    }
1200
1201    #[test]
1202    fn test_event_pattern_with_negated_agent() {
1203        let pattern = EventPattern {
1204            kind: Some(EventKind::FileWrite),
1205            agent: Some("admin".to_string()),
1206            target: None,
1207            task_id: None,
1208            negate_agent: true,
1209            target_not: Vec::new(),
1210        };
1211
1212        // Non-admin agent matches (negated)
1213        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1214        assert!(pattern.matches_event(&event));
1215
1216        // Admin agent does not match (negated)
1217        let event = make_event(EventKind::FileWrite, "admin", "src/main.rs");
1218        assert!(!pattern.matches_event(&event));
1219    }
1220
1221    #[test]
1222    fn test_event_pattern_with_target_exclusion() {
1223        let pattern = EventPattern {
1224            kind: Some(EventKind::FileWrite),
1225            agent: None,
1226            target: None,
1227            task_id: None,
1228            negate_agent: false,
1229            target_not: vec![GlobPattern::new("docs/**"), GlobPattern::new("*.md")],
1230        };
1231
1232        // Normal file matches
1233        let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1234        assert!(pattern.matches_event(&event));
1235
1236        // Excluded file does not match
1237        let event = make_event(EventKind::FileWrite, "agent-1", "docs/readme.md");
1238        assert!(!pattern.matches_event(&event));
1239
1240        // Another excluded pattern
1241        let event = make_event(EventKind::FileWrite, "agent-1", "CHANGELOG.md");
1242        assert!(!pattern.matches_event(&event));
1243    }
1244
1245    #[test]
1246    fn test_record_then_evaluate_full_cycle() {
1247        let mut coord = Coordinator::new();
1248        coord.threads.push(BThread {
1249            id: "w:1".to_string(),
1250            name: "File mutex".to_string(),
1251            priority: 5,
1252            enabled: true,
1253            rules: vec![BThreadRule::Mutex {
1254                scope: EventPattern::kind(EventKind::FileWrite),
1255                key: "file:{target}".to_string(),
1256                ttl_secs: Some(3600),
1257            }],
1258        });
1259
1260        // Step 1: agent-1 records a FileWrite (acquires lock)
1261        let event1 = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1262        coord.record(&event1).unwrap();
1263        assert!(coord.active_locks.contains_key("file:src/main.rs"));
1264
1265        // Step 2: agent-2 tries to write same file -> BLOCKED
1266        let event2 = make_event(EventKind::FileWrite, "agent-2", "src/main.rs");
1267        assert!(coord.evaluate(&event2).is_blocked());
1268
1269        // Step 3: agent-1 releases the lock
1270        coord.release("file:src/main.rs", "agent-1").unwrap();
1271        assert!(!coord.active_locks.contains_key("file:src/main.rs"));
1272
1273        // Step 4: agent-2 can now write -> PROCEED
1274        assert!(coord.evaluate(&event2).is_proceed());
1275    }
1276
1277    #[test]
1278    fn test_expired_lock_allows_access() {
1279        let mut coord = Coordinator::new();
1280        coord.threads.push(BThread {
1281            id: "w:1".to_string(),
1282            name: "File mutex".to_string(),
1283            priority: 5,
1284            enabled: true,
1285            rules: vec![BThreadRule::Mutex {
1286                scope: EventPattern::kind(EventKind::FileWrite),
1287                key: "file:{target}".to_string(),
1288                ttl_secs: Some(1),
1289            }],
1290        });
1291
1292        // Insert lock that's already expired
1293        let old = Utc::now() - chrono::Duration::hours(1);
1294        coord.active_locks.insert(
1295            "file:src/main.rs".to_string(),
1296            ActiveLock {
1297                lock_key: "file:src/main.rs".to_string(),
1298                holder_agent: "agent-1".to_string(),
1299                task_id: None,
1300                acquired_at: old.to_rfc3339(),
1301                ttl_secs: 1,
1302            },
1303        );
1304
1305        // Even without calling expire_locks, the evaluate should see it's expired
1306        let event = make_event(EventKind::FileWrite, "agent-2", "src/main.rs");
1307        assert!(coord.evaluate(&event).is_proceed());
1308    }
1309
1310    #[test]
1311    fn test_decision_display() {
1312        let proceed = Decision::Proceed;
1313        assert_eq!(format!("{}", proceed), "PROCEED");
1314
1315        let wait = Decision::Wait {
1316            reason: "needs tests".to_string(),
1317            thread_id: "w:1".to_string(),
1318        };
1319        assert!(format!("{}", wait).contains("WAIT"));
1320        assert!(format!("{}", wait).contains("needs tests"));
1321
1322        let blocked = Decision::Blocked {
1323            reason: "mutex held".to_string(),
1324            thread_id: "w:2".to_string(),
1325        };
1326        assert!(format!("{}", blocked).contains("BLOCKED"));
1327        assert!(format!("{}", blocked).contains("mutex held"));
1328    }
1329
1330    #[test]
1331    fn test_block_until() {
1332        let mut coord = Coordinator::new();
1333        coord.threads.push(BThread {
1334            id: "w:5".to_string(),
1335            name: "API review gate".to_string(),
1336            priority: 15,
1337            enabled: true,
1338            rules: vec![BThreadRule::BlockUntil {
1339                trigger: EventPattern::kind(EventKind::ApiChange),
1340                block: vec![EventPattern::kind(EventKind::Build)],
1341                until: EventPattern::kind(EventKind::Custom("ApiReviewApproved".to_string())),
1342                escalate: false,
1343                escalation_message: None,
1344            }],
1345        });
1346
1347        // No trigger yet -> build proceeds
1348        let build = make_event(EventKind::Build, "agent-1", "");
1349        assert!(coord.evaluate(&build).is_proceed());
1350
1351        // Trigger ApiChange
1352        coord.event_log.push(make_timestamped(
1353            make_event(EventKind::ApiChange, "agent-1", "src/api.rs"),
1354            &Utc::now().to_rfc3339(),
1355        ));
1356
1357        // Build now blocked
1358        assert!(coord.evaluate(&build).is_blocked());
1359
1360        // Resolve with ApiReviewApproved
1361        coord.event_log.push(make_timestamped(
1362            make_event(EventKind::Custom("ApiReviewApproved".to_string()), "reviewer", ""),
1363            &Utc::now().to_rfc3339(),
1364        ));
1365
1366        // Build proceeds again
1367        assert!(coord.evaluate(&build).is_proceed());
1368    }
1369}