Skip to main content

codetether_agent/session/
journal.rs

1//! Validated-writeback journal (ClawVM §3).
2//!
3//! Every lifecycle transition that touches durable state — compaction,
4//! save, reset — goes through a three-phase transaction:
5//!
6//! 1. **Staging.** The caller proposes a typed [`Op`]; the journal
7//!    reserves a [`TxnId`] and records the staged entry.
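//! 2. **Validation.** The caller checks schema, provenance, scope,
//!    non-destructive semantics, and page-invariant compliance. A failure
//!    is recorded via [`WritebackJournal::reject`]; a pass goes straight to
//!    commit, since the journal has no separate "validated" entry.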
11//! 3. **Commit.** A validated transaction commits; a rejected one
12//!    stays in the journal with its reason code and does *not* mutate
13//!    the owning state.
14//!
15//! Rejections are not errors — they are load-bearing evidence that the
16//! enforcement layer caught a would-be regression. ClawVM's LRU
17//! ablation shows fault elimination comes from this structural
18//! contract, not from clever selection heuristics.
19//!
20//! ## Format
21//!
22//! The journal is an append-only JSONL file named
23//! `<session-id>.journal.jsonl` co-located with the session JSON. Each
24//! line is a [`JournalEntry`] — cheap to tail, safe under concurrent
25//! append, and inspectable by hand during debugging.
26//!
27//! ## Phase A scope
28//!
//! This module delivers the type layer, the in-memory transaction state
//! machine, and a best-effort JSONL appender ([`append_entries`]). The
//! *consumers* (compaction writeback, page-invariant validation hooks)
//! come online in Phase B alongside `DerivePolicy::Incremental`.
33//!
34//! ## Examples
35//!
36//! ```rust
37//! use codetether_agent::session::journal::{JournalEntry, Op, RejectReason, WritebackJournal};
38//!
39//! let mut journal = WritebackJournal::new("session-42");
40//! let txn = journal.stage(Op::Compaction {
41//!     before: 120,
42//!     after: 24,
43//! });
44//! journal
45//!     .commit(txn)
46//!     .expect("no pending validation => commit is allowed");
47//! assert_eq!(journal.entries().len(), 2);
48//! assert!(matches!(journal.entries()[1], JournalEntry::Committed { .. }));
49//! ```
50
use anyhow::Context as _;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
55
56/// Opaque handle returned from [`WritebackJournal::stage`] and required
57/// by [`WritebackJournal::validate`] / [`WritebackJournal::commit`].
58///
59/// Monotonic per journal instance. Crashing between stage and commit
60/// leaves a `Staged` entry in the journal — the next load can reconcile
61/// or abandon it.
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
63pub struct TxnId(pub u64);
64
65/// Typed operation being journalled.
66///
67/// Extend by adding variants rather than free-form JSON so each new
68/// call site declares what it is doing.
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(tag = "op", rename_all = "snake_case")]
71pub enum Op {
72    /// Context-window enforcement rewrote the derived buffer.
73    Compaction { before: usize, after: usize },
74    /// `Session::save` persisted the transcript to disk.
75    Save,
76    /// TUI or orchestrator reset the session (e.g., `/clear`).
77    Reset,
78    /// Lu et al. reset-to-(prompt, summary) context reset (Phase B).
79    ContextReset,
80    /// MinIO history sink uploaded a delta.
81    HistorySinkUpload {
82        bucket: String,
83        key: String,
84        bytes: usize,
85    },
86}
87
88/// Reason codes for a rejected writeback.
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90#[serde(tag = "reason", rename_all = "snake_case")]
91pub enum RejectReason {
92    /// A destructive overwrite was requested without an explicit
93    /// `allow_destructive` flag.
94    DestructiveOp,
95    /// Schema validation failed.
96    SchemaMismatch { detail: String },
97    /// Provenance / authorship check failed.
98    ProvenanceFail { detail: String },
99    /// The op would drop a page below its `min_fidelity` invariant.
100    InvariantViolation { detail: String },
101}
102
103/// A single line in the journal.
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(tag = "phase", rename_all = "snake_case")]
106pub enum JournalEntry {
107    /// An op was proposed and reserved a transaction id.
108    Staged {
109        txn: TxnId,
110        at: DateTime<Utc>,
111        op: Op,
112    },
113    /// Validation failed for a staged op. The transaction does not
114    /// commit and the state does not mutate.
115    Rejected {
116        txn: TxnId,
117        at: DateTime<Utc>,
118        reason: RejectReason,
119    },
120    /// A staged op passed validation (or skipped it) and committed.
121    Committed { txn: TxnId, at: DateTime<Utc> },
122}
123
124/// In-memory, per-session writeback journal.
125///
126/// Writes go into [`Self::entries`]; a future `flush_to_disk(path)`
127/// helper will stream them to `<session-id>.journal.jsonl`. Keeping
128/// disk I/O out of this type makes every phase fast to unit-test.
129#[derive(Debug, Clone)]
130pub struct WritebackJournal {
131    session_id: String,
132    next_id: u64,
133    pending: HashMap<TxnId, Op>,
134    entries: Vec<JournalEntry>,
135}
136
137impl WritebackJournal {
138    /// Create a fresh journal for `session_id`.
139    pub fn new(session_id: impl Into<String>) -> Self {
140        Self {
141            session_id: session_id.into(),
142            next_id: 0,
143            pending: HashMap::new(),
144            entries: Vec::new(),
145        }
146    }
147
148    /// The session this journal belongs to.
149    pub fn session_id(&self) -> &str {
150        &self.session_id
151    }
152
153    /// Every entry in the order it was appended.
154    pub fn entries(&self) -> &[JournalEntry] {
155        &self.entries
156    }
157
158    /// Stage an op and return its [`TxnId`]. Writes a `Staged` entry.
159    ///
160    /// # Examples
161    ///
162    /// ```rust
163    /// use codetether_agent::session::journal::{Op, WritebackJournal};
164    ///
165    /// let mut journal = WritebackJournal::new("s");
166    /// let txn = journal.stage(Op::Save);
167    /// assert_eq!(journal.entries().len(), 1);
168    /// journal.commit(txn).unwrap();
169    /// assert_eq!(journal.entries().len(), 2);
170    /// ```
171    pub fn stage(&mut self, op: Op) -> TxnId {
172        let txn = TxnId(self.next_id);
173        self.next_id += 1;
174        self.pending.insert(txn, op.clone());
175        self.entries.push(JournalEntry::Staged {
176            txn,
177            at: Utc::now(),
178            op,
179        });
180        txn
181    }
182
183    /// Mark a staged transaction as rejected with `reason`. The
184    /// transaction is dropped from the pending set — a subsequent
185    /// [`Self::commit`] on the same id returns `Err(RejectReason)`.
186    pub fn reject(&mut self, txn: TxnId, reason: RejectReason) {
187        self.pending.remove(&txn);
188        self.entries.push(JournalEntry::Rejected {
189            txn,
190            at: Utc::now(),
191            reason,
192        });
193    }
194
195    /// Commit a staged transaction that passed validation. Returns
196    /// `Err(RejectReason::SchemaMismatch)` when the transaction is
197    /// unknown (never staged, or already rejected / committed).
198    ///
199    /// # Errors
200    ///
201    /// See above — returns `Err` when the `TxnId` is not pending.
202    pub fn commit(&mut self, txn: TxnId) -> Result<(), RejectReason> {
203        if self.pending.remove(&txn).is_none() {
204            return Err(RejectReason::SchemaMismatch {
205                detail: format!("txn {:?} is not pending", txn),
206            });
207        }
208        self.entries.push(JournalEntry::Committed {
209            txn,
210            at: Utc::now(),
211        });
212        Ok(())
213    }
214
215    /// Number of currently staged (neither committed nor rejected)
216    /// transactions.
217    pub fn pending_count(&self) -> usize {
218        self.pending.len()
219    }
220}
221
222/// Append journal entries to `<session-id>.journal.jsonl`.
223///
224/// Best-effort durability helper used by lifecycle call sites such as
225/// `Session::save`. The journal remains append-only: callers hand us a
226/// pre-built ordered slice and we stream it as JSONL.
227pub async fn append_entries(session_id: &str, entries: &[JournalEntry]) -> anyhow::Result<()> {
228    let path = crate::session::Session::sessions_dir()?.join(format!("{session_id}.journal.jsonl"));
229    if let Some(parent) = path.parent() {
230        tokio::fs::create_dir_all(parent).await?;
231    }
232    let mut file = tokio::fs::OpenOptions::new()
233        .create(true)
234        .append(true)
235        .open(&path)
236        .await?;
237    let mut batch = Vec::new();
238    for entry in entries {
239        let line = serde_json::to_string(entry)?;
240        batch.extend_from_slice(line.as_bytes());
241        batch.push(b'\n');
242    }
243    if !batch.is_empty() {
244        file.write_all(&batch).await?;
245    }
246    file.flush().await?;
247    Ok(())
248}
249
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn stage_then_commit_appends_two_entries() {
        let mut journal = WritebackJournal::new("s1");
        let txn = journal.stage(Op::Compaction {
            before: 10,
            after: 3,
        });
        assert_eq!(journal.pending_count(), 1);
        journal.commit(txn).unwrap();
        assert_eq!(journal.pending_count(), 0);

        let entries = journal.entries();
        assert_eq!(entries.len(), 2);
        assert!(matches!(entries[0], JournalEntry::Staged { .. }));
        assert!(matches!(entries[1], JournalEntry::Committed { .. }));
    }

    #[test]
    fn rejected_writeback_does_not_commit() {
        let mut journal = WritebackJournal::new("s1");
        let txn = journal.stage(Op::Reset);
        journal.reject(txn, RejectReason::DestructiveOp);

        assert_eq!(journal.pending_count(), 0);
        let entries = journal.entries();
        assert_eq!(entries.len(), 2);
        assert!(matches!(entries[1], JournalEntry::Rejected { .. }));

        // Commit on a rejected txn is an error.
        let err = journal.commit(txn).unwrap_err();
        assert!(matches!(err, RejectReason::SchemaMismatch { .. }));
    }

    #[test]
    fn commit_of_unknown_txn_errors_without_appending() {
        let mut journal = WritebackJournal::new("s1");
        let err = journal.commit(TxnId(99)).unwrap_err();
        assert!(matches!(err, RejectReason::SchemaMismatch { .. }));
        assert!(journal.entries().is_empty());
        assert_eq!(journal.pending_count(), 0);
    }

    #[test]
    fn double_commit_is_an_error() {
        let mut journal = WritebackJournal::new("s1");
        let txn = journal.stage(Op::Save);
        journal.commit(txn).unwrap();
        assert!(journal.commit(txn).is_err());
        assert_eq!(journal.entries().len(), 2);
    }

    #[test]
    fn txn_ids_are_monotonic() {
        let mut journal = WritebackJournal::new("s1");
        let a = journal.stage(Op::Save);
        let b = journal.stage(Op::Save);
        assert!(b.0 > a.0);
    }

    #[test]
    fn session_id_is_preserved() {
        let journal = WritebackJournal::new(String::from("session-42"));
        assert_eq!(journal.session_id(), "session-42");
    }

    #[test]
    fn journal_entry_round_trips_through_serde() {
        let entry = JournalEntry::Rejected {
            txn: TxnId(7),
            at: Utc::now(),
            reason: RejectReason::InvariantViolation {
                detail: "constraint below structured".into(),
            },
        };
        let json = serde_json::to_string(&entry).unwrap();
        let back: JournalEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(entry, back);
    }

    #[test]
    fn every_op_round_trips_inside_a_staged_entry() {
        let ops = [
            Op::Compaction {
                before: 5,
                after: 2,
            },
            Op::Save,
            Op::Reset,
            Op::ContextReset,
            Op::HistorySinkUpload {
                bucket: "b".into(),
                key: "k".into(),
                bytes: 9,
            },
        ];
        for (id, op) in (0u64..).zip(ops) {
            let entry = JournalEntry::Staged {
                txn: TxnId(id),
                at: Utc::now(),
                op,
            };
            let json = serde_json::to_string(&entry).unwrap();
            let back: JournalEntry = serde_json::from_str(&json).unwrap();
            assert_eq!(entry, back);
        }
    }

    /// Pins the on-disk tag names: renaming any of them would silently
    /// break parsing of journals already written.
    #[test]
    fn wire_format_uses_snake_case_tags() {
        let staged = JournalEntry::Staged {
            txn: TxnId(3),
            at: Utc::now(),
            op: Op::HistorySinkUpload {
                bucket: "b".into(),
                key: "k".into(),
                bytes: 9,
            },
        };
        let value = serde_json::to_value(&staged).unwrap();
        assert_eq!(value["phase"].as_str(), Some("staged"));
        assert_eq!(value["txn"].as_u64(), Some(3));
        assert_eq!(value["op"]["op"].as_str(), Some("history_sink_upload"));
        assert_eq!(value["op"]["bytes"].as_u64(), Some(9));

        let rejected = JournalEntry::Rejected {
            txn: TxnId(4),
            at: Utc::now(),
            reason: RejectReason::DestructiveOp,
        };
        let value = serde_json::to_value(&rejected).unwrap();
        assert_eq!(value["phase"].as_str(), Some("rejected"));
        assert_eq!(value["reason"]["reason"].as_str(), Some("destructive_op"));
    }

    #[test]
    fn serialized_entry_is_a_single_jsonl_line() {
        let entry = JournalEntry::Rejected {
            txn: TxnId(1),
            at: Utc::now(),
            reason: RejectReason::SchemaMismatch {
                detail: "line one\nline two".into(),
            },
        };
        let json = serde_json::to_string(&entry).unwrap();
        assert!(!json.contains('\n'));
    }
}