Skip to main content

nano_wal/
segment.rs

1use std::fmt;
2use std::fs::{File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use crate::error::{Result, WalError};
10use crate::{FILE_HEADER_SIZE, NANO_LOG_SIGNATURE};
11
/// A single WAL segment file with concurrent read/write support.
///
/// Writes go through the mutex-protected `File` handle.
/// Reads are meant to go through `read_fd`, a duplicate of that handle's
/// descriptor, using positional calls (`pread`/`preadv`). Those calls take
/// an explicit offset, so they need neither the mutex nor the file cursor.
///
/// Note that a dup'd descriptor shares its file offset with the original,
/// so cursor-based reads on `read_fd` would NOT be independent of writes;
/// only positional reads are.
pub struct Segment {
    /// Write handle; lock it before appending to the segment.
    pub(crate) file: Mutex<File>,
    /// Duplicate descriptor of `file`, handed out for lock-free positional reads.
    read_fd: Arc<OwnedFd>,
    /// Location of the segment file as given to `create`/`open`.
    path: PathBuf,
    /// Expiration timestamp stored in (or validated against) the file header.
    expiration_ms: i64,
    /// Tracked file size in bytes (header + data); advanced via `add_file_size`.
    file_size: AtomicU64,
}
24
25impl Segment {
26    /// Create a new segment file, writing the 16-byte header.
27    ///
28    /// Header layout: `[NANO-LOG (8 bytes)][expiration_ms LE (8 bytes)]`
29    pub(crate) fn create(path: &Path, expiration_ms: i64) -> Result<Self> {
30        let mut file = OpenOptions::new()
31            .create(true)
32            .read(true)
33            .write(true)
34            .truncate(true)
35            .open(path)?;
36
37        // Write header
38        let mut header = [0u8; FILE_HEADER_SIZE];
39        header[..8].copy_from_slice(&NANO_LOG_SIGNATURE);
40        header[8..16].copy_from_slice(&expiration_ms.to_le_bytes());
41        file.write_all(&header)?;
42        file.flush()?;
43
44        let read_fd = Arc::new(dup_read_fd(&file)?);
45
46        Ok(Segment {
47            file: Mutex::new(file),
48            read_fd,
49            path: path.to_path_buf(),
50            expiration_ms,
51            file_size: AtomicU64::new(FILE_HEADER_SIZE as u64),
52        })
53    }
54
55    /// Reopen an existing segment file, validating its header.
56    pub(crate) fn open(path: &Path, expected_expiration_ms: i64) -> Result<Self> {
57        let mut file = OpenOptions::new()
58            .read(true)
59            .append(true)
60            .open(path)?;
61
62        // Read and validate header
63        let mut header = [0u8; FILE_HEADER_SIZE];
64        file.seek(SeekFrom::Start(0))?;
65        file.read_exact(&mut header).map_err(|e| {
66            WalError::CorruptedData(format!("failed to read segment header: {}", e))
67        })?;
68
69        // Validate magic
70        if header[..8] != NANO_LOG_SIGNATURE {
71            return Err(WalError::CorruptedData(format!(
72                "invalid segment magic: expected NANO-LOG, got {:?}",
73                &header[..8]
74            )));
75        }
76
77        // Validate expiration
78        let stored_expiration = i64::from_le_bytes(header[8..16].try_into().unwrap());
79        if stored_expiration != expected_expiration_ms {
80            return Err(WalError::CorruptedData(format!(
81                "expiration mismatch: expected {}, got {}",
82                expected_expiration_ms, stored_expiration
83            )));
84        }
85
86        // Seek to end to get file size
87        let file_size = file.seek(SeekFrom::End(0))?;
88
89        let read_fd = Arc::new(dup_read_fd(&file)?);
90
91        Ok(Segment {
92            file: Mutex::new(file),
93            read_fd,
94            path: path.to_path_buf(),
95            expiration_ms: expected_expiration_ms,
96            file_size: AtomicU64::new(file_size),
97        })
98    }
99
100    /// Returns the dup'd read file descriptor for lock-free pread/preadv.
101    pub fn read_fd(&self) -> &Arc<OwnedFd> {
102        &self.read_fd
103    }
104
105    /// Returns the current total file size (header + data).
106    pub fn file_size(&self) -> u64 {
107        self.file_size.load(Ordering::Acquire)
108    }
109
110    /// Atomically adds `bytes` to the tracked file size; returns the previous value.
111    pub(crate) fn add_file_size(&self, bytes: u64) -> u64 {
112        self.file_size.fetch_add(bytes, Ordering::AcqRel)
113    }
114
115    /// Returns the immutable expiration timestamp (ms since epoch).
116    pub fn expiration_ms(&self) -> i64 {
117        self.expiration_ms
118    }
119
120    /// Returns the path to this segment file.
121    pub fn path(&self) -> &Path {
122        &self.path
123    }
124}
125
126impl fmt::Debug for Segment {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        f.debug_struct("Segment")
129            .field("path", &self.path)
130            .field("expiration_ms", &self.expiration_ms)
131            .field("file_size", &self.file_size.load(Ordering::Relaxed))
132            .finish()
133    }
134}
135
136/// Duplicate a file descriptor for independent read access.
137///
138/// The duplicated fd is configured with `POSIX_FADV_SEQUENTIAL` on Linux
139/// to hint the kernel for sequential read-ahead.
140pub(crate) fn dup_read_fd(file: &File) -> Result<OwnedFd> {
141    let raw = file.as_raw_fd();
142    let duped = unsafe { libc::dup(raw) };
143    if duped < 0 {
144        return Err(WalError::Io(std::io::Error::last_os_error()));
145    }
146
147    // On Linux, hint the kernel for sequential reads
148    #[cfg(target_os = "linux")]
149    {
150        unsafe {
151            libc::posix_fadvise(duped, 0, 0, libc::POSIX_FADV_SEQUENTIAL);
152        }
153    }
154
155    Ok(unsafe { OwnedFd::from_raw_fd(duped) })
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// A newly created segment reports the header-only size and persists the
    /// signature followed by the little-endian expiration.
    #[test]
    fn test_create_segment_writes_header() {
        let tmp = TempDir::new().unwrap();
        let seg_path = tmp.path().join("test_12345.seg");
        let seg = Segment::create(&seg_path, 12345).unwrap();

        assert_eq!(seg.expiration_ms(), 12345);
        assert_eq!(seg.file_size(), FILE_HEADER_SIZE as u64);
        assert_eq!(seg.path(), seg_path);

        let on_disk = std::fs::read(&seg_path).unwrap();
        let (magic, rest) = on_disk.split_at(8);
        assert_eq!(magic, b"NANO-LOG");
        let stored_expiration = i64::from_le_bytes(rest[..8].try_into().unwrap());
        assert_eq!(stored_expiration, 12345);
    }

    /// A positional read through the duplicated descriptor still sees the
    /// header at offset 0 after data was appended through the write handle.
    #[test]
    fn test_read_fd_is_independent() {
        let tmp = TempDir::new().unwrap();
        let seg_path = tmp.path().join("test_99999.seg");
        let seg = Segment::create(&seg_path, 99999).unwrap();

        {
            let mut writer = seg.file.lock().unwrap();
            writer.write_all(b"extra data").unwrap();
            writer.flush().unwrap();
        }

        let raw_fd = seg.read_fd().as_raw_fd();
        let mut magic = [0u8; 8];
        let bytes_read = unsafe {
            libc::pread(
                raw_fd,
                magic.as_mut_ptr() as *mut libc::c_void,
                magic.len(),
                0,
            )
        };
        assert_eq!(bytes_read, 8);
        assert_eq!(&magic, b"NANO-LOG");
    }

    /// Reopening a segment validates the header and picks up the size of the
    /// data that was appended before it was closed.
    #[test]
    fn test_reopen_segment() {
        let tmp = TempDir::new().unwrap();
        let seg_path = tmp.path().join("test_55555.seg");
        let payload = b"payload";

        {
            let first = Segment::create(&seg_path, 55555).unwrap();
            let mut writer = first.file.lock().unwrap();
            writer.write_all(payload).unwrap();
        }

        let reopened = Segment::open(&seg_path, 55555).unwrap();
        assert_eq!(reopened.expiration_ms(), 55555);
        assert_eq!(
            reopened.file_size(),
            (FILE_HEADER_SIZE + payload.len()) as u64
        );
    }
}