// swarm_engine_core/learn/daemon/sink.rs

//! DataSink - persistence of Records and Episodes.
//!
//! Persists the Records received from an EventSource into a Store and
//! builds Episodes from them using a LearnModel.
5
6use std::sync::Arc;
7
8use crate::learn::episode::EpisodeId;
9use crate::learn::learn_model::LearnModel;
10use crate::learn::record::Record;
11use crate::learn::store::{EpisodeDto, EpisodeStore, RecordStore, RecordStoreError, StoreError};
12
13// ============================================================================
14// DataSinkError
15// ============================================================================
16
17/// DataSink のエラー型
18#[derive(Debug)]
19pub enum DataSinkError {
20    /// Record Store エラー
21    RecordStore(RecordStoreError),
22    /// Episode Store エラー
23    EpisodeStore(StoreError),
24    /// その他
25    Other(String),
26}
27
28impl std::fmt::Display for DataSinkError {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        match self {
31            Self::RecordStore(e) => write!(f, "RecordStore error: {}", e),
32            Self::EpisodeStore(e) => write!(f, "EpisodeStore error: {}", e),
33            Self::Other(msg) => write!(f, "{}", msg),
34        }
35    }
36}
37
38impl std::error::Error for DataSinkError {}
39
40impl From<RecordStoreError> for DataSinkError {
41    fn from(e: RecordStoreError) -> Self {
42        Self::RecordStore(e)
43    }
44}
45
46impl From<StoreError> for DataSinkError {
47    fn from(e: StoreError) -> Self {
48        Self::EpisodeStore(e)
49    }
50}
51
52// ============================================================================
53// DataSink
54// ============================================================================
55
56/// Record/Episode の永続化を担当
57pub struct DataSink {
58    /// Record Store
59    record_store: Arc<dyn RecordStore>,
60    /// Episode Store
61    episode_store: Arc<dyn EpisodeStore>,
62    /// LearnModel(Record → Episode 変換用)
63    learn_model: Arc<dyn LearnModel>,
64    /// 永続化した Record 数
65    record_count: usize,
66    /// 生成した Episode 数
67    episode_count: usize,
68}
69
70impl DataSink {
71    /// 新しい DataSink を作成
72    pub fn new(
73        record_store: Arc<dyn RecordStore>,
74        episode_store: Arc<dyn EpisodeStore>,
75        learn_model: Arc<dyn LearnModel>,
76    ) -> Self {
77        Self {
78            record_store,
79            episode_store,
80            learn_model,
81            record_count: 0,
82            episode_count: 0,
83        }
84    }
85
86    /// Record Store への参照を取得
87    pub fn record_store(&self) -> &Arc<dyn RecordStore> {
88        &self.record_store
89    }
90
91    /// Episode Store への参照を取得
92    pub fn episode_store(&self) -> &Arc<dyn EpisodeStore> {
93        &self.episode_store
94    }
95
96    /// 永続化した Record 数を取得
97    pub fn record_count(&self) -> usize {
98        self.record_count
99    }
100
101    /// 生成した Episode 数を取得
102    pub fn episode_count(&self) -> usize {
103        self.episode_count
104    }
105
106    /// Record を永続化し、Episode を構築
107    ///
108    /// # Returns
109    /// 生成された Episode の ID リスト
110    pub fn ingest(&mut self, records: Vec<Record>) -> Result<Vec<EpisodeId>, DataSinkError> {
111        if records.is_empty() {
112            return Ok(vec![]);
113        }
114
115        tracing::debug!(count = records.len(), "Ingesting records");
116
117        // 1. Record を保存
118        for record in &records {
119            self.record_store.append(record)?;
120            self.record_count += 1;
121        }
122
123        // 2. LearnModel で Episode 構築
124        let episodes = self.learn_model.build_episodes(&records);
125
126        if episodes.is_empty() {
127            tracing::debug!("No episodes built from records");
128            return Ok(vec![]);
129        }
130
131        tracing::debug!(count = episodes.len(), "Built episodes from records");
132
133        // 3. Episode を保存
134        let mut ids = Vec::with_capacity(episodes.len());
135        for episode in episodes {
136            let dto = EpisodeDto::from_episode(&episode);
137            let id = self.episode_store.append(&dto)?;
138            ids.push(id);
139            self.episode_count += 1;
140        }
141
142        tracing::info!(
143            records = records.len(),
144            episodes = ids.len(),
145            total_records = self.record_count,
146            total_episodes = self.episode_count,
147            "Ingested records and built episodes"
148        );
149
150        Ok(ids)
151    }
152
153    /// 統計情報を取得
154    pub fn stats(&self) -> DataSinkStats {
155        DataSinkStats {
156            record_count: self.record_count,
157            episode_count: self.episode_count,
158            learn_model_name: self.learn_model.name().to_string(),
159        }
160    }
161}
162
/// Statistics snapshot of a `DataSink`.
///
/// This is plain data, so it derives equality (to compare snapshots directly,
/// e.g. in assertions) and `Default` (an all-zero snapshot with an empty
/// model name).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DataSinkStats {
    /// Number of Records persisted.
    pub record_count: usize,
    /// Number of Episodes generated.
    pub episode_count: usize,
    /// Name of the LearnModel in use.
    pub learn_model_name: String,
}
173
174// ============================================================================
175// Tests
176// ============================================================================
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::learn_model::WorkerDecisionSequenceLearn;
    use crate::learn::store::{InMemoryEpisodeStore, InMemoryRecordStore};
    use crate::types::WorkerId;
    use std::time::Duration;

    /// Builds a Record from one action event with a fixed 10 ms duration and
    /// an empty context; `success` selects a success or failure result.
    fn make_action_record(tick: u64, worker_id: usize, action: &str, success: bool) -> Record {
        let outcome = match success {
            true => ActionEventResult::success(),
            false => ActionEventResult::failure("error"),
        };

        let event = ActionEventBuilder::new(tick, WorkerId(worker_id), action)
            .result(outcome)
            .duration(Duration::from_millis(10))
            .context(ActionContext::new())
            .build();

        Record::from(&event)
    }

    /// Three successful actions from one worker must be counted as three
    /// Records and yield at least one Episode.
    #[test]
    fn test_data_sink_ingest() {
        let mut sink = DataSink::new(
            Arc::new(InMemoryRecordStore::new()),
            Arc::new(InMemoryEpisodeStore::new()),
            Arc::new(WorkerDecisionSequenceLearn::new().with_min_actions(2)),
        );

        // Feed a run of successful actions.
        let batch = vec![
            make_action_record(1, 0, "CheckStatus", true),
            make_action_record(2, 0, "ReadLogs", true),
            make_action_record(3, 0, "Restart", true),
        ];

        let episode_ids = sink.ingest(batch).unwrap();

        // At least one Episode should have been generated.
        assert!(!episode_ids.is_empty());
        assert_eq!(sink.record_count(), 3);
        assert!(sink.episode_count() > 0);
    }

    /// An empty batch produces no Episodes and leaves the record counter at zero.
    #[test]
    fn test_data_sink_empty_records() {
        let mut sink = DataSink::new(
            Arc::new(InMemoryRecordStore::new()),
            Arc::new(InMemoryEpisodeStore::new()),
            Arc::new(WorkerDecisionSequenceLearn::new()),
        );

        let episode_ids = sink.ingest(Vec::new()).unwrap();

        assert!(episode_ids.is_empty());
        assert_eq!(sink.record_count(), 0);
    }
}
238}