1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use tokio::{
    io::{AsyncRead, AsyncReadExt},
    sync::mpsc::{self, Sender},
};
use tokio_stream::{wrappers::ReceiverStream, Stream};

use crate::{
    application::{domain::SmlMessages, parser::parse_body},
    transport::SMLMessageBuilder,
};

/// Read SML message stream from a reader
///
/// ```
/// use std::io::Cursor;
/// use hackdose_sml_parser::message_stream::sml_message_stream;
/// use tokio_stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
///     let cursor = Cursor::new(vec![0x01, 0x02, 0x03]);
///     let mut message_stream = sml_message_stream(cursor);
///     while let Some(message) = message_stream.next().await {
///         println!("Message: {:?}", message);
///     }
/// }
/// ```
pub fn sml_message_stream(
    mut stream: impl AsyncRead + Unpin + Send + 'static,
) -> impl Stream<Item = SmlMessages> {
    let (tx, rx) = mpsc::channel::<SmlMessages>(256);

    let mut buf = [0; 512];
    let mut builder = SMLMessageBuilder::Empty;

    tokio::spawn(async move {
        while let Ok(n) = stream.read(&mut buf).await {
            if n == 0 {
                break;
            }
            emit_message(&mut builder, &buf[..n], tx.clone()).await;
        }
    });

    ReceiverStream::new(rx)
}

async fn emit_message<'a>(
    builder: &'a mut SMLMessageBuilder,
    buf: &'a [u8],
    tx: Sender<SmlMessages>,
) {
    let mut to_process = buf.to_vec();
    while to_process.len() > 0 {
        builder.record(&to_process);
        to_process = vec![];

        match builder {
            SMLMessageBuilder::Complete { ref data, ref rest } => {
                let result = parse_body(data);
                if let Ok(messages) = result {
                    let _ = tx.send(messages).await;
                }
                if rest.len() == 0 {
                    *builder = SMLMessageBuilder::Empty;
                } else {
                    to_process = rest.to_vec();
                    *builder = SMLMessageBuilder::Empty;
                }
            }
            SMLMessageBuilder::Empty => (),
            SMLMessageBuilder::IncompleteStartSignature(_) => (),
            SMLMessageBuilder::Recording(_) => (),
        }
    }
}