pub struct LogTaleaStore { /* private fields */ }
An append-log implementation of Store.
§Directory structure
<dir>/
  LOCK          ← exclusive advisory lock held for the process lifetime
  books/
    _system/    ← system book (assets)
    <book>/     ← one dir per user book
§Concurrency / single-process invariant
One LogTaleaStore per directory at a time is enforced via an fs4 advisory
lock on <dir>/LOCK. A second open on the same directory from the same
or a different process will fail with StoreError::Io.
§Book name safety
Book names are validated to prevent directory escape: names containing /,
\, or the component .. are rejected. This validation happens in every
path that accepts a book name from external input.
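The validation rule above can be sketched as follows. This is a minimal illustration, not the crate's actual code: the helper name `book_dir` and its `Option` return type are assumptions, and the real implementation presumably reports a `StoreError` instead.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper (name and signature are assumptions): returns the
/// on-disk directory for `book`, or `None` if the name could escape
/// `<dir>/books/`.
pub fn book_dir(root: &Path, book: &str) -> Option<PathBuf> {
    // Reject path separators and the parent-directory component.
    let escapes = book.contains('/') || book.contains('\\') || book == "..";
    if escapes {
        return None;
    }
    Some(root.join("books").join(book))
}
```

Because separators are rejected first, a surviving name is always a single path component, so comparing the whole name against `..` is enough to catch the parent-directory case.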
Implementations§
impl LogTaleaStore
pub async fn open(dir: &Path) -> Result<Self, StoreError>
Open (or create) the store at dir with default options.
See open_with for the full option set.
pub async fn open_with(
    dir: &Path,
    opts: LogStoreOptions,
) -> Result<Self, StoreError>
Open (or create) the store at dir with explicit options.
- Creates <dir>/books/ if missing.
- Acquires an exclusive fs4 advisory lock on <dir>/LOCK.
- Replays each existing book dir under <dir>/books/ to rebuild in-memory state, then spawns a BookWriter per book.
- _system is treated like any other book for replay; AssetRegistered events also populate the in-memory asset registry.
- For each book, calls idem.attach_dir(...) to load spill runs and rebuild the Bloom filter. If any run’s CRC fails or the Bloom file is missing/corrupt, a full log scan is performed to rebuild the runs.
pub async fn snapshot_now(&self, book: &str) -> Result<(), StoreError>
Trigger an immediate snapshot for book and wait for it to complete.
This is an ops/test hook. The snapshot is written by the book’s writer task (the only mutator), so it is consistent with the current state.
Returns Ok(()) on success. Returns StoreError::Io if the book is
unknown, the store is shut down, or the snapshot write fails.
pub async fn shutdown(&self)
Drain all writers and await their completion.
Writers finish when their last sender clone is dropped. This method:
- Drains the books map (removing the store’s writer clones).
- For each writer, calls BookWriter::shutdown(w), which consumes the clone and drops its mpsc::Sender before awaiting the task. This guarantees the task sees the sender count reach 0 and exits.
- Releases the advisory LOCK file so a subsequent open on the same directory succeeds in the same process (the test pattern calls shutdown before dropping and then reopens).
Note: takes &self (not self) so it can be called without consuming
the store. The caller is expected to drop the LogTaleaStore afterward.
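The "drop the sender, then await the task" ordering can be illustrated with a small standalone sketch. It swaps in `std::sync::mpsc` and OS threads for the async channel and tasks the store uses, and the `Writer` type below is hypothetical, not the crate's `BookWriter`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a book writer: a worker plus the sender that
/// feeds it.
pub struct Writer {
    tx: mpsc::Sender<u64>,
    task: thread::JoinHandle<u64>,
}

impl Writer {
    pub fn spawn() -> Writer {
        let (tx, rx) = mpsc::channel::<u64>();
        // `rx.iter()` ends only once every sender clone has been dropped.
        let task = thread::spawn(move || rx.iter().sum::<u64>());
        Writer { tx, task }
    }

    pub fn submit(&self, n: u64) {
        self.tx.send(n).expect("writer task is running");
    }

    /// Consumes the writer: drop the sender first, then wait for the worker.
    pub fn shutdown(self) -> u64 {
        let Writer { tx, task } = self;
        drop(tx); // sender count reaches 0, so the worker's loop terminates
        task.join().expect("writer task panicked")
    }
}
```

If the join came before the drop, the worker would never observe a closed channel and the shutdown call would block forever, which is why the clone has to be consumed first.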
Trait Implementations§
impl Store for LogTaleaStore
fn register_asset<'life0, 'life1, 'async_trait>(
    &'life0 self,
    asset: &'life1 AssetDef,
) -> Pin<Box<dyn Future<Output = Result<(), StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Register an asset.
Idempotent: same def → Ok(()) with no new event.
Different def for same id → StoreError::AlreadyExists.
§Race note (same-new-asset concurrent register)
Two concurrent calls for a brand-new asset could each pass the “absent”
check and submit a Job::RegisterAsset. To prevent two AssetRegistered
frames from being appended, the registry write lock is held across both
the submit and the ack from the writer. Asset registration is rare (a
setup-time operation, not a hot path), so holding a write lock across an
await is acceptable here.
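The check-then-append pattern in the race note can be sketched in a synchronous form. This is an illustration under stated assumptions: it uses `std::sync::RwLock` where the store holds an async lock across an await, and `Registry`, `RegisterError`, and the string-typed definitions are hypothetical stand-ins.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, PartialEq)]
pub enum RegisterError {
    AlreadyExists,
}

#[derive(Default)]
struct Inner {
    assets: HashMap<String, String>, // asset id -> definition (simplified)
    events: Vec<String>,             // stand-in for AssetRegistered frames
}

/// Hypothetical registry guarding both the lookup table and the "log".
pub struct Registry {
    inner: RwLock<Inner>,
}

impl Registry {
    pub fn new() -> Self {
        Registry { inner: RwLock::new(Inner::default()) }
    }

    pub fn register(&self, id: &str, def: &str) -> Result<(), RegisterError> {
        // The write lock is taken before the "absent" check and kept until
        // the append is done, so two callers cannot both observe "absent".
        let mut inner = self.inner.write().unwrap();
        let existing = inner.assets.get(id).cloned();
        match existing {
            Some(e) if e == def => Ok(()), // idempotent, no new event
            Some(_) => Err(RegisterError::AlreadyExists),
            None => {
                inner.events.push(id.to_string());
                inner.assets.insert(id.to_string(), def.to_string());
                Ok(())
            }
        }
    }

    pub fn event_count(&self) -> usize {
        self.inner.read().unwrap().events.len()
    }
}
```

Taking a read lock for the check and upgrading to a write lock only for the insert would reopen the window between the two steps, which is the race the note describes.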
fn open_account<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    def: &'life1 AccountDef,
    cfg: &'life2 AccountCfg,
) -> Pin<Box<dyn Future<Output = Result<(), StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
Open an account.
Validates that:
- The book is not a reserved name (starts with _).
- The asset is registered.
Idempotent for the same (def, cfg) pair; AlreadyExists for a
conflicting def/cfg.
fn commit<'life0, 'life1, 'async_trait>(
    &'life0 self,
    transaction: &'life1 Transaction,
) -> Pin<Box<dyn Future<Output = Result<Committed, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Commit a transaction.
fn commit_batch<'life0, 'life1, 'async_trait>(
    &'life0 self,
    txs: &'life1 [Transaction],
) -> Pin<Box<dyn Future<Output = Vec<Result<Committed, StoreError>>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Commit a batch of transactions with group-commit semantics.
All oneshot reply channels are created and submitted to the writer before awaiting any reply, so the writer can drain them into a single fsync batch. Results are positional.
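A rough sketch of the submit-all-then-await pattern follows. It uses `std::sync::mpsc` and a thread in place of the async oneshot channels and writer task, and the `Job` type, the "empty payload" rejection, and the simulated fsync counter are all assumptions made for illustration.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical job: a payload plus a single-use reply channel.
pub struct Job {
    payload: u32,
    reply: Sender<Result<u64, String>>,
}

/// Writer loop: block for one job, drain whatever else is already queued,
/// "fsync" once for the whole batch, then acknowledge each job in order.
/// Returns the number of simulated fsyncs.
pub fn run_writer(rx: Receiver<Job>) -> usize {
    let mut next_seq = 1u64;
    let mut fsyncs = 0usize;
    while let Ok(first) = rx.recv() {
        let mut batch = vec![first];
        while let Ok(job) = rx.try_recv() {
            batch.push(job);
        }
        fsyncs += 1; // one durable flush per batch (simulated)
        for job in batch {
            let result = if job.payload == 0 {
                Err("empty payload".to_string())
            } else {
                let seq = next_seq;
                next_seq += 1;
                Ok(seq)
            };
            let _ = job.reply.send(result);
        }
    }
    fsyncs
}

/// Submits every job before waiting on any reply; results are positional.
pub fn commit_batch(tx: &Sender<Job>, payloads: &[u32]) -> Vec<Result<u64, String>> {
    let mut pending = Vec::with_capacity(payloads.len());
    for &payload in payloads {
        let (reply, rx) = mpsc::channel();
        // If the writer is gone, the job (and its reply sender) is dropped,
        // which surfaces below as a failed `recv`.
        let _ = tx.send(Job { payload, reply });
        pending.push(rx);
    }
    pending
        .into_iter()
        .map(|rx| rx.recv().unwrap_or_else(|_| Err("writer gone".to_string())))
        .collect()
}
```

Awaiting each reply before sending the next job would force the writer to flush once per transaction; queuing everything first is what gives it the chance to cover several jobs with a single flush.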
fn balance<'life0, 'life1, 'async_trait>(
    &'life0 self,
    account: &'life1 AccountId,
    as_of: Option<DateTime<Utc>>,
) -> Pin<Box<dyn Future<Output = Result<BalanceSnapshot, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Current balance for account.
as_of: None returns the current (fully-applied) balance from
in-memory state. as_of: Some(t) binary-searches the per-account
posting entries (sorted by committed_at, non-decreasing) to find the
balance as it stood at time t.
Uses existing_book (read-only lookup) so querying an account in a
book that has never been written to does NOT create that book’s
directory on disk. An absent book means the account cannot exist →
StoreError::UnknownAccount.
Edge semantics matching sqlite:
- Unknown book → UnknownAccount (no book → no account).
- Unknown account within a known book → UnknownAccount.
- Account exists but no postings before as_of → BalanceSnapshot { amount: 0, updated_seq: 0 }.
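The as_of lookup can be sketched with `slice::partition_point`. Several details here are assumptions rather than facts from the source: the `Entry` layout, storing a running balance per entry, integer timestamps in place of `DateTime<Utc>`, and treating a posting committed exactly at `t` as included.

```rust
/// Hypothetical per-account entry: commit time (integer timestamp), the seq
/// that produced it, and the running balance after that posting.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Entry {
    pub committed_at: i64,
    pub seq: u64,
    pub balance_after: i128,
}

/// Returns (amount, updated_seq) as the balance stood at time `t`.
pub fn balance_as_of(entries: &[Entry], t: i64) -> (i128, u64) {
    // Entries are sorted by committed_at (non-decreasing), so the predicate
    // holds for a prefix and `partition_point` finds where it stops holding.
    let idx = entries.partition_point(|e| e.committed_at <= t);
    match idx.checked_sub(1) {
        Some(i) => (entries[i].balance_after, entries[i].seq),
        None => (0, 0), // no postings at or before `t`
    }
}
```

When several entries share the same `committed_at`, the search lands after the last of them, so the result reflects every posting at that instant.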
fn asset<'life0, 'life1, 'async_trait>(
    &'life0 self,
    id: &'life1 AssetId,
) -> Pin<Box<dyn Future<Output = Result<Option<AssetDef>, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Look up an asset by id.
fn account_history<'life0, 'life1, 'async_trait>(
    &'life0 self,
    account: &'life1 AccountId,
    after_seq: Option<Seq>,
    limit: usize,
) -> Pin<Box<dyn Future<Output = Result<Vec<PostingRecord>, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Posting history for account, paginated by seq.
after_seq is EXCLUSIVE (matches sqlite semantics: seq > after_seq).
limit counts DISTINCT seqs so one transaction’s postings to the same
account are never split across pages.
Edge semantics matching sqlite:
- Unknown book or account → UnknownAccount.
- limit 0 → empty vec (never an error).
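The paging rule (exclusive after_seq, limit counted in distinct seqs) might look roughly like this. The `Posting` type is a simplified, hypothetical stand-in for `PostingRecord`, and the sketch assumes the input is already sorted by seq in ascending order.

```rust
/// Hypothetical, simplified posting record.
#[derive(Debug, Clone, PartialEq)]
pub struct Posting {
    pub seq: u64,
    pub amount: i64,
}

/// Returns postings with seq > after_seq, covering at most `limit` distinct
/// seqs. Assumes `postings` is sorted by seq (ascending).
pub fn page(postings: &[Posting], after_seq: Option<u64>, limit: usize) -> Vec<Posting> {
    let mut out = Vec::new();
    let mut distinct = 0usize;
    let mut last_seq: Option<u64> = None;
    for p in postings {
        if let Some(after) = after_seq {
            if p.seq <= after {
                continue; // after_seq is exclusive
            }
        }
        if last_seq != Some(p.seq) {
            // A new seq starts here; stop if the page is already full.
            if distinct == limit {
                break;
            }
            distinct += 1;
            last_seq = Some(p.seq);
        }
        out.push(p.clone());
    }
    out
}
```

A caller would pass the last seq of one page as `after_seq` for the next; because the cut only happens at a seq boundary, a transaction with two postings to the same account always lands on a single page.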
fn transaction<'life0, 'life1, 'async_trait>(
    &'life0 self,
    txid: &'life1 TxId,
) -> Pin<Box<dyn Future<Output = Result<Option<StoredTransaction>, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Look up a committed transaction by its TxId.
Searches all book writers’ in-memory txid indexes. Found → reads the
frame at the recorded position and returns a StoredTransaction.
Not found → Ok(None) (matches sqlite semantics).
fn trial_balance<'life0, 'life1, 'async_trait>(
    &'life0 self,
    book: &'life1 Book,
    as_of: Option<DateTime<Utc>>,
) -> Pin<Box<dyn Future<Output = Result<Vec<TrialBalanceRow>, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Trial balance for book.
as_of: None returns the current lifetime sums from in-memory state
(cheap). as_of: Some(t) replays all TransactionPosted frames with
committed_at <= t from disk (rare, documented as slow path).
Edge semantics matching sqlite:
- Unknown book → empty vec (sqlite returns no rows for an unknown book).
- Rows are sorted by asset id string, matching sqlite’s ORDER BY asset.
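The as_of slow path amounts to a filtered replay followed by a per-asset aggregation. The sketch below is only an approximation: `PostedFrame`, the sign convention for debits and credits, the integer timestamps, and the `(asset, debits, credits)` row shape are all assumptions, since the fields of `TrialBalanceRow` are not shown here.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified view of one posting inside a TransactionPosted
/// frame.
pub struct PostedFrame {
    pub committed_at: i64, // integer timestamp standing in for DateTime<Utc>
    pub asset: String,
    pub amount: i128, // assumption: positive = debit, negative = credit
}

/// Replays frames with committed_at <= t and returns
/// (asset, total debits, total credits) rows ordered by asset id.
pub fn trial_balance_as_of(frames: &[PostedFrame], t: i64) -> Vec<(String, i128, i128)> {
    // BTreeMap iterates in key order, which yields rows sorted by asset id.
    let mut sums: BTreeMap<String, (i128, i128)> = BTreeMap::new();
    for f in frames.iter().filter(|f| f.committed_at <= t) {
        let entry = sums.entry(f.asset.clone()).or_insert((0, 0));
        if f.amount >= 0 {
            entry.0 += f.amount;
        } else {
            entry.1 += -f.amount;
        }
    }
    sums.into_iter().map(|(asset, (d, c))| (asset, d, c)).collect()
}
```

An empty input, like an unknown book, produces an empty vec without any special casing.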
fn read_events<'life0, 'life1, 'async_trait>(
    &'life0 self,
    book: &'life1 Book,
    from: Seq,
    limit: usize,
) -> Pin<Box<dyn Future<Output = Result<Vec<Sequenced<LedgerEvent>>, StoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Read events from book starting at from (INCLUSIVE), returning at
most limit.
Edge semantics matching sqlite:
- Unknown book → empty vec (sqlite returns no rows).
§Durability watermark
Log-replay reads must never surface frames that the in-memory state
hasn’t applied yet. The writer pipeline is: append → fsync →
apply-to-state → ack. state.next_seq advances only after fsync AND
apply, so ceiling = next_seq - 1 is the highest seq that is both
durable on disk and consistent with the in-memory index. Frames with
seq > ceiling may be page-cache-visible but not fsynced; returning
them would be a dirty read — they could vanish on crash or diverge from
in-memory state. We drop the state lock before scanning and filter out
any frames that exceed the ceiling.
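The ceiling filter can be sketched as below. The `Frame` type and the `visible_events` function are hypothetical; in particular, capturing `next_seq` under the state lock and scanning the log afterwards are only represented here by the two plain arguments.

```rust
/// Hypothetical, simplified log frame.
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub seq: u64,
}

/// Filters frames scanned from disk down to those that are both durable and
/// applied: from <= seq <= next_seq - 1, returning at most `limit`.
/// `next_seq` is assumed to have been read before the scan started.
pub fn visible_events(scanned: &[Frame], next_seq: u64, from: u64, limit: usize) -> Vec<Frame> {
    // ceiling = next_seq - 1; if next_seq is 0 there is nothing visible yet.
    let ceiling = match next_seq.checked_sub(1) {
        Some(c) => c,
        None => return Vec::new(),
    };
    scanned
        .iter()
        .filter(|f| f.seq >= from && f.seq <= ceiling)
        .take(limit)
        .cloned()
        .collect()
}
```

Frames appended after `next_seq` was captured may show up in the scan, but they fall above the ceiling and are dropped, so a reader never observes a frame the in-memory state has not applied.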