1use std::collections::HashMap;
2use std::os::fd::{AsRawFd, OwnedFd};
3use std::sync::Arc;
4
5use bytes::{Bytes, BytesMut};
6
7use crate::error::{Result, WalError};
8use crate::{NANO_REC_SIGNATURE, RECORD_FRAMING_SIZE};
9
/// Upper bound on the number of iovecs handed to a single `preadv` call.
/// POSIX caps the vector length at `IOV_MAX` (commonly 1024), so 512 keeps
/// each coalesced batch safely below the kernel limit.
const MAX_IOVECS_PER_PREADV: usize = 512;
12
/// Describes one record read: which file to read from, where, and how many bytes.
pub struct ReadDescriptor {
    /// Shared handle to the segment file backing this record.
    pub read_fd: Arc<OwnedFd>,
    /// Absolute byte offset of the record within the file.
    pub file_offset: u64,
    /// Total on-disk size of the record, framing included (must be at
    /// least `RECORD_FRAMING_SIZE`).
    pub byte_size: usize,
}
19
/// A record decoded from its on-disk framing: optional header bytes plus the
/// content payload.
pub struct Record {
    /// Header bytes, or `None` when the record was written with a zero-length header.
    pub header: Option<Bytes>,
    /// The record's content payload.
    pub content: Bytes,
}
25
26pub(crate) fn read_single(fd: &Arc<OwnedFd>, file_offset: u64, byte_size: usize) -> Result<Record> {
28 if byte_size < RECORD_FRAMING_SIZE {
29 return Err(WalError::CorruptedData(format!(
30 "record byte_size {} is less than framing size {}",
31 byte_size, RECORD_FRAMING_SIZE
32 )));
33 }
34
35 let mut buf = vec![0u8; byte_size];
36 let iov = libc::iovec {
37 iov_base: buf.as_mut_ptr() as *mut libc::c_void,
38 iov_len: byte_size,
39 };
40
41 let n = unsafe {
42 libc::preadv(
43 fd.as_raw_fd(),
44 &iov as *const libc::iovec,
45 1,
46 file_offset as i64,
47 )
48 };
49
50 if n < 0 {
51 return Err(WalError::Io(std::io::Error::last_os_error()));
52 }
53 if (n as usize) != byte_size {
54 return Err(WalError::CorruptedData(format!(
55 "short preadv: expected {} bytes, got {}",
56 byte_size, n
57 )));
58 }
59
60 parse_record(&buf)
61}
62
63fn parse_record(buf: &[u8]) -> Result<Record> {
67 if buf.len() < RECORD_FRAMING_SIZE {
68 return Err(WalError::CorruptedData(format!(
69 "buffer too small: {} < {}",
70 buf.len(),
71 RECORD_FRAMING_SIZE
72 )));
73 }
74
75 if buf[0..6] != NANO_REC_SIGNATURE {
77 return Err(WalError::CorruptedData(format!(
78 "invalid record signature: expected NANORC, got {:?}",
79 &buf[0..6]
80 )));
81 }
82
83 let header_len = u16::from_le_bytes(buf[6..8].try_into().unwrap()) as usize;
85
86 let content_len_offset = 8 + header_len;
88 if buf.len() < content_len_offset + 8 {
89 return Err(WalError::CorruptedData(format!(
90 "buffer too small for content_len: need {}, have {}",
91 content_len_offset + 8,
92 buf.len()
93 )));
94 }
95
96 let content_len = u64::from_le_bytes(
97 buf[content_len_offset..content_len_offset + 8]
98 .try_into()
99 .unwrap(),
100 ) as usize;
101
102 let content_offset = content_len_offset + 8;
103 let expected_total = content_offset + content_len;
104 if buf.len() < expected_total {
105 return Err(WalError::CorruptedData(format!(
106 "buffer too small for content: need {}, have {}",
107 expected_total,
108 buf.len()
109 )));
110 }
111
112 let header = if header_len > 0 {
113 Some(Bytes::copy_from_slice(&buf[8..8 + header_len]))
114 } else {
115 None
116 };
117
118 let content = Bytes::copy_from_slice(&buf[content_offset..content_offset + content_len]);
119
120 Ok(Record { header, content })
121}
122
/// Reads and parses all records described by `reads`, returning them in the
/// same order as the input descriptors.
///
/// Strategy: descriptors are grouped by raw fd, sorted by file offset, and
/// adjacent reads whose byte ranges are contiguous on disk are coalesced into
/// a single `preadv` with one iovec per record (capped at
/// `MAX_IOVECS_PER_PREADV` per syscall). Non-contiguous reads fall back to
/// `read_single`.
///
/// # Errors
/// Propagates `WalError::Io` on syscall failure and `WalError::CorruptedData`
/// on short reads or parse failures; fails fast on the first error.
pub fn read_batch(reads: &[ReadDescriptor]) -> Result<Vec<Record>> {
    if reads.is_empty() {
        return Ok(Vec::new());
    }

    // Single read: no grouping/coalescing bookkeeping needed.
    if reads.len() == 1 {
        let r = &reads[0];
        let record = read_single(&r.read_fd, r.file_offset, r.byte_size)?;
        return Ok(vec![record]);
    }

    // Results are placed by original descriptor index so the output order
    // matches `reads` even though reads execute grouped and sorted.
    let mut results: Vec<Option<Record>> = (0..reads.len()).map(|_| None).collect();

    // Group descriptor indices by raw fd: coalescing only makes sense within
    // one file.
    let mut groups: HashMap<i32, Vec<usize>> = HashMap::new();
    for (i, r) in reads.iter().enumerate() {
        groups
            .entry(r.read_fd.as_raw_fd())
            .or_default()
            .push(i);
    }

    for (_fd_raw, mut indices) in groups {
        // Sort by offset so contiguous records become adjacent in `indices`.
        indices.sort_by_key(|&i| reads[i].file_offset);

        let mut run_start = 0;
        while run_start < indices.len() {
            // Grow the run while the next record starts exactly where the
            // current one ends (byte-contiguous on disk).
            let mut run_end = run_start;
            while run_end + 1 < indices.len() {
                let curr_idx = indices[run_end];
                let next_idx = indices[run_end + 1];
                let curr = &reads[curr_idx];
                let next = &reads[next_idx];
                if next.file_offset == curr.file_offset + curr.byte_size as u64 {
                    run_end += 1;
                } else {
                    break;
                }
            }

            let run = &indices[run_start..=run_end];
            if run.len() == 1 {
                // Lone record: plain single read.
                let idx = run[0];
                let r = &reads[idx];
                results[idx] = Some(read_single(&r.read_fd, r.file_offset, r.byte_size)?);
            } else {
                // Contiguous run: one preadv per chunk of up to
                // MAX_IOVECS_PER_PREADV records. Each chunk is itself
                // contiguous, so its base offset is the first record's offset.
                for chunk in run.chunks(MAX_IOVECS_PER_PREADV) {
                    let base_offset = reads[chunk[0]].file_offset;
                    let fd = &reads[chunk[0]].read_fd;

                    // One zeroed buffer per record; `bufs` must outlive
                    // `iovecs`, which borrow their raw pointers.
                    let mut bufs: Vec<BytesMut> = chunk
                        .iter()
                        .map(|&idx| BytesMut::zeroed(reads[idx].byte_size))
                        .collect();

                    let iovecs: Vec<libc::iovec> = bufs
                        .iter_mut()
                        .map(|b| libc::iovec {
                            iov_base: b.as_mut_ptr() as *mut libc::c_void,
                            iov_len: b.len(),
                        })
                        .collect();

                    let total_bytes: usize = chunk.iter().map(|&idx| reads[idx].byte_size).sum();

                    // SAFETY: each iovec points into a distinct live BytesMut
                    // of matching length; `bufs` is held for the whole call.
                    let n = unsafe {
                        libc::preadv(
                            fd.as_raw_fd(),
                            iovecs.as_ptr(),
                            iovecs.len() as i32,
                            base_offset as i64,
                        )
                    };

                    if n < 0 {
                        // NOTE(review): EINTR also lands here and is surfaced
                        // as a hard I/O error rather than retried — confirm
                        // whether callers are expected to retry.
                        return Err(WalError::Io(std::io::Error::last_os_error()));
                    }
                    // NOTE(review): a legitimate partial preadv (allowed by
                    // POSIX) is reported as corruption here; records written
                    // fully to disk should normally read fully, but verify.
                    if (n as usize) != total_bytes {
                        return Err(WalError::CorruptedData(format!(
                            "short coalesced preadv: expected {} bytes, got {}",
                            total_bytes, n
                        )));
                    }

                    // Parse each record back into its original result slot.
                    for (j, &idx) in chunk.iter().enumerate() {
                        results[idx] = Some(parse_record(&bufs[j])?);
                    }
                }
            }

            run_start = run_end + 1;
        }
    }

    // Every index was assigned exactly once by the loops above; a missing
    // slot is a logic bug, not a runtime condition.
    let records: Vec<Record> = results
        .into_iter()
        .enumerate()
        .map(|(i, opt)| {
            opt.unwrap_or_else(|| panic!("BUG: missing record at index {}", i))
        })
        .collect();

    Ok(records)
}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Wal, WalOptions, WriteEntry};
    use tempfile::TempDir;
    use std::time::Duration;

    /// Short-lived options suitable for unit tests.
    fn test_options() -> WalOptions {
        WalOptions {
            retention: Duration::from_secs(3600),
            segment_duration: Duration::from_secs(600),
        }
    }

    #[test]
    fn test_read_single_no_header() {
        // A record appended without a header should round-trip with header == None.
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path(), "r1", test_options()).unwrap();
        let ts = 1_711_234_567_890i64;
        let written = wal.append(None, b"hello world", ts, false).unwrap();
        let segment = wal.ensure_segment(ts).unwrap();

        let rec = wal.read_at(&segment, written.file_offset, written.byte_size).unwrap();
        assert!(rec.header.is_none());
        assert_eq!(rec.content.as_ref(), b"hello world");
    }

    #[test]
    fn test_read_single_with_header() {
        // Header bytes written alongside the payload must come back intact.
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path(), "r2", test_options()).unwrap();
        let ts = 1_711_234_567_890i64;
        let written = wal.append(Some(b"meta"), b"payload", ts, false).unwrap();
        let segment = wal.ensure_segment(ts).unwrap();

        let rec = wal.read_at(&segment, written.file_offset, written.byte_size).unwrap();
        assert_eq!(rec.header.as_ref().unwrap().as_ref(), b"meta");
        assert_eq!(rec.content.as_ref(), b"payload");
    }

    #[test]
    fn test_read_batch_multiple() {
        // Batch-read three records (with and without headers) and check
        // results come back in append order.
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path(), "rb", test_options()).unwrap();
        let ts = 1_711_234_567_890i64;

        let batch = vec![
            WriteEntry { header: None, content: b"first" },
            WriteEntry { header: Some(b"h"), content: b"second" },
            WriteEntry { header: None, content: b"third" },
        ];
        let written = wal.append_batch(&batch, ts, false).unwrap();
        let segment = wal.ensure_segment(ts).unwrap();

        let descriptors: Vec<ReadDescriptor> = written
            .iter()
            .map(|w| ReadDescriptor {
                read_fd: segment.read_fd().clone(),
                file_offset: w.file_offset,
                byte_size: w.byte_size,
            })
            .collect();

        let recs = read_batch(&descriptors).unwrap();
        assert_eq!(recs.len(), 3);
        assert_eq!(recs[0].content.as_ref(), b"first");
        assert!(recs[0].header.is_none());
        assert_eq!(recs[1].content.as_ref(), b"second");
        assert_eq!(recs[1].header.as_ref().unwrap().as_ref(), b"h");
        assert_eq!(recs[2].content.as_ref(), b"third");
    }

    #[test]
    fn test_read_batch_empty() {
        // No descriptors in, no records out.
        assert!(read_batch(&[]).unwrap().is_empty());
    }
}