Skip to main content

swarm_engine_core/events/
persistence.rs

1//! Action Event Persistence - 行動イベントの永続化
2//!
3//! ActionEvent を JSONL 形式でファイルに書き出す。
4
5use std::io::Write;
6use std::path::PathBuf;
7
8use serde::Serialize;
9use tokio::sync::broadcast;
10
11use super::action::ActionEvent;
12
13/// JSONL 永続化イベント(シリアライズ用)
14#[derive(Debug, Serialize)]
15struct JsonlEvent {
16    tick: u64,
17    worker_id: usize,
18    action: String,
19    target: Option<String>,
20    success: bool,
21    error: Option<String>,
22    duration_ms: u64,
23    #[serde(skip_serializing_if = "Option::is_none")]
24    selection_logic: Option<String>,
25    #[serde(skip_serializing_if = "Option::is_none")]
26    previous_action: Option<String>,
27}
28
29impl From<&ActionEvent> for JsonlEvent {
30    fn from(e: &ActionEvent) -> Self {
31        Self {
32            tick: e.tick,
33            worker_id: e.worker_id.0,
34            action: e.action.clone(),
35            target: e.target.clone(),
36            success: e.result.success,
37            error: e.result.error.clone(),
38            duration_ms: e.duration.as_millis() as u64,
39            selection_logic: e.context.selection_logic.clone(),
40            previous_action: e.context.previous_action.clone(),
41        }
42    }
43}
44
45/// JSONL ファイル永続化
46///
47/// ActionEvent を JSONL 形式でファイルに書き出す。
48/// 非同期で受信し、同期的にファイルに追記。
49pub struct JsonlWriter {
50    rx: broadcast::Receiver<ActionEvent>,
51    path: PathBuf,
52    buffer_lines: usize,
53}
54
55impl JsonlWriter {
56    pub fn new(rx: broadcast::Receiver<ActionEvent>, path: impl Into<PathBuf>) -> Self {
57        Self {
58            rx,
59            path: path.into(),
60            buffer_lines: 0,
61        }
62    }
63
64    /// バッファリング設定
65    pub fn with_buffer(mut self, lines: usize) -> Self {
66        self.buffer_lines = lines;
67        self
68    }
69
70    /// 受信ループを開始(async)
71    pub async fn run(mut self) -> std::io::Result<()> {
72        let file = std::fs::OpenOptions::new()
73            .create(true)
74            .append(true)
75            .open(&self.path)?;
76        let mut writer = std::io::BufWriter::new(file);
77        let mut line_count = 0;
78
79        while let Ok(event) = self.rx.recv().await {
80            let jsonl_event = JsonlEvent::from(&event);
81            if let Ok(json) = serde_json::to_string(&jsonl_event) {
82                writeln!(writer, "{}", json)?;
83                line_count += 1;
84
85                if self.buffer_lines == 0 || line_count >= self.buffer_lines {
86                    writer.flush()?;
87                    line_count = 0;
88                }
89            }
90        }
91
92        writer.flush()?;
93        Ok(())
94    }
95}
96
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::action::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    /// Builds an event for worker 0 with a 50 ms duration and either a
    /// success result or a failure result carrying the message "error".
    fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
        let result = if success {
            ActionEventResult::success()
        } else {
            ActionEventResult::failure("error")
        };

        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(result)
            .duration(Duration::from_millis(50))
            .build()
    }

    /// Sends two events through the channel and checks that each one ends up
    /// as its own JSON line in the output file.
    #[tokio::test]
    async fn test_jsonl_writer() {
        let path =
            std::env::temp_dir().join(format!("test_events_{}.jsonl", std::process::id()));
        // The writer appends, so a leftover file from an earlier aborted run
        // would inflate the line count.
        std::fs::remove_file(&path).ok();

        let (tx, rx) = broadcast::channel::<ActionEvent>(16);
        let writer = JsonlWriter::new(rx, &path);

        let handle = tokio::spawn(async move { writer.run().await });

        tx.send(make_event(1, "CheckStatus", true)).unwrap();
        tx.send(make_event(2, "ReadLogs", false)).unwrap();

        // Dropping the only sender closes the channel. Values already queued are
        // still delivered before the receiver observes the close, so no sleep is
        // needed to let the writer catch up.
        drop(tx);
        handle
            .await
            .expect("writer task panicked")
            .expect("writer returned an I/O error");

        let content = std::fs::read_to_string(&path).unwrap();
        // Clean up before asserting so a failed assertion does not leak the file.
        std::fs::remove_file(&path).ok();

        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["tick"], 1);
        assert_eq!(first["action"], "CheckStatus");
        assert!(first["success"].as_bool().unwrap());
        assert_eq!(first["duration_ms"], 50);

        let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second["tick"], 2);
        assert_eq!(second["action"], "ReadLogs");
        assert!(!second["success"].as_bool().unwrap());
    }
}