use libsql::params;
use tokio::fs;
use tracing::warn;
use crate::error::{MemoryError, Result};
use crate::persistence::{ConceptVersion, Persistence};
impl Persistence {
pub async fn save_associations(
&self,
ns: &str,
associations: &[(String, String, f32)],
) -> Result<()> {
if associations.is_empty() {
return Ok(());
}
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute("BEGIN", ())
.await
.map_err(|e| MemoryError::database(format!("Failed to begin transaction: {e}")))?;
let stmt = conn
.prepare(
"INSERT INTO csm_associations (namespace, from_id, to_id, strength)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(namespace, from_id, to_id) DO UPDATE SET strength = excluded.strength",
)
.await
.map_err(|e| MemoryError::database(format!("Failed to prepare statement: {e}")))?;
let mut first_error: Option<MemoryError> = None;
for (from, to, strength) in associations {
stmt.reset();
if let Err(e) = stmt
.execute(params![ns, from.as_str(), to.as_str(), *strength])
.await
{
first_error = Some(MemoryError::database(format!(
"Failed to batch save association: {e}"
)));
break;
}
}
if let Some(error) = first_error {
let _ = conn.execute("ROLLBACK", ()).await;
return Err(error);
}
conn.execute("COMMIT", ())
.await
.map_err(|e| MemoryError::database(format!("Failed to commit transaction: {e}")))?;
Ok(())
}
pub async fn clear_namespace(&self, ns: &str) -> Result<()> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute("BEGIN", ())
.await
.map_err(|e| MemoryError::database(format!("Failed to begin transaction: {e}")))?;
let tables = [
"DELETE FROM csm_associations WHERE namespace = ?1",
"DELETE FROM csm_versions WHERE namespace = ?1",
"DELETE FROM csm_concepts WHERE namespace = ?1",
"DELETE FROM csm_hnsw_graph WHERE namespace = ?1",
"DELETE FROM csm_canonical WHERE namespace = ?1",
];
for sql in &tables {
if let Err(e) = conn.execute(sql, params![ns]).await {
let _ = conn.execute("ROLLBACK", ()).await;
return Err(MemoryError::database(format!(
"Failed to clear namespace data: {e}"
)));
}
}
conn.execute("COMMIT", ())
.await
.map_err(|e| MemoryError::database(format!("Failed to commit namespace clear: {e}")))?;
Ok(())
}
pub async fn clear_all(&self) -> Result<()> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute_batch(
"BEGIN;
DELETE FROM csm_associations;
DELETE FROM csm_versions;
DELETE FROM csm_concepts;
DELETE FROM csm_hnsw_graph;
DELETE FROM csm_canonical;
COMMIT;",
)
.await
.map_err(|e| MemoryError::database(format!("Failed to clear all data: {e}")))?;
Ok(())
}
pub async fn get_concept_history(
&self,
ns: &str,
id: &str,
limit: usize,
) -> Result<Vec<ConceptVersion>> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
let mut rows = conn
.query(
"SELECT concept_id, version, vector, metadata, modified_at
FROM csm_versions
WHERE namespace = ?1 AND concept_id = ?2
ORDER BY version DESC
LIMIT ?3",
libsql::params![ns, id, limit as i64],
)
.await
.map_err(|e| MemoryError::database(format!("Failed to load concept history: {e}")))?;
let mut history = Vec::new();
while let Some(row) = rows.next().await.map_err(|e| {
MemoryError::database(format!("Failed to fetch concept history row: {e}"))
})? {
let concept_id: String = row
.get(0)
.map_err(|e| MemoryError::database(format!("Failed to get concept_id: {e}")))?;
let version: i64 = row
.get(1)
.map_err(|e| MemoryError::database(format!("Failed to get version: {e}")))?;
let vector_bytes: Vec<u8> = row
.get(2)
.map_err(|e| MemoryError::database(format!("Failed to get vector: {e}")))?;
let metadata_json: String = row
.get(3)
.map_err(|e| MemoryError::database(format!("Failed to get metadata: {e}")))?;
let modified_at: i64 = row
.get(4)
.map_err(|e| MemoryError::database(format!("Failed to get modified_at: {e}")))?;
history.push(ConceptVersion {
concept_id,
version,
vector: crate::hyperdim::HVec10240::from_bytes(&vector_bytes)?,
metadata: serde_json::from_str(&metadata_json)?,
modified_at: modified_at as u64,
});
}
Ok(history)
}
pub async fn schema_version(&self) -> Result<i64> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
let mut rows = conn
.query(
"SELECT COALESCE(MAX(version), 0) FROM csm_schema_version",
(),
)
.await
.map_err(|e| MemoryError::database(format!("Failed to get schema version: {e}")))?;
if let Some(row) = rows.next().await.map_err(|e| {
MemoryError::database(format!("Failed to fetch schema version row: {e}"))
})? {
let version: i64 = row.get(0).map_err(|e| {
MemoryError::database(format!("Failed to parse schema version: {e}"))
})?;
Ok(version)
} else {
Ok(0)
}
}
pub async fn apply_migrations(&self, target_version: i64) -> Result<()> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
self.apply_migrations_with_conn(&conn, target_version).await
}
pub async fn backup(&self, path: &str) -> Result<()> {
let Some(_local_path) = &self.local_path else {
return Err(MemoryError::UnsupportedOperation(
"backup is only supported for local SQLite databases".to_string(),
));
};
self.checkpoint().await?;
if fs::metadata(path).await.is_ok() {
fs::remove_file(path).await.map_err(MemoryError::Io)?;
}
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute("VACUUM INTO ?1", params![path])
.await
.map_err(|e| MemoryError::database(format!("Failed to create backup: {e}")))?;
Ok(())
}
pub async fn restore(&self, path: &str) -> Result<()> {
let Some(_local_path) = &self.local_path else {
return Err(MemoryError::UnsupportedOperation(
"restore is only supported for local SQLite databases".to_string(),
));
};
fs::metadata(path).await.map_err(MemoryError::Io)?;
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute("BEGIN IMMEDIATE", ()).await.map_err(|e| {
MemoryError::database(format!("Failed to begin restore transaction: {e}"))
})?;
if let Err(error) = async {
conn.execute("ATTACH DATABASE ?1 AS restore_db", params![path])
.await
.map_err(|e| MemoryError::database(format!("Failed to attach backup DB: {e}")))?;
conn.execute_batch(
"DELETE FROM csm_associations;
DELETE FROM csm_versions;
DELETE FROM csm_concepts;
DELETE FROM csm_hnsw_graph;
DELETE FROM csm_canonical;
DELETE FROM csm_schema_version;",
)
.await
.map_err(|e| MemoryError::database(format!("Failed to clear current database: {e}")))?;
conn.execute_batch(
"INSERT INTO csm_concepts (namespace, id, vector, metadata, created_at, modified_at, expires_at, canonical_concept_ids_json)
SELECT namespace, id, vector, metadata, created_at, modified_at, expires_at, canonical_concept_ids_json FROM restore_db.csm_concepts;
INSERT INTO csm_associations (namespace, from_id, to_id, strength)
SELECT namespace, from_id, to_id, strength FROM restore_db.csm_associations;
INSERT INTO csm_versions (namespace, concept_id, version, vector, metadata, modified_at)
SELECT namespace, concept_id, version, vector, metadata, modified_at FROM restore_db.csm_versions;
INSERT INTO csm_hnsw_graph (namespace, id, data, modified_at)
SELECT namespace, id, data, modified_at FROM restore_db.csm_hnsw_graph;
INSERT INTO csm_canonical (namespace, id, version, labels_json, related_json)
SELECT namespace, id, version, labels_json, related_json FROM restore_db.csm_canonical;
INSERT INTO csm_schema_version(version)
SELECT version FROM restore_db.csm_schema_version;",
)
.await
.map_err(|e| MemoryError::database(format!("Failed to import backup data: {e}")))?;
Ok::<(), MemoryError>(())
}
.await
{
let _ = conn.execute_batch("ROLLBACK;").await;
return Err(error);
}
conn.execute("COMMIT", ())
.await
.map_err(|e| MemoryError::database(format!("Failed to commit restore: {e}")))?;
if let Err(error) = conn.execute_batch("DETACH DATABASE restore_db;").await {
warn!(error = %error, "failed to detach restore_db after restore");
}
self.init_schema().await?;
Ok(())
}
pub async fn health_check(&self) -> Result<()> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.query("SELECT 1", ())
.await
.map_err(|e| MemoryError::database(format!("Failed persistence health check: {e}")))?;
Ok(())
}
pub async fn delete_association(&self, ns: &str, from: &str, to: &str) -> Result<()> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute(
"DELETE FROM csm_associations WHERE namespace = ?1 AND from_id = ?2 AND to_id = ?3",
params![ns, from, to],
)
.await
.map_err(|e| MemoryError::database(format!("Failed to delete association: {e}")))?;
Ok(())
}
pub async fn clear_concept_associations(&self, ns: &str, id: &str) -> Result<()> {
let _permit = self.acquire_remote_slot().await?;
let conn = self.connect().await?;
conn.execute(
"DELETE FROM csm_associations WHERE namespace = ?1 AND from_id = ?2",
params![ns, id],
)
.await
.map_err(|e| MemoryError::database(format!("Failed to clear concept associations: {e}")))?;
Ok(())
}
}
#[cfg(test)]
#[cfg(feature = "persistence")]
mod tests {
use crate::hyperdim::HVec10240;
use crate::persistence::Persistence;
use crate::singularity::Concept;
use std::collections::HashMap;
use tempfile::NamedTempFile;
fn make_concept(id: &str) -> Concept {
Concept {
id: id.to_string(),
vector: HVec10240::random(),
metadata: HashMap::new(),
created_at: 0,
modified_at: 0,
expires_at: None,
canonical_concept_ids: Vec::new(),
}
}
#[tokio::test]
async fn save_and_load_concept_roundtrip() {
let temp = NamedTempFile::new().expect("Failed to create temp file");
let path = temp.path().to_str().expect("Invalid path");
let persistence = Persistence::new_local(path)
.await
.expect("Failed to create persistence");
let ns = "_default";
let concept = make_concept("test-concept");
persistence
.save_concept(ns, &concept)
.await
.expect("Failed to save");
let loaded = persistence
.load_concept(ns, "test-concept")
.await
.expect("Failed to load")
.expect("Concept not found");
assert_eq!(loaded.id, concept.id);
}
#[tokio::test]
async fn delete_concept_removes_from_db() {
let temp = NamedTempFile::new().expect("Failed to create temp file");
let path = temp.path().to_str().expect("Invalid path");
let persistence = Persistence::new_local(path)
.await
.expect("Failed to create persistence");
let ns = "_default";
let concept = make_concept("delete-test");
persistence
.save_concept(ns, &concept)
.await
.expect("Failed to save");
persistence
.delete_concept(ns, "delete-test")
.await
.expect("Failed to delete");
let result = persistence.load_concept(ns, "delete-test").await;
assert!(result.expect("Query failed").is_none());
}
#[tokio::test]
async fn schema_version_initialized() {
let temp = NamedTempFile::new().expect("Failed to create temp file");
let path = temp.path().to_str().expect("Invalid path");
let persistence = Persistence::new_local(path)
.await
.expect("Failed to create persistence");
let version = persistence
.schema_version()
.await
.expect("Failed to get version");
assert!(version > 0);
}
}