Skip to main content

batty_cli/team/
events.rs

1//! Structured JSONL event stream for external consumers.
2
3use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
13/// Bundled parameters for merge-confidence scoring events.
14pub struct MergeConfidenceInfo<'a> {
15    pub engineer: &'a str,
16    pub task: &'a str,
17    pub confidence: f64,
18    pub files_changed: usize,
19    pub lines_changed: usize,
20    pub has_migrations: bool,
21    pub has_config_changes: bool,
22    pub rename_count: usize,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28    pub event: String,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub role: Option<String>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub task: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub recipient: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub from: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub to: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub restart: Option<bool>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub restart_count: Option<u32>,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub load: Option<f64>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub working_members: Option<u32>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub total_members: Option<u32>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub session_running: Option<bool>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub reason: Option<String>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub step: Option<String>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub error: Option<String>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub uptime_secs: Option<u64>,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub session_size_bytes: Option<u64>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub output_bytes: Option<u64>,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub filename: Option<String>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub content_hash: Option<String>,
67    pub ts: u64,
68}
69
70impl TeamEvent {
71    fn now() -> u64 {
72        SystemTime::now()
73            .duration_since(UNIX_EPOCH)
74            .unwrap_or_default()
75            .as_secs()
76    }
77
78    fn base(event: &str) -> Self {
79        Self {
80            event: event.into(),
81            role: None,
82            task: None,
83            recipient: None,
84            from: None,
85            to: None,
86            restart: None,
87            restart_count: None,
88            load: None,
89            working_members: None,
90            total_members: None,
91            session_running: None,
92            reason: None,
93            step: None,
94            error: None,
95            uptime_secs: None,
96            session_size_bytes: None,
97            output_bytes: None,
98            filename: None,
99            content_hash: None,
100            ts: Self::now(),
101        }
102    }
103
104    pub fn daemon_started() -> Self {
105        Self::base("daemon_started")
106    }
107
108    pub fn daemon_reloading() -> Self {
109        Self::base("daemon_reloading")
110    }
111
112    pub fn daemon_reloaded() -> Self {
113        Self::base("daemon_reloaded")
114    }
115
116    pub fn daemon_stopped() -> Self {
117        Self::base("daemon_stopped")
118    }
119
120    pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
121        Self {
122            reason: Some(reason.into()),
123            uptime_secs: Some(uptime_secs),
124            ..Self::base("daemon_stopped")
125        }
126    }
127
128    pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
129        Self {
130            uptime_secs: Some(uptime_secs),
131            ..Self::base("daemon_heartbeat")
132        }
133    }
134
135    pub fn context_exhausted(
136        role: &str,
137        task: Option<u32>,
138        session_size_bytes: Option<u64>,
139    ) -> Self {
140        Self {
141            role: Some(role.into()),
142            task: task.map(|task_id| task_id.to_string()),
143            session_size_bytes,
144            ..Self::base("context_exhausted")
145        }
146    }
147
148    pub fn loop_step_error(step: &str, error: &str) -> Self {
149        Self {
150            step: Some(step.into()),
151            error: Some(error.into()),
152            ..Self::base("loop_step_error")
153        }
154    }
155
156    pub fn daemon_panic(reason: &str) -> Self {
157        Self {
158            reason: Some(reason.into()),
159            ..Self::base("daemon_panic")
160        }
161    }
162
163    pub fn task_assigned(role: &str, task: &str) -> Self {
164        Self {
165            role: Some(role.into()),
166            task: Some(task.into()),
167            ..Self::base("task_assigned")
168        }
169    }
170
171    pub fn cwd_corrected(role: &str, path: &str) -> Self {
172        Self {
173            role: Some(role.into()),
174            reason: Some(path.into()),
175            ..Self::base("cwd_corrected")
176        }
177    }
178
179    pub fn review_nudge_sent(role: &str, task: &str) -> Self {
180        Self {
181            role: Some(role.into()),
182            task: Some(task.into()),
183            ..Self::base("review_nudge_sent")
184        }
185    }
186
187    pub fn review_escalated(task: &str, reason: &str) -> Self {
188        Self {
189            task: Some(task.into()),
190            reason: Some(reason.into()),
191            ..Self::base("review_escalated")
192        }
193    }
194
195    pub fn state_reconciliation(role: Option<&str>, task: Option<&str>, correction: &str) -> Self {
196        Self {
197            role: role.map(str::to_string),
198            task: task.map(str::to_string),
199            reason: Some(correction.into()),
200            ..Self::base("state_reconciliation")
201        }
202    }
203
204    pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
205        Self {
206            role: Some(role.into()),
207            task: Some(task.into()),
208            reason: reason.map(|r| r.into()),
209            ..Self::base("task_escalated")
210        }
211    }
212
213    pub fn task_unblocked(role: &str, task: &str) -> Self {
214        Self {
215            role: Some(role.into()),
216            task: Some(task.into()),
217            ..Self::base("task_unblocked")
218        }
219    }
220
221    pub fn performance_regression(task: &str, reason: &str) -> Self {
222        Self {
223            task: Some(task.into()),
224            reason: Some(reason.into()),
225            ..Self::base("performance_regression")
226        }
227    }
228
229    pub fn task_completed(role: &str, task: Option<&str>) -> Self {
230        Self {
231            role: Some(role.into()),
232            task: task.map(|t| t.into()),
233            ..Self::base("task_completed")
234        }
235    }
236
237    pub fn standup_generated(recipient: &str) -> Self {
238        Self {
239            recipient: Some(recipient.into()),
240            ..Self::base("standup_generated")
241        }
242    }
243
244    pub fn retro_generated() -> Self {
245        Self::base("retro_generated")
246    }
247
248    pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
249        Self {
250            reason: Some(format!("{pattern_type}:{frequency}")),
251            ..Self::base("pattern_detected")
252        }
253    }
254
255    pub fn member_crashed(role: &str, restart: bool) -> Self {
256        Self {
257            role: Some(role.into()),
258            restart: Some(restart),
259            ..Self::base("member_crashed")
260        }
261    }
262
263    pub fn pane_death(role: &str) -> Self {
264        Self {
265            role: Some(role.into()),
266            ..Self::base("pane_death")
267        }
268    }
269
270    pub fn pane_respawned(role: &str) -> Self {
271        Self {
272            role: Some(role.into()),
273            ..Self::base("pane_respawned")
274        }
275    }
276
277    pub fn narration_detected(role: &str, task: Option<u32>) -> Self {
278        Self {
279            role: Some(role.into()),
280            task: task.map(|id| id.to_string()),
281            ..Self::base("narration_detected")
282        }
283    }
284
285    pub fn narration_rejection(role: &str, task_id: u32, rejection_count: u32) -> Self {
286        Self {
287            role: Some(role.into()),
288            task: Some(task_id.to_string()),
289            reason: Some(format!("rejection_count={rejection_count}")),
290            ..Self::base("narration_rejection")
291        }
292    }
293
294    pub fn context_pressure_warning(
295        role: &str,
296        task: Option<u32>,
297        output_bytes: u64,
298        threshold_bytes: u64,
299    ) -> Self {
300        Self {
301            role: Some(role.into()),
302            task: task.map(|id| id.to_string()),
303            output_bytes: Some(output_bytes),
304            reason: Some(format!("threshold_bytes={threshold_bytes}")),
305            ..Self::base("context_pressure_warning")
306        }
307    }
308
309    pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
310        Self {
311            role: Some(role.into()),
312            task: task.map(|id| id.to_string()),
313            uptime_secs: Some(stall_duration_secs),
314            ..Self::base("stall_detected")
315        }
316    }
317
318    /// Record a backend health state change for an agent.
319    ///
320    /// `reason` encodes the transition, e.g. "healthy→unreachable".
321    pub fn health_changed(role: &str, reason: &str) -> Self {
322        Self {
323            role: Some(role.into()),
324            reason: Some(reason.into()),
325            ..Self::base("health_changed")
326        }
327    }
328
329    pub fn message_routed(from: &str, to: &str) -> Self {
330        Self {
331            from: Some(from.into()),
332            to: Some(to.into()),
333            ..Self::base("message_routed")
334        }
335    }
336
337    pub fn agent_spawned(role: &str) -> Self {
338        Self {
339            role: Some(role.into()),
340            ..Self::base("agent_spawned")
341        }
342    }
343
344    pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
345        Self {
346            role: Some(role.into()),
347            task: Some(task.into()),
348            reason: Some(reason.into()),
349            restart_count: Some(restart_count),
350            ..Self::base("agent_restarted")
351        }
352    }
353
354    pub fn task_resumed(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
355        Self {
356            role: Some(role.into()),
357            task: Some(task.into()),
358            reason: Some(reason.into()),
359            restart_count: Some(restart_count),
360            ..Self::base("task_resumed")
361        }
362    }
363
364    pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
365        Self {
366            role: Some(role.into()),
367            from: Some(from.into()),
368            reason: Some(reason.into()),
369            ..Self::base("delivery_failed")
370        }
371    }
372
373    pub fn task_auto_merged(
374        engineer: &str,
375        task: &str,
376        confidence: f64,
377        files_changed: usize,
378        lines_changed: usize,
379    ) -> Self {
380        Self {
381            role: Some(engineer.into()),
382            task: Some(task.into()),
383            load: Some(confidence),
384            reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
385            ..Self::base("task_auto_merged")
386        }
387    }
388
389    pub fn task_manual_merged(task: &str) -> Self {
390        Self {
391            task: Some(task.into()),
392            ..Self::base("task_manual_merged")
393        }
394    }
395
396    /// Emitted for every completed task to record its merge confidence score.
397    pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
398        let detail = format!(
399            "files={} lines={} migrations={} config={} renames={}",
400            info.files_changed,
401            info.lines_changed,
402            info.has_migrations,
403            info.has_config_changes,
404            info.rename_count
405        );
406        Self {
407            role: Some(info.engineer.into()),
408            task: Some(info.task.into()),
409            load: Some(info.confidence),
410            reason: Some(detail),
411            ..Self::base("merge_confidence_scored")
412        }
413    }
414
415    pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
416        Self {
417            role: Some(role.into()),
418            task: Some(task.into()),
419            ..Self::base("review_escalated")
420        }
421    }
422
423    pub fn task_reworked(role: &str, task: &str) -> Self {
424        Self {
425            role: Some(role.into()),
426            task: Some(task.into()),
427            ..Self::base("task_reworked")
428        }
429    }
430
431    pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
432        Self {
433            task: Some(format!("#{task_id}")),
434            reason: Some(cron_expr.into()),
435            ..Self::base("task_recycled")
436        }
437    }
438
439    pub fn barrier_artifact_created(role: &str, filename: &str, content_hash: &str) -> Self {
440        Self {
441            role: Some(role.into()),
442            filename: Some(filename.into()),
443            content_hash: Some(content_hash.into()),
444            ..Self::base("barrier_artifact_created")
445        }
446    }
447
448    pub fn barrier_artifact_read(role: &str, filename: &str, content_hash: &str) -> Self {
449        Self {
450            role: Some(role.into()),
451            filename: Some(filename.into()),
452            content_hash: Some(content_hash.into()),
453            ..Self::base("barrier_artifact_read")
454        }
455    }
456
457    pub fn barrier_violation_attempt(role: &str, filename: &str, reason: &str) -> Self {
458        Self {
459            role: Some(role.into()),
460            filename: Some(filename.into()),
461            reason: Some(reason.into()),
462            ..Self::base("barrier_violation_attempt")
463        }
464    }
465
466    pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
467        let load = if total_members == 0 {
468            0.0
469        } else {
470            working_members as f64 / total_members as f64
471        };
472        Self {
473            load: Some(load),
474            working_members: Some(working_members),
475            total_members: Some(total_members),
476            session_running: Some(session_running),
477            ..Self::base("load_snapshot")
478        }
479    }
480
481    pub fn parity_updated(summary: &crate::team::parity::ParitySummary) -> Self {
482        Self {
483            load: Some(summary.overall_parity_pct as f64 / 100.0),
484            reason: Some(format!(
485                "total={} spec={} tests={} implementation={} verified_pass={} verified_fail={}",
486                summary.total_behaviors,
487                summary.spec_complete,
488                summary.tests_complete,
489                summary.implementation_complete,
490                summary.verified_pass,
491                summary.verified_fail
492            )),
493            ..Self::base("parity_updated")
494        }
495    }
496
497    pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
498        Self {
499            role: Some(role.into()),
500            reason: Some(format!("branch '{branch}' merged into main")),
501            ..Self::base("worktree_reconciled")
502        }
503    }
504
505    /// Emitted when the daemon reconciles a topology change.
506    ///
507    /// `reason` contains a human-readable summary (e.g. "+2 added, -1 removed").
508    pub fn topology_changed(added: u32, removed: u32, reason: &str) -> Self {
509        Self {
510            working_members: Some(added),
511            total_members: Some(removed),
512            reason: Some(reason.into()),
513            ..Self::base("topology_changed")
514        }
515    }
516
517    /// Emitted when an agent is removed during a scale-down.
518    pub fn agent_removed(role: &str, reason: &str) -> Self {
519        Self {
520            role: Some(role.into()),
521            reason: Some(reason.into()),
522            ..Self::base("agent_removed")
523        }
524    }
525}
526
527pub struct EventSink {
528    writer: Box<dyn Write + Send>,
529    path: PathBuf,
530    max_bytes: Option<u64>,
531}
532
533impl EventSink {
534    pub fn new(path: &Path) -> Result<Self> {
535        Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
536    }
537
538    pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
539        if let Some(parent) = path.parent() {
540            std::fs::create_dir_all(parent)?;
541        }
542        rotate_event_log_if_needed(path, max_bytes, 0)?;
543        let file = OpenOptions::new()
544            .create(true)
545            .append(true)
546            .open(path)
547            .with_context(|| format!("failed to open event sink: {}", path.display()))?;
548        Ok(Self {
549            writer: Box::new(BufWriter::new(file)),
550            path: path.to_path_buf(),
551            max_bytes: Some(max_bytes),
552        })
553    }
554
555    #[cfg(test)]
556    pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
557        Self {
558            writer: Box::new(writer),
559            path: path.to_path_buf(),
560            max_bytes: None,
561        }
562    }
563
564    pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
565        let json = serde_json::to_string(&event)?;
566        self.rotate_if_needed((json.len() + 1) as u64)?;
567        writeln!(self.writer, "{json}")?;
568        self.writer.flush()?;
569        Ok(())
570    }
571
572    pub fn path(&self) -> &Path {
573        &self.path
574    }
575
576    fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
577        let Some(max_bytes) = self.max_bytes else {
578            return Ok(());
579        };
580        self.writer.flush()?;
581        if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
582            self.writer = Box::new(BufWriter::new(
583                OpenOptions::new()
584                    .create(true)
585                    .append(true)
586                    .open(&self.path)
587                    .with_context(|| {
588                        format!("failed to reopen event sink: {}", self.path.display())
589                    })?,
590            ));
591        }
592        Ok(())
593    }
594}
595
596fn rotated_event_log_path(path: &Path) -> PathBuf {
597    PathBuf::from(format!("{}.1", path.display()))
598}
599
600fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
601    let len = match fs::metadata(path) {
602        Ok(metadata) => metadata.len(),
603        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
604        Err(error) => {
605            return Err(error).with_context(|| format!("failed to stat {}", path.display()));
606        }
607    };
608
609    if len == 0 {
610        return Ok(false);
611    }
612
613    if len.saturating_add(next_entry_bytes) <= max_bytes {
614        return Ok(false);
615    }
616
617    let rotated = rotated_event_log_path(path);
618    if rotated.exists() {
619        fs::remove_file(&rotated)
620            .with_context(|| format!("failed to remove {}", rotated.display()))?;
621    }
622    fs::rename(path, &rotated).with_context(|| {
623        format!(
624            "failed to rotate event log {} to {}",
625            path.display(),
626            rotated.display()
627        )
628    })?;
629    Ok(true)
630}
631
632pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
633    if !path.exists() {
634        return Ok(Vec::new());
635    }
636    let content = fs::read_to_string(path).context("failed to read event log")?;
637    let mut events = Vec::new();
638    for line in content.lines() {
639        let line = line.trim();
640        if line.is_empty() {
641            continue;
642        }
643        if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
644            events.push(event);
645        }
646    }
647    Ok(events)
648}
649
650#[cfg(test)]
651mod tests {
652    use std::sync::{Arc, Mutex};
653    use std::thread;
654
655    use super::*;
656
657    #[test]
658    fn event_serializes_to_json() {
659        let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
660        let json = serde_json::to_string(&event).unwrap();
661        assert!(json.contains("\"event\":\"task_assigned\""));
662        assert!(json.contains("\"role\":\"eng-1-1\""));
663        assert!(json.contains("\"task\":\"fix auth bug\""));
664        assert!(json.contains("\"ts\":"));
665    }
666
667    #[test]
668    fn optional_fields_omitted() {
669        let event = TeamEvent::daemon_started();
670        let json = serde_json::to_string(&event).unwrap();
671        assert!(!json.contains("\"role\""));
672        assert!(!json.contains("\"task\""));
673    }
674
675    #[test]
676    fn event_sink_writes_jsonl() {
677        let tmp = tempfile::tempdir().unwrap();
678        let path = tmp.path().join("events.jsonl");
679        let mut sink = EventSink::new(&path).unwrap();
680        sink.emit(TeamEvent::daemon_started()).unwrap();
681        sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
682            .unwrap();
683        sink.emit(TeamEvent::daemon_stopped()).unwrap();
684
685        let content = std::fs::read_to_string(&path).unwrap();
686        let lines: Vec<&str> = content.lines().collect();
687        assert_eq!(lines.len(), 3);
688        assert!(lines[0].contains("daemon_started"));
689        assert!(lines[1].contains("task_assigned"));
690        assert!(lines[2].contains("daemon_stopped"));
691    }
692
693    #[test]
694    fn all_event_variants_serialize_with_correct_event_field() {
695        let variants: Vec<(&str, TeamEvent)> = vec![
696            ("daemon_started", TeamEvent::daemon_started()),
697            ("daemon_reloading", TeamEvent::daemon_reloading()),
698            ("daemon_reloaded", TeamEvent::daemon_reloaded()),
699            ("daemon_stopped", TeamEvent::daemon_stopped()),
700            (
701                "daemon_stopped",
702                TeamEvent::daemon_stopped_with_reason("signal", 3600),
703            ),
704            ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
705            (
706                "context_exhausted",
707                TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
708            ),
709            (
710                "loop_step_error",
711                TeamEvent::loop_step_error("poll_watchers", "tmux error"),
712            ),
713            (
714                "daemon_panic",
715                TeamEvent::daemon_panic("index out of bounds"),
716            ),
717            ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
718            (
719                "cwd_corrected",
720                TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
721            ),
722            (
723                "task_escalated",
724                TeamEvent::task_escalated("eng-1", "task", None),
725            ),
726            ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
727            (
728                "performance_regression",
729                TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
730            ),
731            (
732                "task_completed",
733                TeamEvent::task_completed("eng-1", Some("42")),
734            ),
735            ("standup_generated", TeamEvent::standup_generated("manager")),
736            ("retro_generated", TeamEvent::retro_generated()),
737            (
738                "pattern_detected",
739                TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
740            ),
741            ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
742            ("pane_death", TeamEvent::pane_death("eng-1")),
743            ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
744            (
745                "context_pressure_warning",
746                TeamEvent::context_pressure_warning("eng-1", Some(42), 400_000, 512_000),
747            ),
748            ("message_routed", TeamEvent::message_routed("a", "b")),
749            ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
750            (
751                "agent_restarted",
752                TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
753            ),
754            (
755                "delivery_failed",
756                TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
757            ),
758            (
759                "task_auto_merged",
760                TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
761            ),
762            ("task_manual_merged", TeamEvent::task_manual_merged("42")),
763            (
764                "merge_confidence_scored",
765                TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
766                    engineer: "eng-1",
767                    task: "42",
768                    confidence: 0.85,
769                    files_changed: 3,
770                    lines_changed: 50,
771                    has_migrations: false,
772                    has_config_changes: false,
773                    rename_count: 0,
774                }),
775            ),
776            (
777                "review_nudge_sent",
778                TeamEvent::review_nudge_sent("manager", "42"),
779            ),
780            (
781                "review_escalated",
782                TeamEvent::review_escalated("42", "stale review"),
783            ),
784            (
785                "state_reconciliation",
786                TeamEvent::state_reconciliation(Some("eng-1"), Some("42"), "adopt"),
787            ),
788            ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
789            ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
790            (
791                "parity_updated",
792                TeamEvent::parity_updated(&crate::team::parity::ParitySummary {
793                    total_behaviors: 10,
794                    spec_complete: 8,
795                    tests_complete: 6,
796                    implementation_complete: 5,
797                    verified_pass: 4,
798                    verified_fail: 1,
799                    overall_parity_pct: 40,
800                }),
801            ),
802            (
803                "worktree_reconciled",
804                TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
805            ),
806        ];
807        for (expected_event, event) in &variants {
808            let json = serde_json::to_string(event).unwrap();
809            let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
810            assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
811            assert!(parsed["ts"].as_u64().is_some());
812        }
813    }
814
815    #[test]
816    fn load_snapshot_serializes_optional_metrics() {
817        let event = TeamEvent::load_snapshot(3, 7, false);
818        let json = serde_json::to_string(&event).unwrap();
819        assert!(json.contains("\"event\":\"load_snapshot\""));
820        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
821        assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
822        assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
823        assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
824        assert!(!parsed["session_running"].as_bool().unwrap());
825    }
826
827    #[test]
828    fn parity_updated_serializes_summary_metrics() {
829        let event = TeamEvent::parity_updated(&crate::team::parity::ParitySummary {
830            total_behaviors: 10,
831            spec_complete: 8,
832            tests_complete: 6,
833            implementation_complete: 5,
834            verified_pass: 4,
835            verified_fail: 1,
836            overall_parity_pct: 40,
837        });
838        let json = serde_json::to_string(&event).unwrap();
839        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
840        assert_eq!(parsed["event"].as_str().unwrap(), "parity_updated");
841        assert_eq!(parsed["load"].as_f64().unwrap(), 0.4);
842        let reason = parsed["reason"].as_str().unwrap();
843        assert!(reason.contains("total=10"));
844        assert!(reason.contains("spec=8"));
845        assert!(reason.contains("verified_pass=4"));
846    }
847
848    #[test]
849    fn read_events_parses_all_known_lines() {
850        let tmp = tempfile::tempdir().unwrap();
851        let path = tmp.path().join("events.jsonl");
852        let mut sink = EventSink::new(&path).unwrap();
853        sink.emit(TeamEvent::daemon_started()).unwrap();
854        sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
855        sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
856
857        let events = read_events(&path).unwrap();
858        assert_eq!(events.len(), 3);
859        assert_eq!(events[1].event, "load_snapshot");
860        assert_eq!(events[1].working_members, Some(1));
861        assert_eq!(events[2].total_members, Some(4));
862    }
863
864    #[test]
865    fn event_sink_appends_to_existing_file() {
866        let tmp = tempfile::tempdir().unwrap();
867        let path = tmp.path().join("events.jsonl");
868
869        // Write one event and close the sink
870        {
871            let mut sink = EventSink::new(&path).unwrap();
872            sink.emit(TeamEvent::daemon_started()).unwrap();
873        }
874
875        // Open again and append another
876        {
877            let mut sink = EventSink::new(&path).unwrap();
878            sink.emit(TeamEvent::daemon_stopped()).unwrap();
879        }
880
881        let content = std::fs::read_to_string(&path).unwrap();
882        let lines: Vec<&str> = content.lines().collect();
883        assert_eq!(lines.len(), 2);
884        assert!(lines[0].contains("daemon_started"));
885        assert!(lines[1].contains("daemon_stopped"));
886    }
887
888    #[test]
889    fn event_with_special_characters_in_task() {
890        let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
891        let json = serde_json::to_string(&event).unwrap();
892        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
893        let task_val = parsed["task"].as_str().unwrap();
894        assert!(task_val.contains("\"quotes\""));
895        assert!(task_val.contains("<angles>"));
896    }
897
898    #[test]
899    fn task_escalated_serializes_role_and_task() {
900        let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
901        let json = serde_json::to_string(&event).unwrap();
902        assert!(json.contains("\"event\":\"task_escalated\""));
903        assert!(json.contains("\"role\":\"eng-1-1\""));
904        assert!(json.contains("\"task\":\"42\""));
905        assert!(json.contains("\"reason\":\"tests_failed\""));
906    }
907
908    #[test]
909    fn cwd_corrected_serializes_role_and_reason() {
910        let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
911        let json = serde_json::to_string(&event).unwrap();
912        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
913        assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
914        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
915        assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
916    }
917
918    #[test]
919    fn pane_death_serializes_role() {
920        let event = TeamEvent::pane_death("eng-1-1");
921        let json = serde_json::to_string(&event).unwrap();
922        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
923        assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
924        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
925    }
926
927    #[test]
928    fn pane_respawned_serializes_role() {
929        let event = TeamEvent::pane_respawned("eng-1-1");
930        let json = serde_json::to_string(&event).unwrap();
931        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
932        assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
933        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
934    }
935
936    #[test]
937    fn agent_restarted_includes_reason_task_and_count() {
938        let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
939        let json = serde_json::to_string(&event).unwrap();
940        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
941        assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
942        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
943        assert_eq!(parsed["task"].as_str().unwrap(), "67");
944        assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
945        assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
946    }
947
948    #[test]
949    fn context_pressure_warning_includes_threshold_and_output_bytes() {
950        let event = TeamEvent::context_pressure_warning("eng-1-2", Some(67), 420_000, 512_000);
951        let json = serde_json::to_string(&event).unwrap();
952        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
953
954        assert_eq!(
955            parsed["event"].as_str().unwrap(),
956            "context_pressure_warning"
957        );
958        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
959        assert_eq!(parsed["task"].as_str().unwrap(), "67");
960        assert_eq!(parsed["output_bytes"].as_u64().unwrap(), 420_000);
961        assert_eq!(parsed["reason"].as_str().unwrap(), "threshold_bytes=512000");
962    }
963
964    #[test]
965    fn delivery_failed_includes_role_sender_and_reason() {
966        let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
967        let json = serde_json::to_string(&event).unwrap();
968        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
969        assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
970        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
971        assert_eq!(parsed["from"].as_str().unwrap(), "manager");
972        assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
973    }
974
975    #[test]
976    fn task_unblocked_serializes_role_and_task() {
977        let event = TeamEvent::task_unblocked("eng-1-1", "42");
978        let json = serde_json::to_string(&event).unwrap();
979        assert!(json.contains("\"event\":\"task_unblocked\""));
980        assert!(json.contains("\"role\":\"eng-1-1\""));
981        assert!(json.contains("\"task\":\"42\""));
982    }
983
984    #[test]
985    fn merge_confidence_scored_includes_all_fields() {
986        let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
987            engineer: "eng-1-1",
988            task: "42",
989            confidence: 0.85,
990            files_changed: 3,
991            lines_changed: 50,
992            has_migrations: true,
993            has_config_changes: false,
994            rename_count: 1,
995        });
996        let json = serde_json::to_string(&event).unwrap();
997        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
998        assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
999        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
1000        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1001        assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
1002        let reason = parsed["reason"].as_str().unwrap();
1003        assert!(reason.contains("files=3"));
1004        assert!(reason.contains("lines=50"));
1005        assert!(reason.contains("migrations=true"));
1006        assert!(reason.contains("config=false"));
1007        assert!(reason.contains("renames=1"));
1008    }
1009
1010    #[test]
1011    fn performance_regression_serializes_task_and_reason() {
1012        let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
1013        let json = serde_json::to_string(&event).unwrap();
1014        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1015        assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
1016        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1017        assert_eq!(
1018            parsed["reason"].as_str().unwrap(),
1019            "runtime_ms=1300 avg_ms=1000 pct=30"
1020        );
1021    }
1022
1023    #[test]
1024    fn daemon_stopped_with_reason_includes_fields() {
1025        let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
1026        let json = serde_json::to_string(&event).unwrap();
1027        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1028        assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
1029        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
1030    }
1031
1032    #[test]
1033    fn heartbeat_includes_uptime() {
1034        let event = TeamEvent::daemon_heartbeat(600);
1035        let json = serde_json::to_string(&event).unwrap();
1036        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1037        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
1038        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
1039        // No reason/step/error fields
1040        assert!(parsed.get("reason").is_none());
1041        assert!(parsed.get("step").is_none());
1042    }
1043
1044    #[test]
1045    fn context_exhausted_includes_role_task_and_session_size() {
1046        let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
1047        let json = serde_json::to_string(&event).unwrap();
1048        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1049        assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
1050        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1051        assert_eq!(parsed["task"].as_str().unwrap(), "77");
1052        assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
1053    }
1054
1055    #[test]
1056    fn loop_step_error_includes_step_and_error() {
1057        let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
1058        let json = serde_json::to_string(&event).unwrap();
1059        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1060        assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
1061        assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
1062    }
1063
1064    #[test]
1065    fn daemon_panic_includes_reason() {
1066        let event = TeamEvent::daemon_panic("index out of bounds");
1067        let json = serde_json::to_string(&event).unwrap();
1068        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1069        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
1070        assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
1071    }
1072
1073    #[test]
1074    fn new_fields_omitted_from_basic_events() {
1075        let event = TeamEvent::daemon_started();
1076        let json = serde_json::to_string(&event).unwrap();
1077        assert!(!json.contains("\"reason\""));
1078        assert!(!json.contains("\"step\""));
1079        assert!(!json.contains("\"error\""));
1080        assert!(!json.contains("\"uptime_secs\""));
1081        assert!(!json.contains("\"restart_count\""));
1082    }
1083
1084    #[test]
1085    fn pattern_detected_includes_reason_payload() {
1086        let event = TeamEvent::pattern_detected("escalation_cluster", 6);
1087        let json = serde_json::to_string(&event).unwrap();
1088        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1089        assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
1090        assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
1091    }
1092
1093    #[test]
1094    fn event_sink_creates_parent_directories() {
1095        let tmp = tempfile::tempdir().unwrap();
1096        let path = tmp.path().join("deep").join("nested").join("events.jsonl");
1097        let mut sink = EventSink::new(&path).unwrap();
1098        sink.emit(TeamEvent::daemon_started()).unwrap();
1099        assert!(path.exists());
1100        assert_eq!(sink.path(), path);
1101    }
1102
1103    #[test]
1104    fn event_sink_rotates_oversized_log_on_open() {
1105        let tmp = tempfile::tempdir().unwrap();
1106        let path = tmp.path().join("events.jsonl");
1107        fs::write(&path, "0123456789").unwrap();
1108
1109        let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
1110        sink.emit(TeamEvent::daemon_started()).unwrap();
1111
1112        let rotated = rotated_event_log_path(&path);
1113        assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
1114        let current = fs::read_to_string(&path).unwrap();
1115        assert!(current.contains("daemon_started"));
1116    }
1117
1118    #[test]
1119    fn event_sink_rotates_before_write_that_would_exceed_threshold() {
1120        let tmp = tempfile::tempdir().unwrap();
1121        let path = tmp.path().join("events.jsonl");
1122        let first_line = "{\"event\":\"first\"}\n";
1123        fs::write(&path, first_line).unwrap();
1124
1125        let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
1126        sink.emit(TeamEvent::task_assigned(
1127            "eng-1",
1128            "this assignment is long enough to rotate",
1129        ))
1130        .unwrap();
1131
1132        let rotated = rotated_event_log_path(&path);
1133        assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
1134        let current = fs::read_to_string(&path).unwrap();
1135        assert!(current.contains("task_assigned"));
1136        assert!(!current.contains("\"event\":\"first\""));
1137    }
1138
1139    #[test]
1140    fn event_round_trip_preserves_fields_for_agent_restarted() {
1141        let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
1142
1143        let json = serde_json::to_string(&original).unwrap();
1144        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1145
1146        assert_eq!(parsed.event, "agent_restarted");
1147        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1148        assert_eq!(parsed.task.as_deref(), Some("42"));
1149        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1150        assert_eq!(parsed.restart_count, Some(3));
1151        assert_eq!(parsed.ts, original.ts);
1152    }
1153
1154    #[test]
1155    fn event_round_trip_preserves_fields_for_load_snapshot() {
1156        let original = TeamEvent::load_snapshot(4, 8, true);
1157
1158        let json = serde_json::to_string(&original).unwrap();
1159        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1160
1161        assert_eq!(parsed.event, "load_snapshot");
1162        assert_eq!(parsed.working_members, Some(4));
1163        assert_eq!(parsed.total_members, Some(8));
1164        assert_eq!(parsed.session_running, Some(true));
1165        assert_eq!(parsed.load, Some(0.5));
1166        assert_eq!(parsed.ts, original.ts);
1167    }
1168
1169    #[test]
1170    fn event_round_trip_preserves_fields_for_delivery_failed() {
1171        let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
1172
1173        let json = serde_json::to_string(&original).unwrap();
1174        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1175
1176        assert_eq!(parsed.event, "delivery_failed");
1177        assert_eq!(parsed.role.as_deref(), Some("eng-2"));
1178        assert_eq!(parsed.from.as_deref(), Some("manager"));
1179        assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
1180        assert_eq!(parsed.ts, original.ts);
1181    }
1182
1183    #[test]
1184    fn read_events_skips_blank_and_malformed_lines() {
1185        let tmp = tempfile::tempdir().unwrap();
1186        let path = tmp.path().join("events.jsonl");
1187        fs::write(
1188            &path,
1189            [
1190                "",
1191                "{\"event\":\"daemon_started\",\"ts\":1}",
1192                "not-json",
1193                "   ",
1194                "{\"event\":\"daemon_stopped\",\"ts\":2}",
1195            ]
1196            .join("\n"),
1197        )
1198        .unwrap();
1199
1200        let events = read_events(&path).unwrap();
1201
1202        assert_eq!(events.len(), 2);
1203        assert_eq!(events[0].event, "daemon_started");
1204        assert_eq!(events[1].event, "daemon_stopped");
1205    }
1206
1207    #[test]
1208    fn rotate_event_log_if_needed_returns_false_for_missing_file() {
1209        let tmp = tempfile::tempdir().unwrap();
1210        let path = tmp.path().join("events.jsonl");
1211
1212        let rotated = rotate_event_log_if_needed(&path, 128, 0).unwrap();
1213
1214        assert!(!rotated);
1215        assert!(!rotated_event_log_path(&path).exists());
1216    }
1217
1218    #[test]
1219    fn rotate_event_log_if_needed_returns_false_for_empty_file() {
1220        let tmp = tempfile::tempdir().unwrap();
1221        let path = tmp.path().join("events.jsonl");
1222        fs::write(&path, "").unwrap();
1223
1224        let rotated = rotate_event_log_if_needed(&path, 1, 1).unwrap();
1225
1226        assert!(!rotated);
1227        assert!(path.exists());
1228        assert!(!rotated_event_log_path(&path).exists());
1229    }
1230
1231    #[test]
1232    fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
1233        let tmp = tempfile::tempdir().unwrap();
1234        let path = tmp.path().join("events.jsonl");
1235        let rotated_path = rotated_event_log_path(&path);
1236        fs::write(&path, "current-events").unwrap();
1237        fs::write(&rotated_path, "old-rotated-events").unwrap();
1238
1239        let rotated = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1240
1241        assert!(rotated);
1242        assert_eq!(fs::read_to_string(&rotated_path).unwrap(), "current-events");
1243    }
1244
1245    #[test]
1246    fn concurrent_event_sinks_append_without_losing_lines() {
1247        let tmp = tempfile::tempdir().unwrap();
1248        let path = Arc::new(tmp.path().join("events.jsonl"));
1249        let ready = Arc::new(std::sync::Barrier::new(5));
1250        let errors = Arc::new(Mutex::new(Vec::<String>::new()));
1251        let mut handles = Vec::new();
1252
1253        for idx in 0..4 {
1254            let path = Arc::clone(&path);
1255            let ready = Arc::clone(&ready);
1256            let errors = Arc::clone(&errors);
1257            handles.push(thread::spawn(move || {
1258                ready.wait();
1259                let result = (|| -> Result<()> {
1260                    let mut sink = EventSink::new(&path)?;
1261                    sink.emit(TeamEvent::task_assigned(
1262                        &format!("eng-{idx}"),
1263                        &format!("task-{idx}"),
1264                    ))?;
1265                    Ok(())
1266                })();
1267                if let Err(error) = result {
1268                    errors.lock().unwrap().push(error.to_string());
1269                }
1270            }));
1271        }
1272
1273        ready.wait();
1274        for handle in handles {
1275            handle.join().unwrap();
1276        }
1277
1278        assert!(errors.lock().unwrap().is_empty());
1279        let events = read_events(&path).unwrap();
1280        assert_eq!(events.len(), 4);
1281        for idx in 0..4 {
1282            assert!(
1283                events
1284                    .iter()
1285                    .any(|event| event.role.as_deref() == Some(&format!("eng-{idx}")))
1286            );
1287        }
1288    }
1289
1290    #[test]
1291    fn read_events_handles_large_log_file() {
1292        let tmp = tempfile::tempdir().unwrap();
1293        let path = tmp.path().join("events.jsonl");
1294        let mut sink = EventSink::new(&path).unwrap();
1295
1296        for idx in 0..512 {
1297            sink.emit(TeamEvent::task_assigned(
1298                &format!("eng-{idx}"),
1299                &"x".repeat(128),
1300            ))
1301            .unwrap();
1302        }
1303
1304        let events = read_events(&path).unwrap();
1305
1306        assert_eq!(events.len(), 512);
1307        assert_eq!(events.first().unwrap().event, "task_assigned");
1308        assert_eq!(events.last().unwrap().event, "task_assigned");
1309    }
1310
1311    fn production_unwrap_expect_count(source: &str) -> usize {
1312        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1313            &source[..pos]
1314        } else {
1315            source
1316        };
1317        prod.lines()
1318            .filter(|line| {
1319                let trimmed = line.trim();
1320                !trimmed.starts_with("#[cfg(test)]")
1321                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1322            })
1323            .count()
1324    }
1325
1326    #[test]
1327    fn task_completed_includes_task_id() {
1328        let event = TeamEvent::task_completed("eng-1", Some("42"));
1329        let json = serde_json::to_string(&event).unwrap();
1330        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1331        assert_eq!(parsed["event"].as_str().unwrap(), "task_completed");
1332        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1333        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1334    }
1335
1336    #[test]
1337    fn task_completed_without_task_id_omits_task_field() {
1338        let event = TeamEvent::task_completed("eng-1", None);
1339        let json = serde_json::to_string(&event).unwrap();
1340        assert!(json.contains("\"event\":\"task_completed\""));
1341        assert!(json.contains("\"role\":\"eng-1\""));
1342        assert!(!json.contains("\"task\""));
1343    }
1344
1345    #[test]
1346    fn task_escalated_without_reason_omits_reason_field() {
1347        let event = TeamEvent::task_escalated("eng-1", "42", None);
1348        let json = serde_json::to_string(&event).unwrap();
1349        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1350        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1351        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1352        assert!(parsed.get("reason").is_none());
1353    }
1354
1355    #[test]
1356    fn task_escalated_with_reason_includes_reason_field() {
1357        let event = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
1358        let json = serde_json::to_string(&event).unwrap();
1359        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1360        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1361        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1362        assert_eq!(parsed["reason"].as_str().unwrap(), "merge_conflict");
1363    }
1364
1365    #[test]
1366    fn task_completed_round_trip_preserves_task_id() {
1367        let original = TeamEvent::task_completed("eng-1", Some("99"));
1368        let json = serde_json::to_string(&original).unwrap();
1369        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1370        assert_eq!(parsed.event, "task_completed");
1371        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1372        assert_eq!(parsed.task.as_deref(), Some("99"));
1373    }
1374
1375    #[test]
1376    fn task_escalated_round_trip_preserves_reason() {
1377        let original = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
1378        let json = serde_json::to_string(&original).unwrap();
1379        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1380        assert_eq!(parsed.event, "task_escalated");
1381        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1382        assert_eq!(parsed.task.as_deref(), Some("42"));
1383        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1384    }
1385
1386    #[test]
1387    fn production_events_has_no_unwrap_or_expect_calls() {
1388        let src = include_str!("events.rs");
1389        assert_eq!(
1390            production_unwrap_expect_count(src),
1391            0,
1392            "production events.rs should avoid unwrap/expect"
1393        );
1394    }
1395
1396    #[test]
1397    fn stall_detected_event_fields() {
1398        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1399        assert_eq!(event.event, "stall_detected");
1400        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1401        assert_eq!(event.task.as_deref(), Some("42"));
1402        assert_eq!(event.uptime_secs, Some(300));
1403    }
1404
1405    #[test]
1406    fn stall_detected_event_without_task() {
1407        let event = TeamEvent::stall_detected("eng-1-1", None, 600);
1408        assert_eq!(event.event, "stall_detected");
1409        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1410        assert!(event.task.is_none());
1411        assert_eq!(event.uptime_secs, Some(600));
1412    }
1413
1414    #[test]
1415    fn stall_detected_event_serializes_to_jsonl() {
1416        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1417        let json = serde_json::to_string(&event).unwrap();
1418        assert!(json.contains("\"stall_detected\""));
1419        assert!(json.contains("\"eng-1-1\""));
1420        assert!(json.contains("\"42\""));
1421    }
1422
1423    #[test]
1424    fn health_changed_event_fields() {
1425        let event = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");
1426        assert_eq!(event.event, "health_changed");
1427        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1428        assert_eq!(event.reason.as_deref(), Some("healthy→unreachable"));
1429    }
1430
1431    #[test]
1432    fn health_changed_event_serializes_to_jsonl() {
1433        let event = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
1434        let json = serde_json::to_string(&event).unwrap();
1435        assert!(json.contains("\"health_changed\""));
1436        assert!(json.contains("\"eng-1-2\""));
1437    }
1438
1439    // --- Error path and recovery tests (Task #265) ---
1440
1441    #[test]
1442    fn event_sink_on_readonly_dir_returns_error() {
1443        #[cfg(unix)]
1444        {
1445            use std::os::unix::fs::PermissionsExt;
1446            let tmp = tempfile::tempdir().unwrap();
1447            let readonly_dir = tmp.path().join("readonly");
1448            fs::create_dir(&readonly_dir).unwrap();
1449            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o444)).unwrap();
1450
1451            let path = readonly_dir.join("subdir").join("events.jsonl");
1452            let result = EventSink::new(&path);
1453            assert!(result.is_err());
1454
1455            // Restore permissions for cleanup
1456            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o755)).unwrap();
1457        }
1458    }
1459
1460    #[test]
1461    fn read_events_from_nonexistent_file_returns_empty() {
1462        let tmp = tempfile::tempdir().unwrap();
1463        let path = tmp.path().join("does_not_exist.jsonl");
1464        let events = read_events(&path).unwrap();
1465        assert!(events.is_empty());
1466    }
1467
1468    #[test]
1469    fn read_events_all_malformed_lines_returns_empty() {
1470        let tmp = tempfile::tempdir().unwrap();
1471        let path = tmp.path().join("events.jsonl");
1472        fs::write(&path, "not json\nalso not json\n{invalid}\n").unwrap();
1473        let events = read_events(&path).unwrap();
1474        assert!(events.is_empty());
1475    }
1476
1477    #[test]
1478    fn event_sink_emit_with_failing_writer() {
1479        struct FailWriter;
1480        impl Write for FailWriter {
1481            fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1482                Err(std::io::Error::new(
1483                    std::io::ErrorKind::BrokenPipe,
1484                    "simulated write failure",
1485                ))
1486            }
1487            fn flush(&mut self) -> std::io::Result<()> {
1488                Err(std::io::Error::new(
1489                    std::io::ErrorKind::BrokenPipe,
1490                    "simulated flush failure",
1491                ))
1492            }
1493        }
1494
1495        let tmp = tempfile::tempdir().unwrap();
1496        let path = tmp.path().join("events.jsonl");
1497        let mut sink = EventSink::from_writer(&path, FailWriter);
1498        let result = sink.emit(TeamEvent::daemon_started());
1499        assert!(result.is_err());
1500    }
1501
1502    #[test]
1503    fn rotate_event_log_replaces_stale_rotated_file() {
1504        let tmp = tempfile::tempdir().unwrap();
1505        let path = tmp.path().join("events.jsonl");
1506        let rotated = rotated_event_log_path(&path);
1507
1508        fs::write(&path, "current-data-that-is-large").unwrap();
1509        fs::write(&rotated, "old-rotated-data").unwrap();
1510
1511        let did_rotate = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1512        assert!(did_rotate);
1513        // Old rotated was replaced with current data
1514        assert_eq!(
1515            fs::read_to_string(&rotated).unwrap(),
1516            "current-data-that-is-large"
1517        );
1518        // Current file is now gone (rotated away)
1519        assert!(!path.exists());
1520    }
1521
1522    #[test]
1523    fn event_sink_handles_zero_max_bytes_rotation() {
1524        let tmp = tempfile::tempdir().unwrap();
1525        let path = tmp.path().join("events.jsonl");
1526
1527        // With max_bytes=0, any existing content triggers rotation, but empty file doesn't
1528        fs::write(&path, "").unwrap();
1529        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1530        assert!(!did_rotate); // empty file → no rotation
1531
1532        fs::write(&path, "x").unwrap();
1533        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1534        assert!(did_rotate); // non-empty file at 0-byte limit → rotation
1535    }
1536
1537    #[test]
1538    fn narration_rejection_event_has_correct_fields() {
1539        let event = TeamEvent::narration_rejection("eng-1-1", 42, 2);
1540        assert_eq!(event.event, "narration_rejection");
1541        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1542        assert_eq!(event.task.as_deref(), Some("42"));
1543        assert_eq!(event.reason.as_deref(), Some("rejection_count=2"));
1544    }
1545
1546    #[test]
1547    fn read_events_partial_json_with_valid_lines_mixed() {
1548        let tmp = tempfile::tempdir().unwrap();
1549        let path = tmp.path().join("events.jsonl");
1550        // Simulate a truncated write: valid JSON, then partial, then valid
1551        let content = format!(
1552            "{}\n{{\"event\":\"trunca\n{}\n",
1553            r#"{"event":"daemon_started","ts":1}"#, r#"{"event":"daemon_stopped","ts":3}"#
1554        );
1555        fs::write(&path, content).unwrap();
1556
1557        let events = read_events(&path).unwrap();
1558        assert_eq!(events.len(), 2);
1559        assert_eq!(events[0].event, "daemon_started");
1560        assert_eq!(events[1].event, "daemon_stopped");
1561    }
1562}