1use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use std::collections::hash_map::DefaultHasher;
6use std::collections::HashMap;
7use std::hash::{Hash, Hasher};
8
9use crate::bthread::{BThread, BThreadRule, PartitionDef, PartitionStrategy, Role};
10use crate::event::Event;
11use crate::log::TimestampedEvent;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct ActiveLock {
16 pub lock_key: String,
17 pub holder_agent: String,
18 pub task_id: Option<String>,
19 pub acquired_at: String,
20 pub ttl_secs: u64,
21}
22
23impl ActiveLock {
24 pub fn is_expired(&self) -> bool {
26 if self.ttl_secs == 0 {
27 return false;
28 }
29 let Ok(acquired) = chrono::DateTime::parse_from_rfc3339(&self.acquired_at) else {
30 return false;
31 };
32 let elapsed = Utc::now().signed_duration_since(acquired);
33 elapsed.num_seconds() as u64 >= self.ttl_secs
34 }
35}
36
37#[derive(Debug, Clone)]
39pub enum Decision {
40 Proceed,
42 Wait {
44 reason: String,
45 thread_id: String,
46 },
47 Blocked {
49 reason: String,
50 thread_id: String,
51 },
52}
53
54impl Decision {
55 pub fn is_proceed(&self) -> bool {
56 matches!(self, Decision::Proceed)
57 }
58
59 pub fn is_blocked(&self) -> bool {
60 matches!(self, Decision::Blocked { .. })
61 }
62
63 pub fn is_wait(&self) -> bool {
64 matches!(self, Decision::Wait { .. })
65 }
66}
67
68impl std::fmt::Display for Decision {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 Decision::Proceed => write!(f, "PROCEED"),
72 Decision::Wait { reason, thread_id } => {
73 write!(f, "WAIT for {} \"{}\"", thread_id, reason)
74 }
75 Decision::Blocked { reason, thread_id } => {
76 write!(f, "BLOCKED by {} \"{}\"", thread_id, reason)
77 }
78 }
79 }
80}
81
82pub struct Coordinator {
84 pub threads: Vec<BThread>,
85 pub roles: Vec<Role>,
86 pub partitions: Vec<PartitionDef>,
87 pub active_locks: HashMap<String, ActiveLock>,
88 pub event_log: Vec<TimestampedEvent>,
89}
90
91impl Coordinator {
92 pub fn new() -> Self {
94 Coordinator {
95 threads: Vec::new(),
96 roles: Vec::new(),
97 partitions: Vec::new(),
98 active_locks: HashMap::new(),
99 event_log: Vec::new(),
100 }
101 }
102
103 pub fn expire_locks(&mut self) {
105 self.active_locks.retain(|_, lock| !lock.is_expired());
106 }
107
108 pub fn evaluate(&self, event: &Event) -> Decision {
115 let mut sorted: Vec<&BThread> = self.threads.iter().filter(|t| t.enabled).collect();
117 sorted.sort_by_key(|t| t.priority);
118
119 for thread in &sorted {
120 for rule in &thread.rules {
121 let decision = self.evaluate_rule(rule, event, &thread.id, &thread.name);
122 if !decision.is_proceed() {
123 return decision;
124 }
125 }
126 }
127
128 Decision::Proceed
129 }
130
131 fn evaluate_rule(
133 &self,
134 rule: &BThreadRule,
135 event: &Event,
136 thread_id: &str,
137 thread_name: &str,
138 ) -> Decision {
139 match rule {
140 BThreadRule::BlockAlways { scope } => {
141 if scope.matches_event(event) {
142 return Decision::Blocked {
143 reason: format!("{}: unconditionally blocked", thread_name),
144 thread_id: thread_id.to_string(),
145 };
146 }
147 }
148 BThreadRule::Mutex {
149 scope,
150 key,
151 ttl_secs: _,
152 } => {
153 if scope.matches_event(event) {
154 let expanded_key = expand_key_template(key, event);
155 if let Some(lock) = self.active_locks.get(&expanded_key) {
156 if !lock.is_expired() {
157 let event_agent = event.agent.as_deref().unwrap_or("");
159 if lock.holder_agent != event_agent {
160 return Decision::Blocked {
161 reason: format!(
162 "{}: {} holds {} ({})",
163 thread_name,
164 lock.holder_agent,
165 expanded_key,
166 lock.task_id.as_deref().unwrap_or("no task"),
167 ),
168 thread_id: thread_id.to_string(),
169 };
170 }
171 }
172 }
173 }
174 }
175 BThreadRule::Require {
176 trigger,
177 prerequisite,
178 reset,
179 } => {
180 if trigger.matches_event(event) {
181 let reset_idx = reset.as_ref().and_then(|rp| {
183 self.event_log
184 .iter()
185 .rposition(|te| rp.matches_event(&te.event))
186 });
187
188 let search_from = reset_idx.map(|i| i + 1).unwrap_or(0);
190 let has_prereq = self.event_log[search_from..]
191 .iter()
192 .any(|te| prerequisite.matches_event(&te.event));
193
194 if !has_prereq {
195 return Decision::Wait {
196 reason: format!(
197 "{}: {} required",
198 thread_name,
199 prerequisite
200 .kind
201 .as_ref()
202 .map(|k| k.as_str())
203 .unwrap_or("prerequisite"),
204 ),
205 thread_id: thread_id.to_string(),
206 };
207 }
208 }
209 }
210 BThreadRule::BlockUntil {
211 trigger,
212 block,
213 until,
214 ..
215 } => {
216 let trigger_seen = self
218 .event_log
219 .iter()
220 .rposition(|te| trigger.matches_event(&te.event));
221
222 if let Some(trigger_idx) = trigger_seen {
223 let until_seen = self.event_log[trigger_idx..]
225 .iter()
226 .any(|te| until.matches_event(&te.event));
227
228 if !until_seen {
229 for bp in block {
232 if bp.matches_event(event) {
233 return Decision::Blocked {
234 reason: format!(
235 "{}: blocked until {}",
236 thread_name,
237 until
238 .kind
239 .as_ref()
240 .map(|k| k.as_str())
241 .unwrap_or("condition met"),
242 ),
243 thread_id: thread_id.to_string(),
244 };
245 }
246 }
247 }
248 }
249 }
250 BThreadRule::RateLimit {
251 scope,
252 max,
253 window_secs,
254 } => {
255 if scope.matches_event(event) {
256 let now = Utc::now();
257 let count = self
258 .event_log
259 .iter()
260 .filter(|te| {
261 if !scope.matches_event(&te.event) {
262 return false;
263 }
264 if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&te.timestamp) {
265 let elapsed = now.signed_duration_since(ts);
266 elapsed.num_seconds() < *window_secs as i64
267 } else {
268 false
269 }
270 })
271 .count() as u32;
272
273 if count >= *max {
274 return Decision::Blocked {
275 reason: format!(
276 "{}: rate limit exceeded ({}/{} in {}s window)",
277 thread_name, count, max, window_secs,
278 ),
279 thread_id: thread_id.to_string(),
280 };
281 }
282 }
283 }
284 BThreadRule::Timeout {
285 scope,
286 max_duration_secs,
287 action: _,
288 } => {
289 if scope.matches_event(event) {
290 let now = Utc::now();
292 for te in self.event_log.iter().rev() {
293 if scope.matches_event(&te.event) {
294 if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(&te.timestamp) {
295 let elapsed = now.signed_duration_since(ts);
296 if elapsed.num_seconds() as u64 > *max_duration_secs {
297 return Decision::Blocked {
298 reason: format!(
299 "{}: timeout exceeded ({}s > {}s max)",
300 thread_name,
301 elapsed.num_seconds(),
302 max_duration_secs,
303 ),
304 thread_id: thread_id.to_string(),
305 };
306 }
307 }
308 break;
309 }
310 }
311 }
312 }
313 BThreadRule::Partition {
314 scope,
315 strategy,
316 agent_count,
317 } => {
318 if scope.matches_event(event) {
319 if let (Some(agent), Some(target)) = (&event.agent, &event.target) {
320 let agent_slot = parse_agent_slot(agent);
321 let target_slot = compute_partition_slot(target, *strategy, *agent_count);
322 if agent_slot != target_slot {
323 return Decision::Blocked {
324 reason: format!(
325 "{}: {} assigned to slot {}, target in slot {}",
326 thread_name, agent, agent_slot, target_slot,
327 ),
328 thread_id: thread_id.to_string(),
329 };
330 }
331 }
332 }
333 }
334 }
335
336 Decision::Proceed
339 }
340
341 fn check_roles(&self, event: &Event) -> Decision {
343 let Some(_agent) = &event.agent else {
345 return Decision::Proceed;
346 };
347 let Some(target) = &event.target else {
348 return Decision::Proceed;
349 };
350
351 let role_id = event.metadata.get("role");
353 let Some(role_id) = role_id else {
354 return Decision::Proceed;
355 };
356
357 let Some(role) = self.roles.iter().find(|r| r.id == *role_id) else {
359 return Decision::Proceed;
360 };
361
362 for deny in &role.deny_patterns {
364 if deny.matches(target) {
365 return Decision::Blocked {
366 reason: format!(
367 "role \"{}\": {} denied for {} role",
368 role.name, target, role.name,
369 ),
370 thread_id: format!("role:{}", role.id),
371 };
372 }
373 }
374
375 if !role.allow_patterns.is_empty() {
377 let allowed = role.allow_patterns.iter().any(|p| p.matches(target));
378 if !allowed {
379 return Decision::Blocked {
380 reason: format!(
381 "role \"{}\": {} not in allowed scope for {} role",
382 role.name, target, role.name,
383 ),
384 thread_id: format!("role:{}", role.id),
385 };
386 }
387 }
388
389 Decision::Proceed
390 }
391
392 pub fn evaluate_full(&self, event: &Event) -> Decision {
394 let role_decision = self.check_roles(event);
396 if !role_decision.is_proceed() {
397 return role_decision;
398 }
399
400 self.evaluate(event)
401 }
402
403 pub fn record(&mut self, event: &Event) -> anyhow::Result<()> {
405 let now = Utc::now().to_rfc3339();
406
407 self.event_log.push(TimestampedEvent {
409 timestamp: now.clone(),
410 event: event.clone(),
411 });
412
413 for thread in &self.threads {
415 if !thread.enabled {
416 continue;
417 }
418 for rule in &thread.rules {
419 if let BThreadRule::Mutex {
420 scope,
421 key,
422 ttl_secs,
423 } = rule
424 {
425 if scope.matches_event(event) {
426 let expanded_key = expand_key_template(key, event);
427 if let Some(agent) = &event.agent {
428 self.active_locks.insert(
429 expanded_key.clone(),
430 ActiveLock {
431 lock_key: expanded_key,
432 holder_agent: agent.clone(),
433 task_id: event.task_id.clone(),
434 acquired_at: now.clone(),
435 ttl_secs: ttl_secs.unwrap_or(3600),
436 },
437 );
438 }
439 }
440 }
441 }
442 }
443
444 Ok(())
445 }
446
447 pub fn release(&mut self, lock_key: &str, _agent: &str) -> anyhow::Result<()> {
449 self.active_locks.remove(lock_key);
450 Ok(())
451 }
452
453 pub fn release_all(&mut self, agent: &str, task_id: Option<&str>) -> anyhow::Result<()> {
455 self.active_locks.retain(|_, lock| {
456 if lock.holder_agent != agent {
457 return true;
458 }
459 if let Some(tid) = task_id {
460 lock.task_id.as_deref() != Some(tid)
461 } else {
462 false
463 }
464 });
465 Ok(())
466 }
467}
468
469impl Default for Coordinator {
470 fn default() -> Self {
471 Self::new()
472 }
473}
474
475fn expand_key_template(template: &str, event: &Event) -> String {
477 let mut result = template.to_string();
478 if let Some(ref target) = event.target {
479 result = result.replace("{target}", target);
480 }
481 if let Some(ref agent) = event.agent {
482 result = result.replace("{agent}", agent);
483 }
484 result
485}
486
487fn parse_agent_slot(agent: &str) -> u32 {
489 agent
490 .rsplit('-')
491 .next()
492 .and_then(|s| s.parse::<u32>().ok())
493 .unwrap_or(0)
494}
495
496fn compute_partition_slot(target: &str, strategy: PartitionStrategy, agent_count: u32) -> u32 {
498 if agent_count == 0 {
499 return 0;
500 }
501 match strategy {
502 PartitionStrategy::Hash => {
503 let mut hasher = DefaultHasher::new();
504 target.hash(&mut hasher);
505 (hasher.finish() % agent_count as u64) as u32
506 }
507 PartitionStrategy::RoundRobin => {
508 let mut hasher = DefaultHasher::new();
510 target.hash(&mut hasher);
511 (hasher.finish() % agent_count as u64) as u32
512 }
513 PartitionStrategy::Directory => {
514 let dir = target.split('/').next().unwrap_or(target);
516 let mut hasher = DefaultHasher::new();
517 dir.hash(&mut hasher);
518 (hasher.finish() % agent_count as u64) as u32
519 }
520 }
521}
522
523#[cfg(test)]
524mod tests {
525 use super::*;
526 use crate::event::{EventKind, EventPattern};
527 use crate::matcher::GlobPattern;
528
529 fn make_event(kind: EventKind, agent: &str, target: &str) -> Event {
530 Event {
531 kind,
532 agent: Some(agent.to_string()),
533 target: Some(target.to_string()),
534 task_id: None,
535 metadata: std::collections::HashMap::new(),
536 }
537 }
538
539 fn make_timestamped(event: Event, timestamp: &str) -> TimestampedEvent {
540 TimestampedEvent {
541 timestamp: timestamp.to_string(),
542 event,
543 }
544 }
545
546 #[test]
547 fn test_block_always() {
548 let mut coord = Coordinator::new();
549 coord.threads.push(BThread {
550 id: "w:1".to_string(),
551 name: "No dangerous".to_string(),
552 priority: 1,
553 enabled: true,
554 rules: vec![BThreadRule::BlockAlways {
555 scope: EventPattern::kind(EventKind::DangerousCommand),
556 }],
557 });
558
559 let event = make_event(EventKind::DangerousCommand, "agent-1", "pkill bash");
560 assert!(coord.evaluate(&event).is_blocked());
561
562 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
563 assert!(coord.evaluate(&event).is_proceed());
564 }
565
566 #[test]
567 fn test_mutex_different_agents() {
568 let mut coord = Coordinator::new();
569 coord.threads.push(BThread {
570 id: "w:1".to_string(),
571 name: "File mutex".to_string(),
572 priority: 5,
573 enabled: true,
574 rules: vec![BThreadRule::Mutex {
575 scope: EventPattern::kind(EventKind::FileWrite),
576 key: "file:{target}".to_string(),
577 ttl_secs: Some(3600),
578 }],
579 });
580
581 coord.active_locks.insert(
583 "file:src/main.rs".to_string(),
584 ActiveLock {
585 lock_key: "file:src/main.rs".to_string(),
586 holder_agent: "agent-1".to_string(),
587 task_id: Some("auth:1".to_string()),
588 acquired_at: Utc::now().to_rfc3339(),
589 ttl_secs: 3600,
590 },
591 );
592
593 let event = make_event(EventKind::FileWrite, "agent-2", "src/main.rs");
595 assert!(coord.evaluate(&event).is_blocked());
596
597 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
599 assert!(coord.evaluate(&event).is_proceed());
600
601 let event = make_event(EventKind::FileWrite, "agent-2", "src/lib.rs");
603 assert!(coord.evaluate(&event).is_proceed());
604 }
605
606 #[test]
607 fn test_require_no_prereq() {
608 let mut coord = Coordinator::new();
609 coord.threads.push(BThread {
610 id: "w:2".to_string(),
611 name: "Test gate".to_string(),
612 priority: 10,
613 enabled: true,
614 rules: vec![BThreadRule::Require {
615 trigger: EventPattern::kind(EventKind::Commit),
616 prerequisite: EventPattern::kind(EventKind::TestPass),
617 reset: Some(EventPattern::kind(EventKind::FileWrite)),
618 }],
619 });
620
621 let event = make_event(EventKind::Commit, "agent-1", "");
623 assert!(coord.evaluate(&event).is_wait());
624 }
625
626 #[test]
627 fn test_require_with_prereq() {
628 let mut coord = Coordinator::new();
629 coord.threads.push(BThread {
630 id: "w:2".to_string(),
631 name: "Test gate".to_string(),
632 priority: 10,
633 enabled: true,
634 rules: vec![BThreadRule::Require {
635 trigger: EventPattern::kind(EventKind::Commit),
636 prerequisite: EventPattern::kind(EventKind::TestPass),
637 reset: Some(EventPattern::kind(EventKind::FileWrite)),
638 }],
639 });
640
641 coord.event_log.push(make_timestamped(
643 make_event(EventKind::TestPass, "agent-1", ""),
644 &Utc::now().to_rfc3339(),
645 ));
646
647 let event = make_event(EventKind::Commit, "agent-1", "");
649 assert!(coord.evaluate(&event).is_proceed());
650 }
651
652 #[test]
653 fn test_require_reset() {
654 let mut coord = Coordinator::new();
655 coord.threads.push(BThread {
656 id: "w:2".to_string(),
657 name: "Test gate".to_string(),
658 priority: 10,
659 enabled: true,
660 rules: vec![BThreadRule::Require {
661 trigger: EventPattern::kind(EventKind::Commit),
662 prerequisite: EventPattern::kind(EventKind::TestPass),
663 reset: Some(EventPattern::kind(EventKind::FileWrite)),
664 }],
665 });
666
667 let now = Utc::now();
668 coord.event_log.push(make_timestamped(
670 make_event(EventKind::TestPass, "agent-1", ""),
671 &now.to_rfc3339(),
672 ));
673 coord.event_log.push(make_timestamped(
675 make_event(EventKind::FileWrite, "agent-1", "src/main.rs"),
676 &(now + chrono::Duration::seconds(1)).to_rfc3339(),
677 ));
678
679 let event = make_event(EventKind::Commit, "agent-1", "");
681 assert!(coord.evaluate(&event).is_wait());
682 }
683
684 #[test]
685 fn test_disabled_thread_ignored() {
686 let mut coord = Coordinator::new();
687 coord.threads.push(BThread {
688 id: "w:1".to_string(),
689 name: "Disabled".to_string(),
690 priority: 1,
691 enabled: false,
692 rules: vec![BThreadRule::BlockAlways {
693 scope: EventPattern::kind(EventKind::DangerousCommand),
694 }],
695 });
696
697 let event = make_event(EventKind::DangerousCommand, "agent-1", "pkill");
698 assert!(coord.evaluate(&event).is_proceed());
699 }
700
701 #[test]
702 fn test_record_creates_locks() {
703 let mut coord = Coordinator::new();
704 coord.threads.push(BThread {
705 id: "w:1".to_string(),
706 name: "File mutex".to_string(),
707 priority: 5,
708 enabled: true,
709 rules: vec![BThreadRule::Mutex {
710 scope: EventPattern::kind(EventKind::FileWrite),
711 key: "file:{target}".to_string(),
712 ttl_secs: Some(3600),
713 }],
714 });
715
716 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
717 coord.record(&event).unwrap();
718
719 assert!(coord.active_locks.contains_key("file:src/main.rs"));
720 assert_eq!(
721 coord.active_locks["file:src/main.rs"].holder_agent,
722 "agent-1"
723 );
724 assert_eq!(coord.event_log.len(), 1);
725 }
726
727 #[test]
728 fn test_role_deny() {
729 let mut coord = Coordinator::new();
730 coord.roles.push(Role {
731 id: "r:impl".to_string(),
732 name: "Implementer".to_string(),
733 allow_patterns: vec![GlobPattern::new("src/**")],
734 deny_patterns: vec![GlobPattern::new("docs/**")],
735 });
736
737 let mut event = make_event(EventKind::FileWrite, "agent-1", "docs/readme.md");
738 event.metadata.insert("role".to_string(), "r:impl".to_string());
739
740 assert!(coord.check_roles(&event).is_blocked());
741 }
742
743 #[test]
744 fn test_role_allow() {
745 let mut coord = Coordinator::new();
746 coord.roles.push(Role {
747 id: "r:impl".to_string(),
748 name: "Implementer".to_string(),
749 allow_patterns: vec![GlobPattern::new("src/**")],
750 deny_patterns: vec![GlobPattern::new("docs/**")],
751 });
752
753 let mut event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
754 event.metadata.insert("role".to_string(), "r:impl".to_string());
755
756 assert!(coord.check_roles(&event).is_proceed());
757 }
758
759 #[test]
760 fn test_priority_ordering() {
761 let mut coord = Coordinator::new();
762 coord.threads.push(BThread {
764 id: "w:1".to_string(),
765 name: "High priority block".to_string(),
766 priority: 1,
767 enabled: true,
768 rules: vec![BThreadRule::BlockAlways {
769 scope: EventPattern::kind(EventKind::DangerousCommand),
770 }],
771 });
772 coord.threads.push(BThread {
773 id: "w:2".to_string(),
774 name: "Lower priority".to_string(),
775 priority: 50,
776 enabled: true,
777 rules: vec![BThreadRule::BlockAlways {
778 scope: EventPattern::kind(EventKind::DangerousCommand),
779 }],
780 });
781
782 let event = make_event(EventKind::DangerousCommand, "agent-1", "pkill");
783 let decision = coord.evaluate(&event);
784 match &decision {
785 Decision::Blocked { thread_id, .. } => assert_eq!(thread_id, "w:1"),
786 _ => panic!("Expected blocked"),
787 }
788 }
789
790 #[test]
791 fn test_expand_key_template() {
792 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
793 assert_eq!(expand_key_template("file:{target}", &event), "file:src/main.rs");
794 assert_eq!(expand_key_template("{agent}-lock", &event), "agent-1-lock");
795 assert_eq!(expand_key_template("schema-global", &event), "schema-global");
796 }
797
798 #[test]
799 fn test_rate_limit_under_limit_proceeds() {
800 let mut coord = Coordinator::new();
801 coord.threads.push(BThread {
802 id: "w:10".to_string(),
803 name: "Commit rate".to_string(),
804 priority: 50,
805 enabled: true,
806 rules: vec![BThreadRule::RateLimit {
807 scope: EventPattern::kind(EventKind::Commit),
808 max: 3,
809 window_secs: 300,
810 }],
811 });
812
813 let now = Utc::now();
815 coord.event_log.push(make_timestamped(
816 make_event(EventKind::Commit, "agent-1", ""),
817 &now.to_rfc3339(),
818 ));
819 coord.event_log.push(make_timestamped(
820 make_event(EventKind::Commit, "agent-1", ""),
821 &now.to_rfc3339(),
822 ));
823
824 let event = make_event(EventKind::Commit, "agent-1", "");
825 assert!(coord.evaluate(&event).is_proceed());
826 }
827
828 #[test]
829 fn test_rate_limit_over_limit_blocked() {
830 let mut coord = Coordinator::new();
831 coord.threads.push(BThread {
832 id: "w:10".to_string(),
833 name: "Commit rate".to_string(),
834 priority: 50,
835 enabled: true,
836 rules: vec![BThreadRule::RateLimit {
837 scope: EventPattern::kind(EventKind::Commit),
838 max: 2,
839 window_secs: 300,
840 }],
841 });
842
843 let now = Utc::now();
845 for _ in 0..2 {
846 coord.event_log.push(make_timestamped(
847 make_event(EventKind::Commit, "agent-1", ""),
848 &now.to_rfc3339(),
849 ));
850 }
851
852 let event = make_event(EventKind::Commit, "agent-1", "");
853 let decision = coord.evaluate(&event);
854 assert!(decision.is_blocked());
855 match decision {
856 Decision::Blocked { reason, .. } => {
857 assert!(reason.contains("rate limit"));
858 }
859 _ => panic!("Expected blocked"),
860 }
861 }
862
863 #[test]
864 fn test_rate_limit_old_events_not_counted() {
865 let mut coord = Coordinator::new();
866 coord.threads.push(BThread {
867 id: "w:10".to_string(),
868 name: "Commit rate".to_string(),
869 priority: 50,
870 enabled: true,
871 rules: vec![BThreadRule::RateLimit {
872 scope: EventPattern::kind(EventKind::Commit),
873 max: 2,
874 window_secs: 60,
875 }],
876 });
877
878 let old = Utc::now() - chrono::Duration::hours(2);
880 for _ in 0..5 {
881 coord.event_log.push(make_timestamped(
882 make_event(EventKind::Commit, "agent-1", ""),
883 &old.to_rfc3339(),
884 ));
885 }
886
887 let event = make_event(EventKind::Commit, "agent-1", "");
888 assert!(coord.evaluate(&event).is_proceed());
889 }
890
891 #[test]
892 fn test_partition_in_slot_proceeds() {
893 let mut coord = Coordinator::new();
894 coord.threads.push(BThread {
895 id: "w:11".to_string(),
896 name: "File partition".to_string(),
897 priority: 20,
898 enabled: true,
899 rules: vec![BThreadRule::Partition {
900 scope: EventPattern::kind(EventKind::FileWrite),
901 strategy: PartitionStrategy::Hash,
902 agent_count: 2,
903 }],
904 });
905
906 let target = find_target_for_slot(0, PartitionStrategy::Hash, 2);
909 let event = make_event(EventKind::FileWrite, "agent-0", &target);
910 assert!(coord.evaluate(&event).is_proceed());
911 }
912
913 #[test]
914 fn test_partition_out_of_slot_blocked() {
915 let mut coord = Coordinator::new();
916 coord.threads.push(BThread {
917 id: "w:11".to_string(),
918 name: "File partition".to_string(),
919 priority: 20,
920 enabled: true,
921 rules: vec![BThreadRule::Partition {
922 scope: EventPattern::kind(EventKind::FileWrite),
923 strategy: PartitionStrategy::Hash,
924 agent_count: 2,
925 }],
926 });
927
928 let target = find_target_for_slot(1, PartitionStrategy::Hash, 2);
930 let event = make_event(EventKind::FileWrite, "agent-0", &target);
931 assert!(coord.evaluate(&event).is_blocked());
932 }
933
934 fn find_target_for_slot(slot: u32, strategy: PartitionStrategy, count: u32) -> String {
936 for i in 0..1000 {
937 let target = format!("src/file{}.rs", i);
938 if compute_partition_slot(&target, strategy, count) == slot {
939 return target;
940 }
941 }
942 panic!("Could not find target for slot {}", slot);
943 }
944
945 #[test]
946 fn test_timeout_expired_operation() {
947 let mut coord = Coordinator::new();
948 coord.threads.push(BThread {
949 id: "w:12".to_string(),
950 name: "Build timeout".to_string(),
951 priority: 30,
952 enabled: true,
953 rules: vec![BThreadRule::Timeout {
954 scope: EventPattern::kind(EventKind::Build),
955 max_duration_secs: 60,
956 action: super::super::bthread::TimeoutAction::Kill,
957 }],
958 });
959
960 let old = Utc::now() - chrono::Duration::minutes(2);
962 coord.event_log.push(make_timestamped(
963 make_event(EventKind::Build, "agent-1", ""),
964 &old.to_rfc3339(),
965 ));
966
967 let event = make_event(EventKind::Build, "agent-1", "");
968 let decision = coord.evaluate(&event);
969 assert!(decision.is_blocked());
970 match decision {
971 Decision::Blocked { reason, .. } => {
972 assert!(reason.contains("timeout"));
973 }
974 _ => panic!("Expected blocked"),
975 }
976 }
977
978 #[test]
979 fn test_timeout_not_yet_expired() {
980 let mut coord = Coordinator::new();
981 coord.threads.push(BThread {
982 id: "w:12".to_string(),
983 name: "Build timeout".to_string(),
984 priority: 30,
985 enabled: true,
986 rules: vec![BThreadRule::Timeout {
987 scope: EventPattern::kind(EventKind::Build),
988 max_duration_secs: 3600,
989 action: super::super::bthread::TimeoutAction::Kill,
990 }],
991 });
992
993 coord.event_log.push(make_timestamped(
995 make_event(EventKind::Build, "agent-1", ""),
996 &Utc::now().to_rfc3339(),
997 ));
998
999 let event = make_event(EventKind::Build, "agent-1", "");
1000 assert!(coord.evaluate(&event).is_proceed());
1001 }
1002
1003 #[test]
1004 fn test_ttl_expiry_auto_releases() {
1005 let mut coord = Coordinator::new();
1006 let old = Utc::now() - chrono::Duration::hours(2);
1008 coord.active_locks.insert(
1009 "file:src/main.rs".to_string(),
1010 ActiveLock {
1011 lock_key: "file:src/main.rs".to_string(),
1012 holder_agent: "agent-1".to_string(),
1013 task_id: None,
1014 acquired_at: old.to_rfc3339(),
1015 ttl_secs: 1,
1016 },
1017 );
1018
1019 coord.expire_locks();
1020 assert!(coord.active_locks.is_empty());
1021 }
1022
1023 #[test]
1024 fn test_ttl_zero_never_expires() {
1025 let mut coord = Coordinator::new();
1026 let old = Utc::now() - chrono::Duration::hours(2);
1027 coord.active_locks.insert(
1028 "global".to_string(),
1029 ActiveLock {
1030 lock_key: "global".to_string(),
1031 holder_agent: "agent-1".to_string(),
1032 task_id: None,
1033 acquired_at: old.to_rfc3339(),
1034 ttl_secs: 0,
1035 },
1036 );
1037
1038 coord.expire_locks();
1039 assert_eq!(coord.active_locks.len(), 1);
1040 }
1041
1042 #[test]
1043 fn test_release_specific_lock() {
1044 let mut coord = Coordinator::new();
1045 coord.active_locks.insert(
1046 "file:a.rs".to_string(),
1047 ActiveLock {
1048 lock_key: "file:a.rs".to_string(),
1049 holder_agent: "agent-1".to_string(),
1050 task_id: None,
1051 acquired_at: Utc::now().to_rfc3339(),
1052 ttl_secs: 3600,
1053 },
1054 );
1055 coord.active_locks.insert(
1056 "file:b.rs".to_string(),
1057 ActiveLock {
1058 lock_key: "file:b.rs".to_string(),
1059 holder_agent: "agent-1".to_string(),
1060 task_id: None,
1061 acquired_at: Utc::now().to_rfc3339(),
1062 ttl_secs: 3600,
1063 },
1064 );
1065
1066 coord.release("file:a.rs", "agent-1").unwrap();
1067 assert!(!coord.active_locks.contains_key("file:a.rs"));
1068 assert!(coord.active_locks.contains_key("file:b.rs"));
1069 }
1070
1071 #[test]
1072 fn test_release_all_by_agent() {
1073 let mut coord = Coordinator::new();
1074 for key in ["file:a.rs", "file:b.rs"] {
1075 coord.active_locks.insert(
1076 key.to_string(),
1077 ActiveLock {
1078 lock_key: key.to_string(),
1079 holder_agent: "agent-1".to_string(),
1080 task_id: None,
1081 acquired_at: Utc::now().to_rfc3339(),
1082 ttl_secs: 3600,
1083 },
1084 );
1085 }
1086 coord.active_locks.insert(
1087 "file:c.rs".to_string(),
1088 ActiveLock {
1089 lock_key: "file:c.rs".to_string(),
1090 holder_agent: "agent-2".to_string(),
1091 task_id: None,
1092 acquired_at: Utc::now().to_rfc3339(),
1093 ttl_secs: 3600,
1094 },
1095 );
1096
1097 coord.release_all("agent-1", None).unwrap();
1098 assert_eq!(coord.active_locks.len(), 1);
1099 assert!(coord.active_locks.contains_key("file:c.rs"));
1100 }
1101
1102 #[test]
1103 fn test_release_all_by_agent_and_task() {
1104 let mut coord = Coordinator::new();
1105 coord.active_locks.insert(
1106 "file:a.rs".to_string(),
1107 ActiveLock {
1108 lock_key: "file:a.rs".to_string(),
1109 holder_agent: "agent-1".to_string(),
1110 task_id: Some("task-1".to_string()),
1111 acquired_at: Utc::now().to_rfc3339(),
1112 ttl_secs: 3600,
1113 },
1114 );
1115 coord.active_locks.insert(
1116 "file:b.rs".to_string(),
1117 ActiveLock {
1118 lock_key: "file:b.rs".to_string(),
1119 holder_agent: "agent-1".to_string(),
1120 task_id: Some("task-2".to_string()),
1121 acquired_at: Utc::now().to_rfc3339(),
1122 ttl_secs: 3600,
1123 },
1124 );
1125
1126 coord.release_all("agent-1", Some("task-1")).unwrap();
1128 assert_eq!(coord.active_locks.len(), 1);
1129 assert!(coord.active_locks.contains_key("file:b.rs"));
1130 }
1131
1132 #[test]
1133 fn test_multiple_threads_first_block_wins() {
1134 let mut coord = Coordinator::new();
1135 coord.threads.push(BThread {
1137 id: "w:1".to_string(),
1138 name: "File block".to_string(),
1139 priority: 5,
1140 enabled: true,
1141 rules: vec![BThreadRule::BlockAlways {
1142 scope: EventPattern::kind(EventKind::FileWrite),
1143 }],
1144 });
1145 coord.threads.push(BThread {
1147 id: "w:2".to_string(),
1148 name: "Also file block".to_string(),
1149 priority: 10,
1150 enabled: true,
1151 rules: vec![BThreadRule::BlockAlways {
1152 scope: EventPattern::kind(EventKind::FileWrite),
1153 }],
1154 });
1155
1156 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1157 let decision = coord.evaluate(&event);
1158 match &decision {
1159 Decision::Blocked { thread_id, .. } => {
1160 assert_eq!(thread_id, "w:1", "Higher priority (lower number) thread should block first");
1161 }
1162 _ => panic!("Expected blocked"),
1163 }
1164 }
1165
1166 #[test]
1167 fn test_evaluate_full_role_check_before_threads() {
1168 let mut coord = Coordinator::new();
1169 coord.roles.push(Role {
1170 id: "r:test".to_string(),
1171 name: "Tester".to_string(),
1172 allow_patterns: vec![GlobPattern::new("tests/**")],
1173 deny_patterns: vec![],
1174 });
1175
1176 coord.threads.push(BThread {
1178 id: "w:1".to_string(),
1179 name: "Always block".to_string(),
1180 priority: 1,
1181 enabled: true,
1182 rules: vec![BThreadRule::BlockAlways {
1183 scope: EventPattern::kind(EventKind::FileWrite),
1184 }],
1185 });
1186
1187 let mut event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1188 event.metadata.insert("role".to_string(), "r:test".to_string());
1189
1190 let decision = coord.evaluate_full(&event);
1192 assert!(decision.is_blocked());
1193 match decision {
1194 Decision::Blocked { thread_id, .. } => {
1195 assert!(thread_id.starts_with("role:"), "Role should block, not thread");
1196 }
1197 _ => panic!("Expected blocked"),
1198 }
1199 }
1200
1201 #[test]
1202 fn test_event_pattern_with_negated_agent() {
1203 let pattern = EventPattern {
1204 kind: Some(EventKind::FileWrite),
1205 agent: Some("admin".to_string()),
1206 target: None,
1207 task_id: None,
1208 negate_agent: true,
1209 target_not: Vec::new(),
1210 };
1211
1212 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1214 assert!(pattern.matches_event(&event));
1215
1216 let event = make_event(EventKind::FileWrite, "admin", "src/main.rs");
1218 assert!(!pattern.matches_event(&event));
1219 }
1220
1221 #[test]
1222 fn test_event_pattern_with_target_exclusion() {
1223 let pattern = EventPattern {
1224 kind: Some(EventKind::FileWrite),
1225 agent: None,
1226 target: None,
1227 task_id: None,
1228 negate_agent: false,
1229 target_not: vec![GlobPattern::new("docs/**"), GlobPattern::new("*.md")],
1230 };
1231
1232 let event = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1234 assert!(pattern.matches_event(&event));
1235
1236 let event = make_event(EventKind::FileWrite, "agent-1", "docs/readme.md");
1238 assert!(!pattern.matches_event(&event));
1239
1240 let event = make_event(EventKind::FileWrite, "agent-1", "CHANGELOG.md");
1242 assert!(!pattern.matches_event(&event));
1243 }
1244
1245 #[test]
1246 fn test_record_then_evaluate_full_cycle() {
1247 let mut coord = Coordinator::new();
1248 coord.threads.push(BThread {
1249 id: "w:1".to_string(),
1250 name: "File mutex".to_string(),
1251 priority: 5,
1252 enabled: true,
1253 rules: vec![BThreadRule::Mutex {
1254 scope: EventPattern::kind(EventKind::FileWrite),
1255 key: "file:{target}".to_string(),
1256 ttl_secs: Some(3600),
1257 }],
1258 });
1259
1260 let event1 = make_event(EventKind::FileWrite, "agent-1", "src/main.rs");
1262 coord.record(&event1).unwrap();
1263 assert!(coord.active_locks.contains_key("file:src/main.rs"));
1264
1265 let event2 = make_event(EventKind::FileWrite, "agent-2", "src/main.rs");
1267 assert!(coord.evaluate(&event2).is_blocked());
1268
1269 coord.release("file:src/main.rs", "agent-1").unwrap();
1271 assert!(!coord.active_locks.contains_key("file:src/main.rs"));
1272
1273 assert!(coord.evaluate(&event2).is_proceed());
1275 }
1276
1277 #[test]
1278 fn test_expired_lock_allows_access() {
1279 let mut coord = Coordinator::new();
1280 coord.threads.push(BThread {
1281 id: "w:1".to_string(),
1282 name: "File mutex".to_string(),
1283 priority: 5,
1284 enabled: true,
1285 rules: vec![BThreadRule::Mutex {
1286 scope: EventPattern::kind(EventKind::FileWrite),
1287 key: "file:{target}".to_string(),
1288 ttl_secs: Some(1),
1289 }],
1290 });
1291
1292 let old = Utc::now() - chrono::Duration::hours(1);
1294 coord.active_locks.insert(
1295 "file:src/main.rs".to_string(),
1296 ActiveLock {
1297 lock_key: "file:src/main.rs".to_string(),
1298 holder_agent: "agent-1".to_string(),
1299 task_id: None,
1300 acquired_at: old.to_rfc3339(),
1301 ttl_secs: 1,
1302 },
1303 );
1304
1305 let event = make_event(EventKind::FileWrite, "agent-2", "src/main.rs");
1307 assert!(coord.evaluate(&event).is_proceed());
1308 }
1309
1310 #[test]
1311 fn test_decision_display() {
1312 let proceed = Decision::Proceed;
1313 assert_eq!(format!("{}", proceed), "PROCEED");
1314
1315 let wait = Decision::Wait {
1316 reason: "needs tests".to_string(),
1317 thread_id: "w:1".to_string(),
1318 };
1319 assert!(format!("{}", wait).contains("WAIT"));
1320 assert!(format!("{}", wait).contains("needs tests"));
1321
1322 let blocked = Decision::Blocked {
1323 reason: "mutex held".to_string(),
1324 thread_id: "w:2".to_string(),
1325 };
1326 assert!(format!("{}", blocked).contains("BLOCKED"));
1327 assert!(format!("{}", blocked).contains("mutex held"));
1328 }
1329
1330 #[test]
1331 fn test_block_until() {
1332 let mut coord = Coordinator::new();
1333 coord.threads.push(BThread {
1334 id: "w:5".to_string(),
1335 name: "API review gate".to_string(),
1336 priority: 15,
1337 enabled: true,
1338 rules: vec![BThreadRule::BlockUntil {
1339 trigger: EventPattern::kind(EventKind::ApiChange),
1340 block: vec![EventPattern::kind(EventKind::Build)],
1341 until: EventPattern::kind(EventKind::Custom("ApiReviewApproved".to_string())),
1342 escalate: false,
1343 escalation_message: None,
1344 }],
1345 });
1346
1347 let build = make_event(EventKind::Build, "agent-1", "");
1349 assert!(coord.evaluate(&build).is_proceed());
1350
1351 coord.event_log.push(make_timestamped(
1353 make_event(EventKind::ApiChange, "agent-1", "src/api.rs"),
1354 &Utc::now().to_rfc3339(),
1355 ));
1356
1357 assert!(coord.evaluate(&build).is_blocked());
1359
1360 coord.event_log.push(make_timestamped(
1362 make_event(EventKind::Custom("ApiReviewApproved".to_string()), "reviewer", ""),
1363 &Utc::now().to_rfc3339(),
1364 ));
1365
1366 assert!(coord.evaluate(&build).is_proceed());
1368 }
1369}