barter_integration_copy/
lib.rs1#![warn(
16 missing_debug_implementations,
17 missing_copy_implementations,
18 rust_2018_idioms
19)]
20
21use crate::{error::SocketError, protocol::StreamParser};
22use futures::Stream;
23use pin_project::pin_project;
24use serde::{Deserialize, Serialize};
25use std::{
26 collections::VecDeque,
27 fmt::{Debug, Display, Formatter},
28 marker::PhantomData,
29 pin::Pin,
30 task::{Context, Poll},
31};
32
33pub mod error;
35
36pub mod protocol;
39
40pub mod metric;
42
43pub mod de;
45
46pub mod subscription;
50
51pub mod channel;
56
57pub trait Validator {
60 fn validate(self) -> Result<Self, SocketError>
62 where
63 Self: Sized;
64}
65
66pub trait Transformer {
69 type Error;
70 type Input: for<'de> Deserialize<'de>;
71 type Output;
72 type OutputIter: IntoIterator<Item = Result<Self::Output, Self::Error>>;
73 fn transform(&mut self, input: Self::Input) -> Self::OutputIter;
74}
75
76#[derive(Debug)]
79#[pin_project]
80pub struct ExchangeStream<Protocol, InnerStream, StreamTransformer>
81where
82 Protocol: StreamParser,
83 InnerStream: Stream,
84 StreamTransformer: Transformer,
85{
86 #[pin]
87 pub stream: InnerStream,
88 pub transformer: StreamTransformer,
89 pub buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
90 pub protocol_marker: PhantomData<Protocol>,
91}
92
93impl<Protocol, InnerStream, StreamTransformer> Stream
94 for ExchangeStream<Protocol, InnerStream, StreamTransformer>
95where
96 Protocol: StreamParser,
97 InnerStream: Stream<Item = Result<Protocol::Message, Protocol::Error>> + Unpin,
98 StreamTransformer: Transformer,
99 StreamTransformer::Error: From<SocketError>,
100{
101 type Item = Result<StreamTransformer::Output, StreamTransformer::Error>;
102
103 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104 loop {
105 if let Some(output) = self.buffer.pop_front() {
107 return Poll::Ready(Some(output));
108 }
109
110 let input = match self.as_mut().project().stream.poll_next(cx) {
112 Poll::Ready(Some(input)) => input,
113 Poll::Ready(None) => return Poll::Ready(None),
114 Poll::Pending => return Poll::Pending,
115 };
116
117 let exchange_message = match Protocol::parse::<StreamTransformer::Input>(input) {
119 Some(Ok(exchange_message)) => exchange_message,
121
122 Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
124
125 None => continue,
127 };
128
129 self.transformer
132 .transform(exchange_message)
133 .into_iter()
134 .for_each(
135 |output_result: Result<StreamTransformer::Output, StreamTransformer::Error>| {
136 self.buffer.push_back(output_result)
137 },
138 );
139 }
140 }
141}
142
143impl<Protocol, InnerStream, StreamTransformer>
144 ExchangeStream<Protocol, InnerStream, StreamTransformer>
145where
146 Protocol: StreamParser,
147 InnerStream: Stream,
148 StreamTransformer: Transformer,
149{
150 pub fn new(
151 stream: InnerStream,
152 transformer: StreamTransformer,
153 buffer: VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>,
154 ) -> Self {
155 Self {
156 stream,
157 transformer,
158 buffer,
159 protocol_marker: PhantomData,
160 }
161 }
162}
163
164#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
166pub enum Side {
167 #[serde(alias = "buy", alias = "BUY", alias = "b")]
168 Buy,
169 #[serde(alias = "sell", alias = "SELL", alias = "s")]
170 Sell,
171}
172
173impl Display for Side {
174 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
175 write!(
176 f,
177 "{}",
178 match self {
179 Side::Buy => "buy",
180 Side::Sell => "sell",
181 }
182 )
183 }
184}