// camelliakv/storage/journal/nvm_buffer.rs

1use std::cmp;
2use std::io::{self, Read, Seek, SeekFrom, Write};
3use std::ptr;
4
5use crate::block::{AlignedBytes, BlockSize};
6use crate::nvm::NonVolatileMemory;
7use crate::{ErrorKind, Result};
8
/// Buffer for the journal region.
///
/// This wraps an inner `NonVolatileMemory` instance and satisfies the alignment
/// constraint it imposes (i.e., all accesses must fall on storage block
/// boundaries), while also batching writes to improve journal-region throughput.
#[derive(Debug)]
pub struct JournalNvmBuffer<N: NonVolatileMemory> {
    // The internal NVM instance used to actually persist data in the journal region.
    inner: N,

    // Current logical read/write position (as seen by the caller).
    position: u64,

    // Write buffer.
    //
    // Write requests issued by the journal region are kept in this in-memory
    // buffer — i.e., not yet reflected in the inner NVM — until one of the
    // following conditions is met:
    // - The `sync` method is called:
    //   - The journal region calls it periodically.
    // - A read request overlaps the range currently covered by the write buffer:
    //   - The buffer contents are flushed first, and the read is processed after
    //     the inner NVM has been synchronized.
    // - A write request does not overlap the range covered by the write buffer:
    //   - The write buffer cannot hold gaps (i.e., multiple non-contiguous
    //     subranges), so the old contents are flushed before a buffer for the
    //     new write request is set up.
    //
    // The buffered data is kept aligned to the inner NVM's block boundary.
    write_buf: AlignedBytes,

    // Field holding the position of `write_buf` within the inner NVM.
    //
    // Like `position`, it points at a location in the inner NVM, but while
    // `position` moves with every read/write/seek operation,
    // `write_buf_offset` stays fixed until the write buffer is flushed.
    write_buf_offset: u64,

    // Used to determine whether data is held in the write buffer.
    //
    // Set to `true` when data is written into the write buffer, and reset to
    // `false` once the buffered data has been flushed to the inner NVM.
    maybe_dirty: bool,

    // Read buffer.
    //
    // Used to align read requests issued by the journal region to the block
    // boundary of the inner NVM.
    read_buf: AlignedBytes,
}
57impl<N: NonVolatileMemory> JournalNvmBuffer<N> {
58    /// Create a new `JournalNvmBuffer` instance.
59    ///
60    /// It actually use ` NVM ` for reading and writing.
61    ///
62    /// It is not necessary for user to care about `nvm` that it is aligned with 
63    /// the block boundary required by `nvm`, because `JournalNvmBuffer` ensures that.
64    ///
65    /// Therefore, it should be noted that overwritten data from the seek point to 
66    /// the next block boundary is not included in search.
67    pub fn new(nvm: N) -> Self {
68        let block_size = nvm.block_size();
69        JournalNvmBuffer {
70            inner: nvm,
71            position: 0,
72            maybe_dirty: false,
73            write_buf_offset: 0,
74            write_buf: AlignedBytes::new(0, block_size),
75            read_buf: AlignedBytes::new(0, block_size),
76        }
77    }
78
79    #[cfg(test)]
80    pub fn nvm(&self) -> &N {
81        &self.inner
82    }
83
84    fn is_dirty_area(&self, offset: u64, length: usize) -> bool {
85        if !self.maybe_dirty || length == 0 || self.write_buf.is_empty() {
86            return false;
87        }
88        if self.write_buf_offset < offset {
89            let buf_end = self.write_buf_offset + self.write_buf.len() as u64;
90            offset < buf_end
91        } else {
92            let end = offset + length as u64;
93            self.write_buf_offset < end
94        }
95    }
96
97    fn flush_write_buf(&mut self) -> Result<()> {
98        if self.write_buf.is_empty() || !self.maybe_dirty {
99            return Ok(());
100        }
101
102        track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?;
103        track_io!(self.inner.write(&self.write_buf))?;
104        if self.write_buf.len() > self.block_size().as_u16() as usize {
105            // This if case leaves information about trailing alignment bytes (= new_len) in the buffer.
106            // write_buf_offset is perform by write_buf.len() - new_len(= drop_len).
107            //
108            // write_buf_offset  can clear write_buf according to the write_buf.len() written successfully. 
109            // Because it only can be written by block length, 
110            // so NVM must be read once in the next write to obtain the entire block.
111            // To avoid this read, it takes the current implementation.
112            let new_len = self.block_size().as_u16() as usize;
113            let drop_len = self.write_buf.len() - new_len;
114            unsafe {
115                // This nonoverlappingness is guranteed by the callers.
116                ptr::copy(
117                    self.write_buf.as_ptr().add(drop_len), // src
118                    self.write_buf.as_mut_ptr(),           // dst
119                    new_len,
120                );
121            }
122            self.write_buf.truncate(new_len);
123
124            self.write_buf_offset += drop_len as u64;
125        }
126        self.maybe_dirty = false;
127        Ok(())
128    }
129
130    fn check_overflow(&self, write_len: usize) -> Result<()> {
131        let next_position = self.position() + write_len as u64;
132        track_assert!(
133            next_position <= self.capacity(),
134            ErrorKind::InconsistentState,
135            "self.position={}, write_len={}, self.len={}",
136            self.position(),
137            write_len,
138            self.capacity()
139        );
140        Ok(())
141    }
142}
impl<N: NonVolatileMemory> NonVolatileMemory for JournalNvmBuffer<N> {
    /// Flushes any buffered writes, then synchronizes the inner NVM.
    fn sync(&mut self) -> Result<()> {
        track!(self.flush_write_buf())?;
        self.inner.sync()
    }

    /// Returns the current logical position (which may be ahead of what has
    /// actually been flushed to the inner NVM).
    fn position(&self) -> u64 {
        self.position
    }

    /// Returns the capacity of the inner NVM.
    fn capacity(&self) -> u64 {
        self.inner.capacity()
    }

    /// Returns the block size of the inner NVM.
    fn block_size(&self) -> BlockSize {
        self.inner.block_size()
    }

    /// Splitting a journal buffer is not supported; this is never called
    /// for the journal region.
    fn split(self, _: u64) -> Result<(Self, Self)> {
        unreachable!()
    }
}
impl<N: NonVolatileMemory> Drop for JournalNvmBuffer<N> {
    fn drop(&mut self) {
        // Best-effort flush on drop: errors cannot be propagated out of
        // `drop`, so the result is deliberately ignored.
        let _ = self.sync();
    }
}
170impl<N: NonVolatileMemory> Seek for JournalNvmBuffer<N> {
171    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
172        let offset = track!(self.convert_to_offset(pos))?;
173        self.position = offset;
174        Ok(offset)
175    }
176}
177impl<N: NonVolatileMemory> Read for JournalNvmBuffer<N> {
178    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
179        if self.is_dirty_area(self.position, buf.len()) {
180            track!(self.flush_write_buf())?;
181        }
182
183        let aligned_start = self.block_size().floor_align(self.position);
184        let aligned_end = self
185            .block_size()
186            .ceil_align(self.position + buf.len() as u64);
187
188        self.read_buf
189            .aligned_resize((aligned_end - aligned_start) as usize);
190        self.inner.seek(SeekFrom::Start(aligned_start))?;
191        let inner_read_size = self.inner.read(&mut self.read_buf)?;
192
193        let start = (self.position - aligned_start) as usize;
194        let end = cmp::min(inner_read_size, start + buf.len());
195        let read_size = end - start;
196        (&mut buf[..read_size]).copy_from_slice(&self.read_buf[start..end]);
197        self.position += read_size as u64;
198        Ok(read_size)
199    }
200}
impl<N: NonVolatileMemory> Write for JournalNvmBuffer<N> {
    /// Writes into the journal region at the current position, buffering the
    /// data in memory until the next flush.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        track!(self.check_overflow(buf.len()))?;

        let write_buf_start = self.write_buf_offset;
        let write_buf_end = write_buf_start + self.write_buf.len() as u64;
        if write_buf_start <= self.position && self.position <= write_buf_end {
            // The new data overlaps (or is contiguous with) the buffered
            // range, so it can be merged into the buffer in place — no flush
            // is needed.
            let start = (self.position - self.write_buf_offset) as usize;
            let end = start + buf.len();
            self.write_buf.aligned_resize(end);
            (&mut self.write_buf[start..end]).copy_from_slice(buf);
            self.position += buf.len() as u64;
            self.maybe_dirty = true;
            Ok(buf.len())
        } else {
            // The new data is discontiguous with the buffered range; the
            // buffer cannot hold gaps, so its contents are written back first.
            track!(self.flush_write_buf())?;

            if self.block_size().is_aligned(self.position) {
                self.write_buf_offset = self.position;
                self.write_buf.aligned_resize(0);
            } else {
                // `position` falls inside a block: pre-read that whole block
                // so the existing data preceding the seek position is not
                // discarded when the block-aligned buffer is flushed later.
                let size = self.block_size().as_u16();
                self.write_buf_offset = self.block_size().floor_align(self.position);
                self.write_buf.aligned_resize(size as usize);
                self.inner.seek(SeekFrom::Start(self.write_buf_offset))?;
                self.inner.read_exact(&mut self.write_buf)?;
            }
            // Retry: the buffer now covers `position`, so the recursive call
            // takes the in-place branch above.
            self.write(buf)
        }
    }

    /// Flushes the write buffer to the inner NVM.
    fn flush(&mut self) -> io::Result<()> {
        track!(self.flush_write_buf())?;
        Ok(())
    }
}
241
#[cfg(test)]
mod tests {
    use std::io::{Read, Seek, SeekFrom, Write};
    use trackable::result::TestResult;

    use super::*;
    use crate::nvm::MemoryNvm;

    #[test]
    fn write_write_flush() -> TestResult {
        // Writes to a contiguous region stay in the buffer until `flush`.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.write_all(b"bar"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
        assert_eq!(&buffer.nvm().as_bytes()[3..6], &[0; 3][..]);

        track_io!(buffer.flush())?;
        assert_eq!(&buffer.nvm().as_bytes()[0..6], b"foobar");
        Ok(())
    }

    #[test]
    fn write_seek_write_flush() -> TestResult {
        // "Contiguity" is determined at block granularity: a seek that stays
        // within the buffered block is not treated as discontiguous.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.seek(SeekFrom::Current(1)))?;
        track_io!(buffer.write_all(b"bar"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
        assert_eq!(&buffer.nvm().as_bytes()[4..7], &[0; 3][..]);

        track_io!(buffer.flush())?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
        assert_eq!(&buffer.nvm().as_bytes()[4..7], b"bar");

        // A seek to the end of the buffered (block-aligned) range still
        // counts as contiguous, so the write remains buffered.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.seek(SeekFrom::Start(512)))?;
        track_io!(buffer.write_all(b"bar"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
        assert_eq!(&buffer.nvm().as_bytes()[512..515], &[0; 3][..]);

        track_io!(buffer.flush())?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
        assert_eq!(&buffer.nvm().as_bytes()[512..515], b"bar");

        // An overlapping write is merged into the buffer in place.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.seek(SeekFrom::Current(-1)))?;
        track_io!(buffer.write_all(b"bar"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
        assert_eq!(&buffer.nvm().as_bytes()[2..5], &[0; 3][..]);

        track_io!(buffer.flush())?;
        assert_eq!(&buffer.nvm().as_bytes()[0..5], b"fobar");
        Ok(())
    }

    #[test]
    fn write_seek_write() -> TestResult {
        // When the write target leaves the buffered block range, the current
        // buffer contents are written back to the NVM first.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.seek(SeekFrom::Start(513)))?;
        track_io!(buffer.write_all(b"bar"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
        assert_eq!(&buffer.nvm().as_bytes()[513..516], &[0; 3][..]);
        Ok(())
    }

    #[test]
    fn write_seek_read() -> TestResult {
        // A read overlapping the write buffer forces the buffer to be
        // written back to the NVM first.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.read_exact(&mut [0; 1][..]))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");

        // A read that does not overlap the write buffer triggers no
        // write-back.
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(b"foo"))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);

        track_io!(buffer.seek(SeekFrom::Start(512)))?;
        track_io!(buffer.read_exact(&mut [0; 1][..]))?;
        assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
        Ok(())
    }

    #[test]
    fn overwritten() -> TestResult {
        // Data preceding the seek position within the same block is
        // preserved (the block is read back before the partial overwrite).
        let mut buffer = new_buffer();
        track_io!(buffer.write_all(&[b'a'; 512]))?;
        track_io!(buffer.flush())?;
        assert_eq!(&buffer.nvm().as_bytes()[0..512], &[b'a'; 512][..]);

        track_io!(buffer.seek(SeekFrom::Start(256)))?;
        track_io!(buffer.write_all(&[b'b'; 1]))?;
        track_io!(buffer.flush())?;
        assert_eq!(&buffer.nvm().as_bytes()[0..256], &[b'a'; 256][..]);
        assert_eq!(buffer.nvm().as_bytes()[256], b'b');
        Ok(())
    }

    // Builds a 10 KiB in-memory NVM wrapped in a `JournalNvmBuffer`.
    fn new_buffer() -> JournalNvmBuffer<MemoryNvm> {
        let nvm = MemoryNvm::new(vec![0; 10 * 1024]);
        JournalNvmBuffer::new(nvm)
    }
}