use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TxnId(pub u64);
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum Op {
Compaction { before: usize, after: usize },
Save,
Reset,
ContextReset,
HistorySinkUpload {
bucket: String,
key: String,
bytes: usize,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "reason", rename_all = "snake_case")]
pub enum RejectReason {
DestructiveOp,
SchemaMismatch { detail: String },
ProvenanceFail { detail: String },
InvariantViolation { detail: String },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum JournalEntry {
Staged {
txn: TxnId,
at: DateTime<Utc>,
op: Op,
},
Rejected {
txn: TxnId,
at: DateTime<Utc>,
reason: RejectReason,
},
Committed { txn: TxnId, at: DateTime<Utc> },
}
#[derive(Debug, Clone)]
pub struct WritebackJournal {
session_id: String,
next_id: u64,
pending: HashMap<TxnId, Op>,
entries: Vec<JournalEntry>,
}
impl WritebackJournal {
pub fn new(session_id: impl Into<String>) -> Self {
Self {
session_id: session_id.into(),
next_id: 0,
pending: HashMap::new(),
entries: Vec::new(),
}
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn entries(&self) -> &[JournalEntry] {
&self.entries
}
pub fn stage(&mut self, op: Op) -> TxnId {
let txn = TxnId(self.next_id);
self.next_id += 1;
self.pending.insert(txn, op.clone());
self.entries.push(JournalEntry::Staged {
txn,
at: Utc::now(),
op,
});
txn
}
pub fn reject(&mut self, txn: TxnId, reason: RejectReason) {
self.pending.remove(&txn);
self.entries.push(JournalEntry::Rejected {
txn,
at: Utc::now(),
reason,
});
}
pub fn commit(&mut self, txn: TxnId) -> Result<(), RejectReason> {
if self.pending.remove(&txn).is_none() {
return Err(RejectReason::SchemaMismatch {
detail: format!("txn {:?} is not pending", txn),
});
}
self.entries.push(JournalEntry::Committed {
txn,
at: Utc::now(),
});
Ok(())
}
pub fn pending_count(&self) -> usize {
self.pending.len()
}
}
pub async fn append_entries(session_id: &str, entries: &[JournalEntry]) -> anyhow::Result<()> {
let path = crate::session::Session::sessions_dir()?.join(format!("{session_id}.journal.jsonl"));
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await?;
let mut batch = Vec::new();
for entry in entries {
let line = serde_json::to_string(entry)?;
batch.extend_from_slice(line.as_bytes());
batch.push(b'\n');
}
if !batch.is_empty() {
file.write_all(&batch).await?;
}
file.flush().await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stage_then_commit_appends_two_entries() {
let mut journal = WritebackJournal::new("s1");
let txn = journal.stage(Op::Compaction {
before: 10,
after: 3,
});
assert_eq!(journal.pending_count(), 1);
journal.commit(txn).unwrap();
assert_eq!(journal.pending_count(), 0);
let entries = journal.entries();
assert_eq!(entries.len(), 2);
assert!(matches!(entries[0], JournalEntry::Staged { .. }));
assert!(matches!(entries[1], JournalEntry::Committed { .. }));
}
#[test]
fn rejected_writeback_does_not_commit() {
let mut journal = WritebackJournal::new("s1");
let txn = journal.stage(Op::Reset);
journal.reject(txn, RejectReason::DestructiveOp);
assert_eq!(journal.pending_count(), 0);
let entries = journal.entries();
assert_eq!(entries.len(), 2);
assert!(matches!(entries[1], JournalEntry::Rejected { .. }));
let err = journal.commit(txn).unwrap_err();
assert!(matches!(err, RejectReason::SchemaMismatch { .. }));
}
#[test]
fn txn_ids_are_monotonic() {
let mut journal = WritebackJournal::new("s1");
let a = journal.stage(Op::Save);
let b = journal.stage(Op::Save);
assert!(b.0 > a.0);
}
#[test]
fn journal_entry_round_trips_through_serde() {
let entry = JournalEntry::Rejected {
txn: TxnId(7),
at: Utc::now(),
reason: RejectReason::InvariantViolation {
detail: "constraint below structured".into(),
},
};
let json = serde_json::to_string(&entry).unwrap();
let back: JournalEntry = serde_json::from_str(&json).unwrap();
assert_eq!(entry, back);
}
}