Skip to main content

batty_cli/team/
events.rs

1//! Structured JSONL event stream for external consumers.
2
3use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
13/// Bundled parameters for merge-confidence scoring events.
14pub struct MergeConfidenceInfo<'a> {
15    pub engineer: &'a str,
16    pub task: &'a str,
17    pub confidence: f64,
18    pub files_changed: usize,
19    pub lines_changed: usize,
20    pub has_migrations: bool,
21    pub has_config_changes: bool,
22    pub rename_count: usize,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28    pub event: String,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub role: Option<String>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub task: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub recipient: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub from: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub to: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub restart: Option<bool>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub restart_count: Option<u32>,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub load: Option<f64>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub working_members: Option<u32>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub total_members: Option<u32>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub session_running: Option<bool>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub reason: Option<String>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub step: Option<String>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub error: Option<String>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub uptime_secs: Option<u64>,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub session_size_bytes: Option<u64>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub output_bytes: Option<u64>,
63    pub ts: u64,
64}
65
66impl TeamEvent {
67    fn now() -> u64 {
68        SystemTime::now()
69            .duration_since(UNIX_EPOCH)
70            .unwrap_or_default()
71            .as_secs()
72    }
73
74    fn base(event: &str) -> Self {
75        Self {
76            event: event.into(),
77            role: None,
78            task: None,
79            recipient: None,
80            from: None,
81            to: None,
82            restart: None,
83            restart_count: None,
84            load: None,
85            working_members: None,
86            total_members: None,
87            session_running: None,
88            reason: None,
89            step: None,
90            error: None,
91            uptime_secs: None,
92            session_size_bytes: None,
93            output_bytes: None,
94            ts: Self::now(),
95        }
96    }
97
98    pub fn daemon_started() -> Self {
99        Self::base("daemon_started")
100    }
101
102    pub fn daemon_reloading() -> Self {
103        Self::base("daemon_reloading")
104    }
105
106    pub fn daemon_reloaded() -> Self {
107        Self::base("daemon_reloaded")
108    }
109
110    pub fn daemon_stopped() -> Self {
111        Self::base("daemon_stopped")
112    }
113
114    pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
115        Self {
116            reason: Some(reason.into()),
117            uptime_secs: Some(uptime_secs),
118            ..Self::base("daemon_stopped")
119        }
120    }
121
122    pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
123        Self {
124            uptime_secs: Some(uptime_secs),
125            ..Self::base("daemon_heartbeat")
126        }
127    }
128
129    pub fn context_exhausted(
130        role: &str,
131        task: Option<u32>,
132        session_size_bytes: Option<u64>,
133    ) -> Self {
134        Self {
135            role: Some(role.into()),
136            task: task.map(|task_id| task_id.to_string()),
137            session_size_bytes,
138            ..Self::base("context_exhausted")
139        }
140    }
141
142    pub fn loop_step_error(step: &str, error: &str) -> Self {
143        Self {
144            step: Some(step.into()),
145            error: Some(error.into()),
146            ..Self::base("loop_step_error")
147        }
148    }
149
150    pub fn daemon_panic(reason: &str) -> Self {
151        Self {
152            reason: Some(reason.into()),
153            ..Self::base("daemon_panic")
154        }
155    }
156
157    pub fn task_assigned(role: &str, task: &str) -> Self {
158        Self {
159            role: Some(role.into()),
160            task: Some(task.into()),
161            ..Self::base("task_assigned")
162        }
163    }
164
165    pub fn cwd_corrected(role: &str, path: &str) -> Self {
166        Self {
167            role: Some(role.into()),
168            reason: Some(path.into()),
169            ..Self::base("cwd_corrected")
170        }
171    }
172
173    pub fn review_nudge_sent(role: &str, task: &str) -> Self {
174        Self {
175            role: Some(role.into()),
176            task: Some(task.into()),
177            ..Self::base("review_nudge_sent")
178        }
179    }
180
181    pub fn review_escalated(task: &str, reason: &str) -> Self {
182        Self {
183            task: Some(task.into()),
184            reason: Some(reason.into()),
185            ..Self::base("review_escalated")
186        }
187    }
188
189    pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
190        Self {
191            role: Some(role.into()),
192            task: Some(task.into()),
193            reason: reason.map(|r| r.into()),
194            ..Self::base("task_escalated")
195        }
196    }
197
198    pub fn task_unblocked(role: &str, task: &str) -> Self {
199        Self {
200            role: Some(role.into()),
201            task: Some(task.into()),
202            ..Self::base("task_unblocked")
203        }
204    }
205
206    pub fn performance_regression(task: &str, reason: &str) -> Self {
207        Self {
208            task: Some(task.into()),
209            reason: Some(reason.into()),
210            ..Self::base("performance_regression")
211        }
212    }
213
214    pub fn task_completed(role: &str, task: Option<&str>) -> Self {
215        Self {
216            role: Some(role.into()),
217            task: task.map(|t| t.into()),
218            ..Self::base("task_completed")
219        }
220    }
221
222    pub fn standup_generated(recipient: &str) -> Self {
223        Self {
224            recipient: Some(recipient.into()),
225            ..Self::base("standup_generated")
226        }
227    }
228
229    pub fn retro_generated() -> Self {
230        Self::base("retro_generated")
231    }
232
233    pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
234        Self {
235            reason: Some(format!("{pattern_type}:{frequency}")),
236            ..Self::base("pattern_detected")
237        }
238    }
239
240    pub fn member_crashed(role: &str, restart: bool) -> Self {
241        Self {
242            role: Some(role.into()),
243            restart: Some(restart),
244            ..Self::base("member_crashed")
245        }
246    }
247
248    pub fn pane_death(role: &str) -> Self {
249        Self {
250            role: Some(role.into()),
251            ..Self::base("pane_death")
252        }
253    }
254
255    pub fn pane_respawned(role: &str) -> Self {
256        Self {
257            role: Some(role.into()),
258            ..Self::base("pane_respawned")
259        }
260    }
261
262    pub fn narration_detected(role: &str, task: Option<u32>) -> Self {
263        Self {
264            role: Some(role.into()),
265            task: task.map(|id| id.to_string()),
266            ..Self::base("narration_detected")
267        }
268    }
269
270    pub fn context_pressure_warning(
271        role: &str,
272        task: Option<u32>,
273        output_bytes: u64,
274        threshold_bytes: u64,
275    ) -> Self {
276        Self {
277            role: Some(role.into()),
278            task: task.map(|id| id.to_string()),
279            output_bytes: Some(output_bytes),
280            reason: Some(format!("threshold_bytes={threshold_bytes}")),
281            ..Self::base("context_pressure_warning")
282        }
283    }
284
285    pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
286        Self {
287            role: Some(role.into()),
288            task: task.map(|id| id.to_string()),
289            uptime_secs: Some(stall_duration_secs),
290            ..Self::base("stall_detected")
291        }
292    }
293
294    /// Record a backend health state change for an agent.
295    ///
296    /// `reason` encodes the transition, e.g. "healthy→unreachable".
297    pub fn health_changed(role: &str, reason: &str) -> Self {
298        Self {
299            role: Some(role.into()),
300            reason: Some(reason.into()),
301            ..Self::base("health_changed")
302        }
303    }
304
305    pub fn message_routed(from: &str, to: &str) -> Self {
306        Self {
307            from: Some(from.into()),
308            to: Some(to.into()),
309            ..Self::base("message_routed")
310        }
311    }
312
313    pub fn agent_spawned(role: &str) -> Self {
314        Self {
315            role: Some(role.into()),
316            ..Self::base("agent_spawned")
317        }
318    }
319
320    pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
321        Self {
322            role: Some(role.into()),
323            task: Some(task.into()),
324            reason: Some(reason.into()),
325            restart_count: Some(restart_count),
326            ..Self::base("agent_restarted")
327        }
328    }
329
330    pub fn task_resumed(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
331        Self {
332            role: Some(role.into()),
333            task: Some(task.into()),
334            reason: Some(reason.into()),
335            restart_count: Some(restart_count),
336            ..Self::base("task_resumed")
337        }
338    }
339
340    pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
341        Self {
342            role: Some(role.into()),
343            from: Some(from.into()),
344            reason: Some(reason.into()),
345            ..Self::base("delivery_failed")
346        }
347    }
348
349    pub fn task_auto_merged(
350        engineer: &str,
351        task: &str,
352        confidence: f64,
353        files_changed: usize,
354        lines_changed: usize,
355    ) -> Self {
356        Self {
357            role: Some(engineer.into()),
358            task: Some(task.into()),
359            load: Some(confidence),
360            reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
361            ..Self::base("task_auto_merged")
362        }
363    }
364
365    pub fn task_manual_merged(task: &str) -> Self {
366        Self {
367            task: Some(task.into()),
368            ..Self::base("task_manual_merged")
369        }
370    }
371
372    /// Emitted for every completed task to record its merge confidence score.
373    pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
374        let detail = format!(
375            "files={} lines={} migrations={} config={} renames={}",
376            info.files_changed,
377            info.lines_changed,
378            info.has_migrations,
379            info.has_config_changes,
380            info.rename_count
381        );
382        Self {
383            role: Some(info.engineer.into()),
384            task: Some(info.task.into()),
385            load: Some(info.confidence),
386            reason: Some(detail),
387            ..Self::base("merge_confidence_scored")
388        }
389    }
390
391    pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
392        Self {
393            role: Some(role.into()),
394            task: Some(task.into()),
395            ..Self::base("review_escalated")
396        }
397    }
398
399    pub fn task_reworked(role: &str, task: &str) -> Self {
400        Self {
401            role: Some(role.into()),
402            task: Some(task.into()),
403            ..Self::base("task_reworked")
404        }
405    }
406
407    pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
408        Self {
409            task: Some(format!("#{task_id}")),
410            reason: Some(cron_expr.into()),
411            ..Self::base("task_recycled")
412        }
413    }
414
415    pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
416        let load = if total_members == 0 {
417            0.0
418        } else {
419            working_members as f64 / total_members as f64
420        };
421        Self {
422            load: Some(load),
423            working_members: Some(working_members),
424            total_members: Some(total_members),
425            session_running: Some(session_running),
426            ..Self::base("load_snapshot")
427        }
428    }
429
430    pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
431        Self {
432            role: Some(role.into()),
433            reason: Some(format!("branch '{branch}' merged into main")),
434            ..Self::base("worktree_reconciled")
435        }
436    }
437
438    /// Emitted when the daemon reconciles a topology change.
439    ///
440    /// `reason` contains a human-readable summary (e.g. "+2 added, -1 removed").
441    pub fn topology_changed(added: u32, removed: u32, reason: &str) -> Self {
442        Self {
443            working_members: Some(added),
444            total_members: Some(removed),
445            reason: Some(reason.into()),
446            ..Self::base("topology_changed")
447        }
448    }
449
450    /// Emitted when an agent is removed during a scale-down.
451    pub fn agent_removed(role: &str, reason: &str) -> Self {
452        Self {
453            role: Some(role.into()),
454            reason: Some(reason.into()),
455            ..Self::base("agent_removed")
456        }
457    }
458}
459
460pub struct EventSink {
461    writer: Box<dyn Write + Send>,
462    path: PathBuf,
463    max_bytes: Option<u64>,
464}
465
466impl EventSink {
467    pub fn new(path: &Path) -> Result<Self> {
468        Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
469    }
470
471    pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
472        if let Some(parent) = path.parent() {
473            std::fs::create_dir_all(parent)?;
474        }
475        rotate_event_log_if_needed(path, max_bytes, 0)?;
476        let file = OpenOptions::new()
477            .create(true)
478            .append(true)
479            .open(path)
480            .with_context(|| format!("failed to open event sink: {}", path.display()))?;
481        Ok(Self {
482            writer: Box::new(BufWriter::new(file)),
483            path: path.to_path_buf(),
484            max_bytes: Some(max_bytes),
485        })
486    }
487
488    #[cfg(test)]
489    pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
490        Self {
491            writer: Box::new(writer),
492            path: path.to_path_buf(),
493            max_bytes: None,
494        }
495    }
496
497    pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
498        let json = serde_json::to_string(&event)?;
499        self.rotate_if_needed((json.len() + 1) as u64)?;
500        writeln!(self.writer, "{json}")?;
501        self.writer.flush()?;
502        Ok(())
503    }
504
505    pub fn path(&self) -> &Path {
506        &self.path
507    }
508
509    fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
510        let Some(max_bytes) = self.max_bytes else {
511            return Ok(());
512        };
513        self.writer.flush()?;
514        if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
515            self.writer = Box::new(BufWriter::new(
516                OpenOptions::new()
517                    .create(true)
518                    .append(true)
519                    .open(&self.path)
520                    .with_context(|| {
521                        format!("failed to reopen event sink: {}", self.path.display())
522                    })?,
523            ));
524        }
525        Ok(())
526    }
527}
528
529fn rotated_event_log_path(path: &Path) -> PathBuf {
530    PathBuf::from(format!("{}.1", path.display()))
531}
532
533fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
534    let len = match fs::metadata(path) {
535        Ok(metadata) => metadata.len(),
536        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
537        Err(error) => {
538            return Err(error).with_context(|| format!("failed to stat {}", path.display()));
539        }
540    };
541
542    if len == 0 {
543        return Ok(false);
544    }
545
546    if len.saturating_add(next_entry_bytes) <= max_bytes {
547        return Ok(false);
548    }
549
550    let rotated = rotated_event_log_path(path);
551    if rotated.exists() {
552        fs::remove_file(&rotated)
553            .with_context(|| format!("failed to remove {}", rotated.display()))?;
554    }
555    fs::rename(path, &rotated).with_context(|| {
556        format!(
557            "failed to rotate event log {} to {}",
558            path.display(),
559            rotated.display()
560        )
561    })?;
562    Ok(true)
563}
564
565pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
566    if !path.exists() {
567        return Ok(Vec::new());
568    }
569    let content = fs::read_to_string(path).context("failed to read event log")?;
570    let mut events = Vec::new();
571    for line in content.lines() {
572        let line = line.trim();
573        if line.is_empty() {
574            continue;
575        }
576        if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
577            events.push(event);
578        }
579    }
580    Ok(events)
581}
582
583#[cfg(test)]
584mod tests {
585    use std::sync::{Arc, Mutex};
586    use std::thread;
587
588    use super::*;
589
590    #[test]
591    fn event_serializes_to_json() {
592        let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
593        let json = serde_json::to_string(&event).unwrap();
594        assert!(json.contains("\"event\":\"task_assigned\""));
595        assert!(json.contains("\"role\":\"eng-1-1\""));
596        assert!(json.contains("\"task\":\"fix auth bug\""));
597        assert!(json.contains("\"ts\":"));
598    }
599
600    #[test]
601    fn optional_fields_omitted() {
602        let event = TeamEvent::daemon_started();
603        let json = serde_json::to_string(&event).unwrap();
604        assert!(!json.contains("\"role\""));
605        assert!(!json.contains("\"task\""));
606    }
607
608    #[test]
609    fn event_sink_writes_jsonl() {
610        let tmp = tempfile::tempdir().unwrap();
611        let path = tmp.path().join("events.jsonl");
612        let mut sink = EventSink::new(&path).unwrap();
613        sink.emit(TeamEvent::daemon_started()).unwrap();
614        sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
615            .unwrap();
616        sink.emit(TeamEvent::daemon_stopped()).unwrap();
617
618        let content = std::fs::read_to_string(&path).unwrap();
619        let lines: Vec<&str> = content.lines().collect();
620        assert_eq!(lines.len(), 3);
621        assert!(lines[0].contains("daemon_started"));
622        assert!(lines[1].contains("task_assigned"));
623        assert!(lines[2].contains("daemon_stopped"));
624    }
625
626    #[test]
627    fn all_event_variants_serialize_with_correct_event_field() {
628        let variants: Vec<(&str, TeamEvent)> = vec![
629            ("daemon_started", TeamEvent::daemon_started()),
630            ("daemon_reloading", TeamEvent::daemon_reloading()),
631            ("daemon_reloaded", TeamEvent::daemon_reloaded()),
632            ("daemon_stopped", TeamEvent::daemon_stopped()),
633            (
634                "daemon_stopped",
635                TeamEvent::daemon_stopped_with_reason("signal", 3600),
636            ),
637            ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
638            (
639                "context_exhausted",
640                TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
641            ),
642            (
643                "loop_step_error",
644                TeamEvent::loop_step_error("poll_watchers", "tmux error"),
645            ),
646            (
647                "daemon_panic",
648                TeamEvent::daemon_panic("index out of bounds"),
649            ),
650            ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
651            (
652                "cwd_corrected",
653                TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
654            ),
655            (
656                "task_escalated",
657                TeamEvent::task_escalated("eng-1", "task", None),
658            ),
659            ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
660            (
661                "performance_regression",
662                TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
663            ),
664            (
665                "task_completed",
666                TeamEvent::task_completed("eng-1", Some("42")),
667            ),
668            ("standup_generated", TeamEvent::standup_generated("manager")),
669            ("retro_generated", TeamEvent::retro_generated()),
670            (
671                "pattern_detected",
672                TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
673            ),
674            ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
675            ("pane_death", TeamEvent::pane_death("eng-1")),
676            ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
677            (
678                "context_pressure_warning",
679                TeamEvent::context_pressure_warning("eng-1", Some(42), 400_000, 512_000),
680            ),
681            ("message_routed", TeamEvent::message_routed("a", "b")),
682            ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
683            (
684                "agent_restarted",
685                TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
686            ),
687            (
688                "delivery_failed",
689                TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
690            ),
691            (
692                "task_auto_merged",
693                TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
694            ),
695            ("task_manual_merged", TeamEvent::task_manual_merged("42")),
696            (
697                "merge_confidence_scored",
698                TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
699                    engineer: "eng-1",
700                    task: "42",
701                    confidence: 0.85,
702                    files_changed: 3,
703                    lines_changed: 50,
704                    has_migrations: false,
705                    has_config_changes: false,
706                    rename_count: 0,
707                }),
708            ),
709            (
710                "review_nudge_sent",
711                TeamEvent::review_nudge_sent("manager", "42"),
712            ),
713            (
714                "review_escalated",
715                TeamEvent::review_escalated("42", "stale review"),
716            ),
717            ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
718            ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
719            (
720                "worktree_reconciled",
721                TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
722            ),
723        ];
724        for (expected_event, event) in &variants {
725            let json = serde_json::to_string(event).unwrap();
726            let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
727            assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
728            assert!(parsed["ts"].as_u64().is_some());
729        }
730    }
731
732    #[test]
733    fn load_snapshot_serializes_optional_metrics() {
734        let event = TeamEvent::load_snapshot(3, 7, false);
735        let json = serde_json::to_string(&event).unwrap();
736        assert!(json.contains("\"event\":\"load_snapshot\""));
737        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
738        assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
739        assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
740        assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
741        assert!(!parsed["session_running"].as_bool().unwrap());
742    }
743
744    #[test]
745    fn read_events_parses_all_known_lines() {
746        let tmp = tempfile::tempdir().unwrap();
747        let path = tmp.path().join("events.jsonl");
748        let mut sink = EventSink::new(&path).unwrap();
749        sink.emit(TeamEvent::daemon_started()).unwrap();
750        sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
751        sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
752
753        let events = read_events(&path).unwrap();
754        assert_eq!(events.len(), 3);
755        assert_eq!(events[1].event, "load_snapshot");
756        assert_eq!(events[1].working_members, Some(1));
757        assert_eq!(events[2].total_members, Some(4));
758    }
759
760    #[test]
761    fn event_sink_appends_to_existing_file() {
762        let tmp = tempfile::tempdir().unwrap();
763        let path = tmp.path().join("events.jsonl");
764
765        // Write one event and close the sink
766        {
767            let mut sink = EventSink::new(&path).unwrap();
768            sink.emit(TeamEvent::daemon_started()).unwrap();
769        }
770
771        // Open again and append another
772        {
773            let mut sink = EventSink::new(&path).unwrap();
774            sink.emit(TeamEvent::daemon_stopped()).unwrap();
775        }
776
777        let content = std::fs::read_to_string(&path).unwrap();
778        let lines: Vec<&str> = content.lines().collect();
779        assert_eq!(lines.len(), 2);
780        assert!(lines[0].contains("daemon_started"));
781        assert!(lines[1].contains("daemon_stopped"));
782    }
783
784    #[test]
785    fn event_with_special_characters_in_task() {
786        let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
787        let json = serde_json::to_string(&event).unwrap();
788        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
789        let task_val = parsed["task"].as_str().unwrap();
790        assert!(task_val.contains("\"quotes\""));
791        assert!(task_val.contains("<angles>"));
792    }
793
794    #[test]
795    fn task_escalated_serializes_role_and_task() {
796        let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
797        let json = serde_json::to_string(&event).unwrap();
798        assert!(json.contains("\"event\":\"task_escalated\""));
799        assert!(json.contains("\"role\":\"eng-1-1\""));
800        assert!(json.contains("\"task\":\"42\""));
801        assert!(json.contains("\"reason\":\"tests_failed\""));
802    }
803
804    #[test]
805    fn cwd_corrected_serializes_role_and_reason() {
806        let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
807        let json = serde_json::to_string(&event).unwrap();
808        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
809        assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
810        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
811        assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
812    }
813
814    #[test]
815    fn pane_death_serializes_role() {
816        let event = TeamEvent::pane_death("eng-1-1");
817        let json = serde_json::to_string(&event).unwrap();
818        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
819        assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
820        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
821    }
822
823    #[test]
824    fn pane_respawned_serializes_role() {
825        let event = TeamEvent::pane_respawned("eng-1-1");
826        let json = serde_json::to_string(&event).unwrap();
827        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
828        assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
829        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
830    }
831
832    #[test]
833    fn agent_restarted_includes_reason_task_and_count() {
834        let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
835        let json = serde_json::to_string(&event).unwrap();
836        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
837        assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
838        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
839        assert_eq!(parsed["task"].as_str().unwrap(), "67");
840        assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
841        assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
842    }
843
844    #[test]
845    fn context_pressure_warning_includes_threshold_and_output_bytes() {
846        let event = TeamEvent::context_pressure_warning("eng-1-2", Some(67), 420_000, 512_000);
847        let json = serde_json::to_string(&event).unwrap();
848        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
849
850        assert_eq!(
851            parsed["event"].as_str().unwrap(),
852            "context_pressure_warning"
853        );
854        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
855        assert_eq!(parsed["task"].as_str().unwrap(), "67");
856        assert_eq!(parsed["output_bytes"].as_u64().unwrap(), 420_000);
857        assert_eq!(parsed["reason"].as_str().unwrap(), "threshold_bytes=512000");
858    }
859
860    #[test]
861    fn delivery_failed_includes_role_sender_and_reason() {
862        let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
863        let json = serde_json::to_string(&event).unwrap();
864        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
865        assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
866        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
867        assert_eq!(parsed["from"].as_str().unwrap(), "manager");
868        assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
869    }
870
871    #[test]
872    fn task_unblocked_serializes_role_and_task() {
873        let event = TeamEvent::task_unblocked("eng-1-1", "42");
874        let json = serde_json::to_string(&event).unwrap();
875        assert!(json.contains("\"event\":\"task_unblocked\""));
876        assert!(json.contains("\"role\":\"eng-1-1\""));
877        assert!(json.contains("\"task\":\"42\""));
878    }
879
880    #[test]
881    fn merge_confidence_scored_includes_all_fields() {
882        let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
883            engineer: "eng-1-1",
884            task: "42",
885            confidence: 0.85,
886            files_changed: 3,
887            lines_changed: 50,
888            has_migrations: true,
889            has_config_changes: false,
890            rename_count: 1,
891        });
892        let json = serde_json::to_string(&event).unwrap();
893        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
894        assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
895        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
896        assert_eq!(parsed["task"].as_str().unwrap(), "42");
897        assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
898        let reason = parsed["reason"].as_str().unwrap();
899        assert!(reason.contains("files=3"));
900        assert!(reason.contains("lines=50"));
901        assert!(reason.contains("migrations=true"));
902        assert!(reason.contains("config=false"));
903        assert!(reason.contains("renames=1"));
904    }
905
906    #[test]
907    fn performance_regression_serializes_task_and_reason() {
908        let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
909        let json = serde_json::to_string(&event).unwrap();
910        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
911        assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
912        assert_eq!(parsed["task"].as_str().unwrap(), "42");
913        assert_eq!(
914            parsed["reason"].as_str().unwrap(),
915            "runtime_ms=1300 avg_ms=1000 pct=30"
916        );
917    }
918
919    #[test]
920    fn daemon_stopped_with_reason_includes_fields() {
921        let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
922        let json = serde_json::to_string(&event).unwrap();
923        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
924        assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
925        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
926    }
927
928    #[test]
929    fn heartbeat_includes_uptime() {
930        let event = TeamEvent::daemon_heartbeat(600);
931        let json = serde_json::to_string(&event).unwrap();
932        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
933        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
934        assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
935        // No reason/step/error fields
936        assert!(parsed.get("reason").is_none());
937        assert!(parsed.get("step").is_none());
938    }
939
940    #[test]
941    fn context_exhausted_includes_role_task_and_session_size() {
942        let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
943        let json = serde_json::to_string(&event).unwrap();
944        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
945        assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
946        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
947        assert_eq!(parsed["task"].as_str().unwrap(), "77");
948        assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
949    }
950
951    #[test]
952    fn loop_step_error_includes_step_and_error() {
953        let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
954        let json = serde_json::to_string(&event).unwrap();
955        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
956        assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
957        assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
958    }
959
960    #[test]
961    fn daemon_panic_includes_reason() {
962        let event = TeamEvent::daemon_panic("index out of bounds");
963        let json = serde_json::to_string(&event).unwrap();
964        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
965        assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
966        assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
967    }
968
969    #[test]
970    fn new_fields_omitted_from_basic_events() {
971        let event = TeamEvent::daemon_started();
972        let json = serde_json::to_string(&event).unwrap();
973        assert!(!json.contains("\"reason\""));
974        assert!(!json.contains("\"step\""));
975        assert!(!json.contains("\"error\""));
976        assert!(!json.contains("\"uptime_secs\""));
977        assert!(!json.contains("\"restart_count\""));
978    }
979
980    #[test]
981    fn pattern_detected_includes_reason_payload() {
982        let event = TeamEvent::pattern_detected("escalation_cluster", 6);
983        let json = serde_json::to_string(&event).unwrap();
984        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
985        assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
986        assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
987    }
988
989    #[test]
990    fn event_sink_creates_parent_directories() {
991        let tmp = tempfile::tempdir().unwrap();
992        let path = tmp.path().join("deep").join("nested").join("events.jsonl");
993        let mut sink = EventSink::new(&path).unwrap();
994        sink.emit(TeamEvent::daemon_started()).unwrap();
995        assert!(path.exists());
996        assert_eq!(sink.path(), path);
997    }
998
999    #[test]
1000    fn event_sink_rotates_oversized_log_on_open() {
1001        let tmp = tempfile::tempdir().unwrap();
1002        let path = tmp.path().join("events.jsonl");
1003        fs::write(&path, "0123456789").unwrap();
1004
1005        let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
1006        sink.emit(TeamEvent::daemon_started()).unwrap();
1007
1008        let rotated = rotated_event_log_path(&path);
1009        assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
1010        let current = fs::read_to_string(&path).unwrap();
1011        assert!(current.contains("daemon_started"));
1012    }
1013
1014    #[test]
1015    fn event_sink_rotates_before_write_that_would_exceed_threshold() {
1016        let tmp = tempfile::tempdir().unwrap();
1017        let path = tmp.path().join("events.jsonl");
1018        let first_line = "{\"event\":\"first\"}\n";
1019        fs::write(&path, first_line).unwrap();
1020
1021        let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
1022        sink.emit(TeamEvent::task_assigned(
1023            "eng-1",
1024            "this assignment is long enough to rotate",
1025        ))
1026        .unwrap();
1027
1028        let rotated = rotated_event_log_path(&path);
1029        assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
1030        let current = fs::read_to_string(&path).unwrap();
1031        assert!(current.contains("task_assigned"));
1032        assert!(!current.contains("\"event\":\"first\""));
1033    }
1034
1035    #[test]
1036    fn event_round_trip_preserves_fields_for_agent_restarted() {
1037        let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
1038
1039        let json = serde_json::to_string(&original).unwrap();
1040        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1041
1042        assert_eq!(parsed.event, "agent_restarted");
1043        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1044        assert_eq!(parsed.task.as_deref(), Some("42"));
1045        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1046        assert_eq!(parsed.restart_count, Some(3));
1047        assert_eq!(parsed.ts, original.ts);
1048    }
1049
1050    #[test]
1051    fn event_round_trip_preserves_fields_for_load_snapshot() {
1052        let original = TeamEvent::load_snapshot(4, 8, true);
1053
1054        let json = serde_json::to_string(&original).unwrap();
1055        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1056
1057        assert_eq!(parsed.event, "load_snapshot");
1058        assert_eq!(parsed.working_members, Some(4));
1059        assert_eq!(parsed.total_members, Some(8));
1060        assert_eq!(parsed.session_running, Some(true));
1061        assert_eq!(parsed.load, Some(0.5));
1062        assert_eq!(parsed.ts, original.ts);
1063    }
1064
1065    #[test]
1066    fn event_round_trip_preserves_fields_for_delivery_failed() {
1067        let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
1068
1069        let json = serde_json::to_string(&original).unwrap();
1070        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1071
1072        assert_eq!(parsed.event, "delivery_failed");
1073        assert_eq!(parsed.role.as_deref(), Some("eng-2"));
1074        assert_eq!(parsed.from.as_deref(), Some("manager"));
1075        assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
1076        assert_eq!(parsed.ts, original.ts);
1077    }
1078
1079    #[test]
1080    fn read_events_skips_blank_and_malformed_lines() {
1081        let tmp = tempfile::tempdir().unwrap();
1082        let path = tmp.path().join("events.jsonl");
1083        fs::write(
1084            &path,
1085            [
1086                "",
1087                "{\"event\":\"daemon_started\",\"ts\":1}",
1088                "not-json",
1089                "   ",
1090                "{\"event\":\"daemon_stopped\",\"ts\":2}",
1091            ]
1092            .join("\n"),
1093        )
1094        .unwrap();
1095
1096        let events = read_events(&path).unwrap();
1097
1098        assert_eq!(events.len(), 2);
1099        assert_eq!(events[0].event, "daemon_started");
1100        assert_eq!(events[1].event, "daemon_stopped");
1101    }
1102
1103    #[test]
1104    fn rotate_event_log_if_needed_returns_false_for_missing_file() {
1105        let tmp = tempfile::tempdir().unwrap();
1106        let path = tmp.path().join("events.jsonl");
1107
1108        let rotated = rotate_event_log_if_needed(&path, 128, 0).unwrap();
1109
1110        assert!(!rotated);
1111        assert!(!rotated_event_log_path(&path).exists());
1112    }
1113
1114    #[test]
1115    fn rotate_event_log_if_needed_returns_false_for_empty_file() {
1116        let tmp = tempfile::tempdir().unwrap();
1117        let path = tmp.path().join("events.jsonl");
1118        fs::write(&path, "").unwrap();
1119
1120        let rotated = rotate_event_log_if_needed(&path, 1, 1).unwrap();
1121
1122        assert!(!rotated);
1123        assert!(path.exists());
1124        assert!(!rotated_event_log_path(&path).exists());
1125    }
1126
1127    #[test]
1128    fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
1129        let tmp = tempfile::tempdir().unwrap();
1130        let path = tmp.path().join("events.jsonl");
1131        let rotated_path = rotated_event_log_path(&path);
1132        fs::write(&path, "current-events").unwrap();
1133        fs::write(&rotated_path, "old-rotated-events").unwrap();
1134
1135        let rotated = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1136
1137        assert!(rotated);
1138        assert_eq!(fs::read_to_string(&rotated_path).unwrap(), "current-events");
1139    }
1140
1141    #[test]
1142    fn concurrent_event_sinks_append_without_losing_lines() {
1143        let tmp = tempfile::tempdir().unwrap();
1144        let path = Arc::new(tmp.path().join("events.jsonl"));
1145        let ready = Arc::new(std::sync::Barrier::new(5));
1146        let errors = Arc::new(Mutex::new(Vec::<String>::new()));
1147        let mut handles = Vec::new();
1148
1149        for idx in 0..4 {
1150            let path = Arc::clone(&path);
1151            let ready = Arc::clone(&ready);
1152            let errors = Arc::clone(&errors);
1153            handles.push(thread::spawn(move || {
1154                ready.wait();
1155                let result = (|| -> Result<()> {
1156                    let mut sink = EventSink::new(&path)?;
1157                    sink.emit(TeamEvent::task_assigned(
1158                        &format!("eng-{idx}"),
1159                        &format!("task-{idx}"),
1160                    ))?;
1161                    Ok(())
1162                })();
1163                if let Err(error) = result {
1164                    errors.lock().unwrap().push(error.to_string());
1165                }
1166            }));
1167        }
1168
1169        ready.wait();
1170        for handle in handles {
1171            handle.join().unwrap();
1172        }
1173
1174        assert!(errors.lock().unwrap().is_empty());
1175        let events = read_events(&path).unwrap();
1176        assert_eq!(events.len(), 4);
1177        for idx in 0..4 {
1178            assert!(
1179                events
1180                    .iter()
1181                    .any(|event| event.role.as_deref() == Some(&format!("eng-{idx}")))
1182            );
1183        }
1184    }
1185
1186    #[test]
1187    fn read_events_handles_large_log_file() {
1188        let tmp = tempfile::tempdir().unwrap();
1189        let path = tmp.path().join("events.jsonl");
1190        let mut sink = EventSink::new(&path).unwrap();
1191
1192        for idx in 0..512 {
1193            sink.emit(TeamEvent::task_assigned(
1194                &format!("eng-{idx}"),
1195                &"x".repeat(128),
1196            ))
1197            .unwrap();
1198        }
1199
1200        let events = read_events(&path).unwrap();
1201
1202        assert_eq!(events.len(), 512);
1203        assert_eq!(events.first().unwrap().event, "task_assigned");
1204        assert_eq!(events.last().unwrap().event, "task_assigned");
1205    }
1206
1207    fn production_unwrap_expect_count(source: &str) -> usize {
1208        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1209            &source[..pos]
1210        } else {
1211            source
1212        };
1213        prod.lines()
1214            .filter(|line| {
1215                let trimmed = line.trim();
1216                !trimmed.starts_with("#[cfg(test)]")
1217                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1218            })
1219            .count()
1220    }
1221
1222    #[test]
1223    fn task_completed_includes_task_id() {
1224        let event = TeamEvent::task_completed("eng-1", Some("42"));
1225        let json = serde_json::to_string(&event).unwrap();
1226        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1227        assert_eq!(parsed["event"].as_str().unwrap(), "task_completed");
1228        assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1229        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1230    }
1231
1232    #[test]
1233    fn task_completed_without_task_id_omits_task_field() {
1234        let event = TeamEvent::task_completed("eng-1", None);
1235        let json = serde_json::to_string(&event).unwrap();
1236        assert!(json.contains("\"event\":\"task_completed\""));
1237        assert!(json.contains("\"role\":\"eng-1\""));
1238        assert!(!json.contains("\"task\""));
1239    }
1240
1241    #[test]
1242    fn task_escalated_without_reason_omits_reason_field() {
1243        let event = TeamEvent::task_escalated("eng-1", "42", None);
1244        let json = serde_json::to_string(&event).unwrap();
1245        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1246        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1247        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1248        assert!(parsed.get("reason").is_none());
1249    }
1250
1251    #[test]
1252    fn task_escalated_with_reason_includes_reason_field() {
1253        let event = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
1254        let json = serde_json::to_string(&event).unwrap();
1255        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1256        assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1257        assert_eq!(parsed["task"].as_str().unwrap(), "42");
1258        assert_eq!(parsed["reason"].as_str().unwrap(), "merge_conflict");
1259    }
1260
1261    #[test]
1262    fn task_completed_round_trip_preserves_task_id() {
1263        let original = TeamEvent::task_completed("eng-1", Some("99"));
1264        let json = serde_json::to_string(&original).unwrap();
1265        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1266        assert_eq!(parsed.event, "task_completed");
1267        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1268        assert_eq!(parsed.task.as_deref(), Some("99"));
1269    }
1270
1271    #[test]
1272    fn task_escalated_round_trip_preserves_reason() {
1273        let original = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
1274        let json = serde_json::to_string(&original).unwrap();
1275        let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1276        assert_eq!(parsed.event, "task_escalated");
1277        assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1278        assert_eq!(parsed.task.as_deref(), Some("42"));
1279        assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1280    }
1281
1282    #[test]
1283    fn production_events_has_no_unwrap_or_expect_calls() {
1284        let src = include_str!("events.rs");
1285        assert_eq!(
1286            production_unwrap_expect_count(src),
1287            0,
1288            "production events.rs should avoid unwrap/expect"
1289        );
1290    }
1291
1292    #[test]
1293    fn stall_detected_event_fields() {
1294        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1295        assert_eq!(event.event, "stall_detected");
1296        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1297        assert_eq!(event.task.as_deref(), Some("42"));
1298        assert_eq!(event.uptime_secs, Some(300));
1299    }
1300
1301    #[test]
1302    fn stall_detected_event_without_task() {
1303        let event = TeamEvent::stall_detected("eng-1-1", None, 600);
1304        assert_eq!(event.event, "stall_detected");
1305        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1306        assert!(event.task.is_none());
1307        assert_eq!(event.uptime_secs, Some(600));
1308    }
1309
1310    #[test]
1311    fn stall_detected_event_serializes_to_jsonl() {
1312        let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1313        let json = serde_json::to_string(&event).unwrap();
1314        assert!(json.contains("\"stall_detected\""));
1315        assert!(json.contains("\"eng-1-1\""));
1316        assert!(json.contains("\"42\""));
1317    }
1318
1319    #[test]
1320    fn health_changed_event_fields() {
1321        let event = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");
1322        assert_eq!(event.event, "health_changed");
1323        assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1324        assert_eq!(event.reason.as_deref(), Some("healthy→unreachable"));
1325    }
1326
1327    #[test]
1328    fn health_changed_event_serializes_to_jsonl() {
1329        let event = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
1330        let json = serde_json::to_string(&event).unwrap();
1331        assert!(json.contains("\"health_changed\""));
1332        assert!(json.contains("\"eng-1-2\""));
1333    }
1334
1335    // --- Error path and recovery tests (Task #265) ---
1336
1337    #[test]
1338    fn event_sink_on_readonly_dir_returns_error() {
1339        #[cfg(unix)]
1340        {
1341            use std::os::unix::fs::PermissionsExt;
1342            let tmp = tempfile::tempdir().unwrap();
1343            let readonly_dir = tmp.path().join("readonly");
1344            fs::create_dir(&readonly_dir).unwrap();
1345            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o444)).unwrap();
1346
1347            let path = readonly_dir.join("subdir").join("events.jsonl");
1348            let result = EventSink::new(&path);
1349            assert!(result.is_err());
1350
1351            // Restore permissions for cleanup
1352            fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o755)).unwrap();
1353        }
1354    }
1355
1356    #[test]
1357    fn read_events_from_nonexistent_file_returns_empty() {
1358        let tmp = tempfile::tempdir().unwrap();
1359        let path = tmp.path().join("does_not_exist.jsonl");
1360        let events = read_events(&path).unwrap();
1361        assert!(events.is_empty());
1362    }
1363
1364    #[test]
1365    fn read_events_all_malformed_lines_returns_empty() {
1366        let tmp = tempfile::tempdir().unwrap();
1367        let path = tmp.path().join("events.jsonl");
1368        fs::write(&path, "not json\nalso not json\n{invalid}\n").unwrap();
1369        let events = read_events(&path).unwrap();
1370        assert!(events.is_empty());
1371    }
1372
1373    #[test]
1374    fn event_sink_emit_with_failing_writer() {
1375        struct FailWriter;
1376        impl Write for FailWriter {
1377            fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1378                Err(std::io::Error::new(
1379                    std::io::ErrorKind::BrokenPipe,
1380                    "simulated write failure",
1381                ))
1382            }
1383            fn flush(&mut self) -> std::io::Result<()> {
1384                Err(std::io::Error::new(
1385                    std::io::ErrorKind::BrokenPipe,
1386                    "simulated flush failure",
1387                ))
1388            }
1389        }
1390
1391        let tmp = tempfile::tempdir().unwrap();
1392        let path = tmp.path().join("events.jsonl");
1393        let mut sink = EventSink::from_writer(&path, FailWriter);
1394        let result = sink.emit(TeamEvent::daemon_started());
1395        assert!(result.is_err());
1396    }
1397
1398    #[test]
1399    fn rotate_event_log_replaces_stale_rotated_file() {
1400        let tmp = tempfile::tempdir().unwrap();
1401        let path = tmp.path().join("events.jsonl");
1402        let rotated = rotated_event_log_path(&path);
1403
1404        fs::write(&path, "current-data-that-is-large").unwrap();
1405        fs::write(&rotated, "old-rotated-data").unwrap();
1406
1407        let did_rotate = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1408        assert!(did_rotate);
1409        // Old rotated was replaced with current data
1410        assert_eq!(
1411            fs::read_to_string(&rotated).unwrap(),
1412            "current-data-that-is-large"
1413        );
1414        // Current file is now gone (rotated away)
1415        assert!(!path.exists());
1416    }
1417
1418    #[test]
1419    fn event_sink_handles_zero_max_bytes_rotation() {
1420        let tmp = tempfile::tempdir().unwrap();
1421        let path = tmp.path().join("events.jsonl");
1422
1423        // With max_bytes=0, any existing content triggers rotation, but empty file doesn't
1424        fs::write(&path, "").unwrap();
1425        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1426        assert!(!did_rotate); // empty file → no rotation
1427
1428        fs::write(&path, "x").unwrap();
1429        let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1430        assert!(did_rotate); // non-empty file at 0-byte limit → rotation
1431    }
1432
1433    #[test]
1434    fn read_events_partial_json_with_valid_lines_mixed() {
1435        let tmp = tempfile::tempdir().unwrap();
1436        let path = tmp.path().join("events.jsonl");
1437        // Simulate a truncated write: valid JSON, then partial, then valid
1438        let content = format!(
1439            "{}\n{{\"event\":\"trunca\n{}\n",
1440            r#"{"event":"daemon_started","ts":1}"#, r#"{"event":"daemon_stopped","ts":3}"#
1441        );
1442        fs::write(&path, content).unwrap();
1443
1444        let events = read_events(&path).unwrap();
1445        assert_eq!(events.len(), 2);
1446        assert_eq!(events[0].event, "daemon_started");
1447        assert_eq!(events[1].event, "daemon_stopped");
1448    }
1449}