1use std::fs;
2use std::path::{Path, PathBuf};
3use std::str::FromStr;
4
5use anyhow::{Context, Result, bail};
6use chrono::{DateTime, Duration, Utc};
7use cron::Schedule;
8use serde::{Deserialize, Serialize};
9
10use crate::project_registry::{self, RegisteredProject};
11
12use super::{
13 config, events, hierarchy, messaging, openclaw_contract, pause_marker_path, status,
14 team_config_path,
15};
16
17const DEFAULT_RECENT_EVENTS: usize = 8;
18const OPENCLAW_EVENT_KIND: &str = "batty.openclaw.projectEvent";
19const OPENCLAW_EVENT_SCHEMA_VERSION: u32 = 1;
20
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
22#[serde(rename_all = "snake_case")]
23pub enum OpenClawEventTopic {
24 Completion,
25 Review,
26 Stall,
27 Merge,
28 Escalation,
29 DeliveryFailure,
30 Lifecycle,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
34#[serde(rename_all = "camelCase")]
35pub struct OpenClawEventSubscription {
36 #[serde(default)]
37 pub topics: Vec<OpenClawEventTopic>,
38 #[serde(default)]
39 pub project_ids: Vec<String>,
40 #[serde(default)]
41 pub session_names: Vec<String>,
42 #[serde(default)]
43 pub roles: Vec<String>,
44 #[serde(default)]
45 pub task_ids: Vec<String>,
46 #[serde(default)]
47 pub event_types: Vec<String>,
48 #[serde(default)]
49 pub since_ts: Option<u64>,
50 #[serde(default)]
51 pub limit: Option<usize>,
52 #[serde(default)]
53 pub include_archived: bool,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
57#[serde(rename_all = "camelCase")]
58pub struct OpenClawEventIdentifiers {
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub role: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub task_id: Option<String>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub sender: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub recipient: Option<String>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
70#[serde(rename_all = "camelCase")]
71pub struct OpenClawProjectEventEnvelope {
72 pub kind: String,
73 pub schema_version: u32,
74 pub topic: OpenClawEventTopic,
75 pub event_type: String,
76 pub project_id: String,
77 pub project_name: String,
78 pub project_root: String,
79 pub team_name: String,
80 pub session_name: String,
81 pub ts: u64,
82 pub identifiers: OpenClawEventIdentifiers,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 pub reason: Option<String>,
85 #[serde(skip_serializing_if = "Option::is_none")]
86 pub details: Option<String>,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 pub action_type: Option<String>,
89 #[serde(skip_serializing_if = "Option::is_none")]
90 pub success: Option<bool>,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub restart_count: Option<u32>,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub load: Option<f64>,
95 #[serde(skip_serializing_if = "Option::is_none")]
96 pub uptime_secs: Option<u64>,
97 #[serde(skip_serializing_if = "Option::is_none")]
98 pub session_running: Option<bool>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
102pub struct OpenClawProjectConfig {
103 #[serde(default = "default_version")]
104 pub version: u32,
105 pub project_name: String,
106 #[serde(default)]
107 pub batty_root: Option<String>,
108 #[serde(default)]
109 pub status: OpenClawStatusConfig,
110 #[serde(default)]
111 pub instruction: OpenClawInstructionConfig,
112 #[serde(default)]
113 pub follow_ups: Vec<OpenClawFollowUp>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
117pub struct OpenClawStatusConfig {
118 #[serde(default = "default_recent_events")]
119 pub recent_events: usize,
120}
121
122impl Default for OpenClawStatusConfig {
123 fn default() -> Self {
124 Self {
125 recent_events: default_recent_events(),
126 }
127 }
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
131pub struct OpenClawInstructionConfig {
132 #[serde(default = "default_instruction_sender")]
133 pub sender: String,
134 #[serde(default = "default_allowed_roles")]
135 pub allowed_roles: Vec<String>,
136}
137
138impl Default for OpenClawInstructionConfig {
139 fn default() -> Self {
140 Self {
141 sender: default_instruction_sender(),
142 allowed_roles: default_allowed_roles(),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
148pub struct OpenClawFollowUp {
149 pub name: String,
150 pub cron: String,
151 pub role: String,
152 pub message: String,
153 #[serde(default)]
154 pub summary: Option<String>,
155 #[serde(default)]
156 pub when: OpenClawFollowUpCondition,
157 #[serde(default)]
158 pub last_sent_at: Option<String>,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
162#[serde(rename_all = "snake_case")]
163pub enum OpenClawFollowUpCondition {
164 #[default]
165 Always,
166 ReviewQueueNonEmpty,
167 ActiveTasksNonEmpty,
168 UnhealthyMembersPresent,
169 TriageBacklogPresent,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct OpenClawStatusSummary {
174 pub project: String,
175 pub team: String,
176 pub running: bool,
177 pub paused: bool,
178 pub active_task_count: usize,
179 pub review_queue_count: usize,
180 pub unhealthy_members: Vec<String>,
181 pub triage_backlog_count: usize,
182 pub highlights: Vec<String>,
183 pub recent_events: Vec<String>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
187pub struct FollowUpDispatch {
188 pub name: String,
189 pub role: String,
190 pub reason: String,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
194pub struct FollowUpRunSummary {
195 pub dispatched: Vec<FollowUpDispatch>,
196}
197
198pub trait SupervisorAdapter {
199 fn status_report(&self, project_root: &Path) -> Result<status::TeamStatusJsonReport>;
200 fn recent_events(&self, project_root: &Path, limit: usize) -> Result<Vec<events::TeamEvent>>;
201 fn send_instruction(
202 &self,
203 project_root: &Path,
204 sender: &str,
205 role: &str,
206 message: &str,
207 ) -> Result<()>;
208}
209
210pub struct BattySupervisorAdapter;
211
212impl SupervisorAdapter for BattySupervisorAdapter {
213 fn status_report(&self, project_root: &Path) -> Result<status::TeamStatusJsonReport> {
214 load_status_report(project_root)
215 }
216
217 fn recent_events(&self, project_root: &Path, limit: usize) -> Result<Vec<events::TeamEvent>> {
218 let mut events = events::read_events(&super::team_events_path(project_root))?;
219 if events.len() > limit {
220 events = events.split_off(events.len() - limit);
221 }
222 Ok(events)
223 }
224
225 fn send_instruction(
226 &self,
227 project_root: &Path,
228 sender: &str,
229 role: &str,
230 message: &str,
231 ) -> Result<()> {
232 messaging::send_message_as(project_root, Some(sender), role, message)
233 }
234}
235
236pub fn project_config_path(project_root: &Path) -> PathBuf {
237 project_root.join(".batty").join("openclaw.yaml")
238}
239
240pub fn register_project(project_root: &Path, force: bool) -> Result<PathBuf> {
241 let path = project_config_path(project_root);
242 if path.exists() && !force {
243 bail!(
244 "OpenClaw project config already exists at {} (use --force to overwrite)",
245 path.display()
246 );
247 }
248
249 let team_name = load_team_name(project_root).unwrap_or_else(|| "batty".to_string());
250 let config = OpenClawProjectConfig {
251 version: default_version(),
252 project_name: team_name.clone(),
253 batty_root: Some(project_root.display().to_string()),
254 status: OpenClawStatusConfig::default(),
255 instruction: OpenClawInstructionConfig::default(),
256 follow_ups: vec![
257 OpenClawFollowUp {
258 name: "review-queue-reminder".to_string(),
259 cron: "*/30 * * * *".to_string(),
260 role: "manager".to_string(),
261 message: "Review queue still has pending work. Check `batty openclaw status` and move the lane forward.".to_string(),
262 summary: Some("Review queue follow-up".to_string()),
263 when: OpenClawFollowUpCondition::ReviewQueueNonEmpty,
264 last_sent_at: None,
265 },
266 OpenClawFollowUp {
267 name: "architect-utilization-follow-up".to_string(),
268 cron: "0 * * * *".to_string(),
269 role: "architect".to_string(),
270 message: "Please review Batty status and unblock idle capacity with high-level direction if needed.".to_string(),
271 summary: Some("Architect utilization follow-up".to_string()),
272 when: OpenClawFollowUpCondition::TriageBacklogPresent,
273 last_sent_at: None,
274 },
275 ],
276 };
277 save_project_config(&path, &config)?;
278 Ok(path)
279}
280
281pub fn openclaw_status(project_root: &Path, json: bool) -> Result<()> {
282 let summary = openclaw_status_summary(project_root)?;
283 if json {
284 println!("{}", serde_json::to_string_pretty(&summary)?);
285 } else {
286 println!("{}", format_status_summary(&summary));
287 }
288 Ok(())
289}
290
291pub fn send_openclaw_instruction(project_root: &Path, role: &str, message: &str) -> Result<()> {
292 let config = load_project_config(project_root)?;
293 validate_instruction_role(&config, role)?;
294 BattySupervisorAdapter.send_instruction(
295 project_root,
296 &config.instruction.sender,
297 role,
298 message,
299 )?;
300 Ok(())
301}
302
303pub fn run_follow_ups(project_root: &Path, json: bool) -> Result<()> {
304 let summary = openclaw_follow_up_summary(project_root)?;
305 if json {
306 println!("{}", serde_json::to_string_pretty(&summary)?);
307 } else if summary.dispatched.is_empty() {
308 println!("No OpenClaw follow-ups were due.");
309 } else {
310 println!("OpenClaw follow-ups dispatched:");
311 for dispatch in &summary.dispatched {
312 println!(
313 "- {} -> {} ({})",
314 dispatch.name, dispatch.role, dispatch.reason
315 );
316 }
317 }
318 Ok(())
319}
320
321pub fn openclaw_status_summary(project_root: &Path) -> Result<OpenClawStatusSummary> {
322 build_status_summary(project_root, &BattySupervisorAdapter)
323}
324
325pub fn openclaw_follow_up_summary(project_root: &Path) -> Result<FollowUpRunSummary> {
326 run_follow_ups_with_adapter(project_root, &BattySupervisorAdapter, Utc::now())
327}
328
329pub fn watch_project_events(
330 project_id: &str,
331 subscription: &OpenClawEventSubscription,
332) -> Result<Vec<OpenClawProjectEventEnvelope>> {
333 watch_project_events_at(
334 &project_registry::registry_path()?,
335 project_id,
336 subscription,
337 )
338}
339
340pub fn watch_all_project_events(
341 subscription: &OpenClawEventSubscription,
342) -> Result<Vec<OpenClawProjectEventEnvelope>> {
343 watch_all_project_events_at(&project_registry::registry_path()?, subscription)
344}
345
346pub fn openclaw_events(
347 project_root: &Path,
348 _subscription: &OpenClawEventSubscription,
349 _project_id: Option<&str>,
350 _all_projects: bool,
351 _json: bool,
352) -> Result<()> {
353 let events_path = project_root
354 .join(".batty")
355 .join("team_config")
356 .join("events.jsonl");
357 if events_path.exists() {
358 let content = std::fs::read_to_string(&events_path)?;
359 print!("{content}");
360 }
361 Ok(())
362}
363
364pub fn openclaw_contract_descriptor() -> openclaw_contract::ContractDescriptor {
365 openclaw_contract::descriptor()
366}
367
368pub fn openclaw_team_status_contract(project_root: &Path) -> Result<openclaw_contract::TeamStatus> {
369 let report = load_status_report(project_root)?;
370 Ok(openclaw_contract::team_status_from_report(&report))
371}
372
373pub fn watch_project_event_contracts(
374 project_id: &str,
375 subscription: &OpenClawEventSubscription,
376) -> Result<Vec<openclaw_contract::ProjectEventEnvelope>> {
377 watch_project_events(project_id, subscription)?
378 .into_iter()
379 .map(legacy_envelope_to_contract)
380 .collect()
381}
382
383pub fn watch_all_event_contracts(
384 subscription: &OpenClawEventSubscription,
385) -> Result<Vec<openclaw_contract::ProjectEventEnvelope>> {
386 watch_all_project_events(subscription)?
387 .into_iter()
388 .map(legacy_envelope_to_contract)
389 .collect()
390}
391
392fn legacy_event_topic(topic: openclaw_contract::TeamEventTopic) -> OpenClawEventTopic {
393 match topic {
394 openclaw_contract::TeamEventTopic::Completion => OpenClawEventTopic::Completion,
395 openclaw_contract::TeamEventTopic::Review => OpenClawEventTopic::Review,
396 openclaw_contract::TeamEventTopic::Stall => OpenClawEventTopic::Stall,
397 openclaw_contract::TeamEventTopic::Merge => OpenClawEventTopic::Merge,
398 openclaw_contract::TeamEventTopic::Escalation => OpenClawEventTopic::Escalation,
399 openclaw_contract::TeamEventTopic::DeliveryFailure => OpenClawEventTopic::DeliveryFailure,
400 openclaw_contract::TeamEventTopic::Lifecycle => OpenClawEventTopic::Lifecycle,
401 }
402}
403
404fn contract_event_topic(topic: OpenClawEventTopic) -> openclaw_contract::TeamEventTopic {
405 match topic {
406 OpenClawEventTopic::Completion => openclaw_contract::TeamEventTopic::Completion,
407 OpenClawEventTopic::Review => openclaw_contract::TeamEventTopic::Review,
408 OpenClawEventTopic::Stall => openclaw_contract::TeamEventTopic::Stall,
409 OpenClawEventTopic::Merge => openclaw_contract::TeamEventTopic::Merge,
410 OpenClawEventTopic::Escalation => openclaw_contract::TeamEventTopic::Escalation,
411 OpenClawEventTopic::DeliveryFailure => openclaw_contract::TeamEventTopic::DeliveryFailure,
412 OpenClawEventTopic::Lifecycle => openclaw_contract::TeamEventTopic::Lifecycle,
413 }
414}
415
416fn legacy_envelope_to_contract(
417 envelope: OpenClawProjectEventEnvelope,
418) -> Result<openclaw_contract::ProjectEventEnvelope> {
419 let event_kind = openclaw_contract::event_kind_from_legacy_event_type(&envelope.event_type)
420 .with_context(|| {
421 format!(
422 "unsupported legacy OpenClaw event type '{}'",
423 envelope.event_type
424 )
425 })?;
426
427 Ok(openclaw_contract::ProjectEventEnvelope {
428 kind: openclaw_contract::TEAM_EVENT_KIND.to_string(),
429 schema_version: openclaw_contract::CONTRACT_SCHEMA_VERSION,
430 min_supported_schema_version: openclaw_contract::MIN_SUPPORTED_SCHEMA_VERSION,
431 project_id: envelope.project_id,
432 project_name: envelope.project_name,
433 project_root: envelope.project_root,
434 team_name: envelope.team_name,
435 session_name: envelope.session_name,
436 event: openclaw_contract::TeamEvent {
437 topic: contract_event_topic(envelope.topic),
438 event_kind,
439 ts: envelope.ts,
440 member_name: envelope.identifiers.role,
441 task_id: envelope.identifiers.task_id,
442 sender: envelope.identifiers.sender,
443 recipient: envelope.identifiers.recipient,
444 reason: envelope.reason,
445 detail: envelope.details,
446 action_type: envelope.action_type,
447 success: envelope.success,
448 restart_count: envelope.restart_count,
449 load: envelope.load,
450 uptime_secs: envelope.uptime_secs,
451 session_running: envelope.session_running,
452 },
453 })
454}
455
456fn load_project_config(project_root: &Path) -> Result<OpenClawProjectConfig> {
457 let path = project_config_path(project_root);
458 let content = fs::read_to_string(&path)
459 .with_context(|| format!("failed to read OpenClaw project config {}", path.display()))?;
460 let config = serde_yaml::from_str(&content)
461 .with_context(|| format!("failed to parse OpenClaw project config {}", path.display()))?;
462 Ok(config)
463}
464
465fn save_project_config(path: &Path, config: &OpenClawProjectConfig) -> Result<()> {
466 if let Some(parent) = path.parent() {
467 fs::create_dir_all(parent)
468 .with_context(|| format!("failed to create {}", parent.display()))?;
469 }
470 let content = serde_yaml::to_string(config)?;
471 fs::write(path, content).with_context(|| format!("failed to write {}", path.display()))?;
472 Ok(())
473}
474
475fn collect_project_events(
476 projects: impl IntoIterator<Item = RegisteredProject>,
477 subscription: &OpenClawEventSubscription,
478) -> Result<Vec<OpenClawProjectEventEnvelope>> {
479 let mut envelopes = Vec::new();
480
481 for project in projects {
482 if !subscription.project_ids.is_empty()
483 && !subscription
484 .project_ids
485 .iter()
486 .any(|id| id == &project.project_id)
487 {
488 continue;
489 }
490 if !subscription.session_names.is_empty()
491 && !subscription
492 .session_names
493 .iter()
494 .any(|name| name == &project.session_name)
495 {
496 continue;
497 }
498
499 let events_path = super::team_events_path(&project.project_root);
500 for event in events::read_events(&events_path)? {
501 let Some(envelope) = map_team_event_to_openclaw_event(&project, &event) else {
502 continue;
503 };
504 if subscription.matches(&envelope) {
505 envelopes.push(envelope);
506 }
507 }
508 }
509
510 envelopes.sort_by(|left, right| {
511 left.ts
512 .cmp(&right.ts)
513 .then_with(|| left.project_id.cmp(&right.project_id))
514 .then_with(|| left.event_type.cmp(&right.event_type))
515 });
516
517 if let Some(limit) = subscription.limit {
518 if envelopes.len() > limit {
519 envelopes = envelopes.split_off(envelopes.len() - limit);
520 }
521 }
522
523 Ok(envelopes)
524}
525
526impl OpenClawEventSubscription {
527 fn matches(&self, envelope: &OpenClawProjectEventEnvelope) -> bool {
528 if let Some(since_ts) = self.since_ts {
529 if envelope.ts < since_ts {
530 return false;
531 }
532 }
533 if !self.topics.is_empty() && !self.topics.iter().any(|topic| topic == &envelope.topic) {
534 return false;
535 }
536 if !self.event_types.is_empty()
537 && !self
538 .event_types
539 .iter()
540 .any(|event_type| event_type == &envelope.event_type)
541 {
542 return false;
543 }
544 if !self.roles.is_empty()
545 && !envelope
546 .identifiers
547 .role
548 .as_ref()
549 .is_some_and(|role| self.roles.iter().any(|candidate| candidate == role))
550 {
551 return false;
552 }
553 if !self.task_ids.is_empty()
554 && !envelope
555 .identifiers
556 .task_id
557 .as_ref()
558 .is_some_and(|task_id| self.task_ids.iter().any(|candidate| candidate == task_id))
559 {
560 return false;
561 }
562 true
563 }
564}
565
566fn map_team_event_to_openclaw_event(
567 project: &RegisteredProject,
568 event: &events::TeamEvent,
569) -> Option<OpenClawProjectEventEnvelope> {
570 let (topic, event_type) = public_event_contract(event)?;
571 Some(OpenClawProjectEventEnvelope {
572 kind: OPENCLAW_EVENT_KIND.to_string(),
573 schema_version: OPENCLAW_EVENT_SCHEMA_VERSION,
574 topic,
575 event_type: event_type.to_string(),
576 project_id: project.project_id.clone(),
577 project_name: project.name.clone(),
578 project_root: project.project_root.display().to_string(),
579 team_name: project.team_name.clone(),
580 session_name: project.session_name.clone(),
581 ts: event.ts,
582 identifiers: OpenClawEventIdentifiers {
583 role: event.role.clone(),
584 task_id: event.task.clone(),
585 sender: event.from.clone(),
586 recipient: event.recipient.clone().or_else(|| event.to.clone()),
587 },
588 reason: event.reason.clone(),
589 details: event.details.clone(),
590 action_type: event.action_type.clone(),
591 success: event.success,
592 restart_count: event.restart_count,
593 load: event.load,
594 uptime_secs: event.uptime_secs,
595 session_running: event.session_running,
596 })
597}
598
599fn public_event_contract(event: &events::TeamEvent) -> Option<(OpenClawEventTopic, &'static str)> {
600 openclaw_contract::contract_for_internal_event(event)
601 .map(|(topic, event_kind)| (legacy_event_topic(topic), event_kind.legacy_event_type()))
602}
603
604fn watch_project_events_at(
605 registry_path: &Path,
606 project_id: &str,
607 subscription: &OpenClawEventSubscription,
608) -> Result<Vec<OpenClawProjectEventEnvelope>> {
609 let project = project_registry::get_project_at(registry_path, project_id)?
610 .with_context(|| format!("project '{project_id}' is not registered"))?;
611 if !project.policy_flags.allow_openclaw_supervision {
612 bail!("project '{project_id}' does not allow OpenClaw supervision");
613 }
614 if project.policy_flags.archived && !subscription.include_archived {
615 bail!("project '{project_id}' is archived");
616 }
617
618 collect_project_events(std::iter::once(project), subscription)
619}
620
621fn watch_all_project_events_at(
622 registry_path: &Path,
623 subscription: &OpenClawEventSubscription,
624) -> Result<Vec<OpenClawProjectEventEnvelope>> {
625 let projects = project_registry::load_registry_at(registry_path)?
626 .projects
627 .into_iter()
628 .filter(|project| project.policy_flags.allow_openclaw_supervision)
629 .filter(|project| subscription.include_archived || !project.policy_flags.archived)
630 .collect::<Vec<_>>();
631 collect_project_events(projects, subscription)
632}
633
634fn build_status_summary<A: SupervisorAdapter>(
635 project_root: &Path,
636 adapter: &A,
637) -> Result<OpenClawStatusSummary> {
638 let config = load_project_config(project_root)?;
639 let report = adapter.status_report(project_root)?;
640 let events = adapter.recent_events(project_root, config.status.recent_events)?;
641 Ok(summarize_status_report(
642 &report,
643 &events,
644 &config.project_name,
645 ))
646}
647
648fn summarize_status_report(
649 report: &status::TeamStatusJsonReport,
650 events: &[events::TeamEvent],
651 project_name: &str,
652) -> OpenClawStatusSummary {
653 let mut highlights = Vec::new();
654 if !report.running {
655 highlights.push("Batty daemon is not running".to_string());
656 }
657 if report.paused {
658 highlights.push("Batty is paused".to_string());
659 }
660 if !report.health.unhealthy_members.is_empty() {
661 highlights.push(format!(
662 "Unhealthy members: {}",
663 report.health.unhealthy_members.join(", ")
664 ));
665 }
666 if !report.review_queue.is_empty() {
667 highlights.push(format!(
668 "Review queue has {} task(s)",
669 report.review_queue.len()
670 ));
671 }
672 if report.health.triage_backlog_count > 0 {
673 highlights.push(format!(
674 "Triage backlog: {} message(s)",
675 report.health.triage_backlog_count
676 ));
677 }
678
679 OpenClawStatusSummary {
680 project: project_name.to_string(),
681 team: report.team.clone(),
682 running: report.running,
683 paused: report.paused,
684 active_task_count: report.active_tasks.len(),
685 review_queue_count: report.review_queue.len(),
686 unhealthy_members: report.health.unhealthy_members.clone(),
687 triage_backlog_count: report.health.triage_backlog_count,
688 highlights,
689 recent_events: events.iter().rev().map(format_event_summary).collect(),
690 }
691}
692
693fn format_status_summary(summary: &OpenClawStatusSummary) -> String {
694 let mut lines = vec![
695 format!("OpenClaw Project: {}", summary.project),
696 format!("Batty Team: {}", summary.team),
697 format!(
698 "State: {}{}",
699 if summary.running {
700 "running"
701 } else {
702 "stopped"
703 },
704 if summary.paused { " (paused)" } else { "" }
705 ),
706 format!(
707 "Queues: active={} review={} triage_backlog={}",
708 summary.active_task_count, summary.review_queue_count, summary.triage_backlog_count
709 ),
710 ];
711
712 if summary.highlights.is_empty() {
713 lines.push("Highlights: none".to_string());
714 } else {
715 lines.push("Highlights:".to_string());
716 for highlight in &summary.highlights {
717 lines.push(format!("- {highlight}"));
718 }
719 }
720
721 if !summary.recent_events.is_empty() {
722 lines.push("Recent events:".to_string());
723 for event in &summary.recent_events {
724 lines.push(format!("- {event}"));
725 }
726 }
727
728 lines.join("\n")
729}
730
731fn validate_instruction_role(config: &OpenClawProjectConfig, role: &str) -> Result<()> {
732 if config
733 .instruction
734 .allowed_roles
735 .iter()
736 .any(|item| item == role)
737 {
738 return Ok(());
739 }
740
741 bail!(
742 "OpenClaw instructions may only target these Batty roles: {}",
743 config.instruction.allowed_roles.join(", ")
744 )
745}
746
747fn run_follow_ups_with_adapter<A: SupervisorAdapter>(
748 project_root: &Path,
749 adapter: &A,
750 now: DateTime<Utc>,
751) -> Result<FollowUpRunSummary> {
752 let path = project_config_path(project_root);
753 let mut config = load_project_config(project_root)?;
754 let report = adapter.status_report(project_root)?;
755 let allowed_roles = config.instruction.allowed_roles.clone();
756 let sender = config.instruction.sender.clone();
757 let mut dispatched = Vec::new();
758
759 for follow_up in &mut config.follow_ups {
760 if !allowed_roles.iter().any(|role| role == &follow_up.role) {
761 bail!(
762 "OpenClaw instructions may only target these Batty roles: {}",
763 allowed_roles.join(", ")
764 );
765 }
766 if !follow_up_condition_matches(follow_up, &report) {
767 continue;
768 }
769 if !is_follow_up_due(follow_up, now)? {
770 continue;
771 }
772
773 adapter.send_instruction(project_root, &sender, &follow_up.role, &follow_up.message)?;
774 follow_up.last_sent_at = Some(now.to_rfc3339());
775 dispatched.push(FollowUpDispatch {
776 name: follow_up.name.clone(),
777 role: follow_up.role.clone(),
778 reason: follow_up
779 .summary
780 .clone()
781 .unwrap_or_else(|| format!("{:?}", follow_up.when)),
782 });
783 }
784
785 if !dispatched.is_empty() {
786 save_project_config(&path, &config)?;
787 }
788
789 Ok(FollowUpRunSummary { dispatched })
790}
791
792fn follow_up_condition_matches(
793 follow_up: &OpenClawFollowUp,
794 report: &status::TeamStatusJsonReport,
795) -> bool {
796 match follow_up.when {
797 OpenClawFollowUpCondition::Always => true,
798 OpenClawFollowUpCondition::ReviewQueueNonEmpty => !report.review_queue.is_empty(),
799 OpenClawFollowUpCondition::ActiveTasksNonEmpty => !report.active_tasks.is_empty(),
800 OpenClawFollowUpCondition::UnhealthyMembersPresent => {
801 !report.health.unhealthy_members.is_empty()
802 }
803 OpenClawFollowUpCondition::TriageBacklogPresent => report.health.triage_backlog_count > 0,
804 }
805}
806
807fn is_follow_up_due(follow_up: &OpenClawFollowUp, now: DateTime<Utc>) -> Result<bool> {
808 let normalized = normalize_cron(&follow_up.cron);
809 let schedule = Schedule::from_str(&normalized)
810 .with_context(|| format!("invalid OpenClaw follow-up cron '{}'", follow_up.cron))?;
811 let reference = follow_up
812 .last_sent_at
813 .as_deref()
814 .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
815 .map(|value| value.with_timezone(&Utc))
816 .unwrap_or_else(|| now - Duration::days(1));
817 let Some(next_run) = schedule.after(&reference).next() else {
818 return Ok(false);
819 };
820 Ok(next_run <= now)
821}
822
823fn normalize_cron(expr: &str) -> String {
824 let trimmed = expr.trim();
825 if trimmed.split_whitespace().count() == 5 {
826 format!("0 {trimmed}")
827 } else {
828 trimmed.to_string()
829 }
830}
831
832fn format_event_summary(event: &events::TeamEvent) -> String {
833 match event.event.as_str() {
834 "task_completed" => format!(
835 "task {} completed by {}",
836 event.task.as_deref().unwrap_or("?"),
837 event.role.as_deref().unwrap_or("unknown")
838 ),
839 "task_escalated" => format!(
840 "task {} escalated{}",
841 event.task.as_deref().unwrap_or("?"),
842 event
843 .reason
844 .as_deref()
845 .map(|reason| format!(": {reason}"))
846 .unwrap_or_default()
847 ),
848 "task_assigned" => format!(
849 "task {} assigned to {}",
850 event.task.as_deref().unwrap_or("?"),
851 event.role.as_deref().unwrap_or("unknown")
852 ),
853 "daemon_started" => "daemon started".to_string(),
854 "daemon_stopped" => "daemon stopped".to_string(),
855 other => {
856 let mut parts = vec![other.replace('_', " ")];
857 if let Some(task) = event.task.as_deref() {
858 parts.push(format!("#{task}"));
859 }
860 if let Some(role) = event.role.as_deref() {
861 parts.push(format!("({role})"));
862 }
863 parts.join(" ")
864 }
865 }
866}
867
868fn default_version() -> u32 {
869 1
870}
871
872fn default_recent_events() -> usize {
873 DEFAULT_RECENT_EVENTS
874}
875
876fn default_instruction_sender() -> String {
877 "daemon".to_string()
878}
879
880fn default_allowed_roles() -> Vec<String> {
881 vec!["architect".to_string(), "manager".to_string()]
882}
883
884fn load_team_name(project_root: &Path) -> Option<String> {
885 let config_path = team_config_path(project_root);
886 config::TeamConfig::load(&config_path)
887 .ok()
888 .map(|config| config.name)
889}
890
891fn load_status_report(project_root: &Path) -> Result<status::TeamStatusJsonReport> {
892 let config_path = team_config_path(project_root);
893 if !config_path.exists() {
894 bail!("no team config found at {}", config_path.display());
895 }
896
897 let team_config = config::TeamConfig::load(&config_path)?;
898 let members = hierarchy::resolve_hierarchy(&team_config)?;
899 let session_name = format!("batty-{}", team_config.name);
900 let session_running = crate::tmux::session_exists(&session_name);
901 let runtime_statuses = if session_running {
902 status::list_runtime_member_statuses(&session_name).unwrap_or_default()
903 } else {
904 std::collections::HashMap::new()
905 };
906 let pending_inbox_counts = status::pending_inbox_counts(project_root, &members);
907 let triage_backlog_counts = status::triage_backlog_counts(project_root, &members);
908 let owned_task_buckets = status::owned_task_buckets(project_root, &members);
909 let branch_mismatches = status::branch_mismatch_by_member(project_root, &members);
910 let worktree_staleness = status::worktree_staleness_by_member(project_root, &members);
911 let agent_health = status::agent_health_by_member(project_root, &members);
912 let paused = pause_marker_path(project_root).exists();
913 let rows = status::build_team_status_rows(
914 &members,
915 session_running,
916 &runtime_statuses,
917 &pending_inbox_counts,
918 &triage_backlog_counts,
919 &owned_task_buckets,
920 &branch_mismatches,
921 &worktree_staleness,
922 &agent_health,
923 );
924 let workflow_metrics =
925 status::workflow_metrics_section(project_root, &members).map(|(_, metrics)| metrics);
926 let watchdog = status::load_watchdog_status(project_root, session_running);
927 let (active_tasks, review_queue) =
928 status::board_status_task_queues(project_root).unwrap_or_default();
929
930 Ok(status::build_team_status_json_report(
931 status::TeamStatusJsonReportInput {
932 team: team_config.name,
933 session: session_name,
934 session_running,
935 paused,
936 watchdog,
937 workflow_metrics,
938 active_tasks,
939 review_queue,
940 optional_subsystems: None,
941 engineer_profiles: None,
942 members: rows,
943 },
944 ))
945}
946
947#[cfg(test)]
948mod tests {
949 use std::path::{Path, PathBuf};
950
951 use super::*;
952 use crate::project_registry::{
953 ProjectPolicyFlags, ProjectRegistration, load_registry_at, register_project_at,
954 };
955
956 struct FakeAdapter {
957 report: status::TeamStatusJsonReport,
958 events: Vec<events::TeamEvent>,
959 sent: std::sync::Mutex<Vec<(String, String, String)>>,
960 }
961
962 impl SupervisorAdapter for FakeAdapter {
963 fn status_report(&self, _project_root: &Path) -> Result<status::TeamStatusJsonReport> {
964 Ok(self.report.clone())
965 }
966
967 fn recent_events(
968 &self,
969 _project_root: &Path,
970 limit: usize,
971 ) -> Result<Vec<events::TeamEvent>> {
972 Ok(self
973 .events
974 .iter()
975 .rev()
976 .take(limit)
977 .cloned()
978 .collect::<Vec<_>>()
979 .into_iter()
980 .rev()
981 .collect())
982 }
983
984 fn send_instruction(
985 &self,
986 _project_root: &Path,
987 sender: &str,
988 role: &str,
989 message: &str,
990 ) -> Result<()> {
991 self.sent.lock().unwrap().push((
992 sender.to_string(),
993 role.to_string(),
994 message.to_string(),
995 ));
996 Ok(())
997 }
998 }
999
1000 fn sample_report() -> status::TeamStatusJsonReport {
1001 status::TeamStatusJsonReport {
1002 team: "batty".to_string(),
1003 session: "batty-batty".to_string(),
1004 running: true,
1005 paused: false,
1006 watchdog: status::WatchdogStatus {
1007 state: "running".to_string(),
1008 restart_count: 0,
1009 current_backoff_secs: None,
1010 last_exit_reason: None,
1011 },
1012 health: status::TeamStatusHealth {
1013 session_running: true,
1014 paused: false,
1015 member_count: 3,
1016 active_member_count: 2,
1017 pending_inbox_count: 0,
1018 triage_backlog_count: 1,
1019 unhealthy_members: vec!["eng-1-2".to_string()],
1020 },
1021 workflow_metrics: None,
1022 active_tasks: vec![status::StatusTaskEntry {
1023 id: 12,
1024 title: "Active".to_string(),
1025 status: "in-progress".to_string(),
1026 priority: "high".to_string(),
1027 claimed_by: Some("eng-1-1".to_string()),
1028 review_owner: None,
1029 blocked_on: None,
1030 branch: None,
1031 worktree_path: None,
1032 commit: None,
1033 branch_mismatch: None,
1034 next_action: None,
1035 test_summary: None,
1036 }],
1037 review_queue: vec![status::StatusTaskEntry {
1038 id: 13,
1039 title: "Review".to_string(),
1040 status: "review".to_string(),
1041 priority: "medium".to_string(),
1042 claimed_by: None,
1043 review_owner: Some("manager".to_string()),
1044 blocked_on: None,
1045 branch: None,
1046 worktree_path: None,
1047 commit: None,
1048 branch_mismatch: None,
1049 next_action: None,
1050 test_summary: None,
1051 }],
1052 optional_subsystems: None,
1053 engineer_profiles: None,
1054 members: Vec::new(),
1055 }
1056 }
1057
1058 fn sample_config() -> OpenClawProjectConfig {
1059 OpenClawProjectConfig {
1060 version: 1,
1061 project_name: "batty".to_string(),
1062 batty_root: Some("/tmp/batty".to_string()),
1063 status: OpenClawStatusConfig::default(),
1064 instruction: OpenClawInstructionConfig::default(),
1065 follow_ups: vec![OpenClawFollowUp {
1066 name: "review".to_string(),
1067 cron: "*/30 * * * *".to_string(),
1068 role: "manager".to_string(),
1069 message: "Review queue still has pending work.".to_string(),
1070 summary: Some("Review queue follow-up".to_string()),
1071 when: OpenClawFollowUpCondition::ReviewQueueNonEmpty,
1072 last_sent_at: None,
1073 }],
1074 }
1075 }
1076
1077 fn fixture_root(name: &str) -> PathBuf {
1078 PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1079 .join("tests")
1080 .join("fixtures")
1081 .join("openclaw")
1082 .join(name)
1083 }
1084
1085 fn copy_fixture_project(name: &str) -> tempfile::TempDir {
1086 let tmp = tempfile::tempdir().unwrap();
1087 copy_dir_recursive(&fixture_root(name), tmp.path());
1088 let snapshot = tmp.path().join("_batty");
1089 if snapshot.exists() {
1090 fs::rename(snapshot, tmp.path().join(".batty")).unwrap();
1091 }
1092 tmp
1093 }
1094
1095 fn copy_dir_recursive(src: &Path, dst: &Path) {
1096 fs::create_dir_all(dst).unwrap();
1097 for entry in fs::read_dir(src).unwrap() {
1098 let entry = entry.unwrap();
1099 let src_path = entry.path();
1100 let dst_path = dst.join(entry.file_name());
1101 if entry.file_type().unwrap().is_dir() {
1102 copy_dir_recursive(&src_path, &dst_path);
1103 } else {
1104 if let Some(parent) = dst_path.parent() {
1105 fs::create_dir_all(parent).unwrap();
1106 }
1107 fs::copy(&src_path, &dst_path).unwrap();
1108 }
1109 }
1110 }
1111
1112 fn register_fixture_project(
1113 registry_path: &Path,
1114 project_id: &str,
1115 name: &str,
1116 root: &Path,
1117 team_name: &str,
1118 session_name: &str,
1119 ) {
1120 register_project_at(
1121 registry_path,
1122 ProjectRegistration {
1123 project_id: project_id.to_string(),
1124 name: name.to_string(),
1125 aliases: Vec::new(),
1126 project_root: root.to_path_buf(),
1127 board_dir: root.join(".batty").join("team_config").join("board"),
1128 team_name: team_name.to_string(),
1129 session_name: session_name.to_string(),
1130 channel_bindings: Vec::new(),
1131 owner: None,
1132 tags: vec!["openclaw".to_string()],
1133 policy_flags: ProjectPolicyFlags {
1134 allow_openclaw_supervision: true,
1135 allow_cross_project_routing: false,
1136 allow_shared_service_routing: false,
1137 archived: false,
1138 },
1139 },
1140 )
1141 .unwrap();
1142 }
1143
1144 #[test]
1145 fn register_project_writes_skeleton_config() {
1146 let tmp = tempfile::tempdir().unwrap();
1147 fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
1148 fs::write(
1149 tmp.path()
1150 .join(".batty")
1151 .join("team_config")
1152 .join("team.yaml"),
1153 "name: demo\nroles: []\n",
1154 )
1155 .unwrap();
1156
1157 let path = register_project(tmp.path(), false).unwrap();
1158 let loaded = load_project_config(tmp.path()).unwrap();
1159
1160 assert_eq!(path, project_config_path(tmp.path()));
1161 assert_eq!(loaded.project_name, "demo");
1162 assert_eq!(
1163 loaded.instruction.allowed_roles,
1164 vec!["architect", "manager"]
1165 );
1166 assert_eq!(loaded.follow_ups.len(), 2);
1167 }
1168
1169 #[test]
1170 fn summarize_status_report_returns_operator_friendly_highlights() {
1171 let summary = summarize_status_report(
1172 &sample_report(),
1173 &[events::TeamEvent::task_completed("eng-1-2", Some("471"))],
1174 "batty",
1175 );
1176
1177 assert_eq!(summary.project, "batty");
1178 assert_eq!(summary.active_task_count, 1);
1179 assert_eq!(summary.review_queue_count, 1);
1180 assert!(
1181 summary
1182 .highlights
1183 .iter()
1184 .any(|item| item.contains("Unhealthy members"))
1185 );
1186 assert!(summary.recent_events[0].contains("task 471 completed"));
1187 }
1188
1189 #[test]
1190 fn validate_instruction_role_rejects_out_of_scope_roles() {
1191 let error = validate_instruction_role(&sample_config(), "eng-1-1")
1192 .unwrap_err()
1193 .to_string();
1194 assert!(error.contains("architect, manager"));
1195 }
1196
1197 #[test]
1198 fn follow_up_due_when_condition_matches_and_cron_elapsed() {
1199 let tmp = tempfile::tempdir().unwrap();
1200 let path = project_config_path(tmp.path());
1201 save_project_config(&path, &sample_config()).unwrap();
1202
1203 let adapter = FakeAdapter {
1204 report: sample_report(),
1205 events: Vec::new(),
1206 sent: std::sync::Mutex::new(Vec::new()),
1207 };
1208 let now = DateTime::parse_from_rfc3339("2026-04-06T13:30:00Z")
1209 .unwrap()
1210 .with_timezone(&Utc);
1211
1212 let summary = run_follow_ups_with_adapter(tmp.path(), &adapter, now).unwrap();
1213
1214 assert_eq!(summary.dispatched.len(), 1);
1215 let sent = adapter.sent.lock().unwrap();
1216 assert_eq!(sent[0].0, "daemon");
1217 assert_eq!(sent[0].1, "manager");
1218 assert!(sent[0].2.contains("Review queue"));
1219 }
1220
1221 #[test]
1222 fn follow_up_skips_when_condition_does_not_match() {
1223 let tmp = tempfile::tempdir().unwrap();
1224 let mut config = sample_config();
1225 config.follow_ups[0].when = OpenClawFollowUpCondition::UnhealthyMembersPresent;
1226 let path = project_config_path(tmp.path());
1227 save_project_config(&path, &config).unwrap();
1228
1229 let mut report = sample_report();
1230 report.health.unhealthy_members.clear();
1231 let adapter = FakeAdapter {
1232 report,
1233 events: Vec::new(),
1234 sent: std::sync::Mutex::new(Vec::new()),
1235 };
1236 let now = DateTime::parse_from_rfc3339("2026-04-06T13:30:00Z")
1237 .unwrap()
1238 .with_timezone(&Utc);
1239
1240 let summary = run_follow_ups_with_adapter(tmp.path(), &adapter, now).unwrap();
1241
1242 assert!(summary.dispatched.is_empty());
1243 assert!(adapter.sent.lock().unwrap().is_empty());
1244 }
1245
1246 #[test]
1247 fn format_status_summary_includes_highlights_and_events() {
1248 let rendered = format_status_summary(&OpenClawStatusSummary {
1249 project: "batty".to_string(),
1250 team: "batty".to_string(),
1251 running: true,
1252 paused: false,
1253 active_task_count: 1,
1254 review_queue_count: 2,
1255 unhealthy_members: vec!["eng-1-2".to_string()],
1256 triage_backlog_count: 1,
1257 highlights: vec!["Review queue has 2 task(s)".to_string()],
1258 recent_events: vec!["task 471 completed by eng-1-2".to_string()],
1259 });
1260
1261 assert!(rendered.contains("OpenClaw Project: batty"));
1262 assert!(rendered.contains("Review queue has 2 task(s)"));
1263 assert!(rendered.contains("task 471 completed by eng-1-2"));
1264 }
1265
1266 #[test]
1267 fn watch_project_events_maps_internal_events_to_stable_public_contract() {
1268 let degraded = copy_fixture_project("degraded");
1269 let registry_path = degraded.path().join("registry.json");
1270 register_fixture_project(
1271 ®istry_path,
1272 "fixture-degraded",
1273 "Fixture Degraded",
1274 degraded.path(),
1275 "fixture-team",
1276 "batty-fixture-team-degraded",
1277 );
1278
1279 let events = watch_project_events_at(
1280 ®istry_path,
1281 "fixture-degraded",
1282 &OpenClawEventSubscription::default(),
1283 )
1284 .unwrap();
1285
1286 assert_eq!(events.len(), 4);
1287 assert_eq!(events[0].kind, OPENCLAW_EVENT_KIND);
1288 assert_eq!(events[0].schema_version, OPENCLAW_EVENT_SCHEMA_VERSION);
1289 assert_eq!(events[0].project_id, "fixture-degraded");
1290 assert_eq!(events[0].team_name, "fixture-team");
1291 assert_eq!(events[0].session_name, "batty-fixture-team-degraded");
1292 assert_eq!(events[0].topic, OpenClawEventTopic::Lifecycle);
1293 assert_eq!(events[0].event_type, "session.started");
1294 assert_eq!(events[1].topic, OpenClawEventTopic::Lifecycle);
1295 assert_eq!(events[1].event_type, "agent.health_changed");
1296 assert_eq!(events[1].identifiers.role.as_deref(), Some("eng-1-1"));
1297 assert_eq!(events[2].topic, OpenClawEventTopic::Escalation);
1298 assert_eq!(events[2].event_type, "task.escalated");
1299 assert_eq!(events[2].identifiers.task_id.as_deref(), Some("449"));
1300 assert!(
1301 events[2]
1302 .reason
1303 .as_deref()
1304 .is_some_and(|reason| reason.contains("wording drift"))
1305 );
1306 assert_eq!(events[3].topic, OpenClawEventTopic::Completion);
1307 assert_eq!(events[3].event_type, "task.completed");
1308 assert_eq!(events[3].identifiers.task_id.as_deref(), Some("448"));
1309 }
1310
1311 #[test]
1312 fn watch_all_project_events_keeps_cross_project_streams_separate() {
1313 let degraded = copy_fixture_project("degraded");
1314 let running = copy_fixture_project("running");
1315 let registry_dir = tempfile::tempdir().unwrap();
1316 let registry_path = registry_dir.path().join("project-registry.json");
1317
1318 register_fixture_project(
1319 ®istry_path,
1320 "fixture-degraded",
1321 "Fixture Degraded",
1322 degraded.path(),
1323 "fixture-team-degraded",
1324 "batty-fixture-team-degraded",
1325 );
1326 register_fixture_project(
1327 ®istry_path,
1328 "fixture-running",
1329 "Fixture Running",
1330 running.path(),
1331 "fixture-team-running",
1332 "batty-fixture-team-running",
1333 );
1334
1335 let all_events =
1336 watch_all_project_events_at(®istry_path, &OpenClawEventSubscription::default())
1337 .unwrap();
1338
1339 assert_eq!(
1340 all_events
1341 .iter()
1342 .map(|event| event.project_id.as_str())
1343 .collect::<Vec<_>>(),
1344 vec![
1345 "fixture-running",
1346 "fixture-running",
1347 "fixture-degraded",
1348 "fixture-degraded",
1349 "fixture-degraded",
1350 "fixture-degraded",
1351 ]
1352 );
1353 assert!(all_events.iter().all(|event| {
1354 (event.project_id == "fixture-running"
1355 && event.session_name == "batty-fixture-team-running"
1356 && event.team_name == "fixture-team-running")
1357 || (event.project_id == "fixture-degraded"
1358 && event.session_name == "batty-fixture-team-degraded"
1359 && event.team_name == "fixture-team-degraded")
1360 }));
1361
1362 let degraded_only = watch_all_project_events_at(
1363 ®istry_path,
1364 &OpenClawEventSubscription {
1365 project_ids: vec!["fixture-degraded".to_string()],
1366 ..OpenClawEventSubscription::default()
1367 },
1368 )
1369 .unwrap();
1370 assert_eq!(degraded_only.len(), 4);
1371 assert!(
1372 degraded_only
1373 .iter()
1374 .all(|event| event.project_id == "fixture-degraded")
1375 );
1376
1377 let completion_only = watch_all_project_events_at(
1378 ®istry_path,
1379 &OpenClawEventSubscription {
1380 topics: vec![OpenClawEventTopic::Completion],
1381 ..OpenClawEventSubscription::default()
1382 },
1383 )
1384 .unwrap();
1385 assert_eq!(completion_only.len(), 2);
1386 assert_eq!(completion_only[0].project_id, "fixture-running");
1387 assert_eq!(completion_only[0].event_type, "task.completed");
1388 assert_eq!(completion_only[1].project_id, "fixture-degraded");
1389 assert_eq!(completion_only[1].event_type, "task.completed");
1390 }
1391
1392 #[test]
1393 fn watch_all_project_events_respects_topic_task_and_limit_filters() {
1394 let degraded = copy_fixture_project("degraded");
1395 let running = copy_fixture_project("running");
1396 let registry_dir = tempfile::tempdir().unwrap();
1397 let registry_path = registry_dir.path().join("project-registry.json");
1398
1399 register_fixture_project(
1400 ®istry_path,
1401 "fixture-degraded",
1402 "Fixture Degraded",
1403 degraded.path(),
1404 "fixture-team-degraded",
1405 "batty-fixture-team-degraded",
1406 );
1407 register_fixture_project(
1408 ®istry_path,
1409 "fixture-running",
1410 "Fixture Running",
1411 running.path(),
1412 "fixture-team-running",
1413 "batty-fixture-team-running",
1414 );
1415
1416 let filtered = watch_all_project_events_at(
1417 ®istry_path,
1418 &OpenClawEventSubscription {
1419 topics: vec![
1420 OpenClawEventTopic::Escalation,
1421 OpenClawEventTopic::Completion,
1422 ],
1423 task_ids: vec!["449".to_string()],
1424 limit: Some(1),
1425 since_ts: Some(1_712_402_000),
1426 ..OpenClawEventSubscription::default()
1427 },
1428 )
1429 .unwrap();
1430
1431 assert_eq!(filtered.len(), 1);
1432 assert_eq!(filtered[0].project_id, "fixture-degraded");
1433 assert_eq!(filtered[0].event_type, "task.escalated");
1434 assert_eq!(filtered[0].identifiers.task_id.as_deref(), Some("449"));
1435 assert_eq!(load_registry_at(®istry_path).unwrap().projects.len(), 2);
1436 }
1437
1438 #[test]
1439 fn openclaw_team_status_contract_maps_fixture_to_stable_dto() {
1440 let report = openclaw_contract_descriptor();
1441 let project = copy_fixture_project("degraded");
1442
1443 let status = openclaw_team_status_contract(project.path()).unwrap();
1444
1445 assert_eq!(report.kind, openclaw_contract::CONTRACT_DESCRIPTOR_KIND);
1446 assert_eq!(status.kind, openclaw_contract::TEAM_STATUS_KIND);
1447 assert_eq!(status.team_name, "fixture-team");
1448 assert_eq!(status.lifecycle, openclaw_contract::TeamLifecycle::Stopped);
1449 assert_eq!(status.pipeline.active_task_count, 1);
1450 assert_eq!(status.pipeline.review_queue_count, 1);
1451 assert!(
1452 status
1453 .approval_surface
1454 .human_only_decisions
1455 .contains(&openclaw_contract::HumanDecisionKind::MergeDisposition)
1456 );
1457 }
1458
1459 #[test]
1460 fn legacy_event_contract_conversion_uses_explicit_event_kinds() {
1461 let degraded = copy_fixture_project("degraded");
1462 let registry_path = degraded.path().join("registry.json");
1463 register_fixture_project(
1464 ®istry_path,
1465 "fixture-degraded",
1466 "Fixture Degraded",
1467 degraded.path(),
1468 "fixture-team",
1469 "batty-fixture-team-degraded",
1470 );
1471
1472 let events = watch_project_events_at(
1473 ®istry_path,
1474 "fixture-degraded",
1475 &OpenClawEventSubscription::default(),
1476 )
1477 .unwrap();
1478
1479 let converted = legacy_envelope_to_contract(events[2].clone()).unwrap();
1480
1481 assert_eq!(converted.kind, openclaw_contract::TEAM_EVENT_KIND);
1482 assert_eq!(
1483 converted.event.event_kind,
1484 openclaw_contract::TeamEventKind::TaskEscalated
1485 );
1486 assert_eq!(
1487 converted.event.topic,
1488 openclaw_contract::TeamEventTopic::Escalation
1489 );
1490 assert_eq!(converted.event.task_id.as_deref(), Some("449"));
1491 }
1492}