talea_store_log/lib.rs
1//! Append-log store: one CRC-framed JSON event log per book, a single
2//! writer task per book over in-memory state, strict fsync-per-batch.
3
4pub mod frame;
5pub mod idem_spill;
6pub mod segment;
7pub mod snapshot;
8pub mod state;
9pub mod writer;
10pub use frame::WireEvent;
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex, PoisonError};
16
17use async_trait::async_trait;
18use chrono::DateTime;
19use chrono::Utc;
20use futures::stream::BoxStream;
21use tokio::sync::RwLock;
22
23use talea_core::events::LedgerEvent;
24use talea_core::store::{
25 AccountCfg, BalanceSnapshot, Committed, PostingRecord, SYSTEM_BOOK, Sequenced, Store,
26 StoreError, StoredTransaction, TrialBalanceRow,
27};
28use talea_core::types::{
29 AccountDef, AccountId, Amount, AssetDef, AssetId, Book, Direction, Seq, Transaction, TxId,
30};
31
32use crate::idem_spill::DEFAULT_IDEM_HOT_CAP;
33use crate::segment::SegmentSet;
34use crate::state::{BookState, effective};
35use crate::writer::{BookWriter, Job};
36
/// Tuning knobs accepted when opening a [`LogTaleaStore`].
#[derive(Clone, Debug)]
pub struct LogStoreOptions {
    /// Upper bound on idempotency keys held in memory. When it is exceeded,
    /// the oldest half is drained to on-disk spill runs.
    pub idem_hot_cap: usize,
    /// Number of events between automatic snapshots; `0` disables them.
    pub snapshot_every: u64,
    /// Size in bytes a segment file may reach before it is rotated.
    pub segment_max: u64,
}
48
49impl Default for LogStoreOptions {
50 fn default() -> Self {
51 Self {
52 idem_hot_cap: DEFAULT_IDEM_HOT_CAP,
53 snapshot_every: BookWriter::DEFAULT_SNAPSHOT_EVERY,
54 segment_max: crate::segment::DEFAULT_SEGMENT_MAX,
55 }
56 }
57}
58
59/// An append-log implementation of [`Store`].
60///
61/// # Directory structure
62///
63/// ```text
64/// <dir>/
65/// LOCK ← exclusive advisory lock held for the process lifetime
66/// books/
67/// _system/ ← system book (assets)
68/// <book>/ ← one dir per user book
69/// ```
70///
71/// # Concurrency / single-process invariant
72///
73/// One `LogTaleaStore` per directory at a time is enforced via an fs4 advisory
74/// lock on `<dir>/LOCK`. A second `open` on the same directory from the same
75/// or a different process will fail with `StoreError::Io`.
76///
77/// # Book name safety
78///
79/// Book names are validated to prevent directory escape: names containing `/`,
80/// `\`, or the component `..` are rejected. This validation happens in every
81/// path that accepts a book name from external input.
82pub struct LogTaleaStore {
83 dir: PathBuf,
84 registry: Arc<RwLock<HashMap<AssetId, AssetDef>>>,
85 books: Arc<RwLock<HashMap<String, BookWriter>>>,
86 batch_max: usize,
87 /// Segment rotation threshold in bytes. Stored so that new books created
88 /// after `open_with` (via `book_writer`) use the same threshold as books
89 /// that were already on disk at startup.
90 segment_max: u64,
91 /// Snapshot cadence (events between automatic snapshots). Stored so that
92 /// new books created after `open_with` use the same setting.
93 snapshot_every: u64,
94 /// Set to `true` by `shutdown()` before draining writers. Any subsequent
95 /// call into `book_writer()` or `register_asset()` checks this flag first
96 /// and returns an error rather than silently recreating writers over an
97 /// empty `BookState` while a newly-opened store may own the directory.
98 shut_down: AtomicBool,
99 /// Held for the process lifetime. `shutdown(&self)` takes the file out so
100 /// the advisory lock is released before another `open` on the same dir
101 /// (e.g. in the same test process). Drop of the struct releases it otherwise.
102 ///
103 /// We wrap in `Mutex<Option<_>>` because `shutdown` takes `&self` (not
104 /// `self`) — the test drops the store after calling `shutdown`, and the
105 /// reopen inside the same scope needs the lock gone first.
106 _lock: Mutex<Option<std::fs::File>>,
107}
108
109fn io_err(e: impl std::error::Error + Send + Sync + 'static) -> StoreError {
110 StoreError::Io(Box::new(e))
111}
112
113fn io_str(s: impl Into<String>) -> StoreError {
114 StoreError::Io(s.into().into())
115}
116
117/// Validate a book name is safe to use as a directory component.
118fn validate_book_name(book: &str) -> Result<(), StoreError> {
119 if book.is_empty()
120 || book.contains('/')
121 || book.contains('\\')
122 || book == ".."
123 || book.contains("..")
124 {
125 return Err(io_str(format!(
126 "invalid book name {book:?}: must not be empty or contain '/', '\\', or '..'"
127 )));
128 }
129 Ok(())
130}
131
132impl LogTaleaStore {
133 /// Open (or create) the store at `dir` with default options.
134 ///
135 /// See [`open_with`] for the full option set.
136 pub async fn open(dir: &Path) -> Result<Self, StoreError> {
137 Self::open_with(dir, LogStoreOptions::default()).await
138 }
139
140 /// Open (or create) the store at `dir` with explicit options.
141 ///
142 /// 1. Creates `<dir>/books/` if missing.
143 /// 2. Acquires an exclusive fs4 advisory lock on `<dir>/LOCK`.
144 /// 3. Replays each existing book dir under `<dir>/books/` to rebuild
145 /// in-memory state, then spawns a `BookWriter` per book.
146 /// 4. `_system` is treated like any other book for replay; `AssetRegistered`
147 /// events also populate the in-memory asset registry.
148 /// 5. For each book, calls `idem.attach_dir(...)` to load spill runs and
149 /// rebuild the Bloom filter. If any run's CRC fails or the Bloom file
150 /// is missing/corrupt, a full log scan is performed to rebuild the runs.
151 pub async fn open_with(dir: &Path, opts: LogStoreOptions) -> Result<Self, StoreError> {
152 use fs4::fs_std::FileExt;
153
154 // 1. Create dirs.
155 tokio::fs::create_dir_all(dir).await.map_err(io_err)?;
156 let books_dir = dir.join("books");
157 tokio::fs::create_dir_all(&books_dir)
158 .await
159 .map_err(io_err)?;
160
161 // 2. Acquire exclusive advisory lock.
162 let lock_path = dir.join("LOCK");
163 let lock_file = std::fs::OpenOptions::new()
164 .create(true)
165 .write(true)
166 // truncate(false): we only need the fd for locking, not to modify content
167 .truncate(false)
168 .open(&lock_path)
169 .map_err(io_err)?;
170 // try_lock_exclusive returns Ok(true) on success, Ok(false) if contended,
171 // Err(_) for genuine I/O failure.
172 let locked = lock_file.try_lock_exclusive().map_err(io_err)?;
173 if !locked {
174 return Err(io_str(format!(
175 "data dir already locked by another process: {}",
176 dir.display()
177 )));
178 }
179
180 // 3. Replay existing book dirs.
181 let mut registry: HashMap<AssetId, AssetDef> = HashMap::new();
182 let mut books_map: HashMap<String, BookWriter> = HashMap::new();
183 let batch_max = 1024;
184
185 let mut rd = tokio::fs::read_dir(&books_dir).await.map_err(io_err)?;
186 while let Some(entry) = rd.next_entry().await.map_err(io_err)? {
187 let ft = entry.file_type().await.map_err(io_err)?;
188 if !ft.is_dir() {
189 continue;
190 }
191 let name = entry
192 .file_name()
193 .into_string()
194 .map_err(|_| io_str("non-UTF-8 book dir name"))?;
195
196 let book_dir = entry.path();
197
198 // Open the segments for replay (validation + repair happens inside open).
199 let seg = SegmentSet::open(&book_dir).await.map_err(io_err)?;
200
201 // --- Snapshot-assisted replay ---
202 //
203 // Try to load the newest valid snapshot from this book's directory.
204 // If one is found, we start replaying the event log from `snap_seq + 1`
205 // instead of from genesis, bounding startup time to the tail only.
206 //
207 // The snapshot was taken post-apply (by the writer, after fsync+apply),
208 // so `state.next_seq == snap_seq + 1` in the loaded state. We trust the
209 // state's own counters; the filename's seq is only used to select the
210 // segment start point.
211 //
212 // The deserialized BookState's `writer_attached` flag is always `false`
213 // (it has `#[serde(skip, default = "default_writer_attached")]`), so a
214 // writer can safely attach to it post-load.
215 let (mut st, replay_from) =
216 match snapshot::load_latest(&book_dir).await.map_err(io_err)? {
217 Some((snap_st, snap_seq)) => {
218 // Sanity: state's own next_seq must be snap_seq+1.
219 debug_assert_eq!(
220 snap_st.next_seq,
221 snap_seq + 1,
222 "snapshot {name} seq={snap_seq}: state.next_seq={} expected {}",
223 snap_st.next_seq,
224 snap_seq + 1
225 );
226 tracing::debug!(
227 book = %name,
228 snap_seq,
229 "loaded snapshot; replaying tail from seq {}",
230 snap_seq + 1
231 );
232 (snap_st, snap_seq + 1)
233 }
234 None => {
235 tracing::debug!(book = %name, "no snapshot; full replay from seq 1");
236 (BookState::default(), 1)
237 }
238 };
239
240 // Apply idem_hot_cap option to the loaded state.
241 st.idem.cap = opts.idem_hot_cap;
242
243 // Replay the tail starting at `replay_from` (full replay when no snapshot).
244 let pairs = seg
245 .scan_with_pos(replay_from, usize::MAX)
246 .await
247 .map_err(io_err)?;
248
249 // Fold into the (possibly snapshot-seeded) BookState.
250 // Use try_apply_transaction (not apply_transaction) so a corrupt or
251 // hand-edited log that contains arithmetic-impossible postings
252 // returns StoreError::Io instead of panicking.
253 for (wire, pos) in &pairs {
254 match &wire.event {
255 LedgerEvent::TransactionPosted(tx) => {
256 st.try_apply_transaction(tx, wire.seq, wire.at, *pos)
257 .map_err(|msg| {
258 // Encode enough context for the operator to locate the corrupt frame.
259 let (seg_base, off) = pos;
260 io_str(format!(
261 "corrupt log in book {name} at segment {seg_base} offset {off}: {msg}"
262 ))
263 })?;
264 }
265 LedgerEvent::AccountOpened { def, cfg } => {
266 st.apply_account_opened(def, cfg, wire.seq, wire.at);
267 }
268 LedgerEvent::AssetRegistered(def) => {
269 st.bump_seq(wire.seq, wire.at);
270 // Populate registry regardless of which book the event
271 // lives in (though the writer only appends these to _system).
272 registry.insert(def.id.clone(), def.clone());
273 }
274 }
275 }
276
277 // Attach idem spill runs + bloom from disk.
278 // The rebuild_fn supplies all committed idem keys from the full log
279 // (both the snapshot-sourced hot map and tail replay), so that if any
280 // run is corrupt/missing the index can be rebuilt accurately.
281 //
282 // Note: we do a second pass over `pairs` only for the rebuild path
283 // (rare). The all_idem_pairs closure is only called if CRC fails.
284 let all_idem_snap: Vec<(String, crate::state::CommittedRec)> = {
285 // Pre-snapshot idem keys come from st.idem.hot (already in hot).
286 // tail-replay idem keys are also already in st.idem.hot at this point.
287 // For rebuild, we need ALL committed keys: hot map is authoritative
288 // for recent, runs hold older. The rebuild_fn produces the UNION
289 // of all committed keys that are NOT already in hot, so pass an
290 // empty vec here (hot is the only source of truth right now; runs
291 // will be rebuilt from the log scan that follows).
292 //
293 // Actually: we need to provide all keys so rebuild can reconstruct
294 // the full set. `st.idem.hot` has all keys from replay at this point
295 // (runs will be empty since they're being validated).
296 // Pass all pairs from the full log so the rebuild_fn has complete data.
297 pairs
298 .iter()
299 .filter_map(|(wire, _)| {
300 if let LedgerEvent::TransactionPosted(tx) = &wire.event {
301 Some((
302 tx.idempotency_key.0.clone(),
303 crate::state::CommittedRec {
304 txid: tx.id.clone(),
305 seq: wire.seq,
306 at: wire.at,
307 },
308 ))
309 } else {
310 None
311 }
312 })
313 .collect()
314 };
315
316 st.idem
317 .attach_dir(&book_dir, move || {
318 let snap = all_idem_snap;
319 async move { Ok(snap) }
320 })
321 .await
322 .map_err(io_err)?;
323
324 // Drop the replay SegmentSet — BookWriter::spawn re-opens it for writes.
325 drop(seg);
326
327 // Spawn writer over the replayed state.
328 let writer = BookWriter::spawn_with_opts(
329 book_dir,
330 Arc::new(RwLock::new(st)),
331 batch_max,
332 opts.snapshot_every,
333 opts.segment_max,
334 )
335 .await
336 .map_err(io_err)?;
337
338 books_map.insert(name, writer);
339 }
340
341 Ok(Self {
342 dir: dir.to_path_buf(),
343 registry: Arc::new(RwLock::new(registry)),
344 books: Arc::new(RwLock::new(books_map)),
345 batch_max,
346 segment_max: opts.segment_max,
347 snapshot_every: opts.snapshot_every,
348 shut_down: AtomicBool::new(false),
349 _lock: Mutex::new(Some(lock_file)),
350 })
351 }
352
353 /// Look up an existing `BookWriter` without creating one.
354 ///
355 /// Returns:
356 /// - `Err(StoreError::Io)` if the store has been shut down (same behaviour
357 /// as `book_writer` — a shut-down store should not silently report that
358 /// every book is absent).
359 /// - `Ok(None)` if the book does not exist yet (read lock only, no I/O).
360 /// - `Ok(Some(w))` if the book is already loaded.
361 ///
362 /// Read paths (e.g. `balance`) use this so they never create a book
363 /// directory on disk as a side effect of querying a nonexistent book.
364 /// Write paths (`commit`, `open_account`, `register_asset`) continue to
365 /// use `book_writer`, which creates the book on first access.
366 async fn existing_book(&self, book: &str) -> Result<Option<BookWriter>, StoreError> {
367 if self.shut_down.load(Ordering::SeqCst) {
368 return Err(io_str("store has been shut down"));
369 }
370 validate_book_name(book)?;
371 let guard = self.books.read().await;
372 Ok(guard.get(book).cloned())
373 }
374
375 /// Get or create the `BookWriter` for `book`.
376 ///
377 /// # Book name safety
378 ///
379 /// Book names containing `/`, `\`, or `..` are rejected to prevent a book
380 /// named e.g. `../x` from escaping the `<dir>/books/` subtree. The API
381 /// layer constrains book names further, but the store provides a last-line
382 /// defence here too.
383 async fn book_writer(&self, book: &str) -> Result<BookWriter, StoreError> {
384 if self.shut_down.load(Ordering::SeqCst) {
385 return Err(io_str("store has been shut down"));
386 }
387 validate_book_name(book)?;
388
389 // Fast path: read lock.
390 {
391 let guard = self.books.read().await;
392 if let Some(w) = guard.get(book) {
393 return Ok(w.clone());
394 }
395 }
396
397 // Slow path: write lock, get-or-create.
398 let mut guard = self.books.write().await;
399 if let Some(w) = guard.get(book) {
400 return Ok(w.clone());
401 }
402
403 let book_dir = self.dir.join("books").join(book);
404 // For a brand-new book, attach_dir with an empty rebuild_fn (no existing log).
405 let mut initial_state = BookState::default();
406 tokio::fs::create_dir_all(&book_dir).await.map_err(io_err)?;
407 initial_state
408 .idem
409 .attach_dir(&book_dir, || async { Ok(vec![]) })
410 .await
411 .map_err(io_err)?;
412
413 let state = Arc::new(RwLock::new(initial_state));
414 let writer = BookWriter::spawn_with_opts(
415 book_dir,
416 state,
417 self.batch_max,
418 self.snapshot_every,
419 self.segment_max,
420 )
421 .await
422 .map_err(io_err)?;
423 guard.insert(book.to_string(), writer.clone());
424 Ok(writer)
425 }
426
427 /// Trigger an immediate snapshot for `book` and wait for it to complete.
428 ///
429 /// This is an ops/test hook. The snapshot is written by the book's writer
430 /// task (the only mutator), so it is consistent with the current state.
431 ///
432 /// Returns `Ok(())` on success. Returns `StoreError::Io` if the book is
433 /// unknown, the store is shut down, or the snapshot write fails.
434 pub async fn snapshot_now(&self, book: &str) -> Result<(), StoreError> {
435 let writer = self
436 .existing_book(book)
437 .await?
438 .ok_or_else(|| io_str(format!("book {book:?} not found for snapshot_now")))?;
439 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
440 writer.submit(Job::Snapshot(reply_tx)).await?;
441 reply_rx
442 .await
443 .map_err(|_| io_str("book writer gone during snapshot_now"))?
444 }
445
446 /// Drain all writers and await their completion.
447 ///
448 /// Writers finish when their last sender clone is dropped. This method:
449 /// 1. Drains the books map (removing the store's writer clones).
450 /// 2. For each writer, calls `BookWriter::shutdown(w)` which consumes the
451 /// clone — dropping its `mpsc::Sender` — before awaiting the task. This
452 /// guarantees the task sees sender-count → 0 and exits.
453 /// 3. Releases the advisory LOCK file so a subsequent `open` on the same
454 /// directory succeeds in the same process (the test pattern calls
455 /// `shutdown` before dropping and then reopens).
456 ///
457 /// Note: takes `&self` (not `self`) so it can be called without consuming
458 /// the store. The caller is expected to drop the `LogTaleaStore` afterward.
459 pub async fn shutdown(&self) {
460 // Signal all subsequent calls that the store is no longer usable.
461 // Must happen BEFORE draining writers so any concurrent book_writer()
462 // call racing with drain sees the flag and returns an error.
463 self.shut_down.store(true, Ordering::SeqCst);
464
465 // Drain the books map → we now hold the only remaining clones.
466 let writers: Vec<BookWriter> = {
467 let mut guard = self.books.write().await;
468 guard.drain().map(|(_, w)| w).collect()
469 };
470
471 // BookWriter::shutdown(w) consumes `w` (dropping its Sender) before
472 // awaiting the task handle. Calling `w.join()` instead would keep `w`
473 // alive during the await, so the Sender would never be dropped and the
474 // task would block forever on `rx.recv()`.
475 for w in writers {
476 w.shutdown().await;
477 }
478
479 // Release the advisory lock so a subsequent `open` on the same dir
480 // works. Poison recovery: taking the file handle out of the Option
481 // cannot be torn by a panicking thread, so the inner value is safe.
482 let _ = self
483 ._lock
484 .lock()
485 .unwrap_or_else(PoisonError::into_inner)
486 .take();
487 }
488}
489
490#[async_trait]
491impl Store for LogTaleaStore {
492 /// Register an asset.
493 ///
494 /// Idempotent: same def → `Ok(())` with no new event.
495 /// Different def for same id → `StoreError::AlreadyExists`.
496 ///
497 /// # Race note (same-new-asset concurrent register)
498 ///
499 /// Both concurrent calls for a brand-new asset would each pass the "absent"
500 /// check and submit a `Job::RegisterAsset`. To prevent two `AssetRegistered`
501 /// frames being appended, we hold the registry WRITE lock across the submit
502 /// AND the ack from the writer. Asset registration is rare (a setup-time
503 /// operation, not a hot path), so a write-lock held across an await is
504 /// acceptable here. Documented explicitly.
505 async fn register_asset(&self, asset: &AssetDef) -> Result<(), StoreError> {
506 if self.shut_down.load(Ordering::SeqCst) {
507 return Err(io_str("store has been shut down"));
508 }
509 // Hold the write lock for the full operation to serialize concurrent
510 // registrations of the same new asset (see race note above).
511 let mut reg = self.registry.write().await;
512
513 if let Some(existing) = reg.get(&asset.id) {
514 if existing == asset {
515 return Ok(()); // same def — idempotent no-op
516 }
517 return Err(StoreError::AlreadyExists {
518 what: format!("asset {}", asset.id.as_str()),
519 });
520 }
521
522 // Submit to the _system writer.
523 let writer = self.book_writer(SYSTEM_BOOK).await?;
524 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
525 writer
526 .submit(Job::RegisterAsset(asset.clone(), reply_tx))
527 .await?;
528 reply_rx
529 .await
530 .map_err(|_| io_str("book writer gone during register_asset"))??;
531
532 // Insert into registry AFTER the ack so a concurrent open sees it
533 // only once it is durable.
534 reg.insert(asset.id.clone(), asset.clone());
535 Ok(())
536 }
537
538 /// Open an account.
539 ///
540 /// Validates that:
541 /// - The book is not a reserved name (starts with `_`).
542 /// - The asset is registered.
543 ///
544 /// Idempotent for the same `(def, cfg)` pair; `AlreadyExists` for a
545 /// conflicting def/cfg.
546 async fn open_account(&self, def: &AccountDef, cfg: &AccountCfg) -> Result<(), StoreError> {
547 // Reserved-book guard: the store is responsible for this (the writer
548 // does not validate book names for OpenAccount jobs).
549 if def.id.book.is_reserved() {
550 return Err(StoreError::InvalidBook(def.id.book.clone()));
551 }
552
553 // Asset must be registered.
554 {
555 let reg = self.registry.read().await;
556 if !reg.contains_key(&def.asset) {
557 return Err(StoreError::UnknownAsset(def.asset.clone()));
558 }
559 }
560
561 let writer = self.book_writer(&def.id.book.0).await?;
562 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
563 writer
564 .submit(Job::OpenAccount(def.clone(), cfg.clone(), reply_tx))
565 .await?;
566 reply_rx
567 .await
568 .map_err(|_| io_str("book writer gone during open_account"))?
569 }
570
571 /// Commit a transaction.
572 async fn commit(&self, transaction: &Transaction) -> Result<Committed, StoreError> {
573 let writer = self.book_writer(&transaction.book.0).await?;
574 writer.commit(transaction.clone()).await
575 }
576
577 /// Commit a batch of transactions with group-commit semantics.
578 ///
579 /// All oneshot reply channels are created and submitted to the writer
580 /// before awaiting any reply, so the writer can drain them into a single
581 /// fsync batch. Results are positional.
582 async fn commit_batch(&self, txs: &[Transaction]) -> Vec<Result<Committed, StoreError>> {
583 if txs.is_empty() {
584 return vec![];
585 }
586
587 // Group by book (in practice all txs are typically the same book,
588 // but handle multi-book correctly).
589 //
590 // For the single-book case: submit ALL jobs first, then collect replies —
591 // this is what lets the writer drain them into one fsync batch.
592 //
593 // For multi-book: we build per-book (channels, txs) then submit all at
594 // once before awaiting any. Positional output is reconstructed at the end.
595
596 // Build a vec of (book, tx_index, reply_rx) — all writers acquired before
597 // any submits.
598 let mut rxs: Vec<tokio::sync::oneshot::Receiver<Result<Committed, StoreError>>> =
599 Vec::with_capacity(txs.len());
600 // Temporarily hold Err results for txs that fail at writer-acquisition stage.
601 let mut errs: HashMap<usize, StoreError> = HashMap::new();
602
603 // Collect writers per book.
604 let mut writers_cache: HashMap<String, Result<BookWriter, StoreError>> = HashMap::new();
605
606 struct PendingSubmit {
607 writer: BookWriter,
608 tx_idx: usize,
609 }
610 let mut pending: Vec<PendingSubmit> = Vec::with_capacity(txs.len());
611
612 // Also track the original error message per book so duplicate slots
613 // for the same failing book get a consistent (not generic) error.
614 // StoreError is not Clone, so we store the message text and produce a
615 // fresh Io error for each affected slot.
616 let mut book_err_msgs: HashMap<String, String> = HashMap::new();
617
618 for (i, tx) in txs.iter().enumerate() {
619 let book = tx.book.0.clone();
620 if !writers_cache.contains_key(&book) {
621 let w = self.book_writer(&book).await;
622 if let Err(ref e) = w {
623 book_err_msgs.insert(book.clone(), e.to_string());
624 }
625 writers_cache.insert(book.clone(), w);
626 }
627 match &writers_cache[&book] {
628 // Carrying the writer clone here (instead of re-looking it up
629 // by book later) keeps the submit loop infallible.
630 Ok(w) => pending.push(PendingSubmit {
631 writer: w.clone(),
632 tx_idx: i,
633 }),
634 Err(_) => {
635 // Placeholder rx slot will be skipped; record error.
636 // We need to keep indices aligned, so push a dummy rx and
637 // record the error separately.
638 let msg = book_err_msgs
639 .get(&book)
640 .cloned()
641 .unwrap_or_else(|| "failed to get book writer".into());
642 errs.insert(i, io_str(msg));
643 }
644 }
645 }
646
647 // Build (tx_idx, rx) pairs: submit all jobs, collect all rxs.
648 let mut tx_to_rx_idx: Vec<Option<usize>> = vec![None; txs.len()];
649 let mut rx_to_tx_idx: Vec<usize> = Vec::with_capacity(pending.len());
650
651 for ps in &pending {
652 let i = ps.tx_idx;
653 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
654 let submit_result = ps
655 .writer
656 .submit(Job::Commit(txs[i].clone(), reply_tx))
657 .await;
658 match submit_result {
659 Ok(()) => {
660 let rx_idx = rxs.len();
661 rxs.push(reply_rx);
662 tx_to_rx_idx[i] = Some(rx_idx);
663 rx_to_tx_idx.push(i);
664 }
665 Err(e) => {
666 errs.insert(i, e);
667 }
668 }
669 }
670
671 // Await all replies.
672 let mut replies: Vec<Option<Result<Committed, StoreError>>> =
673 (0..rxs.len()).map(|_| None).collect();
674 for (rx_idx, rx) in rxs.into_iter().enumerate() {
675 replies[rx_idx] = Some(
676 rx.await
677 .unwrap_or_else(|_| Err(io_str("book writer gone during commit_batch"))),
678 );
679 }
680
681 // Reconstruct positional output.
682 let mut out: Vec<Result<Committed, StoreError>> = (0..txs.len())
683 .map(|_| Err(io_str("unreachable: unset commit_batch slot")))
684 .collect();
685
686 for i in 0..txs.len() {
687 if let Some(e) = errs.remove(&i) {
688 out[i] = Err(e);
689 } else if let Some(result) = tx_to_rx_idx[i]
690 .and_then(|rx_idx| replies.get_mut(rx_idx))
691 .and_then(Option::take)
692 {
693 // Every rx slot was set in the await loop above; a missing one
694 // leaves the pre-seeded "unset slot" error in place.
695 out[i] = result;
696 }
697 }
698
699 out
700 }
701
702 /// Current balance for `account`.
703 ///
704 /// `as_of: None` returns the current (fully-applied) balance from
705 /// in-memory state. `as_of: Some(t)` binary-searches the per-account
706 /// posting entries (sorted by committed_at, non-decreasing) to find the
707 /// balance as it stood at time `t`.
708 ///
709 /// Uses `existing_book` (read-only lookup) so querying an account in a
710 /// book that has never been written to does NOT create that book's
711 /// directory on disk. An absent book means the account cannot exist →
712 /// `StoreError::UnknownAccount`.
713 ///
714 /// Edge semantics matching sqlite:
715 /// - Unknown book → `UnknownAccount` (no book → no account).
716 /// - Unknown account within a known book → `UnknownAccount`.
717 /// - Account exists but no postings before `as_of` → `BalanceSnapshot { amount: 0, updated_seq: 0 }`.
718 async fn balance(
719 &self,
720 account: &AccountId,
721 as_of: Option<DateTime<Utc>>,
722 ) -> Result<BalanceSnapshot, StoreError> {
723 let writer = self
724 .existing_book(&account.book.0)
725 .await?
726 .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
727 let st = writer.state.read().await;
728 let key = account.to_key();
729 let acct = st
730 .accounts
731 .get(&key)
732 .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
733
734 match as_of {
735 None => {
736 let eff = effective(acct.raw_balance, &acct.cfg.normal_side);
737 Ok(BalanceSnapshot {
738 amount: Amount::new(eff, acct.def.asset.clone()),
739 updated_seq: acct.updated_seq,
740 })
741 }
742 Some(t) => {
743 // Binary search: committed_at is non-decreasing vs seq within
744 // a book. idx == 0 (no postings at or before the cutoff) and
745 // the entry lookup collapse into one infallible Option chain.
746 let idx = acct.postings.partition_point_at(t);
747 match idx.checked_sub(1).and_then(|i| acct.postings.get(i)) {
748 None => Ok(BalanceSnapshot {
749 amount: Amount::new(0, acct.def.asset.clone()),
750 updated_seq: 0,
751 }),
752 Some(entry) => {
753 let eff = effective(entry.raw_after, &acct.cfg.normal_side);
754 Ok(BalanceSnapshot {
755 amount: Amount::new(eff, acct.def.asset.clone()),
756 updated_seq: entry.seq,
757 })
758 }
759 }
760 }
761 }
762 }
763
764 /// Look up an asset by id.
765 async fn asset(&self, id: &AssetId) -> Result<Option<AssetDef>, StoreError> {
766 let reg = self.registry.read().await;
767 Ok(reg.get(id).cloned())
768 }
769
770 /// Posting history for `account`, paginated by seq.
771 ///
772 /// `after_seq` is EXCLUSIVE (matches sqlite semantics: `seq > after_seq`).
773 /// `limit` counts DISTINCT seqs so one transaction's postings to the same
774 /// account are never split across pages.
775 ///
776 /// Edge semantics matching sqlite:
777 /// - Unknown book or account → `UnknownAccount`.
778 /// - limit 0 → empty vec (never an error).
779 async fn account_history(
780 &self,
781 account: &AccountId,
782 after_seq: Option<Seq>,
783 limit: usize,
784 ) -> Result<Vec<PostingRecord>, StoreError> {
785 if limit == 0 {
786 return Ok(vec![]);
787 }
788
789 let writer = self
790 .existing_book(&account.book.0)
791 .await?
792 .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
793 let st = writer.state.read().await;
794 let key = account.to_key();
795 let acct = st
796 .accounts
797 .get(&key)
798 .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
799
800 let after = after_seq.unwrap_or(0);
801 // Start at the first entry with seq > after.
802 let start = acct.postings.partition_point_seq(after);
803
804 // Walk counting DISTINCT seqs, stopping before exceeding `limit`.
805 let mut records: Vec<PostingRecord> = Vec::new();
806 let mut distinct_seqs: usize = 0;
807 let mut last_seq: Seq = 0;
808
809 for entry in acct.postings.iter_from(start) {
810 if entry.seq != last_seq {
811 if distinct_seqs >= limit {
812 break;
813 }
814 distinct_seqs += 1;
815 last_seq = entry.seq;
816 }
817 records.push(PostingRecord {
818 seq: entry.seq,
819 txid: entry.txid.clone(),
820 account: account.clone(),
821 amount: Amount::new(entry.minor, acct.def.asset.clone()),
822 direction: entry.direction.clone(),
823 at: entry.at,
824 });
825 }
826
827 Ok(records)
828 }
829
830 /// Look up a committed transaction by its `TxId`.
831 ///
832 /// Searches all book writers' in-memory txid indexes. Found → reads the
833 /// frame at the recorded position and returns a `StoredTransaction`.
834 /// Not found → `Ok(None)` (matches sqlite semantics).
835 async fn transaction(&self, txid: &TxId) -> Result<Option<StoredTransaction>, StoreError> {
836 use talea_core::events::LedgerEvent;
837
838 // Snapshot the writers map under a read lock.
839 let writers: Vec<BookWriter> = {
840 let guard = self.books.read().await;
841 guard.values().cloned().collect()
842 };
843
844 for writer in &writers {
845 let st = writer.state.read().await;
846 if let Some(&(seq, pos)) = st.txids.get(&txid.0) {
847 // Found: release state lock before doing async I/O.
848 drop(st);
849 let wire = writer
850 .catalog
851 .read_at(pos.0, pos.1, seq)
852 .await
853 .map_err(io_err)?;
854 match wire.event {
855 LedgerEvent::TransactionPosted(tx) => {
856 return Ok(Some(StoredTransaction {
857 transaction: tx,
858 seq: wire.seq,
859 at: wire.at,
860 }));
861 }
862 _ => {
863 return Err(io_str(format!(
864 "frame at recorded position is not a transaction (seq {seq})"
865 )));
866 }
867 }
868 }
869 }
870
871 Ok(None)
872 }
873
874 /// Trial balance for `book`.
875 ///
876 /// `as_of: None` returns the current lifetime sums from in-memory state
877 /// (cheap). `as_of: Some(t)` replays all TransactionPosted frames with
878 /// `committed_at <= t` from disk (rare, documented as slow path).
879 ///
880 /// Edge semantics matching sqlite:
881 /// - Unknown book → empty vec (sqlite returns no rows for an unknown book).
882 /// - Rows are sorted by asset id string, matching sqlite's `ORDER BY asset`.
883 async fn trial_balance(
884 &self,
885 book: &Book,
886 as_of: Option<DateTime<Utc>>,
887 ) -> Result<Vec<TrialBalanceRow>, StoreError> {
888 use talea_core::events::LedgerEvent;
889
890 let writer = match self.existing_book(&book.0).await? {
891 None => return Ok(vec![]),
892 Some(w) => w,
893 };
894
895 match as_of {
896 None => {
897 let st = writer.state.read().await;
898 let mut rows: Vec<TrialBalanceRow> = st
899 .sums
900 .iter()
901 .map(|(asset, &(debits, credits))| TrialBalanceRow {
902 asset: asset.clone(),
903 debits,
904 credits,
905 })
906 .collect();
907 rows.sort_by(|a, b| a.asset.as_str().cmp(b.asset.as_str()));
908 Ok(rows)
909 }
910 Some(t) => {
911 // Disk replay: scan all events, fold TransactionPosted with at <= t.
912 // Rare operation; documented as slow path.
913 //
914 // Durability watermark: the writer appends → syncs → applies-to-state
915 // → acks. `state.next_seq` advances only AFTER fsync, so any frame
916 // with `seq < ceiling` (where `ceiling = next_seq - 1`) is guaranteed
917 // to be both durable on disk and reflected in the in-memory state.
918 // Frames beyond the ceiling may be in the page cache but not yet
919 // fsynced (the writer hasn't applied them yet); surfacing them would
920 // be a dirty read — they could vanish on crash. We therefore stop
921 // scanning once `wire.seq > ceiling`.
922 let ceiling: Seq = {
923 let st = writer.state.read().await;
924 st.next_seq.saturating_sub(1)
925 };
926
927 let events = writer
928 .catalog
929 .scan_from(1, usize::MAX)
930 .await
931 .map_err(io_err)?;
932
933 let mut sums: HashMap<AssetId, (i64, i64)> = HashMap::new();
934 for wire in events {
935 if wire.seq > ceiling {
936 break; // beyond durability watermark — stop
937 }
938 if wire.at > t {
939 break; // committed_at is non-decreasing; can stop early
940 }
941 if let LedgerEvent::TransactionPosted(tx) = wire.event {
942 for posting in &tx.postings {
943 let entry =
944 sums.entry(posting.amount.asset().clone()).or_insert((0, 0));
945 match posting.direction {
946 Direction::Debit => {
947 entry.0 = entry.0.saturating_add(posting.amount.minor())
948 }
949 Direction::Credit => {
950 entry.1 = entry.1.saturating_add(posting.amount.minor())
951 }
952 }
953 }
954 }
955 }
956
957 let mut rows: Vec<TrialBalanceRow> = sums
958 .into_iter()
959 .map(|(asset, (debits, credits))| TrialBalanceRow {
960 asset,
961 debits,
962 credits,
963 })
964 .collect();
965 rows.sort_by(|a, b| a.asset.as_str().cmp(b.asset.as_str()));
966 Ok(rows)
967 }
968 }
969 }
970
971 /// Read events from `book` starting at `from` (INCLUSIVE), returning at
972 /// most `limit`.
973 ///
974 /// Edge semantics matching sqlite:
975 /// - Unknown book → empty vec (sqlite returns no rows).
976 ///
977 /// # Durability watermark
978 ///
979 /// Log-replay reads must never surface frames that the in-memory state
980 /// hasn't applied yet. The writer pipeline is: append → fsync →
981 /// apply-to-state → ack. `state.next_seq` advances only after fsync AND
982 /// apply, so `ceiling = next_seq - 1` is the highest seq that is both
983 /// durable on disk and consistent with the in-memory index. Frames with
984 /// `seq > ceiling` may be page-cache-visible but not fsynced; returning
985 /// them would be a dirty read — they could vanish on crash or diverge from
986 /// in-memory state. We drop the state lock before scanning and filter out
987 /// any frames that exceed the ceiling.
988 async fn read_events(
989 &self,
990 book: &Book,
991 from: Seq,
992 limit: usize,
993 ) -> Result<Vec<Sequenced<LedgerEvent>>, StoreError> {
994 let writer = match self.existing_book(&book.0).await? {
995 None => return Ok(vec![]),
996 Some(w) => w,
997 };
998
999 // Read the durability ceiling under a short-lived read lock, then
1000 // release before doing disk I/O.
1001 let ceiling: Seq = {
1002 let st = writer.state.read().await;
1003 st.next_seq.saturating_sub(1)
1004 };
1005
1006 let wires = writer
1007 .catalog
1008 .scan_from(from, limit)
1009 .await
1010 .map_err(io_err)?;
1011
1012 Ok(wires
1013 .into_iter()
1014 .take_while(|w| w.seq <= ceiling)
1015 .map(|w| Sequenced {
1016 seq: w.seq,
1017 at: w.at,
1018 event: w.event,
1019 })
1020 .collect())
1021 }
1022
1023 fn subscribe(
1024 &self,
1025 book: &Book,
1026 from: Seq,
1027 ) -> BoxStream<'static, Result<Sequenced<LedgerEvent>, StoreError>> {
1028 // subscribe is sync — clone everything we need into the stream before returning.
1029 let books = Arc::clone(&self.books);
1030 let dir = self.dir.clone();
1031 let batch_max = self.batch_max;
1032 let segment_max = self.segment_max;
1033 let snapshot_every = self.snapshot_every;
1034 let book = book.clone();
1035
1036 Box::pin(async_stream::try_stream! {
1037 // Validate the book name before touching the filesystem. Every
1038 // other entry point calls validate_book_name via book_writer /
1039 // existing_book; subscribe builds the path directly, so we must
1040 // check here.
1041 validate_book_name(&book.0)?;
1042
1043 // Step 1: get-or-create the BookWriter inside the async stream so we
1044 // can await it. This mirrors sqlite's behaviour: subscribing to a book
1045 // that has never been written to still works; future writes will be
1046 // received live.
1047 let writer: BookWriter = {
1048 // Fast path: read lock.
1049 let maybe = {
1050 let g = books.read().await;
1051 g.get(&book.0).cloned()
1052 };
1053 if let Some(w) = maybe {
1054 w
1055 } else {
1056 // Slow path: write lock, get-or-create.
1057 let mut g = books.write().await;
1058 if let Some(w) = g.get(&book.0).cloned() {
1059 w
1060 } else {
1061 let book_dir = dir.join("books").join(&book.0);
1062 tokio::fs::create_dir_all(&book_dir).await.map_err(|e| StoreError::Io(Box::new(e)))?;
1063 let mut initial_st = crate::state::BookState::default();
1064 initial_st.idem.attach_dir(&book_dir, || async { Ok(vec![]) }).await.map_err(|e| StoreError::Io(Box::new(e)))?;
1065 let state = Arc::new(RwLock::new(initial_st));
1066 let w = BookWriter::spawn_with_opts(book_dir, state, batch_max, snapshot_every, segment_max)
1067 .await
1068 .map_err(|e| StoreError::Io(Box::new(e)))?;
1069 g.insert(book.0.clone(), w.clone());
1070 w
1071 }
1072 }
1073 };
1074
1075 // Step 2: subscribe to the live broadcast channel BEFORE reading history
1076 // so nothing between catch-up and live-tail is missed.
1077 let mut live = writer.events.subscribe();
1078
1079 // Step 3: page through history from `from` up to the durability watermark.
1080 let page_size = 512usize;
1081 let mut next = from;
1082
1083 loop {
1084 // Ceiling at the start of each page so we don't surface un-applied frames.
1085 let ceiling: Seq = {
1086 let st = writer.state.read().await;
1087 st.next_seq.saturating_sub(1)
1088 };
1089 if next > ceiling {
1090 break;
1091 }
1092 let limit = page_size.min((ceiling - next + 1) as usize);
1093 let wires = writer.catalog.scan_from(next, limit).await.map_err(io_err)?;
1094 let page_len = wires.len();
1095 for wire in wires {
1096 // Double-check ceiling in case ceiling moved backward (shouldn't happen,
1097 // but be defensive).
1098 if wire.seq > ceiling {
1099 break;
1100 }
1101 next = wire.seq + 1;
1102 yield Sequenced { seq: wire.seq, at: wire.at, event: wire.event };
1103 }
1104 // A short page means we're caught up to the watermark.
1105 if page_len < limit {
1106 break;
1107 }
1108 }
1109
1110 // Step 4: live-tail loop.
1111 let mut last: Seq = next.saturating_sub(1);
1112 loop {
1113 match live.recv().await {
1114 Ok(ev) => {
1115 if ev.seq <= last {
1116 // overlap from catch-up — skip
1117 continue;
1118 }
1119 last = ev.seq;
1120 yield Sequenced { seq: ev.seq, at: ev.at, event: ev.event };
1121 }
1122 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
1123 // We fell behind the broadcast ring — re-page from last+1.
1124 let resume = last + 1;
1125 let page_size_lag = 512usize;
1126 let mut cursor = resume;
1127 loop {
1128 let ceiling: Seq = {
1129 let st = writer.state.read().await;
1130 st.next_seq.saturating_sub(1)
1131 };
1132 if cursor > ceiling {
1133 break;
1134 }
1135 let limit = page_size_lag.min((ceiling - cursor + 1) as usize);
1136 let wires = writer.catalog.scan_from(cursor, limit).await.map_err(io_err)?;
1137 let page_len = wires.len();
1138 for wire in wires {
1139 if wire.seq > ceiling {
1140 break;
1141 }
1142 if wire.seq > last {
1143 last = wire.seq;
1144 cursor = wire.seq + 1;
1145 yield Sequenced { seq: wire.seq, at: wire.at, event: wire.event };
1146 } else {
1147 cursor = wire.seq + 1;
1148 }
1149 }
1150 if page_len < limit {
1151 break;
1152 }
1153 }
1154 // After re-paging, resume the live loop (re-subscribe isn't
1155 // possible but new events will arrive normally).
1156 }
1157 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1158 // Writer died — no more events. End the stream cleanly (matches sqlite).
1159 return;
1160 }
1161 }
1162 }
1163 })
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use super::*;
1170 use crate::writer::BookWriter;
1171 use talea_core::store::{AccountCfg, Store};
1172 use talea_core::types::*;
1173
1174 fn mk_tx(key: &str, minor: i64) -> Transaction {
1175 Transaction {
1176 id: TxId(uuid::Uuid::now_v7()),
1177 book: Book("b".into()),
1178 postings: vec![
1179 Posting {
1180 account: AccountId {
1181 book: Book("b".into()),
1182 path: "cash".into(),
1183 },
1184 amount: Amount::new(minor, AssetId::new("USD")),
1185 direction: Direction::Debit,
1186 },
1187 Posting {
1188 account: AccountId {
1189 book: Book("b".into()),
1190 path: "rev".into(),
1191 },
1192 amount: Amount::new(minor, AssetId::new("USD")),
1193 direction: Direction::Credit,
1194 },
1195 ],
1196 idempotency_key: IdempotencyKey(key.into()),
1197 external_refs: vec![],
1198 metadata: serde_json::Value::Null,
1199 occurred_at: chrono::Utc::now(),
1200 }
1201 }
1202
1203 fn usd() -> AssetDef {
1204 AssetDef {
1205 id: AssetId::new("USD"),
1206 class: AssetClass::Fiat,
1207 precision: 2,
1208 name: "Dollar".into(),
1209 }
1210 }
1211
1212 fn cash_def() -> AccountDef {
1213 AccountDef {
1214 id: AccountId {
1215 book: Book("b".into()),
1216 path: "cash".into(),
1217 },
1218 asset: AssetId::new("USD"),
1219 kind: AccountKind::Asset,
1220 }
1221 }
1222
1223 fn rev_def() -> AccountDef {
1224 AccountDef {
1225 id: AccountId {
1226 book: Book("b".into()),
1227 path: "rev".into(),
1228 },
1229 asset: AssetId::new("USD"),
1230 kind: AccountKind::Income,
1231 }
1232 }
1233
1234 async fn seeded(dir: &std::path::Path) -> LogTaleaStore {
1235 let store = LogTaleaStore::open(dir).await.unwrap();
1236 store.register_asset(&usd()).await.unwrap();
1237 let cfg = AccountCfg {
1238 normal_side: None,
1239 min_balance: None,
1240 };
1241 store.open_account(&cash_def(), &cfg).await.unwrap();
1242 store.open_account(&rev_def(), &cfg).await.unwrap();
1243 store
1244 }
1245
1246 #[tokio::test]
1247 async fn open_recovers_state_by_replay() {
1248 let dir = tempfile::tempdir().unwrap();
1249 {
1250 let store = seeded(dir.path()).await;
1251 store.commit(&mk_tx("k1", 25)).await.unwrap();
1252 store.shutdown().await;
1253 }
1254 let store = LogTaleaStore::open(dir.path()).await.unwrap();
1255 let bal = store.balance(&cash_def().id, None).await.unwrap();
1256 assert_eq!(bal.amount.minor(), 25);
1257 assert_eq!(bal.updated_seq, 3); // seq 1 = cash open, 2 = rev open, 3 = tx
1258 assert_eq!(
1259 store.asset(&AssetId::new("USD")).await.unwrap(),
1260 Some(usd())
1261 );
1262 // idempotency survives restart: replay returns the prior commit
1263 let replay = store.commit(&mk_tx("k1", 25)).await.unwrap();
1264 assert_eq!(replay.seq, 3);
1265 // and balance is unchanged
1266 assert_eq!(
1267 store
1268 .balance(&cash_def().id, None)
1269 .await
1270 .unwrap()
1271 .amount
1272 .minor(),
1273 25
1274 );
1275 }
1276
1277 #[tokio::test]
1278 async fn second_open_on_same_dir_is_refused() {
1279 let dir = tempfile::tempdir().unwrap();
1280 let _first = LogTaleaStore::open(dir.path()).await.unwrap();
1281 assert!(LogTaleaStore::open(dir.path()).await.is_err());
1282 }
1283
1284 #[tokio::test]
1285 async fn register_asset_idempotent_same_def_conflict_different() {
1286 let dir = tempfile::tempdir().unwrap();
1287 let store = LogTaleaStore::open(dir.path()).await.unwrap();
1288 store.register_asset(&usd()).await.unwrap();
1289 store.register_asset(&usd()).await.unwrap(); // same def: Ok, no second event
1290 let mut other = usd();
1291 other.precision = 8;
1292 assert!(matches!(
1293 store.register_asset(&other).await,
1294 Err(talea_core::store::StoreError::AlreadyExists { .. })
1295 ));
1296 }
1297
1298 #[tokio::test]
1299 async fn open_account_requires_registered_asset_and_real_book() {
1300 let dir = tempfile::tempdir().unwrap();
1301 let store = LogTaleaStore::open(dir.path()).await.unwrap();
1302 let cfg = AccountCfg {
1303 normal_side: None,
1304 min_balance: None,
1305 };
1306 assert!(matches!(
1307 store.open_account(&cash_def(), &cfg).await,
1308 Err(talea_core::store::StoreError::UnknownAsset(_))
1309 ));
1310 store.register_asset(&usd()).await.unwrap();
1311 let mut sys = cash_def();
1312 sys.id.book = Book("_system".into());
1313 assert!(matches!(
1314 store.open_account(&sys, &cfg).await,
1315 Err(talea_core::store::StoreError::InvalidBook(_))
1316 ));
1317 }
1318
1319 #[tokio::test]
1320 async fn commit_batch_is_positional_and_isolates_failures() {
1321 let dir = tempfile::tempdir().unwrap();
1322 let store = seeded(dir.path()).await;
1323 let mut bad = mk_tx("bad", 1);
1324 bad.postings[0].account.path = "ghost".into();
1325 let txs = vec![mk_tx("a", 1), bad, mk_tx("b", 2)];
1326 let out = store.commit_batch(&txs).await;
1327 assert!(out[0].is_ok());
1328 assert!(matches!(
1329 out[1],
1330 Err(talea_core::store::StoreError::UnknownAccount(_))
1331 ));
1332 assert!(out[2].is_ok());
1333 assert_eq!(
1334 out[0].as_ref().unwrap().seq + 1,
1335 out[2].as_ref().unwrap().seq,
1336 "gapless across the reject"
1337 );
1338 }
1339
1340 #[tokio::test]
1341 async fn store_use_after_shutdown_fails_cleanly() {
1342 let dir = tempfile::tempdir().unwrap();
1343 let store = seeded(dir.path()).await;
1344
1345 // Commit one tx so there's real state to inspect.
1346 store.commit(&mk_tx("k1", 10)).await.unwrap();
1347
1348 store.shutdown().await;
1349
1350 // commit must return Err, not panic, not succeed.
1351 let commit_res = store.commit(&mk_tx("k2", 5)).await;
1352 assert!(commit_res.is_err(), "commit after shutdown must fail");
1353
1354 // open_account must return Err.
1355 let cfg = AccountCfg {
1356 normal_side: None,
1357 min_balance: None,
1358 };
1359 let open_res = store.open_account(&cash_def(), &cfg).await;
1360 assert!(open_res.is_err(), "open_account after shutdown must fail");
1361
1362 // balance reads via book_writer — after shutdown the books map is empty,
1363 // so book_writer returns the shut_down error before it can recreate anything.
1364 let bal_res = store.balance(&cash_def().id, None).await;
1365 assert!(bal_res.is_err(), "balance after shutdown must fail");
1366 }
1367
1368 #[tokio::test]
1369 async fn empty_book_name_is_rejected() {
1370 let dir = tempfile::tempdir().unwrap();
1371 let store = LogTaleaStore::open(dir.path()).await.unwrap();
1372 store.register_asset(&usd()).await.unwrap();
1373
1374 // An empty book name must be rejected by validate_book_name, which
1375 // book_writer calls. Using commit (which calls book_writer) is the
1376 // simplest exerciser.
1377 let mut tx = mk_tx("empty-book", 1);
1378 tx.book = Book("".into());
1379 tx.postings[0].account.book = Book("".into());
1380 tx.postings[1].account.book = Book("".into());
1381 let res = store.commit(&tx).await;
1382 assert!(res.is_err(), "empty book name must be rejected");
1383 let err_msg = format!("{:?}", res.unwrap_err());
1384 assert!(
1385 err_msg.contains("invalid book name"),
1386 "error should mention invalid book name: {err_msg}"
1387 );
1388 }
1389
1390 // -----------------------------------------------------------------------
1391 // Fix 1: read paths must not create books
1392 // -----------------------------------------------------------------------
1393
1394 /// `balance` for an account in a book that was never written must return
1395 /// `UnknownAccount` and must NOT create the book directory on disk.
1396 #[tokio::test]
1397 async fn balance_on_unknown_book_creates_nothing() {
1398 let dir = tempfile::tempdir().unwrap();
1399 let store = LogTaleaStore::open(dir.path()).await.unwrap();
1400
1401 let typo_account = AccountId {
1402 book: Book("typo".into()),
1403 path: "cash".into(),
1404 };
1405 let res = store.balance(&typo_account, None).await;
1406
1407 assert!(
1408 matches!(res, Err(talea_core::store::StoreError::UnknownAccount(_))),
1409 "expected UnknownAccount for nonexistent book, got {res:?}",
1410 );
1411
1412 // The book directory must NOT have been created.
1413 let book_dir = dir.path().join("books").join("typo");
1414 assert!(
1415 !book_dir.exists(),
1416 "balance must not create books/typo on disk, but it exists: {book_dir:?}",
1417 );
1418 }
1419
1420 // -----------------------------------------------------------------------
1421 // Fix 2: replay of corrupt log must error, not panic
1422 // -----------------------------------------------------------------------
1423
/// Build a valid store, shut it down, hand-append a structurally valid
/// (correct CRC) but arithmetically impossible frame (debits i64::MAX
/// from an account whose balance is already > 0), then reopen. The open
/// must return `Err(StoreError::Io(…))` mentioning "corrupt log", not
/// panic.
#[tokio::test]
async fn replay_of_overflowing_log_errors_not_panics() {
    use crate::frame::{WireEvent, encode_frame};

    let dir = tempfile::tempdir().unwrap();

    // 1. Build a valid store with one committed transaction. The scope block
    //    drops the store after shutdown, before the raw file is edited.
    {
        let store = seeded(dir.path()).await;
        // commit one tx so cash has balance > 0 (raw_balance = 100)
        store.commit(&mk_tx("k1", 100)).await.unwrap();
        store.shutdown().await;
    }
    // At this point next_seq in book "b" is 4 (seq1=cash open, seq2=rev
    // open, seq3=k1 tx). We forge seq 4 outside the store.

    // 2. Find the segment file for book "b" (always segment-00000000000000000001.log
    // since the test store is small and never rotates).
    let book_dir = dir.path().join("books").join("b");
    let seg_path = book_dir.join("segment-00000000000000000001.log");
    assert!(seg_path.exists(), "segment file must exist: {seg_path:?}");

    // 3. Forge a frame that is structurally valid (correct CRC) but whose
    // debit of i64::MAX against a balance of 100 cannot fit in i64.
    // The transaction itself is balanced (equal debit and credit), so only
    // the resulting account balance is impossible.
    let forged_tx = Transaction {
        id: TxId(uuid::Uuid::now_v7()),
        book: Book("b".into()),
        postings: vec![
            Posting {
                account: AccountId {
                    book: Book("b".into()),
                    path: "cash".into(),
                },
                amount: Amount::new(i64::MAX, AssetId::new("USD")),
                direction: Direction::Debit,
            },
            Posting {
                account: AccountId {
                    book: Book("b".into()),
                    path: "rev".into(),
                },
                amount: Amount::new(i64::MAX, AssetId::new("USD")),
                direction: Direction::Credit,
            },
        ],
        idempotency_key: IdempotencyKey("overflow-forge".into()),
        external_refs: vec![],
        metadata: serde_json::Value::Null,
        occurred_at: chrono::Utc::now(),
    };
    // seq 4 directly follows the last valid frame, so the forged frame is a
    // plausible continuation of the log.
    let forged_wire = WireEvent {
        seq: 4, // next after the 3 valid events
        at: chrono::Utc::now(),
        event: talea_core::events::LedgerEvent::TransactionPosted(forged_tx),
    };
    let frame_bytes = encode_frame(&forged_wire).expect("encode must succeed");

    // Append the forged frame directly, bypassing the store's writer.
    {
        use std::io::Write;
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&seg_path)
            .expect("segment file must exist for append");
        f.write_all(&frame_bytes).unwrap();
    }

    // 4. Reopen — must return Err mentioning "corrupt log", not panic.
    let result = LogTaleaStore::open(dir.path()).await;
    match result {
        Err(StoreError::Io(ref msg)) => {
            let s = msg.to_string();
            assert!(
                s.contains("corrupt log"),
                "error must mention 'corrupt log', got: {s}",
            );
        }
        Ok(_) => panic!("open must fail on overflow replay, but it succeeded"),
        Err(other) => panic!("expected StoreError::Io, got {other:?}"),
    }
}
1510
1511 // -----------------------------------------------------------------------
1512 // Task 7: read path tests
1513 // -----------------------------------------------------------------------
1514
1515 /// Build a store with 3 transactions at distinct timestamps.
1516 /// Returns (store, txids, commit_ats).
1517 /// Seqs: 1 = cash open, 2 = rev open, 3/4/5 = the 3 transactions.
1518 async fn history_fixture(
1519 dir: &std::path::Path,
1520 ) -> (LogTaleaStore, Vec<TxId>, Vec<chrono::DateTime<chrono::Utc>>) {
1521 let store = seeded(dir).await;
1522 let mut txids = Vec::new();
1523 let mut times = Vec::new();
1524
1525 for (key, minor) in [("tx1", 10i64), ("tx2", 20), ("tx3", 30)] {
1526 let c = store.commit(&mk_tx(key, minor)).await.unwrap();
1527 txids.push(c.txid);
1528 times.push(c.at);
1529 // Ensure distinct microsecond timestamps for as_of binary search.
1530 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1531 }
1532
1533 (store, txids, times)
1534 }
1535
1536 #[tokio::test]
1537 async fn balance_as_of_binary_searches_posting_entries() {
1538 let dir = tempfile::tempdir().unwrap();
1539 let (store, _txids, times) = history_fixture(dir.path()).await;
1540
1541 let cash = cash_def().id;
1542
1543 // as_of before t1: no postings → amount 0, updated_seq 0.
1544 let before = times[0] - chrono::Duration::microseconds(1);
1545 let snap = store.balance(&cash, Some(before)).await.unwrap();
1546 assert_eq!(
1547 snap.amount.minor(),
1548 0,
1549 "as_of before first commit must be 0"
1550 );
1551 assert_eq!(
1552 snap.updated_seq, 0,
1553 "as_of before first commit: updated_seq must be 0"
1554 );
1555
1556 // as_of = t2 (second commit's at) → balance 30 (10+20), updated_seq = 4.
1557 let snap2 = store.balance(&cash, Some(times[1])).await.unwrap();
1558 assert_eq!(
1559 snap2.amount.minor(),
1560 30,
1561 "balance after 2 commits must be 30"
1562 );
1563 assert_eq!(
1564 snap2.updated_seq, 4,
1565 "updated_seq after 2 commits must be 4"
1566 );
1567
1568 // as_of = t3 → 60, updated_seq 5.
1569 let snap3 = store.balance(&cash, Some(times[2])).await.unwrap();
1570 assert_eq!(
1571 snap3.amount.minor(),
1572 60,
1573 "balance after 3 commits must be 60"
1574 );
1575 assert_eq!(
1576 snap3.updated_seq, 5,
1577 "updated_seq after 3 commits must be 5"
1578 );
1579 }
1580
1581 #[tokio::test]
1582 async fn account_history_pages_by_distinct_seq_after_seq_exclusive() {
1583 let dir = tempfile::tempdir().unwrap();
1584 let (store, _txids, times) = history_fixture(dir.path()).await;
1585
1586 let cash = cash_def().id;
1587
1588 // after_seq None, limit 2 → postings of seqs 3, 4 only.
1589 let page1 = store.account_history(&cash, None, 2).await.unwrap();
1590 assert_eq!(page1.len(), 2, "limit 2 must return 2 records");
1591 assert_eq!(page1[0].seq, 3);
1592 assert_eq!(page1[1].seq, 4);
1593 assert_eq!(page1[0].amount.minor(), 10);
1594 assert_eq!(page1[1].amount.minor(), 20);
1595 assert!(matches!(page1[0].direction, Direction::Debit));
1596 // at must match the commit time
1597 assert_eq!(page1[0].at, times[0]);
1598 assert_eq!(page1[1].at, times[1]);
1599
1600 // Resume after_seq Some(4), limit 10 → only seq 5.
1601 let page2 = store.account_history(&cash, Some(4), 10).await.unwrap();
1602 assert_eq!(page2.len(), 1);
1603 assert_eq!(page2[0].seq, 5);
1604 assert_eq!(page2[0].amount.minor(), 30);
1605
1606 // limit 0 → empty.
1607 let empty = store.account_history(&cash, None, 0).await.unwrap();
1608 assert!(empty.is_empty(), "limit 0 must return empty vec");
1609 }
1610
1611 #[tokio::test]
1612 async fn transaction_round_trips_through_disk() {
1613 let dir = tempfile::tempdir().unwrap();
1614 let (store, txids, times) = history_fixture(dir.path()).await;
1615
1616 // transaction(txid of 2nd commit) → seq 4, at == t2.
1617 let stored = store
1618 .transaction(&txids[1])
1619 .await
1620 .unwrap()
1621 .expect("should find 2nd tx");
1622 assert_eq!(stored.seq, 4, "2nd transaction must have seq 4");
1623 assert_eq!(stored.at, times[1], "committed_at must match");
1624 assert_eq!(stored.transaction.idempotency_key.0, "tx2");
1625
1626 // Unknown uuid → Ok(None).
1627 let unknown = TxId(uuid::Uuid::now_v7());
1628 let result = store.transaction(&unknown).await.unwrap();
1629 assert!(result.is_none(), "unknown txid must return Ok(None)");
1630 }
1631
1632 #[tokio::test]
1633 async fn trial_balance_none_and_as_of() {
1634 let dir = tempfile::tempdir().unwrap();
1635 let (store, _txids, times) = history_fixture(dir.path()).await;
1636
1637 let book = Book("b".into());
1638
1639 // None: both accounts use USD; debits = 60 (10+20+30 to cash), credits = 60 (to rev).
1640 let tb = store.trial_balance(&book, None).await.unwrap();
1641 assert_eq!(tb.len(), 1, "one USD row");
1642 assert_eq!(tb[0].asset, AssetId::new("USD"));
1643 assert_eq!(tb[0].debits, 60);
1644 assert_eq!(tb[0].credits, 60);
1645
1646 // as_of = t2: debits 30 (10+20), credits 30.
1647 let tb2 = store.trial_balance(&book, Some(times[1])).await.unwrap();
1648 assert_eq!(tb2.len(), 1);
1649 assert_eq!(tb2[0].debits, 30);
1650 assert_eq!(tb2[0].credits, 30);
1651
1652 // Unknown book → empty vec (matches sqlite).
1653 let unknown = Book("ghost".into());
1654 let tb3 = store.trial_balance(&unknown, None).await.unwrap();
1655 assert!(tb3.is_empty(), "unknown book must return empty vec");
1656 }
1657
1658 #[tokio::test]
1659 async fn read_events_from_inclusive_pages() {
1660 let dir = tempfile::tempdir().unwrap();
1661 let (store, _txids, _times) = history_fixture(dir.path()).await;
1662
1663 let book = Book("b".into());
1664
1665 // from=2, limit=2 → seqs [2, 3].
1666 let evs = store.read_events(&book, 2, 2).await.unwrap();
1667 assert_eq!(evs.len(), 2);
1668 assert_eq!(evs[0].seq, 2);
1669 assert_eq!(evs[1].seq, 3);
1670
1671 // from=5, limit=10 → [5].
1672 let evs5 = store.read_events(&book, 5, 10).await.unwrap();
1673 assert_eq!(evs5.len(), 1);
1674 assert_eq!(evs5[0].seq, 5);
1675
1676 // from past the end → empty.
1677 let evs_end = store.read_events(&book, 999, 10).await.unwrap();
1678 assert!(evs_end.is_empty(), "from past end must return empty vec");
1679
1680 // Unknown book → empty vec (matches sqlite).
1681 let unknown = Book("ghost".into());
1682 let evs_ghost = store.read_events(&unknown, 1, 10).await.unwrap();
1683 assert!(evs_ghost.is_empty(), "unknown book must return empty vec");
1684 }
1685
1686 // -----------------------------------------------------------------------
1687 // Hazard B: durability-watermark tests
1688 // -----------------------------------------------------------------------
1689
1690 /// Verify that `read_events` never surfaces frames beyond the durability
1691 /// ceiling (`state.next_seq - 1`).
1692 ///
1693 /// After 3 commits the ceiling equals the highest committed seq.
1694 /// `read_events(from=1, limit=100)` must return exactly seqs 1..=ceiling
1695 /// (the two AccountOpened events + 3 TransactionPosted = 5 frames) with no
1696 /// extras. This pins the ceiling-filter plumbing: if the filter were
1697 /// absent, a page-cache frame beyond the ceiling could appear.
1698 #[tokio::test]
1699 async fn read_events_excludes_frames_beyond_ceiling() {
1700 let dir = tempfile::tempdir().unwrap();
1701 let (store, _txids, _times) = history_fixture(dir.path()).await;
1702
1703 let book = Book("b".into());
1704
1705 // 3 commits + 2 account opens = 5 events total; next_seq = 6.
1706 // ceiling = 5.
1707 let evs = store.read_events(&book, 1, 100).await.unwrap();
1708 let seqs: Vec<Seq> = evs.iter().map(|e| e.seq).collect();
1709
1710 // Must have exactly seqs 1..=5 and no more.
1711 assert_eq!(
1712 seqs,
1713 vec![1, 2, 3, 4, 5],
1714 "read_events must return exactly seqs 1..=ceiling"
1715 );
1716
1717 // Also verify from= skipping still applies within the ceiling.
1718 let evs_from4 = store.read_events(&book, 4, 100).await.unwrap();
1719 let seqs_from4: Vec<Seq> = evs_from4.iter().map(|e| e.seq).collect();
1720 assert_eq!(
1721 seqs_from4,
1722 vec![4, 5],
1723 "from=4 must return seqs 4 and 5 only"
1724 );
1725 }
1726
1727 // -----------------------------------------------------------------------
1728 // Task 12: Snapshot-assisted open path
1729 // -----------------------------------------------------------------------
1730
1731 /// Verify that `open` uses a snapshot for replay when one exists.
1732 ///
1733 /// # What this test proves
1734 ///
1735 /// 1. After shutdown + reopen, balances reflect all commits (basic sanity).
1736 /// 2. The snapshot file exists on disk after `snapshot_now`.
1737 /// 3. `load_latest` after the final shutdown returns a snapshot with seq
1738 /// >= the 5th commit's seq, proving the snapshot was taken at or after
1739 /// that point and that the open path would have used it.
1740 ///
1741 /// What we do NOT try to prove here: that the replay *skipped* pre-snapshot
1742 /// frames (that would require corrupting early frames, which is invasive
1743 /// and risks making the test fragile). The correctness of the load path
1744 /// itself is proven by `snapshot_round_trips_book_state` and the fact that
1745 /// balances are correct after reopen.
1746 #[tokio::test]
1747 async fn open_uses_snapshot_plus_tail() {
1748 use crate::snapshot;
1749
1750 let dir = tempfile::tempdir().unwrap();
1751
1752 // Seqs: 1 = _system (USD), 2 = cash open, 3 = rev open (all in separate
1753 // books: _system and "b"). Within book "b", seqs are 1..=N per-book.
1754 // We commit 5 transactions to book "b", then snapshot, then 2 more.
1755
1756 let (first_5_seqs, snap_seq_at_least): (Vec<Seq>, Seq);
1757 {
1758 let store = seeded(dir.path()).await;
1759
1760 let mut seqs = Vec::new();
1761 for i in 0..5 {
1762 let c = store.commit(&mk_tx(&format!("k{i}"), 1)).await.unwrap();
1763 seqs.push(c.seq);
1764 }
1765
1766 // Snapshot after 5 commits. The snapshot seq == last applied seq
1767 // in book "b" (3 events for asset+accounts are in _system, so in
1768 // "b" the seqs are [1..5] for the 5 txs; actual seq depends on
1769 // account opens going to "b" before commits).
1770 // The point is: after snapshot_now, a .snap file must exist.
1771 store.snapshot_now("b").await.unwrap();
1772
1773 // 2 more commits after the snapshot.
1774 for i in 5..7 {
1775 store.commit(&mk_tx(&format!("k{i}"), 2)).await.unwrap();
1776 }
1777
1778 first_5_seqs = seqs;
1779 // snap_seq_at_least: the snapshot must be >= the 5th commit's seq.
1780 snap_seq_at_least = *first_5_seqs.last().unwrap();
1781
1782 store.shutdown().await;
1783 }
1784
1785 // --- Verify snapshot file exists for book "b" ---
1786 let book_dir = dir.path().join("books").join("b");
1787 let snap = snapshot::load_latest(&book_dir).await.unwrap();
1788 assert!(
1789 snap.is_some(),
1790 "snapshot must exist for book b after snapshot_now + shutdown"
1791 );
1792 let (_, snap_seq) = snap.unwrap();
1793 assert!(
1794 snap_seq >= snap_seq_at_least,
1795 "snapshot seq {snap_seq} must be >= 5th-commit seq {snap_seq_at_least}"
1796 );
1797
1798 // --- Reopen and verify all 7 commits are reflected ---
1799 let store2 = LogTaleaStore::open(dir.path()).await.unwrap();
1800
1801 // cash should have: 5 × 1 + 2 × 2 = 9 total debited.
1802 let bal = store2.balance(&cash_def().id, None).await.unwrap();
1803 assert_eq!(
1804 bal.amount.minor(),
1805 9,
1806 "all 7 commits must be visible after reopen; got {}",
1807 bal.amount.minor()
1808 );
1809
1810 // Idempotency: replay of any of the first 5 keys returns the original seq.
1811 for (i, &orig_seq) in first_5_seqs.iter().enumerate() {
1812 let dup = store2.commit(&mk_tx(&format!("k{i}"), 1)).await.unwrap();
1813 assert_eq!(
1814 dup.seq, orig_seq,
1815 "idem key k{i} must resolve to original seq {orig_seq} after reopen, got {}",
1816 dup.seq
1817 );
1818 }
1819 }
1820
1821 // -----------------------------------------------------------------------
1822 // Task 13: Snapshot interplay with spilled idem runs
1823 // -----------------------------------------------------------------------
1824
1825 /// Commit enough transactions to overflow the hot idem cap (forcing spills),
1826 /// take a snapshot, reopen, and verify that BOTH hot (recent) and spilled
1827 /// (older) idem keys dedup correctly.
1828 ///
1829 /// Uses a small `idem_hot_cap` (8) so a handful of commits forces overflow.
1830 #[tokio::test]
1831 async fn snapshot_interplay_hot_and_spilled_keys_dedup() {
1832 let dir = tempfile::tempdir().unwrap();
1833
1834 // Open with a tiny hot cap so we force spills with few commits.
1835 let opts = LogStoreOptions {
1836 idem_hot_cap: 8,
1837 snapshot_every: BookWriter::DEFAULT_SNAPSHOT_EVERY,
1838 segment_max: crate::segment::DEFAULT_SEGMENT_MAX,
1839 };
1840
1841 let orig_seqs: Vec<talea_core::types::Seq>;
1842 {
1843 let store = LogTaleaStore::open_with(dir.path(), opts.clone())
1844 .await
1845 .unwrap();
1846 store.register_asset(&usd()).await.unwrap();
1847 let cfg = AccountCfg {
1848 normal_side: None,
1849 min_balance: None,
1850 };
1851 store.open_account(&cash_def(), &cfg).await.unwrap();
1852 store.open_account(&rev_def(), &cfg).await.unwrap();
1853
1854 // Commit 20 transactions (enough to spill with cap=8).
1855 let n = 20usize;
1856 let mut seqs = Vec::with_capacity(n);
1857 for i in 0..n {
1858 let c = store
1859 .commit(&mk_tx(&format!("spill-idem-{i:03}"), 1))
1860 .await
1861 .unwrap();
1862 seqs.push(c.seq);
1863 }
1864
1865 // Take a snapshot so the next open uses it.
1866 store.snapshot_now("b").await.unwrap();
1867
1868 orig_seqs = seqs;
1869
1870 store.shutdown().await;
1871 }
1872
1873 // Reopen with the same small cap — spill run files should be present.
1874 let store2 = LogTaleaStore::open_with(dir.path(), opts).await.unwrap();
1875
1876 // ALL 20 keys must dedup to their original seq/txid.
1877 for (i, &orig_seq) in orig_seqs.iter().enumerate() {
1878 let key = format!("spill-idem-{i:03}");
1879 let dup = store2.commit(&mk_tx(&key, 1)).await.unwrap();
1880 assert_eq!(
1881 dup.seq, orig_seq,
1882 "idem key {key} must resolve to orig seq {orig_seq} after reopen with spilled runs; got {}",
1883 dup.seq,
1884 );
1885 }
1886 }
1887
1888 // -----------------------------------------------------------------------
1889 // Task 8: subscribe catch-up + live tail
1890 // -----------------------------------------------------------------------
1891
1892 /// subscribe with an invalid book name must yield Err as the first item and
1893 /// must NOT create any directory outside the books/ subtree.
1894 #[tokio::test]
1895 async fn subscribe_invalid_book_yields_err_and_creates_nothing() {
1896 use futures::StreamExt;
1897 let dir = tempfile::tempdir().unwrap();
1898 let store = LogTaleaStore::open(dir.path()).await.unwrap();
1899
1900 let mut stream = store.subscribe(&Book("../evil".into()), 1);
1901 let first = stream
1902 .next()
1903 .await
1904 .expect("stream must yield at least one item");
1905 assert!(
1906 first.is_err(),
1907 "first item must be Err for invalid book name"
1908 );
1909
1910 // The directory that would have been created by path traversal must not exist.
1911 let evil_dir = dir.path().join("evil");
1912 assert!(
1913 !evil_dir.exists(),
1914 "subscribe must not create the traversal target directory, but {:?} exists",
1915 evil_dir,
1916 );
1917 }
1918
1919 #[tokio::test]
1920 async fn subscribe_catch_up_then_live() {
1921 use futures::StreamExt;
1922 let dir = tempfile::tempdir().unwrap();
1923 let store = seeded(dir.path()).await; // 2 opens → seqs 1, 2
1924 store.commit(&mk_tx("h1", 10)).await.unwrap(); // seq 3
1925 let mut stream = store.subscribe(&Book("b".into()), 1);
1926 let mut seen = vec![];
1927 for _ in 0..3 {
1928 seen.push(stream.next().await.unwrap().unwrap().seq);
1929 }
1930 assert_eq!(seen, vec![1, 2, 3]);
1931 store.commit(&mk_tx("live1", 5)).await.unwrap(); // seq 4
1932 assert_eq!(stream.next().await.unwrap().unwrap().seq, 4);
1933 // a second subscriber from the middle
1934 let mut s2 = store.subscribe(&Book("b".into()), 3);
1935 assert_eq!(s2.next().await.unwrap().unwrap().seq, 3);
1936 assert_eq!(s2.next().await.unwrap().unwrap().seq, 4);
1937 }
1938
1939 /// Verify that `trial_balance(Some(t))` clamps to the durability ceiling.
1940 ///
1941 /// Commits two transactions in a single batch (so they very likely share
1942 /// the same `committed_at`). Asserts that `trial_balance(as_of = that
1943 /// timestamp)` includes both transactions' effects. This tests the
1944 /// ceiling + `as_of` interaction: the ceiling filter must not accidentally
1945 /// drop frames whose `at` equals the cutoff.
1946 #[tokio::test]
1947 async fn as_of_boundary_includes_all_commits_sharing_the_timestamp() {
1948 let dir = tempfile::tempdir().unwrap();
1949 let store = seeded(dir.path()).await;
1950
1951 // Submit two transactions in one batch so they share a committed_at.
1952 let batch = vec![mk_tx("tb1", 50), mk_tx("tb2", 75)];
1953 let results = store.commit_batch(&batch).await;
1954 let c1 = results[0].as_ref().expect("tx1 must succeed");
1955 let c2 = results[1].as_ref().expect("tx2 must succeed");
1956
1957 // If they share the same at, a trial_balance as_of that at must include both.
1958 if c1.at == c2.at {
1959 let book = Book("b".into());
1960 let tb = store.trial_balance(&book, Some(c1.at)).await.unwrap();
1961 assert!(
1962 !tb.is_empty(),
1963 "trial balance must not be empty after two commits"
1964 );
1965 let usd_row = tb
1966 .iter()
1967 .find(|r| r.asset == AssetId::new("USD"))
1968 .expect("USD row must exist");
1969 // Both txs debit cash and credit rev by 50 and 75 → total debits 125.
1970 assert_eq!(
1971 usd_row.debits, 125,
1972 "trial_balance(as_of) must include both batch-committed transactions; \
1973 expected debits=125, got {}",
1974 usd_row.debits
1975 );
1976 assert_eq!(usd_row.credits, 125, "credits must also be 125");
1977 } else {
1978 // Clock ticked between the two commits; still verify the later as_of
1979 // includes both.
1980 let book = Book("b".into());
1981 let tb = store.trial_balance(&book, Some(c2.at)).await.unwrap();
1982 let usd_row = tb
1983 .iter()
1984 .find(|r| r.asset == AssetId::new("USD"))
1985 .expect("USD row must exist");
1986 assert_eq!(
1987 usd_row.debits, 125,
1988 "trial_balance(as_of=c2.at) must include both transactions even if timestamps differ"
1989 );
1990 }
1991 }
1992}