Skip to main content

codetether_agent/session/
journal.rs

1//! Validated-writeback journal (ClawVM §3).
2//!
3//! Every lifecycle transition that touches durable state — compaction,
4//! save, reset — goes through a three-phase transaction:
5//!
6//! 1. **Staging.** The caller proposes a typed [`Op`]; the journal
7//!    reserves a [`TxnId`] and records the staged entry.
8//! 2. **Validation.** The caller checks schema, provenance, scope,
9//!    non-destructive semantics, and page-invariant compliance. The
10//!    outcome is recorded.
11//! 3. **Commit.** A validated transaction commits; a rejected one
12//!    stays in the journal with its reason code and does *not* mutate
13//!    the owning state.
14//!
15//! Rejections are not errors — they are load-bearing evidence that the
16//! enforcement layer caught a would-be regression. ClawVM's LRU
17//! ablation shows fault elimination comes from this structural
18//! contract, not from clever selection heuristics.
19//!
20//! ## Format
21//!
22//! The journal is an append-only JSONL file named
23//! `<session-id>.journal.jsonl` co-located with the session JSON. Each
24//! line is a [`JournalEntry`] — cheap to tail, safe under concurrent
25//! append, and inspectable by hand during debugging.
26//!
27//! ## Phase A scope
28//!
29//! This module delivers the type layer plus the in-memory transaction
30//! state machine. The *consumers* (compaction writeback, page-invariant
31//! validation hooks) come online in Phase B alongside
32//! `DerivePolicy::Incremental`.
33//!
34//! ## Examples
35//!
36//! ```rust
37//! use codetether_agent::session::journal::{JournalEntry, Op, RejectReason, WritebackJournal};
38//!
39//! let mut journal = WritebackJournal::new("session-42");
40//! let txn = journal.stage(Op::Compaction {
41//!     before: 120,
42//!     after: 24,
43//! });
44//! journal
45//!     .commit(txn)
46//!     .expect("no pending validation => commit is allowed");
47//! assert_eq!(journal.entries().len(), 2);
48//! assert!(matches!(journal.entries()[1], JournalEntry::Committed { .. }));
49//! ```
50
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::HashMap;
54
55/// Opaque handle returned from [`WritebackJournal::stage`] and required
56/// by [`WritebackJournal::validate`] / [`WritebackJournal::commit`].
57///
58/// Monotonic per journal instance. Crashing between stage and commit
59/// leaves a `Staged` entry in the journal — the next load can reconcile
60/// or abandon it.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
62pub struct TxnId(pub u64);
63
64/// Typed operation being journalled.
65///
66/// Extend by adding variants rather than free-form JSON so each new
67/// call site declares what it is doing.
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(tag = "op", rename_all = "snake_case")]
70pub enum Op {
71    /// Context-window enforcement rewrote the derived buffer.
72    Compaction { before: usize, after: usize },
73    /// `Session::save` persisted the transcript to disk.
74    Save,
75    /// TUI or orchestrator reset the session (e.g., `/clear`).
76    Reset,
77    /// Lu et al. reset-to-(prompt, summary) context reset (Phase B).
78    ContextReset,
79    /// MinIO history sink uploaded a delta.
80    HistorySinkUpload {
81        bucket: String,
82        key: String,
83        bytes: usize,
84    },
85}
86
87/// Reason codes for a rejected writeback.
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(tag = "reason", rename_all = "snake_case")]
90pub enum RejectReason {
91    /// A destructive overwrite was requested without an explicit
92    /// `allow_destructive` flag.
93    DestructiveOp,
94    /// Schema validation failed.
95    SchemaMismatch { detail: String },
96    /// Provenance / authorship check failed.
97    ProvenanceFail { detail: String },
98    /// The op would drop a page below its `min_fidelity` invariant.
99    InvariantViolation { detail: String },
100}
101
102/// A single line in the journal.
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104#[serde(tag = "phase", rename_all = "snake_case")]
105pub enum JournalEntry {
106    /// An op was proposed and reserved a transaction id.
107    Staged {
108        txn: TxnId,
109        at: DateTime<Utc>,
110        op: Op,
111    },
112    /// Validation failed for a staged op. The transaction does not
113    /// commit and the state does not mutate.
114    Rejected {
115        txn: TxnId,
116        at: DateTime<Utc>,
117        reason: RejectReason,
118    },
119    /// A staged op passed validation (or skipped it) and committed.
120    Committed { txn: TxnId, at: DateTime<Utc> },
121}
122
123/// In-memory, per-session writeback journal.
124///
125/// Writes go into [`Self::entries`]; a future `flush_to_disk(path)`
126/// helper will stream them to `<session-id>.journal.jsonl`. Keeping
127/// disk I/O out of this type makes every phase fast to unit-test.
128#[derive(Debug, Clone)]
129pub struct WritebackJournal {
130    session_id: String,
131    next_id: u64,
132    pending: HashMap<TxnId, Op>,
133    entries: Vec<JournalEntry>,
134}
135
136impl WritebackJournal {
137    /// Create a fresh journal for `session_id`.
138    pub fn new(session_id: impl Into<String>) -> Self {
139        Self {
140            session_id: session_id.into(),
141            next_id: 0,
142            pending: HashMap::new(),
143            entries: Vec::new(),
144        }
145    }
146
147    /// The session this journal belongs to.
148    pub fn session_id(&self) -> &str {
149        &self.session_id
150    }
151
152    /// Every entry in the order it was appended.
153    pub fn entries(&self) -> &[JournalEntry] {
154        &self.entries
155    }
156
157    /// Stage an op and return its [`TxnId`]. Writes a `Staged` entry.
158    ///
159    /// # Examples
160    ///
161    /// ```rust
162    /// use codetether_agent::session::journal::{Op, WritebackJournal};
163    ///
164    /// let mut journal = WritebackJournal::new("s");
165    /// let txn = journal.stage(Op::Save);
166    /// assert_eq!(journal.entries().len(), 1);
167    /// journal.commit(txn).unwrap();
168    /// assert_eq!(journal.entries().len(), 2);
169    /// ```
170    pub fn stage(&mut self, op: Op) -> TxnId {
171        let txn = TxnId(self.next_id);
172        self.next_id += 1;
173        self.pending.insert(txn, op.clone());
174        self.entries.push(JournalEntry::Staged {
175            txn,
176            at: Utc::now(),
177            op,
178        });
179        txn
180    }
181
182    /// Mark a staged transaction as rejected with `reason`. The
183    /// transaction is dropped from the pending set — a subsequent
184    /// [`Self::commit`] on the same id returns `Err(RejectReason)`.
185    pub fn reject(&mut self, txn: TxnId, reason: RejectReason) {
186        self.pending.remove(&txn);
187        self.entries.push(JournalEntry::Rejected {
188            txn,
189            at: Utc::now(),
190            reason,
191        });
192    }
193
194    /// Commit a staged transaction that passed validation. Returns
195    /// `Err(RejectReason::SchemaMismatch)` when the transaction is
196    /// unknown (never staged, or already rejected / committed).
197    ///
198    /// # Errors
199    ///
200    /// See above — returns `Err` when the `TxnId` is not pending.
201    pub fn commit(&mut self, txn: TxnId) -> Result<(), RejectReason> {
202        if self.pending.remove(&txn).is_none() {
203            return Err(RejectReason::SchemaMismatch {
204                detail: format!("txn {:?} is not pending", txn),
205            });
206        }
207        self.entries.push(JournalEntry::Committed {
208            txn,
209            at: Utc::now(),
210        });
211        Ok(())
212    }
213
214    /// Number of currently staged (neither committed nor rejected)
215    /// transactions.
216    pub fn pending_count(&self) -> usize {
217        self.pending.len()
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224
225    #[test]
226    fn stage_then_commit_appends_two_entries() {
227        let mut journal = WritebackJournal::new("s1");
228        let txn = journal.stage(Op::Compaction {
229            before: 10,
230            after: 3,
231        });
232        assert_eq!(journal.pending_count(), 1);
233        journal.commit(txn).unwrap();
234        assert_eq!(journal.pending_count(), 0);
235
236        let entries = journal.entries();
237        assert_eq!(entries.len(), 2);
238        assert!(matches!(entries[0], JournalEntry::Staged { .. }));
239        assert!(matches!(entries[1], JournalEntry::Committed { .. }));
240    }
241
242    #[test]
243    fn rejected_writeback_does_not_commit() {
244        let mut journal = WritebackJournal::new("s1");
245        let txn = journal.stage(Op::Reset);
246        journal.reject(txn, RejectReason::DestructiveOp);
247
248        assert_eq!(journal.pending_count(), 0);
249        let entries = journal.entries();
250        assert_eq!(entries.len(), 2);
251        assert!(matches!(entries[1], JournalEntry::Rejected { .. }));
252
253        // Commit on a rejected txn is an error.
254        let err = journal.commit(txn).unwrap_err();
255        assert!(matches!(err, RejectReason::SchemaMismatch { .. }));
256    }
257
258    #[test]
259    fn txn_ids_are_monotonic() {
260        let mut journal = WritebackJournal::new("s1");
261        let a = journal.stage(Op::Save);
262        let b = journal.stage(Op::Save);
263        assert!(b.0 > a.0);
264    }
265
266    #[test]
267    fn journal_entry_round_trips_through_serde() {
268        let entry = JournalEntry::Rejected {
269            txn: TxnId(7),
270            at: Utc::now(),
271            reason: RejectReason::InvariantViolation {
272                detail: "constraint below structured".into(),
273            },
274        };
275        let json = serde_json::to_string(&entry).unwrap();
276        let back: JournalEntry = serde_json::from_str(&json).unwrap();
277        assert_eq!(entry, back);
278    }
279}