1use std::fs::{self, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10
11use super::DEFAULT_EVENT_LOG_MAX_BYTES;
12
13pub struct MergeConfidenceInfo<'a> {
15 pub engineer: &'a str,
16 pub task: &'a str,
17 pub confidence: f64,
18 pub files_changed: usize,
19 pub lines_changed: usize,
20 pub has_migrations: bool,
21 pub has_config_changes: bool,
22 pub rename_count: usize,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26
27pub struct TeamEvent {
28 pub event: String,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub role: Option<String>,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub task: Option<String>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub recipient: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub from: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub to: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub restart: Option<bool>,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub restart_count: Option<u32>,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 pub load: Option<f64>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub working_members: Option<u32>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub total_members: Option<u32>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub session_running: Option<bool>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub reason: Option<String>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub step: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub error: Option<String>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub uptime_secs: Option<u64>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub session_size_bytes: Option<u64>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub output_bytes: Option<u64>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub filename: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub content_hash: Option<String>,
67 pub ts: u64,
68}
69
70impl TeamEvent {
71 fn now() -> u64 {
72 SystemTime::now()
73 .duration_since(UNIX_EPOCH)
74 .unwrap_or_default()
75 .as_secs()
76 }
77
78 fn base(event: &str) -> Self {
79 Self {
80 event: event.into(),
81 role: None,
82 task: None,
83 recipient: None,
84 from: None,
85 to: None,
86 restart: None,
87 restart_count: None,
88 load: None,
89 working_members: None,
90 total_members: None,
91 session_running: None,
92 reason: None,
93 step: None,
94 error: None,
95 uptime_secs: None,
96 session_size_bytes: None,
97 output_bytes: None,
98 filename: None,
99 content_hash: None,
100 ts: Self::now(),
101 }
102 }
103
104 pub fn daemon_started() -> Self {
105 Self::base("daemon_started")
106 }
107
108 pub fn daemon_reloading() -> Self {
109 Self::base("daemon_reloading")
110 }
111
112 pub fn daemon_reloaded() -> Self {
113 Self::base("daemon_reloaded")
114 }
115
116 pub fn daemon_stopped() -> Self {
117 Self::base("daemon_stopped")
118 }
119
120 pub fn daemon_stopped_with_reason(reason: &str, uptime_secs: u64) -> Self {
121 Self {
122 reason: Some(reason.into()),
123 uptime_secs: Some(uptime_secs),
124 ..Self::base("daemon_stopped")
125 }
126 }
127
128 pub fn daemon_heartbeat(uptime_secs: u64) -> Self {
129 Self {
130 uptime_secs: Some(uptime_secs),
131 ..Self::base("daemon_heartbeat")
132 }
133 }
134
135 pub fn context_exhausted(
136 role: &str,
137 task: Option<u32>,
138 session_size_bytes: Option<u64>,
139 ) -> Self {
140 Self {
141 role: Some(role.into()),
142 task: task.map(|task_id| task_id.to_string()),
143 session_size_bytes,
144 ..Self::base("context_exhausted")
145 }
146 }
147
148 pub fn loop_step_error(step: &str, error: &str) -> Self {
149 Self {
150 step: Some(step.into()),
151 error: Some(error.into()),
152 ..Self::base("loop_step_error")
153 }
154 }
155
156 pub fn daemon_panic(reason: &str) -> Self {
157 Self {
158 reason: Some(reason.into()),
159 ..Self::base("daemon_panic")
160 }
161 }
162
163 pub fn task_assigned(role: &str, task: &str) -> Self {
164 Self {
165 role: Some(role.into()),
166 task: Some(task.into()),
167 ..Self::base("task_assigned")
168 }
169 }
170
171 pub fn cwd_corrected(role: &str, path: &str) -> Self {
172 Self {
173 role: Some(role.into()),
174 reason: Some(path.into()),
175 ..Self::base("cwd_corrected")
176 }
177 }
178
179 pub fn review_nudge_sent(role: &str, task: &str) -> Self {
180 Self {
181 role: Some(role.into()),
182 task: Some(task.into()),
183 ..Self::base("review_nudge_sent")
184 }
185 }
186
187 pub fn review_escalated(task: &str, reason: &str) -> Self {
188 Self {
189 task: Some(task.into()),
190 reason: Some(reason.into()),
191 ..Self::base("review_escalated")
192 }
193 }
194
195 pub fn state_reconciliation(role: Option<&str>, task: Option<&str>, correction: &str) -> Self {
196 Self {
197 role: role.map(str::to_string),
198 task: task.map(str::to_string),
199 reason: Some(correction.into()),
200 ..Self::base("state_reconciliation")
201 }
202 }
203
204 pub fn task_escalated(role: &str, task: &str, reason: Option<&str>) -> Self {
205 Self {
206 role: Some(role.into()),
207 task: Some(task.into()),
208 reason: reason.map(|r| r.into()),
209 ..Self::base("task_escalated")
210 }
211 }
212
213 pub fn task_unblocked(role: &str, task: &str) -> Self {
214 Self {
215 role: Some(role.into()),
216 task: Some(task.into()),
217 ..Self::base("task_unblocked")
218 }
219 }
220
221 pub fn performance_regression(task: &str, reason: &str) -> Self {
222 Self {
223 task: Some(task.into()),
224 reason: Some(reason.into()),
225 ..Self::base("performance_regression")
226 }
227 }
228
229 pub fn task_completed(role: &str, task: Option<&str>) -> Self {
230 Self {
231 role: Some(role.into()),
232 task: task.map(|t| t.into()),
233 ..Self::base("task_completed")
234 }
235 }
236
237 pub fn standup_generated(recipient: &str) -> Self {
238 Self {
239 recipient: Some(recipient.into()),
240 ..Self::base("standup_generated")
241 }
242 }
243
244 pub fn retro_generated() -> Self {
245 Self::base("retro_generated")
246 }
247
248 pub fn pattern_detected(pattern_type: &str, frequency: u32) -> Self {
249 Self {
250 reason: Some(format!("{pattern_type}:{frequency}")),
251 ..Self::base("pattern_detected")
252 }
253 }
254
255 pub fn member_crashed(role: &str, restart: bool) -> Self {
256 Self {
257 role: Some(role.into()),
258 restart: Some(restart),
259 ..Self::base("member_crashed")
260 }
261 }
262
263 pub fn pane_death(role: &str) -> Self {
264 Self {
265 role: Some(role.into()),
266 ..Self::base("pane_death")
267 }
268 }
269
270 pub fn pane_respawned(role: &str) -> Self {
271 Self {
272 role: Some(role.into()),
273 ..Self::base("pane_respawned")
274 }
275 }
276
277 pub fn narration_detected(role: &str, task: Option<u32>) -> Self {
278 Self {
279 role: Some(role.into()),
280 task: task.map(|id| id.to_string()),
281 ..Self::base("narration_detected")
282 }
283 }
284
285 pub fn narration_rejection(role: &str, task_id: u32, rejection_count: u32) -> Self {
286 Self {
287 role: Some(role.into()),
288 task: Some(task_id.to_string()),
289 reason: Some(format!("rejection_count={rejection_count}")),
290 ..Self::base("narration_rejection")
291 }
292 }
293
294 pub fn context_pressure_warning(
295 role: &str,
296 task: Option<u32>,
297 output_bytes: u64,
298 threshold_bytes: u64,
299 ) -> Self {
300 Self {
301 role: Some(role.into()),
302 task: task.map(|id| id.to_string()),
303 output_bytes: Some(output_bytes),
304 reason: Some(format!("threshold_bytes={threshold_bytes}")),
305 ..Self::base("context_pressure_warning")
306 }
307 }
308
309 pub fn stall_detected(role: &str, task: Option<u32>, stall_duration_secs: u64) -> Self {
310 Self {
311 role: Some(role.into()),
312 task: task.map(|id| id.to_string()),
313 uptime_secs: Some(stall_duration_secs),
314 ..Self::base("stall_detected")
315 }
316 }
317
318 pub fn health_changed(role: &str, reason: &str) -> Self {
322 Self {
323 role: Some(role.into()),
324 reason: Some(reason.into()),
325 ..Self::base("health_changed")
326 }
327 }
328
329 pub fn message_routed(from: &str, to: &str) -> Self {
330 Self {
331 from: Some(from.into()),
332 to: Some(to.into()),
333 ..Self::base("message_routed")
334 }
335 }
336
337 pub fn agent_spawned(role: &str) -> Self {
338 Self {
339 role: Some(role.into()),
340 ..Self::base("agent_spawned")
341 }
342 }
343
344 pub fn agent_restarted(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
345 Self {
346 role: Some(role.into()),
347 task: Some(task.into()),
348 reason: Some(reason.into()),
349 restart_count: Some(restart_count),
350 ..Self::base("agent_restarted")
351 }
352 }
353
354 pub fn task_resumed(role: &str, task: &str, reason: &str, restart_count: u32) -> Self {
355 Self {
356 role: Some(role.into()),
357 task: Some(task.into()),
358 reason: Some(reason.into()),
359 restart_count: Some(restart_count),
360 ..Self::base("task_resumed")
361 }
362 }
363
364 pub fn delivery_failed(role: &str, from: &str, reason: &str) -> Self {
365 Self {
366 role: Some(role.into()),
367 from: Some(from.into()),
368 reason: Some(reason.into()),
369 ..Self::base("delivery_failed")
370 }
371 }
372
373 pub fn task_auto_merged(
374 engineer: &str,
375 task: &str,
376 confidence: f64,
377 files_changed: usize,
378 lines_changed: usize,
379 ) -> Self {
380 Self {
381 role: Some(engineer.into()),
382 task: Some(task.into()),
383 load: Some(confidence),
384 reason: Some(format!("files={} lines={}", files_changed, lines_changed)),
385 ..Self::base("task_auto_merged")
386 }
387 }
388
389 pub fn task_manual_merged(task: &str) -> Self {
390 Self {
391 task: Some(task.into()),
392 ..Self::base("task_manual_merged")
393 }
394 }
395
396 pub fn merge_confidence_scored(info: &MergeConfidenceInfo<'_>) -> Self {
398 let detail = format!(
399 "files={} lines={} migrations={} config={} renames={}",
400 info.files_changed,
401 info.lines_changed,
402 info.has_migrations,
403 info.has_config_changes,
404 info.rename_count
405 );
406 Self {
407 role: Some(info.engineer.into()),
408 task: Some(info.task.into()),
409 load: Some(info.confidence),
410 reason: Some(detail),
411 ..Self::base("merge_confidence_scored")
412 }
413 }
414
415 pub fn review_escalated_by_role(role: &str, task: &str) -> Self {
416 Self {
417 role: Some(role.into()),
418 task: Some(task.into()),
419 ..Self::base("review_escalated")
420 }
421 }
422
423 pub fn task_reworked(role: &str, task: &str) -> Self {
424 Self {
425 role: Some(role.into()),
426 task: Some(task.into()),
427 ..Self::base("task_reworked")
428 }
429 }
430
431 pub fn task_recycled(task_id: u32, cron_expr: &str) -> Self {
432 Self {
433 task: Some(format!("#{task_id}")),
434 reason: Some(cron_expr.into()),
435 ..Self::base("task_recycled")
436 }
437 }
438
439 pub fn barrier_artifact_created(role: &str, filename: &str, content_hash: &str) -> Self {
440 Self {
441 role: Some(role.into()),
442 filename: Some(filename.into()),
443 content_hash: Some(content_hash.into()),
444 ..Self::base("barrier_artifact_created")
445 }
446 }
447
448 pub fn barrier_artifact_read(role: &str, filename: &str, content_hash: &str) -> Self {
449 Self {
450 role: Some(role.into()),
451 filename: Some(filename.into()),
452 content_hash: Some(content_hash.into()),
453 ..Self::base("barrier_artifact_read")
454 }
455 }
456
457 pub fn barrier_violation_attempt(role: &str, filename: &str, reason: &str) -> Self {
458 Self {
459 role: Some(role.into()),
460 filename: Some(filename.into()),
461 reason: Some(reason.into()),
462 ..Self::base("barrier_violation_attempt")
463 }
464 }
465
466 pub fn load_snapshot(working_members: u32, total_members: u32, session_running: bool) -> Self {
467 let load = if total_members == 0 {
468 0.0
469 } else {
470 working_members as f64 / total_members as f64
471 };
472 Self {
473 load: Some(load),
474 working_members: Some(working_members),
475 total_members: Some(total_members),
476 session_running: Some(session_running),
477 ..Self::base("load_snapshot")
478 }
479 }
480
481 pub fn parity_updated(summary: &crate::team::parity::ParitySummary) -> Self {
482 Self {
483 load: Some(summary.overall_parity_pct as f64 / 100.0),
484 reason: Some(format!(
485 "total={} spec={} tests={} implementation={} verified_pass={} verified_fail={}",
486 summary.total_behaviors,
487 summary.spec_complete,
488 summary.tests_complete,
489 summary.implementation_complete,
490 summary.verified_pass,
491 summary.verified_fail
492 )),
493 ..Self::base("parity_updated")
494 }
495 }
496
497 pub fn worktree_reconciled(role: &str, branch: &str) -> Self {
498 Self {
499 role: Some(role.into()),
500 reason: Some(format!("branch '{branch}' merged into main")),
501 ..Self::base("worktree_reconciled")
502 }
503 }
504
505 pub fn topology_changed(added: u32, removed: u32, reason: &str) -> Self {
509 Self {
510 working_members: Some(added),
511 total_members: Some(removed),
512 reason: Some(reason.into()),
513 ..Self::base("topology_changed")
514 }
515 }
516
517 pub fn agent_removed(role: &str, reason: &str) -> Self {
519 Self {
520 role: Some(role.into()),
521 reason: Some(reason.into()),
522 ..Self::base("agent_removed")
523 }
524 }
525}
526
527pub struct EventSink {
528 writer: Box<dyn Write + Send>,
529 path: PathBuf,
530 max_bytes: Option<u64>,
531}
532
533impl EventSink {
534 pub fn new(path: &Path) -> Result<Self> {
535 Self::new_with_max_bytes(path, DEFAULT_EVENT_LOG_MAX_BYTES)
536 }
537
538 pub fn new_with_max_bytes(path: &Path, max_bytes: u64) -> Result<Self> {
539 if let Some(parent) = path.parent() {
540 std::fs::create_dir_all(parent)?;
541 }
542 rotate_event_log_if_needed(path, max_bytes, 0)?;
543 let file = OpenOptions::new()
544 .create(true)
545 .append(true)
546 .open(path)
547 .with_context(|| format!("failed to open event sink: {}", path.display()))?;
548 Ok(Self {
549 writer: Box::new(BufWriter::new(file)),
550 path: path.to_path_buf(),
551 max_bytes: Some(max_bytes),
552 })
553 }
554
555 #[cfg(test)]
556 pub(crate) fn from_writer(path: &Path, writer: impl Write + Send + 'static) -> Self {
557 Self {
558 writer: Box::new(writer),
559 path: path.to_path_buf(),
560 max_bytes: None,
561 }
562 }
563
564 pub fn emit(&mut self, event: TeamEvent) -> Result<()> {
565 let json = serde_json::to_string(&event)?;
566 self.rotate_if_needed((json.len() + 1) as u64)?;
567 writeln!(self.writer, "{json}")?;
568 self.writer.flush()?;
569 Ok(())
570 }
571
572 pub fn path(&self) -> &Path {
573 &self.path
574 }
575
576 fn rotate_if_needed(&mut self, next_entry_bytes: u64) -> Result<()> {
577 let Some(max_bytes) = self.max_bytes else {
578 return Ok(());
579 };
580 self.writer.flush()?;
581 if rotate_event_log_if_needed(&self.path, max_bytes, next_entry_bytes)? {
582 self.writer = Box::new(BufWriter::new(
583 OpenOptions::new()
584 .create(true)
585 .append(true)
586 .open(&self.path)
587 .with_context(|| {
588 format!("failed to reopen event sink: {}", self.path.display())
589 })?,
590 ));
591 }
592 Ok(())
593 }
594}
595
596fn rotated_event_log_path(path: &Path) -> PathBuf {
597 PathBuf::from(format!("{}.1", path.display()))
598}
599
600fn rotate_event_log_if_needed(path: &Path, max_bytes: u64, next_entry_bytes: u64) -> Result<bool> {
601 let len = match fs::metadata(path) {
602 Ok(metadata) => metadata.len(),
603 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
604 Err(error) => {
605 return Err(error).with_context(|| format!("failed to stat {}", path.display()));
606 }
607 };
608
609 if len == 0 {
610 return Ok(false);
611 }
612
613 if len.saturating_add(next_entry_bytes) <= max_bytes {
614 return Ok(false);
615 }
616
617 let rotated = rotated_event_log_path(path);
618 if rotated.exists() {
619 fs::remove_file(&rotated)
620 .with_context(|| format!("failed to remove {}", rotated.display()))?;
621 }
622 fs::rename(path, &rotated).with_context(|| {
623 format!(
624 "failed to rotate event log {} to {}",
625 path.display(),
626 rotated.display()
627 )
628 })?;
629 Ok(true)
630}
631
632pub fn read_events(path: &Path) -> Result<Vec<TeamEvent>> {
633 if !path.exists() {
634 return Ok(Vec::new());
635 }
636 let content = fs::read_to_string(path).context("failed to read event log")?;
637 let mut events = Vec::new();
638 for line in content.lines() {
639 let line = line.trim();
640 if line.is_empty() {
641 continue;
642 }
643 if let Ok(event) = serde_json::from_str::<TeamEvent>(line) {
644 events.push(event);
645 }
646 }
647 Ok(events)
648}
649
650#[cfg(test)]
651mod tests {
652 use std::sync::{Arc, Mutex};
653 use std::thread;
654
655 use super::*;
656
657 #[test]
658 fn event_serializes_to_json() {
659 let event = TeamEvent::task_assigned("eng-1-1", "fix auth bug");
660 let json = serde_json::to_string(&event).unwrap();
661 assert!(json.contains("\"event\":\"task_assigned\""));
662 assert!(json.contains("\"role\":\"eng-1-1\""));
663 assert!(json.contains("\"task\":\"fix auth bug\""));
664 assert!(json.contains("\"ts\":"));
665 }
666
667 #[test]
668 fn optional_fields_omitted() {
669 let event = TeamEvent::daemon_started();
670 let json = serde_json::to_string(&event).unwrap();
671 assert!(!json.contains("\"role\""));
672 assert!(!json.contains("\"task\""));
673 }
674
675 #[test]
676 fn event_sink_writes_jsonl() {
677 let tmp = tempfile::tempdir().unwrap();
678 let path = tmp.path().join("events.jsonl");
679 let mut sink = EventSink::new(&path).unwrap();
680 sink.emit(TeamEvent::daemon_started()).unwrap();
681 sink.emit(TeamEvent::task_assigned("eng-1", "fix bug"))
682 .unwrap();
683 sink.emit(TeamEvent::daemon_stopped()).unwrap();
684
685 let content = std::fs::read_to_string(&path).unwrap();
686 let lines: Vec<&str> = content.lines().collect();
687 assert_eq!(lines.len(), 3);
688 assert!(lines[0].contains("daemon_started"));
689 assert!(lines[1].contains("task_assigned"));
690 assert!(lines[2].contains("daemon_stopped"));
691 }
692
693 #[test]
694 fn all_event_variants_serialize_with_correct_event_field() {
695 let variants: Vec<(&str, TeamEvent)> = vec![
696 ("daemon_started", TeamEvent::daemon_started()),
697 ("daemon_reloading", TeamEvent::daemon_reloading()),
698 ("daemon_reloaded", TeamEvent::daemon_reloaded()),
699 ("daemon_stopped", TeamEvent::daemon_stopped()),
700 (
701 "daemon_stopped",
702 TeamEvent::daemon_stopped_with_reason("signal", 3600),
703 ),
704 ("daemon_heartbeat", TeamEvent::daemon_heartbeat(120)),
705 (
706 "context_exhausted",
707 TeamEvent::context_exhausted("eng-1", Some(42), Some(1_024)),
708 ),
709 (
710 "loop_step_error",
711 TeamEvent::loop_step_error("poll_watchers", "tmux error"),
712 ),
713 (
714 "daemon_panic",
715 TeamEvent::daemon_panic("index out of bounds"),
716 ),
717 ("task_assigned", TeamEvent::task_assigned("eng-1", "task")),
718 (
719 "cwd_corrected",
720 TeamEvent::cwd_corrected("eng-1", "/tmp/worktree"),
721 ),
722 (
723 "task_escalated",
724 TeamEvent::task_escalated("eng-1", "task", None),
725 ),
726 ("task_unblocked", TeamEvent::task_unblocked("eng-1", "task")),
727 (
728 "performance_regression",
729 TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30"),
730 ),
731 (
732 "task_completed",
733 TeamEvent::task_completed("eng-1", Some("42")),
734 ),
735 ("standup_generated", TeamEvent::standup_generated("manager")),
736 ("retro_generated", TeamEvent::retro_generated()),
737 (
738 "pattern_detected",
739 TeamEvent::pattern_detected("merge_conflict_recurrence", 5),
740 ),
741 ("member_crashed", TeamEvent::member_crashed("eng-1", true)),
742 ("pane_death", TeamEvent::pane_death("eng-1")),
743 ("pane_respawned", TeamEvent::pane_respawned("eng-1")),
744 (
745 "context_pressure_warning",
746 TeamEvent::context_pressure_warning("eng-1", Some(42), 400_000, 512_000),
747 ),
748 ("message_routed", TeamEvent::message_routed("a", "b")),
749 ("agent_spawned", TeamEvent::agent_spawned("eng-1")),
750 (
751 "agent_restarted",
752 TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
753 ),
754 (
755 "delivery_failed",
756 TeamEvent::delivery_failed("eng-1", "manager", "message marker missing"),
757 ),
758 (
759 "task_auto_merged",
760 TeamEvent::task_auto_merged("eng-1", "42", 0.95, 2, 30),
761 ),
762 ("task_manual_merged", TeamEvent::task_manual_merged("42")),
763 (
764 "merge_confidence_scored",
765 TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
766 engineer: "eng-1",
767 task: "42",
768 confidence: 0.85,
769 files_changed: 3,
770 lines_changed: 50,
771 has_migrations: false,
772 has_config_changes: false,
773 rename_count: 0,
774 }),
775 ),
776 (
777 "review_nudge_sent",
778 TeamEvent::review_nudge_sent("manager", "42"),
779 ),
780 (
781 "review_escalated",
782 TeamEvent::review_escalated("42", "stale review"),
783 ),
784 (
785 "state_reconciliation",
786 TeamEvent::state_reconciliation(Some("eng-1"), Some("42"), "adopt"),
787 ),
788 ("task_reworked", TeamEvent::task_reworked("eng-1", "42")),
789 ("load_snapshot", TeamEvent::load_snapshot(2, 5, true)),
790 (
791 "parity_updated",
792 TeamEvent::parity_updated(&crate::team::parity::ParitySummary {
793 total_behaviors: 10,
794 spec_complete: 8,
795 tests_complete: 6,
796 implementation_complete: 5,
797 verified_pass: 4,
798 verified_fail: 1,
799 overall_parity_pct: 40,
800 }),
801 ),
802 (
803 "worktree_reconciled",
804 TeamEvent::worktree_reconciled("eng-1", "eng-1/42"),
805 ),
806 ];
807 for (expected_event, event) in &variants {
808 let json = serde_json::to_string(event).unwrap();
809 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
810 assert_eq!(parsed["event"].as_str().unwrap(), *expected_event);
811 assert!(parsed["ts"].as_u64().is_some());
812 }
813 }
814
815 #[test]
816 fn load_snapshot_serializes_optional_metrics() {
817 let event = TeamEvent::load_snapshot(3, 7, false);
818 let json = serde_json::to_string(&event).unwrap();
819 assert!(json.contains("\"event\":\"load_snapshot\""));
820 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
821 assert_eq!(parsed["load"].as_f64().unwrap(), 3.0 / 7.0);
822 assert_eq!(parsed["working_members"].as_u64().unwrap(), 3);
823 assert_eq!(parsed["total_members"].as_u64().unwrap(), 7);
824 assert!(!parsed["session_running"].as_bool().unwrap());
825 }
826
827 #[test]
828 fn parity_updated_serializes_summary_metrics() {
829 let event = TeamEvent::parity_updated(&crate::team::parity::ParitySummary {
830 total_behaviors: 10,
831 spec_complete: 8,
832 tests_complete: 6,
833 implementation_complete: 5,
834 verified_pass: 4,
835 verified_fail: 1,
836 overall_parity_pct: 40,
837 });
838 let json = serde_json::to_string(&event).unwrap();
839 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
840 assert_eq!(parsed["event"].as_str().unwrap(), "parity_updated");
841 assert_eq!(parsed["load"].as_f64().unwrap(), 0.4);
842 let reason = parsed["reason"].as_str().unwrap();
843 assert!(reason.contains("total=10"));
844 assert!(reason.contains("spec=8"));
845 assert!(reason.contains("verified_pass=4"));
846 }
847
848 #[test]
849 fn read_events_parses_all_known_lines() {
850 let tmp = tempfile::tempdir().unwrap();
851 let path = tmp.path().join("events.jsonl");
852 let mut sink = EventSink::new(&path).unwrap();
853 sink.emit(TeamEvent::daemon_started()).unwrap();
854 sink.emit(TeamEvent::load_snapshot(1, 4, true)).unwrap();
855 sink.emit(TeamEvent::load_snapshot(2, 4, true)).unwrap();
856
857 let events = read_events(&path).unwrap();
858 assert_eq!(events.len(), 3);
859 assert_eq!(events[1].event, "load_snapshot");
860 assert_eq!(events[1].working_members, Some(1));
861 assert_eq!(events[2].total_members, Some(4));
862 }
863
864 #[test]
865 fn event_sink_appends_to_existing_file() {
866 let tmp = tempfile::tempdir().unwrap();
867 let path = tmp.path().join("events.jsonl");
868
869 {
871 let mut sink = EventSink::new(&path).unwrap();
872 sink.emit(TeamEvent::daemon_started()).unwrap();
873 }
874
875 {
877 let mut sink = EventSink::new(&path).unwrap();
878 sink.emit(TeamEvent::daemon_stopped()).unwrap();
879 }
880
881 let content = std::fs::read_to_string(&path).unwrap();
882 let lines: Vec<&str> = content.lines().collect();
883 assert_eq!(lines.len(), 2);
884 assert!(lines[0].contains("daemon_started"));
885 assert!(lines[1].contains("daemon_stopped"));
886 }
887
888 #[test]
889 fn event_with_special_characters_in_task() {
890 let event = TeamEvent::task_assigned("eng-1", "fix: \"quotes\" & <angles> / \\ newline\n");
891 let json = serde_json::to_string(&event).unwrap();
892 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
893 let task_val = parsed["task"].as_str().unwrap();
894 assert!(task_val.contains("\"quotes\""));
895 assert!(task_val.contains("<angles>"));
896 }
897
898 #[test]
899 fn task_escalated_serializes_role_and_task() {
900 let event = TeamEvent::task_escalated("eng-1-1", "42", Some("tests_failed"));
901 let json = serde_json::to_string(&event).unwrap();
902 assert!(json.contains("\"event\":\"task_escalated\""));
903 assert!(json.contains("\"role\":\"eng-1-1\""));
904 assert!(json.contains("\"task\":\"42\""));
905 assert!(json.contains("\"reason\":\"tests_failed\""));
906 }
907
908 #[test]
909 fn cwd_corrected_serializes_role_and_reason() {
910 let event = TeamEvent::cwd_corrected("eng-1-1", "/tmp/worktree");
911 let json = serde_json::to_string(&event).unwrap();
912 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
913 assert_eq!(parsed["event"].as_str().unwrap(), "cwd_corrected");
914 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
915 assert_eq!(parsed["reason"].as_str().unwrap(), "/tmp/worktree");
916 }
917
918 #[test]
919 fn pane_death_serializes_role() {
920 let event = TeamEvent::pane_death("eng-1-1");
921 let json = serde_json::to_string(&event).unwrap();
922 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
923 assert_eq!(parsed["event"].as_str().unwrap(), "pane_death");
924 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
925 }
926
927 #[test]
928 fn pane_respawned_serializes_role() {
929 let event = TeamEvent::pane_respawned("eng-1-1");
930 let json = serde_json::to_string(&event).unwrap();
931 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
932 assert_eq!(parsed["event"].as_str().unwrap(), "pane_respawned");
933 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
934 }
935
936 #[test]
937 fn agent_restarted_includes_reason_task_and_count() {
938 let event = TeamEvent::agent_restarted("eng-1-2", "67", "context_exhausted", 2);
939 let json = serde_json::to_string(&event).unwrap();
940 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
941 assert_eq!(parsed["event"].as_str().unwrap(), "agent_restarted");
942 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
943 assert_eq!(parsed["task"].as_str().unwrap(), "67");
944 assert_eq!(parsed["reason"].as_str().unwrap(), "context_exhausted");
945 assert_eq!(parsed["restart_count"].as_u64().unwrap(), 2);
946 }
947
948 #[test]
949 fn context_pressure_warning_includes_threshold_and_output_bytes() {
950 let event = TeamEvent::context_pressure_warning("eng-1-2", Some(67), 420_000, 512_000);
951 let json = serde_json::to_string(&event).unwrap();
952 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
953
954 assert_eq!(
955 parsed["event"].as_str().unwrap(),
956 "context_pressure_warning"
957 );
958 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
959 assert_eq!(parsed["task"].as_str().unwrap(), "67");
960 assert_eq!(parsed["output_bytes"].as_u64().unwrap(), 420_000);
961 assert_eq!(parsed["reason"].as_str().unwrap(), "threshold_bytes=512000");
962 }
963
964 #[test]
965 fn delivery_failed_includes_role_sender_and_reason() {
966 let event = TeamEvent::delivery_failed("eng-1-2", "manager", "message marker missing");
967 let json = serde_json::to_string(&event).unwrap();
968 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
969 assert_eq!(parsed["event"].as_str().unwrap(), "delivery_failed");
970 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-2");
971 assert_eq!(parsed["from"].as_str().unwrap(), "manager");
972 assert_eq!(parsed["reason"].as_str().unwrap(), "message marker missing");
973 }
974
975 #[test]
976 fn task_unblocked_serializes_role_and_task() {
977 let event = TeamEvent::task_unblocked("eng-1-1", "42");
978 let json = serde_json::to_string(&event).unwrap();
979 assert!(json.contains("\"event\":\"task_unblocked\""));
980 assert!(json.contains("\"role\":\"eng-1-1\""));
981 assert!(json.contains("\"task\":\"42\""));
982 }
983
984 #[test]
985 fn merge_confidence_scored_includes_all_fields() {
986 let event = TeamEvent::merge_confidence_scored(&MergeConfidenceInfo {
987 engineer: "eng-1-1",
988 task: "42",
989 confidence: 0.85,
990 files_changed: 3,
991 lines_changed: 50,
992 has_migrations: true,
993 has_config_changes: false,
994 rename_count: 1,
995 });
996 let json = serde_json::to_string(&event).unwrap();
997 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
998 assert_eq!(parsed["event"].as_str().unwrap(), "merge_confidence_scored");
999 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1-1");
1000 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1001 assert!((parsed["load"].as_f64().unwrap() - 0.85).abs() < 0.001);
1002 let reason = parsed["reason"].as_str().unwrap();
1003 assert!(reason.contains("files=3"));
1004 assert!(reason.contains("lines=50"));
1005 assert!(reason.contains("migrations=true"));
1006 assert!(reason.contains("config=false"));
1007 assert!(reason.contains("renames=1"));
1008 }
1009
1010 #[test]
1011 fn performance_regression_serializes_task_and_reason() {
1012 let event = TeamEvent::performance_regression("42", "runtime_ms=1300 avg_ms=1000 pct=30");
1013 let json = serde_json::to_string(&event).unwrap();
1014 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1015 assert_eq!(parsed["event"].as_str().unwrap(), "performance_regression");
1016 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1017 assert_eq!(
1018 parsed["reason"].as_str().unwrap(),
1019 "runtime_ms=1300 avg_ms=1000 pct=30"
1020 );
1021 }
1022
1023 #[test]
1024 fn daemon_stopped_with_reason_includes_fields() {
1025 let event = TeamEvent::daemon_stopped_with_reason("signal", 7200);
1026 let json = serde_json::to_string(&event).unwrap();
1027 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1028 assert_eq!(parsed["reason"].as_str().unwrap(), "signal");
1029 assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 7200);
1030 }
1031
1032 #[test]
1033 fn heartbeat_includes_uptime() {
1034 let event = TeamEvent::daemon_heartbeat(600);
1035 let json = serde_json::to_string(&event).unwrap();
1036 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1037 assert_eq!(parsed["event"].as_str().unwrap(), "daemon_heartbeat");
1038 assert_eq!(parsed["uptime_secs"].as_u64().unwrap(), 600);
1039 assert!(parsed.get("reason").is_none());
1041 assert!(parsed.get("step").is_none());
1042 }
1043
1044 #[test]
1045 fn context_exhausted_includes_role_task_and_session_size() {
1046 let event = TeamEvent::context_exhausted("eng-1", Some(77), Some(4096));
1047 let json = serde_json::to_string(&event).unwrap();
1048 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1049 assert_eq!(parsed["event"].as_str().unwrap(), "context_exhausted");
1050 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1051 assert_eq!(parsed["task"].as_str().unwrap(), "77");
1052 assert_eq!(parsed["session_size_bytes"].as_u64().unwrap(), 4096);
1053 }
1054
1055 #[test]
1056 fn loop_step_error_includes_step_and_error() {
1057 let event = TeamEvent::loop_step_error("poll_watchers", "connection refused");
1058 let json = serde_json::to_string(&event).unwrap();
1059 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1060 assert_eq!(parsed["step"].as_str().unwrap(), "poll_watchers");
1061 assert_eq!(parsed["error"].as_str().unwrap(), "connection refused");
1062 }
1063
1064 #[test]
1065 fn daemon_panic_includes_reason() {
1066 let event = TeamEvent::daemon_panic("index out of bounds");
1067 let json = serde_json::to_string(&event).unwrap();
1068 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1069 assert_eq!(parsed["event"].as_str().unwrap(), "daemon_panic");
1070 assert_eq!(parsed["reason"].as_str().unwrap(), "index out of bounds");
1071 }
1072
1073 #[test]
1074 fn new_fields_omitted_from_basic_events() {
1075 let event = TeamEvent::daemon_started();
1076 let json = serde_json::to_string(&event).unwrap();
1077 assert!(!json.contains("\"reason\""));
1078 assert!(!json.contains("\"step\""));
1079 assert!(!json.contains("\"error\""));
1080 assert!(!json.contains("\"uptime_secs\""));
1081 assert!(!json.contains("\"restart_count\""));
1082 }
1083
1084 #[test]
1085 fn pattern_detected_includes_reason_payload() {
1086 let event = TeamEvent::pattern_detected("escalation_cluster", 6);
1087 let json = serde_json::to_string(&event).unwrap();
1088 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1089 assert_eq!(parsed["event"].as_str().unwrap(), "pattern_detected");
1090 assert_eq!(parsed["reason"].as_str().unwrap(), "escalation_cluster:6");
1091 }
1092
1093 #[test]
1094 fn event_sink_creates_parent_directories() {
1095 let tmp = tempfile::tempdir().unwrap();
1096 let path = tmp.path().join("deep").join("nested").join("events.jsonl");
1097 let mut sink = EventSink::new(&path).unwrap();
1098 sink.emit(TeamEvent::daemon_started()).unwrap();
1099 assert!(path.exists());
1100 assert_eq!(sink.path(), path);
1101 }
1102
1103 #[test]
1104 fn event_sink_rotates_oversized_log_on_open() {
1105 let tmp = tempfile::tempdir().unwrap();
1106 let path = tmp.path().join("events.jsonl");
1107 fs::write(&path, "0123456789").unwrap();
1108
1109 let mut sink = EventSink::new_with_max_bytes(&path, 5).unwrap();
1110 sink.emit(TeamEvent::daemon_started()).unwrap();
1111
1112 let rotated = rotated_event_log_path(&path);
1113 assert_eq!(fs::read_to_string(&rotated).unwrap(), "0123456789");
1114 let current = fs::read_to_string(&path).unwrap();
1115 assert!(current.contains("daemon_started"));
1116 }
1117
1118 #[test]
1119 fn event_sink_rotates_before_write_that_would_exceed_threshold() {
1120 let tmp = tempfile::tempdir().unwrap();
1121 let path = tmp.path().join("events.jsonl");
1122 let first_line = "{\"event\":\"first\"}\n";
1123 fs::write(&path, first_line).unwrap();
1124
1125 let mut sink = EventSink::new_with_max_bytes(&path, first_line.len() as u64 + 10).unwrap();
1126 sink.emit(TeamEvent::task_assigned(
1127 "eng-1",
1128 "this assignment is long enough to rotate",
1129 ))
1130 .unwrap();
1131
1132 let rotated = rotated_event_log_path(&path);
1133 assert_eq!(fs::read_to_string(&rotated).unwrap(), first_line);
1134 let current = fs::read_to_string(&path).unwrap();
1135 assert!(current.contains("task_assigned"));
1136 assert!(!current.contains("\"event\":\"first\""));
1137 }
1138
1139 #[test]
1140 fn event_round_trip_preserves_fields_for_agent_restarted() {
1141 let original = TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 3);
1142
1143 let json = serde_json::to_string(&original).unwrap();
1144 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1145
1146 assert_eq!(parsed.event, "agent_restarted");
1147 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1148 assert_eq!(parsed.task.as_deref(), Some("42"));
1149 assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1150 assert_eq!(parsed.restart_count, Some(3));
1151 assert_eq!(parsed.ts, original.ts);
1152 }
1153
1154 #[test]
1155 fn event_round_trip_preserves_fields_for_load_snapshot() {
1156 let original = TeamEvent::load_snapshot(4, 8, true);
1157
1158 let json = serde_json::to_string(&original).unwrap();
1159 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1160
1161 assert_eq!(parsed.event, "load_snapshot");
1162 assert_eq!(parsed.working_members, Some(4));
1163 assert_eq!(parsed.total_members, Some(8));
1164 assert_eq!(parsed.session_running, Some(true));
1165 assert_eq!(parsed.load, Some(0.5));
1166 assert_eq!(parsed.ts, original.ts);
1167 }
1168
1169 #[test]
1170 fn event_round_trip_preserves_fields_for_delivery_failed() {
1171 let original = TeamEvent::delivery_failed("eng-2", "manager", "marker missing");
1172
1173 let json = serde_json::to_string(&original).unwrap();
1174 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1175
1176 assert_eq!(parsed.event, "delivery_failed");
1177 assert_eq!(parsed.role.as_deref(), Some("eng-2"));
1178 assert_eq!(parsed.from.as_deref(), Some("manager"));
1179 assert_eq!(parsed.reason.as_deref(), Some("marker missing"));
1180 assert_eq!(parsed.ts, original.ts);
1181 }
1182
1183 #[test]
1184 fn read_events_skips_blank_and_malformed_lines() {
1185 let tmp = tempfile::tempdir().unwrap();
1186 let path = tmp.path().join("events.jsonl");
1187 fs::write(
1188 &path,
1189 [
1190 "",
1191 "{\"event\":\"daemon_started\",\"ts\":1}",
1192 "not-json",
1193 " ",
1194 "{\"event\":\"daemon_stopped\",\"ts\":2}",
1195 ]
1196 .join("\n"),
1197 )
1198 .unwrap();
1199
1200 let events = read_events(&path).unwrap();
1201
1202 assert_eq!(events.len(), 2);
1203 assert_eq!(events[0].event, "daemon_started");
1204 assert_eq!(events[1].event, "daemon_stopped");
1205 }
1206
1207 #[test]
1208 fn rotate_event_log_if_needed_returns_false_for_missing_file() {
1209 let tmp = tempfile::tempdir().unwrap();
1210 let path = tmp.path().join("events.jsonl");
1211
1212 let rotated = rotate_event_log_if_needed(&path, 128, 0).unwrap();
1213
1214 assert!(!rotated);
1215 assert!(!rotated_event_log_path(&path).exists());
1216 }
1217
1218 #[test]
1219 fn rotate_event_log_if_needed_returns_false_for_empty_file() {
1220 let tmp = tempfile::tempdir().unwrap();
1221 let path = tmp.path().join("events.jsonl");
1222 fs::write(&path, "").unwrap();
1223
1224 let rotated = rotate_event_log_if_needed(&path, 1, 1).unwrap();
1225
1226 assert!(!rotated);
1227 assert!(path.exists());
1228 assert!(!rotated_event_log_path(&path).exists());
1229 }
1230
1231 #[test]
1232 fn rotate_event_log_if_needed_replaces_existing_rotated_file() {
1233 let tmp = tempfile::tempdir().unwrap();
1234 let path = tmp.path().join("events.jsonl");
1235 let rotated_path = rotated_event_log_path(&path);
1236 fs::write(&path, "current-events").unwrap();
1237 fs::write(&rotated_path, "old-rotated-events").unwrap();
1238
1239 let rotated = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1240
1241 assert!(rotated);
1242 assert_eq!(fs::read_to_string(&rotated_path).unwrap(), "current-events");
1243 }
1244
1245 #[test]
1246 fn concurrent_event_sinks_append_without_losing_lines() {
1247 let tmp = tempfile::tempdir().unwrap();
1248 let path = Arc::new(tmp.path().join("events.jsonl"));
1249 let ready = Arc::new(std::sync::Barrier::new(5));
1250 let errors = Arc::new(Mutex::new(Vec::<String>::new()));
1251 let mut handles = Vec::new();
1252
1253 for idx in 0..4 {
1254 let path = Arc::clone(&path);
1255 let ready = Arc::clone(&ready);
1256 let errors = Arc::clone(&errors);
1257 handles.push(thread::spawn(move || {
1258 ready.wait();
1259 let result = (|| -> Result<()> {
1260 let mut sink = EventSink::new(&path)?;
1261 sink.emit(TeamEvent::task_assigned(
1262 &format!("eng-{idx}"),
1263 &format!("task-{idx}"),
1264 ))?;
1265 Ok(())
1266 })();
1267 if let Err(error) = result {
1268 errors.lock().unwrap().push(error.to_string());
1269 }
1270 }));
1271 }
1272
1273 ready.wait();
1274 for handle in handles {
1275 handle.join().unwrap();
1276 }
1277
1278 assert!(errors.lock().unwrap().is_empty());
1279 let events = read_events(&path).unwrap();
1280 assert_eq!(events.len(), 4);
1281 for idx in 0..4 {
1282 assert!(
1283 events
1284 .iter()
1285 .any(|event| event.role.as_deref() == Some(&format!("eng-{idx}")))
1286 );
1287 }
1288 }
1289
1290 #[test]
1291 fn read_events_handles_large_log_file() {
1292 let tmp = tempfile::tempdir().unwrap();
1293 let path = tmp.path().join("events.jsonl");
1294 let mut sink = EventSink::new(&path).unwrap();
1295
1296 for idx in 0..512 {
1297 sink.emit(TeamEvent::task_assigned(
1298 &format!("eng-{idx}"),
1299 &"x".repeat(128),
1300 ))
1301 .unwrap();
1302 }
1303
1304 let events = read_events(&path).unwrap();
1305
1306 assert_eq!(events.len(), 512);
1307 assert_eq!(events.first().unwrap().event, "task_assigned");
1308 assert_eq!(events.last().unwrap().event, "task_assigned");
1309 }
1310
1311 fn production_unwrap_expect_count(source: &str) -> usize {
1312 let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1313 &source[..pos]
1314 } else {
1315 source
1316 };
1317 prod.lines()
1318 .filter(|line| {
1319 let trimmed = line.trim();
1320 !trimmed.starts_with("#[cfg(test)]")
1321 && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1322 })
1323 .count()
1324 }
1325
1326 #[test]
1327 fn task_completed_includes_task_id() {
1328 let event = TeamEvent::task_completed("eng-1", Some("42"));
1329 let json = serde_json::to_string(&event).unwrap();
1330 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1331 assert_eq!(parsed["event"].as_str().unwrap(), "task_completed");
1332 assert_eq!(parsed["role"].as_str().unwrap(), "eng-1");
1333 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1334 }
1335
1336 #[test]
1337 fn task_completed_without_task_id_omits_task_field() {
1338 let event = TeamEvent::task_completed("eng-1", None);
1339 let json = serde_json::to_string(&event).unwrap();
1340 assert!(json.contains("\"event\":\"task_completed\""));
1341 assert!(json.contains("\"role\":\"eng-1\""));
1342 assert!(!json.contains("\"task\""));
1343 }
1344
1345 #[test]
1346 fn task_escalated_without_reason_omits_reason_field() {
1347 let event = TeamEvent::task_escalated("eng-1", "42", None);
1348 let json = serde_json::to_string(&event).unwrap();
1349 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1350 assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1351 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1352 assert!(parsed.get("reason").is_none());
1353 }
1354
1355 #[test]
1356 fn task_escalated_with_reason_includes_reason_field() {
1357 let event = TeamEvent::task_escalated("eng-1", "42", Some("merge_conflict"));
1358 let json = serde_json::to_string(&event).unwrap();
1359 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1360 assert_eq!(parsed["event"].as_str().unwrap(), "task_escalated");
1361 assert_eq!(parsed["task"].as_str().unwrap(), "42");
1362 assert_eq!(parsed["reason"].as_str().unwrap(), "merge_conflict");
1363 }
1364
1365 #[test]
1366 fn task_completed_round_trip_preserves_task_id() {
1367 let original = TeamEvent::task_completed("eng-1", Some("99"));
1368 let json = serde_json::to_string(&original).unwrap();
1369 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1370 assert_eq!(parsed.event, "task_completed");
1371 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1372 assert_eq!(parsed.task.as_deref(), Some("99"));
1373 }
1374
1375 #[test]
1376 fn task_escalated_round_trip_preserves_reason() {
1377 let original = TeamEvent::task_escalated("eng-1", "42", Some("context_exhausted"));
1378 let json = serde_json::to_string(&original).unwrap();
1379 let parsed: TeamEvent = serde_json::from_str(&json).unwrap();
1380 assert_eq!(parsed.event, "task_escalated");
1381 assert_eq!(parsed.role.as_deref(), Some("eng-1"));
1382 assert_eq!(parsed.task.as_deref(), Some("42"));
1383 assert_eq!(parsed.reason.as_deref(), Some("context_exhausted"));
1384 }
1385
1386 #[test]
1387 fn production_events_has_no_unwrap_or_expect_calls() {
1388 let src = include_str!("events.rs");
1389 assert_eq!(
1390 production_unwrap_expect_count(src),
1391 0,
1392 "production events.rs should avoid unwrap/expect"
1393 );
1394 }
1395
1396 #[test]
1397 fn stall_detected_event_fields() {
1398 let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1399 assert_eq!(event.event, "stall_detected");
1400 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1401 assert_eq!(event.task.as_deref(), Some("42"));
1402 assert_eq!(event.uptime_secs, Some(300));
1403 }
1404
1405 #[test]
1406 fn stall_detected_event_without_task() {
1407 let event = TeamEvent::stall_detected("eng-1-1", None, 600);
1408 assert_eq!(event.event, "stall_detected");
1409 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1410 assert!(event.task.is_none());
1411 assert_eq!(event.uptime_secs, Some(600));
1412 }
1413
1414 #[test]
1415 fn stall_detected_event_serializes_to_jsonl() {
1416 let event = TeamEvent::stall_detected("eng-1-1", Some(42), 300);
1417 let json = serde_json::to_string(&event).unwrap();
1418 assert!(json.contains("\"stall_detected\""));
1419 assert!(json.contains("\"eng-1-1\""));
1420 assert!(json.contains("\"42\""));
1421 }
1422
1423 #[test]
1424 fn health_changed_event_fields() {
1425 let event = TeamEvent::health_changed("eng-1-1", "healthy→unreachable");
1426 assert_eq!(event.event, "health_changed");
1427 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1428 assert_eq!(event.reason.as_deref(), Some("healthy→unreachable"));
1429 }
1430
1431 #[test]
1432 fn health_changed_event_serializes_to_jsonl() {
1433 let event = TeamEvent::health_changed("eng-1-2", "unreachable→healthy");
1434 let json = serde_json::to_string(&event).unwrap();
1435 assert!(json.contains("\"health_changed\""));
1436 assert!(json.contains("\"eng-1-2\""));
1437 }
1438
1439 #[test]
1442 fn event_sink_on_readonly_dir_returns_error() {
1443 #[cfg(unix)]
1444 {
1445 use std::os::unix::fs::PermissionsExt;
1446 let tmp = tempfile::tempdir().unwrap();
1447 let readonly_dir = tmp.path().join("readonly");
1448 fs::create_dir(&readonly_dir).unwrap();
1449 fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o444)).unwrap();
1450
1451 let path = readonly_dir.join("subdir").join("events.jsonl");
1452 let result = EventSink::new(&path);
1453 assert!(result.is_err());
1454
1455 fs::set_permissions(&readonly_dir, fs::Permissions::from_mode(0o755)).unwrap();
1457 }
1458 }
1459
1460 #[test]
1461 fn read_events_from_nonexistent_file_returns_empty() {
1462 let tmp = tempfile::tempdir().unwrap();
1463 let path = tmp.path().join("does_not_exist.jsonl");
1464 let events = read_events(&path).unwrap();
1465 assert!(events.is_empty());
1466 }
1467
1468 #[test]
1469 fn read_events_all_malformed_lines_returns_empty() {
1470 let tmp = tempfile::tempdir().unwrap();
1471 let path = tmp.path().join("events.jsonl");
1472 fs::write(&path, "not json\nalso not json\n{invalid}\n").unwrap();
1473 let events = read_events(&path).unwrap();
1474 assert!(events.is_empty());
1475 }
1476
1477 #[test]
1478 fn event_sink_emit_with_failing_writer() {
1479 struct FailWriter;
1480 impl Write for FailWriter {
1481 fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
1482 Err(std::io::Error::new(
1483 std::io::ErrorKind::BrokenPipe,
1484 "simulated write failure",
1485 ))
1486 }
1487 fn flush(&mut self) -> std::io::Result<()> {
1488 Err(std::io::Error::new(
1489 std::io::ErrorKind::BrokenPipe,
1490 "simulated flush failure",
1491 ))
1492 }
1493 }
1494
1495 let tmp = tempfile::tempdir().unwrap();
1496 let path = tmp.path().join("events.jsonl");
1497 let mut sink = EventSink::from_writer(&path, FailWriter);
1498 let result = sink.emit(TeamEvent::daemon_started());
1499 assert!(result.is_err());
1500 }
1501
1502 #[test]
1503 fn rotate_event_log_replaces_stale_rotated_file() {
1504 let tmp = tempfile::tempdir().unwrap();
1505 let path = tmp.path().join("events.jsonl");
1506 let rotated = rotated_event_log_path(&path);
1507
1508 fs::write(&path, "current-data-that-is-large").unwrap();
1509 fs::write(&rotated, "old-rotated-data").unwrap();
1510
1511 let did_rotate = rotate_event_log_if_needed(&path, 5, 0).unwrap();
1512 assert!(did_rotate);
1513 assert_eq!(
1515 fs::read_to_string(&rotated).unwrap(),
1516 "current-data-that-is-large"
1517 );
1518 assert!(!path.exists());
1520 }
1521
1522 #[test]
1523 fn event_sink_handles_zero_max_bytes_rotation() {
1524 let tmp = tempfile::tempdir().unwrap();
1525 let path = tmp.path().join("events.jsonl");
1526
1527 fs::write(&path, "").unwrap();
1529 let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1530 assert!(!did_rotate); fs::write(&path, "x").unwrap();
1533 let did_rotate = rotate_event_log_if_needed(&path, 0, 0).unwrap();
1534 assert!(did_rotate); }
1536
1537 #[test]
1538 fn narration_rejection_event_has_correct_fields() {
1539 let event = TeamEvent::narration_rejection("eng-1-1", 42, 2);
1540 assert_eq!(event.event, "narration_rejection");
1541 assert_eq!(event.role.as_deref(), Some("eng-1-1"));
1542 assert_eq!(event.task.as_deref(), Some("42"));
1543 assert_eq!(event.reason.as_deref(), Some("rejection_count=2"));
1544 }
1545
1546 #[test]
1547 fn read_events_partial_json_with_valid_lines_mixed() {
1548 let tmp = tempfile::tempdir().unwrap();
1549 let path = tmp.path().join("events.jsonl");
1550 let content = format!(
1552 "{}\n{{\"event\":\"trunca\n{}\n",
1553 r#"{"event":"daemon_started","ts":1}"#, r#"{"event":"daemon_stopped","ts":3}"#
1554 );
1555 fs::write(&path, content).unwrap();
1556
1557 let events = read_events(&path).unwrap();
1558 assert_eq!(events.len(), 2);
1559 assert_eq!(events[0].event, "daemon_started");
1560 assert_eq!(events[1].event, "daemon_stopped");
1561 }
1562}