use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use serde::{Deserialize, Serialize};
use super::record_store::RecordId;
use crate::error::SwarmError;
use crate::learn::episode::{Episode, EpisodeId, EpisodeMetadata, Outcome};
use crate::learn::record::{ActionRecord, LlmCallRecord};
/// Errors that episode stores can produce.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// Filesystem failure while reading or writing the backing file.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// An episode could not be (de)serialized to/from JSON.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// Lookup failed; payload is the requested episode id.
#[error("Episode not found: {0}")]
NotFound(String),
/// Catch-all for remaining failures (currently poisoned locks).
#[error("Store error: {0}")]
Other(String),
}
/// Lets callers use `?` to convert store failures into the crate-wide error type.
impl From<StoreError> for SwarmError {
fn from(e: StoreError) -> Self {
// NOTE(review): routed through `SwarmError::config` for lack of a more
// specific category — confirm this classification is intended.
SwarmError::config(e.to_string())
}
}
/// Serializable snapshot of an episode for persistence.
///
/// Record payloads are not embedded; `record_ids` references them in the
/// record store instead (and starts empty in `from_episode`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpisodeDto {
/// Unique identifier of the episode.
pub id: EpisodeId,
/// Name of the learn model that produced the episode.
pub learn_model: String,
/// Final outcome (success / failure / timeout / unknown).
pub outcome: Outcome,
/// Creation time, scenario name and other bookkeeping.
pub metadata: EpisodeMetadata,
/// Ids of the records belonging to this episode.
pub record_ids: Vec<RecordId>,
}
impl EpisodeDto {
    /// Builds a DTO snapshot of `episode`.
    ///
    /// `record_ids` starts out empty; attach ids afterwards with
    /// [`Self::with_record_ids`].
    pub fn from_episode(episode: &Episode) -> Self {
        Self {
            record_ids: Vec::new(),
            id: episode.id.clone(),
            learn_model: episode.learn_model.clone(),
            metadata: episode.metadata.clone(),
            outcome: episode.outcome.clone(),
        }
    }

    /// Consumes the DTO and returns it with the given record ids attached.
    pub fn with_record_ids(self, ids: Vec<RecordId>) -> Self {
        Self {
            record_ids: ids,
            ..self
        }
    }
}
/// Declarative query over stored episodes; unset fields match everything.
/// `limit`/`offset` are paging controls applied by the stores' `query`
/// methods, not part of the per-episode match.
#[derive(Debug, Clone, Default)]
pub struct EpisodeFilter {
/// Match only this learn model.
pub learn_model: Option<String>,
/// Match only this scenario name.
pub scenario_name: Option<String>,
/// Match only outcomes accepted by this predicate.
pub outcome_filter: Option<OutcomeFilter>,
/// Inclusive lower bound on `metadata.created_at` (ms).
pub since: Option<u64>,
/// Inclusive upper bound on `metadata.created_at` (ms).
pub until: Option<u64>,
/// Match only episodes from this worker. Note: only checkable on full
/// episodes; DTO matching ignores it.
pub worker_id: Option<usize>,
/// Maximum number of query results.
pub limit: Option<usize>,
/// Number of matching results to skip before returning any.
pub offset: Option<usize>,
}
impl EpisodeFilter {
pub fn new() -> Self {
Self::default()
}
pub fn learn_model(mut self, name: impl Into<String>) -> Self {
self.learn_model = Some(name.into());
self
}
pub fn strategy(self, name: impl Into<String>) -> Self {
self.learn_model(name)
}
pub fn scenario(mut self, name: impl Into<String>) -> Self {
self.scenario_name = Some(name.into());
self
}
pub fn outcome(mut self, filter: OutcomeFilter) -> Self {
self.outcome_filter = Some(filter);
self
}
pub fn since(mut self, timestamp_ms: u64) -> Self {
self.since = Some(timestamp_ms);
self
}
pub fn until(mut self, timestamp_ms: u64) -> Self {
self.until = Some(timestamp_ms);
self
}
pub fn worker_id(mut self, id: usize) -> Self {
self.worker_id = Some(id);
self
}
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub fn offset(mut self, offset: usize) -> Self {
self.offset = Some(offset);
self
}
pub fn matches(&self, episode: &Episode) -> bool {
if let Some(ref name) = self.learn_model {
if &episode.learn_model != name {
return false;
}
}
if let Some(ref name) = self.scenario_name {
if episode.metadata.scenario_name.as_ref() != Some(name) {
return false;
}
}
if let Some(ref outcome_filter) = self.outcome_filter {
if !outcome_filter.matches(&episode.outcome) {
return false;
}
}
if let Some(since) = self.since {
if episode.metadata.created_at < since {
return false;
}
}
if let Some(until) = self.until {
if episode.metadata.created_at > until {
return false;
}
}
if let Some(worker_id) = self.worker_id {
if episode.worker_id() != Some(worker_id) {
return false;
}
}
true
}
pub fn matches_dto(&self, dto: &EpisodeDto) -> bool {
if let Some(ref name) = self.learn_model {
if &dto.learn_model != name {
return false;
}
}
if let Some(ref name) = self.scenario_name {
if dto.metadata.scenario_name.as_ref() != Some(name) {
return false;
}
}
if let Some(ref outcome_filter) = self.outcome_filter {
if !outcome_filter.matches(&dto.outcome) {
return false;
}
}
if let Some(since) = self.since {
if dto.metadata.created_at < since {
return false;
}
}
if let Some(until) = self.until {
if dto.metadata.created_at > until {
return false;
}
}
true
}
}
/// Predicate over an episode's outcome, used by `EpisodeFilter`.
#[derive(Debug, Clone)]
pub enum OutcomeFilter {
/// Accept only successful outcomes.
SuccessOnly,
/// Accept only failed outcomes.
FailureOnly,
/// Accept outcomes whose score is >= the threshold (inclusive).
ScoreAbove(f64),
/// Accept outcomes whose score is <= the threshold (inclusive).
ScoreBelow(f64),
}
impl OutcomeFilter {
/// Returns `true` when `outcome` satisfies this predicate.
pub fn matches(&self, outcome: &Outcome) -> bool {
match self {
Self::SuccessOnly => outcome.is_success(),
Self::FailureOnly => outcome.is_failure(),
// NOTE(review): both comparisons are inclusive despite the
// "Above"/"Below" names — confirm callers expect >= / <=.
Self::ScoreAbove(threshold) => outcome.score() >= *threshold,
Self::ScoreBelow(threshold) => outcome.score() <= *threshold,
}
}
}
/// Lightweight, serializable summary of an episode for listings; cheaper to
/// produce and ship than a full `EpisodeDto`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EpisodeMeta {
/// Episode identifier.
pub id: EpisodeId,
/// Learn model that produced the episode.
pub learn_model: String,
/// Outcome label: "success", "failure", "timeout" or "unknown".
pub outcome_type: String,
/// Numeric outcome score.
pub score: f64,
/// Creation timestamp (ms).
pub created_at: u64,
/// Scenario name, if recorded.
pub scenario_name: Option<String>,
/// Number of `ActionRecord`s (0 when built from a DTO).
pub action_count: usize,
/// Number of `LlmCallRecord`s (0 when built from a DTO).
pub llm_call_count: usize,
}
impl From<&Episode> for EpisodeMeta {
fn from(ep: &Episode) -> Self {
Self {
id: ep.id.clone(),
learn_model: ep.learn_model.clone(),
outcome_type: match &ep.outcome {
Outcome::Success { .. } => "success".to_string(),
Outcome::Failure { .. } => "failure".to_string(),
Outcome::Timeout { .. } => "timeout".to_string(),
Outcome::Unknown => "unknown".to_string(),
},
score: ep.outcome.score(),
created_at: ep.metadata.created_at,
scenario_name: ep.metadata.scenario_name.clone(),
action_count: ep.context.iter::<ActionRecord>().count(),
llm_call_count: ep.context.iter::<LlmCallRecord>().count(),
}
}
}
impl From<&EpisodeDto> for EpisodeMeta {
fn from(dto: &EpisodeDto) -> Self {
Self {
id: dto.id.clone(),
learn_model: dto.learn_model.clone(),
outcome_type: match &dto.outcome {
Outcome::Success { .. } => "success".to_string(),
Outcome::Failure { .. } => "failure".to_string(),
Outcome::Timeout { .. } => "timeout".to_string(),
Outcome::Unknown => "unknown".to_string(),
},
score: dto.outcome.score(),
created_at: dto.metadata.created_at,
scenario_name: dto.metadata.scenario_name.clone(),
action_count: 0,
llm_call_count: 0,
}
}
}
/// Common interface for episode persistence backends (in-memory,
/// file-backed, ...). Implementations must be shareable across threads.
pub trait EpisodeStore: Send + Sync {
/// Persists `dto` and returns its id.
fn append(&self, dto: &EpisodeDto) -> Result<EpisodeId, StoreError>;
/// Fetches a single episode by id; `Ok(None)` when absent.
fn get(&self, id: &EpisodeId) -> Result<Option<EpisodeDto>, StoreError>;
/// Returns episodes matching `filter`, honoring its offset/limit paging.
fn query(&self, filter: &EpisodeFilter) -> Result<Vec<EpisodeDto>, StoreError>;
/// Counts episodes, optionally restricted by `filter` (paging ignored).
fn count(&self, filter: Option<&EpisodeFilter>) -> Result<usize, StoreError>;
/// Lists lightweight summaries, optionally restricted by `filter`.
fn list_meta(&self, filter: Option<&EpisodeFilter>) -> Result<Vec<EpisodeMeta>, StoreError>;
}
/// Volatile store keeping episodes in a `Vec` behind a `RwLock`; contents
/// are lost when the process exits. Suitable for tests and ephemeral runs.
pub struct InMemoryEpisodeStore {
// Append-only list; RwLock allows concurrent readers.
episodes: RwLock<Vec<EpisodeDto>>,
}
impl InMemoryEpisodeStore {
/// Creates an empty store.
pub fn new() -> Self {
Self {
episodes: RwLock::new(Vec::new()),
}
}
}
impl Default for InMemoryEpisodeStore {
fn default() -> Self {
Self::new()
}
}
impl EpisodeStore for InMemoryEpisodeStore {
    /// Appends a clone of `dto` and returns its id.
    fn append(&self, dto: &EpisodeDto) -> Result<EpisodeId, StoreError> {
        let mut guard = self
            .episodes
            .write()
            .map_err(|_| StoreError::Other("Lock error".into()))?;
        guard.push(dto.clone());
        Ok(dto.id.clone())
    }

    /// Linear scan for the episode with the given id.
    fn get(&self, id: &EpisodeId) -> Result<Option<EpisodeDto>, StoreError> {
        let guard = self
            .episodes
            .read()
            .map_err(|_| StoreError::Other("Lock error".into()))?;
        Ok(guard.iter().find(|e| &e.id == id).cloned())
    }

    /// Returns matching episodes with the filter's offset/limit paging applied.
    fn query(&self, filter: &EpisodeFilter) -> Result<Vec<EpisodeDto>, StoreError> {
        let guard = self
            .episodes
            .read()
            .map_err(|_| StoreError::Other("Lock error".into()))?;
        // Apply skip/take lazily so only episodes that survive paging are
        // cloned (previously every match was cloned and then discarded).
        let result = guard
            .iter()
            .filter(|e| filter.matches_dto(e))
            .skip(filter.offset.unwrap_or(0))
            .take(filter.limit.unwrap_or(usize::MAX))
            .cloned()
            .collect();
        Ok(result)
    }

    /// Counts episodes, optionally restricted by `filter` (paging ignored).
    fn count(&self, filter: Option<&EpisodeFilter>) -> Result<usize, StoreError> {
        let guard = self
            .episodes
            .read()
            .map_err(|_| StoreError::Other("Lock error".into()))?;
        let count = match filter {
            Some(f) => guard.iter().filter(|e| f.matches_dto(e)).count(),
            None => guard.len(),
        };
        Ok(count)
    }

    /// Lists lightweight summaries without cloning full episode payloads.
    fn list_meta(&self, filter: Option<&EpisodeFilter>) -> Result<Vec<EpisodeMeta>, StoreError> {
        let guard = self
            .episodes
            .read()
            .map_err(|_| StoreError::Other("Lock error".into()))?;
        let result: Vec<EpisodeMeta> = match filter {
            Some(f) => guard
                .iter()
                .filter(|e| f.matches_dto(e))
                .map(EpisodeMeta::from)
                .collect(),
            None => guard.iter().map(EpisodeMeta::from).collect(),
        };
        Ok(result)
    }
}
/// Durable store persisting episodes as JSON Lines (`episodes.jsonl`)
/// under `base_path`, with an optional in-memory read cache.
pub struct FileEpisodeStore {
// Directory containing `episodes.jsonl`.
base_path: PathBuf,
// `Some` when built via `with_cache`: reads are served from memory and
// `append` keeps the cache in sync.
cache: Option<RwLock<Vec<EpisodeDto>>>,
}
impl FileEpisodeStore {
    /// Opens (creating the directory if needed) a store rooted at
    /// `base_path`. Every read goes back to disk; see [`Self::with_cache`]
    /// for a cached variant.
    pub fn new(base_path: impl AsRef<Path>) -> Result<Self, StoreError> {
        let base_path = base_path.as_ref().to_path_buf();
        fs::create_dir_all(&base_path)?;
        Ok(Self {
            base_path,
            cache: None,
        })
    }

    /// Opens a store that mirrors the on-disk episodes in memory. The cache
    /// is populated once here and kept in sync by `append`.
    pub fn with_cache(base_path: impl AsRef<Path>) -> Result<Self, StoreError> {
        let mut store = Self::new(base_path)?;
        store.cache = Some(RwLock::new(Vec::new()));
        store.load_all_to_cache()?;
        Ok(store)
    }

    /// Path of the JSON Lines file holding one episode per line.
    fn file_path(&self) -> PathBuf {
        self.base_path.join("episodes.jsonl")
    }

    /// Reads every episode from disk, skipping blank and unparseable lines,
    /// sorted by creation time. A missing file yields an empty list.
    fn load_all(&self) -> Result<Vec<EpisodeDto>, StoreError> {
        // Open directly and treat NotFound as "no episodes yet" instead of
        // probing with `exists()` first, which races with concurrent removal.
        let file = match File::open(self.file_path()) {
            Ok(f) => f,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(e.into()),
        };
        let mut reader = BufReader::new(file);
        let mut episodes = Vec::new();
        // Reuse one line buffer instead of allocating a String per line.
        let mut line = String::new();
        loop {
            line.clear();
            if reader.read_line(&mut line)? == 0 {
                break; // EOF
            }
            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }
            match serde_json::from_str::<EpisodeDto>(trimmed) {
                Ok(dto) => episodes.push(dto),
                // Corrupt lines are logged and skipped so one bad record
                // does not poison the whole store.
                Err(e) => tracing::warn!("Failed to parse episode line: {}", e),
            }
        }
        episodes.sort_by_key(|e| e.metadata.created_at);
        Ok(episodes)
    }

    /// Replaces the cache contents with a fresh load from disk; no-op when
    /// caching is disabled.
    fn load_all_to_cache(&self) -> Result<(), StoreError> {
        if let Some(ref cache) = self.cache {
            let episodes = self.load_all()?;
            let mut guard = cache
                .write()
                .map_err(|_| StoreError::Other("Lock error".into()))?;
            *guard = episodes;
        }
        Ok(())
    }

    /// Appends one episode as a single JSON line.
    fn append_to_file(&self, dto: &EpisodeDto) -> Result<(), StoreError> {
        let path = self.file_path();
        let file = OpenOptions::new().create(true).append(true).open(&path)?;
        let mut writer = BufWriter::new(file);
        let json = serde_json::to_string(dto)?;
        writeln!(writer, "{}", json)?;
        // Flush explicitly: Drop would silently swallow any write error.
        writer.flush()?;
        Ok(())
    }
}
impl EpisodeStore for FileEpisodeStore {
fn append(&self, dto: &EpisodeDto) -> Result<EpisodeId, StoreError> {
self.append_to_file(dto)?;
if let Some(ref cache) = self.cache {
if let Ok(mut guard) = cache.write() {
guard.push(dto.clone());
}
}
Ok(dto.id.clone())
}
fn get(&self, id: &EpisodeId) -> Result<Option<EpisodeDto>, StoreError> {
if let Some(ref cache) = self.cache {
if let Ok(guard) = cache.read() {
return Ok(guard.iter().find(|e| &e.id == id).cloned());
}
}
for dto in self.load_all()? {
if &dto.id == id {
return Ok(Some(dto));
}
}
Ok(None)
}
fn query(&self, filter: &EpisodeFilter) -> Result<Vec<EpisodeDto>, StoreError> {
let episodes = if let Some(ref cache) = self.cache {
cache
.read()
.map_err(|_| StoreError::Other("Lock error".into()))?
.clone()
} else {
self.load_all()?
};
let mut result: Vec<_> = episodes
.into_iter()
.filter(|e| filter.matches_dto(e))
.collect();
if let Some(offset) = filter.offset {
if offset < result.len() {
result = result.into_iter().skip(offset).collect();
} else {
result = Vec::new();
}
}
if let Some(limit) = filter.limit {
result.truncate(limit);
}
Ok(result)
}
fn count(&self, filter: Option<&EpisodeFilter>) -> Result<usize, StoreError> {
let episodes = if let Some(ref cache) = self.cache {
cache
.read()
.map_err(|_| StoreError::Other("Lock error".into()))?
.clone()
} else {
self.load_all()?
};
let count = match filter {
Some(f) => episodes.iter().filter(|e| f.matches_dto(e)).count(),
None => episodes.len(),
};
Ok(count)
}
fn list_meta(&self, filter: Option<&EpisodeFilter>) -> Result<Vec<EpisodeMeta>, StoreError> {
let episodes = if let Some(ref cache) = self.cache {
cache
.read()
.map_err(|_| StoreError::Other("Lock error".into()))?
.clone()
} else {
self.load_all()?
};
let result: Vec<EpisodeMeta> = match filter {
Some(f) => episodes
.iter()
.filter(|e| f.matches_dto(e))
.map(EpisodeMeta::from)
.collect(),
None => episodes.iter().map(EpisodeMeta::from).collect(),
};
Ok(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::learn::episode::{Episode, Outcome};
// Builds a minimal episode DTO with one action record and the requested
// outcome, for exercising the stores.
fn make_episode_dto(worker_id: usize, success: bool) -> EpisodeDto {
let outcome = if success {
Outcome::success_binary()
} else {
Outcome::failure("test error")
};
let episode = Episode::builder()
.learn_model("worker_task")
.record(ActionRecord::new(1, worker_id, "CheckStatus").success(success))
.outcome(outcome)
.scenario("test-scenario")
.build();
EpisodeDto::from_episode(&episode)
}
// Append then get by id round-trips through the in-memory store.
#[test]
fn test_in_memory_store_append_and_get() {
let store = InMemoryEpisodeStore::new();
let dto = make_episode_dto(0, true);
let id = dto.id.clone();
store.append(&dto).unwrap();
let retrieved = store.get(&id).unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().id, id);
}
// SuccessOnly / FailureOnly filters partition 2 successes + 1 failure.
#[test]
fn test_in_memory_store_query_by_outcome() {
let store = InMemoryEpisodeStore::new();
store.append(&make_episode_dto(0, true)).unwrap();
store.append(&make_episode_dto(1, true)).unwrap();
store.append(&make_episode_dto(2, false)).unwrap();
let filter = EpisodeFilter::new().outcome(OutcomeFilter::SuccessOnly);
let results = store.query(&filter).unwrap();
assert_eq!(results.len(), 2);
let filter = EpisodeFilter::new().outcome(OutcomeFilter::FailureOnly);
let results = store.query(&filter).unwrap();
assert_eq!(results.len(), 1);
}
// A second store instance reading the same directory must see what the
// first one wrote (on-disk round-trip).
#[test]
fn test_file_store_roundtrip() {
let temp_dir =
std::env::temp_dir().join(format!("episode_store_test_{}", std::process::id()));
let store = FileEpisodeStore::new(&temp_dir).unwrap();
let dto = make_episode_dto(0, true);
let id = dto.id.clone();
store.append(&dto).unwrap();
let store2 = FileEpisodeStore::new(&temp_dir).unwrap();
let retrieved = store2.get(&id).unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().id, id);
let _ = std::fs::remove_dir_all(&temp_dir);
}
}