use std::path::Path;
use std::sync::Arc;
use rusqlite::{Connection, TransactionBehavior, params};
use solo_core::{Embedder, Error, Result, TenantId, VectorIndex};
use crate::audit::{AuditOperation, AuditResult, insert_audit_admin_row};
use crate::embedder_registry::EmbedderIdentity;
use crate::hnsw_id::{chunk_hnsw_id, episode_hnsw_id};
use crate::init::open_sqlcipher;
use crate::key_material::KeyMaterial;
use crate::tenants::TenantsIndex;
/// Summary of a single [`forget_principal`] run, also mirrored into the
/// `details` JSON of the admin audit row it writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetReport {
    /// Rows removed from `episodes` whose `principal_subject` matched.
    pub episodes_deleted: u64,
    /// Rows removed from `triples` whose `source_episode_id` was one of the
    /// forgotten episodes.
    pub triples_deleted: u64,
    /// Rows removed from `document_chunks` whose `ingested_by_principal`
    /// matched.
    pub chunks_deleted: u64,
    /// Triples with a NULL `source_episode_id` that sit in a cluster
    /// containing a forgotten episode. These are counted but NOT deleted.
    pub triples_orphan_null_source: u64,
    /// `true` when anything was deleted and the HNSW index was rebuilt.
    pub hnsw_rebuilt: bool,
    /// Row id returned when inserting the `gdpr.forget_user` admin audit row.
    pub audit_admin_row_id: i64,
}
/// Erases everything attributed to `principal_subject` in the tenant behind
/// `tenant_handle`, refreshes that tenant's HNSW index, and records a
/// `gdpr.forget_user` row in the admin audit log.
///
/// Deleted data:
/// - episodes whose `principal_subject` matches;
/// - document chunks whose `ingested_by_principal` matches;
/// - triples whose `source_episode_id` points at one of those episodes.
///
/// The SQL deletions run in one `BEGIN IMMEDIATE` transaction (see
/// `delete_principal_rows`). Triples with a NULL `source_episode_id` that
/// share a cluster with a forgotten episode cannot be attributed, so they are
/// left in place. They are counted in
/// [`ForgetReport::triples_orphan_null_source`] and reported with a warning
/// for operator review.
///
/// Calling this for a subject with no rows is allowed. Nothing is deleted,
/// the HNSW rebuild is skipped, and an audit row is still written.
///
/// # Parameters
/// - `actor_principal`: who requested the erasure; passed to the audit row.
/// - `data_dir`: directory containing the tenants index database, which
///   holds the admin audit table.
/// - `key`: key material used to open both the tenant DB and the admin DB.
///
/// # Errors
/// Returns an error in any of these cases:
/// - either database cannot be opened;
/// - the deletion transaction fails (it is rolled back);
/// - the HNSW rebuild fails;
/// - the audit row cannot be inserted.
///
/// Errors raised after the deletion commit leave the rows deleted. The audit
/// database is therefore opened before anything is erased.
pub fn forget_principal(
    tenant_handle: Arc<crate::tenants::TenantHandle>,
    principal_subject: &str,
    actor_principal: Option<&str>,
    data_dir: &Path,
    key: &KeyMaterial,
) -> Result<ForgetReport> {
    let tenant_id = tenant_handle.tenant_id().clone();
    let db_path = tenant_handle.db_path().to_path_buf();
    let hnsw = tenant_handle.hnsw().clone();
    let embedder_id = tenant_handle.embedder_id();

    // Open the audit database *before* deleting anything. If its path or key
    // is wrong we fail with nothing erased, instead of performing an
    // irreversible erasure that then cannot be audited.
    let admin_path = data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
    let admin_conn = open_sqlcipher(&admin_path, key)?;

    let mut conn = open_sqlcipher(&db_path, key)?;
    let DeleteOutcome {
        episodes_deleted,
        triples_deleted,
        triples_orphan_null_source,
        chunks_deleted,
        episode_rowids,
    } = delete_principal_rows(&mut conn, principal_subject)?;

    if triples_orphan_null_source > 0 {
        tracing::warn!(
            tenant = %tenant_id,
            principal = %principal_subject,
            orphan_count = triples_orphan_null_source,
            "gdpr.forget_user: {triples_orphan_null_source} triple(s) reference \
             the forgotten subject's cluster(s) but have NULL source_episode_id \
             (pre-v0.8.1 schema or unresolved provenance). Cannot cascade. \
             Operator action: review and manually clean up if compliance scope \
             requires it."
        );
    }

    let total_deleted = episodes_deleted + triples_deleted + chunks_deleted;

    // Evict the forgotten episodes' vectors from the live index. The SQL rows
    // are already gone, so a failure here must not abort the erasure. It must
    // not be silent either, because the vector may remain searchable.
    // NOTE(review): only episode ids are evicted here. Vectors of deleted
    // document chunks are removed only if `rebuild_hnsw_from_sql` clears the
    // index first — confirm.
    let mut hnsw_remove_failures = 0usize;
    for &rowid in &episode_rowids {
        if let Err(e) = hnsw.remove(episode_hnsw_id(rowid)) {
            hnsw_remove_failures += 1;
            tracing::debug!(
                tenant = %tenant_id,
                rowid,
                error = %e,
                "gdpr.forget_user: hnsw.remove failed for forgotten episode"
            );
        }
    }
    if hnsw_remove_failures > 0 {
        tracing::warn!(
            tenant = %tenant_id,
            principal = %principal_subject,
            failures = hnsw_remove_failures,
            attempted = episode_rowids.len(),
            "gdpr.forget_user: hnsw.remove returned an error for some forgotten \
             episodes; their vectors may still be present in the index"
        );
    }

    // Triples have no vectors, but any deletion triggers a rebuild so the
    // index reflects only surviving rows.
    let hnsw_rebuilt = if total_deleted > 0 {
        rebuild_hnsw_after_forget(&conn, hnsw.as_ref(), embedder_id)?;
        true
    } else {
        false
    };

    let now_ms = chrono::Utc::now().timestamp_millis();
    let details = serde_json::json!({
        "principal_subject": principal_subject,
        "episodes_deleted": episodes_deleted,
        "triples_deleted": triples_deleted,
        "triples_orphan_null_source": triples_orphan_null_source,
        "chunks_deleted": chunks_deleted,
        "hnsw_rebuilt": hnsw_rebuilt,
    });
    let audit_admin_row_id = insert_audit_admin_row(
        &admin_conn,
        now_ms,
        actor_principal,
        AuditOperation::GdprForgetUser,
        Some(tenant_id.as_str()),
        AuditResult::Ok,
        Some(&details),
    )?;

    Ok(ForgetReport {
        episodes_deleted,
        triples_deleted,
        triples_orphan_null_source,
        chunks_deleted,
        hnsw_rebuilt,
        audit_admin_row_id,
    })
}
/// Row counts and identifiers produced by `delete_principal_rows`.
struct DeleteOutcome {
    /// Rows removed from `episodes`.
    episodes_deleted: u64,
    /// Rows removed from `triples` via `source_episode_id`.
    triples_deleted: u64,
    /// NULL-source triples in the forgotten episodes' clusters (counted, kept).
    triples_orphan_null_source: u64,
    /// Rows removed from `document_chunks`.
    chunks_deleted: u64,
    /// `episodes.rowid` of every deleted episode, sorted ascending, so the
    /// caller can evict the matching HNSW entries after commit.
    episode_rowids: Vec<i64>,
}
/// Deletes every row attributed to `principal_subject` in one
/// `BEGIN IMMEDIATE` transaction and reports what was removed.
///
/// Statement order matters. Triples are deleted, and NULL-source orphans
/// counted, while the principal's episodes still exist, because both
/// statements locate those episodes through `episodes.principal_subject`.
/// Episodes and document chunks are deleted afterwards.
///
/// The triple statements find the episodes with a subquery instead of binding
/// one `?` per rowid. With per-rowid binding, a principal owning more
/// episodes than `SQLITE_MAX_VARIABLE_NUMBER` (999 before SQLite 3.32, 32766
/// after) would make the whole forget fail with "too many SQL variables".
/// The transaction is IMMEDIATE, so no other connection can write between the
/// rowid SELECT and these statements. The subquery therefore sees exactly the
/// rowids returned in [`DeleteOutcome::episode_rowids`].
///
/// # Errors
/// Any SQLite failure is mapped to a storage error. The transaction is then
/// dropped without commit (rolled back), so nothing is deleted.
fn delete_principal_rows(
    conn: &mut Connection,
    principal_subject: &str,
) -> Result<DeleteOutcome> {
    let tx = conn
        .transaction_with_behavior(TransactionBehavior::Immediate)
        .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for forget: {e}")))?;

    // Captured up front (and sorted) so the caller can evict HNSW entries for
    // these episodes after the transaction commits.
    let mut rowids: Vec<i64> = {
        let mut stmt = tx
            .prepare("SELECT rowid FROM episodes WHERE principal_subject = ?")
            .map_err(|e| Error::storage(format!("prepare SELECT episodes.rowid: {e}")))?;
        let rows = stmt
            .query_map(params![principal_subject], |r| r.get::<_, i64>(0))
            .map_err(|e| Error::storage(format!("query episodes.rowid: {e}")))?;
        rows.collect::<rusqlite::Result<Vec<_>>>()
            .map_err(|e| Error::storage(format!("collect episode rowids: {e}")))?
    };
    rowids.sort_unstable();

    let (triples_deleted, triples_orphan_null_source): (u64, u64) = if rowids.is_empty() {
        // No episodes: no source-linked triples and no cluster membership to
        // follow, so both statements would be no-ops.
        (0, 0)
    } else {
        let deleted = tx
            .execute(
                "DELETE FROM triples WHERE source_episode_id IN ( \
                 SELECT e.rowid FROM episodes e WHERE e.principal_subject = ?1 \
                 )",
                params![principal_subject],
            )
            .map_err(|e| Error::storage(format!("DELETE triples: {e}")))?
            as u64;

        // NULL-source triples cannot be attributed to an episode, so they are
        // kept. Count those sharing a cluster with the forgotten episodes so
        // the caller can surface them for manual review.
        let orphans: i64 = tx
            .query_row(
                "SELECT COUNT(*) FROM triples t \
                 WHERE t.source_episode_id IS NULL \
                 AND t.cluster_id IN ( \
                 SELECT DISTINCT ce.cluster_id FROM cluster_episodes ce \
                 JOIN episodes e ON e.memory_id = ce.memory_id \
                 WHERE e.principal_subject = ?1 \
                 )",
                params![principal_subject],
                |r| r.get(0),
            )
            .map_err(|e| Error::storage(format!("COUNT orphan triples: {e}")))?;
        (deleted, orphans.max(0) as u64)
    };

    let episodes_deleted: u64 = tx
        .execute(
            "DELETE FROM episodes WHERE principal_subject = ?",
            params![principal_subject],
        )
        .map_err(|e| Error::storage(format!("DELETE episodes: {e}")))?
        as u64;
    let chunks_deleted: u64 = tx
        .execute(
            "DELETE FROM document_chunks WHERE ingested_by_principal = ?",
            params![principal_subject],
        )
        .map_err(|e| Error::storage(format!("DELETE document_chunks: {e}")))?
        as u64;

    tx.commit()
        .map_err(|e| Error::storage(format!("COMMIT forget: {e}")))?;

    Ok(DeleteOutcome {
        episodes_deleted,
        triples_deleted,
        triples_orphan_null_source,
        chunks_deleted,
        episode_rowids: rowids,
    })
}
/// Re-populates `hnsw` after a forget, in two steps:
/// 1. Calls `crate::recovery::rebuild_hnsw_from_sql` for `embedder_id`.
/// 2. Adds the stored vector of every surviving document chunk embedded by
///    `embedder_id`, keyed by `chunk_hnsw_id(rowid)`.
///
/// Chunk rows that fail to decode, or whose vector has the wrong size, are
/// skipped with a warning. So are individual `hnsw.add` failures.
///
/// # Errors
/// Returns an error if `rebuild_hnsw_from_sql` fails or if the chunk query
/// cannot be prepared or started.
fn rebuild_hnsw_after_forget(
    conn: &Connection,
    hnsw: &dyn VectorIndex,
    embedder_id: i64,
) -> Result<()> {
    let _ = crate::recovery::rebuild_hnsw_from_sql(conn, hnsw, embedder_id)?;
    let mut stmt = conn
        .prepare(
            "SELECT c.rowid, ce.vector, ce.dim
FROM document_chunks c
JOIN chunk_embeddings ce ON ce.chunk_id = c.chunk_id
WHERE ce.embedder_id = ?1
ORDER BY c.rowid",
        )
        .map_err(|e| Error::storage(format!("prepare chunks rebuild: {e}")))?;
    let rows = stmt
        .query_map(params![embedder_id], |r| {
            Ok((
                r.get::<_, i64>(0)?,
                r.get::<_, Vec<u8>>(1)?,
                r.get::<_, i64>(2)?,
            ))
        })
        .map_err(|e| Error::storage(format!("query chunks rebuild: {e}")))?;
    for row in rows {
        let (rowid, blob, dim) = match row {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "GDPR rebuild: chunk row decode failed; skipping");
                continue;
            }
        };
        let Some(vector) = decode_chunk_vector(rowid, &blob, dim) else {
            continue;
        };
        if let Err(e) = hnsw.add(chunk_hnsw_id(rowid), vector.as_slice()) {
            tracing::warn!(rowid, error = %e, "GDPR rebuild: hnsw.add failed; skipping");
        }
    }
    Ok(())
}

/// Decodes a stored chunk embedding into `dim` native-endian `f32` values.
///
/// Returns `None` after logging a warning when `dim` is negative or the blob
/// is not exactly `dim * 4` bytes long. The size check is done in `u64` with
/// `checked_mul`, so a corrupted `dim` cannot wrap or overflow. The previous
/// `dim as usize * 4` could panic in debug builds.
///
/// The bytes are copied instead of reinterpreted in place. A zero-copy cast
/// of a `Vec<u8>` fails whenever the heap buffer is not 4-byte aligned, which
/// `u8` allocations do not guarantee.
fn decode_chunk_vector(rowid: i64, blob: &[u8], dim: i64) -> Option<Vec<f32>> {
    let expected = if dim >= 0 {
        (dim as u64).checked_mul(4)
    } else {
        None
    };
    if expected != Some(blob.len() as u64) {
        tracing::warn!(
            rowid,
            blob_len = blob.len(),
            dim,
            expected = ?expected,
            "GDPR rebuild: chunk vector size mismatch; skipping"
        );
        return None;
    }
    Some(
        blob.chunks_exact(4)
            .map(|b| f32::from_ne_bytes([b[0], b[1], b[2], b[3]]))
            .collect(),
    )
}
pub fn estimate_forget_scope(
db_path: &Path,
key: &KeyMaterial,
principal_subject: &str,
) -> Result<(u64, u64)> {
let conn = open_sqlcipher(db_path, key)?;
let episodes: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params![principal_subject],
|r| r.get(0),
)
.map_err(|e| Error::storage(format!("COUNT episodes for estimate: {e}")))?;
let chunks: i64 = conn
.query_row(
"SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
params![principal_subject],
|r| r.get(0),
)
.map_err(|e| Error::storage(format!("COUNT chunks for estimate: {e}")))?;
Ok((episodes as u64, chunks as u64))
}
/// Never called. It references types imported at the top of this file that
/// no other item here uses, so the imports do not trigger `unused_imports`
/// warnings.
#[allow(dead_code)]
fn _silence_unused() {
    let _anchors: (
        Option<&dyn Embedder>,
        Option<TenantId>,
        Option<&TenantsIndex>,
        Option<&EmbedderIdentity>,
    ) = (None, None, None, None);
}
// Tests for the GDPR forget path.
//
// Tests built on `seed_two_principal_db` use a plain (unencrypted) SQLite file
// and call `delete_principal_rows` directly. Tests that go through
// `forget_principal` first run `init` to create an encrypted data dir, because
// `forget_principal` opens both the tenant DB and the tenants index with
// `open_sqlcipher`.
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::params;
/// Creates a migrated plain-SQLite database with two principals:
/// - `alice`: 3 episodes and 3 document chunks;
/// - `bob`: 2 episodes and 1 document chunk.
///
/// All four chunks belong to one document. Returns the temp-dir guard (keep
/// it alive for the whole test) and the database path.
fn seed_two_principal_db() -> (tempfile::TempDir, std::path::PathBuf) {
let tmp = tempfile::TempDir::new().unwrap();
let db_path = tmp.path().join("forget.db");
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch(
"PRAGMA journal_mode = wal;
PRAGMA foreign_keys = ON;
PRAGMA busy_timeout = 5000;",
)
.unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
let now_ms = chrono::Utc::now().timestamp_millis();
// alice: 3 episodes.
for i in 0..3 {
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
params![
format!("00000000-0000-0000-0000-00000000a{i:03x}"),
now_ms,
format!("alice content {i}"),
now_ms,
now_ms,
],
)
.unwrap();
}
// bob: 2 episodes.
for i in 0..2 {
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'bob')",
params![
format!("00000000-0000-0000-0000-00000000b{i:03x}"),
now_ms,
format!("bob content {i}"),
now_ms,
now_ms,
],
)
.unwrap();
}
// Parent document row for the chunks below.
conn.execute(
"INSERT INTO documents (
doc_id, source, title, mime_type, ingested_at_ms,
modified_at_ms, status, chunk_count, content_hash, byte_size
) VALUES ('00000000-0000-0000-0000-000000000d01', 'src://test', 't',
'text/markdown', ?, NULL, 'active', 4, 'hashabc', 200)",
params![now_ms],
)
.unwrap();
// alice: chunks 0..3 of the document.
for i in 0..3 {
conn.execute(
"INSERT INTO document_chunks (
chunk_id, doc_id, chunk_index, content, token_count,
start_offset, end_offset, created_at_ms, ingested_by_principal
) VALUES (?, ?, ?, ?, 5, ?, ?, ?, 'alice')",
params![
format!("00000000-0000-0000-0000-00000000c{i:03x}"),
"00000000-0000-0000-0000-000000000d01",
i,
format!("alice chunk {i}"),
(i * 10) as i64,
((i + 1) * 10) as i64,
now_ms,
],
)
.unwrap();
}
// bob: the fourth chunk.
conn.execute(
"INSERT INTO document_chunks (
chunk_id, doc_id, chunk_index, content, token_count,
start_offset, end_offset, created_at_ms, ingested_by_principal
) VALUES ('00000000-0000-0000-0000-00000000ccc1', '00000000-0000-0000-0000-000000000d01',
3, 'bob chunk', 5, 30, 40, ?, 'bob')",
params![now_ms],
)
.unwrap();
(tmp, db_path)
}
/// Forgetting alice removes exactly her episodes and chunks; bob's rows are untouched.
#[test]
fn delete_principal_rows_targets_only_named_subject() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let DeleteOutcome {
episodes_deleted: episodes,
chunks_deleted: chunks,
..
} = delete_principal_rows(&mut conn, "alice").unwrap();
assert_eq!(episodes, 3, "should delete alice's 3 episodes");
assert_eq!(chunks, 3, "should delete alice's 3 chunks");
let alice_remaining: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_remaining, 0);
let bob_remaining: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_remaining, 2);
let bob_chunks: i64 = conn
.query_row(
"SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_chunks, 1);
}
/// An unknown subject deletes nothing and yields no rowids, and repeating the call is still a no-op.
#[test]
fn delete_principal_rows_idempotent_on_absent_subject() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
let DeleteOutcome {
episodes_deleted: episodes,
chunks_deleted: chunks,
episode_rowids: rowids,
..
} = delete_principal_rows(&mut conn, "ghost").unwrap();
assert_eq!(episodes, 0);
assert_eq!(chunks, 0);
assert!(rowids.is_empty());
let DeleteOutcome {
episodes_deleted: e2,
chunks_deleted: c2,
..
} = delete_principal_rows(&mut conn, "ghost").unwrap();
assert_eq!(e2, 0);
assert_eq!(c2, 0);
}
/// An `embeddings` row keyed by one of alice's episodes is gone after the
/// forget. This relies on the schema's FK cascade with `foreign_keys = ON`.
#[test]
fn delete_principal_rows_cascades_to_embeddings() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let alice_id_0 = "00000000-0000-0000-0000-00000000a000";
conn.execute(
"INSERT INTO embedders (name, version, dim, dtype, first_seen_ms)
VALUES ('test', 'v1', 4, 'f32', 0)",
[],
)
.unwrap();
let embedder_id: i64 = conn
.query_row("SELECT embedder_id FROM embedders WHERE name = 'test'", [], |r| r.get(0))
.unwrap();
// 4 dims * 4 bytes of zeros.
let vec_blob = vec![0u8; 16];
conn.execute(
"INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
VALUES (?, ?, 'f32', 4, ?, 0)",
params![alice_id_0, embedder_id, &vec_blob[..]],
)
.unwrap();
let before: i64 = conn
.query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
.unwrap();
assert_eq!(before, 1);
let _ = delete_principal_rows(&mut conn, "alice").unwrap();
let after: i64 = conn
.query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
.unwrap();
assert_eq!(after, 0, "embeddings should cascade-delete with episodes");
}
/// Expected counts from the seed: alice (3 episodes, 3 chunks), bob (2, 1),
/// ghost (0 episodes). Ignored for the reason given in the attribute.
#[test]
#[ignore = "requires SQLCipher: estimate_forget_scope opens via open_sqlcipher which fails on plain SQLite test DBs. Round-trip is covered by SQLCipher-enabled integration tests."]
fn estimate_forget_scope_counts_correctly() {
let (_tmp, db_path) = seed_two_principal_db();
let key = crate::key_material::KeyMaterial::derive("test-pass", &[0u8; 16]).unwrap();
let (eps, chunks) = estimate_forget_scope(&db_path, &key, "alice").unwrap();
assert_eq!(eps, 3);
assert_eq!(chunks, 3);
let (eps_b, chunks_b) = estimate_forget_scope(&db_path, &key, "bob").unwrap();
assert_eq!(eps_b, 2);
assert_eq!(chunks_b, 1);
let (eps_ghost, _) = estimate_forget_scope(&db_path, &key, "ghost").unwrap();
assert_eq!(eps_ghost, 0);
}
/// End-to-end on an encrypted data dir. Forgetting alice:
/// - removes her 3 episodes and keeps bob's;
/// - reports an HNSW rebuild;
/// - writes a `gdpr.forget_user` admin audit row for tenant `default`,
///   attributed to the actor.
#[test]
fn forget_principal_round_trip_against_real_sqlcipher() {
use crate::init::{InitParams, init};
use solo_core::TenantId;
use std::sync::Arc;
use zeroize::Zeroizing;
let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().to_path_buf();
let pass = "forget round-trip test passphrase";
let outcome = init(InitParams {
data_dir: data_dir.clone(),
passphrase: Zeroizing::new(pass.into()),
force: false,
embedder: crate::init::default_embedder(),
})
.expect("init");
// Re-derive the same key `init` used, from the salt stored in the config.
let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
let salt = cfg.salt_bytes().unwrap();
let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
// Seed 3 alice episodes and 1 bob episode; the connection closes at the end of this scope.
{
let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let now = chrono::Utc::now().timestamp_millis();
for i in 0..3 {
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5,
'hot', ?, ?, 'alice')",
params![
format!("00000000-0000-0000-0000-0000000{i:08x}"),
now,
format!("alice ep {i}"),
now,
now,
],
)
.unwrap();
}
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES (?, ?, 'user_message', 'bob ep', '{}', 0.9, 0.5, 0.5,
'hot', ?, ?, 'bob')",
params![
"00000000-0000-0000-0000-0000000b000000",
now,
now,
now,
],
)
.unwrap();
}
// Assemble a TenantHandle from test stubs: a 4-dim vector index, writer
// actor, reader pool and embedder.
let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let crate::writer::WriterSpawn { handle: write_handle, join } =
crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
let read_pool = crate::reader::ReaderPool::new(
&outcome.db_path,
Some(key.clone()),
stub.clone() as Arc<_>,
)
.unwrap();
let embedder: Arc<dyn solo_core::Embedder> = Arc::new(
crate::embedder::StubEmbedder::new("stub", "v1", 4),
);
let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
TenantId::default_tenant(),
cfg.clone(),
outcome.db_path.clone(),
data_dir.clone(),
1, stub.clone() as Arc<_>,
embedder,
write_handle,
join,
read_pool,
));
let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
.expect("forget_principal");
assert_eq!(report.episodes_deleted, 3);
assert_eq!(report.triples_deleted, 0);
assert_eq!(report.chunks_deleted, 0);
assert!(report.hnsw_rebuilt);
let after_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let alice_left: i64 = after_conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_left, 0);
let bob_left: i64 = after_conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_left, 1);
// The audit row lives in the tenants index DB, not the tenant DB.
let admin_conn = crate::init::open_sqlcipher(
&data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
&key,
)
.unwrap();
let (op, target, principal): (String, Option<String>, Option<String>) = admin_conn
.query_row(
"SELECT operation, target_tenant_id, principal_subject \
FROM audit_events_admin WHERE audit_id = ?",
params![report.audit_admin_row_id],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
)
.unwrap();
assert_eq!(op, "gdpr.forget_user");
assert_eq!(target.as_deref(), Some("default"));
assert_eq!(principal.as_deref(), Some("admin"));
}
/// On an absent subject, `forget_principal` still succeeds:
/// - nothing is deleted;
/// - the HNSW rebuild is skipped;
/// - an audit row is still written.
#[test]
fn forget_principal_idempotent_on_absent_subject() {
use crate::init::{InitParams, init};
use solo_core::TenantId;
use std::sync::Arc;
use zeroize::Zeroizing;
let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().to_path_buf();
let pass = "idempotent forget test";
let outcome = init(InitParams {
data_dir: data_dir.clone(),
passphrase: Zeroizing::new(pass.into()),
force: false,
embedder: crate::init::default_embedder(),
})
.expect("init");
let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
let salt = cfg.salt_bytes().unwrap();
let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let crate::writer::WriterSpawn { handle: write_handle, join } =
crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
let read_pool = crate::reader::ReaderPool::new(
&outcome.db_path,
Some(key.clone()),
stub.clone() as Arc<_>,
)
.unwrap();
let embedder: Arc<dyn solo_core::Embedder> = Arc::new(
crate::embedder::StubEmbedder::new("stub", "v1", 4),
);
let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
TenantId::default_tenant(),
cfg,
outcome.db_path.clone(),
data_dir.clone(),
1,
stub.clone() as Arc<_>,
embedder,
write_handle,
join,
read_pool,
));
let report = forget_principal(handle, "ghost", None, &data_dir, &key)
.expect("forget_principal on absent subject");
assert_eq!(report.episodes_deleted, 0);
assert_eq!(report.chunks_deleted, 0);
assert!(!report.hnsw_rebuilt, "no rebuild when nothing deleted");
assert!(report.audit_admin_row_id > 0);
}
/// Inserts a `literal`-object triple with confidence 0.9. It takes an
/// optional source episode rowid and an optional cluster id, so tests can
/// build both linked triples and NULL-source triples.
fn seed_triple_with_source(
conn: &Connection,
triple_id: &str,
subject: &str,
predicate: &str,
object: &str,
source_episode_id: Option<i64>,
cluster_id: Option<&str>,
) {
let now_ms = chrono::Utc::now().timestamp_millis();
conn.execute(
"INSERT INTO triples (
triple_id, subject_id, predicate, object_id, object_kind,
valid_from_ms, valid_to_ms, confidence, provenance_json,
created_at_ms, updated_at_ms, source_episode_id, cluster_id
) VALUES (?, ?, ?, ?, 'literal', ?, NULL, 0.9, '{}', ?, ?, ?, ?)",
params![
triple_id,
subject,
predicate,
object,
now_ms,
now_ms,
now_ms,
source_episode_id,
cluster_id,
],
)
.expect("seed triple");
}
/// Triples whose `source_episode_id` is one of alice's episodes are deleted; bob's triple survives.
#[test]
fn delete_principal_rows_cascades_through_triples_by_source_episode() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let alice_rowids: Vec<i64> = {
let mut stmt = conn
.prepare(
"SELECT rowid FROM episodes WHERE principal_subject = 'alice' ORDER BY rowid",
)
.unwrap();
stmt.query_map([], |r| r.get::<_, i64>(0))
.unwrap()
.map(|r| r.unwrap())
.collect()
};
assert_eq!(alice_rowids.len(), 3);
let bob_rowid: i64 = conn
.query_row(
"SELECT rowid FROM episodes WHERE principal_subject = 'bob' LIMIT 1",
[],
|r| r.get(0),
)
.unwrap();
seed_triple_with_source(
&conn,
"00000000-0000-0000-0000-000000000a01",
"alice",
"uses",
"rust",
Some(alice_rowids[0]),
None,
);
seed_triple_with_source(
&conn,
"00000000-0000-0000-0000-000000000a02",
"alice",
"likes",
"skiing",
Some(alice_rowids[1]),
None,
);
seed_triple_with_source(
&conn,
"00000000-0000-0000-0000-000000000b01",
"bob",
"uses",
"go",
Some(bob_rowid),
None,
);
let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
assert_eq!(outcome.episodes_deleted, 3);
assert_eq!(
outcome.triples_deleted, 2,
"should delete the two triples whose source was an alice episode"
);
assert_eq!(outcome.triples_orphan_null_source, 0);
let bob_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM triples WHERE subject_id = 'bob'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_count, 1);
}
/// A triple with NULL `source_episode_id` is never deleted, even when its subject is alice.
#[test]
fn delete_principal_rows_preserves_null_source_triples() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let alice_rowids: Vec<i64> = {
let mut stmt = conn
.prepare(
"SELECT rowid FROM episodes WHERE principal_subject = 'alice' ORDER BY rowid",
)
.unwrap();
stmt.query_map([], |r| r.get::<_, i64>(0))
.unwrap()
.map(|r| r.unwrap())
.collect()
};
seed_triple_with_source(
&conn,
"00000000-0000-0000-0000-000000000c01",
"alice",
"claimed",
"x",
None,
None,
);
seed_triple_with_source(
&conn,
"00000000-0000-0000-0000-000000000c02",
"alice",
"linked",
"y",
Some(alice_rowids[0]),
None,
);
let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
assert_eq!(outcome.triples_deleted, 1, "only the source-linked triple");
let remaining: i64 = conn
.query_row(
"SELECT COUNT(*) FROM triples WHERE source_episode_id IS NULL",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(remaining, 1, "NULL-source row must survive");
}
/// A NULL-source triple whose cluster contains one of alice's episodes is counted as an orphan but left in place.
#[test]
fn delete_principal_rows_counts_orphans_via_cluster_membership() {
let (_tmp, db_path) = seed_two_principal_db();
let mut conn = Connection::open(&db_path).unwrap();
conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
let alice_rows: Vec<(i64, String)> = {
let mut stmt = conn
.prepare(
"SELECT rowid, memory_id FROM episodes WHERE principal_subject = 'alice' \
ORDER BY rowid",
)
.unwrap();
stmt.query_map([], |r| {
Ok((r.get::<_, i64>(0)?, r.get::<_, String>(1)?))
})
.unwrap()
.map(|r| r.unwrap())
.collect()
};
// Put alice's first episode into a cluster, then attach a NULL-source triple to that cluster.
let alice_memory_id = alice_rows[0].1.clone();
let cluster_id = "00000000-0000-0000-0000-0000000c1001";
let now_ms = chrono::Utc::now().timestamp_millis();
conn.execute(
"INSERT INTO clusters (cluster_id, coherence, created_at_ms) VALUES (?, ?, ?)",
params![cluster_id, 0.9_f64, now_ms],
)
.unwrap();
conn.execute(
"INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)",
params![cluster_id, alice_memory_id],
)
.unwrap();
seed_triple_with_source(
&conn,
"00000000-0000-0000-0000-000000000d01",
"alice",
"claimed",
"fact",
None,
Some(cluster_id),
);
let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
assert_eq!(
outcome.triples_orphan_null_source, 1,
"the NULL-source triple linked via cluster must appear in the orphan count"
);
let remaining: i64 = conn
.query_row(
"SELECT COUNT(*) FROM triples WHERE triple_id = '00000000-0000-0000-0000-000000000d01'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(remaining, 1, "orphan triple must remain in the table");
}
/// After migrations, `triples` has a `source_episode_id` column and the `idx_triples_source_episode` index exists.
#[test]
fn migration_0007_adds_source_episode_id_column_and_index() {
let mut conn = Connection::open_in_memory().unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
// PRAGMA table_info columns: 1 = name, 2 = declared type.
let cols: Vec<(String, String)> = conn
.prepare("PRAGMA table_info('triples')")
.unwrap()
.query_map([], |row| {
Ok((row.get::<_, String>(1)?, row.get::<_, String>(2)?))
})
.unwrap()
.map(|r| r.unwrap())
.collect();
let names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
assert!(
names.contains(&"source_episode_id"),
"triples missing source_episode_id after 0007; got {names:?}"
);
let idx_exists: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master \
WHERE type='index' AND name='idx_triples_source_episode'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(idx_exists, 1, "idx_triples_source_episode missing");
}
/// Running migrations twice records version 7 in `schema_migrations` exactly once.
#[test]
fn migration_0007_is_idempotent_on_repeated_open() {
let mut conn = Connection::open_in_memory().unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM schema_migrations WHERE version = 7",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(count, 1, "0007 row must not be inserted twice");
}
/// The UPDATE backfill in migration 0007 resolves `provenance_json.derived_from[0]` (a memory_id) to that episode's rowid.
#[test]
fn migration_0007_backfills_source_episode_id_from_provenance_json() {
let mut conn = Connection::open_in_memory().unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
let now_ms = chrono::Utc::now().timestamp_millis();
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms
) VALUES ('00000000-0000-0000-0000-000000000111', ?, 'user_message',
'seed', '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
params![now_ms, now_ms, now_ms],
)
.unwrap();
let ep_rowid: i64 = conn
.query_row(
"SELECT rowid FROM episodes WHERE memory_id = ?",
params!["00000000-0000-0000-0000-000000000111"],
|r| r.get(0),
)
.unwrap();
let prov = serde_json::json!({
"derived_from": ["00000000-0000-0000-0000-000000000111"],
"derivation": "extraction",
"by": "test",
"at_ms": now_ms,
});
// Inserted with NULL source_episode_id, as a pre-0007 row would be.
conn.execute(
"INSERT INTO triples (
triple_id, subject_id, predicate, object_id, object_kind,
valid_from_ms, valid_to_ms, confidence, provenance_json,
created_at_ms, updated_at_ms, source_episode_id
) VALUES ('00000000-0000-0000-0000-000000000222',
'alice', 'uses', 'rust', 'literal',
?, NULL, 0.9, ?, ?, ?, NULL)",
params![now_ms, prov.to_string(), now_ms, now_ms],
)
.unwrap();
// Re-run only the migration's "UPDATE triples ..." tail (everything from
// that text onward) so the backfill applies to the row inserted above.
let backfill = include_str!("migrations/0007_triples_source.sql");
let update_sql = backfill
.split("UPDATE triples")
.nth(1)
.map(|tail| format!("UPDATE triples{tail}"))
.expect("0007 contains the UPDATE backfill");
conn.execute_batch(&update_sql).unwrap();
let backfilled: Option<i64> = conn
.query_row(
"SELECT source_episode_id FROM triples \
WHERE triple_id = '00000000-0000-0000-0000-000000000222'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(
backfilled,
Some(ep_rowid),
"backfill must resolve derived_from[0] memory_id → episodes.rowid"
);
}
/// A source-linked triple deleted by `forget_principal` is counted in the
/// report and in the audit row's `details_json`.
#[test]
fn forget_report_carries_triples_deleted_count_through_to_audit_details() {
use crate::init::{InitParams, init};
use solo_core::TenantId;
use zeroize::Zeroizing;
let tmp = tempfile::TempDir::new().unwrap();
let data_dir = tmp.path().to_path_buf();
let pass = "forget triples cascade test passphrase";
let outcome = init(InitParams {
data_dir: data_dir.clone(),
passphrase: Zeroizing::new(pass.into()),
force: false,
embedder: crate::init::default_embedder(),
})
.expect("init");
let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
let salt = cfg.salt_bytes().unwrap();
let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
// Seed one alice episode and one triple sourced from it.
{
let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let now = chrono::Utc::now().timestamp_millis();
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms, principal_subject
) VALUES ('00000000-0000-0000-0000-000000000ace', ?, 'user_message',
'alice ep', '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
params![now, now, now],
)
.unwrap();
let ep_rowid: i64 = conn
.query_row(
"SELECT rowid FROM episodes WHERE memory_id = ?",
params!["00000000-0000-0000-0000-000000000ace"],
|r| r.get(0),
)
.unwrap();
conn.execute(
"INSERT INTO triples (
triple_id, subject_id, predicate, object_id, object_kind,
valid_from_ms, valid_to_ms, confidence, provenance_json,
created_at_ms, updated_at_ms, source_episode_id
) VALUES ('00000000-0000-0000-0000-000000000a11',
'alice', 'uses', 'rust', 'literal',
?, NULL, 0.9, '{}', ?, ?, ?)",
params![now, now, now, ep_rowid],
)
.unwrap();
}
let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
let crate::writer::WriterSpawn { handle: write_handle, join } =
crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
let read_pool = crate::reader::ReaderPool::new(
&outcome.db_path,
Some(key.clone()),
stub.clone() as Arc<_>,
)
.unwrap();
let embedder: Arc<dyn solo_core::Embedder> = Arc::new(
crate::embedder::StubEmbedder::new("stub", "v1", 4),
);
let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
TenantId::default_tenant(),
cfg,
outcome.db_path.clone(),
data_dir.clone(),
1,
stub.clone() as Arc<_>,
embedder,
write_handle,
join,
read_pool,
));
let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
.expect("forget_principal");
assert_eq!(report.episodes_deleted, 1);
assert_eq!(report.triples_deleted, 1, "real count, not 0");
let admin_conn = crate::init::open_sqlcipher(
&data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
&key,
)
.unwrap();
let details_json: String = admin_conn
.query_row(
"SELECT details_json FROM audit_events_admin WHERE audit_id = ?",
params![report.audit_admin_row_id],
|r| r.get(0),
)
.unwrap();
let parsed: serde_json::Value = serde_json::from_str(&details_json).unwrap();
assert_eq!(parsed["triples_deleted"], 1);
assert_eq!(parsed["episodes_deleted"], 1);
}
/// Runs the same COUNT queries as `estimate_forget_scope`, directly against
/// the plain-SQLite seed. This covers the counting logic without needing
/// SQLCipher.
#[test]
fn estimate_via_direct_count_matches_seeded_data() {
let (_tmp, db_path) = seed_two_principal_db();
let conn = Connection::open(&db_path).unwrap();
let alice_eps: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params!["alice"],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_eps, 3);
let alice_chunks: i64 = conn
.query_row(
"SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
params!["alice"],
|r| r.get(0),
)
.unwrap();
assert_eq!(alice_chunks, 3);
let bob_eps: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params!["bob"],
|r| r.get(0),
)
.unwrap();
assert_eq!(bob_eps, 2);
let ghost_eps: i64 = conn
.query_row(
"SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
params!["ghost"],
|r| r.get(0),
)
.unwrap();
assert_eq!(ghost_eps, 0);
}
}