1use serde::{Deserialize, Serialize};
2use std::fs::{File, OpenOptions};
3use std::io::{BufWriter, Write};
4use std::path::Path;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct OrchestrationEntry {
8 pub timestamp: String,
9 pub iteration: u32,
10 pub hat: String,
11 pub event: OrchestrationEvent,
12}
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(tag = "type", rename_all = "snake_case")]
16pub enum OrchestrationEvent {
17 IterationStarted,
18 HatSelected {
19 hat: String,
20 reason: String,
21 },
22 EventPublished {
23 topic: String,
24 },
25 BackpressureTriggered {
26 reason: String,
27 },
28 LoopTerminated {
29 reason: String,
30 },
31 TaskAbandoned {
32 reason: String,
33 },
34 WaveStarted {
35 wave_id: String,
36 expected_total: u32,
37 worker_hat: String,
38 concurrency: u32,
39 },
40 WaveInstanceCompleted {
41 wave_id: String,
42 index: u32,
43 duration_ms: u64,
44 cost_usd: f64,
45 },
46 WaveInstanceFailed {
47 wave_id: String,
48 index: u32,
49 error: String,
50 duration_ms: u64,
51 },
52 WaveCompleted {
53 wave_id: String,
54 total_results: u32,
55 total_failures: u32,
56 timed_out: bool,
57 duration_ms: u64,
58 },
59}
60
61pub struct OrchestrationLogger {
62 writer: BufWriter<File>,
63}
64
65impl OrchestrationLogger {
66 pub fn new(session_dir: &Path) -> std::io::Result<Self> {
67 let file = OpenOptions::new()
68 .create(true)
69 .append(true)
70 .open(session_dir.join("orchestration.jsonl"))?;
71 Ok(Self {
72 writer: BufWriter::new(file),
73 })
74 }
75
76 pub fn log(
77 &mut self,
78 iteration: u32,
79 hat: &str,
80 event: OrchestrationEvent,
81 ) -> std::io::Result<()> {
82 let entry = OrchestrationEntry {
83 timestamp: chrono::Local::now().to_rfc3339(),
84 iteration,
85 hat: hat.to_string(),
86 event,
87 };
88 serde_json::to_writer(&mut self.writer, &entry)?;
89 self.writer.write_all(b"\n")?;
90 self.writer.flush()?;
91 Ok(())
92 }
93}
94
95#[cfg(test)]
96mod tests {
97 use super::*;
98 use std::io::{BufRead, BufReader};
99 use tempfile::TempDir;
100
101 #[test]
102 fn test_all_event_types_serialize() {
103 let events = vec![
104 OrchestrationEvent::IterationStarted,
105 OrchestrationEvent::HatSelected {
106 hat: "ralph".to_string(),
107 reason: "pending_events".to_string(),
108 },
109 OrchestrationEvent::EventPublished {
110 topic: "build.start".to_string(),
111 },
112 OrchestrationEvent::BackpressureTriggered {
113 reason: "tests failed".to_string(),
114 },
115 OrchestrationEvent::LoopTerminated {
116 reason: "completion_promise".to_string(),
117 },
118 OrchestrationEvent::TaskAbandoned {
119 reason: "max_iterations".to_string(),
120 },
121 OrchestrationEvent::WaveStarted {
122 wave_id: "w-abc12345".to_string(),
123 expected_total: 3,
124 worker_hat: "reviewer".to_string(),
125 concurrency: 4,
126 },
127 OrchestrationEvent::WaveInstanceCompleted {
128 wave_id: "w-abc12345".to_string(),
129 index: 0,
130 duration_ms: 5000,
131 cost_usd: 0.05,
132 },
133 OrchestrationEvent::WaveInstanceFailed {
134 wave_id: "w-abc12345".to_string(),
135 index: 1,
136 error: "backend timeout".to_string(),
137 duration_ms: 30000,
138 },
139 OrchestrationEvent::WaveCompleted {
140 wave_id: "w-abc12345".to_string(),
141 total_results: 2,
142 total_failures: 1,
143 timed_out: false,
144 duration_ms: 35000,
145 },
146 ];
147
148 for event in events {
149 let json = serde_json::to_string(&event).unwrap();
150 let _: OrchestrationEvent = serde_json::from_str(&json).unwrap();
151 }
152 }
153
154 #[test]
155 fn test_iteration_and_hat_captured() {
156 let temp_dir = TempDir::new().unwrap();
157 let mut logger = OrchestrationLogger::new(temp_dir.path()).unwrap();
158
159 logger
160 .log(
161 5,
162 "builder",
163 OrchestrationEvent::HatSelected {
164 hat: "builder".to_string(),
165 reason: "tasks_ready".to_string(),
166 },
167 )
168 .unwrap();
169
170 drop(logger);
171
172 let file = File::open(temp_dir.path().join("orchestration.jsonl")).unwrap();
173 let reader = BufReader::new(file);
174 let line = reader.lines().next().unwrap().unwrap();
175 let entry: OrchestrationEntry = serde_json::from_str(&line).unwrap();
176
177 assert_eq!(entry.iteration, 5);
178 assert_eq!(entry.hat, "builder");
179 }
180
181 #[test]
182 fn test_immediate_flush() {
183 let temp_dir = TempDir::new().unwrap();
184 let mut logger = OrchestrationLogger::new(temp_dir.path()).unwrap();
185
186 logger
187 .log(1, "ralph", OrchestrationEvent::IterationStarted)
188 .unwrap();
189
190 let file = File::open(temp_dir.path().join("orchestration.jsonl")).unwrap();
192 let reader = BufReader::new(file);
193 let lines: Vec<_> = reader.lines().collect();
194 assert_eq!(lines.len(), 1);
195 }
196}