easyfix_session/io/
input_stream.rs1use std::{
2 io,
3 pin::Pin,
4 task::{Context, Poll, ready},
5};
6
7use bytes::BytesMut;
8use easyfix_messages::{
9 deserializer::{self, RawMessageError, raw_message},
10 messages::FixtMessage,
11};
12use futures_util::Stream;
13use pin_project::pin_project;
14use tokio::io::AsyncRead;
15use tokio_util::io::poll_read_buf;
16use tracing::{debug, info, warn};
17
18use crate::application::DeserializeError;
19
20#[derive(Debug)]
21pub enum InputEvent {
22 Message(Box<FixtMessage>),
23 DeserializeError(DeserializeError),
24 IoError(io::Error),
25 Timeout,
26 LogoutTimeout,
27}
28
29fn process_garbled_data(buf: &mut BytesMut) {
30 let len = buf.len();
31 for i in 1..buf.len() {
32 if let Ok(_) | Err(RawMessageError::Incomplete) = raw_message(&buf[i..]) {
33 buf.split_to(i).freeze();
34 info!("dropped {i} bytes of garbled message");
35 return;
36 }
37 }
38 buf.clear();
39 info!("dropped {len} bytes of garbled message");
40}
41
42fn parse_message(
43 bytes: &mut BytesMut,
44) -> Result<Option<Box<FixtMessage>>, deserializer::DeserializeError> {
45 if bytes.is_empty() {
46 return Ok(None);
47 }
48 debug!(
49 "Raw data input :: {}",
50 String::from_utf8_lossy(bytes).replace('\x01', "|")
51 );
52
53 let src_len = bytes.len();
54
55 match raw_message(bytes) {
56 Ok((leftover, raw_msg)) => {
57 let result = FixtMessage::from_raw_message(raw_msg).map(Some);
58 let leftover_len = leftover.len();
59 bytes.split_to(src_len - leftover_len).freeze();
60 result
61 }
62 Err(RawMessageError::Incomplete) => Ok(None),
63 Err(err) => {
64 process_garbled_data(bytes);
65 Err(err.into())
66 }
67 }
68}
69
70#[pin_project]
71pub struct InputStream<S> {
72 buffer: BytesMut,
73 #[pin]
74 source: S,
75}
76
77impl<S> Stream for InputStream<S>
78where
79 S: AsyncRead + Unpin,
80{
81 type Item = InputEvent;
82
83 fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84 let mut this = self.project();
85
86 loop {
87 match parse_message(this.buffer) {
90 Ok(Some(msg)) => {
91 return Poll::Ready(Some(InputEvent::Message(msg)));
92 }
93 Ok(None) => {}
94 Err(error) => {
97 return Poll::Ready(Some(InputEvent::DeserializeError(error.into())));
98 }
99 }
100
101 let future = poll_read_buf(Pin::new(&mut this.source), cx, this.buffer);
107 match ready!(future) {
108 Ok(0) => {
109 if this.buffer.is_empty() {
114 info!("Stream closed");
115 return Poll::Ready(None);
116 } else {
117 warn!("Connection reset by peer");
118 return Poll::Ready(None);
119 }
120 }
121 Ok(_n) => continue,
122 Err(err) => return Poll::Ready(Some(InputEvent::IoError(err))),
123 }
124 }
125 }
126}
127
128pub fn input_stream<S>(source: S) -> InputStream<S>
129where
130 S: AsyncRead + Unpin,
131{
132 InputStream {
133 buffer: BytesMut::with_capacity(4096),
135 source,
136 }
137}