fluvio_smartstream_executor/smartstream/file_batch.rs

1use dataplane::batch::{Batch, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE};
2use dataplane::Offset;
3use std::io::{Error as IoError, ErrorKind, Cursor};
4use tracing::{warn, debug};
5use std::os::unix::io::RawFd;
6use nix::sys::uio::pread;
7use fluvio_future::file_slice::AsyncFileSlice;
8
9// only encode information necessary to decode batches efficiently
10pub struct FileBatch {
11    pub(crate) batch: Batch,
12    pub(crate) records: Vec<u8>,
13}
14
15impl FileBatch {
16    pub(crate) fn base_offset(&self) -> Offset {
17        self.batch.base_offset
18    }
19
20    pub(crate) fn offset_delta(&self) -> i32 {
21        self.batch.header.last_offset_delta
22    }
23}
24
25/// Iterator that returns batch from file
26pub struct FileBatchIterator {
27    fd: RawFd,
28    offset: i64,
29    end: i64,
30}
31
32impl FileBatchIterator {
33    #[allow(unused)]
34    pub fn new(fd: RawFd, offset: i64, len: i64) -> Self {
35        Self {
36            fd,
37            offset,
38            end: offset + len,
39        }
40    }
41
42    pub fn from_raw_slice(slice: AsyncFileSlice) -> Self {
43        use std::os::unix::io::AsRawFd;
44        let offset = slice.position() as i64;
45        Self {
46            fd: slice.as_raw_fd(),
47            offset,
48            end: offset + slice.len() as i64,
49        }
50    }
51}
52
53impl Iterator for FileBatchIterator {
54    type Item = Result<FileBatch, IoError>;
55
56    fn next(&mut self) -> Option<Self::Item> {
57        if self.offset >= self.end {
58            return None;
59        }
60
61        let mut header = vec![0u8; BATCH_FILE_HEADER_SIZE];
62        let bytes_read = match pread(self.fd, &mut header, self.offset)
63            .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", err)))
64        {
65            Ok(bytes) => bytes,
66            Err(err) => return Some(Err(err)),
67        };
68
69        if bytes_read < header.len() {
70            warn!(bytes_read, header_len = header.len());
71            return Some(Err(IoError::new(
72                ErrorKind::UnexpectedEof,
73                format!(
74                    "not eough for batch header {} out of {}",
75                    bytes_read,
76                    header.len()
77                ),
78            )));
79        }
80
81        let mut batch = Batch::default();
82        if let Err(err) = batch.decode_from_file_buf(&mut Cursor::new(header), 0) {
83            return Some(Err(IoError::new(
84                ErrorKind::Other,
85                format!("decodinge batch header error {}", err),
86            )));
87        }
88
89        let remainder = batch.batch_len as usize - BATCH_HEADER_SIZE as usize;
90
91        debug!(
92            file_offset = self.offset,
93            base_offset = batch.base_offset,
94            "fbatch header"
95        );
96
97        let mut records = vec![0u8; remainder];
98
99        self.offset += BATCH_FILE_HEADER_SIZE as i64;
100
101        let bytes_read = match pread(self.fd, &mut records, self.offset)
102            .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", err)))
103        {
104            Ok(bytes) => bytes,
105            Err(err) => return Some(Err(err)),
106        };
107
108        if bytes_read < records.len() {
109            warn!(bytes_read, record_len = records.len());
110            return Some(Err(IoError::new(
111                ErrorKind::UnexpectedEof,
112                format!(
113                    "not enough for batch records {} out of {}",
114                    bytes_read,
115                    records.len()
116                ),
117            )));
118        }
119
120        self.offset += bytes_read as i64;
121
122        debug!(file_offset = self.offset, "fbatch end");
123
124        Some(Ok(FileBatch { batch, records }))
125    }
126}
127
#[cfg(test)]
mod test {
    use std::{fs::File, io::Write};
    use std::env::temp_dir;
    use std::os::unix::io::AsRawFd;

    use fluvio_storage::config::DEFAULT_MAX_BATCH_SIZE;

    use super::*;

    /// `pread` into a buffer larger than the file returns exactly the bytes that
    /// exist past the requested offset.
    #[test]
    fn test_file() {
        // Per-process file name so concurrent test runs don't clobber each other.
        let path = temp_dir().join(format!("pread-{}.txt", std::process::id()));
        let mut file = File::create(&path).expect("create");
        file.write_all(b"Hello, world!").expect("write");
        file.sync_all().expect("flush");
        drop(file);

        let read_only = File::open(&path).expect("open");
        let fd = read_only.as_raw_fd();
        // NOTE(review): assumes DEFAULT_MAX_BATCH_SIZE exceeds the 13-byte test file.
        let mut buf = vec![0; DEFAULT_MAX_BATCH_SIZE as usize];
        let bytes_read = pread(fd, &mut buf, 1).expect("pread");
        println!("bytes read: {}", bytes_read);

        // Clean up before asserting so a failure does not leak the temp file.
        drop(read_only);
        std::fs::remove_file(&path).expect("remove");

        // Reading from offset 1 skips the leading 'H' of the 13-byte file.
        assert_eq!(bytes_read, 12);
        assert_eq!(&buf[..bytes_read], b"ello, world!");
    }
}