barter_integration/stream/
mod.rs1use crate::{Transformer, error::SocketError, protocol::StreamParser};
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5 collections::VecDeque,
6 marker::PhantomData,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11pub mod indexed;
12pub mod merge;
13
14#[derive(Debug)]
17#[pin_project]
18pub struct ExchangeStream<Protocol, InnerStream, StreamTransformer>
19where
20 Protocol: StreamParser,
21 InnerStream: Stream,
22 StreamTransformer: Transformer,
23{
24 #[pin]
25 pub stream: InnerStream,
26 pub transformer: StreamTransformer,
27 pub buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
28 pub protocol_marker: PhantomData<Protocol>,
29}
30
31impl<Protocol, InnerStream, StreamTransformer> Stream
32 for ExchangeStream<Protocol, InnerStream, StreamTransformer>
33where
34 Protocol: StreamParser,
35 InnerStream: Stream<Item = Result<Protocol::Message, Protocol::Error>> + Unpin,
36 StreamTransformer: Transformer,
37 StreamTransformer::Error: From<SocketError>,
38{
39 type Item = Result<StreamTransformer::Output, StreamTransformer::Error>;
40
41 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 loop {
43 if let Some(output) = self.buffer.pop_front() {
45 return Poll::Ready(Some(output));
46 }
47
48 let input = match self.as_mut().project().stream.poll_next(cx) {
50 Poll::Ready(Some(input)) => input,
51 Poll::Ready(None) => return Poll::Ready(None),
52 Poll::Pending => return Poll::Pending,
53 };
54
55 let exchange_message = match Protocol::parse::<StreamTransformer::Input>(input) {
57 Some(Ok(exchange_message)) => exchange_message,
59
60 Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
62
63 None => continue,
65 };
66
67 self.transformer
70 .transform(exchange_message)
71 .into_iter()
72 .for_each(
73 |output_result: Result<StreamTransformer::Output, StreamTransformer::Error>| {
74 self.buffer.push_back(output_result)
75 },
76 );
77 }
78 }
79}
80
81impl<Protocol, InnerStream, StreamTransformer>
82 ExchangeStream<Protocol, InnerStream, StreamTransformer>
83where
84 Protocol: StreamParser,
85 InnerStream: Stream,
86 StreamTransformer: Transformer,
87{
88 pub fn new(
89 stream: InnerStream,
90 transformer: StreamTransformer,
91 buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
92 ) -> Self {
93 Self {
94 stream,
95 transformer,
96 buffer,
97 protocol_marker: PhantomData,
98 }
99 }
100}