codetether_agent/session/journal.rs
1//! Validated-writeback journal (ClawVM §3).
2//!
3//! Every lifecycle transition that touches durable state — compaction,
4//! save, reset — goes through a three-phase transaction:
5//!
6//! 1. **Staging.** The caller proposes a typed [`Op`]; the journal
7//! reserves a [`TxnId`] and records the staged entry.
//! 2. **Validation.** The caller checks schema, provenance, scope,
//!    non-destructive semantics, and page-invariant compliance. A
//!    failed check is recorded as a `Rejected` entry carrying its
//!    reason code; a passing check needs no entry of its own.
11//! 3. **Commit.** A validated transaction commits; a rejected one
12//! stays in the journal with its reason code and does *not* mutate
13//! the owning state.
14//!
15//! Rejections are not errors — they are load-bearing evidence that the
16//! enforcement layer caught a would-be regression. ClawVM's LRU
17//! ablation shows fault elimination comes from this structural
18//! contract, not from clever selection heuristics.
19//!
20//! ## Format
21//!
22//! The journal is an append-only JSONL file named
23//! `<session-id>.journal.jsonl` co-located with the session JSON. Each
24//! line is a [`JournalEntry`] — cheap to tail, safe under concurrent
25//! append, and inspectable by hand during debugging.
26//!
27//! ## Phase A scope
28//!
29//! This module delivers the type layer plus the in-memory transaction
30//! state machine. The *consumers* (compaction writeback, page-invariant
31//! validation hooks) come online in Phase B alongside
32//! `DerivePolicy::Incremental`.
33//!
34//! ## Examples
35//!
36//! ```rust
37//! use codetether_agent::session::journal::{JournalEntry, Op, RejectReason, WritebackJournal};
38//!
39//! let mut journal = WritebackJournal::new("session-42");
40//! let txn = journal.stage(Op::Compaction {
41//! before: 120,
42//! after: 24,
43//! });
44//! journal
45//! .commit(txn)
46//! .expect("no pending validation => commit is allowed");
47//! assert_eq!(journal.entries().len(), 2);
48//! assert!(matches!(journal.entries()[1], JournalEntry::Committed { .. }));
49//! ```
50
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::HashMap;
54use tokio::io::AsyncWriteExt;
55
/// Opaque handle returned from [`WritebackJournal::stage`] and required
/// by [`WritebackJournal::reject`] / [`WritebackJournal::commit`].
///
/// Monotonic per journal instance: a fresh journal hands out `TxnId(0)`
/// first and each later `stage` call returns the next integer. Crashing
/// between stage and commit leaves a `Staged` entry in the journal —
/// the next load can reconcile or abandon it.
///
/// Ids are only unique within one journal; two journals both start
/// counting from zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TxnId(pub u64);
64
/// Typed operation being journalled.
///
/// Extend by adding variants rather than free-form JSON so each new
/// call site declares what it is doing.
///
/// The serde attributes store the variant name under an `op` key,
/// spelled in snake_case (for example `compaction` or
/// `history_sink_upload`), next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum Op {
    /// Context-window enforcement rewrote the derived buffer.
    ///
    /// `before` and `after` describe the buffer on either side of the
    /// rewrite. NOTE(review): the unit (messages, tokens, bytes) is not
    /// stated in this file — confirm against the compaction call site.
    Compaction { before: usize, after: usize },
    /// `Session::save` persisted the transcript to disk.
    Save,
    /// TUI or orchestrator reset the session (e.g., `/clear`).
    Reset,
    /// Lu et al. reset-to-(prompt, summary) context reset (Phase B).
    ContextReset,
    /// MinIO history sink uploaded a delta.
    HistorySinkUpload {
        /// Bucket the delta was uploaded to (presumably the MinIO
        /// bucket name — confirm against the sink).
        bucket: String,
        /// Object key of the uploaded delta within `bucket`.
        key: String,
        /// Presumably the size of the uploaded delta in bytes — TODO
        /// confirm against the sink.
        bytes: usize,
    },
}
87
/// Reason codes for a rejected writeback.
///
/// Stored inside [`JournalEntry::Rejected`] and also used as the error
/// type of [`WritebackJournal::commit`].
///
/// The serde attributes store the variant name under a `reason` key,
/// spelled in snake_case (for example `destructive_op`), next to the
/// variant's `detail` field when it has one.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "reason", rename_all = "snake_case")]
pub enum RejectReason {
    /// A destructive overwrite was requested without an explicit
    /// `allow_destructive` flag.
    DestructiveOp,
    /// Schema validation failed.
    ///
    /// [`WritebackJournal::commit`] also returns this variant when the
    /// transaction id it is given is not pending; `detail` then names
    /// the offending id.
    SchemaMismatch { detail: String },
    /// Provenance / authorship check failed.
    ProvenanceFail { detail: String },
    /// The op would drop a page below its `min_fidelity` invariant.
    InvariantViolation { detail: String },
}
102
/// A single line in the journal.
///
/// One transaction produces a `Staged` entry followed by either a
/// `Rejected` or a `Committed` entry carrying the same [`TxnId`].
///
/// The serde attributes store the variant name under a `phase` key,
/// spelled in snake_case (`staged`, `rejected`, `committed`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum JournalEntry {
    /// An op was proposed and reserved a transaction id.
    Staged {
        /// Id reserved for this transaction.
        txn: TxnId,
        /// UTC wall-clock time at which the entry was appended.
        at: DateTime<Utc>,
        /// The operation that was proposed.
        op: Op,
    },
    /// Validation failed for a staged op. The transaction does not
    /// commit and the state does not mutate.
    Rejected {
        /// Id of the transaction that was rejected.
        txn: TxnId,
        /// UTC wall-clock time at which the entry was appended.
        at: DateTime<Utc>,
        /// Why the writeback was refused.
        reason: RejectReason,
    },
    /// A staged op passed validation (or skipped it) and committed.
    Committed { txn: TxnId, at: DateTime<Utc> },
}
123
/// In-memory, per-session writeback journal.
///
/// Writes go into [`Self::entries`]; nothing in this type touches the
/// disk. Streaming entries to `<session-id>.journal.jsonl` is the job
/// of the free function [`append_entries`]. Keeping disk I/O out of
/// this type makes every phase fast to unit-test.
#[derive(Debug, Clone)]
pub struct WritebackJournal {
    // Session this journal belongs to; exposed through `session_id()`.
    session_id: String,
    // Id that the next `stage` call will hand out; starts at 0.
    next_id: u64,
    // Transactions that are staged but neither committed nor rejected,
    // keyed by id, with the op that was staged.
    pending: HashMap<TxnId, Op>,
    // Every journal entry in the order it was appended.
    entries: Vec<JournalEntry>,
}
136
137impl WritebackJournal {
138 /// Create a fresh journal for `session_id`.
139 pub fn new(session_id: impl Into<String>) -> Self {
140 Self {
141 session_id: session_id.into(),
142 next_id: 0,
143 pending: HashMap::new(),
144 entries: Vec::new(),
145 }
146 }
147
148 /// The session this journal belongs to.
149 pub fn session_id(&self) -> &str {
150 &self.session_id
151 }
152
153 /// Every entry in the order it was appended.
154 pub fn entries(&self) -> &[JournalEntry] {
155 &self.entries
156 }
157
158 /// Stage an op and return its [`TxnId`]. Writes a `Staged` entry.
159 ///
160 /// # Examples
161 ///
162 /// ```rust
163 /// use codetether_agent::session::journal::{Op, WritebackJournal};
164 ///
165 /// let mut journal = WritebackJournal::new("s");
166 /// let txn = journal.stage(Op::Save);
167 /// assert_eq!(journal.entries().len(), 1);
168 /// journal.commit(txn).unwrap();
169 /// assert_eq!(journal.entries().len(), 2);
170 /// ```
171 pub fn stage(&mut self, op: Op) -> TxnId {
172 let txn = TxnId(self.next_id);
173 self.next_id += 1;
174 self.pending.insert(txn, op.clone());
175 self.entries.push(JournalEntry::Staged {
176 txn,
177 at: Utc::now(),
178 op,
179 });
180 txn
181 }
182
183 /// Mark a staged transaction as rejected with `reason`. The
184 /// transaction is dropped from the pending set — a subsequent
185 /// [`Self::commit`] on the same id returns `Err(RejectReason)`.
186 pub fn reject(&mut self, txn: TxnId, reason: RejectReason) {
187 self.pending.remove(&txn);
188 self.entries.push(JournalEntry::Rejected {
189 txn,
190 at: Utc::now(),
191 reason,
192 });
193 }
194
195 /// Commit a staged transaction that passed validation. Returns
196 /// `Err(RejectReason::SchemaMismatch)` when the transaction is
197 /// unknown (never staged, or already rejected / committed).
198 ///
199 /// # Errors
200 ///
201 /// See above — returns `Err` when the `TxnId` is not pending.
202 pub fn commit(&mut self, txn: TxnId) -> Result<(), RejectReason> {
203 if self.pending.remove(&txn).is_none() {
204 return Err(RejectReason::SchemaMismatch {
205 detail: format!("txn {:?} is not pending", txn),
206 });
207 }
208 self.entries.push(JournalEntry::Committed {
209 txn,
210 at: Utc::now(),
211 });
212 Ok(())
213 }
214
215 /// Number of currently staged (neither committed nor rejected)
216 /// transactions.
217 pub fn pending_count(&self) -> usize {
218 self.pending.len()
219 }
220}
221
222/// Append journal entries to `<session-id>.journal.jsonl`.
223///
224/// Best-effort durability helper used by lifecycle call sites such as
225/// `Session::save`. The journal remains append-only: callers hand us a
226/// pre-built ordered slice and we stream it as JSONL.
227pub async fn append_entries(session_id: &str, entries: &[JournalEntry]) -> anyhow::Result<()> {
228 let path = crate::session::Session::sessions_dir()?.join(format!("{session_id}.journal.jsonl"));
229 if let Some(parent) = path.parent() {
230 tokio::fs::create_dir_all(parent).await?;
231 }
232 let mut file = tokio::fs::OpenOptions::new()
233 .create(true)
234 .append(true)
235 .open(&path)
236 .await?;
237 let mut batch = Vec::new();
238 for entry in entries {
239 let line = serde_json::to_string(entry)?;
240 batch.extend_from_slice(line.as_bytes());
241 batch.push(b'\n');
242 }
243 if !batch.is_empty() {
244 file.write_all(&batch).await?;
245 }
246 file.flush().await?;
247 Ok(())
248}
249
#[cfg(test)]
mod tests {
    use super::*;

    /// Stage + commit yields exactly `Staged`, `Committed` and drains
    /// the pending set.
    #[test]
    fn stage_then_commit_appends_two_entries() {
        let mut log = WritebackJournal::new("s1");
        let op = Op::Compaction {
            before: 10,
            after: 3,
        };

        let id = log.stage(op);
        assert_eq!(log.pending_count(), 1);

        log.commit(id).unwrap();
        assert_eq!(log.pending_count(), 0);

        assert!(matches!(
            log.entries(),
            [JournalEntry::Staged { .. }, JournalEntry::Committed { .. }]
        ));
    }

    /// A rejected transaction leaves the pending set and can no longer
    /// be committed.
    #[test]
    fn rejected_writeback_does_not_commit() {
        let mut log = WritebackJournal::new("s1");
        let id = log.stage(Op::Reset);
        log.reject(id, RejectReason::DestructiveOp);

        assert_eq!(log.pending_count(), 0);
        assert!(matches!(
            log.entries(),
            [_, JournalEntry::Rejected { .. }]
        ));

        // Commit on a rejected txn is an error.
        let outcome = log.commit(id);
        assert!(matches!(
            outcome,
            Err(RejectReason::SchemaMismatch { .. })
        ));
    }

    /// Later `stage` calls receive strictly larger ids.
    #[test]
    fn txn_ids_are_monotonic() {
        let mut log = WritebackJournal::new("s1");
        let TxnId(first) = log.stage(Op::Save);
        let TxnId(second) = log.stage(Op::Save);
        assert!(first < second);
    }

    /// Serializing an entry to JSON and parsing it back is lossless.
    #[test]
    fn journal_entry_round_trips_through_serde() {
        let reason = RejectReason::InvariantViolation {
            detail: "constraint below structured".into(),
        };
        let original = JournalEntry::Rejected {
            txn: TxnId(7),
            at: Utc::now(),
            reason,
        };

        let decoded: JournalEntry = serde_json::to_string(&original)
            .and_then(|encoded| serde_json::from_str(&encoded))
            .unwrap();
        assert_eq!(original, decoded);
    }
}
308}