camelliakv/storage/journal/
nvm_buffer.rs1use std::cmp;
2use std::io::{self, Read, Seek, SeekFrom, Write};
3use std::ptr;
4
5use crate::block::{AlignedBytes, BlockSize};
6use crate::nvm::NonVolatileMemory;
7use crate::{ErrorKind, Result};
8
9#[derive(Debug)]
14pub struct JournalNvmBuffer<N: NonVolatileMemory> {
15 inner: N,
17
18 position: u64,
20
21 write_buf: AlignedBytes,
37
38 write_buf_offset: u64,
44
45 maybe_dirty: bool,
50
51 read_buf: AlignedBytes,
56}
57impl<N: NonVolatileMemory> JournalNvmBuffer<N> {
58 pub fn new(nvm: N) -> Self {
68 let block_size = nvm.block_size();
69 JournalNvmBuffer {
70 inner: nvm,
71 position: 0,
72 maybe_dirty: false,
73 write_buf_offset: 0,
74 write_buf: AlignedBytes::new(0, block_size),
75 read_buf: AlignedBytes::new(0, block_size),
76 }
77 }
78
79 #[cfg(test)]
80 pub fn nvm(&self) -> &N {
81 &self.inner
82 }
83
84 fn is_dirty_area(&self, offset: u64, length: usize) -> bool {
85 if !self.maybe_dirty || length == 0 || self.write_buf.is_empty() {
86 return false;
87 }
88 if self.write_buf_offset < offset {
89 let buf_end = self.write_buf_offset + self.write_buf.len() as u64;
90 offset < buf_end
91 } else {
92 let end = offset + length as u64;
93 self.write_buf_offset < end
94 }
95 }
96
97 fn flush_write_buf(&mut self) -> Result<()> {
98 if self.write_buf.is_empty() || !self.maybe_dirty {
99 return Ok(());
100 }
101
102 track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?;
103 track_io!(self.inner.write(&self.write_buf))?;
104 if self.write_buf.len() > self.block_size().as_u16() as usize {
105 let new_len = self.block_size().as_u16() as usize;
113 let drop_len = self.write_buf.len() - new_len;
114 unsafe {
115 ptr::copy(
117 self.write_buf.as_ptr().add(drop_len), self.write_buf.as_mut_ptr(), new_len,
120 );
121 }
122 self.write_buf.truncate(new_len);
123
124 self.write_buf_offset += drop_len as u64;
125 }
126 self.maybe_dirty = false;
127 Ok(())
128 }
129
130 fn check_overflow(&self, write_len: usize) -> Result<()> {
131 let next_position = self.position() + write_len as u64;
132 track_assert!(
133 next_position <= self.capacity(),
134 ErrorKind::InconsistentState,
135 "self.position={}, write_len={}, self.len={}",
136 self.position(),
137 write_len,
138 self.capacity()
139 );
140 Ok(())
141 }
142}
143impl<N: NonVolatileMemory> NonVolatileMemory for JournalNvmBuffer<N> {
144 fn sync(&mut self) -> Result<()> {
145 track!(self.flush_write_buf())?;
146 self.inner.sync()
147 }
148
149 fn position(&self) -> u64 {
150 self.position
151 }
152
153 fn capacity(&self) -> u64 {
154 self.inner.capacity()
155 }
156
157 fn block_size(&self) -> BlockSize {
158 self.inner.block_size()
159 }
160
161 fn split(self, _: u64) -> Result<(Self, Self)> {
162 unreachable!()
163 }
164}
165impl<N: NonVolatileMemory> Drop for JournalNvmBuffer<N> {
166 fn drop(&mut self) {
167 let _ = self.sync();
168 }
169}
170impl<N: NonVolatileMemory> Seek for JournalNvmBuffer<N> {
171 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
172 let offset = track!(self.convert_to_offset(pos))?;
173 self.position = offset;
174 Ok(offset)
175 }
176}
177impl<N: NonVolatileMemory> Read for JournalNvmBuffer<N> {
178 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
179 if self.is_dirty_area(self.position, buf.len()) {
180 track!(self.flush_write_buf())?;
181 }
182
183 let aligned_start = self.block_size().floor_align(self.position);
184 let aligned_end = self
185 .block_size()
186 .ceil_align(self.position + buf.len() as u64);
187
188 self.read_buf
189 .aligned_resize((aligned_end - aligned_start) as usize);
190 self.inner.seek(SeekFrom::Start(aligned_start))?;
191 let inner_read_size = self.inner.read(&mut self.read_buf)?;
192
193 let start = (self.position - aligned_start) as usize;
194 let end = cmp::min(inner_read_size, start + buf.len());
195 let read_size = end - start;
196 (&mut buf[..read_size]).copy_from_slice(&self.read_buf[start..end]);
197 self.position += read_size as u64;
198 Ok(read_size)
199 }
200}
201impl<N: NonVolatileMemory> Write for JournalNvmBuffer<N> {
202 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
203 track!(self.check_overflow(buf.len()))?;
204
205 let write_buf_start = self.write_buf_offset;
206 let write_buf_end = write_buf_start + self.write_buf.len() as u64;
207 if write_buf_start <= self.position && self.position <= write_buf_end {
208 let start = (self.position - self.write_buf_offset) as usize;
211 let end = start + buf.len();
212 self.write_buf.aligned_resize(end);
213 (&mut self.write_buf[start..end]).copy_from_slice(buf);
214 self.position += buf.len() as u64;
215 self.maybe_dirty = true;
216 Ok(buf.len())
217 } else {
218 track!(self.flush_write_buf())?;
220
221 if self.block_size().is_aligned(self.position) {
222 self.write_buf_offset = self.position;
223 self.write_buf.aligned_resize(0);
224 } else {
225 let size = self.block_size().as_u16();
227 self.write_buf_offset = self.block_size().floor_align(self.position);
228 self.write_buf.aligned_resize(size as usize);
229 self.inner.seek(SeekFrom::Start(self.write_buf_offset))?;
230 self.inner.read_exact(&mut self.write_buf)?;
231 }
232 self.write(buf)
233 }
234 }
235
236 fn flush(&mut self) -> io::Result<()> {
237 track!(self.flush_write_buf())?;
238 Ok(())
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use std::io::{Read, Seek, SeekFrom, Write};
245 use trackable::result::TestResult;
246
247 use super::*;
248 use crate::nvm::MemoryNvm;
249
250 #[test]
251 fn write_write_flush() -> TestResult {
252 let mut buffer = new_buffer();
254 track_io!(buffer.write_all(b"foo"))?;
255 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
256
257 track_io!(buffer.write_all(b"bar"))?;
258 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
259 assert_eq!(&buffer.nvm().as_bytes()[3..6], &[0; 3][..]);
260
261 track_io!(buffer.flush())?;
262 assert_eq!(&buffer.nvm().as_bytes()[0..6], b"foobar");
263 Ok(())
264 }
265
266 #[test]
267 fn write_seek_write_flush() -> TestResult {
268 let mut buffer = new_buffer();
271 track_io!(buffer.write_all(b"foo"))?;
272 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
273
274 track_io!(buffer.seek(SeekFrom::Current(1)))?;
275 track_io!(buffer.write_all(b"bar"))?;
276 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
277 assert_eq!(&buffer.nvm().as_bytes()[4..7], &[0; 3][..]);
278
279 track_io!(buffer.flush())?;
280 assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
281 assert_eq!(&buffer.nvm().as_bytes()[4..7], b"bar");
282
283 let mut buffer = new_buffer();
285 track_io!(buffer.write_all(b"foo"))?;
286 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
287
288 track_io!(buffer.seek(SeekFrom::Start(512)))?;
289 track_io!(buffer.write_all(b"bar"))?;
290 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
291 assert_eq!(&buffer.nvm().as_bytes()[512..515], &[0; 3][..]);
292
293 track_io!(buffer.flush())?;
294 assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
295 assert_eq!(&buffer.nvm().as_bytes()[512..515], b"bar");
296
297 let mut buffer = new_buffer();
299 track_io!(buffer.write_all(b"foo"))?;
300 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
301
302 track_io!(buffer.seek(SeekFrom::Current(-1)))?;
303 track_io!(buffer.write_all(b"bar"))?;
304 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
305 assert_eq!(&buffer.nvm().as_bytes()[2..5], &[0; 3][..]);
306
307 track_io!(buffer.flush())?;
308 assert_eq!(&buffer.nvm().as_bytes()[0..5], b"fobar");
309 Ok(())
310 }
311
312 #[test]
313 fn write_seek_write() -> TestResult {
314 let mut buffer = new_buffer();
316 track_io!(buffer.write_all(b"foo"))?;
317 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
318
319 track_io!(buffer.seek(SeekFrom::Start(513)))?;
320 track_io!(buffer.write_all(b"bar"))?;
321 assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
322 assert_eq!(&buffer.nvm().as_bytes()[513..516], &[0; 3][..]);
323 Ok(())
324 }
325
326 #[test]
327 fn write_seek_read() -> TestResult {
328 let mut buffer = new_buffer();
330 track_io!(buffer.write_all(b"foo"))?;
331 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
332
333 track_io!(buffer.read_exact(&mut [0; 1][..]))?;
334 assert_eq!(&buffer.nvm().as_bytes()[0..3], b"foo");
335
336 let mut buffer = new_buffer();
338 track_io!(buffer.write_all(b"foo"))?;
339 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
340
341 track_io!(buffer.seek(SeekFrom::Start(512)))?;
342 track_io!(buffer.read_exact(&mut [0; 1][..]))?;
343 assert_eq!(&buffer.nvm().as_bytes()[0..3], &[0; 3][..]);
344 Ok(())
345 }
346
347 #[test]
348 fn overwritten() -> TestResult {
349 let mut buffer = new_buffer();
352 track_io!(buffer.write_all(&[b'a'; 512]))?;
353 track_io!(buffer.flush())?;
354 assert_eq!(&buffer.nvm().as_bytes()[0..512], &[b'a'; 512][..]);
355
356 track_io!(buffer.seek(SeekFrom::Start(256)))?;
357 track_io!(buffer.write_all(&[b'b'; 1]))?;
358 track_io!(buffer.flush())?;
359 assert_eq!(&buffer.nvm().as_bytes()[0..256], &[b'a'; 256][..]);
360 assert_eq!(buffer.nvm().as_bytes()[256], b'b');
361 Ok(())
362 }
363
364 fn new_buffer() -> JournalNvmBuffer<MemoryNvm> {
365 let nvm = MemoryNvm::new(vec![0; 10 * 1024]);
366 JournalNvmBuffer::new(nvm)
367 }
368}