1use crate::error::{JournalError, Result};
2use journal_common::compat::is_multiple_of;
3use std::fs::File;
4#[cfg(not(unix))]
5use std::io::{Read, Seek, SeekFrom, Write};
6use std::ops::{Deref, DerefMut};
7#[cfg(unix)]
8use std::os::unix::fs::FileExt;
9use std::sync::atomic::{Ordering, fence};
10use tracing::error;
11
12pub use memmap2::{Mmap, MmapMut, MmapOptions};
14
/// Page granularity (4 KiB) that window chunk sizes must be a multiple of.
const PAGE_SIZE: u64 = 1 << 12;
16
/// How a [`WindowManager`] maps the underlying file.
///
/// Hidden from generated documentation via `#[doc(hidden)]`.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ExperimentalMmapStrategy {
    /// Map chunk-aligned windows on demand, evicting unpinned windows once
    /// `max_windows` are mapped. This is the default.
    #[default]
    Windowed,
    /// Map the file as a single window starting at offset 0, remapping the whole
    /// file when an access falls outside the current mapping.
    WholeFile,
}
24
/// Point-in-time counters describing a [`WindowManager`], as returned by `stats()`.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct WindowManagerStats {
    /// Mapping strategy the manager was built with.
    pub strategy: ExperimentalMmapStrategy,
    /// Cached length of the backing file.
    pub file_size: u64,
    /// Number of windows currently mapped.
    pub window_count: usize,
    /// Number of mapped windows currently marked as row-pinned.
    pub row_pin_count: usize,
    /// Configured row-pin limit (equal to `max_windows`; not applied in `WholeFile` mode).
    pub row_pin_limit: usize,
    /// Number of heap copies handed out because no window could be pinned.
    pub row_overflow_object_count: usize,
    /// Sum of the sizes of all currently mapped windows.
    pub current_mapped_bytes: u64,
    /// High-water mark of `current_mapped_bytes`.
    pub max_mapped_bytes: u64,
    /// Number of windows successfully mapped.
    pub map_count: u64,
    /// Number of times an existing mapping was replaced by a new one.
    pub remap_count: u64,
    /// Number of windows dropped to make room for a new one.
    pub eviction_count: u64,
}
40
/// A mapping of a byte range of a file, readable as a `[u8]` slice through `Deref`.
pub trait MemoryMap: Deref<Target = [u8]> {
    /// Maps `size` bytes of `file` starting at byte `offset`.
    ///
    /// Each implementation decides how to validate the range against the file's length.
    fn create(file: &File, offset: u64, size: u64) -> Result<Self>
    where
        Self: Sized;

    /// Like [`MemoryMap::create`], but with the file length supplied by the caller
    /// so implementations can validate the range without querying file metadata.
    ///
    /// The default implementation ignores `file_size` and simply delegates to
    /// `create`; it performs no bounds check of its own.
    fn create_checked(file: &File, offset: u64, size: u64, file_size: u64) -> Result<Self>
    where
        Self: Sized,
    {
        // Explicitly unused by the default implementation.
        let _ = file_size;
        Self::create(file, offset, size)
    }
}
54
/// A [`MemoryMap`] whose bytes can also be modified in place (via `DerefMut`).
pub trait MemoryMapMut: MemoryMap + DerefMut {
    /// Writes modifications made through the mapping back to the underlying file.
    fn flush(&self) -> Result<()>;
}
59
60impl MemoryMap for Mmap {
61 fn create(file: &File, offset: u64, size: u64) -> Result<Self> {
62 let end = offset
63 .checked_add(size)
64 .ok_or(JournalError::ObjectExceedsFileBounds)?;
65 let file_size = file.metadata()?.len();
66 if end > file_size {
67 return Err(JournalError::ObjectExceedsFileBounds);
68 }
69 Self::create_checked(file, offset, size, file_size)
70 }
71
72 fn create_checked(file: &File, offset: u64, size: u64, file_size: u64) -> Result<Self> {
73 let end = offset
74 .checked_add(size)
75 .ok_or(JournalError::ObjectExceedsFileBounds)?;
76 if end > file_size {
77 return Err(JournalError::ObjectExceedsFileBounds);
78 }
79 let mmap = unsafe {
83 MmapOptions::new()
84 .offset(offset)
85 .len(size as usize)
86 .map(file)?
87 };
88
89 Ok(mmap)
90 }
91}
92
93impl MemoryMap for MmapMut {
94 fn create(file: &File, offset: u64, size: u64) -> Result<Self> {
95 let required_size = offset
96 .checked_add(size)
97 .ok_or(JournalError::ObjectExceedsFileBounds)?;
98
99 let mut file_size = file.metadata()?.len();
100 if required_size > file_size {
101 file.set_len(required_size)?;
102 file_size = required_size;
103 }
104 Self::create_checked(file, offset, size, file_size)
105 }
106
107 fn create_checked(file: &File, offset: u64, size: u64, file_size: u64) -> Result<Self> {
108 let required_size = offset
109 .checked_add(size)
110 .ok_or(JournalError::ObjectExceedsFileBounds)?;
111 if required_size > file_size {
112 return Err(JournalError::ObjectExceedsFileBounds);
113 }
114
115 let mmap = unsafe {
119 MmapOptions::new()
120 .offset(offset)
121 .len(size as usize)
122 .map_mut(file)?
123 };
124
125 Ok(mmap)
126 }
127}
128
129impl MemoryMapMut for MmapMut {
130 fn flush(&self) -> Result<()> {
131 MmapMut::flush(self)?;
132 Ok(())
133 }
134}
135
/// One mapped region of the file: `size` bytes starting at file offset `offset`.
struct Window<M: MemoryMap> {
    /// File offset of the first mapped byte.
    offset: u64,
    /// Number of mapped bytes. For readers this may be shorter than the requested
    /// chunk span when the window is clamped to the end of the file.
    size: u64,
    /// The mapping itself, created with length `size`.
    mmap: M,
    /// Set while the window backs a slice returned by `get_row_pinned_slice`.
    /// Windowed-mode eviction skips pinned windows.
    row_pinned: bool,
}
142
143impl<M: MemoryMap> std::fmt::Debug for Window<M> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("Window")
146 .field("offset", &self.offset)
147 .field("size", &self.size)
148 .finish()
149 }
150}
151
152impl<M: MemoryMap> Window<M> {
153 fn end_offset(&self) -> Option<u64> {
154 self.offset.checked_add(self.size)
155 }
156
157 fn contains(&self, position: u64) -> bool {
158 self.end_offset()
159 .is_some_and(|end_offset| position >= self.offset && position < end_offset)
160 }
161
162 fn contains_range(&self, position: u64, size: u64) -> bool {
163 let Some(end) = position.checked_add(size) else {
164 return false;
165 };
166 self.end_offset()
167 .is_some_and(|end_offset| position >= self.offset && end <= end_offset)
168 }
169
170 fn get_slice(&self, position: u64, size: u64) -> &[u8] {
171 debug_assert!(self.contains_range(position, size));
172
173 let offset = (position - self.offset) as usize;
174 &self.mmap[offset..offset + size as usize]
175 }
176}
177
178impl<M: MemoryMapMut> Window<M> {
179 pub fn get_mut_slice(&mut self, position: u64, size: u64) -> &mut [u8] {
180 debug_assert!(self.contains_range(position, size));
181
182 let offset = (position - self.offset) as usize;
183 &mut self.mmap[offset..offset + size as usize]
184 }
185}
186
/// Serves byte slices of a journal file through a bounded set of chunk-aligned
/// memory-mapped windows, or a single whole-file mapping.
///
/// Also supports "row pinning". Windows backing slices from `get_row_pinned_slice`
/// are kept out of windowed-mode eviction until `clear_row_pins`. Once the pin
/// limit is reached, heap copies are handed out instead.
pub struct WindowManager<M: MemoryMap> {
    /// Backing file; mapped by `create_window` and read directly by `read_exact_at`.
    file: File,
    /// Cached length of `file`; how it is refreshed depends on `bounds_mode`.
    file_size: u64,
    /// How `file_size` is maintained and how windows are sized against it.
    bounds_mode: BoundsMode,
    /// Windowed vs. whole-file mapping.
    strategy: ExperimentalMmapStrategy,
    /// Window granularity in bytes; window start offsets are multiples of it.
    chunk_size: u64,
    /// Index into `windows` of the most recently used window; checked first on lookup.
    active_window_idx: Option<usize>,
    /// Eviction threshold for the window count and, in windowed mode, the row-pin limit.
    max_windows: usize,
    /// Currently mapped windows.
    windows: Vec<Window<M>>,
    /// Number of entries in `windows` with `row_pinned` set.
    row_pin_count: usize,
    /// Statistics counters reported by `stats`.
    map_count: u64,
    remap_count: u64,
    eviction_count: u64,
    /// High-water mark of the total mapped bytes.
    max_mapped_bytes: u64,
    /// Heap copies returned by `get_row_pinned_slice` when no window could be pinned;
    /// freed by `clear_row_pins`.
    row_overflow_objects: Vec<Box<[u8]>>,
}
203
/// How a [`WindowManager`] treats the length of its backing file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BoundsMode {
    /// When an access goes past the cached length, the length is re-read from file
    /// metadata before the access is rejected.
    LiveFile,
    /// The cached length is never re-read; accesses beyond it are rejected.
    Snapshot,
    /// New windows extend the file with `set_len` instead of being clamped to its
    /// current length.
    WriterOwned,
}
210
211impl<M: MemoryMap> WindowManager<M> {
212 pub fn new(file: File, chunk_size: u64, max_windows: usize) -> Result<Self> {
213 Self::new_with_strategy(
214 file,
215 chunk_size,
216 max_windows,
217 ExperimentalMmapStrategy::Windowed,
218 )
219 }
220
221 pub fn new_with_strategy(
222 file: File,
223 chunk_size: u64,
224 max_windows: usize,
225 strategy: ExperimentalMmapStrategy,
226 ) -> Result<Self> {
227 Self::new_with_bounds_mode(
228 file,
229 chunk_size,
230 max_windows,
231 BoundsMode::LiveFile,
232 strategy,
233 )
234 }
235
236 pub fn new_snapshot(
237 file: File,
238 chunk_size: u64,
239 max_windows: usize,
240 strategy: ExperimentalMmapStrategy,
241 ) -> Result<Self> {
242 Self::new_with_bounds_mode(
243 file,
244 chunk_size,
245 max_windows,
246 BoundsMode::Snapshot,
247 strategy,
248 )
249 }
250
251 pub fn new_writer_owned(file: File, chunk_size: u64, max_windows: usize) -> Result<Self> {
252 Self::new_writer_owned_with_strategy(
253 file,
254 chunk_size,
255 max_windows,
256 ExperimentalMmapStrategy::Windowed,
257 )
258 }
259
260 pub fn new_writer_owned_with_strategy(
261 file: File,
262 chunk_size: u64,
263 max_windows: usize,
264 strategy: ExperimentalMmapStrategy,
265 ) -> Result<Self> {
266 Self::new_with_bounds_mode(
267 file,
268 chunk_size,
269 max_windows,
270 BoundsMode::WriterOwned,
271 strategy,
272 )
273 }
274
275 fn new_with_bounds_mode(
276 file: File,
277 chunk_size: u64,
278 max_windows: usize,
279 bounds_mode: BoundsMode,
280 strategy: ExperimentalMmapStrategy,
281 ) -> Result<Self> {
282 debug_assert!(chunk_size != 0 && is_multiple_of(chunk_size, PAGE_SIZE));
283 debug_assert!(max_windows != 0);
284
285 let file_size = file.metadata()?.len();
286
287 Ok(WindowManager {
288 file,
289 file_size,
290 bounds_mode,
291 strategy,
292 chunk_size,
293 max_windows,
294 windows: Vec::new(),
295 row_pin_count: 0,
296 active_window_idx: None,
297 map_count: 0,
298 remap_count: 0,
299 eviction_count: 0,
300 max_mapped_bytes: 0,
301 row_overflow_objects: Vec::new(),
302 })
303 }
304
305 pub fn stats(&self) -> WindowManagerStats {
306 let current_mapped_bytes = self.current_mapped_bytes();
307 WindowManagerStats {
308 strategy: self.strategy,
309 file_size: self.file_size,
310 window_count: self.windows.len(),
311 row_pin_count: self.row_pin_count,
312 row_pin_limit: self.max_windows,
313 row_overflow_object_count: self.row_overflow_objects.len(),
314 current_mapped_bytes,
315 max_mapped_bytes: self.max_mapped_bytes.max(current_mapped_bytes),
316 map_count: self.map_count,
317 remap_count: self.remap_count,
318 eviction_count: self.eviction_count,
319 }
320 }
321
322 fn current_mapped_bytes(&self) -> u64 {
323 self.windows.iter().map(|window| window.size).sum()
324 }
325
326 fn record_mapped_bytes(&mut self) {
327 self.max_mapped_bytes = self.max_mapped_bytes.max(self.current_mapped_bytes());
328 }
329
330 fn refresh_file_size(&mut self) -> Result<u64> {
331 self.file_size = self.file.metadata()?.len();
332 Ok(self.file_size)
333 }
334
335 fn ensure_cached_file_contains(&mut self, end: u64) -> Result<()> {
336 if end <= self.file_size {
337 return Ok(());
338 }
339 if self.bounds_mode == BoundsMode::LiveFile && end <= self.refresh_file_size()? {
340 return Ok(());
341 }
342 Err(JournalError::ObjectExceedsFileBounds)
343 }
344
345 pub(crate) fn read_exact_at(&mut self, position: u64, output: &mut [u8]) -> Result<()> {
346 let end = position
347 .checked_add(output.len() as u64)
348 .ok_or(JournalError::ObjectExceedsFileBounds)?;
349 self.ensure_cached_file_contains(end)?;
350
351 #[cfg(unix)]
352 {
353 let mut read = 0usize;
354 while read < output.len() {
355 let bytes_read = self
356 .file
357 .read_at(&mut output[read..], position + read as u64)?;
358 if bytes_read == 0 {
359 return Err(JournalError::Io(std::io::Error::new(
360 std::io::ErrorKind::UnexpectedEof,
361 "short journal file read",
362 )));
363 }
364 read += bytes_read;
365 }
366 }
367
368 #[cfg(not(unix))]
369 {
370 self.file.seek(SeekFrom::Start(position))?;
371 self.file.read_exact(output)?;
372 }
373
374 Ok(())
375 }
376
377 fn get_chunk_aligned_start(&self, position: u64) -> u64 {
378 (position / self.chunk_size) * self.chunk_size
379 }
380
381 fn get_chunk_aligned_end(&self, position: u64) -> Result<u64> {
382 position
383 .div_ceil(self.chunk_size)
384 .checked_mul(self.chunk_size)
385 .ok_or(JournalError::ObjectExceedsFileBounds)
386 }
387
388 fn create_window(&mut self, window_start: u64, chunk_count: u64) -> Result<Window<M>> {
389 debug_assert_ne!(chunk_count, 0);
390
391 let requested_size = chunk_count
392 .checked_mul(self.chunk_size)
393 .ok_or(JournalError::ObjectExceedsFileBounds)?;
394 let requested_end = window_start
395 .checked_add(requested_size)
396 .ok_or(JournalError::ObjectExceedsFileBounds)?;
397 let size = match self.bounds_mode {
398 BoundsMode::LiveFile => {
399 if window_start >= self.file_size {
400 self.refresh_file_size()?;
401 }
402 if window_start >= self.file_size {
403 return Err(JournalError::ObjectExceedsFileBounds);
404 }
405 requested_size.min(self.file_size - window_start)
406 }
407 BoundsMode::Snapshot => {
408 if window_start >= self.file_size {
409 return Err(JournalError::ObjectExceedsFileBounds);
410 }
411 requested_size.min(self.file_size - window_start)
412 }
413 BoundsMode::WriterOwned => {
414 if requested_end > self.file_size {
415 self.file.set_len(requested_end)?;
416 self.file_size = requested_end;
417 }
418 requested_size
419 }
420 };
421 let mmap =
422 M::create_checked(&self.file, window_start, size, self.file_size).map_err(|e| {
423 error!(
424 window_start,
425 size,
426 chunk_count,
427 chunk_size = self.chunk_size,
428 "mmap failed: {e}"
429 );
430 e
431 })?;
432 self.map_count += 1;
433 Ok(Window {
434 offset: window_start,
435 size,
436 mmap,
437 row_pinned: false,
438 })
439 }
440
441 fn lookup_window_by_range(&self, position: u64, size_needed: u64) -> Option<usize> {
442 if let Some(idx) = self.active_window_idx {
443 if self.windows[idx].contains_range(position, size_needed) {
444 return Some(idx);
445 }
446 }
447
448 for (idx, window) in self.windows.iter().enumerate() {
449 if window.contains_range(position, size_needed) {
450 return Some(idx);
451 }
452 }
453
454 None
455 }
456
457 fn lookup_window_by_position(&self, position: u64) -> Option<usize> {
458 if let Some(idx) = self.active_window_idx {
459 if self.windows[idx].contains(position) {
460 return Some(idx);
461 }
462 }
463
464 for (idx, window) in self.windows.iter().enumerate() {
465 if window.contains(position) {
466 return Some(idx);
467 }
468 }
469
470 None
471 }
472
473 pub(crate) fn active_slice_if_contains(&self, position: u64, size: u64) -> Option<&[u8]> {
474 let idx = self.active_window_idx?;
475 let window = &self.windows[idx];
476 if window.contains_range(position, size) {
477 Some(window.get_slice(position, size))
478 } else {
479 None
480 }
481 }
482
483 pub(crate) fn active_window_contains(&self, position: u64, size: u64) -> bool {
484 self.active_window_idx
485 .and_then(|idx| self.windows.get(idx))
486 .is_some_and(|window| window.contains_range(position, size))
487 }
488
489 pub(crate) fn active_slice(&self, position: u64, size: u64) -> &[u8] {
490 let idx = self
491 .active_window_idx
492 .expect("active window should exist when active_window_contains returned true");
493 let window = &self.windows[idx];
494 debug_assert!(window.contains_range(position, size));
495 window.get_slice(position, size)
496 }
497
498 pub(crate) fn clear_row_pins(&mut self) {
499 if self.row_pin_count == 0 {
500 self.row_overflow_objects.clear();
501 return;
502 }
503 for window in &mut self.windows {
504 window.row_pinned = false;
505 }
506 self.row_pin_count = 0;
507 self.row_overflow_objects.clear();
508 }
509
510 #[inline(always)]
511 pub(crate) fn row_pin_limit_reached(&self) -> bool {
512 self.strategy != ExperimentalMmapStrategy::WholeFile
513 && self.row_pin_count >= self.max_windows
514 }
515
516 #[cold]
517 #[inline(never)]
518 fn get_row_overflow_slice(&mut self, position: u64, size: u64) -> Result<&[u8]> {
519 let len = usize::try_from(size).map_err(|_| JournalError::ObjectExceedsFileBounds)?;
520 let mut data = vec![0u8; len].into_boxed_slice();
521 self.read_exact_at(position, &mut data)?;
522 self.row_overflow_objects.push(data);
523 Ok(self
524 .row_overflow_objects
525 .last()
526 .expect("just pushed row overflow object")
527 .as_ref())
528 }
529
530 pub(crate) fn get_row_pinned_slice(&mut self, position: u64, size: u64) -> Result<&[u8]> {
531 let end = position
532 .checked_add(size)
533 .ok_or(JournalError::ObjectExceedsFileBounds)?;
534 self.ensure_cached_file_contains(end)?;
535 let Some(idx) = self.get_window_index_preserving_row_pins(position, size)? else {
536 return self.get_row_overflow_slice(position, size);
537 };
538 self.active_window_idx = Some(idx);
539 if !self.windows[idx].row_pinned {
540 if self.row_pin_limit_reached() {
541 return self.get_row_overflow_slice(position, size);
542 }
543 self.windows[idx].row_pinned = true;
544 self.row_pin_count += 1;
545 }
546 let window = &mut self.windows[idx];
547 Ok(window.get_slice(position, size))
548 }
549
550 fn push_window(&mut self, window: Window<M>) -> usize {
551 self.windows.push(window);
552 self.record_mapped_bytes();
553 self.windows.len() - 1
554 }
555
556 fn chunk_span_for_range(&self, position: u64, size_needed: u64) -> Result<(u64, u64)> {
557 let range_end = position
558 .checked_add(size_needed)
559 .ok_or(JournalError::ObjectExceedsFileBounds)?;
560 let window_start = self.get_chunk_aligned_start(position);
561 let window_end = self.get_chunk_aligned_end(range_end)?;
562 Ok((window_start, (window_end - window_start) / self.chunk_size))
563 }
564
565 fn push_new_window_for_range(&mut self, position: u64, size_needed: u64) -> Result<usize> {
566 let (window_start, num_chunks) = self.chunk_span_for_range(position, size_needed)?;
567 let new_window = self.create_window(window_start, num_chunks)?;
568 Ok(self.push_window(new_window))
569 }
570
571 fn replace_window_for_range(
572 &mut self,
573 idx: usize,
574 position: u64,
575 size_needed: u64,
576 ) -> Result<usize> {
577 let (window_start, num_chunks) = self.chunk_span_for_range(position, size_needed)?;
578 let _window = self.windows.remove(idx);
579 self.active_window_idx = None;
580 let new_window = self.create_window(window_start, num_chunks)?;
581 self.remap_count += 1;
582 Ok(self.push_window(new_window))
583 }
584
585 fn evict_unpinned_window_if_full(&mut self) -> bool {
586 if self.windows.len() < self.max_windows {
587 return true;
588 }
589 let Some(idx) = self.windows.iter().position(|window| !window.row_pinned) else {
590 return false;
591 };
592 self.windows.remove(idx);
593 self.eviction_count += 1;
594 self.active_window_idx = None;
595 true
596 }
597
598 fn make_room_for_new_window(&mut self) {
599 if self.windows.len() < self.max_windows {
600 return;
601 }
602 let idx = if self.row_pin_count == 0 {
603 Some(
604 if self.active_window_idx == Some(0) && self.windows.len() > 1 {
605 1
606 } else {
607 0
608 },
609 )
610 } else {
611 self.windows.iter().position(|window| !window.row_pinned)
612 };
613 if let Some(idx) = idx {
614 self.windows.remove(idx);
615 self.eviction_count += 1;
616 self.active_window_idx = None;
617 }
618 }
619
620 fn get_window_index_preserving_row_pins(
621 &mut self,
622 position: u64,
623 size_needed: u64,
624 ) -> Result<Option<usize>> {
625 if self.strategy == ExperimentalMmapStrategy::WholeFile {
626 return self.get_whole_file_window_index_preserving_row_pins(position, size_needed);
627 }
628
629 if let Some(idx) = self.lookup_window_by_range(position, size_needed) {
630 return Ok(Some(idx));
631 }
632
633 if let Some(idx) = self.lookup_window_by_position(position) {
634 if !self.windows[idx].row_pinned {
635 if self.row_pin_limit_reached() {
636 return Ok(None);
637 }
638 return Ok(Some(self.replace_window_for_range(
643 idx,
644 position,
645 size_needed,
646 )?));
647 }
648 }
653
654 if !self.evict_unpinned_window_if_full() {
655 return Ok(None);
656 }
657
658 Ok(Some(self.push_new_window_for_range(position, size_needed)?))
659 }
660
661 fn get_whole_file_window_index_preserving_row_pins(
662 &mut self,
663 position: u64,
664 size_needed: u64,
665 ) -> Result<Option<usize>> {
666 let idx = self.get_whole_file_window_index(position, size_needed)?;
667 if !self.windows[idx].row_pinned {
668 self.windows[idx].row_pinned = true;
669 self.row_pin_count += 1;
670 }
671 Ok(Some(idx))
672 }
673
674 fn get_window_index(&mut self, position: u64, size_needed: u64) -> Result<usize> {
675 if self.strategy == ExperimentalMmapStrategy::WholeFile {
676 return self.get_whole_file_window_index(position, size_needed);
677 }
678 if let Some(idx) = self.lookup_window_by_range(position, size_needed) {
679 self.active_window_idx = Some(idx);
680 return Ok(idx);
681 }
682 if let Some(idx) = self.lookup_window_by_position(position) {
683 return self.get_overlapping_window_index(idx, position, size_needed);
684 }
685 self.get_new_window_index(position, size_needed)
686 }
687
688 fn get_overlapping_window_index(
689 &mut self,
690 idx: usize,
691 position: u64,
692 size_needed: u64,
693 ) -> Result<usize> {
694 if self.row_pin_count > 0 && self.windows[idx].row_pinned {
695 self.evict_unpinned_window_if_full();
698 let idx = self.push_new_window_for_range(position, size_needed)?;
699 self.active_window_idx = Some(idx);
700 return Ok(idx);
701 }
702 let idx = self.replace_window_for_range(idx, position, size_needed)?;
703 self.active_window_idx = Some(idx);
704 Ok(idx)
705 }
706
707 fn get_new_window_index(&mut self, position: u64, size_needed: u64) -> Result<usize> {
708 self.make_room_for_new_window();
709 let idx = self.push_new_window_for_range(position, size_needed)?;
710 self.active_window_idx = Some(idx);
711 Ok(idx)
712 }
713
714 fn get_window(&mut self, position: u64, size_needed: u64) -> Result<&mut Window<M>> {
715 let idx = self.get_window_index(position, size_needed)?;
716 Ok(&mut self.windows[idx])
717 }
718
    /// Whole-file strategy: returns the single window covering the range, remapping
    /// the entire file from offset 0 when the current mapping is too short.
    ///
    /// # Errors
    /// `ObjectExceedsFileBounds` if the range overflows or, for readers, lies beyond
    /// the (possibly refreshed) file length; mapping errors from `create_window`.
    fn get_whole_file_window_index(&mut self, position: u64, size_needed: u64) -> Result<usize> {
        if let Some(idx) = self.lookup_window_by_range(position, size_needed) {
            self.active_window_idx = Some(idx);
            return Ok(idx);
        }

        let requested_end = position
            .checked_add(size_needed)
            .ok_or(JournalError::ObjectExceedsFileBounds)?;
        // Readers must stay within the file; a writer-owned file is extended by
        // `create_window` instead.
        match self.bounds_mode {
            BoundsMode::LiveFile | BoundsMode::Snapshot => {
                self.ensure_cached_file_contains(requested_end)?
            }
            BoundsMode::WriterOwned => {}
        }
        // Map at least the whole (cached) file, rounded up to a chunk boundary. The
        // window starts at 0, so the end offset divided by chunk_size is the chunk count.
        let target_end = requested_end.max(self.file_size);
        let window_end = self.get_chunk_aligned_end(target_end)?;
        let chunk_count = (window_end / self.chunk_size).max(1);

        // Existing mappings are dropped before the new one is created.
        // NOTE(review): this also drops row-pinned windows and row-overflow copies and
        // resets `row_pin_count`. Any slice previously returned by
        // `get_row_pinned_slice` and still in use would then dangle. Confirm that
        // callers never hold pinned slices across an access that triggers a remap.
        let had_windows = !self.windows.is_empty();
        if had_windows {
            self.windows.clear();
            self.active_window_idx = None;
            self.row_pin_count = 0;
            self.row_overflow_objects.clear();
        }

        let new_window = self.create_window(0, chunk_count)?;
        // The first mapping is not counted as a remap.
        if had_windows {
            self.remap_count += 1;
        }
        let idx = self.push_window(new_window);
        self.active_window_idx = Some(idx);
        Ok(idx)
    }
754
755 pub fn get_slice(&mut self, position: u64, size: u64) -> Result<&[u8]> {
756 let end = position
757 .checked_add(size)
758 .ok_or(JournalError::ObjectExceedsFileBounds)?;
759 self.ensure_cached_file_contains(end)?;
760 let window = self.get_window(position, size)?;
761 Ok(window.get_slice(position, size))
762 }
763}
764
765impl<M: MemoryMapMut> WindowManager<M> {
766 pub fn get_slice_mut(&mut self, position: u64, size: u64) -> Result<&mut [u8]> {
767 let _end = position
768 .checked_add(size)
769 .ok_or(JournalError::ObjectExceedsFileBounds)?;
770 let window = self.get_window(position, size)?;
771 Ok(window.get_mut_slice(position, size))
772 }
773
774 pub fn sync(&mut self, logical_size: u64, header_bytes: &[u8]) -> Result<()> {
776 for window in &self.windows {
777 window.mmap.flush()?;
778 }
779 self.windows.clear();
780 self.active_window_idx = None;
781 self.row_pin_count = 0;
782 self.row_overflow_objects.clear();
783 self.file.set_len(logical_size)?;
784 #[cfg(unix)]
785 {
786 let mut written = 0usize;
787 while written < header_bytes.len() {
788 written += self
789 .file
790 .write_at(&header_bytes[written..], written as u64)?;
791 }
792 }
793 #[cfg(not(unix))]
794 {
795 self.file.seek(SeekFrom::Start(0))?;
796 self.file.write_all(header_bytes)?;
797 }
798 self.file.sync_data()?;
799 self.file_size = logical_size;
800 Ok(())
801 }
802
803 pub fn post_change(&mut self, logical_size: u64) -> Result<()> {
806 fence(Ordering::SeqCst);
807 if logical_size < self.file_size {
808 self.windows.clear();
809 self.active_window_idx = None;
810 self.row_pin_count = 0;
811 self.row_overflow_objects.clear();
812 }
813 self.file.set_len(logical_size)?;
814 self.file_size = logical_size;
815 Ok(())
816 }
817}
818
819#[cfg(test)]
820mod tests {
821 use super::*;
822 use crate::error::JournalError;
823 use std::cell::Cell;
824 use std::io::Write;
825 use std::rc::Rc;
826 use tempfile::NamedTempFile;
827
828 const PAGE_SIZE_TEST: u64 = 4096;
829
830 struct FailingMmap {
833 data: Vec<u8>,
834 }
835
836 impl Deref for FailingMmap {
837 type Target = [u8];
838 fn deref(&self) -> &[u8] {
839 &self.data
840 }
841 }
842
843 struct MockController {
845 fail_next_create: Cell<bool>,
846 create_count: Cell<usize>,
847 }
848
849 impl MockController {
850 fn new() -> Self {
851 Self {
852 fail_next_create: Cell::new(false),
853 create_count: Cell::new(0),
854 }
855 }
856
857 fn set_fail_next(&self, fail: bool) {
858 self.fail_next_create.set(fail);
859 }
860
861 fn should_fail(&self) -> bool {
862 let count = self.create_count.get();
863 self.create_count.set(count + 1);
864 self.fail_next_create.get()
865 }
866 }
867
868 thread_local! {
870 static MOCK_CONTROLLER: Rc<MockController> = Rc::new(MockController::new());
871 }
872
873 impl MemoryMap for FailingMmap {
874 fn create(_file: &File, _offset: u64, size: u64) -> Result<Self> {
875 let mmap_size = size as usize;
876 MOCK_CONTROLLER.with(|ctrl| {
877 if ctrl.should_fail() {
878 return Err(JournalError::Io(std::io::Error::new(
879 std::io::ErrorKind::Other,
880 "simulated mmap failure",
881 )));
882 }
883 Ok(FailingMmap {
885 data: vec![0u8; mmap_size],
886 })
887 })
888 }
889 }
890
891 #[test]
902 fn test_consistent_state_after_failed_remap() {
903 let mut temp_file = NamedTempFile::new().unwrap();
905 temp_file.write_all(&[0u8; 8192]).unwrap();
906 temp_file.flush().unwrap();
907
908 let file = File::open(temp_file.path()).unwrap();
909
910 let mut wm: WindowManager<FailingMmap> =
912 WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
913
914 MOCK_CONTROLLER.with(|ctrl| {
916 ctrl.set_fail_next(false);
917 ctrl.create_count.set(0);
918 });
919
920 {
922 let slice = wm.get_slice(0, 100).unwrap();
923 assert_eq!(slice.len(), 100);
924 }
925 assert_eq!(wm.windows.len(), 1);
926 assert_eq!(wm.active_window_idx, Some(0));
927
928 MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(true));
930
931 let remap_result = wm.get_slice(100, 4000);
938 assert!(remap_result.is_err(), "Expected remap to fail");
939
940 assert_eq!(wm.windows.len(), 0);
944 assert_eq!(wm.active_window_idx, None);
945
946 MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(false));
948
949 let result = wm.get_slice(0, 100);
951 assert!(
952 result.is_ok(),
953 "Expected get_slice to succeed after recovery"
954 );
955 assert_eq!(wm.windows.len(), 1);
956 }
957
958 #[test]
969 fn test_consistent_state_after_failed_eviction() {
970 let mut temp_file = NamedTempFile::new().unwrap();
972 temp_file.write_all(&[0u8; 8192]).unwrap();
973 temp_file.flush().unwrap();
974
975 let file = File::open(temp_file.path()).unwrap();
976
977 let mut wm: WindowManager<FailingMmap> =
979 WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
980
981 MOCK_CONTROLLER.with(|ctrl| {
983 ctrl.set_fail_next(false);
984 ctrl.create_count.set(0);
985 });
986
987 {
989 let _slice = wm.get_slice(0, 100).unwrap();
990 }
991 assert_eq!(wm.windows.len(), 1);
992 assert_eq!(wm.active_window_idx, Some(0));
993
994 MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(true));
996
997 let result = wm.get_slice(4096, 100);
1005 assert!(result.is_err(), "Expected mmap to fail");
1006
1007 assert_eq!(wm.windows.len(), 0);
1011 assert_eq!(wm.active_window_idx, None);
1012
1013 MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(false));
1015
1016 let result = wm.get_slice(0, 100);
1018 assert!(
1019 result.is_ok(),
1020 "Expected get_slice to succeed after recovery"
1021 );
1022 assert_eq!(wm.windows.len(), 1);
1023 }
1024
1025 #[test]
1026 fn row_pinned_slice_uses_overflow_storage_at_window_limit_one() {
1027 let mut temp_file = NamedTempFile::new().unwrap();
1028 temp_file
1029 .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1030 .unwrap();
1031 temp_file
1032 .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1033 .unwrap();
1034 temp_file.flush().unwrap();
1035
1036 let file = File::open(temp_file.path()).unwrap();
1037 let mut wm: WindowManager<Mmap> = WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
1038
1039 let first = wm.get_row_pinned_slice(0, 16).unwrap();
1040 let first_ptr = first.as_ptr();
1041 let first_len = first.len();
1042 assert_eq!(first, &[1u8; 16]);
1043
1044 let second = wm.get_row_pinned_slice(PAGE_SIZE_TEST, 16).unwrap();
1045 assert_eq!(second, &[2u8; 16]);
1046
1047 let stats = wm.stats();
1048 assert_eq!(stats.row_pin_limit, 1);
1049 assert_eq!(stats.row_pin_count, 1);
1050 assert_eq!(stats.window_count, 1);
1051 assert_eq!(stats.row_overflow_object_count, 1);
1052
1053 let first_after_overflow = unsafe { std::slice::from_raw_parts(first_ptr, first_len) };
1058 assert_eq!(first_after_overflow, &[1u8; 16]);
1059
1060 wm.clear_row_pins();
1061 let stats = wm.stats();
1062 assert_eq!(stats.row_pin_count, 0);
1063 assert_eq!(stats.row_overflow_object_count, 0);
1064 }
1065
1066 #[test]
1067 fn live_reader_refreshes_file_size_only_when_access_exceeds_cache() {
1068 let mut temp_file = NamedTempFile::new().unwrap();
1069 temp_file
1070 .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1071 .unwrap();
1072 temp_file.flush().unwrap();
1073
1074 let file = File::open(temp_file.path()).unwrap();
1075 let mut wm: WindowManager<Mmap> = WindowManager::new(file, PAGE_SIZE_TEST, 2).unwrap();
1076 assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1077
1078 assert_eq!(wm.get_slice(0, 16).unwrap(), &[1u8; 16]);
1079
1080 temp_file
1081 .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1082 .unwrap();
1083 temp_file.flush().unwrap();
1084
1085 assert_eq!(wm.get_slice(128, 16).unwrap(), &[1u8; 16]);
1086 assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1087
1088 assert_eq!(wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap(), &[2u8; 16]);
1089 assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST * 2);
1090 }
1091
1092 #[test]
1093 fn snapshot_reader_does_not_refresh_file_size_after_growth() {
1094 let mut temp_file = NamedTempFile::new().unwrap();
1095 temp_file
1096 .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1097 .unwrap();
1098 temp_file.flush().unwrap();
1099
1100 let file = File::open(temp_file.path()).unwrap();
1101 let mut wm: WindowManager<Mmap> = WindowManager::new_snapshot(
1102 file,
1103 PAGE_SIZE_TEST,
1104 2,
1105 ExperimentalMmapStrategy::Windowed,
1106 )
1107 .unwrap();
1108 assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1109
1110 temp_file
1111 .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1112 .unwrap();
1113 temp_file.flush().unwrap();
1114
1115 assert!(matches!(
1116 wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap_err(),
1117 JournalError::ObjectExceedsFileBounds
1118 ));
1119 assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1120 }
1121
1122 #[test]
1123 fn snapshot_whole_file_maps_cached_file_once() {
1124 let temp_file = NamedTempFile::new().unwrap();
1125 temp_file.as_file().set_len(PAGE_SIZE_TEST * 2).unwrap();
1126 let file = std::fs::OpenOptions::new()
1127 .read(true)
1128 .open(temp_file.path())
1129 .unwrap();
1130 let mut wm: WindowManager<Mmap> = WindowManager::new_snapshot(
1131 file,
1132 PAGE_SIZE_TEST,
1133 32,
1134 ExperimentalMmapStrategy::WholeFile,
1135 )
1136 .unwrap();
1137
1138 assert_eq!(wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap(), &[0; 16]);
1139 assert_eq!(wm.get_slice(128, 16).unwrap(), &[0; 16]);
1140
1141 let stats = wm.stats();
1142 assert_eq!(stats.strategy, ExperimentalMmapStrategy::WholeFile);
1143 assert_eq!(stats.file_size, PAGE_SIZE_TEST * 2);
1144 assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST * 2);
1145 assert_eq!(stats.map_count, 1);
1146 assert_eq!(stats.remap_count, 0);
1147 }
1148
1149 #[test]
1150 fn snapshot_whole_file_does_not_refresh_file_size_after_growth() {
1151 let mut temp_file = NamedTempFile::new().unwrap();
1152 temp_file
1153 .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1154 .unwrap();
1155 temp_file.flush().unwrap();
1156
1157 let file = File::open(temp_file.path()).unwrap();
1158 let mut wm: WindowManager<Mmap> = WindowManager::new_snapshot(
1159 file,
1160 PAGE_SIZE_TEST,
1161 32,
1162 ExperimentalMmapStrategy::WholeFile,
1163 )
1164 .unwrap();
1165 assert_eq!(wm.get_slice(128, 16).unwrap(), &[1u8; 16]);
1166
1167 temp_file
1168 .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1169 .unwrap();
1170 temp_file.flush().unwrap();
1171
1172 assert!(matches!(
1173 wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap_err(),
1174 JournalError::ObjectExceedsFileBounds
1175 ));
1176 assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1177 }
1178
1179 #[test]
1180 fn live_whole_file_maps_cached_file_once_and_remaps_on_growth() {
1181 let mut temp_file = NamedTempFile::new().unwrap();
1182 temp_file
1183 .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1184 .unwrap();
1185 temp_file.flush().unwrap();
1186
1187 let file = File::open(temp_file.path()).unwrap();
1188 let mut wm: WindowManager<Mmap> = WindowManager::new_with_strategy(
1189 file,
1190 PAGE_SIZE_TEST,
1191 32,
1192 ExperimentalMmapStrategy::WholeFile,
1193 )
1194 .unwrap();
1195
1196 assert_eq!(wm.get_slice(128, 16).unwrap(), &[1u8; 16]);
1197 let stats = wm.stats();
1198 assert_eq!(stats.strategy, ExperimentalMmapStrategy::WholeFile);
1199 assert_eq!(stats.file_size, PAGE_SIZE_TEST);
1200 assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST);
1201 assert_eq!(stats.map_count, 1);
1202 assert_eq!(stats.remap_count, 0);
1203
1204 temp_file
1205 .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1206 .unwrap();
1207 temp_file.flush().unwrap();
1208
1209 assert_eq!(wm.get_slice(256, 16).unwrap(), &[1u8; 16]);
1210 let stats = wm.stats();
1211 assert_eq!(stats.file_size, PAGE_SIZE_TEST);
1212 assert_eq!(stats.map_count, 1);
1213 assert_eq!(stats.remap_count, 0);
1214
1215 assert_eq!(wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap(), &[2u8; 16]);
1216 let stats = wm.stats();
1217 assert_eq!(stats.file_size, PAGE_SIZE_TEST * 2);
1218 assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST * 2);
1219 assert_eq!(stats.map_count, 2);
1220 assert_eq!(stats.remap_count, 1);
1221 }
1222
1223 #[test]
1224 fn whole_file_writer_owned_remaps_after_post_change_growth() {
1225 let temp_file = NamedTempFile::new().unwrap();
1226 temp_file.as_file().set_len(PAGE_SIZE_TEST).unwrap();
1227 let file = std::fs::OpenOptions::new()
1228 .read(true)
1229 .write(true)
1230 .open(temp_file.path())
1231 .unwrap();
1232 let mut wm: WindowManager<MmapMut> = WindowManager::new_writer_owned_with_strategy(
1233 file,
1234 PAGE_SIZE_TEST,
1235 32,
1236 ExperimentalMmapStrategy::WholeFile,
1237 )
1238 .unwrap();
1239
1240 wm.get_slice_mut(0, 16).unwrap().copy_from_slice(&[1; 16]);
1241 assert_eq!(wm.stats().current_mapped_bytes, PAGE_SIZE_TEST);
1242
1243 wm.post_change(PAGE_SIZE_TEST * 2).unwrap();
1244 assert_eq!(wm.get_slice(0, 16).unwrap(), &[1; 16]);
1245
1246 let new_offset = PAGE_SIZE_TEST + 128;
1247 wm.get_slice_mut(new_offset, 16)
1248 .unwrap()
1249 .copy_from_slice(&[2; 16]);
1250 assert_eq!(wm.get_slice(new_offset, 16).unwrap(), &[2; 16]);
1251
1252 let stats = wm.stats();
1253 assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST * 2);
1254 assert_eq!(stats.max_mapped_bytes, PAGE_SIZE_TEST * 2);
1255 assert_eq!(stats.remap_count, 1);
1256 }
1257
1258 #[test]
1259 fn post_change_drops_mappings_before_truncating_oversized_windows() {
1260 let temp_file = NamedTempFile::new().unwrap();
1261 temp_file.as_file().set_len(PAGE_SIZE_TEST).unwrap();
1262 let file = std::fs::OpenOptions::new()
1263 .read(true)
1264 .write(true)
1265 .open(temp_file.path())
1266 .unwrap();
1267 let oversized_window = PAGE_SIZE_TEST * 4;
1268 let mut wm: WindowManager<MmapMut> = WindowManager::new_writer_owned_with_strategy(
1269 file,
1270 oversized_window,
1271 32,
1272 ExperimentalMmapStrategy::WholeFile,
1273 )
1274 .unwrap();
1275
1276 wm.get_slice_mut(0, 16).unwrap().copy_from_slice(&[1; 16]);
1277 assert_eq!(wm.stats().current_mapped_bytes, oversized_window);
1278
1279 wm.post_change(PAGE_SIZE_TEST * 2).unwrap();
1280 let stats_after_truncate = wm.stats();
1281 assert_eq!(stats_after_truncate.file_size, PAGE_SIZE_TEST * 2);
1282 assert_eq!(stats_after_truncate.current_mapped_bytes, 0);
1283 assert_eq!(stats_after_truncate.window_count, 0);
1284
1285 let crossing_offset = PAGE_SIZE_TEST * 2 - 1024;
1286 let crossing_payload = vec![2; 2048];
1287 wm.get_slice_mut(crossing_offset, 2048)
1288 .unwrap()
1289 .copy_from_slice(&crossing_payload);
1290 assert_eq!(
1291 wm.get_slice(crossing_offset, 2048).unwrap(),
1292 crossing_payload.as_slice()
1293 );
1294 assert_eq!(wm.stats().current_mapped_bytes, oversized_window);
1295 }
1296
1297 #[test]
1298 fn sequential_boundary_crossing_slides_window_instead_of_growing_from_start() {
1299 let mut temp_file = NamedTempFile::new().unwrap();
1300 temp_file.write_all(&[0u8; 64 * 1024]).unwrap();
1301 temp_file.flush().unwrap();
1302
1303 let file = File::open(temp_file.path()).unwrap();
1304 let mut wm: WindowManager<FailingMmap> =
1305 WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
1306
1307 MOCK_CONTROLLER.with(|ctrl| {
1308 ctrl.set_fail_next(false);
1309 ctrl.create_count.set(0);
1310 });
1311
1312 let _ = wm.get_slice(0, 100).unwrap();
1313 assert_eq!(wm.windows[0].offset, 0);
1314 assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST);
1315
1316 let _ = wm.get_slice(PAGE_SIZE_TEST - 6, 32).unwrap();
1317 assert_eq!(wm.windows[0].offset, 0);
1318 assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST * 2);
1319
1320 let _ = wm.get_slice((PAGE_SIZE_TEST * 2) - 12, 32).unwrap();
1321 assert_eq!(wm.windows[0].offset, PAGE_SIZE_TEST);
1322 assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST * 2);
1323
1324 let _ = wm.get_slice((PAGE_SIZE_TEST * 3) - 20, 32).unwrap();
1325 assert_eq!(wm.windows[0].offset, PAGE_SIZE_TEST * 2);
1326 assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST * 2);
1327 }
1328}