1use rusqlite::{Connection, OptionalExtension};
15use sha2::{Digest, Sha256};
16use time::format_description::well_known::Rfc3339;
17use time::OffsetDateTime;
18use uuid::Uuid;
19
20use crate::types::{Class, Decision, ProposedCommand, Verdict};
21
/// All-zero placeholder used as the "previous hash" when there is no
/// predecessor: the first event in the chain stores it as its `prev_hash`,
/// and chain verification / re-chaining both start from it.
pub const GENESIS_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
24
/// Everything that can go wrong while reading from or writing to the log database.
#[derive(Debug, thiserror::Error)]
pub enum LogError {
    /// An error reported by `rusqlite` (converted automatically via `From`).
    #[error("database error: {0}")]
    Db(#[from] rusqlite::Error),
    /// A timestamp could not be rendered as text.
    #[error("time formatting error: {0}")]
    Time(#[from] time::error::Format),
    /// A timestamp read back from the database did not parse as RFC 3339.
    #[error("stored timestamp is not valid RFC3339: {0}")]
    TimeParse(#[from] time::error::Parse),
    /// A value failed validation or (de)serialization; the string says which
    /// value and why.
    #[error("stored value is not valid: {0}")]
    Corrupt(String),
}
37
/// One row of the `events` table, decoded into typed fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggedEvent {
    /// Auto-incrementing primary key of the row; defines chain order.
    pub seq: i64,
    /// Identifier of the command this event records (`ProposedCommand::id`).
    pub id: Uuid,
    /// Timestamp of the command, stored as RFC 3339 text.
    pub ts: OffsetDateTime,
    /// Name of the agent that proposed the command.
    pub agent: String,
    /// Working directory of the command (lossily converted to UTF-8 when logged).
    pub cwd: String,
    /// Command text as stored; this is the redacted text when redaction applied.
    pub command: String,
    /// Argument vector, stored as a JSON array of strings.
    pub argv: Vec<String>,
    /// Classification taken from the verdict.
    pub class: Class,
    /// Decision taken from the verdict.
    pub decision: Decision,
    /// Reason string taken from the verdict.
    pub reason: String,
    /// `tier` value taken from the verdict.
    pub tier: u8,
    /// Optional `risk` value taken from the verdict.
    pub risk: Option<u8>,
    /// Optional summary taken from the verdict.
    pub summary: Option<String>,
    /// Id of the snapshot associated with this event, if any.
    pub snapshot_id: Option<String>,
    /// Session identifier of the command, if any. Not part of the hashed payload.
    pub session: Option<String>,
    /// Hash of the preceding event (or `GENESIS_HASH` for the first one).
    pub prev_hash: String,
    /// Hex-encoded hash of this event's payload.
    pub hash: String,
    /// Whether a row in `redactions` references this event. Only `query`
    /// computes this; the other producers in this file always report `false`.
    pub redacted: bool,
}
78
/// Selection criteria for events. Every `Some` field narrows the match; the
/// default value matches all events that have not been redacted.
#[derive(Debug, Clone, Default)]
pub struct Filter {
    /// Exact match on the `agent` column.
    pub agent: Option<String>,
    /// Exact match on the `session` column.
    pub session: Option<String>,
    /// Inclusive lower bound on the stored timestamp text (`ts >= since`).
    pub since: Option<OffsetDateTime>,
    /// Exclusive upper bound on the stored timestamp text (`ts < until`).
    pub until: Option<OffsetDateTime>,
    /// Literal substring to look for in the `command` column (LIKE
    /// metacharacters are escaped before matching).
    pub grep: Option<String>,
    /// Exact match on the `class` column.
    pub class: Option<Class>,
    /// When `false`, events that have a row in `redactions` are excluded.
    pub include_redacted: bool,
    /// Maximum number of rows; applied by `query` only, not by `where_clause`.
    pub limit: Option<usize>,
    /// Number of newest rows to skip; applied by `query` only.
    pub offset: Option<usize>,
}
102
103impl Filter {
104 fn where_clause(&self) -> (String, Vec<Box<dyn rusqlite::ToSql>>) {
107 let mut clauses: Vec<String> = Vec::new();
108 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
109 if let Some(a) = &self.agent {
110 clauses.push("events.agent = ?".into());
111 params.push(Box::new(a.clone()));
112 }
113 if let Some(s) = &self.session {
114 clauses.push("events.session = ?".into());
115 params.push(Box::new(s.clone()));
116 }
117 if let Some(c) = &self.class {
118 clauses.push("events.class = ?".into());
119 params.push(Box::new(c.as_str().to_string()));
120 }
121 if let Some(g) = &self.grep {
122 clauses.push("events.command LIKE ? ESCAPE '\\'".into());
123 params.push(Box::new(format!("%{}%", like_escape(g))));
124 }
125 if let Some(since) = &self.since {
127 if let Ok(s) = since.format(&Rfc3339) {
128 clauses.push("events.ts >= ?".into());
129 params.push(Box::new(s));
130 }
131 }
132 if let Some(until) = &self.until {
133 if let Ok(s) = until.format(&Rfc3339) {
134 clauses.push("events.ts < ?".into());
135 params.push(Box::new(s));
136 }
137 }
138 if !self.include_redacted {
139 clauses.push("r.event_id IS NULL".into());
140 }
141 let body = if clauses.is_empty() {
142 "1=1".to_string()
143 } else {
144 clauses.join(" AND ")
145 };
146 (body, params)
147 }
148}
149
/// Escapes `s` so it matches literally inside a `LIKE ... ESCAPE '\'` pattern:
/// each backslash, `%` and `_` is prefixed with a backslash.
fn like_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
156
/// A held command waiting in the approval queue, as returned by `list_pending`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PendingItem {
    /// The command that was held, decoded from its stored JSON form.
    pub command: ProposedCommand,
    /// Classification recorded when the command was enqueued.
    pub class: Class,
    /// Reason recorded when the command was enqueued.
    pub reason: String,
    /// Time the command was enqueued; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
}
170
/// Outcome of walking the event hash chain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainStatus {
    /// Every event linked to its predecessor and matched its own stored hash.
    Intact,
    /// Verification stopped at the first event that failed a check.
    Broken {
        /// `seq` of the offending event.
        seq: i64,
        /// Human-readable description of the failed check.
        detail: String,
    },
}

impl ChainStatus {
    /// Returns `true` only for [`ChainStatus::Intact`].
    pub fn is_intact(&self) -> bool {
        match self {
            ChainStatus::Intact => true,
            ChainStatus::Broken { .. } => false,
        }
    }
}
191
/// Handle to the SQLite database holding the hash-chained `events` table and
/// its side tables (`redactions`, `memory`, `snapshots`, `pending`).
pub struct EventLog {
    /// The single connection every operation on this handle goes through.
    conn: Connection,
}
196
197impl EventLog {
198 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, LogError> {
200 let conn = Connection::open(path)?;
201 Self::init(conn)
202 }
203
204 pub fn open_in_memory() -> Result<Self, LogError> {
206 let conn = Connection::open_in_memory()?;
207 Self::init(conn)
208 }
209
210 fn init(conn: Connection) -> Result<Self, LogError> {
211 conn.pragma_update(None, "journal_mode", "WAL")?;
212 conn.pragma_update(None, "synchronous", "NORMAL")?;
216 conn.pragma_update(None, "foreign_keys", "ON")?;
217 conn.busy_timeout(std::time::Duration::from_secs(5))?;
221 conn.execute_batch(
222 r#"
223 CREATE TABLE IF NOT EXISTS events (
224 seq INTEGER PRIMARY KEY AUTOINCREMENT,
225 id TEXT NOT NULL,
226 ts TEXT NOT NULL,
227 agent TEXT NOT NULL,
228 cwd TEXT NOT NULL,
229 command TEXT NOT NULL,
230 argv TEXT NOT NULL,
231 class TEXT NOT NULL,
232 decision TEXT NOT NULL,
233 reason TEXT NOT NULL,
234 tier INTEGER NOT NULL,
235 risk INTEGER,
236 summary TEXT,
237 snapshot_id TEXT,
238 prev_hash TEXT NOT NULL,
239 hash TEXT NOT NULL,
240 session TEXT
241 );
242
243 -- Append-only redactions: hide an event from views without mutating
244 -- it or breaking the chain. The original row and its hash are intact.
245 CREATE TABLE IF NOT EXISTS redactions (
246 event_id TEXT PRIMARY KEY,
247 ts TEXT NOT NULL,
248 reason TEXT NOT NULL
249 );
250
251 -- Decision memory. Unlike `events`, this table is intentionally
252 -- mutable state: per-repo always-allow / always-deny by command hash.
253 CREATE TABLE IF NOT EXISTS memory (
254 repo TEXT NOT NULL,
255 command_hash TEXT NOT NULL,
256 action TEXT NOT NULL,
257 updated_at TEXT NOT NULL,
258 PRIMARY KEY (repo, command_hash)
259 );
260
261 -- Snapshots taken before destructive ops, for `kintsugi undo`.
262 CREATE TABLE IF NOT EXISTS snapshots (
263 id TEXT PRIMARY KEY,
264 seq INTEGER,
265 ts TEXT NOT NULL,
266 command TEXT NOT NULL,
267 manifest TEXT NOT NULL,
268 reverted INTEGER NOT NULL DEFAULT 0
269 );
270
271 -- The approval queue: held commands awaiting a human decision.
272 -- Mutable state; status is 'pending' | 'approved' | 'denied'.
273 CREATE TABLE IF NOT EXISTS pending (
274 id TEXT PRIMARY KEY,
275 ts TEXT NOT NULL,
276 command TEXT NOT NULL,
277 class TEXT NOT NULL,
278 reason TEXT NOT NULL,
279 status TEXT NOT NULL DEFAULT 'pending',
280 updated_at TEXT NOT NULL
281 );
282 "#,
283 )?;
284 let has_session = conn
286 .prepare("SELECT 1 FROM pragma_table_info('events') WHERE name = 'session'")?
287 .exists([])?;
288 if !has_session {
289 conn.execute_batch("ALTER TABLE events ADD COLUMN session TEXT")?;
290 }
291 Ok(Self { conn })
292 }
293
294 pub fn enqueue_pending(
296 &self,
297 cmd: &ProposedCommand,
298 class: Class,
299 reason: &str,
300 ) -> Result<(), LogError> {
301 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
302 let cmd_json = serde_json::to_string(cmd)
303 .map_err(|e| LogError::Corrupt(format!("pending command serialize: {e}")))?;
304 self.conn.execute(
305 "INSERT INTO pending (id, ts, command, class, reason, status, updated_at)
306 VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?2)
307 ON CONFLICT(id) DO NOTHING",
308 rusqlite::params![cmd.id.to_string(), now, cmd_json, class.as_str(), reason],
309 )?;
310 Ok(())
311 }
312
313 pub fn pending_status(&self, id: &str) -> Result<Option<String>, LogError> {
315 Ok(self
316 .conn
317 .query_row("SELECT status FROM pending WHERE id = ?1", [id], |r| {
318 r.get(0)
319 })
320 .optional()?)
321 }
322
323 pub fn set_pending_status(&self, id: &str, status: &str) -> Result<(), LogError> {
325 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
326 self.conn.execute(
327 "UPDATE pending SET status = ?2, updated_at = ?3 WHERE id = ?1",
328 rusqlite::params![id, status, now],
329 )?;
330 Ok(())
331 }
332
333 pub fn cas_pending_status(&self, id: &str, from: &str, to: &str) -> Result<bool, LogError> {
340 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
341 let changed = self.conn.execute(
342 "UPDATE pending SET status = ?3, updated_at = ?4 WHERE id = ?1 AND status = ?2",
343 rusqlite::params![id, from, to, now],
344 )?;
345 Ok(changed == 1)
346 }
347
348 pub fn pending_command(&self, id: &str) -> Result<Option<ProposedCommand>, LogError> {
350 let json: Option<String> = self
351 .conn
352 .query_row("SELECT command FROM pending WHERE id = ?1", [id], |r| {
353 r.get(0)
354 })
355 .optional()?;
356 match json {
357 Some(j) => Ok(Some(serde_json::from_str(&j).map_err(|e| {
358 LogError::Corrupt(format!("pending command parse: {e}"))
359 })?)),
360 None => Ok(None),
361 }
362 }
363
364 pub fn list_pending(&self) -> Result<Vec<PendingItem>, LogError> {
366 let mut stmt = self.conn.prepare(
367 "SELECT command, class, reason, ts FROM pending WHERE status = 'pending' ORDER BY rowid ASC",
373 )?;
374 let rows = stmt.query_map([], |row| {
375 Ok((
376 row.get::<_, String>(0)?,
377 row.get::<_, String>(1)?,
378 row.get::<_, String>(2)?,
379 row.get::<_, String>(3)?,
380 ))
381 })?;
382 let mut out = Vec::new();
383 for r in rows {
384 let (cmd_json, class_s, reason, ts_s) = r?;
385 let command: ProposedCommand = serde_json::from_str(&cmd_json)
386 .map_err(|e| LogError::Corrupt(format!("pending parse: {e}")))?;
387 out.push(PendingItem {
388 command,
389 class: parse_class(&class_s)?,
390 reason,
391 ts: OffsetDateTime::parse(&ts_s, &Rfc3339)?,
392 });
393 }
394 Ok(out)
395 }
396
397 pub fn record_snapshot(&self, manifest: &crate::snapshot::Manifest) -> Result<(), LogError> {
399 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
400 let json = serde_json::to_string(manifest)
401 .map_err(|e| LogError::Corrupt(format!("manifest serialize: {e}")))?;
402 let seq: i64 = self
405 .conn
406 .query_row("SELECT COUNT(*) + 1 FROM events", [], |r| r.get(0))?;
407 self.conn.execute(
408 "INSERT INTO snapshots (id, seq, ts, command, manifest, reverted) VALUES (?1, ?2, ?3, ?4, ?5, 0)",
409 rusqlite::params![manifest.id, seq, now, manifest.command, json],
410 )?;
411 Ok(())
412 }
413
414 pub fn unreverted_snapshots(&self) -> Result<Vec<crate::snapshot::Manifest>, LogError> {
416 let mut stmt = self
417 .conn
418 .prepare("SELECT manifest FROM snapshots WHERE reverted = 0 ORDER BY rowid DESC")?;
419 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
420 let mut out = Vec::new();
421 for r in rows {
422 let json = r?;
423 let m: crate::snapshot::Manifest = serde_json::from_str(&json)
424 .map_err(|e| LogError::Corrupt(format!("manifest parse: {e}")))?;
425 out.push(m);
426 }
427 Ok(out)
428 }
429
430 pub fn latest_unreverted_snapshot(
432 &self,
433 ) -> Result<Option<crate::snapshot::Manifest>, LogError> {
434 Ok(self.unreverted_snapshots()?.into_iter().next())
435 }
436
437 pub fn mark_reverted(&self, id: &str) -> Result<(), LogError> {
439 self.conn
440 .execute("UPDATE snapshots SET reverted = 1 WHERE id = ?1", [id])?;
441 Ok(())
442 }
443
444 pub fn remember(
448 &self,
449 repo: &str,
450 command_hash: &str,
451 action: crate::types::Decision,
452 ) -> Result<(), LogError> {
453 use crate::types::Decision;
454 if action == Decision::Hold {
455 return Err(LogError::Corrupt(
456 "cannot remember a Hold decision".to_string(),
457 ));
458 }
459 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
460 self.conn.execute(
461 r#"
462 INSERT INTO memory (repo, command_hash, action, updated_at)
463 VALUES (?1, ?2, ?3, ?4)
464 ON CONFLICT(repo, command_hash) DO UPDATE SET action = ?3, updated_at = ?4
465 "#,
466 rusqlite::params![repo, command_hash, action.as_str(), now],
467 )?;
468 Ok(())
469 }
470
471 pub fn memory_lookup(
473 &self,
474 repo: &str,
475 command_hash: &str,
476 ) -> Result<Option<crate::types::Decision>, LogError> {
477 use crate::types::Decision;
478 let action: Option<String> = self
479 .conn
480 .query_row(
481 "SELECT action FROM memory WHERE repo = ?1 AND command_hash = ?2",
482 rusqlite::params![repo, command_hash],
483 |row| row.get(0),
484 )
485 .optional()?;
486 Ok(match action.as_deref() {
487 Some("allow") => Some(Decision::Allow),
488 Some("deny") => Some(Decision::Deny),
489 _ => None,
490 })
491 }
492
493 #[allow(clippy::too_many_arguments)]
498 fn compute_hash(
499 prev_hash: &str,
500 id: &Uuid,
501 ts_rfc3339: &str,
502 agent: &str,
503 cwd: &str,
504 command: &str,
505 argv_json: &str,
506 class: Class,
507 decision: Decision,
508 reason: &str,
509 tier: u8,
510 risk: Option<u8>,
511 summary: Option<&str>,
512 snapshot_id: Option<&str>,
513 ) -> String {
514 let payload = format!(
515 "{prev}\u{1f}{id}\u{1f}{ts}\u{1f}{agent}\u{1f}{cwd}\u{1f}{cmd}\u{1f}{argv}\u{1f}{class}\u{1f}{dec}\u{1f}{reason}\u{1f}{tier}\u{1f}{risk}\u{1f}{summary}\u{1f}{snap}",
516 prev = prev_hash,
517 id = id,
518 ts = ts_rfc3339,
519 agent = agent,
520 cwd = cwd,
521 cmd = command,
522 argv = argv_json,
523 class = class.as_str(),
524 dec = decision.as_str(),
525 reason = reason,
526 tier = tier,
527 risk = risk.map(|r| r.to_string()).unwrap_or_default(),
528 summary = summary.unwrap_or_default(),
529 snap = snapshot_id.unwrap_or_default(),
530 );
531 let mut hasher = Sha256::new();
532 hasher.update(payload.as_bytes());
533 hex::encode(hasher.finalize())
534 }
535
536 fn head_hash(&self) -> Result<String, LogError> {
538 let hash: Option<String> = self
539 .conn
540 .query_row(
541 "SELECT hash FROM events ORDER BY seq DESC LIMIT 1",
542 [],
543 |row| row.get(0),
544 )
545 .optional()?;
546 Ok(hash.unwrap_or_else(|| GENESIS_HASH.to_string()))
547 }
548
549 #[allow(clippy::too_many_arguments)]
552 #[allow(clippy::too_many_arguments)]
553 fn append_locked(
554 &self,
555 cmd: &ProposedCommand,
556 verdict: &Verdict,
557 ts: &str,
558 cwd: &str,
559 command: &str,
560 argv_json: &str,
561 snapshot_id: Option<&str>,
562 ) -> Result<(String, String, i64), LogError> {
563 let prev_hash = self.head_hash()?;
564 let hash = Self::compute_hash(
565 &prev_hash,
566 &cmd.id,
567 ts,
568 &cmd.agent,
569 cwd,
570 command,
571 argv_json,
572 verdict.class,
573 verdict.decision,
574 &verdict.reason,
575 verdict.tier,
576 verdict.risk,
577 verdict.summary.as_deref(),
578 snapshot_id,
579 );
580 self.conn.execute(
581 r#"
582 INSERT INTO events
583 (id, ts, agent, cwd, command, argv, class, decision, reason, tier, risk, summary, snapshot_id, prev_hash, hash, session)
584 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
585 "#,
586 rusqlite::params![
587 cmd.id.to_string(),
588 ts,
589 cmd.agent,
590 cwd,
591 command,
592 argv_json,
593 verdict.class.as_str(),
594 verdict.decision.as_str(),
595 verdict.reason,
596 verdict.tier as i64,
597 verdict.risk.map(|r| r as i64),
598 verdict.summary,
599 snapshot_id,
600 prev_hash,
601 hash,
602 cmd.session,
603 ],
604 )?;
605 Ok((prev_hash, hash, self.conn.last_insert_rowid()))
606 }
607
608 pub fn log_event(
610 &self,
611 cmd: &ProposedCommand,
612 verdict: &Verdict,
613 snapshot_id: Option<&str>,
614 ) -> Result<LoggedEvent, LogError> {
615 let ts = cmd.ts.format(&Rfc3339)?;
616 let cwd = cmd.cwd.to_string_lossy().to_string();
617
618 let red = crate::redact::redact_command(&cmd.raw);
626 let (command, argv): (String, Vec<String>) = if red.any() {
627 (red.text.clone(), crate::shell::split(&red.text))
628 } else {
629 (cmd.raw.clone(), cmd.argv.clone())
630 };
631 let argv_json = serde_json::to_string(&argv)
632 .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
633
634 self.conn.execute_batch("BEGIN IMMEDIATE")?;
638 let (prev_hash, hash, seq) =
639 match self.append_locked(cmd, verdict, &ts, &cwd, &command, &argv_json, snapshot_id) {
640 Ok(v) => {
641 self.conn.execute_batch("COMMIT")?;
642 v
643 }
644 Err(e) => {
645 let _ = self.conn.execute_batch("ROLLBACK");
646 return Err(e);
647 }
648 };
649
650 Ok(LoggedEvent {
651 seq,
652 id: cmd.id,
653 ts: cmd.ts,
654 agent: cmd.agent.clone(),
655 cwd,
656 command,
657 argv,
658 class: verdict.class,
659 decision: verdict.decision,
660 reason: verdict.reason.clone(),
661 tier: verdict.tier,
662 risk: verdict.risk,
663 summary: verdict.summary.clone(),
664 snapshot_id: snapshot_id.map(str::to_string),
665 session: cmd.session.clone(),
666 prev_hash,
667 hash,
668 redacted: false,
669 })
670 }
671
672 pub fn tail(&self, n: usize) -> Result<Vec<LoggedEvent>, LogError> {
674 self.query(&Filter {
675 limit: Some(n),
676 ..Filter::default()
677 })
678 }
679
680 pub fn query(&self, filter: &Filter) -> Result<Vec<LoggedEvent>, LogError> {
683 let (where_body, params) = filter.where_clause();
684 let limit = filter.limit.map(|n| n as i64).unwrap_or(-1);
685 let offset = filter.offset.map(|n| n as i64).unwrap_or(0);
686 let sql = format!(
690 r#"
691 SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
692 risk, summary, snapshot_id, prev_hash, hash, session, redacted
693 FROM (
694 SELECT events.*, (r.event_id IS NOT NULL) AS redacted
695 FROM events LEFT JOIN redactions r ON r.event_id = events.id
696 WHERE {where_body}
697 ORDER BY events.seq DESC LIMIT ? OFFSET ?
698 ) ORDER BY seq ASC
699 "#
700 );
701 let mut stmt = self.conn.prepare(&sql)?;
702 let mut bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
703 bound.push(&limit);
704 bound.push(&offset);
705 let rows = stmt.query_map(bound.as_slice(), Self::row_to_event)?;
706 let mut out = Vec::new();
707 for r in rows {
708 out.push(r??);
709 }
710 Ok(out)
711 }
712
713 pub fn count_matching(&self, filter: &Filter) -> Result<i64, LogError> {
715 let (where_body, params) = filter.where_clause();
716 let sql = format!(
717 "SELECT COUNT(*) FROM events LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body}"
718 );
719 let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
720 Ok(self
721 .conn
722 .query_row(&sql, bound.as_slice(), |row| row.get(0))?)
723 }
724
725 pub fn redact(&self, event_id: &str, reason: &str) -> Result<bool, LogError> {
728 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
729 let exists: bool = self
730 .conn
731 .prepare("SELECT 1 FROM events WHERE id = ?1")?
732 .exists([event_id])?;
733 if !exists {
734 return Ok(false);
735 }
736 let n = self.conn.execute(
737 "INSERT INTO redactions (event_id, ts, reason) VALUES (?1, ?2, ?3)
738 ON CONFLICT(event_id) DO NOTHING",
739 rusqlite::params![event_id, now, reason],
740 )?;
741 Ok(n > 0)
742 }
743
744 pub fn redact_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
747 let f = Filter {
749 include_redacted: false,
750 limit: None,
751 ..filter.clone()
752 };
753 let (where_body, params) = f.where_clause();
754 let now = OffsetDateTime::now_utc().format(&Rfc3339)?;
755 let sql = format!(
756 "INSERT INTO redactions (event_id, ts, reason)
757 SELECT events.id, ?, ? FROM events
758 LEFT JOIN redactions r ON r.event_id = events.id
759 WHERE {where_body}"
760 );
761 let mut bound: Vec<&dyn rusqlite::ToSql> = vec![&now, &reason];
762 let pbound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
763 bound.extend(pbound);
764 Ok(self.conn.execute(&sql, bound.as_slice())?)
765 }
766
767 pub fn purge_matching(&self, filter: &Filter, reason: &str) -> Result<usize, LogError> {
776 let f = Filter {
777 include_redacted: true,
778 limit: None,
779 ..filter.clone()
780 };
781 let (where_body, params) = f.where_clause();
782
783 self.conn.execute_batch("BEGIN IMMEDIATE")?;
784 let removed = (|| -> Result<usize, LogError> {
785 let del_red = format!(
787 "DELETE FROM redactions WHERE event_id IN (
788 SELECT events.id FROM events
789 LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
790 );
791 let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
792 self.conn.execute(&del_red, bound.as_slice())?;
793
794 let del = format!(
795 "DELETE FROM events WHERE id IN (
796 SELECT events.id FROM events
797 LEFT JOIN redactions r ON r.event_id = events.id WHERE {where_body})"
798 );
799 let bound: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
800 let n = self.conn.execute(&del, bound.as_slice())?;
801 self.rechain()?;
802 Ok(n)
803 })();
804 let removed = match removed {
805 Ok(n) => {
806 self.conn.execute_batch("COMMIT")?;
807 n
808 }
809 Err(e) => {
810 let _ = self.conn.execute_batch("ROLLBACK");
811 return Err(e);
812 }
813 };
814
815 if removed > 0 {
818 let marker = ProposedCommand::new(
819 "kintsugi",
820 std::path::PathBuf::from("."),
821 vec!["purge".into()],
822 format!("kintsugi purge --hard ({removed} event(s): {reason})"),
823 );
824 let verdict = Verdict::rules(Class::Safe, Decision::Allow, "audit:purge");
825 self.log_event(&marker, &verdict, None)?;
826 }
827 Ok(removed)
828 }
829
830 fn rechain(&self) -> Result<(), LogError> {
833 let mut stmt = self.conn.prepare(
834 r#"
835 SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
836 risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
837 FROM events ORDER BY seq ASC
838 "#,
839 )?;
840 let mut events: Vec<LoggedEvent> = Vec::new();
841 for r in stmt.query_map([], Self::row_to_event)? {
842 events.push(r??);
843 }
844 drop(stmt);
845
846 let mut prev = GENESIS_HASH.to_string();
847 for ev in events {
848 let ts = ev.ts.format(&Rfc3339)?;
849 let argv_json = serde_json::to_string(&ev.argv)
850 .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
851 let hash = Self::compute_hash(
852 &prev,
853 &ev.id,
854 &ts,
855 &ev.agent,
856 &ev.cwd,
857 &ev.command,
858 &argv_json,
859 ev.class,
860 ev.decision,
861 &ev.reason,
862 ev.tier,
863 ev.risk,
864 ev.summary.as_deref(),
865 ev.snapshot_id.as_deref(),
866 );
867 self.conn.execute(
868 "UPDATE events SET prev_hash = ?1, hash = ?2 WHERE seq = ?3",
869 rusqlite::params![prev, hash, ev.seq],
870 )?;
871 prev = hash;
872 }
873 Ok(())
874 }
875
876 pub fn count(&self) -> Result<i64, LogError> {
878 Ok(self
879 .conn
880 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?)
881 }
882
883 pub fn verify_chain(&self) -> Result<ChainStatus, LogError> {
888 let mut stmt = self.conn.prepare(
889 r#"
890 SELECT seq, id, ts, agent, cwd, command, argv, class, decision, reason, tier,
891 risk, summary, snapshot_id, prev_hash, hash, session, 0 AS redacted
892 FROM events ORDER BY seq ASC
893 "#,
894 )?;
895 let rows = stmt.query_map([], Self::row_to_event)?;
896
897 let mut expected_prev = GENESIS_HASH.to_string();
898 for r in rows {
899 let ev = r??;
900
901 if ev.prev_hash != expected_prev {
902 return Ok(ChainStatus::Broken {
903 seq: ev.seq,
904 detail: format!(
905 "prev_hash {} does not link to predecessor {}",
906 short(&ev.prev_hash),
907 short(&expected_prev)
908 ),
909 });
910 }
911
912 let ts = ev.ts.format(&Rfc3339)?;
913 let argv_json = serde_json::to_string(&ev.argv)
914 .map_err(|e| LogError::Corrupt(format!("argv serialize: {e}")))?;
915 let recomputed = Self::compute_hash(
916 &ev.prev_hash,
917 &ev.id,
918 &ts,
919 &ev.agent,
920 &ev.cwd,
921 &ev.command,
922 &argv_json,
923 ev.class,
924 ev.decision,
925 &ev.reason,
926 ev.tier,
927 ev.risk,
928 ev.summary.as_deref(),
929 ev.snapshot_id.as_deref(),
930 );
931 if recomputed != ev.hash {
932 return Ok(ChainStatus::Broken {
933 seq: ev.seq,
934 detail: format!(
935 "row contents do not match stored hash {} (recomputed {})",
936 short(&ev.hash),
937 short(&recomputed)
938 ),
939 });
940 }
941
942 expected_prev = ev.hash;
943 }
944
945 Ok(ChainStatus::Intact)
946 }
947
948 fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Result<LoggedEvent, LogError>> {
949 let seq: i64 = row.get(0)?;
951 let id_s: String = row.get(1)?;
952 let ts_s: String = row.get(2)?;
953 let agent: String = row.get(3)?;
954 let cwd: String = row.get(4)?;
955 let command: String = row.get(5)?;
956 let argv_s: String = row.get(6)?;
957 let class_s: String = row.get(7)?;
958 let decision_s: String = row.get(8)?;
959 let reason: String = row.get(9)?;
960 let tier: i64 = row.get(10)?;
961 let risk: Option<i64> = row.get(11)?;
962 let summary: Option<String> = row.get(12)?;
963 let snapshot_id: Option<String> = row.get(13)?;
964 let prev_hash: String = row.get(14)?;
965 let hash: String = row.get(15)?;
966 let session: Option<String> = row.get(16)?;
967 let redacted: bool = row.get(17)?;
968
969 Ok((|| {
970 let id = Uuid::parse_str(&id_s)
971 .map_err(|e| LogError::Corrupt(format!("uuid {id_s}: {e}")))?;
972 let ts = OffsetDateTime::parse(&ts_s, &Rfc3339)?;
973 let argv: Vec<String> = serde_json::from_str(&argv_s)
974 .map_err(|e| LogError::Corrupt(format!("argv {argv_s}: {e}")))?;
975 let class = parse_class(&class_s)?;
976 let decision = parse_decision(&decision_s)?;
977 let tier = u8::try_from(tier)
978 .map_err(|_| LogError::Corrupt(format!("tier out of range: {tier}")))?;
979 let risk = match risk {
980 Some(r) => Some(
981 u8::try_from(r)
982 .map_err(|_| LogError::Corrupt(format!("risk out of range: {r}")))?,
983 ),
984 None => None,
985 };
986 Ok(LoggedEvent {
987 seq,
988 id,
989 ts,
990 agent,
991 cwd,
992 command,
993 argv,
994 class,
995 decision,
996 reason,
997 tier,
998 risk,
999 summary,
1000 snapshot_id,
1001 session,
1002 prev_hash,
1003 hash,
1004 redacted,
1005 })
1006 })())
1007 }
1008}
1009
1010fn parse_class(s: &str) -> Result<Class, LogError> {
1011 match s {
1012 "safe" => Ok(Class::Safe),
1013 "catastrophic" => Ok(Class::Catastrophic),
1014 "ambiguous" => Ok(Class::Ambiguous),
1015 other => Err(LogError::Corrupt(format!("unknown class: {other}"))),
1016 }
1017}
1018
1019fn parse_decision(s: &str) -> Result<Decision, LogError> {
1020 match s {
1021 "allow" => Ok(Decision::Allow),
1022 "deny" => Ok(Decision::Deny),
1023 "hold" => Ok(Decision::Hold),
1024 other => Err(LogError::Corrupt(format!("unknown decision: {other}"))),
1025 }
1026}
1027
/// Returns at most the first twelve characters of `hash`, for compact display
/// in diagnostics.
fn short(hash: &str) -> String {
    // Byte offset where the 13th character starts, or the whole string if shorter.
    let end = hash
        .char_indices()
        .nth(12)
        .map_or(hash.len(), |(idx, _)| idx);
    hash[..end].to_string()
}