hackdose_sml_parser/message_stream/
mod.rs1use tokio::{
2 io::{AsyncRead, AsyncReadExt},
3 sync::mpsc::{self, Sender},
4};
5use tokio_stream::{wrappers::ReceiverStream, Stream};
6
7use crate::{
8 application::{domain::SmlMessages, parser::parse_body},
9 transport::SMLMessageBuilder,
10};
11
12pub fn sml_message_stream(
29 mut stream: impl AsyncRead + Unpin + Send + 'static,
30) -> impl Stream<Item = SmlMessages> {
31 let (tx, rx) = mpsc::channel::<SmlMessages>(256);
32
33 let mut buf = [0; 512];
34 let mut builder = SMLMessageBuilder::Empty;
35
36 tokio::spawn(async move {
37 while let Ok(n) = stream.read(&mut buf).await {
38 if n == 0 {
39 break;
40 }
41 emit_message(&mut builder, &buf[..n], tx.clone()).await;
42 }
43 });
44
45 ReceiverStream::new(rx)
46}
47
48async fn emit_message<'a>(
49 builder: &'a mut SMLMessageBuilder,
50 buf: &'a [u8],
51 tx: Sender<SmlMessages>,
52) {
53 let mut to_process = buf.to_vec();
54 while to_process.len() > 0 {
55 builder.record(&to_process);
56 to_process = vec![];
57
58 match builder {
59 SMLMessageBuilder::Complete { ref data, ref rest } => {
60 let result = parse_body(data);
61 if let Ok(messages) = result {
62 let _ = tx.send(messages).await;
63 }
64 if rest.len() == 0 {
65 *builder = SMLMessageBuilder::Empty;
66 } else {
67 to_process = rest.to_vec();
68 *builder = SMLMessageBuilder::Empty;
69 }
70 }
71 SMLMessageBuilder::Empty => (),
72 SMLMessageBuilder::IncompleteStartSignature(_) => (),
73 SMLMessageBuilder::Recording(_) => (),
74 }
75 }
76}