d4_framefile/
randfile.rs

1#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
2use std::fs::File;
3use std::io::{Read, Result, Seek, SeekFrom, Write};
4use std::ops::Deref;
5use std::sync::{Arc, Mutex};
6
7/// The file object that supports random access. Since in D4 file,
8/// we actually use a random access file mode, which means all the read
9/// and write needs to provide the address in file. 
10/// And this is the object that provides the low level random access interface.
11///
12/// At the same time, this RandFile object is synchronized, which means we guarantee
13/// the thread safety that each block of data is written to file correctly (without overlaps).
14///
15/// The rand file provides a offset-based file access API and data can be read and write from the
16/// specified address in blocks. But rand file itself doesn't tracking the block size and it's the
17/// upper layer's responsibility to determine the correct block beginning.
pub struct RandFile<T> {
    // Shared, mutex-guarded IO state; every clone and every locked
    // (derived-token) handle points at this same wrapper.
    inner: Arc<Mutex<IoWrapper<T>>>,
    // Index into the wrapper's `token_stack` identifying which lock
    // generation this handle belongs to; only the current (topmost)
    // token is allowed to perform token-checked IO.
    token: u32,
}
22
23impl<T> Drop for RandFile<T> {
24    fn drop(&mut self) {
25        let mut inner = self.inner.lock().unwrap();
26        if inner.token_stack[self.token as usize].ref_count > 0 {
27            inner.token_stack[self.token as usize].ref_count -= 1;
28        }
29        let mut update_callbacks = vec![];
30        while inner.current_token > 0
31            && inner.token_stack[inner.current_token as usize].ref_count == 0
32        {
33            inner.current_token -= 1;
34            if let Some(TokenStackItem {
35                on_release: update, ..
36            }) = inner.token_stack.pop()
37            {
38                update_callbacks.push(update);
39            }
40        }
41        drop(inner);
42        update_callbacks.into_iter().for_each(|f| f());
43    }
44}
45
/// One entry of the lock-token stack: tracks how many live handles still
/// reference a token and what to run once the last one is released.
struct TokenStackItem {
    // Number of RandFile handles (including clones) holding this token.
    ref_count: u32,
    // Callback fired exactly once, after the token is popped off the stack.
    on_release: Box<dyn FnOnce() + Send>,
}
50
51/// This is the internal wrapper of an IO object that used by D4 randfile.
52/// It's used with mutex and enforces the lock policy D4 randfile is using.
53/// This wrapper is shared between different higher level IO objects, for instance: a directory in
54/// framefile.
struct IoWrapper<T> {
    // The actual IO backend (e.g. a File or an in-memory Cursor).
    inner: T,
    // The token currently allowed to perform token-checked IO; equals the
    // index of the topmost entry in `token_stack`.
    current_token: u32,
    // Stack of outstanding lock tokens; a token's value is its index here.
    token_stack: Vec<TokenStackItem>,
}
60
61impl<T> IoWrapper<T> {
62    fn try_borrow_mut(&mut self, token: u32) -> Result<&mut T> {
63        if token == self.current_token {
64            Ok(&mut self.inner)
65        } else {
66            Err(std::io::Error::new(
67                std::io::ErrorKind::Other,
68                "Rand file locked",
69            ))
70        }
71    }
72    fn seek(&mut self, addr: u64) -> Result<()>
73    where
74        T: Seek,
75    {
76        self.inner.seek(SeekFrom::Start(addr))?;
77        Ok(())
78    }
79    fn read(&mut self, buf: &mut [u8]) -> Result<usize>
80    where
81        T: Read,
82    {
83        self.inner.read(buf)
84    }
85}
86
87impl<T> Deref for IoWrapper<T> {
88    type Target = T;
89    fn deref(&self) -> &T {
90        &self.inner
91    }
92}
93
94impl<T> Clone for RandFile<T> {
95    fn clone(&self) -> Self {
96        self.inner.lock().unwrap().token_stack[self.token as usize].ref_count += 1;
97        Self {
98            inner: self.inner.clone(),
99            token: self.token,
100        }
101    }
102}
103
104impl<T> RandFile<T> {
105    pub fn clone_inner(&self) -> Result<T>
106    where
107        T: Clone,
108    {
109        let inner = self
110            .inner
111            .lock()
112            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Lock Error"))?;
113        Ok(inner.inner.clone())
114    }
115    /// Create a new random access file wrapper
116    ///
117    /// - `inner`: The underlying implementation for the backend
118    /// - `returns`: The newly created random file object
119    pub(crate) fn new(inner: T) -> Self {
120        RandFile {
121            inner: Arc::new(Mutex::new(IoWrapper {
122                current_token: 0,
123                token_stack: vec![TokenStackItem {
124                    ref_count: 1,
125                    on_release: Box::new(|| ()),
126                }],
127                inner,
128            })),
129            token: 0,
130        }
131    }
132
133    /// Lock the current IO object and derive a fresh token
134    /// This will prevent any object that holds earlier token from locking this file again.
135    /// However, the freshly returned token can be cloned.
136    pub fn lock(&mut self, update_fn: Box<dyn FnOnce() + Send>) -> Result<Self> {
137        let mut inner = self
138            .inner
139            .lock()
140            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Lock Error"))?;
141        inner.current_token += 1;
142        inner.token_stack.push(TokenStackItem {
143            ref_count: 1,
144            on_release: update_fn,
145        });
146        let token = inner.current_token;
147        drop(inner);
148        Ok(RandFile {
149            inner: self.inner.clone(),
150            token,
151        })
152    }
153}
154
155impl<T: Read + Write + Seek> RandFile<T> {
156    /// The convenient helper function to create a read-write random file
157    ///
158    /// - `inner`: The underlying implementation for this backend
159    pub fn for_read_write(inner: T) -> Self {
160        Self::new(inner)
161    }
162}
163
#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
impl RandFile<File> {
    /// Create a read-only memory mapping of `size` bytes starting at
    /// `offset`. Only available with the `mapped_io` feature on non-wasm
    /// targets, and only for file-backed RandFiles.
    pub fn mmap(&self, offset: u64, size: usize) -> Result<mapping::MappingHandle> {
        mapping::MappingHandle::new(self, offset, size)
    }

    /// Create a mutable memory mapping of `size` bytes starting at
    /// `offset`; changes are flushed when the last handle is dropped.
    pub fn mmap_mut(&mut self, offset: u64, size: usize) -> Result<mapping::MappingHandleMut> {
        mapping::MappingHandleMut::new(self, offset, size)
    }
}
174
175impl<T: Write + Seek> RandFile<T> {
176    /// Append a block to the random accessing file
177    /// the return value is the relative address compare to the last
178    /// accessed block.
179    ///
180    /// - `buf`: The data buffer that needs to be write
181    /// - `returns`: The absolute address of the block that has been written to the file.
182    pub fn append_block(&mut self, buf: &[u8]) -> Result<u64> {
183        let mut inner = self
184            .inner
185            .lock()
186            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "LockError"))?;
187        let ret = inner.try_borrow_mut(self.token)?.seek(SeekFrom::End(0))?;
188        inner.try_borrow_mut(self.token)?.write_all(buf)?;
189        Ok(ret)
190    }
191
192    /// Update a data block with the given data buffer.
193    ///
194    /// - `offset`: The offset of the data block
195    /// - `buf`: The data buffer to write
196    pub fn update_block(&mut self, offset: u64, buf: &[u8]) -> Result<()> {
197        let mut inner = self
198            .inner
199            .lock()
200            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "LockError"))?;
201        inner
202            .try_borrow_mut(self.token)?
203            .seek(SeekFrom::Start(offset))?;
204        inner.try_borrow_mut(self.token)?.write_all(buf)?;
205        Ok(())
206    }
207
208    /// Reserve some space in the rand file. This is useful when we want to reserve a data block
209    /// for future use. This is very useful for some volatile data (for example the directory block), etc.
210    /// And later, we are able to use `update_block` function to keep the reserved block up-to-dated
211    pub fn reserve_block(&mut self, size: usize) -> Result<u64> {
212        let mut inner = self
213            .inner
214            .lock()
215            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "LockError"))?;
216        let ret = inner.try_borrow_mut(self.token)?.seek(SeekFrom::End(0))?;
217        inner
218            .try_borrow_mut(self.token)?
219            .seek(SeekFrom::Current(size as i64 - 1))?;
220        inner.try_borrow_mut(self.token)?.write_all(b"\0")?;
221        Ok(ret)
222    }
223}
224
impl<T: Read + Seek> RandFile<T> {
    /// Return the current size of the file in bytes, determined by seeking
    /// to EOF. Goes through `try_borrow_mut`, so the caller must hold the
    /// currently active token.
    pub fn size(&mut self) -> Result<u64> {
        let mut inner = self
            .inner
            .lock()
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "LockError"))?;
        inner.try_borrow_mut(self.token)?.seek(SeekFrom::End(0))
    }
    /// Read a block from the random accessing file
    /// the size of the buffer slice is equal to the number of bytes that is requesting
    /// But there might not be enough bytes available for read, thus we always return
    /// the actual number of bytes is loaded
    ///
    /// NOTE(review): unlike `size` and the write path, this bypasses the
    /// token check (`try_borrow_mut`) and drives the shared cursor directly
    /// — presumably reads are meant to be permitted even while a newer lock
    /// token is outstanding; confirm this is intentional.
    pub fn read_block(&mut self, addr: u64, buf: &mut [u8]) -> Result<usize> {
        let mut inner = self
            .inner
            .lock()
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "LockError"))?;
        inner.seek(addr)?;
        // Loop until `buf` is full or the backend reports EOF (a zero-byte
        // read), since a single `read` call may return fewer bytes than asked.
        let mut ret = 0;
        loop {
            let bytes_read = inner.read(&mut buf[ret..])?;
            if bytes_read == 0 {
                break Ok(ret);
            }
            ret += bytes_read;
        }
    }
}
253
#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
pub mod mapping {
    //! Memory-mapped views over a `RandFile<File>`, available only when the
    //! `mapped_io` feature is enabled on non-wasm targets.
    use super::*;

    use memmap::{Mmap, MmapMut, MmapOptions};
    use std::fs::File;
    use std::io::{Error, ErrorKind};
    use std::sync::Arc;

    /// Guard that flushes the mutable mapping back to disk when the last
    /// handle referencing it is dropped.
    struct SyncGuard(MmapMut);

    impl Drop for SyncGuard {
        fn drop(&mut self) {
            // NOTE(review): a failed flush panics here; if this drop runs
            // during an unwind that escalates to an abort — confirm a failed
            // msync is considered unrecoverable.
            self.0.flush().expect("Sync Error");
        }
    }

    /// A cheaply clonable read-only memory mapping. `None` inside the Arc
    /// represents an empty (zero-length) mapping.
    #[derive(Clone)]
    pub struct MappingHandle(Arc<Option<Mmap>>);

    impl AsRef<[u8]> for MappingHandle {
        fn as_ref(&self) -> &[u8] {
            if let Some(ref mmap) = *self.0 {
                mmap.as_ref()
            } else {
                // Zero-length mapping: expose an empty slice.
                &[]
            }
        }
    }

    impl MappingHandle {
        /// Map `size` bytes of `file` read-only, starting at `offset`.
        /// A `size` of zero skips the mmap syscall entirely (mapping a
        /// zero-length region fails on most platforms).
        pub(super) fn new(file: &RandFile<File>, offset: u64, size: usize) -> Result<Self> {
            let inner = file
                .inner
                .lock()
                .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
            let mapped = if size > 0 {
                // SAFETY: the mapping is backed by the live file handle held
                // inside the lock; callers must not truncate the file while
                // the mapping is alive (memmap's documented contract).
                Some(unsafe { MmapOptions::new().offset(offset).len(size).map(&*inner)? })
            } else {
                None
            };
            drop(inner);
            Ok(MappingHandle(Arc::new(mapped)))
        }
    }

    /// A clonable mutable memory mapping. All clones share the same
    /// underlying region; data is flushed when the last clone drops.
    #[derive(Clone)]
    pub struct MappingHandleMut {
        handle: Arc<SyncGuard>,
        // Raw base pointer of the mapped region, cached because the shared
        // Arc can only hand out `&` references, not `&mut`.
        base_addr: *mut u8,
        size: usize,
    }

    // SAFETY: the pointer refers to the mmap'd region owned by `handle`,
    // which keeps the mapping alive for as long as any clone exists.
    // NOTE(review): because the type is Clone and `as_mut` derives
    // `&mut [u8]` from shared state, two clones can alias the same bytes
    // mutably — confirm upper layers never write through clones concurrently.
    unsafe impl Send for MappingHandleMut {}

    impl AsRef<[u8]> for MappingHandleMut {
        fn as_ref(&self) -> &[u8] {
            self.handle.as_ref().0.as_ref()
        }
    }

    impl AsMut<[u8]> for MappingHandleMut {
        fn as_mut(&mut self) -> &mut [u8] {
            // SAFETY: base_addr/size describe the mapping owned by `handle`,
            // which outlives this borrow (see aliasing caveat on `Send`).
            unsafe { std::slice::from_raw_parts_mut(self.base_addr, self.size) }
        }
    }

    impl MappingHandleMut {
        /// Map `size` bytes of `file` read-write, starting at `offset`.
        pub(super) fn new(file: &RandFile<File>, offset: u64, size: usize) -> Result<Self> {
            let inner = file
                .inner
                .lock()
                .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
            // SAFETY: same contract as the read-only mapping above; the
            // underlying file must not be truncated while mapped.
            let mut mapped = unsafe {
                MmapOptions::new()
                    .offset(offset)
                    .len(size)
                    .map_mut(&*inner)?
            };
            drop(inner);
            let base_addr = mapped.as_mut().as_mut_ptr();
            Ok(MappingHandleMut {
                handle: Arc::new(SyncGuard(mapped)),
                base_addr,
                size,
            })
        }
    }
}
343
#[cfg(test)]
mod test {
    use super::*;
    use std::io::Cursor;

    /// Constructing a RandFile over an in-memory backend must not panic.
    #[test]
    fn test_from_inner() {
        let first = RandFile::new(Cursor::new(vec![0; 1024]));
        drop(first);
        let second = RandFile::new(Cursor::new(vec![0; 1024]));
        drop(second);
    }

    /// Appended blocks land at sequential addresses and read back intact.
    #[test]
    fn test_read_write_blocks() {
        let mut rand_file = RandFile::new(Cursor::new(Vec::new()));
        let payload = b"This is a test block";
        assert_eq!(0, rand_file.append_block(payload).unwrap());
        assert_eq!(20, rand_file.append_block(payload).unwrap());

        let mut buf = [0u8; 20];
        assert_eq!(20, rand_file.read_block(0, &mut buf).unwrap());
        assert_eq!(payload, &buf);
    }

    /// Locking blocks stale handles; releasing the token fires the callback.
    #[test]
    fn test_lock() {
        let mut rand_file = RandFile::new(Cursor::new(Vec::new()));
        let released = Arc::new(std::sync::Mutex::new(false));
        {
            let released = released.clone();
            let mut locked = rand_file
                .lock(Box::new(move || {
                    *released.lock().unwrap() = true;
                }))
                .unwrap();
            let mut sibling = locked.clone();

            // Both the fresh token and its clone may write.
            locked.append_block(b"a").unwrap();
            sibling.append_block(b"a").unwrap();

            // The stale handle must be rejected while the token is live.
            rand_file.append_block(b"c").expect_err("Should be error!");
        }
        // Token fully released: writes work again and the callback ran.
        rand_file.append_block(b"c").unwrap();
        assert!(*released.lock().unwrap());
    }
}