//! Record reading for nano_wal: single-record reads and batched reads that
//! coalesce contiguous records on the same fd into single `preadv(2)` calls.
1use std::collections::HashMap;
2use std::os::fd::{AsRawFd, OwnedFd};
3use std::sync::Arc;
4
5use bytes::{Bytes, BytesMut};
6
7use crate::error::{Result, WalError};
8use crate::{NANO_REC_SIGNATURE, RECORD_FRAMING_SIZE};
9
/// Maximum entries per single preadv call (safety margin below UIO_MAXIOV).
///
/// Vectored I/O syscalls cap the iovec count (UIO_MAXIOV, 1024 on Linux);
/// `read_batch` splits larger coalesced runs into chunks of this size.
const MAX_IOVECS_PER_PREADV: usize = 512;
12
/// Descriptor for a single record to be read from a segment.
pub struct ReadDescriptor {
    /// Shared handle to the segment file containing the record.
    pub read_fd: Arc<OwnedFd>,
    /// Byte offset of the record within the segment file.
    pub file_offset: u64,
    /// Total on-disk size of the record, framing included
    /// (must be at least `RECORD_FRAMING_SIZE`).
    pub byte_size: usize,
}
19
/// A parsed record read from the WAL.
pub struct Record {
    /// Optional record header; `None` when the record was written with a
    /// zero-length header.
    pub header: Option<Bytes>,
    /// Record payload bytes.
    pub content: Bytes,
}
25
26/// Read a single record from the given fd at the specified offset.
27pub(crate) fn read_single(fd: &Arc<OwnedFd>, file_offset: u64, byte_size: usize) -> Result<Record> {
28    if byte_size < RECORD_FRAMING_SIZE {
29        return Err(WalError::CorruptedData(format!(
30            "record byte_size {} is less than framing size {}",
31            byte_size, RECORD_FRAMING_SIZE
32        )));
33    }
34
35    let mut buf = vec![0u8; byte_size];
36    let iov = libc::iovec {
37        iov_base: buf.as_mut_ptr() as *mut libc::c_void,
38        iov_len: byte_size,
39    };
40
41    let n = unsafe {
42        libc::preadv(
43            fd.as_raw_fd(),
44            &iov as *const libc::iovec,
45            1,
46            file_offset as i64,
47        )
48    };
49
50    if n < 0 {
51        return Err(WalError::Io(std::io::Error::last_os_error()));
52    }
53    if (n as usize) != byte_size {
54        return Err(WalError::CorruptedData(format!(
55            "short preadv: expected {} bytes, got {}",
56            byte_size, n
57        )));
58    }
59
60    parse_record(&buf)
61}
62
63/// Parse a record from a raw buffer.
64///
65/// Layout: `[NANORC (6)][header_len LE u16 (2)][header (N)][content_len LE u64 (8)][content (N)]`
66fn parse_record(buf: &[u8]) -> Result<Record> {
67    if buf.len() < RECORD_FRAMING_SIZE {
68        return Err(WalError::CorruptedData(format!(
69            "buffer too small: {} < {}",
70            buf.len(),
71            RECORD_FRAMING_SIZE
72        )));
73    }
74
75    // Validate signature
76    if buf[0..6] != NANO_REC_SIGNATURE {
77        return Err(WalError::CorruptedData(format!(
78            "invalid record signature: expected NANORC, got {:?}",
79            &buf[0..6]
80        )));
81    }
82
83    // Read header_len
84    let header_len = u16::from_le_bytes(buf[6..8].try_into().unwrap()) as usize;
85
86    // Offset to content_len field
87    let content_len_offset = 8 + header_len;
88    if buf.len() < content_len_offset + 8 {
89        return Err(WalError::CorruptedData(format!(
90            "buffer too small for content_len: need {}, have {}",
91            content_len_offset + 8,
92            buf.len()
93        )));
94    }
95
96    let content_len = u64::from_le_bytes(
97        buf[content_len_offset..content_len_offset + 8]
98            .try_into()
99            .unwrap(),
100    ) as usize;
101
102    let content_offset = content_len_offset + 8;
103    let expected_total = content_offset + content_len;
104    if buf.len() < expected_total {
105        return Err(WalError::CorruptedData(format!(
106            "buffer too small for content: need {}, have {}",
107            expected_total,
108            buf.len()
109        )));
110    }
111
112    let header = if header_len > 0 {
113        Some(Bytes::copy_from_slice(&buf[8..8 + header_len]))
114    } else {
115        None
116    };
117
118    let content = Bytes::copy_from_slice(&buf[content_offset..content_offset + content_len]);
119
120    Ok(Record { header, content })
121}
122
123/// Read a batch of records, coalescing contiguous reads on the same fd into
124/// single preadv syscalls.
125///
126/// Records are returned in the same order as the input descriptors.
127pub fn read_batch(reads: &[ReadDescriptor]) -> Result<Vec<Record>> {
128    if reads.is_empty() {
129        return Ok(Vec::new());
130    }
131
132    if reads.len() == 1 {
133        let r = &reads[0];
134        let record = read_single(&r.read_fd, r.file_offset, r.byte_size)?;
135        return Ok(vec![record]);
136    }
137
138    // Results vec indexed by original position
139    let mut results: Vec<Option<Record>> = (0..reads.len()).map(|_| None).collect();
140
141    // Group indices by fd (using raw fd value as key)
142    let mut groups: HashMap<i32, Vec<usize>> = HashMap::new();
143    for (i, r) in reads.iter().enumerate() {
144        groups
145            .entry(r.read_fd.as_raw_fd())
146            .or_default()
147            .push(i);
148    }
149
150    for (_fd_raw, mut indices) in groups {
151        // Sort by file_offset within this fd group
152        indices.sort_by_key(|&i| reads[i].file_offset);
153
154        // Detect contiguous runs
155        let mut run_start = 0;
156        while run_start < indices.len() {
157            let mut run_end = run_start;
158            while run_end + 1 < indices.len() {
159                let curr_idx = indices[run_end];
160                let next_idx = indices[run_end + 1];
161                let curr = &reads[curr_idx];
162                let next = &reads[next_idx];
163                if next.file_offset == curr.file_offset + curr.byte_size as u64 {
164                    run_end += 1;
165                } else {
166                    break;
167                }
168            }
169
170            let run = &indices[run_start..=run_end];
171            if run.len() == 1 {
172                // Single entry run
173                let idx = run[0];
174                let r = &reads[idx];
175                results[idx] = Some(read_single(&r.read_fd, r.file_offset, r.byte_size)?);
176            } else {
177                // Coalesced multi-entry run -- chunk to respect MAX_IOVECS_PER_PREADV
178                for chunk in run.chunks(MAX_IOVECS_PER_PREADV) {
179                    let base_offset = reads[chunk[0]].file_offset;
180                    let fd = &reads[chunk[0]].read_fd;
181
182                    // Allocate buffers for each entry in the chunk
183                    let mut bufs: Vec<BytesMut> = chunk
184                        .iter()
185                        .map(|&idx| BytesMut::zeroed(reads[idx].byte_size))
186                        .collect();
187
188                    // Build iovecs
189                    let iovecs: Vec<libc::iovec> = bufs
190                        .iter_mut()
191                        .map(|b| libc::iovec {
192                            iov_base: b.as_mut_ptr() as *mut libc::c_void,
193                            iov_len: b.len(),
194                        })
195                        .collect();
196
197                    let total_bytes: usize = chunk.iter().map(|&idx| reads[idx].byte_size).sum();
198
199                    let n = unsafe {
200                        libc::preadv(
201                            fd.as_raw_fd(),
202                            iovecs.as_ptr(),
203                            iovecs.len() as i32,
204                            base_offset as i64,
205                        )
206                    };
207
208                    if n < 0 {
209                        return Err(WalError::Io(std::io::Error::last_os_error()));
210                    }
211                    if (n as usize) != total_bytes {
212                        return Err(WalError::CorruptedData(format!(
213                            "short coalesced preadv: expected {} bytes, got {}",
214                            total_bytes, n
215                        )));
216                    }
217
218                    // Parse each buffer
219                    for (j, &idx) in chunk.iter().enumerate() {
220                        results[idx] = Some(parse_record(&bufs[j])?);
221                    }
222                }
223            }
224
225            run_start = run_end + 1;
226        }
227    }
228
229    // Unwrap all results in order
230    let records: Vec<Record> = results
231        .into_iter()
232        .enumerate()
233        .map(|(i, opt)| {
234            opt.unwrap_or_else(|| panic!("BUG: missing record at index {}", i))
235        })
236        .collect();
237
238    Ok(records)
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Wal, WalOptions, WriteEntry};
    use tempfile::TempDir;
    use std::time::Duration;

    // Shared WAL options for all tests: 1h retention, 10min segments.
    fn test_options() -> WalOptions {
        WalOptions {
            retention: Duration::from_secs(3600),
            segment_duration: Duration::from_secs(600),
        }
    }

    // Append a header-less record, then read it back by (offset, size);
    // the parsed record must have no header and the original content.
    #[test]
    fn test_read_single_no_header() {
        let dir = TempDir::new().unwrap();
        let wal = Wal::new(dir.path(), "r1", test_options()).unwrap();
        let now = 1_711_234_567_890i64;
        let entry = wal.append(None, b"hello world", now, false).unwrap();
        let seg = wal.ensure_segment(now).unwrap();
        let record = wal.read_at(&seg, entry.file_offset, entry.byte_size).unwrap();
        assert!(record.header.is_none());
        assert_eq!(record.content.as_ref(), b"hello world");
    }

    // Round-trips a record written with a header; both header and content
    // must survive the read.
    #[test]
    fn test_read_single_with_header() {
        let dir = TempDir::new().unwrap();
        let wal = Wal::new(dir.path(), "r2", test_options()).unwrap();
        let now = 1_711_234_567_890i64;
        let entry = wal.append(Some(b"meta"), b"payload", now, false).unwrap();
        let seg = wal.ensure_segment(now).unwrap();
        let record = wal.read_at(&seg, entry.file_offset, entry.byte_size).unwrap();
        assert_eq!(record.header.as_ref().unwrap().as_ref(), b"meta");
        assert_eq!(record.content.as_ref(), b"payload");
    }

    // Batch read of three records appended in one batch. Since they share a
    // segment and are contiguous, this exercises the coalesced preadv path;
    // results must come back in input order.
    #[test]
    fn test_read_batch_multiple() {
        let dir = TempDir::new().unwrap();
        let wal = Wal::new(dir.path(), "rb", test_options()).unwrap();
        let now = 1_711_234_567_890i64;
        let entries = vec![
            WriteEntry { header: None, content: b"first" },
            WriteEntry { header: Some(b"h"), content: b"second" },
            WriteEntry { header: None, content: b"third" },
        ];
        let refs = wal.append_batch(&entries, now, false).unwrap();
        let seg = wal.ensure_segment(now).unwrap();

        let descriptors: Vec<ReadDescriptor> = refs.iter().map(|r| ReadDescriptor {
            read_fd: seg.read_fd().clone(),
            file_offset: r.file_offset,
            byte_size: r.byte_size,
        }).collect();

        let records = read_batch(&descriptors).unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].content.as_ref(), b"first");
        assert!(records[0].header.is_none());
        assert_eq!(records[1].content.as_ref(), b"second");
        assert_eq!(records[1].header.as_ref().unwrap().as_ref(), b"h");
        assert_eq!(records[2].content.as_ref(), b"third");
    }

    // Empty input is the trivial fast path: no syscalls, empty result.
    #[test]
    fn test_read_batch_empty() {
        let records = read_batch(&[]).unwrap();
        assert!(records.is_empty());
    }
}