1pub mod model;
20
21mod market_event_stream;
22
23pub use market_event_stream::MarketEventStream;
24pub use model::error::MarketFlowError;
25pub use model::market_event::{MarketEvent, OrderbookEvent, OrderbookLevel, Side, TradeEvent};
26
27pub async fn init_market_event_stream(file: &str) -> Result<MarketEventStream, MarketFlowError> {
32 MarketEventStream::new(file).await
33}
34
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use futures::StreamExt;
    use rust_decimal::Decimal;

    use crate::model::market_event::{MarketEvent, Side};

    /// Location of the `input.ndjson` fixture, resolved against the crate's
    /// manifest directory so the test does not depend on the working directory.
    fn fixture_path() -> PathBuf {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("src/data/input.ndjson");
        path
    }

    /// Drains the fixture stream from start to finish and spot-checks the
    /// first, third and last decoded events.
    #[tokio::test]
    async fn streams_all_events_from_input_ndjson() {
        let path = fixture_path();
        let stream = crate::init_market_event_stream(path.to_str().unwrap())
            .await
            .expect("open fixture");

        // The first item that fails to decode aborts the test immediately.
        let events: Vec<_> = stream
            .map(|item| item.expect("parse event"))
            .collect()
            .await;

        // The length check comes first so the direct indexing below cannot go
        // out of bounds without a clear assertion failure.
        assert_eq!(events.len(), 10);

        match &events[0] {
            MarketEvent::Snapshot(snapshot) => {
                assert_eq!(snapshot.symbol.as_str(), "BTC-USDT");
                assert_eq!(snapshot.bids.len(), 3);
                assert_eq!(snapshot.bids[0].price, Decimal::from(10_000_000));
            }
            _ => panic!("expected snapshot"),
        }

        match &events[2] {
            MarketEvent::Trade(trade) => {
                assert_eq!(trade.side, Side::Buy);
                assert_eq!(trade.price, Decimal::from(10_000_100));
            }
            _ => panic!("expected trade"),
        }

        assert!(matches!(events[9], MarketEvent::DepthBatch(_)));
    }
}