codetether_agent/session/journal.rs
1//! Validated-writeback journal (ClawVM §3).
2//!
3//! Every lifecycle transition that touches durable state — compaction,
4//! save, reset — goes through a three-phase transaction:
5//!
6//! 1. **Staging.** The caller proposes a typed [`Op`]; the journal
7//! reserves a [`TxnId`] and records the staged entry.
8//! 2. **Validation.** The caller checks schema, provenance, scope,
9//! non-destructive semantics, and page-invariant compliance. The
10//! outcome is recorded.
11//! 3. **Commit.** A validated transaction commits; a rejected one
12//! stays in the journal with its reason code and does *not* mutate
13//! the owning state.
14//!
15//! Rejections are not errors — they are load-bearing evidence that the
16//! enforcement layer caught a would-be regression. ClawVM's LRU
17//! ablation shows fault elimination comes from this structural
18//! contract, not from clever selection heuristics.
19//!
20//! ## Format
21//!
22//! The journal is an append-only JSONL file named
23//! `<session-id>.journal.jsonl` co-located with the session JSON. Each
24//! line is a [`JournalEntry`] — cheap to tail, safe under concurrent
25//! append, and inspectable by hand during debugging.
26//!
27//! ## Phase A scope
28//!
29//! This module delivers the type layer plus the in-memory transaction
30//! state machine. The *consumers* (compaction writeback, page-invariant
31//! validation hooks) come online in Phase B alongside
32//! `DerivePolicy::Incremental`.
33//!
34//! ## Examples
35//!
36//! ```rust
37//! use codetether_agent::session::journal::{JournalEntry, Op, RejectReason, WritebackJournal};
38//!
39//! let mut journal = WritebackJournal::new("session-42");
40//! let txn = journal.stage(Op::Compaction {
41//! before: 120,
42//! after: 24,
43//! });
44//! journal
45//! .commit(txn)
46//! .expect("no pending validation => commit is allowed");
47//! assert_eq!(journal.entries().len(), 2);
48//! assert!(matches!(journal.entries()[1], JournalEntry::Committed { .. }));
49//! ```
50
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::HashMap;
54
55/// Opaque handle returned from [`WritebackJournal::stage`] and required
56/// by [`WritebackJournal::validate`] / [`WritebackJournal::commit`].
57///
58/// Monotonic per journal instance. Crashing between stage and commit
59/// leaves a `Staged` entry in the journal — the next load can reconcile
60/// or abandon it.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
62pub struct TxnId(pub u64);
63
64/// Typed operation being journalled.
65///
66/// Extend by adding variants rather than free-form JSON so each new
67/// call site declares what it is doing.
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(tag = "op", rename_all = "snake_case")]
70pub enum Op {
71 /// Context-window enforcement rewrote the derived buffer.
72 Compaction { before: usize, after: usize },
73 /// `Session::save` persisted the transcript to disk.
74 Save,
75 /// TUI or orchestrator reset the session (e.g., `/clear`).
76 Reset,
77 /// Lu et al. reset-to-(prompt, summary) context reset (Phase B).
78 ContextReset,
79 /// MinIO history sink uploaded a delta.
80 HistorySinkUpload {
81 bucket: String,
82 key: String,
83 bytes: usize,
84 },
85}
86
87/// Reason codes for a rejected writeback.
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(tag = "reason", rename_all = "snake_case")]
90pub enum RejectReason {
91 /// A destructive overwrite was requested without an explicit
92 /// `allow_destructive` flag.
93 DestructiveOp,
94 /// Schema validation failed.
95 SchemaMismatch { detail: String },
96 /// Provenance / authorship check failed.
97 ProvenanceFail { detail: String },
98 /// The op would drop a page below its `min_fidelity` invariant.
99 InvariantViolation { detail: String },
100}
101
102/// A single line in the journal.
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104#[serde(tag = "phase", rename_all = "snake_case")]
105pub enum JournalEntry {
106 /// An op was proposed and reserved a transaction id.
107 Staged {
108 txn: TxnId,
109 at: DateTime<Utc>,
110 op: Op,
111 },
112 /// Validation failed for a staged op. The transaction does not
113 /// commit and the state does not mutate.
114 Rejected {
115 txn: TxnId,
116 at: DateTime<Utc>,
117 reason: RejectReason,
118 },
119 /// A staged op passed validation (or skipped it) and committed.
120 Committed { txn: TxnId, at: DateTime<Utc> },
121}
122
123/// In-memory, per-session writeback journal.
124///
125/// Writes go into [`Self::entries`]; a future `flush_to_disk(path)`
126/// helper will stream them to `<session-id>.journal.jsonl`. Keeping
127/// disk I/O out of this type makes every phase fast to unit-test.
128#[derive(Debug, Clone)]
129pub struct WritebackJournal {
130 session_id: String,
131 next_id: u64,
132 pending: HashMap<TxnId, Op>,
133 entries: Vec<JournalEntry>,
134}
135
136impl WritebackJournal {
137 /// Create a fresh journal for `session_id`.
138 pub fn new(session_id: impl Into<String>) -> Self {
139 Self {
140 session_id: session_id.into(),
141 next_id: 0,
142 pending: HashMap::new(),
143 entries: Vec::new(),
144 }
145 }
146
147 /// The session this journal belongs to.
148 pub fn session_id(&self) -> &str {
149 &self.session_id
150 }
151
152 /// Every entry in the order it was appended.
153 pub fn entries(&self) -> &[JournalEntry] {
154 &self.entries
155 }
156
157 /// Stage an op and return its [`TxnId`]. Writes a `Staged` entry.
158 ///
159 /// # Examples
160 ///
161 /// ```rust
162 /// use codetether_agent::session::journal::{Op, WritebackJournal};
163 ///
164 /// let mut journal = WritebackJournal::new("s");
165 /// let txn = journal.stage(Op::Save);
166 /// assert_eq!(journal.entries().len(), 1);
167 /// journal.commit(txn).unwrap();
168 /// assert_eq!(journal.entries().len(), 2);
169 /// ```
170 pub fn stage(&mut self, op: Op) -> TxnId {
171 let txn = TxnId(self.next_id);
172 self.next_id += 1;
173 self.pending.insert(txn, op.clone());
174 self.entries.push(JournalEntry::Staged {
175 txn,
176 at: Utc::now(),
177 op,
178 });
179 txn
180 }
181
182 /// Mark a staged transaction as rejected with `reason`. The
183 /// transaction is dropped from the pending set — a subsequent
184 /// [`Self::commit`] on the same id returns `Err(RejectReason)`.
185 pub fn reject(&mut self, txn: TxnId, reason: RejectReason) {
186 self.pending.remove(&txn);
187 self.entries.push(JournalEntry::Rejected {
188 txn,
189 at: Utc::now(),
190 reason,
191 });
192 }
193
194 /// Commit a staged transaction that passed validation. Returns
195 /// `Err(RejectReason::SchemaMismatch)` when the transaction is
196 /// unknown (never staged, or already rejected / committed).
197 ///
198 /// # Errors
199 ///
200 /// See above — returns `Err` when the `TxnId` is not pending.
201 pub fn commit(&mut self, txn: TxnId) -> Result<(), RejectReason> {
202 if self.pending.remove(&txn).is_none() {
203 return Err(RejectReason::SchemaMismatch {
204 detail: format!("txn {:?} is not pending", txn),
205 });
206 }
207 self.entries.push(JournalEntry::Committed {
208 txn,
209 at: Utc::now(),
210 });
211 Ok(())
212 }
213
214 /// Number of currently staged (neither committed nor rejected)
215 /// transactions.
216 pub fn pending_count(&self) -> usize {
217 self.pending.len()
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Stage followed by commit drains the pending set and leaves a
    /// `Staged` + `Committed` pair in the journal.
    #[test]
    fn stage_then_commit_appends_two_entries() {
        let mut journal = WritebackJournal::new("s1");
        let op = Op::Compaction {
            before: 10,
            after: 3,
        };

        let txn = journal.stage(op);
        assert_eq!(journal.pending_count(), 1);

        journal.commit(txn).unwrap();
        assert_eq!(journal.pending_count(), 0);

        // The slice pattern pins both the length (exactly two entries)
        // and the order of the phases.
        assert!(matches!(
            journal.entries(),
            [JournalEntry::Staged { .. }, JournalEntry::Committed { .. }]
        ));
    }

    /// A rejected transaction leaves the pending set, is recorded as
    /// `Rejected`, and can no longer be committed.
    #[test]
    fn rejected_writeback_does_not_commit() {
        let mut journal = WritebackJournal::new("s1");
        let txn = journal.stage(Op::Reset);
        journal.reject(txn, RejectReason::DestructiveOp);

        assert_eq!(journal.pending_count(), 0);
        // Exactly two entries, the second of which is the rejection.
        assert!(matches!(
            journal.entries(),
            [_, JournalEntry::Rejected { .. }]
        ));

        // Commit on a rejected txn is an error.
        let outcome = journal.commit(txn);
        assert!(matches!(
            outcome,
            Err(RejectReason::SchemaMismatch { .. })
        ));
    }

    /// Ids handed out by successive `stage` calls strictly increase.
    #[test]
    fn txn_ids_are_monotonic() {
        let mut journal = WritebackJournal::new("s1");
        let TxnId(first) = journal.stage(Op::Save);
        let TxnId(second) = journal.stage(Op::Save);
        assert!(first < second);
    }

    /// A journal entry survives a JSON serialize / deserialize cycle
    /// unchanged.
    #[test]
    fn journal_entry_round_trips_through_serde() {
        let reason = RejectReason::InvariantViolation {
            detail: "constraint below structured".into(),
        };
        let original = JournalEntry::Rejected {
            txn: TxnId(7),
            at: Utc::now(),
            reason,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: JournalEntry = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}