use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Identifier of a single writeback transaction.
///
/// A newtype over `u64`. `WritebackJournal::stage` hands these out from a
/// per-journal counter that starts at 0 and grows by one for every staged op,
/// so ids are only meaningful relative to the journal that issued them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TxnId(pub u64);
/// An operation that can be staged in a `WritebackJournal`.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `"op"` key, next to the variant's fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum Op {
/// A compaction, carrying a count from before and after it ran.
// NOTE(review): the unit of `before`/`after` (entries, tokens, bytes?) is
// not visible in this file — confirm against the caller.
Compaction { before: usize, after: usize },
/// A save operation; carries no payload.
Save,
/// A reset operation; carries no payload.
// NOTE(review): the test in this file rejects `Reset` with
// `RejectReason::DestructiveOp`, which suggests it is treated as
// destructive — confirm the policy lives with the caller.
Reset,
/// A context-reset operation; carries no payload.
ContextReset,
/// An upload of history to an external sink.
// NOTE(review): presumably `bucket`/`key` address an object store and
// `bytes` is the payload size — confirm against the uploader.
HistorySinkUpload {
bucket: String,
key: String,
bytes: usize,
},
}
/// Why a transaction was rejected.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `"reason"` key. Besides being stored in
/// `JournalEntry::Rejected`, this type doubles as the error type of
/// `WritebackJournal::commit`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "reason", rename_all = "snake_case")]
pub enum RejectReason {
/// The operation was refused as destructive; carries no detail.
DestructiveOp,
/// A schema check failed; `detail` is a free-form description.
///
/// `WritebackJournal::commit` also returns this variant when asked to
/// commit a transaction that is not pending.
SchemaMismatch { detail: String },
/// A provenance check failed; `detail` is a free-form description.
ProvenanceFail { detail: String },
/// An invariant was violated; `detail` is a free-form description.
InvariantViolation { detail: String },
}
/// One record in a `WritebackJournal`.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `"phase"` key. Every variant carries the
/// transaction id and the UTC timestamp taken when the entry was recorded.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum JournalEntry {
/// Recorded by `WritebackJournal::stage`; holds the staged operation.
Staged {
txn: TxnId,
at: DateTime<Utc>,
op: Op,
},
/// Recorded by `WritebackJournal::reject`; holds the rejection reason.
Rejected {
txn: TxnId,
at: DateTime<Utc>,
reason: RejectReason,
},
/// Recorded by `WritebackJournal::commit` when a pending transaction commits.
Committed { txn: TxnId, at: DateTime<Utc> },
}
/// In-memory journal of staged, rejected and committed writeback transactions
/// for one session.
///
/// The methods in this file only ever append to `entries`, so the log keeps
/// the order in which transactions were staged and resolved.
#[derive(Debug, Clone)]
pub struct WritebackJournal {
// Session label supplied to `new`; exposed read-only via `session_id()`.
session_id: String,
// Value the next `TxnId` will take; starts at 0 and is bumped by `stage`.
next_id: u64,
// Operations that have been staged but neither committed nor rejected yet.
pending: HashMap<TxnId, Op>,
// Every entry recorded so far, oldest first.
entries: Vec<JournalEntry>,
}
impl WritebackJournal {
    /// Creates an empty journal for `session_id`.
    ///
    /// Transaction ids start at 0, nothing is pending and no entries exist.
    pub fn new(session_id: impl Into<String>) -> Self {
        Self {
            session_id: session_id.into(),
            next_id: 0,
            pending: HashMap::new(),
            entries: Vec::new(),
        }
    }

    /// Returns the session id this journal was created with.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Returns every entry recorded so far, oldest first.
    pub fn entries(&self) -> &[JournalEntry] {
        &self.entries
    }

    /// Stages `op` as a new pending transaction and returns its id.
    ///
    /// Allocates the next id from the journal's counter, remembers the op as
    /// pending and appends a [`JournalEntry::Staged`] stamped with the current
    /// UTC time.
    pub fn stage(&mut self, op: Op) -> TxnId {
        let txn = TxnId(self.next_id);
        self.next_id += 1;
        // The op is kept both in the pending map and in the log entry, which
        // is why one clone is required here.
        self.pending.insert(txn, op.clone());
        self.entries.push(JournalEntry::Staged {
            txn,
            at: Utc::now(),
            op,
        });
        txn
    }

    /// Rejects the pending transaction `txn` for `reason`.
    ///
    /// Removes `txn` from the pending set and appends a
    /// [`JournalEntry::Rejected`] stamped with the current UTC time.
    ///
    /// If `txn` is not pending (never staged by this journal, already
    /// committed, or already rejected) the call is a no-op: nothing is
    /// appended. This mirrors [`WritebackJournal::commit`], which also refuses
    /// to resolve a non-pending transaction, and keeps the log from recording
    /// a rejection for a transaction that was already resolved.
    pub fn reject(&mut self, txn: TxnId, reason: RejectReason) {
        // `remove` doubles as the "is it pending?" check, so a transaction
        // can be resolved at most once.
        if self.pending.remove(&txn).is_none() {
            return;
        }
        self.entries.push(JournalEntry::Rejected {
            txn,
            at: Utc::now(),
            reason,
        });
    }

    /// Commits the pending transaction `txn`.
    ///
    /// Removes `txn` from the pending set and appends a
    /// [`JournalEntry::Committed`] stamped with the current UTC time.
    ///
    /// # Errors
    ///
    /// Returns [`RejectReason::SchemaMismatch`] if `txn` is not pending. In
    /// that case the journal is left untouched.
    pub fn commit(&mut self, txn: TxnId) -> Result<(), RejectReason> {
        if self.pending.remove(&txn).is_none() {
            return Err(RejectReason::SchemaMismatch {
                detail: format!("txn {:?} is not pending", txn),
            });
        }
        self.entries.push(JournalEntry::Committed {
            txn,
            at: Utc::now(),
        });
        Ok(())
    }

    /// Returns how many staged transactions are still unresolved.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Staging an op and committing it leaves nothing pending and records
    /// exactly a `Staged` entry followed by a `Committed` entry.
    #[test]
    fn stage_then_commit_appends_two_entries() {
        let mut log = WritebackJournal::new("s1");
        let id = log.stage(Op::Compaction {
            before: 10,
            after: 3,
        });
        assert_eq!(log.pending_count(), 1);

        log.commit(id).unwrap();
        assert_eq!(log.pending_count(), 0);

        let recorded = log.entries();
        assert_eq!(recorded.len(), 2);
        assert!(matches!(
            recorded,
            [JournalEntry::Staged { .. }, JournalEntry::Committed { .. }]
        ));
    }

    /// A rejected transaction is no longer pending, is logged as `Rejected`,
    /// and cannot be committed afterwards.
    #[test]
    fn rejected_writeback_does_not_commit() {
        let mut log = WritebackJournal::new("s1");
        let id = log.stage(Op::Reset);
        log.reject(id, RejectReason::DestructiveOp);
        assert_eq!(log.pending_count(), 0);

        let recorded = log.entries();
        assert_eq!(recorded.len(), 2);
        assert!(matches!(recorded, [_, JournalEntry::Rejected { .. }]));

        // Committing after the rejection must fail with the "not pending" error.
        assert!(matches!(
            log.commit(id),
            Err(RejectReason::SchemaMismatch { .. })
        ));
    }

    /// Each staged op gets a strictly larger id than the one before it.
    #[test]
    fn txn_ids_are_monotonic() {
        let mut log = WritebackJournal::new("s1");
        let first = log.stage(Op::Save);
        let second = log.stage(Op::Save);
        assert!(first.0 < second.0);
    }

    /// A `Rejected` entry survives a JSON encode/decode cycle unchanged.
    #[test]
    fn journal_entry_round_trips_through_serde() {
        let original = JournalEntry::Rejected {
            txn: TxnId(7),
            at: Utc::now(),
            reason: RejectReason::InvariantViolation {
                detail: "constraint below structured".into(),
            },
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: JournalEntry = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}