easyfix_session/io/
input_stream.rs

use std::{
    io,
    pin::Pin,
    task::{Context, Poll, ready},
};

use bytes::{Buf, BytesMut};
use easyfix_messages::{
    deserializer::{self, RawMessageError, raw_message},
    messages::FixtMessage,
};
use futures_util::Stream;
use pin_project::pin_project;
use tokio::io::AsyncRead;
use tokio_util::io::poll_read_buf;
use tracing::{debug, info, warn};

use crate::application::DeserializeError;
19
20#[derive(Debug)]
21pub enum InputEvent {
22    Message(Box<FixtMessage>),
23    DeserializeError(DeserializeError),
24    IoError(io::Error),
25    Timeout,
26    LogoutTimeout,
27}
28
29fn process_garbled_data(buf: &mut BytesMut) {
30    let len = buf.len();
31    for i in 1..buf.len() {
32        if let Ok(_) | Err(RawMessageError::Incomplete) = raw_message(&buf[i..]) {
33            buf.split_to(i).freeze();
34            info!("dropped {i} bytes of garbled message");
35            return;
36        }
37    }
38    buf.clear();
39    info!("dropped {len} bytes of garbled message");
40}
41
42fn parse_message(
43    bytes: &mut BytesMut,
44) -> Result<Option<Box<FixtMessage>>, deserializer::DeserializeError> {
45    if bytes.is_empty() {
46        return Ok(None);
47    }
48    debug!(
49        "Raw data input :: {}",
50        String::from_utf8_lossy(bytes).replace('\x01', "|")
51    );
52
53    let src_len = bytes.len();
54
55    match raw_message(bytes) {
56        Ok((leftover, raw_msg)) => {
57            let result = FixtMessage::from_raw_message(raw_msg).map(Some);
58            let leftover_len = leftover.len();
59            bytes.split_to(src_len - leftover_len).freeze();
60            result
61        }
62        Err(RawMessageError::Incomplete) => Ok(None),
63        Err(err) => {
64            process_garbled_data(bytes);
65            Err(err.into())
66        }
67    }
68}
69
70#[pin_project]
71pub struct InputStream<S> {
72    buffer: BytesMut,
73    #[pin]
74    source: S,
75}
76
77impl<S> Stream for InputStream<S>
78where
79    S: AsyncRead + Unpin,
80{
81    type Item = InputEvent;
82
83    fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84        let mut this = self.project();
85
86        loop {
87            // Attempt to parse a message from the buffered data.
88            // If enough data has been buffered, the message is returned.
89            match parse_message(this.buffer) {
90                Ok(Some(msg)) => {
91                    return Poll::Ready(Some(InputEvent::Message(msg)));
92                }
93                Ok(None) => {}
94                // Convert `deserializer::DeserializeError` to `application::DeserializeError`
95                // to prevent leaking ParseRejectReason to user code.
96                Err(error) => {
97                    return Poll::Ready(Some(InputEvent::DeserializeError(error.into())));
98                }
99            }
100
101            // There is not enough buffered data to read a message.
102            // Attempt to read more data from the socket.
103            //
104            // On success, the number of bytes is returned. `0` indicates "end
105            // of stream".
106            let future = poll_read_buf(Pin::new(&mut this.source), cx, this.buffer);
107            match ready!(future) {
108                Ok(0) => {
109                    // The remote closed the connection. For this to be a clean
110                    // shutdown, there should be no data in the read buffer. If
111                    // there is, this means that the peer closed the socket while
112                    // sending a frame.
113                    if this.buffer.is_empty() {
114                        info!("Stream closed");
115                        return Poll::Ready(None);
116                    } else {
117                        warn!("Connection reset by peer");
118                        return Poll::Ready(None);
119                    }
120                }
121                Ok(_n) => continue,
122                Err(err) => return Poll::Ready(Some(InputEvent::IoError(err))),
123            }
124        }
125    }
126}
127
128pub fn input_stream<S>(source: S) -> InputStream<S>
129where
130    S: AsyncRead + Unpin,
131{
132    InputStream {
133        // TODO: Max MSG size
134        buffer: BytesMut::with_capacity(4096),
135        source,
136    }
137}