1use prometrics::metrics::MetricBuilder;
2use std::io::{BufReader, Read, Seek, SeekFrom};
3
4use super::record::{EMBEDDED_DATA_OFFSET, END_OF_RECORDS_SIZE};
5use super::{JournalEntry, JournalNvmBuffer, JournalRecord};
6use crate::lump::LumpId;
7use crate::metrics::JournalQueueMetrics;
8use crate::nvm::NonVolatileMemory;
9use crate::storage::portion::JournalPortion;
10use crate::storage::Address;
11use crate::{ErrorKind, Result};
12
/// Ring buffer that stores journal records on a non-volatile memory device.
///
/// Three cursors track the state of the ring (all are byte offsets into the
/// underlying NVM; logically, on the ring: `unreleased_head <= head <= tail`):
#[derive(Debug)]
pub struct JournalRingBuffer<N: NonVolatileMemory> {
    nvm: JournalNvmBuffer<N>,

    // Start of the oldest region whose bytes have not yet been released
    // (advanced by `release_bytes_until`).
    unreleased_head: u64,

    // Position of the next record to be dequeued (advanced by
    // `DequeuedEntries`).
    head: u64,

    // Position where the next record will be written (advanced by `enqueue`;
    // also moved during `RestoredEntries` replay).
    tail: u64,

    metrics: JournalQueueMetrics,
}
impl<N: NonVolatileMemory> JournalRingBuffer<N> {
    /// Returns the current dequeue position.
    pub fn head(&self) -> u64 {
        self.head
    }

    /// Returns the current write position.
    pub fn tail(&self) -> u64 {
        self.tail
    }

    /// Reads every entry from `head` up to the end-of-records marker and
    /// returns it together with the current cursor snapshot
    /// `(unreleased_head, head, tail, entries)`.
    pub fn journal_entries(&mut self) -> Result<(u64, u64, u64, Vec<JournalEntry>)> {
        track_io!(self.nvm.seek(SeekFrom::Start(self.head)))?;
        let result: Result<Vec<JournalEntry>> =
            ReadEntries::new(&mut self.nvm, self.head).collect();
        result.map(|r| (self.unreleased_head, self.head, self.tail, r))
    }

    /// Creates a new ring buffer over `nvm` with all three cursors placed at
    /// `head`.
    pub fn new(nvm: N, head: u64, metric_builder: &MetricBuilder) -> Self {
        let metrics = JournalQueueMetrics::new(metric_builder);
        // Capacity is reported from the raw NVM before it is wrapped.
        metrics.capacity_bytes.set(nvm.capacity() as f64);
        JournalRingBuffer {
            nvm: JournalNvmBuffer::new(nvm),
            unreleased_head: head,
            head,
            tail: head,
            metrics,
        }
    }

    /// Returns an iterator that replays the records already present on the
    /// device, advancing `tail` past each restored entry.
    pub fn restore_entries(&mut self) -> Result<RestoredEntries<N>> {
        track!(RestoredEntries::new(self))
    }

    /// Returns `true` if there are no entries left to dequeue.
    pub fn is_empty(&self) -> bool {
        self.head == self.tail
    }

    /// Returns the number of bytes currently occupied between
    /// `unreleased_head` and `tail`, accounting for wrap-around.
    pub fn usage(&self) -> u64 {
        if self.unreleased_head <= self.tail {
            self.tail - self.unreleased_head
        } else {
            (self.tail + self.capacity()) - self.unreleased_head
        }
    }

    /// Returns the total capacity of the ring in bytes.
    pub fn capacity(&self) -> u64 {
        self.nvm.capacity()
    }

    /// Returns the metrics collected for this journal queue.
    pub fn metrics(&self) -> &JournalQueueMetrics {
        &self.metrics
    }

    /// Reads `buf.len()` bytes of embedded lump data stored at the absolute
    /// offset `position` (as previously returned by `enqueue` for an
    /// `Embed` record).
    pub fn read_embedded_data(&mut self, position: u64, buf: &mut [u8]) -> Result<()> {
        track_io!(self.nvm.seek(SeekFrom::Start(position)))?;
        track_io!(self.nvm.read_exact(buf))?;
        Ok(())
    }

    /// Flushes buffered writes to the underlying device.
    pub fn sync(&mut self) -> Result<()> {
        track!(self.nvm.sync())
    }

    /// Appends `record` at `tail`, followed by a fresh `EndOfRecords`
    /// marker.
    ///
    /// If the record would run past the end of the device, a `GoToFront`
    /// marker is written instead and the write restarts at offset 0
    /// (the recursion below executes at most once thanks to the
    /// `debug_assert`-checked invariant).
    ///
    /// For `Embed` records, returns the lump id and the journal portion
    /// where the embedded data bytes were written, so callers can index it.
    pub fn enqueue<B: AsRef<[u8]>>(
        &mut self,
        record: &JournalRecord<B>,
    ) -> Result<Option<(LumpId, JournalPortion)>> {
        // Fail with `StorageFull` before touching the device.
        track!(self.check_free_space(record))?;

        if self.will_overflow(record) {
            // Not enough room before the end of the device: mark the
            // wrap-around point and retry from offset 0.
            track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
            track!(JournalRecord::GoToFront::<[_; 0]>.write_to(&mut self.nvm))?;

            // The skipped slack at the end of the device still counts as
            // consumed.
            self.metrics
                .consumed_bytes_at_running
                .add_u64(self.nvm.capacity() - self.tail);
            self.tail = 0;
            debug_assert!(!self.will_overflow(record));
            return self.enqueue(record);
        }

        let prev_tail = self.tail;
        track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
        track!(record.write_to(&mut self.nvm))?;
        self.metrics.enqueued_records_at_running.increment(record);

        // `tail` advances to just past the record; the `EndOfRecords`
        // marker written next is NOT included (it is overwritten by the
        // next enqueue).
        self.tail = self.nvm.position();
        self.metrics
            .consumed_bytes_at_running
            .add_u64(self.tail - prev_tail);
        track!(JournalRecord::EndOfRecords::<[_; 0]>.write_to(&mut self.nvm))?;

        if let JournalRecord::Embed(ref lump_id, ref data) = *record {
            // The embedded payload starts `EMBEDDED_DATA_OFFSET` bytes into
            // the on-disk record.
            let portion = JournalPortion {
                start: Address::from_u64(prev_tail + EMBEDDED_DATA_OFFSET as u64).unwrap(),
                len: data.as_ref().len() as u16,
            };
            Ok(Some((*lump_id, portion)))
        } else {
            Ok(None)
        }
    }

    /// Returns an iterator that yields entries from `head` onwards,
    /// advancing `head` past each dequeued entry.
    pub fn dequeue_iter(&mut self) -> Result<DequeuedEntries<N>> {
        track!(DequeuedEntries::new(self))
    }

    /// Releases every byte between `unreleased_head` and `point`
    /// (an absolute ring offset), accounting for wrap-around.
    pub fn release_bytes_until(&mut self, point: u64) {
        let released_bytes = if self.unreleased_head <= point {
            point - self.unreleased_head
        } else {
            (point + self.nvm.capacity()) - self.unreleased_head
        };
        self.metrics.released_bytes.add_u64(released_bytes);

        self.unreleased_head = point;
    }

    /// Returns `true` if writing `record` plus the trailing `EndOfRecords`
    /// marker at `tail` would run past the end of the device.
    fn will_overflow<B: AsRef<[u8]>>(&self, record: &JournalRecord<B>) -> bool {
        let mut next_tail = self.tail + record.external_size() as u64;

        // The `EndOfRecords` marker written after the record must fit too.
        next_tail += END_OF_RECORDS_SIZE as u64;

        next_tail > self.nvm.capacity()
    }

    /// Verifies that writing `record` would not run into the unreleased
    /// region; fails with `ErrorKind::StorageFull` otherwise.
    fn check_free_space<B: AsRef<[u8]>>(&mut self, record: &JournalRecord<B>) -> Result<()> {
        let write_end = self.tail + (record.external_size() + END_OF_RECORDS_SIZE) as u64;

        // Rounded up to the device block size (writes happen in whole
        // blocks, so the aligned end is what can actually be clobbered).
        let write_end = self.nvm.block_size().ceil_align(write_end);

        // If `tail` has wrapped past `unreleased_head`, the free region ends
        // at `unreleased_head` itself; otherwise it extends to
        // `unreleased_head` one lap ahead.
        let free_end = if self.tail < self.unreleased_head {
            self.unreleased_head
        } else {
            self.nvm.capacity() + self.unreleased_head
        };
        track_assert!(
            write_end <= free_end,
            ErrorKind::StorageFull,
            "journal region is full: unreleased_head={}, head={}, tail={}, write_end={}, free_end={}",
            self.unreleased_head,
            self.head,
            self.tail,
            write_end,
            free_end
        );
        Ok(())
    }
}
215
/// Iterator that replays the entries already stored on the device during
/// startup, moving the ring's `tail` cursor past each restored entry.
#[derive(Debug)]
pub struct RestoredEntries<'a, N: 'a + NonVolatileMemory> {
    entries: ReadEntries<'a, N>,
    // Snapshot of the ring's `head` at construction time (used to compute
    // the total restored size once the scan finishes).
    head: u64,
    // Borrow of the ring's `tail`, advanced as entries are restored.
    tail: &'a mut u64,
    capacity: u64,
    metrics: &'a JournalQueueMetrics,
}
224impl<'a, N: 'a + NonVolatileMemory> RestoredEntries<'a, N> {
225 #[allow(clippy::new_ret_no_self)]
226 fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
227 track_assert_eq!(
229 ring.unreleased_head,
230 ring.head,
231 ErrorKind::InconsistentState
232 );
233 track_assert_eq!(ring.head, ring.tail, ErrorKind::InconsistentState);
234
235 track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
236 let capacity = ring.nvm.capacity();
237 Ok(RestoredEntries {
238 entries: ReadEntries::with_capacity(&mut ring.nvm, ring.head, 1024 * 1024),
239 head: ring.head,
240 tail: &mut ring.tail,
241 capacity,
242 metrics: &ring.metrics,
243 })
244 }
245}
246impl<'a, N: 'a + NonVolatileMemory> Iterator for RestoredEntries<'a, N> {
247 type Item = Result<JournalEntry>;
248 fn next(&mut self) -> Option<Self::Item> {
249 let next = self.entries.next();
250 match next {
251 Some(Ok(ref entry)) => {
252 self.metrics
253 .enqueued_records_at_starting
254 .increment(&entry.record);
255 *self.tail = entry.end().as_u64();
256 }
257 None => {
258 let size = if self.head <= *self.tail {
259 *self.tail - self.head
260 } else {
261 (*self.tail + self.capacity) - self.head
262 };
263 self.metrics.consumed_bytes_at_starting.add_u64(size);
264 }
265 _ => {}
266 }
267 next
268 }
269}
270
/// Iterator that yields journal entries from the ring's `head` position,
/// moving the `head` cursor past each dequeued entry.
#[derive(Debug)]
pub struct DequeuedEntries<'a, N: 'a + NonVolatileMemory> {
    entries: ReadEntries<'a, N>,
    // Borrow of the ring's `head`, advanced as entries are consumed.
    head: &'a mut u64,
    metrics: &'a JournalQueueMetrics,
}
277impl<'a, N: 'a + NonVolatileMemory> DequeuedEntries<'a, N> {
278 #[allow(clippy::new_ret_no_self)]
279 fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
280 track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
281 Ok(DequeuedEntries {
282 entries: ReadEntries::new(&mut ring.nvm, ring.head),
283 head: &mut ring.head,
284 metrics: &ring.metrics,
285 })
286 }
287}
288impl<'a, N: 'a + NonVolatileMemory> Iterator for DequeuedEntries<'a, N> {
289 type Item = Result<JournalEntry>;
290 fn next(&mut self) -> Option<Self::Item> {
291 let next = self.entries.next();
292 if let Some(Ok(ref entry)) = next {
293 self.metrics.dequeued_records.increment(&entry.record);
294 *self.head = entry.end().as_u64();
295 }
296 next
297 }
298}
299
/// Low-level sequential reader of journal records, shared by the restore
/// and dequeue iterators.
#[derive(Debug)]
struct ReadEntries<'a, N: 'a + NonVolatileMemory> {
    reader: BufReader<&'a mut JournalNvmBuffer<N>>,
    // Absolute offset of the record about to be read.
    current: u64,
    // Set after following a `GoToFront` marker; a second wrap within one
    // scan indicates corruption.
    is_second_lap: bool,
}
306impl<'a, N: 'a + NonVolatileMemory> ReadEntries<'a, N> {
307 fn new(nvm: &'a mut JournalNvmBuffer<N>, head: u64) -> Self {
308 ReadEntries {
309 reader: BufReader::new(nvm),
310 current: head,
311 is_second_lap: false,
312 }
313 }
314 fn with_capacity(nvm: &'a mut JournalNvmBuffer<N>, head: u64, capacity: usize) -> Self {
315 ReadEntries {
316 reader: BufReader::with_capacity(capacity, nvm),
317 current: head,
318 is_second_lap: false,
319 }
320 }
321 fn read_record(&mut self) -> Result<Option<JournalRecord<Vec<u8>>>> {
322 match track!(JournalRecord::read_from(&mut self.reader))? {
323 JournalRecord::EndOfRecords => Ok(None),
324 JournalRecord::GoToFront => {
325 track_assert!(!self.is_second_lap, ErrorKind::StorageCorrupted);
326 track_io!(self.reader.seek(SeekFrom::Start(0)))?;
327 self.current = 0;
328 self.is_second_lap = true;
329 self.read_record()
330 }
331 record => Ok(Some(record)),
332 }
333 }
334}
335impl<'a, N: 'a + NonVolatileMemory> Iterator for ReadEntries<'a, N> {
336 type Item = Result<JournalEntry>;
337 fn next(&mut self) -> Option<Self::Item> {
338 match self.read_record() {
339 Err(e) => Some(Err(e)),
340 Ok(None) => None,
341 Ok(Some(record)) => {
342 let start = Address::from_u64(self.current).expect("Never fails");
343 self.current += record.external_size() as u64;
344 let entry = JournalEntry { start, record };
345 Some(Ok(entry))
346 }
347 }
348 }
349}
350
#[cfg(test)]
mod tests {
    use prometrics::metrics::MetricBuilder;
    use trackable::result::TestResult;

    use super::*;
    use crate::nvm::MemoryNvm;
    use crate::storage::portion::DataPortion;
    use crate::storage::{Address, JournalRecord};
    use crate::ErrorKind;

    /// Records enqueued into the ring are read back in order via
    /// `dequeue_iter`, and the three cursors end up consistent.
    #[test]
    fn append_and_read_records() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        // One record of every variant.
        let records = vec![
            record_put("000", 30, 5),
            record_put("111", 100, 300),
            record_delete("222"),
            record_embed("333", b"foo"),
            record_delete("444"),
            record_delete_range("000", "999"),
        ];
        for record in &records {
            assert!(ring.enqueue(record).is_ok());
        }

        // Each dequeued entry must match the enqueued record and start
        // exactly where the previous one ended.
        let mut position = Address::from(0);
        for (entry, record) in track!(ring.dequeue_iter())?.zip(records.iter()) {
            let entry = track!(entry)?;
            assert_eq!(entry.record, *record);
            assert_eq!(entry.start, position);
            position = position + Address::from(record.external_size() as u32);
        }

        // After draining: head has caught up with tail; nothing released yet.
        assert_eq!(ring.unreleased_head, 0);
        assert_eq!(ring.head, position.as_u64());
        assert_eq!(ring.tail, position.as_u64());

        // A second pass finds nothing left.
        assert_eq!(track!(ring.dequeue_iter())?.count(), 0);
        Ok(())
    }

    /// The portion returned for an `Embed` record points at the embedded
    /// payload bytes, readable via `read_embedded_data`.
    #[test]
    fn read_embedded_data() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        track!(ring.enqueue(&record_put("000", 30, 5)))?;
        track!(ring.enqueue(&record_delete("111")))?;

        let (lump_id, portion) =
            track!(ring.enqueue(&record_embed("222", b"foo")))?.expect("Some(_)");
        assert_eq!(lump_id, track_any_err!("222".parse())?);

        let mut buf = vec![0; portion.len as usize];
        track!(ring.read_embedded_data(portion.start.as_u64(), &mut buf))?;
        assert_eq!(buf, b"foo");
        Ok(())
    }

    /// When a record would overflow the end of the device, the write wraps
    /// to the front and `tail` restarts near offset 0.
    #[test]
    fn go_round_ring_buffer() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 512, &MetricBuilder::new());
        assert_eq!(ring.head, 512);
        assert_eq!(ring.tail, 512);

        // Fill the back half of the ring with fixed-size delete records.
        let record = record_delete("000");
        for _ in 0..(512 / record.external_size()) {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1016);

        // The next record does not fit before 1024, so it wraps: tail ends
        // up at the record's size (21 bytes) from the front.
        track!(ring.enqueue(&record))?;
        assert_eq!(ring.tail, 21);
        Ok(())
    }

    /// Enqueueing into a full ring fails with `StorageFull` until enough
    /// space has been logically released (by moving the heads forward).
    #[test]
    fn full() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        // Fill the ring as far as whole records fit before the end.
        let record = record_put("000", 1, 2);
        while ring.tail <= 1024 - record.external_size() as u64 {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1008);

        // Nothing released yet: the wrap-around write would collide with
        // the unreleased region, so enqueue must fail without moving tail.
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );
        assert_eq!(ring.tail, 1008);

        // Releasing up to 511 is still not enough (block alignment).
        ring.unreleased_head = 511;
        ring.head = 511;
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        // Releasing up to 512 frees enough room: the write wraps to the
        // front and succeeds.
        ring.unreleased_head = 512;
        ring.head = 512;
        assert!(ring.enqueue(&record).is_ok());
        assert_eq!(ring.tail, record.external_size() as u64);
        Ok(())
    }

    /// A single record larger than the usable capacity can never be stored.
    #[test]
    fn too_large_record() {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        // 1020 external bytes + end-of-records marker exceeds 1024.
        let record = record_embed("000", &[0; 997]);
        assert_eq!(record.external_size(), 1020);
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        // One byte smaller fits exactly.
        let record = record_embed("000", &[0; 996]);
        assert_eq!(record.external_size(), 1019);
        assert!(ring.enqueue(&record).is_ok());
        assert_eq!(ring.tail, 1019);
    }

    // --- record construction helpers ---

    /// Builds a `Put` record referencing a data portion.
    fn record_put(lump_id: &str, start: u32, len: u16) -> JournalRecord<Vec<u8>> {
        JournalRecord::Put(
            lump_id.parse().unwrap(),
            DataPortion {
                start: Address::from(start),
                len,
            },
        )
    }

    /// Parses a `LumpId` from its string form.
    fn lump_id(id: &str) -> LumpId {
        id.parse().unwrap()
    }

    /// Builds an `Embed` record carrying `data` inline.
    fn record_embed(id: &str, data: &[u8]) -> JournalRecord<Vec<u8>> {
        JournalRecord::Embed(lump_id(id), data.to_owned())
    }

    /// Builds a `Delete` record for a single lump.
    fn record_delete(id: &str) -> JournalRecord<Vec<u8>> {
        JournalRecord::Delete(lump_id(id))
    }

    /// Builds a `DeleteRange` record covering `[start, end)`.
    fn record_delete_range(start: &str, end: &str) -> JournalRecord<Vec<u8>> {
        use std::ops::Range;
        JournalRecord::DeleteRange(Range {
            start: lump_id(start),
            end: lump_id(end),
        })
    }
}