mmap_append/
lib.rs

pub use memmap2;
use memmap2::{Advice, MmapRaw};

use std::fmt;
use std::io::{self, Result};
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
use std::slice;
use std::sync::{Mutex, RwLock};

/// A handle to an append-only memory mapped buffer.
///
/// Dereferencing this gives a `&[u8]` covering the header plus all bytes
/// appended so far, not the entire mapped space.
///
/// The first `HEADER_SIZE` bytes (one `usize`) at the start of the file are
/// used internally to record where the written content ends.
///
/// Only one writer may append at a time (other writers will block on the
/// append lock). Readers are not blocked while an append is taking place.
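///
/// A minimal usage sketch (hedged: it assumes the crate is named
/// `mmap_append` and that the caller has already sized the backing file):
///
/// ```no_run
/// use std::fs::OpenOptions;
/// use mmap_append::{MmapAppend, HEADER_SIZE};
///
/// let file = OpenOptions::new()
///     .read(true)
///     .write(true)
///     .create(true)
///     .open("example.mmap")
///     .unwrap();
/// file.set_len(64).unwrap();
///
/// // Safety: nothing else modifies the file while it is mapped.
/// let map = unsafe { MmapAppend::new(&file, true).unwrap() };
///
/// // Append five bytes; append returns the offset they were written at.
/// let offset = map
///     .append(5, |buf| {
///         buf.copy_from_slice(b"hello");
///         Ok(5)
///     })
///     .unwrap();
/// assert_eq!(offset, HEADER_SIZE);
/// assert_eq!(&map[HEADER_SIZE..], b"hello");
/// ```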
pub struct MmapAppend {
    // Only one writer may append at a time.
    append_lock: Mutex<()>,

    // Needed for the remap workaround on non-Linux targets
    fd: RawFd,

    // This is the mmap. It has a usize at the beginning indicating where the
    // end of the content lies. It is write-locked only while resizing, not
    // while appending.
    pub(crate) inner: RwLock<MmapRaw>,
}

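// A sketch of the mapped layout that the code below maintains. The `end`
// marker is stored little-endian and always points one past the last
// appended byte:
//
//   offset 0            HEADER_SIZE               end            map.len()
//   +-------------------+-------------------------+--------------+
//   | end: usize (LE)   | appended bytes ...      | unused space |
//   +-------------------+-------------------------+--------------+
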
#[cfg(target_os = "linux")]
fn remap(_fd: RawFd, inner: &mut MmapRaw, new_len: usize) -> Result<()> {
    unsafe { inner.remap(new_len, memmap2::RemapOptions::new().may_move(true)) }
}

#[cfg(not(target_os = "linux"))]
fn remap(fd: RawFd, inner: &mut MmapRaw, new_len: usize) -> Result<()> {
    // mremap(2) is Linux-only, so elsewhere we flush and create a fresh
    // mapping of the new length from the same file descriptor.
    inner.flush()?;
    let map = memmap2::MmapOptions::new().len(new_len).map_raw(fd)?;
    // Drop the old map only after the new map has been created
    let _ = std::mem::replace(inner, map);
    Ok(())
}

/// The size in bytes of the header at the front of the map: a little-endian
/// `usize` recording where the written content ends.
pub const HEADER_SIZE: usize = std::mem::size_of::<usize>();

impl MmapAppend {
    /// Memory-maps the `file`, returning an `MmapAppend`. The entire file will
    /// be mapped.
    ///
    /// If `initialize` is true, this writes the initial end marker, setting the
    /// end of the data to just past the end marker itself.
    ///
    /// The mapping starts at offset 0 and is not pre-populated.
    ///
    /// ## Safety
    ///
    /// This is `unsafe` because of the potential for *Undefined Behavior* (UB) using the map if the
    /// underlying file is subsequently modified, in or out of process. Applications must consider
    /// the risk and take appropriate precautions when using file-backed maps. Solutions such as
    /// file permissions, locks or process-private (e.g. unlinked) files exist but are platform
    /// specific and limited.
    pub unsafe fn new<T: MmapAsRawDesc>(file: T, initialize: bool) -> Result<MmapAppend> {
        let fd = file.as_raw_desc().0;
        // Will automatically look up the file length
        let map = MmapRaw::map_raw(fd)?;
        // The file must be long enough to hold the usize 'end' record at the front
        if map.len() < HEADER_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "File not large enough.",
            ));
        }

        if initialize {
            // Write the initial end value at the beginning of the map
            let slice: &mut [u8] =
                unsafe { slice::from_raw_parts_mut(map.as_mut_ptr(), HEADER_SIZE) };
            slice[0..HEADER_SIZE].copy_from_slice(&HEADER_SIZE.to_le_bytes());
            map.flush_range(0, HEADER_SIZE)?;
        }

        Ok(MmapAppend {
            append_lock: Mutex::new(()),
            inner: RwLock::new(map),
            fd,
        })
    }

    /// Append data. `writer` should write up to `max_len` bytes into the slice
    /// it is given and then return the number of bytes actually written, or an
    /// error. On success, this returns the offset at which the data was written.
    ///
    /// This will return an error if there is not enough space, or if the writer errors.
    ///
    /// This may panic if the mutex is poisoned, which cannot happen unless this
    /// crate itself is buggy.
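    ///
    /// A sketch of a partial write (hedged: setup is elided and `map` is
    /// assumed to be an `MmapAppend` with free space). Only the count the
    /// writer returns is committed, even when `max_len` is larger:
    ///
    /// ```no_run
    /// # use mmap_append::MmapAppend;
    /// # fn demo(map: &MmapAppend) -> std::io::Result<()> {
    /// let _offset = map.append(16, |buf| {
    ///     buf[..3].copy_from_slice(b"abc");
    ///     Ok(3) // only these 3 bytes advance the end marker
    /// })?;
    /// # Ok(())
    /// # }
    /// ```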
    pub fn append<F>(&self, max_len: usize, writer: F) -> Result<usize>
    where
        F: FnOnce(&mut [u8]) -> Result<usize>,
    {
        // Wait for and acquire the append lock
        let _guard = self.append_lock.lock().unwrap();

        // Read lock the map
        let inner = self.inner.read().unwrap();

        // Define a slice over the map
        let slice: &mut [u8] =
            unsafe { slice::from_raw_parts_mut(inner.as_mut_ptr(), inner.len()) };

        // Read the end marker
        let end = usize::from_le_bytes(slice[0..HEADER_SIZE].try_into().unwrap());

        // Check available space
        if end + max_len > inner.len() {
            return Err(io::Error::new(io::ErrorKind::Other, "Out of space"));
        }

        // Append
        let len = writer(&mut slice[end..end + max_len])?;

        // Make sure the end marker is not updated until strictly after
        // the appended bytes have been written
        std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);

        // Overwrite the end marker
        let newend = end + len;
        slice[0..HEADER_SIZE].copy_from_slice(&newend.to_le_bytes());

        Ok(end)
    }

    /// Resize the map. The caller is responsible for ensuring the file is long
    /// enough, e.g. by calling `set_len` on it first.
    ///
    /// This may return OS errors.
    ///
    /// This may panic if the mutex is poisoned, which cannot happen unless this
    /// crate itself is buggy.
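    ///
    /// A sketch of growing a map (hedged: setup is elided; `file` is assumed
    /// to be the same `File` the map was created from). The file must be
    /// extended first, since `resize` does not grow the file itself:
    ///
    /// ```no_run
    /// # use mmap_append::MmapAppend;
    /// # fn grow(file: &std::fs::File, map: &MmapAppend) -> std::io::Result<()> {
    /// file.set_len(4096)?;
    /// map.resize(4096)?;
    /// # Ok(())
    /// # }
    /// ```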
    pub fn resize(&self, new_len: usize) -> Result<()> {
        // Wait for and acquire the append lock (so nobody can append)
        let _guard = self.append_lock.lock().unwrap();

        // Write lock the map
        let mut inner = self.inner.write().unwrap();

        // Flush before remapping
        inner.flush_range(0, inner.len())?;

        remap(self.fd, &mut inner, new_len)
    }

    /// Returns the current end offset: one past the last appended byte,
    /// measured from the start of the map (so it is at least `HEADER_SIZE`).
    pub fn get_end(&self) -> usize {
        let inner = self.inner.read().unwrap();
        let slice: &[u8] = unsafe { slice::from_raw_parts(inner.as_ptr(), HEADER_SIZE) };
        usize::from_le_bytes(slice[0..HEADER_SIZE].try_into().unwrap())
    }

    /// Synchronously flush all content up to the current end to disk.
    pub fn flush(&self) -> Result<()> {
        let len = self.len();
        let inner = self.inner.read().unwrap();
        inner.flush_range(0, len)
    }

    /// Asynchronously flush all content up to the current end to disk.
    pub fn flush_async(&self) -> Result<()> {
        let len = self.len();
        let inner = self.inner.read().unwrap();
        inner.flush_async_range(0, len)
    }

    /// Synchronously flush the given range to disk.
    pub fn flush_range(&self, offset: usize, len: usize) -> Result<()> {
        let inner = self.inner.read().unwrap();
        inner.flush_range(offset, len)
    }

    /// Asynchronously flush the given range to disk.
    pub fn flush_async_range(&self, offset: usize, len: usize) -> Result<()> {
        let inner = self.inner.read().unwrap();
        inner.flush_async_range(offset, len)
    }

    /// Advise the OS about the expected access pattern for the entire map.
    pub fn advise(&self, advice: Advice) -> Result<()> {
        let inner = self.inner.read().unwrap();
        inner.advise(advice)
    }

    /// Advise the OS about the expected access pattern for the given range.
    pub fn advise_range(&self, advice: Advice, offset: usize, len: usize) -> Result<()> {
        let inner = self.inner.read().unwrap();
        inner.advise_range(advice, offset, len)
    }

    /// Lock the mapped pages in physical memory so they are not swapped out.
    pub fn lock(&mut self) -> Result<()> {
        let inner = self.inner.read().unwrap();
        inner.lock()
    }

    /// Unlock previously locked pages.
    pub fn unlock(&mut self) -> Result<()> {
        let inner = self.inner.read().unwrap();
        inner.unlock()
    }
}

#[cfg(feature = "stable_deref_trait")]
unsafe impl stable_deref_trait::StableDeref for MmapAppend {}

impl Deref for MmapAppend {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &[u8] {
        let inner = self.inner.read().unwrap();
        // Read the end marker through the guard we already hold; calling
        // `get_end()` here would acquire the read lock a second time, which
        // can deadlock if a writer is queued behind us in the meantime.
        let header: &[u8] = unsafe { slice::from_raw_parts(inner.as_ptr(), HEADER_SIZE) };
        let end = usize::from_le_bytes(header[0..HEADER_SIZE].try_into().unwrap());
        unsafe { slice::from_raw_parts(inner.as_ptr(), end) }
    }
}

impl AsRef<[u8]> for MmapAppend {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.deref()
    }
}

impl fmt::Debug for MmapAppend {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("MmapAppend")
            .field("ptr", &self.as_ptr())
            .field("len", &self.len())
            .finish()
    }
}

/// A raw file descriptor wrapper used when constructing a map.
pub struct MmapRawDescriptor(RawFd);

/// Types from which a raw file descriptor can be obtained for mapping.
pub trait MmapAsRawDesc {
    fn as_raw_desc(&self) -> MmapRawDescriptor;
}

impl MmapAsRawDesc for RawFd {
    fn as_raw_desc(&self) -> MmapRawDescriptor {
        MmapRawDescriptor(*self)
    }
}

impl<'a, T> MmapAsRawDesc for &'a T
where
    T: AsRawFd,
{
    fn as_raw_desc(&self) -> MmapRawDescriptor {
        MmapRawDescriptor(self.as_raw_fd())
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_mmap_append() {
        use std::fs::OpenOptions;

        let tempdir = tempfile::tempdir().unwrap();
        let path = tempdir.path().join("mmap");

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)
            .unwrap();

        file.set_len(0).unwrap();

        // Verify it won't work if the file is too small
        unsafe {
            assert!(MmapAppend::new(&file, true).is_err());
        }

        file.set_len(32).unwrap();
        let mmap = unsafe { MmapAppend::new(&file, true).unwrap() };
        assert_eq!(mmap.len(), 8); // only the 8-byte header has been written so far

        assert_eq!(mmap.get_end(), HEADER_SIZE);

        let thirty_two_bytes: Vec<u8> = vec![
            0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
            24, 25, 26, 27, 28, 29, 30, 31,
        ];

        // Not enough space
        assert!(mmap
            .append(thirty_two_bytes.len(), |s: &mut [u8]| {
                s.copy_from_slice(&thirty_two_bytes);
                Ok(thirty_two_bytes.len())
            })
            .is_err());

        // Resize
        mmap.resize(128).unwrap();

        mmap.append(thirty_two_bytes.len(), |s: &mut [u8]| -> Result<usize> {
            s.copy_from_slice(&thirty_two_bytes);
            Ok(thirty_two_bytes.len())
        })
        .unwrap();

        assert_eq!(mmap.get_end(), 32 + HEADER_SIZE);

        mmap.append(thirty_two_bytes.len(), |s: &mut [u8]| -> Result<usize> {
            s.copy_from_slice(&thirty_two_bytes);
            Ok(thirty_two_bytes.len())
        })
        .unwrap();

        assert_eq!(mmap.get_end(), 32 + 32 + HEADER_SIZE);
    }
}