1use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
/// Borrowed description of a scored merge, consumed by
/// `TeamEvent::merge_confidence_scored`, which copies the values into an owned event.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MergeConfidenceInfo<'a> {
    /// Engineer identifier; stored in the event's `role` field.
    pub engineer: &'a str,
    /// Task identifier; stored in the event's `task` field.
    pub task: &'a str,
    /// Confidence score; stored in the event's `load` field.
    pub confidence: f64,
    /// Rendered as `files=<n>` in the event's `reason`.
    pub files_changed: usize,
    /// Rendered as `lines=<n>` in the event's `reason`.
    pub lines_changed: usize,
    /// Rendered as `migrations=<bool>` in the event's `reason`.
    pub has_migrations: bool,
    /// Rendered as `config=<bool>` in the event's `reason`.
    pub has_config_changes: bool,
    /// Rendered as `renames=<n>` in the event's `reason`.
    pub rename_count: usize,
}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28 pub event: String,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub role: Option<String>,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub task: Option<String>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub recipient: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub from: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub to: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub restart: Option<bool>,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub restart_count: Option<u32>,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 pub load: Option<f64>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub working_members: Option<u32>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub total_members: Option<u32>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub session_running: Option<bool>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub reason: Option<String>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub step: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub error: Option<String>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub uptime_secs: Option<u64>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub session_size_bytes: Option<u64>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub output_bytes: Option<u64>,
63 pub ts: u64,
64}
65
66impl TeamEvent {
67 fn now() -> u64 {
68 SystemTime::now()
69 .duration_since(UNIX_EPOCH)
70 .unwrap_or_default()
71 .as_secs()
72 }
73
74 fn base(event: &str) -> Self {
75 Self {
76 event: event.into(),
77 role: None,
78 task: None,
79 recipient: None,
80 from: None,
81 to: None,
82 restart: None,
83 restart_count: None,
84 load: None,
85 working_members: None,
86 total_members: None,
87 session_running: None,
88 reason: None,
89 step: None,
90 error: None,
91 uptime_secs: None,
92 session_size_bytes: None,
93 output_bytes: None,
94 ts: Self::now(),
95 }
96 }
97
98 pub fn daemon_started() -> Self {
99 Self::base("daemon_started")
100 }
101
102 pub fn daemon_reloading() -> Self {
103 Self::base("daemon_reloading")
104 }
105
106 pub fn daemon_reloaded() -> Self {
107 Self::base("daemon_reloaded")
108 }
109
110 pub fn daemon_stopped() -> Self {
111 Self::base("daemon_stopped")
112 }
113
114 pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
115 Self {
116 reason: Some(reason.into()),
117 uptime_secs: Some(uptime_secs),
118 ..Self::base("daemon_stopped")
119 }
120 }
121
122 pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
123 Self {
124 uptime_secs: Some(uptime_secs),
125 ..Self::base("daemon_heartbeat")
126 }
127 }
128
129 pub fn context_exhausted(
130 role: &str,
131 task: Option<u32>,
132 session_size_bytes: Option<u64>,
133 ) -> Self {
134 Self {
135 role: Some(role.into()),
136 task: task.map(|task_id| task_id.to_string()),
137 session_size_bytes,
138 ..Self::base("context_exhausted")
139 }
140 }
141
142 pub fn loop_step_error(step: &str, error: &str) -> Self {
143 Self {
144 step: Some(step.into()),
145 error: Some(error.into()),
146 ..Self::base("loop_step_error")
147 }
148 }
149
150 pub fn daemon_panic(reason: &str) -> Self {
151 Self {
152 reason: Some(reason.into()),
153 ..Self::base("daemon_panic")
154 }
155 }
156
157 pub fn task_assigned(role: &str, task: &str) -> Self {
158 Self {
159 role: Some(role.into()),
160 task: Some(task.into()),
161 ..Self::base("task_assigned")
162 }
163 }
164
165 pub fn cwd_corrected(role: &str, path: &str) -> Self {
166 Self {
167 role: Some(role.into()),
168 reason: Some(path.into()),
169 ..Self::base("cwd_corrected")
170 }
171 }
172
173 pub fn review_nudge_sent(role: &str, task: &str) -> Self {
174 Self {
175 role: Some(role.into()),
176 task: Some(task.into()),
177 ..Self::base("review_nudge_sent")
178 }
179 }
180
181 pub fn review_escalated(task: &str, reason: &str) -> Self {
182 Self {
183 task: Some(task.into()),
184 reason: Some(reason.into()),
185 ..Self::base("review_escalated")
186 }
187 }
188
189 pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
190 Self {
191 role: Some(role.into()),
192 task: Some(task.into()),
193 reason: reason.map(|r| r.into()),
194 ..Self::base("task_escalated")
195 }
196 }
197
198 pub fn task_unblocked(role: &str, task: &str) -> Self {
199 Self {
200 role: Some(role.into()),
201 task: Some(task.into()),
202 ..Self::base("task_unblocked")
203 }
204 }
205
206 pub fn performance_regression(task: &str, reason: &str) -> Self {
207 Self {
208 task: Some(task.into()),
209 reason: Some(reason.into()),
210 ..Self::base("performance_regression")
211 }
212 }
213
214 pub fn task_completed(role: &str, task: Option<&str>) -> Self {
215 Self {
216 role: Some(role.into()),
217 task: task.map(|t| t.into()),
218 ..Self::base("task_completed")
219 }
220 }
221
222 pub fn standup_generated(recipient: &str) -> Self {
223 Self {
224 recipient: Some(recipient.into()),
225 ..Self::base("standup_generated")
226 }
227 }
228
229 pub fn retro_generated() -> Self {
230 Self::base("retro_generated")
231 }
232
233 pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
234 Self {
235 reason: Some(format!("{pattern_type}:{frequency}")),
236 ..Self::base("pattern_detected")
237 }
238 }
239
240 pub fn member_crashed(role: &str, restart: bool) -> Self {
241 Self {
242 role: Some(role.into()),
243 restart: Some(restart),
244 ..Self::base("member_crashed")
245 }
246 }
247
248 pub fn pane_death(role: &str) -> Self {
249 Self {
250 role: Some(role.into()),
251 ..Self::base("pane_death")
252 }
253 }
254
255 pub fn pane_respawned(role: &str) -> Self {
256 Self {
257 role: Some(role.into()),
258 ..Self::base("pane_respawned")
259 }
260 }
261
262 pub fn narration_detected(role: &str, task: Option<u32>) -> Self {
263 Self {
264 role: Some(role.into()),
265 task: task.map(|id| id.to_string()),
266 ..Self::base("narration_detected")
267 }
268 }
269
270 pub fn context_pressure_warning(
271 role: &str,
272 task: Option<u32>,
273 output_bytes: u64,
274 threshold_bytes: u64,
275 ) -> Self {
276 Self {
277 role: Some(role.into()),
278 task: task.map(|id| id.to_string()),
279 output_bytes: Some(output_bytes),
280 reason: Some(format!("threshold_bytes={threshold_bytes}")),
281 ..Self::base("context_pressure_warning")
282 }
283 }
284
285 pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
286 Self {
287 role: Some(role.into()),
288 task: task.map(|id| id.to_string()),
289 uptime_secs: Some(stall_duration_secs),
290 ..Self::base("stall_detected")
291 }
292 }
293
294 pub fn health_changed(role: &str, reason: &str) -> Self {
298 Self {
299 role: Some(role.into()),
300 reason: Some(reason.into()),
301 ..Self::base("health_changed")
302 }
303 }
304
305 pub fn message_routed(from: &str, to: &str) -> Self {
306 Self {
307 from: Some(from.into()),
308 to: Some(to.into()),
309 ..Self::base("message_routed")
310 }
311 }
312
313 pub fn agent_spawned(role: &str) -> Self {
314 Self {
315 role: Some(role.into()),
316 ..Self::base("agent_spawned")
317 }
318 }
319
320 pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
321 Self {
322 role: Some(role.into()),
323 task: Some(task.into()),
324 reason: Some(reason.into()),
325 restart_count: Some(restart_count),
326 ..Self::base("agent_restarted")
327 }
328 }
329
330 pub fn task_resumed(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
331 Self {
332 role: Some(role.into()),
333 task: Some(task.into()),
334 reason: Some(reason.into()),
335 restart_count: Some(restart_count),
336 ..Self::base("task_resumed")
337 }
338 }
339
340 pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
341 Self {
342 role: Some(role.into()),
343 from: Some(from.into()),
344 reason: Some(reason.into()),
345 ..Self::base("delivery_failed")
346 }
347 }
348
349 pub fn task_auto_merged(
350 engineer: &str,
351 task: &str,
352 confidence: f64,
353 files_changed: usize,
354 lines_changed: usize,
355 ) -> Self {
356 Self {
357 role: Some(engineer.into()),
358 task: Some(task.into()),
359 load: Some(confidence),
360 reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
361 ..Self::base("task_auto_merged")
362 }
363 }
364
365 pub fn task_manual_merged(task: &str) -> Self {
366 Self {
367 task: Some(task.into()),
368 ..Self::base("task_manual_merged")
369 }
370 }
371
372 pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
374 let detail = format!(
375 "files={} lines={} migrations={} config={} renames={}",
376 info.files_changed,
377 info.lines_changed,
378 info.has_migrations,
379 info.has_config_changes,
380 info.rename_count
381 );
382 Self {
383 role: Some(info.engineer.into()),
384 task: Some(info.task.into()),
385 load: Some(info.confidence),
386 reason: Some(detail),
387 ..Self::base("merge_confidence_scored")
388 }
389 }
390
391 pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
392 Self {
393 role: Some(role.into()),
394 task: Some(task.into()),
395 ..Self::base("review_escalated")
396 }
397 }
398
399 pub fn task_reworked(role: &str, task: &str) -> Self {
400 Self {
401 role: Some(role.into()),
402 task: Some(task.into()),
403 ..Self::base("task_reworked")
404 }
405 }
406
407 pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
408 Self {
409 task: Some(format!("#{task_id}")),
410 reason: Some(cron_expr.into()),
411 ..Self::base("task_recycled")
412 }
413 }
414
415 pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
416 let load = if total_members == 0 {
417 0.0
418 } else {
419 working_members as f64 / total_members as f64
420 };
421 Self {
422 load: Some(load),
423 working_members: Some(working_members),
424 total_members: Some(total_members),
425 session_running: Some(session_running),
426 ..Self::base("load_snapshot")
427 }
428 }
429
430 pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
431 Self {
432 role: Some(role.into()),
433 reason: Some(format!("branch '{branch}' merged into main")),
434 ..Self::base("worktree_reconciled")
435 }
436 }
437
438 pub fn topology_changed(added: u32, removed: u32, reason: &str) -> Self {
442 Self {
443 working_members: Some(added),
444 total_members: Some(removed),
445 reason: Some(reason.into()),
446 ..Self::base("topology_changed")
447 }
448 }
449
450 pub fn agent_removed(role: &str, reason: &str) -> Self {
452 Self {
453 role: Some(role.into()),
454 reason: Some(reason.into()),
455 ..Self::base("agent_removed")
456 }
457 }
458}
459
/// Append-only writer for the JSONL event log, with optional size-based rotation.
pub struct EventSink {
    /// Destination that receives serialized events.
    writer: Box<dyn Write + Send>,
    /// Location of the log file; returned by `path()` and used for rotation.
    path: PathBuf,
    /// Rotation threshold in bytes; `None` disables rotation.
    max_bytes: Option<u64>,
}
465
466impl EventSink {
467 pub fn new(path: &Path) -> Result<Self> {
468 Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
469 }
470
471 pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
472 if let Some(parent) = path.parent() {
473 std::fs::create_dir_all(parent)?;
474 }
475 rotate_event_log_if_needed(path, max_bytes, 0)?;
476 let file = OpenOptions::new()
477 .create(true)
478 .append(true)
479 .open(path)
480 .with_context(|| format!("failed to open event sink: {}", path.display()))?;
481 Ok(Self {
482 writer: Box::new(BufWriter::new(file)),
483 path: path.to_path_buf(),
484 max_bytes: Some(max_bytes),
485 })
486 }
487
488 #[cfg(test)]
489 pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
490 Self {
491 writer: Box::new(writer),
492 path: path.to_path_buf(),
493 max_bytes: None,
494 }
495 }
496
497 pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
498 let json = serde_json::to_string(&event)?;
499 self.rotate_if_needed((json.len() + 1) as u64)?;
500 writeln!(self.writer, "{json}")?;
501 self.writer.flush()?;
502 Ok(())
503 }
504
505 pub fn path(&self) -> &Path {
506 &self.path
507 }
508
509 fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
510 let Some(max_bytes) = self.max_bytes else {
511 return Ok(());
512 };
513 self.writer.flush()?;
514 if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
515 self.writer = Box::new(BufWriter::new(
516 OpenOptions::new()
517 .create(true)
518 .append(true)
519 .open(&self.path)
520 .with_context(|| {
521 format!("failed to reopen event sink: {}", self.path.display())
522 })?,
523 ));
524 }
525 Ok(())
526 }
527}
528
/// Returns the path a rotated log is moved to: `path` with `.1` appended.
///
/// The suffix is appended to the raw `OsString` rather than via
/// `Path::display`, which is lossy and would corrupt non-UTF-8 paths.
fn rotated_event_log_path(path: &Path) -> PathBuf {
    let mut rotated = path.as_os_str().to_os_string();
    rotated.push(".1");
    PathBuf::from(rotated)
}
532
533fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
534 let len = match fs::metadata(path) {
535 Ok(metadata) => metadata.len(),
536 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
537 Err(error) => {
538 return Err(error).with_context(|| format!("failed to stat {}", path.display()));
539 }
540 };
541
542 if len == 0 {
543 return Ok(false);
544 }
545
546 if len.saturating_add(next_entry_bytes) <= max_bytes {
547 return Ok(false);
548 }
549
550 let rotated = rotated_event_log_path(path);
551 if rotated.exists() {
552 fs::remove_file(&rotated)
553 .with_context(|| format!("failed to remove {}", rotated.display()))?;
554 }
555 fs::rename(path, &rotated).with_context(|| {
556 format!(
557 "failed to rotate event log {} to {}",
558 path.display(),
559 rotated.display()
560 )
561 })?;
562 Ok(true)
563}
564
565pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
566 if !path.exists() {
567 return Ok(Vec::new());
568 }
569 let content = fs::read_to_string(path).context("failed to read event log")?;
570 let mut events = Vec::new();
571 for line in content.lines() {
572 let line = line.trim();
573 if line.is_empty() {
574 continue;
575 }
576 if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
577 events.push(event);
578 }
579 }
580 Ok(events)
581}
582
583#[cfg(test)]
584mod tests {
585 use std::sync::{Arc, Mutex};
586 use std::thread;
587
588 use super::*;
589
/// A serialized `task_assigned` event exposes its name, role, task, and timestamp.
#[test]
fn event_serializes_to_json() {
    let encoded =
        serde_json::to_string(&TeamEvent::task_assigned("eng-1-1", "fix auth bug")).unwrap();
    for needle in [
        r#""event":"task_assigned""#,
        r#""role":"eng-1-1""#,
        r#""task":"fix auth bug""#,
        r#""ts":"#,
    ] {
        assert!(encoded.contains(needle));
    }
}
599
/// Unset optional fields do not appear in the JSON at all.
#[test]
fn optional_fields_omitted() {
    let encoded = serde_json::to_string(&TeamEvent::daemon_started()).unwrap();
    assert!(!encoded.contains(r#""role""#));
    assert!(!encoded.contains(r#""task""#));
}
607
/// Each emitted event becomes exactly one line, in emission order.
#[test]
fn event_sink_writes_jsonl() {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("events.jsonl");
    let mut sink = EventSink::new(&log_path).unwrap();
    for event in [
        TeamEvent::daemon_started(),
        TeamEvent::task_assigned("eng-1", "fix bug"),
        TeamEvent::daemon_stopped(),
    ] {
        sink.emit(event).unwrap();
    }

    let written = std::fs::read_to_string(&log_path).unwrap();
    let lines: Vec<&str> = written.lines().collect();
    assert_eq!(lines.len(), 3);
    let expected = ["daemon_started", "task_assigned", "daemon_stopped"];
    for (line, name) in lines.iter().zip(expected) {
        assert!(line.contains(name));
    }
}
625
/// Every public `TeamEvent` constructor serializes with the expected `event`
/// name and a numeric `ts`.
///
/// Add an entry here whenever a new constructor is introduced.
#[test]
fn all_event_variants_serialize_with_correct_event_field() {
    let variants: Vec<(&str, TeamEvent)> = vec![
        ("daemon_started", TeamEvent::daemon_started()),
        ("daemon_reloading", TeamEvent::daemon_reloading()),
        ("daemon_reloaded", TeamEvent::daemon_reloaded()),
        ("daemon_stopped", TeamEvent::daemon_stopped()),
        (
            "daemon_stopped",
            TeamEvent::daemon_stopped_with_reason("signal", 3600),
        ),
        ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
        (
            "context_exhausted",
            TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
        ),
        (
            "loop_step_error",
            TeamEvent::loop_step_error("poll_watchers", "tmux error"),
        ),
        (
            "daemon_panic",
            TeamEvent::daemon_panic("index out of bounds"),
        ),
        ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
        (
            "cwd_corrected",
            TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
        ),
        (
            "task_escalated",
            TeamEvent::task_escalated("eng-1", "task", None),
        ),
        ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
        (
            "performance_regression",
            TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
        ),
        (
            "task_completed",
            TeamEvent::task_completed("eng-1", Some("42")),
        ),
        ("standup_generated", TeamEvent::standup_generated("manager")),
        ("retro_generated", TeamEvent::retro_generated()),
        (
            "pattern_detected",
            TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
        ),
        ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
        ("pane_death", TeamEvent::pane_death("eng-1")),
        ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
        (
            "narration_detected",
            TeamEvent::narration_detected("eng-1", Some(42)),
        ),
        (
            "context_pressure_warning",
            TeamEvent::context_pressure_warning("eng-1", Some(42), 400_000, 512_000),
        ),
        (
            "stall_detected",
            TeamEvent::stall_detected("eng-1", Some(42), 300),
        ),
        (
            "health_changed",
            TeamEvent::health_changed("eng-1", "degraded"),
        ),
        ("message_routed", TeamEvent::message_routed("a", "b")),
        ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
        (
            "agent_restarted",
            TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
        ),
        (
            "task_resumed",
            TeamEvent::task_resumed("eng-1", "42", "context_exhausted", 1),
        ),
        (
            "delivery_failed",
            TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
        ),
        (
            "task_auto_merged",
            TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
        ),
        ("task_manual_merged", TeamEvent::task_manual_merged("42")),
        (
            "merge_confidence_scored",
            TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
                engineer: "eng-1",
                task: "42",
                confidence: 0.85,
                files_changed: 3,
                lines_changed: 50,
                has_migrations: false,
                has_config_changes: false,
                rename_count: 0,
            }),
        ),
        (
            "review_nudge_sent",
            TeamEvent::review_nudge_sent("manager", "42"),
        ),
        (
            "review_escalated",
            TeamEvent::review_escalated("42", "stale review"),
        ),
        (
            "review_escalated",
            TeamEvent::review_escalated_by_role("manager", "42"),
        ),
        ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
        ("task_recycled", TeamEvent::task_recycled(42, "0 9 * * 1")),
        ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
        (
            "worktree_reconciled",
            TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
        ),
        (
            "topology_changed",
            TeamEvent::topology_changed(1, 0, "scale_up"),
        ),
        (
            "agent_removed",
            TeamEvent::agent_removed("eng-2", "scale_down"),
        ),
    ];
    for (expected_event, event) in &variants {
        let json = serde_json::to_string(event).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
        assert!(parsed["ts"].as_u64().is_some());
    }
}
731
/// `load_snapshot` serializes the computed ratio alongside the raw counters.
#[test]
fn load_snapshot_serializes_optional_metrics() {
    let event = TeamEvent::load_snapshot(3, 7, false);
    let json = serde_json::to_string(&event).unwrap();
    assert!(json.contains("\"event\":\"load_snapshot\""));
    let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
    // Compare with a tolerance: a float that went through JSON text is only
    // guaranteed bit-exact when serde_json's `float_roundtrip` feature is on.
    let load = parsed["load"].as_f64().unwrap();
    assert!((load - 3.0 / 7.0).abs() < 1e-12);
    assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
    assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
    assert!(!parsed["session_running"].as_bool().unwrap());
}
743
/// Events written through the sink are read back in order with their fields intact.
#[test]
fn read_events_parses_all_known_lines() {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("events.jsonl");
    let mut sink = EventSink::new(&log_path).unwrap();
    for event in [
        TeamEvent::daemon_started(),
        TeamEvent::load_snapshot(1, 4, true),
        TeamEvent::load_snapshot(2, 4, true),
    ] {
        sink.emit(event).unwrap();
    }

    let loaded = read_events(&log_path).unwrap();
    assert_eq!(loaded.len(), 3);
    let (second, third) = (&loaded[1], &loaded[2]);
    assert_eq!(second.event, "load_snapshot");
    assert_eq!(second.working_members, Some(1));
    assert_eq!(third.total_members, Some(4));
}
759
/// Reopening a sink on an existing log appends instead of truncating.
#[test]
fn event_sink_appends_to_existing_file() {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("events.jsonl");

    // Each iteration opens a fresh sink and drops it before the next one.
    for event in [TeamEvent::daemon_started(), TeamEvent::daemon_stopped()] {
        let mut sink = EventSink::new(&log_path).unwrap();
        sink.emit(event).unwrap();
    }

    let written = std::fs::read_to_string(&log_path).unwrap();
    let lines: Vec<&str> = written.lines().collect();
    assert_eq!(lines.len(), 2);
    assert!(lines[0].contains("daemon_started"));
    assert!(lines[1].contains("daemon_stopped"));
}
783
/// Quotes, angle brackets, backslashes, and newlines survive a JSON round trip.
#[test]
fn event_with_special_characters_in_task() {
    let tricky = "fix: \"quotes\" & <angles> / \\ newline\n";
    let encoded = serde_json::to_string(&TeamEvent::task_assigned("eng-1", tricky)).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    let task = value["task"].as_str().unwrap();
    assert!(task.contains(r#""quotes""#));
    assert!(task.contains("<angles>"));
}
793
/// `task_escalated` serializes its role, task, and optional reason.
#[test]
fn task_escalated_serializes_role_and_task() {
    let escalated = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
    let encoded = serde_json::to_string(&escalated).unwrap();
    for needle in [
        r#""event":"task_escalated""#,
        r#""role":"eng-1-1""#,
        r#""task":"42""#,
        r#""reason":"tests_failed""#,
    ] {
        assert!(encoded.contains(needle));
    }
}
803
/// `cwd_corrected` stores the corrected path in `reason`.
#[test]
fn cwd_corrected_serializes_role_and_reason() {
    let encoded =
        serde_json::to_string(&TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree")).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("cwd_corrected"));
    assert_eq!(value["role"].as_str(), Some("eng-1-1"));
    assert_eq!(value["reason"].as_str(), Some("/tmp/worktree"));
}
813
/// `pane_death` carries the event name and the affected role.
#[test]
fn pane_death_serializes_role() {
    let encoded = serde_json::to_string(&TeamEvent::pane_death("eng-1-1")).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("pane_death"));
    assert_eq!(value["role"].as_str(), Some("eng-1-1"));
}
822
/// `pane_respawned` carries the event name and the affected role.
#[test]
fn pane_respawned_serializes_role() {
    let encoded = serde_json::to_string(&TeamEvent::pane_respawned("eng-1-1")).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("pane_respawned"));
    assert_eq!(value["role"].as_str(), Some("eng-1-1"));
}
831
/// `agent_restarted` serializes role, task, reason, and the restart counter.
#[test]
fn agent_restarted_includes_reason_task_and_count() {
    let restarted = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
    let encoded = serde_json::to_string(&restarted).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("agent_restarted"));
    assert_eq!(value["role"].as_str(), Some("eng-1-2"));
    assert_eq!(value["task"].as_str(), Some("67"));
    assert_eq!(value["reason"].as_str(), Some("context_exhausted"));
    assert_eq!(value["restart_count"].as_u64(), Some(2));
}
843
/// `context_pressure_warning` reports output size and the threshold text.
#[test]
fn context_pressure_warning_includes_threshold_and_output_bytes() {
    let warning = TeamEvent::context_pressure_warning("eng-1-2", Some(67), 420_000, 512_000);
    let encoded = serde_json::to_string(&warning).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();

    assert_eq!(value["event"].as_str(), Some("context_pressure_warning"));
    assert_eq!(value["role"].as_str(), Some("eng-1-2"));
    assert_eq!(value["task"].as_str(), Some("67"));
    assert_eq!(value["output_bytes"].as_u64(), Some(420_000));
    assert_eq!(value["reason"].as_str(), Some("threshold_bytes=512000"));
}
859
/// `delivery_failed` serializes the target role, the sender, and the reason.
#[test]
fn delivery_failed_includes_role_sender_and_reason() {
    let failure = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
    let encoded = serde_json::to_string(&failure).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("delivery_failed"));
    assert_eq!(value["role"].as_str(), Some("eng-1-2"));
    assert_eq!(value["from"].as_str(), Some("manager"));
    assert_eq!(value["reason"].as_str(), Some("message marker missing"));
}
870
/// `task_unblocked` serializes its event name, role, and task.
#[test]
fn task_unblocked_serializes_role_and_task() {
    let encoded = serde_json::to_string(&TeamEvent::task_unblocked("eng-1-1", "42")).unwrap();
    for needle in [
        r#""event":"task_unblocked""#,
        r#""role":"eng-1-1""#,
        r#""task":"42""#,
    ] {
        assert!(encoded.contains(needle));
    }
}
879
/// `merge_confidence_scored` maps every input onto the event: role, task,
/// confidence in `load`, and the remaining inputs inside `reason`.
#[test]
fn merge_confidence_scored_includes_all_fields() {
    let info = MergeConfidenceInfo {
        engineer: "eng-1-1",
        task: "42",
        confidence: 0.85,
        files_changed: 3,
        lines_changed: 50,
        has_migrations: true,
        has_config_changes: false,
        rename_count: 1,
    };
    let encoded = serde_json::to_string(&TeamEvent::merge_confidence_scored(&info)).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("merge_confidence_scored"));
    assert_eq!(value["role"].as_str(), Some("eng-1-1"));
    assert_eq!(value["task"].as_str(), Some("42"));
    let confidence = value["load"].as_f64().unwrap();
    assert!((confidence - 0.85).abs() < 0.001);
    let detail = value["reason"].as_str().unwrap();
    for fragment in [
        "files=3",
        "lines=50",
        "migrations=true",
        "config=false",
        "renames=1",
    ] {
        assert!(detail.contains(fragment));
    }
}
905
/// `performance_regression` serializes the task and passes `reason` through verbatim.
#[test]
fn performance_regression_serializes_task_and_reason() {
    let detail = "runtime_ms=1300 avg_ms=1000 pct=30";
    let encoded =
        serde_json::to_string(&TeamEvent::performance_regression("42", detail)).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("performance_regression"));
    assert_eq!(value["task"].as_str(), Some("42"));
    assert_eq!(value["reason"].as_str(), Some(detail));
}
918
/// `daemon_stopped_with_reason` serializes both the reason and the uptime.
#[test]
fn daemon_stopped_with_reason_includes_fields() {
    let stopped = TeamEvent::daemon_stopped_with_reason("signal", 7200);
    let encoded = serde_json::to_string(&stopped).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["reason"].as_str(), Some("signal"));
    assert_eq!(value["uptime_secs"].as_u64(), Some(7200));
}
927
/// `daemon_heartbeat` carries the uptime and leaves `reason` and `step` out.
#[test]
fn heartbeat_includes_uptime() {
    let encoded = serde_json::to_string(&TeamEvent::daemon_heartbeat(600)).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("daemon_heartbeat"));
    assert_eq!(value["uptime_secs"].as_u64(), Some(600));
    for absent in ["reason", "step"] {
        assert!(value.get(absent).is_none());
    }
}
939
/// `context_exhausted` serializes the role, the stringified task id, and the session size.
#[test]
fn context_exhausted_includes_role_task_and_session_size() {
    let exhausted = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
    let encoded = serde_json::to_string(&exhausted).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("context_exhausted"));
    assert_eq!(value["role"].as_str(), Some("eng-1"));
    assert_eq!(value["task"].as_str(), Some("77"));
    assert_eq!(value["session_size_bytes"].as_u64(), Some(4096));
}
950
/// `loop_step_error` serializes both the step name and the error text.
#[test]
fn loop_step_error_includes_step_and_error() {
    let failure = TeamEvent::loop_step_error("poll_watchers", "connection refused");
    let encoded = serde_json::to_string(&failure).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["step"].as_str(), Some("poll_watchers"));
    assert_eq!(value["error"].as_str(), Some("connection refused"));
}
959
/// `daemon_panic` stores the panic message in `reason`.
#[test]
fn daemon_panic_includes_reason() {
    let encoded =
        serde_json::to_string(&TeamEvent::daemon_panic("index out of bounds")).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("daemon_panic"));
    assert_eq!(value["reason"].as_str(), Some("index out of bounds"));
}
968
/// A bare `daemon_started` event serializes none of the diagnostic fields.
#[test]
fn new_fields_omitted_from_basic_events() {
    let encoded = serde_json::to_string(&TeamEvent::daemon_started()).unwrap();
    for key in [
        r#""reason""#,
        r#""step""#,
        r#""error""#,
        r#""uptime_secs""#,
        r#""restart_count""#,
    ] {
        assert!(!encoded.contains(key));
    }
}
979
/// `pattern_detected` encodes `<type>:<frequency>` into `reason`.
#[test]
fn pattern_detected_includes_reason_payload() {
    let detected = TeamEvent::pattern_detected("escalation_cluster", 6);
    let encoded = serde_json::to_string(&detected).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    assert_eq!(value["event"].as_str(), Some("pattern_detected"));
    assert_eq!(value["reason"].as_str(), Some("escalation_cluster:6"));
}
988
/// Opening a sink creates any missing parent directories of the log path.
#[test]
fn event_sink_creates_parent_directories() {
    let dir = tempfile::tempdir().unwrap();
    let nested_dir = dir.path().join("deep").join("nested");
    let log_path = nested_dir.join("events.jsonl");
    let mut sink = EventSink::new(&log_path).unwrap();
    sink.emit(TeamEvent::daemon_started()).unwrap();
    assert!(log_path.exists());
    assert_eq!(sink.path(), log_path);
}
998
/// A log already over the limit is rotated when the sink is opened.
#[test]
fn event_sink_rotates_oversized_log_on_open() {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("events.jsonl");
    let stale = "0123456789";
    fs::write(&log_path, stale).unwrap();

    let mut sink = EventSink::new_with_max_bytes(&log_path, 5).unwrap();
    sink.emit(TeamEvent::daemon_started()).unwrap();

    let archived = fs::read_to_string(rotated_event_log_path(&log_path)).unwrap();
    assert_eq!(archived, stale);
    let live = fs::read_to_string(&log_path).unwrap();
    assert!(live.contains("daemon_started"));
}
1013
/// An entry that would push the log past the limit goes into a fresh file,
/// and the previous contents move to the rotated path.
#[test]
fn event_sink_rotates_before_write_that_would_exceed_threshold() {
    let dir = tempfile::tempdir().unwrap();
    let log_path = dir.path().join("events.jsonl");
    let existing = "{\"event\":\"first\"}\n";
    fs::write(&log_path, existing).unwrap();

    let limit = existing.len() as u64 + 10;
    let mut sink = EventSink::new_with_max_bytes(&log_path, limit).unwrap();
    let long_event = TeamEvent::task_assigned("eng-1", "this assignment is long enough to rotate");
    sink.emit(long_event).unwrap();

    let archived = fs::read_to_string(rotated_event_log_path(&log_path)).unwrap();
    assert_eq!(archived, existing);
    let live = fs::read_to_string(&log_path).unwrap();
    assert!(live.contains("task_assigned"));
    assert!(!live.contains(r#""event":"first""#));
}
1034
/// Serializing then deserializing `agent_restarted` keeps every populated field.
#[test]
fn event_round_trip_preserves_fields_for_agent_restarted() {
    let before = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);

    let encoded = serde_json::to_string(&before).unwrap();
    let after: TeamEvent = serde_json::from_str(&encoded).unwrap();

    assert_eq!(after.event, "agent_restarted");
    assert_eq!(after.role.as_deref(), Some("eng-1"));
    assert_eq!(after.task.as_deref(), Some("42"));
    assert_eq!(after.reason.as_deref(), Some("context_exhausted"));
    assert_eq!(after.restart_count, Some(3));
    assert_eq!(after.ts, before.ts);
}
1049
/// Serializes a `load_snapshot(4, 8, true)` event to JSON and parses it back.
/// The member counts, session flag, a `load` of 0.5 and the timestamp must all survive.
#[test]
fn event_round_trip_preserves_fields_for_load_snapshot() {
    let sent = TeamEvent::load_snapshot(4, 8, true);

    let wire = serde_json::to_string(&sent).unwrap();
    let received = serde_json::from_str::<TeamEvent>(&wire).unwrap();

    assert_eq!(received.event, "load_snapshot");
    assert_eq!(received.working_members, Some(4));
    assert_eq!(received.total_members, Some(8));
    assert_eq!(received.session_running, Some(true));
    assert_eq!(received.load, Some(0.5));
    assert_eq!(received.ts, sent.ts);
}
1064
/// Serializes a `delivery_failed` event to JSON, parses it back, and compares the
/// event name, role, sender, reason and timestamp with the source event.
#[test]
fn event_round_trip_preserves_fields_for_delivery_failed() {
    let sent = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");

    let wire = serde_json::to_string(&sent).unwrap();
    let received = serde_json::from_str::<TeamEvent>(&wire).unwrap();

    assert_eq!(received.event, "delivery_failed");
    assert_eq!(received.role.as_deref(), Some("eng-2"));
    assert_eq!(received.from.as_deref(), Some("manager"));
    assert_eq!(received.reason.as_deref(), Some("marker missing"));
    assert_eq!(received.ts, sent.ts);
}
1078
/// Writes a log that mixes an empty line, a whitespace-only line and a non-JSON line
/// with two valid events, and expects `read_events` to return just the two valid
/// events in file order.
#[test]
fn read_events_skips_blank_and_malformed_lines() {
    let raw_lines = [
        "",
        "{\"event\":\"daemon_started\",\"ts\":1}",
        "not-json",
        " ",
        "{\"event\":\"daemon_stopped\",\"ts\":2}",
    ];
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    fs::write(&log_path, raw_lines.join("\n")).unwrap();

    let events = read_events(&log_path).unwrap();

    let names: Vec<&str> = events.iter().map(|entry| entry.event.as_str()).collect();
    assert_eq!(names.len(), 2);
    assert_eq!(names[0], "daemon_started");
    assert_eq!(names[1], "daemon_stopped");
}
1102
/// Calls `rotate_event_log_if_needed` on a path that was never created.
/// It must report `false` and must not create a file at the rotated path.
#[test]
fn rotate_event_log_if_needed_returns_false_for_missing_file() {
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");

    let did_rotate = rotate_event_log_if_needed(&log_path, 128, 0).unwrap();

    assert!(!did_rotate);
    let archive_path = rotated_event_log_path(&log_path);
    assert!(!archive_path.exists());
}
1113
/// Calls `rotate_event_log_if_needed` on an existing zero-length log.
/// It must report `false`, leave the log in place, and create nothing at the
/// rotated path.
#[test]
fn rotate_event_log_if_needed_returns_false_for_empty_file() {
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    fs::write(&log_path, "").unwrap();

    let did_rotate = rotate_event_log_if_needed(&log_path, 1, 1).unwrap();

    assert!(!did_rotate);
    assert!(log_path.exists());
    let archive_path = rotated_event_log_path(&log_path);
    assert!(!archive_path.exists());
}
1126
/// Pre-populates both the live log and the rotated path, then triggers a rotation.
/// The rotated path must afterwards hold the live log's former contents rather than
/// the older rotated data.
#[test]
fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    let archive_path = rotated_event_log_path(&log_path);
    fs::write(&archive_path, "old-rotated-events").unwrap();
    fs::write(&log_path, "current-events").unwrap();

    let did_rotate = rotate_event_log_if_needed(&log_path, 5, 0).unwrap();

    assert!(did_rotate);
    let archived = fs::read_to_string(&archive_path).unwrap();
    assert_eq!(archived, "current-events");
}
1140
/// Starts four threads that each open their own `EventSink` on the same path and emit
/// one `task_assigned` event. A five-party barrier (four workers plus this thread)
/// releases them together. Afterwards no worker may have reported an error and the log
/// must contain one event for each of `eng-0` through `eng-3`.
#[test]
fn concurrent_event_sinks_append_without_losing_lines() {
    let tmp = tempfile::tempdir().unwrap();
    let path = Arc::new(tmp.path().join("events.jsonl"));
    let ready = Arc::new(std::sync::Barrier::new(5));
    let errors = Arc::new(Mutex::new(Vec::<String>::new()));

    // `collect` spawns every worker before this thread joins the barrier below.
    let workers: Vec<_> = (0..4)
        .map(|idx| {
            let path = Arc::clone(&path);
            let ready = Arc::clone(&ready);
            let errors = Arc::clone(&errors);
            thread::spawn(move || {
                ready.wait();
                let outcome = (|| -> Result<()> {
                    let mut sink = EventSink::new(&path)?;
                    let assignment =
                        TeamEvent::task_assigned(&format!("eng-{idx}"), &format!("task-{idx}"));
                    sink.emit(assignment)?;
                    Ok(())
                })();
                if let Err(error) = outcome {
                    errors.lock().unwrap().push(error.to_string());
                }
            })
        })
        .collect();

    ready.wait();
    for worker in workers {
        worker.join().unwrap();
    }

    assert!(errors.lock().unwrap().is_empty());
    let events = read_events(&path).unwrap();
    assert_eq!(events.len(), 4);
    for idx in 0..4 {
        let wanted_role = format!("eng-{idx}");
        assert!(
            events
                .iter()
                .any(|event| event.role.as_deref() == Some(wanted_role.as_str()))
        );
    }
}
1185
/// Emits 512 `task_assigned` events, each carrying a 128-character task string, and
/// checks that `read_events` returns all of them with the first and last entries intact.
#[test]
fn read_events_handles_large_log_file() {
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    let mut sink = EventSink::new(&log_path).unwrap();

    let filler = "x".repeat(128);
    for idx in 0..512 {
        let assignment = TeamEvent::task_assigned(&format!("eng-{idx}"), &filler);
        sink.emit(assignment).unwrap();
    }

    let events = read_events(&log_path).unwrap();

    assert_eq!(events.len(), 512);
    let first_name = events.first().map(|entry| entry.event.as_str());
    let last_name = events.last().map(|entry| entry.event.as_str());
    assert_eq!(first_name, Some("task_assigned"));
    assert_eq!(last_name, Some("task_assigned"));
}
1206
/// Counts the lines of `source` that contain `.unwrap(` or `.expect(`.
///
/// Only the text before the first `"\n#[cfg(test)]\nmod tests"` marker is scanned; when
/// the marker is absent the whole input is scanned. Lines whose trimmed form starts with
/// `#[cfg(test)]` are never counted. The result is a count of lines, not of calls: a
/// line with several matches contributes one.
fn production_unwrap_expect_count(source: &str) -> usize {
    let production = match source.find("\n#[cfg(test)]\nmod tests") {
        Some(cut) => &source[..cut],
        None => source,
    };

    let mut offending_lines = 0;
    for line in production.lines() {
        let code = line.trim();
        if code.starts_with("#[cfg(test)]") {
            continue;
        }
        if code.contains(".unwrap(") || code.contains(".expect(") {
            offending_lines += 1;
        }
    }
    offending_lines
}
1221
/// A `task_completed` event built with a task id must serialize with `event`, `role`
/// and `task` string fields carrying the expected values.
#[test]
fn task_completed_includes_task_id() {
    let wire = serde_json::to_string(&TeamEvent::task_completed("eng-1", Some("42"))).unwrap();
    let value: serde_json::Value = serde_json::from_str(&wire).unwrap();

    let expectations = [("event", "task_completed"), ("role", "eng-1"), ("task", "42")];
    for (key, expected) in expectations {
        assert_eq!(value[key].as_str().unwrap(), expected);
    }
}
1231
/// A `task_completed` event built without a task id must serialize with its `event`
/// and `role` fields but without any `"task"` key.
#[test]
fn task_completed_without_task_id_omits_task_field() {
    let wire = serde_json::to_string(&TeamEvent::task_completed("eng-1", None)).unwrap();

    let has_event = wire.contains("\"event\":\"task_completed\"");
    let has_role = wire.contains("\"role\":\"eng-1\"");
    let has_task_key = wire.contains("\"task\"");

    assert!(has_event);
    assert!(has_role);
    assert!(!has_task_key);
}
1240
/// A `task_escalated` event built with `None` as its reason must serialize without a
/// `reason` key while still carrying the event name and task id.
#[test]
fn task_escalated_without_reason_omits_reason_field() {
    let wire = serde_json::to_string(&TeamEvent::task_escalated("eng-1", "42", None)).unwrap();
    let value: serde_json::Value = serde_json::from_str(&wire).unwrap();

    assert_eq!(value["event"].as_str().unwrap(), "task_escalated");
    assert_eq!(value["task"].as_str().unwrap(), "42");
    assert!(value.get("reason").is_none());
}
1250
/// A `task_escalated` event built with a reason must serialize that reason alongside
/// the event name and task id.
#[test]
fn task_escalated_with_reason_includes_reason_field() {
    let escalation = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
    let wire = serde_json::to_string(&escalation).unwrap();
    let value: serde_json::Value = serde_json::from_str(&wire).unwrap();

    let expectations = [
        ("event", "task_escalated"),
        ("task", "42"),
        ("reason", "merge_conflict"),
    ];
    for (key, expected) in expectations {
        assert_eq!(value[key].as_str().unwrap(), expected);
    }
}
1260
/// Serializes a `task_completed` event with a task id and parses it back into a
/// `TeamEvent`; the event name, role and task id must be unchanged.
#[test]
fn task_completed_round_trip_preserves_task_id() {
    let sent = TeamEvent::task_completed("eng-1", Some("99"));
    let wire = serde_json::to_string(&sent).unwrap();
    let received = serde_json::from_str::<TeamEvent>(&wire).unwrap();

    assert_eq!(received.event, "task_completed");
    assert_eq!(received.role.as_deref(), Some("eng-1"));
    assert_eq!(received.task.as_deref(), Some("99"));
}
1270
/// Serializes a `task_escalated` event with a reason and parses it back into a
/// `TeamEvent`; the event name, role, task id and reason must be unchanged.
#[test]
fn task_escalated_round_trip_preserves_reason() {
    let sent = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
    let wire = serde_json::to_string(&sent).unwrap();
    let received = serde_json::from_str::<TeamEvent>(&wire).unwrap();

    assert_eq!(received.event, "task_escalated");
    assert_eq!(received.role.as_deref(), Some("eng-1"));
    assert_eq!(received.task.as_deref(), Some("42"));
    assert_eq!(received.reason.as_deref(), Some("context_exhausted"));
}
1281
/// Embeds this file's own source at compile time and requires that
/// `production_unwrap_expect_count` finds no matching lines in it.
#[test]
fn production_events_has_no_unwrap_or_expect_calls() {
    let offending_lines = production_unwrap_expect_count(include_str!("events.rs"));

    assert_eq!(
        offending_lines, 0,
        "production events.rs should avoid unwrap/expect"
    );
}
1291
/// `stall_detected` built with a role, task `42` and 300 seconds must populate the
/// event name, `role`, `task` (as the string `"42"`) and `uptime_secs`.
#[test]
fn stall_detected_event_fields() {
    let stall = TeamEvent::stall_detected("eng-1-1", Some(42), 300);

    assert_eq!(stall.event, "stall_detected");
    assert_eq!(stall.role.as_deref(), Some("eng-1-1"));
    assert_eq!(stall.task.as_deref(), Some("42"));
    assert_eq!(stall.uptime_secs, Some(300));
}
1300
/// `stall_detected` built without a task must leave `task` unset while still
/// populating the event name, `role` and `uptime_secs`.
#[test]
fn stall_detected_event_without_task() {
    let stall = TeamEvent::stall_detected("eng-1-1", None, 600);

    assert_eq!(stall.event, "stall_detected");
    assert_eq!(stall.role.as_deref(), Some("eng-1-1"));
    assert_eq!(stall.task, None);
    assert_eq!(stall.uptime_secs, Some(600));
}
1309
/// The JSON form of a `stall_detected` event must contain the quoted event name,
/// the quoted role and the task id as a quoted string.
#[test]
fn stall_detected_event_serializes_to_jsonl() {
    let wire = serde_json::to_string(&TeamEvent::stall_detected("eng-1-1", Some(42), 300)).unwrap();

    for fragment in ["\"stall_detected\"", "\"eng-1-1\"", "\"42\""] {
        assert!(wire.contains(fragment));
    }
}
1318
/// `health_changed` must populate the event name and `role`, and must store the
/// transition text it was given in `reason`.
#[test]
fn health_changed_event_fields() {
    let change = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");

    assert_eq!(change.event, "health_changed");
    assert_eq!(change.role.as_deref(), Some("eng-1-1"));
    assert_eq!(change.reason.as_deref(), Some("healthy→unreachable"));
}
1326
/// The JSON form of a `health_changed` event must contain the quoted event name and
/// the quoted role.
#[test]
fn health_changed_event_serializes_to_jsonl() {
    let change = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
    let wire = serde_json::to_string(&change).unwrap();

    for fragment in ["\"health_changed\"", "\"eng-1-2\""] {
        assert!(wire.contains(fragment));
    }
}
1334
/// On Unix, `EventSink::new` must return an error for a log path nested below a
/// directory whose mode is `0o444`. On other platforms the test body is empty.
#[test]
fn event_sink_on_readonly_dir_returns_error() {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;

        let scratch = tempfile::tempdir().unwrap();
        let locked_dir = scratch.path().join("readonly");
        fs::create_dir(&locked_dir).unwrap();
        let set_mode = |mode: u32| {
            fs::set_permissions(&locked_dir, fs::Permissions::from_mode(mode)).unwrap();
        };

        set_mode(0o444);
        let log_path = locked_dir.join("subdir").join("events.jsonl");
        let outcome = EventSink::new(&log_path);
        assert!(outcome.is_err());

        // Put the mode back to 0o755 once the check has passed.
        set_mode(0o755);
    }
}
1355
/// `read_events` on a path that was never created must succeed and return no events.
#[test]
fn read_events_from_nonexistent_file_returns_empty() {
    let scratch = tempfile::tempdir().unwrap();
    let missing = scratch.path().join("does_not_exist.jsonl");

    let events = read_events(&missing).unwrap();

    assert_eq!(events.len(), 0);
}
1363
/// A log in which every line fails to parse must yield an empty result from
/// `read_events` rather than an error.
#[test]
fn read_events_all_malformed_lines_returns_empty() {
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    fs::write(&log_path, "not json\nalso not json\n{invalid}\n").unwrap();

    let events = read_events(&log_path).unwrap();

    assert_eq!(events.len(), 0);
}
1372
/// Builds a sink over a writer whose `write` and `flush` always fail with
/// `BrokenPipe`, and expects `emit` to return an error.
#[test]
fn event_sink_emit_with_failing_writer() {
    struct FailWriter;

    impl FailWriter {
        /// Builds the `BrokenPipe` error returned by both `Write` methods.
        fn broken_pipe(message: &'static str) -> std::io::Error {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, message)
        }
    }

    impl Write for FailWriter {
        fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
            Err(Self::broken_pipe("simulated write failure"))
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Err(Self::broken_pipe("simulated flush failure"))
        }
    }

    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    let mut sink = EventSink::from_writer(&log_path, FailWriter);

    let outcome = sink.emit(TeamEvent::daemon_started());

    assert!(outcome.is_err());
}
1397
/// With both a live log and an older rotated file on disk, a rotation must overwrite
/// the rotated file with the live log's contents and leave no file at the live path.
#[test]
fn rotate_event_log_replaces_stale_rotated_file() {
    const LIVE_DATA: &str = "current-data-that-is-large";

    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    let archive_path = rotated_event_log_path(&log_path);
    fs::write(&log_path, LIVE_DATA).unwrap();
    fs::write(&archive_path, "old-rotated-data").unwrap();

    let did_rotate = rotate_event_log_if_needed(&log_path, 5, 0).unwrap();

    assert!(did_rotate);
    let archived = fs::read_to_string(&archive_path).unwrap();
    assert_eq!(archived, LIVE_DATA);
    assert!(!log_path.exists());
}
1417
/// Exercises `rotate_event_log_if_needed` with both numeric arguments set to zero:
/// an empty log must not rotate, while a log holding a single byte must.
#[test]
fn event_sink_handles_zero_max_bytes_rotation() {
    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");

    // Empty file: expected to be left alone.
    fs::write(&log_path, "").unwrap();
    let rotated_when_empty = rotate_event_log_if_needed(&log_path, 0, 0).unwrap();
    assert!(!rotated_when_empty);

    // One byte of content: expected to rotate.
    fs::write(&log_path, "x").unwrap();
    let rotated_with_content = rotate_event_log_if_needed(&log_path, 0, 0).unwrap();
    assert!(rotated_with_content);
}
1432
/// Writes two valid event lines with a truncated JSON object between them and expects
/// `read_events` to return the two valid events in order.
#[test]
fn read_events_partial_json_with_valid_lines_mixed() {
    let started_line = r#"{"event":"daemon_started","ts":1}"#;
    let stopped_line = r#"{"event":"daemon_stopped","ts":3}"#;
    let content = format!(
        "{}\n{{\"event\":\"trunca\n{}\n",
        started_line, stopped_line
    );

    let scratch = tempfile::tempdir().unwrap();
    let log_path = scratch.path().join("events.jsonl");
    fs::write(&log_path, content).unwrap();

    let events = read_events(&log_path).unwrap();

    let names: Vec<&str> = events.iter().map(|entry| entry.event.as_str()).collect();
    assert_eq!(names.len(), 2);
    assert_eq!(names[0], "daemon_started");
    assert_eq!(names[1], "daemon_stopped");
}
1449}