selium_log/data/
iterator.rs

1use crate::{
2    error::Result,
3    message::{Headers, Message, CRC_SIZE, HEADERS_SIZE},
4};
5use tokio::{
6    fs::File,
7    io::{AsyncReadExt, BufReader},
8};
9
10/// An iterator over a [Data](crate::data::Data) file.
11///
12/// The LogIterator acts as an active reader over the log, pulling messages from the log
13/// and decoding them, while maintaining a cursor to ensure that superflous reads are not performed
14/// when a read limit is provided via `end_position`.
15#[derive(Debug)]
16pub struct LogIterator {
17    reader: BufReader<File>,
18    cursor: u64,
19    end_position: u64,
20}
21
22impl LogIterator {
23    /// Constructs a new LogIterator instance.
24    pub fn new(reader: BufReader<File>, cursor: u64, end_position: u64) -> Self {
25        Self {
26            reader,
27            cursor,
28            end_position,
29        }
30    }
31
32    /// Attempts to decode and retrieve the next message from the `reader`.
33    /// Returns [Option::None] if there are no more messages to decode.
34    ///
35    /// # Errors
36    /// Returns std::io::ErrorKind::UnexpectedEof if the an unexpected end-of-file
37    /// is encountered due to a partially committed or corrupted message.
38    pub async fn next(&mut self) -> Result<Option<Message>> {
39        if self.cursor >= self.end_position {
40            return Ok(None);
41        }
42
43        let mut headers = vec![0; HEADERS_SIZE];
44        self.reader.read_exact(&mut headers).await?;
45
46        let headers = Headers::decode(&headers);
47        let remainder_len = headers.length() as usize - HEADERS_SIZE;
48        let combined_len = HEADERS_SIZE + remainder_len;
49        let records_len = remainder_len - CRC_SIZE;
50
51        let mut remainder = vec![0; remainder_len];
52        self.reader.read_exact(&mut remainder).await?;
53
54        let records = &remainder[..records_len];
55        let mut crc = [0; CRC_SIZE];
56        crc.copy_from_slice(&remainder[records_len..]);
57        let crc = u32::from_be_bytes(crc);
58        let message = Message::new(headers, records, crc);
59
60        self.cursor += combined_len as u64;
61
62        Ok(Some(message))
63    }
64}