1use std::path::Path;
41use std::sync::Arc;
42
43use rusqlite::{Connection, TransactionBehavior, params};
44use solo_core::{Embedder, Error, Result, TenantId, VectorIndex};
45
46use crate::audit::{AuditOperation, AuditResult, insert_audit_admin_row};
47use crate::embedder_registry::EmbedderIdentity;
48use crate::hnsw_id::episode_hnsw_id;
49use crate::init::open_sqlcipher;
50use crate::key_material::KeyMaterial;
51use crate::tenants::TenantsIndex;
52
/// Summary of one `forget_principal` call: how many rows were removed from
/// each table, whether the vector index was rebuilt, and which admin audit row
/// recorded the operation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ForgetReport {
    /// Number of `episodes` rows deleted for the principal.
    pub episodes_deleted: u64,
    /// Number of `triples` rows deleted because their `source_episode_id`
    /// pointed at one of the principal's episodes.
    pub triples_deleted: u64,
    /// Number of `document_chunks` rows deleted for the principal.
    pub chunks_deleted: u64,
    /// Number of triples with a NULL `source_episode_id` that share a cluster
    /// with one of the principal's episodes. These are counted, not deleted.
    pub triples_orphan_null_source: u64,
    /// `true` when at least one row was deleted and the HNSW index was rebuilt
    /// from SQL afterwards.
    pub hnsw_rebuilt: bool,
    /// Value returned by `insert_audit_admin_row` for this operation.
    pub audit_admin_row_id: i64,
}
82
83pub fn forget_principal(
106 tenant_handle: Arc<crate::tenants::TenantHandle>,
107 principal_subject: &str,
108 actor_principal: Option<&str>,
109 data_dir: &Path,
110 key: &KeyMaterial,
111) -> Result<ForgetReport> {
112 let tenant_id = tenant_handle.tenant_id().clone();
113 let db_path = tenant_handle.db_path().to_path_buf();
114 let hnsw = tenant_handle.hnsw().clone();
115 let embedder_id = tenant_handle.embedder_id();
116
117 let mut conn = open_sqlcipher(&db_path, key)?;
125
126 let DeleteOutcome {
127 episodes_deleted,
128 triples_deleted,
129 triples_orphan_null_source,
130 chunks_deleted,
131 episode_rowids,
132 } = delete_principal_rows(&mut conn, principal_subject)?;
133
134 if triples_orphan_null_source > 0 {
135 tracing::warn!(
140 tenant = %tenant_id,
141 principal = %principal_subject,
142 orphan_count = triples_orphan_null_source,
143 "gdpr.forget_user: {triples_orphan_null_source} triple(s) reference \
144 the forgotten subject's cluster(s) but have NULL source_episode_id \
145 (pre-v0.8.1 schema or unresolved provenance). Cannot cascade. \
146 Operator action: review and manually clean up if compliance scope \
147 requires it."
148 );
149 }
150
151 let total_deleted = episodes_deleted + triples_deleted + chunks_deleted;
152
153 for rowid in &episode_rowids {
159 let _ = hnsw.remove(episode_hnsw_id(*rowid));
161 }
162
163 let hnsw_rebuilt = if total_deleted > 0 {
164 rebuild_hnsw_after_forget(&conn, hnsw.as_ref(), embedder_id)?;
165 true
166 } else {
167 false
168 };
169
170 let now_ms = chrono::Utc::now().timestamp_millis();
176 let admin_path = data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
177 let admin_conn = open_sqlcipher(&admin_path, key)?;
178 let details = serde_json::json!({
179 "principal_subject": principal_subject,
180 "episodes_deleted": episodes_deleted,
181 "triples_deleted": triples_deleted,
182 "triples_orphan_null_source": triples_orphan_null_source,
183 "chunks_deleted": chunks_deleted,
184 "hnsw_rebuilt": hnsw_rebuilt,
185 });
186 let audit_admin_row_id = insert_audit_admin_row(
187 &admin_conn,
188 now_ms,
189 actor_principal,
190 AuditOperation::GdprForgetUser,
191 Some(tenant_id.as_str()),
192 AuditResult::Ok,
193 Some(&details),
194 )?;
195
196 if total_deleted > 0 {
203 let _ = tenant_handle
204 .invalidate_sender()
205 .send(solo_core::InvalidateEvent {
206 reason: AuditOperation::GdprForgetUser.as_str().to_string(),
207 tenant_id: tenant_id.to_string(),
208 ts_ms: now_ms,
209 kind: "tenant".to_string(),
210 });
211 }
212
213 Ok(ForgetReport {
214 episodes_deleted,
215 triples_deleted,
216 triples_orphan_null_source,
217 chunks_deleted,
218 hnsw_rebuilt,
219 audit_admin_row_id,
220 })
221}
222
/// Result of `delete_principal_rows`: per-table delete counts plus the rowids
/// of the episodes that were removed.
struct DeleteOutcome {
    /// Rows removed from `episodes`.
    episodes_deleted: u64,
    /// Rows removed from `triples` via `source_episode_id`.
    triples_deleted: u64,
    /// NULL-source triples that share a cluster with a removed episode; these
    /// rows are counted but left in place.
    triples_orphan_null_source: u64,
    /// Rows removed from `document_chunks`.
    chunks_deleted: u64,
    /// Rowids of the removed episodes, sorted ascending.
    episode_rowids: Vec<i64>,
}
238
239fn delete_principal_rows(conn: &mut Connection, principal_subject: &str) -> Result<DeleteOutcome> {
245 let tx = conn
246 .transaction_with_behavior(TransactionBehavior::Immediate)
247 .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for forget: {e}")))?;
248
249 let mut rowids: Vec<i64> = {
254 let mut stmt = tx
255 .prepare("SELECT rowid FROM episodes WHERE principal_subject = ?")
256 .map_err(|e| Error::storage(format!("prepare SELECT episodes.rowid: {e}")))?;
257 let rows = stmt
258 .query_map(params![principal_subject], |r| r.get::<_, i64>(0))
259 .map_err(|e| Error::storage(format!("query episodes.rowid: {e}")))?;
260 rows.collect::<rusqlite::Result<Vec<_>>>()
261 .map_err(|e| Error::storage(format!("collect episode rowids: {e}")))?
262 };
263 rowids.sort_unstable();
264
265 let (triples_deleted, triples_orphan_null_source): (u64, u64) = if rowids.is_empty() {
282 (0, 0)
283 } else {
284 let placeholders = std::iter::repeat("?")
291 .take(rowids.len())
292 .collect::<Vec<_>>()
293 .join(",");
294
295 let delete_sql = format!("DELETE FROM triples WHERE source_episode_id IN ({placeholders})");
296 let deleted =
297 tx.execute(&delete_sql, rusqlite::params_from_iter(rowids.iter()))
298 .map_err(|e| Error::storage(format!("DELETE triples: {e}")))? as u64;
299
300 let orphan_sql = format!(
308 "SELECT COUNT(*) FROM triples t \
309 WHERE t.source_episode_id IS NULL \
310 AND t.cluster_id IN ( \
311 SELECT DISTINCT ce.cluster_id FROM cluster_episodes ce \
312 JOIN episodes e ON e.memory_id = ce.memory_id \
313 WHERE e.rowid IN ({placeholders}) \
314 )"
315 );
316 let orphans: i64 = tx
317 .query_row(
318 &orphan_sql,
319 rusqlite::params_from_iter(rowids.iter()),
320 |r| r.get(0),
321 )
322 .map_err(|e| Error::storage(format!("COUNT orphan triples: {e}")))?;
323
324 (deleted, orphans.max(0) as u64)
325 };
326
327 let episodes_deleted: u64 =
340 tx.execute(
341 "DELETE FROM episodes WHERE principal_subject = ?",
342 params![principal_subject],
343 )
344 .map_err(|e| Error::storage(format!("DELETE episodes: {e}")))? as u64;
345
346 let chunks_deleted: u64 =
355 tx.execute(
356 "DELETE FROM document_chunks WHERE ingested_by_principal = ?",
357 params![principal_subject],
358 )
359 .map_err(|e| Error::storage(format!("DELETE document_chunks: {e}")))? as u64;
360
361 tx.commit()
362 .map_err(|e| Error::storage(format!("COMMIT forget: {e}")))?;
363
364 Ok(DeleteOutcome {
365 episodes_deleted,
366 triples_deleted,
367 triples_orphan_null_source,
368 chunks_deleted,
369 episode_rowids: rowids,
370 })
371}
372
373fn rebuild_hnsw_after_forget(
384 conn: &Connection,
385 hnsw: &dyn VectorIndex,
386 embedder_id: i64,
387) -> Result<()> {
388 let _ = crate::recovery::rebuild_hnsw_from_sql(conn, hnsw, embedder_id)?;
402
403 Ok(())
404}
405
406pub fn estimate_forget_scope(
410 db_path: &Path,
411 key: &KeyMaterial,
412 principal_subject: &str,
413) -> Result<(u64, u64)> {
414 let conn = open_sqlcipher(db_path, key)?;
415 let episodes: i64 = conn
416 .query_row(
417 "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
418 params![principal_subject],
419 |r| r.get(0),
420 )
421 .map_err(|e| Error::storage(format!("COUNT episodes for estimate: {e}")))?;
422 let chunks: i64 = conn
423 .query_row(
424 "SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
425 params![principal_subject],
426 |r| r.get(0),
427 )
428 .map_err(|e| Error::storage(format!("COUNT chunks for estimate: {e}")))?;
429 Ok((episodes as u64, chunks as u64))
430}
431
432#[allow(dead_code)]
435fn _silence_unused() {
436 let _: Option<&dyn Embedder> = None;
437 let _: Option<TenantId> = None;
438 let _: Option<&TenantsIndex> = None;
439 let _: Option<&EmbedderIdentity> = None;
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445 use rusqlite::params;
446
447 fn seed_two_principal_db() -> (tempfile::TempDir, std::path::PathBuf) {
450 let tmp = tempfile::TempDir::new().unwrap();
451 let db_path = tmp.path().join("forget.db");
452 let mut conn = Connection::open(&db_path).unwrap();
453 conn.execute_batch(
454 "PRAGMA journal_mode = wal;
455 PRAGMA foreign_keys = ON;
456 PRAGMA busy_timeout = 5000;",
457 )
458 .unwrap();
459 crate::migration::run_migrations(&mut conn).unwrap();
460
461 let now_ms = chrono::Utc::now().timestamp_millis();
462 for i in 0..3 {
464 conn.execute(
465 "INSERT INTO episodes (
466 memory_id, ts_ms, source_type, content,
467 encoding_context_json, confidence, strength, salience,
468 tier, created_at_ms, updated_at_ms, principal_subject
469 ) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
470 params![
471 format!("00000000-0000-0000-0000-00000000a{i:03x}"),
472 now_ms,
473 format!("alice content {i}"),
474 now_ms,
475 now_ms,
476 ],
477 )
478 .unwrap();
479 }
480 for i in 0..2 {
481 conn.execute(
482 "INSERT INTO episodes (
483 memory_id, ts_ms, source_type, content,
484 encoding_context_json, confidence, strength, salience,
485 tier, created_at_ms, updated_at_ms, principal_subject
486 ) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'bob')",
487 params![
488 format!("00000000-0000-0000-0000-00000000b{i:03x}"),
489 now_ms,
490 format!("bob content {i}"),
491 now_ms,
492 now_ms,
493 ],
494 )
495 .unwrap();
496 }
497
498 conn.execute(
500 "INSERT INTO documents (
501 doc_id, source, title, mime_type, ingested_at_ms,
502 modified_at_ms, status, chunk_count, content_hash, byte_size
503 ) VALUES ('00000000-0000-0000-0000-000000000d01', 'src://test', 't',
504 'text/markdown', ?, NULL, 'active', 4, 'hashabc', 200)",
505 params![now_ms],
506 )
507 .unwrap();
508 for i in 0..3 {
509 conn.execute(
510 "INSERT INTO document_chunks (
511 chunk_id, doc_id, chunk_index, content, token_count,
512 start_offset, end_offset, created_at_ms, ingested_by_principal
513 ) VALUES (?, ?, ?, ?, 5, ?, ?, ?, 'alice')",
514 params![
515 format!("00000000-0000-0000-0000-00000000c{i:03x}"),
516 "00000000-0000-0000-0000-000000000d01",
517 i,
518 format!("alice chunk {i}"),
519 (i * 10) as i64,
520 ((i + 1) * 10) as i64,
521 now_ms,
522 ],
523 )
524 .unwrap();
525 }
526 conn.execute(
527 "INSERT INTO document_chunks (
528 chunk_id, doc_id, chunk_index, content, token_count,
529 start_offset, end_offset, created_at_ms, ingested_by_principal
530 ) VALUES ('00000000-0000-0000-0000-00000000ccc1', '00000000-0000-0000-0000-000000000d01',
531 3, 'bob chunk', 5, 30, 40, ?, 'bob')",
532 params![now_ms],
533 )
534 .unwrap();
535
536 (tmp, db_path)
537 }
538
539 #[test]
540 fn delete_principal_rows_targets_only_named_subject() {
541 let (_tmp, db_path) = seed_two_principal_db();
542 let mut conn = Connection::open(&db_path).unwrap();
543 conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
544 let DeleteOutcome {
545 episodes_deleted: episodes,
546 chunks_deleted: chunks,
547 ..
548 } = delete_principal_rows(&mut conn, "alice").unwrap();
549 assert_eq!(episodes, 3, "should delete alice's 3 episodes");
550 assert_eq!(chunks, 3, "should delete alice's 3 chunks");
551 let alice_remaining: i64 = conn
553 .query_row(
554 "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
555 [],
556 |r| r.get(0),
557 )
558 .unwrap();
559 assert_eq!(alice_remaining, 0);
560 let bob_remaining: i64 = conn
561 .query_row(
562 "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
563 [],
564 |r| r.get(0),
565 )
566 .unwrap();
567 assert_eq!(bob_remaining, 2);
568 let bob_chunks: i64 = conn
569 .query_row(
570 "SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = 'bob'",
571 [],
572 |r| r.get(0),
573 )
574 .unwrap();
575 assert_eq!(bob_chunks, 1);
576 }
577
578 #[test]
579 fn delete_principal_rows_idempotent_on_absent_subject() {
580 let (_tmp, db_path) = seed_two_principal_db();
581 let mut conn = Connection::open(&db_path).unwrap();
582 let DeleteOutcome {
583 episodes_deleted: episodes,
584 chunks_deleted: chunks,
585 episode_rowids: rowids,
586 ..
587 } = delete_principal_rows(&mut conn, "ghost").unwrap();
588 assert_eq!(episodes, 0);
589 assert_eq!(chunks, 0);
590 assert!(rowids.is_empty());
591 let DeleteOutcome {
593 episodes_deleted: e2,
594 chunks_deleted: c2,
595 ..
596 } = delete_principal_rows(&mut conn, "ghost").unwrap();
597 assert_eq!(e2, 0);
598 assert_eq!(c2, 0);
599 }
600
601 #[test]
602 fn delete_principal_rows_cascades_to_embeddings() {
603 let (_tmp, db_path) = seed_two_principal_db();
607 let mut conn = Connection::open(&db_path).unwrap();
608 conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
609
610 let alice_id_0 = "00000000-0000-0000-0000-00000000a000";
612 conn.execute(
614 "INSERT INTO embedders (name, version, dim, dtype, first_seen_ms)
615 VALUES ('test', 'v1', 4, 'f32', 0)",
616 [],
617 )
618 .unwrap();
619 let embedder_id: i64 = conn
620 .query_row(
621 "SELECT embedder_id FROM embedders WHERE name = 'test'",
622 [],
623 |r| r.get(0),
624 )
625 .unwrap();
626 let vec_blob = vec![0u8; 16];
627 conn.execute(
628 "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
629 VALUES (?, ?, 'f32', 4, ?, 0)",
630 params![alice_id_0, embedder_id, &vec_blob[..]],
631 )
632 .unwrap();
633
634 let before: i64 = conn
635 .query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
636 .unwrap();
637 assert_eq!(before, 1);
638
639 let _ = delete_principal_rows(&mut conn, "alice").unwrap();
640
641 let after: i64 = conn
642 .query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
643 .unwrap();
644 assert_eq!(after, 0, "embeddings should cascade-delete with episodes");
645 }
646
647 #[test]
648 #[ignore = "requires SQLCipher: estimate_forget_scope opens via open_sqlcipher which fails on plain SQLite test DBs. Round-trip is covered by SQLCipher-enabled integration tests."]
649 fn estimate_forget_scope_counts_correctly() {
650 let (_tmp, db_path) = seed_two_principal_db();
655 let key = crate::key_material::KeyMaterial::derive("test-pass", &[0u8; 16]).unwrap();
656 let (eps, chunks) = estimate_forget_scope(&db_path, &key, "alice").unwrap();
657 assert_eq!(eps, 3);
658 assert_eq!(chunks, 3);
659 let (eps_b, chunks_b) = estimate_forget_scope(&db_path, &key, "bob").unwrap();
660 assert_eq!(eps_b, 2);
661 assert_eq!(chunks_b, 1);
662 let (eps_ghost, _) = estimate_forget_scope(&db_path, &key, "ghost").unwrap();
663 assert_eq!(eps_ghost, 0);
664 }
665
666 #[test]
672 fn forget_principal_round_trip_against_real_sqlcipher() {
673 use crate::init::{InitParams, init};
674 use solo_core::TenantId;
675 use std::sync::Arc;
676 use zeroize::Zeroizing;
677
678 let tmp = tempfile::TempDir::new().unwrap();
679 let data_dir = tmp.path().to_path_buf();
680 let pass = "forget round-trip test passphrase";
681 let outcome = init(InitParams {
682 data_dir: data_dir.clone(),
683 passphrase: Zeroizing::new(pass.into()),
684 force: false,
685 embedder: crate::init::default_embedder(),
686 })
687 .expect("init");
688
689 let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
690 let salt = cfg.salt_bytes().unwrap();
691 let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
692
693 {
695 let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
696 let now = chrono::Utc::now().timestamp_millis();
697 for i in 0..3 {
698 conn.execute(
699 "INSERT INTO episodes (
700 memory_id, ts_ms, source_type, content,
701 encoding_context_json, confidence, strength, salience,
702 tier, created_at_ms, updated_at_ms, principal_subject
703 ) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5,
704 'hot', ?, ?, 'alice')",
705 params![
706 format!("00000000-0000-0000-0000-0000000{i:08x}"),
707 now,
708 format!("alice ep {i}"),
709 now,
710 now,
711 ],
712 )
713 .unwrap();
714 }
715 conn.execute(
716 "INSERT INTO episodes (
717 memory_id, ts_ms, source_type, content,
718 encoding_context_json, confidence, strength, salience,
719 tier, created_at_ms, updated_at_ms, principal_subject
720 ) VALUES (?, ?, 'user_message', 'bob ep', '{}', 0.9, 0.5, 0.5,
721 'hot', ?, ?, 'bob')",
722 params!["00000000-0000-0000-0000-0000000b000000", now, now, now,],
723 )
724 .unwrap();
725 }
726
727 let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
729 let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
730 let crate::writer::WriterSpawn {
731 handle: write_handle,
732 join,
733 } = crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
734 let read_pool = crate::reader::ReaderPool::new(
735 &outcome.db_path,
736 Some(key.clone()),
737 stub.clone() as Arc<_>,
738 )
739 .unwrap();
740 let embedder: Arc<dyn solo_core::Embedder> =
741 Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", 4));
742 let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
743 TenantId::default_tenant(),
744 cfg.clone(),
745 outcome.db_path.clone(),
746 data_dir.clone(),
747 1, stub.clone() as Arc<_>,
749 embedder,
750 write_handle,
751 join,
752 read_pool,
753 ));
754
755 let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
757 .expect("forget_principal");
758 assert_eq!(report.episodes_deleted, 3);
759 assert_eq!(report.triples_deleted, 0);
760 assert_eq!(report.chunks_deleted, 0);
761 assert!(report.hnsw_rebuilt);
762
763 let after_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
765 let alice_left: i64 = after_conn
766 .query_row(
767 "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
768 [],
769 |r| r.get(0),
770 )
771 .unwrap();
772 assert_eq!(alice_left, 0);
773 let bob_left: i64 = after_conn
774 .query_row(
775 "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
776 [],
777 |r| r.get(0),
778 )
779 .unwrap();
780 assert_eq!(bob_left, 1);
781
782 let admin_conn = crate::init::open_sqlcipher(
784 &data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
785 &key,
786 )
787 .unwrap();
788 let (op, target, principal): (String, Option<String>, Option<String>) = admin_conn
789 .query_row(
790 "SELECT operation, target_tenant_id, principal_subject \
791 FROM audit_events_admin WHERE audit_id = ?",
792 params![report.audit_admin_row_id],
793 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
794 )
795 .unwrap();
796 assert_eq!(op, "gdpr.forget_user");
797 assert_eq!(target.as_deref(), Some("default"));
798 assert_eq!(principal.as_deref(), Some("admin"));
799 }
800
801 #[test]
805 fn forget_principal_idempotent_on_absent_subject() {
806 use crate::init::{InitParams, init};
807 use solo_core::TenantId;
808 use std::sync::Arc;
809 use zeroize::Zeroizing;
810
811 let tmp = tempfile::TempDir::new().unwrap();
812 let data_dir = tmp.path().to_path_buf();
813 let pass = "idempotent forget test";
814 let outcome = init(InitParams {
815 data_dir: data_dir.clone(),
816 passphrase: Zeroizing::new(pass.into()),
817 force: false,
818 embedder: crate::init::default_embedder(),
819 })
820 .expect("init");
821 let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
822 let salt = cfg.salt_bytes().unwrap();
823 let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
824
825 let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
826 let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
827 let crate::writer::WriterSpawn {
828 handle: write_handle,
829 join,
830 } = crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
831 let read_pool = crate::reader::ReaderPool::new(
832 &outcome.db_path,
833 Some(key.clone()),
834 stub.clone() as Arc<_>,
835 )
836 .unwrap();
837 let embedder: Arc<dyn solo_core::Embedder> =
838 Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", 4));
839 let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
840 TenantId::default_tenant(),
841 cfg,
842 outcome.db_path.clone(),
843 data_dir.clone(),
844 1,
845 stub.clone() as Arc<_>,
846 embedder,
847 write_handle,
848 join,
849 read_pool,
850 ));
851
852 let report = forget_principal(handle, "ghost", None, &data_dir, &key)
853 .expect("forget_principal on absent subject");
854 assert_eq!(report.episodes_deleted, 0);
855 assert_eq!(report.chunks_deleted, 0);
856 assert!(!report.hnsw_rebuilt, "no rebuild when nothing deleted");
857 assert!(report.audit_admin_row_id > 0);
859 }
860
861 fn seed_triple_with_source(
869 conn: &Connection,
870 triple_id: &str,
871 subject: &str,
872 predicate: &str,
873 object: &str,
874 source_episode_id: Option<i64>,
875 cluster_id: Option<&str>,
876 ) {
877 let now_ms = chrono::Utc::now().timestamp_millis();
878 conn.execute(
879 "INSERT INTO triples (
880 triple_id, subject_id, predicate, object_id, object_kind,
881 valid_from_ms, valid_to_ms, confidence, provenance_json,
882 created_at_ms, updated_at_ms, source_episode_id, cluster_id
883 ) VALUES (?, ?, ?, ?, 'literal', ?, NULL, 0.9, '{}', ?, ?, ?, ?)",
884 params![
885 triple_id,
886 subject,
887 predicate,
888 object,
889 now_ms,
890 now_ms,
891 now_ms,
892 source_episode_id,
893 cluster_id,
894 ],
895 )
896 .expect("seed triple");
897 }
898
899 #[test]
900 fn delete_principal_rows_cascades_through_triples_by_source_episode() {
901 let (_tmp, db_path) = seed_two_principal_db();
902 let mut conn = Connection::open(&db_path).unwrap();
903 conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
904
905 let alice_rowids: Vec<i64> = {
907 let mut stmt = conn
908 .prepare(
909 "SELECT rowid FROM episodes WHERE principal_subject = 'alice' ORDER BY rowid",
910 )
911 .unwrap();
912 stmt.query_map([], |r| r.get::<_, i64>(0))
913 .unwrap()
914 .map(|r| r.unwrap())
915 .collect()
916 };
917 assert_eq!(alice_rowids.len(), 3);
918 let bob_rowid: i64 = conn
919 .query_row(
920 "SELECT rowid FROM episodes WHERE principal_subject = 'bob' LIMIT 1",
921 [],
922 |r| r.get(0),
923 )
924 .unwrap();
925
926 seed_triple_with_source(
928 &conn,
929 "00000000-0000-0000-0000-000000000a01",
930 "alice",
931 "uses",
932 "rust",
933 Some(alice_rowids[0]),
934 None,
935 );
936 seed_triple_with_source(
937 &conn,
938 "00000000-0000-0000-0000-000000000a02",
939 "alice",
940 "likes",
941 "skiing",
942 Some(alice_rowids[1]),
943 None,
944 );
945 seed_triple_with_source(
946 &conn,
947 "00000000-0000-0000-0000-000000000b01",
948 "bob",
949 "uses",
950 "go",
951 Some(bob_rowid),
952 None,
953 );
954
955 let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
956 assert_eq!(outcome.episodes_deleted, 3);
957 assert_eq!(
958 outcome.triples_deleted, 2,
959 "should delete the two triples whose source was an alice episode"
960 );
961 assert_eq!(outcome.triples_orphan_null_source, 0);
962
963 let bob_count: i64 = conn
965 .query_row(
966 "SELECT COUNT(*) FROM triples WHERE subject_id = 'bob'",
967 [],
968 |r| r.get(0),
969 )
970 .unwrap();
971 assert_eq!(bob_count, 1);
972 }
973
974 #[test]
975 fn delete_principal_rows_preserves_null_source_triples() {
976 let (_tmp, db_path) = seed_two_principal_db();
977 let mut conn = Connection::open(&db_path).unwrap();
978 conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
979 let alice_rowids: Vec<i64> = {
980 let mut stmt = conn
981 .prepare(
982 "SELECT rowid FROM episodes WHERE principal_subject = 'alice' ORDER BY rowid",
983 )
984 .unwrap();
985 stmt.query_map([], |r| r.get::<_, i64>(0))
986 .unwrap()
987 .map(|r| r.unwrap())
988 .collect()
989 };
990
991 seed_triple_with_source(
994 &conn,
995 "00000000-0000-0000-0000-000000000c01",
996 "alice",
997 "claimed",
998 "x",
999 None,
1000 None,
1001 );
1002 seed_triple_with_source(
1004 &conn,
1005 "00000000-0000-0000-0000-000000000c02",
1006 "alice",
1007 "linked",
1008 "y",
1009 Some(alice_rowids[0]),
1010 None,
1011 );
1012
1013 let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
1014 assert_eq!(outcome.triples_deleted, 1, "only the source-linked triple");
1015 let remaining: i64 = conn
1018 .query_row(
1019 "SELECT COUNT(*) FROM triples WHERE source_episode_id IS NULL",
1020 [],
1021 |r| r.get(0),
1022 )
1023 .unwrap();
1024 assert_eq!(remaining, 1, "NULL-source row must survive");
1025 }
1026
1027 #[test]
1028 fn delete_principal_rows_counts_orphans_via_cluster_membership() {
1029 let (_tmp, db_path) = seed_two_principal_db();
1030 let mut conn = Connection::open(&db_path).unwrap();
1031 conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
1032
1033 let alice_rows: Vec<(i64, String)> = {
1034 let mut stmt = conn
1035 .prepare(
1036 "SELECT rowid, memory_id FROM episodes WHERE principal_subject = 'alice' \
1037 ORDER BY rowid",
1038 )
1039 .unwrap();
1040 stmt.query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, String>(1)?)))
1041 .unwrap()
1042 .map(|r| r.unwrap())
1043 .collect()
1044 };
1045 let alice_memory_id = alice_rows[0].1.clone();
1046
1047 let cluster_id = "00000000-0000-0000-0000-0000000c1001";
1049 let now_ms = chrono::Utc::now().timestamp_millis();
1050 conn.execute(
1051 "INSERT INTO clusters (cluster_id, coherence, created_at_ms) VALUES (?, ?, ?)",
1052 params![cluster_id, 0.9_f64, now_ms],
1053 )
1054 .unwrap();
1055 conn.execute(
1056 "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)",
1057 params![cluster_id, alice_memory_id],
1058 )
1059 .unwrap();
1060
1061 seed_triple_with_source(
1064 &conn,
1065 "00000000-0000-0000-0000-000000000d01",
1066 "alice",
1067 "claimed",
1068 "fact",
1069 None,
1070 Some(cluster_id),
1071 );
1072
1073 let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
1074 assert_eq!(
1075 outcome.triples_orphan_null_source, 1,
1076 "the NULL-source triple linked via cluster must appear in the orphan count"
1077 );
1078
1079 let remaining: i64 = conn
1082 .query_row(
1083 "SELECT COUNT(*) FROM triples WHERE triple_id = '00000000-0000-0000-0000-000000000d01'",
1084 [],
1085 |r| r.get(0),
1086 )
1087 .unwrap();
1088 assert_eq!(remaining, 1, "orphan triple must remain in the table");
1089 }
1090
1091 #[test]
1092 fn migration_0007_adds_source_episode_id_column_and_index() {
1093 let mut conn = Connection::open_in_memory().unwrap();
1094 crate::migration::run_migrations(&mut conn).unwrap();
1095 let cols: Vec<(String, String)> = conn
1097 .prepare("PRAGMA table_info('triples')")
1098 .unwrap()
1099 .query_map([], |row| {
1100 Ok((row.get::<_, String>(1)?, row.get::<_, String>(2)?))
1101 })
1102 .unwrap()
1103 .map(|r| r.unwrap())
1104 .collect();
1105 let names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
1106 assert!(
1107 names.contains(&"source_episode_id"),
1108 "triples missing source_episode_id after 0007; got {names:?}"
1109 );
1110 let idx_exists: i64 = conn
1112 .query_row(
1113 "SELECT COUNT(*) FROM sqlite_master \
1114 WHERE type='index' AND name='idx_triples_source_episode'",
1115 [],
1116 |r| r.get(0),
1117 )
1118 .unwrap();
1119 assert_eq!(idx_exists, 1, "idx_triples_source_episode missing");
1120 }
1121
1122 #[test]
1123 fn migration_0007_is_idempotent_on_repeated_open() {
1124 let mut conn = Connection::open_in_memory().unwrap();
1125 crate::migration::run_migrations(&mut conn).unwrap();
1126 crate::migration::run_migrations(&mut conn).unwrap();
1127 let count: i64 = conn
1128 .query_row(
1129 "SELECT COUNT(*) FROM schema_migrations WHERE version = 7",
1130 [],
1131 |r| r.get(0),
1132 )
1133 .unwrap();
1134 assert_eq!(count, 1, "0007 row must not be inserted twice");
1135 }
1136
1137 #[test]
1138 fn migration_0007_backfills_source_episode_id_from_provenance_json() {
1139 let mut conn = Connection::open_in_memory().unwrap();
1144 crate::migration::run_migrations(&mut conn).unwrap();
1146 let now_ms = chrono::Utc::now().timestamp_millis();
1152 conn.execute(
1153 "INSERT INTO episodes (
1154 memory_id, ts_ms, source_type, content,
1155 encoding_context_json, confidence, strength, salience,
1156 tier, created_at_ms, updated_at_ms
1157 ) VALUES ('00000000-0000-0000-0000-000000000111', ?, 'user_message',
1158 'seed', '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
1159 params![now_ms, now_ms, now_ms],
1160 )
1161 .unwrap();
1162 let ep_rowid: i64 = conn
1163 .query_row(
1164 "SELECT rowid FROM episodes WHERE memory_id = ?",
1165 params!["00000000-0000-0000-0000-000000000111"],
1166 |r| r.get(0),
1167 )
1168 .unwrap();
1169 let prov = serde_json::json!({
1172 "derived_from": ["00000000-0000-0000-0000-000000000111"],
1173 "derivation": "extraction",
1174 "by": "test",
1175 "at_ms": now_ms,
1176 });
1177 conn.execute(
1178 "INSERT INTO triples (
1179 triple_id, subject_id, predicate, object_id, object_kind,
1180 valid_from_ms, valid_to_ms, confidence, provenance_json,
1181 created_at_ms, updated_at_ms, source_episode_id
1182 ) VALUES ('00000000-0000-0000-0000-000000000222',
1183 'alice', 'uses', 'rust', 'literal',
1184 ?, NULL, 0.9, ?, ?, ?, NULL)",
1185 params![now_ms, prov.to_string(), now_ms, now_ms],
1186 )
1187 .unwrap();
1188
1189 let backfill = include_str!("migrations/0007_triples_source.sql");
1193 let update_sql = backfill
1197 .split("UPDATE triples")
1198 .nth(1)
1199 .map(|tail| format!("UPDATE triples{tail}"))
1200 .expect("0007 contains the UPDATE backfill");
1201 conn.execute_batch(&update_sql).unwrap();
1202
1203 let backfilled: Option<i64> = conn
1204 .query_row(
1205 "SELECT source_episode_id FROM triples \
1206 WHERE triple_id = '00000000-0000-0000-0000-000000000222'",
1207 [],
1208 |r| r.get(0),
1209 )
1210 .unwrap();
1211 assert_eq!(
1212 backfilled,
1213 Some(ep_rowid),
1214 "backfill must resolve derived_from[0] memory_id → episodes.rowid"
1215 );
1216 }
1217
1218 #[test]
1219 fn forget_report_carries_triples_deleted_count_through_to_audit_details() {
1220 use crate::init::{InitParams, init};
1225 use solo_core::TenantId;
1226 use zeroize::Zeroizing;
1227
1228 let tmp = tempfile::TempDir::new().unwrap();
1229 let data_dir = tmp.path().to_path_buf();
1230 let pass = "forget triples cascade test passphrase";
1231 let outcome = init(InitParams {
1232 data_dir: data_dir.clone(),
1233 passphrase: Zeroizing::new(pass.into()),
1234 force: false,
1235 embedder: crate::init::default_embedder(),
1236 })
1237 .expect("init");
1238 let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
1239 let salt = cfg.salt_bytes().unwrap();
1240 let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
1241
1242 {
1244 let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
1245 let now = chrono::Utc::now().timestamp_millis();
1246 conn.execute(
1247 "INSERT INTO episodes (
1248 memory_id, ts_ms, source_type, content,
1249 encoding_context_json, confidence, strength, salience,
1250 tier, created_at_ms, updated_at_ms, principal_subject
1251 ) VALUES ('00000000-0000-0000-0000-000000000ace', ?, 'user_message',
1252 'alice ep', '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
1253 params![now, now, now],
1254 )
1255 .unwrap();
1256 let ep_rowid: i64 = conn
1257 .query_row(
1258 "SELECT rowid FROM episodes WHERE memory_id = ?",
1259 params!["00000000-0000-0000-0000-000000000ace"],
1260 |r| r.get(0),
1261 )
1262 .unwrap();
1263 conn.execute(
1264 "INSERT INTO triples (
1265 triple_id, subject_id, predicate, object_id, object_kind,
1266 valid_from_ms, valid_to_ms, confidence, provenance_json,
1267 created_at_ms, updated_at_ms, source_episode_id
1268 ) VALUES ('00000000-0000-0000-0000-000000000a11',
1269 'alice', 'uses', 'rust', 'literal',
1270 ?, NULL, 0.9, '{}', ?, ?, ?)",
1271 params![now, now, now, ep_rowid],
1272 )
1273 .unwrap();
1274 }
1275
1276 let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
1278 let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
1279 let crate::writer::WriterSpawn {
1280 handle: write_handle,
1281 join,
1282 } = crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
1283 let read_pool = crate::reader::ReaderPool::new(
1284 &outcome.db_path,
1285 Some(key.clone()),
1286 stub.clone() as Arc<_>,
1287 )
1288 .unwrap();
1289 let embedder: Arc<dyn solo_core::Embedder> =
1290 Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", 4));
1291 let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
1292 TenantId::default_tenant(),
1293 cfg,
1294 outcome.db_path.clone(),
1295 data_dir.clone(),
1296 1,
1297 stub.clone() as Arc<_>,
1298 embedder,
1299 write_handle,
1300 join,
1301 read_pool,
1302 ));
1303
1304 let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
1305 .expect("forget_principal");
1306 assert_eq!(report.episodes_deleted, 1);
1307 assert_eq!(report.triples_deleted, 1, "real count, not 0");
1308
1309 let admin_conn = crate::init::open_sqlcipher(
1311 &data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
1312 &key,
1313 )
1314 .unwrap();
1315 let details_json: String = admin_conn
1316 .query_row(
1317 "SELECT details_json FROM audit_events_admin WHERE audit_id = ?",
1318 params![report.audit_admin_row_id],
1319 |r| r.get(0),
1320 )
1321 .unwrap();
1322 let parsed: serde_json::Value = serde_json::from_str(&details_json).unwrap();
1323 assert_eq!(parsed["triples_deleted"], 1);
1324 assert_eq!(parsed["episodes_deleted"], 1);
1325 }
1326
1327 #[test]
1332 fn estimate_via_direct_count_matches_seeded_data() {
1333 let (_tmp, db_path) = seed_two_principal_db();
1334 let conn = Connection::open(&db_path).unwrap();
1335 let alice_eps: i64 = conn
1336 .query_row(
1337 "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
1338 params!["alice"],
1339 |r| r.get(0),
1340 )
1341 .unwrap();
1342 assert_eq!(alice_eps, 3);
1343 let alice_chunks: i64 = conn
1344 .query_row(
1345 "SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
1346 params!["alice"],
1347 |r| r.get(0),
1348 )
1349 .unwrap();
1350 assert_eq!(alice_chunks, 3);
1351 let bob_eps: i64 = conn
1352 .query_row(
1353 "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
1354 params!["bob"],
1355 |r| r.get(0),
1356 )
1357 .unwrap();
1358 assert_eq!(bob_eps, 2);
1359 let ghost_eps: i64 = conn
1360 .query_row(
1361 "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
1362 params!["ghost"],
1363 |r| r.get(0),
1364 )
1365 .unwrap();
1366 assert_eq!(ghost_eps, 0);
1367 }
1368}