use std::sync::Arc;
use crate::learn::episode::EpisodeId;
use crate::learn::learn_model::LearnModel;
use crate::learn::record::Record;
use crate::learn::store::{EpisodeDto, EpisodeStore, RecordStore, RecordStoreError, StoreError};
/// Errors that can be returned by [`DataSink`] operations.
///
/// Store errors are wrapped unchanged; `From` conversions exist for both
/// wrapped types so they can be propagated with `?`.
#[derive(Debug)]
pub enum DataSinkError {
/// Failure reported while appending to the record store.
RecordStore(RecordStoreError),
/// Failure reported while appending to the episode store.
EpisodeStore(StoreError),
/// Free-form error message; its `Display` output is the message itself.
Other(String),
}
impl std::fmt::Display for DataSinkError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RecordStore(e) => write!(f, "RecordStore error: {}", e),
Self::EpisodeStore(e) => write!(f, "EpisodeStore error: {}", e),
Self::Other(msg) => write!(f, "{}", msg),
}
}
}
// Empty impl: relies entirely on the default `std::error::Error` methods,
// so `source()` returns `None` and the wrapped store error is only reachable
// by matching on the variant.
// NOTE(review): if `RecordStoreError` and `StoreError` implement
// `std::error::Error`, consider overriding `source()` to expose them — confirm.
impl std::error::Error for DataSinkError {}
impl From<RecordStoreError> for DataSinkError {
fn from(e: RecordStoreError) -> Self {
Self::RecordStore(e)
}
}
impl From<StoreError> for DataSinkError {
fn from(e: StoreError) -> Self {
Self::EpisodeStore(e)
}
}
/// Appends incoming records to a record store, asks a learn model to build
/// episodes from them, appends those episodes to an episode store, and keeps
/// running totals of both.
pub struct DataSink {
/// Store that every ingested record is appended to.
record_store: Arc<dyn RecordStore>,
/// Store that every built episode (as an `EpisodeDto`) is appended to.
episode_store: Arc<dyn EpisodeStore>,
/// Model used to build episodes from a batch of records; its name is reported in stats.
learn_model: Arc<dyn LearnModel>,
/// Number of records successfully appended through this sink since construction.
record_count: usize,
/// Number of episodes successfully appended through this sink since construction.
episode_count: usize,
}
impl DataSink {
    /// Creates a sink over the given stores and learn model.
    ///
    /// Both the record and episode counters start at zero.
    pub fn new(
        record_store: Arc<dyn RecordStore>,
        episode_store: Arc<dyn EpisodeStore>,
        learn_model: Arc<dyn LearnModel>,
    ) -> Self {
        Self {
            record_count: 0,
            episode_count: 0,
            record_store,
            episode_store,
            learn_model,
        }
    }

    /// Returns the shared handle to the record store.
    pub fn record_store(&self) -> &Arc<dyn RecordStore> {
        &self.record_store
    }

    /// Returns the shared handle to the episode store.
    pub fn episode_store(&self) -> &Arc<dyn EpisodeStore> {
        &self.episode_store
    }

    /// Number of records appended through this sink so far.
    pub fn record_count(&self) -> usize {
        self.record_count
    }

    /// Number of episodes appended through this sink so far.
    pub fn episode_count(&self) -> usize {
        self.episode_count
    }

    /// Stores a batch of records and the episodes the learn model builds from it.
    ///
    /// Every record is appended to the record store first; the whole batch is
    /// then handed to the learn model, and each resulting episode is converted
    /// to an `EpisodeDto` and appended to the episode store.
    ///
    /// Returns the ids of the stored episodes, in the order they were appended.
    /// An empty batch, or a batch that yields no episodes, returns an empty vector.
    ///
    /// # Errors
    ///
    /// Returns [`DataSinkError::RecordStore`] or [`DataSinkError::EpisodeStore`]
    /// as soon as an append fails. Items appended before the failure stay
    /// counted; nothing is rolled back here.
    pub fn ingest(&mut self, records: Vec<Record>) -> Result<Vec<EpisodeId>, DataSinkError> {
        if records.is_empty() {
            return Ok(Vec::new());
        }

        tracing::debug!(count = records.len(), "Ingesting records");
        self.append_records(&records)?;

        let episodes = self.learn_model.build_episodes(&records);
        if episodes.is_empty() {
            tracing::debug!("No episodes built from records");
            return Ok(Vec::new());
        }
        tracing::debug!(count = episodes.len(), "Built episodes from records");

        let mut stored_ids = Vec::with_capacity(episodes.len());
        for episode in episodes {
            let dto = EpisodeDto::from_episode(&episode);
            let episode_id = self.episode_store.append(&dto)?;
            // Count only after the append succeeded.
            self.episode_count += 1;
            stored_ids.push(episode_id);
        }

        tracing::info!(
            records = records.len(),
            episodes = stored_ids.len(),
            total_records = self.record_count,
            total_episodes = self.episode_count,
            "Ingested records and built episodes"
        );
        Ok(stored_ids)
    }

    /// Appends each record to the record store, bumping the record counter
    /// after every successful append and stopping at the first failure.
    fn append_records(&mut self, records: &[Record]) -> Result<(), DataSinkError> {
        for record in records {
            self.record_store.append(record)?;
            self.record_count += 1;
        }
        Ok(())
    }

    /// Returns a snapshot of the current counters together with the learn model's name.
    pub fn stats(&self) -> DataSinkStats {
        let learn_model_name = self.learn_model.name().to_string();
        DataSinkStats {
            record_count: self.record_count,
            episode_count: self.episode_count,
            learn_model_name,
        }
    }
}
/// Point-in-time copy of a [`DataSink`]'s counters, as produced by `DataSink::stats`.
#[derive(Debug, Clone)]
pub struct DataSinkStats {
/// Value of the sink's record counter when the snapshot was taken.
pub record_count: usize,
/// Value of the sink's episode counter when the snapshot was taken.
pub episode_count: usize,
/// Name reported by the sink's learn model, copied into an owned string.
pub learn_model_name: String,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::learn_model::WorkerDecisionSequenceLearn;
    use crate::learn::store::{InMemoryEpisodeStore, InMemoryRecordStore};
    use crate::types::{TaskId, WorkerId};
    use std::time::Duration;

    /// Builds a `Record` from a single action event attached to `task_id`.
    ///
    /// The event always carries a 10 ms duration and a fresh `ActionContext`;
    /// a failed action uses the fixed message `"error"`.
    fn make_action_record_with_task(
        tick: u64,
        worker_id: usize,
        action: &str,
        success: bool,
        task_id: TaskId,
    ) -> Record {
        let outcome = match success {
            true => ActionEventResult::success(),
            false => ActionEventResult::failure("error"),
        };
        let event = ActionEventBuilder::new(tick, WorkerId(worker_id), action)
            .task_id(task_id)
            .result(outcome)
            .duration(Duration::from_millis(10))
            .context(ActionContext::new())
            .build();
        Record::from(&event)
    }

    /// Three successful actions on one task must be stored and yield at least one episode.
    #[test]
    fn test_data_sink_ingest() {
        let records_backend = Arc::new(InMemoryRecordStore::new());
        let episodes_backend = Arc::new(InMemoryEpisodeStore::new());
        let model = Arc::new(WorkerDecisionSequenceLearn::new().with_min_actions(2));
        let mut sink = DataSink::new(records_backend.clone(), episodes_backend.clone(), model);

        let task_id = TaskId::new();
        let batch: Vec<Record> = [(1, "CheckStatus"), (2, "ReadLogs"), (3, "Restart")]
            .iter()
            .map(|&(tick, action)| make_action_record_with_task(tick, 0, action, true, task_id))
            .collect();

        let ids = sink.ingest(batch).unwrap();

        assert!(!ids.is_empty());
        assert_eq!(sink.record_count(), 3);
        assert!(sink.episode_count() > 0);
    }

    /// An empty batch is a no-op: no ids returned and no records counted.
    #[test]
    fn test_data_sink_empty_records() {
        let records_backend = Arc::new(InMemoryRecordStore::new());
        let episodes_backend = Arc::new(InMemoryEpisodeStore::new());
        let model = Arc::new(WorkerDecisionSequenceLearn::new());
        let mut sink = DataSink::new(records_backend, episodes_backend, model);

        let ids = sink.ingest(Vec::new()).unwrap();

        assert!(ids.is_empty());
        assert_eq!(sink.record_count(), 0);
    }
}