Skip to main content

market_flow/
market_event_stream.rs

1//! Async line-by-line reader that turns an NDJSON file into a [`Stream`] of [`MarketEvent`]s.
2
3use futures::Stream;
4use std::{
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tokio::{
9    fs::File,
10    io::{AsyncBufReadExt, BufReader},
11};
12use tokio_stream::wrappers::LinesStream;
13
14use crate::model::{error::MarketFlowError, market_event::MarketEvent};
15
16/// A [`Stream`] over market events read from a single NDJSON file.
17///
18/// Construct with [`MarketEventStream::new`] or [`crate::init_market_event_stream`],
19/// then poll with [`futures::StreamExt::next`].
20pub struct MarketEventStream {
21    line_stream: LinesStream<BufReader<File>>,
22}
23
24impl MarketEventStream {
25    /// Open `path` and prepare to stream one event per line.
26    pub async fn new(path: &str) -> Result<Self, MarketFlowError> {
27        let file = File::open(path).await?;
28        let line_stream = LinesStream::new(BufReader::new(file).lines());
29        Ok(Self { line_stream })
30    }
31}
32
33impl Stream for MarketEventStream {
34    type Item = Result<MarketEvent, MarketFlowError>;
35
36    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        match Pin::new(&mut self.line_stream).poll_next(cx) {
38            Poll::Ready(Some(line)) => Poll::Ready(Some(parse_line(line))),
39            Poll::Ready(None) => Poll::Ready(None),
40            Poll::Pending => Poll::Pending,
41        }
42    }
43}
44
45fn parse_line(line: Result<String, std::io::Error>) -> Result<MarketEvent, MarketFlowError> {
46    let line = line?;
47    Ok(serde_json::from_str(&line)?)
48}