ralph-core 2.9.3

Core orchestration loop, configuration, and state management for Ralph Orchestrator
Documentation
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::Path;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationEntry {
    pub timestamp: String,
    pub iteration: u32,
    pub hat: String,
    pub event: OrchestrationEvent,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OrchestrationEvent {
    IterationStarted,
    HatSelected {
        hat: String,
        reason: String,
    },
    EventPublished {
        topic: String,
    },
    BackpressureTriggered {
        reason: String,
    },
    LoopTerminated {
        reason: String,
    },
    TaskAbandoned {
        reason: String,
    },
    WaveStarted {
        wave_id: String,
        expected_total: u32,
        worker_hat: String,
        concurrency: u32,
    },
    WaveInstanceCompleted {
        wave_id: String,
        index: u32,
        duration_ms: u64,
        cost_usd: f64,
    },
    WaveInstanceFailed {
        wave_id: String,
        index: u32,
        error: String,
        duration_ms: u64,
    },
    WaveCompleted {
        wave_id: String,
        total_results: u32,
        total_failures: u32,
        timed_out: bool,
        duration_ms: u64,
    },
}

pub struct OrchestrationLogger {
    writer: BufWriter<File>,
}

impl OrchestrationLogger {
    pub fn new(session_dir: &Path) -> std::io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(session_dir.join("orchestration.jsonl"))?;
        Ok(Self {
            writer: BufWriter::new(file),
        })
    }

    pub fn log(
        &mut self,
        iteration: u32,
        hat: &str,
        event: OrchestrationEvent,
    ) -> std::io::Result<()> {
        let entry = OrchestrationEntry {
            timestamp: chrono::Local::now().to_rfc3339(),
            iteration,
            hat: hat.to_string(),
            event,
        };
        serde_json::to_writer(&mut self.writer, &entry)?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{BufRead, BufReader};
    use tempfile::TempDir;

    #[test]
    fn test_all_event_types_serialize() {
        let events = vec![
            OrchestrationEvent::IterationStarted,
            OrchestrationEvent::HatSelected {
                hat: "ralph".to_string(),
                reason: "pending_events".to_string(),
            },
            OrchestrationEvent::EventPublished {
                topic: "build.start".to_string(),
            },
            OrchestrationEvent::BackpressureTriggered {
                reason: "tests failed".to_string(),
            },
            OrchestrationEvent::LoopTerminated {
                reason: "completion_promise".to_string(),
            },
            OrchestrationEvent::TaskAbandoned {
                reason: "max_iterations".to_string(),
            },
            OrchestrationEvent::WaveStarted {
                wave_id: "w-abc12345".to_string(),
                expected_total: 3,
                worker_hat: "reviewer".to_string(),
                concurrency: 4,
            },
            OrchestrationEvent::WaveInstanceCompleted {
                wave_id: "w-abc12345".to_string(),
                index: 0,
                duration_ms: 5000,
                cost_usd: 0.05,
            },
            OrchestrationEvent::WaveInstanceFailed {
                wave_id: "w-abc12345".to_string(),
                index: 1,
                error: "backend timeout".to_string(),
                duration_ms: 30000,
            },
            OrchestrationEvent::WaveCompleted {
                wave_id: "w-abc12345".to_string(),
                total_results: 2,
                total_failures: 1,
                timed_out: false,
                duration_ms: 35000,
            },
        ];

        for event in events {
            let json = serde_json::to_string(&event).unwrap();
            let _: OrchestrationEvent = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn test_iteration_and_hat_captured() {
        let temp_dir = TempDir::new().unwrap();
        let mut logger = OrchestrationLogger::new(temp_dir.path()).unwrap();

        logger
            .log(
                5,
                "builder",
                OrchestrationEvent::HatSelected {
                    hat: "builder".to_string(),
                    reason: "tasks_ready".to_string(),
                },
            )
            .unwrap();

        drop(logger);

        let file = File::open(temp_dir.path().join("orchestration.jsonl")).unwrap();
        let reader = BufReader::new(file);
        let line = reader.lines().next().unwrap().unwrap();
        let entry: OrchestrationEntry = serde_json::from_str(&line).unwrap();

        assert_eq!(entry.iteration, 5);
        assert_eq!(entry.hat, "builder");
    }

    #[test]
    fn test_immediate_flush() {
        let temp_dir = TempDir::new().unwrap();
        let mut logger = OrchestrationLogger::new(temp_dir.path()).unwrap();

        logger
            .log(1, "ralph", OrchestrationEvent::IterationStarted)
            .unwrap();

        // Don't drop logger - verify file has content immediately
        let file = File::open(temp_dir.path().join("orchestration.jsonl")).unwrap();
        let reader = BufReader::new(file);
        let lines: Vec<_> = reader.lines().collect();
        assert_eq!(lines.len(), 1);
    }
}