1use rusqlite::{Connection, OptionalExtension};
15use sha2::{Digest, Sha256};
16use time::format_description::well_known::Rfc3339;
17use time::OffsetDateTime;
18use uuid::Uuid;
19
20use crate::types::{Class, Decision, ProposedCommand, Verdict};
21
/// All-zero hash used as the predecessor of the first event in the chain.
///
/// `head_hash` returns it when the `events` table is empty, and both
/// `verify_chain` and `rechain` start walking the chain from it.
pub const GENESIS_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
24
/// Errors produced by the event log.
#[derive(Debug, thiserror::Error)]
pub enum LogError {
    /// A SQLite operation failed.
    #[error("database error: {0}")]
    Db(#[from] rusqlite::Error),
    /// A timestamp could not be formatted (this file formats as RFC 3339).
    #[error("time formatting error: {0}")]
    Time(#[from] time::error::Format),
    /// A timestamp read back from the database did not parse as RFC 3339.
    #[error("stored timestamp is not valid RFC3339: {0}")]
    TimeParse(#[from] time::error::Parse),
    /// A value could not be (de)serialized or was outside its expected
    /// domain (bad UUID, unknown class/decision, out-of-range tier/risk, ...).
    /// Also used by `remember` to reject a `Hold` decision.
    #[error("stored value is not valid: {0}")]
    Corrupt(String),
}
37
/// One row of the `events` table, decoded into typed fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggedEvent {
    /// Autoincrement primary key of the row; the chain is walked in `seq` order.
    pub seq: i64,
    /// Id of the command that produced this event (`ProposedCommand::id`).
    pub id: Uuid,
    /// Timestamp taken from the command; stored as RFC 3339 text.
    pub ts: OffsetDateTime,
    /// Agent name taken from the command.
    pub agent: String,
    /// Working directory of the command (lossy UTF-8 conversion of the path).
    pub cwd: String,
    /// Command text as stored: the raw command, or its redacted form when
    /// `redact_command` changed anything.
    pub command: String,
    /// Argument vector; stored in the database as a JSON array of strings.
    pub argv: Vec<String>,
    /// Classification from the verdict.
    pub class: Class,
    /// Decision from the verdict.
    pub decision: Decision,
    /// Reason string from the verdict.
    pub reason: String,
    /// Tier from the verdict.
    pub tier: u8,
    /// Optional risk value from the verdict.
    pub risk: Option<u8>,
    /// Optional summary from the verdict.
    pub summary: Option<String>,
    /// Optional snapshot id passed to `log_event`.
    pub snapshot_id: Option<String>,
    /// Optional session taken from the command. Not part of the hashed payload.
    pub session: Option<String>,
    /// Hash of the preceding event (or `GENESIS_HASH` for the first one).
    pub prev_hash: String,
    /// Hex SHA-256 over `prev_hash` and this event's hashed fields.
    pub hash: String,
    /// Whether a `redactions` row exists for this event's id. Only `query`
    /// computes this; `log_event`, `rechain` and `verify_chain` always yield `false`.
    pub redacted: bool,
}
78
/// Criteria for selecting events. Every set field narrows the result;
/// the default filter matches all non-redacted events.
#[derive(Debug, Clone, Default)]
pub struct Filter {
    /// Keep only events whose agent equals this value.
    pub agent: Option<String>,
    /// Drop events whose agent equals this value.
    pub agent_not: Option<String>,
    /// Keep only events whose session equals this value.
    pub session: Option<String>,
    /// Inclusive lower bound on the stored timestamp (`ts >= since`).
    pub since: Option<OffsetDateTime>,
    /// Exclusive upper bound on the stored timestamp (`ts < until`).
    pub until: Option<OffsetDateTime>,
    /// Substring to look for in the stored command (SQL `LIKE`, with `%`,
    /// `_` and `\` in the needle escaped so they match literally).
    pub grep: Option<String>,
    /// Keep only events of this class.
    pub class: Option<Class>,
    /// When `false` (the default), events that have a redaction row are excluded.
    pub include_redacted: bool,
    /// Maximum number of events returned by `query` (the newest ones).
    /// Not part of the WHERE clause.
    pub limit: Option<usize>,
    /// Number of newest matching events `query` skips before applying `limit`.
    /// Not part of the WHERE clause.
    pub offset: Option<usize>,
}
104
105impl Filter {
106 fn where_clause(&self) -> (String, Vec<Box<dyn rusqlite::ToSql>>) {
109 let mut clauses: Vec<String> = Vec::new();
110 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
111 if let Some(a) = &self.agent {
112 clauses.push("events.agent = ?".into());
113 params.push(Box::new(a.clone()));
114 }
115 if let Some(a) = &self.agent_not {
116 clauses.push("events.agent != ?".into());
117 params.push(Box::new(a.clone()));
118 }
119 if let Some(s) = &self.session {
120 clauses.push("events.session = ?".into());
121 params.push(Box::new(s.clone()));
122 }
123 if let Some(c) = &self.class {
124 clauses.push("events.class = ?".into());
125 params.push(Box::new(c.as_str().to_string()));
126 }
127 if let Some(g) = &self.grep {
128 clauses.push("events.command LIKE ? ESCAPE '\\'".into());
129 params.push(Box::new(format!("%{}%", like_escape(g))));
130 }
131 if let Some(since) = &self.since {
133 if let Ok(s) = since.format(&Rfc3339) {
134 clauses.push("events.ts >= ?".into());
135 params.push(Box::new(s));
136 }
137 }
138 if let Some(until) = &self.until {
139 if let Ok(s) = until.format(&Rfc3339) {
140 clauses.push("events.ts < ?".into());
141 params.push(Box::new(s));
142 }
143 }
144 if !self.include_redacted {
145 clauses.push("r.event_id IS NULL".into());
146 }
147 let body = if clauses.is_empty() {
148 "1=1".to_string()
149 } else {
150 clauses.join(" AND ")
151 };
152 (body, params)
153 }
154}
155
/// Escapes `s` for use inside a SQL `LIKE` pattern whose escape character
/// is backslash: `\`, `%` and `_` each get a leading backslash so they
/// match literally.
fn like_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
162
/// A held command from the `pending` table, as returned by `list_pending`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PendingItem {
    /// The proposed command, deserialized from the JSON stored in `pending.command`.
    pub command: ProposedCommand,
    /// Class recorded when the command was enqueued.
    pub class: Class,
    /// Reason recorded when the command was enqueued.
    pub reason: String,
    /// Time the command was enqueued; (de)serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
}
176
/// Outcome of walking the hash chain with `verify_chain`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainStatus {
    /// Every event links to its predecessor and matches its stored hash.
    Intact,
    /// The chain is broken at the first offending event.
    Broken {
        /// `seq` of the first event that failed a check.
        seq: i64,
        /// Human-readable description of the failed check.
        detail: String,
    },
}

impl ChainStatus {
    /// Returns `true` only for [`ChainStatus::Intact`].
    pub fn is_intact(&self) -> bool {
        match self {
            ChainStatus::Intact => true,
            ChainStatus::Broken { .. } => false,
        }
    }
}
197
/// Handle to the SQLite database holding the hash-chained event log and
/// its companion tables (redactions, memory, snapshots, pending).
pub struct EventLog {
    /// The single connection every method runs its statements on.
    conn: Connection,
}
202
203impl EventLog {
204 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, LogError> {
206 let conn = Connection::open(path)?;
207 Self::init(conn)
208 }
209
210 pub fn open_in_memory() -> Result<Self, LogError> {
212 let conn = Connection::open_in_memory()?;
213 Self::init(conn)
214 }
215
/// Configures `conn` and makes sure the schema exists, then wraps it in an
/// `EventLog`.
///
/// Steps, in order: set the `journal_mode`, `synchronous` and
/// `foreign_keys` pragmas; set a 5-second busy timeout; create the five
/// tables if they are missing; add the `session` column to `events` when a
/// database created before that column existed is opened.
fn init(conn: Connection) -> Result<Self, LogError> {
    conn.pragma_update(None, "journal_mode", "WAL")?;
    conn.pragma_update(None, "synchronous", "NORMAL")?;
    conn.pragma_update(None, "foreign_keys", "ON")?;
    // Wait up to five seconds on a locked database before giving up.
    conn.busy_timeout(std::time::Duration::from_secs(5))?;
    conn.execute_batch(
        r#"
        CREATE TABLE IF NOT EXISTS events (
            seq         INTEGER PRIMARY KEY AUTOINCREMENT,
            id          TEXT NOT NULL,
            ts          TEXT NOT NULL,
            agent       TEXT NOT NULL,
            cwd         TEXT NOT NULL,
            command     TEXT NOT NULL,
            argv        TEXT NOT NULL,
            class       TEXT NOT NULL,
            decision    TEXT NOT NULL,
            reason      TEXT NOT NULL,
            tier        INTEGER NOT NULL,
            risk        INTEGER,
            summary     TEXT,
            snapshot_id TEXT,
            prev_hash   TEXT NOT NULL,
            hash        TEXT NOT NULL,
            session     TEXT
        );

        -- Append-only redactions: hide an event from views without mutating
        -- it or breaking the chain. The original row and its hash are intact.
        CREATE TABLE IF NOT EXISTS redactions (
            event_id TEXT PRIMARY KEY,
            ts       TEXT NOT NULL,
            reason   TEXT NOT NULL
        );

        -- Decision memory. Unlike `events`, this table is intentionally
        -- mutable state: per-repo always-allow / always-deny by command hash.
        CREATE TABLE IF NOT EXISTS memory (
            repo         TEXT NOT NULL,
            command_hash TEXT NOT NULL,
            action       TEXT NOT NULL,
            updated_at   TEXT NOT NULL,
            PRIMARY KEY (repo, command_hash)
        );

        -- Snapshots taken before destructive ops, for `kintsugi undo`.
        CREATE TABLE IF NOT EXISTS snapshots (
            id       TEXT PRIMARY KEY,
            seq      INTEGER,
            ts       TEXT NOT NULL,
            command  TEXT NOT NULL,
            manifest TEXT NOT NULL,
            reverted INTEGER NOT NULL DEFAULT 0
        );

        -- The approval queue: held commands awaiting a human decision.
        -- Mutable state; status is 'pending' | 'approved' | 'denied'.
        CREATE TABLE IF NOT EXISTS pending (
            id         TEXT PRIMARY KEY,
            ts         TEXT NOT NULL,
            command    TEXT NOT NULL,
            class      TEXT NOT NULL,
            reason     TEXT NOT NULL,
            status     TEXT NOT NULL DEFAULT 'pending',
            updated_at TEXT NOT NULL
        );
        "#,
    )?;
    // `CREATE TABLE IF NOT EXISTS` leaves an existing `events` table as it
    // is, so a table that predates the `session` column needs it added.
    let has_session = conn
        .prepare("SELECT 1 FROM pragma_table_info('events') WHERE name = 'session'")?
        .exists([])?;
    if !has_session {
        conn.execute_batch("ALTER TABLE events ADD COLUMN session TEXT")?;
    }
    Ok(Self { conn })
}
299
300 pub fn enqueue_pending(
302 &self,
303 cmd: &ProposedCommand,
304 class: Class,
305 reason: &str,
306 ) -> Result<(), LogError> {
307 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
308 let cmd_json = serde_json::to_string(cmd)
309 .map_err(|e| LogError::Corrupt(format!("pending command serialize: {e}")))?;
310 self.conn.execute(
311 "INSERT INTO pending (id, ts, command, class, reason, status, updated_at)
312 VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?2)
313 ON CONFLICT(id) DO NOTHING",
314 rusqlite::params![cmd.id.to_string(), now, cmd_json, class.as_str(), reason],
315 )?;
316 Ok(())
317 }
318
319 pub fn pending_status(&self, id: &str) -> Result<Option<String>, LogError> {
321 Ok(self
322 .conn
323 .query_row("SELECT status FROM pending WHERE id = ?1", [id], |r| {
324 r.get(0)
325 })
326 .optional()?)
327 }
328
329 pub fn set_pending_status(&self, id: &str, status: &str) -> Result<(), LogError> {
331 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
332 self.conn.execute(
333 "UPDATE pending SET status = ?2, updated_at = ?3 WHERE id = ?1",
334 rusqlite::params![id, status, now],
335 )?;
336 Ok(())
337 }
338
339 pub fn cas_pending_status(&self, id: &str, from: &str, to: &str) -> Result<bool, LogError> {
346 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
347 let changed = self.conn.execute(
348 "UPDATE pending SET status = ?3, updated_at = ?4 WHERE id = ?1 AND status = ?2",
349 rusqlite::params![id, from, to, now],
350 )?;
351 Ok(changed == 1)
352 }
353
354 pub fn pending_command(&self, id: &str) -> Result<Option<ProposedCommand>, LogError> {
356 let json: Option<String> = self
357 .conn
358 .query_row("SELECT command FROM pending WHERE id = ?1", [id], |r| {
359 r.get(0)
360 })
361 .optional()?;
362 match json {
363 Some(j) => Ok(Some(serde_json::from_str(&j).map_err(|e| {
364 LogError::Corrupt(format!("pending command parse: {e}"))
365 })?)),
366 None => Ok(None),
367 }
368 }
369
370 pub fn list_pending(&self) -> Result<Vec<PendingItem>, LogError> {
372 let mut stmt = self.conn.prepare(
373 "SELECT command, class, reason, ts FROM pending WHERE status = 'pending' ORDER BY rowid ASC",
379 )?;
380 let rows = stmt.query_map([], |row| {
381 Ok((
382 row.get::<_, String>(0)?,
383 row.get::<_, String>(1)?,
384 row.get::<_, String>(2)?,
385 row.get::<_, String>(3)?,
386 ))
387 })?;
388 let mut out = Vec::new();
389 for r in rows {
390 let (cmd_json, class_s, reason, ts_s) = r?;
391 let command: ProposedCommand = serde_json::from_str(&cmd_json)
392 .map_err(|e| LogError::Corrupt(format!("pending parse: {e}")))?;
393 out.push(PendingItem {
394 command,
395 class: parse_class(&class_s)?,
396 reason,
397 ts: OffsetDateTime::parse(&ts_s, &Rfc3339)?,
398 });
399 }
400 Ok(out)
401 }
402
403 pub fn record_snapshot(&self, manifest: &crate::snapshot::Manifest) -> Result<(), LogError> {
405 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
406 let json = serde_json::to_string(manifest)
407 .map_err(|e| LogError::Corrupt(format!("manifest serialize: {e}")))?;
408 let seq: i64 = self
411 .conn
412 .query_row("SELECT COUNT(*) + 1 FROM events", [], |r| r.get(0))?;
413 self.conn.execute(
414 "INSERT INTO snapshots (id, seq, ts, command, manifest, reverted) VALUES (?1, ?2, ?3, ?4, ?5, 0)",
415 rusqlite::params![manifest.id, seq, now, manifest.command, json],
416 )?;
417 Ok(())
418 }
419
420 pub fn unreverted_snapshots(&self) -> Result<Vec<crate::snapshot::Manifest>, LogError> {
422 let mut stmt = self
423 .conn
424 .prepare("SELECT manifest FROM snapshots WHERE reverted = 0 ORDER BY rowid DESC")?;
425 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
426 let mut out = Vec::new();
427 for r in rows {
428 let json = r?;
429 let m: crate::snapshot::Manifest = serde_json::from_str(&json)
430 .map_err(|e| LogError::Corrupt(format!("manifest parse: {e}")))?;
431 out.push(m);
432 }
433 Ok(out)
434 }
435
436 pub fn latest_unreverted_snapshot(
438 &self,
439 ) -> Result<Option<crate::snapshot::Manifest>, LogError> {
440 Ok(self.unreverted_snapshots()?.into_iter().next())
441 }
442
443 pub fn mark_reverted(&self, id: &str) -> Result<(), LogError> {
445 self.conn
446 .execute("UPDATE snapshots SET reverted = 1 WHERE id = ?1", [id])?;
447 Ok(())
448 }
449
450 pub fn remember(
454 &self,
455 repo: &str,
456 command_hash: &str,
457 action: crate::types::Decision,
458 ) -> Result<(), LogError> {
459 use crate::types::Decision;
460 if action == Decision::Hold {
461 return Err(LogError::Corrupt(
462 "cannot remember a Hold decision".to_string(),
463 ));
464 }
465 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
466 self.conn.execute(
467 r#"
468 INSERT INTO memory (repo, command_hash, action, updated_at)
469 VALUES (?1, ?2, ?3, ?4)
470 ON CONFLICT(repo, command_hash) DO UPDATE SET action = ?3, updated_at = ?4
471 "#,
472 rusqlite::params![repo, command_hash, action.as_str(), now],
473 )?;
474 Ok(())
475 }
476
477 pub fn memory_lookup(
479 &self,
480 repo: &str,
481 command_hash: &str,
482 ) -> Result<Option<crate::types::Decision>, LogError> {
483 use crate::types::Decision;
484 let action: Option<String> = self
485 .conn
486 .query_row(
487 "SELECT action FROM memory WHERE repo = ?1 AND command_hash = ?2",
488 rusqlite::params![repo, command_hash],
489 |row| row.get(0),
490 )
491 .optional()?;
492 Ok(match action.as_deref() {
493 Some("allow") => Some(Decision::Allow),
494 Some("deny") => Some(Decision::Deny),
495 _ => None,
496 })
497 }
498
499 #[allow(clippy::too_many_arguments)]
504 fn compute_hash(
505 prev_hash: &str,
506 id: &Uuid,
507 ts_rfc3339: &str,
508 agent: &str,
509 cwd: &str,
510 command: &str,
511 argv_json: &str,
512 class: Class,
513 decision: Decision,
514 reason: &str,
515 tier: u8,
516 risk: Option<u8>,
517 summary: Option<&str>,
518 snapshot_id: Option<&str>,
519 ) -> String {
520 let payload = format!(
521 "{prev}\u{1f}{id}\u{1f}{ts}\u{1f}{agent}\u{1f}{cwd}\u{1f}{cmd}\u{1f}{argv}\u{1f}{class}\u{1f}{dec}\u{1f}{reason}\u{1f}{tier}\u{1f}{risk}\u{1f}{summary}\u{1f}{snap}",
522 prev = prev_hash,
523 id = id,
524 ts = ts_rfc3339,
525 agent = agent,
526 cwd = cwd,
527 cmd = command,
528 argv = argv_json,
529 class = class.as_str(),
530 dec = decision.as_str(),
531 reason = reason,
532 tier = tier,
533 risk = risk.map(|r| r.to_string()).unwrap_or_default(),
534 summary = summary.unwrap_or_default(),
535 snap = snapshot_id.unwrap_or_default(),
536 );
537 let mut hasher = Sha256::new();
538 hasher.update(payload.as_bytes());
539 hex::encode(hasher.finalize())
540 }
541
542 fn head_hash(&self) -> Result<String, LogError> {
544 let hash: Option<String> = self
545 .conn
546 .query_row(
547 "SELECT hash FROM events ORDER BY seq DESC LIMIT 1",
548 [],
549 |row| row.get(0),
550 )
551 .optional()?;
552 Ok(hash.unwrap_or_else(|| GENESIS_HASH.to_string()))
553 }
554
555 #[allow(clippy::too_many_arguments)]
558 #[allow(clippy::too_many_arguments)]
559 fn append_locked(
560 &self,
561 cmd: &ProposedCommand,
562 verdict: &Verdict,
563 ts: &str,
564 cwd: &str,
565 command: &str,
566 argv_json: &str,
567 snapshot_id: Option<&str>,
568 ) -> Result<(String, String, i64), LogError> {
569 let prev_hash = self.head_hash()?;
570 let hash = Self::compute_hash(
571 &prev_hash,
572 &cmd.id,
573 ts,
574 &cmd.agent,
575 cwd,
576 command,
577 argv_json,
578 verdict.class,
579 verdict.decision,
580 &verdict.reason,
581 verdict.tier,
582 verdict.risk,
583 verdict.summary.as_deref(),
584 snapshot_id,
585 );
586 self.conn.execute(
587 r#"
588 INSERT INTO events
589 (id, ts, agent, cwd, command, argv, class, decision, reason, tier, risk, summary, snapshot_id, prev_hash, hash, session)
590 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
591 "#,
592 rusqlite::params![
593 cmd.id.to_string(),
594 ts,
595 cmd.agent,
596 cwd,
597 command,
598 argv_json,
599 verdict.class.as_str(),
600 verdict.decision.as_str(),
601 verdict.reason,
602 verdict.tier as i64,
603 verdict.risk.map(|r| r as i64),
604 verdict.summary,
605 snapshot_id,
606 prev_hash,
607 hash,
608 cmd.session,
609 ],
610 )?;
611 Ok((prev_hash, hash, self.conn.last_insert_rowid()))
612 }
613
614 pub fn log_event(
616 &self,
617 cmd: &ProposedCommand,
618 verdict: &Verdict,
619 snapshot_id: Option<&str>,
620 ) -> Result<LoggedEvent, LogError> {
621 let ts = cmd.ts.format(&Rfc3339)?;
622 let cwd = cmd.cwd.to_string_lossy().to_string();
623
624 let red = crate::redact::redact_command(&cmd.raw);
632 let (command, argv): (String, Vec<String>) = if red.any() {
633 (red.text.clone(), crate::shell::split(&red.text))
634 } else {
635 (cmd.raw.clone(), cmd.argv.clone())
636 };
637 let argv_json = serde_json::to_string(&argv)
638 .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
639
640 self.conn.execute_batch("BEGIN IMMEDIATE")?;
644 let (prev_hash, hash, seq) =
645 match self.append_locked(cmd, verdict, &ts, &cwd, &command, &argv_json, snapshot_id) {
646 Ok(v) => {
647 self.conn.execute_batch("COMMIT")?;
648 v
649 }
650 Err(e) => {
651 let _ = self.conn.execute_batch("ROLLBACK");
652 return Err(e);
653 }
654 };
655
656 Ok(LoggedEvent {
657 seq,
658 id: cmd.id,
659 ts: cmd.ts,
660 agent: cmd.agent.clone(),
661 cwd,
662 command,
663 argv,
664 class: verdict.class,
665 decision: verdict.decision,
666 reason: verdict.reason.clone(),
667 tier: verdict.tier,
668 risk: verdict.risk,
669 summary: verdict.summary.clone(),
670 snapshot_id: snapshot_id.map(str::to_string),
671 session: cmd.session.clone(),
672 prev_hash,
673 hash,
674 redacted: false,
675 })
676 }
677
678 pub fn tail(&self, n: usize) -> Result<Vec<LoggedEvent>, LogError> {
680 self.query(&Filter {
681 limit: Some(n),
682 ..Filter::default()
683 })
684 }
685
686 pub fn query(&self, filter: &Filter) -> Result<Vec<LoggedEvent>, LogError> {
689 let (where_body, params) = filter.where_clause();
690 let limit = filter.limit.map(|n| n as i64).unwrap_or(-1);
691 let offset = filter.offset.map(|n| n as i64).unwrap_or(0);
692 let sql = format!(
696 r#"
697 SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
698 risk, summary, snapshot_id, prev_hash, hash, session, redacted
699 FROM (
700 SELECT events.*, (r.event_id IS NOT NULL) AS redacted
701 FROM events LEFT JOIN redactions r ON r.event_id = events.id
702 WHERE {where_body}
703 ORDER BY events.seq DESC LIMIT ? OFFSET ?
704 ) ORDER BY seq ASC
705 "#
706 );
707 let mut stmt = self.conn.prepare(&sql)?;
708 let mut bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
709 bound.push(&limit);
710 bound.push(&offset);
711 let rows = stmt.query_map(bound.as_slice(), Self::row_to_event)?;
712 let mut out = Vec::new();
713 for r in rows {
714 out.push(r??);
715 }
716 Ok(out)
717 }
718
719 pub fn count_matching(&self, filter: &Filter) -> Result<i64, LogError> {
721 let (where_body, params) = filter.where_clause();
722 let sql = format!(
723 "SELECT COUNT(*) FROM events LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body}"
724 );
725 let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
726 Ok(self
727 .conn
728 .query_row(&sql, bound.as_slice(), |row| row.get(0))?)
729 }
730
731 pub fn redact(&self, event_id: &str, reason: &str) -> Result<bool, LogError> {
734 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
735 let exists: bool = self
736 .conn
737 .prepare("SELECT 1 FROM events WHERE id = ?1")?
738 .exists([event_id])?;
739 if !exists {
740 return Ok(false);
741 }
742 let n = self.conn.execute(
743 "INSERT INTO redactions (event_id, ts, reason) VALUES (?1, ?2, ?3)
744 ON CONFLICT(event_id) DO NOTHING",
745 rusqlite::params![event_id, now, reason],
746 )?;
747 Ok(n > 0)
748 }
749
750 pub fn redact_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
753 let f = Filter {
755 include_redacted: false,
756 limit: None,
757 ..filter.clone()
758 };
759 let (where_body, params) = f.where_clause();
760 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
761 let sql = format!(
762 "INSERT INTO redactions (event_id, ts, reason)
763 SELECT events.id, ?, ? FROM events
764 LEFT JOIN redactions r ON r.event_id = events.id
765 WHERE {where_body}"
766 );
767 let mut bound: Vec<&dyn rusqlite::ToSql> = vec![&now, &reason];
768 let pbound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
769 bound.extend(pbound);
770 Ok(self.conn.execute(&sql, bound.as_slice())?)
771 }
772
773 pub fn purge_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
782 let f = Filter {
783 include_redacted: true,
784 limit: None,
785 ..filter.clone()
786 };
787 let (where_body, params) = f.where_clause();
788
789 self.conn.execute_batch("BEGIN IMMEDIATE")?;
790 let removed = (|| -> Result<usize, LogError> {
791 let del_red = format!(
793 "DELETE FROM redactions WHERE event_id IN (
794 SELECT events.id FROM events
795 LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
796 );
797 let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
798 self.conn.execute(&del_red, bound.as_slice())?;
799
800 let del = format!(
801 "DELETE FROM events WHERE id IN (
802 SELECT events.id FROM events
803 LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
804 );
805 let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
806 let n = self.conn.execute(&del, bound.as_slice())?;
807 self.rechain()?;
808 Ok(n)
809 })();
810 let removed = match removed {
811 Ok(n) => {
812 self.conn.execute_batch("COMMIT")?;
813 n
814 }
815 Err(e) => {
816 let _ = self.conn.execute_batch("ROLLBACK");
817 return Err(e);
818 }
819 };
820
821 if removed > 0 {
824 let marker = ProposedCommand::new(
825 "kintsugi",
826 std::path::PathBuf::from("."),
827 vec!["purge".into()],
828 format!("kintsugi purge --hard ({removed} event(s): {reason})"),
829 );
830 let verdict = Verdict::rules(Class::Safe, Decision::Allow, "audit:purge");
831 self.log_event(&marker, &verdict, None)?;
832 }
833 Ok(removed)
834 }
835
836 fn rechain(&self) -> Result<(), LogError> {
839 let mut stmt = self.conn.prepare(
840 r#"
841 SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
842 risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
843 FROM events ORDER BY seq ASC
844 "#,
845 )?;
846 let mut events: Vec<LoggedEvent> = Vec::new();
847 for r in stmt.query_map([], Self::row_to_event)? {
848 events.push(r??);
849 }
850 drop(stmt);
851
852 let mut prev = GENESIS_HASH.to_string();
853 for ev in events {
854 let ts = ev.ts.format(&Rfc3339)?;
855 let argv_json = serde_json::to_string(&ev.argv)
856 .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
857 let hash = Self::compute_hash(
858 &prev,
859 &ev.id,
860 &ts,
861 &ev.agent,
862 &ev.cwd,
863 &ev.command,
864 &argv_json,
865 ev.class,
866 ev.decision,
867 &ev.reason,
868 ev.tier,
869 ev.risk,
870 ev.summary.as_deref(),
871 ev.snapshot_id.as_deref(),
872 );
873 self.conn.execute(
874 "UPDATE events SET prev_hash = ?1, hash = ?2 WHERE seq = ?3",
875 rusqlite::params![prev, hash, ev.seq],
876 )?;
877 prev = hash;
878 }
879 Ok(())
880 }
881
882 pub fn count(&self) -> Result<i64, LogError> {
884 Ok(self
885 .conn
886 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?)
887 }
888
889 pub fn latest_seq(&self) -> Result<i64, LogError> {
892 Ok(self
893 .conn
894 .query_row("SELECT COALESCE(MAX(seq), 0) FROM events", [], |r| r.get(0))?)
895 }
896
897 pub fn verify_chain(&self) -> Result<ChainStatus, LogError> {
902 let mut stmt = self.conn.prepare(
903 r#"
904 SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
905 risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
906 FROM events ORDER BY seq ASC
907 "#,
908 )?;
909 let rows = stmt.query_map([], Self::row_to_event)?;
910
911 let mut expected_prev = GENESIS_HASH.to_string();
912 for r in rows {
913 let ev = r??;
914
915 if ev.prev_hash != expected_prev {
916 return Ok(ChainStatus::Broken {
917 seq: ev.seq,
918 detail: format!(
919 "prev_hash {} does not link to predecessor {}",
920 short(&ev.prev_hash),
921 short(&expected_prev)
922 ),
923 });
924 }
925
926 let ts = ev.ts.format(&Rfc3339)?;
927 let argv_json = serde_json::to_string(&ev.argv)
928 .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
929 let recomputed = Self::compute_hash(
930 &ev.prev_hash,
931 &ev.id,
932 &ts,
933 &ev.agent,
934 &ev.cwd,
935 &ev.command,
936 &argv_json,
937 ev.class,
938 ev.decision,
939 &ev.reason,
940 ev.tier,
941 ev.risk,
942 ev.summary.as_deref(),
943 ev.snapshot_id.as_deref(),
944 );
945 if recomputed != ev.hash {
946 return Ok(ChainStatus::Broken {
947 seq: ev.seq,
948 detail: format!(
949 "row contents do not match stored hash {} (recomputed {})",
950 short(&ev.hash),
951 short(&recomputed)
952 ),
953 });
954 }
955
956 expected_prev = ev.hash;
957 }
958
959 Ok(ChainStatus::Intact)
960 }
961
962 fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Result<LoggedEvent, LogError>> {
963 let seq: i64 = row.get(0)?;
965 let id_s: String = row.get(1)?;
966 let ts_s: String = row.get(2)?;
967 let agent: String = row.get(3)?;
968 let cwd: String = row.get(4)?;
969 let command: String = row.get(5)?;
970 let argv_s: String = row.get(6)?;
971 let class_s: String = row.get(7)?;
972 let decision_s: String = row.get(8)?;
973 let reason: String = row.get(9)?;
974 let tier: i64 = row.get(10)?;
975 let risk: Option<i64> = row.get(11)?;
976 let summary: Option<String> = row.get(12)?;
977 let snapshot_id: Option<String> = row.get(13)?;
978 let prev_hash: String = row.get(14)?;
979 let hash: String = row.get(15)?;
980 let session: Option<String> = row.get(16)?;
981 let redacted: bool = row.get(17)?;
982
983 Ok((|| {
984 let id = Uuid::parse_str(&id_s)
985 .map_err(|e| LogError::Corrupt(format!("uuid {id_s}: {e}")))?;
986 let ts = OffsetDateTime::parse(&ts_s, &Rfc3339)?;
987 let argv: Vec<String> = serde_json::from_str(&argv_s)
988 .map_err(|e| LogError::Corrupt(format!("argv {argv_s}: {e}")))?;
989 let class = parse_class(&class_s)?;
990 let decision = parse_decision(&decision_s)?;
991 let tier = u8::try_from(tier)
992 .map_err(|_| LogError::Corrupt(format!("tier out of range: {tier}")))?;
993 let risk = match risk {
994 Some(r) => Some(
995 u8::try_from(r)
996 .map_err(|_| LogError::Corrupt(format!("risk out of range: {r}")))?,
997 ),
998 None => None,
999 };
1000 Ok(LoggedEvent {
1001 seq,
1002 id,
1003 ts,
1004 agent,
1005 cwd,
1006 command,
1007 argv,
1008 class,
1009 decision,
1010 reason,
1011 tier,
1012 risk,
1013 summary,
1014 snapshot_id,
1015 session,
1016 prev_hash,
1017 hash,
1018 redacted,
1019 })
1020 })())
1021 }
1022}
1023
1024fn parse_class(s: &str) -> Result<Class, LogError> {
1025 match s {
1026 "safe" => Ok(Class::Safe),
1027 "catastrophic" => Ok(Class::Catastrophic),
1028 "ambiguous" => Ok(Class::Ambiguous),
1029 other => Err(LogError::Corrupt(format!("unknown class: {other}"))),
1030 }
1031}
1032
1033fn parse_decision(s: &str) -> Result<Decision, LogError> {
1034 match s {
1035 "allow" => Ok(Decision::Allow),
1036 "deny" => Ok(Decision::Deny),
1037 "hold" => Ok(Decision::Hold),
1038 other => Err(LogError::Corrupt(format!("unknown decision: {other}"))),
1039 }
1040}
1041
/// Returns the first 12 characters of `hash` (the whole string when it is
/// shorter), for compact display in diagnostics.
fn short(hash: &str) -> String {
    // Byte offset where the 13th character starts, if there is one.
    let end = hash
        .char_indices()
        .nth(12)
        .map_or(hash.len(), |(idx, _)| idx);
    hash[..end].to_string()
}