fin-stream
Lock-free streaming primitives for real-time financial market data. Provides a composable ingestion pipeline from raw exchange ticks to normalized, transformed features ready for downstream models or trade execution. Built on Tokio. Targets 100 K+ ticks/second throughput with zero heap allocation on the fast path.
v2.4.0: 40 K+ lines of production Rust, including an extensive analytics
suite: 200+ static analytics on NormalizedTick, 200+ on OhlcvBar, and 80+
rolling-window analytics each on MinMaxNormalizer and ZScoreNormalizer (added over development rounds 1–88).
What Is Included
| Module | Purpose | Key types |
|---|---|---|
| ws | WebSocket connection lifecycle with exponential-backoff reconnect and backpressure | WsManager, ConnectionConfig, ReconnectPolicy |
| tick | Convert raw exchange payloads (Binance/Coinbase/Alpaca/Polygon) into a single canonical form; 200+ batch analytics on tick slices | RawTick, NormalizedTick, Exchange, TradeSide, TickNormalizer |
| ring | Lock-free SPSC ring buffer: zero-allocation hot path between normalizer and consumers | SpscRing<T, N>, SpscProducer, SpscConsumer |
| book | Incremental order book delta streaming with snapshot reset and crossed-book detection | OrderBook, BookDelta, BookSide, PriceLevel |
| ohlcv | Bar construction at any Seconds / Minutes / Hours timeframe with optional gap-fill bars; 200+ batch analytics on bar slices | OhlcvAggregator, OhlcvBar, Timeframe |
| health | Per-feed staleness detection with configurable thresholds and a circuit-breaker | HealthMonitor, FeedHealth, HealthStatus |
| session | Trading-status classification (Open / Extended / Closed) for US Equity, Crypto, Forex | SessionAwareness, MarketSession, TradingStatus |
| norm | Rolling min-max and z-score normalizers for streaming observations; 80+ analytics each (moments, percentiles, entropy, trend, etc.) | MinMaxNormalizer, ZScoreNormalizer |
| lorentz | Lorentz spacetime transforms for feature engineering on price-time coordinates | LorentzTransform, SpacetimePoint |
| error | Unified typed error hierarchy covering every pipeline failure mode | StreamError |
Design Principles
- Never panic on valid production inputs. Every fallible operation returns Result<_, StreamError>. The only intentional panic is MinMaxNormalizer::new(0), which is an API-misuse guard documented in the function-level doc comment.
- Zero heap allocation on the hot path. SpscRing<T, N> is a const-generic array; push/pop never call malloc. NormalizedTick is stack-allocated.
- Exact decimal arithmetic for prices. All price and quantity fields use rust_decimal::Decimal, never f64. f64 is used only for the dimensionless beta/gamma Lorentz parameters and the f64 normalizer observations.
- Thread-safety where needed. HealthMonitor uses DashMap for concurrent feed updates. OrderBook is Send + Sync. SpscRing splits into producer/consumer halves that are individually Send.
- No unsafe code. #![forbid(unsafe_code)] is active in lib.rs. The SPSC ring buffer uses UnsafeCell with a documented safety invariant, gated behind a safe public API.
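To illustrate how a const-generic ring can hand values between two threads with no per-operation allocation, here is a minimal sketch. It is not fin-stream's SpscRing: to stay within safe Rust it restricts the payload to u64 (for example, prices encoded as integer ticks) held in AtomicU64 slots rather than a generic T in an UnsafeCell, and the names Ring, Producer, Consumer, split and RingError are hypothetical.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical SPSC ring of u64 payloads (not fin-stream's SpscRing).
struct Ring<const N: usize> {
    slots: [AtomicU64; N], // fixed-size storage, sized at compile time
    head: AtomicUsize,     // next index to write; only the producer stores it
    tail: AtomicUsize,     // next index to read; only the consumer stores it
}

#[derive(Debug, PartialEq)]
enum RingError {
    Full,
    Empty,
}

/// Neither half is Clone, so there is exactly one producer and one consumer.
struct Producer<const N: usize>(Arc<Ring<N>>);
struct Consumer<const N: usize>(Arc<Ring<N>>);

/// Allocate the ring once and split it into its two halves.
fn split<const N: usize>() -> (Producer<N>, Consumer<N>) {
    let ring = Arc::new(Ring {
        slots: std::array::from_fn(|_| AtomicU64::new(0)),
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(0),
    });
    (Producer(Arc::clone(&ring)), Consumer(ring))
}

impl<const N: usize> Producer<N> {
    fn push(&mut self, value: u64) -> Result<(), RingError> {
        let ring = &self.0;
        let head = ring.head.load(Ordering::Relaxed);
        // Acquire pairs with the consumer's Release store of `tail`.
        let tail = ring.tail.load(Ordering::Acquire);
        if head.wrapping_sub(tail) == N {
            return Err(RingError::Full);
        }
        ring.slots[head % N].store(value, Ordering::Relaxed);
        // Release publishes the slot write before the new head is visible.
        ring.head.store(head.wrapping_add(1), Ordering::Release);
        Ok(())
    }
}

impl<const N: usize> Consumer<N> {
    fn pop(&mut self) -> Result<u64, RingError> {
        let ring = &self.0;
        let tail = ring.tail.load(Ordering::Relaxed);
        // Acquire pairs with the producer's Release store of `head`.
        let head = ring.head.load(Ordering::Acquire);
        if tail == head {
            return Err(RingError::Empty);
        }
        let value = ring.slots[tail % N].load(Ordering::Relaxed);
        // Release hands the slot back to the producer only after the read.
        ring.tail.store(tail.wrapping_add(1), Ordering::Release);
        Ok(value)
    }
}
```

The single Arc allocation happens once in split; push and pop only touch atomics. The Release store of an index paired with the Acquire load on the other side is what guarantees a slot is written before it is read and read before it is overwritten.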
Architecture
Tick Source (WebSocket / simulation)
|
v
[ WsManager ] -- connection lifecycle, exponential-backoff reconnect
|
v
[ TickNormalizer ] -- raw JSON payload -> NormalizedTick (all exchanges)
|
v
[ SPSC Ring Buffer ] -- lock-free O(1) push/pop, zero allocation hot path
|
v
[ OHLCV Aggregator ] -- streaming bar construction at any timeframe
|
v
[ MinMax / ZScore Normalizer ] -- rolling-window coordinate normalization
|
+---> [ Lorentz Transform ] -- relativistic spacetime boost for features
|
v
Downstream (ML model | trade signal engine | order management)
Parallel paths:
[ OrderBook ] -- delta streaming, snapshot reset, crossed-book guard
[ HealthMonitor ] -- per-feed staleness detection, circuit-breaker
[ SessionAwareness ] -- Open / Extended / Closed classification
Analytics Suite
Over 88 rounds of development, fin-stream has accumulated an analytics suite covering the tick, bar, and normalizer layers of the pipeline.
NormalizedTick Batch Analytics (200+ functions)
Static methods operating on &[NormalizedTick] slices for microstructure analysis:
| Category | Example functions |
|---|---|
| VWAP / price | vwap, vwap_deviation_std, volume_weighted_mid_price, mid_price_drift |
| Volume / notional | total_volume, buy_volume, sell_volume, buy_notional, sell_notional_fraction, max_notional, min_notional, trade_notional_std |
| Side / flow | buy_count, sell_count, buy_sell_count_ratio, buy_sell_size_ratio, order_flow_imbalance, buy_sell_avg_qty_ratio |
| Price movement | price_range, price_mean, price_mad, price_dispersion, max_price_gap, price_range_velocity, max_price_drop, max_price_rise |
| Tick direction | uptick_count, downtick_count, uptick_fraction, tick_direction_bias, price_mean_crossover_count |
| Timing / arrival | tick_count_per_ms, volume_per_ms, inter_arrival_variance, inter_arrival_cv, notional_per_second |
| Concentration | quantity_concentration, price_level_volume, quantity_std, notional_skewness |
| Running extremes | running_high_count, running_low_count, max_consecutive_side_run |
| Spread / efficiency | spread_efficiency, realized_spread, adverse_selection_score, price_impact_per_unit |
OhlcvBar Batch Analytics (200+ functions)
Static methods operating on &[OhlcvBar] slices:
| Category | Example functions |
|---|---|
| Candle structure | body_fraction, bullish_ratio, avg_bar_efficiency, avg_wick_symmetry, body_to_range_std |
| Highs / lows | peak_close, trough_close, max_high, min_low, higher_highs_count, lower_lows_count, new_high_count, new_low_count |
| Volume | mean_volume, up_volume_fraction, down_close_volume, up_close_volume, max_bar_volume, min_bar_volume, high_volume_fraction |
| Close statistics | mean_close, close_std, close_skewness, close_at_high_fraction, close_at_low_fraction, close_cluster_count |
| Range / movement | total_range, range_std_dev, avg_range_pct_of_open, volume_per_range, total_body_movement, avg_open_to_close |
| Patterns | continuation_bar_count, zero_volume_fraction, complete_fraction |
| Shadow analysis | avg_lower_shadow_ratio, tail_upper_fraction, tail_lower_fraction, avg_lower_wick_to_range |
| VWAP / price | mean_vwap, normalized_close, price_channel_position, candle_score |
Normalizer Analytics (80+ functions each)
Both MinMaxNormalizer and ZScoreNormalizer expose largely overlapping analytics suites:
| Category | Example functions |
|---|---|
| Central tendency | mean, median, geometric_mean, harmonic_mean, exponential_weighted_mean |
| Dispersion | variance_f64, std_dev, interquartile_range, range_over_mean, coeff_of_variation, rms |
| Shape | skewness, kurtosis, second_moment, tail_variance |
| Rank / quantile | percentile_rank, quantile_range, value_rank, distance_from_median |
| Threshold | count_above, above_median_fraction, below_mean_fraction, outlier_fraction, zero_fraction |
| Trend | momentum, rolling_mean_change, is_mean_stable, sign_flip_count, new_max_count, new_min_count |
| Extremes | max_fraction, min_fraction, peak_to_trough_ratio, range_normalized_value |
| Misc | ema_of_z_scores, distinct_count, interquartile_mean, latest_minus_mean, latest_to_mean_ratio |
Mathematical Definitions
Min-Max Normalization
Given a rolling window of W observations x_1, ..., x_W with minimum m and
maximum M, the normalized value of a new sample x is:
x_norm = (x - m) / (M - m) when M != m
x_norm = 0.0 when M == m (degenerate; all window values identical)
The result is clamped to [0.0, 1.0]. This ensures that observations falling
outside the current window range are mapped to the boundary rather than outside it.
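A minimal sketch of the formula above, assuming a VecDeque-backed window. RollingMinMax is a hypothetical name, not the crate's MinMaxNormalizer: it recomputes m and M with an O(W) scan on every call for clarity, and returns None where the crate documents StreamError::NormalizationError for an empty window.

```rust
use std::collections::VecDeque;

/// Hypothetical rolling min-max normalizer (not fin-stream's MinMaxNormalizer).
struct RollingMinMax {
    window: VecDeque<f64>,
    capacity: usize,
}

impl RollingMinMax {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "window size must be non-zero");
        Self { window: VecDeque::with_capacity(capacity), capacity }
    }

    /// Push an observation, evicting the oldest once the window is full.
    fn update(&mut self, x: f64) {
        if self.window.len() == self.capacity {
            self.window.pop_front();
        }
        self.window.push_back(x);
    }

    /// Map x into [0, 1] using the window's min (m) and max (M).
    fn normalize(&self, x: f64) -> Option<f64> {
        if self.window.is_empty() {
            return None;
        }
        let lo = self.window.iter().copied().fold(f64::INFINITY, f64::min); // m
        let hi = self.window.iter().copied().fold(f64::NEG_INFINITY, f64::max); // M
        if hi == lo {
            return Some(0.0); // degenerate window: all values identical
        }
        // Clamp so values outside [m, M] land on the boundary.
        Some(((x - lo) / (hi - lo)).clamp(0.0, 1.0))
    }
}
```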
Z-Score Normalization
Given a rolling window of W observations with mean μ and standard deviation σ:
z = (x - μ) / σ when σ != 0
z = 0.0 when σ == 0 (degenerate; all window values identical)
ZScoreNormalizer also provides IQR, percentile rank, variance, EMA of z-scores,
and rolling mean change across the window.
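The same rolling-window approach works for z-scores. This hypothetical RollingZScore assumes the population standard deviation (dividing by W); whether fin-stream's ZScoreNormalizer divides by W or W - 1 is not stated here, so treat that choice as an assumption.

```rust
use std::collections::VecDeque;

/// Hypothetical rolling z-score normalizer (not fin-stream's ZScoreNormalizer).
struct RollingZScore {
    window: VecDeque<f64>,
    capacity: usize,
}

impl RollingZScore {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "window size must be non-zero");
        Self { window: VecDeque::with_capacity(capacity), capacity }
    }

    fn update(&mut self, x: f64) {
        if self.window.len() == self.capacity {
            self.window.pop_front(); // evict the oldest observation
        }
        self.window.push_back(x);
    }

    /// Window mean (mu) and population standard deviation (sigma).
    fn mean_std(&self) -> Option<(f64, f64)> {
        if self.window.is_empty() {
            return None;
        }
        let n = self.window.len() as f64;
        let mean = self.window.iter().sum::<f64>() / n;
        let var = self.window.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
        Some((mean, var.sqrt()))
    }

    /// z = (x - mu) / sigma, or 0.0 when sigma == 0.
    fn z_score(&self, x: f64) -> Option<f64> {
        let (mean, sd) = self.mean_std()?;
        if sd == 0.0 {
            return Some(0.0); // degenerate window
        }
        Some((x - mean) / sd)
    }
}
```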
Lorentz Transform
The LorentzTransform applies the special-relativistic boost with velocity
parameter beta = v/c (speed of light normalized to c = 1):
t' = gamma * (t - beta * x)
x' = gamma * (x - beta * t)
where beta = v/c (0 <= beta < 1, dimensionless drift velocity)
gamma = 1 / sqrt(1 - beta^2) (Lorentz factor, always >= 1)
The inverse transform is:
t = gamma * (t' + beta * x')
x = gamma * (x' + beta * t')
The spacetime interval s^2 = t^2 - x^2 is invariant under the transform.
beta = 0 gives the identity (gamma = 1). beta >= 1 is invalid (gamma is
undefined) and is rejected at construction time with StreamError::LorentzConfigError.
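The forward boost, its inverse, and the invariance of s^2 can be checked numerically. Boost and interval are hypothetical names, not the crate's LorentzTransform API; the constructor mirrors the documented validation by rejecting any beta outside [0, 1), including NaN. For example, beta = 0.6 gives gamma = 1 / sqrt(0.64) = 1.25.

```rust
/// Hypothetical Lorentz boost (not fin-stream's LorentzTransform).
#[derive(Debug, Clone, Copy)]
struct Boost {
    beta: f64,
    gamma: f64,
}

impl Boost {
    fn new(beta: f64) -> Result<Self, String> {
        // Range::contains is false for NaN, so NaN is rejected too.
        if !(0.0..1.0).contains(&beta) {
            return Err(format!("beta must be in [0, 1), got {beta}"));
        }
        Ok(Self { beta, gamma: 1.0 / (1.0 - beta * beta).sqrt() })
    }

    /// (t, x) -> (t', x')
    fn transform(&self, t: f64, x: f64) -> (f64, f64) {
        (self.gamma * (t - self.beta * x), self.gamma * (x - self.beta * t))
    }

    /// (t', x') -> (t, x)
    fn inverse(&self, tp: f64, xp: f64) -> (f64, f64) {
        (self.gamma * (tp + self.beta * xp), self.gamma * (xp + self.beta * tp))
    }
}

/// Spacetime interval s^2 = t^2 - x^2.
fn interval(t: f64, x: f64) -> f64 {
    t * t - x * x
}
```

With beta = 0.6 the point (t, x) = (5, 3) maps to (4, 0), and both have s^2 = 16.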
Financial interpretation. t is elapsed time normalized to a convenient
scale. x is a normalized log-price or price coordinate. The boost maps the
price-time plane along Lorentz hyperbolas. Certain microstructure signals that
appear curved in the untransformed frame can appear as straight lines in a
suitably boosted frame, simplifying downstream linear models.
OHLCV Invariants
Every completed OhlcvBar satisfies:
| Invariant | Expression |
|---|---|
| High is largest | high >= max(open, close) |
| Low is smallest | low <= min(open, close) |
| Valid ordering | high >= low |
| Volume non-negative | volume >= 0 |
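These invariants follow from how a bar folds in trades: high and low are a running max and min that both start at the opening price, close is always one of the traded prices, and volume only grows by (validated, non-negative) quantities. The sketch below assumes trades arrive in timestamp order, buckets them by floor(ts / timeframe), and uses i64 integer ticks as a dependency-free stand-in for rust_decimal::Decimal. Bar and aggregate are hypothetical, not the crate's OhlcvBar or OhlcvAggregator, and no gap-fill bars are produced.

```rust
/// Hypothetical bar (not fin-stream's OhlcvBar); prices and volume are i64 ticks.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Bar {
    open: i64,
    high: i64,
    low: i64,
    close: i64,
    volume: i64,
}

impl Bar {
    /// Start a bar from its first trade.
    fn from_trade(price: i64, qty: i64) -> Self {
        Bar { open: price, high: price, low: price, close: price, volume: qty }
    }

    /// Fold one more trade into the bar.
    fn apply(&mut self, price: i64, qty: i64) {
        self.high = self.high.max(price);
        self.low = self.low.min(price);
        self.close = price;
        self.volume += qty;
    }

    /// The four invariants from the table above.
    fn invariants_hold(&self) -> bool {
        self.high >= self.open.max(self.close)
            && self.low <= self.open.min(self.close)
            && self.high >= self.low
            && self.volume >= 0
    }
}

/// Group (timestamp_ms, price, qty) trades into bars of width `tf_ms`.
fn aggregate(trades: &[(u64, i64, i64)], tf_ms: u64) -> Vec<(u64, Bar)> {
    assert!(tf_ms > 0, "zero-duration timeframe");
    let mut bars: Vec<(u64, Bar)> = Vec::new();
    for &(ts, price, qty) in trades {
        let start = ts - ts % tf_ms; // open time of this trade's bucket
        if let Some((open_ts, bar)) = bars.last_mut() {
            if *open_ts == start {
                bar.apply(price, qty);
                continue;
            }
        }
        bars.push((start, Bar::from_trade(price, qty)));
    }
    bars
}
```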
Order Book Guarantees
| Property | Guarantee |
|---|---|
| No crossed book | Any delta that would produce best_bid >= best_ask is rejected with StreamError::BookCrossed; the book is not mutated |
| Sequence gap detection | If a delta carries a sequence number that is not exactly last_sequence + 1, the apply returns StreamError::BookReconstructionFailed |
| Zero quantity removes level | A delta with quantity = 0 removes the price level entirely |
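A minimal sketch of these three guarantees, using BTreeMap price levels keyed by i64 ticks. Book, Delta, Side and BookError are hypothetical (the crate reports these cases as StreamError::BookReconstructionFailed and StreamError::BookCrossed), and the first delta is assumed to set the sequence baseline. Because the book is never allowed to cross, a delta can only cross it by adding a level at or through the opposite best price, so the check runs before any mutation.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy)]
enum Side {
    Bid,
    Ask,
}

#[derive(Debug, PartialEq)]
enum BookError {
    Crossed,
    SequenceGap { expected: u64, got: u64 },
}

/// Hypothetical delta (not fin-stream's BookDelta).
struct Delta {
    side: Side,
    price: i64,
    qty: i64,
    seq: u64,
}

#[derive(Default)]
struct Book {
    bids: BTreeMap<i64, i64>, // price -> quantity
    asks: BTreeMap<i64, i64>,
    last_seq: Option<u64>,
}

impl Book {
    fn best_bid(&self) -> Option<i64> {
        self.bids.keys().next_back().copied() // highest bid
    }

    fn best_ask(&self) -> Option<i64> {
        self.asks.keys().next().copied() // lowest ask
    }

    fn apply(&mut self, d: &Delta) -> Result<(), BookError> {
        // Sequence numbers must be exactly last + 1.
        if let Some(last) = self.last_seq {
            if d.seq != last + 1 {
                return Err(BookError::SequenceGap { expected: last + 1, got: d.seq });
            }
        }
        // Reject crossing deltas before touching any state.
        if d.qty > 0 {
            let crosses = match d.side {
                Side::Bid => self.best_ask().map_or(false, |ask| d.price >= ask),
                Side::Ask => self.best_bid().map_or(false, |bid| d.price <= bid),
            };
            if crosses {
                return Err(BookError::Crossed);
            }
        }
        let levels = match d.side {
            Side::Bid => &mut self.bids,
            Side::Ask => &mut self.asks,
        };
        if d.qty == 0 {
            levels.remove(&d.price); // zero quantity removes the level
        } else {
            levels.insert(d.price, d.qty);
        }
        self.last_seq = Some(d.seq);
        Ok(())
    }
}
```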
Reconnect Backoff
ReconnectPolicy::backoff_for_attempt(n) returns:
backoff(n) = min(initial_backoff * multiplier^n, max_backoff)
multiplier must be >= 1.0 and max_attempts must be > 0; both are validated
at construction time.
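The formula and its validation rules fit in a few lines. Backoff is a hypothetical stand-in for ReconnectPolicy; it assumes attempts are numbered from 0 and saturates at max_backoff instead of overflowing once multiplier^n becomes infinite.

```rust
use std::time::Duration;

/// Hypothetical reconnect policy (not fin-stream's ReconnectPolicy).
struct Backoff {
    initial: Duration,
    max: Duration,
    multiplier: f64,
    max_attempts: u32,
}

impl Backoff {
    fn new(initial: Duration, max: Duration, multiplier: f64, max_attempts: u32) -> Result<Self, String> {
        // `!(x >= 1.0)` also rejects NaN.
        if !(multiplier >= 1.0) {
            return Err(format!("multiplier must be >= 1.0, got {multiplier}"));
        }
        if max_attempts == 0 {
            return Err("max_attempts must be > 0".to_string());
        }
        Ok(Self { initial, max, multiplier, max_attempts })
    }

    /// backoff(n) = min(initial * multiplier^n, max), with n counted from 0.
    fn backoff_for_attempt(&self, n: u32) -> Duration {
        let secs = self.initial.as_secs_f64() * self.multiplier.powf(f64::from(n));
        // Saturate at the cap; powf may return infinity for large n.
        if !secs.is_finite() || secs >= self.max.as_secs_f64() {
            self.max
        } else {
            Duration::from_secs_f64(secs)
        }
    }
}
```

With the documented defaults (500 ms initial, 30 s cap, 2x multiplier), attempts 0 to 5 wait 0.5, 1, 2, 4, 8 and 16 s, and every attempt from 6 onward is capped at 30 s.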
Performance Characteristics
| Metric | Value |
|---|---|
| SPSC push/pop latency | O(1), single cache-line access |
| SPSC throughput | >100 K ticks/second (zero allocation) |
| OHLCV feed per tick | O(1) |
| Normalization update | O(1) amortized; O(W) after window eviction |
| Lorentz transform | O(1), two multiplications per coordinate |
| Ring buffer memory | N * sizeof(T) bytes (N is const generic) |
Quickstart
Normalize a Binance tick and aggregate OHLCV
use ;
use ;
use json;
SPSC ring buffer pipeline
use SpscRing;
use ;
use json;
Min-max normalization of closing prices
use MinMaxNormalizer;
Z-score normalization with analytics
use ZScoreNormalizer;
Lorentz feature engineering
use ;
Order book delta streaming
use ;
use dec;
Feed health monitoring with circuit breaker
use HealthMonitor;
Session classification
use ;
API Reference
tick module
// Parse an exchange identifier string.
from_str // "Binance" / "Coinbase" / "Alpaca" / "Polygon"
// Construct a raw tick (system clock stamp applied automatically).
new
ring module
// Create a const-generic SPSC ring buffer.
new // N slots, zero allocation
// Split into thread-safe producer/consumer halves.
split // StreamError::RingBufferFull on overflow
pop // StreamError::RingBufferEmpty on underflow
len // items currently queued
is_empty // always N
// Analytics on a populated ring (clone-based reads on initialized slots)
sum_cloned where T: Clone + Sum + Default
average_cloned where T: Clone +
peek_nth where T: Clone // 0 = oldest
contains_cloned where T: Clone + PartialEq
max_cloned_by where F: Fn , K: Ord
min_cloned_by where F: Fn , K: Ord
to_vec_sorted where T: Clone + Ord
to_vec_cloned where T: Clone
first where T: Clone // oldest item
drain_into where T: Clone
book module
// Construct a delta (sequence number optional).
new // Apply deltas and query the book.
new
ohlcv module
// Construct an aggregator.
new
norm module
Both normalizers expose 80+ analytics. Only core methods are shown here; see the Analytics Suite section above for categorized examples of the rest.
// Min-max rolling normalizer
new // panics if window_size == 0
update // O(1) amortized
normalize // [0.0, 1.0]
min_max // ... 70+ additional analytics (moments, percentiles, trend, shape — see Analytics Suite)
// Z-score rolling normalizer
new // ... 60+ additional analytics (see Analytics Suite)
lorentz module
new // beta in [0, 1)
beta // t' = gamma * t (x = 0)
contract_length // x' = x / gamma (t = 0)
spacetime_interval // t^2 - x^2
rapidity // atanh(beta)
relativistic_momentum // gamma * mass * beta
four_momentum // (E, p)
velocity_addition // β·γ
energy_momentum_invariant // E² - p² = m²
new // public fields
health module
new // per-feed custom thresholds
heartbeat // feeds with no heartbeat yet
feeds_needing_check // sorted non-Healthy feed IDs
ratio_healthy // healthy / total
total_tick_count
session module
new // Open or Extended
remaining_ms
ws module
new // 10 attempts, 500ms initial, 30s cap, 2x multiplier
backoff_for_attempt
Supported Exchanges
| Exchange | Adapter | Status | Wire-format fields used |
|---|---|---|---|
| Binance | Exchange::Binance | Stable | p (price), q (qty), m (maker/taker), t (trade id), T (exchange ts) |
| Coinbase | Exchange::Coinbase | Stable | price, size, side, trade_id |
| Alpaca | Exchange::Alpaca | Stable | p (price), s (size), i (trade id) |
| Polygon | Exchange::Polygon | Stable | p (price), s (size), i (trade id), t (exchange ts) |
All four adapters are covered by unit and integration tests. To add a new exchange, see the Contributing section below.
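Binance's m field records whether the buyer was the maker (resting) order, so the aggressor side is its inverse. A hypothetical helper illustrating the mapping; Aggressor is not fin-stream's TradeSide, whose exact mapping is not documented here.

```rust
/// Hypothetical aggressor-side enum (not fin-stream's TradeSide).
#[derive(Debug, PartialEq)]
enum Aggressor {
    Buy,
    Sell,
}

/// m == true: the buyer was resting, so the incoming (taker) order was a sell.
fn binance_aggressor(buyer_is_maker: bool) -> Aggressor {
    if buyer_is_maker {
        Aggressor::Sell
    } else {
        Aggressor::Buy
    }
}
```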
Precision and Accuracy Notes
- Price and quantity fields use rust_decimal::Decimal, a 96-bit integer mantissa with a power-of-10 exponent. This guarantees exact representation of any finite decimal number with up to 28 significant digits. Addition, subtraction, and in-range multiplication of prices incur no floating-point rounding error; division (for example when computing a volume-weighted average) can still round at the 28-digit limit.
- Normalization (f64) uses IEEE 754 double precision. The error bound on normalize(x) is roughly 2 * machine_epsilon * |x| in the worst case. For typical price ranges this is well below any practical threshold.
- Lorentz computations use f64 throughout. The round-trip error of inverse_transform(transform(p)) is bounded by 4 * gamma^2 * machine_epsilon. For beta <= 0.9, gamma <= ~2.3 and the round-trip error is < 1e-13.
- Bar aggregation accumulates volume with Decimal addition. OHLC fields carry the exact decimal values from normalized ticks with no intermediate rounding.
Error Handling
All fallible operations return Result<_, StreamError>. StreamError variants:
| Variant | Subsystem | When emitted |
|---|---|---|
| ConnectionFailed | ws | WebSocket connection attempt rejected |
| Disconnected | ws | Live connection dropped unexpectedly |
| ReconnectExhausted | ws | All reconnect attempts consumed |
| Backpressure | ws / ring | Downstream channel or ring buffer is full |
| ParseError | tick | Tick deserialization failed (missing field, invalid decimal) |
| UnknownExchange | tick | Exchange identifier string not recognized |
| InvalidTick | tick | Tick failed validation (negative price, zero quantity) |
| BookReconstructionFailed | book | Delta applied to wrong symbol, or sequence gap |
| BookCrossed | book | Order book bid >= ask after applying a delta |
| StaleFeed | health | Feed has not produced data within staleness threshold |
| AggregationError | ohlcv | Wrong symbol or zero-duration timeframe |
| NormalizationError | norm | normalize() called before any observations fed |
| RingBufferFull | ring | SPSC ring buffer has no free slots |
| RingBufferEmpty | ring | SPSC ring buffer has no pending items |
| LorentzConfigError | lorentz | beta >= 1 or beta < 0 or beta = NaN |
| Io | all | Underlying I/O error |
| WebSocket | ws | WebSocket protocol-level error |
Custom Pipeline Extensions
Implementing a custom tick normalizer
use ;
use StreamError;
;
Implementing a custom downstream consumer
use OhlcvBar;
Running Tests and Benchmarks
PROPTEST_CASES=1000
Changelog
See CHANGELOG.md for a full version-by-version history.
Contributing
General workflow
- Fork the repository and create a feature branch.
- Add or update tests for any changed behaviour. The CI gate requires all tests to pass and Clippy to report no warnings.
- Run cargo fmt before opening a pull request.
- Keep public APIs documented with /// doc comments; #![deny(missing_docs)] is active in lib.rs, so undocumented public items cause a build failure.
- Open a pull request against main. The CI pipeline (fmt, clippy, test on three platforms, bench, doc, deny, coverage) must be green before merge.
Adding a new exchange adapter
- Add the variant to Exchange in src/tick/mod.rs with a /// doc comment.
- Extend the Display and FromStr implementations in the same file to cover the new variant.
- Add a normalize_<exchange> method following the pattern of normalize_binance.
- Wire the method into TickNormalizer::normalize via a new match arm.
- Add unit tests covering: the happy path, each required missing field returning StreamError::ParseError, and an invalid decimal string.
- Update the README "Supported Exchanges" table and the CHANGELOG.md [Unreleased] section.
License
MIT. See LICENSE for details.