Skip to main content

journal_core/file/
mmap.rs

1use crate::error::{JournalError, Result};
2use journal_common::compat::is_multiple_of;
3use std::fs::File;
4#[cfg(not(unix))]
5use std::io::{Read, Seek, SeekFrom, Write};
6use std::ops::{Deref, DerefMut};
7#[cfg(unix)]
8use std::os::unix::fs::FileExt;
9use std::sync::atomic::{Ordering, fence};
10use tracing::error;
11
12// Re-export memmap2 types for other crates and import for internal use
13pub use memmap2::{Mmap, MmapMut, MmapOptions};
14
/// Granularity, in bytes, that window chunk sizes are required to be a
/// multiple of (enforced by a debug assertion when a manager is built).
const PAGE_SIZE: u64 = 4 * 1024;
16
/// Selects how a window manager maps its backing file.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ExperimentalMmapStrategy {
    /// Map chunk-aligned windows on demand. This is the default.
    #[default]
    Windowed,
    /// Use a single window that starts at offset 0 and spans the file.
    WholeFile,
}
24
25#[doc(hidden)]
26#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
27pub struct WindowManagerStats {
28    pub strategy: ExperimentalMmapStrategy,
29    pub file_size: u64,
30    pub window_count: usize,
31    pub row_pin_count: usize,
32    pub row_pin_limit: usize,
33    pub row_overflow_object_count: usize,
34    pub current_mapped_bytes: u64,
35    pub max_mapped_bytes: u64,
36    pub map_count: u64,
37    pub remap_count: u64,
38    pub eviction_count: u64,
39}
40
41pub trait MemoryMap: Deref<Target = [u8]> {
42    fn create(file: &File, offset: u64, size: u64) -> Result<Self>
43    where
44        Self: Sized;
45
46    fn create_checked(file: &File, offset: u64, size: u64, file_size: u64) -> Result<Self>
47    where
48        Self: Sized,
49    {
50        let _ = file_size;
51        Self::create(file, offset, size)
52    }
53}
54
55pub trait MemoryMapMut: MemoryMap + DerefMut {
56    /// Flushes outstanding memory map modifications to disk
57    fn flush(&self) -> Result<()>;
58}
59
60impl MemoryMap for Mmap {
61    fn create(file: &File, offset: u64, size: u64) -> Result<Self> {
62        let end = offset
63            .checked_add(size)
64            .ok_or(JournalError::ObjectExceedsFileBounds)?;
65        let file_size = file.metadata()?.len();
66        if end > file_size {
67            return Err(JournalError::ObjectExceedsFileBounds);
68        }
69        Self::create_checked(file, offset, size, file_size)
70    }
71
72    fn create_checked(file: &File, offset: u64, size: u64, file_size: u64) -> Result<Self> {
73        let end = offset
74            .checked_add(size)
75            .ok_or(JournalError::ObjectExceedsFileBounds)?;
76        if end > file_size {
77            return Err(JournalError::ObjectExceedsFileBounds);
78        }
79        // SAFETY: `offset + size` was checked against the current file size
80        // above, so this read-only mapping stays within file bounds.
81        // nosemgrep: rust.lang.security.unsafe-usage.unsafe-usage
82        let mmap = unsafe {
83            MmapOptions::new()
84                .offset(offset)
85                .len(size as usize)
86                .map(file)?
87        };
88
89        Ok(mmap)
90    }
91}
92
93impl MemoryMap for MmapMut {
94    fn create(file: &File, offset: u64, size: u64) -> Result<Self> {
95        let required_size = offset
96            .checked_add(size)
97            .ok_or(JournalError::ObjectExceedsFileBounds)?;
98
99        let mut file_size = file.metadata()?.len();
100        if required_size > file_size {
101            file.set_len(required_size)?;
102            file_size = required_size;
103        }
104        Self::create_checked(file, offset, size, file_size)
105    }
106
107    fn create_checked(file: &File, offset: u64, size: u64, file_size: u64) -> Result<Self> {
108        let required_size = offset
109            .checked_add(size)
110            .ok_or(JournalError::ObjectExceedsFileBounds)?;
111        if required_size > file_size {
112            return Err(JournalError::ObjectExceedsFileBounds);
113        }
114
115        // SAFETY: `required_size` was checked against the file size above, and
116        // `create` extends the file before calling this checked constructor.
117        // nosemgrep: rust.lang.security.unsafe-usage.unsafe-usage
118        let mmap = unsafe {
119            MmapOptions::new()
120                .offset(offset)
121                .len(size as usize)
122                .map_mut(file)?
123        };
124
125        Ok(mmap)
126    }
127}
128
129impl MemoryMapMut for MmapMut {
130    fn flush(&self) -> Result<()> {
131        MmapMut::flush(self)?;
132        Ok(())
133    }
134}
135
136struct Window<M: MemoryMap> {
137    offset: u64,
138    size: u64,
139    mmap: M,
140    row_pinned: bool,
141}
142
143impl<M: MemoryMap> std::fmt::Debug for Window<M> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("Window")
146            .field("offset", &self.offset)
147            .field("size", &self.size)
148            .finish()
149    }
150}
151
152impl<M: MemoryMap> Window<M> {
153    fn end_offset(&self) -> Option<u64> {
154        self.offset.checked_add(self.size)
155    }
156
157    fn contains(&self, position: u64) -> bool {
158        self.end_offset()
159            .is_some_and(|end_offset| position >= self.offset && position < end_offset)
160    }
161
162    fn contains_range(&self, position: u64, size: u64) -> bool {
163        let Some(end) = position.checked_add(size) else {
164            return false;
165        };
166        self.end_offset()
167            .is_some_and(|end_offset| position >= self.offset && end <= end_offset)
168    }
169
170    fn get_slice(&self, position: u64, size: u64) -> &[u8] {
171        debug_assert!(self.contains_range(position, size));
172
173        let offset = (position - self.offset) as usize;
174        &self.mmap[offset..offset + size as usize]
175    }
176}
177
178impl<M: MemoryMapMut> Window<M> {
179    pub fn get_mut_slice(&mut self, position: u64, size: u64) -> &mut [u8] {
180        debug_assert!(self.contains_range(position, size));
181
182        let offset = (position - self.offset) as usize;
183        &mut self.mmap[offset..offset + size as usize]
184    }
185}
186
187pub struct WindowManager<M: MemoryMap> {
188    file: File,
189    file_size: u64,
190    bounds_mode: BoundsMode,
191    strategy: ExperimentalMmapStrategy,
192    chunk_size: u64,
193    active_window_idx: Option<usize>,
194    max_windows: usize,
195    windows: Vec<Window<M>>,
196    row_pin_count: usize,
197    map_count: u64,
198    remap_count: u64,
199    eviction_count: u64,
200    max_mapped_bytes: u64,
201    row_overflow_objects: Vec<Box<[u8]>>,
202}
203
/// How a window manager treats requests relative to its cached file size.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BoundsMode {
    /// Re-read the file length when a request exceeds the cached size.
    LiveFile,
    /// Reads never re-query the file length or extend the file.
    Snapshot,
    /// Extend the file with `set_len` when a window would end past it.
    WriterOwned,
}
210
211impl<M: MemoryMap> WindowManager<M> {
212    pub fn new(file: File, chunk_size: u64, max_windows: usize) -> Result<Self> {
213        Self::new_with_strategy(
214            file,
215            chunk_size,
216            max_windows,
217            ExperimentalMmapStrategy::Windowed,
218        )
219    }
220
221    pub fn new_with_strategy(
222        file: File,
223        chunk_size: u64,
224        max_windows: usize,
225        strategy: ExperimentalMmapStrategy,
226    ) -> Result<Self> {
227        Self::new_with_bounds_mode(
228            file,
229            chunk_size,
230            max_windows,
231            BoundsMode::LiveFile,
232            strategy,
233        )
234    }
235
236    pub fn new_snapshot(
237        file: File,
238        chunk_size: u64,
239        max_windows: usize,
240        strategy: ExperimentalMmapStrategy,
241    ) -> Result<Self> {
242        Self::new_with_bounds_mode(
243            file,
244            chunk_size,
245            max_windows,
246            BoundsMode::Snapshot,
247            strategy,
248        )
249    }
250
251    pub fn new_writer_owned(file: File, chunk_size: u64, max_windows: usize) -> Result<Self> {
252        Self::new_writer_owned_with_strategy(
253            file,
254            chunk_size,
255            max_windows,
256            ExperimentalMmapStrategy::Windowed,
257        )
258    }
259
260    pub fn new_writer_owned_with_strategy(
261        file: File,
262        chunk_size: u64,
263        max_windows: usize,
264        strategy: ExperimentalMmapStrategy,
265    ) -> Result<Self> {
266        Self::new_with_bounds_mode(
267            file,
268            chunk_size,
269            max_windows,
270            BoundsMode::WriterOwned,
271            strategy,
272        )
273    }
274
275    fn new_with_bounds_mode(
276        file: File,
277        chunk_size: u64,
278        max_windows: usize,
279        bounds_mode: BoundsMode,
280        strategy: ExperimentalMmapStrategy,
281    ) -> Result<Self> {
282        debug_assert!(chunk_size != 0 && is_multiple_of(chunk_size, PAGE_SIZE));
283        debug_assert!(max_windows != 0);
284
285        let file_size = file.metadata()?.len();
286
287        Ok(WindowManager {
288            file,
289            file_size,
290            bounds_mode,
291            strategy,
292            chunk_size,
293            max_windows,
294            windows: Vec::new(),
295            row_pin_count: 0,
296            active_window_idx: None,
297            map_count: 0,
298            remap_count: 0,
299            eviction_count: 0,
300            max_mapped_bytes: 0,
301            row_overflow_objects: Vec::new(),
302        })
303    }
304
305    pub fn stats(&self) -> WindowManagerStats {
306        let current_mapped_bytes = self.current_mapped_bytes();
307        WindowManagerStats {
308            strategy: self.strategy,
309            file_size: self.file_size,
310            window_count: self.windows.len(),
311            row_pin_count: self.row_pin_count,
312            row_pin_limit: self.max_windows,
313            row_overflow_object_count: self.row_overflow_objects.len(),
314            current_mapped_bytes,
315            max_mapped_bytes: self.max_mapped_bytes.max(current_mapped_bytes),
316            map_count: self.map_count,
317            remap_count: self.remap_count,
318            eviction_count: self.eviction_count,
319        }
320    }
321
322    fn current_mapped_bytes(&self) -> u64 {
323        self.windows.iter().map(|window| window.size).sum()
324    }
325
326    fn record_mapped_bytes(&mut self) {
327        self.max_mapped_bytes = self.max_mapped_bytes.max(self.current_mapped_bytes());
328    }
329
330    fn refresh_file_size(&mut self) -> Result<u64> {
331        self.file_size = self.file.metadata()?.len();
332        Ok(self.file_size)
333    }
334
335    fn ensure_cached_file_contains(&mut self, end: u64) -> Result<()> {
336        if end <= self.file_size {
337            return Ok(());
338        }
339        if self.bounds_mode == BoundsMode::LiveFile && end <= self.refresh_file_size()? {
340            return Ok(());
341        }
342        Err(JournalError::ObjectExceedsFileBounds)
343    }
344
345    pub(crate) fn read_exact_at(&mut self, position: u64, output: &mut [u8]) -> Result<()> {
346        let end = position
347            .checked_add(output.len() as u64)
348            .ok_or(JournalError::ObjectExceedsFileBounds)?;
349        self.ensure_cached_file_contains(end)?;
350
351        #[cfg(unix)]
352        {
353            let mut read = 0usize;
354            while read < output.len() {
355                let bytes_read = self
356                    .file
357                    .read_at(&mut output[read..], position + read as u64)?;
358                if bytes_read == 0 {
359                    return Err(JournalError::Io(std::io::Error::new(
360                        std::io::ErrorKind::UnexpectedEof,
361                        "short journal file read",
362                    )));
363                }
364                read += bytes_read;
365            }
366        }
367
368        #[cfg(not(unix))]
369        {
370            self.file.seek(SeekFrom::Start(position))?;
371            self.file.read_exact(output)?;
372        }
373
374        Ok(())
375    }
376
377    fn get_chunk_aligned_start(&self, position: u64) -> u64 {
378        (position / self.chunk_size) * self.chunk_size
379    }
380
381    fn get_chunk_aligned_end(&self, position: u64) -> Result<u64> {
382        position
383            .div_ceil(self.chunk_size)
384            .checked_mul(self.chunk_size)
385            .ok_or(JournalError::ObjectExceedsFileBounds)
386    }
387
388    fn create_window(&mut self, window_start: u64, chunk_count: u64) -> Result<Window<M>> {
389        debug_assert_ne!(chunk_count, 0);
390
391        let requested_size = chunk_count
392            .checked_mul(self.chunk_size)
393            .ok_or(JournalError::ObjectExceedsFileBounds)?;
394        let requested_end = window_start
395            .checked_add(requested_size)
396            .ok_or(JournalError::ObjectExceedsFileBounds)?;
397        let size = match self.bounds_mode {
398            BoundsMode::LiveFile => {
399                if window_start >= self.file_size {
400                    self.refresh_file_size()?;
401                }
402                if window_start >= self.file_size {
403                    return Err(JournalError::ObjectExceedsFileBounds);
404                }
405                requested_size.min(self.file_size - window_start)
406            }
407            BoundsMode::Snapshot => {
408                if window_start >= self.file_size {
409                    return Err(JournalError::ObjectExceedsFileBounds);
410                }
411                requested_size.min(self.file_size - window_start)
412            }
413            BoundsMode::WriterOwned => {
414                if requested_end > self.file_size {
415                    self.file.set_len(requested_end)?;
416                    self.file_size = requested_end;
417                }
418                requested_size
419            }
420        };
421        let mmap =
422            M::create_checked(&self.file, window_start, size, self.file_size).map_err(|e| {
423                error!(
424                    window_start,
425                    size,
426                    chunk_count,
427                    chunk_size = self.chunk_size,
428                    "mmap failed: {e}"
429                );
430                e
431            })?;
432        self.map_count += 1;
433        Ok(Window {
434            offset: window_start,
435            size,
436            mmap,
437            row_pinned: false,
438        })
439    }
440
441    fn lookup_window_by_range(&self, position: u64, size_needed: u64) -> Option<usize> {
442        if let Some(idx) = self.active_window_idx {
443            if self.windows[idx].contains_range(position, size_needed) {
444                return Some(idx);
445            }
446        }
447
448        for (idx, window) in self.windows.iter().enumerate() {
449            if window.contains_range(position, size_needed) {
450                return Some(idx);
451            }
452        }
453
454        None
455    }
456
457    fn lookup_window_by_position(&self, position: u64) -> Option<usize> {
458        if let Some(idx) = self.active_window_idx {
459            if self.windows[idx].contains(position) {
460                return Some(idx);
461            }
462        }
463
464        for (idx, window) in self.windows.iter().enumerate() {
465            if window.contains(position) {
466                return Some(idx);
467            }
468        }
469
470        None
471    }
472
473    pub(crate) fn active_slice_if_contains(&self, position: u64, size: u64) -> Option<&[u8]> {
474        let idx = self.active_window_idx?;
475        let window = &self.windows[idx];
476        if window.contains_range(position, size) {
477            Some(window.get_slice(position, size))
478        } else {
479            None
480        }
481    }
482
483    pub(crate) fn active_window_contains(&self, position: u64, size: u64) -> bool {
484        self.active_window_idx
485            .and_then(|idx| self.windows.get(idx))
486            .is_some_and(|window| window.contains_range(position, size))
487    }
488
489    pub(crate) fn active_slice(&self, position: u64, size: u64) -> &[u8] {
490        let idx = self
491            .active_window_idx
492            .expect("active window should exist when active_window_contains returned true");
493        let window = &self.windows[idx];
494        debug_assert!(window.contains_range(position, size));
495        window.get_slice(position, size)
496    }
497
498    pub(crate) fn clear_row_pins(&mut self) {
499        if self.row_pin_count == 0 {
500            self.row_overflow_objects.clear();
501            return;
502        }
503        for window in &mut self.windows {
504            window.row_pinned = false;
505        }
506        self.row_pin_count = 0;
507        self.row_overflow_objects.clear();
508    }
509
510    #[inline(always)]
511    pub(crate) fn row_pin_limit_reached(&self) -> bool {
512        self.strategy != ExperimentalMmapStrategy::WholeFile
513            && self.row_pin_count >= self.max_windows
514    }
515
516    #[cold]
517    #[inline(never)]
518    fn get_row_overflow_slice(&mut self, position: u64, size: u64) -> Result<&[u8]> {
519        let len = usize::try_from(size).map_err(|_| JournalError::ObjectExceedsFileBounds)?;
520        let mut data = vec![0u8; len].into_boxed_slice();
521        self.read_exact_at(position, &mut data)?;
522        self.row_overflow_objects.push(data);
523        Ok(self
524            .row_overflow_objects
525            .last()
526            .expect("just pushed row overflow object")
527            .as_ref())
528    }
529
530    pub(crate) fn get_row_pinned_slice(&mut self, position: u64, size: u64) -> Result<&[u8]> {
531        let end = position
532            .checked_add(size)
533            .ok_or(JournalError::ObjectExceedsFileBounds)?;
534        self.ensure_cached_file_contains(end)?;
535        let Some(idx) = self.get_window_index_preserving_row_pins(position, size)? else {
536            return self.get_row_overflow_slice(position, size);
537        };
538        self.active_window_idx = Some(idx);
539        if !self.windows[idx].row_pinned {
540            if self.row_pin_limit_reached() {
541                return self.get_row_overflow_slice(position, size);
542            }
543            self.windows[idx].row_pinned = true;
544            self.row_pin_count += 1;
545        }
546        let window = &mut self.windows[idx];
547        Ok(window.get_slice(position, size))
548    }
549
550    fn push_window(&mut self, window: Window<M>) -> usize {
551        self.windows.push(window);
552        self.record_mapped_bytes();
553        self.windows.len() - 1
554    }
555
556    fn chunk_span_for_range(&self, position: u64, size_needed: u64) -> Result<(u64, u64)> {
557        let range_end = position
558            .checked_add(size_needed)
559            .ok_or(JournalError::ObjectExceedsFileBounds)?;
560        let window_start = self.get_chunk_aligned_start(position);
561        let window_end = self.get_chunk_aligned_end(range_end)?;
562        Ok((window_start, (window_end - window_start) / self.chunk_size))
563    }
564
565    fn push_new_window_for_range(&mut self, position: u64, size_needed: u64) -> Result<usize> {
566        let (window_start, num_chunks) = self.chunk_span_for_range(position, size_needed)?;
567        let new_window = self.create_window(window_start, num_chunks)?;
568        Ok(self.push_window(new_window))
569    }
570
571    fn replace_window_for_range(
572        &mut self,
573        idx: usize,
574        position: u64,
575        size_needed: u64,
576    ) -> Result<usize> {
577        let (window_start, num_chunks) = self.chunk_span_for_range(position, size_needed)?;
578        let _window = self.windows.remove(idx);
579        self.active_window_idx = None;
580        let new_window = self.create_window(window_start, num_chunks)?;
581        self.remap_count += 1;
582        Ok(self.push_window(new_window))
583    }
584
585    fn evict_unpinned_window_if_full(&mut self) -> bool {
586        if self.windows.len() < self.max_windows {
587            return true;
588        }
589        let Some(idx) = self.windows.iter().position(|window| !window.row_pinned) else {
590            return false;
591        };
592        self.windows.remove(idx);
593        self.eviction_count += 1;
594        self.active_window_idx = None;
595        true
596    }
597
598    fn make_room_for_new_window(&mut self) {
599        if self.windows.len() < self.max_windows {
600            return;
601        }
602        let idx = if self.row_pin_count == 0 {
603            Some(
604                if self.active_window_idx == Some(0) && self.windows.len() > 1 {
605                    1
606                } else {
607                    0
608                },
609            )
610        } else {
611            self.windows.iter().position(|window| !window.row_pinned)
612        };
613        if let Some(idx) = idx {
614            self.windows.remove(idx);
615            self.eviction_count += 1;
616            self.active_window_idx = None;
617        }
618    }
619
620    fn get_window_index_preserving_row_pins(
621        &mut self,
622        position: u64,
623        size_needed: u64,
624    ) -> Result<Option<usize>> {
625        if self.strategy == ExperimentalMmapStrategy::WholeFile {
626            return self.get_whole_file_window_index_preserving_row_pins(position, size_needed);
627        }
628
629        if let Some(idx) = self.lookup_window_by_range(position, size_needed) {
630            return Ok(Some(idx));
631        }
632
633        if let Some(idx) = self.lookup_window_by_position(position) {
634            if !self.windows[idx].row_pinned {
635                if self.row_pin_limit_reached() {
636                    return Ok(None);
637                }
638                // The overlapping window is not pinned, so no current-row
639                // payload can point into it. Replace it with a wider window;
640                // get_row_pinned_slice() pins the replacement before returning
641                // borrowed bytes to the reader.
642                return Ok(Some(self.replace_window_for_range(
643                    idx,
644                    position,
645                    size_needed,
646                )?));
647            }
648            // The pinned window contains the requested start but not the full
649            // requested range; lookup_window_by_range would have matched
650            // otherwise. Do not remap it because existing row slices may point
651            // into it. Map a wider overlapping window for this row instead.
652        }
653
654        if !self.evict_unpinned_window_if_full() {
655            return Ok(None);
656        }
657
658        Ok(Some(self.push_new_window_for_range(position, size_needed)?))
659    }
660
661    fn get_whole_file_window_index_preserving_row_pins(
662        &mut self,
663        position: u64,
664        size_needed: u64,
665    ) -> Result<Option<usize>> {
666        let idx = self.get_whole_file_window_index(position, size_needed)?;
667        if !self.windows[idx].row_pinned {
668            self.windows[idx].row_pinned = true;
669            self.row_pin_count += 1;
670        }
671        Ok(Some(idx))
672    }
673
674    fn get_window_index(&mut self, position: u64, size_needed: u64) -> Result<usize> {
675        if self.strategy == ExperimentalMmapStrategy::WholeFile {
676            return self.get_whole_file_window_index(position, size_needed);
677        }
678        if let Some(idx) = self.lookup_window_by_range(position, size_needed) {
679            self.active_window_idx = Some(idx);
680            return Ok(idx);
681        }
682        if let Some(idx) = self.lookup_window_by_position(position) {
683            return self.get_overlapping_window_index(idx, position, size_needed);
684        }
685        self.get_new_window_index(position, size_needed)
686    }
687
688    fn get_overlapping_window_index(
689        &mut self,
690        idx: usize,
691        position: u64,
692        size_needed: u64,
693    ) -> Result<usize> {
694        if self.row_pin_count > 0 && self.windows[idx].row_pinned {
695            // Non-row-pinned reads may use a transient window while row-pinned
696            // windows stay valid until the current row is released.
697            self.evict_unpinned_window_if_full();
698            let idx = self.push_new_window_for_range(position, size_needed)?;
699            self.active_window_idx = Some(idx);
700            return Ok(idx);
701        }
702        let idx = self.replace_window_for_range(idx, position, size_needed)?;
703        self.active_window_idx = Some(idx);
704        Ok(idx)
705    }
706
707    fn get_new_window_index(&mut self, position: u64, size_needed: u64) -> Result<usize> {
708        self.make_room_for_new_window();
709        let idx = self.push_new_window_for_range(position, size_needed)?;
710        self.active_window_idx = Some(idx);
711        Ok(idx)
712    }
713
714    fn get_window(&mut self, position: u64, size_needed: u64) -> Result<&mut Window<M>> {
715        let idx = self.get_window_index(position, size_needed)?;
716        Ok(&mut self.windows[idx])
717    }
718
719    fn get_whole_file_window_index(&mut self, position: u64, size_needed: u64) -> Result<usize> {
720        if let Some(idx) = self.lookup_window_by_range(position, size_needed) {
721            self.active_window_idx = Some(idx);
722            return Ok(idx);
723        }
724
725        let requested_end = position
726            .checked_add(size_needed)
727            .ok_or(JournalError::ObjectExceedsFileBounds)?;
728        match self.bounds_mode {
729            BoundsMode::LiveFile | BoundsMode::Snapshot => {
730                self.ensure_cached_file_contains(requested_end)?
731            }
732            BoundsMode::WriterOwned => {}
733        }
734        let target_end = requested_end.max(self.file_size);
735        let window_end = self.get_chunk_aligned_end(target_end)?;
736        let chunk_count = (window_end / self.chunk_size).max(1);
737
738        let had_windows = !self.windows.is_empty();
739        if had_windows {
740            self.windows.clear();
741            self.active_window_idx = None;
742            self.row_pin_count = 0;
743            self.row_overflow_objects.clear();
744        }
745
746        let new_window = self.create_window(0, chunk_count)?;
747        if had_windows {
748            self.remap_count += 1;
749        }
750        let idx = self.push_window(new_window);
751        self.active_window_idx = Some(idx);
752        Ok(idx)
753    }
754
755    pub fn get_slice(&mut self, position: u64, size: u64) -> Result<&[u8]> {
756        let end = position
757            .checked_add(size)
758            .ok_or(JournalError::ObjectExceedsFileBounds)?;
759        self.ensure_cached_file_contains(end)?;
760        let window = self.get_window(position, size)?;
761        Ok(window.get_slice(position, size))
762    }
763}
764
765impl<M: MemoryMapMut> WindowManager<M> {
766    pub fn get_slice_mut(&mut self, position: u64, size: u64) -> Result<&mut [u8]> {
767        let _end = position
768            .checked_add(size)
769            .ok_or(JournalError::ObjectExceedsFileBounds)?;
770        let window = self.get_window(position, size)?;
771        Ok(window.get_mut_slice(position, size))
772    }
773
774    /// Syncs all file data to disk
775    pub fn sync(&mut self, logical_size: u64, header_bytes: &[u8]) -> Result<()> {
776        for window in &self.windows {
777            window.mmap.flush()?;
778        }
779        self.windows.clear();
780        self.active_window_idx = None;
781        self.row_pin_count = 0;
782        self.row_overflow_objects.clear();
783        self.file.set_len(logical_size)?;
784        #[cfg(unix)]
785        {
786            let mut written = 0usize;
787            while written < header_bytes.len() {
788                written += self
789                    .file
790                    .write_at(&header_bytes[written..], written as u64)?;
791            }
792        }
793        #[cfg(not(unix))]
794        {
795            self.file.seek(SeekFrom::Start(0))?;
796            self.file.write_all(header_bytes)?;
797        }
798        self.file.sync_data()?;
799        self.file_size = logical_size;
800        Ok(())
801    }
802
803    /// Publish mmap writes to stock follow readers by triggering an inotify
804    /// event with the same-size truncate used by systemd.
805    pub fn post_change(&mut self, logical_size: u64) -> Result<()> {
806        fence(Ordering::SeqCst);
807        if logical_size < self.file_size {
808            self.windows.clear();
809            self.active_window_idx = None;
810            self.row_pin_count = 0;
811            self.row_overflow_objects.clear();
812        }
813        self.file.set_len(logical_size)?;
814        self.file_size = logical_size;
815        Ok(())
816    }
817}
818
819#[cfg(test)]
820mod tests {
821    use super::*;
822    use crate::error::JournalError;
823    use std::cell::Cell;
824    use std::io::Write;
825    use std::rc::Rc;
826    use tempfile::NamedTempFile;
827
828    const PAGE_SIZE_TEST: u64 = 4096;
829
830    /// A mock MemoryMap that can be configured to fail on specific calls.
831    /// This allows us to test error handling in WindowManager.
832    struct FailingMmap {
833        data: Vec<u8>,
834    }
835
836    impl Deref for FailingMmap {
837        type Target = [u8];
838        fn deref(&self) -> &[u8] {
839            &self.data
840        }
841    }
842
843    /// Shared state to control when the mock should fail
844    struct MockController {
845        fail_next_create: Cell<bool>,
846        create_count: Cell<usize>,
847    }
848
849    impl MockController {
850        fn new() -> Self {
851            Self {
852                fail_next_create: Cell::new(false),
853                create_count: Cell::new(0),
854            }
855        }
856
857        fn set_fail_next(&self, fail: bool) {
858            self.fail_next_create.set(fail);
859        }
860
861        fn should_fail(&self) -> bool {
862            let count = self.create_count.get();
863            self.create_count.set(count + 1);
864            self.fail_next_create.get()
865        }
866    }
867
868    // Thread-local controller for the mock
869    thread_local! {
870        static MOCK_CONTROLLER: Rc<MockController> = Rc::new(MockController::new());
871    }
872
873    impl MemoryMap for FailingMmap {
874        fn create(_file: &File, _offset: u64, size: u64) -> Result<Self> {
875            let mmap_size = size as usize;
876            MOCK_CONTROLLER.with(|ctrl| {
877                if ctrl.should_fail() {
878                    return Err(JournalError::Io(std::io::Error::new(
879                        std::io::ErrorKind::Other,
880                        "simulated mmap failure",
881                    )));
882                }
883                // Create a mock mmap with zeros
884                Ok(FailingMmap {
885                    data: vec![0u8; mmap_size],
886                })
887            })
888        }
889    }
890
891    /// This test verifies that WindowManager maintains consistent state
892    /// after a failed remap operation.
893    ///
894    /// The scenario:
895    /// 1. A window exists at some position
896    /// 2. A request comes in that requires remapping (position in window, but size extends beyond)
897    /// 3. The old window is removed
898    /// 4. Creating the new (larger) window fails (e.g., mmap error)
899    /// 5. The WindowManager should remain in a consistent state
900    /// 6. Subsequent operations should not panic
901    #[test]
902    fn test_consistent_state_after_failed_remap() {
903        // Create a temporary file (content doesn't matter for mock)
904        let mut temp_file = NamedTempFile::new().unwrap();
905        temp_file.write_all(&[0u8; 8192]).unwrap();
906        temp_file.flush().unwrap();
907
908        let file = File::open(temp_file.path()).unwrap();
909
910        // Create WindowManager with mock mmap, 4KB chunks, max 1 window
911        let mut wm: WindowManager<FailingMmap> =
912            WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
913
914        // Reset controller state
915        MOCK_CONTROLLER.with(|ctrl| {
916            ctrl.set_fail_next(false);
917            ctrl.create_count.set(0);
918        });
919
920        // First read: creates a window at offset 0, size 4KB (this should succeed)
921        {
922            let slice = wm.get_slice(0, 100).unwrap();
923            assert_eq!(slice.len(), 100);
924        }
925        assert_eq!(wm.windows.len(), 1);
926        assert_eq!(wm.active_window_idx, Some(0));
927
928        // Configure mock to fail on the next create call
929        MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(true));
930
931        // Request a slice that requires remapping:
932        // - Position 100 is within the existing window [0, 4096)
933        // - But size 4000 means we need bytes [100, 4100), which extends beyond window
934        // - This triggers the "Remap the window" branch
935        // - The old window is removed
936        // - Then create_window is called and FAILS
937        let remap_result = wm.get_slice(100, 4000);
938        assert!(remap_result.is_err(), "Expected remap to fail");
939
940        // Verify state is consistent after the failure:
941        // - windows is empty (the old window was removed, new one failed to create)
942        // - active_window_idx should be None (not pointing to non-existent window)
943        assert_eq!(wm.windows.len(), 0);
944        assert_eq!(wm.active_window_idx, None);
945
946        // Allow the next create to succeed
947        MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(false));
948
949        // The next operation should NOT panic - it should succeed by creating a new window
950        let result = wm.get_slice(0, 100);
951        assert!(
952            result.is_ok(),
953            "Expected get_slice to succeed after recovery"
954        );
955        assert_eq!(wm.windows.len(), 1);
956    }
957
958    /// This test verifies that WindowManager maintains consistent state
959    /// after a failed window creation in the eviction path.
960    ///
961    /// The scenario:
962    /// 1. A window exists and we're at max_windows
963    /// 2. A request comes in for a different region requiring a new window
964    /// 3. The old window is evicted to make room
965    /// 4. Creating the new window fails (e.g., mmap error)
966    /// 5. The WindowManager should remain in a consistent state
967    /// 6. Subsequent operations should not panic
968    #[test]
969    fn test_consistent_state_after_failed_eviction() {
970        // Create a temporary file
971        let mut temp_file = NamedTempFile::new().unwrap();
972        temp_file.write_all(&[0u8; 8192]).unwrap();
973        temp_file.flush().unwrap();
974
975        let file = File::open(temp_file.path()).unwrap();
976
977        // Create WindowManager with mock mmap, 4KB chunks, max 1 window
978        let mut wm: WindowManager<FailingMmap> =
979            WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
980
981        // Reset controller state
982        MOCK_CONTROLLER.with(|ctrl| {
983            ctrl.set_fail_next(false);
984            ctrl.create_count.set(0);
985        });
986
987        // Create first window at offset 0
988        {
989            let _slice = wm.get_slice(0, 100).unwrap();
990        }
991        assert_eq!(wm.windows.len(), 1);
992        assert_eq!(wm.active_window_idx, Some(0));
993
994        // Configure mock to fail on the next create call
995        MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(true));
996
997        // Request a slice at a completely different position (second page)
998        // This triggers:
999        // - lookup_window_by_range returns None (position 4096 not in window [0, 4096))
1000        // - lookup_window_by_position returns None
1001        // - "Create a brand new window" branch
1002        // - Eviction: windows.remove(0) since we're at max_windows
1003        // - create_window fails
1004        let result = wm.get_slice(4096, 100);
1005        assert!(result.is_err(), "Expected mmap to fail");
1006
1007        // Verify state is consistent after the failure:
1008        // - windows is empty (the old window was evicted, new one failed to create)
1009        // - active_window_idx should be None (not pointing to non-existent window)
1010        assert_eq!(wm.windows.len(), 0);
1011        assert_eq!(wm.active_window_idx, None);
1012
1013        // Allow the next create to succeed
1014        MOCK_CONTROLLER.with(|ctrl| ctrl.set_fail_next(false));
1015
1016        // The next operation should NOT panic - it should succeed by creating a new window
1017        let result = wm.get_slice(0, 100);
1018        assert!(
1019            result.is_ok(),
1020            "Expected get_slice to succeed after recovery"
1021        );
1022        assert_eq!(wm.windows.len(), 1);
1023    }
1024
1025    #[test]
1026    fn row_pinned_slice_uses_overflow_storage_at_window_limit_one() {
1027        let mut temp_file = NamedTempFile::new().unwrap();
1028        temp_file
1029            .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1030            .unwrap();
1031        temp_file
1032            .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1033            .unwrap();
1034        temp_file.flush().unwrap();
1035
1036        let file = File::open(temp_file.path()).unwrap();
1037        let mut wm: WindowManager<Mmap> = WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
1038
1039        let first = wm.get_row_pinned_slice(0, 16).unwrap();
1040        let first_ptr = first.as_ptr();
1041        let first_len = first.len();
1042        assert_eq!(first, &[1u8; 16]);
1043
1044        let second = wm.get_row_pinned_slice(PAGE_SIZE_TEST, 16).unwrap();
1045        assert_eq!(second, &[2u8; 16]);
1046
1047        let stats = wm.stats();
1048        assert_eq!(stats.row_pin_limit, 1);
1049        assert_eq!(stats.row_pin_count, 1);
1050        assert_eq!(stats.window_count, 1);
1051        assert_eq!(stats.row_overflow_object_count, 1);
1052
1053        // SAFETY: The first slice points into a row-pinned mmap window. The
1054        // second access forced overflow storage, but it must not unmap the
1055        // first row-pinned window.
1056        // nosemgrep: rust.lang.security.unsafe-usage.unsafe-usage
1057        let first_after_overflow = unsafe { std::slice::from_raw_parts(first_ptr, first_len) };
1058        assert_eq!(first_after_overflow, &[1u8; 16]);
1059
1060        wm.clear_row_pins();
1061        let stats = wm.stats();
1062        assert_eq!(stats.row_pin_count, 0);
1063        assert_eq!(stats.row_overflow_object_count, 0);
1064    }
1065
1066    #[test]
1067    fn live_reader_refreshes_file_size_only_when_access_exceeds_cache() {
1068        let mut temp_file = NamedTempFile::new().unwrap();
1069        temp_file
1070            .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1071            .unwrap();
1072        temp_file.flush().unwrap();
1073
1074        let file = File::open(temp_file.path()).unwrap();
1075        let mut wm: WindowManager<Mmap> = WindowManager::new(file, PAGE_SIZE_TEST, 2).unwrap();
1076        assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1077
1078        assert_eq!(wm.get_slice(0, 16).unwrap(), &[1u8; 16]);
1079
1080        temp_file
1081            .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1082            .unwrap();
1083        temp_file.flush().unwrap();
1084
1085        assert_eq!(wm.get_slice(128, 16).unwrap(), &[1u8; 16]);
1086        assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1087
1088        assert_eq!(wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap(), &[2u8; 16]);
1089        assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST * 2);
1090    }
1091
1092    #[test]
1093    fn snapshot_reader_does_not_refresh_file_size_after_growth() {
1094        let mut temp_file = NamedTempFile::new().unwrap();
1095        temp_file
1096            .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1097            .unwrap();
1098        temp_file.flush().unwrap();
1099
1100        let file = File::open(temp_file.path()).unwrap();
1101        let mut wm: WindowManager<Mmap> = WindowManager::new_snapshot(
1102            file,
1103            PAGE_SIZE_TEST,
1104            2,
1105            ExperimentalMmapStrategy::Windowed,
1106        )
1107        .unwrap();
1108        assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1109
1110        temp_file
1111            .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1112            .unwrap();
1113        temp_file.flush().unwrap();
1114
1115        assert!(matches!(
1116            wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap_err(),
1117            JournalError::ObjectExceedsFileBounds
1118        ));
1119        assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1120    }
1121
1122    #[test]
1123    fn snapshot_whole_file_maps_cached_file_once() {
1124        let temp_file = NamedTempFile::new().unwrap();
1125        temp_file.as_file().set_len(PAGE_SIZE_TEST * 2).unwrap();
1126        let file = std::fs::OpenOptions::new()
1127            .read(true)
1128            .open(temp_file.path())
1129            .unwrap();
1130        let mut wm: WindowManager<Mmap> = WindowManager::new_snapshot(
1131            file,
1132            PAGE_SIZE_TEST,
1133            32,
1134            ExperimentalMmapStrategy::WholeFile,
1135        )
1136        .unwrap();
1137
1138        assert_eq!(wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap(), &[0; 16]);
1139        assert_eq!(wm.get_slice(128, 16).unwrap(), &[0; 16]);
1140
1141        let stats = wm.stats();
1142        assert_eq!(stats.strategy, ExperimentalMmapStrategy::WholeFile);
1143        assert_eq!(stats.file_size, PAGE_SIZE_TEST * 2);
1144        assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST * 2);
1145        assert_eq!(stats.map_count, 1);
1146        assert_eq!(stats.remap_count, 0);
1147    }
1148
1149    #[test]
1150    fn snapshot_whole_file_does_not_refresh_file_size_after_growth() {
1151        let mut temp_file = NamedTempFile::new().unwrap();
1152        temp_file
1153            .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1154            .unwrap();
1155        temp_file.flush().unwrap();
1156
1157        let file = File::open(temp_file.path()).unwrap();
1158        let mut wm: WindowManager<Mmap> = WindowManager::new_snapshot(
1159            file,
1160            PAGE_SIZE_TEST,
1161            32,
1162            ExperimentalMmapStrategy::WholeFile,
1163        )
1164        .unwrap();
1165        assert_eq!(wm.get_slice(128, 16).unwrap(), &[1u8; 16]);
1166
1167        temp_file
1168            .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1169            .unwrap();
1170        temp_file.flush().unwrap();
1171
1172        assert!(matches!(
1173            wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap_err(),
1174            JournalError::ObjectExceedsFileBounds
1175        ));
1176        assert_eq!(wm.stats().file_size, PAGE_SIZE_TEST);
1177    }
1178
1179    #[test]
1180    fn live_whole_file_maps_cached_file_once_and_remaps_on_growth() {
1181        let mut temp_file = NamedTempFile::new().unwrap();
1182        temp_file
1183            .write_all(&vec![1u8; PAGE_SIZE_TEST as usize])
1184            .unwrap();
1185        temp_file.flush().unwrap();
1186
1187        let file = File::open(temp_file.path()).unwrap();
1188        let mut wm: WindowManager<Mmap> = WindowManager::new_with_strategy(
1189            file,
1190            PAGE_SIZE_TEST,
1191            32,
1192            ExperimentalMmapStrategy::WholeFile,
1193        )
1194        .unwrap();
1195
1196        assert_eq!(wm.get_slice(128, 16).unwrap(), &[1u8; 16]);
1197        let stats = wm.stats();
1198        assert_eq!(stats.strategy, ExperimentalMmapStrategy::WholeFile);
1199        assert_eq!(stats.file_size, PAGE_SIZE_TEST);
1200        assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST);
1201        assert_eq!(stats.map_count, 1);
1202        assert_eq!(stats.remap_count, 0);
1203
1204        temp_file
1205            .write_all(&vec![2u8; PAGE_SIZE_TEST as usize])
1206            .unwrap();
1207        temp_file.flush().unwrap();
1208
1209        assert_eq!(wm.get_slice(256, 16).unwrap(), &[1u8; 16]);
1210        let stats = wm.stats();
1211        assert_eq!(stats.file_size, PAGE_SIZE_TEST);
1212        assert_eq!(stats.map_count, 1);
1213        assert_eq!(stats.remap_count, 0);
1214
1215        assert_eq!(wm.get_slice(PAGE_SIZE_TEST + 128, 16).unwrap(), &[2u8; 16]);
1216        let stats = wm.stats();
1217        assert_eq!(stats.file_size, PAGE_SIZE_TEST * 2);
1218        assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST * 2);
1219        assert_eq!(stats.map_count, 2);
1220        assert_eq!(stats.remap_count, 1);
1221    }
1222
1223    #[test]
1224    fn whole_file_writer_owned_remaps_after_post_change_growth() {
1225        let temp_file = NamedTempFile::new().unwrap();
1226        temp_file.as_file().set_len(PAGE_SIZE_TEST).unwrap();
1227        let file = std::fs::OpenOptions::new()
1228            .read(true)
1229            .write(true)
1230            .open(temp_file.path())
1231            .unwrap();
1232        let mut wm: WindowManager<MmapMut> = WindowManager::new_writer_owned_with_strategy(
1233            file,
1234            PAGE_SIZE_TEST,
1235            32,
1236            ExperimentalMmapStrategy::WholeFile,
1237        )
1238        .unwrap();
1239
1240        wm.get_slice_mut(0, 16).unwrap().copy_from_slice(&[1; 16]);
1241        assert_eq!(wm.stats().current_mapped_bytes, PAGE_SIZE_TEST);
1242
1243        wm.post_change(PAGE_SIZE_TEST * 2).unwrap();
1244        assert_eq!(wm.get_slice(0, 16).unwrap(), &[1; 16]);
1245
1246        let new_offset = PAGE_SIZE_TEST + 128;
1247        wm.get_slice_mut(new_offset, 16)
1248            .unwrap()
1249            .copy_from_slice(&[2; 16]);
1250        assert_eq!(wm.get_slice(new_offset, 16).unwrap(), &[2; 16]);
1251
1252        let stats = wm.stats();
1253        assert_eq!(stats.current_mapped_bytes, PAGE_SIZE_TEST * 2);
1254        assert_eq!(stats.max_mapped_bytes, PAGE_SIZE_TEST * 2);
1255        assert_eq!(stats.remap_count, 1);
1256    }
1257
1258    #[test]
1259    fn post_change_drops_mappings_before_truncating_oversized_windows() {
1260        let temp_file = NamedTempFile::new().unwrap();
1261        temp_file.as_file().set_len(PAGE_SIZE_TEST).unwrap();
1262        let file = std::fs::OpenOptions::new()
1263            .read(true)
1264            .write(true)
1265            .open(temp_file.path())
1266            .unwrap();
1267        let oversized_window = PAGE_SIZE_TEST * 4;
1268        let mut wm: WindowManager<MmapMut> = WindowManager::new_writer_owned_with_strategy(
1269            file,
1270            oversized_window,
1271            32,
1272            ExperimentalMmapStrategy::WholeFile,
1273        )
1274        .unwrap();
1275
1276        wm.get_slice_mut(0, 16).unwrap().copy_from_slice(&[1; 16]);
1277        assert_eq!(wm.stats().current_mapped_bytes, oversized_window);
1278
1279        wm.post_change(PAGE_SIZE_TEST * 2).unwrap();
1280        let stats_after_truncate = wm.stats();
1281        assert_eq!(stats_after_truncate.file_size, PAGE_SIZE_TEST * 2);
1282        assert_eq!(stats_after_truncate.current_mapped_bytes, 0);
1283        assert_eq!(stats_after_truncate.window_count, 0);
1284
1285        let crossing_offset = PAGE_SIZE_TEST * 2 - 1024;
1286        let crossing_payload = vec![2; 2048];
1287        wm.get_slice_mut(crossing_offset, 2048)
1288            .unwrap()
1289            .copy_from_slice(&crossing_payload);
1290        assert_eq!(
1291            wm.get_slice(crossing_offset, 2048).unwrap(),
1292            crossing_payload.as_slice()
1293        );
1294        assert_eq!(wm.stats().current_mapped_bytes, oversized_window);
1295    }
1296
1297    #[test]
1298    fn sequential_boundary_crossing_slides_window_instead_of_growing_from_start() {
1299        let mut temp_file = NamedTempFile::new().unwrap();
1300        temp_file.write_all(&[0u8; 64 * 1024]).unwrap();
1301        temp_file.flush().unwrap();
1302
1303        let file = File::open(temp_file.path()).unwrap();
1304        let mut wm: WindowManager<FailingMmap> =
1305            WindowManager::new(file, PAGE_SIZE_TEST, 1).unwrap();
1306
1307        MOCK_CONTROLLER.with(|ctrl| {
1308            ctrl.set_fail_next(false);
1309            ctrl.create_count.set(0);
1310        });
1311
1312        let _ = wm.get_slice(0, 100).unwrap();
1313        assert_eq!(wm.windows[0].offset, 0);
1314        assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST);
1315
1316        let _ = wm.get_slice(PAGE_SIZE_TEST - 6, 32).unwrap();
1317        assert_eq!(wm.windows[0].offset, 0);
1318        assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST * 2);
1319
1320        let _ = wm.get_slice((PAGE_SIZE_TEST * 2) - 12, 32).unwrap();
1321        assert_eq!(wm.windows[0].offset, PAGE_SIZE_TEST);
1322        assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST * 2);
1323
1324        let _ = wm.get_slice((PAGE_SIZE_TEST * 3) - 20, 32).unwrap();
1325        assert_eq!(wm.windows[0].offset, PAGE_SIZE_TEST * 2);
1326        assert_eq!(wm.windows[0].size, PAGE_SIZE_TEST * 2);
1327    }
1328}