1use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
13pub struct MergeConfidenceInfo<'a> {
15 pub engineer: &'a str,
16 pub task: &'a str,
17 pub confidence: f64,
18 pub files_changed: usize,
19 pub lines_changed: usize,
20 pub has_migrations: bool,
21 pub has_config_changes: bool,
22 pub rename_count: usize,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28 pub event: String,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub role: Option<String>,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub task: Option<String>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub recipient: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub from: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub to: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub restart: Option<bool>,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub restart_count: Option<u32>,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 pub load: Option<f64>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub working_members: Option<u32>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub total_members: Option<u32>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub session_running: Option<bool>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub reason: Option<String>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub step: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub error: Option<String>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub uptime_secs: Option<u64>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub session_size_bytes: Option<u64>,
61 pub ts: u64,
62}
63
64impl TeamEvent {
65 fn now() -> u64 {
66 SystemTime::now()
67 .duration_since(UNIX_EPOCH)
68 .unwrap_or_default()
69 .as_secs()
70 }
71
72 fn base(event: &str) -> Self {
73 Self {
74 event: event.into(),
75 role: None,
76 task: None,
77 recipient: None,
78 from: None,
79 to: None,
80 restart: None,
81 restart_count: None,
82 load: None,
83 working_members: None,
84 total_members: None,
85 session_running: None,
86 reason: None,
87 step: None,
88 error: None,
89 uptime_secs: None,
90 session_size_bytes: None,
91 ts: Self::now(),
92 }
93 }
94
95 pub fn daemon_started() -> Self {
96 Self::base("daemon_started")
97 }
98
99 pub fn daemon_reloading() -> Self {
100 Self::base("daemon_reloading")
101 }
102
103 pub fn daemon_reloaded() -> Self {
104 Self::base("daemon_reloaded")
105 }
106
107 #[allow(dead_code)]
108 pub fn daemon_stopped() -> Self {
109 Self::base("daemon_stopped")
110 }
111
112 pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
113 Self {
114 reason: Some(reason.into()),
115 uptime_secs: Some(uptime_secs),
116 ..Self::base("daemon_stopped")
117 }
118 }
119
120 pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
121 Self {
122 uptime_secs: Some(uptime_secs),
123 ..Self::base("daemon_heartbeat")
124 }
125 }
126
127 pub fn context_exhausted(
128 role: &str,
129 task: Option<u32>,
130 session_size_bytes: Option<u64>,
131 ) -> Self {
132 Self {
133 role: Some(role.into()),
134 task: task.map(|task_id| task_id.to_string()),
135 session_size_bytes,
136 ..Self::base("context_exhausted")
137 }
138 }
139
140 pub fn loop_step_error(step: &str, error: &str) -> Self {
141 Self {
142 step: Some(step.into()),
143 error: Some(error.into()),
144 ..Self::base("loop_step_error")
145 }
146 }
147
148 pub fn daemon_panic(reason: &str) -> Self {
149 Self {
150 reason: Some(reason.into()),
151 ..Self::base("daemon_panic")
152 }
153 }
154
155 pub fn task_assigned(role: &str, task: &str) -> Self {
156 Self {
157 role: Some(role.into()),
158 task: Some(task.into()),
159 ..Self::base("task_assigned")
160 }
161 }
162
163 pub fn cwd_corrected(role: &str, path: &str) -> Self {
164 Self {
165 role: Some(role.into()),
166 reason: Some(path.into()),
167 ..Self::base("cwd_corrected")
168 }
169 }
170
171 pub fn review_nudge_sent(role: &str, task: &str) -> Self {
172 Self {
173 role: Some(role.into()),
174 task: Some(task.into()),
175 ..Self::base("review_nudge_sent")
176 }
177 }
178
179 pub fn review_escalated(task: &str, reason: &str) -> Self {
180 Self {
181 task: Some(task.into()),
182 reason: Some(reason.into()),
183 ..Self::base("review_escalated")
184 }
185 }
186
187 pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
188 Self {
189 role: Some(role.into()),
190 task: Some(task.into()),
191 reason: reason.map(|r| r.into()),
192 ..Self::base("task_escalated")
193 }
194 }
195
196 pub fn task_unblocked(role: &str, task: &str) -> Self {
197 Self {
198 role: Some(role.into()),
199 task: Some(task.into()),
200 ..Self::base("task_unblocked")
201 }
202 }
203
204 pub fn performance_regression(task: &str, reason: &str) -> Self {
205 Self {
206 task: Some(task.into()),
207 reason: Some(reason.into()),
208 ..Self::base("performance_regression")
209 }
210 }
211
212 pub fn task_completed(role: &str, task: Option<&str>) -> Self {
213 Self {
214 role: Some(role.into()),
215 task: task.map(|t| t.into()),
216 ..Self::base("task_completed")
217 }
218 }
219
220 pub fn standup_generated(recipient: &str) -> Self {
221 Self {
222 recipient: Some(recipient.into()),
223 ..Self::base("standup_generated")
224 }
225 }
226
227 pub fn retro_generated() -> Self {
228 Self::base("retro_generated")
229 }
230
231 pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
232 Self {
233 reason: Some(format!("{pattern_type}:{frequency}")),
234 ..Self::base("pattern_detected")
235 }
236 }
237
238 pub fn member_crashed(role: &str, restart: bool) -> Self {
239 Self {
240 role: Some(role.into()),
241 restart: Some(restart),
242 ..Self::base("member_crashed")
243 }
244 }
245
246 pub fn pane_death(role: &str) -> Self {
247 Self {
248 role: Some(role.into()),
249 ..Self::base("pane_death")
250 }
251 }
252
253 pub fn pane_respawned(role: &str) -> Self {
254 Self {
255 role: Some(role.into()),
256 ..Self::base("pane_respawned")
257 }
258 }
259
260 pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
261 Self {
262 role: Some(role.into()),
263 task: task.map(|id| id.to_string()),
264 uptime_secs: Some(stall_duration_secs),
265 ..Self::base("stall_detected")
266 }
267 }
268
269 pub fn health_changed(role: &str, reason: &str) -> Self {
273 Self {
274 role: Some(role.into()),
275 reason: Some(reason.into()),
276 ..Self::base("health_changed")
277 }
278 }
279
280 pub fn message_routed(from: &str, to: &str) -> Self {
281 Self {
282 from: Some(from.into()),
283 to: Some(to.into()),
284 ..Self::base("message_routed")
285 }
286 }
287
288 pub fn agent_spawned(role: &str) -> Self {
289 Self {
290 role: Some(role.into()),
291 ..Self::base("agent_spawned")
292 }
293 }
294
295 pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
296 Self {
297 role: Some(role.into()),
298 task: Some(task.into()),
299 reason: Some(reason.into()),
300 restart_count: Some(restart_count),
301 ..Self::base("agent_restarted")
302 }
303 }
304
305 pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
306 Self {
307 role: Some(role.into()),
308 from: Some(from.into()),
309 reason: Some(reason.into()),
310 ..Self::base("delivery_failed")
311 }
312 }
313
314 pub fn task_auto_merged(
315 engineer: &str,
316 task: &str,
317 confidence: f64,
318 files_changed: usize,
319 lines_changed: usize,
320 ) -> Self {
321 Self {
322 role: Some(engineer.into()),
323 task: Some(task.into()),
324 load: Some(confidence),
325 reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
326 ..Self::base("task_auto_merged")
327 }
328 }
329
330 pub fn task_manual_merged(task: &str) -> Self {
331 Self {
332 task: Some(task.into()),
333 ..Self::base("task_manual_merged")
334 }
335 }
336
337 pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
339 let detail = format!(
340 "files={} lines={} migrations={} config={} renames={}",
341 info.files_changed,
342 info.lines_changed,
343 info.has_migrations,
344 info.has_config_changes,
345 info.rename_count
346 );
347 Self {
348 role: Some(info.engineer.into()),
349 task: Some(info.task.into()),
350 load: Some(info.confidence),
351 reason: Some(detail),
352 ..Self::base("merge_confidence_scored")
353 }
354 }
355
356 pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
357 Self {
358 role: Some(role.into()),
359 task: Some(task.into()),
360 ..Self::base("review_escalated")
361 }
362 }
363
364 pub fn task_reworked(role: &str, task: &str) -> Self {
365 Self {
366 role: Some(role.into()),
367 task: Some(task.into()),
368 ..Self::base("task_reworked")
369 }
370 }
371
372 pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
373 Self {
374 task: Some(format!("#{task_id}")),
375 reason: Some(cron_expr.into()),
376 ..Self::base("task_recycled")
377 }
378 }
379
380 pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
381 let load = if total_members == 0 {
382 0.0
383 } else {
384 working_members as f64 / total_members as f64
385 };
386 Self {
387 load: Some(load),
388 working_members: Some(working_members),
389 total_members: Some(total_members),
390 session_running: Some(session_running),
391 ..Self::base("load_snapshot")
392 }
393 }
394
395 pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
396 Self {
397 role: Some(role.into()),
398 reason: Some(format!("branch '{branch}' merged into main")),
399 ..Self::base("worktree_reconciled")
400 }
401 }
402}
403
404pub struct EventSink {
405 writer: Box<dyn Write + Send>,
406 path: PathBuf,
407 max_bytes: Option<u64>,
408}
409
410impl EventSink {
411 pub fn new(path: &Path) -> Result<Self> {
412 Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
413 }
414
415 pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
416 if let Some(parent) = path.parent() {
417 std::fs::create_dir_all(parent)?;
418 }
419 rotate_event_log_if_needed(path, max_bytes, 0)?;
420 let file = OpenOptions::new()
421 .create(true)
422 .append(true)
423 .open(path)
424 .with_context(|| format!("failed to open event sink: {}", path.display()))?;
425 Ok(Self {
426 writer: Box::new(BufWriter::new(file)),
427 path: path.to_path_buf(),
428 max_bytes: Some(max_bytes),
429 })
430 }
431
432 #[cfg(test)]
433 pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
434 Self {
435 writer: Box::new(writer),
436 path: path.to_path_buf(),
437 max_bytes: None,
438 }
439 }
440
441 pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
442 let json = serde_json::to_string(&event)?;
443 self.rotate_if_needed((json.len() + 1) as u64)?;
444 writeln!(self.writer, "{json}")?;
445 self.writer.flush()?;
446 Ok(())
447 }
448
449 #[allow(dead_code)]
450 pub fn path(&self) -> &Path {
451 &self.path
452 }
453
454 fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
455 let Some(max_bytes) = self.max_bytes else {
456 return Ok(());
457 };
458 self.writer.flush()?;
459 if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
460 self.writer = Box::new(BufWriter::new(
461 OpenOptions::new()
462 .create(true)
463 .append(true)
464 .open(&self.path)
465 .with_context(|| {
466 format!("failed to reopen event sink: {}", self.path.display())
467 })?,
468 ));
469 }
470 Ok(())
471 }
472}
473
474fn rotated_event_log_path(path: &Path) -> PathBuf {
475 PathBuf::from(format!("{}.1", path.display()))
476}
477
478fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
479 let len = match fs::metadata(path) {
480 Ok(metadata) => metadata.len(),
481 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
482 Err(error) => {
483 return Err(error).with_context(|| format!("failed to stat {}", path.display()));
484 }
485 };
486
487 if len == 0 {
488 return Ok(false);
489 }
490
491 if len.saturating_add(next_entry_bytes) <= max_bytes {
492 return Ok(false);
493 }
494
495 let rotated = rotated_event_log_path(path);
496 if rotated.exists() {
497 fs::remove_file(&rotated)
498 .with_context(|| format!("failed to remove {}", rotated.display()))?;
499 }
500 fs::rename(path, &rotated).with_context(|| {
501 format!(
502 "failed to rotate event log {} to {}",
503 path.display(),
504 rotated.display()
505 )
506 })?;
507 Ok(true)
508}
509
510pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
511 if !path.exists() {
512 return Ok(Vec::new());
513 }
514 let content = fs::read_to_string(path).context("failed to read event log")?;
515 let mut events = Vec::new();
516 for line in content.lines() {
517 let line = line.trim();
518 if line.is_empty() {
519 continue;
520 }
521 if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
522 events.push(event);
523 }
524 }
525 Ok(events)
526}
527
528#[cfg(test)]
529mod tests {
530 use std::sync::{Arc, Mutex};
531 use std::thread;
532
533 use super::*;
534
535 #[test]
536 fn event_serializes_to_json() {
537 let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
538 let json = serde_json::to_string(&event).unwrap();
539 assert!(json.contains("\"event\":\"task_assigned\""));
540 assert!(json.contains("\"role\":\"eng-1-1\""));
541 assert!(json.contains("\"task\":\"fix auth bug\""));
542 assert!(json.contains("\"ts\":"));
543 }
544
545 #[test]
546 fn optional_fields_omitted() {
547 let event = TeamEvent::daemon_started();
548 let json = serde_json::to_string(&event).unwrap();
549 assert!(!json.contains("\"role\""));
550 assert!(!json.contains("\"task\""));
551 }
552
553 #[test]
554 fn event_sink_writes_jsonl() {
555 let tmp = tempfile::tempdir().unwrap();
556 let path = tmp.path().join("events.jsonl");
557 let mut sink = EventSink::new(&path).unwrap();
558 sink.emit(TeamEvent::daemon_started()).unwrap();
559 sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
560 .unwrap();
561 sink.emit(TeamEvent::daemon_stopped()).unwrap();
562
563 let content = std::fs::read_to_string(&path).unwrap();
564 let lines: Vec<&str> = content.lines().collect();
565 assert_eq!(lines.len(), 3);
566 assert!(lines[0].contains("daemon_started"));
567 assert!(lines[1].contains("task_assigned"));
568 assert!(lines[2].contains("daemon_stopped"));
569 }
570
571 #[test]
572 fn all_event_variants_serialize_with_correct_event_field() {
573 let variants: Vec<(&str, TeamEvent)> = vec![
574 ("daemon_started", TeamEvent::daemon_started()),
575 ("daemon_reloading", TeamEvent::daemon_reloading()),
576 ("daemon_reloaded", TeamEvent::daemon_reloaded()),
577 ("daemon_stopped", TeamEvent::daemon_stopped()),
578 (
579 "daemon_stopped",
580 TeamEvent::daemon_stopped_with_reason("signal", 3600),
581 ),
582 ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
583 (
584 "context_exhausted",
585 TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
586 ),
587 (
588 "loop_step_error",
589 TeamEvent::loop_step_error("poll_watchers", "tmux error"),
590 ),
591 (
592 "daemon_panic",
593 TeamEvent::daemon_panic("index out of bounds"),
594 ),
595 ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
596 (
597 "cwd_corrected",
598 TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
599 ),
600 (
601 "task_escalated",
602 TeamEvent::task_escalated("eng-1", "task", None),
603 ),
604 ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
605 (
606 "performance_regression",
607 TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
608 ),
609 (
610 "task_completed",
611 TeamEvent::task_completed("eng-1", Some("42")),
612 ),
613 ("standup_generated", TeamEvent::standup_generated("manager")),
614 ("retro_generated", TeamEvent::retro_generated()),
615 (
616 "pattern_detected",
617 TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
618 ),
619 ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
620 ("pane_death", TeamEvent::pane_death("eng-1")),
621 ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
622 ("message_routed", TeamEvent::message_routed("a", "b")),
623 ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
624 (
625 "agent_restarted",
626 TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
627 ),
628 (
629 "delivery_failed",
630 TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
631 ),
632 (
633 "task_auto_merged",
634 TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
635 ),
636 ("task_manual_merged", TeamEvent::task_manual_merged("42")),
637 (
638 "merge_confidence_scored",
639 TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
640 engineer: "eng-1",
641 task: "42",
642 confidence: 0.85,
643 files_changed: 3,
644 lines_changed: 50,
645 has_migrations: false,
646 has_config_changes: false,
647 rename_count: 0,
648 }),
649 ),
650 (
651 "review_nudge_sent",
652 TeamEvent::review_nudge_sent("manager", "42"),
653 ),
654 (
655 "review_escalated",
656 TeamEvent::review_escalated("42", "stale review"),
657 ),
658 ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
659 ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
660 (
661 "worktree_reconciled",
662 TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
663 ),
664 ];
665 for (expected_event, event) in &variants {
666 let json = serde_json::to_string(event).unwrap();
667 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
668 assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
669 assert!(parsed["ts"].as_u64().is_some());
670 }
671 }
672
673 #[test]
674 fn load_snapshot_serializes_optional_metrics() {
675 let event = TeamEvent::load_snapshot(3, 7, false);
676 let json = serde_json::to_string(&event).unwrap();
677 assert!(json.contains("\"event\":\"load_snapshot\""));
678 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
679 assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
680 assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
681 assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
682 assert!(!parsed["session_running"].as_bool().unwrap());
683 }
684
685 #[test]
686 fn read_events_parses_all_known_lines() {
687 let tmp = tempfile::tempdir().unwrap();
688 let path = tmp.path().join("events.jsonl");
689 let mut sink = EventSink::new(&path).unwrap();
690 sink.emit(TeamEvent::daemon_started()).unwrap();
691 sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
692 sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
693
694 let events = read_events(&path).unwrap();
695 assert_eq!(events.len(), 3);
696 assert_eq!(events[1].event, "load_snapshot");
697 assert_eq!(events[1].working_members, Some(1));
698 assert_eq!(events[2].total_members, Some(4));
699 }
700
701 #[test]
702 fn event_sink_appends_to_existing_file() {
703 let tmp = tempfile::tempdir().unwrap();
704 let path = tmp.path().join("events.jsonl");
705
706 {
708 let mut sink = EventSink::new(&path).unwrap();
709 sink.emit(TeamEvent::daemon_started()).unwrap();
710 }
711
712 {
714 let mut sink = EventSink::new(&path).unwrap();
715 sink.emit(TeamEvent::daemon_stopped()).unwrap();
716 }
717
718 let content = std::fs::read_to_string(&path).unwrap();
719 let lines: Vec<&str> = content.lines().collect();
720 assert_eq!(lines.len(), 2);
721 assert!(lines[0].contains("daemon_started"));
722 assert!(lines[1].contains("daemon_stopped"));
723 }
724
725 #[test]
726 fn event_with_special_characters_in_task() {
727 let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
728 let json = serde_json::to_string(&event).unwrap();
729 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
730 let task_val = parsed["task"].as_str().unwrap();
731 assert!(task_val.contains("\"quotes\""));
732 assert!(task_val.contains("<angles>"));
733 }
734
735 #[test]
736 fn task_escalated_serializes_role_and_task() {
737 let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
738 let json = serde_json::to_string(&event).unwrap();
739 assert!(json.contains("\"event\":\"task_escalated\""));
740 assert!(json.contains("\"role\":\"eng-1-1\""));
741 assert!(json.contains("\"task\":\"42\""));
742 assert!(json.contains("\"reason\":\"tests_failed\""));
743 }
744
745 #[test]
746 fn cwd_corrected_serializes_role_and_reason() {
747 let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
748 let json = serde_json::to_string(&event).unwrap();
749 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
750 assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
751 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
752 assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
753 }
754
755 #[test]
756 fn pane_death_serializes_role() {
757 let event = TeamEvent::pane_death("eng-1-1");
758 let json = serde_json::to_string(&event).unwrap();
759 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
760 assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
761 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
762 }
763
764 #[test]
765 fn pane_respawned_serializes_role() {
766 let event = TeamEvent::pane_respawned("eng-1-1");
767 let json = serde_json::to_string(&event).unwrap();
768 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
769 assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
770 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
771 }
772
773 #[test]
774 fn agent_restarted_includes_reason_task_and_count() {
775 let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
776 let json = serde_json::to_string(&event).unwrap();
777 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
778 assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
779 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
780 assert_eq!(parsed["task"].as_str().unwrap(), "67");
781 assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
782 assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
783 }
784
785 #[test]
786 fn delivery_failed_includes_role_sender_and_reason() {
787 let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
788 let json = serde_json::to_string(&event).unwrap();
789 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
790 assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
791 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
792 assert_eq!(parsed["from"].as_str().unwrap(), "manager");
793 assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
794 }
795
796 #[test]
797 fn task_unblocked_serializes_role_and_task() {
798 let event = TeamEvent::task_unblocked("eng-1-1", "42");
799 let json = serde_json::to_string(&event).unwrap();
800 assert!(json.contains("\"event\":\"task_unblocked\""));
801 assert!(json.contains("\"role\":\"eng-1-1\""));
802 assert!(json.contains("\"task\":\"42\""));
803 }
804
805 #[test]
806 fn merge_confidence_scored_includes_all_fields() {
807 let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
808 engineer: "eng-1-1",
809 task: "42",
810 confidence: 0.85,
811 files_changed: 3,
812 lines_changed: 50,
813 has_migrations: true,
814 has_config_changes: false,
815 rename_count: 1,
816 });
817 let json = serde_json::to_string(&event).unwrap();
818 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
819 assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
820 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
821 assert_eq!(parsed["task"].as_str().unwrap(), "42");
822 assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
823 let reason = parsed["reason"].as_str().unwrap();
824 assert!(reason.contains("files=3"));
825 assert!(reason.contains("lines=50"));
826 assert!(reason.contains("migrations=true"));
827 assert!(reason.contains("config=false"));
828 assert!(reason.contains("renames=1"));
829 }
830
831 #[test]
832 fn performance_regression_serializes_task_and_reason() {
833 let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
834 let json = serde_json::to_string(&event).unwrap();
835 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
836 assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
837 assert_eq!(parsed["task"].as_str().unwrap(), "42");
838 assert_eq!(
839 parsed["reason"].as_str().unwrap(),
840 "runtime_ms=1300 avg_ms=1000 pct=30"
841 );
842 }
843
844 #[test]
845 fn daemon_stopped_with_reason_includes_fields() {
846 let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
847 let json = serde_json::to_string(&event).unwrap();
848 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
849 assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
850 assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
851 }
852
853 #[test]
854 fn heartbeat_includes_uptime() {
855 let event = TeamEvent::daemon_heartbeat(600);
856 let json = serde_json::to_string(&event).unwrap();
857 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
858 assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
859 assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
860 assert!(parsed.get("reason").is_none());
862 assert!(parsed.get("step").is_none());
863 }
864
865 #[test]
866 fn context_exhausted_includes_role_task_and_session_size() {
867 let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
868 let json = serde_json::to_string(&event).unwrap();
869 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
870 assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
871 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
872 assert_eq!(parsed["task"].as_str().unwrap(), "77");
873 assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
874 }
875
876 #[test]
877 fn loop_step_error_includes_step_and_error() {
878 let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
879 let json = serde_json::to_string(&event).unwrap();
880 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
881 assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
882 assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
883 }
884
885 #[test]
886 fn daemon_panic_includes_reason() {
887 let event = TeamEvent::daemon_panic("index out of bounds");
888 let json = serde_json::to_string(&event).unwrap();
889 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
890 assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
891 assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
892 }
893
894 #[test]
895 fn new_fields_omitted_from_basic_events() {
896 let event = TeamEvent::daemon_started();
897 let json = serde_json::to_string(&event).unwrap();
898 assert!(!json.contains("\"reason\""));
899 assert!(!json.contains("\"step\""));
900 assert!(!json.contains("\"error\""));
901 assert!(!json.contains("\"uptime_secs\""));
902 assert!(!json.contains("\"restart_count\""));
903 }
904
905 #[test]
906 fn pattern_detected_includes_reason_payload() {
907 let event = TeamEvent::pattern_detected("escalation_cluster", 6);
908 let json = serde_json::to_string(&event).unwrap();
909 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
910 assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
911 assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
912 }
913
914 #[test]
915 fn event_sink_creates_parent_directories() {
916 let tmp = tempfile::tempdir().unwrap();
917 let path = tmp.path().join("deep").join("nested").join("events.jsonl");
918 let mut sink = EventSink::new(&path).unwrap();
919 sink.emit(TeamEvent::daemon_started()).unwrap();
920 assert!(path.exists());
921 assert_eq!(sink.path(), path);
922 }
923
924 #[test]
925 fn event_sink_rotates_oversized_log_on_open() {
926 let tmp = tempfile::tempdir().unwrap();
927 let path = tmp.path().join("events.jsonl");
928 fs::write(&path, "0123456789").unwrap();
929
930 let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
931 sink.emit(TeamEvent::daemon_started()).unwrap();
932
933 let rotated = rotated_event_log_path(&path);
934 assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
935 let current = fs::read_to_string(&path).unwrap();
936 assert!(current.contains("daemon_started"));
937 }
938
939 #[test]
940 fn event_sink_rotates_before_write_that_would_exceed_threshold() {
941 let tmp = tempfile::tempdir().unwrap();
942 let path = tmp.path().join("events.jsonl");
943 let first_line = "{\"event\":\"first\"}\n";
944 fs::write(&path, first_line).unwrap();
945
946 let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
947 sink.emit(TeamEvent::task_assigned(
948 "eng-1",
949 "this assignment is long enough to rotate",
950 ))
951 .unwrap();
952
953 let rotated = rotated_event_log_path(&path);
954 assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
955 let current = fs::read_to_string(&path).unwrap();
956 assert!(current.contains("task_assigned"));
957 assert!(!current.contains("\"event\":\"first\""));
958 }
959
960 #[test]
961 fn event_round_trip_preserves_fields_for_agent_restarted() {
962 let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
963
964 let json = serde_json::to_string(&original).unwrap();
965 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
966
967 assert_eq!(parsed.event, "agent_restarted");
968 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
969 assert_eq!(parsed.task.as_deref(), Some("42"));
970 assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
971 assert_eq!(parsed.restart_count, Some(3));
972 assert_eq!(parsed.ts, original.ts);
973 }
974
975 #[test]
976 fn event_round_trip_preserves_fields_for_load_snapshot() {
977 let original = TeamEvent::load_snapshot(4, 8, true);
978
979 let json = serde_json::to_string(&original).unwrap();
980 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
981
982 assert_eq!(parsed.event, "load_snapshot");
983 assert_eq!(parsed.working_members, Some(4));
984 assert_eq!(parsed.total_members, Some(8));
985 assert_eq!(parsed.session_running, Some(true));
986 assert_eq!(parsed.load, Some(0.5));
987 assert_eq!(parsed.ts, original.ts);
988 }
989
990 #[test]
991 fn event_round_trip_preserves_fields_for_delivery_failed() {
992 let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
993
994 let json = serde_json::to_string(&original).unwrap();
995 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
996
997 assert_eq!(parsed.event, "delivery_failed");
998 assert_eq!(parsed.role.as_deref(), Some("eng-2"));
999 assert_eq!(parsed.from.as_deref(), Some("manager"));
1000 assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
1001 assert_eq!(parsed.ts, original.ts);
1002 }
1003
1004 #[test]
1005 fn read_events_skips_blank_and_malformed_lines() {
1006 let tmp = tempfile::tempdir().unwrap();
1007 let path = tmp.path().join("events.jsonl");
1008 fs::write(
1009 &path,
1010 [
1011 "",
1012 "{\"event\":\"daemon_started\",\"ts\":1}",
1013 "not-json",
1014 " ",
1015 "{\"event\":\"daemon_stopped\",\"ts\":2}",
1016 ]
1017 .join("\n"),
1018 )
1019 .unwrap();
1020
1021 let events = read_events(&path).unwrap();
1022
1023 assert_eq!(events.len(), 2);
1024 assert_eq!(events[0].event, "daemon_started");
1025 assert_eq!(events[1].event, "daemon_stopped");
1026 }
1027
1028 #[test]
1029 fn rotate_event_log_if_needed_returns_false_for_missing_file() {
1030 let tmp = tempfile::tempdir().unwrap();
1031 let path = tmp.path().join("events.jsonl");
1032
1033 let rotated = rotate_event_log_if_needed(&path, 128, 0).unwrap();
1034
1035 assert!(!rotated);
1036 assert!(!rotated_event_log_path(&path).exists());
1037 }
1038
1039 #[test]
1040 fn rotate_event_log_if_needed_returns_false_for_empty_file() {
1041 let tmp = tempfile::tempdir().unwrap();
1042 let path = tmp.path().join("events.jsonl");
1043 fs::write(&path, "").unwrap();
1044
1045 let rotated = rotate_event_log_if_needed(&path, 1, 1).unwrap();
1046
1047 assert!(!rotated);
1048 assert!(path.exists());
1049 assert!(!rotated_event_log_path(&path).exists());
1050 }
1051
1052 #[test]
1053 fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
1054 let tmp = tempfile::tempdir().unwrap();
1055 let path = tmp.path().join("events.jsonl");
1056 let rotated_path = rotated_event_log_path(&path);
1057 fs::write(&path, "current-events").unwrap();
1058 fs::write(&rotated_path, "old-rotated-events").unwrap();
1059
1060 let rotated = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1061
1062 assert!(rotated);
1063 assert_eq!(fs::read_to_string(&rotated_path).unwrap(), "current-events");
1064 }
1065
1066 #[test]
1067 fn concurrent_event_sinks_append_without_losing_lines() {
1068 let tmp = tempfile::tempdir().unwrap();
1069 let path = Arc::new(tmp.path().join("events.jsonl"));
1070 let ready = Arc::new(std::sync::Barrier::new(5));
1071 let errors = Arc::new(Mutex::new(Vec::<String>::new()));
1072 let mut handles = Vec::new();
1073
1074 for idx in 0..4 {
1075 let path = Arc::clone(&path);
1076 let ready = Arc::clone(&ready);
1077 let errors = Arc::clone(&errors);
1078 handles.push(thread::spawn(move || {
1079 ready.wait();
1080 let result = (|| -> Result<()> {
1081 let mut sink = EventSink::new(&path)?;
1082 sink.emit(TeamEvent::task_assigned(
1083 &format!("eng-{idx}"),
1084 &format!("task-{idx}"),
1085 ))?;
1086 Ok(())
1087 })();
1088 if let Err(error) = result {
1089 errors.lock().unwrap().push(error.to_string());
1090 }
1091 }));
1092 }
1093
1094 ready.wait();
1095 for handle in handles {
1096 handle.join().unwrap();
1097 }
1098
1099 assert!(errors.lock().unwrap().is_empty());
1100 let events = read_events(&path).unwrap();
1101 assert_eq!(events.len(), 4);
1102 for idx in 0..4 {
1103 assert!(
1104 events
1105 .iter()
1106 .any(|event| event.role.as_deref() == Some(&format!("eng-{idx}")))
1107 );
1108 }
1109 }
1110
1111 #[test]
1112 fn read_events_handles_large_log_file() {
1113 let tmp = tempfile::tempdir().unwrap();
1114 let path = tmp.path().join("events.jsonl");
1115 let mut sink = EventSink::new(&path).unwrap();
1116
1117 for idx in 0..512 {
1118 sink.emit(TeamEvent::task_assigned(
1119 &format!("eng-{idx}"),
1120 &"x".repeat(128),
1121 ))
1122 .unwrap();
1123 }
1124
1125 let events = read_events(&path).unwrap();
1126
1127 assert_eq!(events.len(), 512);
1128 assert_eq!(events.first().unwrap().event, "task_assigned");
1129 assert_eq!(events.last().unwrap().event, "task_assigned");
1130 }
1131
1132 fn production_unwrap_expect_count(source: &str) -> usize {
1133 let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1134 &source[..pos]
1135 } else {
1136 source
1137 };
1138 prod.lines()
1139 .filter(|line| {
1140 let trimmed = line.trim();
1141 !trimmed.starts_with("#[cfg(test)]")
1142 && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1143 })
1144 .count()
1145 }
1146
1147 #[test]
1148 fn task_completed_includes_task_id() {
1149 let event = TeamEvent::task_completed("eng-1", Some("42"));
1150 let json = serde_json::to_string(&event).unwrap();
1151 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1152 assert_eq!(parsed["event"].as_str().unwrap(), "task_completed");
1153 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1154 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1155 }
1156
1157 #[test]
1158 fn task_completed_without_task_id_omits_task_field() {
1159 let event = TeamEvent::task_completed("eng-1", None);
1160 let json = serde_json::to_string(&event).unwrap();
1161 assert!(json.contains("\"event\":\"task_completed\""));
1162 assert!(json.contains("\"role\":\"eng-1\""));
1163 assert!(!json.contains("\"task\""));
1164 }
1165
1166 #[test]
1167 fn task_escalated_without_reason_omits_reason_field() {
1168 let event = TeamEvent::task_escalated("eng-1", "42", None);
1169 let json = serde_json::to_string(&event).unwrap();
1170 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1171 assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1172 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1173 assert!(parsed.get("reason").is_none());
1174 }
1175
1176 #[test]
1177 fn task_escalated_with_reason_includes_reason_field() {
1178 let event = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
1179 let json = serde_json::to_string(&event).unwrap();
1180 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1181 assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1182 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1183 assert_eq!(parsed["reason"].as_str().unwrap(), "merge_conflict");
1184 }
1185
1186 #[test]
1187 fn task_completed_round_trip_preserves_task_id() {
1188 let original = TeamEvent::task_completed("eng-1", Some("99"));
1189 let json = serde_json::to_string(&original).unwrap();
1190 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1191 assert_eq!(parsed.event, "task_completed");
1192 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1193 assert_eq!(parsed.task.as_deref(), Some("99"));
1194 }
1195
1196 #[test]
1197 fn task_escalated_round_trip_preserves_reason() {
1198 let original = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
1199 let json = serde_json::to_string(&original).unwrap();
1200 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1201 assert_eq!(parsed.event, "task_escalated");
1202 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1203 assert_eq!(parsed.task.as_deref(), Some("42"));
1204 assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1205 }
1206
1207 #[test]
1208 fn production_events_has_no_unwrap_or_expect_calls() {
1209 let src = include_str!("events.rs");
1210 assert_eq!(
1211 production_unwrap_expect_count(src),
1212 0,
1213 "production events.rs should avoid unwrap/expect"
1214 );
1215 }
1216
1217 #[test]
1218 fn stall_detected_event_fields() {
1219 let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1220 assert_eq!(event.event, "stall_detected");
1221 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1222 assert_eq!(event.task.as_deref(), Some("42"));
1223 assert_eq!(event.uptime_secs, Some(300));
1224 }
1225
1226 #[test]
1227 fn stall_detected_event_without_task() {
1228 let event = TeamEvent::stall_detected("eng-1-1", None, 600);
1229 assert_eq!(event.event, "stall_detected");
1230 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1231 assert!(event.task.is_none());
1232 assert_eq!(event.uptime_secs, Some(600));
1233 }
1234
1235 #[test]
1236 fn stall_detected_event_serializes_to_jsonl() {
1237 let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1238 let json = serde_json::to_string(&event).unwrap();
1239 assert!(json.contains("\"stall_detected\""));
1240 assert!(json.contains("\"eng-1-1\""));
1241 assert!(json.contains("\"42\""));
1242 }
1243
1244 #[test]
1245 fn health_changed_event_fields() {
1246 let event = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");
1247 assert_eq!(event.event, "health_changed");
1248 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1249 assert_eq!(event.reason.as_deref(), Some("healthy→unreachable"));
1250 }
1251
1252 #[test]
1253 fn health_changed_event_serializes_to_jsonl() {
1254 let event = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
1255 let json = serde_json::to_string(&event).unwrap();
1256 assert!(json.contains("\"health_changed\""));
1257 assert!(json.contains("\"eng-1-2\""));
1258 }
1259
1260 #[test]
1263 fn event_sink_on_readonly_dir_returns_error() {
1264 #[cfg(unix)]
1265 {
1266 use std::os::unix::fs::PermissionsExt;
1267 let tmp = tempfile::tempdir().unwrap();
1268 let readonly_dir = tmp.path().join("readonly");
1269 fs::create_dir(&readonly_dir).unwrap();
1270 fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o444)).unwrap();
1271
1272 let path = readonly_dir.join("subdir").join("events.jsonl");
1273 let result = EventSink::new(&path);
1274 assert!(result.is_err());
1275
1276 fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o755)).unwrap();
1278 }
1279 }
1280
1281 #[test]
1282 fn read_events_from_nonexistent_file_returns_empty() {
1283 let tmp = tempfile::tempdir().unwrap();
1284 let path = tmp.path().join("does_not_exist.jsonl");
1285 let events = read_events(&path).unwrap();
1286 assert!(events.is_empty());
1287 }
1288
1289 #[test]
1290 fn read_events_all_malformed_lines_returns_empty() {
1291 let tmp = tempfile::tempdir().unwrap();
1292 let path = tmp.path().join("events.jsonl");
1293 fs::write(&path, "not json\nalso not json\n{invalid}\n").unwrap();
1294 let events = read_events(&path).unwrap();
1295 assert!(events.is_empty());
1296 }
1297
1298 #[test]
1299 fn event_sink_emit_with_failing_writer() {
1300 struct FailWriter;
1301 impl Write for FailWriter {
1302 fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1303 Err(std::io::Error::new(
1304 std::io::ErrorKind::BrokenPipe,
1305 "simulated write failure",
1306 ))
1307 }
1308 fn flush(&mut self) -> std::io::Result<()> {
1309 Err(std::io::Error::new(
1310 std::io::ErrorKind::BrokenPipe,
1311 "simulated flush failure",
1312 ))
1313 }
1314 }
1315
1316 let tmp = tempfile::tempdir().unwrap();
1317 let path = tmp.path().join("events.jsonl");
1318 let mut sink = EventSink::from_writer(&path, FailWriter);
1319 let result = sink.emit(TeamEvent::daemon_started());
1320 assert!(result.is_err());
1321 }
1322
1323 #[test]
1324 fn rotate_event_log_replaces_stale_rotated_file() {
1325 let tmp = tempfile::tempdir().unwrap();
1326 let path = tmp.path().join("events.jsonl");
1327 let rotated = rotated_event_log_path(&path);
1328
1329 fs::write(&path, "current-data-that-is-large").unwrap();
1330 fs::write(&rotated, "old-rotated-data").unwrap();
1331
1332 let did_rotate = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1333 assert!(did_rotate);
1334 assert_eq!(
1336 fs::read_to_string(&rotated).unwrap(),
1337 "current-data-that-is-large"
1338 );
1339 assert!(!path.exists());
1341 }
1342
1343 #[test]
1344 fn event_sink_handles_zero_max_bytes_rotation() {
1345 let tmp = tempfile::tempdir().unwrap();
1346 let path = tmp.path().join("events.jsonl");
1347
1348 fs::write(&path, "").unwrap();
1350 let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1351 assert!(!did_rotate); fs::write(&path, "x").unwrap();
1354 let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1355 assert!(did_rotate); }
1357
1358 #[test]
1359 fn read_events_partial_json_with_valid_lines_mixed() {
1360 let tmp = tempfile::tempdir().unwrap();
1361 let path = tmp.path().join("events.jsonl");
1362 let content = format!(
1364 "{}\n{{\"event\":\"trunca\n{}\n",
1365 r#"{"event":"daemon_started","ts":1}"#, r#"{"event":"daemon_stopped","ts":3}"#
1366 );
1367 fs::write(&path, content).unwrap();
1368
1369 let events = read_events(&path).unwrap();
1370 assert_eq!(events.len(), 2);
1371 assert_eq!(events[0].event, "daemon_started");
1372 assert_eq!(events[1].event, "daemon_stopped");
1373 }
1374}