Skip to main content

cannyls/storage/journal/
ring_buffer.rs

1use prometrics::metrics::MetricBuilder;
2use std::io::{BufReader, Read, Seek, SeekFrom};
3
4use super::record::{EMBEDDED_DATA_OFFSET, END_OF_RECORDS_SIZE};
5use super::{JournalEntry, JournalNvmBuffer, JournalRecord};
6use lump::LumpId;
7use metrics::JournalQueueMetrics;
8use nvm::NonVolatileMemory;
9use storage::portion::JournalPortion;
10use storage::Address;
11use {ErrorKind, Result};
12
13/// ジャーナル領域用のリングバッファ.
14#[derive(Debug)]
15pub struct JournalRingBuffer<N: NonVolatileMemory> {
16    nvm: JournalNvmBuffer<N>,
17
18    /// 未解放分の含めた場合の、リングバッファの始端位置.
19    ///
20    /// `unreleased_head`から`head`の間に位置するレコード群は、
21    /// `JournalRegion`によってデキューはされているが、
22    /// まだGCによる再配置は終わっていない可能性があるので、
23    /// 安全に上書きすることができない.
24    unreleased_head: u64,
25
26    /// リングバッファの始端位置.
27    head: u64,
28
29    /// リングバッファの終端位置.
30    ///
31    /// ここが次の追記開始位置となる.
32    ///
33    /// 不変項: `unreleased_head <= head <= tail`
34    tail: u64,
35
36    metrics: JournalQueueMetrics,
37}
38impl<N: NonVolatileMemory> JournalRingBuffer<N> {
39    pub fn head(&self) -> u64 {
40        self.head
41    }
42    pub fn tail(&self) -> u64 {
43        self.tail
44    }
45
46    pub fn journal_entries(&mut self) -> Result<(u64, u64, u64, Vec<JournalEntry>)> {
47        track_io!(self.nvm.seek(SeekFrom::Start(self.head)))?;
48        let result: Result<Vec<JournalEntry>> =
49            ReadEntries::new(&mut self.nvm, self.head).collect();
50        result.map(|r| (self.unreleased_head, self.head, self.tail, r))
51    }
52
53    /// `JournalRingBuffer`インスタンスを生成する.
54    pub fn new(nvm: N, head: u64, metric_builder: &MetricBuilder) -> Self {
55        let metrics = JournalQueueMetrics::new(metric_builder);
56        metrics.capacity_bytes.set(nvm.capacity() as f64);
57        JournalRingBuffer {
58            nvm: JournalNvmBuffer::new(nvm),
59            unreleased_head: head,
60            head,
61            tail: head,
62            metrics,
63        }
64    }
65
66    /// NVMから以前のエントリ群を復元し、それらを操作するためのイテレータを返す.
67    ///
68    /// インスタンス生成直後に一度だけ呼ばれることを想定.
69    pub fn restore_entries(&mut self) -> Result<RestoredEntries<N>> {
70        track!(RestoredEntries::new(self))
71    }
72
73    /// リングバッファ内に要素が存在するかどうかを判定する.
74    pub fn is_empty(&self) -> bool {
75        self.head == self.tail
76    }
77
78    /// リングバッファの使用量(バイト単位)を返す.
79    pub fn usage(&self) -> u64 {
80        if self.unreleased_head <= self.tail {
81            self.tail - self.unreleased_head
82        } else {
83            (self.tail + self.capacity()) - self.unreleased_head
84        }
85    }
86
87    /// リングバッファの容量(バイト単位)を返す.
88    pub fn capacity(&self) -> u64 {
89        self.nvm.capacity()
90    }
91
92    /// リングバッファのメトリクスを返す.
93    pub fn metrics(&self) -> &JournalQueueMetrics {
94        &self.metrics
95    }
96
97    /// 指定位置に埋め込まれたlumpデータの読み込みを行う.
98    ///
99    /// データの妥当性検証は`cannyls`内では行わない.
100    pub fn read_embedded_data(&mut self, position: u64, buf: &mut [u8]) -> Result<()> {
101        track_io!(self.nvm.seek(SeekFrom::Start(position)))?;
102        track_io!(self.nvm.read_exact(buf))?;
103        Ok(())
104    }
105
106    /// 物理デバイスに同期命令を発行する.
107    pub fn sync(&mut self) -> Result<()> {
108        track!(self.nvm.sync())
109    }
110
111    /// レコードをジャーナルの末尾に追記する.
112    ///
113    /// レコードが`JournalRecord::Embed`だった場合には、データを埋め込んだ位置を結果として返す.
114    pub fn enqueue<B: AsRef<[u8]>>(
115        &mut self,
116        record: &JournalRecord<B>,
117    ) -> Result<Option<(LumpId, JournalPortion)>> {
118        // 1. 十分な空き領域が存在するかをチェック
119        track!(self.check_free_space(record))?;
120
121        // 2. リングバッファの終端チェック
122        if self.will_overflow(record) {
123            track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
124            track!(JournalRecord::GoToFront::<[_; 0]>.write_to(&mut self.nvm))?;
125
126            // 先頭に戻って再試行
127            self.metrics
128                .consumed_bytes_at_running
129                .add_u64(self.nvm.capacity() - self.tail);
130            self.tail = 0;
131            debug_assert!(!self.will_overflow(record));
132            return self.enqueue(record);
133        }
134
135        // 3. レコードを書き込む
136        let prev_tail = self.tail;
137        track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
138        track!(record.write_to(&mut self.nvm))?;
139        self.metrics.enqueued_records_at_running.increment(record);
140
141        // 4. 終端を示すレコードも書き込む
142        self.tail = self.nvm.position(); // 次回の追記開始位置を保存 (`EndOfRecords`の直前)
143        self.metrics
144            .consumed_bytes_at_running
145            .add_u64(self.tail - prev_tail);
146        track!(JournalRecord::EndOfRecords::<[_; 0]>.write_to(&mut self.nvm))?;
147
148        // 5. 埋め込みPUTの場合には、インデックスに位置情報を返す
149        if let JournalRecord::Embed(ref lump_id, ref data) = *record {
150            let portion = JournalPortion {
151                start: Address::from_u64(prev_tail + EMBEDDED_DATA_OFFSET as u64).unwrap(),
152                len: data.as_ref().len() as u16,
153            };
154            Ok(Some((*lump_id, portion)))
155        } else {
156            Ok(None)
157        }
158    }
159
160    /// リングバッファの先頭からエントリ群を取り出す.
161    ///
162    /// `EndOfRecords`に到達した時点で走査は終了する.
163    ///
164    /// `EndOfRecords`および`GoToFront`は、走査対象には含まれない.
165    pub fn dequeue_iter(&mut self) -> Result<DequeuedEntries<N>> {
166        track!(DequeuedEntries::new(self))
167    }
168
169    pub fn release_bytes_until(&mut self, point: u64) {
170        let released_bytes = if self.unreleased_head <= point {
171            point - self.unreleased_head
172        } else {
173            (point + self.nvm.capacity()) - self.unreleased_head
174        };
175        self.metrics.released_bytes.add_u64(released_bytes);
176
177        self.unreleased_head = point;
178    }
179
180    /// `record`を書き込んだら、リングバッファ用の領域を超えてしまうかどうかを判定する.
181    fn will_overflow<B: AsRef<[u8]>>(&self, record: &JournalRecord<B>) -> bool {
182        let mut next_tail = self.tail + record.external_size() as u64;
183
184        // `EndOfRecords`は常に末尾に書き込まれるので、その分のサイズも考慮する
185        next_tail += END_OF_RECORDS_SIZE as u64;
186
187        next_tail > self.nvm.capacity()
188    }
189
190    /// `record`の書き込みを行うことで、リングバッファのTAILがHEADを追い越してしまう危険性がないかを確認する.
191    fn check_free_space<B: AsRef<[u8]>>(&mut self, record: &JournalRecord<B>) -> Result<()> {
192        // 書き込みの物理的な終端位置を計算
193        let write_end = self.tail + (record.external_size() + END_OF_RECORDS_SIZE) as u64;
194
195        // 次のブロック境界までのデータは上書きされる
196        let write_end = self.nvm.block_size().ceil_align(write_end);
197
198        // 安全に書き込み可能な位置の終端
199        let free_end = if self.tail < self.unreleased_head {
200            self.unreleased_head
201        } else {
202            self.nvm.capacity() + self.unreleased_head
203        };
204        track_assert!(
205            write_end <= free_end,
206            ErrorKind::StorageFull,
207            "journal region is full: unreleased_head={}, head={}, tail={}, write_end={}, free_end={}",
208            self.unreleased_head,
209            self.head,
210            self.tail,
211            write_end,
212            free_end
213        );
214        Ok(())
215    }
216}
217
218#[derive(Debug)]
219pub struct RestoredEntries<'a, N: 'a + NonVolatileMemory> {
220    entries: ReadEntries<'a, N>,
221    head: u64,
222    tail: &'a mut u64,
223    capacity: u64,
224    metrics: &'a JournalQueueMetrics,
225}
226impl<'a, N: 'a + NonVolatileMemory> RestoredEntries<'a, N> {
227    #[allow(clippy::new_ret_no_self)]
228    fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
229        // 生成直後の呼び出しかどうかを簡易チェック
230        track_assert_eq!(
231            ring.unreleased_head,
232            ring.head,
233            ErrorKind::InconsistentState
234        );
235        track_assert_eq!(ring.head, ring.tail, ErrorKind::InconsistentState);
236
237        track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
238        let capacity = ring.nvm.capacity();
239        Ok(RestoredEntries {
240            entries: ReadEntries::with_capacity(&mut ring.nvm, ring.head, 1024 * 1024),
241            head: ring.head,
242            tail: &mut ring.tail,
243            capacity,
244            metrics: &ring.metrics,
245        })
246    }
247}
248impl<'a, N: 'a + NonVolatileMemory> Iterator for RestoredEntries<'a, N> {
249    type Item = Result<JournalEntry>;
250    fn next(&mut self) -> Option<Self::Item> {
251        let next = self.entries.next();
252        match next {
253            Some(Ok(ref entry)) => {
254                self.metrics
255                    .enqueued_records_at_starting
256                    .increment(&entry.record);
257                *self.tail = entry.end().as_u64();
258            }
259            None => {
260                let size = if self.head <= *self.tail {
261                    *self.tail - self.head
262                } else {
263                    (*self.tail + self.capacity) - self.head
264                };
265                self.metrics.consumed_bytes_at_starting.add_u64(size);
266            }
267            _ => {}
268        }
269        next
270    }
271}
272
273#[derive(Debug)]
274pub struct DequeuedEntries<'a, N: 'a + NonVolatileMemory> {
275    entries: ReadEntries<'a, N>,
276    head: &'a mut u64,
277    metrics: &'a JournalQueueMetrics,
278}
279impl<'a, N: 'a + NonVolatileMemory> DequeuedEntries<'a, N> {
280    #[allow(clippy::new_ret_no_self)]
281    fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
282        track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
283        Ok(DequeuedEntries {
284            entries: ReadEntries::new(&mut ring.nvm, ring.head),
285            head: &mut ring.head,
286            metrics: &ring.metrics,
287        })
288    }
289}
290impl<'a, N: 'a + NonVolatileMemory> Iterator for DequeuedEntries<'a, N> {
291    type Item = Result<JournalEntry>;
292    fn next(&mut self) -> Option<Self::Item> {
293        let next = self.entries.next();
294        if let Some(Ok(ref entry)) = next {
295            self.metrics.dequeued_records.increment(&entry.record);
296            *self.head = entry.end().as_u64();
297        }
298        next
299    }
300}
301
302#[derive(Debug)]
303struct ReadEntries<'a, N: 'a + NonVolatileMemory> {
304    reader: BufReader<&'a mut JournalNvmBuffer<N>>,
305    current: u64,
306    is_second_lap: bool,
307}
308impl<'a, N: 'a + NonVolatileMemory> ReadEntries<'a, N> {
309    fn new(nvm: &'a mut JournalNvmBuffer<N>, head: u64) -> Self {
310        ReadEntries {
311            reader: BufReader::new(nvm),
312            current: head,
313            is_second_lap: false,
314        }
315    }
316    fn with_capacity(nvm: &'a mut JournalNvmBuffer<N>, head: u64, capacity: usize) -> Self {
317        ReadEntries {
318            reader: BufReader::with_capacity(capacity, nvm),
319            current: head,
320            is_second_lap: false,
321        }
322    }
323    fn read_record(&mut self) -> Result<Option<JournalRecord<Vec<u8>>>> {
324        match track!(JournalRecord::read_from(&mut self.reader))? {
325            JournalRecord::EndOfRecords => Ok(None),
326            JournalRecord::GoToFront => {
327                track_assert!(!self.is_second_lap, ErrorKind::StorageCorrupted);
328                track_io!(self.reader.seek(SeekFrom::Start(0)))?;
329                self.current = 0;
330                self.is_second_lap = true;
331                self.read_record()
332            }
333            record => Ok(Some(record)),
334        }
335    }
336}
337impl<'a, N: 'a + NonVolatileMemory> Iterator for ReadEntries<'a, N> {
338    type Item = Result<JournalEntry>;
339    fn next(&mut self) -> Option<Self::Item> {
340        match self.read_record() {
341            Err(e) => Some(Err(e)),
342            Ok(None) => None,
343            Ok(Some(record)) => {
344                let start = Address::from_u64(self.current).expect("Never fails");
345                self.current += record.external_size() as u64;
346                let entry = JournalEntry { start, record };
347                Some(Ok(entry))
348            }
349        }
350    }
351}
352
#[cfg(test)]
mod tests {
    use prometrics::metrics::MetricBuilder;
    use trackable::result::TestResult;

    use super::*;
    use nvm::MemoryNvm;
    use storage::portion::DataPortion;
    use storage::{Address, JournalRecord};
    use ErrorKind;

    #[test]
    fn append_and_read_records() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let records = vec![
            record_put("000", 30, 5),
            record_put("111", 100, 300),
            record_delete("222"),
            record_embed("333", b"foo"),
            record_delete("444"),
            record_delete_range("000", "999"),
        ];
        for record in &records {
            // Propagate the error (instead of `assert!(is_ok())`) so failures show the cause.
            track!(ring.enqueue(record))?;
        }

        let mut position = Address::from(0);
        let mut dequeued = 0;
        for (entry, record) in track!(ring.dequeue_iter())?.zip(records.iter()) {
            let entry = track!(entry)?;
            assert_eq!(entry.record, *record);
            assert_eq!(entry.start, position);
            position = position + Address::from(record.external_size() as u32);
            dequeued += 1;
        }
        // `zip` stops at the shorter side, so make sure every record was dequeued.
        assert_eq!(dequeued, records.len());

        assert_eq!(ring.unreleased_head, 0);
        assert_eq!(ring.head, position.as_u64());
        assert_eq!(ring.tail, position.as_u64());

        assert_eq!(track!(ring.dequeue_iter())?.count(), 0);
        Ok(())
    }

    #[test]
    fn read_embedded_data() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        track!(ring.enqueue(&record_put("000", 30, 5)))?;
        track!(ring.enqueue(&record_delete("111")))?;

        let (lump_id, portion) =
            track!(ring.enqueue(&record_embed("222", b"foo")))?.expect("Some(_)");
        assert_eq!(lump_id, track_any_err!("222".parse())?);

        let mut buf = vec![0; portion.len as usize];
        track!(ring.read_embedded_data(portion.start.as_u64(), &mut buf))?;
        assert_eq!(buf, b"foo");
        Ok(())
    }

    #[test]
    fn go_round_ring_buffer() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 512, &MetricBuilder::new());
        assert_eq!(ring.head, 512);
        assert_eq!(ring.tail, 512);

        let record = record_delete("000");
        for _ in 0..(512 / record.external_size()) {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1016);

        track!(ring.enqueue(&record))?;
        assert_eq!(ring.tail, 21);
        Ok(())
    }

    #[test]
    fn full() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let record = record_put("000", 1, 2);
        while ring.tail <= 1024 - record.external_size() as u64 {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1008);

        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );
        assert_eq!(ring.tail, 1008);

        ring.unreleased_head = 511;
        ring.head = 511;
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        ring.unreleased_head = 512;
        ring.head = 512;
        track!(ring.enqueue(&record))?;
        assert_eq!(ring.tail, record.external_size() as u64);
        Ok(())
    }

    #[test]
    fn too_large_record() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let record = record_embed("000", &[0; 997]);
        assert_eq!(record.external_size(), 1020);
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        let record = record_embed("000", &[0; 996]);
        assert_eq!(record.external_size(), 1019);
        track!(ring.enqueue(&record))?;
        assert_eq!(ring.tail, 1019);
        Ok(())
    }

    fn record_put(lump_id: &str, start: u32, len: u16) -> JournalRecord<Vec<u8>> {
        JournalRecord::Put(
            lump_id.parse().unwrap(),
            DataPortion {
                start: Address::from(start),
                len,
            },
        )
    }

    fn lump_id(id: &str) -> LumpId {
        id.parse().unwrap()
    }

    fn record_embed(id: &str, data: &[u8]) -> JournalRecord<Vec<u8>> {
        JournalRecord::Embed(lump_id(id), data.to_owned())
    }

    fn record_delete(id: &str) -> JournalRecord<Vec<u8>> {
        JournalRecord::Delete(lump_id(id))
    }

    fn record_delete_range(start: &str, end: &str) -> JournalRecord<Vec<u8>> {
        use std::ops::Range;
        JournalRecord::DeleteRange(Range {
            start: lump_id(start),
            end: lump_id(end),
        })
    }
}