use crate::{Result, TursoStorage, schema};
use tracing::{debug, info};
impl TursoStorage {
pub async fn initialize_schema(&self) -> Result<()> {
info!("Initializing Turso database schema");
let conn = self.get_connection().await?;
let _ = self.execute_pragmas(&conn).await;
self.execute_with_retry(&conn, schema::CREATE_EPISODES_TABLE)
.await?;
self.ensure_episodes_checkpoints_column(&conn).await?;
self.execute_with_retry(&conn, schema::CREATE_PATTERNS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_HEURISTICS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RECOMMENDATION_SESSIONS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RECOMMENDATION_FEEDBACK_TABLE)
.await?;
#[cfg(not(feature = "turso_multi_dimension"))]
self.execute_with_retry(&conn, schema::CREATE_EMBEDDINGS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EXECUTION_RECORDS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_AGENT_METRICS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_TASK_METRICS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODES_TASK_TYPE_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODES_TIMESTAMP_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODES_DOMAIN_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODES_ARCHIVED_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_PATTERNS_CONTEXT_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_HEURISTICS_CONFIDENCE_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RECOMMENDATION_SESSIONS_EPISODE_INDEX)
.await?;
#[cfg(not(feature = "turso_multi_dimension"))]
{
self.execute_with_retry(&conn, schema::CREATE_EMBEDDINGS_ITEM_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EMBEDDINGS_VECTOR_INDEX)
.await?;
}
self.execute_with_retry(&conn, schema::CREATE_EXECUTION_RECORDS_TIME_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EXECUTION_RECORDS_AGENT_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_AGENT_METRICS_TYPE_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODE_SUMMARIES_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_SUMMARIES_CREATED_AT_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_METADATA_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODE_TAGS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODE_TAGS_TAG_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODE_TAGS_EPISODE_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_TAG_METADATA_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_EPISODE_RELATIONSHIPS_TABLE)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_FROM_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_TO_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_TYPE_INDEX)
.await?;
self.execute_with_retry(&conn, schema::CREATE_RELATIONSHIPS_BIDIRECTIONAL_INDEX)
.await?;
#[cfg(feature = "hybrid_search")]
self.initialize_fts5_schema(&conn).await?;
#[cfg(feature = "turso_multi_dimension")]
self.initialize_vector_tables(&conn).await?;
info!("Schema initialization complete");
Ok(())
}
#[cfg(feature = "hybrid_search")]
async fn initialize_fts5_schema(&self, conn: &libsql::Connection) -> Result<()> {
use crate::fts5_schema;
info!("Initializing FTS5 schema for hybrid search");
self.execute_with_retry(conn, fts5_schema::CREATE_EPISODES_FTS_TABLE)
.await?;
self.execute_with_retry(conn, fts5_schema::CREATE_PATTERNS_FTS_TABLE)
.await?;
self.execute_with_retry(conn, fts5_schema::CREATE_EPISODES_FTS_TRIGGERS)
.await?;
self.execute_with_retry(conn, fts5_schema::CREATE_PATTERNS_FTS_TRIGGERS)
.await?;
info!("FTS5 schema initialization complete");
Ok(())
}
#[cfg(not(feature = "hybrid_search"))]
#[allow(dead_code)]
async fn initialize_fts5_schema(&self, _conn: &libsql::Connection) -> Result<()> {
Ok(())
}
#[cfg(feature = "turso_multi_dimension")]
async fn initialize_vector_tables(&self, conn: &libsql::Connection) -> Result<()> {
info!("Initializing dimension-specific vector tables");
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_384_TABLE)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1024_TABLE)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1536_TABLE)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_3072_TABLE)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_OTHER_TABLE)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_384_VECTOR_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1024_VECTOR_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1536_VECTOR_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_3072_VECTOR_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_384_ITEM_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1024_ITEM_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_1536_ITEM_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_3072_ITEM_INDEX)
.await?;
self.execute_with_retry(conn, schema::CREATE_EMBEDDINGS_OTHER_ITEM_INDEX)
.await?;
info!("Dimension-specific vector tables initialized");
Ok(())
}
#[cfg(not(feature = "turso_multi_dimension"))]
#[allow(dead_code)]
async fn initialize_vector_tables(&self, _conn: &libsql::Connection) -> Result<()> {
Ok(())
}
async fn ensure_episodes_checkpoints_column(&self, conn: &libsql::Connection) -> Result<()> {
let mut rows = conn
.query("PRAGMA table_info(episodes)", ())
.await
.map_err(|e| {
do_memory_core::Error::Storage(format!("Failed to inspect episodes schema: {}", e))
})?;
let mut has_checkpoints = false;
while let Some(row) = rows.next().await.map_err(|e| {
do_memory_core::Error::Storage(format!("Failed to read episodes schema row: {}", e))
})? {
let column_name: String = row.get(1).map_err(|e| {
do_memory_core::Error::Storage(format!(
"Failed to parse episodes schema column name: {}",
e
))
})?;
if column_name == "checkpoints" {
has_checkpoints = true;
break;
}
}
if !has_checkpoints {
debug!("Adding missing episodes.checkpoints column");
self.execute_with_retry(conn, schema::ADD_EPISODES_CHECKPOINTS_COLUMN)
.await?;
}
Ok(())
}
}