market_flow/
market_event_stream.rs1use futures::Stream;
4use std::{
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tokio::{
9 fs::File,
10 io::{AsyncBufReadExt, BufReader},
11};
12use tokio_stream::wrappers::LinesStream;
13
14use crate::model::{error::MarketFlowError, market_event::MarketEvent};
15
16pub struct MarketEventStream {
21 line_stream: LinesStream<BufReader<File>>,
22}
23
24impl MarketEventStream {
25 pub async fn new(path: &str) -> Result<Self, MarketFlowError> {
27 let file = File::open(path).await?;
28 let line_stream = LinesStream::new(BufReader::new(file).lines());
29 Ok(Self { line_stream })
30 }
31}
32
33impl Stream for MarketEventStream {
34 type Item = Result<MarketEvent, MarketFlowError>;
35
36 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 match Pin::new(&mut self.line_stream).poll_next(cx) {
38 Poll::Ready(Some(line)) => Poll::Ready(Some(parse_line(line))),
39 Poll::Ready(None) => Poll::Ready(None),
40 Poll::Pending => Poll::Pending,
41 }
42 }
43}
44
45fn parse_line(line: Result<String, std::io::Error>) -> Result<MarketEvent, MarketFlowError> {
46 let line = line?;
47 Ok(serde_json::from_str(&line)?)
48}