swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Action Event Persistence - 行動イベントの永続化
//!
//! ActionEvent を JSONL 形式でファイルに書き出す。

use std::io::Write;
use std::path::PathBuf;

use serde::Serialize;
use tokio::sync::broadcast;

use super::action::ActionEvent;

/// JSONL 永続化イベント(シリアライズ用)
#[derive(Debug, Serialize)]
struct JsonlEvent {
    tick: u64,
    worker_id: usize,
    action: String,
    target: Option<String>,
    success: bool,
    error: Option<String>,
    duration_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    selection_logic: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    previous_action: Option<String>,
}

impl From<&ActionEvent> for JsonlEvent {
    fn from(e: &ActionEvent) -> Self {
        Self {
            tick: e.tick,
            worker_id: e.worker_id.0,
            action: e.action.clone(),
            target: e.target.clone(),
            success: e.result.success,
            error: e.result.error.clone(),
            duration_ms: e.duration.as_millis() as u64,
            selection_logic: e.context.selection_logic.clone(),
            previous_action: e.context.previous_action.clone(),
        }
    }
}

/// JSONL ファイル永続化
///
/// ActionEvent を JSONL 形式でファイルに書き出す。
/// 非同期で受信し、同期的にファイルに追記。
pub struct JsonlWriter {
    rx: broadcast::Receiver<ActionEvent>,
    path: PathBuf,
    buffer_lines: usize,
}

impl JsonlWriter {
    pub fn new(rx: broadcast::Receiver<ActionEvent>, path: impl Into<PathBuf>) -> Self {
        Self {
            rx,
            path: path.into(),
            buffer_lines: 0,
        }
    }

    /// バッファリング設定
    pub fn with_buffer(mut self, lines: usize) -> Self {
        self.buffer_lines = lines;
        self
    }

    /// 受信ループを開始(async)
    pub async fn run(mut self) -> std::io::Result<()> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        let mut writer = std::io::BufWriter::new(file);
        let mut line_count = 0;

        while let Ok(event) = self.rx.recv().await {
            let jsonl_event = JsonlEvent::from(&event);
            if let Ok(json) = serde_json::to_string(&jsonl_event) {
                writeln!(writer, "{}", json)?;
                line_count += 1;

                if self.buffer_lines == 0 || line_count >= self.buffer_lines {
                    writer.flush()?;
                    line_count = 0;
                }
            }
        }

        writer.flush()?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::action::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
        let result = if success {
            ActionEventResult::success()
        } else {
            ActionEventResult::failure("error")
        };

        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(result)
            .duration(Duration::from_millis(50))
            .build()
    }

    #[tokio::test]
    async fn test_jsonl_writer() {
        let temp_dir = std::env::temp_dir();
        let path = temp_dir.join(format!("test_events_{}.jsonl", std::process::id()));

        let (tx, rx) = broadcast::channel::<ActionEvent>(16);
        let writer = JsonlWriter::new(rx, &path);

        let handle = tokio::spawn(async move {
            writer.run().await.unwrap();
        });

        tx.send(make_event(1, "CheckStatus", true)).unwrap();
        tx.send(make_event(2, "ReadLogs", false)).unwrap();

        tokio::time::sleep(Duration::from_millis(10)).await;

        drop(tx);
        let _ = handle.await;

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["tick"], 1);
        assert_eq!(first["action"], "CheckStatus");
        assert!(first["success"].as_bool().unwrap());

        std::fs::remove_file(&path).ok();
    }
}