1use anyhow::{anyhow, Result};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7use crate::event::{EventKind, EventPattern};
8use crate::matcher::GlobPattern;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct BThread {
13 pub id: String,
15 pub name: String,
16 pub priority: u32,
18 pub enabled: bool,
19 pub rules: Vec<BThreadRule>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub enum BThreadRule {
25 Mutex {
27 scope: EventPattern,
28 key: String,
30 ttl_secs: Option<u64>,
31 },
32 Require {
34 trigger: EventPattern,
35 prerequisite: EventPattern,
36 reset: Option<EventPattern>,
37 },
38 BlockUntil {
40 trigger: EventPattern,
41 block: Vec<EventPattern>,
42 until: EventPattern,
43 #[serde(default)]
44 escalate: bool,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 escalation_message: Option<String>,
47 },
48 BlockAlways {
50 scope: EventPattern,
51 },
52 RateLimit {
54 scope: EventPattern,
55 max: u32,
56 window_secs: u64,
57 },
58 Timeout {
60 scope: EventPattern,
61 max_duration_secs: u64,
62 action: TimeoutAction,
63 },
64 Partition {
66 scope: EventPattern,
67 strategy: PartitionStrategy,
68 agent_count: u32,
69 },
70}
71
72impl BThreadRule {
73 pub fn parse(rule_type: &str, spec: &str) -> Result<BThreadRule> {
78 let params = parse_key_values(spec);
79
80 match rule_type {
81 "Mutex" => {
82 let scope = pattern_from_params(¶ms)?;
83 let key = params
84 .get("key")
85 .ok_or_else(|| anyhow!("Mutex rule requires 'key' parameter"))?
86 .clone();
87 let ttl_secs = params.get("ttl").map(|v| v.parse::<u64>()).transpose()?;
88 Ok(BThreadRule::Mutex {
89 scope,
90 key,
91 ttl_secs,
92 })
93 }
94 "Require" => {
95 let trigger_kind = params
96 .get("trigger")
97 .ok_or_else(|| anyhow!("Require rule requires 'trigger' parameter"))?;
98 let prereq_kind = params
99 .get("prereq")
100 .ok_or_else(|| anyhow!("Require rule requires 'prereq' parameter"))?;
101 let reset = params
102 .get("reset")
103 .map(|r| EventPattern::kind(EventKind::parse(r)));
104 Ok(BThreadRule::Require {
105 trigger: EventPattern::kind(EventKind::parse(trigger_kind)),
106 prerequisite: EventPattern::kind(EventKind::parse(prereq_kind)),
107 reset,
108 })
109 }
110 "BlockUntil" => {
111 let trigger_kind = params
112 .get("trigger")
113 .ok_or_else(|| anyhow!("BlockUntil rule requires 'trigger' parameter"))?;
114 let block_kinds = params
115 .get("block")
116 .ok_or_else(|| anyhow!("BlockUntil rule requires 'block' parameter"))?;
117 let until_kind = params
118 .get("until")
119 .ok_or_else(|| anyhow!("BlockUntil rule requires 'until' parameter"))?;
120 let escalate = params
121 .get("escalate")
122 .map(|v| v == "true" || v == "Y")
123 .unwrap_or(false);
124 let escalation_message = params.get("message").cloned();
125 let block = block_kinds
126 .split(',')
127 .map(|k| EventPattern::kind(EventKind::parse(k.trim())))
128 .collect();
129 Ok(BThreadRule::BlockUntil {
130 trigger: EventPattern::kind(EventKind::parse(trigger_kind)),
131 block,
132 until: EventPattern::kind(EventKind::parse(until_kind)),
133 escalate,
134 escalation_message,
135 })
136 }
137 "BlockAlways" => {
138 let scope = pattern_from_params(¶ms)?;
139 Ok(BThreadRule::BlockAlways { scope })
140 }
141 "RateLimit" => {
142 let scope = pattern_from_params(¶ms)?;
143 let max = params
144 .get("max")
145 .ok_or_else(|| anyhow!("RateLimit rule requires 'max' parameter"))?
146 .parse::<u32>()?;
147 let window_secs = params
148 .get("window")
149 .ok_or_else(|| anyhow!("RateLimit rule requires 'window' parameter"))?
150 .parse::<u64>()?;
151 Ok(BThreadRule::RateLimit {
152 scope,
153 max,
154 window_secs,
155 })
156 }
157 "Timeout" => {
158 let scope = pattern_from_params(¶ms)?;
159 let max_duration_secs = params
160 .get("max_secs")
161 .ok_or_else(|| anyhow!("Timeout rule requires 'max_secs' parameter"))?
162 .parse::<u64>()?;
163 let action = match params.get("action").map(|s| s.as_str()) {
164 Some("Warn" | "warn") => TimeoutAction::Warn,
165 _ => TimeoutAction::Kill,
166 };
167 Ok(BThreadRule::Timeout {
168 scope,
169 max_duration_secs,
170 action,
171 })
172 }
173 "Partition" => {
174 let scope = pattern_from_params(¶ms)?;
175 let strategy = match params.get("strategy").map(|s| s.as_str()) {
176 Some("round-robin" | "RoundRobin") => PartitionStrategy::RoundRobin,
177 Some("directory" | "Directory") => PartitionStrategy::Directory,
178 _ => PartitionStrategy::Hash,
179 };
180 let agent_count = params
181 .get("count")
182 .or_else(|| params.get("agent_count"))
183 .ok_or_else(|| {
184 anyhow!("Partition rule requires 'count' or 'agent_count' parameter")
185 })?
186 .parse::<u32>()?;
187 Ok(BThreadRule::Partition {
188 scope,
189 strategy,
190 agent_count,
191 })
192 }
193 other => Err(anyhow!("Unknown rule type: {}", other)),
194 }
195 }
196}
197
/// Splits a rule spec into its `key=value` parameters.
///
/// The spec is tokenized on whitespace. Each token is split at its first `=`,
/// so values may themselves contain `=`. Tokens without `=` are ignored. When
/// a key occurs more than once, the last occurrence wins. Values cannot
/// contain whitespace because tokenization happens first.
fn parse_key_values(spec: &str) -> HashMap<String, String> {
    spec.split_whitespace()
        .filter_map(|token| token.split_once('='))
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
208
209fn pattern_from_params(params: &HashMap<String, String>) -> Result<EventPattern> {
211 let kind = params.get("kind").map(|k| EventKind::parse(k));
212 let agent = params.get("agent").cloned();
213 let target = params.get("target").map(|t| GlobPattern::new(t));
214 let negate_agent = params
215 .get("negate_agent")
216 .map(|v| v == "true" || v == "Y")
217 .unwrap_or(false);
218 let target_not = params
219 .get("target_not")
220 .map(|v| {
221 v.split(',')
222 .map(|p| GlobPattern::new(p.trim()))
223 .collect()
224 })
225 .unwrap_or_default();
226 Ok(EventPattern {
227 kind,
228 agent,
229 target,
230 task_id: None,
231 negate_agent,
232 target_not,
233 })
234}
235
236#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
238pub enum TimeoutAction {
239 #[default]
240 Kill,
241 Warn,
242}
243
244#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
246pub enum PartitionStrategy {
247 Hash,
248 RoundRobin,
249 Directory,
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct Role {
255 pub id: String,
257 pub name: String,
258 pub allow_patterns: Vec<GlobPattern>,
260 pub deny_patterns: Vec<GlobPattern>,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct PartitionDef {
267 pub id: String,
269 pub scope_pattern: GlobPattern,
271 pub strategy: PartitionStrategy,
272 pub agent_count: u32,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize, Default)]
277pub struct NodeAnnotation {
278 #[serde(default, skip_serializing_if = "Option::is_none")]
279 pub role: Option<String>,
280 #[serde(default, skip_serializing_if = "Option::is_none")]
281 pub scope: Option<String>,
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
286pub struct WeaveEdge {
287 pub from: String,
288 pub to: String,
289 pub edge_type: WeaveEdgeType,
290 #[serde(default, skip_serializing_if = "Option::is_none")]
291 pub reason: Option<String>,
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that parsing fails and that the error message contains `needle`.
    fn assert_parse_error_mentions(rule_type: &str, spec: &str, needle: &str) {
        let err = BThreadRule::parse(rule_type, spec).expect_err("parsing should fail");
        assert!(
            err.to_string().contains(needle),
            "error `{}` does not mention `{}`",
            err,
            needle
        );
    }

    // ---- Successful parses, one rule type at a time ----

    #[test]
    fn test_parse_mutex_rule() {
        let rule = BThreadRule::parse("Mutex", "kind=FileWrite key=file:{target}").unwrap();
        match rule {
            BThreadRule::Mutex {
                scope,
                key,
                ttl_secs,
            } => {
                assert_eq!(scope.kind, Some(EventKind::FileWrite));
                assert_eq!(key, "file:{target}");
                assert!(ttl_secs.is_none());
            }
            _ => panic!("Expected Mutex rule"),
        }
    }

    #[test]
    fn test_parse_mutex_rule_with_ttl() {
        let rule =
            BThreadRule::parse("Mutex", "kind=FileWrite key=file:{target} ttl=600").unwrap();
        match rule {
            BThreadRule::Mutex { ttl_secs, .. } => {
                assert_eq!(ttl_secs, Some(600));
            }
            _ => panic!("Expected Mutex rule"),
        }
    }

    #[test]
    fn test_parse_require_rule() {
        let rule = BThreadRule::parse("Require", "trigger=Commit prereq=TestPass").unwrap();
        match rule {
            BThreadRule::Require {
                trigger,
                prerequisite,
                reset,
            } => {
                assert_eq!(trigger.kind, Some(EventKind::Commit));
                assert_eq!(prerequisite.kind, Some(EventKind::TestPass));
                assert!(reset.is_none());
            }
            _ => panic!("Expected Require rule"),
        }
    }

    #[test]
    fn test_parse_require_rule_with_reset() {
        let rule =
            BThreadRule::parse("Require", "trigger=Commit prereq=TestPass reset=FileWrite")
                .unwrap();
        match rule {
            BThreadRule::Require { reset, .. } => {
                assert!(reset.is_some());
                assert_eq!(reset.unwrap().kind, Some(EventKind::FileWrite));
            }
            _ => panic!("Expected Require rule"),
        }
    }

    #[test]
    fn test_parse_block_always_rule() {
        let rule = BThreadRule::parse("BlockAlways", "kind=DangerousCommand").unwrap();
        match rule {
            BThreadRule::BlockAlways { scope } => {
                assert_eq!(scope.kind, Some(EventKind::DangerousCommand));
            }
            _ => panic!("Expected BlockAlways rule"),
        }
    }

    #[test]
    fn test_parse_block_until_rule() {
        let rule = BThreadRule::parse(
            "BlockUntil",
            "trigger=ApiChange block=Build until=ApiReviewApproved",
        )
        .unwrap();
        match rule {
            BThreadRule::BlockUntil {
                trigger,
                block,
                until,
                escalate,
                escalation_message,
            } => {
                assert_eq!(trigger.kind, Some(EventKind::ApiChange));
                assert_eq!(block.len(), 1);
                assert_eq!(block[0].kind, Some(EventKind::Build));
                assert_eq!(
                    until.kind,
                    Some(EventKind::Custom("ApiReviewApproved".to_string()))
                );
                assert!(!escalate);
                assert!(escalation_message.is_none());
            }
            _ => panic!("Expected BlockUntil rule"),
        }
    }

    #[test]
    fn test_parse_block_until_with_escalate() {
        let rule = BThreadRule::parse(
            "BlockUntil",
            "trigger=ApiChange block=Build until=Approved escalate=true",
        )
        .unwrap();
        match rule {
            BThreadRule::BlockUntil { escalate, .. } => {
                assert!(escalate);
            }
            _ => panic!("Expected BlockUntil rule"),
        }
    }

    #[test]
    fn test_parse_rate_limit_rule() {
        let rule = BThreadRule::parse("RateLimit", "kind=Commit max=5 window=120").unwrap();
        match rule {
            BThreadRule::RateLimit {
                scope,
                max,
                window_secs,
            } => {
                assert_eq!(scope.kind, Some(EventKind::Commit));
                assert_eq!(max, 5);
                assert_eq!(window_secs, 120);
            }
            _ => panic!("Expected RateLimit rule"),
        }
    }

    #[test]
    fn test_parse_timeout_rule() {
        let rule =
            BThreadRule::parse("Timeout", "kind=TestRun max_secs=300 action=Warn").unwrap();
        match rule {
            BThreadRule::Timeout {
                scope,
                max_duration_secs,
                action,
            } => {
                assert_eq!(scope.kind, Some(EventKind::TestRun));
                assert_eq!(max_duration_secs, 300);
                assert_eq!(action, TimeoutAction::Warn);
            }
            _ => panic!("Expected Timeout rule"),
        }
    }

    #[test]
    fn test_parse_timeout_rule_default_action() {
        let rule = BThreadRule::parse("Timeout", "kind=Build max_secs=600").unwrap();
        match rule {
            BThreadRule::Timeout { action, .. } => {
                assert_eq!(action, TimeoutAction::Kill);
            }
            _ => panic!("Expected Timeout rule"),
        }
    }

    #[test]
    fn test_parse_partition_rule_hash() {
        let rule = BThreadRule::parse("Partition", "kind=FileWrite count=4").unwrap();
        match rule {
            BThreadRule::Partition {
                scope,
                strategy,
                agent_count,
            } => {
                assert_eq!(scope.kind, Some(EventKind::FileWrite));
                assert_eq!(strategy, PartitionStrategy::Hash);
                assert_eq!(agent_count, 4);
            }
            _ => panic!("Expected Partition rule"),
        }
    }

    #[test]
    fn test_parse_partition_rule_round_robin() {
        let rule =
            BThreadRule::parse("Partition", "kind=FileWrite strategy=round-robin count=3")
                .unwrap();
        match rule {
            BThreadRule::Partition { strategy, .. } => {
                assert_eq!(strategy, PartitionStrategy::RoundRobin);
            }
            _ => panic!("Expected Partition rule"),
        }
    }

    #[test]
    fn test_parse_partition_rule_directory() {
        let rule = BThreadRule::parse(
            "Partition",
            "kind=FileWrite strategy=Directory agent_count=2",
        )
        .unwrap();
        match rule {
            BThreadRule::Partition {
                strategy,
                agent_count,
                ..
            } => {
                assert_eq!(strategy, PartitionStrategy::Directory);
                assert_eq!(agent_count, 2);
            }
            _ => panic!("Expected Partition rule"),
        }
    }

    // ---- Error paths ----

    #[test]
    fn test_parse_unknown_rule_type_errors() {
        assert_parse_error_mentions("UnknownRule", "kind=FileWrite", "Unknown rule type");
    }

    #[test]
    fn test_parse_mutex_missing_key_errors() {
        assert_parse_error_mentions("Mutex", "kind=FileWrite", "key");
    }

    #[test]
    fn test_parse_require_missing_trigger_errors() {
        assert_parse_error_mentions("Require", "prereq=TestPass", "trigger");
    }

    #[test]
    fn test_parse_require_missing_prereq_errors() {
        assert_parse_error_mentions("Require", "trigger=Commit", "prereq");
    }

    #[test]
    fn test_parse_rate_limit_missing_max_errors() {
        assert_parse_error_mentions("RateLimit", "kind=Commit window=60", "max");
    }

    #[test]
    fn test_parse_rate_limit_missing_window_errors() {
        assert_parse_error_mentions("RateLimit", "kind=Commit max=5", "window");
    }

    #[test]
    fn test_parse_rate_limit_invalid_max_errors() {
        // A present but non-numeric value must be rejected, not defaulted.
        assert!(BThreadRule::parse("RateLimit", "kind=Commit max=abc window=60").is_err());
    }

    #[test]
    fn test_parse_timeout_missing_max_secs_errors() {
        assert_parse_error_mentions("Timeout", "kind=TestRun action=kill", "max_secs");
    }

    #[test]
    fn test_parse_partition_missing_count_errors() {
        assert_parse_error_mentions("Partition", "kind=FileWrite strategy=hash", "count");
    }

    #[test]
    fn test_parse_block_until_missing_trigger_errors() {
        assert_parse_error_mentions("BlockUntil", "block=Build until=Approved", "trigger");
    }

    #[test]
    fn test_parse_block_until_missing_block_errors() {
        assert_parse_error_mentions("BlockUntil", "trigger=ApiChange until=Approved", "block");
    }

    #[test]
    fn test_parse_block_until_missing_until_errors() {
        assert_parse_error_mentions("BlockUntil", "trigger=ApiChange block=Build", "until");
    }

    // ---- Scope parameters ----

    #[test]
    fn test_parse_with_agent_and_target() {
        let rule =
            BThreadRule::parse("BlockAlways", "kind=FileWrite agent=bot target=src/**").unwrap();
        match rule {
            BThreadRule::BlockAlways { scope } => {
                assert_eq!(scope.kind, Some(EventKind::FileWrite));
                assert_eq!(scope.agent, Some("bot".to_string()));
                assert!(scope.target.is_some());
                assert_eq!(scope.target.unwrap().as_str(), "src/**");
            }
            _ => panic!("Expected BlockAlways rule"),
        }
    }

    #[test]
    fn test_parse_with_negate_agent() {
        let rule = BThreadRule::parse(
            "BlockAlways",
            "kind=FileWrite agent=admin negate_agent=true",
        )
        .unwrap();
        match rule {
            BThreadRule::BlockAlways { scope } => {
                assert!(scope.negate_agent);
                assert_eq!(scope.agent, Some("admin".to_string()));
            }
            _ => panic!("Expected BlockAlways rule"),
        }
    }

    #[test]
    fn test_parse_with_target_not() {
        let rule = BThreadRule::parse(
            "Mutex",
            "kind=FileWrite key=f:{target} target_not=docs/**,*.md",
        )
        .unwrap();
        match rule {
            BThreadRule::Mutex { scope, .. } => {
                assert_eq!(scope.target_not.len(), 2);
                assert_eq!(scope.target_not[0].as_str(), "docs/**");
                assert_eq!(scope.target_not[1].as_str(), "*.md");
            }
            _ => panic!("Expected Mutex rule"),
        }
    }

    #[test]
    fn test_parse_block_until_multiple_block_kinds() {
        let rule = BThreadRule::parse(
            "BlockUntil",
            "trigger=ApiChange block=Build,Commit until=Approved",
        )
        .unwrap();
        match rule {
            BThreadRule::BlockUntil { block, .. } => {
                assert_eq!(block.len(), 2);
                assert_eq!(block[0].kind, Some(EventKind::Build));
                assert_eq!(block[1].kind, Some(EventKind::Commit));
            }
            _ => panic!("Expected BlockUntil rule"),
        }
    }

    // ---- parse_key_values ----

    #[test]
    fn test_parse_key_values_helper() {
        let params = parse_key_values("kind=FileWrite key=file:{target} ttl=600");
        assert_eq!(params.get("kind").unwrap(), "FileWrite");
        assert_eq!(params.get("key").unwrap(), "file:{target}");
        assert_eq!(params.get("ttl").unwrap(), "600");
    }

    #[test]
    fn test_parse_key_values_empty() {
        let params = parse_key_values("");
        assert!(params.is_empty());
    }

    #[test]
    fn test_parse_key_values_ignores_bare_tokens() {
        let params = parse_key_values("kind=FileWrite baretoken key=val");
        assert_eq!(params.len(), 2);
        assert!(!params.contains_key("baretoken"));
    }

    #[test]
    fn test_parse_key_values_splits_on_first_equals() {
        let params = parse_key_values("expr=a=b");
        assert_eq!(params.get("expr").unwrap(), "a=b");
    }

    #[test]
    fn test_parse_key_values_last_duplicate_wins() {
        let params = parse_key_values("k=1 k=2");
        assert_eq!(params.len(), 1);
        assert_eq!(params.get("k").unwrap(), "2");
    }

    // ---- WeaveEdgeType ----

    #[test]
    fn test_weave_edge_type_parse() {
        assert_eq!(WeaveEdgeType::parse("~~"), Some(WeaveEdgeType::Conflict));
        assert_eq!(WeaveEdgeType::parse(">>"), Some(WeaveEdgeType::Sequence));
        assert_eq!(WeaveEdgeType::parse("!="), Some(WeaveEdgeType::Exclusion));
        assert_eq!(WeaveEdgeType::parse("->"), None);
        assert_eq!(WeaveEdgeType::parse("xx"), None);
    }

    #[test]
    fn test_weave_edge_type_operator_roundtrip() {
        for edge_type in [
            WeaveEdgeType::Conflict,
            WeaveEdgeType::Sequence,
            WeaveEdgeType::Exclusion,
        ] {
            let op = edge_type.operator();
            assert_eq!(WeaveEdgeType::parse(op), Some(edge_type));
        }
    }
}
625
626#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
628pub enum WeaveEdgeType {
629 Conflict,
631 Sequence,
633 Exclusion,
635}
636
637impl WeaveEdgeType {
638 pub fn operator(&self) -> &'static str {
639 match self {
640 WeaveEdgeType::Conflict => "~~",
641 WeaveEdgeType::Sequence => ">>",
642 WeaveEdgeType::Exclusion => "!=",
643 }
644 }
645
646 pub fn parse(s: &str) -> Option<Self> {
647 match s.trim() {
648 "~~" => Some(WeaveEdgeType::Conflict),
649 ">>" => Some(WeaveEdgeType::Sequence),
650 "!=" => Some(WeaveEdgeType::Exclusion),
651 _ => None,
652 }
653 }
654}