fluvio_smartstream_executor/smartstream/
file_batch.rs1use dataplane::batch::{Batch, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE};
2use dataplane::Offset;
3use std::io::{Error as IoError, ErrorKind, Cursor};
4use tracing::{warn, debug};
5use std::os::unix::io::RawFd;
6use nix::sys::uio::pread;
7use fluvio_future::file_slice::AsyncFileSlice;
8
9pub struct FileBatch {
11 pub(crate) batch: Batch,
12 pub(crate) records: Vec<u8>,
13}
14
15impl FileBatch {
16 pub(crate) fn base_offset(&self) -> Offset {
17 self.batch.base_offset
18 }
19
20 pub(crate) fn offset_delta(&self) -> i32 {
21 self.batch.header.last_offset_delta
22 }
23}
24
/// Iterator that reads consecutive batches from a byte range of a file,
/// addressed through a raw file descriptor.
///
/// The descriptor is only borrowed as a number: this type neither opens nor
/// closes it.
pub struct FileBatchIterator {
    /// Raw descriptor of the file being read.
    fd: RawFd,
    /// File position of the next batch to read.
    offset: i64,
    /// File position at which iteration stops (exclusive).
    end: i64,
}
31
32impl FileBatchIterator {
33 #[allow(unused)]
34 pub fn new(fd: RawFd, offset: i64, len: i64) -> Self {
35 Self {
36 fd,
37 offset,
38 end: offset + len,
39 }
40 }
41
42 pub fn from_raw_slice(slice: AsyncFileSlice) -> Self {
43 use std::os::unix::io::AsRawFd;
44 let offset = slice.position() as i64;
45 Self {
46 fd: slice.as_raw_fd(),
47 offset,
48 end: offset + slice.len() as i64,
49 }
50 }
51}
52
53impl Iterator for FileBatchIterator {
54 type Item = Result<FileBatch, IoError>;
55
56 fn next(&mut self) -> Option<Self::Item> {
57 if self.offset >= self.end {
58 return None;
59 }
60
61 let mut header = vec![0u8; BATCH_FILE_HEADER_SIZE];
62 let bytes_read = match pread(self.fd, &mut header, self.offset)
63 .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", err)))
64 {
65 Ok(bytes) => bytes,
66 Err(err) => return Some(Err(err)),
67 };
68
69 if bytes_read < header.len() {
70 warn!(bytes_read, header_len = header.len());
71 return Some(Err(IoError::new(
72 ErrorKind::UnexpectedEof,
73 format!(
74 "not eough for batch header {} out of {}",
75 bytes_read,
76 header.len()
77 ),
78 )));
79 }
80
81 let mut batch = Batch::default();
82 if let Err(err) = batch.decode_from_file_buf(&mut Cursor::new(header), 0) {
83 return Some(Err(IoError::new(
84 ErrorKind::Other,
85 format!("decodinge batch header error {}", err),
86 )));
87 }
88
89 let remainder = batch.batch_len as usize - BATCH_HEADER_SIZE as usize;
90
91 debug!(
92 file_offset = self.offset,
93 base_offset = batch.base_offset,
94 "fbatch header"
95 );
96
97 let mut records = vec![0u8; remainder];
98
99 self.offset += BATCH_FILE_HEADER_SIZE as i64;
100
101 let bytes_read = match pread(self.fd, &mut records, self.offset)
102 .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", err)))
103 {
104 Ok(bytes) => bytes,
105 Err(err) => return Some(Err(err)),
106 };
107
108 if bytes_read < records.len() {
109 warn!(bytes_read, record_len = records.len());
110 return Some(Err(IoError::new(
111 ErrorKind::UnexpectedEof,
112 format!(
113 "not enough for batch records {} out of {}",
114 bytes_read,
115 records.len()
116 ),
117 )));
118 }
119
120 self.offset += bytes_read as i64;
121
122 debug!(file_offset = self.offset, "fbatch end");
123
124 Some(Ok(FileBatch { batch, records }))
125 }
126}
127
#[cfg(test)]
mod test {
    use std::{fs::File, io::Write};
    use std::env::temp_dir;
    use std::os::unix::io::AsRawFd;

    use fluvio_storage::config::DEFAULT_MAX_BATCH_SIZE;

    use super::*;

    /// Sanity check for `pread`: reading from offset 1 with an oversized
    /// buffer returns everything but the first byte of the file.
    #[test]
    fn test_file() {
        const CONTENT: &[u8] = b"Hello, world!";

        // Per-process file name so concurrent test runs sharing the temp
        // directory do not overwrite each other's file.
        let path = temp_dir().join(format!("pread-{}.txt", std::process::id()));
        let mut file = File::create(&path).expect("create");
        file.write_all(CONTENT).expect("write");
        file.sync_all().expect("flush");
        drop(file);

        let read_only = File::open(&path).expect("open");
        let fd = read_only.as_raw_fd();
        let mut buf = vec![0u8; DEFAULT_MAX_BATCH_SIZE as usize];
        let bytes_read = pread(fd, &mut buf, 1).expect("pread");
        println!("bytes read: {}", bytes_read);

        // Clean up before asserting so a failing assertion does not leave the
        // file behind; removal is best-effort.
        drop(read_only);
        let _ = std::fs::remove_file(&path);

        assert_eq!(bytes_read, CONTENT.len() - 1);
        assert_eq!(&buf[..bytes_read], &CONTENT[1..]);
    }
}