use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use chrono::DateTime;
use chrono::Utc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use talea_core::events::LedgerEvent;
use talea_core::store::{AccountCfg, Committed, Sequenced, StoreError, ledger_now};
use talea_core::types::{AccountDef, AssetDef, Seq, Transaction};
use crate::frame::{WireEvent, encode_frame};
use crate::segment::{SegmentCatalog, SegmentSet};
use crate::snapshot;
use crate::state::{BookState, CommittedRec, FramePos, Scratch};
use std::collections::HashMap as StdHashMap;
pub enum Job {
Commit(Transaction, oneshot::Sender<Result<Committed, StoreError>>),
OpenAccount(
AccountDef,
AccountCfg,
oneshot::Sender<Result<(), StoreError>>,
),
RegisterAsset(AssetDef, oneshot::Sender<Result<(), StoreError>>),
Snapshot(oneshot::Sender<Result<(), StoreError>>),
}
#[derive(Clone)]
pub struct BookWriter {
tx: mpsc::Sender<Job>,
pub events: broadcast::Sender<Sequenced<LedgerEvent>>,
pub state: Arc<RwLock<BookState>>,
pub catalog: SegmentCatalog,
handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl BookWriter {
pub const DEFAULT_SNAPSHOT_EVERY: u64 = 100_000;
pub async fn spawn(
dir: PathBuf,
state: Arc<RwLock<BookState>>,
batch_max: usize,
) -> std::io::Result<Self> {
Self::spawn_with(dir, state, batch_max, Self::DEFAULT_SNAPSHOT_EVERY).await
}
pub async fn spawn_with(
dir: PathBuf,
state: Arc<RwLock<BookState>>,
batch_max: usize,
snapshot_every: u64,
) -> std::io::Result<Self> {
Self::spawn_with_opts(
dir,
state,
batch_max,
snapshot_every,
crate::segment::DEFAULT_SEGMENT_MAX,
)
.await
}
pub async fn spawn_with_opts(
dir: PathBuf,
state: Arc<RwLock<BookState>>,
batch_max: usize,
snapshot_every: u64,
segment_max: u64,
) -> std::io::Result<Self> {
Self::spawn_inner(
dir,
state,
batch_max,
snapshot_every,
segment_max,
None::<std::sync::Arc<dyn Fn() -> std::io::Result<()> + Send + Sync>>,
)
.await
}
#[cfg(test)]
pub(crate) async fn spawn_for_test(
dir: PathBuf,
state: Arc<RwLock<BookState>>,
batch_max: usize,
hook: Option<std::sync::Arc<dyn Fn() -> std::io::Result<()> + Send + Sync>>,
) -> std::io::Result<Self> {
Self::spawn_inner(
dir,
state,
batch_max,
0,
crate::segment::DEFAULT_SEGMENT_MAX,
hook,
)
.await
}
async fn spawn_inner(
dir: PathBuf,
state: Arc<RwLock<BookState>>,
batch_max: usize,
snapshot_every: u64,
segment_max: u64,
_sync_hook: Option<std::sync::Arc<dyn Fn() -> std::io::Result<()> + Send + Sync>>,
) -> std::io::Result<Self> {
{
let st = state.read().await;
if st
.writer_attached
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Err(std::io::Error::other(
"a BookWriter is already attached to this BookState \
— single-writer contract violated",
));
}
}
#[allow(unused_mut)]
let mut segments = SegmentSet::open_with_max(&dir, segment_max).await?;
#[cfg(test)]
segments.set_sync_hook(_sync_hook);
let catalog = segments.catalog();
let (tx, rx) = mpsc::channel::<Job>(batch_max.max(64) * 4);
let (ev_tx, _) = broadcast::channel::<Sequenced<LedgerEvent>>(1024);
let state2 = Arc::clone(&state);
let ev_tx2 = ev_tx.clone();
let snap_inflight: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let handle = tokio::spawn(run_loop(
rx,
segments,
state2,
ev_tx2,
batch_max,
dir,
snapshot_every,
snap_inflight,
));
Ok(Self {
tx,
events: ev_tx,
state,
catalog,
handle: Arc::new(Mutex::new(Some(handle))),
})
}
pub async fn submit(&self, job: Job) -> Result<(), StoreError> {
self.tx
.send(job)
.await
.map_err(|_| StoreError::Io("book writer gone".into()))
}
pub async fn commit(&self, t: Transaction) -> Result<Committed, StoreError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.submit(Job::Commit(t, reply_tx)).await?;
reply_rx
.await
.map_err(|_| StoreError::Io("book writer gone".into()))?
}
pub fn close(&self) {}
pub async fn join(&self) {
let maybe_handle = self.handle.lock().await.take();
if let Some(h) = maybe_handle {
let _ = h.await;
}
}
pub async fn shutdown(self) {
let maybe_handle = self.handle.lock().await.take();
drop(self);
if let Some(h) = maybe_handle {
let _ = h.await;
}
}
}
enum Reply {
Dup(usize, CommittedRec),
DupInBatch { job_idx: usize, staged_slot: usize },
Reject(usize, StoreError),
OpenExistsOk(usize),
Staged { staged_slot: usize },
}
struct Staged {
job_idx: usize,
wire: WireEvent,
pos: FramePos,
}
#[allow(clippy::too_many_arguments)]
async fn run_loop(
mut rx: mpsc::Receiver<Job>,
mut segments: SegmentSet,
state: Arc<RwLock<BookState>>,
events: broadcast::Sender<Sequenced<LedgerEvent>>,
batch_max: usize,
dir: PathBuf,
snapshot_every: u64,
snap_inflight: Arc<AtomicBool>,
) {
let mut events_since_snap: u64 = 0;
loop {
let first = match rx.recv().await {
Some(j) => j,
None => return, };
let mut jobs: Vec<Option<Job>> = Vec::with_capacity(batch_max);
jobs.push(Some(first));
while jobs.len() < batch_max {
match rx.try_recv() {
Ok(j) => jobs.push(Some(j)),
Err(_) => break,
}
}
let mut replies: Vec<Reply> = Vec::with_capacity(jobs.len());
let mut staged: Vec<Staged> = Vec::new();
let mut scratch = Scratch::default();
let mut snap_job_idxs: Vec<usize> = Vec::new();
let bloom_positive_keys: Vec<String> = {
let st = state.read().await;
let mut bp = Vec::new();
for slot in jobs.iter() {
let Some(job) = slot.as_ref() else { continue };
if let Job::Commit(tx, _) = job {
let key = tx.idempotency_key.0.as_str();
if st.idem.get_hot(key).is_none() && st.idem.bloom_might_contain(key) {
bp.push(tx.idempotency_key.0.clone());
}
}
}
bp
};
let mut disk_resolved: StdHashMap<String, CommittedRec> = StdHashMap::new();
for key in &bloom_positive_keys {
let st = state.read().await;
let idem_ref: &crate::idem_spill::TieredIdem = &st.idem;
if let Some(rec) = idem_ref.get_hot(key) {
disk_resolved.insert(key.clone(), rec.clone());
continue;
}
let runs_snapshot = idem_ref.runs.clone();
let key_clone = key.clone();
drop(st);
let tmp_tiered = {
let mut t = crate::idem_spill::TieredIdem::with_cap(1);
t.runs = runs_snapshot;
t
};
if let Some(rec) = tmp_tiered.lookup_runs(&key_clone).await {
disk_resolved.insert(key_clone, rec);
}
}
let mut fatal: Option<String> = None;
{
let st = state.read().await;
let mut batch_at: DateTime<Utc> = {
let floor = st.last_at.unwrap_or_default();
ledger_now().max(floor)
};
let mut seq: Seq = st.next_seq;
for (idx, slot) in jobs.iter().enumerate() {
let Some(job) = slot.as_ref() else { continue };
match job {
Job::Commit(tx, _) => {
let idem_key = tx.idempotency_key.0.clone();
let committed_rec = st
.idem
.get_hot(&idem_key)
.cloned()
.or_else(|| disk_resolved.get(&idem_key).cloned());
if let Some(rec) = committed_rec {
replies.push(Reply::Dup(idx, rec));
continue;
}
if let Some(&staged_slot) = scratch.idem.get(&idem_key) {
replies.push(Reply::DupInBatch {
job_idx: idx,
staged_slot,
});
continue;
}
if let Err(e) = st.validate(tx, &mut scratch) {
replies.push(Reply::Reject(idx, e));
continue;
}
if let Err(e) = scratch.stage(tx) {
fatal = Some(e);
break;
}
batch_at = batch_at.max(ledger_now());
let staged_slot = staged.len();
staged.push(Staged {
job_idx: idx,
wire: WireEvent {
seq,
at: batch_at,
event: LedgerEvent::TransactionPosted(tx.clone()),
},
pos: (0, 0),
});
scratch.idem.insert(idem_key, staged_slot);
seq += 1;
replies.push(Reply::Staged { staged_slot });
}
Job::OpenAccount(def, cfg, _) => {
let key = def.id.to_key();
if let Some(existing) = st.accounts.get(&key) {
if existing.def == *def && existing.cfg == *cfg {
replies.push(Reply::OpenExistsOk(idx));
} else {
replies.push(Reply::Reject(
idx,
StoreError::AlreadyExists {
what: format!("account {key}"),
},
));
}
continue;
}
batch_at = batch_at.max(ledger_now());
let staged_slot = staged.len();
staged.push(Staged {
job_idx: idx,
wire: WireEvent {
seq,
at: batch_at,
event: LedgerEvent::AccountOpened {
def: def.clone(),
cfg: cfg.clone(),
},
},
pos: (0, 0),
});
seq += 1;
replies.push(Reply::Staged { staged_slot });
}
Job::RegisterAsset(def, _) => {
batch_at = batch_at.max(ledger_now());
let staged_slot = staged.len();
staged.push(Staged {
job_idx: idx,
wire: WireEvent {
seq,
at: batch_at,
event: LedgerEvent::AssetRegistered(def.clone()),
},
pos: (0, 0),
});
seq += 1;
replies.push(Reply::Staged { staged_slot });
}
Job::Snapshot(_) => {
snap_job_idxs.push(idx);
}
}
}
}
if let Some(e) = fatal {
io_kill_batch(jobs, std::io::Error::other(e));
return;
}
if !staged.is_empty() {
for s in &mut staged {
let frame_bytes = match encode_frame(&s.wire) {
Ok(b) => b,
Err(e) => {
io_kill_batch(jobs, std::io::Error::other(e));
return;
}
};
if let Err(e) = segments.maybe_rotate(s.wire.seq).await {
io_kill_batch(jobs, e);
return;
}
s.pos = segments.next_pos();
if let Err(e) = segments.append(&frame_bytes).await {
io_kill_batch(jobs, e);
return;
}
}
if let Err(e) = segments.sync().await {
io_kill_batch(jobs, e);
return;
}
}
let applied_count = staged.len() as u64;
if !staged.is_empty() {
let mut st = state.write().await;
for s in &staged {
match &s.wire.event {
LedgerEvent::TransactionPosted(tx) => {
if let Err(e) = st.try_apply_transaction(tx, s.wire.seq, s.wire.at, s.pos) {
drop(st);
io_kill_batch(jobs, std::io::Error::other(e));
return;
}
}
LedgerEvent::AccountOpened { def, cfg } => {
st.apply_account_opened(def, cfg, s.wire.seq, s.wire.at);
}
LedgerEvent::AssetRegistered(_) => {
st.bump_seq(s.wire.seq, s.wire.at);
}
}
}
}
{
let needs_flush = {
let st = state.read().await;
st.idem.needs_flush()
};
if needs_flush {
let mut st = state.write().await;
st.idem.flush_spill().await;
}
}
for reply in replies {
match reply {
Reply::Staged { staged_slot } => {
let s = &staged[staged_slot];
let _ = events.send(Sequenced {
seq: s.wire.seq,
at: s.wire.at,
event: s.wire.event.clone(),
});
let Some(job) = jobs[s.job_idx].take() else {
continue;
};
match job {
Job::Commit(tx, reply_tx) => {
let _ = reply_tx.send(Ok(Committed {
txid: tx.id,
seq: s.wire.seq,
at: s.wire.at,
}));
}
Job::OpenAccount(_, _, reply_tx) => {
let _ = reply_tx.send(Ok(()));
}
Job::RegisterAsset(_, reply_tx) => {
let _ = reply_tx.send(Ok(()));
}
Job::Snapshot(_) => {
unreachable!("Snapshot never in staged");
}
}
}
Reply::Dup(idx, rec) => {
if let Some(Job::Commit(_, reply_tx)) = jobs[idx].take() {
let _ = reply_tx.send(Ok(Committed::from(&rec)));
}
}
Reply::DupInBatch {
job_idx,
staged_slot,
} => {
let s = &staged[staged_slot];
let first_txid = match &s.wire.event {
LedgerEvent::TransactionPosted(t) => t.id.clone(),
_ => unreachable!("DupInBatch staged slot is always a TransactionPosted"),
};
if let Some(Job::Commit(_, reply_tx)) = jobs[job_idx].take() {
let _ = reply_tx.send(Ok(Committed {
txid: first_txid,
seq: s.wire.seq,
at: s.wire.at,
}));
}
}
Reply::OpenExistsOk(idx) => {
if let Some(Job::OpenAccount(_, _, reply_tx)) = jobs[idx].take() {
let _ = reply_tx.send(Ok(()));
}
}
Reply::Reject(idx, e) => {
match jobs[idx].take() {
Some(Job::Commit(_, reply_tx)) => {
let _ = reply_tx.send(Err(e));
}
Some(Job::OpenAccount(_, _, reply_tx)) => {
let _ = reply_tx.send(Err(e));
}
Some(Job::RegisterAsset(_, reply_tx)) => {
let _ = reply_tx.send(Err(e));
}
Some(Job::Snapshot(reply_tx)) => {
let _ = reply_tx.send(Err(e));
}
None => {}
}
}
}
}
let has_explicit_snap = !snap_job_idxs.is_empty();
let hit_periodic = snapshot_every > 0 && applied_count > 0 && {
events_since_snap += applied_count;
events_since_snap >= snapshot_every
};
if has_explicit_snap {
let (snap_state, snap_seq) = {
let st = state.read().await;
let last = st.next_seq.saturating_sub(1);
(st.clone(), last)
};
let snap_result = if snap_seq > 0 {
let r = snapshot::write_snapshot(&dir, &snap_state, snap_seq).await;
match &r {
Ok(()) => tracing::debug!(seq = snap_seq, "snapshot written (explicit)"),
Err(e) => tracing::error!(
error = %e,
seq = snap_seq,
"explicit snapshot write failed (non-fatal; log is truth)"
),
}
r
} else {
Ok(())
};
for idx in snap_job_idxs {
if let Some(Job::Snapshot(reply_tx)) = jobs[idx].take() {
let send_val = snap_result
.as_ref()
.map(|_| ())
.map_err(|e| StoreError::Io(e.to_string().into()));
let _ = reply_tx.send(send_val);
}
}
}
if hit_periodic && !snap_inflight.load(Ordering::Acquire) {
let (snap_state, snap_seq) = {
let st = state.read().await;
let last = st.next_seq.saturating_sub(1);
(st.clone(), last)
};
if snap_seq > 0 {
snap_inflight.store(true, Ordering::Release);
events_since_snap = 0;
let dir2 = dir.clone();
let flag = Arc::clone(&snap_inflight);
tokio::spawn(async move {
match snapshot::write_snapshot(&dir2, &snap_state, snap_seq).await {
Ok(()) => tracing::debug!(seq = snap_seq, "periodic snapshot written"),
Err(e) => tracing::error!(
error = %e,
seq = snap_seq,
"periodic snapshot write failed (non-fatal; log is truth)"
),
}
flag.store(false, Ordering::Release);
});
}
} else if hit_periodic {
tracing::debug!("periodic snapshot skipped: background snapshot already in flight");
}
}
}
fn io_kill_batch(jobs: Vec<Option<Job>>, e: std::io::Error) {
tracing::error!(error = %e, "fatal I/O in book writer — writer exiting");
drop(jobs);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::{AccountState, BookState, PostingIndex};
use std::sync::Arc;
use talea_core::store::AccountCfg;
use talea_core::types::*;
use tokio::sync::RwLock;
async fn writer_with_accounts(dir: &std::path::Path) -> BookWriter {
let mut st = BookState::default();
for path in ["cash", "rev"] {
st.accounts.insert(
AccountId {
book: Book("b".into()),
path: path.into(),
}
.to_key(),
AccountState {
def: AccountDef {
id: AccountId {
book: Book("b".into()),
path: path.into(),
},
asset: AssetId::new("USD"),
kind: AccountKind::Asset,
},
cfg: AccountCfg {
normal_side: None,
min_balance: None,
},
raw_balance: 0,
updated_seq: 0,
postings: PostingIndex::default(),
},
);
}
BookWriter::spawn(dir.to_path_buf(), Arc::new(RwLock::new(st)), 1024)
.await
.unwrap()
}
fn tx(key: &str) -> Transaction {
Transaction {
id: TxId(uuid::Uuid::now_v7()),
book: Book("b".into()),
postings: vec![
Posting {
account: AccountId {
book: Book("b".into()),
path: "cash".into(),
},
amount: Amount::new(10, AssetId::new("USD")),
direction: Direction::Debit,
},
Posting {
account: AccountId {
book: Book("b".into()),
path: "rev".into(),
},
amount: Amount::new(10, AssetId::new("USD")),
direction: Direction::Credit,
},
],
idempotency_key: IdempotencyKey(key.into()),
external_refs: vec![],
metadata: serde_json::Value::Null,
occurred_at: chrono::Utc::now(),
}
}
#[tokio::test]
async fn commits_assign_gapless_seq_and_monotonic_at() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let mut prev_at = None;
for (i, key) in ["a", "b", "c"].iter().enumerate() {
let c = w.commit(tx(key)).await.unwrap();
assert_eq!(c.seq, (i + 1) as Seq);
if let Some(p) = prev_at {
assert!(c.at >= p, "committed_at must be non-decreasing");
}
prev_at = Some(c.at);
}
}
#[tokio::test]
async fn duplicate_idem_returns_prior_committed() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let first = w.commit(tx("same")).await.unwrap();
let replay = w.commit(tx("same")).await.unwrap();
assert_eq!(replay.seq, first.seq);
assert_eq!(replay.txid, first.txid);
assert_eq!(replay.at, first.at);
}
#[tokio::test]
async fn rejected_draft_does_not_poison_batchmates_or_consume_seq() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let mut bad = tx("bad");
bad.postings[0].account = AccountId {
book: Book("b".into()),
path: "ghost".into(),
};
let (r_bad, r_ok) = tokio::join!(w.commit(bad), w.commit(tx("ok")));
assert!(matches!(
r_bad,
Err(talea_core::store::StoreError::UnknownAccount(_))
));
assert_eq!(
r_ok.unwrap().seq,
1,
"rejected draft must not consume a seq"
);
}
#[tokio::test]
async fn concurrent_commits_all_land_durably_and_gapless() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let mut handles = vec![];
for i in 0..64 {
let w = w.clone();
handles.push(tokio::spawn(
async move { w.commit(tx(&format!("k{i}"))).await },
));
}
let mut seqs: Vec<Seq> = vec![];
for h in handles {
seqs.push(h.await.unwrap().unwrap().seq);
}
seqs.sort();
assert_eq!(seqs, (1..=64).collect::<Vec<Seq>>());
let seg = crate::segment::SegmentSet::open(dir.path()).await.unwrap();
assert_eq!(seg.scan_from(1, 1000).await.unwrap().len(), 64);
}
#[tokio::test]
async fn subscribers_see_events_post_fsync_in_seq_order() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let mut rx = w.events.subscribe();
for key in ["a", "b"] {
w.commit(tx(key)).await.unwrap();
}
assert_eq!(rx.recv().await.unwrap().seq, 1);
assert_eq!(rx.recv().await.unwrap().seq, 2);
}
#[tokio::test]
async fn duplicate_idem_within_one_batch_resolves_to_first_txid() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let t1 = tx("same-key");
let t2 = tx("same-key");
let (r1, r2) = tokio::join!(w.commit(t1), w.commit(t2));
let c1 = r1.expect("first commit must succeed");
let c2 = r2.expect("second commit must succeed");
assert_eq!(
c2.txid, c1.txid,
"dup must resolve to the first txid, not its own"
);
assert_eq!(c2.seq, c1.seq, "dup must resolve to the first seq");
assert_eq!(c2.at, c1.at, "dup must resolve to the first at");
let seg = crate::segment::SegmentSet::open(dir.path()).await.unwrap();
let frames = seg.scan_from(1, 1000).await.unwrap();
assert_eq!(
frames.len(),
1,
"only the first transaction should be persisted"
);
}
#[tokio::test]
async fn second_writer_on_same_state_is_refused() {
let dir1 = tempfile::tempdir().unwrap();
let dir2 = tempfile::tempdir().unwrap();
let state = Arc::new(RwLock::new(BookState::default()));
let _w1 = BookWriter::spawn(dir1.path().to_path_buf(), Arc::clone(&state), 64)
.await
.expect("first writer must succeed");
let result = BookWriter::spawn(dir2.path().to_path_buf(), Arc::clone(&state), 64).await;
match result {
Err(e) => assert!(
e.to_string().contains("single-writer contract violated"),
"unexpected error message: {e}",
),
Ok(_) => panic!("second writer must be refused but spawn succeeded"),
}
}
#[tokio::test]
async fn open_account_idempotent_same_def_no_event() {
let dir = tempfile::tempdir().unwrap();
let state = Arc::new(RwLock::new(BookState::default()));
let w = BookWriter::spawn(dir.path().to_path_buf(), Arc::clone(&state), 64)
.await
.unwrap();
let def = AccountDef {
id: AccountId {
book: Book("b".into()),
path: "checking".into(),
},
asset: AssetId::new("USD"),
kind: AccountKind::Asset,
};
let cfg = AccountCfg {
normal_side: None,
min_balance: None,
};
let (tx1, rx1) = oneshot::channel();
w.submit(Job::OpenAccount(def.clone(), cfg.clone(), tx1))
.await
.unwrap();
rx1.await.unwrap().expect("first open must succeed");
let (tx2, rx2) = oneshot::channel();
w.submit(Job::OpenAccount(def.clone(), cfg.clone(), tx2))
.await
.unwrap();
rx2.await.unwrap().expect("idempotent open must succeed");
let seg = crate::segment::SegmentSet::open(dir.path()).await.unwrap();
let frames = seg.scan_from(1, 1000).await.unwrap();
assert_eq!(
frames.len(),
1,
"idempotent open must not append a second frame"
);
let state_ref = state.read().await;
let next = state_ref.next_seq;
drop(state_ref);
assert_eq!(
next, 2,
"seq must be 2 after one open + one idempotent no-op"
);
}
#[tokio::test]
async fn open_account_different_def_already_exists() {
let dir = tempfile::tempdir().unwrap();
let state = Arc::new(RwLock::new(BookState::default()));
let w = BookWriter::spawn(dir.path().to_path_buf(), Arc::clone(&state), 64)
.await
.unwrap();
let def = AccountDef {
id: AccountId {
book: Book("b".into()),
path: "savings".into(),
},
asset: AssetId::new("USD"),
kind: AccountKind::Asset,
};
let cfg1 = AccountCfg {
normal_side: None,
min_balance: None,
};
let cfg2 = AccountCfg {
normal_side: Some(Direction::Debit),
min_balance: Some(0),
};
let (tx1, rx1) = oneshot::channel();
w.submit(Job::OpenAccount(def.clone(), cfg1, tx1))
.await
.unwrap();
rx1.await.unwrap().expect("first open must succeed");
let (tx2, rx2) = oneshot::channel();
w.submit(Job::OpenAccount(def.clone(), cfg2, tx2))
.await
.unwrap();
let err = rx2.await.unwrap().expect_err("conflicting open must fail");
assert!(
matches!(err, talea_core::store::StoreError::AlreadyExists { .. }),
"expected AlreadyExists, got {err:?}",
);
}
#[tokio::test]
async fn register_asset_appends_to_log() {
let dir = tempfile::tempdir().unwrap();
let state = Arc::new(RwLock::new(BookState::default()));
let w = BookWriter::spawn(dir.path().to_path_buf(), Arc::clone(&state), 64)
.await
.unwrap();
let def = AssetDef {
id: AssetId::new("EUR"),
class: talea_core::types::AssetClass::Fiat,
precision: 2,
name: "Euro".into(),
};
let (reply_tx, reply_rx) = oneshot::channel();
w.submit(Job::RegisterAsset(def, reply_tx)).await.unwrap();
reply_rx
.await
.unwrap()
.expect("register asset must succeed");
let seg = crate::segment::SegmentSet::open(dir.path()).await.unwrap();
let frames = seg.scan_from(1, 1000).await.unwrap();
assert_eq!(frames.len(), 1, "one AssetRegistered frame expected");
assert!(
matches!(
frames[0].event,
talea_core::events::LedgerEvent::AssetRegistered(_)
),
"expected AssetRegistered event",
);
}
#[tokio::test]
async fn rejected_draft_produces_no_broadcast() {
let dir = tempfile::tempdir().unwrap();
let w = writer_with_accounts(dir.path()).await;
let mut rx = w.events.subscribe();
let mut bad = tx("bad");
bad.postings[0].account = AccountId {
book: Book("b".into()),
path: "ghost".into(),
};
let good = tx("good");
let (r_bad, r_good) = tokio::join!(w.commit(bad), w.commit(good));
assert!(r_bad.is_err(), "bad tx must be rejected");
let good_seq = r_good.expect("good tx must succeed").seq;
let ev = rx.recv().await.expect("must receive one broadcast event");
assert_eq!(ev.seq, good_seq, "broadcast event must be the accepted tx");
let second = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await;
assert!(
second.is_err(),
"no second broadcast event expected after rejected draft"
);
}
async fn writer_with_accounts_and_hook(
dir: &std::path::Path,
hook: Option<std::sync::Arc<dyn Fn() -> std::io::Result<()> + Send + Sync>>,
) -> (BookWriter, Arc<RwLock<BookState>>) {
let mut st = BookState::default();
for path in ["cash", "rev"] {
st.accounts.insert(
AccountId {
book: Book("b".into()),
path: path.into(),
}
.to_key(),
AccountState {
def: AccountDef {
id: AccountId {
book: Book("b".into()),
path: path.into(),
},
asset: AssetId::new("USD"),
kind: AccountKind::Asset,
},
cfg: AccountCfg {
normal_side: None,
min_balance: None,
},
raw_balance: 0,
updated_seq: 0,
postings: PostingIndex::default(),
},
);
}
let state = Arc::new(RwLock::new(st));
let w = BookWriter::spawn_for_test(dir.to_path_buf(), Arc::clone(&state), 1024, hook)
.await
.unwrap();
(w, state)
}
#[tokio::test]
async fn ack_only_after_sync_returns() {
use std::sync::atomic::{AtomicBool, Ordering};
let dir = tempfile::tempdir().unwrap();
let synced = Arc::new(AtomicBool::new(false));
let synced2 = Arc::clone(&synced);
let hook = std::sync::Arc::new(move || -> std::io::Result<()> {
synced2.store(true, Ordering::SeqCst);
Ok(())
});
let (w, _state) = writer_with_accounts_and_hook(dir.path(), Some(hook)).await;
w.commit(tx("k1")).await.expect("first commit must succeed");
assert!(
synced.load(Ordering::SeqCst),
"ack must arrive only after sync_hook set synced=true (first commit)"
);
synced.store(false, Ordering::SeqCst);
w.commit(tx("k2"))
.await
.expect("second commit must succeed");
assert!(
synced.load(Ordering::SeqCst),
"ack must arrive only after sync_hook set synced=true (second commit)"
);
}
#[tokio::test]
async fn no_broadcast_before_sync() {
use std::sync::atomic::{AtomicU64, Ordering};
let dir = tempfile::tempdir().unwrap();
let sync_count = Arc::new(AtomicU64::new(0));
let sc2 = Arc::clone(&sync_count);
let hook = std::sync::Arc::new(move || -> std::io::Result<()> {
sc2.fetch_add(1, Ordering::SeqCst);
Ok(())
});
let (w, _state) = writer_with_accounts_and_hook(dir.path(), Some(hook)).await;
for (i, key) in ["a", "b", "c"].iter().enumerate() {
let events_before = (i as u64).saturating_sub(1);
let mut rx = w.events.subscribe();
w.commit(tx(key)).await.expect("commit must succeed");
let _ev = tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv())
.await
.expect("broadcast timeout")
.expect("broadcast channel closed");
let sc = sync_count.load(Ordering::SeqCst);
assert!(
sc > events_before,
"sync_count ({sc}) must be ≥ batches committed ({}) at event {}",
events_before + 1,
i + 1,
);
}
assert!(
sync_count.load(Ordering::SeqCst) >= 1,
"at least one sync must have fired for three sequential commits"
);
}
#[tokio::test]
async fn reject_only_batch_skips_sync() {
use std::sync::atomic::{AtomicU64, Ordering};
let dir = tempfile::tempdir().unwrap();
let sync_count = Arc::new(AtomicU64::new(0));
let sc2 = Arc::clone(&sync_count);
let hook = std::sync::Arc::new(move || -> std::io::Result<()> {
sc2.fetch_add(1, Ordering::SeqCst);
Ok(())
});
let (w, _state) = writer_with_accounts_and_hook(dir.path(), Some(hook)).await;
let mut bad = tx("ghost-tx");
bad.postings[0].account = AccountId {
book: Book("b".into()),
path: "ghost".into(),
};
let result = w.commit(bad).await;
assert!(
matches!(
result,
Err(talea_core::store::StoreError::UnknownAccount(_))
),
"ghost-account tx must be rejected, got {result:?}"
);
assert_eq!(
sync_count.load(Ordering::SeqCst),
0,
"a reject-only batch must not trigger fsync"
);
}
#[tokio::test]
async fn fsync_failure_fails_the_batch_with_io() {
let dir = tempfile::tempdir().unwrap();
let hook = std::sync::Arc::new(|| -> std::io::Result<()> {
Err(std::io::Error::other("injected fsync failure"))
});
let (w, _state) = writer_with_accounts_and_hook(dir.path(), Some(hook)).await;
let result = w.commit(tx("will-fail")).await;
assert!(
matches!(result, Err(talea_core::store::StoreError::Io(_))),
"fsync failure must surface as StoreError::Io, got {result:?}"
);
}
#[tokio::test]
async fn fsync_failure_kills_the_writer_permanently() {
let dir = tempfile::tempdir().unwrap();
let hook = std::sync::Arc::new(|| -> std::io::Result<()> {
Err(std::io::Error::other("injected fsync failure"))
});
let (w, state) = writer_with_accounts_and_hook(dir.path(), Some(hook)).await;
let (seq_before, idem_count_before) = {
let st = state.read().await;
(st.next_seq, st.idem.hot_len())
};
let r1 = w.commit(tx("fail-1")).await;
assert!(
matches!(r1, Err(talea_core::store::StoreError::Io(_))),
"first post-failure commit must be StoreError::Io, got {r1:?}"
);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let r2 = w.commit(tx("fail-2")).await;
assert!(
matches!(r2, Err(talea_core::store::StoreError::Io(_))),
"second commit after dead writer must be StoreError::Io, got {r2:?}"
);
let (seq_after, idem_count_after) = {
let st = state.read().await;
(st.next_seq, st.idem.hot_len())
};
assert_eq!(
seq_after, seq_before,
"next_seq must be unchanged after a failed fsync (apply-after-fsync)"
);
assert_eq!(
idem_count_after, idem_count_before,
"idem map must be unchanged after a failed fsync (apply-after-fsync)"
);
}
}