1use std::{
13 fs::{File, OpenOptions},
14 io,
15 path::{Path, PathBuf},
16 sync::Mutex,
17};
18
19use crate::error::{Result, WalError};
20
21pub trait WalStore: Send + Sync {
60 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()>;
63
64 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize>;
70
71 fn truncate(&self, len: u64) -> Result<()>;
74
75 fn sync(&self) -> Result<()>;
80
81 fn len(&self) -> Result<u64>;
83
84 fn is_empty(&self) -> Result<bool> {
89 Ok(self.len()? == 0)
90 }
91}
92
93#[derive(Debug)]
103pub struct FileStore {
104 file: File,
105 path: PathBuf,
106}
107
108impl FileStore {
109 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
120 let path = path.as_ref().to_path_buf();
121 let file = OpenOptions::new()
122 .read(true)
123 .write(true)
124 .create(true)
125 .truncate(false)
126 .open(&path)
127 .map_err(|e| WalError::io("opening the log file", e))?;
128 Ok(FileStore { file, path })
129 }
130
131 #[must_use]
133 pub fn path(&self) -> &Path {
134 &self.path
135 }
136}
137
138impl WalStore for FileStore {
139 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
140 pwrite_all(&self.file, offset, bytes).map_err(|e| WalError::io("writing a record", e))
141 }
142
143 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
144 pread_fill(&self.file, offset, buf).map_err(|e| WalError::io("reading from the log", e))
145 }
146
147 fn truncate(&self, len: u64) -> Result<()> {
148 self.file
149 .set_len(len)
150 .map_err(|e| WalError::io("truncating the log", e))
151 }
152
153 fn sync(&self) -> Result<()> {
154 durable_sync(&self.file).map_err(|e| WalError::io("flushing to stable storage", e))
155 }
156
157 fn len(&self) -> Result<u64> {
158 Ok(self
159 .file
160 .metadata()
161 .map_err(|e| WalError::io("reading log file metadata", e))?
162 .len())
163 }
164}
165
/// An in-memory store: a growable byte buffer behind a mutex.
#[derive(Debug, Default)]
pub struct MemStore {
    /// The stored bytes; the mutex lets `&self` methods mutate them.
    data: Mutex<Vec<u8>>,
}

impl MemStore {
    /// Creates an empty store.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty store whose buffer has room for at least `capacity`
    /// bytes before it needs to reallocate.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_bytes(Vec::with_capacity(capacity))
    }

    /// Creates a store whose initial contents are `bytes`.
    #[must_use]
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        Self {
            data: Mutex::new(bytes),
        }
    }

    /// Locks the buffer, ignoring poisoning.
    ///
    /// If another thread panicked while holding the lock, the guard is
    /// recovered from the poison error instead of propagating a panic.
    fn lock(&self) -> std::sync::MutexGuard<'_, Vec<u8>> {
        match self.data.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Returns a copy of the current contents (test builds only).
    #[cfg(test)]
    pub(crate) fn snapshot(&self) -> Vec<u8> {
        let guard = self.lock();
        guard.to_vec()
    }
}
231
232impl Clone for MemStore {
233 fn clone(&self) -> Self {
234 MemStore {
235 data: Mutex::new(self.lock().clone()),
236 }
237 }
238}
239
240impl WalStore for MemStore {
241 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
242 let start = usize::try_from(offset).map_err(|_| {
243 WalError::io(
244 "writing to memory",
245 io::Error::other("offset exceeds usize"),
246 )
247 })?;
248 let end = start.checked_add(bytes.len()).ok_or_else(|| {
249 WalError::io(
250 "writing to memory",
251 io::Error::other("write overflows usize"),
252 )
253 })?;
254
255 let mut data = self.lock();
256 if data.len() < end {
257 data.resize(end, 0); }
259 data[start..end].copy_from_slice(bytes);
260 Ok(())
261 }
262
263 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
264 let data = self.lock();
265 let start = match usize::try_from(offset) {
266 Ok(start) if start < data.len() => start,
267 _ => return Ok(0),
268 };
269 let available = &data[start..];
270 let n = available.len().min(buf.len());
271 buf[..n].copy_from_slice(&available[..n]);
272 Ok(n)
273 }
274
275 fn truncate(&self, len: u64) -> Result<()> {
276 let len = usize::try_from(len).unwrap_or(usize::MAX);
277 self.lock().truncate(len);
278 Ok(())
279 }
280
281 fn sync(&self) -> Result<()> {
282 Ok(())
283 }
284
285 fn len(&self) -> Result<u64> {
286 Ok(self.lock().len() as u64)
287 }
288}
289
/// Flushes `file` on macOS by issuing `fcntl(fd, F_FULLFSYNC)`.
///
/// NOTE(review): per Apple's fcntl(2) manual, `F_FULLFSYNC` asks the drive to
/// flush its buffered data to permanent storage. Confirm against that
/// manual, since this file alone does not establish it.
///
/// # Errors
///
/// Returns the last OS error when `fcntl` reports `-1`.
#[cfg(target_os = "macos")]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    // SAFETY: the descriptor comes from a borrowed, still-open `File`, so it
    // stays valid for the duration of the call, and this `fcntl` command is
    // invoked without any pointer argument.
    let status = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) };
    match status {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
316
/// Flushes `file` on every platform except macOS by delegating to
/// [`File::sync_data`].
///
/// # Errors
///
/// Returns whatever I/O error `sync_data` reports.
#[cfg(not(target_os = "macos"))]
pub(crate) fn durable_sync(file: &File) -> io::Result<()> {
    File::sync_data(file)
}
324
/// Writes the whole of `buf` to `file` starting at `offset` (Unix).
///
/// Uses positional writes, so the file's own cursor is not consulted. Short
/// writes are continued from where they stopped and interrupted writes are
/// retried. An empty `buf` performs no write at all.
///
/// # Errors
///
/// Returns `WriteZero` if a write reports zero bytes while data remains, or
/// the first non-`Interrupted` I/O error.
#[cfg(unix)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::unix::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }
        let written = match file.write_at(buf, offset) {
            Ok(count) => count,
            // A signal interrupted the call before any data moved; try again.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        if written == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "the store accepted zero bytes mid-record",
            ));
        }
        buf = &buf[written..];
        offset += written as u64;
    }
}
353
/// Writes the whole of `buf` to `file` starting at `offset` (Windows).
///
/// Built on `seek_write`. Short writes are continued from where they stopped
/// and interrupted writes are retried. An empty `buf` performs no write at
/// all.
///
/// # Errors
///
/// Returns `WriteZero` if a write reports zero bytes while data remains, or
/// the first non-`Interrupted` I/O error.
#[cfg(windows)]
pub(crate) fn pwrite_all(file: &File, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
    use std::os::windows::fs::FileExt;

    loop {
        if buf.is_empty() {
            return Ok(());
        }
        let written = match file.seek_write(buf, offset) {
            Ok(count) => count,
            // Interrupted before any data moved; try again.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        if written == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "the store accepted zero bytes mid-record",
            ));
        }
        buf = &buf[written..];
        offset += written as u64;
    }
}
376
/// Reads from `file` at `offset` until `buf` is full or the file ends (Unix).
///
/// Returns the number of bytes placed in `buf`. A value smaller than
/// `buf.len()` means a read returned zero bytes, i.e. the end of the file was
/// reached. Interrupted reads are retried.
///
/// # Errors
///
/// Returns the first non-`Interrupted` I/O error.
#[cfg(unix)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::unix::fs::FileExt;

    let mut filled = 0;
    loop {
        let remaining = &mut buf[filled..];
        if remaining.is_empty() {
            break;
        }
        match file.read_at(remaining, offset) {
            // Zero bytes means there is nothing more to read at this offset.
            Ok(0) => break,
            Ok(count) => {
                filled += count;
                offset += count as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}
395
/// Reads from `file` at `offset` until `buf` is full or the file ends
/// (Windows).
///
/// Built on `seek_read`. Returns the number of bytes placed in `buf`. A value
/// smaller than `buf.len()` means a read returned zero bytes. Interrupted
/// reads are retried.
///
/// # Errors
///
/// Returns the first non-`Interrupted` I/O error.
#[cfg(windows)]
pub(crate) fn pread_fill(file: &File, mut offset: u64, buf: &mut [u8]) -> io::Result<usize> {
    use std::os::windows::fs::FileExt;

    let mut filled = 0;
    loop {
        let remaining = &mut buf[filled..];
        if remaining.is_empty() {
            break;
        }
        match file.seek_read(remaining, offset) {
            // Zero bytes means there is nothing more to read at this offset.
            Ok(0) => break,
            Ok(count) => {
                filled += count;
                offset += count as u64;
            }
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(filled)
}
414
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Back-to-back writes at the current end grow the reported length.
    #[test]
    fn test_memstore_write_at_advances_len() {
        let store = MemStore::new();
        assert_eq!(store.len().unwrap(), 0);

        let steps: [(u64, &[u8], u64); 2] = [(0, b"abc", 3), (3, b"de", 5)];
        for (offset, chunk, expected_len) in steps {
            store.write_at(offset, chunk).unwrap();
            assert_eq!(store.len().unwrap(), expected_len);
        }
    }

    /// Writing beyond the end pads the skipped range with zero bytes.
    #[test]
    fn test_memstore_write_past_end_zero_fills_gap() {
        let store = MemStore::new();
        store.write_at(4, b"XY").unwrap();
        assert_eq!(store.len().unwrap(), 6);

        // Pre-fill with 0xFF so untouched bytes would be noticed.
        let mut buf = [0xFFu8; 6];
        let copied = store.read_at(0, &mut buf).unwrap();
        assert_eq!(copied, 6);
        assert_eq!(buf, [0, 0, 0, 0, b'X', b'Y']);
    }

    /// A read that runs off the end returns only the bytes that exist.
    #[test]
    fn test_memstore_read_past_end_is_short() {
        let store = MemStore::new();
        store.write_at(0, b"abc").unwrap();

        let mut buf = [0u8; 8];
        let copied = store.read_at(1, &mut buf).unwrap();
        assert_eq!(copied, 2);
        assert_eq!(&buf[..2], b"bc");
    }

    /// Truncating below the current length shortens the store.
    #[test]
    fn test_memstore_truncate_shrinks() {
        let store = MemStore::new();
        store.write_at(0, b"0123456789").unwrap();

        store.truncate(4).unwrap();

        assert_eq!(store.len().unwrap(), 4);
    }

    /// Data written and synced through one handle is visible after reopening.
    #[test]
    fn test_filestore_roundtrip_through_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("store.bin");

        let writer = FileStore::open(&path).unwrap();
        writer.write_at(0, b"hello world").unwrap();
        writer.sync().unwrap();
        assert_eq!(writer.len().unwrap(), 11);
        // Close the first handle before reopening.
        drop(writer);

        let reader = FileStore::open(&path).unwrap();
        assert_eq!(reader.len().unwrap(), 11);
        let mut buf = [0u8; 5];
        let copied = reader.read_at(6, &mut buf).unwrap();
        assert_eq!(copied, 5);
        assert_eq!(&buf, b"world");
    }

    /// Eight threads write disjoint four-byte slots through a shared store.
    #[test]
    fn test_filestore_concurrent_disjoint_writes() {
        use std::sync::Arc;
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("concurrent.bin");
        let store = Arc::new(FileStore::open(&path).unwrap());

        // Collect eagerly so that every thread is started before any join.
        let workers: Vec<_> = (0..8u8)
            .map(|slot| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    let fill = [b'A' + slot; 4];
                    store.write_at(u64::from(slot) * 4, &fill).unwrap();
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        store.sync().unwrap();

        let mut buf = [0u8; 32];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 32);
        for (expected, slot_bytes) in (b'A'..).zip(buf.chunks_exact(4)) {
            assert_eq!(slot_bytes, &[expected; 4]);
        }
    }

    /// Synced data can be read back through a freshly opened handle.
    #[test]
    fn test_filestore_sync_durable_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("durable.bin");

        let writer = FileStore::open(&path).unwrap();
        writer.write_at(0, b"persisted").unwrap();
        writer.sync().unwrap();
        drop(writer);

        let reader = FileStore::open(&path).unwrap();
        let mut buf = [0u8; 9];
        let copied = reader.read_at(0, &mut buf).unwrap();
        assert_eq!(copied, 9);
        assert_eq!(&buf, b"persisted");
    }
}