barter_integration/stream/
mod.rs

1use crate::{Transformer, error::SocketError, protocol::StreamParser};
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5    collections::VecDeque,
6    marker::PhantomData,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11pub mod indexed;
12pub mod merge;
13
14/// An [`ExchangeStream`] is a communication protocol agnostic [`Stream`]. It polls protocol
15/// messages from the inner [`Stream`], and transforms them into the desired output data structure.
16#[derive(Debug)]
17#[pin_project]
18pub struct ExchangeStream<Protocol, InnerStream, StreamTransformer>
19where
20    Protocol: StreamParser,
21    InnerStream: Stream,
22    StreamTransformer: Transformer,
23{
24    #[pin]
25    pub stream: InnerStream,
26    pub transformer: StreamTransformer,
27    pub buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
28    pub protocol_marker: PhantomData<Protocol>,
29}
30
31impl<Protocol, InnerStream, StreamTransformer> Stream
32    for ExchangeStream<Protocol, InnerStream, StreamTransformer>
33where
34    Protocol: StreamParser,
35    InnerStream: Stream<Item = Result<Protocol::Message, Protocol::Error>> + Unpin,
36    StreamTransformer: Transformer,
37    StreamTransformer::Error: From<SocketError>,
38{
39    type Item = Result<StreamTransformer::Output, StreamTransformer::Error>;
40
41    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        loop {
43            // Flush Self::Item buffer if it is not currently empty
44            if let Some(output) = self.buffer.pop_front() {
45                return Poll::Ready(Some(output));
46            }
47
48            // Poll inner `Stream` for next the next input protocol message
49            let input = match self.as_mut().project().stream.poll_next(cx) {
50                Poll::Ready(Some(input)) => input,
51                Poll::Ready(None) => return Poll::Ready(None),
52                Poll::Pending => return Poll::Pending,
53            };
54
55            // Parse input protocol message into `ExchangeMessage`
56            let exchange_message = match Protocol::parse::<StreamTransformer::Input>(input) {
57                // `StreamParser` successfully deserialised `ExchangeMessage`
58                Some(Ok(exchange_message)) => exchange_message,
59
60                // If `StreamParser` returns an Err pass it downstream
61                Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
62
63                // If `StreamParser` returns None it's a safe-to-skip message
64                None => continue,
65            };
66
67            // Transform `ExchangeMessage` into `Transformer::OutputIter`
68            // ie/ IntoIterator<Item = Result<Output, SocketError>>
69            self.transformer
70                .transform(exchange_message)
71                .into_iter()
72                .for_each(
73                    |output_result: Result<StreamTransformer::Output, StreamTransformer::Error>| {
74                        self.buffer.push_back(output_result)
75                    },
76                );
77        }
78    }
79}
80
81impl<Protocol, InnerStream, StreamTransformer>
82    ExchangeStream<Protocol, InnerStream, StreamTransformer>
83where
84    Protocol: StreamParser,
85    InnerStream: Stream,
86    StreamTransformer: Transformer,
87{
88    pub fn new(
89        stream: InnerStream,
90        transformer: StreamTransformer,
91        buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
92    ) -> Self {
93        Self {
94            stream,
95            transformer,
96            buffer,
97            protocol_marker: PhantomData,
98        }
99    }
100}