camelliakv/storage/journal/ring_buffer.rs

use prometrics::metrics::MetricBuilder;
use std::io::{BufReader, Read, Seek, SeekFrom};

use super::record::{EMBEDDED_DATA_OFFSET, END_OF_RECORDS_SIZE};
use super::{JournalEntry, JournalNvmBuffer, JournalRecord};
use crate::lump::LumpId;
use crate::metrics::JournalQueueMetrics;
use crate::nvm::NonVolatileMemory;
use crate::storage::portion::JournalPortion;
use crate::storage::Address;
use crate::{ErrorKind, Result};

/// Ring buffer for the journal region.
#[derive(Debug)]
pub struct JournalRingBuffer<N: NonVolatileMemory> {
    nvm: JournalNvmBuffer<N>,

    /// The start position of the ring buffer, including the unreleased part.
    ///
    /// Records located between `unreleased_head` and `head` have been dequeued by
    /// `JournalRegion` but cannot be safely overwritten yet, because GC may not have
    /// relocated them.
    unreleased_head: u64,

    /// Start position of the ring buffer.
    head: u64,

    /// End position of the ring buffer.
    ///
    /// This is where the next record will be written.
    ///
    /// Invariant: `unreleased_head <= head <= tail`
    tail: u64,

    metrics: JournalQueueMetrics,
}
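// Conceptual layout of the ring buffer (positions wrap around at `capacity`):
//
//   | released | unreleased_head .. head | head .. tail | free |
//               (dequeued, pinned by GC)   (not dequeued)
//
// `release_bytes_until` advances `unreleased_head`, dequeuing advances `head`,
// and `enqueue` advances `tail`.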
impl<N: NonVolatileMemory> JournalRingBuffer<N> {
    pub fn head(&self) -> u64 {
        self.head
    }
    pub fn tail(&self) -> u64 {
        self.tail
    }

    pub fn journal_entries(&mut self) -> Result<(u64, u64, u64, Vec<JournalEntry>)> {
        track_io!(self.nvm.seek(SeekFrom::Start(self.head)))?;
        let result: Result<Vec<JournalEntry>> =
            ReadEntries::new(&mut self.nvm, self.head).collect();
        result.map(|r| (self.unreleased_head, self.head, self.tail, r))
    }

    /// Creates a `JournalRingBuffer` instance.
    pub fn new(nvm: N, head: u64, metric_builder: &MetricBuilder) -> Self {
        let metrics = JournalQueueMetrics::new(metric_builder);
        metrics.capacity_bytes.set(nvm.capacity() as f64);
        JournalRingBuffer {
            nvm: JournalNvmBuffer::new(nvm),
            unreleased_head: head,
            head,
            tail: head,
            metrics,
        }
    }

    /// Returns an iterator that restores the previous entries from the NVM so that they
    /// can be replayed.
    ///
    /// This is assumed to be called only once, immediately after the instance is created.
    pub fn restore_entries(&mut self) -> Result<RestoredEntries<N>> {
        track!(RestoredEntries::new(self))
    }

    /// Returns `true` if the ring buffer contains no records.
    pub fn is_empty(&self) -> bool {
        self.head == self.tail
    }

    /// Returns the number of bytes currently in use, including the unreleased part.
    pub fn usage(&self) -> u64 {
        if self.unreleased_head <= self.tail {
            self.tail - self.unreleased_head
        } else {
            (self.tail + self.capacity()) - self.unreleased_head
        }
    }

    /// Returns the capacity (in bytes) of the ring buffer.
    pub fn capacity(&self) -> u64 {
        self.nvm.capacity()
    }

    /// Returns the ring buffer metrics.
    pub fn metrics(&self) -> &JournalQueueMetrics {
        &self.metrics
    }

    /// Reads the lump data embedded at the specified position.
    ///
    /// Data validation is not performed in `CamelliaKV`.
    pub fn read_embedded_data(&mut self, position: u64, buf: &mut [u8]) -> Result<()> {
        track_io!(self.nvm.seek(SeekFrom::Start(position)))?;
        track_io!(self.nvm.read_exact(buf))?;
        Ok(())
    }

    /// Sends a synchronization command to the physical device.
    pub fn sync(&mut self) -> Result<()> {
        track!(self.nvm.sync())
    }

    /// Adds a record at the end of the journal.
    ///
    /// If the record is a `JournalRecord::Embed`, returns the position of the embedded data.
    pub fn enqueue<B: AsRef<[u8]>>(
        &mut self,
        record: &JournalRecord<B>,
    ) -> Result<Option<(LumpId, JournalPortion)>> {
        // 1. Check whether there is enough free space.
        track!(self.check_free_space(record))?;

        // 2. Check whether the write would run past the end of the ring buffer.
        if self.will_overflow(record) {
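            // Not enough room remains before the end of the buffer: write a `GoToFront`
            // marker at the current tail so that readers (see `ReadEntries::read_record`)
            // seek back to position 0, then wrap the tail around and retry.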
            track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
            track!(JournalRecord::GoToFront::<[_; 0]>.write_to(&mut self.nvm))?;

            // Wrap around to the front.
            self.metrics
                .consumed_bytes_at_running
                .add_u64(self.nvm.capacity() - self.tail);
            self.tail = 0;
            debug_assert!(!self.will_overflow(record));
            return self.enqueue(record);
        }

        // 3. Write the record.
        let prev_tail = self.tail;
        track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
        track!(record.write_to(&mut self.nvm))?;
        self.metrics.enqueued_records_at_running.increment(record);

        // 4. Also write a record indicating the end.
        self.tail = self.nvm.position(); // Save the next write position (just before `EndOfRecords`).
        self.metrics
            .consumed_bytes_at_running
            .add_u64(self.tail - prev_tail);
        track!(JournalRecord::EndOfRecords::<[_; 0]>.write_to(&mut self.nvm))?;

        // 5. For an embedded PUT, return the position information for the index.
        if let JournalRecord::Embed(ref lump_id, ref data) = *record {
            let portion = JournalPortion {
                start: Address::from_u64(prev_tail + EMBEDDED_DATA_OFFSET as u64).unwrap(),
                len: data.as_ref().len() as u16,
            };
            Ok(Some((*lump_id, portion)))
        } else {
            Ok(None)
        }
    }

    /// Fetches entries from the beginning of the ring buffer.
    ///
    /// The scan ends when `EndOfRecords` is reached.
    ///
    /// The returned entries do not include `EndOfRecords` and `GoToFront`.
    pub fn dequeue_iter(&mut self) -> Result<DequeuedEntries<N>> {
        track!(DequeuedEntries::new(self))
    }

    pub fn release_bytes_until(&mut self, point: u64) {
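        // `point` may be smaller than `unreleased_head` when the released region wraps
        // around the end of the buffer; the else branch below accounts for that.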
        let released_bytes = if self.unreleased_head <= point {
            point - self.unreleased_head
        } else {
            (point + self.nvm.capacity()) - self.unreleased_head
        };
        self.metrics.released_bytes.add_u64(released_bytes);

        self.unreleased_head = point;
    }

    /// Determines whether writing `record` would run past the end of the ring buffer.
    fn will_overflow<B: AsRef<[u8]>>(&self, record: &JournalRecord<B>) -> bool {
        let mut next_tail = self.tail + record.external_size() as u64;

        // `EndOfRecords` is always written at the end, so its size must also be considered.
        next_tail += END_OF_RECORDS_SIZE as u64;

        next_tail > self.nvm.capacity()
    }

    /// Checks that writing `record` would not cause the tail to overtake the unreleased head.
    fn check_free_space<B: AsRef<[u8]>>(&mut self, record: &JournalRecord<B>) -> Result<()> {
        // Calculate the physical end position of the write.
        let write_end = self.tail + (record.external_size() + END_OF_RECORDS_SIZE) as u64;

        // Data up to the next block boundary will be overwritten, so align the end position
        // to the block boundary.
        let write_end = self.nvm.block_size().ceil_align(write_end);

        // The furthest position that can safely be written to.
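        // If `tail` has already wrapped around (`tail < unreleased_head`), writes may only
        // proceed up to `unreleased_head`; otherwise the free region continues past the end
        // of the buffer and around to `unreleased_head`, so the limit is expressed as an
        // unwrapped position (`capacity + unreleased_head`).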
        let free_end = if self.tail < self.unreleased_head {
            self.unreleased_head
        } else {
            self.nvm.capacity() + self.unreleased_head
        };
        track_assert!(
            write_end <= free_end,
            ErrorKind::StorageFull,
            "journal region is full: unreleased_head={}, head={}, tail={}, write_end={}, free_end={}",
            self.unreleased_head,
            self.head,
            self.tail,
            write_end,
            free_end
        );
        Ok(())
    }
}

#[derive(Debug)]
pub struct RestoredEntries<'a, N: 'a + NonVolatileMemory> {
    entries: ReadEntries<'a, N>,
    head: u64,
    tail: &'a mut u64,
    capacity: u64,
    metrics: &'a JournalQueueMetrics,
}
impl<'a, N: 'a + NonVolatileMemory> RestoredEntries<'a, N> {
    #[allow(clippy::new_ret_no_self)]
    fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
        // Check that this is called immediately after creation.
        track_assert_eq!(
            ring.unreleased_head,
            ring.head,
            ErrorKind::InconsistentState
        );
        track_assert_eq!(ring.head, ring.tail, ErrorKind::InconsistentState);

        track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
        let capacity = ring.nvm.capacity();
        Ok(RestoredEntries {
            entries: ReadEntries::with_capacity(&mut ring.nvm, ring.head, 1024 * 1024),
            head: ring.head,
            tail: &mut ring.tail,
            capacity,
            metrics: &ring.metrics,
        })
    }
}
impl<'a, N: 'a + NonVolatileMemory> Iterator for RestoredEntries<'a, N> {
    type Item = Result<JournalEntry>;
    fn next(&mut self) -> Option<Self::Item> {
        let next = self.entries.next();
        match next {
            Some(Ok(ref entry)) => {
                self.metrics
                    .enqueued_records_at_starting
                    .increment(&entry.record);
                *self.tail = entry.end().as_u64();
            }
            None => {
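                // The scan has finished: record how many bytes were consumed during the
                // restoration, accounting for wrap-around.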
                let size = if self.head <= *self.tail {
                    *self.tail - self.head
                } else {
                    (*self.tail + self.capacity) - self.head
                };
                self.metrics.consumed_bytes_at_starting.add_u64(size);
            }
            _ => {}
        }
        next
    }
}

#[derive(Debug)]
pub struct DequeuedEntries<'a, N: 'a + NonVolatileMemory> {
    entries: ReadEntries<'a, N>,
    head: &'a mut u64,
    metrics: &'a JournalQueueMetrics,
}
impl<'a, N: 'a + NonVolatileMemory> DequeuedEntries<'a, N> {
    #[allow(clippy::new_ret_no_self)]
    fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
        track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
        Ok(DequeuedEntries {
            entries: ReadEntries::new(&mut ring.nvm, ring.head),
            head: &mut ring.head,
            metrics: &ring.metrics,
        })
    }
}
impl<'a, N: 'a + NonVolatileMemory> Iterator for DequeuedEntries<'a, N> {
    type Item = Result<JournalEntry>;
    fn next(&mut self) -> Option<Self::Item> {
        let next = self.entries.next();
        if let Some(Ok(ref entry)) = next {
            self.metrics.dequeued_records.increment(&entry.record);
            *self.head = entry.end().as_u64();
        }
        next
    }
}

#[derive(Debug)]
struct ReadEntries<'a, N: 'a + NonVolatileMemory> {
    reader: BufReader<&'a mut JournalNvmBuffer<N>>,
    current: u64,
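    // Set to `true` once a `GoToFront` record has been followed; encountering a second
    // `GoToFront` in the same scan indicates corrupted storage.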
    is_second_lap: bool,
}
impl<'a, N: 'a + NonVolatileMemory> ReadEntries<'a, N> {
    fn new(nvm: &'a mut JournalNvmBuffer<N>, head: u64) -> Self {
        ReadEntries {
            reader: BufReader::new(nvm),
            current: head,
            is_second_lap: false,
        }
    }
    fn with_capacity(nvm: &'a mut JournalNvmBuffer<N>, head: u64, capacity: usize) -> Self {
        ReadEntries {
            reader: BufReader::with_capacity(capacity, nvm),
            current: head,
            is_second_lap: false,
        }
    }
    fn read_record(&mut self) -> Result<Option<JournalRecord<Vec<u8>>>> {
        match track!(JournalRecord::read_from(&mut self.reader))? {
            JournalRecord::EndOfRecords => Ok(None),
            JournalRecord::GoToFront => {
                track_assert!(!self.is_second_lap, ErrorKind::StorageCorrupted);
                track_io!(self.reader.seek(SeekFrom::Start(0)))?;
                self.current = 0;
                self.is_second_lap = true;
                self.read_record()
            }
            record => Ok(Some(record)),
        }
    }
}
impl<'a, N: 'a + NonVolatileMemory> Iterator for ReadEntries<'a, N> {
    type Item = Result<JournalEntry>;
    fn next(&mut self) -> Option<Self::Item> {
        match self.read_record() {
            Err(e) => Some(Err(e)),
            Ok(None) => None,
            Ok(Some(record)) => {
                let start = Address::from_u64(self.current).expect("Never fails");
                self.current += record.external_size() as u64;
                let entry = JournalEntry { start, record };
                Some(Ok(entry))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use prometrics::metrics::MetricBuilder;
    use trackable::result::TestResult;

    use super::*;
    use crate::nvm::MemoryNvm;
    use crate::storage::portion::DataPortion;
    use crate::storage::{Address, JournalRecord};
    use crate::ErrorKind;

    #[test]
    fn append_and_read_records() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let records = vec![
            record_put("000", 30, 5),
            record_put("111", 100, 300),
            record_delete("222"),
            record_embed("333", b"foo"),
            record_delete("444"),
            record_delete_range("000", "999"),
        ];
        for record in &records {
            assert!(ring.enqueue(record).is_ok());
        }

        let mut position = Address::from(0);
        for (entry, record) in track!(ring.dequeue_iter())?.zip(records.iter()) {
            let entry = track!(entry)?;
            assert_eq!(entry.record, *record);
            assert_eq!(entry.start, position);
            position = position + Address::from(record.external_size() as u32);
        }

        assert_eq!(ring.unreleased_head, 0);
        assert_eq!(ring.head, position.as_u64());
        assert_eq!(ring.tail, position.as_u64());

        assert_eq!(track!(ring.dequeue_iter())?.count(), 0);
        Ok(())
    }

    #[test]
    fn read_embedded_data() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        track!(ring.enqueue(&record_put("000", 30, 5)))?;
        track!(ring.enqueue(&record_delete("111")))?;

        let (lump_id, portion) =
            track!(ring.enqueue(&record_embed("222", b"foo")))?.expect("Some(_)");
        assert_eq!(lump_id, track_any_err!("222".parse())?);

        let mut buf = vec![0; portion.len as usize];
        track!(ring.read_embedded_data(portion.start.as_u64(), &mut buf))?;
        assert_eq!(buf, b"foo");
        Ok(())
    }

    #[test]
    fn go_round_ring_buffer() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 512, &MetricBuilder::new());
        assert_eq!(ring.head, 512);
        assert_eq!(ring.tail, 512);

        let record = record_delete("000");
        for _ in 0..(512 / record.external_size()) {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1016);

        track!(ring.enqueue(&record))?;
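        // The record no longer fits before the end of the buffer, so a `GoToFront` marker
        // is written and the record itself lands at position 0; the tail then equals the
        // record's external size (21 bytes).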
        assert_eq!(ring.tail, 21);
        Ok(())
    }

    #[test]
    fn full() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let record = record_put("000", 1, 2);
        while ring.tail <= 1024 - record.external_size() as u64 {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1008);

        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );
        assert_eq!(ring.tail, 1008);

        ring.unreleased_head = 511;
        ring.head = 511;
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        ring.unreleased_head = 512;
        ring.head = 512;
        assert!(ring.enqueue(&record).is_ok());
        assert_eq!(ring.tail, record.external_size() as u64);
        Ok(())
    }

    #[test]
    fn too_large_record() {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let record = record_embed("000", &[0; 997]);
        assert_eq!(record.external_size(), 1020);
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        let record = record_embed("000", &[0; 996]);
        assert_eq!(record.external_size(), 1019);
        assert!(ring.enqueue(&record).is_ok());
        assert_eq!(ring.tail, 1019);
    }

    fn record_put(lump_id: &str, start: u32, len: u16) -> JournalRecord<Vec<u8>> {
        JournalRecord::Put(
            lump_id.parse().unwrap(),
            DataPortion {
                start: Address::from(start),
                len,
            },
        )
    }

    fn lump_id(id: &str) -> LumpId {
        id.parse().unwrap()
    }

    fn record_embed(id: &str, data: &[u8]) -> JournalRecord<Vec<u8>> {
        JournalRecord::Embed(lump_id(id), data.to_owned())
    }

    fn record_delete(id: &str) -> JournalRecord<Vec<u8>> {
        JournalRecord::Delete(lump_id(id))
    }

    fn record_delete_range(start: &str, end: &str) -> JournalRecord<Vec<u8>> {
        use std::ops::Range;
        JournalRecord::DeleteRange(Range {
            start: lump_id(start),
            end: lump_id(end),
        })
    }
}