Skip to main content

batty_cli/team/
events.rs

1//! Structured JSONL event stream for external consumers.
2
3use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
/// Bundled parameters for merge-confidence scoring events.
///
/// Consumed by `TeamEvent::merge_confidence_scored`, which copies `engineer`,
/// `task` and `confidence` into dedicated event fields and folds the remaining
/// values into the event's free-form `reason` string.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MergeConfidenceInfo<'a> {
    /// Engineer the score is attributed to (emitted as the event `role`).
    pub engineer: &'a str,
    /// Task identifier the score belongs to.
    pub task: &'a str,
    /// Computed confidence score (emitted in the event `load` field).
    pub confidence: f64,
    /// Number of files touched by the change.
    pub files_changed: usize,
    /// Number of lines touched by the change.
    pub lines_changed: usize,
    /// Whether the change includes migrations.
    pub has_migrations: bool,
    /// Whether the change includes configuration changes.
    pub has_config_changes: bool,
    /// Number of renames in the change.
    pub rename_count: usize,
}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28    pub event: String,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub role: Option<String>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub task: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub recipient: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub from: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub to: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub restart: Option<bool>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub restart_count: Option<u32>,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub load: Option<f64>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub working_members: Option<u32>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub total_members: Option<u32>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub session_running: Option<bool>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub reason: Option<String>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub step: Option<String>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub error: Option<String>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub uptime_secs: Option<u64>,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub session_size_bytes: Option<u64>,
61    pub ts: u64,
62}
63
64impl TeamEvent {
65    fn now() -> u64 {
66        SystemTime::now()
67            .duration_since(UNIX_EPOCH)
68            .unwrap_or_default()
69            .as_secs()
70    }
71
72    fn base(event: &str) -> Self {
73        Self {
74            event: event.into(),
75            role: None,
76            task: None,
77            recipient: None,
78            from: None,
79            to: None,
80            restart: None,
81            restart_count: None,
82            load: None,
83            working_members: None,
84            total_members: None,
85            session_running: None,
86            reason: None,
87            step: None,
88            error: None,
89            uptime_secs: None,
90            session_size_bytes: None,
91            ts: Self::now(),
92        }
93    }
94
95    pub fn daemon_started() -> Self {
96        Self::base("daemon_started")
97    }
98
99    pub fn daemon_reloading() -> Self {
100        Self::base("daemon_reloading")
101    }
102
103    pub fn daemon_reloaded() -> Self {
104        Self::base("daemon_reloaded")
105    }
106
107    #[allow(dead_code)]
108    pub fn daemon_stopped() -> Self {
109        Self::base("daemon_stopped")
110    }
111
112    pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
113        Self {
114            reason: Some(reason.into()),
115            uptime_secs: Some(uptime_secs),
116            ..Self::base("daemon_stopped")
117        }
118    }
119
120    pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
121        Self {
122            uptime_secs: Some(uptime_secs),
123            ..Self::base("daemon_heartbeat")
124        }
125    }
126
127    pub fn context_exhausted(
128        role: &str,
129        task: Option<u32>,
130        session_size_bytes: Option<u64>,
131    ) -> Self {
132        Self {
133            role: Some(role.into()),
134            task: task.map(|task_id| task_id.to_string()),
135            session_size_bytes,
136            ..Self::base("context_exhausted")
137        }
138    }
139
140    pub fn loop_step_error(step: &str, error: &str) -> Self {
141        Self {
142            step: Some(step.into()),
143            error: Some(error.into()),
144            ..Self::base("loop_step_error")
145        }
146    }
147
148    pub fn daemon_panic(reason: &str) -> Self {
149        Self {
150            reason: Some(reason.into()),
151            ..Self::base("daemon_panic")
152        }
153    }
154
155    pub fn task_assigned(role: &str, task: &str) -> Self {
156        Self {
157            role: Some(role.into()),
158            task: Some(task.into()),
159            ..Self::base("task_assigned")
160        }
161    }
162
163    pub fn cwd_corrected(role: &str, path: &str) -> Self {
164        Self {
165            role: Some(role.into()),
166            reason: Some(path.into()),
167            ..Self::base("cwd_corrected")
168        }
169    }
170
171    pub fn review_nudge_sent(role: &str, task: &str) -> Self {
172        Self {
173            role: Some(role.into()),
174            task: Some(task.into()),
175            ..Self::base("review_nudge_sent")
176        }
177    }
178
179    pub fn review_escalated(task: &str, reason: &str) -> Self {
180        Self {
181            task: Some(task.into()),
182            reason: Some(reason.into()),
183            ..Self::base("review_escalated")
184        }
185    }
186
187    pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
188        Self {
189            role: Some(role.into()),
190            task: Some(task.into()),
191            reason: reason.map(|r| r.into()),
192            ..Self::base("task_escalated")
193        }
194    }
195
196    pub fn task_unblocked(role: &str, task: &str) -> Self {
197        Self {
198            role: Some(role.into()),
199            task: Some(task.into()),
200            ..Self::base("task_unblocked")
201        }
202    }
203
204    pub fn performance_regression(task: &str, reason: &str) -> Self {
205        Self {
206            task: Some(task.into()),
207            reason: Some(reason.into()),
208            ..Self::base("performance_regression")
209        }
210    }
211
212    pub fn task_completed(role: &str, task: Option<&str>) -> Self {
213        Self {
214            role: Some(role.into()),
215            task: task.map(|t| t.into()),
216            ..Self::base("task_completed")
217        }
218    }
219
220    pub fn standup_generated(recipient: &str) -> Self {
221        Self {
222            recipient: Some(recipient.into()),
223            ..Self::base("standup_generated")
224        }
225    }
226
227    pub fn retro_generated() -> Self {
228        Self::base("retro_generated")
229    }
230
231    pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
232        Self {
233            reason: Some(format!("{pattern_type}:{frequency}")),
234            ..Self::base("pattern_detected")
235        }
236    }
237
238    pub fn member_crashed(role: &str, restart: bool) -> Self {
239        Self {
240            role: Some(role.into()),
241            restart: Some(restart),
242            ..Self::base("member_crashed")
243        }
244    }
245
246    pub fn pane_death(role: &str) -> Self {
247        Self {
248            role: Some(role.into()),
249            ..Self::base("pane_death")
250        }
251    }
252
253    pub fn pane_respawned(role: &str) -> Self {
254        Self {
255            role: Some(role.into()),
256            ..Self::base("pane_respawned")
257        }
258    }
259
260    pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
261        Self {
262            role: Some(role.into()),
263            task: task.map(|id| id.to_string()),
264            uptime_secs: Some(stall_duration_secs),
265            ..Self::base("stall_detected")
266        }
267    }
268
269    /// Record a backend health state change for an agent.
270    ///
271    /// `reason` encodes the transition, e.g. "healthy→unreachable".
272    pub fn health_changed(role: &str, reason: &str) -> Self {
273        Self {
274            role: Some(role.into()),
275            reason: Some(reason.into()),
276            ..Self::base("health_changed")
277        }
278    }
279
280    pub fn message_routed(from: &str, to: &str) -> Self {
281        Self {
282            from: Some(from.into()),
283            to: Some(to.into()),
284            ..Self::base("message_routed")
285        }
286    }
287
288    pub fn agent_spawned(role: &str) -> Self {
289        Self {
290            role: Some(role.into()),
291            ..Self::base("agent_spawned")
292        }
293    }
294
295    pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
296        Self {
297            role: Some(role.into()),
298            task: Some(task.into()),
299            reason: Some(reason.into()),
300            restart_count: Some(restart_count),
301            ..Self::base("agent_restarted")
302        }
303    }
304
305    pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
306        Self {
307            role: Some(role.into()),
308            from: Some(from.into()),
309            reason: Some(reason.into()),
310            ..Self::base("delivery_failed")
311        }
312    }
313
314    pub fn task_auto_merged(
315        engineer: &str,
316        task: &str,
317        confidence: f64,
318        files_changed: usize,
319        lines_changed: usize,
320    ) -> Self {
321        Self {
322            role: Some(engineer.into()),
323            task: Some(task.into()),
324            load: Some(confidence),
325            reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
326            ..Self::base("task_auto_merged")
327        }
328    }
329
330    pub fn task_manual_merged(task: &str) -> Self {
331        Self {
332            task: Some(task.into()),
333            ..Self::base("task_manual_merged")
334        }
335    }
336
337    /// Emitted for every completed task to record its merge confidence score.
338    pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
339        let detail = format!(
340            "files={} lines={} migrations={} config={} renames={}",
341            info.files_changed,
342            info.lines_changed,
343            info.has_migrations,
344            info.has_config_changes,
345            info.rename_count
346        );
347        Self {
348            role: Some(info.engineer.into()),
349            task: Some(info.task.into()),
350            load: Some(info.confidence),
351            reason: Some(detail),
352            ..Self::base("merge_confidence_scored")
353        }
354    }
355
356    pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
357        Self {
358            role: Some(role.into()),
359            task: Some(task.into()),
360            ..Self::base("review_escalated")
361        }
362    }
363
364    pub fn task_reworked(role: &str, task: &str) -> Self {
365        Self {
366            role: Some(role.into()),
367            task: Some(task.into()),
368            ..Self::base("task_reworked")
369        }
370    }
371
372    pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
373        Self {
374            task: Some(format!("#{task_id}")),
375            reason: Some(cron_expr.into()),
376            ..Self::base("task_recycled")
377        }
378    }
379
380    pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
381        let load = if total_members == 0 {
382            0.0
383        } else {
384            working_members as f64 / total_members as f64
385        };
386        Self {
387            load: Some(load),
388            working_members: Some(working_members),
389            total_members: Some(total_members),
390            session_running: Some(session_running),
391            ..Self::base("load_snapshot")
392        }
393    }
394
395    pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
396        Self {
397            role: Some(role.into()),
398            reason: Some(format!("branch '{branch}' merged into main")),
399            ..Self::base("worktree_reconciled")
400        }
401    }
402}
403
404pub struct EventSink {
405    writer: Box<dyn Write + Send>,
406    path: PathBuf,
407    max_bytes: Option<u64>,
408}
409
410impl EventSink {
411    pub fn new(path: &Path) -> Result<Self> {
412        Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
413    }
414
415    pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
416        if let Some(parent) = path.parent() {
417            std::fs::create_dir_all(parent)?;
418        }
419        rotate_event_log_if_needed(path, max_bytes, 0)?;
420        let file = OpenOptions::new()
421            .create(true)
422            .append(true)
423            .open(path)
424            .with_context(|| format!("failed to open event sink: {}", path.display()))?;
425        Ok(Self {
426            writer: Box::new(BufWriter::new(file)),
427            path: path.to_path_buf(),
428            max_bytes: Some(max_bytes),
429        })
430    }
431
432    #[cfg(test)]
433    pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
434        Self {
435            writer: Box::new(writer),
436            path: path.to_path_buf(),
437            max_bytes: None,
438        }
439    }
440
441    pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
442        let json = serde_json::to_string(&event)?;
443        self.rotate_if_needed((json.len() + 1) as u64)?;
444        writeln!(self.writer, "{json}")?;
445        self.writer.flush()?;
446        Ok(())
447    }
448
449    #[allow(dead_code)]
450    pub fn path(&self) -> &Path {
451        &self.path
452    }
453
454    fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
455        let Some(max_bytes) = self.max_bytes else {
456            return Ok(());
457        };
458        self.writer.flush()?;
459        if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
460            self.writer = Box::new(BufWriter::new(
461                OpenOptions::new()
462                    .create(true)
463                    .append(true)
464                    .open(&self.path)
465                    .with_context(|| {
466                        format!("failed to reopen event sink: {}", self.path.display())
467                    })?,
468            ));
469        }
470        Ok(())
471    }
472}
473
/// Returns the path a rotated log is moved to: `path` with `.1` appended.
///
/// The suffix is appended to the raw OS string rather than to
/// `path.display()`, because `display()` is lossy and would replace any
/// non-UTF-8 bytes in the path with U+FFFD, producing a different file name.
fn rotated_event_log_path(path: &Path) -> PathBuf {
    let mut rotated = path.as_os_str().to_owned();
    rotated.push(".1");
    PathBuf::from(rotated)
}
477
478fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
479    let len = match fs::metadata(path) {
480        Ok(metadata) => metadata.len(),
481        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
482        Err(error) => {
483            return Err(error).with_context(|| format!("failed to stat {}", path.display()));
484        }
485    };
486
487    if len == 0 {
488        return Ok(false);
489    }
490
491    if len.saturating_add(next_entry_bytes) <= max_bytes {
492        return Ok(false);
493    }
494
495    let rotated = rotated_event_log_path(path);
496    if rotated.exists() {
497        fs::remove_file(&rotated)
498            .with_context(|| format!("failed to remove {}", rotated.display()))?;
499    }
500    fs::rename(path, &rotated).with_context(|| {
501        format!(
502            "failed to rotate event log {} to {}",
503            path.display(),
504            rotated.display()
505        )
506    })?;
507    Ok(true)
508}
509
510pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
511    if !path.exists() {
512        return Ok(Vec::new());
513    }
514    let content = fs::read_to_string(path).context("failed to read event log")?;
515    let mut events = Vec::new();
516    for line in content.lines() {
517        let line = line.trim();
518        if line.is_empty() {
519            continue;
520        }
521        if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
522            events.push(event);
523        }
524    }
525    Ok(events)
526}
527
528#[cfg(test)]
529mod tests {
530    use std::sync::{Arc, Mutex};
531    use std::thread;
532
533    use super::*;
534
535    #[test]
536    fn event_serializes_to_json() {
537        let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
538        let json = serde_json::to_string(&event).unwrap();
539        assert!(json.contains("\"event\":\"task_assigned\""));
540        assert!(json.contains("\"role\":\"eng-1-1\""));
541        assert!(json.contains("\"task\":\"fix auth bug\""));
542        assert!(json.contains("\"ts\":"));
543    }
544
545    #[test]
546    fn optional_fields_omitted() {
547        let event = TeamEvent::daemon_started();
548        let json = serde_json::to_string(&event).unwrap();
549        assert!(!json.contains("\"role\""));
550        assert!(!json.contains("\"task\""));
551    }
552
553    #[test]
554    fn event_sink_writes_jsonl() {
555        let tmp = tempfile::tempdir().unwrap();
556        let path = tmp.path().join("events.jsonl");
557        let mut sink = EventSink::new(&path).unwrap();
558        sink.emit(TeamEvent::daemon_started()).unwrap();
559        sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
560            .unwrap();
561        sink.emit(TeamEvent::daemon_stopped()).unwrap();
562
563        let content = std::fs::read_to_string(&path).unwrap();
564        let lines: Vec<&str> = content.lines().collect();
565        assert_eq!(lines.len(), 3);
566        assert!(lines[0].contains("daemon_started"));
567        assert!(lines[1].contains("task_assigned"));
568        assert!(lines[2].contains("daemon_stopped"));
569    }
570
571    #[test]
572    fn all_event_variants_serialize_with_correct_event_field() {
573        let variants: Vec<(&str, TeamEvent)> = vec![
574            ("daemon_started", TeamEvent::daemon_started()),
575            ("daemon_reloading", TeamEvent::daemon_reloading()),
576            ("daemon_reloaded", TeamEvent::daemon_reloaded()),
577            ("daemon_stopped", TeamEvent::daemon_stopped()),
578            (
579                "daemon_stopped",
580                TeamEvent::daemon_stopped_with_reason("signal", 3600),
581            ),
582            ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
583            (
584                "context_exhausted",
585                TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
586            ),
587            (
588                "loop_step_error",
589                TeamEvent::loop_step_error("poll_watchers", "tmux error"),
590            ),
591            (
592                "daemon_panic",
593                TeamEvent::daemon_panic("index out of bounds"),
594            ),
595            ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
596            (
597                "cwd_corrected",
598                TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
599            ),
600            (
601                "task_escalated",
602                TeamEvent::task_escalated("eng-1", "task", None),
603            ),
604            ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
605            (
606                "performance_regression",
607                TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
608            ),
609            (
610                "task_completed",
611                TeamEvent::task_completed("eng-1", Some("42")),
612            ),
613            ("standup_generated", TeamEvent::standup_generated("manager")),
614            ("retro_generated", TeamEvent::retro_generated()),
615            (
616                "pattern_detected",
617                TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
618            ),
619            ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
620            ("pane_death", TeamEvent::pane_death("eng-1")),
621            ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
622            ("message_routed", TeamEvent::message_routed("a", "b")),
623            ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
624            (
625                "agent_restarted",
626                TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
627            ),
628            (
629                "delivery_failed",
630                TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
631            ),
632            (
633                "task_auto_merged",
634                TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
635            ),
636            ("task_manual_merged", TeamEvent::task_manual_merged("42")),
637            (
638                "merge_confidence_scored",
639                TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
640                    engineer: "eng-1",
641                    task: "42",
642                    confidence: 0.85,
643                    files_changed: 3,
644                    lines_changed: 50,
645                    has_migrations: false,
646                    has_config_changes: false,
647                    rename_count: 0,
648                }),
649            ),
650            (
651                "review_nudge_sent",
652                TeamEvent::review_nudge_sent("manager", "42"),
653            ),
654            (
655                "review_escalated",
656                TeamEvent::review_escalated("42", "stale review"),
657            ),
658            ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
659            ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
660            (
661                "worktree_reconciled",
662                TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
663            ),
664        ];
665        for (expected_event, event) in &variants {
666            let json = serde_json::to_string(event).unwrap();
667            let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
668            assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
669            assert!(parsed["ts"].as_u64().is_some());
670        }
671    }
672
673    #[test]
674    fn load_snapshot_serializes_optional_metrics() {
675        let event = TeamEvent::load_snapshot(3, 7, false);
676        let json = serde_json::to_string(&event).unwrap();
677        assert!(json.contains("\"event\":\"load_snapshot\""));
678        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
679        assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
680        assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
681        assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
682        assert!(!parsed["session_running"].as_bool().unwrap());
683    }
684
685    #[test]
686    fn read_events_parses_all_known_lines() {
687        let tmp = tempfile::tempdir().unwrap();
688        let path = tmp.path().join("events.jsonl");
689        let mut sink = EventSink::new(&path).unwrap();
690        sink.emit(TeamEvent::daemon_started()).unwrap();
691        sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
692        sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
693
694        let events = read_events(&path).unwrap();
695        assert_eq!(events.len(), 3);
696        assert_eq!(events[1].event, "load_snapshot");
697        assert_eq!(events[1].working_members, Some(1));
698        assert_eq!(events[2].total_members, Some(4));
699    }
700
701    #[test]
702    fn event_sink_appends_to_existing_file() {
703        let tmp = tempfile::tempdir().unwrap();
704        let path = tmp.path().join("events.jsonl");
705
706        // Write one event and close the sink
707        {
708            let mut sink = EventSink::new(&path).unwrap();
709            sink.emit(TeamEvent::daemon_started()).unwrap();
710        }
711
712        // Open again and append another
713        {
714            let mut sink = EventSink::new(&path).unwrap();
715            sink.emit(TeamEvent::daemon_stopped()).unwrap();
716        }
717
718        let content = std::fs::read_to_string(&path).unwrap();
719        let lines: Vec<&str> = content.lines().collect();
720        assert_eq!(lines.len(), 2);
721        assert!(lines[0].contains("daemon_started"));
722        assert!(lines[1].contains("daemon_stopped"));
723    }
724
725    #[test]
726    fn event_with_special_characters_in_task() {
727        let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
728        let json = serde_json::to_string(&event).unwrap();
729        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
730        let task_val = parsed["task"].as_str().unwrap();
731        assert!(task_val.contains("\"quotes\""));
732        assert!(task_val.contains("<angles>"));
733    }
734
735    #[test]
736    fn task_escalated_serializes_role_and_task() {
737        let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
738        let json = serde_json::to_string(&event).unwrap();
739        assert!(json.contains("\"event\":\"task_escalated\""));
740        assert!(json.contains("\"role\":\"eng-1-1\""));
741        assert!(json.contains("\"task\":\"42\""));
742        assert!(json.contains("\"reason\":\"tests_failed\""));
743    }
744
745    #[test]
746    fn cwd_corrected_serializes_role_and_reason() {
747        let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
748        let json = serde_json::to_string(&event).unwrap();
749        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
750        assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
751        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
752        assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
753    }
754
755    #[test]
756    fn pane_death_serializes_role() {
757        let event = TeamEvent::pane_death("eng-1-1");
758        let json = serde_json::to_string(&event).unwrap();
759        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
760        assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
761        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
762    }
763
764    #[test]
765    fn pane_respawned_serializes_role() {
766        let event = TeamEvent::pane_respawned("eng-1-1");
767        let json = serde_json::to_string(&event).unwrap();
768        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
769        assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
770        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
771    }
772
773    #[test]
774    fn agent_restarted_includes_reason_task_and_count() {
775        let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
776        let json = serde_json::to_string(&event).unwrap();
777        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
778        assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
779        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
780        assert_eq!(parsed["task"].as_str().unwrap(), "67");
781        assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
782        assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
783    }
784
785    #[test]
786    fn delivery_failed_includes_role_sender_and_reason() {
787        let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
788        let json = serde_json::to_string(&event).unwrap();
789        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
790        assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
791        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
792        assert_eq!(parsed["from"].as_str().unwrap(), "manager");
793        assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
794    }
795
796    #[test]
797    fn task_unblocked_serializes_role_and_task() {
798        let event = TeamEvent::task_unblocked("eng-1-1", "42");
799        let json = serde_json::to_string(&event).unwrap();
800        assert!(json.contains("\"event\":\"task_unblocked\""));
801        assert!(json.contains("\"role\":\"eng-1-1\""));
802        assert!(json.contains("\"task\":\"42\""));
803    }
804
805    #[test]
806    fn merge_confidence_scored_includes_all_fields() {
807        let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
808            engineer: "eng-1-1",
809            task: "42",
810            confidence: 0.85,
811            files_changed: 3,
812            lines_changed: 50,
813            has_migrations: true,
814            has_config_changes: false,
815            rename_count: 1,
816        });
817        let json = serde_json::to_string(&event).unwrap();
818        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
819        assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
820        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
821        assert_eq!(parsed["task"].as_str().unwrap(), "42");
822        assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
823        let reason = parsed["reason"].as_str().unwrap();
824        assert!(reason.contains("files=3"));
825        assert!(reason.contains("lines=50"));
826        assert!(reason.contains("migrations=true"));
827        assert!(reason.contains("config=false"));
828        assert!(reason.contains("renames=1"));
829    }
830
831    #[test]
832    fn performance_regression_serializes_task_and_reason() {
833        let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
834        let json = serde_json::to_string(&event).unwrap();
835        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
836        assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
837        assert_eq!(parsed["task"].as_str().unwrap(), "42");
838        assert_eq!(
839            parsed["reason"].as_str().unwrap(),
840            "runtime_ms=1300 avg_ms=1000 pct=30"
841        );
842    }
843
844    #[test]
845    fn daemon_stopped_with_reason_includes_fields() {
846        let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
847        let json = serde_json::to_string(&event).unwrap();
848        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
849        assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
850        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
851    }
852
853    #[test]
854    fn heartbeat_includes_uptime() {
855        let event = TeamEvent::daemon_heartbeat(600);
856        let json = serde_json::to_string(&event).unwrap();
857        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
858        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
859        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
860        // No reason/step/error fields
861        assert!(parsed.get("reason").is_none());
862        assert!(parsed.get("step").is_none());
863    }
864
865    #[test]
866    fn context_exhausted_includes_role_task_and_session_size() {
867        let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
868        let json = serde_json::to_string(&event).unwrap();
869        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
870        assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
871        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
872        assert_eq!(parsed["task"].as_str().unwrap(), "77");
873        assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
874    }
875
876    #[test]
877    fn loop_step_error_includes_step_and_error() {
878        let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
879        let json = serde_json::to_string(&event).unwrap();
880        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
881        assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
882        assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
883    }
884
885    #[test]
886    fn daemon_panic_includes_reason() {
887        let event = TeamEvent::daemon_panic("index out of bounds");
888        let json = serde_json::to_string(&event).unwrap();
889        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
890        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
891        assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
892    }
893
894    #[test]
895    fn new_fields_omitted_from_basic_events() {
896        let event = TeamEvent::daemon_started();
897        let json = serde_json::to_string(&event).unwrap();
898        assert!(!json.contains("\"reason\""));
899        assert!(!json.contains("\"step\""));
900        assert!(!json.contains("\"error\""));
901        assert!(!json.contains("\"uptime_secs\""));
902        assert!(!json.contains("\"restart_count\""));
903    }
904
905    #[test]
906    fn pattern_detected_includes_reason_payload() {
907        let event = TeamEvent::pattern_detected("escalation_cluster", 6);
908        let json = serde_json::to_string(&event).unwrap();
909        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
910        assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
911        assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
912    }
913
914    #[test]
915    fn event_sink_creates_parent_directories() {
916        let tmp = tempfile::tempdir().unwrap();
917        let path = tmp.path().join("deep").join("nested").join("events.jsonl");
918        let mut sink = EventSink::new(&path).unwrap();
919        sink.emit(TeamEvent::daemon_started()).unwrap();
920        assert!(path.exists());
921        assert_eq!(sink.path(), path);
922    }
923
924    #[test]
925    fn event_sink_rotates_oversized_log_on_open() {
926        let tmp = tempfile::tempdir().unwrap();
927        let path = tmp.path().join("events.jsonl");
928        fs::write(&path, "0123456789").unwrap();
929
930        let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
931        sink.emit(TeamEvent::daemon_started()).unwrap();
932
933        let rotated = rotated_event_log_path(&path);
934        assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
935        let current = fs::read_to_string(&path).unwrap();
936        assert!(current.contains("daemon_started"));
937    }
938
939    #[test]
940    fn event_sink_rotates_before_write_that_would_exceed_threshold() {
941        let tmp = tempfile::tempdir().unwrap();
942        let path = tmp.path().join("events.jsonl");
943        let first_line = "{\"event\":\"first\"}\n";
944        fs::write(&path, first_line).unwrap();
945
946        let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
947        sink.emit(TeamEvent::task_assigned(
948            "eng-1",
949            "this assignment is long enough to rotate",
950        ))
951        .unwrap();
952
953        let rotated = rotated_event_log_path(&path);
954        assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
955        let current = fs::read_to_string(&path).unwrap();
956        assert!(current.contains("task_assigned"));
957        assert!(!current.contains("\"event\":\"first\""));
958    }
959
960    #[test]
961    fn event_round_trip_preserves_fields_for_agent_restarted() {
962        let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
963
964        let json = serde_json::to_string(&original).unwrap();
965        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
966
967        assert_eq!(parsed.event, "agent_restarted");
968        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
969        assert_eq!(parsed.task.as_deref(), Some("42"));
970        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
971        assert_eq!(parsed.restart_count, Some(3));
972        assert_eq!(parsed.ts, original.ts);
973    }
974
975    #[test]
976    fn event_round_trip_preserves_fields_for_load_snapshot() {
977        let original = TeamEvent::load_snapshot(4, 8, true);
978
979        let json = serde_json::to_string(&original).unwrap();
980        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
981
982        assert_eq!(parsed.event, "load_snapshot");
983        assert_eq!(parsed.working_members, Some(4));
984        assert_eq!(parsed.total_members, Some(8));
985        assert_eq!(parsed.session_running, Some(true));
986        assert_eq!(parsed.load, Some(0.5));
987        assert_eq!(parsed.ts, original.ts);
988    }
989
990    #[test]
991    fn event_round_trip_preserves_fields_for_delivery_failed() {
992        let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
993
994        let json = serde_json::to_string(&original).unwrap();
995        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
996
997        assert_eq!(parsed.event, "delivery_failed");
998        assert_eq!(parsed.role.as_deref(), Some("eng-2"));
999        assert_eq!(parsed.from.as_deref(), Some("manager"));
1000        assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
1001        assert_eq!(parsed.ts, original.ts);
1002    }
1003
1004    #[test]
1005    fn read_events_skips_blank_and_malformed_lines() {
1006        let tmp = tempfile::tempdir().unwrap();
1007        let path = tmp.path().join("events.jsonl");
1008        fs::write(
1009            &path,
1010            [
1011                "",
1012                "{\"event\":\"daemon_started\",\"ts\":1}",
1013                "not-json",
1014                "   ",
1015                "{\"event\":\"daemon_stopped\",\"ts\":2}",
1016            ]
1017            .join("\n"),
1018        )
1019        .unwrap();
1020
1021        let events = read_events(&path).unwrap();
1022
1023        assert_eq!(events.len(), 2);
1024        assert_eq!(events[0].event, "daemon_started");
1025        assert_eq!(events[1].event, "daemon_stopped");
1026    }
1027
1028    #[test]
1029    fn rotate_event_log_if_needed_returns_false_for_missing_file() {
1030        let tmp = tempfile::tempdir().unwrap();
1031        let path = tmp.path().join("events.jsonl");
1032
1033        let rotated = rotate_event_log_if_needed(&path, 128, 0).unwrap();
1034
1035        assert!(!rotated);
1036        assert!(!rotated_event_log_path(&path).exists());
1037    }
1038
1039    #[test]
1040    fn rotate_event_log_if_needed_returns_false_for_empty_file() {
1041        let tmp = tempfile::tempdir().unwrap();
1042        let path = tmp.path().join("events.jsonl");
1043        fs::write(&path, "").unwrap();
1044
1045        let rotated = rotate_event_log_if_needed(&path, 1, 1).unwrap();
1046
1047        assert!(!rotated);
1048        assert!(path.exists());
1049        assert!(!rotated_event_log_path(&path).exists());
1050    }
1051
1052    #[test]
1053    fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
1054        let tmp = tempfile::tempdir().unwrap();
1055        let path = tmp.path().join("events.jsonl");
1056        let rotated_path = rotated_event_log_path(&path);
1057        fs::write(&path, "current-events").unwrap();
1058        fs::write(&rotated_path, "old-rotated-events").unwrap();
1059
1060        let rotated = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1061
1062        assert!(rotated);
1063        assert_eq!(fs::read_to_string(&rotated_path).unwrap(), "current-events");
1064    }
1065
1066    #[test]
1067    fn concurrent_event_sinks_append_without_losing_lines() {
1068        let tmp = tempfile::tempdir().unwrap();
1069        let path = Arc::new(tmp.path().join("events.jsonl"));
1070        let ready = Arc::new(std::sync::Barrier::new(5));
1071        let errors = Arc::new(Mutex::new(Vec::<String>::new()));
1072        let mut handles = Vec::new();
1073
1074        for idx in 0..4 {
1075            let path = Arc::clone(&path);
1076            let ready = Arc::clone(&ready);
1077            let errors = Arc::clone(&errors);
1078            handles.push(thread::spawn(move || {
1079                ready.wait();
1080                let result = (|| -> Result<()> {
1081                    let mut sink = EventSink::new(&path)?;
1082                    sink.emit(TeamEvent::task_assigned(
1083                        &format!("eng-{idx}"),
1084                        &format!("task-{idx}"),
1085                    ))?;
1086                    Ok(())
1087                })();
1088                if let Err(error) = result {
1089                    errors.lock().unwrap().push(error.to_string());
1090                }
1091            }));
1092        }
1093
1094        ready.wait();
1095        for handle in handles {
1096            handle.join().unwrap();
1097        }
1098
1099        assert!(errors.lock().unwrap().is_empty());
1100        let events = read_events(&path).unwrap();
1101        assert_eq!(events.len(), 4);
1102        for idx in 0..4 {
1103            assert!(
1104                events
1105                    .iter()
1106                    .any(|event| event.role.as_deref() == Some(&format!("eng-{idx}")))
1107            );
1108        }
1109    }
1110
1111    #[test]
1112    fn read_events_handles_large_log_file() {
1113        let tmp = tempfile::tempdir().unwrap();
1114        let path = tmp.path().join("events.jsonl");
1115        let mut sink = EventSink::new(&path).unwrap();
1116
1117        for idx in 0..512 {
1118            sink.emit(TeamEvent::task_assigned(
1119                &format!("eng-{idx}"),
1120                &"x".repeat(128),
1121            ))
1122            .unwrap();
1123        }
1124
1125        let events = read_events(&path).unwrap();
1126
1127        assert_eq!(events.len(), 512);
1128        assert_eq!(events.first().unwrap().event, "task_assigned");
1129        assert_eq!(events.last().unwrap().event, "task_assigned");
1130    }
1131
1132    fn production_unwrap_expect_count(source: &str) -> usize {
1133        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1134            &source[..pos]
1135        } else {
1136            source
1137        };
1138        prod.lines()
1139            .filter(|line| {
1140                let trimmed = line.trim();
1141                !trimmed.starts_with("#[cfg(test)]")
1142                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1143            })
1144            .count()
1145    }
1146
1147    #[test]
1148    fn task_completed_includes_task_id() {
1149        let event = TeamEvent::task_completed("eng-1", Some("42"));
1150        let json = serde_json::to_string(&event).unwrap();
1151        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1152        assert_eq!(parsed["event"].as_str().unwrap(), "task_completed");
1153        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1154        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1155    }
1156
1157    #[test]
1158    fn task_completed_without_task_id_omits_task_field() {
1159        let event = TeamEvent::task_completed("eng-1", None);
1160        let json = serde_json::to_string(&event).unwrap();
1161        assert!(json.contains("\"event\":\"task_completed\""));
1162        assert!(json.contains("\"role\":\"eng-1\""));
1163        assert!(!json.contains("\"task\""));
1164    }
1165
1166    #[test]
1167    fn task_escalated_without_reason_omits_reason_field() {
1168        let event = TeamEvent::task_escalated("eng-1", "42", None);
1169        let json = serde_json::to_string(&event).unwrap();
1170        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1171        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1172        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1173        assert!(parsed.get("reason").is_none());
1174    }
1175
1176    #[test]
1177    fn task_escalated_with_reason_includes_reason_field() {
1178        let event = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
1179        let json = serde_json::to_string(&event).unwrap();
1180        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1181        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1182        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1183        assert_eq!(parsed["reason"].as_str().unwrap(), "merge_conflict");
1184    }
1185
1186    #[test]
1187    fn task_completed_round_trip_preserves_task_id() {
1188        let original = TeamEvent::task_completed("eng-1", Some("99"));
1189        let json = serde_json::to_string(&original).unwrap();
1190        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1191        assert_eq!(parsed.event, "task_completed");
1192        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1193        assert_eq!(parsed.task.as_deref(), Some("99"));
1194    }
1195
1196    #[test]
1197    fn task_escalated_round_trip_preserves_reason() {
1198        let original = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
1199        let json = serde_json::to_string(&original).unwrap();
1200        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1201        assert_eq!(parsed.event, "task_escalated");
1202        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1203        assert_eq!(parsed.task.as_deref(), Some("42"));
1204        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1205    }
1206
1207    #[test]
1208    fn production_events_has_no_unwrap_or_expect_calls() {
1209        let src = include_str!("events.rs");
1210        assert_eq!(
1211            production_unwrap_expect_count(src),
1212            0,
1213            "production events.rs should avoid unwrap/expect"
1214        );
1215    }
1216
1217    #[test]
1218    fn stall_detected_event_fields() {
1219        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1220        assert_eq!(event.event, "stall_detected");
1221        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1222        assert_eq!(event.task.as_deref(), Some("42"));
1223        assert_eq!(event.uptime_secs, Some(300));
1224    }
1225
1226    #[test]
1227    fn stall_detected_event_without_task() {
1228        let event = TeamEvent::stall_detected("eng-1-1", None, 600);
1229        assert_eq!(event.event, "stall_detected");
1230        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1231        assert!(event.task.is_none());
1232        assert_eq!(event.uptime_secs, Some(600));
1233    }
1234
1235    #[test]
1236    fn stall_detected_event_serializes_to_jsonl() {
1237        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1238        let json = serde_json::to_string(&event).unwrap();
1239        assert!(json.contains("\"stall_detected\""));
1240        assert!(json.contains("\"eng-1-1\""));
1241        assert!(json.contains("\"42\""));
1242    }
1243
1244    #[test]
1245    fn health_changed_event_fields() {
1246        let event = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");
1247        assert_eq!(event.event, "health_changed");
1248        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1249        assert_eq!(event.reason.as_deref(), Some("healthy→unreachable"));
1250    }
1251
1252    #[test]
1253    fn health_changed_event_serializes_to_jsonl() {
1254        let event = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
1255        let json = serde_json::to_string(&event).unwrap();
1256        assert!(json.contains("\"health_changed\""));
1257        assert!(json.contains("\"eng-1-2\""));
1258    }
1259
1260    // --- Error path and recovery tests (Task #265) ---
1261
1262    #[test]
1263    fn event_sink_on_readonly_dir_returns_error() {
1264        #[cfg(unix)]
1265        {
1266            use std::os::unix::fs::PermissionsExt;
1267            let tmp = tempfile::tempdir().unwrap();
1268            let readonly_dir = tmp.path().join("readonly");
1269            fs::create_dir(&readonly_dir).unwrap();
1270            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o444)).unwrap();
1271
1272            let path = readonly_dir.join("subdir").join("events.jsonl");
1273            let result = EventSink::new(&path);
1274            assert!(result.is_err());
1275
1276            // Restore permissions for cleanup
1277            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o755)).unwrap();
1278        }
1279    }
1280
1281    #[test]
1282    fn read_events_from_nonexistent_file_returns_empty() {
1283        let tmp = tempfile::tempdir().unwrap();
1284        let path = tmp.path().join("does_not_exist.jsonl");
1285        let events = read_events(&path).unwrap();
1286        assert!(events.is_empty());
1287    }
1288
1289    #[test]
1290    fn read_events_all_malformed_lines_returns_empty() {
1291        let tmp = tempfile::tempdir().unwrap();
1292        let path = tmp.path().join("events.jsonl");
1293        fs::write(&path, "not json\nalso not json\n{invalid}\n").unwrap();
1294        let events = read_events(&path).unwrap();
1295        assert!(events.is_empty());
1296    }
1297
1298    #[test]
1299    fn event_sink_emit_with_failing_writer() {
1300        struct FailWriter;
1301        impl Write for FailWriter {
1302            fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1303                Err(std::io::Error::new(
1304                    std::io::ErrorKind::BrokenPipe,
1305                    "simulated write failure",
1306                ))
1307            }
1308            fn flush(&mut self) -> std::io::Result<()> {
1309                Err(std::io::Error::new(
1310                    std::io::ErrorKind::BrokenPipe,
1311                    "simulated flush failure",
1312                ))
1313            }
1314        }
1315
1316        let tmp = tempfile::tempdir().unwrap();
1317        let path = tmp.path().join("events.jsonl");
1318        let mut sink = EventSink::from_writer(&path, FailWriter);
1319        let result = sink.emit(TeamEvent::daemon_started());
1320        assert!(result.is_err());
1321    }
1322
1323    #[test]
1324    fn rotate_event_log_replaces_stale_rotated_file() {
1325        let tmp = tempfile::tempdir().unwrap();
1326        let path = tmp.path().join("events.jsonl");
1327        let rotated = rotated_event_log_path(&path);
1328
1329        fs::write(&path, "current-data-that-is-large").unwrap();
1330        fs::write(&rotated, "old-rotated-data").unwrap();
1331
1332        let did_rotate = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1333        assert!(did_rotate);
1334        // Old rotated was replaced with current data
1335        assert_eq!(
1336            fs::read_to_string(&rotated).unwrap(),
1337            "current-data-that-is-large"
1338        );
1339        // Current file is now gone (rotated away)
1340        assert!(!path.exists());
1341    }
1342
1343    #[test]
1344    fn event_sink_handles_zero_max_bytes_rotation() {
1345        let tmp = tempfile::tempdir().unwrap();
1346        let path = tmp.path().join("events.jsonl");
1347
1348        // With max_bytes=0, any existing content triggers rotation, but empty file doesn't
1349        fs::write(&path, "").unwrap();
1350        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1351        assert!(!did_rotate); // empty file → no rotation
1352
1353        fs::write(&path, "x").unwrap();
1354        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1355        assert!(did_rotate); // non-empty file at 0-byte limit → rotation
1356    }
1357
1358    #[test]
1359    fn read_events_partial_json_with_valid_lines_mixed() {
1360        let tmp = tempfile::tempdir().unwrap();
1361        let path = tmp.path().join("events.jsonl");
1362        // Simulate a truncated write: valid JSON, then partial, then valid
1363        let content = format!(
1364            "{}\n{{\"event\":\"trunca\n{}\n",
1365            r#"{"event":"daemon_started","ts":1}"#, r#"{"event":"daemon_stopped","ts":3}"#
1366        );
1367        fs::write(&path, content).unwrap();
1368
1369        let events = read_events(&path).unwrap();
1370        assert_eq!(events.len(), 2);
1371        assert_eq!(events[0].event, "daemon_started");
1372        assert_eq!(events[1].event, "daemon_stopped");
1373    }
1374}