Skip to main content

solo_storage/
gdpr.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! GDPR right-to-erasure (v0.8.0 P6) — hard-delete every row tied to a
4//! principal subject in one tenant.
5//!
6//! The Solo design decision (locked in 0090) is hard-delete, not soft-
7//! delete: rows go away, the HNSW gets rebuilt from the surviving rows.
8//! Tombstones would leak the deleted-subject's existence in compliance
9//! audits; the GDPR contract requires actual removal.
10//!
11//! ## Algorithm
12//!
13//! Single SQL transaction on the per-tenant DB, then a post-commit HNSW
14//! rebuild + an admin-tier audit row in `tenants_index.db`:
15//!
16//!   1. `BEGIN IMMEDIATE` on the per-tenant DB.
17//!   2. Collect `episodes.rowid` set for the subject.
//!   3. `DELETE FROM triples WHERE source_episode_id IN (...)` — count
//!      rows. (`source_episode_id` was wired up in v0.8.1 P1 via
//!      migration 0007. Older triples whose `source_episode_id` is NULL
//!      cannot be cascaded; they are counted, not deleted — see
//!      `triples_orphan_null_source`.)
22//!   4. `DELETE FROM episodes WHERE principal_subject = ?` — count.
23//!   5. `DELETE FROM document_chunks WHERE ingested_by_principal = ?`
24//!      — count.
25//!   6. `COMMIT`.
26//!   7. If any rows deleted: full HNSW rebuild from the remaining
27//!      `episodes` + `document_chunks`. Rebuild is eager because GDPR
28//!      is rare; the write-side latency hit is operator-acceptable.
29//!   8. Emit `gdpr.forget_user` admin-audit row to
30//!      `tenants_index.db::audit_events_admin`. The admin tier is the
31//!      right home because the subject can no longer query their own
32//!      per-tenant DB audit_events post-deletion.
33//!
34//! ## Idempotency
35//!
36//! Re-running on an absent subject is a no-op: 0 rows deleted, HNSW
37//! NOT rebuilt, admin-audit row still emitted with count=0 so the
38//! operator's compliance trail records the attempt.
39
40use std::path::Path;
41use std::sync::Arc;
42
43use rusqlite::{Connection, TransactionBehavior, params};
44use solo_core::{Embedder, Error, Result, TenantId, VectorIndex};
45
46use crate::audit::{AuditOperation, AuditResult, insert_audit_admin_row};
47use crate::embedder_registry::EmbedderIdentity;
48use crate::hnsw_id::episode_hnsw_id;
49use crate::init::open_sqlcipher;
50use crate::key_material::KeyMaterial;
51use crate::tenants::TenantsIndex;
52
/// What `forget_principal` did. Returned to callers (typically the CLI
/// `solo gdpr forget` subcommand) for surfacing in the user-visible
/// summary and for downstream tests / scripting.
///
/// The `*_deleted` fields are per-table row counts from the per-tenant
/// delete transaction; `triples_orphan_null_source` is a count of rows
/// that were observed but deliberately left in place.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetReport {
    /// Rows deleted from `episodes`.
    pub episodes_deleted: u64,
    /// Rows deleted from `triples` whose `source_episode_id` referenced
    /// one of the forgotten episodes (v0.8.1 P1 cascade — was always 0
    /// in v0.8.0 because the FK column didn't exist).
    pub triples_deleted: u64,
    /// Rows deleted from `document_chunks`.
    pub chunks_deleted: u64,
    /// Triples that share a cluster with a forgotten episode but have
    /// `source_episode_id IS NULL` (pre-v0.8.1 rows whose provenance
    /// didn't backfill against a live episode, plus any pre-v0.8.0
    /// rows). These are orphans-by-design — the GDPR cascade cannot
    /// attribute them to the deleted principal without an FK, so they
    /// are counted but NOT deleted. Surfaced for operator visibility.
    pub triples_orphan_null_source: u64,
    /// Did the post-tx HNSW rebuild run? `false` iff no rows were
    /// deleted (absent-subject idempotent case).
    pub hnsw_rebuilt: bool,
    /// `audit_id` of the row written to
    /// `tenants_index.db::audit_events_admin`. Always present — even
    /// the no-op (count=0) case emits a row so the compliance trail
    /// records the attempt.
    pub audit_admin_row_id: i64,
}
82
83/// Delete every row in `tenant_handle`'s per-tenant DB that's
84/// attributed to `principal_subject`, then rebuild the HNSW from the
85/// surviving rows.
86///
87/// `tenant_handle` is the live `TenantHandle` for the target tenant
88/// (so we can route through its writer + embedder + HNSW). `data_dir`
89/// + `key` are used to write the admin-audit row into
90/// `tenants_index.db`.
91///
92/// ## Concurrency
93///
94/// Single `BEGIN IMMEDIATE` tx on the per-tenant DB. Concurrent reads
95/// see either the pre- or post-delete state; no torn view. The HNSW
96/// rebuild is post-tx, so concurrent recalls during the rebuild window
97/// may transiently see vectors for already-deleted episodes — the
98/// SELECTed read-side rows are gone, so the read won't surface deleted
99/// data even if the HNSW's tombstone bitmap is briefly out of date.
100///
101/// ## Lockfile
102///
103/// Caller is responsible for holding `solo.lock` (or running through a
104/// live daemon). This helper doesn't acquire it.
105pub fn forget_principal(
106    tenant_handle: Arc<crate::tenants::TenantHandle>,
107    principal_subject: &str,
108    actor_principal: Option<&str>,
109    data_dir: &Path,
110    key: &KeyMaterial,
111) -> Result<ForgetReport> {
112    let tenant_id = tenant_handle.tenant_id().clone();
113    let db_path = tenant_handle.db_path().to_path_buf();
114    let hnsw = tenant_handle.hnsw().clone();
115    let embedder_id = tenant_handle.embedder_id();
116
117    // Open a fresh connection on the per-tenant DB. We deliberately do
118    // NOT route through the writer-actor: the writer's mpsc is a
119    // single-write-at-a-time bottleneck, and GDPR is admin-tier
120    // (operator-initiated, rare). Routing through a dedicated
121    // connection avoids contention with regular writes and keeps the
122    // delete sequence in one place. The writer-actor's separate
123    // SQLCipher session sees the post-COMMIT state.
124    let mut conn = open_sqlcipher(&db_path, key)?;
125
126    let DeleteOutcome {
127        episodes_deleted,
128        triples_deleted,
129        triples_orphan_null_source,
130        chunks_deleted,
131        episode_rowids,
132    } = delete_principal_rows(&mut conn, principal_subject)?;
133
134    if triples_orphan_null_source > 0 {
135        // Operator visibility: pre-v0.8.1 triples without a resolved
136        // source_episode_id cannot be cascaded by GDPR. They remain in
137        // the per-tenant DB as orphans-by-design (documented in the
138        // v0.8.1 release notes + this module's docstring).
139        tracing::warn!(
140            tenant = %tenant_id,
141            principal = %principal_subject,
142            orphan_count = triples_orphan_null_source,
143            "gdpr.forget_user: {triples_orphan_null_source} triple(s) reference \
144             the forgotten subject's cluster(s) but have NULL source_episode_id \
145             (pre-v0.8.1 schema or unresolved provenance). Cannot cascade. \
146             Operator action: review and manually clean up if compliance scope \
147             requires it."
148        );
149    }
150
151    let total_deleted = episodes_deleted + triples_deleted + chunks_deleted;
152
153    // Tombstone the deleted-episode HNSW entries cheaply BEFORE the
154    // rebuild path — that way an absent rebuild target (e.g. no rebuild
155    // because writer thread is offline) still leaves the HNSW free of
156    // deleted-subject vectors. The rebuild below (if it runs) is the
157    // strong correctness path; this is defense in depth.
158    for rowid in &episode_rowids {
159        // tombstone is idempotent + cheap; ignore any error.
160        let _ = hnsw.remove(episode_hnsw_id(*rowid));
161    }
162
163    let hnsw_rebuilt = if total_deleted > 0 {
164        rebuild_hnsw_after_forget(&conn, hnsw.as_ref(), embedder_id)?;
165        true
166    } else {
167        false
168    };
169
170    // Admin-audit row goes to `tenants_index.db::audit_events_admin`.
171    // We open a separate SQLCipher connection to that file rather than
172    // route through `TenantRegistry::with_index` because the registry's
173    // mutex contention is unnecessary for this single write — the
174    // admin-audit table is independent of the tenants registry CRUD.
175    let now_ms = chrono::Utc::now().timestamp_millis();
176    let admin_path = data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME);
177    let admin_conn = open_sqlcipher(&admin_path, key)?;
178    let details = serde_json::json!({
179        "principal_subject": principal_subject,
180        "episodes_deleted": episodes_deleted,
181        "triples_deleted": triples_deleted,
182        "triples_orphan_null_source": triples_orphan_null_source,
183        "chunks_deleted": chunks_deleted,
184        "hnsw_rebuilt": hnsw_rebuilt,
185    });
186    let audit_admin_row_id = insert_audit_admin_row(
187        &admin_conn,
188        now_ms,
189        actor_principal,
190        AuditOperation::GdprForgetUser,
191        Some(tenant_id.as_str()),
192        AuditResult::Ok,
193        Some(&details),
194    )?;
195
196    // v0.10.0: post-commit invalidation. GDPR forget cascades across
197    // episodes / triples / chunks; surface as a tenant-kind event so
198    // solo-web clients refetch every page. Skip the event when the
199    // forget was a no-op (zero rows deleted) — there's nothing to
200    // refetch. The broadcast `send` failure mode (no subscribers) is
201    // benign; the `.ok()` swallows it.
202    if total_deleted > 0 {
203        let _ = tenant_handle
204            .invalidate_sender()
205            .send(solo_core::InvalidateEvent {
206                reason: AuditOperation::GdprForgetUser.as_str().to_string(),
207                tenant_id: tenant_id.to_string(),
208                ts_ms: now_ms,
209                kind: "tenant".to_string(),
210            });
211    }
212
213    Ok(ForgetReport {
214        episodes_deleted,
215        triples_deleted,
216        triples_orphan_null_source,
217        chunks_deleted,
218        hnsw_rebuilt,
219        audit_admin_row_id,
220    })
221}
222
/// Outcome of one `delete_principal_rows` pass: per-table delete counts
/// plus the rowids of the episodes that were removed.
///
/// `triples_orphan_null_source` is a v0.8.1 counter — for every cluster
/// the deleted episodes belonged to, how many triples reference that
/// cluster but have `source_episode_id IS NULL`. These are orphans-by-
/// design (pre-v0.8.1 rows whose `provenance_json` couldn't be back-
/// filled to a live episode). Surfaced for operator visibility; the
/// triple rows themselves are NOT deleted.
struct DeleteOutcome {
    /// Rows removed from `episodes`.
    episodes_deleted: u64,
    /// Rows removed from `triples` via the `source_episode_id` cascade.
    triples_deleted: u64,
    /// NULL-source triples counted (not deleted) in the affected clusters.
    triples_orphan_null_source: u64,
    /// Rows removed from `document_chunks`.
    chunks_deleted: u64,
    /// `episodes.rowid` values of the deleted episodes, sorted ascending;
    /// the caller uses them to tombstone the matching HNSW entries.
    episode_rowids: Vec<i64>,
}
238
239/// SQL-side delete cascade. Runs inside one BEGIN IMMEDIATE tx.
240///
241/// Returns a [`DeleteOutcome`] summarising per-table counts plus the
242/// affected episode rowids (so the caller can tombstone the HNSW for
243/// those rowids before / instead of triggering a full rebuild).
244fn delete_principal_rows(conn: &mut Connection, principal_subject: &str) -> Result<DeleteOutcome> {
245    let tx = conn
246        .transaction_with_behavior(TransactionBehavior::Immediate)
247        .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for forget: {e}")))?;
248
249    // Step 1: collect rowids of episodes belonging to the subject. We
250    // use `?` parameterisation defensively even though `principal_subject`
251    // is operator-supplied (CLI arg) — never trust input that crosses
252    // a process boundary.
253    let mut rowids: Vec<i64> = {
254        let mut stmt = tx
255            .prepare("SELECT rowid FROM episodes WHERE principal_subject = ?")
256            .map_err(|e| Error::storage(format!("prepare SELECT episodes.rowid: {e}")))?;
257        let rows = stmt
258            .query_map(params![principal_subject], |r| r.get::<_, i64>(0))
259            .map_err(|e| Error::storage(format!("query episodes.rowid: {e}")))?;
260        rows.collect::<rusqlite::Result<Vec<_>>>()
261            .map_err(|e| Error::storage(format!("collect episode rowids: {e}")))?
262    };
263    rowids.sort_unstable();
264
265    // Step 2: triples cascade. v0.8.1 P1 wired
266    // `triples.source_episode_id` via migration 0007. Delete every
267    // triple whose source_episode_id is one of the forgotten episodes'
268    // rowids. Rowids may be absent (deleted-subject has no episodes ⇒
269    // empty set ⇒ no-op DELETE) so we early-exit the IN-clause build.
270    //
271    // Pre-v0.8.1 triples with NULL source_episode_id remain as orphans-
272    // by-design; the caller logs the count via `triples_orphan_null_source`
273    // for operator visibility. They cannot be cascaded without a
274    // resolvable FK back to an episode.
275    //
276    // Orphan-count probe uses cluster_episodes membership: if the
277    // forgotten episode rowid was part of cluster C, and there are
278    // triples with cluster_id = C but source_episode_id IS NULL, those
279    // are the orphans we surface. This is a strict count (not a delete)
280    // — operator workflow decides cleanup.
281    let (triples_deleted, triples_orphan_null_source): (u64, u64) = if rowids.is_empty() {
282        (0, 0)
283    } else {
284        // Compose the IN-list. We pass rowids as positional `?`
285        // parameters — bound safely via `rusqlite::params_from_iter`
286        // so no SQL injection surface. The list size is bounded by
287        // the deleted-principal's episode count; for human-scale
288        // tenants this is comfortably under SQLite's
289        // SQLITE_MAX_VARIABLE_NUMBER (default 32766 since 3.32).
290        let placeholders = std::iter::repeat("?")
291            .take(rowids.len())
292            .collect::<Vec<_>>()
293            .join(",");
294
295        let delete_sql = format!("DELETE FROM triples WHERE source_episode_id IN ({placeholders})");
296        let deleted =
297            tx.execute(&delete_sql, rusqlite::params_from_iter(rowids.iter()))
298                .map_err(|e| Error::storage(format!("DELETE triples: {e}")))? as u64;
299
300        // Orphan count: triples whose cluster contains a forgotten
301        // episode but whose own source_episode_id is NULL. This is the
302        // pre-v0.8.1 case — `provenance_json` either didn't resolve or
303        // was absent. LEFT JOIN through cluster_episodes is the right
304        // shape because clusters fan out to many episodes; even one
305        // forgotten member makes the orphan count include the cluster's
306        // null-source triples.
307        let orphan_sql = format!(
308            "SELECT COUNT(*) FROM triples t \
309             WHERE t.source_episode_id IS NULL \
310               AND t.cluster_id IN ( \
311                    SELECT DISTINCT ce.cluster_id FROM cluster_episodes ce \
312                     JOIN episodes e ON e.memory_id = ce.memory_id \
313                     WHERE e.rowid IN ({placeholders}) \
314               )"
315        );
316        let orphans: i64 = tx
317            .query_row(
318                &orphan_sql,
319                rusqlite::params_from_iter(rowids.iter()),
320                |r| r.get(0),
321            )
322            .map_err(|e| Error::storage(format!("COUNT orphan triples: {e}")))?;
323
324        (deleted, orphans.max(0) as u64)
325    };
326
327    // Step 3: episodes themselves. CASCADE on FK from `embeddings` +
328    // `pending_index` + `cluster_episodes` handles the row-level fanout
329    // for us (see migration 0001 + 0002). The principal_subject column
330    // was added in migration 0006 — pre-0006 rows have it as NULL and
331    // are unaffected by this WHERE.
332    //
333    // Note on order: we delete triples FIRST so the
334    // `triples.source_episode_id REFERENCES episodes(rowid) ON DELETE
335    // SET NULL` from migration 0007 doesn't get a chance to silently
336    // clear the FK on rows we're trying to count as `triples_deleted`.
337    // Deleting triples-by-source first guarantees the count reflects
338    // the principal's actual derived facts, not the SET NULL fallback.
339    let episodes_deleted: u64 =
340        tx.execute(
341            "DELETE FROM episodes WHERE principal_subject = ?",
342            params![principal_subject],
343        )
344        .map_err(|e| Error::storage(format!("DELETE episodes: {e}")))? as u64;
345
346    // Step 4: document_chunks. The 0003 schema cascades from
347    // `documents` to `document_chunks`, but here we delete chunks
348    // directly because the principal is who *ingested* the document —
349    // the document row stays (a hypothetical multi-principal corpus
350    // could have other chunks under the same doc_id; not a v0.8.0
351    // concern but the design is correct). Cascades from chunk deletion
352    // to `chunk_embeddings` + `pending_index` (kind='chunk') run
353    // automatically per the 0003 FKs.
354    let chunks_deleted: u64 =
355        tx.execute(
356            "DELETE FROM document_chunks WHERE ingested_by_principal = ?",
357            params![principal_subject],
358        )
359        .map_err(|e| Error::storage(format!("DELETE document_chunks: {e}")))? as u64;
360
361    tx.commit()
362        .map_err(|e| Error::storage(format!("COMMIT forget: {e}")))?;
363
364    Ok(DeleteOutcome {
365        episodes_deleted,
366        triples_deleted,
367        triples_orphan_null_source,
368        chunks_deleted,
369        episode_rowids: rowids,
370    })
371}
372
373/// Eager full HNSW rebuild from the surviving SQL rows. Walks
374/// `episodes` + `document_chunks` (both `active`) + the cached
375/// embedder_id and re-adds every vector.
376///
377/// Used by `forget_principal`. v0.8.0 P6 GDPR is rare; an eager rebuild
378/// is the operator-acceptable trade-off. The rebuild is racing with
379/// concurrent reads — they're either reading the SQL (gone) or the
380/// HNSW (about to be replaced). A read seeing a stale HNSW row for a
381/// deleted episode still gets back nothing from the SQL `SELECT` step
382/// of recall, so the read-side leak risk is zero.
383fn rebuild_hnsw_after_forget(
384    conn: &Connection,
385    hnsw: &dyn VectorIndex,
386    embedder_id: i64,
387) -> Result<()> {
388    // Clear the in-memory state by `remove`-ing every active rowid.
389    // The HNSW impl handles `remove` of non-present rowids cheaply
390    // (HashSet insert) so iterating works even if some entries are
391    // already tombstoned. We don't have a `clear()` method on
392    // `VectorIndex`, so this is the closest equivalent without
393    // breaking the trait surface.
394    //
395    // Actually — `crate::recovery::rebuild_hnsw_from_sql` re-inserts
396    // every active row, but if we don't clear first we may end up
397    // with rowids that ARE active in SQL but were ALSO present in the
398    // index from before. Defensive: don't try to clear; just rebuild,
399    // since the surviving rows will hash to the same hnsw_ids and
400    // hnsw_rs treats re-add of an existing id as a no-op.
401    let _ = crate::recovery::rebuild_hnsw_from_sql(conn, hnsw, embedder_id)?;
402
403    Ok(())
404}
405
406/// Estimate (via COUNT) how many rows the operator's `forget` will
407/// affect, BEFORE running the destructive sequence. Used by the CLI
408/// confirmation gate to decide whether `--double-confirm` is required.
409pub fn estimate_forget_scope(
410    db_path: &Path,
411    key: &KeyMaterial,
412    principal_subject: &str,
413) -> Result<(u64, u64)> {
414    let conn = open_sqlcipher(db_path, key)?;
415    let episodes: i64 = conn
416        .query_row(
417            "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
418            params![principal_subject],
419            |r| r.get(0),
420        )
421        .map_err(|e| Error::storage(format!("COUNT episodes for estimate: {e}")))?;
422    let chunks: i64 = conn
423        .query_row(
424            "SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
425            params![principal_subject],
426            |r| r.get(0),
427        )
428        .map_err(|e| Error::storage(format!("COUNT chunks for estimate: {e}")))?;
429    Ok((episodes as u64, chunks as u64))
430}
431
432// Silence false-positive unused-import lints for items referenced
433// through public function signatures only.
434#[allow(dead_code)]
435fn _silence_unused() {
436    let _: Option<&dyn Embedder> = None;
437    let _: Option<TenantId> = None;
438    let _: Option<&TenantsIndex> = None;
439    let _: Option<&EmbedderIdentity> = None;
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445    use rusqlite::params;
446
447    /// Open a per-tenant DB (with migration 0006 applied) and seed
448    /// episodes under two principals. Returns (path, key).
449    fn seed_two_principal_db() -> (tempfile::TempDir, std::path::PathBuf) {
450        let tmp = tempfile::TempDir::new().unwrap();
451        let db_path = tmp.path().join("forget.db");
452        let mut conn = Connection::open(&db_path).unwrap();
453        conn.execute_batch(
454            "PRAGMA journal_mode = wal;
455             PRAGMA foreign_keys = ON;
456             PRAGMA busy_timeout = 5000;",
457        )
458        .unwrap();
459        crate::migration::run_migrations(&mut conn).unwrap();
460
461        let now_ms = chrono::Utc::now().timestamp_millis();
462        // alice has 3 episodes; bob has 2.
463        for i in 0..3 {
464            conn.execute(
465                "INSERT INTO episodes (
466                    memory_id, ts_ms, source_type, content,
467                    encoding_context_json, confidence, strength, salience,
468                    tier, created_at_ms, updated_at_ms, principal_subject
469                 ) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
470                params![
471                    format!("00000000-0000-0000-0000-00000000a{i:03x}"),
472                    now_ms,
473                    format!("alice content {i}"),
474                    now_ms,
475                    now_ms,
476                ],
477            )
478            .unwrap();
479        }
480        for i in 0..2 {
481            conn.execute(
482                "INSERT INTO episodes (
483                    memory_id, ts_ms, source_type, content,
484                    encoding_context_json, confidence, strength, salience,
485                    tier, created_at_ms, updated_at_ms, principal_subject
486                 ) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'bob')",
487                params![
488                    format!("00000000-0000-0000-0000-00000000b{i:03x}"),
489                    now_ms,
490                    format!("bob content {i}"),
491                    now_ms,
492                    now_ms,
493                ],
494            )
495            .unwrap();
496        }
497
498        // Seed a documents row + chunks (3 under alice, 1 under bob).
499        conn.execute(
500            "INSERT INTO documents (
501                doc_id, source, title, mime_type, ingested_at_ms,
502                modified_at_ms, status, chunk_count, content_hash, byte_size
503             ) VALUES ('00000000-0000-0000-0000-000000000d01', 'src://test', 't',
504                       'text/markdown', ?, NULL, 'active', 4, 'hashabc', 200)",
505            params![now_ms],
506        )
507        .unwrap();
508        for i in 0..3 {
509            conn.execute(
510                "INSERT INTO document_chunks (
511                    chunk_id, doc_id, chunk_index, content, token_count,
512                    start_offset, end_offset, created_at_ms, ingested_by_principal
513                 ) VALUES (?, ?, ?, ?, 5, ?, ?, ?, 'alice')",
514                params![
515                    format!("00000000-0000-0000-0000-00000000c{i:03x}"),
516                    "00000000-0000-0000-0000-000000000d01",
517                    i,
518                    format!("alice chunk {i}"),
519                    (i * 10) as i64,
520                    ((i + 1) * 10) as i64,
521                    now_ms,
522                ],
523            )
524            .unwrap();
525        }
526        conn.execute(
527            "INSERT INTO document_chunks (
528                chunk_id, doc_id, chunk_index, content, token_count,
529                start_offset, end_offset, created_at_ms, ingested_by_principal
530             ) VALUES ('00000000-0000-0000-0000-00000000ccc1', '00000000-0000-0000-0000-000000000d01',
531                       3, 'bob chunk', 5, 30, 40, ?, 'bob')",
532            params![now_ms],
533        )
534        .unwrap();
535
536        (tmp, db_path)
537    }
538
539    #[test]
540    fn delete_principal_rows_targets_only_named_subject() {
541        let (_tmp, db_path) = seed_two_principal_db();
542        let mut conn = Connection::open(&db_path).unwrap();
543        conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
544        let DeleteOutcome {
545            episodes_deleted: episodes,
546            chunks_deleted: chunks,
547            ..
548        } = delete_principal_rows(&mut conn, "alice").unwrap();
549        assert_eq!(episodes, 3, "should delete alice's 3 episodes");
550        assert_eq!(chunks, 3, "should delete alice's 3 chunks");
551        // Bob's rows remain.
552        let alice_remaining: i64 = conn
553            .query_row(
554                "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
555                [],
556                |r| r.get(0),
557            )
558            .unwrap();
559        assert_eq!(alice_remaining, 0);
560        let bob_remaining: i64 = conn
561            .query_row(
562                "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
563                [],
564                |r| r.get(0),
565            )
566            .unwrap();
567        assert_eq!(bob_remaining, 2);
568        let bob_chunks: i64 = conn
569            .query_row(
570                "SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = 'bob'",
571                [],
572                |r| r.get(0),
573            )
574            .unwrap();
575        assert_eq!(bob_chunks, 1);
576    }
577
578    #[test]
579    fn delete_principal_rows_idempotent_on_absent_subject() {
580        let (_tmp, db_path) = seed_two_principal_db();
581        let mut conn = Connection::open(&db_path).unwrap();
582        let DeleteOutcome {
583            episodes_deleted: episodes,
584            chunks_deleted: chunks,
585            episode_rowids: rowids,
586            ..
587        } = delete_principal_rows(&mut conn, "ghost").unwrap();
588        assert_eq!(episodes, 0);
589        assert_eq!(chunks, 0);
590        assert!(rowids.is_empty());
591        // Re-run: still 0.
592        let DeleteOutcome {
593            episodes_deleted: e2,
594            chunks_deleted: c2,
595            ..
596        } = delete_principal_rows(&mut conn, "ghost").unwrap();
597        assert_eq!(e2, 0);
598        assert_eq!(c2, 0);
599    }
600
601    #[test]
602    fn delete_principal_rows_cascades_to_embeddings() {
603        // Migration 0001 declares ON DELETE CASCADE from embeddings.memory_id
604        // → episodes.memory_id. Verify that the cascade fires when GDPR
605        // delete removes the episode.
606        let (_tmp, db_path) = seed_two_principal_db();
607        let mut conn = Connection::open(&db_path).unwrap();
608        conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
609
610        // Seed an embedding row tied to one of alice's episodes.
611        let alice_id_0 = "00000000-0000-0000-0000-00000000a000";
612        // First need an embedder row to satisfy the FK.
613        conn.execute(
614            "INSERT INTO embedders (name, version, dim, dtype, first_seen_ms)
615             VALUES ('test', 'v1', 4, 'f32', 0)",
616            [],
617        )
618        .unwrap();
619        let embedder_id: i64 = conn
620            .query_row(
621                "SELECT embedder_id FROM embedders WHERE name = 'test'",
622                [],
623                |r| r.get(0),
624            )
625            .unwrap();
626        let vec_blob = vec![0u8; 16];
627        conn.execute(
628            "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
629             VALUES (?, ?, 'f32', 4, ?, 0)",
630            params![alice_id_0, embedder_id, &vec_blob[..]],
631        )
632        .unwrap();
633
634        let before: i64 = conn
635            .query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
636            .unwrap();
637        assert_eq!(before, 1);
638
639        let _ = delete_principal_rows(&mut conn, "alice").unwrap();
640
641        let after: i64 = conn
642            .query_row("SELECT COUNT(*) FROM embeddings", [], |r| r.get(0))
643            .unwrap();
644        assert_eq!(after, 0, "embeddings should cascade-delete with episodes");
645    }
646
647    #[test]
648    #[ignore = "requires SQLCipher: estimate_forget_scope opens via open_sqlcipher which fails on plain SQLite test DBs. Round-trip is covered by SQLCipher-enabled integration tests."]
649    fn estimate_forget_scope_counts_correctly() {
650        // SQLCipher round-trip — needs a real PRAGMA key bind that the
651        // plain bundled SQLite test profile rejects. The estimate
652        // helper is also exercised end-to-end by `delete_principal_rows`
653        // through the unit test above; this just covers the COUNT side.
654        let (_tmp, db_path) = seed_two_principal_db();
655        let key = crate::key_material::KeyMaterial::derive("test-pass", &[0u8; 16]).unwrap();
656        let (eps, chunks) = estimate_forget_scope(&db_path, &key, "alice").unwrap();
657        assert_eq!(eps, 3);
658        assert_eq!(chunks, 3);
659        let (eps_b, chunks_b) = estimate_forget_scope(&db_path, &key, "bob").unwrap();
660        assert_eq!(eps_b, 2);
661        assert_eq!(chunks_b, 1);
662        let (eps_ghost, _) = estimate_forget_scope(&db_path, &key, "ghost").unwrap();
663        assert_eq!(eps_ghost, 0);
664    }
665
666    /// Round-trip `forget_principal` against a real SQLCipher data dir
667    /// initialised via `init()`. Verifies: episodes/chunks/embeddings
668    /// cascade-delete, HNSW rebuilt flag set, admin-audit row written.
669    ///
670    /// SQLCipher-only because `open_sqlcipher` rejects plain SQLite files.
671    #[test]
672    fn forget_principal_round_trip_against_real_sqlcipher() {
673        use crate::init::{InitParams, init};
674        use solo_core::TenantId;
675        use std::sync::Arc;
676        use zeroize::Zeroizing;
677
678        let tmp = tempfile::TempDir::new().unwrap();
679        let data_dir = tmp.path().to_path_buf();
680        let pass = "forget round-trip test passphrase";
681        let outcome = init(InitParams {
682            data_dir: data_dir.clone(),
683            passphrase: Zeroizing::new(pass.into()),
684            force: false,
685            embedder: crate::init::default_embedder(),
686        })
687        .expect("init");
688
689        let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
690        let salt = cfg.salt_bytes().unwrap();
691        let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
692
693        // Seed two principals worth of episodes via direct SQL.
694        {
695            let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
696            let now = chrono::Utc::now().timestamp_millis();
697            for i in 0..3 {
698                conn.execute(
699                    "INSERT INTO episodes (
700                        memory_id, ts_ms, source_type, content,
701                        encoding_context_json, confidence, strength, salience,
702                        tier, created_at_ms, updated_at_ms, principal_subject
703                     ) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5,
704                               'hot', ?, ?, 'alice')",
705                    params![
706                        format!("00000000-0000-0000-0000-0000000{i:08x}"),
707                        now,
708                        format!("alice ep {i}"),
709                        now,
710                        now,
711                    ],
712                )
713                .unwrap();
714            }
715            conn.execute(
716                "INSERT INTO episodes (
717                    memory_id, ts_ms, source_type, content,
718                    encoding_context_json, confidence, strength, salience,
719                    tier, created_at_ms, updated_at_ms, principal_subject
720                 ) VALUES (?, ?, 'user_message', 'bob ep', '{}', 0.9, 0.5, 0.5,
721                           'hot', ?, ?, 'bob')",
722                params!["00000000-0000-0000-0000-0000000b000000", now, now, now,],
723            )
724            .unwrap();
725        }
726
727        // Build a minimal TenantHandle via from_parts_for_tests.
728        let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
729        let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
730        let crate::writer::WriterSpawn {
731            handle: write_handle,
732            join,
733        } = crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
734        let read_pool = crate::reader::ReaderPool::new(
735            &outcome.db_path,
736            Some(key.clone()),
737            stub.clone() as Arc<_>,
738        )
739        .unwrap();
740        let embedder: Arc<dyn solo_core::Embedder> =
741            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", 4));
742        let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
743            TenantId::default_tenant(),
744            cfg.clone(),
745            outcome.db_path.clone(),
746            data_dir.clone(),
747            1, // embedder_id placeholder
748            stub.clone() as Arc<_>,
749            embedder,
750            write_handle,
751            join,
752            read_pool,
753        ));
754
755        // Run forget_principal.
756        let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
757            .expect("forget_principal");
758        assert_eq!(report.episodes_deleted, 3);
759        assert_eq!(report.triples_deleted, 0);
760        assert_eq!(report.chunks_deleted, 0);
761        assert!(report.hnsw_rebuilt);
762
763        // Verify alice's rows are gone, bob's remain.
764        let after_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
765        let alice_left: i64 = after_conn
766            .query_row(
767                "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'alice'",
768                [],
769                |r| r.get(0),
770            )
771            .unwrap();
772        assert_eq!(alice_left, 0);
773        let bob_left: i64 = after_conn
774            .query_row(
775                "SELECT COUNT(*) FROM episodes WHERE principal_subject = 'bob'",
776                [],
777                |r| r.get(0),
778            )
779            .unwrap();
780        assert_eq!(bob_left, 1);
781
782        // Verify the admin-audit row landed.
783        let admin_conn = crate::init::open_sqlcipher(
784            &data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
785            &key,
786        )
787        .unwrap();
788        let (op, target, principal): (String, Option<String>, Option<String>) = admin_conn
789            .query_row(
790                "SELECT operation, target_tenant_id, principal_subject \
791                 FROM audit_events_admin WHERE audit_id = ?",
792                params![report.audit_admin_row_id],
793                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
794            )
795            .unwrap();
796        assert_eq!(op, "gdpr.forget_user");
797        assert_eq!(target.as_deref(), Some("default"));
798        assert_eq!(principal.as_deref(), Some("admin"));
799    }
800
801    /// `forget_principal` on an absent subject is idempotent: 0 rows
802    /// deleted, HNSW NOT rebuilt, but an admin-audit row still emitted
803    /// with count=0 so the compliance trail records the attempt.
804    #[test]
805    fn forget_principal_idempotent_on_absent_subject() {
806        use crate::init::{InitParams, init};
807        use solo_core::TenantId;
808        use std::sync::Arc;
809        use zeroize::Zeroizing;
810
811        let tmp = tempfile::TempDir::new().unwrap();
812        let data_dir = tmp.path().to_path_buf();
813        let pass = "idempotent forget test";
814        let outcome = init(InitParams {
815            data_dir: data_dir.clone(),
816            passphrase: Zeroizing::new(pass.into()),
817            force: false,
818            embedder: crate::init::default_embedder(),
819        })
820        .expect("init");
821        let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
822        let salt = cfg.salt_bytes().unwrap();
823        let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
824
825        let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
826        let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
827        let crate::writer::WriterSpawn {
828            handle: write_handle,
829            join,
830        } = crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
831        let read_pool = crate::reader::ReaderPool::new(
832            &outcome.db_path,
833            Some(key.clone()),
834            stub.clone() as Arc<_>,
835        )
836        .unwrap();
837        let embedder: Arc<dyn solo_core::Embedder> =
838            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", 4));
839        let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
840            TenantId::default_tenant(),
841            cfg,
842            outcome.db_path.clone(),
843            data_dir.clone(),
844            1,
845            stub.clone() as Arc<_>,
846            embedder,
847            write_handle,
848            join,
849            read_pool,
850        ));
851
852        let report = forget_principal(handle, "ghost", None, &data_dir, &key)
853            .expect("forget_principal on absent subject");
854        assert_eq!(report.episodes_deleted, 0);
855        assert_eq!(report.chunks_deleted, 0);
856        assert!(!report.hnsw_rebuilt, "no rebuild when nothing deleted");
857        // Admin audit row must still be present.
858        assert!(report.audit_admin_row_id > 0);
859    }
860
861    // ---- v0.8.1 P1: triples cascade ----
862    //
863    // The following tests use the plain-SQLite seed (no SQLCipher) to
864    // exercise the `delete_principal_rows` cascade through the new
865    // `triples.source_episode_id` FK introduced in migration 0007.
866
867    /// Helper: seed a triple linked to a specific source episode rowid.
868    fn seed_triple_with_source(
869        conn: &Connection,
870        triple_id: &str,
871        subject: &str,
872        predicate: &str,
873        object: &str,
874        source_episode_id: Option<i64>,
875        cluster_id: Option<&str>,
876    ) {
877        let now_ms = chrono::Utc::now().timestamp_millis();
878        conn.execute(
879            "INSERT INTO triples (
880                triple_id, subject_id, predicate, object_id, object_kind,
881                valid_from_ms, valid_to_ms, confidence, provenance_json,
882                created_at_ms, updated_at_ms, source_episode_id, cluster_id
883             ) VALUES (?, ?, ?, ?, 'literal', ?, NULL, 0.9, '{}', ?, ?, ?, ?)",
884            params![
885                triple_id,
886                subject,
887                predicate,
888                object,
889                now_ms,
890                now_ms,
891                now_ms,
892                source_episode_id,
893                cluster_id,
894            ],
895        )
896        .expect("seed triple");
897    }
898
899    #[test]
900    fn delete_principal_rows_cascades_through_triples_by_source_episode() {
901        let (_tmp, db_path) = seed_two_principal_db();
902        let mut conn = Connection::open(&db_path).unwrap();
903        conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
904
905        // Look up alice's episode rowids.
906        let alice_rowids: Vec<i64> = {
907            let mut stmt = conn
908                .prepare(
909                    "SELECT rowid FROM episodes WHERE principal_subject = 'alice' ORDER BY rowid",
910                )
911                .unwrap();
912            stmt.query_map([], |r| r.get::<_, i64>(0))
913                .unwrap()
914                .map(|r| r.unwrap())
915                .collect()
916        };
917        assert_eq!(alice_rowids.len(), 3);
918        let bob_rowid: i64 = conn
919            .query_row(
920                "SELECT rowid FROM episodes WHERE principal_subject = 'bob' LIMIT 1",
921                [],
922                |r| r.get(0),
923            )
924            .unwrap();
925
926        // Two triples derived from alice's first episode + one from bob's.
927        seed_triple_with_source(
928            &conn,
929            "00000000-0000-0000-0000-000000000a01",
930            "alice",
931            "uses",
932            "rust",
933            Some(alice_rowids[0]),
934            None,
935        );
936        seed_triple_with_source(
937            &conn,
938            "00000000-0000-0000-0000-000000000a02",
939            "alice",
940            "likes",
941            "skiing",
942            Some(alice_rowids[1]),
943            None,
944        );
945        seed_triple_with_source(
946            &conn,
947            "00000000-0000-0000-0000-000000000b01",
948            "bob",
949            "uses",
950            "go",
951            Some(bob_rowid),
952            None,
953        );
954
955        let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
956        assert_eq!(outcome.episodes_deleted, 3);
957        assert_eq!(
958            outcome.triples_deleted, 2,
959            "should delete the two triples whose source was an alice episode"
960        );
961        assert_eq!(outcome.triples_orphan_null_source, 0);
962
963        // Bob's triple survives.
964        let bob_count: i64 = conn
965            .query_row(
966                "SELECT COUNT(*) FROM triples WHERE subject_id = 'bob'",
967                [],
968                |r| r.get(0),
969            )
970            .unwrap();
971        assert_eq!(bob_count, 1);
972    }
973
974    #[test]
975    fn delete_principal_rows_preserves_null_source_triples() {
976        let (_tmp, db_path) = seed_two_principal_db();
977        let mut conn = Connection::open(&db_path).unwrap();
978        conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
979        let alice_rowids: Vec<i64> = {
980            let mut stmt = conn
981                .prepare(
982                    "SELECT rowid FROM episodes WHERE principal_subject = 'alice' ORDER BY rowid",
983                )
984                .unwrap();
985            stmt.query_map([], |r| r.get::<_, i64>(0))
986                .unwrap()
987                .map(|r| r.unwrap())
988                .collect()
989        };
990
991        // Triple with NULL source — pre-v0.8.1 row shape. NOT linked to a
992        // cluster either, so it shouldn't be counted as orphan-by-cluster.
993        seed_triple_with_source(
994            &conn,
995            "00000000-0000-0000-0000-000000000c01",
996            "alice",
997            "claimed",
998            "x",
999            None,
1000            None,
1001        );
1002        // Triple with source on alice's first episode — should cascade.
1003        seed_triple_with_source(
1004            &conn,
1005            "00000000-0000-0000-0000-000000000c02",
1006            "alice",
1007            "linked",
1008            "y",
1009            Some(alice_rowids[0]),
1010            None,
1011        );
1012
1013        let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
1014        assert_eq!(outcome.triples_deleted, 1, "only the source-linked triple");
1015        // The NULL-source triple stays — it's orphan-by-design but not
1016        // counted as orphan-by-cluster (no cluster membership).
1017        let remaining: i64 = conn
1018            .query_row(
1019                "SELECT COUNT(*) FROM triples WHERE source_episode_id IS NULL",
1020                [],
1021                |r| r.get(0),
1022            )
1023            .unwrap();
1024        assert_eq!(remaining, 1, "NULL-source row must survive");
1025    }
1026
1027    #[test]
1028    fn delete_principal_rows_counts_orphans_via_cluster_membership() {
1029        let (_tmp, db_path) = seed_two_principal_db();
1030        let mut conn = Connection::open(&db_path).unwrap();
1031        conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap();
1032
1033        let alice_rows: Vec<(i64, String)> = {
1034            let mut stmt = conn
1035                .prepare(
1036                    "SELECT rowid, memory_id FROM episodes WHERE principal_subject = 'alice' \
1037                     ORDER BY rowid",
1038                )
1039                .unwrap();
1040            stmt.query_map([], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, String>(1)?)))
1041                .unwrap()
1042                .map(|r| r.unwrap())
1043                .collect()
1044        };
1045        let alice_memory_id = alice_rows[0].1.clone();
1046
1047        // Seed a cluster + cluster_episodes link for alice's first episode.
1048        let cluster_id = "00000000-0000-0000-0000-0000000c1001";
1049        let now_ms = chrono::Utc::now().timestamp_millis();
1050        conn.execute(
1051            "INSERT INTO clusters (cluster_id, coherence, created_at_ms) VALUES (?, ?, ?)",
1052            params![cluster_id, 0.9_f64, now_ms],
1053        )
1054        .unwrap();
1055        conn.execute(
1056            "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)",
1057            params![cluster_id, alice_memory_id],
1058        )
1059        .unwrap();
1060
1061        // Pre-v0.8.1-shape triple: source_episode_id IS NULL but cluster
1062        // membership ties it to alice. This is the "orphan by design" case.
1063        seed_triple_with_source(
1064            &conn,
1065            "00000000-0000-0000-0000-000000000d01",
1066            "alice",
1067            "claimed",
1068            "fact",
1069            None,
1070            Some(cluster_id),
1071        );
1072
1073        let outcome = delete_principal_rows(&mut conn, "alice").unwrap();
1074        assert_eq!(
1075            outcome.triples_orphan_null_source, 1,
1076            "the NULL-source triple linked via cluster must appear in the orphan count"
1077        );
1078
1079        // The orphan triple is NOT deleted; only the operator visibility
1080        // counter records it.
1081        let remaining: i64 = conn
1082            .query_row(
1083                "SELECT COUNT(*) FROM triples WHERE triple_id = '00000000-0000-0000-0000-000000000d01'",
1084                [],
1085                |r| r.get(0),
1086            )
1087            .unwrap();
1088        assert_eq!(remaining, 1, "orphan triple must remain in the table");
1089    }
1090
1091    #[test]
1092    fn migration_0007_adds_source_episode_id_column_and_index() {
1093        let mut conn = Connection::open_in_memory().unwrap();
1094        crate::migration::run_migrations(&mut conn).unwrap();
1095        // Column present.
1096        let cols: Vec<(String, String)> = conn
1097            .prepare("PRAGMA table_info('triples')")
1098            .unwrap()
1099            .query_map([], |row| {
1100                Ok((row.get::<_, String>(1)?, row.get::<_, String>(2)?))
1101            })
1102            .unwrap()
1103            .map(|r| r.unwrap())
1104            .collect();
1105        let names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
1106        assert!(
1107            names.contains(&"source_episode_id"),
1108            "triples missing source_episode_id after 0007; got {names:?}"
1109        );
1110        // Index present.
1111        let idx_exists: i64 = conn
1112            .query_row(
1113                "SELECT COUNT(*) FROM sqlite_master \
1114                 WHERE type='index' AND name='idx_triples_source_episode'",
1115                [],
1116                |r| r.get(0),
1117            )
1118            .unwrap();
1119        assert_eq!(idx_exists, 1, "idx_triples_source_episode missing");
1120    }
1121
1122    #[test]
1123    fn migration_0007_is_idempotent_on_repeated_open() {
1124        let mut conn = Connection::open_in_memory().unwrap();
1125        crate::migration::run_migrations(&mut conn).unwrap();
1126        crate::migration::run_migrations(&mut conn).unwrap();
1127        let count: i64 = conn
1128            .query_row(
1129                "SELECT COUNT(*) FROM schema_migrations WHERE version = 7",
1130                [],
1131                |r| r.get(0),
1132            )
1133            .unwrap();
1134        assert_eq!(count, 1, "0007 row must not be inserted twice");
1135    }
1136
1137    #[test]
1138    fn migration_0007_backfills_source_episode_id_from_provenance_json() {
1139        // Build a v0.6-style DB (no source_episode_id yet) by hand-running
1140        // the migrations up to 6, seeding a triple whose provenance_json
1141        // references a live episode by memory_id, then running 0007 and
1142        // verifying the column got populated.
1143        let mut conn = Connection::open_in_memory().unwrap();
1144        // Apply migrations 1..=6 manually so we control the pre-0007 state.
1145        crate::migration::run_migrations(&mut conn).unwrap();
1146        // Now seed: insert an episode, then a triple whose
1147        // provenance_json references that episode's memory_id, then
1148        // clear source_episode_id on the triple (simulating a row
1149        // written before 0007 was applied — the column would have been
1150        // NULL by default for a pre-existing INSERT).
1151        let now_ms = chrono::Utc::now().timestamp_millis();
1152        conn.execute(
1153            "INSERT INTO episodes (
1154                memory_id, ts_ms, source_type, content,
1155                encoding_context_json, confidence, strength, salience,
1156                tier, created_at_ms, updated_at_ms
1157             ) VALUES ('00000000-0000-0000-0000-000000000111', ?, 'user_message',
1158                       'seed', '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
1159            params![now_ms, now_ms, now_ms],
1160        )
1161        .unwrap();
1162        let ep_rowid: i64 = conn
1163            .query_row(
1164                "SELECT rowid FROM episodes WHERE memory_id = ?",
1165                params!["00000000-0000-0000-0000-000000000111"],
1166                |r| r.get(0),
1167            )
1168            .unwrap();
1169        // Insert a triple with provenance_json carrying the memory_id
1170        // (Provenance.derived_from JSON shape) but explicit NULL source.
1171        let prov = serde_json::json!({
1172            "derived_from": ["00000000-0000-0000-0000-000000000111"],
1173            "derivation": "extraction",
1174            "by": "test",
1175            "at_ms": now_ms,
1176        });
1177        conn.execute(
1178            "INSERT INTO triples (
1179                triple_id, subject_id, predicate, object_id, object_kind,
1180                valid_from_ms, valid_to_ms, confidence, provenance_json,
1181                created_at_ms, updated_at_ms, source_episode_id
1182             ) VALUES ('00000000-0000-0000-0000-000000000222',
1183                       'alice', 'uses', 'rust', 'literal',
1184                       ?, NULL, 0.9, ?, ?, ?, NULL)",
1185            params![now_ms, prov.to_string(), now_ms, now_ms],
1186        )
1187        .unwrap();
1188
1189        // Manually clear + re-run the backfill UPDATE that lives in 0007.
1190        // (Migrations are idempotent — this re-run hits only the NULL
1191        // rows, which is the same behavior 0007 takes on first apply.)
1192        let backfill = include_str!("migrations/0007_triples_source.sql");
1193        // The backfill statement is the trailing UPDATE; running the
1194        // whole file twice would error on the ALTER. So we extract the
1195        // UPDATE portion alone for this re-run.
1196        let update_sql = backfill
1197            .split("UPDATE triples")
1198            .nth(1)
1199            .map(|tail| format!("UPDATE triples{tail}"))
1200            .expect("0007 contains the UPDATE backfill");
1201        conn.execute_batch(&update_sql).unwrap();
1202
1203        let backfilled: Option<i64> = conn
1204            .query_row(
1205                "SELECT source_episode_id FROM triples \
1206                 WHERE triple_id = '00000000-0000-0000-0000-000000000222'",
1207                [],
1208                |r| r.get(0),
1209            )
1210            .unwrap();
1211        assert_eq!(
1212            backfilled,
1213            Some(ep_rowid),
1214            "backfill must resolve derived_from[0] memory_id → episodes.rowid"
1215        );
1216    }
1217
1218    #[test]
1219    fn forget_report_carries_triples_deleted_count_through_to_audit_details() {
1220        // Same setup as the existing round-trip test, but with a triple
1221        // wired to the deleted principal's episode. Verifies the report's
1222        // triples_deleted count surfaces correctly + the audit row
1223        // captures it.
1224        use crate::init::{InitParams, init};
1225        use solo_core::TenantId;
1226        use zeroize::Zeroizing;
1227
1228        let tmp = tempfile::TempDir::new().unwrap();
1229        let data_dir = tmp.path().to_path_buf();
1230        let pass = "forget triples cascade test passphrase";
1231        let outcome = init(InitParams {
1232            data_dir: data_dir.clone(),
1233            passphrase: Zeroizing::new(pass.into()),
1234            force: false,
1235            embedder: crate::init::default_embedder(),
1236        })
1237        .expect("init");
1238        let cfg = crate::config::SoloConfig::read(&outcome.config_path).unwrap();
1239        let salt = cfg.salt_bytes().unwrap();
1240        let key = crate::key_material::KeyMaterial::derive(pass, &salt).unwrap();
1241
1242        // Seed alice's episode + a triple sourced from it via direct SQL.
1243        {
1244            let conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
1245            let now = chrono::Utc::now().timestamp_millis();
1246            conn.execute(
1247                "INSERT INTO episodes (
1248                    memory_id, ts_ms, source_type, content,
1249                    encoding_context_json, confidence, strength, salience,
1250                    tier, created_at_ms, updated_at_ms, principal_subject
1251                 ) VALUES ('00000000-0000-0000-0000-000000000ace', ?, 'user_message',
1252                           'alice ep', '{}', 0.9, 0.5, 0.5, 'hot', ?, ?, 'alice')",
1253                params![now, now, now],
1254            )
1255            .unwrap();
1256            let ep_rowid: i64 = conn
1257                .query_row(
1258                    "SELECT rowid FROM episodes WHERE memory_id = ?",
1259                    params!["00000000-0000-0000-0000-000000000ace"],
1260                    |r| r.get(0),
1261                )
1262                .unwrap();
1263            conn.execute(
1264                "INSERT INTO triples (
1265                    triple_id, subject_id, predicate, object_id, object_kind,
1266                    valid_from_ms, valid_to_ms, confidence, provenance_json,
1267                    created_at_ms, updated_at_ms, source_episode_id
1268                 ) VALUES ('00000000-0000-0000-0000-000000000a11',
1269                           'alice', 'uses', 'rust', 'literal',
1270                           ?, NULL, 0.9, '{}', ?, ?, ?)",
1271                params![now, now, now, ep_rowid],
1272            )
1273            .unwrap();
1274        }
1275
1276        // Stand up a TenantHandle via from_parts_for_tests, then run forget.
1277        let stub = Arc::new(crate::test_support::StubVectorIndex::new(4));
1278        let writer_conn = crate::init::open_sqlcipher(&outcome.db_path, &key).unwrap();
1279        let crate::writer::WriterSpawn {
1280            handle: write_handle,
1281            join,
1282        } = crate::writer::WriterActor::spawn(writer_conn, stub.clone() as Arc<_>);
1283        let read_pool = crate::reader::ReaderPool::new(
1284            &outcome.db_path,
1285            Some(key.clone()),
1286            stub.clone() as Arc<_>,
1287        )
1288        .unwrap();
1289        let embedder: Arc<dyn solo_core::Embedder> =
1290            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", 4));
1291        let handle = Arc::new(crate::tenants::TenantHandle::from_parts_for_tests(
1292            TenantId::default_tenant(),
1293            cfg,
1294            outcome.db_path.clone(),
1295            data_dir.clone(),
1296            1,
1297            stub.clone() as Arc<_>,
1298            embedder,
1299            write_handle,
1300            join,
1301            read_pool,
1302        ));
1303
1304        let report = forget_principal(handle, "alice", Some("admin"), &data_dir, &key)
1305            .expect("forget_principal");
1306        assert_eq!(report.episodes_deleted, 1);
1307        assert_eq!(report.triples_deleted, 1, "real count, not 0");
1308
1309        // The admin-audit row's details_json must carry the same count.
1310        let admin_conn = crate::init::open_sqlcipher(
1311            &data_dir.join(crate::tenants::TENANTS_INDEX_FILENAME),
1312            &key,
1313        )
1314        .unwrap();
1315        let details_json: String = admin_conn
1316            .query_row(
1317                "SELECT details_json FROM audit_events_admin WHERE audit_id = ?",
1318                params![report.audit_admin_row_id],
1319                |r| r.get(0),
1320            )
1321            .unwrap();
1322        let parsed: serde_json::Value = serde_json::from_str(&details_json).unwrap();
1323        assert_eq!(parsed["triples_deleted"], 1);
1324        assert_eq!(parsed["episodes_deleted"], 1);
1325    }
1326
1327    /// Same shape as `estimate_forget_scope_counts_correctly` but uses a
1328    /// raw `Connection::query_row` against the seeded plain-SQLite DB.
1329    /// Exercises the SELECT COUNT logic without going through
1330    /// `open_sqlcipher`.
1331    #[test]
1332    fn estimate_via_direct_count_matches_seeded_data() {
1333        let (_tmp, db_path) = seed_two_principal_db();
1334        let conn = Connection::open(&db_path).unwrap();
1335        let alice_eps: i64 = conn
1336            .query_row(
1337                "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
1338                params!["alice"],
1339                |r| r.get(0),
1340            )
1341            .unwrap();
1342        assert_eq!(alice_eps, 3);
1343        let alice_chunks: i64 = conn
1344            .query_row(
1345                "SELECT COUNT(*) FROM document_chunks WHERE ingested_by_principal = ?",
1346                params!["alice"],
1347                |r| r.get(0),
1348            )
1349            .unwrap();
1350        assert_eq!(alice_chunks, 3);
1351        let bob_eps: i64 = conn
1352            .query_row(
1353                "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
1354                params!["bob"],
1355                |r| r.get(0),
1356            )
1357            .unwrap();
1358        assert_eq!(bob_eps, 2);
1359        let ghost_eps: i64 = conn
1360            .query_row(
1361                "SELECT COUNT(*) FROM episodes WHERE principal_subject = ?",
1362                params!["ghost"],
1363                |r| r.get(0),
1364            )
1365            .unwrap();
1366        assert_eq!(ghost_eps, 0);
1367    }
1368}