Struct LogTaleaStore 

pub struct LogTaleaStore { /* private fields */ }

An append-log implementation of Store.

§Directory structure

<dir>/
  LOCK                   ← exclusive advisory lock held for the process lifetime
  books/
    _system/             ← system book (assets)
    <book>/              ← one dir per user book

§Concurrency / single-process invariant

At most one LogTaleaStore may be open on a directory at a time. This is enforced with an fs4 advisory lock on <dir>/LOCK: a second open on the same directory, whether from the same process or a different one, fails with StoreError::Io.

§Book name safety

Book names are validated to prevent directory escape: names containing /, \, or the component .. are rejected. This validation happens in every path that accepts a book name from external input.
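
The rule above can be sketched as a small standalone check. This is only an illustration: the function name validate_book_name and the String error are hypothetical, and the store's real validator and error type are not shown on this page.

```rust
/// Hypothetical validator mirroring the documented rule.
/// The real function name and error type are assumptions.
fn validate_book_name(name: &str) -> Result<(), String> {
    // A path separator would let the name address something outside <dir>/books/.
    if name.contains('/') || name.contains('\\') {
        return Err(format!("book name {name:?} contains a path separator"));
    }
    // With separators excluded, the only way to form a `..` component
    // is for the whole name to be `..`.
    if name == ".." {
        return Err("book name must not be `..`".to_string());
    }
    Ok(())
}
```

Note that a name such as a..b passes this check, because the dots do not form a path component on their own.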

Implementations§

impl LogTaleaStore

pub async fn open(dir: &Path) -> Result<Self, StoreError>

Open (or create) the store at dir with default options.

See open_with for the full option set.

pub async fn open_with( dir: &Path, opts: LogStoreOptions, ) -> Result<Self, StoreError>

Open (or create) the store at dir with explicit options.

  1. Creates <dir>/books/ if missing.
  2. Acquires an exclusive fs4 advisory lock on <dir>/LOCK.
  3. Replays each existing book dir under <dir>/books/ to rebuild in-memory state, then spawns a BookWriter per book.
  4. _system is treated like any other book for replay; AssetRegistered events also populate the in-memory asset registry.
  5. For each book, calls idem.attach_dir(...) to load spill runs and rebuild the Bloom filter. If any run’s CRC fails or the Bloom file is missing/corrupt, a full log scan is performed to rebuild the runs.
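
As a rough sketch of step 1 and the discovery half of step 3, the following uses only std::fs. The function name discover_books is hypothetical, and sorting the result is an assumption made here for deterministic output. Locking, replay, and the idempotency index are deliberately left out.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical helper: ensure `<dir>/books/` exists, then list the
/// book directories under it. Sorting is an assumption of this sketch.
fn discover_books(dir: &Path) -> io::Result<Vec<String>> {
    let books = dir.join("books");
    // Step 1: create <dir>/books/ if missing (a no-op when it already exists).
    fs::create_dir_all(&books)?;

    // Step 3 (discovery only): every sub-directory is one book,
    // including the `_system` book.
    let mut names = Vec::new();
    for entry in fs::read_dir(&books)? {
        let entry = entry?;
        if entry.file_type()?.is_dir() {
            names.push(entry.file_name().to_string_lossy().into_owned());
        }
    }
    names.sort();
    Ok(names)
}
```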

pub async fn snapshot_now(&self, book: &str) -> Result<(), StoreError>

Trigger an immediate snapshot for book and wait for it to complete.

This is an ops/test hook. The snapshot is written by the book’s writer task (the only mutator), so it is consistent with the current state.

Returns Ok(()) on success. Returns StoreError::Io if the book is unknown, the store is shut down, or the snapshot write fails.

pub async fn shutdown(&self)

Drain all writers and await their completion.

Writers finish when their last sender clone is dropped. This method:

  1. Drains the books map (removing the store’s writer clones).
  2. For each writer, calls BookWriter::shutdown(w) which consumes the clone — dropping its mpsc::Sender — before awaiting the task. This guarantees the task sees sender-count → 0 and exits.
  3. Releases the advisory LOCK file so a subsequent open on the same directory succeeds in the same process (the test pattern calls shutdown before dropping and then reopens).

Note: takes &self (not self) so it can be called without consuming the store. The caller is expected to drop the LogTaleaStore afterward.
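
The drop-then-await ordering in step 2 can be illustrated with std::thread and std::sync::mpsc standing in for the async task and channel. The Writer type and both function names below are hypothetical; only the ordering is the point.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a per-book writer: a sender plus the
/// handle of the task that consumes the matching receiver.
struct Writer {
    tx: mpsc::Sender<u64>,
    task: thread::JoinHandle<u64>,
}

fn spawn_writer() -> Writer {
    let (tx, rx) = mpsc::channel::<u64>();
    // `rx.iter()` ends only once every Sender clone has been dropped,
    // so the task returns the number of jobs it processed at that point.
    let task = thread::spawn(move || rx.iter().count() as u64);
    Writer { tx, task }
}

/// Consume the writer: drop the sender first, then wait for the task.
/// Joining while `tx` is still alive would block forever.
fn shutdown(w: Writer) -> u64 {
    let Writer { tx, task } = w;
    drop(tx);
    task.join().expect("writer task panicked")
}
```

Taking the writer by value is what makes the ordering hard to get wrong: the sender cannot outlive the call.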

Trait Implementations§

impl Store for LogTaleaStore

fn register_asset<'life0, 'life1, 'async_trait>( &'life0 self, asset: &'life1 AssetDef, ) -> Pin<Box<dyn Future<Output = Result<(), StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Register an asset.

Idempotent: same def → Ok(()) with no new event. Different def for same id → StoreError::AlreadyExists.

§Race note (same-new-asset concurrent register)

Two concurrent calls for the same brand-new asset could each pass the “absent” check and each submit a Job::RegisterAsset. To prevent two AssetRegistered frames from being appended, the registry WRITE lock is held across both the submit and the ack from the writer. Asset registration is rare (a setup-time operation, not a hot path), so holding a write lock across an await is acceptable here.
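
A minimal, synchronous sketch of the idempotency rule and the lock scope follows. The AssetDef fields, the RegisterError enum, and the Registry type are all hypothetical stand-ins, and pushing onto an in-memory events vector stands in for the submit-and-ack round trip to the writer.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Clone, Debug, PartialEq)]
struct AssetDef {
    id: String,
    decimals: u8, // hypothetical field, only here so two defs can differ
}

#[derive(Debug, PartialEq)]
enum RegisterError {
    AlreadyExists,
}

#[derive(Default)]
struct Inner {
    assets: HashMap<String, AssetDef>,
    events: Vec<AssetDef>, // stands in for appended AssetRegistered frames
}

#[derive(Default)]
struct Registry {
    inner: RwLock<Inner>,
}

impl Registry {
    fn register(&self, def: &AssetDef) -> Result<(), RegisterError> {
        // The write lock covers the "absent" check AND the append, so two
        // racing callers cannot both observe "absent".
        let mut inner = self.inner.write().unwrap();
        let existing = inner.assets.get(&def.id).cloned();
        match existing {
            Some(e) if e == *def => Ok(()), // same def: no new event
            Some(_) => Err(RegisterError::AlreadyExists),
            None => {
                inner.events.push(def.clone());
                inner.assets.insert(def.id.clone(), def.clone());
                Ok(())
            }
        }
    }
}
```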

fn open_account<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, def: &'life1 AccountDef, cfg: &'life2 AccountCfg, ) -> Pin<Box<dyn Future<Output = Result<(), StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Open an account.

Validates that:

  • The book is not a reserved name (starts with _).
  • The asset is registered.

Idempotent for the same (def, cfg) pair; AlreadyExists for a conflicting def/cfg.

fn commit<'life0, 'life1, 'async_trait>( &'life0 self, transaction: &'life1 Transaction, ) -> Pin<Box<dyn Future<Output = Result<Committed, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Commit a transaction.

fn commit_batch<'life0, 'life1, 'async_trait>( &'life0 self, txs: &'life1 [Transaction], ) -> Pin<Box<dyn Future<Output = Vec<Result<Committed, StoreError>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Commit a batch of transactions with group-commit semantics.

All oneshot reply channels are created and submitted to the writer before awaiting any reply, so the writer can drain them into a single fsync batch. Results are positional.
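
The submit-all-then-await shape can be sketched with std channels and a thread in place of the async writer and oneshot channels. Job, spawn_writer, and the String error are hypothetical, and the single fsync per drained batch is only marked by a comment.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical job: a payload plus a one-shot style reply channel.
struct Job {
    payload: String,
    reply: Sender<u64>,
}

fn spawn_writer() -> Sender<Job> {
    let (tx, rx): (Sender<Job>, Receiver<Job>) = mpsc::channel();
    thread::spawn(move || {
        let mut log: Vec<String> = Vec::new();
        let mut next_seq = 1u64;
        // Block for one job, then drain everything already queued.
        while let Ok(first) = rx.recv() {
            let mut batch = vec![first];
            while let Ok(job) = rx.try_recv() {
                batch.push(job);
            }
            // A real writer would append all frames and fsync ONCE here.
            for job in batch {
                log.push(job.payload);
                let _ = job.reply.send(next_seq);
                next_seq += 1;
            }
        }
    });
    tx
}

fn commit_batch(writer: &Sender<Job>, payloads: &[&str]) -> Vec<Result<u64, String>> {
    // Phase 1: submit every job before waiting on any reply.
    let pending: Vec<Result<Receiver<u64>, String>> = payloads
        .iter()
        .map(|p| {
            let (reply, rx) = mpsc::channel();
            writer
                .send(Job { payload: p.to_string(), reply })
                .map(|_| rx)
                .map_err(|_| "writer is shut down".to_string())
        })
        .collect();
    // Phase 2: collect replies in submission order, so results are positional.
    pending
        .into_iter()
        .map(|r| r.and_then(|rx| rx.recv().map_err(|_| "reply dropped".to_string())))
        .collect()
}
```

Awaiting each reply before sending the next job would still be correct, but the writer would rarely see more than one queued job at a time and could not amortize the fsync.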

fn balance<'life0, 'life1, 'async_trait>( &'life0 self, account: &'life1 AccountId, as_of: Option<DateTime<Utc>>, ) -> Pin<Box<dyn Future<Output = Result<BalanceSnapshot, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Current balance for account.

as_of: None returns the current (fully-applied) balance from in-memory state. as_of: Some(t) binary-searches the per-account posting entries (sorted by committed_at, non-decreasing) to find the balance as it stood at time t.

Uses existing_book (read-only lookup) so querying an account in a book that has never been written to does NOT create that book’s directory on disk. An absent book means the account cannot exist → StoreError::UnknownAccount.

Edge semantics matching sqlite:

  • Unknown book → UnknownAccount (no book → no account).
  • Unknown account within a known book → UnknownAccount.
  • Account exists but no postings before as_of → BalanceSnapshot { amount: 0, updated_seq: 0 }.
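
The as_of lookup can be sketched with slice::partition_point. Several things here are assumptions of the sketch: timestamps are plain i64 rather than DateTime<Utc>, each entry carries the running balance after it was applied, the comparison is inclusive (committed_at <= t), and the None case simply reads the last entry rather than separate in-memory state.

```rust
#[derive(Debug, PartialEq)]
struct BalanceSnapshot {
    amount: i128,
    updated_seq: u64,
}

/// Hypothetical per-account posting entry with a running balance.
struct Entry {
    committed_at: i64,
    seq: u64,
    balance_after: i128,
}

fn balance_as_of(entries: &[Entry], as_of: Option<i64>) -> BalanceSnapshot {
    // Number of entries visible at the requested time.
    let visible = match as_of {
        None => entries.len(),
        // Index of the first entry with committed_at > t. Valid because
        // entries are sorted by committed_at (non-decreasing).
        Some(t) => entries.partition_point(|e| e.committed_at <= t),
    };
    match visible.checked_sub(1).map(|i| &entries[i]) {
        Some(e) => BalanceSnapshot { amount: e.balance_after, updated_seq: e.seq },
        // Account exists but nothing was posted by `as_of`.
        None => BalanceSnapshot { amount: 0, updated_seq: 0 },
    }
}
```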

fn asset<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 AssetId, ) -> Pin<Box<dyn Future<Output = Result<Option<AssetDef>, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Look up an asset by id.

fn account_history<'life0, 'life1, 'async_trait>( &'life0 self, account: &'life1 AccountId, after_seq: Option<Seq>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<PostingRecord>, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Posting history for account, paginated by seq.

after_seq is EXCLUSIVE (matches sqlite semantics: seq > after_seq). limit counts DISTINCT seqs so one transaction’s postings to the same account are never split across pages.

Edge semantics matching sqlite:

  • Unknown book or account → UnknownAccount.
  • limit 0 → empty vec (never an error).
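
The pagination rule (exclusive after_seq, limit counted in distinct seqs) can be sketched over an in-memory slice. The PostingRecord fields and the function name history_page are hypothetical, and the slice is assumed to be sorted by seq ascending.

```rust
#[derive(Clone, Debug, PartialEq)]
struct PostingRecord {
    seq: u64,
    amount: i64, // hypothetical field
}

/// `postings` is assumed to be sorted by `seq`, ascending.
fn history_page(
    postings: &[PostingRecord],
    after_seq: Option<u64>,
    limit: usize,
) -> Vec<PostingRecord> {
    let mut page = Vec::new();
    let mut distinct = 0usize;
    let mut last_seq: Option<u64> = None;
    for p in postings {
        // Exclusive lower bound: keep only seq > after_seq.
        if after_seq.map_or(false, |a| p.seq <= a) {
            continue;
        }
        if last_seq != Some(p.seq) {
            // A new seq starts here; stop if the page already holds `limit` seqs.
            if distinct == limit {
                break;
            }
            distinct += 1;
            last_seq = Some(p.seq);
        }
        page.push(p.clone());
    }
    page
}
```

A caller pages forward by passing the seq of the last returned record as the next after_seq. Because a seq is never split across pages, no posting is skipped or repeated.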

fn transaction<'life0, 'life1, 'async_trait>( &'life0 self, txid: &'life1 TxId, ) -> Pin<Box<dyn Future<Output = Result<Option<StoredTransaction>, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Look up a committed transaction by its TxId.

Searches all book writers’ in-memory txid indexes. Found → reads the frame at the recorded position and returns a StoredTransaction. Not found → Ok(None) (matches sqlite semantics).

fn trial_balance<'life0, 'life1, 'async_trait>( &'life0 self, book: &'life1 Book, as_of: Option<DateTime<Utc>>, ) -> Pin<Box<dyn Future<Output = Result<Vec<TrialBalanceRow>, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Trial balance for book.

as_of: None returns the current lifetime sums from in-memory state, which is cheap. as_of: Some(t) replays all TransactionPosted frames with committed_at <= t from disk; this is the slow path and is expected to be rare.

Edge semantics matching sqlite:

  • Unknown book → empty vec (sqlite returns no rows for an unknown book).
  • Rows are sorted by asset id string, matching sqlite’s ORDER BY asset.
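
The replay path and the row ordering can be sketched as a filtered fold into a BTreeMap. The Posting type, the i64 timestamps, and the (asset, net sum) tuple used as a row are all hypothetical; the real TrialBalanceRow layout is not shown on this page.

```rust
use std::collections::BTreeMap;

/// Hypothetical flattened posting taken from a TransactionPosted frame.
struct Posting {
    asset: String,
    committed_at: i64,
    amount: i128,
}

fn trial_balance(postings: &[Posting], as_of: Option<i64>) -> Vec<(String, i128)> {
    // BTreeMap iterates keys in ascending string order, which gives the
    // "sorted by asset id string" property without a separate sort.
    let mut sums: BTreeMap<&str, i128> = BTreeMap::new();
    for p in postings
        .iter()
        .filter(|p| as_of.map_or(true, |t| p.committed_at <= t))
    {
        *sums.entry(p.asset.as_str()).or_insert(0) += p.amount;
    }
    sums.into_iter().map(|(a, s)| (a.to_string(), s)).collect()
}
```

An unknown book corresponds to an empty input here, which yields an empty vec.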

fn read_events<'life0, 'life1, 'async_trait>( &'life0 self, book: &'life1 Book, from: Seq, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<Sequenced<LedgerEvent>>, StoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read events from book starting at from (INCLUSIVE), returning at most limit.

Edge semantics matching sqlite:

  • Unknown book → empty vec (sqlite returns no rows).

§Durability watermark

Log-replay reads must never surface frames that the in-memory state hasn’t applied yet. The writer pipeline is: append → fsync → apply-to-state → ack. state.next_seq advances only after fsync AND apply, so ceiling = next_seq - 1 is the highest seq that is both durable on disk and consistent with the in-memory index. Frames with seq > ceiling may be page-cache-visible but not fsynced; returning them would be a dirty read — they could vanish on crash or diverge from in-memory state. We drop the state lock before scanning and filter out any frames that exceed the ceiling.
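
The ceiling filter can be sketched as follows. The Frame type and the function name read_visible are hypothetical, and the sketch assumes seqs start at 1, so an empty book has next_seq = 1 and a ceiling of 0.

```rust
#[derive(Clone, Debug, PartialEq)]
struct Frame {
    seq: u64,
}

/// `frames_on_disk` may include frames that were appended but not yet
/// fsynced and applied; `next_seq` is a copy taken under the state lock.
fn read_visible(frames_on_disk: &[Frame], from: u64, limit: usize, next_seq: u64) -> Vec<Frame> {
    // Highest seq that is both durable and applied to in-memory state.
    let ceiling = next_seq.saturating_sub(1);
    frames_on_disk
        .iter()
        .filter(|f| f.seq >= from && f.seq <= ceiling) // `from` is inclusive
        .take(limit)
        .cloned()
        .collect()
}
```

Because next_seq is copied out before the scan, the state lock does not need to be held while reading from disk. Frames that become durable during the scan are simply left for the next call.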

fn subscribe( &self, book: &Book, from: Seq, ) -> BoxStream<'static, Result<Sequenced<LedgerEvent>, StoreError>>
