Skip to main content

kintsugi_core/
log.rs

1//! Append-only, hash-chained event log (SQLite).
2//!
3//! Every observed command becomes one immutable row. Each row's `hash` is
4//! `SHA-256(prev_hash || canonical(row))`, so any edit to a past row — or any
5//! reordering — breaks the chain and is detectable by [`EventLog::verify_chain`].
6//!
7//! Security spine: the event chain is append-only. Day-to-day "delete" is
8//! **redaction** — an append-only [`redactions`](EventLog::redact) row that hides
9//! an entry from views while the original row and the hash chain stay intact and
10//! verifiable. True erasure is the separate, explicit [`EventLog::purge_matching`]
11//! (hard delete + re-chain): it deliberately rewrites history for the purged span
12//! and records a marker event, and is never invoked automatically.
13
14use rusqlite::{Connection, OptionalExtension};
15use sha2::{Digest, Sha256};
16use time::format_description::well_known::Rfc3339;
17use time::OffsetDateTime;
18use uuid::Uuid;
19
20use crate::types::{Class, Decision, ProposedCommand, Verdict};
21
/// Predecessor hash recorded by the very first event of the chain: 64 hex
/// zeros, i.e. the width of a hex-encoded SHA-256 digest.
pub const GENESIS_HASH: &str = concat!(
    "00000000000000000000000000000000",
    "00000000000000000000000000000000"
);
24
25/// Errors from the event log.
26#[derive(Debug, thiserror::Error)]
27pub enum LogError {
28    #[error("database error: {0}")]
29    Db(#[from] rusqlite::Error),
30    #[error("time formatting error: {0}")]
31    Time(#[from] time::error::Format),
32    #[error("stored timestamp is not valid RFC3339: {0}")]
33    TimeParse(#[from] time::error::Parse),
34    #[error("stored value is not valid: {0}")]
35    Corrupt(String),
36}
37
38/// A single immutable row of the event log.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct LoggedEvent {
41    /// Monotonic sequence number (storage rowid).
42    pub seq: i64,
43    /// The originating command id.
44    pub id: Uuid,
45    /// When the command was observed.
46    pub ts: OffsetDateTime,
47    /// Originating agent.
48    pub agent: String,
49    /// Working directory (stored as a string).
50    pub cwd: String,
51    /// The raw command, preserved verbatim.
52    pub command: String,
53    /// The argument vector.
54    pub argv: Vec<String>,
55    /// Rule-engine classification.
56    pub class: Class,
57    /// The decision recorded.
58    pub decision: Decision,
59    /// The rule name or resolution reason behind the decision.
60    pub reason: String,
61    /// Tier that produced the decision.
62    pub tier: u8,
63    /// Optional severity score.
64    pub risk: Option<u8>,
65    /// Optional one-sentence summary.
66    pub summary: Option<String>,
67    /// Optional snapshot reference.
68    pub snapshot_id: Option<String>,
69    /// Optional originating session id (view metadata; not part of the hash).
70    pub session: Option<String>,
71    /// Hash of the predecessor row.
72    pub prev_hash: String,
73    /// This row's hash.
74    pub hash: String,
75    /// Whether this event has been redacted (hidden from default views).
76    pub redacted: bool,
77}
78
79/// A filter over the event log, used by views, redaction, and purge.
80#[derive(Debug, Clone, Default)]
81pub struct Filter {
82    /// Restrict to one agent (`claude-code`, `cursor`, `shim`, …).
83    pub agent: Option<String>,
84    /// Exclude one agent (e.g. drop `fs-watch` from the command timeline).
85    pub agent_not: Option<String>,
86    /// Restrict to one session id.
87    pub session: Option<String>,
88    /// Only events at or after this instant.
89    pub since: Option<OffsetDateTime>,
90    /// Only events strictly before this instant.
91    pub until: Option<OffsetDateTime>,
92    /// Case-insensitive substring match on the raw command.
93    pub grep: Option<String>,
94    /// Restrict to one classification.
95    pub class: Option<Class>,
96    /// Include redacted rows (default: hidden).
97    pub include_redacted: bool,
98    /// Cap the number of rows returned (newest kept).
99    pub limit: Option<usize>,
100    /// Skip this many of the newest matching rows before applying `limit` —
101    /// the page offset for `kintsugi log --page N`.
102    pub offset: Option<usize>,
103}
104
105impl Filter {
106    /// Build the SQL `WHERE` body (without the `WHERE` keyword) and its params.
107    /// `events`-qualified so it composes with the redaction LEFT JOIN.
108    fn where_clause(&self) -> (String, Vec<Box<dyn rusqlite::ToSql>>) {
109        let mut clauses: Vec<String> = Vec::new();
110        let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
111        if let Some(a) = &self.agent {
112            clauses.push("events.agent = ?".into());
113            params.push(Box::new(a.clone()));
114        }
115        if let Some(a) = &self.agent_not {
116            clauses.push("events.agent != ?".into());
117            params.push(Box::new(a.clone()));
118        }
119        if let Some(s) = &self.session {
120            clauses.push("events.session = ?".into());
121            params.push(Box::new(s.clone()));
122        }
123        if let Some(c) = &self.class {
124            clauses.push("events.class = ?".into());
125            params.push(Box::new(c.as_str().to_string()));
126        }
127        if let Some(g) = &self.grep {
128            clauses.push("events.command LIKE ? ESCAPE '\\'".into());
129            params.push(Box::new(format!("%{}%", like_escape(g))));
130        }
131        // ts is stored as RFC3339 text; lexical compare is chronological for UTC Z.
132        if let Some(since) = &self.since {
133            if let Ok(s) = since.format(&Rfc3339) {
134                clauses.push("events.ts >= ?".into());
135                params.push(Box::new(s));
136            }
137        }
138        if let Some(until) = &self.until {
139            if let Ok(s) = until.format(&Rfc3339) {
140                clauses.push("events.ts < ?".into());
141                params.push(Box::new(s));
142            }
143        }
144        if !self.include_redacted {
145            clauses.push("r.event_id IS NULL".into());
146        }
147        let body = if clauses.is_empty() {
148            "1=1".to_string()
149        } else {
150            clauses.join(" AND ")
151        };
152        (body, params)
153    }
154}
155
/// Make `s` match literally inside a `LIKE … ESCAPE '\'` pattern by putting a
/// backslash in front of every `\`, `%` and `_` it contains.
fn like_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
162
163/// One entry in the approval queue (a held command awaiting a human decision).
164#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
165pub struct PendingItem {
166    /// The held command (its `id` is the queue id).
167    pub command: ProposedCommand,
168    /// Rule-engine classification.
169    pub class: Class,
170    /// Why it was held.
171    pub reason: String,
172    /// When it was enqueued.
173    #[serde(with = "time::serde::rfc3339")]
174    pub ts: OffsetDateTime,
175}
176
/// Outcome of checking the event log's hash chain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainStatus {
    /// Every row hashes correctly and the chain is unbroken.
    Intact,
    /// The chain is broken.
    Broken {
        /// `seq` of the offending row.
        seq: i64,
        /// Human-readable description of the problem.
        detail: String,
    },
}

impl ChainStatus {
    /// Returns `true` for [`ChainStatus::Intact`] and `false` for any break.
    pub fn is_intact(&self) -> bool {
        match self {
            ChainStatus::Intact => true,
            ChainStatus::Broken { .. } => false,
        }
    }
}
197
198/// Handle to the append-only event log.
199pub struct EventLog {
200    conn: Connection,
201}
202
203impl EventLog {
204    /// Open (creating if needed) a log at `path`.
205    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, LogError> {
206        let conn = Connection::open(path)?;
207        Self::init(conn)
208    }
209
210    /// Open an ephemeral in-memory log (used in tests).
211    pub fn open_in_memory() -> Result<Self, LogError> {
212        let conn = Connection::open_in_memory()?;
213        Self::init(conn)
214    }
215
216    fn init(conn: Connection) -> Result<Self, LogError> {
217        conn.pragma_update(None, "journal_mode", "WAL")?;
218        // NORMAL is safe under WAL and keeps per-event writes fast (no fsync per
219        // commit). A crash can only lose the very last transactions; the surviving
220        // chain stays intact and verifiable.
221        conn.pragma_update(None, "synchronous", "NORMAL")?;
222        conn.pragma_update(None, "foreign_keys", "ON")?;
223        // Block (rather than fail) when another process holds the write lock, so
224        // the read-modify-append in `log_event` serializes across processes instead
225        // of forking the hash chain on a shared prev_hash.
226        conn.busy_timeout(std::time::Duration::from_secs(5))?;
227        conn.execute_batch(
228            r#"
229            CREATE TABLE IF NOT EXISTS events (
230                seq        INTEGER PRIMARY KEY AUTOINCREMENT,
231                id         TEXT NOT NULL,
232                ts         TEXT NOT NULL,
233                agent      TEXT NOT NULL,
234                cwd        TEXT NOT NULL,
235                command    TEXT NOT NULL,
236                argv       TEXT NOT NULL,
237                class      TEXT NOT NULL,
238                decision   TEXT NOT NULL,
239                reason     TEXT NOT NULL,
240                tier       INTEGER NOT NULL,
241                risk       INTEGER,
242                summary    TEXT,
243                snapshot_id TEXT,
244                prev_hash  TEXT NOT NULL,
245                hash       TEXT NOT NULL,
246                session    TEXT
247            );
248
249            -- Append-only redactions: hide an event from views without mutating
250            -- it or breaking the chain. The original row and its hash are intact.
251            CREATE TABLE IF NOT EXISTS redactions (
252                event_id   TEXT PRIMARY KEY,
253                ts         TEXT NOT NULL,
254                reason     TEXT NOT NULL
255            );
256
257            -- Decision memory. Unlike `events`, this table is intentionally
258            -- mutable state: per-repo always-allow / always-deny by command hash.
259            CREATE TABLE IF NOT EXISTS memory (
260                repo         TEXT NOT NULL,
261                command_hash TEXT NOT NULL,
262                action       TEXT NOT NULL,
263                updated_at   TEXT NOT NULL,
264                PRIMARY KEY (repo, command_hash)
265            );
266
267            -- Snapshots taken before destructive ops, for `kintsugi undo`.
268            CREATE TABLE IF NOT EXISTS snapshots (
269                id         TEXT PRIMARY KEY,
270                seq        INTEGER,
271                ts         TEXT NOT NULL,
272                command    TEXT NOT NULL,
273                manifest   TEXT NOT NULL,
274                reverted   INTEGER NOT NULL DEFAULT 0
275            );
276
277            -- The approval queue: held commands awaiting a human decision.
278            -- Mutable state; status is 'pending' | 'approved' | 'denied'.
279            CREATE TABLE IF NOT EXISTS pending (
280                id          TEXT PRIMARY KEY,
281                ts          TEXT NOT NULL,
282                command     TEXT NOT NULL,
283                class       TEXT NOT NULL,
284                reason      TEXT NOT NULL,
285                status      TEXT NOT NULL DEFAULT 'pending',
286                updated_at  TEXT NOT NULL
287            );
288            "#,
289        )?;
290        // Migrate older DBs created before the `session` column existed.
291        let has_session = conn
292            .prepare("SELECT 1 FROM pragma_table_info('events') WHERE name = 'session'")?
293            .exists([])?;
294        if !has_session {
295            conn.execute_batch("ALTER TABLE events ADD COLUMN session TEXT")?;
296        }
297        Ok(Self { conn })
298    }
299
300    /// Add a held command to the approval queue (idempotent on its id).
301    pub fn enqueue_pending(
302        &self,
303        cmd: &ProposedCommand,
304        class: Class,
305        reason: &str,
306    ) -> Result<(), LogError> {
307        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
308        let cmd_json = serde_json::to_string(cmd)
309            .map_err(|e| LogError::Corrupt(format!("pending command serialize: {e}")))?;
310        self.conn.execute(
311            "INSERT INTO pending (id, ts, command, class, reason, status, updated_at)
312             VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?2)
313             ON CONFLICT(id) DO NOTHING",
314            rusqlite::params![cmd.id.to_string(), now, cmd_json, class.as_str(), reason],
315        )?;
316        Ok(())
317    }
318
319    /// The current status of a queued command, if it is in the queue.
320    pub fn pending_status(&self, id: &str) -> Result<Option<String>, LogError> {
321        Ok(self
322            .conn
323            .query_row("SELECT status FROM pending WHERE id = ?1", [id], |r| {
324                r.get(0)
325            })
326            .optional()?)
327    }
328
329    /// Set a queued command's status (`approved` | `denied`).
330    pub fn set_pending_status(&self, id: &str, status: &str) -> Result<(), LogError> {
331        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
332        self.conn.execute(
333            "UPDATE pending SET status = ?2, updated_at = ?3 WHERE id = ?1",
334            rusqlite::params![id, status, now],
335        )?;
336        Ok(())
337    }
338
339    /// Atomically move a queued command from status `from` to `to`. Returns true
340    /// iff *this* call performed the transition (the row existed and was `from`).
341    ///
342    /// This is the exactly-once guard: a held command must resolve/run once even
343    /// if two `kintsugi approve`/`kintsugi run` invocations race — only the winner of
344    /// the compare-and-swap proceeds; the loser sees `false` and does nothing.
345    pub fn cas_pending_status(&self, id: &str, from: &str, to: &str) -> Result<bool, LogError> {
346        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
347        let changed = self.conn.execute(
348            "UPDATE pending SET status = ?3, updated_at = ?4 WHERE id = ?1 AND status = ?2",
349            rusqlite::params![id, from, to, now],
350        )?;
351        Ok(changed == 1)
352    }
353
354    /// The stored command for a queued id (for resolve/re-run).
355    pub fn pending_command(&self, id: &str) -> Result<Option<ProposedCommand>, LogError> {
356        let json: Option<String> = self
357            .conn
358            .query_row("SELECT command FROM pending WHERE id = ?1", [id], |r| {
359                r.get(0)
360            })
361            .optional()?;
362        match json {
363            Some(j) => Ok(Some(serde_json::from_str(&j).map_err(|e| {
364                LogError::Corrupt(format!("pending command parse: {e}"))
365            })?)),
366            None => Ok(None),
367        }
368    }
369
370    /// List the still-pending queued commands, oldest first.
371    pub fn list_pending(&self) -> Result<Vec<PendingItem>, LogError> {
372        let mut stmt = self.conn.prepare(
373            // Order by rowid alone — it IS insertion order. We used to lead with
374            // `ts ASC` and tiebreak on rowid, but the Windows runner's wall clock
375            // can step backwards by a few ms (NTP slew on the VM host), which
376            // made ts(second insert) < ts(first insert) and put the newer row
377            // before the older one. Rowid is monotonic, so it doesn't care.
378            "SELECT command, class, reason, ts FROM pending WHERE status = 'pending' ORDER BY rowid ASC",
379        )?;
380        let rows = stmt.query_map([], |row| {
381            Ok((
382                row.get::<_, String>(0)?,
383                row.get::<_, String>(1)?,
384                row.get::<_, String>(2)?,
385                row.get::<_, String>(3)?,
386            ))
387        })?;
388        let mut out = Vec::new();
389        for r in rows {
390            let (cmd_json, class_s, reason, ts_s) = r?;
391            let command: ProposedCommand = serde_json::from_str(&cmd_json)
392                .map_err(|e| LogError::Corrupt(format!("pending parse: {e}")))?;
393            out.push(PendingItem {
394                command,
395                class: parse_class(&class_s)?,
396                reason,
397                ts: OffsetDateTime::parse(&ts_s, &Rfc3339)?,
398            });
399        }
400        Ok(out)
401    }
402
403    /// Record a snapshot taken before a destructive command.
404    pub fn record_snapshot(&self, manifest: &crate::snapshot::Manifest) -> Result<(), LogError> {
405        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
406        let json = serde_json::to_string(manifest)
407            .map_err(|e| LogError::Corrupt(format!("manifest serialize: {e}")))?;
408        // Snapshots are recorded just before the event they guard is appended, so
409        // the guarded event's seq is the next rowid (= current count + 1).
410        let seq: i64 = self
411            .conn
412            .query_row("SELECT COUNT(*) + 1 FROM events", [], |r| r.get(0))?;
413        self.conn.execute(
414            "INSERT INTO snapshots (id, seq, ts, command, manifest, reverted) VALUES (?1, ?2, ?3, ?4, ?5, 0)",
415            rusqlite::params![manifest.id, seq, now, manifest.command, json],
416        )?;
417        Ok(())
418    }
419
420    /// Load all snapshots not yet reverted, newest first.
421    pub fn unreverted_snapshots(&self) -> Result<Vec<crate::snapshot::Manifest>, LogError> {
422        let mut stmt = self
423            .conn
424            .prepare("SELECT manifest FROM snapshots WHERE reverted = 0 ORDER BY rowid DESC")?;
425        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
426        let mut out = Vec::new();
427        for r in rows {
428            let json = r?;
429            let m: crate::snapshot::Manifest = serde_json::from_str(&json)
430                .map_err(|e| LogError::Corrupt(format!("manifest parse: {e}")))?;
431            out.push(m);
432        }
433        Ok(out)
434    }
435
436    /// The most recent not-yet-reverted snapshot, if any.
437    pub fn latest_unreverted_snapshot(
438        &self,
439    ) -> Result<Option<crate::snapshot::Manifest>, LogError> {
440        Ok(self.unreverted_snapshots()?.into_iter().next())
441    }
442
443    /// Mark a snapshot as reverted (it has been undone).
444    pub fn mark_reverted(&self, id: &str) -> Result<(), LogError> {
445        self.conn
446            .execute("UPDATE snapshots SET reverted = 1 WHERE id = ?1", [id])?;
447        Ok(())
448    }
449
450    /// Remember a per-repo decision for an exact command (always-allow / -deny).
451    ///
452    /// Only `Allow` and `Deny` are meaningful; `Hold` is rejected.
453    pub fn remember(
454        &self,
455        repo: &str,
456        command_hash: &str,
457        action: crate::types::Decision,
458    ) -> Result<(), LogError> {
459        use crate::types::Decision;
460        if action == Decision::Hold {
461            return Err(LogError::Corrupt(
462                "cannot remember a Hold decision".to_string(),
463            ));
464        }
465        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
466        self.conn.execute(
467            r#"
468            INSERT INTO memory (repo, command_hash, action, updated_at)
469            VALUES (?1, ?2, ?3, ?4)
470            ON CONFLICT(repo, command_hash) DO UPDATE SET action = ?3, updated_at = ?4
471            "#,
472            rusqlite::params![repo, command_hash, action.as_str(), now],
473        )?;
474        Ok(())
475    }
476
477    /// Look up a remembered decision for an exact command in a repo.
478    pub fn memory_lookup(
479        &self,
480        repo: &str,
481        command_hash: &str,
482    ) -> Result<Option<crate::types::Decision>, LogError> {
483        use crate::types::Decision;
484        let action: Option<String> = self
485            .conn
486            .query_row(
487                "SELECT action FROM memory WHERE repo = ?1 AND command_hash = ?2",
488                rusqlite::params![repo, command_hash],
489                |row| row.get(0),
490            )
491            .optional()?;
492        Ok(match action.as_deref() {
493            Some("allow") => Some(Decision::Allow),
494            Some("deny") => Some(Decision::Deny),
495            _ => None,
496        })
497    }
498
499    /// Compute the canonical hash for a row given its predecessor.
500    ///
501    /// The hash binds every immutable field plus the predecessor hash, so neither
502    /// a field edit nor a reordering can go unnoticed.
503    #[allow(clippy::too_many_arguments)]
504    fn compute_hash(
505        prev_hash: &str,
506        id: &Uuid,
507        ts_rfc3339: &str,
508        agent: &str,
509        cwd: &str,
510        command: &str,
511        argv_json: &str,
512        class: Class,
513        decision: Decision,
514        reason: &str,
515        tier: u8,
516        risk: Option<u8>,
517        summary: Option<&str>,
518        snapshot_id: Option<&str>,
519    ) -> String {
520        let payload = format!(
521            "{prev}\u{1f}{id}\u{1f}{ts}\u{1f}{agent}\u{1f}{cwd}\u{1f}{cmd}\u{1f}{argv}\u{1f}{class}\u{1f}{dec}\u{1f}{reason}\u{1f}{tier}\u{1f}{risk}\u{1f}{summary}\u{1f}{snap}",
522            prev = prev_hash,
523            id = id,
524            ts = ts_rfc3339,
525            agent = agent,
526            cwd = cwd,
527            cmd = command,
528            argv = argv_json,
529            class = class.as_str(),
530            dec = decision.as_str(),
531            reason = reason,
532            tier = tier,
533            risk = risk.map(|r| r.to_string()).unwrap_or_default(),
534            summary = summary.unwrap_or_default(),
535            snap = snapshot_id.unwrap_or_default(),
536        );
537        let mut hasher = Sha256::new();
538        hasher.update(payload.as_bytes());
539        hex::encode(hasher.finalize())
540    }
541
542    /// Return the hash of the most recent event, or [`GENESIS_HASH`] if empty.
543    fn head_hash(&self) -> Result<String, LogError> {
544        let hash: Option<String> = self
545            .conn
546            .query_row(
547                "SELECT hash FROM events ORDER BY seq DESC LIMIT 1",
548                [],
549                |row| row.get(0),
550            )
551            .optional()?;
552        Ok(hash.unwrap_or_else(|| GENESIS_HASH.to_string()))
553    }
554
555    /// The read-modify-append, run inside the write transaction. Returns
556    /// (prev_hash, hash, seq).
557    #[allow(clippy::too_many_arguments)]
558    #[allow(clippy::too_many_arguments)]
559    fn append_locked(
560        &self,
561        cmd: &ProposedCommand,
562        verdict: &Verdict,
563        ts: &str,
564        cwd: &str,
565        command: &str,
566        argv_json: &str,
567        snapshot_id: Option<&str>,
568    ) -> Result<(String, String, i64), LogError> {
569        let prev_hash = self.head_hash()?;
570        let hash = Self::compute_hash(
571            &prev_hash,
572            &cmd.id,
573            ts,
574            &cmd.agent,
575            cwd,
576            command,
577            argv_json,
578            verdict.class,
579            verdict.decision,
580            &verdict.reason,
581            verdict.tier,
582            verdict.risk,
583            verdict.summary.as_deref(),
584            snapshot_id,
585        );
586        self.conn.execute(
587            r#"
588            INSERT INTO events
589                (id, ts, agent, cwd, command, argv, class, decision, reason, tier, risk, summary, snapshot_id, prev_hash, hash, session)
590            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
591            "#,
592            rusqlite::params![
593                cmd.id.to_string(),
594                ts,
595                cmd.agent,
596                cwd,
597                command,
598                argv_json,
599                verdict.class.as_str(),
600                verdict.decision.as_str(),
601                verdict.reason,
602                verdict.tier as i64,
603                verdict.risk.map(|r| r as i64),
604                verdict.summary,
605                snapshot_id,
606                prev_hash,
607                hash,
608                cmd.session,
609            ],
610        )?;
611        Ok((prev_hash, hash, self.conn.last_insert_rowid()))
612    }
613
614    /// Append one event built from a proposal and its verdict.
615    pub fn log_event(
616        &self,
617        cmd: &ProposedCommand,
618        verdict: &Verdict,
619        snapshot_id: Option<&str>,
620    ) -> Result<LoggedEvent, LogError> {
621        let ts = cmd.ts.format(&Rfc3339)?;
622        let cwd = cmd.cwd.to_string_lossy().to_string();
623
624        // Redact-before-hash (security spine #6): never let a command-line secret
625        // (DB connection strings, `-pSECRET`, `PGPASSWORD=…`, bearer tokens) enter
626        // the append-only, hash-chained log — it could not be scrubbed later. Only
627        // the secret *value* is replaced (rest verbatim); when nothing matches, the
628        // command/argv are stored byte-identically (so the common case and every
629        // existing test are unchanged). The argv is re-derived from the redacted
630        // command so it can't leak the secret either.
631        let red = crate::redact::redact_command(&cmd.raw);
632        let (command, argv): (String, Vec<String>) = if red.any() {
633            (red.text.clone(), crate::shell::split(&red.text))
634        } else {
635            (cmd.raw.clone(), cmd.argv.clone())
636        };
637        let argv_json = serde_json::to_string(&argv)
638            .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
639
640        // Serialize the read-modify-append: take the write lock immediately so a
641        // concurrent writer (another process) blocks and then reads the updated
642        // head, rather than both linking new rows to the same prev_hash.
643        self.conn.execute_batch("BEGIN IMMEDIATE")?;
644        let (prev_hash, hash, seq) =
645            match self.append_locked(cmd, verdict, &ts, &cwd, &command, &argv_json, snapshot_id) {
646                Ok(v) => {
647                    self.conn.execute_batch("COMMIT")?;
648                    v
649                }
650                Err(e) => {
651                    let _ = self.conn.execute_batch("ROLLBACK");
652                    return Err(e);
653                }
654            };
655
656        Ok(LoggedEvent {
657            seq,
658            id: cmd.id,
659            ts: cmd.ts,
660            agent: cmd.agent.clone(),
661            cwd,
662            command,
663            argv,
664            class: verdict.class,
665            decision: verdict.decision,
666            reason: verdict.reason.clone(),
667            tier: verdict.tier,
668            risk: verdict.risk,
669            summary: verdict.summary.clone(),
670            snapshot_id: snapshot_id.map(str::to_string),
671            session: cmd.session.clone(),
672            prev_hash,
673            hash,
674            redacted: false,
675        })
676    }
677
678    /// Return the most recent `n` non-redacted events, oldest first.
679    pub fn tail(&self, n: usize) -> Result<Vec<LoggedEvent>, LogError> {
680        self.query(&Filter {
681            limit: Some(n),
682            ..Filter::default()
683        })
684    }
685
686    /// Return events matching `filter`, oldest first (capped by `filter.limit`,
687    /// skipping `filter.offset` of the newest matches first for pagination).
688    pub fn query(&self, filter: &Filter) -> Result<Vec<LoggedEvent>, LogError> {
689        let (where_body, params) = filter.where_clause();
690        let limit = filter.limit.map(|n| n as i64).unwrap_or(-1);
691        let offset = filter.offset.map(|n| n as i64).unwrap_or(0);
692        // Take the page window of newest-by-seq rows (skip `offset`, take
693        // `limit`), then re-sort ascending so the page reads chronologically.
694        // SQLite accepts `LIMIT -1 OFFSET n` to mean "all, skipping n".
695        let sql = format!(
696            r#"
697            SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
698                   risk, summary, snapshot_id, prev_hash, hash, session, redacted
699            FROM (
700                SELECT events.*, (r.event_id IS NOT NULL) AS redacted
701                FROM events LEFT JOIN redactions r ON r.event_id = events.id
702                WHERE {where_body}
703                ORDER BY events.seq DESC LIMIT ? OFFSET ?
704            ) ORDER BY seq ASC
705            "#
706        );
707        let mut stmt = self.conn.prepare(&sql)?;
708        let mut bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
709        bound.push(&limit);
710        bound.push(&offset);
711        let rows = stmt.query_map(bound.as_slice(), Self::row_to_event)?;
712        let mut out = Vec::new();
713        for r in rows {
714            out.push(r??);
715        }
716        Ok(out)
717    }
718
719    /// Count events matching `filter` (ignores `limit`).
720    pub fn count_matching(&self, filter: &Filter) -> Result<i64, LogError> {
721        let (where_body, params) = filter.where_clause();
722        let sql = format!(
723            "SELECT COUNT(*) FROM events LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body}"
724        );
725        let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
726        Ok(self
727            .conn
728            .query_row(&sql, bound.as_slice(), |row| row.get(0))?)
729    }
730
731    /// Redact a single event by id (append-only; idempotent). Returns whether a
732    /// matching, not-already-redacted event existed.
733    pub fn redact(&self, event_id: &str, reason: &str) -> Result<bool, LogError> {
734        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
735        let exists: bool = self
736            .conn
737            .prepare("SELECT 1 FROM events WHERE id = ?1")?
738            .exists([event_id])?;
739        if !exists {
740            return Ok(false);
741        }
742        let n = self.conn.execute(
743            "INSERT INTO redactions (event_id, ts, reason) VALUES (?1, ?2, ?3)
744             ON CONFLICT(event_id) DO NOTHING",
745            rusqlite::params![event_id, now, reason],
746        )?;
747        Ok(n > 0)
748    }
749
750    /// Redact every event matching `filter` (newest-first, no limit applied).
751    /// Returns the number newly redacted.
752    pub fn redact_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
753        // Match against not-yet-redacted rows regardless of the filter's flag.
754        let f = Filter {
755            include_redacted: false,
756            limit: None,
757            ..filter.clone()
758        };
759        let (where_body, params) = f.where_clause();
760        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
761        let sql = format!(
762            "INSERT INTO redactions (event_id, ts, reason)
763             SELECT events.id, ?, ? FROM events
764             LEFT JOIN redactions r ON r.event_id = events.id
765             WHERE {where_body}"
766        );
767        let mut bound: Vec<&dyn rusqlite::ToSql> = vec![&now, &reason];
768        let pbound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
769        bound.extend(pbound);
770        Ok(self.conn.execute(&sql, bound.as_slice())?)
771    }
772
773    /// **Hard erasure** — physically delete events matching `filter`, rebuild the
774    /// hash chain over the survivors, and append a marker event recording the
775    /// purge. Deliberately rewrites history for the purged span; never automatic.
776    /// Returns the number of events removed.
777    ///
778    /// `include_redacted`/`limit` on the filter are ignored: purge always targets
779    /// every matching row. Catastrophic-or-not is irrelevant — this is the user's
780    /// explicit erasure of their own local data.
781    pub fn purge_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
782        let f = Filter {
783            include_redacted: true,
784            limit: None,
785            ..filter.clone()
786        };
787        let (where_body, params) = f.where_clause();
788
789        self.conn.execute_batch("BEGIN IMMEDIATE")?;
790        let removed = (|| -> Result<usize, LogError> {
791            // Drop redaction rows for the doomed events, then the events.
792            let del_red = format!(
793                "DELETE FROM redactions WHERE event_id IN (
794                     SELECT events.id FROM events
795                     LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
796            );
797            let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
798            self.conn.execute(&del_red, bound.as_slice())?;
799
800            let del = format!(
801                "DELETE FROM events WHERE id IN (
802                     SELECT events.id FROM events
803                     LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
804            );
805            let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
806            let n = self.conn.execute(&del, bound.as_slice())?;
807            self.rechain()?;
808            Ok(n)
809        })();
810        let removed = match removed {
811            Ok(n) => {
812                self.conn.execute_batch("COMMIT")?;
813                n
814            }
815            Err(e) => {
816                let _ = self.conn.execute_batch("ROLLBACK");
817                return Err(e);
818            }
819        };
820
821        // Record the purge itself as an immutable marker (outside the txn so it
822        // links to the freshly re-chained head).
823        if removed > 0 {
824            let marker = ProposedCommand::new(
825                "kintsugi",
826                std::path::PathBuf::from("."),
827                vec!["purge".into()],
828                format!("kintsugi purge --hard ({removed} event(s): {reason})"),
829            );
830            let verdict = Verdict::rules(Class::Safe, Decision::Allow, "audit:purge");
831            self.log_event(&marker, &verdict, None)?;
832        }
833        Ok(removed)
834    }
835
836    /// Recompute prev_hash/hash for every surviving row in seq order so the chain
837    /// is valid again after a purge. Caller holds the write transaction.
838    fn rechain(&self) -> Result<(), LogError> {
839        let mut stmt = self.conn.prepare(
840            r#"
841            SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
842                   risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
843            FROM events ORDER BY seq ASC
844            "#,
845        )?;
846        let mut events: Vec<LoggedEvent> = Vec::new();
847        for r in stmt.query_map([], Self::row_to_event)? {
848            events.push(r??);
849        }
850        drop(stmt);
851
852        let mut prev = GENESIS_HASH.to_string();
853        for ev in events {
854            let ts = ev.ts.format(&Rfc3339)?;
855            let argv_json = serde_json::to_string(&ev.argv)
856                .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
857            let hash = Self::compute_hash(
858                &prev,
859                &ev.id,
860                &ts,
861                &ev.agent,
862                &ev.cwd,
863                &ev.command,
864                &argv_json,
865                ev.class,
866                ev.decision,
867                &ev.reason,
868                ev.tier,
869                ev.risk,
870                ev.summary.as_deref(),
871                ev.snapshot_id.as_deref(),
872            );
873            self.conn.execute(
874                "UPDATE events SET prev_hash = ?1, hash = ?2 WHERE seq = ?3",
875                rusqlite::params![prev, hash, ev.seq],
876            )?;
877            prev = hash;
878        }
879        Ok(())
880    }
881
882    /// Total number of events.
883    pub fn count(&self) -> Result<i64, LogError> {
884        Ok(self
885            .conn
886            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?)
887    }
888
889    /// The highest sequence number (0 if empty). O(1) via the rowid index — cheap
890    /// enough to poll so the TUI only reloads when the log actually grew.
891    pub fn latest_seq(&self) -> Result<i64, LogError> {
892        Ok(self
893            .conn
894            .query_row("SELECT COALESCE(MAX(seq), 0) FROM events", [], |r| r.get(0))?)
895    }
896
897    /// Walk the chain from genesis and confirm every link.
898    ///
899    /// Recomputes each row's hash from its stored fields and verifies it both
900    /// matches the stored `hash` and links to the previous row's `hash`.
901    pub fn verify_chain(&self) -> Result<ChainStatus, LogError> {
902        let mut stmt = self.conn.prepare(
903            r#"
904            SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
905                   risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
906            FROM events ORDER BY seq ASC
907            "#,
908        )?;
909        let rows = stmt.query_map([], Self::row_to_event)?;
910
911        let mut expected_prev = GENESIS_HASH.to_string();
912        for r in rows {
913            let ev = r??;
914
915            if ev.prev_hash != expected_prev {
916                return Ok(ChainStatus::Broken {
917                    seq: ev.seq,
918                    detail: format!(
919                        "prev_hash {} does not link to predecessor {}",
920                        short(&ev.prev_hash),
921                        short(&expected_prev)
922                    ),
923                });
924            }
925
926            let ts = ev.ts.format(&Rfc3339)?;
927            let argv_json = serde_json::to_string(&ev.argv)
928                .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
929            let recomputed = Self::compute_hash(
930                &ev.prev_hash,
931                &ev.id,
932                &ts,
933                &ev.agent,
934                &ev.cwd,
935                &ev.command,
936                &argv_json,
937                ev.class,
938                ev.decision,
939                &ev.reason,
940                ev.tier,
941                ev.risk,
942                ev.summary.as_deref(),
943                ev.snapshot_id.as_deref(),
944            );
945            if recomputed != ev.hash {
946                return Ok(ChainStatus::Broken {
947                    seq: ev.seq,
948                    detail: format!(
949                        "row contents do not match stored hash {} (recomputed {})",
950                        short(&ev.hash),
951                        short(&recomputed)
952                    ),
953                });
954            }
955
956            expected_prev = ev.hash;
957        }
958
959        Ok(ChainStatus::Intact)
960    }
961
962    fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Result<LoggedEvent, LogError>> {
963        // Pull raw columns first; map fallible conversions into LogError.
964        let seq: i64 = row.get(0)?;
965        let id_s: String = row.get(1)?;
966        let ts_s: String = row.get(2)?;
967        let agent: String = row.get(3)?;
968        let cwd: String = row.get(4)?;
969        let command: String = row.get(5)?;
970        let argv_s: String = row.get(6)?;
971        let class_s: String = row.get(7)?;
972        let decision_s: String = row.get(8)?;
973        let reason: String = row.get(9)?;
974        let tier: i64 = row.get(10)?;
975        let risk: Option<i64> = row.get(11)?;
976        let summary: Option<String> = row.get(12)?;
977        let snapshot_id: Option<String> = row.get(13)?;
978        let prev_hash: String = row.get(14)?;
979        let hash: String = row.get(15)?;
980        let session: Option<String> = row.get(16)?;
981        let redacted: bool = row.get(17)?;
982
983        Ok((|| {
984            let id = Uuid::parse_str(&id_s)
985                .map_err(|e| LogError::Corrupt(format!("uuid {id_s}: {e}")))?;
986            let ts = OffsetDateTime::parse(&ts_s, &Rfc3339)?;
987            let argv: Vec<String> = serde_json::from_str(&argv_s)
988                .map_err(|e| LogError::Corrupt(format!("argv {argv_s}: {e}")))?;
989            let class = parse_class(&class_s)?;
990            let decision = parse_decision(&decision_s)?;
991            let tier = u8::try_from(tier)
992                .map_err(|_| LogError::Corrupt(format!("tier out of range: {tier}")))?;
993            let risk = match risk {
994                Some(r) => Some(
995                    u8::try_from(r)
996                        .map_err(|_| LogError::Corrupt(format!("risk out of range: {r}")))?,
997                ),
998                None => None,
999            };
1000            Ok(LoggedEvent {
1001                seq,
1002                id,
1003                ts,
1004                agent,
1005                cwd,
1006                command,
1007                argv,
1008                class,
1009                decision,
1010                reason,
1011                tier,
1012                risk,
1013                summary,
1014                snapshot_id,
1015                session,
1016                prev_hash,
1017                hash,
1018                redacted,
1019            })
1020        })())
1021    }
1022}
1023
1024fn parse_class(s: &str) -> Result<Class, LogError> {
1025    match s {
1026        "safe" => Ok(Class::Safe),
1027        "catastrophic" => Ok(Class::Catastrophic),
1028        "ambiguous" => Ok(Class::Ambiguous),
1029        other => Err(LogError::Corrupt(format!("unknown class: {other}"))),
1030    }
1031}
1032
1033fn parse_decision(s: &str) -> Result<Decision, LogError> {
1034    match s {
1035        "allow" => Ok(Decision::Allow),
1036        "deny" => Ok(Decision::Deny),
1037        "hold" => Ok(Decision::Hold),
1038        other => Err(LogError::Corrupt(format!("unknown decision: {other}"))),
1039    }
1040}
1041
/// Abbreviate a hash for diagnostics: its first 12 characters, or the whole
/// string if it is shorter. Counts `char`s, so it never splits a code point.
fn short(hash: &str) -> String {
    const PREFIX_CHARS: usize = 12;
    // Byte offset of the 13th char, if any, marks where the prefix ends.
    match hash.char_indices().nth(PREFIX_CHARS) {
        Some((cut, _)) => hash[..cut].to_owned(),
        None => hash.to_owned(),
    }
}