1use std::{
18 collections::HashMap,
19 ffi::OsStr,
20 fs::{self, File, OpenOptions},
21 io,
22 path::{Path, PathBuf},
23 sync::{
24 Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
25 atomic::{AtomicU64, Ordering},
26 },
27};
28
29use crate::{
30 error::{Result, WalError},
31 store::{WalStore, durable_sync, pread_fill, pwrite_all},
32};
33
/// Number of zero-padded decimal digits in a segment file name's stem.
const NAME_DIGITS: usize = 20;
/// File extension (without the leading dot) carried by every segment file.
const NAME_EXT: &str = "wal";
/// Name of the file, inside the log directory, that stores the head offset.
const HEAD_FILE: &str = "head";
/// Size in bytes of the head marker: an 8-byte little-endian offset followed by a
/// 4-byte little-endian CRC-32C of those 8 bytes.
const HEAD_FILE_LEN: usize = 12;
44
/// A log store that spreads one logical byte stream over fixed-size segment files
/// kept in a single directory.
///
/// Logical offset `o` lives in segment `o / segment_size` at local offset
/// `o % segment_size`.
#[derive(Debug)]
pub struct SegmentedStore {
    /// Directory holding the segment files and the head marker.
    dir: PathBuf,
    /// Capacity of each segment in bytes; `open` rejects zero.
    segment_size: u64,
    /// Open segment file handles, keyed by segment index.
    segments: RwLock<HashMap<u64, Arc<File>>>,
    /// Logical length of the log: raised by `write_at`, set by `open` and `truncate`.
    max_written: AtomicU64,
    /// Index of the first segment the next `sync` call will flush.
    synced_from: AtomicU64,
    /// Logical start offset of the log, as set by `open` and `truncate_before`.
    head: AtomicU64,
}
84
85impl SegmentedStore {
86 pub fn open(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
99 if segment_size == 0 {
100 return Err(WalError::io(
101 "opening the segmented log",
102 io::Error::other("segment size must be non-zero"),
103 ));
104 }
105 let dir = dir.as_ref().to_path_buf();
106 fs::create_dir_all(&dir).map_err(|e| WalError::io("creating the log directory", e))?;
107
108 let mut highest: Option<(u64, u64)> = None; for entry in fs::read_dir(&dir).map_err(|e| WalError::io("reading the log directory", e))? {
111 let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
112 if let Some(index) = parse_segment_name(&entry.file_name()) {
113 let len = entry
114 .metadata()
115 .map_err(|e| WalError::io("reading segment metadata", e))?
116 .len();
117 if highest.is_none_or(|(h, _)| index > h) {
118 highest = Some((index, len));
119 }
120 }
121 }
122
123 let total_len = match highest {
124 Some((index, len)) => index.saturating_mul(segment_size).saturating_add(len),
125 None => 0,
126 };
127 let active = total_len / segment_size;
128 let head = read_head_file(&dir)?.unwrap_or(0).min(total_len);
132
133 Ok(SegmentedStore {
134 dir,
135 segment_size,
136 segments: RwLock::new(HashMap::new()),
137 max_written: AtomicU64::new(total_len),
138 synced_from: AtomicU64::new(active),
140 head: AtomicU64::new(head),
141 })
142 }
143
144 #[must_use]
146 pub fn dir(&self) -> &Path {
147 &self.dir
148 }
149
    /// Returns the capacity of each segment in bytes.
    #[must_use]
    pub fn segment_size(&self) -> u64 {
        self.segment_size
    }
155
156 fn read_map(&self) -> RwLockReadGuard<'_, HashMap<u64, Arc<File>>> {
157 self.segments.read().unwrap_or_else(PoisonError::into_inner)
158 }
159
160 fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<u64, Arc<File>>> {
161 self.segments
162 .write()
163 .unwrap_or_else(PoisonError::into_inner)
164 }
165
166 fn segment_for_write(&self, index: u64) -> Result<Arc<File>> {
169 if let Some(file) = self.read_map().get(&index) {
170 return Ok(Arc::clone(file));
171 }
172 let mut map = self.write_map();
173 if let Some(file) = map.get(&index) {
174 return Ok(Arc::clone(file));
175 }
176 let path = self.dir.join(segment_name(index));
177 let file = OpenOptions::new()
178 .read(true)
179 .write(true)
180 .create(true)
181 .truncate(false)
182 .open(&path)
183 .map_err(|e| WalError::io("creating a log segment", e))?;
184 let file = Arc::new(file);
185 let _ = map.insert(index, Arc::clone(&file));
186 Ok(file)
187 }
188
189 fn segment_for_read(&self, index: u64) -> Result<Option<Arc<File>>> {
192 if let Some(file) = self.read_map().get(&index) {
193 return Ok(Some(Arc::clone(file)));
194 }
195 let path = self.dir.join(segment_name(index));
196 match OpenOptions::new().read(true).write(true).open(&path) {
197 Ok(file) => {
198 let file = Arc::new(file);
199 let mut map = self.write_map();
200 if let Some(existing) = map.get(&index) {
201 return Ok(Some(Arc::clone(existing)));
202 }
203 let _ = map.insert(index, Arc::clone(&file));
204 Ok(Some(file))
205 }
206 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
207 Err(error) => Err(WalError::io("opening a log segment", error)),
208 }
209 }
210
211 fn open_segment(&self, index: u64) -> Option<Arc<File>> {
213 self.read_map().get(&index).map(Arc::clone)
214 }
215
216 fn write_head_file(&self, head: u64) -> Result<()> {
222 let mut buf = [0u8; HEAD_FILE_LEN];
223 buf[..8].copy_from_slice(&head.to_le_bytes());
224 let crc = crc32c::crc32c(&buf[..8]);
225 buf[8..].copy_from_slice(&crc.to_le_bytes());
226
227 let path = self.dir.join(HEAD_FILE);
228 let file = OpenOptions::new()
229 .write(true)
230 .create(true)
231 .truncate(true)
232 .open(&path)
233 .map_err(|e| WalError::io("writing the head marker", e))?;
234 pwrite_all(&file, 0, &buf).map_err(|e| WalError::io("writing the head marker", e))?;
235 durable_sync(&file).map_err(|e| WalError::io("flushing the head marker", e))?;
236 Ok(())
237 }
238}
239
240fn read_head_file(dir: &Path) -> Result<Option<u64>> {
247 match fs::read(dir.join(HEAD_FILE)) {
248 Ok(bytes) if bytes.len() >= HEAD_FILE_LEN => {
249 let head = u64::from_le_bytes([
250 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
251 ]);
252 let stored = u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
253 if crc32c::crc32c(&bytes[..8]) == stored {
254 Ok(Some(head))
255 } else {
256 Ok(None) }
258 }
259 Ok(_) => Ok(None),
260 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
261 Err(error) => Err(WalError::io("reading the head marker", error)),
262 }
263}
264
265impl WalStore for SegmentedStore {
266 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
267 let mut pos = offset;
268 let mut remaining = bytes;
269 while !remaining.is_empty() {
270 let index = pos / self.segment_size;
271 let local = pos % self.segment_size;
272 let room = (self.segment_size - local) as usize;
273 let take = remaining.len().min(room);
274
275 let file = self.segment_for_write(index)?;
276 pwrite_all(&file, local, &remaining[..take])
277 .map_err(|e| WalError::io("writing a record", e))?;
278
279 pos += take as u64;
280 remaining = &remaining[take..];
281 }
282 let end = offset.saturating_add(bytes.len() as u64);
283 let _ = self.max_written.fetch_max(end, Ordering::Relaxed);
284 Ok(())
285 }
286
287 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
288 let mut pos = offset;
289 let mut filled = 0;
290 while filled < buf.len() {
291 let index = pos / self.segment_size;
292 let local = pos % self.segment_size;
293 let room = (self.segment_size - local) as usize;
294 let want = (buf.len() - filled).min(room);
295
296 let Some(file) = self.segment_for_read(index)? else {
297 break; };
299 let got = pread_fill(&file, local, &mut buf[filled..filled + want])
300 .map_err(|e| WalError::io("reading from the log", e))?;
301 filled += got;
302 pos += got as u64;
303 if got < want {
304 break; }
306 }
307 Ok(filled)
308 }
309
310 fn truncate(&self, len: u64) -> Result<()> {
311 let last_index = len / self.segment_size;
312 let last_local = len % self.segment_size;
313
314 let entries =
316 fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
317 for entry in entries {
318 let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
319 let Some(index) = parse_segment_name(&entry.file_name()) else {
320 continue;
321 };
322 match index.cmp(&last_index) {
323 std::cmp::Ordering::Greater => {
324 fs::remove_file(entry.path())
326 .map_err(|e| WalError::io("removing a truncated segment", e))?;
327 let _ = self.write_map().remove(&index);
328 }
329 std::cmp::Ordering::Equal => {
330 let file = self.segment_for_write(index)?;
332 file.set_len(last_local)
333 .map_err(|e| WalError::io("truncating a log segment", e))?;
334 }
335 std::cmp::Ordering::Less => {}
336 }
337 }
338
339 self.max_written.store(len, Ordering::Relaxed);
340 self.synced_from.store(last_index, Ordering::Relaxed);
341 Ok(())
342 }
343
344 fn sync(&self) -> Result<()> {
345 let written = self.max_written.load(Ordering::Acquire);
346 if written == 0 {
347 return Ok(());
348 }
349 let active = (written - 1) / self.segment_size;
351 let from = self.synced_from.load(Ordering::Acquire);
352
353 for index in from..=active {
354 if let Some(file) = self.open_segment(index) {
355 durable_sync(&file).map_err(|e| WalError::io("flushing to stable storage", e))?;
356 }
357 }
358 self.synced_from.store(active, Ordering::Release);
361 Ok(())
362 }
363
364 fn len(&self) -> Result<u64> {
365 Ok(self.max_written.load(Ordering::Acquire))
366 }
367
368 fn head(&self) -> Result<u64> {
369 Ok(self.head.load(Ordering::Acquire))
370 }
371
372 fn truncate_before(&self, offset: u64) -> Result<u64> {
373 let written = self.max_written.load(Ordering::Acquire);
374 let prev = self.head.load(Ordering::Acquire);
380 let new_head = offset.clamp(prev, written);
381
382 self.write_head_file(new_head)?;
386
387 let last_segment = written.saturating_sub(1) / self.segment_size;
390 let keep_from = (new_head / self.segment_size).min(last_segment);
391 let entries =
392 fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
393 for entry in entries {
394 let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
395 let Some(index) = parse_segment_name(&entry.file_name()) else {
396 continue;
397 };
398 if index < keep_from {
399 let _ = self.write_map().remove(&index);
402 fs::remove_file(entry.path())
403 .map_err(|e| WalError::io("removing a dropped segment", e))?;
404 }
405 }
406
407 self.head.store(new_head, Ordering::Release);
408 Ok(new_head)
409 }
410}
411
412fn segment_name(index: u64) -> String {
414 format!("{index:0NAME_DIGITS$}.{NAME_EXT}")
415}
416
417fn parse_segment_name(name: &OsStr) -> Option<u64> {
421 let name = name.to_str()?;
422 let stem = name.strip_suffix(&format!(".{NAME_EXT}"))?;
423 if stem.len() != NAME_DIGITS || !stem.bytes().all(|b| b.is_ascii_digit()) {
424 return None;
425 }
426 stem.parse().ok()
427}
428
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Segment names are zero-padded and only well-formed names parse back.
    #[test]
    fn test_segment_name_roundtrips() {
        assert_eq!(segment_name(0), "00000000000000000000.wal");
        assert_eq!(segment_name(42), "00000000000000000042.wal");

        let parsed = parse_segment_name(OsStr::new("00000000000000000042.wal"));
        assert_eq!(parsed, Some(42));

        assert_eq!(parse_segment_name(OsStr::new("README.md")), None);
        assert_eq!(parse_segment_name(OsStr::new("42.wal")), None);
        let bad_digit = parse_segment_name(OsStr::new("0000000000000000004x.wal"));
        assert_eq!(bad_digit, None);
    }

    /// A small write that fits in one segment reads back unchanged.
    #[test]
    fn test_write_read_within_one_segment() {
        let tmp = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(tmp.path(), 64).unwrap();
        store.write_at(0, b"hello").unwrap();
        store.sync().unwrap();

        let mut out = [0u8; 5];
        let n = store.read_at(0, &mut out).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&out, b"hello");
        assert_eq!(store.len().unwrap(), 5);
    }

    /// A write longer than one segment spills into the next segment file.
    #[test]
    fn test_write_spans_segment_boundary() {
        let tmp = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(tmp.path(), 8).unwrap();
        store.write_at(0, b"ABCDEFGHIJKL").unwrap();
        store.sync().unwrap();

        let first = tmp.path().join("00000000000000000000.wal");
        let second = tmp.path().join("00000000000000000001.wal");
        assert!(first.exists());
        assert!(second.exists());

        let mut out = [0u8; 12];
        let n = store.read_at(0, &mut out).unwrap();
        assert_eq!(n, 12);
        assert_eq!(&out, b"ABCDEFGHIJKL");
    }

    /// A read may start mid-segment and cross a boundary.
    #[test]
    fn test_read_at_arbitrary_offset_across_boundary() {
        let tmp = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(tmp.path(), 4).unwrap();
        store.write_at(0, b"0123456789").unwrap();

        let mut out = [0u8; 5];
        let n = store.read_at(3, &mut out).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&out, b"34567");
    }

    /// Reopening the directory recovers the length and contents.
    #[test]
    fn test_reopen_reports_correct_length() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let first = SegmentedStore::open(tmp.path(), 8).unwrap();
            first.write_at(0, b"ABCDEFGHIJKLM").unwrap();
            first.sync().unwrap();
            assert_eq!(first.len().unwrap(), 13);
        }

        let reopened = SegmentedStore::open(tmp.path(), 8).unwrap();
        assert_eq!(reopened.len().unwrap(), 13);

        let mut out = [0u8; 13];
        let n = reopened.read_at(0, &mut out).unwrap();
        assert_eq!(n, 13);
        assert_eq!(&out, b"ABCDEFGHIJKLM");
    }

    /// Truncating deletes segments past the new end and shortens reads.
    #[test]
    fn test_truncate_removes_later_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(tmp.path(), 8).unwrap();
        store.write_at(0, &[0xAB; 30]).unwrap();
        store.sync().unwrap();
        assert!(tmp.path().join("00000000000000000003.wal").exists());

        store.truncate(10).unwrap();
        assert_eq!(store.len().unwrap(), 10);
        assert!(tmp.path().join("00000000000000000001.wal").exists());
        assert!(!tmp.path().join("00000000000000000002.wal").exists());
        assert!(!tmp.path().join("00000000000000000003.wal").exists());

        let mut out = [0u8; 16];
        let n = store.read_at(0, &mut out).unwrap();
        assert_eq!(n, 10);
    }

    /// Reading at or beyond the end returns a short (possibly zero) count.
    #[test]
    fn test_read_past_end_is_short() {
        let tmp = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(tmp.path(), 8).unwrap();
        store.write_at(0, b"abc").unwrap();

        let mut out = [0u8; 16];
        let partial = store.read_at(0, &mut out).unwrap();
        assert_eq!(partial, 3);
        let beyond = store.read_at(100, &mut out).unwrap();
        assert_eq!(beyond, 0);
    }
}