hackdose_sml_parser/message_stream/
mod.rs

1use tokio::{
2    io::{AsyncRead, AsyncReadExt},
3    sync::mpsc::{self, Sender},
4};
5use tokio_stream::{wrappers::ReceiverStream, Stream};
6
7use crate::{
8    application::{domain::SmlMessages, parser::parse_body},
9    transport::SMLMessageBuilder,
10};
11
12/// Read SML message stream from a reader
13///
14/// ```
15/// use std::io::Cursor;
16/// use hackdose_sml_parser::message_stream::sml_message_stream;
17/// use tokio_stream::StreamExt;
18///
19/// #[tokio::main]
20/// async fn main() {
21///     let cursor = Cursor::new(vec![0x01, 0x02, 0x03]);
22///     let mut message_stream = sml_message_stream(cursor);
23///     while let Some(message) = message_stream.next().await {
24///         println!("Message: {:?}", message);
25///     }
26/// }
27/// ```
28pub fn sml_message_stream(
29    mut stream: impl AsyncRead + Unpin + Send + 'static,
30) -> impl Stream<Item = SmlMessages> {
31    let (tx, rx) = mpsc::channel::<SmlMessages>(256);
32
33    let mut buf = [0; 512];
34    let mut builder = SMLMessageBuilder::Empty;
35
36    tokio::spawn(async move {
37        while let Ok(n) = stream.read(&mut buf).await {
38            if n == 0 {
39                break;
40            }
41            emit_message(&mut builder, &buf[..n], tx.clone()).await;
42        }
43    });
44
45    ReceiverStream::new(rx)
46}
47
48async fn emit_message<'a>(
49    builder: &'a mut SMLMessageBuilder,
50    buf: &'a [u8],
51    tx: Sender<SmlMessages>,
52) {
53    let mut to_process = buf.to_vec();
54    while to_process.len() > 0 {
55        builder.record(&to_process);
56        to_process = vec![];
57
58        match builder {
59            SMLMessageBuilder::Complete { ref data, ref rest } => {
60                let result = parse_body(data);
61                if let Ok(messages) = result {
62                    let _ = tx.send(messages).await;
63                }
64                if rest.len() == 0 {
65                    *builder = SMLMessageBuilder::Empty;
66                } else {
67                    to_process = rest.to_vec();
68                    *builder = SMLMessageBuilder::Empty;
69                }
70            }
71            SMLMessageBuilder::Empty => (),
72            SMLMessageBuilder::IncompleteStartSignature(_) => (),
73            SMLMessageBuilder::Recording(_) => (),
74        }
75    }
76}