use std::sync::Arc;
7
8use crate::learn::episode::EpisodeId;
9use crate::learn::learn_model::LearnModel;
10use crate::learn::record::Record;
11use crate::learn::store::{EpisodeDto, EpisodeStore, RecordStore, RecordStoreError, StoreError};
12
/// Errors produced by [`DataSink`] operations.
#[derive(Debug)]
pub enum DataSinkError {
    /// Failure while appending to the underlying record store.
    RecordStore(RecordStoreError),
    /// Failure while appending to the underlying episode store.
    EpisodeStore(StoreError),
    /// Any other failure, described by a free-form message.
    Other(String),
}
27
28impl std::fmt::Display for DataSinkError {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 match self {
31 Self::RecordStore(e) => write!(f, "RecordStore error: {}", e),
32 Self::EpisodeStore(e) => write!(f, "EpisodeStore error: {}", e),
33 Self::Other(msg) => write!(f, "{}", msg),
34 }
35 }
36}
37
// Marker impl so `DataSinkError` can be used behind `dyn Error`; messages come from Display/Debug.
impl std::error::Error for DataSinkError {}
39
40impl From<RecordStoreError> for DataSinkError {
41 fn from(e: RecordStoreError) -> Self {
42 Self::RecordStore(e)
43 }
44}
45
46impl From<StoreError> for DataSinkError {
47 fn from(e: StoreError) -> Self {
48 Self::EpisodeStore(e)
49 }
50}
51
/// Persists incoming records, builds episodes from them via a learn model,
/// and appends the resulting episodes to an episode store.
pub struct DataSink {
    /// Store that raw records are appended to.
    record_store: Arc<dyn RecordStore>,
    /// Store that built episodes are appended to.
    episode_store: Arc<dyn EpisodeStore>,
    /// Model that builds episodes from batches of records.
    learn_model: Arc<dyn LearnModel>,
    /// Running total of records appended through this sink.
    record_count: usize,
    /// Running total of episodes appended through this sink.
    episode_count: usize,
}
69
70impl DataSink {
71 pub fn new(
73 record_store: Arc<dyn RecordStore>,
74 episode_store: Arc<dyn EpisodeStore>,
75 learn_model: Arc<dyn LearnModel>,
76 ) -> Self {
77 Self {
78 record_store,
79 episode_store,
80 learn_model,
81 record_count: 0,
82 episode_count: 0,
83 }
84 }
85
86 pub fn record_store(&self) -> &Arc<dyn RecordStore> {
88 &self.record_store
89 }
90
91 pub fn episode_store(&self) -> &Arc<dyn EpisodeStore> {
93 &self.episode_store
94 }
95
96 pub fn record_count(&self) -> usize {
98 self.record_count
99 }
100
101 pub fn episode_count(&self) -> usize {
103 self.episode_count
104 }
105
106 pub fn ingest(&mut self, records: Vec<Record>) -> Result<Vec<EpisodeId>, DataSinkError> {
111 if records.is_empty() {
112 return Ok(vec![]);
113 }
114
115 tracing::debug!(count = records.len(), "Ingesting records");
116
117 for record in &records {
119 self.record_store.append(record)?;
120 self.record_count += 1;
121 }
122
123 let episodes = self.learn_model.build_episodes(&records);
125
126 if episodes.is_empty() {
127 tracing::debug!("No episodes built from records");
128 return Ok(vec![]);
129 }
130
131 tracing::debug!(count = episodes.len(), "Built episodes from records");
132
133 let mut ids = Vec::with_capacity(episodes.len());
135 for episode in episodes {
136 let dto = EpisodeDto::from_episode(&episode);
137 let id = self.episode_store.append(&dto)?;
138 ids.push(id);
139 self.episode_count += 1;
140 }
141
142 tracing::info!(
143 records = records.len(),
144 episodes = ids.len(),
145 total_records = self.record_count,
146 total_episodes = self.episode_count,
147 "Ingested records and built episodes"
148 );
149
150 Ok(ids)
151 }
152
153 pub fn stats(&self) -> DataSinkStats {
155 DataSinkStats {
156 record_count: self.record_count,
157 episode_count: self.episode_count,
158 learn_model_name: self.learn_model.name().to_string(),
159 }
160 }
161}
162
/// Point-in-time summary of a [`DataSink`]'s activity.
#[derive(Debug, Clone)]
pub struct DataSinkStats {
    /// Total records appended by the sink.
    pub record_count: usize,
    /// Total episodes appended by the sink.
    pub episode_count: usize,
    /// Name reported by the sink's learn model.
    pub learn_model_name: String,
}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::learn_model::WorkerDecisionSequenceLearn;
    use crate::learn::store::{InMemoryEpisodeStore, InMemoryRecordStore};
    use crate::types::{TaskId, WorkerId};
    use std::time::Duration;

    /// Builds a `Record` for a single action event on `task_id`, succeeding
    /// or failing according to `success`.
    fn make_action_record_with_task(
        tick: u64,
        worker_id: usize,
        action: &str,
        success: bool,
        task_id: TaskId,
    ) -> Record {
        let result = match success {
            true => ActionEventResult::success(),
            false => ActionEventResult::failure("error"),
        };

        let event = ActionEventBuilder::new(tick, WorkerId(worker_id), action)
            .task_id(task_id)
            .result(result)
            .duration(Duration::from_millis(10))
            .context(ActionContext::new())
            .build();

        Record::from(&event)
    }

    #[test]
    fn test_data_sink_ingest() {
        let record_store = Arc::new(InMemoryRecordStore::new());
        let episode_store = Arc::new(InMemoryEpisodeStore::new());
        let learn_model = Arc::new(WorkerDecisionSequenceLearn::new().with_min_actions(2));
        let mut sink = DataSink::new(record_store.clone(), episode_store.clone(), learn_model);

        // Three successful actions by one worker on the same task.
        let task_id = TaskId::new();
        let batch = vec![
            make_action_record_with_task(1, 0, "CheckStatus", true, task_id),
            make_action_record_with_task(2, 0, "ReadLogs", true, task_id),
            make_action_record_with_task(3, 0, "Restart", true, task_id),
        ];

        let episode_ids = sink.ingest(batch).unwrap();

        // All records stored, and at least one episode built from them.
        assert!(!episode_ids.is_empty());
        assert_eq!(sink.record_count(), 3);
        assert!(sink.episode_count() > 0);
    }

    #[test]
    fn test_data_sink_empty_records() {
        let record_store = Arc::new(InMemoryRecordStore::new());
        let episode_store = Arc::new(InMemoryEpisodeStore::new());
        let learn_model = Arc::new(WorkerDecisionSequenceLearn::new());
        let mut sink = DataSink::new(record_store, episode_store, learn_model);

        // An empty batch is a no-op: no ids, no counter movement.
        let episode_ids = sink.ingest(vec![]).unwrap();
        assert!(episode_ids.is_empty());
        assert_eq!(sink.record_count(), 0);
    }
}