y_octo/protocol/
scanner.rs

1use super::*;
2
/// Scans a byte buffer for consecutive sync messages.
///
/// The scanner only borrows the buffer. Through its [`Iterator`] implementation it decodes one
/// [`SyncMessage`] at a time from the front of the bytes that have not been consumed yet.
#[derive(Debug, Clone)]
pub struct SyncMessageScanner<'a> {
    /// Bytes that have not been decoded yet.
    buffer: &'a [u8],
}
6
7impl SyncMessageScanner<'_> {
8    pub fn new(buffer: &[u8]) -> SyncMessageScanner {
9        SyncMessageScanner { buffer }
10    }
11}
12
13impl<'a> Iterator for SyncMessageScanner<'a> {
14    type Item = Result<SyncMessage, nom::Err<nom::error::Error<&'a [u8]>>>;
15
16    fn next(&mut self) -> Option<Self::Item> {
17        if self.buffer.is_empty() {
18            return None;
19        }
20
21        match read_sync_message(self.buffer) {
22            Ok((tail, message)) => {
23                self.buffer = tail;
24                Some(Ok(message))
25            }
26            Err(nom::Err::Incomplete(_))
27            | Err(nom::Err::Error(nom::error::Error {
28                code: nom::error::ErrorKind::Eof,
29                ..
30            }))
31            | Err(nom::Err::Failure(nom::error::Error {
32                code: nom::error::ErrorKind::Eof,
33                ..
34            })) => {
35                debug!("incomplete sync message");
36                None
37            }
38
39            Err(e) => Some(Err(e)),
40        }
41    }
42}
43
#[cfg(test)]
mod tests {
    use proptest::{collection::vec, prelude::*};
    use y_sync::sync::MessageReader;
    use yrs::updates::decoder::DecoderV1;

    use super::{utils::to_sync_message, *};

    proptest! {
        // Writes arbitrary messages into one buffer and checks that both the scanner and
        // the `y_sync` reference reader decode the original sequence from it.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_sync_message_scanner(messages in vec(any::<SyncMessage>(), 0..10)) {
            // Encode every generated message back-to-back.
            let mut encoded = Vec::new();
            messages.iter().for_each(|message| {
                write_sync_message(&mut encoded, message).unwrap();
            });

            // The scanner must reproduce the input sequence without any error.
            let scanned = SyncMessageScanner::new(&encoded)
                .collect::<Result<Vec<SyncMessage>, _>>()
                .unwrap();
            assert_eq!(scanned, messages);

            // The reference implementation must agree on the same bytes.
            let mut decoder = DecoderV1::from(encoded.as_slice());
            let reference: Vec<_> = MessageReader::new(&mut decoder)
                .flatten()
                .filter_map(to_sync_message)
                .collect();
            assert_eq!(reference, messages);
        }
    }
}