Skip to main content

Crate of_persist

Crate of_persist 

Source
Expand description

§of_persist

of_persist provides append-only JSONL persistence for normalized orderflow events, with optional retention pruning. It is designed for replay, auditability, and post-trade research workflows.

§Main Types

§New In 0.3.0

Newly-written JSONL records now include additive schema metadata:

  • "schema": 1
  • ts_exchange_ns
  • ts_recv_ns

Legacy records without these fields remain readable. Runtime tests now cover persist -> readback -> replay parity for analytics, signals, and materialized book state.

§New In 0.2.0

Relative to the 0.1.x line, of_persist is no longer write-only. It now includes:

  • discovery APIs for venues, symbols, and streams
  • typed readback APIs for books and trades
  • merged event replay reads
  • inclusive sequence-range filtering

That makes the crate useful for replay, incident analysis, and research instead of only append-only storage.

§Public API Inventory

Public types:

Public methods:

§Storage Layout

Events are written to:

<root>/<venue>/<symbol>/(book|trades).jsonl

This makes stream files easy to map into replay and analytics pipelines.

§Record Schema Reference

Persisted JSONL records are additive and versioned with "schema": 1. Newly-written records include event timestamps (ts_exchange_ns, ts_recv_ns) alongside sequence and payload fields. The typed readback API continues to accept legacy records that do not contain schema or timestamp fields.

StoredBookEvent contains:

  • side, level, price, size, action
  • sequence

StoredTradeEvent contains:

  • price, size, aggressor_side
  • sequence

StoredEvent is the merged replay enum:

  • StoredEvent::Book(StoredBookEvent)
  • StoredEvent::Trade(StoredTradeEvent)

StoredEvent::sequence returns the merged event sequence regardless of variant.

§Readback API

RollingStore now supports additive typed readback over the same files it already writes:

  • list_venues() enumerates discovered venue directories
  • list_symbols(venue) enumerates discovered symbols for one venue
  • list_streams(venue, symbol) enumerates discovered JSONL streams for one symbol
  • read_books(venue, symbol) reads book.jsonl into StoredBookEvent values
  • read_books_in_range(venue, symbol, from_sequence, to_sequence) applies inclusive sequence filtering to book reads
  • read_trades(venue, symbol) reads trades.jsonl into StoredTradeEvent values
  • read_trades_in_range(venue, symbol, from_sequence, to_sequence) applies inclusive sequence filtering to trade reads
  • read_events(venue, symbol) merges both streams into StoredEvent values ordered by sequence
  • read_events_in_range(venue, symbol, from_sequence, to_sequence) applies inclusive sequence filtering to merged reads
  • missing streams return an empty vector
  • malformed lines return PersistError::Io with InvalidData

§Ordering and Range Semantics

  • append methods always write one JSON object per line
  • read_books* and read_trades* preserve file order
  • read_events* merges book and trade streams by ascending sequence
  • *_in_range methods use inclusive from_sequence / to_sequence bounds
  • None for a bound means it is open-ended on that side
  • missing book.jsonl or trades.jsonl files are treated as empty streams, not hard errors

§RollingStore Contract

§Quick Example

use of_core::{Side, SymbolId, TradePrint};
use of_persist::RollingStore;

let store = RollingStore::new("data").expect("store");

store.append_trade(&TradePrint {
    symbol: SymbolId {
        venue: "CME".to_string(),
        symbol: "ESM6".to_string(),
    },
    price: 505_000,
    size: 2,
    aggressor_side: Side::Ask,
    sequence: 1,
    ts_exchange_ns: 1,
    ts_recv_ns: 2,
}).expect("append");

§Readback Example

use of_persist::RollingStore;

let store = RollingStore::new("data").expect("store");
let venues = store.list_venues().expect("list venues");
let symbols = store.list_symbols("CME").expect("list symbols");
let streams = store.list_streams("CME", "ESM6").expect("list streams");
let trades = store
    .read_trades_in_range("CME", "ESM6", Some(10), Some(100))
    .expect("read trades");

println!("venues={venues:?} symbols={symbols:?} streams={streams:?}");
for trade in trades {
    println!("seq={} price={} size={}", trade.sequence, trade.price, trade.size);
}

§Replay Read Example

use of_persist::{RollingStore, StoredEvent};

let store = RollingStore::new("data").expect("store");
let events = store.read_events("CME", "ESM6").expect("read events");

for event in events {
    match event {
        StoredEvent::Book(book) => println!("book seq={} px={}", book.sequence, book.price),
        StoredEvent::Trade(trade) => println!("trade seq={} px={}", trade.sequence, trade.price),
    }
}

§Retention Example

use of_persist::{RetentionPolicy, RollingStore};

let store = RollingStore::new("data")?
    .with_retention(Some(RetentionPolicy {
        max_total_bytes: 2 * 1024 * 1024 * 1024,
        max_age_secs: 7 * 24 * 60 * 60,
    }));

let _ = store;

§Retention Behavior

  • max_age_secs > 0: files older than threshold are pruned.
  • max_total_bytes > 0: oldest files are pruned until under limit.
  • 0 means that limit is disabled.

§Error Semantics

  • PersistError::Io wraps filesystem and parse failures.
  • directory creation happens eagerly on store creation, so path permission issues surface early.
  • retention pruning is best-effort within normal append flows; it is not a separate daemon or background compactor.

§Real-World Use Cases

§1. Incident review after a bad fill or missed signal

Read back the exact normalized book/trade stream that the runtime saw and reconstruct the session around the problematic sequence range.

§2. Research dataset generation

Persist normalized data during live or simulated sessions, then read back only the venue/symbol windows needed for offline analysis.

§3. Deterministic replay

Use read_events(...) or read_events_in_range(...) to feed ordered events back into test or replay tooling.

§Detailed Example: Investigate A Sequence Window

use of_persist::{RollingStore, StoredEvent};

fn main() {
    let store = RollingStore::new("data").expect("store");
    let events = store
        .read_events_in_range("CME", "ESM6", Some(10_000), Some(10_150))
        .expect("events");

    for event in events {
        match event {
            StoredEvent::Book(book) => {
                println!(
                    "BOOK seq={} level={} px={} size={}",
                    book.sequence, book.level, book.price, book.size
                );
            }
            StoredEvent::Trade(trade) => {
                println!(
                    "TRADE seq={} px={} size={}",
                    trade.sequence, trade.price, trade.size
                );
            }
        }
    }
}

§Detailed Example: Discovery-First Replay Preparation

use of_persist::RollingStore;

fn main() {
    let store = RollingStore::new("data").expect("store");

    for venue in store.list_venues().expect("venues") {
        println!("venue={venue}");
        for symbol in store.list_symbols(&venue).expect("symbols") {
            let streams = store.list_streams(&venue, &symbol).expect("streams");
            println!("  symbol={symbol} streams={streams:?}");
        }
    }
}

Structs§

RetentionPolicy
Retention policy used by RollingStore.
RollingStore
JSONL rolling store for book/trade stream persistence.
StoredBookEvent
Parsed book event read back from persisted JSONL storage.
StoredTradeEvent
Parsed trade event read back from persisted JSONL storage.

Enums§

PersistError
Persistence-layer errors.
StoredEvent
Merged persisted event used for replay-oriented symbol reads.

Type Aliases§

PersistResult
Result type alias used by persistence APIs.