Skip to main content

batty_cli/team/
events.rs

1//! Structured JSONL event stream for external consumers.
2
3use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
/// Bundled parameters for merge-confidence scoring events.
///
/// Consumed by `TeamEvent::merge_confidence_scored`, which copies `engineer`,
/// `task` and `confidence` into dedicated event fields and folds the remaining
/// values into the event's `reason` string.
///
/// The type only holds borrowed strings and plain scalars, so it is cheap to
/// copy and can be printed or compared directly in logs and tests.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MergeConfidenceInfo<'a> {
    /// Engineer the score is attributed to; emitted as the event's `role`.
    pub engineer: &'a str,
    /// Task the score belongs to.
    pub task: &'a str,
    /// Merge confidence score; emitted in the event's `load` field.
    pub confidence: f64,
    /// Number of files changed.
    pub files_changed: usize,
    /// Number of lines changed.
    pub lines_changed: usize,
    /// Whether the change includes migrations.
    pub has_migrations: bool,
    /// Whether the change includes configuration changes.
    pub has_config_changes: bool,
    /// Number of renames in the change.
    pub rename_count: usize,
}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28    pub event: String,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub role: Option<String>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub task: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub recipient: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub from: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub to: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub restart: Option<bool>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub restart_count: Option<u32>,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub load: Option<f64>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub working_members: Option<u32>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub total_members: Option<u32>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub session_running: Option<bool>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub reason: Option<String>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub step: Option<String>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub error: Option<String>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub uptime_secs: Option<u64>,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub session_size_bytes: Option<u64>,
61    pub ts: u64,
62}
63
64impl TeamEvent {
65    fn now() -> u64 {
66        SystemTime::now()
67            .duration_since(UNIX_EPOCH)
68            .unwrap_or_default()
69            .as_secs()
70    }
71
72    fn base(event: &str) -> Self {
73        Self {
74            event: event.into(),
75            role: None,
76            task: None,
77            recipient: None,
78            from: None,
79            to: None,
80            restart: None,
81            restart_count: None,
82            load: None,
83            working_members: None,
84            total_members: None,
85            session_running: None,
86            reason: None,
87            step: None,
88            error: None,
89            uptime_secs: None,
90            session_size_bytes: None,
91            ts: Self::now(),
92        }
93    }
94
95    pub fn daemon_started() -> Self {
96        Self::base("daemon_started")
97    }
98
99    pub fn daemon_reloading() -> Self {
100        Self::base("daemon_reloading")
101    }
102
103    pub fn daemon_reloaded() -> Self {
104        Self::base("daemon_reloaded")
105    }
106
107    pub fn daemon_stopped() -> Self {
108        Self::base("daemon_stopped")
109    }
110
111    pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
112        Self {
113            reason: Some(reason.into()),
114            uptime_secs: Some(uptime_secs),
115            ..Self::base("daemon_stopped")
116        }
117    }
118
119    pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
120        Self {
121            uptime_secs: Some(uptime_secs),
122            ..Self::base("daemon_heartbeat")
123        }
124    }
125
126    pub fn context_exhausted(
127        role: &str,
128        task: Option<u32>,
129        session_size_bytes: Option<u64>,
130    ) -> Self {
131        Self {
132            role: Some(role.into()),
133            task: task.map(|task_id| task_id.to_string()),
134            session_size_bytes,
135            ..Self::base("context_exhausted")
136        }
137    }
138
139    pub fn loop_step_error(step: &str, error: &str) -> Self {
140        Self {
141            step: Some(step.into()),
142            error: Some(error.into()),
143            ..Self::base("loop_step_error")
144        }
145    }
146
147    pub fn daemon_panic(reason: &str) -> Self {
148        Self {
149            reason: Some(reason.into()),
150            ..Self::base("daemon_panic")
151        }
152    }
153
154    pub fn task_assigned(role: &str, task: &str) -> Self {
155        Self {
156            role: Some(role.into()),
157            task: Some(task.into()),
158            ..Self::base("task_assigned")
159        }
160    }
161
162    pub fn cwd_corrected(role: &str, path: &str) -> Self {
163        Self {
164            role: Some(role.into()),
165            reason: Some(path.into()),
166            ..Self::base("cwd_corrected")
167        }
168    }
169
170    pub fn review_nudge_sent(role: &str, task: &str) -> Self {
171        Self {
172            role: Some(role.into()),
173            task: Some(task.into()),
174            ..Self::base("review_nudge_sent")
175        }
176    }
177
178    pub fn review_escalated(task: &str, reason: &str) -> Self {
179        Self {
180            task: Some(task.into()),
181            reason: Some(reason.into()),
182            ..Self::base("review_escalated")
183        }
184    }
185
186    pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
187        Self {
188            role: Some(role.into()),
189            task: Some(task.into()),
190            reason: reason.map(|r| r.into()),
191            ..Self::base("task_escalated")
192        }
193    }
194
195    pub fn task_unblocked(role: &str, task: &str) -> Self {
196        Self {
197            role: Some(role.into()),
198            task: Some(task.into()),
199            ..Self::base("task_unblocked")
200        }
201    }
202
203    pub fn performance_regression(task: &str, reason: &str) -> Self {
204        Self {
205            task: Some(task.into()),
206            reason: Some(reason.into()),
207            ..Self::base("performance_regression")
208        }
209    }
210
211    pub fn task_completed(role: &str, task: Option<&str>) -> Self {
212        Self {
213            role: Some(role.into()),
214            task: task.map(|t| t.into()),
215            ..Self::base("task_completed")
216        }
217    }
218
219    pub fn standup_generated(recipient: &str) -> Self {
220        Self {
221            recipient: Some(recipient.into()),
222            ..Self::base("standup_generated")
223        }
224    }
225
226    pub fn retro_generated() -> Self {
227        Self::base("retro_generated")
228    }
229
230    pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
231        Self {
232            reason: Some(format!("{pattern_type}:{frequency}")),
233            ..Self::base("pattern_detected")
234        }
235    }
236
237    pub fn member_crashed(role: &str, restart: bool) -> Self {
238        Self {
239            role: Some(role.into()),
240            restart: Some(restart),
241            ..Self::base("member_crashed")
242        }
243    }
244
245    pub fn pane_death(role: &str) -> Self {
246        Self {
247            role: Some(role.into()),
248            ..Self::base("pane_death")
249        }
250    }
251
252    pub fn pane_respawned(role: &str) -> Self {
253        Self {
254            role: Some(role.into()),
255            ..Self::base("pane_respawned")
256        }
257    }
258
259    pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
260        Self {
261            role: Some(role.into()),
262            task: task.map(|id| id.to_string()),
263            uptime_secs: Some(stall_duration_secs),
264            ..Self::base("stall_detected")
265        }
266    }
267
268    /// Record a backend health state change for an agent.
269    ///
270    /// `reason` encodes the transition, e.g. "healthy→unreachable".
271    pub fn health_changed(role: &str, reason: &str) -> Self {
272        Self {
273            role: Some(role.into()),
274            reason: Some(reason.into()),
275            ..Self::base("health_changed")
276        }
277    }
278
279    pub fn message_routed(from: &str, to: &str) -> Self {
280        Self {
281            from: Some(from.into()),
282            to: Some(to.into()),
283            ..Self::base("message_routed")
284        }
285    }
286
287    pub fn agent_spawned(role: &str) -> Self {
288        Self {
289            role: Some(role.into()),
290            ..Self::base("agent_spawned")
291        }
292    }
293
294    pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
295        Self {
296            role: Some(role.into()),
297            task: Some(task.into()),
298            reason: Some(reason.into()),
299            restart_count: Some(restart_count),
300            ..Self::base("agent_restarted")
301        }
302    }
303
304    pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
305        Self {
306            role: Some(role.into()),
307            from: Some(from.into()),
308            reason: Some(reason.into()),
309            ..Self::base("delivery_failed")
310        }
311    }
312
313    pub fn task_auto_merged(
314        engineer: &str,
315        task: &str,
316        confidence: f64,
317        files_changed: usize,
318        lines_changed: usize,
319    ) -> Self {
320        Self {
321            role: Some(engineer.into()),
322            task: Some(task.into()),
323            load: Some(confidence),
324            reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
325            ..Self::base("task_auto_merged")
326        }
327    }
328
329    pub fn task_manual_merged(task: &str) -> Self {
330        Self {
331            task: Some(task.into()),
332            ..Self::base("task_manual_merged")
333        }
334    }
335
336    /// Emitted for every completed task to record its merge confidence score.
337    pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
338        let detail = format!(
339            "files={} lines={} migrations={} config={} renames={}",
340            info.files_changed,
341            info.lines_changed,
342            info.has_migrations,
343            info.has_config_changes,
344            info.rename_count
345        );
346        Self {
347            role: Some(info.engineer.into()),
348            task: Some(info.task.into()),
349            load: Some(info.confidence),
350            reason: Some(detail),
351            ..Self::base("merge_confidence_scored")
352        }
353    }
354
355    pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
356        Self {
357            role: Some(role.into()),
358            task: Some(task.into()),
359            ..Self::base("review_escalated")
360        }
361    }
362
363    pub fn task_reworked(role: &str, task: &str) -> Self {
364        Self {
365            role: Some(role.into()),
366            task: Some(task.into()),
367            ..Self::base("task_reworked")
368        }
369    }
370
371    pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
372        Self {
373            task: Some(format!("#{task_id}")),
374            reason: Some(cron_expr.into()),
375            ..Self::base("task_recycled")
376        }
377    }
378
379    pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
380        let load = if total_members == 0 {
381            0.0
382        } else {
383            working_members as f64 / total_members as f64
384        };
385        Self {
386            load: Some(load),
387            working_members: Some(working_members),
388            total_members: Some(total_members),
389            session_running: Some(session_running),
390            ..Self::base("load_snapshot")
391        }
392    }
393
394    pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
395        Self {
396            role: Some(role.into()),
397            reason: Some(format!("branch '{branch}' merged into main")),
398            ..Self::base("worktree_reconciled")
399        }
400    }
401}
402
403pub struct EventSink {
404    writer: Box<dyn Write + Send>,
405    path: PathBuf,
406    max_bytes: Option<u64>,
407}
408
409impl EventSink {
410    pub fn new(path: &Path) -> Result<Self> {
411        Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
412    }
413
414    pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
415        if let Some(parent) = path.parent() {
416            std::fs::create_dir_all(parent)?;
417        }
418        rotate_event_log_if_needed(path, max_bytes, 0)?;
419        let file = OpenOptions::new()
420            .create(true)
421            .append(true)
422            .open(path)
423            .with_context(|| format!("failed to open event sink: {}", path.display()))?;
424        Ok(Self {
425            writer: Box::new(BufWriter::new(file)),
426            path: path.to_path_buf(),
427            max_bytes: Some(max_bytes),
428        })
429    }
430
431    #[cfg(test)]
432    pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
433        Self {
434            writer: Box::new(writer),
435            path: path.to_path_buf(),
436            max_bytes: None,
437        }
438    }
439
440    pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
441        let json = serde_json::to_string(&event)?;
442        self.rotate_if_needed((json.len() + 1) as u64)?;
443        writeln!(self.writer, "{json}")?;
444        self.writer.flush()?;
445        Ok(())
446    }
447
448    pub fn path(&self) -> &Path {
449        &self.path
450    }
451
452    fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
453        let Some(max_bytes) = self.max_bytes else {
454            return Ok(());
455        };
456        self.writer.flush()?;
457        if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
458            self.writer = Box::new(BufWriter::new(
459                OpenOptions::new()
460                    .create(true)
461                    .append(true)
462                    .open(&self.path)
463                    .with_context(|| {
464                        format!("failed to reopen event sink: {}", self.path.display())
465                    })?,
466            ));
467        }
468        Ok(())
469    }
470}
471
/// Returns the backup path used when rotating `path`: the same path with
/// `.1` appended to its final component.
///
/// The suffix is appended at the `OsString` level rather than via
/// `Path::display`, which is lossy, so paths that are not valid UTF-8 keep
/// their exact bytes and the backup lands next to the live log.
fn rotated_event_log_path(path: &Path) -> PathBuf {
    let mut rotated = path.as_os_str().to_os_string();
    rotated.push(".1");
    PathBuf::from(rotated)
}
475
476fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
477    let len = match fs::metadata(path) {
478        Ok(metadata) => metadata.len(),
479        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
480        Err(error) => {
481            return Err(error).with_context(|| format!("failed to stat {}", path.display()));
482        }
483    };
484
485    if len == 0 {
486        return Ok(false);
487    }
488
489    if len.saturating_add(next_entry_bytes) <= max_bytes {
490        return Ok(false);
491    }
492
493    let rotated = rotated_event_log_path(path);
494    if rotated.exists() {
495        fs::remove_file(&rotated)
496            .with_context(|| format!("failed to remove {}", rotated.display()))?;
497    }
498    fs::rename(path, &rotated).with_context(|| {
499        format!(
500            "failed to rotate event log {} to {}",
501            path.display(),
502            rotated.display()
503        )
504    })?;
505    Ok(true)
506}
507
508pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
509    if !path.exists() {
510        return Ok(Vec::new());
511    }
512    let content = fs::read_to_string(path).context("failed to read event log")?;
513    let mut events = Vec::new();
514    for line in content.lines() {
515        let line = line.trim();
516        if line.is_empty() {
517            continue;
518        }
519        if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
520            events.push(event);
521        }
522    }
523    Ok(events)
524}
525
526#[cfg(test)]
527mod tests {
528    use std::sync::{Arc, Mutex};
529    use std::thread;
530
531    use super::*;
532
533    #[test]
534    fn event_serializes_to_json() {
535        let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
536        let json = serde_json::to_string(&event).unwrap();
537        assert!(json.contains("\"event\":\"task_assigned\""));
538        assert!(json.contains("\"role\":\"eng-1-1\""));
539        assert!(json.contains("\"task\":\"fix auth bug\""));
540        assert!(json.contains("\"ts\":"));
541    }
542
543    #[test]
544    fn optional_fields_omitted() {
545        let event = TeamEvent::daemon_started();
546        let json = serde_json::to_string(&event).unwrap();
547        assert!(!json.contains("\"role\""));
548        assert!(!json.contains("\"task\""));
549    }
550
551    #[test]
552    fn event_sink_writes_jsonl() {
553        let tmp = tempfile::tempdir().unwrap();
554        let path = tmp.path().join("events.jsonl");
555        let mut sink = EventSink::new(&path).unwrap();
556        sink.emit(TeamEvent::daemon_started()).unwrap();
557        sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
558            .unwrap();
559        sink.emit(TeamEvent::daemon_stopped()).unwrap();
560
561        let content = std::fs::read_to_string(&path).unwrap();
562        let lines: Vec<&str> = content.lines().collect();
563        assert_eq!(lines.len(), 3);
564        assert!(lines[0].contains("daemon_started"));
565        assert!(lines[1].contains("task_assigned"));
566        assert!(lines[2].contains("daemon_stopped"));
567    }
568
569    #[test]
570    fn all_event_variants_serialize_with_correct_event_field() {
571        let variants: Vec<(&str, TeamEvent)> = vec![
572            ("daemon_started", TeamEvent::daemon_started()),
573            ("daemon_reloading", TeamEvent::daemon_reloading()),
574            ("daemon_reloaded", TeamEvent::daemon_reloaded()),
575            ("daemon_stopped", TeamEvent::daemon_stopped()),
576            (
577                "daemon_stopped",
578                TeamEvent::daemon_stopped_with_reason("signal", 3600),
579            ),
580            ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
581            (
582                "context_exhausted",
583                TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
584            ),
585            (
586                "loop_step_error",
587                TeamEvent::loop_step_error("poll_watchers", "tmux error"),
588            ),
589            (
590                "daemon_panic",
591                TeamEvent::daemon_panic("index out of bounds"),
592            ),
593            ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
594            (
595                "cwd_corrected",
596                TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
597            ),
598            (
599                "task_escalated",
600                TeamEvent::task_escalated("eng-1", "task", None),
601            ),
602            ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
603            (
604                "performance_regression",
605                TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
606            ),
607            (
608                "task_completed",
609                TeamEvent::task_completed("eng-1", Some("42")),
610            ),
611            ("standup_generated", TeamEvent::standup_generated("manager")),
612            ("retro_generated", TeamEvent::retro_generated()),
613            (
614                "pattern_detected",
615                TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
616            ),
617            ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
618            ("pane_death", TeamEvent::pane_death("eng-1")),
619            ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
620            ("message_routed", TeamEvent::message_routed("a", "b")),
621            ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
622            (
623                "agent_restarted",
624                TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
625            ),
626            (
627                "delivery_failed",
628                TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
629            ),
630            (
631                "task_auto_merged",
632                TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
633            ),
634            ("task_manual_merged", TeamEvent::task_manual_merged("42")),
635            (
636                "merge_confidence_scored",
637                TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
638                    engineer: "eng-1",
639                    task: "42",
640                    confidence: 0.85,
641                    files_changed: 3,
642                    lines_changed: 50,
643                    has_migrations: false,
644                    has_config_changes: false,
645                    rename_count: 0,
646                }),
647            ),
648            (
649                "review_nudge_sent",
650                TeamEvent::review_nudge_sent("manager", "42"),
651            ),
652            (
653                "review_escalated",
654                TeamEvent::review_escalated("42", "stale review"),
655            ),
656            ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
657            ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
658            (
659                "worktree_reconciled",
660                TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
661            ),
662        ];
663        for (expected_event, event) in &variants {
664            let json = serde_json::to_string(event).unwrap();
665            let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
666            assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
667            assert!(parsed["ts"].as_u64().is_some());
668        }
669    }
670
671    #[test]
672    fn load_snapshot_serializes_optional_metrics() {
673        let event = TeamEvent::load_snapshot(3, 7, false);
674        let json = serde_json::to_string(&event).unwrap();
675        assert!(json.contains("\"event\":\"load_snapshot\""));
676        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
677        assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
678        assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
679        assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
680        assert!(!parsed["session_running"].as_bool().unwrap());
681    }
682
683    #[test]
684    fn read_events_parses_all_known_lines() {
685        let tmp = tempfile::tempdir().unwrap();
686        let path = tmp.path().join("events.jsonl");
687        let mut sink = EventSink::new(&path).unwrap();
688        sink.emit(TeamEvent::daemon_started()).unwrap();
689        sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
690        sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
691
692        let events = read_events(&path).unwrap();
693        assert_eq!(events.len(), 3);
694        assert_eq!(events[1].event, "load_snapshot");
695        assert_eq!(events[1].working_members, Some(1));
696        assert_eq!(events[2].total_members, Some(4));
697    }
698
699    #[test]
700    fn event_sink_appends_to_existing_file() {
701        let tmp = tempfile::tempdir().unwrap();
702        let path = tmp.path().join("events.jsonl");
703
704        // Write one event and close the sink
705        {
706            let mut sink = EventSink::new(&path).unwrap();
707            sink.emit(TeamEvent::daemon_started()).unwrap();
708        }
709
710        // Open again and append another
711        {
712            let mut sink = EventSink::new(&path).unwrap();
713            sink.emit(TeamEvent::daemon_stopped()).unwrap();
714        }
715
716        let content = std::fs::read_to_string(&path).unwrap();
717        let lines: Vec<&str> = content.lines().collect();
718        assert_eq!(lines.len(), 2);
719        assert!(lines[0].contains("daemon_started"));
720        assert!(lines[1].contains("daemon_stopped"));
721    }
722
723    #[test]
724    fn event_with_special_characters_in_task() {
725        let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
726        let json = serde_json::to_string(&event).unwrap();
727        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
728        let task_val = parsed["task"].as_str().unwrap();
729        assert!(task_val.contains("\"quotes\""));
730        assert!(task_val.contains("<angles>"));
731    }
732
733    #[test]
734    fn task_escalated_serializes_role_and_task() {
735        let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
736        let json = serde_json::to_string(&event).unwrap();
737        assert!(json.contains("\"event\":\"task_escalated\""));
738        assert!(json.contains("\"role\":\"eng-1-1\""));
739        assert!(json.contains("\"task\":\"42\""));
740        assert!(json.contains("\"reason\":\"tests_failed\""));
741    }
742
743    #[test]
744    fn cwd_corrected_serializes_role_and_reason() {
745        let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
746        let json = serde_json::to_string(&event).unwrap();
747        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
748        assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
749        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
750        assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
751    }
752
753    #[test]
754    fn pane_death_serializes_role() {
755        let event = TeamEvent::pane_death("eng-1-1");
756        let json = serde_json::to_string(&event).unwrap();
757        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
758        assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
759        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
760    }
761
762    #[test]
763    fn pane_respawned_serializes_role() {
764        let event = TeamEvent::pane_respawned("eng-1-1");
765        let json = serde_json::to_string(&event).unwrap();
766        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
767        assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
768        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
769    }
770
771    #[test]
772    fn agent_restarted_includes_reason_task_and_count() {
773        let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
774        let json = serde_json::to_string(&event).unwrap();
775        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
776        assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
777        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
778        assert_eq!(parsed["task"].as_str().unwrap(), "67");
779        assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
780        assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
781    }
782
783    #[test]
784    fn delivery_failed_includes_role_sender_and_reason() {
785        let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
786        let json = serde_json::to_string(&event).unwrap();
787        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
788        assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
789        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
790        assert_eq!(parsed["from"].as_str().unwrap(), "manager");
791        assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
792    }
793
794    #[test]
795    fn task_unblocked_serializes_role_and_task() {
796        let event = TeamEvent::task_unblocked("eng-1-1", "42");
797        let json = serde_json::to_string(&event).unwrap();
798        assert!(json.contains("\"event\":\"task_unblocked\""));
799        assert!(json.contains("\"role\":\"eng-1-1\""));
800        assert!(json.contains("\"task\":\"42\""));
801    }
802
803    #[test]
804    fn merge_confidence_scored_includes_all_fields() {
805        let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
806            engineer: "eng-1-1",
807            task: "42",
808            confidence: 0.85,
809            files_changed: 3,
810            lines_changed: 50,
811            has_migrations: true,
812            has_config_changes: false,
813            rename_count: 1,
814        });
815        let json = serde_json::to_string(&event).unwrap();
816        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
817        assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
818        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
819        assert_eq!(parsed["task"].as_str().unwrap(), "42");
820        assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
821        let reason = parsed["reason"].as_str().unwrap();
822        assert!(reason.contains("files=3"));
823        assert!(reason.contains("lines=50"));
824        assert!(reason.contains("migrations=true"));
825        assert!(reason.contains("config=false"));
826        assert!(reason.contains("renames=1"));
827    }
828
829    #[test]
830    fn performance_regression_serializes_task_and_reason() {
831        let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
832        let json = serde_json::to_string(&event).unwrap();
833        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
834        assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
835        assert_eq!(parsed["task"].as_str().unwrap(), "42");
836        assert_eq!(
837            parsed["reason"].as_str().unwrap(),
838            "runtime_ms=1300 avg_ms=1000 pct=30"
839        );
840    }
841
842    #[test]
843    fn daemon_stopped_with_reason_includes_fields() {
844        let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
845        let json = serde_json::to_string(&event).unwrap();
846        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
847        assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
848        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
849    }
850
851    #[test]
852    fn heartbeat_includes_uptime() {
853        let event = TeamEvent::daemon_heartbeat(600);
854        let json = serde_json::to_string(&event).unwrap();
855        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
856        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
857        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
858        // No reason/step/error fields
859        assert!(parsed.get("reason").is_none());
860        assert!(parsed.get("step").is_none());
861    }
862
863    #[test]
864    fn context_exhausted_includes_role_task_and_session_size() {
865        let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
866        let json = serde_json::to_string(&event).unwrap();
867        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
868        assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
869        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
870        assert_eq!(parsed["task"].as_str().unwrap(), "77");
871        assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
872    }
873
874    #[test]
875    fn loop_step_error_includes_step_and_error() {
876        let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
877        let json = serde_json::to_string(&event).unwrap();
878        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
879        assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
880        assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
881    }
882
883    #[test]
884    fn daemon_panic_includes_reason() {
885        let event = TeamEvent::daemon_panic("index out of bounds");
886        let json = serde_json::to_string(&event).unwrap();
887        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
888        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
889        assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
890    }
891
892    #[test]
893    fn new_fields_omitted_from_basic_events() {
894        let event = TeamEvent::daemon_started();
895        let json = serde_json::to_string(&event).unwrap();
896        assert!(!json.contains("\"reason\""));
897        assert!(!json.contains("\"step\""));
898        assert!(!json.contains("\"error\""));
899        assert!(!json.contains("\"uptime_secs\""));
900        assert!(!json.contains("\"restart_count\""));
901    }
902
903    #[test]
904    fn pattern_detected_includes_reason_payload() {
905        let event = TeamEvent::pattern_detected("escalation_cluster", 6);
906        let json = serde_json::to_string(&event).unwrap();
907        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
908        assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
909        assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
910    }
911
912    #[test]
913    fn event_sink_creates_parent_directories() {
914        let tmp = tempfile::tempdir().unwrap();
915        let path = tmp.path().join("deep").join("nested").join("events.jsonl");
916        let mut sink = EventSink::new(&path).unwrap();
917        sink.emit(TeamEvent::daemon_started()).unwrap();
918        assert!(path.exists());
919        assert_eq!(sink.path(), path);
920    }
921
922    #[test]
923    fn event_sink_rotates_oversized_log_on_open() {
924        let tmp = tempfile::tempdir().unwrap();
925        let path = tmp.path().join("events.jsonl");
926        fs::write(&path, "0123456789").unwrap();
927
928        let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
929        sink.emit(TeamEvent::daemon_started()).unwrap();
930
931        let rotated = rotated_event_log_path(&path);
932        assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
933        let current = fs::read_to_string(&path).unwrap();
934        assert!(current.contains("daemon_started"));
935    }
936
937    #[test]
938    fn event_sink_rotates_before_write_that_would_exceed_threshold() {
939        let tmp = tempfile::tempdir().unwrap();
940        let path = tmp.path().join("events.jsonl");
941        let first_line = "{\"event\":\"first\"}\n";
942        fs::write(&path, first_line).unwrap();
943
944        let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
945        sink.emit(TeamEvent::task_assigned(
946            "eng-1",
947            "this assignment is long enough to rotate",
948        ))
949        .unwrap();
950
951        let rotated = rotated_event_log_path(&path);
952        assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
953        let current = fs::read_to_string(&path).unwrap();
954        assert!(current.contains("task_assigned"));
955        assert!(!current.contains("\"event\":\"first\""));
956    }
957
958    #[test]
959    fn event_round_trip_preserves_fields_for_agent_restarted() {
960        let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
961
962        let json = serde_json::to_string(&original).unwrap();
963        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
964
965        assert_eq!(parsed.event, "agent_restarted");
966        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
967        assert_eq!(parsed.task.as_deref(), Some("42"));
968        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
969        assert_eq!(parsed.restart_count, Some(3));
970        assert_eq!(parsed.ts, original.ts);
971    }
972
973    #[test]
974    fn event_round_trip_preserves_fields_for_load_snapshot() {
975        let original = TeamEvent::load_snapshot(4, 8, true);
976
977        let json = serde_json::to_string(&original).unwrap();
978        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
979
980        assert_eq!(parsed.event, "load_snapshot");
981        assert_eq!(parsed.working_members, Some(4));
982        assert_eq!(parsed.total_members, Some(8));
983        assert_eq!(parsed.session_running, Some(true));
984        assert_eq!(parsed.load, Some(0.5));
985        assert_eq!(parsed.ts, original.ts);
986    }
987
988    #[test]
989    fn event_round_trip_preserves_fields_for_delivery_failed() {
990        let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
991
992        let json = serde_json::to_string(&original).unwrap();
993        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
994
995        assert_eq!(parsed.event, "delivery_failed");
996        assert_eq!(parsed.role.as_deref(), Some("eng-2"));
997        assert_eq!(parsed.from.as_deref(), Some("manager"));
998        assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
999        assert_eq!(parsed.ts, original.ts);
1000    }
1001
1002    #[test]
1003    fn read_events_skips_blank_and_malformed_lines() {
1004        let tmp = tempfile::tempdir().unwrap();
1005        let path = tmp.path().join("events.jsonl");
1006        fs::write(
1007            &path,
1008            [
1009                "",
1010                "{\"event\":\"daemon_started\",\"ts\":1}",
1011                "not-json",
1012                "   ",
1013                "{\"event\":\"daemon_stopped\",\"ts\":2}",
1014            ]
1015            .join("\n"),
1016        )
1017        .unwrap();
1018
1019        let events = read_events(&path).unwrap();
1020
1021        assert_eq!(events.len(), 2);
1022        assert_eq!(events[0].event, "daemon_started");
1023        assert_eq!(events[1].event, "daemon_stopped");
1024    }
1025
1026    #[test]
1027    fn rotate_event_log_if_needed_returns_false_for_missing_file() {
1028        let tmp = tempfile::tempdir().unwrap();
1029        let path = tmp.path().join("events.jsonl");
1030
1031        let rotated = rotate_event_log_if_needed(&path, 128, 0).unwrap();
1032
1033        assert!(!rotated);
1034        assert!(!rotated_event_log_path(&path).exists());
1035    }
1036
1037    #[test]
1038    fn rotate_event_log_if_needed_returns_false_for_empty_file() {
1039        let tmp = tempfile::tempdir().unwrap();
1040        let path = tmp.path().join("events.jsonl");
1041        fs::write(&path, "").unwrap();
1042
1043        let rotated = rotate_event_log_if_needed(&path, 1, 1).unwrap();
1044
1045        assert!(!rotated);
1046        assert!(path.exists());
1047        assert!(!rotated_event_log_path(&path).exists());
1048    }
1049
1050    #[test]
1051    fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
1052        let tmp = tempfile::tempdir().unwrap();
1053        let path = tmp.path().join("events.jsonl");
1054        let rotated_path = rotated_event_log_path(&path);
1055        fs::write(&path, "current-events").unwrap();
1056        fs::write(&rotated_path, "old-rotated-events").unwrap();
1057
1058        let rotated = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1059
1060        assert!(rotated);
1061        assert_eq!(fs::read_to_string(&rotated_path).unwrap(), "current-events");
1062    }
1063
1064    #[test]
1065    fn concurrent_event_sinks_append_without_losing_lines() {
1066        let tmp = tempfile::tempdir().unwrap();
1067        let path = Arc::new(tmp.path().join("events.jsonl"));
1068        let ready = Arc::new(std::sync::Barrier::new(5));
1069        let errors = Arc::new(Mutex::new(Vec::<String>::new()));
1070        let mut handles = Vec::new();
1071
1072        for idx in 0..4 {
1073            let path = Arc::clone(&path);
1074            let ready = Arc::clone(&ready);
1075            let errors = Arc::clone(&errors);
1076            handles.push(thread::spawn(move || {
1077                ready.wait();
1078                let result = (|| -> Result<()> {
1079                    let mut sink = EventSink::new(&path)?;
1080                    sink.emit(TeamEvent::task_assigned(
1081                        &format!("eng-{idx}"),
1082                        &format!("task-{idx}"),
1083                    ))?;
1084                    Ok(())
1085                })();
1086                if let Err(error) = result {
1087                    errors.lock().unwrap().push(error.to_string());
1088                }
1089            }));
1090        }
1091
1092        ready.wait();
1093        for handle in handles {
1094            handle.join().unwrap();
1095        }
1096
1097        assert!(errors.lock().unwrap().is_empty());
1098        let events = read_events(&path).unwrap();
1099        assert_eq!(events.len(), 4);
1100        for idx in 0..4 {
1101            assert!(
1102                events
1103                    .iter()
1104                    .any(|event| event.role.as_deref() == Some(&format!("eng-{idx}")))
1105            );
1106        }
1107    }
1108
1109    #[test]
1110    fn read_events_handles_large_log_file() {
1111        let tmp = tempfile::tempdir().unwrap();
1112        let path = tmp.path().join("events.jsonl");
1113        let mut sink = EventSink::new(&path).unwrap();
1114
1115        for idx in 0..512 {
1116            sink.emit(TeamEvent::task_assigned(
1117                &format!("eng-{idx}"),
1118                &"x".repeat(128),
1119            ))
1120            .unwrap();
1121        }
1122
1123        let events = read_events(&path).unwrap();
1124
1125        assert_eq!(events.len(), 512);
1126        assert_eq!(events.first().unwrap().event, "task_assigned");
1127        assert_eq!(events.last().unwrap().event, "task_assigned");
1128    }
1129
1130    fn production_unwrap_expect_count(source: &str) -> usize {
1131        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1132            &source[..pos]
1133        } else {
1134            source
1135        };
1136        prod.lines()
1137            .filter(|line| {
1138                let trimmed = line.trim();
1139                !trimmed.starts_with("#[cfg(test)]")
1140                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1141            })
1142            .count()
1143    }
1144
1145    #[test]
1146    fn task_completed_includes_task_id() {
1147        let event = TeamEvent::task_completed("eng-1", Some("42"));
1148        let json = serde_json::to_string(&event).unwrap();
1149        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1150        assert_eq!(parsed["event"].as_str().unwrap(), "task_completed");
1151        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1152        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1153    }
1154
1155    #[test]
1156    fn task_completed_without_task_id_omits_task_field() {
1157        let event = TeamEvent::task_completed("eng-1", None);
1158        let json = serde_json::to_string(&event).unwrap();
1159        assert!(json.contains("\"event\":\"task_completed\""));
1160        assert!(json.contains("\"role\":\"eng-1\""));
1161        assert!(!json.contains("\"task\""));
1162    }
1163
1164    #[test]
1165    fn task_escalated_without_reason_omits_reason_field() {
1166        let event = TeamEvent::task_escalated("eng-1", "42", None);
1167        let json = serde_json::to_string(&event).unwrap();
1168        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1169        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1170        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1171        assert!(parsed.get("reason").is_none());
1172    }
1173
1174    #[test]
1175    fn task_escalated_with_reason_includes_reason_field() {
1176        let event = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
1177        let json = serde_json::to_string(&event).unwrap();
1178        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1179        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1180        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1181        assert_eq!(parsed["reason"].as_str().unwrap(), "merge_conflict");
1182    }
1183
1184    #[test]
1185    fn task_completed_round_trip_preserves_task_id() {
1186        let original = TeamEvent::task_completed("eng-1", Some("99"));
1187        let json = serde_json::to_string(&original).unwrap();
1188        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1189        assert_eq!(parsed.event, "task_completed");
1190        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1191        assert_eq!(parsed.task.as_deref(), Some("99"));
1192    }
1193
1194    #[test]
1195    fn task_escalated_round_trip_preserves_reason() {
1196        let original = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
1197        let json = serde_json::to_string(&original).unwrap();
1198        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1199        assert_eq!(parsed.event, "task_escalated");
1200        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1201        assert_eq!(parsed.task.as_deref(), Some("42"));
1202        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1203    }
1204
1205    #[test]
1206    fn production_events_has_no_unwrap_or_expect_calls() {
1207        let src = include_str!("events.rs");
1208        assert_eq!(
1209            production_unwrap_expect_count(src),
1210            0,
1211            "production events.rs should avoid unwrap/expect"
1212        );
1213    }
1214
1215    #[test]
1216    fn stall_detected_event_fields() {
1217        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1218        assert_eq!(event.event, "stall_detected");
1219        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1220        assert_eq!(event.task.as_deref(), Some("42"));
1221        assert_eq!(event.uptime_secs, Some(300));
1222    }
1223
1224    #[test]
1225    fn stall_detected_event_without_task() {
1226        let event = TeamEvent::stall_detected("eng-1-1", None, 600);
1227        assert_eq!(event.event, "stall_detected");
1228        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1229        assert!(event.task.is_none());
1230        assert_eq!(event.uptime_secs, Some(600));
1231    }
1232
1233    #[test]
1234    fn stall_detected_event_serializes_to_jsonl() {
1235        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1236        let json = serde_json::to_string(&event).unwrap();
1237        assert!(json.contains("\"stall_detected\""));
1238        assert!(json.contains("\"eng-1-1\""));
1239        assert!(json.contains("\"42\""));
1240    }
1241
1242    #[test]
1243    fn health_changed_event_fields() {
1244        let event = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");
1245        assert_eq!(event.event, "health_changed");
1246        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1247        assert_eq!(event.reason.as_deref(), Some("healthy→unreachable"));
1248    }
1249
1250    #[test]
1251    fn health_changed_event_serializes_to_jsonl() {
1252        let event = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
1253        let json = serde_json::to_string(&event).unwrap();
1254        assert!(json.contains("\"health_changed\""));
1255        assert!(json.contains("\"eng-1-2\""));
1256    }
1257
1258    // --- Error path and recovery tests (Task #265) ---
1259
1260    #[test]
1261    fn event_sink_on_readonly_dir_returns_error() {
1262        #[cfg(unix)]
1263        {
1264            use std::os::unix::fs::PermissionsExt;
1265            let tmp = tempfile::tempdir().unwrap();
1266            let readonly_dir = tmp.path().join("readonly");
1267            fs::create_dir(&readonly_dir).unwrap();
1268            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o444)).unwrap();
1269
1270            let path = readonly_dir.join("subdir").join("events.jsonl");
1271            let result = EventSink::new(&path);
1272            assert!(result.is_err());
1273
1274            // Restore permissions for cleanup
1275            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o755)).unwrap();
1276        }
1277    }
1278
1279    #[test]
1280    fn read_events_from_nonexistent_file_returns_empty() {
1281        let tmp = tempfile::tempdir().unwrap();
1282        let path = tmp.path().join("does_not_exist.jsonl");
1283        let events = read_events(&path).unwrap();
1284        assert!(events.is_empty());
1285    }
1286
1287    #[test]
1288    fn read_events_all_malformed_lines_returns_empty() {
1289        let tmp = tempfile::tempdir().unwrap();
1290        let path = tmp.path().join("events.jsonl");
1291        fs::write(&path, "not json\nalso not json\n{invalid}\n").unwrap();
1292        let events = read_events(&path).unwrap();
1293        assert!(events.is_empty());
1294    }
1295
1296    #[test]
1297    fn event_sink_emit_with_failing_writer() {
1298        struct FailWriter;
1299        impl Write for FailWriter {
1300            fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1301                Err(std::io::Error::new(
1302                    std::io::ErrorKind::BrokenPipe,
1303                    "simulated write failure",
1304                ))
1305            }
1306            fn flush(&mut self) -> std::io::Result<()> {
1307                Err(std::io::Error::new(
1308                    std::io::ErrorKind::BrokenPipe,
1309                    "simulated flush failure",
1310                ))
1311            }
1312        }
1313
1314        let tmp = tempfile::tempdir().unwrap();
1315        let path = tmp.path().join("events.jsonl");
1316        let mut sink = EventSink::from_writer(&path, FailWriter);
1317        let result = sink.emit(TeamEvent::daemon_started());
1318        assert!(result.is_err());
1319    }
1320
1321    #[test]
1322    fn rotate_event_log_replaces_stale_rotated_file() {
1323        let tmp = tempfile::tempdir().unwrap();
1324        let path = tmp.path().join("events.jsonl");
1325        let rotated = rotated_event_log_path(&path);
1326
1327        fs::write(&path, "current-data-that-is-large").unwrap();
1328        fs::write(&rotated, "old-rotated-data").unwrap();
1329
1330        let did_rotate = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1331        assert!(did_rotate);
1332        // Old rotated was replaced with current data
1333        assert_eq!(
1334            fs::read_to_string(&rotated).unwrap(),
1335            "current-data-that-is-large"
1336        );
1337        // Current file is now gone (rotated away)
1338        assert!(!path.exists());
1339    }
1340
1341    #[test]
1342    fn event_sink_handles_zero_max_bytes_rotation() {
1343        let tmp = tempfile::tempdir().unwrap();
1344        let path = tmp.path().join("events.jsonl");
1345
1346        // With max_bytes=0, any existing content triggers rotation, but empty file doesn't
1347        fs::write(&path, "").unwrap();
1348        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1349        assert!(!did_rotate); // empty file → no rotation
1350
1351        fs::write(&path, "x").unwrap();
1352        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1353        assert!(did_rotate); // non-empty file at 0-byte limit → rotation
1354    }
1355
1356    #[test]
1357    fn read_events_partial_json_with_valid_lines_mixed() {
1358        let tmp = tempfile::tempdir().unwrap();
1359        let path = tmp.path().join("events.jsonl");
1360        // Simulate a truncated write: valid JSON, then partial, then valid
1361        let content = format!(
1362            "{}\n{{\"event\":\"trunca\n{}\n",
1363            r#"{"event":"daemon_started","ts":1}"#, r#"{"event":"daemon_stopped","ts":3}"#
1364        );
1365        fs::write(&path, content).unwrap();
1366
1367        let events = read_events(&path).unwrap();
1368        assert_eq!(events.len(), 2);
1369        assert_eq!(events[0].event, "daemon_started");
1370        assert_eq!(events[1].event, "daemon_stopped");
1371    }
1372}