Skip to main content

barter_integration/stream/
mod.rs

1use crate::{Transformer, error::SocketError, protocol::StreamParser};
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5    collections::VecDeque,
6    marker::PhantomData,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11/// Data stream abstractions and configuration.
12pub mod data;
13
14/// Stream extension traits.
15pub mod ext;
16
17/// Stream utility functions.
18pub mod util;
19
20/// An [`ExchangeStream`] is a communication protocol agnostic [`Stream`]. It polls protocol
21/// messages from the inner [`Stream`], and transforms them into the desired output data structure.
22#[derive(Debug)]
23#[pin_project]
24pub struct ExchangeStream<Protocol, InnerStream, StreamTransformer>
25where
26    Protocol: StreamParser<StreamTransformer::Input>,
27    InnerStream: Stream,
28    StreamTransformer: Transformer,
29{
30    #[pin]
31    pub stream: InnerStream,
32    pub transformer: StreamTransformer,
33    pub buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
34    pub protocol_marker: PhantomData<Protocol>,
35}
36
37impl<Protocol, InnerStream, StreamTransformer> Stream
38    for ExchangeStream<Protocol, InnerStream, StreamTransformer>
39where
40    Protocol: StreamParser<StreamTransformer::Input>,
41    InnerStream: Stream<Item = Result<Protocol::Message, Protocol::Error>> + Unpin,
42    StreamTransformer: Transformer,
43    StreamTransformer::Error: From<SocketError>,
44{
45    type Item = Result<StreamTransformer::Output, StreamTransformer::Error>;
46
47    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48        loop {
49            // Flush Self::Item buffer if it is not currently empty
50            if let Some(output) = self.buffer.pop_front() {
51                return Poll::Ready(Some(output));
52            }
53
54            // Poll inner `Stream` for next the next input protocol message
55            let input = match self.as_mut().project().stream.poll_next(cx) {
56                Poll::Ready(Some(input)) => input,
57                Poll::Ready(None) => return Poll::Ready(None),
58                Poll::Pending => return Poll::Pending,
59            };
60
61            // Parse input protocol message into `ExchangeMessage`
62            let exchange_message = match Protocol::parse(input) {
63                // `StreamParser` successfully deserialised `ExchangeMessage`
64                Some(Ok(exchange_message)) => exchange_message,
65
66                // If `StreamParser` returns an Err pass it downstream
67                Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
68
69                // If `StreamParser` returns None it's a safe-to-skip message
70                None => continue,
71            };
72
73            // Transform `ExchangeMessage` into `Transformer::OutputIter`
74            // ie/ IntoIterator<Item = Result<Output, SocketError>>
75            self.transformer
76                .transform(exchange_message)
77                .into_iter()
78                .for_each(
79                    |output_result: Result<StreamTransformer::Output, StreamTransformer::Error>| {
80                        self.buffer.push_back(output_result)
81                    },
82                );
83        }
84    }
85}
86
87impl<Protocol, InnerStream, StreamTransformer>
88    ExchangeStream<Protocol, InnerStream, StreamTransformer>
89where
90    Protocol: StreamParser<StreamTransformer::Input>,
91    InnerStream: Stream,
92    StreamTransformer: Transformer,
93{
94    pub fn new(
95        stream: InnerStream,
96        transformer: StreamTransformer,
97        buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
98    ) -> Self {
99        Self {
100            stream,
101            transformer,
102            buffer,
103            protocol_marker: PhantomData,
104        }
105    }
106}