use super::super::{
EMBEDDINGS_TABLE, EPISODES_TABLE, HEURISTICS_TABLE, METADATA_TABLE, PATTERNS_TABLE,
RECOMMENDATION_EPISODE_INDEX_TABLE, RECOMMENDATION_FEEDBACK_TABLE,
RECOMMENDATION_SESSIONS_TABLE, RELATIONSHIPS_TABLE, SUMMARIES_TABLE, with_db_timeout,
};
use crate::RedbStorage;
use do_memory_core::{Error, Result};
use redb::ReadableTable;
use std::sync::Arc;
use tracing::info;
impl RedbStorage {
pub(super) async fn clear_all_tables(&self) -> Result<()> {
info!("Clearing all tables due to schema version change");
let db = Arc::clone(&self.db);
with_db_timeout(move || {
let write_txn = db
.begin_write()
.map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
{
Self::clear_table_entries(&write_txn, EPISODES_TABLE, "episodes")?;
Self::clear_table_entries(&write_txn, PATTERNS_TABLE, "patterns")?;
Self::clear_table_entries(&write_txn, HEURISTICS_TABLE, "heuristics")?;
Self::clear_table_entries(&write_txn, EMBEDDINGS_TABLE, "embeddings")?;
Self::clear_table_entries(&write_txn, METADATA_TABLE, "metadata")?;
Self::clear_table_entries(&write_txn, SUMMARIES_TABLE, "summaries")?;
Self::clear_table_entries(&write_txn, RELATIONSHIPS_TABLE, "relationships")?;
Self::clear_table_entries(
&write_txn,
RECOMMENDATION_SESSIONS_TABLE,
"recommendation_sessions",
)?;
Self::clear_table_entries(
&write_txn,
RECOMMENDATION_FEEDBACK_TABLE,
"recommendation_feedback",
)?;
Self::clear_table_entries_str(
&write_txn,
RECOMMENDATION_EPISODE_INDEX_TABLE,
"recommendation_episode_index",
)?;
}
write_txn
.commit()
.map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
Ok::<(), Error>(())
})
.await?;
self.cache.clear().await;
info!("Successfully cleared all tables");
Ok(())
}
pub async fn clear_all(&self) -> Result<()> {
info!("Clearing all cached data from redb");
self.cache.clear().await;
let db = Arc::clone(&self.db);
with_db_timeout(move || {
let write_txn = db
.begin_write()
.map_err(|e| Error::Storage(format!("Failed to begin write transaction: {}", e)))?;
{
Self::clear_table_entries(&write_txn, EPISODES_TABLE, "episodes")?;
Self::clear_table_entries(&write_txn, PATTERNS_TABLE, "patterns")?;
Self::clear_table_entries(&write_txn, HEURISTICS_TABLE, "heuristics")?;
Self::clear_table_entries(&write_txn, EMBEDDINGS_TABLE, "embeddings")?;
Self::clear_table_entries(&write_txn, METADATA_TABLE, "metadata")?;
Self::clear_table_entries(&write_txn, SUMMARIES_TABLE, "summaries")?;
Self::clear_table_entries(&write_txn, RELATIONSHIPS_TABLE, "relationships")?;
}
write_txn
.commit()
.map_err(|e| Error::Storage(format!("Failed to commit transaction: {}", e)))?;
Ok::<(), Error>(())
})
.await?;
info!("Successfully cleared all cached data");
Ok(())
}
fn clear_table_entries(
write_txn: &redb::WriteTransaction,
table_def: redb::TableDefinition<&str, &[u8]>,
table_name: &str,
) -> Result<()> {
let mut table = write_txn
.open_table(table_def)
.map_err(|e| Error::Storage(format!("Failed to open {} table: {}", table_name, e)))?;
let keys: Vec<String> = table
.iter()
.map_err(|e| Error::Storage(format!("Failed to iterate {}: {}", table_name, e)))?
.filter_map(|item| item.ok())
.map(|(k, _v)| k.value().to_string())
.collect();
for key in keys {
table.remove(key.as_str()).map_err(|e| {
Error::Storage(format!("Failed to remove {} key: {}", table_name, e))
})?;
}
Ok(())
}
fn clear_table_entries_str(
write_txn: &redb::WriteTransaction,
table_def: redb::TableDefinition<&str, &str>,
table_name: &str,
) -> Result<()> {
let mut table = write_txn
.open_table(table_def)
.map_err(|e| Error::Storage(format!("Failed to open {} table: {}", table_name, e)))?;
let keys: Vec<String> = table
.iter()
.map_err(|e| Error::Storage(format!("Failed to iterate {}: {}", table_name, e)))?
.filter_map(|item| item.ok())
.map(|(k, _v)| k.value().to_string())
.collect();
for key in keys {
table.remove(key.as_str()).map_err(|e| {
Error::Storage(format!("Failed to remove {} key: {}", table_name, e))
})?;
}
Ok(())
}
}