mmap_vec/segment.rs

use std::{
    fs::{File, OpenOptions},
    io, mem,
    ops::{Deref, DerefMut},
    os::fd::AsRawFd,
    path::Path,
    ptr, slice,
    sync::atomic::Ordering,
};

use crate::{
    stats::{COUNT_ACTIVE_SEGMENT, COUNT_FTRUNCATE_FAILED, COUNT_MMAP_FAILED, COUNT_MUNMAP_FAILED},
    utils::{check_zst, page_size},
};

/// Segment is a fixed-capacity slice of type `T` that is memory mapped to disk.
///
/// It is the basic building block of memory-mapped data structures.
///
/// It cannot grow or shrink.
#[derive(Debug)]
pub struct Segment<T> {
    pub(crate) addr: *mut T,
    len: usize,
    capacity: usize,
}

impl<T> Segment<T> {
    /// Create a zero-size segment.
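    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let s = Segment::<i32>::null();
    /// assert_eq!(s.capacity(), 0);
    /// assert!(s.is_empty());
    /// ```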
    #[inline(always)]
    pub const fn null() -> Self {
        check_zst::<T>();
        Self {
            addr: std::ptr::null_mut(),
            len: 0,
            capacity: 0,
        }
    }

    /// Memory map a segment to disk.
    ///
    /// The file will be created and initialized to the computed capacity.
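    ///
    /// A minimal sketch of typical usage (the file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let s = Segment::<u64>::open_rw("test_open_rw_doc.seg", 8).unwrap();
    /// assert_eq!(s.capacity(), 8);
    /// assert!(s.is_empty());
    /// # let _ = std::fs::remove_file("test_open_rw_doc.seg");
    /// ```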
    pub fn open_rw<P: AsRef<Path>>(path: P, capacity: usize) -> io::Result<Self> {
        check_zst::<T>();
        if capacity == 0 {
            return Ok(Self::null());
        }

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)?;

        // Fill the file with zeros
        unsafe { ftruncate::<T>(&file, capacity) }?;

        // Map the block
        let addr = unsafe { mmap(&file, capacity) }?;
        Ok(Self {
            addr,
            len: 0,
            capacity,
        })
    }

    /// Maximum number of elements this segment can hold.
    #[inline(always)]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Shortens the segment, keeping the first `new_len` elements and dropping
    /// the rest.
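    ///
    /// A short sketch (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<i32>::open_rw("test_truncate_doc.seg", 4).unwrap();
    /// s.push_within_capacity(1).unwrap();
    /// s.push_within_capacity(2).unwrap();
    /// s.push_within_capacity(3).unwrap();
    /// s.truncate(1);
    /// assert_eq!(&s[..], [1]);
    /// # let _ = std::fs::remove_file("test_truncate_doc.seg");
    /// ```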
    pub fn truncate(&mut self, new_len: usize) {
        if new_len > self.len {
            return;
        }

        unsafe {
            let remaining_len = self.len - new_len;
            let items = ptr::slice_from_raw_parts_mut(self.addr.add(new_len), remaining_len);
            self.set_len(new_len);
            ptr::drop_in_place(items);
        }
    }

    /// Remove the first `delete_count` elements from the beginning of the segment.
    ///
    /// Elements are dropped in place.
    ///
    /// If `delete_count` is greater than the segment length, this call is
    /// equivalent to calling the `clear` function.
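    ///
    /// A short sketch (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<i32>::open_rw("test_truncate_first_doc.seg", 4).unwrap();
    /// s.push_within_capacity(1).unwrap();
    /// s.push_within_capacity(2).unwrap();
    /// s.push_within_capacity(3).unwrap();
    /// s.truncate_first(2);
    /// assert_eq!(&s[..], [3]);
    /// # let _ = std::fs::remove_file("test_truncate_first_doc.seg");
    /// ```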
    pub fn truncate_first(&mut self, delete_count: usize) {
        let new_len = self.len.saturating_sub(delete_count);
        if new_len == 0 {
            self.clear()
        } else {
            unsafe {
                let items = slice::from_raw_parts_mut(self.addr, delete_count);
                ptr::drop_in_place(items);
                ptr::copy(self.addr.add(delete_count), self.addr, new_len);
                self.set_len(new_len);
            }
        }
    }

    /// Clears the segment, removing all values.
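    ///
    /// A short sketch (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<i32>::open_rw("test_clear_doc.seg", 2).unwrap();
    /// s.push_within_capacity(1).unwrap();
    /// s.clear();
    /// assert!(s.is_empty());
    /// # let _ = std::fs::remove_file("test_clear_doc.seg");
    /// ```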
    #[inline]
    pub fn clear(&mut self) {
        unsafe {
            let items = slice::from_raw_parts_mut(self.addr, self.len);
            self.set_len(0);
            ptr::drop_in_place(items);
        }
    }

    /// Forces the length of the segment to `new_len`.
    ///
    /// # Safety
    ///
    /// - `new_len` must be less than or equal to [`Self::capacity`].
    /// - The elements at `0..new_len` must be initialized.
    #[inline(always)]
    pub unsafe fn set_len(&mut self, new_len: usize) {
        debug_assert!(new_len <= self.capacity());
        self.len = new_len;
    }

    /// Bytes used on disk for this segment.
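    ///
    /// For example, a segment of 4 `u64` values occupies 32 bytes (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let s = Segment::<u64>::open_rw("test_disk_size_doc.seg", 4).unwrap();
    /// assert_eq!(s.disk_size(), 4 * std::mem::size_of::<u64>());
    /// # let _ = std::fs::remove_file("test_disk_size_doc.seg");
    /// ```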
    #[inline(always)]
    pub fn disk_size(&self) -> usize {
        self.capacity * mem::size_of::<T>()
    }

    /// Try to add a new element to the segment.
    ///
    /// If the segment is already full, the value will be returned in `Err`.
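    ///
    /// A short sketch (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<i32>::open_rw("test_push_doc.seg", 1).unwrap();
    /// assert_eq!(s.push_within_capacity(42), Ok(()));
    /// assert_eq!(s.push_within_capacity(43), Err(43)); // Segment is full
    /// # let _ = std::fs::remove_file("test_push_doc.seg");
    /// ```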
    #[inline]
    pub fn push_within_capacity(&mut self, value: T) -> Result<(), T> {
        if self.len == self.capacity {
            return Err(value);
        }

        unsafe {
            let dst = self.addr.add(self.len);
            ptr::write(dst, value);
        }

        self.len += 1;
        Ok(())
    }

    /// Remove the last element of the segment and reduce its length.
    ///
    /// The value is returned if the segment is not empty.
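    ///
    /// A short sketch (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<i32>::open_rw("test_pop_doc.seg", 2).unwrap();
    /// s.push_within_capacity(5).unwrap();
    /// assert_eq!(s.pop(), Some(5));
    /// assert_eq!(s.pop(), None);
    /// # let _ = std::fs::remove_file("test_pop_doc.seg");
    /// ```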
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }

        self.len -= 1;
        unsafe {
            let src = self.addr.add(self.len);
            Some(ptr::read(src))
        }
    }

    /// Move the data contained in the `other` segment to the end of the current segment.
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s1 = Segment::<i32>::open_rw("test_extend_from_segment_1.seg", 2).unwrap();
    /// let mut s2 = Segment::<i32>::open_rw("test_extend_from_segment_2.seg", 5).unwrap();
    ///
    /// s1.push_within_capacity(7);
    /// s1.push_within_capacity(-3);
    /// s2.push_within_capacity(-4);
    /// s2.push_within_capacity(37);
    ///
    /// assert_eq!(&s1[..], [7, -3]);
    /// assert_eq!(&s2[..], [-4, 37]);
    ///
    /// s2.extend_from_segment(s1);
    /// assert_eq!(&s2[..], [-4, 37, 7, -3]);
    ///
    /// # let _ = std::fs::remove_file("test_extend_from_segment_1.seg");
    /// # let _ = std::fs::remove_file("test_extend_from_segment_2.seg");
    /// ```
    pub fn extend_from_segment(&mut self, mut other: Segment<T>) {
        let new_len = other.len + self.len;
        assert!(
            new_len <= self.capacity,
            "Target segment is too small: new_len={}, capacity={}",
            new_len,
            self.capacity
        );

        unsafe {
            ptr::copy_nonoverlapping(other.addr, self.addr.add(self.len), other.len);
            self.set_len(new_len);
            other.set_len(0);
        };
    }

    /// Inform the kernel that the complete segment will be accessed in the near future.
    ///
    /// All underlying pages should then be loaded into RAM.
    ///
    /// This function is only a wrapper around `libc::madvise`.
    ///
    /// Panics if `libc::madvise` returns an error.
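    ///
    /// A short sketch (file name is illustrative; the call is only a kernel hint):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<u8>::open_rw("test_prefetch_all_doc.seg", 4096).unwrap();
    /// s.push_within_capacity(1).unwrap();
    /// s.advice_prefetch_all_pages();
    /// # let _ = std::fs::remove_file("test_prefetch_all_doc.seg");
    /// ```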
    pub fn advice_prefetch_all_pages(&self) {
        if self.addr.is_null() || self.len == 0 {
            return;
        }

        let madvise_code = unsafe {
            libc::madvise(
                self.addr.cast(),
                self.len * mem::size_of::<T>(),
                libc::MADV_WILLNEED,
            )
        };
        assert_eq!(
            madvise_code,
            0,
            "madvise error: {}",
            io::Error::last_os_error()
        );
    }

    /// Inform the kernel that the underlying page for `index` will be accessed in the near future.
    ///
    /// This function is only a wrapper around `libc::madvise`.
    ///
    /// Panics if `libc::madvise` returns an error.
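    ///
    /// A short sketch (file name is illustrative):
    ///
    /// ```rust
    /// # use mmap_vec::Segment;
    /// let mut s = Segment::<u8>::open_rw("test_prefetch_page_doc.seg", 16).unwrap();
    /// s.push_within_capacity(7).unwrap();
    /// s.advice_prefetch_page_at(0); // Prefetch the page holding element 0
    /// s.advice_prefetch_page_at(100); // Out of range: silently ignored
    /// # let _ = std::fs::remove_file("test_prefetch_page_doc.seg");
    /// ```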
    pub fn advice_prefetch_page_at(&self, index: usize) {
        if self.addr.is_null() || index >= self.len {
            return;
        }

        // Round the element address down to its page boundary; page_size is a power of two
        let page_size = page_size();
        let page_mask = !(page_size - 1);

        let madvise_code = unsafe {
            libc::madvise(
                (self.addr.add(index) as usize & page_mask) as *mut libc::c_void,
                page_size,
                libc::MADV_WILLNEED,
            )
        };
        assert_eq!(
            madvise_code,
            0,
            "madvise error: {}",
            io::Error::last_os_error()
        );
    }
}

impl<T> Deref for Segment<T> {
    type Target = [T];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        unsafe { slice::from_raw_parts(self.addr, self.len) }
    }
}

impl<T> DerefMut for Segment<T> {
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { slice::from_raw_parts_mut(self.addr, self.len) }
    }
}

impl<T> Drop for Segment<T> {
    fn drop(&mut self) {
        if self.len > 0 {
            unsafe { ptr::drop_in_place(ptr::slice_from_raw_parts_mut(self.addr, self.len)) }
        }

        if !self.addr.is_null() {
            let _ = unsafe { munmap(self.addr, self.capacity) };
        }
    }
}

unsafe impl<T> Send for Segment<T> {}
unsafe impl<T> Sync for Segment<T> {}

unsafe fn ftruncate<T>(file: &File, capacity: usize) -> io::Result<()> {
    check_zst::<T>();
    let segment_size = capacity * mem::size_of::<T>();
    let fd = file.as_raw_fd();

    if libc::ftruncate(fd, segment_size as libc::off_t) != 0 {
        COUNT_FTRUNCATE_FAILED.fetch_add(1, Ordering::Relaxed);
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}

unsafe fn mmap<T>(file: &File, capacity: usize) -> io::Result<*mut T> {
    check_zst::<T>();
    let segment_size = capacity * mem::size_of::<T>();

    // It is safe not to keep a reference to the initial file descriptor.
    // See: https://stackoverflow.com/questions/17490033/do-i-need-to-keep-a-file-open-after-calling-mmap-on-it
    let fd = file.as_raw_fd();

    let addr = libc::mmap(
        std::ptr::null_mut(),
        segment_size as libc::size_t,
        libc::PROT_READ | libc::PROT_WRITE,
        libc::MAP_SHARED,
        fd,
        0,
    );

    if addr == libc::MAP_FAILED {
        COUNT_MMAP_FAILED.fetch_add(1, Ordering::Relaxed);
        Err(io::Error::last_os_error())
    } else {
        COUNT_ACTIVE_SEGMENT.fetch_add(1, Ordering::Relaxed);
        Ok(addr.cast())
    }
}

unsafe fn munmap<T>(addr: *mut T, capacity: usize) -> io::Result<()> {
    check_zst::<T>();
    debug_assert!(!addr.is_null());
    debug_assert!(capacity > 0);

    let unmap_code = libc::munmap(addr.cast(), capacity * mem::size_of::<T>());

    if unmap_code != 0 {
        COUNT_MUNMAP_FAILED.fetch_add(1, Ordering::Relaxed);
        Err(io::Error::last_os_error())
    } else {
        COUNT_ACTIVE_SEGMENT.fetch_sub(1, Ordering::Relaxed);
        Ok(())
    }
}