1use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
/// Inputs for [`TeamEvent::merge_confidence_scored`].
///
/// Every field is either borrowed or `Copy`, so the whole struct is cheap to
/// copy and can be debug-printed when diagnosing merge decisions.
#[derive(Debug, Clone, Copy)]
pub struct MergeConfidenceInfo<'a> {
    /// Engineer identifier; recorded in the event's `role` field.
    pub engineer: &'a str,
    /// Task identifier; recorded in the event's `task` field.
    pub task: &'a str,
    /// Confidence score; recorded in the event's `load` field.
    pub confidence: f64,
    /// Number of files touched; rendered as `files=<n>` in the event's `reason`.
    pub files_changed: usize,
    /// Number of lines touched; rendered as `lines=<n>` in the event's `reason`.
    pub lines_changed: usize,
    /// Rendered as `migrations=<bool>` in the event's `reason`.
    pub has_migrations: bool,
    /// Rendered as `config=<bool>` in the event's `reason`.
    pub has_config_changes: bool,
    /// Rendered as `renames=<n>` in the event's `reason`.
    pub rename_count: usize,
}
24
#[derive(Debug, Clone, Serialize, Deserialize)]
/// One record of the team event log, serialized as a single JSON object.
///
/// Only `event` and `ts` are always present. Every other field is optional and
/// is left out of the serialized form when it is `None`. Several fields are
/// reused with different meanings depending on which constructor built the
/// event; see the per-field notes.
pub struct TeamEvent {
    /// Event name, e.g. `"daemon_started"` or `"task_assigned"`.
    pub event: String,
    /// Role/member the event concerns. The merge constructors store the
    /// engineer identifier here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
    /// Task reference as text. Numeric ids are stringified by
    /// `context_exhausted` and `stall_detected`; `task_recycled` writes `#<id>`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task: Option<String>,
    /// Recipient; set by `standup_generated`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recipient: Option<String>,
    /// Sender; set by `message_routed` and `delivery_failed`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from: Option<String>,
    /// Destination; set by `message_routed`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to: Option<String>,
    /// Flag supplied to `member_crashed`.
    // NOTE(review): presumably "a restart will be attempted" — confirm with callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub restart: Option<bool>,
    /// Restart counter; set by `agent_restarted`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub restart_count: Option<u32>,
    /// For `load_snapshot`: `working_members / total_members` (0.0 when there
    /// are no members). `task_auto_merged` and `merge_confidence_scored` store
    /// the merge confidence here instead.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub load: Option<f64>,
    /// Set by `load_snapshot`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub working_members: Option<u32>,
    /// Set by `load_snapshot`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_members: Option<u32>,
    /// Set by `load_snapshot`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_running: Option<bool>,
    /// Free-form detail. Besides plain reasons it also carries the corrected
    /// path (`cwd_corrected`), the cron expression (`task_recycled`),
    /// `<pattern>:<frequency>` (`pattern_detected`) and `key=value` summaries
    /// (merge events).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Loop step name; set by `loop_step_error`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub step: Option<String>,
    /// Error text; set by `loop_step_error`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Uptime in seconds for `daemon_heartbeat` / `daemon_stopped_with_reason`.
    /// `stall_detected` stores the stall duration here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub uptime_secs: Option<u64>,
    /// Session size in bytes; set by `context_exhausted`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_size_bytes: Option<u64>,
    /// Seconds since the Unix epoch, captured when the event is constructed.
    pub ts: u64,
}
63
64impl TeamEvent {
65 fn now() -> u64 {
66 SystemTime::now()
67 .duration_since(UNIX_EPOCH)
68 .unwrap_or_default()
69 .as_secs()
70 }
71
72 fn base(event: &str) -> Self {
73 Self {
74 event: event.into(),
75 role: None,
76 task: None,
77 recipient: None,
78 from: None,
79 to: None,
80 restart: None,
81 restart_count: None,
82 load: None,
83 working_members: None,
84 total_members: None,
85 session_running: None,
86 reason: None,
87 step: None,
88 error: None,
89 uptime_secs: None,
90 session_size_bytes: None,
91 ts: Self::now(),
92 }
93 }
94
95 pub fn daemon_started() -> Self {
96 Self::base("daemon_started")
97 }
98
99 pub fn daemon_reloading() -> Self {
100 Self::base("daemon_reloading")
101 }
102
103 pub fn daemon_reloaded() -> Self {
104 Self::base("daemon_reloaded")
105 }
106
107 pub fn daemon_stopped() -> Self {
108 Self::base("daemon_stopped")
109 }
110
111 pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
112 Self {
113 reason: Some(reason.into()),
114 uptime_secs: Some(uptime_secs),
115 ..Self::base("daemon_stopped")
116 }
117 }
118
119 pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
120 Self {
121 uptime_secs: Some(uptime_secs),
122 ..Self::base("daemon_heartbeat")
123 }
124 }
125
126 pub fn context_exhausted(
127 role: &str,
128 task: Option<u32>,
129 session_size_bytes: Option<u64>,
130 ) -> Self {
131 Self {
132 role: Some(role.into()),
133 task: task.map(|task_id| task_id.to_string()),
134 session_size_bytes,
135 ..Self::base("context_exhausted")
136 }
137 }
138
139 pub fn loop_step_error(step: &str, error: &str) -> Self {
140 Self {
141 step: Some(step.into()),
142 error: Some(error.into()),
143 ..Self::base("loop_step_error")
144 }
145 }
146
147 pub fn daemon_panic(reason: &str) -> Self {
148 Self {
149 reason: Some(reason.into()),
150 ..Self::base("daemon_panic")
151 }
152 }
153
154 pub fn task_assigned(role: &str, task: &str) -> Self {
155 Self {
156 role: Some(role.into()),
157 task: Some(task.into()),
158 ..Self::base("task_assigned")
159 }
160 }
161
162 pub fn cwd_corrected(role: &str, path: &str) -> Self {
163 Self {
164 role: Some(role.into()),
165 reason: Some(path.into()),
166 ..Self::base("cwd_corrected")
167 }
168 }
169
170 pub fn review_nudge_sent(role: &str, task: &str) -> Self {
171 Self {
172 role: Some(role.into()),
173 task: Some(task.into()),
174 ..Self::base("review_nudge_sent")
175 }
176 }
177
178 pub fn review_escalated(task: &str, reason: &str) -> Self {
179 Self {
180 task: Some(task.into()),
181 reason: Some(reason.into()),
182 ..Self::base("review_escalated")
183 }
184 }
185
186 pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
187 Self {
188 role: Some(role.into()),
189 task: Some(task.into()),
190 reason: reason.map(|r| r.into()),
191 ..Self::base("task_escalated")
192 }
193 }
194
195 pub fn task_unblocked(role: &str, task: &str) -> Self {
196 Self {
197 role: Some(role.into()),
198 task: Some(task.into()),
199 ..Self::base("task_unblocked")
200 }
201 }
202
203 pub fn performance_regression(task: &str, reason: &str) -> Self {
204 Self {
205 task: Some(task.into()),
206 reason: Some(reason.into()),
207 ..Self::base("performance_regression")
208 }
209 }
210
211 pub fn task_completed(role: &str, task: Option<&str>) -> Self {
212 Self {
213 role: Some(role.into()),
214 task: task.map(|t| t.into()),
215 ..Self::base("task_completed")
216 }
217 }
218
219 pub fn standup_generated(recipient: &str) -> Self {
220 Self {
221 recipient: Some(recipient.into()),
222 ..Self::base("standup_generated")
223 }
224 }
225
226 pub fn retro_generated() -> Self {
227 Self::base("retro_generated")
228 }
229
230 pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
231 Self {
232 reason: Some(format!("{pattern_type}:{frequency}")),
233 ..Self::base("pattern_detected")
234 }
235 }
236
237 pub fn member_crashed(role: &str, restart: bool) -> Self {
238 Self {
239 role: Some(role.into()),
240 restart: Some(restart),
241 ..Self::base("member_crashed")
242 }
243 }
244
245 pub fn pane_death(role: &str) -> Self {
246 Self {
247 role: Some(role.into()),
248 ..Self::base("pane_death")
249 }
250 }
251
252 pub fn pane_respawned(role: &str) -> Self {
253 Self {
254 role: Some(role.into()),
255 ..Self::base("pane_respawned")
256 }
257 }
258
259 pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
260 Self {
261 role: Some(role.into()),
262 task: task.map(|id| id.to_string()),
263 uptime_secs: Some(stall_duration_secs),
264 ..Self::base("stall_detected")
265 }
266 }
267
268 pub fn health_changed(role: &str, reason: &str) -> Self {
272 Self {
273 role: Some(role.into()),
274 reason: Some(reason.into()),
275 ..Self::base("health_changed")
276 }
277 }
278
279 pub fn message_routed(from: &str, to: &str) -> Self {
280 Self {
281 from: Some(from.into()),
282 to: Some(to.into()),
283 ..Self::base("message_routed")
284 }
285 }
286
287 pub fn agent_spawned(role: &str) -> Self {
288 Self {
289 role: Some(role.into()),
290 ..Self::base("agent_spawned")
291 }
292 }
293
294 pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
295 Self {
296 role: Some(role.into()),
297 task: Some(task.into()),
298 reason: Some(reason.into()),
299 restart_count: Some(restart_count),
300 ..Self::base("agent_restarted")
301 }
302 }
303
304 pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
305 Self {
306 role: Some(role.into()),
307 from: Some(from.into()),
308 reason: Some(reason.into()),
309 ..Self::base("delivery_failed")
310 }
311 }
312
313 pub fn task_auto_merged(
314 engineer: &str,
315 task: &str,
316 confidence: f64,
317 files_changed: usize,
318 lines_changed: usize,
319 ) -> Self {
320 Self {
321 role: Some(engineer.into()),
322 task: Some(task.into()),
323 load: Some(confidence),
324 reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
325 ..Self::base("task_auto_merged")
326 }
327 }
328
329 pub fn task_manual_merged(task: &str) -> Self {
330 Self {
331 task: Some(task.into()),
332 ..Self::base("task_manual_merged")
333 }
334 }
335
336 pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
338 let detail = format!(
339 "files={} lines={} migrations={} config={} renames={}",
340 info.files_changed,
341 info.lines_changed,
342 info.has_migrations,
343 info.has_config_changes,
344 info.rename_count
345 );
346 Self {
347 role: Some(info.engineer.into()),
348 task: Some(info.task.into()),
349 load: Some(info.confidence),
350 reason: Some(detail),
351 ..Self::base("merge_confidence_scored")
352 }
353 }
354
355 pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
356 Self {
357 role: Some(role.into()),
358 task: Some(task.into()),
359 ..Self::base("review_escalated")
360 }
361 }
362
363 pub fn task_reworked(role: &str, task: &str) -> Self {
364 Self {
365 role: Some(role.into()),
366 task: Some(task.into()),
367 ..Self::base("task_reworked")
368 }
369 }
370
371 pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
372 Self {
373 task: Some(format!("#{task_id}")),
374 reason: Some(cron_expr.into()),
375 ..Self::base("task_recycled")
376 }
377 }
378
379 pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
380 let load = if total_members == 0 {
381 0.0
382 } else {
383 working_members as f64 / total_members as f64
384 };
385 Self {
386 load: Some(load),
387 working_members: Some(working_members),
388 total_members: Some(total_members),
389 session_running: Some(session_running),
390 ..Self::base("load_snapshot")
391 }
392 }
393
394 pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
395 Self {
396 role: Some(role.into()),
397 reason: Some(format!("branch '{branch}' merged into main")),
398 ..Self::base("worktree_reconciled")
399 }
400 }
401}
402
/// Line-oriented writer for `TeamEvent`s: one JSON object per line.
pub struct EventSink {
    /// Destination for serialized events. The path-based constructors install a
    /// buffered, append-mode file handle; it is replaced after each rotation.
    writer: Box<dyn Write + Send>,
    /// Location of the live log file; returned by `path()` and used when
    /// checking for rotation.
    path: PathBuf,
    /// Size threshold that triggers rotation. `None` disables rotation.
    max_bytes: Option<u64>,
}
408
409impl EventSink {
410 pub fn new(path: &Path) -> Result<Self> {
411 Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
412 }
413
414 pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
415 if let Some(parent) = path.parent() {
416 std::fs::create_dir_all(parent)?;
417 }
418 rotate_event_log_if_needed(path, max_bytes, 0)?;
419 let file = OpenOptions::new()
420 .create(true)
421 .append(true)
422 .open(path)
423 .with_context(|| format!("failed to open event sink: {}", path.display()))?;
424 Ok(Self {
425 writer: Box::new(BufWriter::new(file)),
426 path: path.to_path_buf(),
427 max_bytes: Some(max_bytes),
428 })
429 }
430
431 #[cfg(test)]
432 pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
433 Self {
434 writer: Box::new(writer),
435 path: path.to_path_buf(),
436 max_bytes: None,
437 }
438 }
439
440 pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
441 let json = serde_json::to_string(&event)?;
442 self.rotate_if_needed((json.len() + 1) as u64)?;
443 writeln!(self.writer, "{json}")?;
444 self.writer.flush()?;
445 Ok(())
446 }
447
448 pub fn path(&self) -> &Path {
449 &self.path
450 }
451
452 fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
453 let Some(max_bytes) = self.max_bytes else {
454 return Ok(());
455 };
456 self.writer.flush()?;
457 if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
458 self.writer = Box::new(BufWriter::new(
459 OpenOptions::new()
460 .create(true)
461 .append(true)
462 .open(&self.path)
463 .with_context(|| {
464 format!("failed to reopen event sink: {}", self.path.display())
465 })?,
466 ));
467 }
468 Ok(())
469 }
470}
471
/// Returns the path of the rotated backup for `path`: the same path with `.1`
/// appended to its final component.
///
/// The suffix is appended to the underlying `OsString` rather than to a
/// `Display` rendering, so paths that are not valid UTF-8 are preserved exactly
/// instead of having their invalid bytes replaced.
fn rotated_event_log_path(path: &Path) -> PathBuf {
    let mut rotated = path.as_os_str().to_os_string();
    rotated.push(".1");
    PathBuf::from(rotated)
}
475
476fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
477 let len = match fs::metadata(path) {
478 Ok(metadata) => metadata.len(),
479 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
480 Err(error) => {
481 return Err(error).with_context(|| format!("failed to stat {}", path.display()));
482 }
483 };
484
485 if len == 0 {
486 return Ok(false);
487 }
488
489 if len.saturating_add(next_entry_bytes) <= max_bytes {
490 return Ok(false);
491 }
492
493 let rotated = rotated_event_log_path(path);
494 if rotated.exists() {
495 fs::remove_file(&rotated)
496 .with_context(|| format!("failed to remove {}", rotated.display()))?;
497 }
498 fs::rename(path, &rotated).with_context(|| {
499 format!(
500 "failed to rotate event log {} to {}",
501 path.display(),
502 rotated.display()
503 )
504 })?;
505 Ok(true)
506}
507
508pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
509 if !path.exists() {
510 return Ok(Vec::new());
511 }
512 let content = fs::read_to_string(path).context("failed to read event log")?;
513 let mut events = Vec::new();
514 for line in content.lines() {
515 let line = line.trim();
516 if line.is_empty() {
517 continue;
518 }
519 if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
520 events.push(event);
521 }
522 }
523 Ok(events)
524}
525
526#[cfg(test)]
527mod tests {
528 use std::sync::{Arc, Mutex};
529 use std::thread;
530
531 use super::*;
532
533 #[test]
534 fn event_serializes_to_json() {
535 let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
536 let json = serde_json::to_string(&event).unwrap();
537 assert!(json.contains("\"event\":\"task_assigned\""));
538 assert!(json.contains("\"role\":\"eng-1-1\""));
539 assert!(json.contains("\"task\":\"fix auth bug\""));
540 assert!(json.contains("\"ts\":"));
541 }
542
543 #[test]
544 fn optional_fields_omitted() {
545 let event = TeamEvent::daemon_started();
546 let json = serde_json::to_string(&event).unwrap();
547 assert!(!json.contains("\"role\""));
548 assert!(!json.contains("\"task\""));
549 }
550
551 #[test]
552 fn event_sink_writes_jsonl() {
553 let tmp = tempfile::tempdir().unwrap();
554 let path = tmp.path().join("events.jsonl");
555 let mut sink = EventSink::new(&path).unwrap();
556 sink.emit(TeamEvent::daemon_started()).unwrap();
557 sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
558 .unwrap();
559 sink.emit(TeamEvent::daemon_stopped()).unwrap();
560
561 let content = std::fs::read_to_string(&path).unwrap();
562 let lines: Vec<&str> = content.lines().collect();
563 assert_eq!(lines.len(), 3);
564 assert!(lines[0].contains("daemon_started"));
565 assert!(lines[1].contains("task_assigned"));
566 assert!(lines[2].contains("daemon_stopped"));
567 }
568
569 #[test]
570 fn all_event_variants_serialize_with_correct_event_field() {
571 let variants: Vec<(&str, TeamEvent)> = vec![
572 ("daemon_started", TeamEvent::daemon_started()),
573 ("daemon_reloading", TeamEvent::daemon_reloading()),
574 ("daemon_reloaded", TeamEvent::daemon_reloaded()),
575 ("daemon_stopped", TeamEvent::daemon_stopped()),
576 (
577 "daemon_stopped",
578 TeamEvent::daemon_stopped_with_reason("signal", 3600),
579 ),
580 ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
581 (
582 "context_exhausted",
583 TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
584 ),
585 (
586 "loop_step_error",
587 TeamEvent::loop_step_error("poll_watchers", "tmux error"),
588 ),
589 (
590 "daemon_panic",
591 TeamEvent::daemon_panic("index out of bounds"),
592 ),
593 ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
594 (
595 "cwd_corrected",
596 TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
597 ),
598 (
599 "task_escalated",
600 TeamEvent::task_escalated("eng-1", "task", None),
601 ),
602 ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
603 (
604 "performance_regression",
605 TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
606 ),
607 (
608 "task_completed",
609 TeamEvent::task_completed("eng-1", Some("42")),
610 ),
611 ("standup_generated", TeamEvent::standup_generated("manager")),
612 ("retro_generated", TeamEvent::retro_generated()),
613 (
614 "pattern_detected",
615 TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
616 ),
617 ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
618 ("pane_death", TeamEvent::pane_death("eng-1")),
619 ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
620 ("message_routed", TeamEvent::message_routed("a", "b")),
621 ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
622 (
623 "agent_restarted",
624 TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
625 ),
626 (
627 "delivery_failed",
628 TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
629 ),
630 (
631 "task_auto_merged",
632 TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
633 ),
634 ("task_manual_merged", TeamEvent::task_manual_merged("42")),
635 (
636 "merge_confidence_scored",
637 TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
638 engineer: "eng-1",
639 task: "42",
640 confidence: 0.85,
641 files_changed: 3,
642 lines_changed: 50,
643 has_migrations: false,
644 has_config_changes: false,
645 rename_count: 0,
646 }),
647 ),
648 (
649 "review_nudge_sent",
650 TeamEvent::review_nudge_sent("manager", "42"),
651 ),
652 (
653 "review_escalated",
654 TeamEvent::review_escalated("42", "stale review"),
655 ),
656 ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
657 ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
658 (
659 "worktree_reconciled",
660 TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
661 ),
662 ];
663 for (expected_event, event) in &variants {
664 let json = serde_json::to_string(event).unwrap();
665 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
666 assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
667 assert!(parsed["ts"].as_u64().is_some());
668 }
669 }
670
671 #[test]
672 fn load_snapshot_serializes_optional_metrics() {
673 let event = TeamEvent::load_snapshot(3, 7, false);
674 let json = serde_json::to_string(&event).unwrap();
675 assert!(json.contains("\"event\":\"load_snapshot\""));
676 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
677 assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
678 assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
679 assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
680 assert!(!parsed["session_running"].as_bool().unwrap());
681 }
682
683 #[test]
684 fn read_events_parses_all_known_lines() {
685 let tmp = tempfile::tempdir().unwrap();
686 let path = tmp.path().join("events.jsonl");
687 let mut sink = EventSink::new(&path).unwrap();
688 sink.emit(TeamEvent::daemon_started()).unwrap();
689 sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
690 sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
691
692 let events = read_events(&path).unwrap();
693 assert_eq!(events.len(), 3);
694 assert_eq!(events[1].event, "load_snapshot");
695 assert_eq!(events[1].working_members, Some(1));
696 assert_eq!(events[2].total_members, Some(4));
697 }
698
699 #[test]
700 fn event_sink_appends_to_existing_file() {
701 let tmp = tempfile::tempdir().unwrap();
702 let path = tmp.path().join("events.jsonl");
703
704 {
706 let mut sink = EventSink::new(&path).unwrap();
707 sink.emit(TeamEvent::daemon_started()).unwrap();
708 }
709
710 {
712 let mut sink = EventSink::new(&path).unwrap();
713 sink.emit(TeamEvent::daemon_stopped()).unwrap();
714 }
715
716 let content = std::fs::read_to_string(&path).unwrap();
717 let lines: Vec<&str> = content.lines().collect();
718 assert_eq!(lines.len(), 2);
719 assert!(lines[0].contains("daemon_started"));
720 assert!(lines[1].contains("daemon_stopped"));
721 }
722
723 #[test]
724 fn event_with_special_characters_in_task() {
725 let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
726 let json = serde_json::to_string(&event).unwrap();
727 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
728 let task_val = parsed["task"].as_str().unwrap();
729 assert!(task_val.contains("\"quotes\""));
730 assert!(task_val.contains("<angles>"));
731 }
732
733 #[test]
734 fn task_escalated_serializes_role_and_task() {
735 let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
736 let json = serde_json::to_string(&event).unwrap();
737 assert!(json.contains("\"event\":\"task_escalated\""));
738 assert!(json.contains("\"role\":\"eng-1-1\""));
739 assert!(json.contains("\"task\":\"42\""));
740 assert!(json.contains("\"reason\":\"tests_failed\""));
741 }
742
743 #[test]
744 fn cwd_corrected_serializes_role_and_reason() {
745 let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
746 let json = serde_json::to_string(&event).unwrap();
747 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
748 assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
749 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
750 assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
751 }
752
753 #[test]
754 fn pane_death_serializes_role() {
755 let event = TeamEvent::pane_death("eng-1-1");
756 let json = serde_json::to_string(&event).unwrap();
757 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
758 assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
759 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
760 }
761
762 #[test]
763 fn pane_respawned_serializes_role() {
764 let event = TeamEvent::pane_respawned("eng-1-1");
765 let json = serde_json::to_string(&event).unwrap();
766 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
767 assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
768 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
769 }
770
771 #[test]
772 fn agent_restarted_includes_reason_task_and_count() {
773 let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
774 let json = serde_json::to_string(&event).unwrap();
775 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
776 assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
777 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
778 assert_eq!(parsed["task"].as_str().unwrap(), "67");
779 assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
780 assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
781 }
782
783 #[test]
784 fn delivery_failed_includes_role_sender_and_reason() {
785 let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
786 let json = serde_json::to_string(&event).unwrap();
787 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
788 assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
789 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
790 assert_eq!(parsed["from"].as_str().unwrap(), "manager");
791 assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
792 }
793
794 #[test]
795 fn task_unblocked_serializes_role_and_task() {
796 let event = TeamEvent::task_unblocked("eng-1-1", "42");
797 let json = serde_json::to_string(&event).unwrap();
798 assert!(json.contains("\"event\":\"task_unblocked\""));
799 assert!(json.contains("\"role\":\"eng-1-1\""));
800 assert!(json.contains("\"task\":\"42\""));
801 }
802
803 #[test]
804 fn merge_confidence_scored_includes_all_fields() {
805 let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
806 engineer: "eng-1-1",
807 task: "42",
808 confidence: 0.85,
809 files_changed: 3,
810 lines_changed: 50,
811 has_migrations: true,
812 has_config_changes: false,
813 rename_count: 1,
814 });
815 let json = serde_json::to_string(&event).unwrap();
816 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
817 assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
818 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
819 assert_eq!(parsed["task"].as_str().unwrap(), "42");
820 assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
821 let reason = parsed["reason"].as_str().unwrap();
822 assert!(reason.contains("files=3"));
823 assert!(reason.contains("lines=50"));
824 assert!(reason.contains("migrations=true"));
825 assert!(reason.contains("config=false"));
826 assert!(reason.contains("renames=1"));
827 }
828
829 #[test]
830 fn performance_regression_serializes_task_and_reason() {
831 let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
832 let json = serde_json::to_string(&event).unwrap();
833 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
834 assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
835 assert_eq!(parsed["task"].as_str().unwrap(), "42");
836 assert_eq!(
837 parsed["reason"].as_str().unwrap(),
838 "runtime_ms=1300 avg_ms=1000 pct=30"
839 );
840 }
841
842 #[test]
843 fn daemon_stopped_with_reason_includes_fields() {
844 let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
845 let json = serde_json::to_string(&event).unwrap();
846 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
847 assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
848 assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
849 }
850
851 #[test]
852 fn heartbeat_includes_uptime() {
853 let event = TeamEvent::daemon_heartbeat(600);
854 let json = serde_json::to_string(&event).unwrap();
855 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
856 assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
857 assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
858 assert!(parsed.get("reason").is_none());
860 assert!(parsed.get("step").is_none());
861 }
862
863 #[test]
864 fn context_exhausted_includes_role_task_and_session_size() {
865 let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
866 let json = serde_json::to_string(&event).unwrap();
867 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
868 assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
869 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
870 assert_eq!(parsed["task"].as_str().unwrap(), "77");
871 assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
872 }
873
874 #[test]
875 fn loop_step_error_includes_step_and_error() {
876 let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
877 let json = serde_json::to_string(&event).unwrap();
878 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
879 assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
880 assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
881 }
882
883 #[test]
884 fn daemon_panic_includes_reason() {
885 let event = TeamEvent::daemon_panic("index out of bounds");
886 let json = serde_json::to_string(&event).unwrap();
887 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
888 assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
889 assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
890 }
891
892 #[test]
893 fn new_fields_omitted_from_basic_events() {
894 let event = TeamEvent::daemon_started();
895 let json = serde_json::to_string(&event).unwrap();
896 assert!(!json.contains("\"reason\""));
897 assert!(!json.contains("\"step\""));
898 assert!(!json.contains("\"error\""));
899 assert!(!json.contains("\"uptime_secs\""));
900 assert!(!json.contains("\"restart_count\""));
901 }
902
903 #[test]
904 fn pattern_detected_includes_reason_payload() {
905 let event = TeamEvent::pattern_detected("escalation_cluster", 6);
906 let json = serde_json::to_string(&event).unwrap();
907 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
908 assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
909 assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
910 }
911
912 #[test]
913 fn event_sink_creates_parent_directories() {
914 let tmp = tempfile::tempdir().unwrap();
915 let path = tmp.path().join("deep").join("nested").join("events.jsonl");
916 let mut sink = EventSink::new(&path).unwrap();
917 sink.emit(TeamEvent::daemon_started()).unwrap();
918 assert!(path.exists());
919 assert_eq!(sink.path(), path);
920 }
921
922 #[test]
923 fn event_sink_rotates_oversized_log_on_open() {
924 let tmp = tempfile::tempdir().unwrap();
925 let path = tmp.path().join("events.jsonl");
926 fs::write(&path, "0123456789").unwrap();
927
928 let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
929 sink.emit(TeamEvent::daemon_started()).unwrap();
930
931 let rotated = rotated_event_log_path(&path);
932 assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
933 let current = fs::read_to_string(&path).unwrap();
934 assert!(current.contains("daemon_started"));
935 }
936
937 #[test]
938 fn event_sink_rotates_before_write_that_would_exceed_threshold() {
939 let tmp = tempfile::tempdir().unwrap();
940 let path = tmp.path().join("events.jsonl");
941 let first_line = "{\"event\":\"first\"}\n";
942 fs::write(&path, first_line).unwrap();
943
944 let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
945 sink.emit(TeamEvent::task_assigned(
946 "eng-1",
947 "this assignment is long enough to rotate",
948 ))
949 .unwrap();
950
951 let rotated = rotated_event_log_path(&path);
952 assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
953 let current = fs::read_to_string(&path).unwrap();
954 assert!(current.contains("task_assigned"));
955 assert!(!current.contains("\"event\":\"first\""));
956 }
957
958 #[test]
959 fn event_round_trip_preserves_fields_for_agent_restarted() {
960 let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
961
962 let json = serde_json::to_string(&original).unwrap();
963 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
964
965 assert_eq!(parsed.event, "agent_restarted");
966 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
967 assert_eq!(parsed.task.as_deref(), Some("42"));
968 assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
969 assert_eq!(parsed.restart_count, Some(3));
970 assert_eq!(parsed.ts, original.ts);
971 }
972
973 #[test]
974 fn event_round_trip_preserves_fields_for_load_snapshot() {
975 let original = TeamEvent::load_snapshot(4, 8, true);
976
977 let json = serde_json::to_string(&original).unwrap();
978 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
979
980 assert_eq!(parsed.event, "load_snapshot");
981 assert_eq!(parsed.working_members, Some(4));
982 assert_eq!(parsed.total_members, Some(8));
983 assert_eq!(parsed.session_running, Some(true));
984 assert_eq!(parsed.load, Some(0.5));
985 assert_eq!(parsed.ts, original.ts);
986 }
987
988 #[test]
989 fn event_round_trip_preserves_fields_for_delivery_failed() {
990 let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
991
992 let json = serde_json::to_string(&original).unwrap();
993 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
994
995 assert_eq!(parsed.event, "delivery_failed");
996 assert_eq!(parsed.role.as_deref(), Some("eng-2"));
997 assert_eq!(parsed.from.as_deref(), Some("manager"));
998 assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
999 assert_eq!(parsed.ts, original.ts);
1000 }
1001
1002 #[test]
1003 fn read_events_skips_blank_and_malformed_lines() {
1004 let tmp = tempfile::tempdir().unwrap();
1005 let path = tmp.path().join("events.jsonl");
1006 fs::write(
1007 &path,
1008 [
1009 "",
1010 "{\"event\":\"daemon_started\",\"ts\":1}",
1011 "not-json",
1012 " ",
1013 "{\"event\":\"daemon_stopped\",\"ts\":2}",
1014 ]
1015 .join("\n"),
1016 )
1017 .unwrap();
1018
1019 let events = read_events(&path).unwrap();
1020
1021 assert_eq!(events.len(), 2);
1022 assert_eq!(events[0].event, "daemon_started");
1023 assert_eq!(events[1].event, "daemon_stopped");
1024 }
1025
/// Rotation on a log path that does not exist reports `false` and leaves no
/// rotated file behind.
#[test]
fn rotate_event_log_if_needed_returns_false_for_missing_file() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");

    let did_rotate = rotate_event_log_if_needed(&log, 128, 0).unwrap();

    assert!(!did_rotate);
    assert!(!rotated_event_log_path(&log).exists());
}
1036
/// An existing but empty log is left in place: rotation reports `false`, the
/// log still exists and no rotated file is created.
#[test]
fn rotate_event_log_if_needed_returns_false_for_empty_file() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");
    fs::write(&log, "").unwrap();

    let did_rotate = rotate_event_log_if_needed(&log, 1, 1).unwrap();

    assert!(!did_rotate);
    assert!(log.exists());
    assert!(!rotated_event_log_path(&log).exists());
}
1049
/// When a rotated file already exists, a successful rotation overwrites it
/// with the contents of the current log.
#[test]
fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");
    let archive = rotated_event_log_path(&log);
    fs::write(&log, "current-events").unwrap();
    fs::write(&archive, "old-rotated-events").unwrap();

    let did_rotate = rotate_event_log_if_needed(&log, 5, 0).unwrap();

    assert!(did_rotate);
    let archived = fs::read_to_string(&archive).unwrap();
    assert_eq!(archived, "current-events");
}
1063
/// Four threads each open their own `EventSink` on the same file and emit one
/// event after being released together by a barrier; afterwards every event
/// must be readable from the log.
#[test]
fn concurrent_event_sinks_append_without_losing_lines() {
    const WRITERS: usize = 4;

    let tmp = tempfile::tempdir().unwrap();
    let log_path = Arc::new(tmp.path().join("events.jsonl"));
    // One extra party for the test thread, which releases the writers.
    let start_gate = Arc::new(std::sync::Barrier::new(WRITERS + 1));
    let failures = Arc::new(Mutex::new(Vec::<String>::new()));

    let workers: Vec<_> = (0..WRITERS)
        .map(|idx| {
            let log_path = Arc::clone(&log_path);
            let start_gate = Arc::clone(&start_gate);
            let failures = Arc::clone(&failures);
            thread::spawn(move || {
                start_gate.wait();
                let emit_one = || -> Result<()> {
                    let mut sink = EventSink::new(&log_path)?;
                    sink.emit(TeamEvent::task_assigned(
                        &format!("eng-{idx}"),
                        &format!("task-{idx}"),
                    ))?;
                    Ok(())
                };
                // Errors are collected instead of panicking inside the worker.
                if let Err(error) = emit_one() {
                    failures.lock().unwrap().push(error.to_string());
                }
            })
        })
        .collect();

    start_gate.wait();
    for worker in workers {
        worker.join().unwrap();
    }

    assert!(failures.lock().unwrap().is_empty());
    let events = read_events(&log_path).unwrap();
    assert_eq!(events.len(), 4);
    for idx in 0..WRITERS {
        let expected_role = format!("eng-{idx}");
        assert!(
            events
                .iter()
                .any(|event| event.role.as_deref() == Some(expected_role.as_str()))
        );
    }
}
1108
/// Emits 512 events with a 128-character task name through one sink and
/// checks that `read_events` returns all of them.
#[test]
fn read_events_handles_large_log_file() {
    const EVENT_COUNT: usize = 512;

    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("events.jsonl");
    let mut sink = EventSink::new(&path).unwrap();
    let long_task = "x".repeat(128);

    for idx in 0..EVENT_COUNT {
        let role = format!("eng-{idx}");
        sink.emit(TeamEvent::task_assigned(&role, &long_task))
            .unwrap();
    }

    let events = read_events(&path).unwrap();

    assert_eq!(events.len(), 512);
    assert_eq!(events.first().unwrap().event, "task_assigned");
    assert_eq!(events.last().unwrap().event, "task_assigned");
}
1129
/// Counts the lines of production code in `source` that contain a call to
/// `.unwrap(` or `.expect(`.
///
/// "Production" means everything before the first line that is exactly
/// `#[cfg(test)]` and is immediately followed by a line starting with
/// `mod tests`; when no such pair exists the whole input is scanned.
///
/// Lines beginning with `#[cfg(test)]` or with `//` (line and doc comments)
/// are ignored, so prose that merely mentions an unwrap is not reported as a
/// call. Line boundaries come from [`str::lines`], so the marker is found in
/// files with either `\n` or `\r\n` line endings.
fn production_unwrap_expect_count(source: &str) -> usize {
    let lines: Vec<&str> = source.lines().collect();

    // Index of the `#[cfg(test)]` line that opens the test module, or the
    // end of the file when there is no test module.
    let production_end = lines
        .windows(2)
        .position(|pair| pair[0] == "#[cfg(test)]" && pair[1].starts_with("mod tests"))
        .unwrap_or(lines.len());

    lines[..production_end]
        .iter()
        .map(|line| line.trim())
        .filter(|line| !line.starts_with("#[cfg(test)]") && !line.starts_with("//"))
        .filter(|line| line.contains(".unwrap(") || line.contains(".expect("))
        .count()
}
1144
/// A `task_completed` event built with a task id serializes the event name,
/// role and task as JSON strings.
#[test]
fn task_completed_includes_task_id() {
    let completed = TeamEvent::task_completed("eng-1", Some("42"));

    let encoded = serde_json::to_string(&completed).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();

    assert_eq!(value["event"].as_str().unwrap(), "task_completed");
    assert_eq!(value["role"].as_str().unwrap(), "eng-1");
    assert_eq!(value["task"].as_str().unwrap(), "42");
}
1154
/// Without a task id, the serialized `task_completed` event carries the
/// event name and role but no `task` key at all.
#[test]
fn task_completed_without_task_id_omits_task_field() {
    let completed = TeamEvent::task_completed("eng-1", None);

    let encoded = serde_json::to_string(&completed).unwrap();

    for expected in ["\"event\":\"task_completed\"", "\"role\":\"eng-1\""] {
        assert!(encoded.contains(expected));
    }
    assert!(!encoded.contains("\"task\""));
}
1163
/// A `task_escalated` event built without a reason serializes without a
/// `reason` key.
#[test]
fn task_escalated_without_reason_omits_reason_field() {
    let escalated = TeamEvent::task_escalated("eng-1", "42", None);

    let encoded = serde_json::to_string(&escalated).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();

    assert_eq!(value["event"].as_str().unwrap(), "task_escalated");
    assert_eq!(value["task"].as_str().unwrap(), "42");
    assert!(value.get("reason").is_none());
}
1173
/// A `task_escalated` event built with a reason serializes that reason under
/// the `reason` key.
#[test]
fn task_escalated_with_reason_includes_reason_field() {
    let escalated = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));

    let encoded = serde_json::to_string(&escalated).unwrap();
    let value: serde_json::Value = serde_json::from_str(&encoded).unwrap();

    assert_eq!(value["event"].as_str().unwrap(), "task_escalated");
    assert_eq!(value["task"].as_str().unwrap(), "42");
    assert_eq!(value["reason"].as_str().unwrap(), "merge_conflict");
}
1183
/// Round-trips a `task_completed` event through JSON and checks the event
/// name, role and task id on the deserialized value.
#[test]
fn task_completed_round_trip_preserves_task_id() {
    let completed = TeamEvent::task_completed("eng-1", Some("99"));

    let encoded = serde_json::to_string(&completed).unwrap();
    let decoded: TeamEvent = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.event, "task_completed");
    assert_eq!(decoded.role.as_deref(), Some("eng-1"));
    assert_eq!(decoded.task.as_deref(), Some("99"));
}
1193
/// Round-trips a `task_escalated` event through JSON and checks the event
/// name, role, task and reason on the deserialized value.
#[test]
fn task_escalated_round_trip_preserves_reason() {
    let escalated = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));

    let encoded = serde_json::to_string(&escalated).unwrap();
    let decoded: TeamEvent = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.event, "task_escalated");
    assert_eq!(decoded.role.as_deref(), Some("eng-1"));
    assert_eq!(decoded.task.as_deref(), Some("42"));
    assert_eq!(decoded.reason.as_deref(), Some("context_exhausted"));
}
1204
/// Guards the non-test part of `events.rs`: the file's own source is embedded
/// at compile time and must contain no `unwrap`/`expect` call lines.
#[test]
fn production_events_has_no_unwrap_or_expect_calls() {
    let offending_lines = production_unwrap_expect_count(include_str!("events.rs"));

    assert_eq!(
        offending_lines, 0,
        "production events.rs should avoid unwrap/expect"
    );
}
1214
/// `stall_detected` with a task id populates the event name, role, task
/// (as a string) and uptime.
#[test]
fn stall_detected_event_fields() {
    let stalled = TeamEvent::stall_detected("eng-1-1", Some(42), 300);

    assert_eq!(stalled.event, "stall_detected");
    assert_eq!(stalled.role.as_deref(), Some("eng-1-1"));
    // The numeric task id is expected to be stored in its string form.
    assert_eq!(stalled.task.as_deref(), Some("42"));
    assert_eq!(stalled.uptime_secs, Some(300));
}
1223
/// `stall_detected` without a task id leaves `task` unset while still
/// recording the role and uptime.
#[test]
fn stall_detected_event_without_task() {
    let stalled = TeamEvent::stall_detected("eng-1-1", None, 600);

    assert_eq!(stalled.event, "stall_detected");
    assert_eq!(stalled.role.as_deref(), Some("eng-1-1"));
    assert!(stalled.task.is_none());
    assert_eq!(stalled.uptime_secs, Some(600));
}
1232
/// The JSON form of a `stall_detected` event contains the event name, role
/// and task id as quoted strings.
#[test]
fn stall_detected_event_serializes_to_jsonl() {
    let stalled = TeamEvent::stall_detected("eng-1-1", Some(42), 300);

    let encoded = serde_json::to_string(&stalled).unwrap();

    for expected in ["\"stall_detected\"", "\"eng-1-1\"", "\"42\""] {
        assert!(encoded.contains(expected));
    }
}
1241
/// `health_changed` stores the role and puts the transition text in `reason`.
#[test]
fn health_changed_event_fields() {
    let changed = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");

    assert_eq!(changed.event, "health_changed");
    assert_eq!(changed.role.as_deref(), Some("eng-1-1"));
    assert_eq!(changed.reason.as_deref(), Some("healthy→unreachable"));
}
1249
/// The JSON form of a `health_changed` event contains the event name and the
/// role as quoted strings.
#[test]
fn health_changed_event_serializes_to_jsonl() {
    let changed = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");

    let encoded = serde_json::to_string(&changed).unwrap();

    for expected in ["\"health_changed\"", "\"eng-1-2\""] {
        assert!(encoded.contains(expected));
    }
}
1257
/// Creating an `EventSink` below a directory with mode `0o444` must fail.
///
/// The body is compiled only on Unix; on other platforms the test is empty.
#[test]
fn event_sink_on_readonly_dir_returns_error() {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;

        let tmp = tempfile::tempdir().unwrap();
        let locked_dir = tmp.path().join("readonly");
        fs::create_dir(&locked_dir).unwrap();
        let chmod = |mode: u32| {
            fs::set_permissions(&locked_dir, fs::Permissions::from_mode(mode)).unwrap();
        };

        chmod(0o444);
        // The sink's file would live in a not-yet-existing subdirectory.
        let target = locked_dir.join("subdir").join("events.jsonl");
        let outcome = EventSink::new(&target);
        assert!(outcome.is_err());

        // Make the directory writable again before the temp dir is dropped.
        chmod(0o755);
    }
}
1278
/// Reading a path that was never created yields an empty event list rather
/// than an error.
#[test]
fn read_events_from_nonexistent_file_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let missing = tmp.path().join("does_not_exist.jsonl");

    let events = read_events(&missing).unwrap();

    assert!(events.is_empty());
}
1286
/// A log in which no line is valid JSON reads back as an empty event list.
#[test]
fn read_events_all_malformed_lines_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");
    fs::write(&log, "not json\nalso not json\n{invalid}\n").unwrap();

    let events = read_events(&log).unwrap();

    assert!(events.is_empty());
}
1295
/// `emit` must surface an error when the sink's underlying writer fails.
#[test]
fn event_sink_emit_with_failing_writer() {
    /// Writer whose `write` and `flush` always fail with `BrokenPipe`.
    struct FailWriter;

    impl FailWriter {
        fn broken_pipe(message: &'static str) -> std::io::Error {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, message)
        }
    }

    impl Write for FailWriter {
        fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
            Err(Self::broken_pipe("simulated write failure"))
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Err(Self::broken_pipe("simulated flush failure"))
        }
    }

    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("events.jsonl");
    let mut sink = EventSink::from_writer(&path, FailWriter);

    let outcome = sink.emit(TeamEvent::daemon_started());

    assert!(outcome.is_err());
}
1320
/// Rotating over a stale rotated file replaces its contents with the current
/// log and leaves no file at the original log path.
#[test]
fn rotate_event_log_replaces_stale_rotated_file() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");
    let archive = rotated_event_log_path(&log);

    fs::write(&log, "current-data-that-is-large").unwrap();
    fs::write(&archive, "old-rotated-data").unwrap();

    assert!(rotate_event_log_if_needed(&log, 5, 0).unwrap());

    let archived = fs::read_to_string(&archive).unwrap();
    assert_eq!(archived, "current-data-that-is-large");
    assert!(!log.exists());
}
1340
/// With both numeric arguments set to 0, an empty log is not rotated but a
/// one-byte log is.
#[test]
fn event_sink_handles_zero_max_bytes_rotation() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");

    // Empty file: nothing to rotate.
    fs::write(&log, "").unwrap();
    let rotated_empty = rotate_event_log_if_needed(&log, 0, 0).unwrap();
    assert!(!rotated_empty);

    // A single byte is enough to trigger rotation.
    fs::write(&log, "x").unwrap();
    let rotated_one_byte = rotate_event_log_if_needed(&log, 0, 0).unwrap();
    assert!(rotated_one_byte);
}
1355
/// A truncated JSON record between two valid ones is skipped; the valid
/// records are still returned in file order.
#[test]
fn read_events_partial_json_with_valid_lines_mixed() {
    let tmp = tempfile::tempdir().unwrap();
    let log = tmp.path().join("events.jsonl");

    let first_valid = r#"{"event":"daemon_started","ts":1}"#;
    let second_valid = r#"{"event":"daemon_stopped","ts":3}"#;
    // The middle line is cut off partway through a string value.
    let content = format!(
        "{}\n{{\"event\":\"trunca\n{}\n",
        first_valid, second_valid
    );
    fs::write(&log, content).unwrap();

    let events = read_events(&log).unwrap();

    let names: Vec<&str> = events.iter().map(|e| e.event.as_str()).collect();
    assert_eq!(names, ["daemon_started", "daemon_stopped"]);
}
1372}