Skip to main content

kintsugi_core/
log.rs

1//! Append-only, hash-chained event log (SQLite).
2//!
3//! Every observed command becomes one immutable row. Each row's `hash` is
4//! `SHA-256(prev_hash || canonical(row))`, so any edit to a past row — or any
5//! reordering — breaks the chain and is detectable by [`EventLog::verify_chain`].
6//!
7//! Security spine: the event chain is append-only. Day-to-day "delete" is
8//! **redaction** — an append-only [`redactions`](EventLog::redact) row that hides
9//! an entry from views while the original row and the hash chain stay intact and
10//! verifiable. True erasure is the separate, explicit [`EventLog::purge_matching`]
11//! (hard delete + re-chain): it deliberately rewrites history for the purged span
12//! and records a marker event, and is never invoked automatically.
13
14use rusqlite::{Connection, OptionalExtension};
15use sha2::{Digest, Sha256};
16use time::format_description::well_known::Rfc3339;
17use time::OffsetDateTime;
18use uuid::Uuid;
19
20use crate::types::{Class, Decision, ProposedCommand, Verdict};
21
/// Predecessor hash carried by the very first event of a chain.
///
/// 64 ASCII zeros — the same width as the hex-encoded SHA-256 digests that
/// every later row stores in `prev_hash`/`hash`.
pub const GENESIS_HASH: &str = concat!(
    "0000000000000000",
    "0000000000000000",
    "0000000000000000",
    "0000000000000000"
);
24
/// Errors from the event log.
///
/// The `#[from]` variants let `?` convert the underlying library errors
/// automatically.
#[derive(Debug, thiserror::Error)]
pub enum LogError {
    /// Any failure reported by SQLite through `rusqlite` (open, prepare,
    /// execute, transaction control, …).
    #[error("database error: {0}")]
    Db(#[from] rusqlite::Error),
    /// Formatting an `OffsetDateTime` as RFC3339 failed.
    #[error("time formatting error: {0}")]
    Time(#[from] time::error::Format),
    /// A timestamp read back from the database could not be parsed as RFC3339.
    #[error("stored timestamp is not valid RFC3339: {0}")]
    TimeParse(#[from] time::error::Parse),
    /// Catch-all for invalid values: JSON (de)serialization failures of stored
    /// data, and also rejected arguments (e.g. trying to remember a `Hold`).
    #[error("stored value is not valid: {0}")]
    Corrupt(String),
}
37
/// A single immutable row of the event log.
///
/// Every field except `seq`, `session`, `hash` and `redacted` is bound into
/// `hash` together with `prev_hash`, which is what makes edits detectable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggedEvent {
    /// Monotonic sequence number (the `events.seq` AUTOINCREMENT rowid).
    pub seq: i64,
    /// The originating command id. Redactions are keyed by this value; the
    /// schema does not make it unique across rows.
    pub id: Uuid,
    /// When the command was observed (stored as RFC3339 text).
    pub ts: OffsetDateTime,
    /// Originating agent.
    pub agent: String,
    /// Working directory, converted to a string lossily (`to_string_lossy`).
    pub cwd: String,
    /// The command line as stored: verbatim, except that secret values found
    /// by the command redactor are replaced *before* hashing, so a secret can
    /// never enter the chain.
    pub command: String,
    /// The argument vector: the proposal's own argv, or — when a secret was
    /// redacted — re-split from the redacted command so it cannot leak either.
    pub argv: Vec<String>,
    /// Rule-engine classification.
    pub class: Class,
    /// The decision recorded.
    pub decision: Decision,
    /// The rule name or resolution reason behind the decision.
    pub reason: String,
    /// Tier that produced the decision.
    pub tier: u8,
    /// Optional severity score.
    pub risk: Option<u8>,
    /// Optional one-sentence summary.
    pub summary: Option<String>,
    /// Optional snapshot reference.
    pub snapshot_id: Option<String>,
    /// Optional originating session id (view metadata; not part of the hash).
    pub session: Option<String>,
    /// Hash of the predecessor row ([`GENESIS_HASH`] for the first row).
    pub prev_hash: String,
    /// This row's hash: hex-encoded SHA-256 over `prev_hash` and the hashed
    /// fields.
    pub hash: String,
    /// Whether a `redactions` row exists for this event's `id` (such events
    /// are hidden from default views). Not part of the hash.
    pub redacted: bool,
}
78
/// A filter over the event log, used by views, redaction, and purge.
///
/// Every criterion that is set is ANDed with the others; `Filter::default()`
/// matches every non-redacted event.
#[derive(Debug, Clone, Default)]
pub struct Filter {
    /// Restrict to one agent (`claude-code`, `cursor`, `shim`, …).
    pub agent: Option<String>,
    /// Restrict to one session id.
    pub session: Option<String>,
    /// Only events at or after this instant.
    ///
    /// Compared as RFC3339 text against the stored `ts`; that is only
    /// chronological when both sides use the same offset.
    /// NOTE(review): callers are assumed to pass UTC instants — confirm.
    pub since: Option<OffsetDateTime>,
    /// Only events strictly before this instant (same text-comparison caveat
    /// as `since`).
    pub until: Option<OffsetDateTime>,
    /// Case-insensitive substring match on the stored (secret-redacted)
    /// command, via SQLite `LIKE` — whose default case folding covers ASCII
    /// only. `%`, `_` and `\` in the text are matched literally.
    pub grep: Option<String>,
    /// Restrict to one classification.
    pub class: Option<Class>,
    /// Include redacted rows (default: hidden). Redaction always matches only
    /// unredacted rows and purge always includes redacted ones, whatever this
    /// is set to.
    pub include_redacted: bool,
    /// Cap the number of rows returned (newest kept). Not part of the SQL
    /// predicate: only `EventLog::query` applies it; counting, redaction and
    /// purge ignore it.
    pub limit: Option<usize>,
    /// Skip this many of the newest matching rows before applying `limit` —
    /// the page offset for `kintsugi log --page N`. Like `limit`, only
    /// `EventLog::query` applies it.
    pub offset: Option<usize>,
}
102
103impl Filter {
104    /// Build the SQL `WHERE` body (without the `WHERE` keyword) and its params.
105    /// `events`-qualified so it composes with the redaction LEFT JOIN.
106    fn where_clause(&self) -> (String, Vec<Box<dyn rusqlite::ToSql>>) {
107        let mut clauses: Vec<String> = Vec::new();
108        let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
109        if let Some(a) = &self.agent {
110            clauses.push("events.agent = ?".into());
111            params.push(Box::new(a.clone()));
112        }
113        if let Some(s) = &self.session {
114            clauses.push("events.session = ?".into());
115            params.push(Box::new(s.clone()));
116        }
117        if let Some(c) = &self.class {
118            clauses.push("events.class = ?".into());
119            params.push(Box::new(c.as_str().to_string()));
120        }
121        if let Some(g) = &self.grep {
122            clauses.push("events.command LIKE ? ESCAPE '\\'".into());
123            params.push(Box::new(format!("%{}%", like_escape(g))));
124        }
125        // ts is stored as RFC3339 text; lexical compare is chronological for UTC Z.
126        if let Some(since) = &self.since {
127            if let Ok(s) = since.format(&Rfc3339) {
128                clauses.push("events.ts >= ?".into());
129                params.push(Box::new(s));
130            }
131        }
132        if let Some(until) = &self.until {
133            if let Ok(s) = until.format(&Rfc3339) {
134                clauses.push("events.ts < ?".into());
135                params.push(Box::new(s));
136            }
137        }
138        if !self.include_redacted {
139            clauses.push("r.event_id IS NULL".into());
140        }
141        let body = if clauses.is_empty() {
142            "1=1".to_string()
143        } else {
144            clauses.join(" AND ")
145        };
146        (body, params)
147    }
148}
149
/// Escape LIKE wildcards so a user's grep text matches literally.
///
/// Every `\`, `%` and `_` gets a backslash in front of it; the SQL must pair
/// the pattern with `ESCAPE '\'`.
fn like_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
156
/// One entry in the approval queue (a held command awaiting a human decision).
///
/// Built from a `pending` row by `EventLog::list_pending`; also
/// (de)serializable with serde.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PendingItem {
    /// The held command (its `id` is the queue id).
    pub command: ProposedCommand,
    /// Rule-engine classification.
    pub class: Class,
    /// Why it was held.
    pub reason: String,
    /// When it was enqueued (UTC, set by `enqueue_pending`). Serialized as an
    /// RFC3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
}
170
/// The result of verifying the hash chain.
///
/// Presumably produced by `EventLog::verify_chain` (not visible here).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainStatus {
    /// The chain is intact and every row hashes correctly.
    Intact,
    /// A break was found.
    Broken {
        /// The `seq` (storage rowid) of the offending row.
        seq: i64,
        /// Human-readable description of what went wrong.
        detail: String,
    },
}
184
185impl ChainStatus {
186    /// `true` when the chain is intact.
187    pub fn is_intact(&self) -> bool {
188        matches!(self, ChainStatus::Intact)
189    }
190}
191
/// Handle to the append-only event log.
///
/// Wraps one SQLite connection. The same database also hosts the mutable
/// side tables (`redactions`, `memory`, `snapshots`, `pending`) that `init`
/// creates.
pub struct EventLog {
    /// The underlying connection; every method uses it through `&self`.
    conn: Connection,
}
196
197impl EventLog {
198    /// Open (creating if needed) a log at `path`.
199    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, LogError> {
200        let conn = Connection::open(path)?;
201        Self::init(conn)
202    }
203
204    /// Open an ephemeral in-memory log (used in tests).
205    pub fn open_in_memory() -> Result<Self, LogError> {
206        let conn = Connection::open_in_memory()?;
207        Self::init(conn)
208    }
209
    /// Configure `conn`, make sure the schema exists, and wrap it.
    ///
    /// Sets WAL journaling, `synchronous = NORMAL`, foreign keys and a 5 s busy
    /// timeout; creates the `events`, `redactions`, `memory`, `snapshots` and
    /// `pending` tables when missing; and adds the `session` column to `events`
    /// tables created by older versions. Every DDL step is `IF NOT EXISTS` or
    /// guarded, so this is safe to run against an existing database.
    fn init(conn: Connection) -> Result<Self, LogError> {
        conn.pragma_update(None, "journal_mode", "WAL")?;
        // NORMAL is safe under WAL and keeps per-event writes fast (no fsync per
        // commit). A crash can only lose the very last transactions; the surviving
        // chain stays intact and verifiable.
        conn.pragma_update(None, "synchronous", "NORMAL")?;
        conn.pragma_update(None, "foreign_keys", "ON")?;
        // Block (rather than fail) when another process holds the write lock, so
        // the read-modify-append in `log_event` serializes across processes instead
        // of forking the hash chain on a shared prev_hash.
        conn.busy_timeout(std::time::Duration::from_secs(5))?;
        // `events.seq` is AUTOINCREMENT, so SQLite never reuses a seq, even
        // after rows are deleted.
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS events (
                seq        INTEGER PRIMARY KEY AUTOINCREMENT,
                id         TEXT NOT NULL,
                ts         TEXT NOT NULL,
                agent      TEXT NOT NULL,
                cwd        TEXT NOT NULL,
                command    TEXT NOT NULL,
                argv       TEXT NOT NULL,
                class      TEXT NOT NULL,
                decision   TEXT NOT NULL,
                reason     TEXT NOT NULL,
                tier       INTEGER NOT NULL,
                risk       INTEGER,
                summary    TEXT,
                snapshot_id TEXT,
                prev_hash  TEXT NOT NULL,
                hash       TEXT NOT NULL,
                session    TEXT
            );

            -- Append-only redactions: hide an event from views without mutating
            -- it or breaking the chain. The original row and its hash are intact.
            CREATE TABLE IF NOT EXISTS redactions (
                event_id   TEXT PRIMARY KEY,
                ts         TEXT NOT NULL,
                reason     TEXT NOT NULL
            );

            -- Decision memory. Unlike `events`, this table is intentionally
            -- mutable state: per-repo always-allow / always-deny by command hash.
            CREATE TABLE IF NOT EXISTS memory (
                repo         TEXT NOT NULL,
                command_hash TEXT NOT NULL,
                action       TEXT NOT NULL,
                updated_at   TEXT NOT NULL,
                PRIMARY KEY (repo, command_hash)
            );

            -- Snapshots taken before destructive ops, for `kintsugi undo`.
            CREATE TABLE IF NOT EXISTS snapshots (
                id         TEXT PRIMARY KEY,
                seq        INTEGER,
                ts         TEXT NOT NULL,
                command    TEXT NOT NULL,
                manifest   TEXT NOT NULL,
                reverted   INTEGER NOT NULL DEFAULT 0
            );

            -- The approval queue: held commands awaiting a human decision.
            -- Mutable state; status is 'pending' | 'approved' | 'denied'.
            CREATE TABLE IF NOT EXISTS pending (
                id          TEXT PRIMARY KEY,
                ts          TEXT NOT NULL,
                command     TEXT NOT NULL,
                class       TEXT NOT NULL,
                reason      TEXT NOT NULL,
                status      TEXT NOT NULL DEFAULT 'pending',
                updated_at  TEXT NOT NULL
            );
            "#,
        )?;
        // Migrate older DBs created before the `session` column existed.
        // Fresh databases already have it from the CREATE TABLE above, so the
        // ALTER only runs once per legacy file.
        let has_session = conn
            .prepare("SELECT 1 FROM pragma_table_info('events') WHERE name = 'session'")?
            .exists([])?;
        if !has_session {
            conn.execute_batch("ALTER TABLE events ADD COLUMN session TEXT")?;
        }
        Ok(Self { conn })
    }
293
294    /// Add a held command to the approval queue (idempotent on its id).
295    pub fn enqueue_pending(
296        &self,
297        cmd: &ProposedCommand,
298        class: Class,
299        reason: &str,
300    ) -> Result<(), LogError> {
301        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
302        let cmd_json = serde_json::to_string(cmd)
303            .map_err(|e| LogError::Corrupt(format!("pending command serialize: {e}")))?;
304        self.conn.execute(
305            "INSERT INTO pending (id, ts, command, class, reason, status, updated_at)
306             VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?2)
307             ON CONFLICT(id) DO NOTHING",
308            rusqlite::params![cmd.id.to_string(), now, cmd_json, class.as_str(), reason],
309        )?;
310        Ok(())
311    }
312
313    /// The current status of a queued command, if it is in the queue.
314    pub fn pending_status(&self, id: &str) -> Result<Option<String>, LogError> {
315        Ok(self
316            .conn
317            .query_row("SELECT status FROM pending WHERE id = ?1", [id], |r| {
318                r.get(0)
319            })
320            .optional()?)
321    }
322
323    /// Set a queued command's status (`approved` | `denied`).
324    pub fn set_pending_status(&self, id: &str, status: &str) -> Result<(), LogError> {
325        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
326        self.conn.execute(
327            "UPDATE pending SET status = ?2, updated_at = ?3 WHERE id = ?1",
328            rusqlite::params![id, status, now],
329        )?;
330        Ok(())
331    }
332
333    /// Atomically move a queued command from status `from` to `to`. Returns true
334    /// iff *this* call performed the transition (the row existed and was `from`).
335    ///
336    /// This is the exactly-once guard: a held command must resolve/run once even
337    /// if two `kintsugi approve`/`kintsugi run` invocations race — only the winner of
338    /// the compare-and-swap proceeds; the loser sees `false` and does nothing.
339    pub fn cas_pending_status(&self, id: &str, from: &str, to: &str) -> Result<bool, LogError> {
340        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
341        let changed = self.conn.execute(
342            "UPDATE pending SET status = ?3, updated_at = ?4 WHERE id = ?1 AND status = ?2",
343            rusqlite::params![id, from, to, now],
344        )?;
345        Ok(changed == 1)
346    }
347
348    /// The stored command for a queued id (for resolve/re-run).
349    pub fn pending_command(&self, id: &str) -> Result<Option<ProposedCommand>, LogError> {
350        let json: Option<String> = self
351            .conn
352            .query_row("SELECT command FROM pending WHERE id = ?1", [id], |r| {
353                r.get(0)
354            })
355            .optional()?;
356        match json {
357            Some(j) => Ok(Some(serde_json::from_str(&j).map_err(|e| {
358                LogError::Corrupt(format!("pending command parse: {e}"))
359            })?)),
360            None => Ok(None),
361        }
362    }
363
364    /// List the still-pending queued commands, oldest first.
365    pub fn list_pending(&self) -> Result<Vec<PendingItem>, LogError> {
366        let mut stmt = self.conn.prepare(
367            // Order by rowid alone — it IS insertion order. We used to lead with
368            // `ts ASC` and tiebreak on rowid, but the Windows runner's wall clock
369            // can step backwards by a few ms (NTP slew on the VM host), which
370            // made ts(second insert) < ts(first insert) and put the newer row
371            // before the older one. Rowid is monotonic, so it doesn't care.
372            "SELECT command, class, reason, ts FROM pending WHERE status = 'pending' ORDER BY rowid ASC",
373        )?;
374        let rows = stmt.query_map([], |row| {
375            Ok((
376                row.get::<_, String>(0)?,
377                row.get::<_, String>(1)?,
378                row.get::<_, String>(2)?,
379                row.get::<_, String>(3)?,
380            ))
381        })?;
382        let mut out = Vec::new();
383        for r in rows {
384            let (cmd_json, class_s, reason, ts_s) = r?;
385            let command: ProposedCommand = serde_json::from_str(&cmd_json)
386                .map_err(|e| LogError::Corrupt(format!("pending parse: {e}")))?;
387            out.push(PendingItem {
388                command,
389                class: parse_class(&class_s)?,
390                reason,
391                ts: OffsetDateTime::parse(&ts_s, &Rfc3339)?,
392            });
393        }
394        Ok(out)
395    }
396
397    /// Record a snapshot taken before a destructive command.
398    pub fn record_snapshot(&self, manifest: &crate::snapshot::Manifest) -> Result<(), LogError> {
399        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
400        let json = serde_json::to_string(manifest)
401            .map_err(|e| LogError::Corrupt(format!("manifest serialize: {e}")))?;
402        // Snapshots are recorded just before the event they guard is appended, so
403        // the guarded event's seq is the next rowid (= current count + 1).
404        let seq: i64 = self
405            .conn
406            .query_row("SELECT COUNT(*) + 1 FROM events", [], |r| r.get(0))?;
407        self.conn.execute(
408            "INSERT INTO snapshots (id, seq, ts, command, manifest, reverted) VALUES (?1, ?2, ?3, ?4, ?5, 0)",
409            rusqlite::params![manifest.id, seq, now, manifest.command, json],
410        )?;
411        Ok(())
412    }
413
414    /// Load all snapshots not yet reverted, newest first.
415    pub fn unreverted_snapshots(&self) -> Result<Vec<crate::snapshot::Manifest>, LogError> {
416        let mut stmt = self
417            .conn
418            .prepare("SELECT manifest FROM snapshots WHERE reverted = 0 ORDER BY rowid DESC")?;
419        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
420        let mut out = Vec::new();
421        for r in rows {
422            let json = r?;
423            let m: crate::snapshot::Manifest = serde_json::from_str(&json)
424                .map_err(|e| LogError::Corrupt(format!("manifest parse: {e}")))?;
425            out.push(m);
426        }
427        Ok(out)
428    }
429
430    /// The most recent not-yet-reverted snapshot, if any.
431    pub fn latest_unreverted_snapshot(
432        &self,
433    ) -> Result<Option<crate::snapshot::Manifest>, LogError> {
434        Ok(self.unreverted_snapshots()?.into_iter().next())
435    }
436
437    /// Mark a snapshot as reverted (it has been undone).
438    pub fn mark_reverted(&self, id: &str) -> Result<(), LogError> {
439        self.conn
440            .execute("UPDATE snapshots SET reverted = 1 WHERE id = ?1", [id])?;
441        Ok(())
442    }
443
444    /// Remember a per-repo decision for an exact command (always-allow / -deny).
445    ///
446    /// Only `Allow` and `Deny` are meaningful; `Hold` is rejected.
447    pub fn remember(
448        &self,
449        repo: &str,
450        command_hash: &str,
451        action: crate::types::Decision,
452    ) -> Result<(), LogError> {
453        use crate::types::Decision;
454        if action == Decision::Hold {
455            return Err(LogError::Corrupt(
456                "cannot remember a Hold decision".to_string(),
457            ));
458        }
459        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
460        self.conn.execute(
461            r#"
462            INSERT INTO memory (repo, command_hash, action, updated_at)
463            VALUES (?1, ?2, ?3, ?4)
464            ON CONFLICT(repo, command_hash) DO UPDATE SET action = ?3, updated_at = ?4
465            "#,
466            rusqlite::params![repo, command_hash, action.as_str(), now],
467        )?;
468        Ok(())
469    }
470
471    /// Look up a remembered decision for an exact command in a repo.
472    pub fn memory_lookup(
473        &self,
474        repo: &str,
475        command_hash: &str,
476    ) -> Result<Option<crate::types::Decision>, LogError> {
477        use crate::types::Decision;
478        let action: Option<String> = self
479            .conn
480            .query_row(
481                "SELECT action FROM memory WHERE repo = ?1 AND command_hash = ?2",
482                rusqlite::params![repo, command_hash],
483                |row| row.get(0),
484            )
485            .optional()?;
486        Ok(match action.as_deref() {
487            Some("allow") => Some(Decision::Allow),
488            Some("deny") => Some(Decision::Deny),
489            _ => None,
490        })
491    }
492
    /// Compute the canonical hash for a row given its predecessor.
    ///
    /// The hash binds every immutable field plus the predecessor hash, so neither
    /// a field edit nor a reordering can go unnoticed.
    ///
    /// Layout: the fields below, in this fixed order, joined by U+001F (ASCII
    /// unit separator), hashed with SHA-256 and returned hex-encoded. `None`
    /// optionals render as the empty string; `session` is deliberately not
    /// included. Any change to this layout changes every hash it produces, so
    /// chains written with the old layout would no longer match (presumably
    /// `verify_chain` recomputes hashes with this function).
    ///
    /// NOTE(review): fields are not escaped. A value that itself contains
    /// U+001F, or `None` vs `Some("")` for `summary`/`snapshot_id`, can produce
    /// identical payloads for different rows.
    // One argument per hashed field keeps the canonical order visible at call sites.
    #[allow(clippy::too_many_arguments)]
    fn compute_hash(
        prev_hash: &str,
        id: &Uuid,
        ts_rfc3339: &str,
        agent: &str,
        cwd: &str,
        command: &str,
        argv_json: &str,
        class: Class,
        decision: Decision,
        reason: &str,
        tier: u8,
        risk: Option<u8>,
        summary: Option<&str>,
        snapshot_id: Option<&str>,
    ) -> String {
        let payload = format!(
            "{prev}\u{1f}{id}\u{1f}{ts}\u{1f}{agent}\u{1f}{cwd}\u{1f}{cmd}\u{1f}{argv}\u{1f}{class}\u{1f}{dec}\u{1f}{reason}\u{1f}{tier}\u{1f}{risk}\u{1f}{summary}\u{1f}{snap}",
            prev = prev_hash,
            id = id,
            ts = ts_rfc3339,
            agent = agent,
            cwd = cwd,
            cmd = command,
            argv = argv_json,
            class = class.as_str(),
            dec = decision.as_str(),
            reason = reason,
            tier = tier,
            // Absent risk hashes as "" (not "0"), so None and Some(0) stay distinct.
            risk = risk.map(|r| r.to_string()).unwrap_or_default(),
            summary = summary.unwrap_or_default(),
            snap = snapshot_id.unwrap_or_default(),
        );
        let mut hasher = Sha256::new();
        hasher.update(payload.as_bytes());
        hex::encode(hasher.finalize())
    }
535
536    /// Return the hash of the most recent event, or [`GENESIS_HASH`] if empty.
537    fn head_hash(&self) -> Result<String, LogError> {
538        let hash: Option<String> = self
539            .conn
540            .query_row(
541                "SELECT hash FROM events ORDER BY seq DESC LIMIT 1",
542                [],
543                |row| row.get(0),
544            )
545            .optional()?;
546        Ok(hash.unwrap_or_else(|| GENESIS_HASH.to_string()))
547    }
548
549    /// The read-modify-append, run inside the write transaction. Returns
550    /// (prev_hash, hash, seq).
551    #[allow(clippy::too_many_arguments)]
552    #[allow(clippy::too_many_arguments)]
553    fn append_locked(
554        &self,
555        cmd: &ProposedCommand,
556        verdict: &Verdict,
557        ts: &str,
558        cwd: &str,
559        command: &str,
560        argv_json: &str,
561        snapshot_id: Option<&str>,
562    ) -> Result<(String, String, i64), LogError> {
563        let prev_hash = self.head_hash()?;
564        let hash = Self::compute_hash(
565            &prev_hash,
566            &cmd.id,
567            ts,
568            &cmd.agent,
569            cwd,
570            command,
571            argv_json,
572            verdict.class,
573            verdict.decision,
574            &verdict.reason,
575            verdict.tier,
576            verdict.risk,
577            verdict.summary.as_deref(),
578            snapshot_id,
579        );
580        self.conn.execute(
581            r#"
582            INSERT INTO events
583                (id, ts, agent, cwd, command, argv, class, decision, reason, tier, risk, summary, snapshot_id, prev_hash, hash, session)
584            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
585            "#,
586            rusqlite::params![
587                cmd.id.to_string(),
588                ts,
589                cmd.agent,
590                cwd,
591                command,
592                argv_json,
593                verdict.class.as_str(),
594                verdict.decision.as_str(),
595                verdict.reason,
596                verdict.tier as i64,
597                verdict.risk.map(|r| r as i64),
598                verdict.summary,
599                snapshot_id,
600                prev_hash,
601                hash,
602                cmd.session,
603            ],
604        )?;
605        Ok((prev_hash, hash, self.conn.last_insert_rowid()))
606    }
607
608    /// Append one event built from a proposal and its verdict.
609    pub fn log_event(
610        &self,
611        cmd: &ProposedCommand,
612        verdict: &Verdict,
613        snapshot_id: Option<&str>,
614    ) -> Result<LoggedEvent, LogError> {
615        let ts = cmd.ts.format(&Rfc3339)?;
616        let cwd = cmd.cwd.to_string_lossy().to_string();
617
618        // Redact-before-hash (security spine #6): never let a command-line secret
619        // (DB connection strings, `-pSECRET`, `PGPASSWORD=…`, bearer tokens) enter
620        // the append-only, hash-chained log — it could not be scrubbed later. Only
621        // the secret *value* is replaced (rest verbatim); when nothing matches, the
622        // command/argv are stored byte-identically (so the common case and every
623        // existing test are unchanged). The argv is re-derived from the redacted
624        // command so it can't leak the secret either.
625        let red = crate::redact::redact_command(&cmd.raw);
626        let (command, argv): (String, Vec<String>) = if red.any() {
627            (red.text.clone(), crate::shell::split(&red.text))
628        } else {
629            (cmd.raw.clone(), cmd.argv.clone())
630        };
631        let argv_json = serde_json::to_string(&argv)
632            .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
633
634        // Serialize the read-modify-append: take the write lock immediately so a
635        // concurrent writer (another process) blocks and then reads the updated
636        // head, rather than both linking new rows to the same prev_hash.
637        self.conn.execute_batch("BEGIN IMMEDIATE")?;
638        let (prev_hash, hash, seq) =
639            match self.append_locked(cmd, verdict, &ts, &cwd, &command, &argv_json, snapshot_id) {
640                Ok(v) => {
641                    self.conn.execute_batch("COMMIT")?;
642                    v
643                }
644                Err(e) => {
645                    let _ = self.conn.execute_batch("ROLLBACK");
646                    return Err(e);
647                }
648            };
649
650        Ok(LoggedEvent {
651            seq,
652            id: cmd.id,
653            ts: cmd.ts,
654            agent: cmd.agent.clone(),
655            cwd,
656            command,
657            argv,
658            class: verdict.class,
659            decision: verdict.decision,
660            reason: verdict.reason.clone(),
661            tier: verdict.tier,
662            risk: verdict.risk,
663            summary: verdict.summary.clone(),
664            snapshot_id: snapshot_id.map(str::to_string),
665            session: cmd.session.clone(),
666            prev_hash,
667            hash,
668            redacted: false,
669        })
670    }
671
672    /// Return the most recent `n` non-redacted events, oldest first.
673    pub fn tail(&self, n: usize) -> Result<Vec<LoggedEvent>, LogError> {
674        self.query(&Filter {
675            limit: Some(n),
676            ..Filter::default()
677        })
678    }
679
680    /// Return events matching `filter`, oldest first (capped by `filter.limit`,
681    /// skipping `filter.offset` of the newest matches first for pagination).
682    pub fn query(&self, filter: &Filter) -> Result<Vec<LoggedEvent>, LogError> {
683        let (where_body, params) = filter.where_clause();
684        let limit = filter.limit.map(|n| n as i64).unwrap_or(-1);
685        let offset = filter.offset.map(|n| n as i64).unwrap_or(0);
686        // Take the page window of newest-by-seq rows (skip `offset`, take
687        // `limit`), then re-sort ascending so the page reads chronologically.
688        // SQLite accepts `LIMIT -1 OFFSET n` to mean "all, skipping n".
689        let sql = format!(
690            r#"
691            SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
692                   risk, summary, snapshot_id, prev_hash, hash, session, redacted
693            FROM (
694                SELECT events.*, (r.event_id IS NOT NULL) AS redacted
695                FROM events LEFT JOIN redactions r ON r.event_id = events.id
696                WHERE {where_body}
697                ORDER BY events.seq DESC LIMIT ? OFFSET ?
698            ) ORDER BY seq ASC
699            "#
700        );
701        let mut stmt = self.conn.prepare(&sql)?;
702        let mut bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
703        bound.push(&limit);
704        bound.push(&offset);
705        let rows = stmt.query_map(bound.as_slice(), Self::row_to_event)?;
706        let mut out = Vec::new();
707        for r in rows {
708            out.push(r??);
709        }
710        Ok(out)
711    }
712
713    /// Count events matching `filter` (ignores `limit`).
714    pub fn count_matching(&self, filter: &Filter) -> Result<i64, LogError> {
715        let (where_body, params) = filter.where_clause();
716        let sql = format!(
717            "SELECT COUNT(*) FROM events LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body}"
718        );
719        let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
720        Ok(self
721            .conn
722            .query_row(&sql, bound.as_slice(), |row| row.get(0))?)
723    }
724
725    /// Redact a single event by id (append-only; idempotent). Returns whether a
726    /// matching, not-already-redacted event existed.
727    pub fn redact(&self, event_id: &str, reason: &str) -> Result<bool, LogError> {
728        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
729        let exists: bool = self
730            .conn
731            .prepare("SELECT 1 FROM events WHERE id = ?1")?
732            .exists([event_id])?;
733        if !exists {
734            return Ok(false);
735        }
736        let n = self.conn.execute(
737            "INSERT INTO redactions (event_id, ts, reason) VALUES (?1, ?2, ?3)
738             ON CONFLICT(event_id) DO NOTHING",
739            rusqlite::params![event_id, now, reason],
740        )?;
741        Ok(n > 0)
742    }
743
744    /// Redact every event matching `filter` (newest-first, no limit applied).
745    /// Returns the number newly redacted.
746    pub fn redact_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
747        // Match against not-yet-redacted rows regardless of the filter's flag.
748        let f = Filter {
749            include_redacted: false,
750            limit: None,
751            ..filter.clone()
752        };
753        let (where_body, params) = f.where_clause();
754        let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
755        let sql = format!(
756            "INSERT INTO redactions (event_id, ts, reason)
757             SELECT events.id, ?, ? FROM events
758             LEFT JOIN redactions r ON r.event_id = events.id
759             WHERE {where_body}"
760        );
761        let mut bound: Vec<&dyn rusqlite::ToSql> = vec![&now, &reason];
762        let pbound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
763        bound.extend(pbound);
764        Ok(self.conn.execute(&sql, bound.as_slice())?)
765    }
766
767    /// **Hard erasure** — physically delete events matching `filter`, rebuild the
768    /// hash chain over the survivors, and append a marker event recording the
769    /// purge. Deliberately rewrites history for the purged span; never automatic.
770    /// Returns the number of events removed.
771    ///
772    /// `include_redacted`/`limit` on the filter are ignored: purge always targets
773    /// every matching row. Catastrophic-or-not is irrelevant — this is the user's
774    /// explicit erasure of their own local data.
775    pub fn purge_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
776        let f = Filter {
777            include_redacted: true,
778            limit: None,
779            ..filter.clone()
780        };
781        let (where_body, params) = f.where_clause();
782
783        self.conn.execute_batch("BEGIN IMMEDIATE")?;
784        let removed = (|| -> Result<usize, LogError> {
785            // Drop redaction rows for the doomed events, then the events.
786            let del_red = format!(
787                "DELETE FROM redactions WHERE event_id IN (
788                     SELECT events.id FROM events
789                     LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
790            );
791            let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
792            self.conn.execute(&del_red, bound.as_slice())?;
793
794            let del = format!(
795                "DELETE FROM events WHERE id IN (
796                     SELECT events.id FROM events
797                     LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
798            );
799            let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
800            let n = self.conn.execute(&del, bound.as_slice())?;
801            self.rechain()?;
802            Ok(n)
803        })();
804        let removed = match removed {
805            Ok(n) => {
806                self.conn.execute_batch("COMMIT")?;
807                n
808            }
809            Err(e) => {
810                let _ = self.conn.execute_batch("ROLLBACK");
811                return Err(e);
812            }
813        };
814
815        // Record the purge itself as an immutable marker (outside the txn so it
816        // links to the freshly re-chained head).
817        if removed > 0 {
818            let marker = ProposedCommand::new(
819                "kintsugi",
820                std::path::PathBuf::from("."),
821                vec!["purge".into()],
822                format!("kintsugi purge --hard ({removed} event(s): {reason})"),
823            );
824            let verdict = Verdict::rules(Class::Safe, Decision::Allow, "audit:purge");
825            self.log_event(&marker, &verdict, None)?;
826        }
827        Ok(removed)
828    }
829
830    /// Recompute prev_hash/hash for every surviving row in seq order so the chain
831    /// is valid again after a purge. Caller holds the write transaction.
832    fn rechain(&self) -> Result<(), LogError> {
833        let mut stmt = self.conn.prepare(
834            r#"
835            SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
836                   risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
837            FROM events ORDER BY seq ASC
838            "#,
839        )?;
840        let mut events: Vec<LoggedEvent> = Vec::new();
841        for r in stmt.query_map([], Self::row_to_event)? {
842            events.push(r??);
843        }
844        drop(stmt);
845
846        let mut prev = GENESIS_HASH.to_string();
847        for ev in events {
848            let ts = ev.ts.format(&Rfc3339)?;
849            let argv_json = serde_json::to_string(&ev.argv)
850                .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
851            let hash = Self::compute_hash(
852                &prev,
853                &ev.id,
854                &ts,
855                &ev.agent,
856                &ev.cwd,
857                &ev.command,
858                &argv_json,
859                ev.class,
860                ev.decision,
861                &ev.reason,
862                ev.tier,
863                ev.risk,
864                ev.summary.as_deref(),
865                ev.snapshot_id.as_deref(),
866            );
867            self.conn.execute(
868                "UPDATE events SET prev_hash = ?1, hash = ?2 WHERE seq = ?3",
869                rusqlite::params![prev, hash, ev.seq],
870            )?;
871            prev = hash;
872        }
873        Ok(())
874    }
875
876    /// Total number of events.
877    pub fn count(&self) -> Result<i64, LogError> {
878        Ok(self
879            .conn
880            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?)
881    }
882
883    /// Walk the chain from genesis and confirm every link.
884    ///
885    /// Recomputes each row's hash from its stored fields and verifies it both
886    /// matches the stored `hash` and links to the previous row's `hash`.
887    pub fn verify_chain(&self) -> Result<ChainStatus, LogError> {
888        let mut stmt = self.conn.prepare(
889            r#"
890            SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
891                   risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
892            FROM events ORDER BY seq ASC
893            "#,
894        )?;
895        let rows = stmt.query_map([], Self::row_to_event)?;
896
897        let mut expected_prev = GENESIS_HASH.to_string();
898        for r in rows {
899            let ev = r??;
900
901            if ev.prev_hash != expected_prev {
902                return Ok(ChainStatus::Broken {
903                    seq: ev.seq,
904                    detail: format!(
905                        "prev_hash {} does not link to predecessor {}",
906                        short(&ev.prev_hash),
907                        short(&expected_prev)
908                    ),
909                });
910            }
911
912            let ts = ev.ts.format(&Rfc3339)?;
913            let argv_json = serde_json::to_string(&ev.argv)
914                .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
915            let recomputed = Self::compute_hash(
916                &ev.prev_hash,
917                &ev.id,
918                &ts,
919                &ev.agent,
920                &ev.cwd,
921                &ev.command,
922                &argv_json,
923                ev.class,
924                ev.decision,
925                &ev.reason,
926                ev.tier,
927                ev.risk,
928                ev.summary.as_deref(),
929                ev.snapshot_id.as_deref(),
930            );
931            if recomputed != ev.hash {
932                return Ok(ChainStatus::Broken {
933                    seq: ev.seq,
934                    detail: format!(
935                        "row contents do not match stored hash {} (recomputed {})",
936                        short(&ev.hash),
937                        short(&recomputed)
938                    ),
939                });
940            }
941
942            expected_prev = ev.hash;
943        }
944
945        Ok(ChainStatus::Intact)
946    }
947
948    fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Result<LoggedEvent, LogError>> {
949        // Pull raw columns first; map fallible conversions into LogError.
950        let seq: i64 = row.get(0)?;
951        let id_s: String = row.get(1)?;
952        let ts_s: String = row.get(2)?;
953        let agent: String = row.get(3)?;
954        let cwd: String = row.get(4)?;
955        let command: String = row.get(5)?;
956        let argv_s: String = row.get(6)?;
957        let class_s: String = row.get(7)?;
958        let decision_s: String = row.get(8)?;
959        let reason: String = row.get(9)?;
960        let tier: i64 = row.get(10)?;
961        let risk: Option<i64> = row.get(11)?;
962        let summary: Option<String> = row.get(12)?;
963        let snapshot_id: Option<String> = row.get(13)?;
964        let prev_hash: String = row.get(14)?;
965        let hash: String = row.get(15)?;
966        let session: Option<String> = row.get(16)?;
967        let redacted: bool = row.get(17)?;
968
969        Ok((|| {
970            let id = Uuid::parse_str(&id_s)
971                .map_err(|e| LogError::Corrupt(format!("uuid {id_s}: {e}")))?;
972            let ts = OffsetDateTime::parse(&ts_s, &Rfc3339)?;
973            let argv: Vec<String> = serde_json::from_str(&argv_s)
974                .map_err(|e| LogError::Corrupt(format!("argv {argv_s}: {e}")))?;
975            let class = parse_class(&class_s)?;
976            let decision = parse_decision(&decision_s)?;
977            let tier = u8::try_from(tier)
978                .map_err(|_| LogError::Corrupt(format!("tier out of range: {tier}")))?;
979            let risk = match risk {
980                Some(r) => Some(
981                    u8::try_from(r)
982                        .map_err(|_| LogError::Corrupt(format!("risk out of range: {r}")))?,
983                ),
984                None => None,
985            };
986            Ok(LoggedEvent {
987                seq,
988                id,
989                ts,
990                agent,
991                cwd,
992                command,
993                argv,
994                class,
995                decision,
996                reason,
997                tier,
998                risk,
999                summary,
1000                snapshot_id,
1001                session,
1002                prev_hash,
1003                hash,
1004                redacted,
1005            })
1006        })())
1007    }
1008}
1009
1010fn parse_class(s: &str) -> Result<Class, LogError> {
1011    match s {
1012        "safe" => Ok(Class::Safe),
1013        "catastrophic" => Ok(Class::Catastrophic),
1014        "ambiguous" => Ok(Class::Ambiguous),
1015        other => Err(LogError::Corrupt(format!("unknown class: {other}"))),
1016    }
1017}
1018
1019fn parse_decision(s: &str) -> Result<Decision, LogError> {
1020    match s {
1021        "allow" => Ok(Decision::Allow),
1022        "deny" => Ok(Decision::Deny),
1023        "hold" => Ok(Decision::Hold),
1024        other => Err(LogError::Corrupt(format!("unknown decision: {other}"))),
1025    }
1026}
1027
/// Abbreviate a hash to its first 12 characters for human-readable diagnostics.
/// Input of 12 characters or fewer is returned unchanged.
fn short(hash: &str) -> String {
    // Byte offset of the 13th char (if any) is the cut point; slicing there is
    // always on a char boundary.
    let cut = hash
        .char_indices()
        .nth(12)
        .map_or(hash.len(), |(offset, _)| offset);
    hash[..cut].to_owned()
}