Skip to main content

rewal/
storage.rs

1use std::fs::{File, OpenOptions};
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::Path;
4use std::sync::Mutex;
5
6use crate::error::{Error, Result};
7use crate::sys::{lock_file, unlock_file, FileLock};
8
9/// Persistence backend for the WAL.
10///
11/// The writer thread calls [`write`](Storage::write) sequentially; replay and
12/// recovery use memory-mapped I/O when available, falling back to
13/// [`read_at`](Storage::read_at) for custom implementations. Implementations
14/// must handle their own thread safety for concurrent read + write access.
15pub trait Storage: Send + Sync {
16    /// Appends data. The WAL does not seek before writing.
17    fn write(&mut self, buf: &[u8]) -> Result<usize>;
18    /// Ensures all written data is durable (fsync).
19    fn sync(&mut self) -> Result<()>;
20    /// Releases all resources. After close, all methods must return
21    /// [`Error::Closed`].
22    fn close(&mut self) -> Result<()>;
23    /// Current storage size in bytes.
24    fn size(&self) -> Result<u64>;
25    /// Shrinks storage to `size` bytes. Used during recovery to remove
26    /// corrupted trailing records.
27    fn truncate(&mut self, size: u64) -> Result<()>;
28    /// Reads `buf.len()` bytes starting at `offset`. Used as a fallback when
29    /// mmap is not available.
30    fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize>;
31}
32
33/// Default [`Storage`] backed by [`std::fs::File`] with advisory file locking.
34///
35/// Uses flock (Unix) or LockFileEx (Windows) to prevent concurrent access.
36/// All methods are protected by a mutex for thread safety.
37pub struct FileStorage {
38    inner: Mutex<FileStorageInner>,
39    path: String,
40}
41
42struct FileStorageInner {
43    file: Option<File>,
44    lock: Option<FileLock>,
45}
46
47impl FileStorage {
48    /// Opens or creates a WAL file at `path`. Acquires an exclusive lock.
49    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
50        let path = path.as_ref();
51        let file = OpenOptions::new()
52            .create(true)
53            .truncate(false)
54            .read(true)
55            .write(true)
56            .open(path)?;
57
58        let lock = lock_file(&file).map_err(|_| Error::FileLocked)?;
59
60        let mut f = file;
61        f.seek(SeekFrom::End(0))?;
62
63        Ok(Self {
64            inner: Mutex::new(FileStorageInner {
65                file: Some(f),
66                lock: Some(lock),
67            }),
68            path: path.to_string_lossy().into_owned(),
69        })
70    }
71
72    /// Returns the filesystem path (lock-free).
73    #[inline]
74    pub fn path(&self) -> &str {
75        &self.path
76    }
77
78    /// Executes a closure with a reference to the underlying [`File`], if open.
79    /// Used by mmap and sync to obtain the file descriptor / handle.
80    pub(crate) fn with_file<R>(&self, f: impl FnOnce(&std::fs::File) -> R) -> Option<R> {
81        let guard = self.inner.lock().unwrap();
82        guard.file.as_ref().map(f)
83    }
84
85    /// Clones the underlying file handle (via `try_clone`), if open.
86    pub(crate) fn try_clone_file(&self) -> Option<std::io::Result<std::fs::File>> {
87        let guard = self.inner.lock().unwrap();
88        guard.file.as_ref().map(|f| f.try_clone())
89    }
90}
91
92impl FileStorageInner {
93    fn file_ref(&self) -> Result<&File> {
94        self.file.as_ref().ok_or(Error::Closed)
95    }
96
97    fn file_mut(&mut self) -> Result<&mut File> {
98        self.file.as_mut().ok_or(Error::Closed)
99    }
100}
101
102impl Storage for FileStorage {
103    fn write(&mut self, buf: &[u8]) -> Result<usize> {
104        let mut inner = self.inner.lock().unwrap();
105        let f = inner.file_mut()?;
106        Ok(f.write(buf)?)
107    }
108
109    fn sync(&mut self) -> Result<()> {
110        let inner = self.inner.lock().unwrap();
111        let f = inner.file_ref()?;
112        Ok(f.sync_all()?)
113    }
114
115    fn close(&mut self) -> Result<()> {
116        let mut inner = self.inner.lock().unwrap();
117        if let Some(lock) = inner.lock.take() {
118            unlock_file(&lock);
119        }
120        inner.file = None;
121        Ok(())
122    }
123
124    fn size(&self) -> Result<u64> {
125        let inner = self.inner.lock().unwrap();
126        let f = inner.file_ref()?;
127        Ok(f.metadata()?.len())
128    }
129
130    fn truncate(&mut self, size: u64) -> Result<()> {
131        let mut inner = self.inner.lock().unwrap();
132        let f = inner.file_mut()?;
133        f.set_len(size)?;
134        f.seek(SeekFrom::End(0))?;
135        Ok(())
136    }
137
138    fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize> {
139        let mut inner = self.inner.lock().unwrap();
140        let f = inner.file_mut()?;
141        f.seek(SeekFrom::Start(offset))?;
142        Ok(f.read(buf)?)
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a storage through write, size, sync, read, truncate and close
    /// against a real file in the system temp directory.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn file_storage_lifecycle() {
        let path = std::env::temp_dir().join("rewal_test_storage");
        // Start from a clean slate; a missing file is fine.
        let _ = std::fs::remove_file(&path);

        let mut storage = FileStorage::new(&path).unwrap();
        assert_eq!(storage.size().unwrap(), 0);

        let written = Storage::write(&mut storage, b"hello").unwrap();
        assert_eq!(written, 5);
        assert_eq!(storage.size().unwrap(), 5);

        storage.sync().unwrap();

        let mut readback = [0u8; 5];
        let read = storage.read_at(&mut readback, 0).unwrap();
        assert_eq!(read, 5);
        assert_eq!(&readback, b"hello");

        storage.truncate(3).unwrap();
        assert_eq!(storage.size().unwrap(), 3);

        // After close, further writes must be rejected.
        storage.close().unwrap();
        assert!(Storage::write(&mut storage, b"x").is_err());

        let _ = std::fs::remove_file(&path);
    }

    /// A second open of the same file must fail with `FileLocked` while the
    /// first storage is still alive.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn file_locking() {
        let path = std::env::temp_dir().join("rewal_test_flock");
        let _ = std::fs::remove_file(&path);

        let first = FileStorage::new(&path).unwrap();

        let second = FileStorage::new(&path);
        assert!(second.is_err());
        assert_eq!(second.err(), Some(Error::FileLocked));

        drop(first);
        let _ = std::fs::remove_file(&path);
    }
}