swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! DataSink - Record/Episode の永続化
//!
//! EventSource から受け取った Record を Store に永続化し、
//! LearnModel を使って Episode を構築する。

use std::sync::Arc;

use crate::learn::episode::EpisodeId;
use crate::learn::learn_model::LearnModel;
use crate::learn::record::Record;
use crate::learn::store::{EpisodeDto, EpisodeStore, RecordStore, RecordStoreError, StoreError};

// ============================================================================
// DataSinkError
// ============================================================================

/// DataSink のエラー型
#[derive(Debug)]
pub enum DataSinkError {
    /// Record Store エラー
    RecordStore(RecordStoreError),
    /// Episode Store エラー
    EpisodeStore(StoreError),
    /// その他
    Other(String),
}

impl std::fmt::Display for DataSinkError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RecordStore(e) => write!(f, "RecordStore error: {}", e),
            Self::EpisodeStore(e) => write!(f, "EpisodeStore error: {}", e),
            Self::Other(msg) => write!(f, "{}", msg),
        }
    }
}

impl std::error::Error for DataSinkError {}

impl From<RecordStoreError> for DataSinkError {
    fn from(e: RecordStoreError) -> Self {
        Self::RecordStore(e)
    }
}

impl From<StoreError> for DataSinkError {
    fn from(e: StoreError) -> Self {
        Self::EpisodeStore(e)
    }
}

// ============================================================================
// DataSink
// ============================================================================

/// Record/Episode の永続化を担当
pub struct DataSink {
    /// Record Store
    record_store: Arc<dyn RecordStore>,
    /// Episode Store
    episode_store: Arc<dyn EpisodeStore>,
    /// LearnModel(Record → Episode 変換用)
    learn_model: Arc<dyn LearnModel>,
    /// 永続化した Record 数
    record_count: usize,
    /// 生成した Episode 数
    episode_count: usize,
}

impl DataSink {
    /// 新しい DataSink を作成
    pub fn new(
        record_store: Arc<dyn RecordStore>,
        episode_store: Arc<dyn EpisodeStore>,
        learn_model: Arc<dyn LearnModel>,
    ) -> Self {
        Self {
            record_store,
            episode_store,
            learn_model,
            record_count: 0,
            episode_count: 0,
        }
    }

    /// Record Store への参照を取得
    pub fn record_store(&self) -> &Arc<dyn RecordStore> {
        &self.record_store
    }

    /// Episode Store への参照を取得
    pub fn episode_store(&self) -> &Arc<dyn EpisodeStore> {
        &self.episode_store
    }

    /// 永続化した Record 数を取得
    pub fn record_count(&self) -> usize {
        self.record_count
    }

    /// 生成した Episode 数を取得
    pub fn episode_count(&self) -> usize {
        self.episode_count
    }

    /// Record を永続化し、Episode を構築
    ///
    /// # Returns
    /// 生成された Episode の ID リスト
    pub fn ingest(&mut self, records: Vec<Record>) -> Result<Vec<EpisodeId>, DataSinkError> {
        if records.is_empty() {
            return Ok(vec![]);
        }

        tracing::debug!(count = records.len(), "Ingesting records");

        // 1. Record を保存
        for record in &records {
            self.record_store.append(record)?;
            self.record_count += 1;
        }

        // 2. LearnModel で Episode 構築
        let episodes = self.learn_model.build_episodes(&records);

        if episodes.is_empty() {
            tracing::debug!("No episodes built from records");
            return Ok(vec![]);
        }

        tracing::debug!(count = episodes.len(), "Built episodes from records");

        // 3. Episode を保存
        let mut ids = Vec::with_capacity(episodes.len());
        for episode in episodes {
            let dto = EpisodeDto::from_episode(&episode);
            let id = self.episode_store.append(&dto)?;
            ids.push(id);
            self.episode_count += 1;
        }

        tracing::info!(
            records = records.len(),
            episodes = ids.len(),
            total_records = self.record_count,
            total_episodes = self.episode_count,
            "Ingested records and built episodes"
        );

        Ok(ids)
    }

    /// 統計情報を取得
    pub fn stats(&self) -> DataSinkStats {
        DataSinkStats {
            record_count: self.record_count,
            episode_count: self.episode_count,
            learn_model_name: self.learn_model.name().to_string(),
        }
    }
}

/// DataSink の統計情報
#[derive(Debug, Clone)]
pub struct DataSinkStats {
    /// 永続化した Record 数
    pub record_count: usize,
    /// 生成した Episode 数
    pub episode_count: usize,
    /// 使用中の LearnModel 名
    pub learn_model_name: String,
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::learn_model::WorkerDecisionSequenceLearn;
    use crate::learn::store::{InMemoryEpisodeStore, InMemoryRecordStore};
    use crate::types::{TaskId, WorkerId};
    use std::time::Duration;

    fn make_action_record_with_task(
        tick: u64,
        worker_id: usize,
        action: &str,
        success: bool,
        task_id: TaskId,
    ) -> Record {
        let result = if success {
            ActionEventResult::success()
        } else {
            ActionEventResult::failure("error")
        };

        let event = ActionEventBuilder::new(tick, WorkerId(worker_id), action)
            .task_id(task_id)
            .result(result)
            .duration(Duration::from_millis(10))
            .context(ActionContext::new())
            .build();

        Record::from(&event)
    }

    #[test]
    fn test_data_sink_ingest() {
        let record_store = Arc::new(InMemoryRecordStore::new());
        let episode_store = Arc::new(InMemoryEpisodeStore::new());
        let learn_model = Arc::new(WorkerDecisionSequenceLearn::new().with_min_actions(2));

        let mut sink = DataSink::new(record_store.clone(), episode_store.clone(), learn_model);

        // All actions belong to the same task
        let task_id = TaskId::new();

        // 成功アクションを投入
        let records = vec![
            make_action_record_with_task(1, 0, "CheckStatus", true, task_id),
            make_action_record_with_task(2, 0, "ReadLogs", true, task_id),
            make_action_record_with_task(3, 0, "Restart", true, task_id),
        ];

        let ids = sink.ingest(records).unwrap();

        // Episode が生成されているはず
        assert!(!ids.is_empty());
        assert_eq!(sink.record_count(), 3);
        assert!(sink.episode_count() > 0);
    }

    #[test]
    fn test_data_sink_empty_records() {
        let record_store = Arc::new(InMemoryRecordStore::new());
        let episode_store = Arc::new(InMemoryEpisodeStore::new());
        let learn_model = Arc::new(WorkerDecisionSequenceLearn::new());

        let mut sink = DataSink::new(record_store, episode_store, learn_model);

        let ids = sink.ingest(vec![]).unwrap();
        assert!(ids.is_empty());
        assert_eq!(sink.record_count(), 0);
    }
}