Skip to main content

ralph_core/diagnostics/
orchestration.rs

1use serde::{Deserialize, Serialize};
2use std::fs::{File, OpenOptions};
3use std::io::{BufWriter, Write};
4use std::path::Path;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct OrchestrationEntry {
8    pub timestamp: String,
9    pub iteration: u32,
10    pub hat: String,
11    pub event: OrchestrationEvent,
12}
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(tag = "type", rename_all = "snake_case")]
16pub enum OrchestrationEvent {
17    IterationStarted,
18    HatSelected {
19        hat: String,
20        reason: String,
21    },
22    EventPublished {
23        topic: String,
24    },
25    BackpressureTriggered {
26        reason: String,
27    },
28    LoopTerminated {
29        reason: String,
30    },
31    TaskAbandoned {
32        reason: String,
33    },
34    WaveStarted {
35        wave_id: String,
36        expected_total: u32,
37        worker_hat: String,
38        concurrency: u32,
39    },
40    WaveInstanceCompleted {
41        wave_id: String,
42        index: u32,
43        duration_ms: u64,
44        cost_usd: f64,
45    },
46    WaveInstanceFailed {
47        wave_id: String,
48        index: u32,
49        error: String,
50        duration_ms: u64,
51    },
52    WaveCompleted {
53        wave_id: String,
54        total_results: u32,
55        total_failures: u32,
56        timed_out: bool,
57        duration_ms: u64,
58    },
59}
60
61pub struct OrchestrationLogger {
62    writer: BufWriter<File>,
63}
64
65impl OrchestrationLogger {
66    pub fn new(session_dir: &Path) -> std::io::Result<Self> {
67        let file = OpenOptions::new()
68            .create(true)
69            .append(true)
70            .open(session_dir.join("orchestration.jsonl"))?;
71        Ok(Self {
72            writer: BufWriter::new(file),
73        })
74    }
75
76    pub fn log(
77        &mut self,
78        iteration: u32,
79        hat: &str,
80        event: OrchestrationEvent,
81    ) -> std::io::Result<()> {
82        let entry = OrchestrationEntry {
83            timestamp: chrono::Local::now().to_rfc3339(),
84            iteration,
85            hat: hat.to_string(),
86            event,
87        };
88        serde_json::to_writer(&mut self.writer, &entry)?;
89        self.writer.write_all(b"\n")?;
90        self.writer.flush()?;
91        Ok(())
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98    use std::io::{BufRead, BufReader};
99    use tempfile::TempDir;
100
101    #[test]
102    fn test_all_event_types_serialize() {
103        let events = vec![
104            OrchestrationEvent::IterationStarted,
105            OrchestrationEvent::HatSelected {
106                hat: "ralph".to_string(),
107                reason: "pending_events".to_string(),
108            },
109            OrchestrationEvent::EventPublished {
110                topic: "build.start".to_string(),
111            },
112            OrchestrationEvent::BackpressureTriggered {
113                reason: "tests failed".to_string(),
114            },
115            OrchestrationEvent::LoopTerminated {
116                reason: "completion_promise".to_string(),
117            },
118            OrchestrationEvent::TaskAbandoned {
119                reason: "max_iterations".to_string(),
120            },
121            OrchestrationEvent::WaveStarted {
122                wave_id: "w-abc12345".to_string(),
123                expected_total: 3,
124                worker_hat: "reviewer".to_string(),
125                concurrency: 4,
126            },
127            OrchestrationEvent::WaveInstanceCompleted {
128                wave_id: "w-abc12345".to_string(),
129                index: 0,
130                duration_ms: 5000,
131                cost_usd: 0.05,
132            },
133            OrchestrationEvent::WaveInstanceFailed {
134                wave_id: "w-abc12345".to_string(),
135                index: 1,
136                error: "backend timeout".to_string(),
137                duration_ms: 30000,
138            },
139            OrchestrationEvent::WaveCompleted {
140                wave_id: "w-abc12345".to_string(),
141                total_results: 2,
142                total_failures: 1,
143                timed_out: false,
144                duration_ms: 35000,
145            },
146        ];
147
148        for event in events {
149            let json = serde_json::to_string(&event).unwrap();
150            let _: OrchestrationEvent = serde_json::from_str(&json).unwrap();
151        }
152    }
153
154    #[test]
155    fn test_iteration_and_hat_captured() {
156        let temp_dir = TempDir::new().unwrap();
157        let mut logger = OrchestrationLogger::new(temp_dir.path()).unwrap();
158
159        logger
160            .log(
161                5,
162                "builder",
163                OrchestrationEvent::HatSelected {
164                    hat: "builder".to_string(),
165                    reason: "tasks_ready".to_string(),
166                },
167            )
168            .unwrap();
169
170        drop(logger);
171
172        let file = File::open(temp_dir.path().join("orchestration.jsonl")).unwrap();
173        let reader = BufReader::new(file);
174        let line = reader.lines().next().unwrap().unwrap();
175        let entry: OrchestrationEntry = serde_json::from_str(&line).unwrap();
176
177        assert_eq!(entry.iteration, 5);
178        assert_eq!(entry.hat, "builder");
179    }
180
181    #[test]
182    fn test_immediate_flush() {
183        let temp_dir = TempDir::new().unwrap();
184        let mut logger = OrchestrationLogger::new(temp_dir.path()).unwrap();
185
186        logger
187            .log(1, "ralph", OrchestrationEvent::IterationStarted)
188            .unwrap();
189
190        // Don't drop logger - verify file has content immediately
191        let file = File::open(temp_dir.path().join("orchestration.jsonl")).unwrap();
192        let reader = BufReader::new(file);
193        let lines: Vec<_> = reader.lines().collect();
194        assert_eq!(lines.len(), 1);
195    }
196}