ccsds/framing/
synchronizer.rs

1use super::bytes::Bytes;
2use crate::prelude::*;
3use std::collections::HashMap;
4use std::io::{ErrorKind, Read};
5
/// Default CCSDS attached sync marker (ASM): the four bytes `1A CF FC 1D`.
pub const ASM: [u8; 4] = [0x1A, 0xCF, 0xFC, 0x1D];
8
/// Shift the bit string held in `dat` left by `k` bits, without wrapping.
///
/// The result has the same length as `dat`. Bits shifted out of the first byte are
/// discarded, bits shifted out of every other byte carry into the byte before it, and
/// zeros are shifted into the last byte. An empty `dat` yields an empty result.
///
/// `k` must be in `0..8`; larger shifts are not supported (debug builds panic).
fn left_shift(dat: &[u8], k: usize) -> Vec<u8> {
    debug_assert!(k < 8, "bit shift must be in 0..8, got {k}");
    if k == 0 {
        // Nothing to carry; also avoids the invalid `>> 8` below.
        return dat.to_vec();
    }
    // Pair every byte with the byte that follows it (zero past the end of the input),
    // so each output byte is its own low bits moved up OR'd with its successor's high bits.
    let following = dat.iter().skip(1).copied().chain(std::iter::once(0));
    dat.iter()
        .zip(following)
        .map(|(&cur, next)| (cur << k) | (next >> (8 - k)))
        .collect()
}
24
25/// Create all possible bit-shifted patterns, and their associated masks to indicate
26/// significant bits, for dat.
27fn create_patterns(dat: &[u8]) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
28    let mut patterns: Vec<Vec<u8>> = Vec::new();
29    let mut masks: Vec<Vec<u8>> = Vec::new();
30
31    // dat padded with an extra byte to give us room to shift
32    let mut padded_pattern = vec![0x0; dat.len() + 1];
33    padded_pattern[1..=dat.len()].copy_from_slice(&dat[..dat.len()]);
34    // for i in 1..=dat.len() {
35    //     padded_pattern[i] = dat[i - 1];
36    // }
37    let mut padded_mask = vec![0xff; dat.len() + 1];
38    padded_mask[0] = 0;
39
40    // First pattern is just the asm (one less in length than the rest)
41    patterns.push(dat.to_owned());
42    // First mask is all 1s because all bits must match
43    masks.push(vec![0xff; dat.len()]);
44
45    // Bit-shift other bytes such that the first byte of the pattern is the first
46    // byte of dat shifted *RIGHT* by 1.
47    for i in 1..8usize {
48        patterns.push(left_shift(&padded_pattern, 8 - i));
49        masks.push(left_shift(&padded_mask, 8 - i));
50    }
51
52    (patterns, masks)
53}
54
/// A synchronized block location, as reported by [`Synchronizer::scan`].
///
/// This is a small plain-value type, so it is `Copy` and fully comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Loc {
    /// Offset (1-based) to the first byte after a found sync marker
    pub offset: usize,
    /// The bit in the byte at offset where the marker is found; 0 when the marker was
    /// found byte-aligned.
    pub bit: u8,
}
63
64/// Synchronizer scans a byte stream for data blocks indicated by a sync marker.
65///
66/// The sync marker may be bit-shifted, in which case the bytes returned will also
67/// be bit shifted.
68pub struct Synchronizer<R>
69where
70    R: Read + Send,
71{
72    bytes: Bytes<R>,
73    // Size of the block of data expected after an ASM
74    block_size: usize,
75    // All 8 possible bit patterns
76    patterns: Vec<Vec<u8>>,
77    // Bit-mask indicating the relavent bits for all 8 patterns
78    masks: Vec<Vec<u8>>,
79    // Index of the current pattern in the pattern vector
80    pattern_idx: usize,
81    /// Count of times each pattern was used.
82    pub pattern_hits: HashMap<u8, i32>,
83}
84
85impl<R> Synchronizer<R>
86where
87    R: Read + Send,
88{
89    /// Creates a new ``Synchronizer``.
90    ///
91    /// `block_size` is the length of the CADU minus the length of the ASM.
92    pub fn new(reader: R, block_size: usize) -> Self {
93        let (patterns, masks) = create_patterns(&ASM);
94        let bytes = Bytes::new(reader);
95        Synchronizer {
96            bytes,
97            block_size,
98            patterns,
99            masks,
100            pattern_idx: 0,
101            pattern_hits: HashMap::new(),
102        }
103    }
104
105    /// Use the specified attached sync marker rather than the defualt ([ASM]).
106    pub fn with_asm(mut self, asm: &[u8]) -> Self {
107        let (patterns, masks) = create_patterns(asm);
108        self.patterns = patterns;
109        self.masks = masks;
110        self
111    }
112
113    /// Scan our stream until the next sync marker is found and return a option containing
114    /// a [Some(Loc)] indicating the position of the data block and any left bit-shift currently
115    /// in effect. If there are not enough bytes to check the sync marker return Ok(None).
116    ///
117    /// # Errors
118    /// On [ErrorKind::UnexpectedEof] this will return [Ok(None)]. Any other error will result
119    /// in [Err(err)].
120    ///
121    /// # Panics
122    /// On unexpected state handling bit-shifting.
123    pub fn scan(&mut self) -> Result<Option<Loc>> {
124        let mut b: u8 = 0;
125        let mut working: Vec<u8> = Vec::new();
126
127        'next_pattern: loop {
128            for byte_idx in 0..self.patterns[self.pattern_idx].len() {
129                b = match self.bytes.next() {
130                    Err(err) => {
131                        if err.kind() == ErrorKind::UnexpectedEof {
132                            return Ok(None);
133                        }
134                        return Err(Error::Io(err));
135                    }
136                    Ok(b) => b,
137                };
138                working.push(b);
139
140                if (b & self.masks[self.pattern_idx][byte_idx])
141                    != self.patterns[self.pattern_idx][byte_idx]
142                {
143                    // No match
144                    self.pattern_idx += 1;
145                    if self.pattern_idx == 8 {
146                        // put all but the first byte in the working set back on bytes
147                        // (since we now have fully checked the first byte and know an
148                        // ASM does not begin there)
149                        self.pattern_idx = 0;
150                        working.reverse();
151                        self.bytes.push(&working[..working.len() - 1]);
152                    } else {
153                        // If we haven't checked all patterns put the working set back on bytes to
154                        // check against the other patterns.
155                        working.reverse();
156                        self.bytes.push(&working);
157                    }
158                    working.clear();
159                    continue 'next_pattern;
160                }
161            }
162
163            let mut loc = Loc {
164                offset: self.bytes.offset(),
165                bit: (8 - u8::try_from(self.pattern_idx).unwrap()) % 8,
166            };
167            // Exact sync means data block starts at the next byte
168            if loc.bit == 0 {
169                loc.offset += 1;
170            }
171
172            if self.pattern_idx > 0 {
173                self.bytes.push(&[b]);
174            }
175
176            self.pattern_hits
177                .entry(u8::try_from(self.pattern_idx).unwrap())
178                .and_modify(|count| *count += 1)
179                .or_insert(1);
180
181            return Ok(Some(loc));
182        }
183    }
184
185    /// Fetch a block from the stream.
186    ///
187    /// # Errors
188    /// On [Error]s filling buffer
189    pub fn block(&mut self) -> Result<Vec<u8>> {
190        let mut buf = vec![0u8; self.block_size];
191        if self.pattern_idx != 0 {
192            // Make room for bit-shifting
193            buf.push(0);
194        }
195        self.bytes.fill(&mut buf)?;
196        if self.pattern_idx != 0 {
197            // There's a partially used byte, so push it back for the next read
198            self.bytes.push(&[buf[buf.len() - 1]]);
199        }
200        let buf = left_shift(&buf, self.pattern_idx)[..self.block_size].to_vec();
201
202        Ok(buf)
203    }
204}
205
206impl<R> IntoIterator for Synchronizer<R>
207where
208    R: Read + Send,
209{
210    type Item = Result<Vec<u8>>;
211    type IntoIter = BlockIter<R>;
212
213    fn into_iter(self) -> Self::IntoIter {
214        BlockIter { scanner: self }
215    }
216}
217
218/// Iterates over synchronized data in block size defined by the source [Synchronizer].
219/// Created using ``Synchronizer::into_iter``.
220///
221/// ## Errors
222/// If a full block cannot be constructed the iterator simply ends, i.e., next returns
223/// `None`, however, any other error is passed on.
224pub struct BlockIter<R>
225where
226    R: Read + Send,
227{
228    scanner: Synchronizer<R>,
229}
230
231impl<R> Iterator for BlockIter<R>
232where
233    R: Read + Send,
234{
235    type Item = Result<Vec<u8>>;
236
237    fn next(&mut self) -> Option<Self::Item> {
238        match self.scanner.scan() {
239            Ok(Some(_)) => (),       // got a valid Loc
240            Ok(None) => return None, // no loc, must be done
241            // Scan resulted in a non-EOF error, let the consumer figure out what to do
242            Err(err) => return Some(Err(err)),
243        }
244        match self.scanner.block() {
245            Ok(block) => Some(Ok(block)),
246            Err(err) => Some(Err(err)),
247        }
248    }
249}
250
251/// Creates an iterator that produces byte-aligned data blocks.
252///
253/// `reader` is a ``std::io::Read`` implementation providing the byte stream. `asm` is the
254/// attached synchronization marker used to locate blocks in the data stream, and `block_size`
255/// is size of each block w/o the ASM.
256///
257/// The ASM need not be byte-aligned in the stream but it is expected that block data will
258/// follow immediately after the ASM. Blocks returned will be byte-aligned.
259///
260/// Data blocks are only produced if there are `block_size` bytes available, i.e.,
261/// any partial block at the end of the file is dropped.
262///
263/// For more control over the iteration process see [Synchronizer].
264///
265/// # Errors
266/// Any errors reading from the stream will cause the iterator to exit.
267///
268pub fn read_synchronized_blocks<'a, R>(
269    reader: R,
270    block_size: usize,
271) -> impl Iterator<Item = Result<Vec<u8>>> + 'a
272where
273    R: Read + Send + 'a,
274{
275    Synchronizer::new(reader, block_size).into_iter()
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// `left_shift` must reproduce every bit-shifted form of the zero-padded ASM.
    #[test]
    fn left_shift_over_asm_bytes() {
        let input = [0u8, 26, 207, 252, 29];
        // Row `i` is `input` left-shifted by `(8 - i) % 8` bits, i.e. the padded ASM
        // moved `i` bits to the right.
        let expected: [[u8; 5]; 8] = [
            [0, 26, 207, 252, 29],
            [13, 103, 254, 14, 128],
            [6, 179, 255, 7, 64],
            [3, 89, 255, 131, 160],
            [1, 172, 255, 193, 208],
            [0, 214, 127, 224, 232],
            [0, 107, 63, 240, 116],
            [0, 53, 159, 248, 58],
        ];
        for (i, want) in expected.iter().enumerate() {
            let shift = (8 - i) % 8;
            let zult = left_shift(&input[..], shift);
            assert_eq!(
                zult.as_slice(),
                want.as_slice(),
                "test:{i} shift:{shift} expected:{want:?} got:{zult:?} for {input:?}",
            );
        }
    }

    /// `create_patterns` yields the raw ASM first, followed by its 7 shifted forms.
    #[test]
    fn create_patterns_over_asm_bytes() {
        let asm = ASM;
        let (patterns, _) = create_patterns(ASM.as_ref());
        for (i, x) in asm.iter().enumerate() {
            assert_eq!(patterns[0][i], *x, "missmatch at index {i}");
        }

        let expected = [
            [13, 103, 254, 14, 128],
            [6, 179, 255, 7, 64],
            [3, 89, 255, 131, 160],
            [1, 172, 255, 193, 208],
            [0, 214, 127, 224, 232],
            [0, 107, 63, 240, 116],
            [0, 53, 159, 248, 58],
        ];
        // Guard against the loop below silently checking fewer patterns than expected.
        assert_eq!(patterns.len(), expected.len() + 1);
        for i in 1..patterns.len() {
            assert_eq!(patterns[i], expected[i - 1]);
        }
    }

    mod scanner_tests {
        use super::*;

        #[test]
        fn ccsds_asm_with_no_bitshift_succeeds() {
            let r = &ASM[..];
            let mut scanner = Synchronizer::new(r, 0);
            let loc = scanner.scan().expect("Expected scan to succeed");

            let expected = Loc { offset: 5, bit: 0 };
            assert_eq!(loc.unwrap(), expected);
        }

        #[test]
        fn ccsds_asm_shifted() {
            let patterns: Vec<[u8; 5]> = vec![
                [13, 103, 254, 14, 128],
                [6, 179, 255, 7, 64],
                [3, 89, 255, 131, 160],
                [1, 172, 255, 193, 208],
                [0, 214, 127, 224, 232],
                [0, 107, 63, 240, 116],
                [0, 53, 159, 248, 58],
            ];
            for (i, pat) in patterns.iter().enumerate() {
                let mut scanner = Synchronizer::new(&pat[..], 0);
                let msg = format!("expected sync for {pat:?}");
                let loc = scanner.scan().unwrap_or_else(|_| panic!("{msg}"));

                let expected = Loc {
                    offset: 5,
                    bit: 7 - u8::try_from(i).unwrap(),
                };
                assert_eq!(loc.unwrap(), expected, "pattern {pat:?}");
            }
        }

        #[test]
        fn ccsds_asm_shifted_right_one_bit() {
            let r: &[u8] = &[13, 103, 254, 14, 128];
            let mut scanner = Synchronizer::new(r, 0);
            let loc = scanner.scan().unwrap();

            let expected = Loc { offset: 5, bit: 7 };
            assert_eq!(loc.unwrap(), expected);
        }

        #[test]
        fn block_fcn_returns_correct_bytes_with_no_shift() {
            let asm = vec![0x55];
            let r: &[u8] = &[0x55, 0x01, 0x02, 0x00, 0x00, 0x55, 0x03, 0x04, 0x00, 0x00];
            let mut scanner = Synchronizer::new(r, 2).with_asm(&asm);

            // First block
            let loc = scanner.scan().expect("Expected scan 1 to succeed");
            let expected = Loc { offset: 2, bit: 0 };
            assert_eq!(loc.unwrap(), expected);
            let block = scanner.block().expect("Expected block 1 to succeed");
            assert_eq!(block, [0x01, 0x02]);

            // Second block
            let loc = scanner.scan().expect("Expected scan 2 to succeed");
            let expected = Loc { offset: 7, bit: 0 };
            assert_eq!(loc.unwrap(), expected);
            let block = scanner.block().expect("Expected block 2 to succeed");
            assert_eq!(block, [0x03, 0x04]);
        }

        #[test]
        fn block_fcn_returns_correct_bytes_when_shifted_1() {
            let asm = vec![0b0101_0101];
            let r: &[u8] = &[
                0b0010_1010,
                0b1000_0000,
                0b1000_0001,
                0b0000_0000,
                0b0000_0000,
                0b0010_1010,
                0b1000_0001,
                0b1000_0010,
                0b0000_0000,
                0b0000_0000,
                0b0000_0000,
            ];
            let mut scanner = Synchronizer::new(r, 2).with_asm(&asm);

            // First block
            let loc = scanner.scan().expect("Expected scan 1 to succeed");
            let expected = Loc { offset: 2, bit: 7 };
            assert_eq!(loc.unwrap(), expected);
            let block = scanner.block().expect("Expected block 1 to succeed");
            assert_eq!(block, [0x01, 0x02]);

            // Second block
            let loc = scanner.scan().expect("Expected scan 2 to succeed");
            let expected = Loc { offset: 7, bit: 7 };
            assert_eq!(loc.unwrap(), expected);
            let block = scanner.block().expect("Expected block 2 to succeed");
            assert_eq!(block, [0x03, 0x04]);
        }
    }
}