use super::EpisodeQuery;
use crate::TursoStorage;
use do_memory_core::{Episode, Error, Result, apply_query_limit as core_apply_limit};
use tracing::{debug, info};
/// Resolve an optional caller-supplied limit to a concrete row cap.
///
/// Thin local alias over the shared core policy
/// (`do_memory_core::apply_query_limit`) so every query path in this
/// module clamps limits the same way.
#[inline]
fn apply_query_limit(requested: Option<usize>) -> usize {
    core_apply_limit(requested)
}
impl TursoStorage {
    /// Drain `rows` and decode every row into an [`Episode`].
    ///
    /// Shared by all episode query paths below so that row-fetch and
    /// row-decode failures are reported with a consistent error message.
    async fn collect_episode_rows(mut rows: libsql::Rows) -> Result<Vec<Episode>> {
        let mut episodes = Vec::new();
        while let Some(row) = rows
            .next()
            .await
            .map_err(|e| Error::Storage(format!("Failed to fetch episode row: {}", e)))?
        {
            episodes.push(Self::row_to_episode(&row)?);
        }
        Ok(episodes)
    }

    /// Query episodes matching the optional filters in `query`.
    ///
    /// Present filters (`task_type`, `domain`, `language`, `completed_only`)
    /// are combined with `AND`; results are ordered by `start_time`
    /// descending and capped by the shared query-limit policy.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if acquiring a connection, executing the
    /// query, or decoding a row fails.
    pub async fn query_episodes(&self, query: &EpisodeQuery) -> Result<Vec<Episode>> {
        debug!("Querying episodes with filters: {:?}", query);
        let (conn, _conn_id) = self.get_connection_with_id().await?;

        // SQL is assembled dynamically from whichever filters are present,
        // so the prepared-statement cache (used by the fixed-SQL queries
        // below) is deliberately not used here: it keys on the exact SQL
        // text and would fragment across filter combinations.
        let mut sql = String::from(
            r#"
            SELECT episode_id, task_type, task_description, context,
                   start_time, end_time, steps, outcome, reward,
                   reflection, patterns, heuristics,
                   COALESCE(checkpoints, '[]') AS checkpoints,
                   metadata, domain, language,
                   archived_at
            FROM episodes WHERE 1=1
            "#,
        );
        let mut params_vec = Vec::new();
        if let Some(ref task_type) = query.task_type {
            sql.push_str(" AND task_type = ?");
            params_vec.push(task_type.to_string());
        }
        if let Some(ref domain) = query.domain {
            sql.push_str(" AND domain = ?");
            params_vec.push(domain.clone());
        }
        if let Some(ref language) = query.language {
            sql.push_str(" AND language = ?");
            params_vec.push(language.clone());
        }
        if query.completed_only {
            sql.push_str(" AND end_time IS NOT NULL");
        }
        sql.push_str(" ORDER BY start_time DESC");

        // `apply_query_limit` yields a plain clamped integer, so
        // interpolating it into the SQL text is injection-safe.
        let limit = apply_query_limit(query.limit);
        sql.push_str(&format!(" LIMIT {}", limit));

        let rows = conn
            .query(&sql, libsql::params_from_iter(params_vec))
            .await
            .map_err(|e| Error::Storage(format!("Failed to query episodes: {}", e)))?;
        let episodes = Self::collect_episode_rows(rows).await?;
        info!("Found {} episodes matching query", episodes.len());
        Ok(episodes)
    }

    /// Query episodes whose `start_time` is at or after `since`
    /// (compared as Unix seconds), newest first, up to `limit` rows
    /// (clamped by the shared query-limit policy when `None`).
    ///
    /// Uses the prepared-statement cache since the SQL text is fixed.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if acquiring a connection, preparing or
    /// executing the statement, or decoding a row fails.
    pub async fn query_episodes_since(
        &self,
        since: chrono::DateTime<chrono::Utc>,
        limit: Option<usize>,
    ) -> Result<Vec<Episode>> {
        let effective_limit = apply_query_limit(limit);
        debug!(
            "Querying episodes since {} (limit: {})",
            since, effective_limit
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        const SQL: &str = r#"
            SELECT episode_id, task_type, task_description, context,
                   start_time, end_time, steps, outcome, reward,
                   reflection, patterns, heuristics,
                   COALESCE(checkpoints, '[]') AS checkpoints,
                   metadata, domain, language,
                   archived_at
            FROM episodes
            WHERE start_time >= ?
            ORDER BY start_time DESC
            LIMIT ?
            "#;
        // Seconds since the Unix epoch, matching how start_time is stored.
        let since_timestamp = since.timestamp();
        let stmt = self
            .prepared_cache
            .get_or_prepare(&conn, SQL)
            .await
            .map_err(|e| Error::Storage(format!("Failed to prepare statement: {}", e)))?;
        let rows = stmt
            .query(libsql::params![since_timestamp, effective_limit as i64])
            .await
            .map_err(|e| Error::Storage(format!("Failed to query episodes: {}", e)))?;
        let episodes = Self::collect_episode_rows(rows).await?;
        info!(
            "Found {} episodes since {} (limit: {})",
            episodes.len(),
            since,
            effective_limit
        );
        Ok(episodes)
    }

    /// Query episodes whose `metadata` JSON has `key` equal to `value`,
    /// newest first, up to `limit` rows (clamped by the shared
    /// query-limit policy when `None`).
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if acquiring a connection, executing the
    /// query, or decoding a row fails.
    pub async fn query_episodes_by_metadata(
        &self,
        key: &str,
        value: &str,
        limit: Option<usize>,
    ) -> Result<Vec<Episode>> {
        let effective_limit = apply_query_limit(limit);
        debug!(
            "Querying episodes by metadata {} = {} (limit: {})",
            key, value, effective_limit
        );
        let (conn, _conn_id) = self.get_connection_with_id().await?;
        let sql = r#"
            SELECT episode_id, task_type, task_description, context,
                   start_time, end_time, steps, outcome, reward,
                   reflection, patterns, heuristics,
                   COALESCE(checkpoints, '[]') AS checkpoints,
                   metadata, domain, language,
                   archived_at
            FROM episodes
            WHERE json_extract(metadata, ?) = ?
            ORDER BY start_time DESC
            LIMIT ?
            "#;
        // The path itself is passed as a bound parameter (no SQL
        // injection), but NOTE(review): a key containing JSON-path
        // metacharacters ('.', '[', '"') would change which field is
        // matched — callers presumably pass plain identifiers; confirm.
        let json_path = format!("$.{}", key);
        let rows = conn
            .query(
                sql,
                libsql::params![json_path, value, effective_limit as i64],
            )
            .await
            .map_err(|e| Error::Storage(format!("Failed to query episodes by metadata: {}", e)))?;
        let episodes = Self::collect_episode_rows(rows).await?;
        info!(
            "Found {} episodes with metadata {} = {} (limit: {})",
            episodes.len(),
            key,
            value,
            effective_limit
        );
        Ok(episodes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use do_memory_core::{Episode, TaskContext, TaskType};
    use tempfile::TempDir;

    /// Build a schema-initialized storage backed by a database file in a
    /// fresh temp dir; the `TempDir` is returned so it outlives the test.
    async fn create_test_storage() -> Result<(TursoStorage, TempDir)> {
        let tmp = TempDir::new().unwrap();
        let file = tmp.path().join("test.db");
        let db = libsql::Builder::new_local(&file)
            .build()
            .await
            .map_err(|e| Error::Storage(format!("Failed to create test database: {}", e)))?;
        let storage = TursoStorage::from_database(db)?;
        storage.initialize_schema().await?;
        Ok((storage, tmp))
    }

    #[tokio::test]
    async fn test_query_episodes_empty() {
        // A fresh store matches nothing.
        let (storage, _tmp) = create_test_storage().await.unwrap();
        let found = storage
            .query_episodes(&EpisodeQuery::default())
            .await
            .unwrap();
        assert!(found.is_empty());
    }

    #[tokio::test]
    async fn test_query_episodes_with_limit() {
        let (storage, _tmp) = create_test_storage().await.unwrap();
        // Store five episodes, then ask for at most three back.
        for n in 0..5 {
            let episode = Episode::new(
                format!("Task {}", n),
                TaskContext::default(),
                TaskType::CodeGeneration,
            );
            storage.store_episode(&episode).await.unwrap();
        }
        let query = EpisodeQuery {
            limit: Some(3),
            ..Default::default()
        };
        let found = storage.query_episodes(&query).await.unwrap();
        assert_eq!(found.len(), 3);
    }

    #[tokio::test]
    async fn test_query_episodes_by_task_type() {
        let (storage, _tmp) = create_test_storage().await.unwrap();
        // Three code-generation episodes and two debugging episodes;
        // filtering by task type should return only the former.
        for n in 0..3 {
            let episode = Episode::new(
                format!("Code task {}", n),
                TaskContext::default(),
                TaskType::CodeGeneration,
            );
            storage.store_episode(&episode).await.unwrap();
        }
        for n in 0..2 {
            let episode = Episode::new(
                format!("Debug task {}", n),
                TaskContext::default(),
                TaskType::Debugging,
            );
            storage.store_episode(&episode).await.unwrap();
        }
        let query = EpisodeQuery {
            task_type: Some(TaskType::CodeGeneration),
            ..Default::default()
        };
        let found = storage.query_episodes(&query).await.unwrap();
        assert_eq!(found.len(), 3);
    }

    #[tokio::test]
    async fn test_query_episodes_by_metadata() {
        let (storage, _tmp) = create_test_storage().await.unwrap();
        // One episode carries the metadata tag, one does not.
        let mut tagged = Episode::new(
            "Task with tag".to_string(),
            TaskContext::default(),
            TaskType::Refactoring,
        );
        tagged
            .metadata
            .insert("tag".to_string(), "important".to_string());
        storage.store_episode(&tagged).await.unwrap();
        let untagged = Episode::new(
            "Task without tag".to_string(),
            TaskContext::default(),
            TaskType::Refactoring,
        );
        storage.store_episode(&untagged).await.unwrap();

        let found = storage
            .query_episodes_by_metadata("tag", "important", None)
            .await
            .unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].task_description, "Task with tag");
    }
}