rustic_cdc/
chunk.rs

1use crate::Separator;
2
3/// A chunk is a part of a stream of data that is separated by a separator.
4#[derive(Debug, Clone, Copy)]
5pub struct Chunk {
6    /// The index of the chunk in the stream.
7    pub index: u64,
8
9    /// The size of the chunk.
10    pub size: u64,
11
12    /// The hash of the separator that separates the chunk from the next chunk.
13    pub separator_hash: u64,
14}
15
16/// An iterator that chunks data.
17#[derive(Debug)]
18pub struct ChunkIter<Iter> {
19    /// The separators that separate the chunks.
20    separators: Iter,
21
22    /// The length of the stream.
23    stream_length: u64,
24
25    /// The index of the last separator.
26    last_separator_index: u64,
27}
28
29impl<Iter: Iterator<Item = Separator>> ChunkIter<Iter> {
30    /// Creates a new `ChunkIter`.
31    ///
32    /// # Arguments
33    ///
34    /// * `iter` - The separators that separate the chunks.
35    /// * `stream_length` - The length of the stream.
36    pub fn new(iter: Iter, stream_length: u64) -> Self {
37        Self {
38            separators: iter,
39            stream_length,
40            last_separator_index: 0,
41        }
42    }
43}
44
45impl<Iter: Iterator<Item = Separator>> Iterator for ChunkIter<Iter> {
46    type Item = Chunk;
47
48    fn next(&mut self) -> Option<Self::Item> {
49        if let Some(separator) = self.separators.next() {
50            let chunk_size = separator.index - self.last_separator_index;
51            self.last_separator_index = separator.index;
52            Some(Chunk {
53                index: self.last_separator_index,
54                size: chunk_size,
55                separator_hash: separator.hash,
56            })
57        } else {
58            let chunk_size = self.stream_length - self.last_separator_index;
59            self.last_separator_index = self.stream_length;
60            if chunk_size > 0 {
61                Some(Chunk {
62                    index: self.last_separator_index,
63                    size: chunk_size,
64                    separator_hash: 0, // any value is ok, last chunk of the stream.
65                })
66            } else {
67                None
68            }
69        }
70    }
71}