Skip to main content

scud_weave/
bthread.rs

1//! B-Thread definitions and rule types.
2
3use anyhow::{anyhow, Result};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7use crate::event::{EventKind, EventPattern};
8use crate::matcher::GlobPattern;
9
10/// A behavioral thread — a named, prioritized set of coordination rules.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct BThread {
13    /// Thread ID, e.g. "w:1"
14    pub id: String,
15    pub name: String,
16    /// Lower number = higher priority.
17    pub priority: u32,
18    pub enabled: bool,
19    pub rules: Vec<BThreadRule>,
20}
21
22/// Rule types for b-thread coordination.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub enum BThreadRule {
25    /// Only one agent can hold a resource at a time.
26    Mutex {
27        scope: EventPattern,
28        /// Template key, e.g. "file:{target}" or "schema-global"
29        key: String,
30        ttl_secs: Option<u64>,
31    },
32    /// Action X requires prior event Y (with optional reset).
33    Require {
34        trigger: EventPattern,
35        prerequisite: EventPattern,
36        reset: Option<EventPattern>,
37    },
38    /// Block event Y after event X until event Z occurs.
39    BlockUntil {
40        trigger: EventPattern,
41        block: Vec<EventPattern>,
42        until: EventPattern,
43        #[serde(default)]
44        escalate: bool,
45        #[serde(default, skip_serializing_if = "Option::is_none")]
46        escalation_message: Option<String>,
47    },
48    /// Unconditionally block matching events.
49    BlockAlways {
50        scope: EventPattern,
51    },
52    /// Max N events in a time window.
53    RateLimit {
54        scope: EventPattern,
55        max: u32,
56        window_secs: u64,
57    },
58    /// Kill operations exceeding a time budget.
59    Timeout {
60        scope: EventPattern,
61        max_duration_secs: u64,
62        action: TimeoutAction,
63    },
64    /// Deterministic work sharding across agents.
65    Partition {
66        scope: EventPattern,
67        strategy: PartitionStrategy,
68        agent_count: u32,
69    },
70}
71
72impl BThreadRule {
73    /// Parse a rule from its type name and spec string.
74    ///
75    /// The spec is space-separated `key=value` pairs.
76    /// Example: `kind=FileWrite key=file:{target}` with rule_type="Mutex"
77    pub fn parse(rule_type: &str, spec: &str) -> Result<BThreadRule> {
78        let params = parse_key_values(spec);
79
80        match rule_type {
81            "Mutex" => {
82                let scope = pattern_from_params(&params)?;
83                let key = params
84                    .get("key")
85                    .ok_or_else(|| anyhow!("Mutex rule requires 'key' parameter"))?
86                    .clone();
87                let ttl_secs = params.get("ttl").map(|v| v.parse::<u64>()).transpose()?;
88                Ok(BThreadRule::Mutex {
89                    scope,
90                    key,
91                    ttl_secs,
92                })
93            }
94            "Require" => {
95                let trigger_kind = params
96                    .get("trigger")
97                    .ok_or_else(|| anyhow!("Require rule requires 'trigger' parameter"))?;
98                let prereq_kind = params
99                    .get("prereq")
100                    .ok_or_else(|| anyhow!("Require rule requires 'prereq' parameter"))?;
101                let reset = params
102                    .get("reset")
103                    .map(|r| EventPattern::kind(EventKind::parse(r)));
104                Ok(BThreadRule::Require {
105                    trigger: EventPattern::kind(EventKind::parse(trigger_kind)),
106                    prerequisite: EventPattern::kind(EventKind::parse(prereq_kind)),
107                    reset,
108                })
109            }
110            "BlockUntil" => {
111                let trigger_kind = params
112                    .get("trigger")
113                    .ok_or_else(|| anyhow!("BlockUntil rule requires 'trigger' parameter"))?;
114                let block_kinds = params
115                    .get("block")
116                    .ok_or_else(|| anyhow!("BlockUntil rule requires 'block' parameter"))?;
117                let until_kind = params
118                    .get("until")
119                    .ok_or_else(|| anyhow!("BlockUntil rule requires 'until' parameter"))?;
120                let escalate = params
121                    .get("escalate")
122                    .map(|v| v == "true" || v == "Y")
123                    .unwrap_or(false);
124                let escalation_message = params.get("message").cloned();
125                let block = block_kinds
126                    .split(',')
127                    .map(|k| EventPattern::kind(EventKind::parse(k.trim())))
128                    .collect();
129                Ok(BThreadRule::BlockUntil {
130                    trigger: EventPattern::kind(EventKind::parse(trigger_kind)),
131                    block,
132                    until: EventPattern::kind(EventKind::parse(until_kind)),
133                    escalate,
134                    escalation_message,
135                })
136            }
137            "BlockAlways" => {
138                let scope = pattern_from_params(&params)?;
139                Ok(BThreadRule::BlockAlways { scope })
140            }
141            "RateLimit" => {
142                let scope = pattern_from_params(&params)?;
143                let max = params
144                    .get("max")
145                    .ok_or_else(|| anyhow!("RateLimit rule requires 'max' parameter"))?
146                    .parse::<u32>()?;
147                let window_secs = params
148                    .get("window")
149                    .ok_or_else(|| anyhow!("RateLimit rule requires 'window' parameter"))?
150                    .parse::<u64>()?;
151                Ok(BThreadRule::RateLimit {
152                    scope,
153                    max,
154                    window_secs,
155                })
156            }
157            "Timeout" => {
158                let scope = pattern_from_params(&params)?;
159                let max_duration_secs = params
160                    .get("max_secs")
161                    .ok_or_else(|| anyhow!("Timeout rule requires 'max_secs' parameter"))?
162                    .parse::<u64>()?;
163                let action = match params.get("action").map(|s| s.as_str()) {
164                    Some("Warn" | "warn") => TimeoutAction::Warn,
165                    _ => TimeoutAction::Kill,
166                };
167                Ok(BThreadRule::Timeout {
168                    scope,
169                    max_duration_secs,
170                    action,
171                })
172            }
173            "Partition" => {
174                let scope = pattern_from_params(&params)?;
175                let strategy = match params.get("strategy").map(|s| s.as_str()) {
176                    Some("round-robin" | "RoundRobin") => PartitionStrategy::RoundRobin,
177                    Some("directory" | "Directory") => PartitionStrategy::Directory,
178                    _ => PartitionStrategy::Hash,
179                };
180                let agent_count = params
181                    .get("count")
182                    .or_else(|| params.get("agent_count"))
183                    .ok_or_else(|| {
184                        anyhow!("Partition rule requires 'count' or 'agent_count' parameter")
185                    })?
186                    .parse::<u32>()?;
187                Ok(BThreadRule::Partition {
188                    scope,
189                    strategy,
190                    agent_count,
191                })
192            }
193            other => Err(anyhow!("Unknown rule type: {}", other)),
194        }
195    }
196}
197
198/// Parse space-separated `key=value` pairs from a spec string.
199fn parse_key_values(spec: &str) -> HashMap<String, String> {
200    let mut result = HashMap::new();
201    for token in spec.split_whitespace() {
202        if let Some((k, v)) = token.split_once('=') {
203            result.insert(k.to_string(), v.to_string());
204        }
205    }
206    result
207}
208
209/// Build an EventPattern from parsed parameters (using `kind`, `target`, `agent` keys).
210fn pattern_from_params(params: &HashMap<String, String>) -> Result<EventPattern> {
211    let kind = params.get("kind").map(|k| EventKind::parse(k));
212    let agent = params.get("agent").cloned();
213    let target = params.get("target").map(|t| GlobPattern::new(t));
214    let negate_agent = params
215        .get("negate_agent")
216        .map(|v| v == "true" || v == "Y")
217        .unwrap_or(false);
218    let target_not = params
219        .get("target_not")
220        .map(|v| {
221            v.split(',')
222                .map(|p| GlobPattern::new(p.trim()))
223                .collect()
224        })
225        .unwrap_or_default();
226    Ok(EventPattern {
227        kind,
228        agent,
229        target,
230        task_id: None,
231        negate_agent,
232        target_not,
233    })
234}
235
236/// What to do when a timeout fires.
237#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
238pub enum TimeoutAction {
239    #[default]
240    Kill,
241    Warn,
242}
243
244/// Strategy for partitioning work across agents.
245#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
246pub enum PartitionStrategy {
247    Hash,
248    RoundRobin,
249    Directory,
250}
251
252/// An agent role with scope constraints.
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct Role {
255    /// Role ID, e.g. "r:impl"
256    pub id: String,
257    pub name: String,
258    /// Glob patterns the agent CAN write to. Empty means everything allowed.
259    pub allow_patterns: Vec<GlobPattern>,
260    /// Glob patterns the agent CANNOT write to. Empty means nothing denied.
261    pub deny_patterns: Vec<GlobPattern>,
262}
263
264/// A partition definition for deterministic work sharding.
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct PartitionDef {
267    /// Partition ID, e.g. "p:1"
268    pub id: String,
269    /// Glob pattern for files in scope.
270    pub scope_pattern: GlobPattern,
271    pub strategy: PartitionStrategy,
272    pub agent_count: u32,
273}
274
275/// Node annotation from extended @nodes (role=, scope=).
276#[derive(Debug, Clone, Serialize, Deserialize, Default)]
277pub struct NodeAnnotation {
278    #[serde(default, skip_serializing_if = "Option::is_none")]
279    pub role: Option<String>,
280    #[serde(default, skip_serializing_if = "Option::is_none")]
281    pub scope: Option<String>,
282}
283
284/// A behavioral edge between two nodes.
285#[derive(Debug, Clone, Serialize, Deserialize)]
286pub struct WeaveEdge {
287    pub from: String,
288    pub to: String,
289    pub edge_type: WeaveEdgeType,
290    #[serde(default, skip_serializing_if = "Option::is_none")]
291    pub reason: Option<String>,
292}
293
294#[cfg(test)]
295mod tests {
296    use super::*;
297
298    #[test]
299    fn test_parse_mutex_rule() {
300        let rule = BThreadRule::parse("Mutex", "kind=FileWrite key=file:{target}").unwrap();
301        match rule {
302            BThreadRule::Mutex { scope, key, ttl_secs } => {
303                assert_eq!(scope.kind, Some(EventKind::FileWrite));
304                assert_eq!(key, "file:{target}");
305                assert!(ttl_secs.is_none());
306            }
307            _ => panic!("Expected Mutex rule"),
308        }
309    }
310
311    #[test]
312    fn test_parse_mutex_rule_with_ttl() {
313        let rule = BThreadRule::parse("Mutex", "kind=FileWrite key=file:{target} ttl=600").unwrap();
314        match rule {
315            BThreadRule::Mutex { ttl_secs, .. } => {
316                assert_eq!(ttl_secs, Some(600));
317            }
318            _ => panic!("Expected Mutex rule"),
319        }
320    }
321
322    #[test]
323    fn test_parse_require_rule() {
324        let rule = BThreadRule::parse("Require", "trigger=Commit prereq=TestPass").unwrap();
325        match rule {
326            BThreadRule::Require { trigger, prerequisite, reset } => {
327                assert_eq!(trigger.kind, Some(EventKind::Commit));
328                assert_eq!(prerequisite.kind, Some(EventKind::TestPass));
329                assert!(reset.is_none());
330            }
331            _ => panic!("Expected Require rule"),
332        }
333    }
334
335    #[test]
336    fn test_parse_require_rule_with_reset() {
337        let rule = BThreadRule::parse("Require", "trigger=Commit prereq=TestPass reset=FileWrite").unwrap();
338        match rule {
339            BThreadRule::Require { reset, .. } => {
340                assert!(reset.is_some());
341                assert_eq!(reset.unwrap().kind, Some(EventKind::FileWrite));
342            }
343            _ => panic!("Expected Require rule"),
344        }
345    }
346
347    #[test]
348    fn test_parse_block_always_rule() {
349        let rule = BThreadRule::parse("BlockAlways", "kind=DangerousCommand").unwrap();
350        match rule {
351            BThreadRule::BlockAlways { scope } => {
352                assert_eq!(scope.kind, Some(EventKind::DangerousCommand));
353            }
354            _ => panic!("Expected BlockAlways rule"),
355        }
356    }
357
358    #[test]
359    fn test_parse_block_until_rule() {
360        let rule = BThreadRule::parse("BlockUntil", "trigger=ApiChange block=Build until=ApiReviewApproved").unwrap();
361        match rule {
362            BThreadRule::BlockUntil { trigger, block, until, escalate, escalation_message } => {
363                assert_eq!(trigger.kind, Some(EventKind::ApiChange));
364                assert_eq!(block.len(), 1);
365                assert_eq!(block[0].kind, Some(EventKind::Build));
366                assert_eq!(until.kind, Some(EventKind::Custom("ApiReviewApproved".to_string())));
367                assert!(!escalate);
368                assert!(escalation_message.is_none());
369            }
370            _ => panic!("Expected BlockUntil rule"),
371        }
372    }
373
374    #[test]
375    fn test_parse_block_until_with_escalate() {
376        let rule = BThreadRule::parse("BlockUntil", "trigger=ApiChange block=Build until=Approved escalate=true").unwrap();
377        match rule {
378            BThreadRule::BlockUntil { escalate, .. } => {
379                assert!(escalate);
380            }
381            _ => panic!("Expected BlockUntil rule"),
382        }
383    }
384
385    #[test]
386    fn test_parse_rate_limit_rule() {
387        let rule = BThreadRule::parse("RateLimit", "kind=Commit max=5 window=120").unwrap();
388        match rule {
389            BThreadRule::RateLimit { scope, max, window_secs } => {
390                assert_eq!(scope.kind, Some(EventKind::Commit));
391                assert_eq!(max, 5);
392                assert_eq!(window_secs, 120);
393            }
394            _ => panic!("Expected RateLimit rule"),
395        }
396    }
397
398    #[test]
399    fn test_parse_timeout_rule() {
400        let rule = BThreadRule::parse("Timeout", "kind=TestRun max_secs=300 action=Warn").unwrap();
401        match rule {
402            BThreadRule::Timeout { scope, max_duration_secs, action } => {
403                assert_eq!(scope.kind, Some(EventKind::TestRun));
404                assert_eq!(max_duration_secs, 300);
405                assert_eq!(action, TimeoutAction::Warn);
406            }
407            _ => panic!("Expected Timeout rule"),
408        }
409    }
410
411    #[test]
412    fn test_parse_timeout_rule_default_action() {
413        let rule = BThreadRule::parse("Timeout", "kind=Build max_secs=600").unwrap();
414        match rule {
415            BThreadRule::Timeout { action, .. } => {
416                assert_eq!(action, TimeoutAction::Kill);
417            }
418            _ => panic!("Expected Timeout rule"),
419        }
420    }
421
422    #[test]
423    fn test_parse_partition_rule_hash() {
424        let rule = BThreadRule::parse("Partition", "kind=FileWrite count=4").unwrap();
425        match rule {
426            BThreadRule::Partition { scope, strategy, agent_count } => {
427                assert_eq!(scope.kind, Some(EventKind::FileWrite));
428                assert_eq!(strategy, PartitionStrategy::Hash);
429                assert_eq!(agent_count, 4);
430            }
431            _ => panic!("Expected Partition rule"),
432        }
433    }
434
435    #[test]
436    fn test_parse_partition_rule_round_robin() {
437        let rule = BThreadRule::parse("Partition", "kind=FileWrite strategy=round-robin count=3").unwrap();
438        match rule {
439            BThreadRule::Partition { strategy, .. } => {
440                assert_eq!(strategy, PartitionStrategy::RoundRobin);
441            }
442            _ => panic!("Expected Partition rule"),
443        }
444    }
445
446    #[test]
447    fn test_parse_partition_rule_directory() {
448        let rule = BThreadRule::parse("Partition", "kind=FileWrite strategy=Directory agent_count=2").unwrap();
449        match rule {
450            BThreadRule::Partition { strategy, agent_count, .. } => {
451                assert_eq!(strategy, PartitionStrategy::Directory);
452                assert_eq!(agent_count, 2);
453            }
454            _ => panic!("Expected Partition rule"),
455        }
456    }
457
458    #[test]
459    fn test_parse_unknown_rule_type_errors() {
460        let result = BThreadRule::parse("UnknownRule", "kind=FileWrite");
461        assert!(result.is_err());
462        assert!(result.unwrap_err().to_string().contains("Unknown rule type"));
463    }
464
465    #[test]
466    fn test_parse_mutex_missing_key_errors() {
467        let result = BThreadRule::parse("Mutex", "kind=FileWrite");
468        assert!(result.is_err());
469        assert!(result.unwrap_err().to_string().contains("key"));
470    }
471
472    #[test]
473    fn test_parse_require_missing_trigger_errors() {
474        let result = BThreadRule::parse("Require", "prereq=TestPass");
475        assert!(result.is_err());
476        assert!(result.unwrap_err().to_string().contains("trigger"));
477    }
478
479    #[test]
480    fn test_parse_require_missing_prereq_errors() {
481        let result = BThreadRule::parse("Require", "trigger=Commit");
482        assert!(result.is_err());
483        assert!(result.unwrap_err().to_string().contains("prereq"));
484    }
485
486    #[test]
487    fn test_parse_rate_limit_missing_max_errors() {
488        let result = BThreadRule::parse("RateLimit", "kind=Commit window=60");
489        assert!(result.is_err());
490        assert!(result.unwrap_err().to_string().contains("max"));
491    }
492
493    #[test]
494    fn test_parse_rate_limit_missing_window_errors() {
495        let result = BThreadRule::parse("RateLimit", "kind=Commit max=5");
496        assert!(result.is_err());
497        assert!(result.unwrap_err().to_string().contains("window"));
498    }
499
500    #[test]
501    fn test_parse_timeout_missing_max_secs_errors() {
502        let result = BThreadRule::parse("Timeout", "kind=TestRun action=kill");
503        assert!(result.is_err());
504        assert!(result.unwrap_err().to_string().contains("max_secs"));
505    }
506
507    #[test]
508    fn test_parse_partition_missing_count_errors() {
509        let result = BThreadRule::parse("Partition", "kind=FileWrite strategy=hash");
510        assert!(result.is_err());
511        assert!(result.unwrap_err().to_string().contains("count"));
512    }
513
514    #[test]
515    fn test_parse_block_until_missing_trigger_errors() {
516        let result = BThreadRule::parse("BlockUntil", "block=Build until=Approved");
517        assert!(result.is_err());
518        assert!(result.unwrap_err().to_string().contains("trigger"));
519    }
520
521    #[test]
522    fn test_parse_block_until_missing_block_errors() {
523        let result = BThreadRule::parse("BlockUntil", "trigger=ApiChange until=Approved");
524        assert!(result.is_err());
525        assert!(result.unwrap_err().to_string().contains("block"));
526    }
527
528    #[test]
529    fn test_parse_block_until_missing_until_errors() {
530        let result = BThreadRule::parse("BlockUntil", "trigger=ApiChange block=Build");
531        assert!(result.is_err());
532        assert!(result.unwrap_err().to_string().contains("until"));
533    }
534
535    #[test]
536    fn test_parse_with_agent_and_target() {
537        let rule = BThreadRule::parse("BlockAlways", "kind=FileWrite agent=bot target=src/**").unwrap();
538        match rule {
539            BThreadRule::BlockAlways { scope } => {
540                assert_eq!(scope.kind, Some(EventKind::FileWrite));
541                assert_eq!(scope.agent, Some("bot".to_string()));
542                assert!(scope.target.is_some());
543                assert_eq!(scope.target.unwrap().as_str(), "src/**");
544            }
545            _ => panic!("Expected BlockAlways rule"),
546        }
547    }
548
549    #[test]
550    fn test_parse_with_negate_agent() {
551        let rule = BThreadRule::parse("BlockAlways", "kind=FileWrite agent=admin negate_agent=true").unwrap();
552        match rule {
553            BThreadRule::BlockAlways { scope } => {
554                assert!(scope.negate_agent);
555                assert_eq!(scope.agent, Some("admin".to_string()));
556            }
557            _ => panic!("Expected BlockAlways rule"),
558        }
559    }
560
561    #[test]
562    fn test_parse_with_target_not() {
563        let rule = BThreadRule::parse("Mutex", "kind=FileWrite key=f:{target} target_not=docs/**,*.md").unwrap();
564        match rule {
565            BThreadRule::Mutex { scope, .. } => {
566                assert_eq!(scope.target_not.len(), 2);
567                assert_eq!(scope.target_not[0].as_str(), "docs/**");
568                assert_eq!(scope.target_not[1].as_str(), "*.md");
569            }
570            _ => panic!("Expected Mutex rule"),
571        }
572    }
573
574    #[test]
575    fn test_parse_block_until_multiple_block_kinds() {
576        let rule = BThreadRule::parse("BlockUntil", "trigger=ApiChange block=Build,Commit until=Approved").unwrap();
577        match rule {
578            BThreadRule::BlockUntil { block, .. } => {
579                assert_eq!(block.len(), 2);
580                assert_eq!(block[0].kind, Some(EventKind::Build));
581                assert_eq!(block[1].kind, Some(EventKind::Commit));
582            }
583            _ => panic!("Expected BlockUntil rule"),
584        }
585    }
586
587    #[test]
588    fn test_parse_key_values_helper() {
589        let params = parse_key_values("kind=FileWrite key=file:{target} ttl=600");
590        assert_eq!(params.get("kind").unwrap(), "FileWrite");
591        assert_eq!(params.get("key").unwrap(), "file:{target}");
592        assert_eq!(params.get("ttl").unwrap(), "600");
593    }
594
595    #[test]
596    fn test_parse_key_values_empty() {
597        let params = parse_key_values("");
598        assert!(params.is_empty());
599    }
600
601    #[test]
602    fn test_parse_key_values_ignores_bare_tokens() {
603        let params = parse_key_values("kind=FileWrite baretoken key=val");
604        assert_eq!(params.len(), 2);
605        assert!(!params.contains_key("baretoken"));
606    }
607
608    #[test]
609    fn test_weave_edge_type_parse() {
610        assert_eq!(WeaveEdgeType::parse("~~"), Some(WeaveEdgeType::Conflict));
611        assert_eq!(WeaveEdgeType::parse(">>"), Some(WeaveEdgeType::Sequence));
612        assert_eq!(WeaveEdgeType::parse("!="), Some(WeaveEdgeType::Exclusion));
613        assert_eq!(WeaveEdgeType::parse("->"), None);
614        assert_eq!(WeaveEdgeType::parse("xx"), None);
615    }
616
617    #[test]
618    fn test_weave_edge_type_operator_roundtrip() {
619        for edge_type in [WeaveEdgeType::Conflict, WeaveEdgeType::Sequence, WeaveEdgeType::Exclusion] {
620            let op = edge_type.operator();
621            assert_eq!(WeaveEdgeType::parse(op), Some(edge_type));
622        }
623    }
624}
625
626/// Type of behavioral edge.
627#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
628pub enum WeaveEdgeType {
629    /// `~~` — tasks conflict if run simultaneously
630    Conflict,
631    /// `>>` — behavioral ordering (no data dependency)
632    Sequence,
633    /// `!=` — must NOT run on same agent
634    Exclusion,
635}
636
637impl WeaveEdgeType {
638    pub fn operator(&self) -> &'static str {
639        match self {
640            WeaveEdgeType::Conflict => "~~",
641            WeaveEdgeType::Sequence => ">>",
642            WeaveEdgeType::Exclusion => "!=",
643        }
644    }
645
646    pub fn parse(s: &str) -> Option<Self> {
647        match s.trim() {
648            "~~" => Some(WeaveEdgeType::Conflict),
649            ">>" => Some(WeaveEdgeType::Sequence),
650            "!=" => Some(WeaveEdgeType::Exclusion),
651            _ => None,
652        }
653    }
654}