use std::path::Path;
use std::sync::Arc;
use rusqlite::{Connection, TransactionBehavior, params};
use solo_core::{Embedder, Error, Result, TenantId, VectorIndex};
use crate::audit::{AuditOperation, AuditResult, insert_audit_admin_row};
use crate::embedder_registry::EmbedderIdentity;
use crate::hnsw_id::{chunk_hnsw_id, episode_hnsw_id};
use crate::init::open_sqlcipher;
use crate::key_material::KeyMaterial;
use crate::tenants::TenantsIndex;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetReport {
pub episodes_deleted: u64,
pub triples_deleted: u64,
pub chunks_deleted: u64,
pub hnsw_rebuilt: bool,
pub audit_admin_row_id: i64,
}
pub fn forget_principal(
tenant_handle: Arc<crate::tenants::TenantHandle>,
principal_subject: &str,
actor_principal: Option<&str>,
data_dir: &Path,
key: &KeyMaterial,
) -> Result<ForgetReport> {
let tenant_id = tenant_handle.tenant_id().clone();
let db_path = tenant_handle.db_path().to_path_buf();
let hnsw = tenant_handle.hnsw().clone();
let embedder_id = tenant_handle.embedder_id();
let mut conn = open_sqlcipher(&db_path, key)?;
let (episodes_deleted, triples_deleted, chunks_deleted, episode_rowids) =
delete_principal_rows(&mut conn, principal_subject)?;
let total_deleted = episodes_deleted + triples_deleted + chunks_deleted;
for rowid in &episode_rowids {
let _ = hnsw.remove(episode_hnsw_id(*rowid));
}
let hnsw_rebuilt = if total_deleted > 0 {
rebuild_hnsw_after_forget(&conn, hnsw.as_ref(), embedder_id)?;
true
} else {
false
};
let now_ms = chrono::Utc::now().timestamp_millis();
let admin_path = data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
let admin_conn = open_sqlcipher(&admin_path, key)?;
let details = serde_json::json!({
"principal_subject": principal_subject,
"episodes_deleted": episodes_deleted,
"triples_deleted": triples_deleted,
"chunks_deleted": chunks_deleted,
"hnsw_rebuilt": hnsw_rebuilt,
});
let audit_admin_row_id = insert_audit_admin_row(
&admin_conn,
now_ms,
actor_principal,
AuditOperation::GdprForgetUser,
Some(tenant_id.as_str()),
AuditResult::Ok,
Some(&details),
)?;
Ok(ForgetReport {
episodes_deleted,
triples_deleted,
chunks_deleted,
hnsw_rebuilt,
audit_admin_row_id,
})
}
fn delete_principal_rows(
conn: &mut Connection,
principal_subject: &str,
) -> Result<(u64, u64, u64, Vec<i64>)> {
let tx = conn
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for forget: {e}")))?;
let mut rowids: Vec<i64> = {
let mut stmt = tx
.prepare(
"SELECT rowid FROM episodes WHERE principal_subject = ?",
)
.map_err(|e| Error::storage(format!("prepare SELECT episodes.rowid: {e}")))?;
let rows = stmt
.query_map(params![principal_subject], |r| r.get::<_, i64>(0))
.map_err(|e| Error::storage(format!("query episodes.rowid: {e}")))?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(|e| Error::storage(format!("collect episode rowids: {e}")))?
};
rowids.sort_unstable();
let triples_deleted: u64 = 0;
let episodes_deleted: u64 = tx
.execute(
"DELETE FROM episodes WHERE principal_subject = ?",
params![principal_subject],
)
.map_err(|e| Error::storage(format!("DELETE episodes: {e}")))?
as u64;
let chunks_deleted: u64 = tx
.execute(
"DELETE FROM document_chunks WHERE ingested_by_principal = ?",
params![principal_subject],
)
.map_err(|e| Error::storage(format!("DELETE document_chunks: {e}")))?
as u64;
tx.commit()
.map_err(|e| Error::storage(format!("COMMIT forget: {e}")))?;
Ok((episodes_deleted, triples_deleted, chunks_deleted, rowids))
}
fn rebuild_hnsw_after_forget(
conn: &Connection,
hnsw: &dyn VectorIndex,
embedder_id: i64,
) -> Result<()> {
let _ = crate::recovery::rebuild_hnsw_from_sql(conn, hnsw, embedder_id)?;
let mut stmt = conn
.prepare(
"SELECT c.rowid, ce.vector, ce.dim
FROM document_chunks c
JOIN chunk_embeddings ce ON ce.chunk_id = c.chunk_id
WHERE ce.embedder_id = ?1
ORDER BY c.rowid",
)
.map_err(|e| Error::storage(format!("prepare chunks rebuild: {e}")))?;
let rows = stmt
.query_map(params![embedder_id], |r| {
Ok((
r.get::<_, i64>(0)?,
r.get::<_, Vec<u8>>(1)?,
r.get::<_, i64>(2)?,
))
})
.map_err(|e| Error::storage(format!("query chunks rebuild: {e}")))?;
for row in rows {
let (rowid, blob, dim) = match row {
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, "GDPR rebuild: chunk row decode failed; skipping");
continue;
}
};
let dim = dim as usize;
if blob.len() != dim * 4 {
tracing::warn!(
rowid,
blob_len = blob.len(),
expected = dim * 4,
"GDPR rebuild: chunk vector size mismatch; skipping"
);
continue;
}
let slice: &[f32] = match bytemuck::try_cast_slice(&blob) {
Ok(s) => s,
Err(e) => {
tracing::warn!(rowid, error = %e, "GDPR rebuild: chunk vector cast failed; skipping");
continue;
}
};
if let Err(e) = hnsw.add(chunk_hnsw_id(rowid), slice) {
tracing::warn!(rowid, error = %e, "GDPR rebuild: hnsw.add failed; skipping");
}
}
Ok(())
}
pub fn estimate_forget_scope(
db_path: &Path,
key: &KeyMaterial,
principal_subject: &str,
) -> Result<(u64, u64)> {
let conn = open_sqlcipher(db_path, key)?;
let episodes: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params![principal_subject],
|r| r.get(0),
)
.map_err(|e| Error::storage(format!("COUNT episodes for estimate: {e}")))?;
let chunks: i64 = conn
.query_row(
"SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
params![principal_subject],
|r| r.get(0),
)
.map_err(|e| Error::storage(format!("COUNT chunks for estimate: {e}")))?;
Ok((episodes as u64, chunks as u64))
}
#[allow(dead_code)]
fn _silence_unused() {
let _: Option<&dyn Embedder> = None;
let _: Option<TenantId> = None;
let _: Option<&TenantsIndex> = None;
let _: Option<&EmbedderIdentity> = None;
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::params;
fn seed_two_principal_db() -> (tempfile::TempDir, std::path::PathBuf) {
let tmp = tempfile::TempDir::new().unwrap();
let db_path = tmp.path().join("forget.db");
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch(
"PRAGMA journal_mode = wal;
PRAGMA foreign_keys = ON;
PRAGMA busy_timeout = 5000;",
)
.unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
let now_ms = chrono::Utc::now().timestamp_millis();
for i in 0..3 {
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
params![
format!("00000000-0000-0000-0000-00000000a{i:03x}"),
now_ms,
format!("alice content {i}"),
now_ms,
now_ms,
],
)
.unwrap();
}
for i in 0..2 {
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'bob')",
params![
format!("00000000-0000-0000-0000-00000000b{i:03x}"),
now_ms,
format!("bob content {i}"),
now_ms,
now_ms,
],
)
.unwrap();
}
conn.execute(
"INSERT INTO documents (
doc_id, source, title, mime_type, ingested_at_ms,
modified_at_ms, status, chunk_count, content_hash, byte_size
) VALUES ('00000000-0000-0000-0000-000000000d01', 'src://test', 't',
'text/markdown', ?, NULL, 'active', 4, 'hashabc', 200)",
params![now_ms],
)
.unwrap();
for i in 0..3 {
conn.execute(
"INSERT INTO document_chunks (
chunk_id, doc_id, chunk_index, content, token_count,
start_offset, end_offset, created_at_ms, ingested_by_principal
) VALUES (?, ?, ?, ?, 5, ?, ?, ?, 'alice')",
params![
format!("00000000-0000-0000-0000-00000000c{i:03x}"),
"00000000-0000-0000-0000-000000000d01",
i,
format!("alice chunk {i}"),
(i * 10) as i64,
((i + 1) * 10) as i64,
now_ms,
],
)
.unwrap();
}
conn.execute(
"INSERT INTO document_chunks (
chunk_id, doc_id, chunk_index, content, token_count,
start_offset, end_offset, created_at_ms, ingested_by_principal
) VALUES ('00000000-0000-0000-0000-00000000ccc1', '00000000-0000-0000-0000-000000000d01',
3, 'bob chunk', 5, 30, 40, ?, 'bob')",
params![now_ms],
)
.unwrap();
(tmp, db_path)
}
#[test]
fn delete_principal_rows_targets_only_named_subject() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let (episodes, _triples, chunks, _rowids) =
delete_principal_rows(&mut conn, "alice").unwrap();
assert_eq!(episodes, 3, "should delete alice's 3 episodes");
assert_eq!(chunks, 3, "should delete alice's 3 chunks");
let alice_remaining: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_remaining, 0);
let bob_remaining: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_remaining, 2);
let bob_chunks: i64 = conn
.query_row(
"SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_chunks, 1);
}
#[test]
fn delete_principal_rows_idempotent_on_absent_subject() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
let (episodes, _triples, chunks, rowids) =
delete_principal_rows(&mut conn, "ghost").unwrap();
assert_eq!(episodes, 0);
assert_eq!(chunks, 0);
assert!(rowids.is_empty());
let (e2, _, c2, _) = delete_principal_rows(&mut conn, "ghost").unwrap();
assert_eq!(e2, 0);
assert_eq!(c2, 0);
}
#[test]
fn delete_principal_rows_cascades_to_embeddings() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let alice_id_0 = "00000000-0000-0000-0000-00000000a000";
conn.execute(
"INSERT INTO embedders (name, version, dim, dtype, first_seen_ms)
VALUES ('test', 'v1', 4, 'f32', 0)",
[],
)
.unwrap();
let embedder_id: i64 = conn
.query_row("SELECT embedder_id FROM embedders WHERE name = 'test'", [], |r| r.get(0))
.unwrap();
let vec_blob = vec![0u8; 16];
conn.execute(
"INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
VALUES (?, ?, 'f32', 4, ?, 0)",
params![alice_id_0, embedder_id, &vec_blob[..]],
)
.unwrap();
let before: i64 = conn
.query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
.unwrap();
assert_eq!(before, 1);
let (_e, _t, _c, _r) = delete_principal_rows(&mut conn, "alice").unwrap();
let after: i64 = conn
.query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
.unwrap();
assert_eq!(after, 0, "embeddings should cascade-delete with episodes");
}
#[test]
#[ignore = "requires SQLCipher: estimate_forget_scope opens via open_sqlcipher which fails on plain SQLite test DBs. Round-trip is covered by SQLCipher-enabled integration tests."]
fn estimate_forget_scope_counts_correctly() {
let (_tmp, db_path) = seed_two_principal_db();
let key = crate::key_material::KeyMaterial::derive("test-pass", &[0u8; 16]).unwrap();
let (eps, chunks) = estimate_forget_scope(&db_path, &key, "alice").unwrap();
assert_eq!(eps, 3);
assert_eq!(chunks, 3);
let (eps_b, chunks_b) = estimate_forget_scope(&db_path, &key, "bob").unwrap();
assert_eq!(eps_b, 2);
assert_eq!(chunks_b, 1);
let (eps_ghost, _) = estimate_forget_scope(&db_path, &key, "ghost").unwrap();
assert_eq!(eps_ghost, 0);
}
#[test]
fn forget_principal_round_trip_against_real_sqlcipher() {
use crate::init::{InitParams, init};
use solo_core::TenantId;
use std::sync::Arc;
use zeroize::Zeroizing;
let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().to_path_buf();
let pass = "forget round-trip test passphrase";
let outcome = init(InitParams {
data_dir: data_dir.clone(),
passphrase: Zeroizing::new(pass.into()),
force: false,
embedder: crate::init::default_embedder(),
})
.expect("init");
let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
let salt = cfg.salt_bytes().unwrap();
let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
{
let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let now = chrono::Utc::now().timestamp_millis();
for i in 0..3 {
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5,
'hot', ?, ?, 'alice')",
params![
format!("00000000-0000-0000-0000-0000000{i:08x}"),
now,
format!("alice ep {i}"),
now,
now,
],
)
.unwrap();
}
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', 'bob ep', '{}', 0.9, 0.5, 0.5,
'hot', ?, ?, 'bob')",
params![
"00000000-0000-0000-0000-0000000b000000",
now,
now,
now,
],
)
.unwrap();
}
let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let crate::writer::WriterSpawn { handle: write_handle, join } =
crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
let read_pool = crate::reader::ReaderPool::new(
&outcome.db_path,
Some(key.clone()),
stub.clone() as Arc<_>,
)
.unwrap();
let embedder: Arc<dyn solo_core::Embedder> = Arc::new(
crate::embedder::StubEmbedder::new("stub", "v1", 4),
);
let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
TenantId::default_tenant(),
cfg.clone(),
outcome.db_path.clone(),
data_dir.clone(),
1, stub.clone() as Arc<_>,
embedder,
write_handle,
join,
read_pool,
));
let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
.expect("forget_principal");
assert_eq!(report.episodes_deleted, 3);
assert_eq!(report.triples_deleted, 0);
assert_eq!(report.chunks_deleted, 0);
assert!(report.hnsw_rebuilt);
let after_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let alice_left: i64 = after_conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_left, 0);
let bob_left: i64 = after_conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_left, 1);
let admin_conn = crate::init::open_sqlcipher(
&data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
&key,
)
.unwrap();
let (op, target, principal): (String, Option<String>, Option<String>) = admin_conn
.query_row(
"SELECT operation, target_tenant_id, principal_subject \
FROM audit_events_admin WHERE audit_id = ?",
params![report.audit_admin_row_id],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
)
.unwrap();
assert_eq!(op, "gdpr.forget_user");
assert_eq!(target.as_deref(), Some("default"));
assert_eq!(principal.as_deref(), Some("admin"));
}
#[test]
fn forget_principal_idempotent_on_absent_subject() {
use crate::init::{InitParams, init};
use solo_core::TenantId;
use std::sync::Arc;
use zeroize::Zeroizing;
let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().to_path_buf();
let pass = "idempotent forget test";
let outcome = init(InitParams {
data_dir: data_dir.clone(),
passphrase: Zeroizing::new(pass.into()),
force: false,
embedder: crate::init::default_embedder(),
})
.expect("init");
let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
let salt = cfg.salt_bytes().unwrap();
let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let crate::writer::WriterSpawn { handle: write_handle, join } =
crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
let read_pool = crate::reader::ReaderPool::new(
&outcome.db_path,
Some(key.clone()),
stub.clone() as Arc<_>,
)
.unwrap();
let embedder: Arc<dyn solo_core::Embedder> = Arc::new(
crate::embedder::StubEmbedder::new("stub", "v1", 4),
);
let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
TenantId::default_tenant(),
cfg,
outcome.db_path.clone(),
data_dir.clone(),
1,
stub.clone() as Arc<_>,
embedder,
write_handle,
join,
read_pool,
));
let report = forget_principal(handle, "ghost", None, &data_dir, &key)
.expect("forget_principal on absent subject");
assert_eq!(report.episodes_deleted, 0);
assert_eq!(report.chunks_deleted, 0);
assert!(!report.hnsw_rebuilt, "no rebuild when nothing deleted");
assert!(report.audit_admin_row_id > 0);
}
#[test]
fn estimate_via_direct_count_matches_seeded_data() {
let (_tmp, db_path) = seed_two_principal_db();
let conn = Connection::open(&db_path).unwrap();
let alice_eps: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params!["alice"],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_eps, 3);
let alice_chunks: i64 = conn
.query_row(
"SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
params!["alice"],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_chunks, 3);
let bob_eps: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params!["bob"],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_eps, 2);
let ghost_eps: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params!["ghost"],
|r| r.get(0),
)
.unwrap();
assert_eq!(ghost_eps, 0);
}
}