Skip to main content

talea_store_log/
lib.rs

1//! Append-log store: one CRC-framed JSON event log per book, a single
2//! writer task per book over in-memory state, strict fsync-per-batch.
3
4pub mod frame;
5pub mod idem_spill;
6pub mod segment;
7pub mod snapshot;
8pub mod state;
9pub mod writer;
10pub use frame::WireEvent;
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex, PoisonError};
16
17use async_trait::async_trait;
18use chrono::DateTime;
19use chrono::Utc;
20use futures::stream::BoxStream;
21use tokio::sync::RwLock;
22
23use talea_core::events::LedgerEvent;
24use talea_core::store::{
25    AccountCfg, BalanceSnapshot, Committed, PostingRecord, SYSTEM_BOOK, Sequenced, Store,
26    StoreError, StoredTransaction, TrialBalanceRow,
27};
28use talea_core::types::{
29    AccountDef, AccountId, Amount, AssetDef, AssetId, Book, Direction, Seq, Transaction, TxId,
30};
31
32use crate::idem_spill::DEFAULT_IDEM_HOT_CAP;
33use crate::segment::SegmentSet;
34use crate::state::{BookState, effective};
35use crate::writer::{BookWriter, Job};
36
/// Tuning knobs accepted by [`LogTaleaStore::open_with`].
#[derive(Debug, Clone)]
pub struct LogStoreOptions {
    /// Upper bound on idempotency keys held in memory per book; when it is
    /// exceeded, the oldest half is drained to on-disk spill runs.
    pub idem_hot_cap: usize,
    /// Number of events between automatic snapshots; `0` turns them off.
    pub snapshot_every: u64,
    /// Size in bytes at which a segment file is rotated.
    pub segment_max: u64,
}
48
49impl Default for LogStoreOptions {
50    fn default() -> Self {
51        Self {
52            idem_hot_cap: DEFAULT_IDEM_HOT_CAP,
53            snapshot_every: BookWriter::DEFAULT_SNAPSHOT_EVERY,
54            segment_max: crate::segment::DEFAULT_SEGMENT_MAX,
55        }
56    }
57}
58
59/// An append-log implementation of [`Store`].
60///
61/// # Directory structure
62///
63/// ```text
64/// <dir>/
65///   LOCK                   ← exclusive advisory lock held for the process lifetime
66///   books/
67///     _system/             ← system book (assets)
68///     <book>/              ← one dir per user book
69/// ```
70///
71/// # Concurrency / single-process invariant
72///
73/// One `LogTaleaStore` per directory at a time is enforced via an fs4 advisory
74/// lock on `<dir>/LOCK`. A second `open` on the same directory from the same
75/// or a different process will fail with `StoreError::Io`.
76///
77/// # Book name safety
78///
79/// Book names are validated to prevent directory escape: names containing `/`,
80/// `\`, or the component `..` are rejected. This validation happens in every
81/// path that accepts a book name from external input.
82pub struct LogTaleaStore {
83    dir: PathBuf,
84    registry: Arc<RwLock<HashMap<AssetId, AssetDef>>>,
85    books: Arc<RwLock<HashMap<String, BookWriter>>>,
86    batch_max: usize,
87    /// Segment rotation threshold in bytes.  Stored so that new books created
88    /// after `open_with` (via `book_writer`) use the same threshold as books
89    /// that were already on disk at startup.
90    segment_max: u64,
91    /// Snapshot cadence (events between automatic snapshots).  Stored so that
92    /// new books created after `open_with` use the same setting.
93    snapshot_every: u64,
94    /// Set to `true` by `shutdown()` before draining writers.  Any subsequent
95    /// call into `book_writer()` or `register_asset()` checks this flag first
96    /// and returns an error rather than silently recreating writers over an
97    /// empty `BookState` while a newly-opened store may own the directory.
98    shut_down: AtomicBool,
99    /// Held for the process lifetime. `shutdown(&self)` takes the file out so
100    /// the advisory lock is released before another `open` on the same dir
101    /// (e.g. in the same test process). Drop of the struct releases it otherwise.
102    ///
103    /// We wrap in `Mutex<Option<_>>` because `shutdown` takes `&self` (not
104    /// `self`) — the test drops the store after calling `shutdown`, and the
105    /// reopen inside the same scope needs the lock gone first.
106    _lock: Mutex<Option<std::fs::File>>,
107}
108
109fn io_err(e: impl std::error::Error + Send + Sync + 'static) -> StoreError {
110    StoreError::Io(Box::new(e))
111}
112
113fn io_str(s: impl Into<String>) -> StoreError {
114    StoreError::Io(s.into().into())
115}
116
117/// Validate a book name is safe to use as a directory component.
118fn validate_book_name(book: &str) -> Result<(), StoreError> {
119    if book.is_empty()
120        || book.contains('/')
121        || book.contains('\\')
122        || book == ".."
123        || book.contains("..")
124    {
125        return Err(io_str(format!(
126            "invalid book name {book:?}: must not be empty or contain '/', '\\', or '..'"
127        )));
128    }
129    Ok(())
130}
131
132impl LogTaleaStore {
133    /// Open (or create) the store at `dir` with default options.
134    ///
135    /// See [`open_with`] for the full option set.
136    pub async fn open(dir: &Path) -> Result<Self, StoreError> {
137        Self::open_with(dir, LogStoreOptions::default()).await
138    }
139
140    /// Open (or create) the store at `dir` with explicit options.
141    ///
142    /// 1. Creates `<dir>/books/` if missing.
143    /// 2. Acquires an exclusive fs4 advisory lock on `<dir>/LOCK`.
144    /// 3. Replays each existing book dir under `<dir>/books/` to rebuild
145    ///    in-memory state, then spawns a `BookWriter` per book.
146    /// 4. `_system` is treated like any other book for replay; `AssetRegistered`
147    ///    events also populate the in-memory asset registry.
148    /// 5. For each book, calls `idem.attach_dir(...)` to load spill runs and
149    ///    rebuild the Bloom filter.  If any run's CRC fails or the Bloom file
150    ///    is missing/corrupt, a full log scan is performed to rebuild the runs.
151    pub async fn open_with(dir: &Path, opts: LogStoreOptions) -> Result<Self, StoreError> {
152        use fs4::fs_std::FileExt;
153
154        // 1. Create dirs.
155        tokio::fs::create_dir_all(dir).await.map_err(io_err)?;
156        let books_dir = dir.join("books");
157        tokio::fs::create_dir_all(&books_dir)
158            .await
159            .map_err(io_err)?;
160
161        // 2. Acquire exclusive advisory lock.
162        let lock_path = dir.join("LOCK");
163        let lock_file = std::fs::OpenOptions::new()
164            .create(true)
165            .write(true)
166            // truncate(false): we only need the fd for locking, not to modify content
167            .truncate(false)
168            .open(&lock_path)
169            .map_err(io_err)?;
170        // try_lock_exclusive returns Ok(true) on success, Ok(false) if contended,
171        // Err(_) for genuine I/O failure.
172        let locked = lock_file.try_lock_exclusive().map_err(io_err)?;
173        if !locked {
174            return Err(io_str(format!(
175                "data dir already locked by another process: {}",
176                dir.display()
177            )));
178        }
179
180        // 3. Replay existing book dirs.
181        let mut registry: HashMap<AssetId, AssetDef> = HashMap::new();
182        let mut books_map: HashMap<String, BookWriter> = HashMap::new();
183        let batch_max = 1024;
184
185        let mut rd = tokio::fs::read_dir(&books_dir).await.map_err(io_err)?;
186        while let Some(entry) = rd.next_entry().await.map_err(io_err)? {
187            let ft = entry.file_type().await.map_err(io_err)?;
188            if !ft.is_dir() {
189                continue;
190            }
191            let name = entry
192                .file_name()
193                .into_string()
194                .map_err(|_| io_str("non-UTF-8 book dir name"))?;
195
196            let book_dir = entry.path();
197
198            // Open the segments for replay (validation + repair happens inside open).
199            let seg = SegmentSet::open(&book_dir).await.map_err(io_err)?;
200
201            // --- Snapshot-assisted replay ---
202            //
203            // Try to load the newest valid snapshot from this book's directory.
204            // If one is found, we start replaying the event log from `snap_seq + 1`
205            // instead of from genesis, bounding startup time to the tail only.
206            //
207            // The snapshot was taken post-apply (by the writer, after fsync+apply),
208            // so `state.next_seq == snap_seq + 1` in the loaded state.  We trust the
209            // state's own counters; the filename's seq is only used to select the
210            // segment start point.
211            //
212            // The deserialized BookState's `writer_attached` flag is always `false`
213            // (it has `#[serde(skip, default = "default_writer_attached")]`), so a
214            // writer can safely attach to it post-load.
215            let (mut st, replay_from) =
216                match snapshot::load_latest(&book_dir).await.map_err(io_err)? {
217                    Some((snap_st, snap_seq)) => {
218                        // Sanity: state's own next_seq must be snap_seq+1.
219                        debug_assert_eq!(
220                            snap_st.next_seq,
221                            snap_seq + 1,
222                            "snapshot {name} seq={snap_seq}: state.next_seq={} expected {}",
223                            snap_st.next_seq,
224                            snap_seq + 1
225                        );
226                        tracing::debug!(
227                            book = %name,
228                            snap_seq,
229                            "loaded snapshot; replaying tail from seq {}",
230                            snap_seq + 1
231                        );
232                        (snap_st, snap_seq + 1)
233                    }
234                    None => {
235                        tracing::debug!(book = %name, "no snapshot; full replay from seq 1");
236                        (BookState::default(), 1)
237                    }
238                };
239
240            // Apply idem_hot_cap option to the loaded state.
241            st.idem.cap = opts.idem_hot_cap;
242
243            // Replay the tail starting at `replay_from` (full replay when no snapshot).
244            let pairs = seg
245                .scan_with_pos(replay_from, usize::MAX)
246                .await
247                .map_err(io_err)?;
248
249            // Fold into the (possibly snapshot-seeded) BookState.
250            // Use try_apply_transaction (not apply_transaction) so a corrupt or
251            // hand-edited log that contains arithmetic-impossible postings
252            // returns StoreError::Io instead of panicking.
253            for (wire, pos) in &pairs {
254                match &wire.event {
255                    LedgerEvent::TransactionPosted(tx) => {
256                        st.try_apply_transaction(tx, wire.seq, wire.at, *pos)
257                            .map_err(|msg| {
258                                // Encode enough context for the operator to locate the corrupt frame.
259                                let (seg_base, off) = pos;
260                                io_str(format!(
261                                    "corrupt log in book {name} at segment {seg_base} offset {off}: {msg}"
262                                ))
263                            })?;
264                    }
265                    LedgerEvent::AccountOpened { def, cfg } => {
266                        st.apply_account_opened(def, cfg, wire.seq, wire.at);
267                    }
268                    LedgerEvent::AssetRegistered(def) => {
269                        st.bump_seq(wire.seq, wire.at);
270                        // Populate registry regardless of which book the event
271                        // lives in (though the writer only appends these to _system).
272                        registry.insert(def.id.clone(), def.clone());
273                    }
274                }
275            }
276
277            // Attach idem spill runs + bloom from disk.
278            // The rebuild_fn supplies all committed idem keys from the full log
279            // (both the snapshot-sourced hot map and tail replay), so that if any
280            // run is corrupt/missing the index can be rebuilt accurately.
281            //
282            // Note: we do a second pass over `pairs` only for the rebuild path
283            // (rare).  The all_idem_pairs closure is only called if CRC fails.
284            let all_idem_snap: Vec<(String, crate::state::CommittedRec)> = {
285                // Pre-snapshot idem keys come from st.idem.hot (already in hot).
286                // tail-replay idem keys are also already in st.idem.hot at this point.
287                // For rebuild, we need ALL committed keys: hot map is authoritative
288                // for recent, runs hold older. The rebuild_fn produces the UNION
289                // of all committed keys that are NOT already in hot, so pass an
290                // empty vec here (hot is the only source of truth right now; runs
291                // will be rebuilt from the log scan that follows).
292                //
293                // Actually: we need to provide all keys so rebuild can reconstruct
294                // the full set.  `st.idem.hot` has all keys from replay at this point
295                // (runs will be empty since they're being validated).
296                // Pass all pairs from the full log so the rebuild_fn has complete data.
297                pairs
298                    .iter()
299                    .filter_map(|(wire, _)| {
300                        if let LedgerEvent::TransactionPosted(tx) = &wire.event {
301                            Some((
302                                tx.idempotency_key.0.clone(),
303                                crate::state::CommittedRec {
304                                    txid: tx.id.clone(),
305                                    seq: wire.seq,
306                                    at: wire.at,
307                                },
308                            ))
309                        } else {
310                            None
311                        }
312                    })
313                    .collect()
314            };
315
316            st.idem
317                .attach_dir(&book_dir, move || {
318                    let snap = all_idem_snap;
319                    async move { Ok(snap) }
320                })
321                .await
322                .map_err(io_err)?;
323
324            // Drop the replay SegmentSet — BookWriter::spawn re-opens it for writes.
325            drop(seg);
326
327            // Spawn writer over the replayed state.
328            let writer = BookWriter::spawn_with_opts(
329                book_dir,
330                Arc::new(RwLock::new(st)),
331                batch_max,
332                opts.snapshot_every,
333                opts.segment_max,
334            )
335            .await
336            .map_err(io_err)?;
337
338            books_map.insert(name, writer);
339        }
340
341        Ok(Self {
342            dir: dir.to_path_buf(),
343            registry: Arc::new(RwLock::new(registry)),
344            books: Arc::new(RwLock::new(books_map)),
345            batch_max,
346            segment_max: opts.segment_max,
347            snapshot_every: opts.snapshot_every,
348            shut_down: AtomicBool::new(false),
349            _lock: Mutex::new(Some(lock_file)),
350        })
351    }
352
353    /// Look up an existing `BookWriter` without creating one.
354    ///
355    /// Returns:
356    /// - `Err(StoreError::Io)` if the store has been shut down (same behaviour
357    ///   as `book_writer` — a shut-down store should not silently report that
358    ///   every book is absent).
359    /// - `Ok(None)` if the book does not exist yet (read lock only, no I/O).
360    /// - `Ok(Some(w))` if the book is already loaded.
361    ///
362    /// Read paths (e.g. `balance`) use this so they never create a book
363    /// directory on disk as a side effect of querying a nonexistent book.
364    /// Write paths (`commit`, `open_account`, `register_asset`) continue to
365    /// use `book_writer`, which creates the book on first access.
366    async fn existing_book(&self, book: &str) -> Result<Option<BookWriter>, StoreError> {
367        if self.shut_down.load(Ordering::SeqCst) {
368            return Err(io_str("store has been shut down"));
369        }
370        validate_book_name(book)?;
371        let guard = self.books.read().await;
372        Ok(guard.get(book).cloned())
373    }
374
375    /// Get or create the `BookWriter` for `book`.
376    ///
377    /// # Book name safety
378    ///
379    /// Book names containing `/`, `\`, or `..` are rejected to prevent a book
380    /// named e.g. `../x` from escaping the `<dir>/books/` subtree. The API
381    /// layer constrains book names further, but the store provides a last-line
382    /// defence here too.
383    async fn book_writer(&self, book: &str) -> Result<BookWriter, StoreError> {
384        if self.shut_down.load(Ordering::SeqCst) {
385            return Err(io_str("store has been shut down"));
386        }
387        validate_book_name(book)?;
388
389        // Fast path: read lock.
390        {
391            let guard = self.books.read().await;
392            if let Some(w) = guard.get(book) {
393                return Ok(w.clone());
394            }
395        }
396
397        // Slow path: write lock, get-or-create.
398        let mut guard = self.books.write().await;
399        if let Some(w) = guard.get(book) {
400            return Ok(w.clone());
401        }
402
403        let book_dir = self.dir.join("books").join(book);
404        // For a brand-new book, attach_dir with an empty rebuild_fn (no existing log).
405        let mut initial_state = BookState::default();
406        tokio::fs::create_dir_all(&book_dir).await.map_err(io_err)?;
407        initial_state
408            .idem
409            .attach_dir(&book_dir, || async { Ok(vec![]) })
410            .await
411            .map_err(io_err)?;
412
413        let state = Arc::new(RwLock::new(initial_state));
414        let writer = BookWriter::spawn_with_opts(
415            book_dir,
416            state,
417            self.batch_max,
418            self.snapshot_every,
419            self.segment_max,
420        )
421        .await
422        .map_err(io_err)?;
423        guard.insert(book.to_string(), writer.clone());
424        Ok(writer)
425    }
426
427    /// Trigger an immediate snapshot for `book` and wait for it to complete.
428    ///
429    /// This is an ops/test hook.  The snapshot is written by the book's writer
430    /// task (the only mutator), so it is consistent with the current state.
431    ///
432    /// Returns `Ok(())` on success.  Returns `StoreError::Io` if the book is
433    /// unknown, the store is shut down, or the snapshot write fails.
434    pub async fn snapshot_now(&self, book: &str) -> Result<(), StoreError> {
435        let writer = self
436            .existing_book(book)
437            .await?
438            .ok_or_else(|| io_str(format!("book {book:?} not found for snapshot_now")))?;
439        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
440        writer.submit(Job::Snapshot(reply_tx)).await?;
441        reply_rx
442            .await
443            .map_err(|_| io_str("book writer gone during snapshot_now"))?
444    }
445
446    /// Drain all writers and await their completion.
447    ///
448    /// Writers finish when their last sender clone is dropped. This method:
449    /// 1. Drains the books map (removing the store's writer clones).
450    /// 2. For each writer, calls `BookWriter::shutdown(w)` which consumes the
451    ///    clone — dropping its `mpsc::Sender` — before awaiting the task. This
452    ///    guarantees the task sees sender-count → 0 and exits.
453    /// 3. Releases the advisory LOCK file so a subsequent `open` on the same
454    ///    directory succeeds in the same process (the test pattern calls
455    ///    `shutdown` before dropping and then reopens).
456    ///
457    /// Note: takes `&self` (not `self`) so it can be called without consuming
458    /// the store. The caller is expected to drop the `LogTaleaStore` afterward.
459    pub async fn shutdown(&self) {
460        // Signal all subsequent calls that the store is no longer usable.
461        // Must happen BEFORE draining writers so any concurrent book_writer()
462        // call racing with drain sees the flag and returns an error.
463        self.shut_down.store(true, Ordering::SeqCst);
464
465        // Drain the books map → we now hold the only remaining clones.
466        let writers: Vec<BookWriter> = {
467            let mut guard = self.books.write().await;
468            guard.drain().map(|(_, w)| w).collect()
469        };
470
471        // BookWriter::shutdown(w) consumes `w` (dropping its Sender) before
472        // awaiting the task handle. Calling `w.join()` instead would keep `w`
473        // alive during the await, so the Sender would never be dropped and the
474        // task would block forever on `rx.recv()`.
475        for w in writers {
476            w.shutdown().await;
477        }
478
479        // Release the advisory lock so a subsequent `open` on the same dir
480        // works. Poison recovery: taking the file handle out of the Option
481        // cannot be torn by a panicking thread, so the inner value is safe.
482        let _ = self
483            ._lock
484            .lock()
485            .unwrap_or_else(PoisonError::into_inner)
486            .take();
487    }
488}
489
490#[async_trait]
491impl Store for LogTaleaStore {
492    /// Register an asset.
493    ///
494    /// Idempotent: same def → `Ok(())` with no new event.
495    /// Different def for same id → `StoreError::AlreadyExists`.
496    ///
497    /// # Race note (same-new-asset concurrent register)
498    ///
499    /// Both concurrent calls for a brand-new asset would each pass the "absent"
500    /// check and submit a `Job::RegisterAsset`. To prevent two `AssetRegistered`
501    /// frames being appended, we hold the registry WRITE lock across the submit
502    /// AND the ack from the writer. Asset registration is rare (a setup-time
503    /// operation, not a hot path), so a write-lock held across an await is
504    /// acceptable here. Documented explicitly.
505    async fn register_asset(&self, asset: &AssetDef) -> Result<(), StoreError> {
506        if self.shut_down.load(Ordering::SeqCst) {
507            return Err(io_str("store has been shut down"));
508        }
509        // Hold the write lock for the full operation to serialize concurrent
510        // registrations of the same new asset (see race note above).
511        let mut reg = self.registry.write().await;
512
513        if let Some(existing) = reg.get(&asset.id) {
514            if existing == asset {
515                return Ok(()); // same def — idempotent no-op
516            }
517            return Err(StoreError::AlreadyExists {
518                what: format!("asset {}", asset.id.as_str()),
519            });
520        }
521
522        // Submit to the _system writer.
523        let writer = self.book_writer(SYSTEM_BOOK).await?;
524        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
525        writer
526            .submit(Job::RegisterAsset(asset.clone(), reply_tx))
527            .await?;
528        reply_rx
529            .await
530            .map_err(|_| io_str("book writer gone during register_asset"))??;
531
532        // Insert into registry AFTER the ack so a concurrent open sees it
533        // only once it is durable.
534        reg.insert(asset.id.clone(), asset.clone());
535        Ok(())
536    }
537
538    /// Open an account.
539    ///
540    /// Validates that:
541    /// - The book is not a reserved name (starts with `_`).
542    /// - The asset is registered.
543    ///
544    /// Idempotent for the same `(def, cfg)` pair; `AlreadyExists` for a
545    /// conflicting def/cfg.
546    async fn open_account(&self, def: &AccountDef, cfg: &AccountCfg) -> Result<(), StoreError> {
547        // Reserved-book guard: the store is responsible for this (the writer
548        // does not validate book names for OpenAccount jobs).
549        if def.id.book.is_reserved() {
550            return Err(StoreError::InvalidBook(def.id.book.clone()));
551        }
552
553        // Asset must be registered.
554        {
555            let reg = self.registry.read().await;
556            if !reg.contains_key(&def.asset) {
557                return Err(StoreError::UnknownAsset(def.asset.clone()));
558            }
559        }
560
561        let writer = self.book_writer(&def.id.book.0).await?;
562        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
563        writer
564            .submit(Job::OpenAccount(def.clone(), cfg.clone(), reply_tx))
565            .await?;
566        reply_rx
567            .await
568            .map_err(|_| io_str("book writer gone during open_account"))?
569    }
570
571    /// Commit a transaction.
572    async fn commit(&self, transaction: &Transaction) -> Result<Committed, StoreError> {
573        let writer = self.book_writer(&transaction.book.0).await?;
574        writer.commit(transaction.clone()).await
575    }
576
577    /// Commit a batch of transactions with group-commit semantics.
578    ///
579    /// All oneshot reply channels are created and submitted to the writer
580    /// before awaiting any reply, so the writer can drain them into a single
581    /// fsync batch. Results are positional.
582    async fn commit_batch(&self, txs: &[Transaction]) -> Vec<Result<Committed, StoreError>> {
583        if txs.is_empty() {
584            return vec![];
585        }
586
587        // Group by book (in practice all txs are typically the same book,
588        // but handle multi-book correctly).
589        //
590        // For the single-book case: submit ALL jobs first, then collect replies —
591        // this is what lets the writer drain them into one fsync batch.
592        //
593        // For multi-book: we build per-book (channels, txs) then submit all at
594        // once before awaiting any. Positional output is reconstructed at the end.
595
596        // Build a vec of (book, tx_index, reply_rx) — all writers acquired before
597        // any submits.
598        let mut rxs: Vec<tokio::sync::oneshot::Receiver<Result<Committed, StoreError>>> =
599            Vec::with_capacity(txs.len());
600        // Temporarily hold Err results for txs that fail at writer-acquisition stage.
601        let mut errs: HashMap<usize, StoreError> = HashMap::new();
602
603        // Collect writers per book.
604        let mut writers_cache: HashMap<String, Result<BookWriter, StoreError>> = HashMap::new();
605
606        struct PendingSubmit {
607            writer: BookWriter,
608            tx_idx: usize,
609        }
610        let mut pending: Vec<PendingSubmit> = Vec::with_capacity(txs.len());
611
612        // Also track the original error message per book so duplicate slots
613        // for the same failing book get a consistent (not generic) error.
614        // StoreError is not Clone, so we store the message text and produce a
615        // fresh Io error for each affected slot.
616        let mut book_err_msgs: HashMap<String, String> = HashMap::new();
617
618        for (i, tx) in txs.iter().enumerate() {
619            let book = tx.book.0.clone();
620            if !writers_cache.contains_key(&book) {
621                let w = self.book_writer(&book).await;
622                if let Err(ref e) = w {
623                    book_err_msgs.insert(book.clone(), e.to_string());
624                }
625                writers_cache.insert(book.clone(), w);
626            }
627            match &writers_cache[&book] {
628                // Carrying the writer clone here (instead of re-looking it up
629                // by book later) keeps the submit loop infallible.
630                Ok(w) => pending.push(PendingSubmit {
631                    writer: w.clone(),
632                    tx_idx: i,
633                }),
634                Err(_) => {
635                    // Placeholder rx slot will be skipped; record error.
636                    // We need to keep indices aligned, so push a dummy rx and
637                    // record the error separately.
638                    let msg = book_err_msgs
639                        .get(&book)
640                        .cloned()
641                        .unwrap_or_else(|| "failed to get book writer".into());
642                    errs.insert(i, io_str(msg));
643                }
644            }
645        }
646
647        // Build (tx_idx, rx) pairs: submit all jobs, collect all rxs.
648        let mut tx_to_rx_idx: Vec<Option<usize>> = vec![None; txs.len()];
649        let mut rx_to_tx_idx: Vec<usize> = Vec::with_capacity(pending.len());
650
651        for ps in &pending {
652            let i = ps.tx_idx;
653            let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
654            let submit_result = ps
655                .writer
656                .submit(Job::Commit(txs[i].clone(), reply_tx))
657                .await;
658            match submit_result {
659                Ok(()) => {
660                    let rx_idx = rxs.len();
661                    rxs.push(reply_rx);
662                    tx_to_rx_idx[i] = Some(rx_idx);
663                    rx_to_tx_idx.push(i);
664                }
665                Err(e) => {
666                    errs.insert(i, e);
667                }
668            }
669        }
670
671        // Await all replies.
672        let mut replies: Vec<Option<Result<Committed, StoreError>>> =
673            (0..rxs.len()).map(|_| None).collect();
674        for (rx_idx, rx) in rxs.into_iter().enumerate() {
675            replies[rx_idx] = Some(
676                rx.await
677                    .unwrap_or_else(|_| Err(io_str("book writer gone during commit_batch"))),
678            );
679        }
680
681        // Reconstruct positional output.
682        let mut out: Vec<Result<Committed, StoreError>> = (0..txs.len())
683            .map(|_| Err(io_str("unreachable: unset commit_batch slot")))
684            .collect();
685
686        for i in 0..txs.len() {
687            if let Some(e) = errs.remove(&i) {
688                out[i] = Err(e);
689            } else if let Some(result) = tx_to_rx_idx[i]
690                .and_then(|rx_idx| replies.get_mut(rx_idx))
691                .and_then(Option::take)
692            {
693                // Every rx slot was set in the await loop above; a missing one
694                // leaves the pre-seeded "unset slot" error in place.
695                out[i] = result;
696            }
697        }
698
699        out
700    }
701
702    /// Current balance for `account`.
703    ///
704    /// `as_of: None` returns the current (fully-applied) balance from
705    /// in-memory state. `as_of: Some(t)` binary-searches the per-account
706    /// posting entries (sorted by committed_at, non-decreasing) to find the
707    /// balance as it stood at time `t`.
708    ///
709    /// Uses `existing_book` (read-only lookup) so querying an account in a
710    /// book that has never been written to does NOT create that book's
711    /// directory on disk.  An absent book means the account cannot exist →
712    /// `StoreError::UnknownAccount`.
713    ///
714    /// Edge semantics matching sqlite:
715    /// - Unknown book → `UnknownAccount` (no book → no account).
716    /// - Unknown account within a known book → `UnknownAccount`.
717    /// - Account exists but no postings before `as_of` → `BalanceSnapshot { amount: 0, updated_seq: 0 }`.
718    async fn balance(
719        &self,
720        account: &AccountId,
721        as_of: Option<DateTime<Utc>>,
722    ) -> Result<BalanceSnapshot, StoreError> {
723        let writer = self
724            .existing_book(&account.book.0)
725            .await?
726            .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
727        let st = writer.state.read().await;
728        let key = account.to_key();
729        let acct = st
730            .accounts
731            .get(&key)
732            .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
733
734        match as_of {
735            None => {
736                let eff = effective(acct.raw_balance, &acct.cfg.normal_side);
737                Ok(BalanceSnapshot {
738                    amount: Amount::new(eff, acct.def.asset.clone()),
739                    updated_seq: acct.updated_seq,
740                })
741            }
742            Some(t) => {
743                // Binary search: committed_at is non-decreasing vs seq within
744                // a book. idx == 0 (no postings at or before the cutoff) and
745                // the entry lookup collapse into one infallible Option chain.
746                let idx = acct.postings.partition_point_at(t);
747                match idx.checked_sub(1).and_then(|i| acct.postings.get(i)) {
748                    None => Ok(BalanceSnapshot {
749                        amount: Amount::new(0, acct.def.asset.clone()),
750                        updated_seq: 0,
751                    }),
752                    Some(entry) => {
753                        let eff = effective(entry.raw_after, &acct.cfg.normal_side);
754                        Ok(BalanceSnapshot {
755                            amount: Amount::new(eff, acct.def.asset.clone()),
756                            updated_seq: entry.seq,
757                        })
758                    }
759                }
760            }
761        }
762    }
763
764    /// Look up an asset by id.
765    async fn asset(&self, id: &AssetId) -> Result<Option<AssetDef>, StoreError> {
766        let reg = self.registry.read().await;
767        Ok(reg.get(id).cloned())
768    }
769
770    /// Posting history for `account`, paginated by seq.
771    ///
772    /// `after_seq` is EXCLUSIVE (matches sqlite semantics: `seq > after_seq`).
773    /// `limit` counts DISTINCT seqs so one transaction's postings to the same
774    /// account are never split across pages.
775    ///
776    /// Edge semantics matching sqlite:
777    /// - Unknown book or account → `UnknownAccount`.
778    /// - limit 0 → empty vec (never an error).
779    async fn account_history(
780        &self,
781        account: &AccountId,
782        after_seq: Option<Seq>,
783        limit: usize,
784    ) -> Result<Vec<PostingRecord>, StoreError> {
785        if limit == 0 {
786            return Ok(vec![]);
787        }
788
789        let writer = self
790            .existing_book(&account.book.0)
791            .await?
792            .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
793        let st = writer.state.read().await;
794        let key = account.to_key();
795        let acct = st
796            .accounts
797            .get(&key)
798            .ok_or_else(|| StoreError::UnknownAccount(account.clone()))?;
799
800        let after = after_seq.unwrap_or(0);
801        // Start at the first entry with seq > after.
802        let start = acct.postings.partition_point_seq(after);
803
804        // Walk counting DISTINCT seqs, stopping before exceeding `limit`.
805        let mut records: Vec<PostingRecord> = Vec::new();
806        let mut distinct_seqs: usize = 0;
807        let mut last_seq: Seq = 0;
808
809        for entry in acct.postings.iter_from(start) {
810            if entry.seq != last_seq {
811                if distinct_seqs >= limit {
812                    break;
813                }
814                distinct_seqs += 1;
815                last_seq = entry.seq;
816            }
817            records.push(PostingRecord {
818                seq: entry.seq,
819                txid: entry.txid.clone(),
820                account: account.clone(),
821                amount: Amount::new(entry.minor, acct.def.asset.clone()),
822                direction: entry.direction.clone(),
823                at: entry.at,
824            });
825        }
826
827        Ok(records)
828    }
829
830    /// Look up a committed transaction by its `TxId`.
831    ///
832    /// Searches all book writers' in-memory txid indexes. Found → reads the
833    /// frame at the recorded position and returns a `StoredTransaction`.
834    /// Not found → `Ok(None)` (matches sqlite semantics).
835    async fn transaction(&self, txid: &TxId) -> Result<Option<StoredTransaction>, StoreError> {
836        use talea_core::events::LedgerEvent;
837
838        // Snapshot the writers map under a read lock.
839        let writers: Vec<BookWriter> = {
840            let guard = self.books.read().await;
841            guard.values().cloned().collect()
842        };
843
844        for writer in &writers {
845            let st = writer.state.read().await;
846            if let Some(&(seq, pos)) = st.txids.get(&txid.0) {
847                // Found: release state lock before doing async I/O.
848                drop(st);
849                let wire = writer
850                    .catalog
851                    .read_at(pos.0, pos.1, seq)
852                    .await
853                    .map_err(io_err)?;
854                match wire.event {
855                    LedgerEvent::TransactionPosted(tx) => {
856                        return Ok(Some(StoredTransaction {
857                            transaction: tx,
858                            seq: wire.seq,
859                            at: wire.at,
860                        }));
861                    }
862                    _ => {
863                        return Err(io_str(format!(
864                            "frame at recorded position is not a transaction (seq {seq})"
865                        )));
866                    }
867                }
868            }
869        }
870
871        Ok(None)
872    }
873
874    /// Trial balance for `book`.
875    ///
876    /// `as_of: None` returns the current lifetime sums from in-memory state
877    /// (cheap). `as_of: Some(t)` replays all TransactionPosted frames with
878    /// `committed_at <= t` from disk (rare, documented as slow path).
879    ///
880    /// Edge semantics matching sqlite:
881    /// - Unknown book → empty vec (sqlite returns no rows for an unknown book).
882    /// - Rows are sorted by asset id string, matching sqlite's `ORDER BY asset`.
883    async fn trial_balance(
884        &self,
885        book: &Book,
886        as_of: Option<DateTime<Utc>>,
887    ) -> Result<Vec<TrialBalanceRow>, StoreError> {
888        use talea_core::events::LedgerEvent;
889
890        let writer = match self.existing_book(&book.0).await? {
891            None => return Ok(vec![]),
892            Some(w) => w,
893        };
894
895        match as_of {
896            None => {
897                let st = writer.state.read().await;
898                let mut rows: Vec<TrialBalanceRow> = st
899                    .sums
900                    .iter()
901                    .map(|(asset, &(debits, credits))| TrialBalanceRow {
902                        asset: asset.clone(),
903                        debits,
904                        credits,
905                    })
906                    .collect();
907                rows.sort_by(|a, b| a.asset.as_str().cmp(b.asset.as_str()));
908                Ok(rows)
909            }
910            Some(t) => {
911                // Disk replay: scan all events, fold TransactionPosted with at <= t.
912                // Rare operation; documented as slow path.
913                //
914                // Durability watermark: the writer appends → syncs → applies-to-state
915                // → acks.  `state.next_seq` advances only AFTER fsync, so any frame
916                // with `seq < ceiling` (where `ceiling = next_seq - 1`) is guaranteed
917                // to be both durable on disk and reflected in the in-memory state.
918                // Frames beyond the ceiling may be in the page cache but not yet
919                // fsynced (the writer hasn't applied them yet); surfacing them would
920                // be a dirty read — they could vanish on crash.  We therefore stop
921                // scanning once `wire.seq > ceiling`.
922                let ceiling: Seq = {
923                    let st = writer.state.read().await;
924                    st.next_seq.saturating_sub(1)
925                };
926
927                let events = writer
928                    .catalog
929                    .scan_from(1, usize::MAX)
930                    .await
931                    .map_err(io_err)?;
932
933                let mut sums: HashMap<AssetId, (i64, i64)> = HashMap::new();
934                for wire in events {
935                    if wire.seq > ceiling {
936                        break; // beyond durability watermark — stop
937                    }
938                    if wire.at > t {
939                        break; // committed_at is non-decreasing; can stop early
940                    }
941                    if let LedgerEvent::TransactionPosted(tx) = wire.event {
942                        for posting in &tx.postings {
943                            let entry =
944                                sums.entry(posting.amount.asset().clone()).or_insert((0, 0));
945                            match posting.direction {
946                                Direction::Debit => {
947                                    entry.0 = entry.0.saturating_add(posting.amount.minor())
948                                }
949                                Direction::Credit => {
950                                    entry.1 = entry.1.saturating_add(posting.amount.minor())
951                                }
952                            }
953                        }
954                    }
955                }
956
957                let mut rows: Vec<TrialBalanceRow> = sums
958                    .into_iter()
959                    .map(|(asset, (debits, credits))| TrialBalanceRow {
960                        asset,
961                        debits,
962                        credits,
963                    })
964                    .collect();
965                rows.sort_by(|a, b| a.asset.as_str().cmp(b.asset.as_str()));
966                Ok(rows)
967            }
968        }
969    }
970
971    /// Read events from `book` starting at `from` (INCLUSIVE), returning at
972    /// most `limit`.
973    ///
974    /// Edge semantics matching sqlite:
975    /// - Unknown book → empty vec (sqlite returns no rows).
976    ///
977    /// # Durability watermark
978    ///
979    /// Log-replay reads must never surface frames that the in-memory state
980    /// hasn't applied yet.  The writer pipeline is: append → fsync →
981    /// apply-to-state → ack.  `state.next_seq` advances only after fsync AND
982    /// apply, so `ceiling = next_seq - 1` is the highest seq that is both
983    /// durable on disk and consistent with the in-memory index.  Frames with
984    /// `seq > ceiling` may be page-cache-visible but not fsynced; returning
985    /// them would be a dirty read — they could vanish on crash or diverge from
986    /// in-memory state.  We drop the state lock before scanning and filter out
987    /// any frames that exceed the ceiling.
988    async fn read_events(
989        &self,
990        book: &Book,
991        from: Seq,
992        limit: usize,
993    ) -> Result<Vec<Sequenced<LedgerEvent>>, StoreError> {
994        let writer = match self.existing_book(&book.0).await? {
995            None => return Ok(vec![]),
996            Some(w) => w,
997        };
998
999        // Read the durability ceiling under a short-lived read lock, then
1000        // release before doing disk I/O.
1001        let ceiling: Seq = {
1002            let st = writer.state.read().await;
1003            st.next_seq.saturating_sub(1)
1004        };
1005
1006        let wires = writer
1007            .catalog
1008            .scan_from(from, limit)
1009            .await
1010            .map_err(io_err)?;
1011
1012        Ok(wires
1013            .into_iter()
1014            .take_while(|w| w.seq <= ceiling)
1015            .map(|w| Sequenced {
1016                seq: w.seq,
1017                at: w.at,
1018                event: w.event,
1019            })
1020            .collect())
1021    }
1022
    /// Stream `book`'s events starting at seq `from` (INCLUSIVE), then keep
    /// following new commits live.
    ///
    /// The returned stream runs these phases in order:
    /// 1. Validate the book name.
    /// 2. Get or create the book's `BookWriter`. Unlike the read paths, a book
    ///    that has never been written to IS created here (its `books/<book>`
    ///    directory plus a fresh, empty writer), so subscribing before the
    ///    first commit works and later writes arrive live.
    /// 3. Subscribe to the writer's broadcast channel, THEN page history from
    ///    disk in chunks of up to 512 frames, never past the durability
    ///    watermark (`next_seq - 1`), which is re-read before every page.
    /// 4. Tail the broadcast channel, skipping any seq already yielded. On
    ///    `Lagged`, the missed range is re-paged from disk starting at
    ///    `last + 1`; on `Closed`, the stream ends without an error.
    ///
    /// Failures (invalid book name, I/O creating the directory or spawning the
    /// writer, scan errors) are propagated with `?` inside `try_stream!`, i.e.
    /// surfaced as an `Err` item of the stream.
    ///
    /// NOTE(review): the writer is created here by inserting into `books`
    /// directly rather than via `book_writer`, so any shutdown guard
    /// `book_writer` applies is bypassed — confirm that subscribing after
    /// `shutdown` cannot resurrect a book writer.
    fn subscribe(
        &self,
        book: &Book,
        from: Seq,
    ) -> BoxStream<'static, Result<Sequenced<LedgerEvent>, StoreError>> {
        // subscribe is sync — clone everything we need into the stream before returning.
        let books = Arc::clone(&self.books);
        let dir = self.dir.clone();
        let batch_max = self.batch_max;
        let segment_max = self.segment_max;
        let snapshot_every = self.snapshot_every;
        let book = book.clone();

        Box::pin(async_stream::try_stream! {
            // Validate the book name before touching the filesystem.  Every
            // other entry point calls validate_book_name via book_writer /
            // existing_book; subscribe builds the path directly, so we must
            // check here.
            validate_book_name(&book.0)?;

            // Step 1: get-or-create the BookWriter inside the async stream so we
            // can await it. This mirrors sqlite's behaviour: subscribing to a book
            // that has never been written to still works; future writes will be
            // received live.
            let writer: BookWriter = {
                // Fast path: read lock.
                let maybe = {
                    let g = books.read().await;
                    g.get(&book.0).cloned()
                };
                if let Some(w) = maybe {
                    w
                } else {
                    // Slow path: write lock, get-or-create.
                    let mut g = books.write().await;
                    // Re-check under the write lock: another task may have
                    // created the writer between the two lock acquisitions.
                    if let Some(w) = g.get(&book.0).cloned() {
                        w
                    } else {
                        // Fresh book: empty state, idempotency store attached to
                        // the new directory with no prior entries.
                        let book_dir = dir.join("books").join(&book.0);
                        tokio::fs::create_dir_all(&book_dir).await.map_err(|e| StoreError::Io(Box::new(e)))?;
                        let mut initial_st = crate::state::BookState::default();
                        initial_st.idem.attach_dir(&book_dir, || async { Ok(vec![]) }).await.map_err(|e| StoreError::Io(Box::new(e)))?;
                        let state = Arc::new(RwLock::new(initial_st));
                        let w = BookWriter::spawn_with_opts(book_dir, state, batch_max, snapshot_every, segment_max)
                            .await
                            .map_err(|e| StoreError::Io(Box::new(e)))?;
                        g.insert(book.0.clone(), w.clone());
                        w
                    }
                }
            };

            // Step 2: subscribe to the live broadcast channel BEFORE reading history
            // so nothing between catch-up and live-tail is missed.
            let mut live = writer.events.subscribe();

            // Step 3: page through history from `from` up to the durability watermark.
            let page_size = 512usize;
            // `next` is the next seq still owed to the subscriber.
            let mut next = from;

            loop {
                // Ceiling at the start of each page so we don't surface un-applied frames.
                let ceiling: Seq = {
                    let st = writer.state.read().await;
                    st.next_seq.saturating_sub(1)
                };
                if next > ceiling {
                    break;
                }
                // Never request more frames than can lie in [next, ceiling].
                let limit = page_size.min((ceiling - next + 1) as usize);
                let wires = writer.catalog.scan_from(next, limit).await.map_err(io_err)?;
                let page_len = wires.len();
                for wire in wires {
                    // Double-check ceiling in case ceiling moved backward (shouldn't happen,
                    // but be defensive).
                    // NOTE(review): if this fires on the first frame of a full page
                    // (page_len == limit), `next` does not advance and the outer loop
                    // re-scans the same range until the ceiling moves.
                    if wire.seq > ceiling {
                        break;
                    }
                    next = wire.seq + 1;
                    yield Sequenced { seq: wire.seq, at: wire.at, event: wire.event };
                }
                // A short page means we're caught up to the watermark.
                if page_len < limit {
                    break;
                }
            }

            // Step 4: live-tail loop.
            // `last` is the highest seq already yielded (or `from - 1` if none).
            let mut last: Seq = next.saturating_sub(1);
            loop {
                match live.recv().await {
                    Ok(ev) => {
                        if ev.seq <= last {
                            // overlap from catch-up — skip
                            continue;
                        }
                        // NOTE(review): there is no check that ev.seq == last + 1; a gap is
                        // only impossible if the writer broadcasts an event no earlier than
                        // the point where next_seq covers it — confirm in writer.rs.
                        last = ev.seq;
                        yield Sequenced { seq: ev.seq, at: ev.at, event: ev.event };
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
                        // We fell behind the broadcast ring — re-page from last+1.
                        // The receiver stays subscribed; any buffered events that the
                        // re-page already covered are dropped by the `ev.seq <= last`
                        // check above on later iterations.
                        let resume = last + 1;
                        let page_size_lag = 512usize;
                        let mut cursor = resume;
                        loop {
                            let ceiling: Seq = {
                                let st = writer.state.read().await;
                                st.next_seq.saturating_sub(1)
                            };
                            if cursor > ceiling {
                                break;
                            }
                            let limit = page_size_lag.min((ceiling - cursor + 1) as usize);
                            let wires = writer.catalog.scan_from(cursor, limit).await.map_err(io_err)?;
                            let page_len = wires.len();
                            for wire in wires {
                                if wire.seq > ceiling {
                                    break;
                                }
                                if wire.seq > last {
                                    last = wire.seq;
                                    cursor = wire.seq + 1;
                                    yield Sequenced { seq: wire.seq, at: wire.at, event: wire.event };
                                } else {
                                    cursor = wire.seq + 1;
                                }
                            }
                            if page_len < limit {
                                break;
                            }
                        }
                        // After re-paging, resume the live loop (re-subscribe isn't
                        // possible but new events will arrive normally).
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                        // Writer died — no more events. End the stream cleanly (matches sqlite).
                        return;
                    }
                }
            }
        })
    }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169    use super::*;
1170    use crate::writer::BookWriter;
1171    use talea_core::store::{AccountCfg, Store};
1172    use talea_core::types::*;
1173
1174    fn mk_tx(key: &str, minor: i64) -> Transaction {
1175        Transaction {
1176            id: TxId(uuid::Uuid::now_v7()),
1177            book: Book("b".into()),
1178            postings: vec![
1179                Posting {
1180                    account: AccountId {
1181                        book: Book("b".into()),
1182                        path: "cash".into(),
1183                    },
1184                    amount: Amount::new(minor, AssetId::new("USD")),
1185                    direction: Direction::Debit,
1186                },
1187                Posting {
1188                    account: AccountId {
1189                        book: Book("b".into()),
1190                        path: "rev".into(),
1191                    },
1192                    amount: Amount::new(minor, AssetId::new("USD")),
1193                    direction: Direction::Credit,
1194                },
1195            ],
1196            idempotency_key: IdempotencyKey(key.into()),
1197            external_refs: vec![],
1198            metadata: serde_json::Value::Null,
1199            occurred_at: chrono::Utc::now(),
1200        }
1201    }
1202
1203    fn usd() -> AssetDef {
1204        AssetDef {
1205            id: AssetId::new("USD"),
1206            class: AssetClass::Fiat,
1207            precision: 2,
1208            name: "Dollar".into(),
1209        }
1210    }
1211
1212    fn cash_def() -> AccountDef {
1213        AccountDef {
1214            id: AccountId {
1215                book: Book("b".into()),
1216                path: "cash".into(),
1217            },
1218            asset: AssetId::new("USD"),
1219            kind: AccountKind::Asset,
1220        }
1221    }
1222
1223    fn rev_def() -> AccountDef {
1224        AccountDef {
1225            id: AccountId {
1226                book: Book("b".into()),
1227                path: "rev".into(),
1228            },
1229            asset: AssetId::new("USD"),
1230            kind: AccountKind::Income,
1231        }
1232    }
1233
1234    async fn seeded(dir: &std::path::Path) -> LogTaleaStore {
1235        let store = LogTaleaStore::open(dir).await.unwrap();
1236        store.register_asset(&usd()).await.unwrap();
1237        let cfg = AccountCfg {
1238            normal_side: None,
1239            min_balance: None,
1240        };
1241        store.open_account(&cash_def(), &cfg).await.unwrap();
1242        store.open_account(&rev_def(), &cfg).await.unwrap();
1243        store
1244    }
1245
1246    #[tokio::test]
1247    async fn open_recovers_state_by_replay() {
1248        let dir = tempfile::tempdir().unwrap();
1249        {
1250            let store = seeded(dir.path()).await;
1251            store.commit(&mk_tx("k1", 25)).await.unwrap();
1252            store.shutdown().await;
1253        }
1254        let store = LogTaleaStore::open(dir.path()).await.unwrap();
1255        let bal = store.balance(&cash_def().id, None).await.unwrap();
1256        assert_eq!(bal.amount.minor(), 25);
1257        assert_eq!(bal.updated_seq, 3); // seq 1 = cash open, 2 = rev open, 3 = tx
1258        assert_eq!(
1259            store.asset(&AssetId::new("USD")).await.unwrap(),
1260            Some(usd())
1261        );
1262        // idempotency survives restart: replay returns the prior commit
1263        let replay = store.commit(&mk_tx("k1", 25)).await.unwrap();
1264        assert_eq!(replay.seq, 3);
1265        // and balance is unchanged
1266        assert_eq!(
1267            store
1268                .balance(&cash_def().id, None)
1269                .await
1270                .unwrap()
1271                .amount
1272                .minor(),
1273            25
1274        );
1275    }
1276
1277    #[tokio::test]
1278    async fn second_open_on_same_dir_is_refused() {
1279        let dir = tempfile::tempdir().unwrap();
1280        let _first = LogTaleaStore::open(dir.path()).await.unwrap();
1281        assert!(LogTaleaStore::open(dir.path()).await.is_err());
1282    }
1283
1284    #[tokio::test]
1285    async fn register_asset_idempotent_same_def_conflict_different() {
1286        let dir = tempfile::tempdir().unwrap();
1287        let store = LogTaleaStore::open(dir.path()).await.unwrap();
1288        store.register_asset(&usd()).await.unwrap();
1289        store.register_asset(&usd()).await.unwrap(); // same def: Ok, no second event
1290        let mut other = usd();
1291        other.precision = 8;
1292        assert!(matches!(
1293            store.register_asset(&other).await,
1294            Err(talea_core::store::StoreError::AlreadyExists { .. })
1295        ));
1296    }
1297
1298    #[tokio::test]
1299    async fn open_account_requires_registered_asset_and_real_book() {
1300        let dir = tempfile::tempdir().unwrap();
1301        let store = LogTaleaStore::open(dir.path()).await.unwrap();
1302        let cfg = AccountCfg {
1303            normal_side: None,
1304            min_balance: None,
1305        };
1306        assert!(matches!(
1307            store.open_account(&cash_def(), &cfg).await,
1308            Err(talea_core::store::StoreError::UnknownAsset(_))
1309        ));
1310        store.register_asset(&usd()).await.unwrap();
1311        let mut sys = cash_def();
1312        sys.id.book = Book("_system".into());
1313        assert!(matches!(
1314            store.open_account(&sys, &cfg).await,
1315            Err(talea_core::store::StoreError::InvalidBook(_))
1316        ));
1317    }
1318
1319    #[tokio::test]
1320    async fn commit_batch_is_positional_and_isolates_failures() {
1321        let dir = tempfile::tempdir().unwrap();
1322        let store = seeded(dir.path()).await;
1323        let mut bad = mk_tx("bad", 1);
1324        bad.postings[0].account.path = "ghost".into();
1325        let txs = vec![mk_tx("a", 1), bad, mk_tx("b", 2)];
1326        let out = store.commit_batch(&txs).await;
1327        assert!(out[0].is_ok());
1328        assert!(matches!(
1329            out[1],
1330            Err(talea_core::store::StoreError::UnknownAccount(_))
1331        ));
1332        assert!(out[2].is_ok());
1333        assert_eq!(
1334            out[0].as_ref().unwrap().seq + 1,
1335            out[2].as_ref().unwrap().seq,
1336            "gapless across the reject"
1337        );
1338    }
1339
1340    #[tokio::test]
1341    async fn store_use_after_shutdown_fails_cleanly() {
1342        let dir = tempfile::tempdir().unwrap();
1343        let store = seeded(dir.path()).await;
1344
1345        // Commit one tx so there's real state to inspect.
1346        store.commit(&mk_tx("k1", 10)).await.unwrap();
1347
1348        store.shutdown().await;
1349
1350        // commit must return Err, not panic, not succeed.
1351        let commit_res = store.commit(&mk_tx("k2", 5)).await;
1352        assert!(commit_res.is_err(), "commit after shutdown must fail");
1353
1354        // open_account must return Err.
1355        let cfg = AccountCfg {
1356            normal_side: None,
1357            min_balance: None,
1358        };
1359        let open_res = store.open_account(&cash_def(), &cfg).await;
1360        assert!(open_res.is_err(), "open_account after shutdown must fail");
1361
1362        // balance reads via book_writer — after shutdown the books map is empty,
1363        // so book_writer returns the shut_down error before it can recreate anything.
1364        let bal_res = store.balance(&cash_def().id, None).await;
1365        assert!(bal_res.is_err(), "balance after shutdown must fail");
1366    }
1367
1368    #[tokio::test]
1369    async fn empty_book_name_is_rejected() {
1370        let dir = tempfile::tempdir().unwrap();
1371        let store = LogTaleaStore::open(dir.path()).await.unwrap();
1372        store.register_asset(&usd()).await.unwrap();
1373
1374        // An empty book name must be rejected by validate_book_name, which
1375        // book_writer calls.  Using commit (which calls book_writer) is the
1376        // simplest exerciser.
1377        let mut tx = mk_tx("empty-book", 1);
1378        tx.book = Book("".into());
1379        tx.postings[0].account.book = Book("".into());
1380        tx.postings[1].account.book = Book("".into());
1381        let res = store.commit(&tx).await;
1382        assert!(res.is_err(), "empty book name must be rejected");
1383        let err_msg = format!("{:?}", res.unwrap_err());
1384        assert!(
1385            err_msg.contains("invalid book name"),
1386            "error should mention invalid book name: {err_msg}"
1387        );
1388    }
1389
1390    // -----------------------------------------------------------------------
1391    // Fix 1: read paths must not create books
1392    // -----------------------------------------------------------------------
1393
1394    /// `balance` for an account in a book that was never written must return
1395    /// `UnknownAccount` and must NOT create the book directory on disk.
1396    #[tokio::test]
1397    async fn balance_on_unknown_book_creates_nothing() {
1398        let dir = tempfile::tempdir().unwrap();
1399        let store = LogTaleaStore::open(dir.path()).await.unwrap();
1400
1401        let typo_account = AccountId {
1402            book: Book("typo".into()),
1403            path: "cash".into(),
1404        };
1405        let res = store.balance(&typo_account, None).await;
1406
1407        assert!(
1408            matches!(res, Err(talea_core::store::StoreError::UnknownAccount(_))),
1409            "expected UnknownAccount for nonexistent book, got {res:?}",
1410        );
1411
1412        // The book directory must NOT have been created.
1413        let book_dir = dir.path().join("books").join("typo");
1414        assert!(
1415            !book_dir.exists(),
1416            "balance must not create books/typo on disk, but it exists: {book_dir:?}",
1417        );
1418    }
1419
1420    // -----------------------------------------------------------------------
1421    // Fix 2: replay of corrupt log must error, not panic
1422    // -----------------------------------------------------------------------
1423
1424    /// Build a valid store, shut it down, hand-append a structurally valid
1425    /// (correct CRC) but arithmetically impossible frame (debits i64::MAX
1426    /// from an account whose balance is already > 0), then reopen.  The open
1427    /// must return `Err(StoreError::Io(…))` mentioning "corrupt log", not
1428    /// panic.
1429    #[tokio::test]
1430    async fn replay_of_overflowing_log_errors_not_panics() {
1431        use crate::frame::{WireEvent, encode_frame};
1432
1433        let dir = tempfile::tempdir().unwrap();
1434
1435        // 1. Build a valid store with one committed transaction.
1436        {
1437            let store = seeded(dir.path()).await;
1438            // commit one tx so cash has balance > 0 (raw_balance = 100)
1439            store.commit(&mk_tx("k1", 100)).await.unwrap();
1440            store.shutdown().await;
1441        }
1442        // At this point next_seq in book "b" is 4 (seq1=cash open, seq2=rev
1443        // open, seq3=k1 tx).  We forge seq 4 outside the store.
1444
1445        // 2. Find the segment file for book "b" (always segment-00000000000000000001.log
1446        //    since the test store is small and never rotates).
1447        let book_dir = dir.path().join("books").join("b");
1448        let seg_path = book_dir.join("segment-00000000000000000001.log");
1449        assert!(seg_path.exists(), "segment file must exist: {seg_path:?}");
1450
1451        // 3. Forge a frame that is structurally valid (correct CRC) but whose
1452        //    debit of i64::MAX against a balance of 100 cannot fit in i64.
1453        let forged_tx = Transaction {
1454            id: TxId(uuid::Uuid::now_v7()),
1455            book: Book("b".into()),
1456            postings: vec![
1457                Posting {
1458                    account: AccountId {
1459                        book: Book("b".into()),
1460                        path: "cash".into(),
1461                    },
1462                    amount: Amount::new(i64::MAX, AssetId::new("USD")),
1463                    direction: Direction::Debit,
1464                },
1465                Posting {
1466                    account: AccountId {
1467                        book: Book("b".into()),
1468                        path: "rev".into(),
1469                    },
1470                    amount: Amount::new(i64::MAX, AssetId::new("USD")),
1471                    direction: Direction::Credit,
1472                },
1473            ],
1474            idempotency_key: IdempotencyKey("overflow-forge".into()),
1475            external_refs: vec![],
1476            metadata: serde_json::Value::Null,
1477            occurred_at: chrono::Utc::now(),
1478        };
1479        let forged_wire = WireEvent {
1480            seq: 4, // next after the 3 valid events
1481            at: chrono::Utc::now(),
1482            event: talea_core::events::LedgerEvent::TransactionPosted(forged_tx),
1483        };
1484        let frame_bytes = encode_frame(&forged_wire).expect("encode must succeed");
1485
1486        // Append the forged frame directly.
1487        {
1488            use std::io::Write;
1489            let mut f = std::fs::OpenOptions::new()
1490                .append(true)
1491                .open(&seg_path)
1492                .expect("segment file must exist for append");
1493            f.write_all(&frame_bytes).unwrap();
1494        }
1495
1496        // 4. Reopen — must return Err mentioning "corrupt log", not panic.
1497        let result = LogTaleaStore::open(dir.path()).await;
1498        match result {
1499            Err(StoreError::Io(ref msg)) => {
1500                let s = msg.to_string();
1501                assert!(
1502                    s.contains("corrupt log"),
1503                    "error must mention 'corrupt log', got: {s}",
1504                );
1505            }
1506            Ok(_) => panic!("open must fail on overflow replay, but it succeeded"),
1507            Err(other) => panic!("expected StoreError::Io, got {other:?}"),
1508        }
1509    }
1510
1511    // -----------------------------------------------------------------------
1512    // Task 7: read path tests
1513    // -----------------------------------------------------------------------
1514
1515    /// Build a store with 3 transactions at distinct timestamps.
1516    /// Returns (store, txids, commit_ats).
1517    /// Seqs: 1 = cash open, 2 = rev open, 3/4/5 = the 3 transactions.
1518    async fn history_fixture(
1519        dir: &std::path::Path,
1520    ) -> (LogTaleaStore, Vec<TxId>, Vec<chrono::DateTime<chrono::Utc>>) {
1521        let store = seeded(dir).await;
1522        let mut txids = Vec::new();
1523        let mut times = Vec::new();
1524
1525        for (key, minor) in [("tx1", 10i64), ("tx2", 20), ("tx3", 30)] {
1526            let c = store.commit(&mk_tx(key, minor)).await.unwrap();
1527            txids.push(c.txid);
1528            times.push(c.at);
1529            // Ensure distinct microsecond timestamps for as_of binary search.
1530            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1531        }
1532
1533        (store, txids, times)
1534    }
1535
1536    #[tokio::test]
1537    async fn balance_as_of_binary_searches_posting_entries() {
1538        let dir = tempfile::tempdir().unwrap();
1539        let (store, _txids, times) = history_fixture(dir.path()).await;
1540
1541        let cash = cash_def().id;
1542
1543        // as_of before t1: no postings → amount 0, updated_seq 0.
1544        let before = times[0] - chrono::Duration::microseconds(1);
1545        let snap = store.balance(&cash, Some(before)).await.unwrap();
1546        assert_eq!(
1547            snap.amount.minor(),
1548            0,
1549            "as_of before first commit must be 0"
1550        );
1551        assert_eq!(
1552            snap.updated_seq, 0,
1553            "as_of before first commit: updated_seq must be 0"
1554        );
1555
1556        // as_of = t2 (second commit's at) → balance 30 (10+20), updated_seq = 4.
1557        let snap2 = store.balance(&cash, Some(times[1])).await.unwrap();
1558        assert_eq!(
1559            snap2.amount.minor(),
1560            30,
1561            "balance after 2 commits must be 30"
1562        );
1563        assert_eq!(
1564            snap2.updated_seq, 4,
1565            "updated_seq after 2 commits must be 4"
1566        );
1567
1568        // as_of = t3 → 60, updated_seq 5.
1569        let snap3 = store.balance(&cash, Some(times[2])).await.unwrap();
1570        assert_eq!(
1571            snap3.amount.minor(),
1572            60,
1573            "balance after 3 commits must be 60"
1574        );
1575        assert_eq!(
1576            snap3.updated_seq, 5,
1577            "updated_seq after 3 commits must be 5"
1578        );
1579    }
1580
1581    #[tokio::test]
1582    async fn account_history_pages_by_distinct_seq_after_seq_exclusive() {
1583        let dir = tempfile::tempdir().unwrap();
1584        let (store, _txids, times) = history_fixture(dir.path()).await;
1585
1586        let cash = cash_def().id;
1587
1588        // after_seq None, limit 2 → postings of seqs 3, 4 only.
1589        let page1 = store.account_history(&cash, None, 2).await.unwrap();
1590        assert_eq!(page1.len(), 2, "limit 2 must return 2 records");
1591        assert_eq!(page1[0].seq, 3);
1592        assert_eq!(page1[1].seq, 4);
1593        assert_eq!(page1[0].amount.minor(), 10);
1594        assert_eq!(page1[1].amount.minor(), 20);
1595        assert!(matches!(page1[0].direction, Direction::Debit));
1596        // at must match the commit time
1597        assert_eq!(page1[0].at, times[0]);
1598        assert_eq!(page1[1].at, times[1]);
1599
1600        // Resume after_seq Some(4), limit 10 → only seq 5.
1601        let page2 = store.account_history(&cash, Some(4), 10).await.unwrap();
1602        assert_eq!(page2.len(), 1);
1603        assert_eq!(page2[0].seq, 5);
1604        assert_eq!(page2[0].amount.minor(), 30);
1605
1606        // limit 0 → empty.
1607        let empty = store.account_history(&cash, None, 0).await.unwrap();
1608        assert!(empty.is_empty(), "limit 0 must return empty vec");
1609    }
1610
1611    #[tokio::test]
1612    async fn transaction_round_trips_through_disk() {
1613        let dir = tempfile::tempdir().unwrap();
1614        let (store, txids, times) = history_fixture(dir.path()).await;
1615
1616        // transaction(txid of 2nd commit) → seq 4, at == t2.
1617        let stored = store
1618            .transaction(&txids[1])
1619            .await
1620            .unwrap()
1621            .expect("should find 2nd tx");
1622        assert_eq!(stored.seq, 4, "2nd transaction must have seq 4");
1623        assert_eq!(stored.at, times[1], "committed_at must match");
1624        assert_eq!(stored.transaction.idempotency_key.0, "tx2");
1625
1626        // Unknown uuid → Ok(None).
1627        let unknown = TxId(uuid::Uuid::now_v7());
1628        let result = store.transaction(&unknown).await.unwrap();
1629        assert!(result.is_none(), "unknown txid must return Ok(None)");
1630    }
1631
1632    #[tokio::test]
1633    async fn trial_balance_none_and_as_of() {
1634        let dir = tempfile::tempdir().unwrap();
1635        let (store, _txids, times) = history_fixture(dir.path()).await;
1636
1637        let book = Book("b".into());
1638
1639        // None: both accounts use USD; debits = 60 (10+20+30 to cash), credits = 60 (to rev).
1640        let tb = store.trial_balance(&book, None).await.unwrap();
1641        assert_eq!(tb.len(), 1, "one USD row");
1642        assert_eq!(tb[0].asset, AssetId::new("USD"));
1643        assert_eq!(tb[0].debits, 60);
1644        assert_eq!(tb[0].credits, 60);
1645
1646        // as_of = t2: debits 30 (10+20), credits 30.
1647        let tb2 = store.trial_balance(&book, Some(times[1])).await.unwrap();
1648        assert_eq!(tb2.len(), 1);
1649        assert_eq!(tb2[0].debits, 30);
1650        assert_eq!(tb2[0].credits, 30);
1651
1652        // Unknown book → empty vec (matches sqlite).
1653        let unknown = Book("ghost".into());
1654        let tb3 = store.trial_balance(&unknown, None).await.unwrap();
1655        assert!(tb3.is_empty(), "unknown book must return empty vec");
1656    }
1657
1658    #[tokio::test]
1659    async fn read_events_from_inclusive_pages() {
1660        let dir = tempfile::tempdir().unwrap();
1661        let (store, _txids, _times) = history_fixture(dir.path()).await;
1662
1663        let book = Book("b".into());
1664
1665        // from=2, limit=2 → seqs [2, 3].
1666        let evs = store.read_events(&book, 2, 2).await.unwrap();
1667        assert_eq!(evs.len(), 2);
1668        assert_eq!(evs[0].seq, 2);
1669        assert_eq!(evs[1].seq, 3);
1670
1671        // from=5, limit=10 → [5].
1672        let evs5 = store.read_events(&book, 5, 10).await.unwrap();
1673        assert_eq!(evs5.len(), 1);
1674        assert_eq!(evs5[0].seq, 5);
1675
1676        // from past the end → empty.
1677        let evs_end = store.read_events(&book, 999, 10).await.unwrap();
1678        assert!(evs_end.is_empty(), "from past end must return empty vec");
1679
1680        // Unknown book → empty vec (matches sqlite).
1681        let unknown = Book("ghost".into());
1682        let evs_ghost = store.read_events(&unknown, 1, 10).await.unwrap();
1683        assert!(evs_ghost.is_empty(), "unknown book must return empty vec");
1684    }
1685
1686    // -----------------------------------------------------------------------
1687    // Hazard B: durability-watermark tests
1688    // -----------------------------------------------------------------------
1689
1690    /// Verify that `read_events` never surfaces frames beyond the durability
1691    /// ceiling (`state.next_seq - 1`).
1692    ///
1693    /// After 3 commits the ceiling equals the highest committed seq.
1694    /// `read_events(from=1, limit=100)` must return exactly seqs 1..=ceiling
1695    /// (the two AccountOpened events + 3 TransactionPosted = 5 frames) with no
1696    /// extras.  This pins the ceiling-filter plumbing: if the filter were
1697    /// absent, a page-cache frame beyond the ceiling could appear.
1698    #[tokio::test]
1699    async fn read_events_excludes_frames_beyond_ceiling() {
1700        let dir = tempfile::tempdir().unwrap();
1701        let (store, _txids, _times) = history_fixture(dir.path()).await;
1702
1703        let book = Book("b".into());
1704
1705        // 3 commits + 2 account opens = 5 events total; next_seq = 6.
1706        // ceiling = 5.
1707        let evs = store.read_events(&book, 1, 100).await.unwrap();
1708        let seqs: Vec<Seq> = evs.iter().map(|e| e.seq).collect();
1709
1710        // Must have exactly seqs 1..=5 and no more.
1711        assert_eq!(
1712            seqs,
1713            vec![1, 2, 3, 4, 5],
1714            "read_events must return exactly seqs 1..=ceiling"
1715        );
1716
1717        // Also verify from= skipping still applies within the ceiling.
1718        let evs_from4 = store.read_events(&book, 4, 100).await.unwrap();
1719        let seqs_from4: Vec<Seq> = evs_from4.iter().map(|e| e.seq).collect();
1720        assert_eq!(
1721            seqs_from4,
1722            vec![4, 5],
1723            "from=4 must return seqs 4 and 5 only"
1724        );
1725    }
1726
1727    // -----------------------------------------------------------------------
1728    // Task 12: Snapshot-assisted open path
1729    // -----------------------------------------------------------------------
1730
1731    /// Verify that `open` uses a snapshot for replay when one exists.
1732    ///
1733    /// # What this test proves
1734    ///
1735    /// 1. After shutdown + reopen, balances reflect all commits (basic sanity).
1736    /// 2. The snapshot file exists on disk after `snapshot_now`.
1737    /// 3. `load_latest` after the final shutdown returns a snapshot with seq
1738    ///    >= the 5th commit's seq, proving the snapshot was taken at or after
1739    ///    that point and that the open path would have used it.
1740    ///
1741    /// What we do NOT try to prove here: that the replay *skipped* pre-snapshot
1742    /// frames (that would require corrupting early frames, which is invasive
1743    /// and risks making the test fragile).  The correctness of the load path
1744    /// itself is proven by `snapshot_round_trips_book_state` and the fact that
1745    /// balances are correct after reopen.
1746    #[tokio::test]
1747    async fn open_uses_snapshot_plus_tail() {
1748        use crate::snapshot;
1749
1750        let dir = tempfile::tempdir().unwrap();
1751
1752        // Seqs: 1 = _system (USD), 2 = cash open, 3 = rev open (all in separate
1753        // books: _system and "b").  Within book "b", seqs are 1..=N per-book.
1754        // We commit 5 transactions to book "b", then snapshot, then 2 more.
1755
1756        let (first_5_seqs, snap_seq_at_least): (Vec<Seq>, Seq);
1757        {
1758            let store = seeded(dir.path()).await;
1759
1760            let mut seqs = Vec::new();
1761            for i in 0..5 {
1762                let c = store.commit(&mk_tx(&format!("k{i}"), 1)).await.unwrap();
1763                seqs.push(c.seq);
1764            }
1765
1766            // Snapshot after 5 commits.  The snapshot seq == last applied seq
1767            // in book "b" (3 events for asset+accounts are in _system, so in
1768            // "b" the seqs are [1..5] for the 5 txs; actual seq depends on
1769            // account opens going to "b" before commits).
1770            // The point is: after snapshot_now, a .snap file must exist.
1771            store.snapshot_now("b").await.unwrap();
1772
1773            // 2 more commits after the snapshot.
1774            for i in 5..7 {
1775                store.commit(&mk_tx(&format!("k{i}"), 2)).await.unwrap();
1776            }
1777
1778            first_5_seqs = seqs;
1779            // snap_seq_at_least: the snapshot must be >= the 5th commit's seq.
1780            snap_seq_at_least = *first_5_seqs.last().unwrap();
1781
1782            store.shutdown().await;
1783        }
1784
1785        // --- Verify snapshot file exists for book "b" ---
1786        let book_dir = dir.path().join("books").join("b");
1787        let snap = snapshot::load_latest(&book_dir).await.unwrap();
1788        assert!(
1789            snap.is_some(),
1790            "snapshot must exist for book b after snapshot_now + shutdown"
1791        );
1792        let (_, snap_seq) = snap.unwrap();
1793        assert!(
1794            snap_seq >= snap_seq_at_least,
1795            "snapshot seq {snap_seq} must be >= 5th-commit seq {snap_seq_at_least}"
1796        );
1797
1798        // --- Reopen and verify all 7 commits are reflected ---
1799        let store2 = LogTaleaStore::open(dir.path()).await.unwrap();
1800
1801        // cash should have: 5 × 1 + 2 × 2 = 9 total debited.
1802        let bal = store2.balance(&cash_def().id, None).await.unwrap();
1803        assert_eq!(
1804            bal.amount.minor(),
1805            9,
1806            "all 7 commits must be visible after reopen; got {}",
1807            bal.amount.minor()
1808        );
1809
1810        // Idempotency: replay of any of the first 5 keys returns the original seq.
1811        for (i, &orig_seq) in first_5_seqs.iter().enumerate() {
1812            let dup = store2.commit(&mk_tx(&format!("k{i}"), 1)).await.unwrap();
1813            assert_eq!(
1814                dup.seq, orig_seq,
1815                "idem key k{i} must resolve to original seq {orig_seq} after reopen, got {}",
1816                dup.seq
1817            );
1818        }
1819    }
1820
1821    // -----------------------------------------------------------------------
1822    // Task 13: Snapshot interplay with spilled idem runs
1823    // -----------------------------------------------------------------------
1824
1825    /// Commit enough transactions to overflow the hot idem cap (forcing spills),
1826    /// take a snapshot, reopen, and verify that BOTH hot (recent) and spilled
1827    /// (older) idem keys dedup correctly.
1828    ///
1829    /// Uses a small `idem_hot_cap` (8) so a handful of commits forces overflow.
1830    #[tokio::test]
1831    async fn snapshot_interplay_hot_and_spilled_keys_dedup() {
1832        let dir = tempfile::tempdir().unwrap();
1833
1834        // Open with a tiny hot cap so we force spills with few commits.
1835        let opts = LogStoreOptions {
1836            idem_hot_cap: 8,
1837            snapshot_every: BookWriter::DEFAULT_SNAPSHOT_EVERY,
1838            segment_max: crate::segment::DEFAULT_SEGMENT_MAX,
1839        };
1840
1841        let orig_seqs: Vec<talea_core::types::Seq>;
1842        {
1843            let store = LogTaleaStore::open_with(dir.path(), opts.clone())
1844                .await
1845                .unwrap();
1846            store.register_asset(&usd()).await.unwrap();
1847            let cfg = AccountCfg {
1848                normal_side: None,
1849                min_balance: None,
1850            };
1851            store.open_account(&cash_def(), &cfg).await.unwrap();
1852            store.open_account(&rev_def(), &cfg).await.unwrap();
1853
1854            // Commit 20 transactions (enough to spill with cap=8).
1855            let n = 20usize;
1856            let mut seqs = Vec::with_capacity(n);
1857            for i in 0..n {
1858                let c = store
1859                    .commit(&mk_tx(&format!("spill-idem-{i:03}"), 1))
1860                    .await
1861                    .unwrap();
1862                seqs.push(c.seq);
1863            }
1864
1865            // Take a snapshot so the next open uses it.
1866            store.snapshot_now("b").await.unwrap();
1867
1868            orig_seqs = seqs;
1869
1870            store.shutdown().await;
1871        }
1872
1873        // Reopen with the same small cap — spill run files should be present.
1874        let store2 = LogTaleaStore::open_with(dir.path(), opts).await.unwrap();
1875
1876        // ALL 20 keys must dedup to their original seq/txid.
1877        for (i, &orig_seq) in orig_seqs.iter().enumerate() {
1878            let key = format!("spill-idem-{i:03}");
1879            let dup = store2.commit(&mk_tx(&key, 1)).await.unwrap();
1880            assert_eq!(
1881                dup.seq, orig_seq,
1882                "idem key {key} must resolve to orig seq {orig_seq} after reopen with spilled runs; got {}",
1883                dup.seq,
1884            );
1885        }
1886    }
1887
1888    // -----------------------------------------------------------------------
1889    // Task 8: subscribe catch-up + live tail
1890    // -----------------------------------------------------------------------
1891
1892    /// subscribe with an invalid book name must yield Err as the first item and
1893    /// must NOT create any directory outside the books/ subtree.
1894    #[tokio::test]
1895    async fn subscribe_invalid_book_yields_err_and_creates_nothing() {
1896        use futures::StreamExt;
1897        let dir = tempfile::tempdir().unwrap();
1898        let store = LogTaleaStore::open(dir.path()).await.unwrap();
1899
1900        let mut stream = store.subscribe(&Book("../evil".into()), 1);
1901        let first = stream
1902            .next()
1903            .await
1904            .expect("stream must yield at least one item");
1905        assert!(
1906            first.is_err(),
1907            "first item must be Err for invalid book name"
1908        );
1909
1910        // The directory that would have been created by path traversal must not exist.
1911        let evil_dir = dir.path().join("evil");
1912        assert!(
1913            !evil_dir.exists(),
1914            "subscribe must not create the traversal target directory, but {:?} exists",
1915            evil_dir,
1916        );
1917    }
1918
1919    #[tokio::test]
1920    async fn subscribe_catch_up_then_live() {
1921        use futures::StreamExt;
1922        let dir = tempfile::tempdir().unwrap();
1923        let store = seeded(dir.path()).await; // 2 opens → seqs 1, 2
1924        store.commit(&mk_tx("h1", 10)).await.unwrap(); // seq 3
1925        let mut stream = store.subscribe(&Book("b".into()), 1);
1926        let mut seen = vec![];
1927        for _ in 0..3 {
1928            seen.push(stream.next().await.unwrap().unwrap().seq);
1929        }
1930        assert_eq!(seen, vec![1, 2, 3]);
1931        store.commit(&mk_tx("live1", 5)).await.unwrap(); // seq 4
1932        assert_eq!(stream.next().await.unwrap().unwrap().seq, 4);
1933        // a second subscriber from the middle
1934        let mut s2 = store.subscribe(&Book("b".into()), 3);
1935        assert_eq!(s2.next().await.unwrap().unwrap().seq, 3);
1936        assert_eq!(s2.next().await.unwrap().unwrap().seq, 4);
1937    }
1938
1939    /// Verify that `trial_balance(Some(t))` clamps to the durability ceiling.
1940    ///
1941    /// Commits two transactions in a single batch (so they very likely share
1942    /// the same `committed_at`).  Asserts that `trial_balance(as_of = that
1943    /// timestamp)` includes both transactions' effects.  This tests the
1944    /// ceiling + `as_of` interaction: the ceiling filter must not accidentally
1945    /// drop frames whose `at` equals the cutoff.
1946    #[tokio::test]
1947    async fn as_of_boundary_includes_all_commits_sharing_the_timestamp() {
1948        let dir = tempfile::tempdir().unwrap();
1949        let store = seeded(dir.path()).await;
1950
1951        // Submit two transactions in one batch so they share a committed_at.
1952        let batch = vec![mk_tx("tb1", 50), mk_tx("tb2", 75)];
1953        let results = store.commit_batch(&batch).await;
1954        let c1 = results[0].as_ref().expect("tx1 must succeed");
1955        let c2 = results[1].as_ref().expect("tx2 must succeed");
1956
1957        // If they share the same at, a trial_balance as_of that at must include both.
1958        if c1.at == c2.at {
1959            let book = Book("b".into());
1960            let tb = store.trial_balance(&book, Some(c1.at)).await.unwrap();
1961            assert!(
1962                !tb.is_empty(),
1963                "trial balance must not be empty after two commits"
1964            );
1965            let usd_row = tb
1966                .iter()
1967                .find(|r| r.asset == AssetId::new("USD"))
1968                .expect("USD row must exist");
1969            // Both txs debit cash and credit rev by 50 and 75 → total debits 125.
1970            assert_eq!(
1971                usd_row.debits, 125,
1972                "trial_balance(as_of) must include both batch-committed transactions; \
1973                 expected debits=125, got {}",
1974                usd_row.debits
1975            );
1976            assert_eq!(usd_row.credits, 125, "credits must also be 125");
1977        } else {
1978            // Clock ticked between the two commits; still verify the later as_of
1979            // includes both.
1980            let book = Book("b".into());
1981            let tb = store.trial_balance(&book, Some(c2.at)).await.unwrap();
1982            let usd_row = tb
1983                .iter()
1984                .find(|r| r.asset == AssetId::new("USD"))
1985                .expect("USD row must exist");
1986            assert_eq!(
1987                usd_row.debits, 125,
1988                "trial_balance(as_of=c2.at) must include both transactions even if timestamps differ"
1989            );
1990        }
1991    }
1992}