// y_octo/protocol/scanner.rs
use super::*;
2
/// Iterator over the sync messages encoded back-to-back in a byte buffer.
///
/// The scanner only borrows the input; it keeps a cursor (`buffer`) that
/// always points at the bytes that have not been consumed yet.
#[derive(Debug, Clone)]
pub struct SyncMessageScanner<'a> {
    /// Remaining, not-yet-parsed portion of the input.
    buffer: &'a [u8],
}
6
7impl SyncMessageScanner<'_> {
8 pub fn new(buffer: &[u8]) -> SyncMessageScanner {
9 SyncMessageScanner { buffer }
10 }
11}
12
13impl<'a> Iterator for SyncMessageScanner<'a> {
14 type Item = Result<SyncMessage, nom::Err<nom::error::Error<&'a [u8]>>>;
15
16 fn next(&mut self) -> Option<Self::Item> {
17 if self.buffer.is_empty() {
18 return None;
19 }
20
21 match read_sync_message(self.buffer) {
22 Ok((tail, message)) => {
23 self.buffer = tail;
24 Some(Ok(message))
25 }
26 Err(nom::Err::Incomplete(_))
27 | Err(nom::Err::Error(nom::error::Error {
28 code: nom::error::ErrorKind::Eof,
29 ..
30 }))
31 | Err(nom::Err::Failure(nom::error::Error {
32 code: nom::error::ErrorKind::Eof,
33 ..
34 })) => {
35 debug!("incomplete sync message");
36 None
37 }
38
39 Err(e) => Some(Err(e)),
40 }
41 }
42}
43
#[cfg(test)]
mod tests {
    use proptest::{collection::vec, prelude::*};
    use y_sync::sync::MessageReader;
    use yrs::updates::decoder::DecoderV1;

    use super::{utils::to_sync_message, *};

    proptest! {
        // Round-trip property: writing up to nine arbitrary messages into one
        // buffer and scanning that buffer must give back the same messages,
        // and the `y_sync` reader must decode the same sequence from it.
        #[test]
        #[cfg_attr(miri, ignore)]
        fn test_sync_message_scanner(messages in vec(any::<SyncMessage>(), 0..10)) {
            // Encode all generated messages back-to-back.
            let mut encoded = Vec::new();
            messages.iter().for_each(|message| {
                write_sync_message(&mut encoded, message).unwrap();
            });

            // Our scanner must reproduce the input exactly.
            let scanned = SyncMessageScanner::new(&encoded)
                .collect::<Result<Vec<SyncMessage>, _>>()
                .unwrap();
            assert_eq!(scanned, messages);

            // Cross-check against the `y_sync` reader on the same bytes.
            let mut decoder = DecoderV1::from(encoded.as_slice());
            let reference: Vec<_> = MessageReader::new(&mut decoder)
                .flatten()
                .filter_map(to_sync_message)
                .collect();
            assert_eq!(reference, messages);
        }
    }
}