barter_integration/stream/
mod.rs1use crate::{Transformer, error::SocketError, protocol::StreamParser};
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5 collections::VecDeque,
6 marker::PhantomData,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11pub mod data;
13
14pub mod ext;
16
17pub mod util;
19
20#[derive(Debug)]
23#[pin_project]
24pub struct ExchangeStream<Protocol, InnerStream, StreamTransformer>
25where
26 Protocol: StreamParser<StreamTransformer::Input>,
27 InnerStream: Stream,
28 StreamTransformer: Transformer,
29{
30 #[pin]
31 pub stream: InnerStream,
32 pub transformer: StreamTransformer,
33 pub buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
34 pub protocol_marker: PhantomData<Protocol>,
35}
36
37impl<Protocol, InnerStream, StreamTransformer> Stream
38 for ExchangeStream<Protocol, InnerStream, StreamTransformer>
39where
40 Protocol: StreamParser<StreamTransformer::Input>,
41 InnerStream: Stream<Item = Result<Protocol::Message, Protocol::Error>> + Unpin,
42 StreamTransformer: Transformer,
43 StreamTransformer::Error: From<SocketError>,
44{
45 type Item = Result<StreamTransformer::Output, StreamTransformer::Error>;
46
47 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48 loop {
49 if let Some(output) = self.buffer.pop_front() {
51 return Poll::Ready(Some(output));
52 }
53
54 let input = match self.as_mut().project().stream.poll_next(cx) {
56 Poll::Ready(Some(input)) => input,
57 Poll::Ready(None) => return Poll::Ready(None),
58 Poll::Pending => return Poll::Pending,
59 };
60
61 let exchange_message = match Protocol::parse(input) {
63 Some(Ok(exchange_message)) => exchange_message,
65
66 Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
68
69 None => continue,
71 };
72
73 self.transformer
76 .transform(exchange_message)
77 .into_iter()
78 .for_each(
79 |output_result: Result<StreamTransformer::Output, StreamTransformer::Error>| {
80 self.buffer.push_back(output_result)
81 },
82 );
83 }
84 }
85}
86
87impl<Protocol, InnerStream, StreamTransformer>
88 ExchangeStream<Protocol, InnerStream, StreamTransformer>
89where
90 Protocol: StreamParser<StreamTransformer::Input>,
91 InnerStream: Stream,
92 StreamTransformer: Transformer,
93{
94 pub fn new(
95 stream: InnerStream,
96 transformer: StreamTransformer,
97 buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
98 ) -> Self {
99 Self {
100 stream,
101 transformer,
102 buffer,
103 protocol_marker: PhantomData,
104 }
105 }
106}