Skip to main content

market_flow/
lib.rs

1//! Read NDJSON market-data files as an async stream of [`model::market_event::MarketEvent`] values.
2//!
3//! # Quick start
4//!
5//! ```no_run
6//! use futures::StreamExt;
7//! use market_flow::{init_market_event_stream, model::market_event::MarketEvent};
8//!
9//! # async fn example() -> Result<(), market_flow::MarketFlowError> {
10//! let mut stream = init_market_event_stream("events.ndjson").await?;
11//! while let Some(result) = stream.next().await {
12//!     let event: MarketEvent = result?;
13//!     // handle event
14//! }
15//! # Ok(())
16//! # }
17//! ```
18
19pub mod model;
20
21mod market_event_stream;
22
23pub use market_event_stream::MarketEventStream;
24pub use model::error::MarketFlowError;
25pub use model::market_event::{MarketEvent, OrderbookEvent, OrderbookLevel, Side, TradeEvent};
26
27/// Open an NDJSON file at `file` and return a stream of market events.
28///
29/// Each non-empty line is parsed as one [`model::market_event::MarketEvent`]. Requires a Tokio runtime
30/// (see [`MarketEventStream::new`]).
31pub async fn init_market_event_stream(file: &str) -> Result<MarketEventStream, MarketFlowError> {
32    MarketEventStream::new(file).await
33}
34
35#[cfg(test)]
36mod tests {
37    use std::path::PathBuf;
38
39    use futures::StreamExt;
40    use rust_decimal::Decimal;
41
42    use crate::model::market_event::{MarketEvent, Side};
43
44    fn fixture_path() -> PathBuf {
45        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/data/input.ndjson")
46    }
47
48    #[tokio::test]
49    async fn streams_all_events_from_input_ndjson() {
50        let mut stream = crate::init_market_event_stream(fixture_path().to_str().unwrap())
51            .await
52            .expect("open fixture");
53
54        let mut events = Vec::new();
55        while let Some(result) = stream.next().await {
56            events.push(result.expect("parse event"));
57        }
58
59        assert_eq!(events.len(), 10);
60
61        let MarketEvent::Snapshot(first) = &events[0] else {
62            panic!("expected snapshot");
63        };
64        assert_eq!(first.symbol.as_str(), "BTC-USDT");
65        assert_eq!(first.bids.len(), 3);
66        assert_eq!(first.bids[0].price, Decimal::from(10_000_000));
67
68        let MarketEvent::Trade(third) = &events[2] else {
69            panic!("expected trade");
70        };
71        assert_eq!(third.side, Side::Buy);
72        assert_eq!(third.price, Decimal::from(10_000_100));
73
74        assert!(matches!(events[9], MarketEvent::DepthBatch(_)));
75    }
76}