selium_log/data/
iterator.rs1use crate::{
2 error::Result,
3 message::{Headers, Message, CRC_SIZE, HEADERS_SIZE},
4};
5use tokio::{
6 fs::File,
7 io::{AsyncReadExt, BufReader},
8};
9
10#[derive(Debug)]
16pub struct LogIterator {
17 reader: BufReader<File>,
18 cursor: u64,
19 end_position: u64,
20}
21
22impl LogIterator {
23 pub fn new(reader: BufReader<File>, cursor: u64, end_position: u64) -> Self {
25 Self {
26 reader,
27 cursor,
28 end_position,
29 }
30 }
31
32 pub async fn next(&mut self) -> Result<Option<Message>> {
39 if self.cursor >= self.end_position {
40 return Ok(None);
41 }
42
43 let mut headers = vec![0; HEADERS_SIZE];
44 self.reader.read_exact(&mut headers).await?;
45
46 let headers = Headers::decode(&headers);
47 let remainder_len = headers.length() as usize - HEADERS_SIZE;
48 let combined_len = HEADERS_SIZE + remainder_len;
49 let records_len = remainder_len - CRC_SIZE;
50
51 let mut remainder = vec![0; remainder_len];
52 self.reader.read_exact(&mut remainder).await?;
53
54 let records = &remainder[..records_len];
55 let mut crc = [0; CRC_SIZE];
56 crc.copy_from_slice(&remainder[records_len..]);
57 let crc = u32::from_be_bytes(crc);
58 let message = Message::new(headers, records, crc);
59
60 self.cursor += combined_len as u64;
61
62 Ok(Some(message))
63 }
64}