1use std::{
18 collections::HashMap,
19 ffi::OsStr,
20 fs::{self, File, OpenOptions},
21 io,
22 path::{Path, PathBuf},
23 sync::{
24 Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
25 atomic::{AtomicU64, Ordering},
26 },
27};
28
29use crate::{
30 error::{Result, WalError},
31 store::{WalStore, durable_sync, pread_fill, pwrite_all},
32};
33
/// Width, in decimal digits, of the zero-padded index in a segment file name.
/// `u64::MAX` has 20 decimal digits, so every `u64` index fits; the fixed-width
/// zero padding also makes lexical name order match numeric index order.
const NAME_DIGITS: usize = 20;
/// Extension (without the leading dot) of segment file names.
const NAME_EXT: &str = "wal";
39
/// A log store that spreads one logical byte stream across fixed-size segment
/// files kept in a single directory.
///
/// Logical offset `o` lives in segment `o / segment_size`, at local offset
/// `o % segment_size` inside that segment's file.
#[derive(Debug)]
pub struct SegmentedStore {
    /// Directory holding the segment files.
    dir: PathBuf,
    /// Capacity of each segment file in bytes (rejected if zero when opening).
    segment_size: u64,
    /// Cache of open segment file handles, keyed by segment index.
    segments: RwLock<HashMap<u64, Arc<File>>>,
    /// Logical length of the log: the highest end offset written so far,
    /// seeded from the directory contents when opening and reset by truncation.
    max_written: AtomicU64,
    /// Index of the first segment that a sync flushes.
    synced_from: AtomicU64,
}
76
77impl SegmentedStore {
78 pub fn open(dir: impl AsRef<Path>, segment_size: u64) -> Result<Self> {
91 if segment_size == 0 {
92 return Err(WalError::io(
93 "opening the segmented log",
94 io::Error::other("segment size must be non-zero"),
95 ));
96 }
97 let dir = dir.as_ref().to_path_buf();
98 fs::create_dir_all(&dir).map_err(|e| WalError::io("creating the log directory", e))?;
99
100 let mut highest: Option<(u64, u64)> = None; for entry in fs::read_dir(&dir).map_err(|e| WalError::io("reading the log directory", e))? {
103 let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
104 if let Some(index) = parse_segment_name(&entry.file_name()) {
105 let len = entry
106 .metadata()
107 .map_err(|e| WalError::io("reading segment metadata", e))?
108 .len();
109 if highest.is_none_or(|(h, _)| index > h) {
110 highest = Some((index, len));
111 }
112 }
113 }
114
115 let total_len = match highest {
116 Some((index, len)) => index.saturating_mul(segment_size).saturating_add(len),
117 None => 0,
118 };
119 let active = total_len / segment_size;
120
121 Ok(SegmentedStore {
122 dir,
123 segment_size,
124 segments: RwLock::new(HashMap::new()),
125 max_written: AtomicU64::new(total_len),
126 synced_from: AtomicU64::new(active),
128 })
129 }
130
131 #[must_use]
133 pub fn dir(&self) -> &Path {
134 &self.dir
135 }
136
137 #[must_use]
139 pub fn segment_size(&self) -> u64 {
140 self.segment_size
141 }
142
143 fn read_map(&self) -> RwLockReadGuard<'_, HashMap<u64, Arc<File>>> {
144 self.segments.read().unwrap_or_else(PoisonError::into_inner)
145 }
146
147 fn write_map(&self) -> RwLockWriteGuard<'_, HashMap<u64, Arc<File>>> {
148 self.segments
149 .write()
150 .unwrap_or_else(PoisonError::into_inner)
151 }
152
153 fn segment_for_write(&self, index: u64) -> Result<Arc<File>> {
156 if let Some(file) = self.read_map().get(&index) {
157 return Ok(Arc::clone(file));
158 }
159 let mut map = self.write_map();
160 if let Some(file) = map.get(&index) {
161 return Ok(Arc::clone(file));
162 }
163 let path = self.dir.join(segment_name(index));
164 let file = OpenOptions::new()
165 .read(true)
166 .write(true)
167 .create(true)
168 .truncate(false)
169 .open(&path)
170 .map_err(|e| WalError::io("creating a log segment", e))?;
171 let file = Arc::new(file);
172 let _ = map.insert(index, Arc::clone(&file));
173 Ok(file)
174 }
175
176 fn segment_for_read(&self, index: u64) -> Result<Option<Arc<File>>> {
179 if let Some(file) = self.read_map().get(&index) {
180 return Ok(Some(Arc::clone(file)));
181 }
182 let path = self.dir.join(segment_name(index));
183 match OpenOptions::new().read(true).write(true).open(&path) {
184 Ok(file) => {
185 let file = Arc::new(file);
186 let mut map = self.write_map();
187 if let Some(existing) = map.get(&index) {
188 return Ok(Some(Arc::clone(existing)));
189 }
190 let _ = map.insert(index, Arc::clone(&file));
191 Ok(Some(file))
192 }
193 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
194 Err(error) => Err(WalError::io("opening a log segment", error)),
195 }
196 }
197
198 fn open_segment(&self, index: u64) -> Option<Arc<File>> {
200 self.read_map().get(&index).map(Arc::clone)
201 }
202}
203
204impl WalStore for SegmentedStore {
205 fn write_at(&self, offset: u64, bytes: &[u8]) -> Result<()> {
206 let mut pos = offset;
207 let mut remaining = bytes;
208 while !remaining.is_empty() {
209 let index = pos / self.segment_size;
210 let local = pos % self.segment_size;
211 let room = (self.segment_size - local) as usize;
212 let take = remaining.len().min(room);
213
214 let file = self.segment_for_write(index)?;
215 pwrite_all(&file, local, &remaining[..take])
216 .map_err(|e| WalError::io("writing a record", e))?;
217
218 pos += take as u64;
219 remaining = &remaining[take..];
220 }
221 let end = offset.saturating_add(bytes.len() as u64);
222 let _ = self.max_written.fetch_max(end, Ordering::Relaxed);
223 Ok(())
224 }
225
226 fn read_at(&self, offset: u64, buf: &mut [u8]) -> Result<usize> {
227 let mut pos = offset;
228 let mut filled = 0;
229 while filled < buf.len() {
230 let index = pos / self.segment_size;
231 let local = pos % self.segment_size;
232 let room = (self.segment_size - local) as usize;
233 let want = (buf.len() - filled).min(room);
234
235 let Some(file) = self.segment_for_read(index)? else {
236 break; };
238 let got = pread_fill(&file, local, &mut buf[filled..filled + want])
239 .map_err(|e| WalError::io("reading from the log", e))?;
240 filled += got;
241 pos += got as u64;
242 if got < want {
243 break; }
245 }
246 Ok(filled)
247 }
248
249 fn truncate(&self, len: u64) -> Result<()> {
250 let last_index = len / self.segment_size;
251 let last_local = len % self.segment_size;
252
253 let entries =
255 fs::read_dir(&self.dir).map_err(|e| WalError::io("reading the log directory", e))?;
256 for entry in entries {
257 let entry = entry.map_err(|e| WalError::io("reading the log directory", e))?;
258 let Some(index) = parse_segment_name(&entry.file_name()) else {
259 continue;
260 };
261 match index.cmp(&last_index) {
262 std::cmp::Ordering::Greater => {
263 fs::remove_file(entry.path())
265 .map_err(|e| WalError::io("removing a truncated segment", e))?;
266 let _ = self.write_map().remove(&index);
267 }
268 std::cmp::Ordering::Equal => {
269 let file = self.segment_for_write(index)?;
271 file.set_len(last_local)
272 .map_err(|e| WalError::io("truncating a log segment", e))?;
273 }
274 std::cmp::Ordering::Less => {}
275 }
276 }
277
278 self.max_written.store(len, Ordering::Relaxed);
279 self.synced_from.store(last_index, Ordering::Relaxed);
280 Ok(())
281 }
282
283 fn sync(&self) -> Result<()> {
284 let written = self.max_written.load(Ordering::Acquire);
285 if written == 0 {
286 return Ok(());
287 }
288 let active = (written - 1) / self.segment_size;
290 let from = self.synced_from.load(Ordering::Acquire);
291
292 for index in from..=active {
293 if let Some(file) = self.open_segment(index) {
294 durable_sync(&file).map_err(|e| WalError::io("flushing to stable storage", e))?;
295 }
296 }
297 self.synced_from.store(active, Ordering::Release);
300 Ok(())
301 }
302
303 fn len(&self) -> Result<u64> {
304 Ok(self.max_written.load(Ordering::Acquire))
305 }
306}
307
308fn segment_name(index: u64) -> String {
310 format!("{index:0NAME_DIGITS$}.{NAME_EXT}")
311}
312
313fn parse_segment_name(name: &OsStr) -> Option<u64> {
317 let name = name.to_str()?;
318 let stem = name.strip_suffix(&format!(".{NAME_EXT}"))?;
319 if stem.len() != NAME_DIGITS || !stem.bytes().all(|b| b.is_ascii_digit()) {
320 return None;
321 }
322 stem.parse().ok()
323}
324
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn test_segment_name_roundtrips() {
        assert_eq!(segment_name(0), "00000000000000000000.wal");
        assert_eq!(segment_name(42), "00000000000000000042.wal");
        assert_eq!(
            parse_segment_name(OsStr::new("00000000000000000042.wal")),
            Some(42)
        );
        assert_eq!(parse_segment_name(OsStr::new("README.md")), None);
        assert_eq!(parse_segment_name(OsStr::new("42.wal")), None);
        assert_eq!(
            parse_segment_name(OsStr::new("0000000000000000004x.wal")),
            None
        );
    }

    #[test]
    fn test_write_read_within_one_segment() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 64).unwrap();
        store.write_at(0, b"hello").unwrap();
        store.sync().unwrap();

        let mut buf = [0u8; 5];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 5);
        assert_eq!(&buf, b"hello");
        assert_eq!(store.len().unwrap(), 5);
    }

    #[test]
    fn test_write_spans_segment_boundary() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, b"ABCDEFGHIJKL").unwrap();
        store.sync().unwrap();

        assert!(dir.path().join("00000000000000000000.wal").exists());
        assert!(dir.path().join("00000000000000000001.wal").exists());

        let mut buf = [0u8; 12];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 12);
        assert_eq!(&buf, b"ABCDEFGHIJKL");
    }

    #[test]
    fn test_read_at_arbitrary_offset_across_boundary() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 4).unwrap();
        store.write_at(0, b"0123456789").unwrap();
        let mut buf = [0u8; 5];
        let n = store.read_at(3, &mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"34567");
    }

    #[test]
    fn test_reopen_reports_correct_length() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, b"ABCDEFGHIJKLM").unwrap();
            store.sync().unwrap();
            assert_eq!(store.len().unwrap(), 13);
        }
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.len().unwrap(), 13);
        let mut buf = [0u8; 13];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 13);
        assert_eq!(&buf, b"ABCDEFGHIJKLM");
    }

    #[test]
    fn test_truncate_removes_later_segments() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, &[0xAB; 30]).unwrap();
        store.sync().unwrap();
        assert!(dir.path().join("00000000000000000003.wal").exists());

        store.truncate(10).unwrap();
        assert_eq!(store.len().unwrap(), 10);
        assert!(dir.path().join("00000000000000000001.wal").exists());
        assert!(!dir.path().join("00000000000000000002.wal").exists());
        assert!(!dir.path().join("00000000000000000003.wal").exists());

        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 10);
    }

    #[test]
    fn test_read_past_end_is_short() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        store.write_at(0, b"abc").unwrap();
        let mut buf = [0u8; 16];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 3);
        assert_eq!(store.read_at(100, &mut buf).unwrap(), 0);
    }

    #[test]
    fn test_open_rejects_zero_segment_size() {
        let dir = tempfile::tempdir().unwrap();
        assert!(SegmentedStore::open(dir.path(), 0).is_err());
    }

    #[test]
    fn test_truncate_at_segment_boundary_then_append() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, &[0xAB; 24]).unwrap();
            store.sync().unwrap();

            // Cut exactly at the start of segment 2, then append into it.
            store.truncate(16).unwrap();
            assert_eq!(store.len().unwrap(), 16);
            store.write_at(16, b"xyz").unwrap();
            store.sync().unwrap();

            let mut buf = [0u8; 32];
            assert_eq!(store.read_at(0, &mut buf).unwrap(), 19);
            assert_eq!(&buf[..16], &[0xAB; 16]);
            assert_eq!(&buf[16..19], b"xyz");
        }
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.len().unwrap(), 19);
    }

    #[test]
    fn test_reopen_with_full_last_segment_appends_to_next() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = SegmentedStore::open(dir.path(), 8).unwrap();
            store.write_at(0, b"ABCDEFGHIJKLMNOP").unwrap();
            store.sync().unwrap();
        }
        let store = SegmentedStore::open(dir.path(), 8).unwrap();
        assert_eq!(store.len().unwrap(), 16);

        store.write_at(16, b"Q").unwrap();
        store.sync().unwrap();
        assert!(dir.path().join("00000000000000000002.wal").exists());

        let mut buf = [0u8; 17];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 17);
        assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQ");
    }

    #[test]
    fn test_overwrite_in_earlier_segment_reads_back() {
        let dir = tempfile::tempdir().unwrap();
        let store = SegmentedStore::open(dir.path(), 4).unwrap();
        store.write_at(0, b"0123456789").unwrap();
        store.sync().unwrap();

        // Rewrite inside segment 0 after later segments were already synced.
        store.write_at(2, b"ab").unwrap();
        store.sync().unwrap();
        assert_eq!(store.len().unwrap(), 10);

        let mut buf = [0u8; 10];
        assert_eq!(store.read_at(0, &mut buf).unwrap(), 10);
        assert_eq!(&buf, b"01ab456789");
    }
}