1use prometrics::metrics::MetricBuilder;
2use std::io::{BufReader, Read, Seek, SeekFrom};
3
4use super::record::{EMBEDDED_DATA_OFFSET, END_OF_RECORDS_SIZE};
5use super::{JournalEntry, JournalNvmBuffer, JournalRecord};
6use lump::LumpId;
7use metrics::JournalQueueMetrics;
8use nvm::NonVolatileMemory;
9use storage::portion::JournalPortion;
10use storage::Address;
11use {ErrorKind, Result};
12
#[derive(Debug)]
pub struct JournalRingBuffer<N: NonVolatileMemory> {
    nvm: JournalNvmBuffer<N>,

    /// Offset of the oldest byte that has not yet been released.
    /// The range `unreleased_head..head` holds records that were dequeued
    /// but whose space is still occupied until `release_bytes_until` runs.
    unreleased_head: u64,

    /// Read cursor: offset of the next record to dequeue.
    head: u64,

    /// Write cursor: offset where the next record will be appended.
    tail: u64,

    /// Queue metrics (capacity, consumed/released bytes, record counters).
    metrics: JournalQueueMetrics,
}
38impl<N: NonVolatileMemory> JournalRingBuffer<N> {
    /// Returns the read cursor (offset of the next record to dequeue).
    pub fn head(&self) -> u64 {
        self.head
    }
    /// Returns the write cursor (offset where the next record is appended).
    pub fn tail(&self) -> u64 {
        self.tail
    }
45
46 pub fn journal_entries(&mut self) -> Result<(u64, u64, u64, Vec<JournalEntry>)> {
47 track_io!(self.nvm.seek(SeekFrom::Start(self.head)))?;
48 let result: Result<Vec<JournalEntry>> =
49 ReadEntries::new(&mut self.nvm, self.head).collect();
50 result.map(|r| (self.unreleased_head, self.head, self.tail, r))
51 }
52
53 pub fn new(nvm: N, head: u64, metric_builder: &MetricBuilder) -> Self {
55 let metrics = JournalQueueMetrics::new(metric_builder);
56 metrics.capacity_bytes.set(nvm.capacity() as f64);
57 JournalRingBuffer {
58 nvm: JournalNvmBuffer::new(nvm),
59 unreleased_head: head,
60 head,
61 tail: head,
62 metrics,
63 }
64 }
65
    /// Returns an iterator that restores the entries persisted in the
    /// journal region (used on startup, before any enqueue/dequeue).
    pub fn restore_entries(&mut self) -> Result<RestoredEntries<N>> {
        track!(RestoredEntries::new(self))
    }
72
    /// Returns `true` if there are no records left to dequeue
    /// (read and write cursors coincide).
    pub fn is_empty(&self) -> bool {
        self.head == self.tail
    }
77
78 pub fn usage(&self) -> u64 {
80 if self.unreleased_head <= self.tail {
81 self.tail - self.unreleased_head
82 } else {
83 (self.tail + self.capacity()) - self.unreleased_head
84 }
85 }
86
    /// Returns the total capacity (in bytes) of the journal region.
    pub fn capacity(&self) -> u64 {
        self.nvm.capacity()
    }
91
    /// Returns the metrics of this journal queue.
    pub fn metrics(&self) -> &JournalQueueMetrics {
        &self.metrics
    }
96
    /// Reads `buf.len()` bytes of lump data that was embedded in a journal
    /// record at `position` (the portion returned by `enqueue` for an
    /// `Embed` record).
    pub fn read_embedded_data(&mut self, position: u64, buf: &mut [u8]) -> Result<()> {
        track_io!(self.nvm.seek(SeekFrom::Start(position)))?;
        track_io!(self.nvm.read_exact(buf))?;
        Ok(())
    }
105
    /// Flushes any buffered writes down to the underlying NVM.
    pub fn sync(&mut self) -> Result<()> {
        track!(self.nvm.sync())
    }
110
    /// Appends `record` at the tail of the ring buffer.
    ///
    /// For an `Embed` record, returns the lump id and the journal portion
    /// where the data now lives, so the caller can index it.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::StorageFull` if the record (plus the
    /// `EndOfRecords` terminator) does not fit in the free region.
    pub fn enqueue<B: AsRef<[u8]>>(
        &mut self,
        record: &JournalRecord<B>,
    ) -> Result<Option<(LumpId, JournalPortion)>> {
        track!(self.check_free_space(record))?;

        // If the record would run past the end of the region, write a
        // `GoToFront` marker at the tail and wrap around to offset 0.
        if self.will_overflow(record) {
            track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
            track!(JournalRecord::GoToFront::<[_; 0]>.write_to(&mut self.nvm))?;

            // The skipped tail gap still counts as consumed journal bytes.
            self.metrics
                .consumed_bytes_at_running
                .add_u64(self.nvm.capacity() - self.tail);
            self.tail = 0;
            debug_assert!(!self.will_overflow(record));
            // Retry once from the front; the assertion above guarantees
            // the record now fits, so this recursion cannot repeat.
            return self.enqueue(record);
        }

        // Write the record itself at the current tail.
        let prev_tail = self.tail;
        track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
        track!(record.write_to(&mut self.nvm))?;
        self.metrics.enqueued_records_at_running.increment(record);

        self.tail = self.nvm.position();
        self.metrics
            .consumed_bytes_at_running
            .add_u64(self.tail - prev_tail);
        // Terminate the log so readers know where valid records end;
        // the next enqueue overwrites this marker.
        track!(JournalRecord::EndOfRecords::<[_; 0]>.write_to(&mut self.nvm))?;

        if let JournalRecord::Embed(ref lump_id, ref data) = *record {
            // The embedded payload starts `EMBEDDED_DATA_OFFSET` bytes into
            // the record (after the record header).
            let portion = JournalPortion {
                start: Address::from_u64(prev_tail + EMBEDDED_DATA_OFFSET as u64).unwrap(),
                len: data.as_ref().len() as u16,
            };
            Ok(Some((*lump_id, portion)))
        } else {
            Ok(None)
        }
    }
159
    /// Returns an iterator that dequeues entries starting at `head`;
    /// each yielded entry advances the read cursor.
    pub fn dequeue_iter(&mut self) -> Result<DequeuedEntries<N>> {
        track!(DequeuedEntries::new(self))
    }
168
169 pub fn release_bytes_until(&mut self, point: u64) {
170 let released_bytes = if self.unreleased_head <= point {
171 point - self.unreleased_head
172 } else {
173 (point + self.nvm.capacity()) - self.unreleased_head
174 };
175 self.metrics.released_bytes.add_u64(released_bytes);
176
177 self.unreleased_head = point;
178 }
179
180 fn will_overflow<B: AsRef<[u8]>>(&self, record: &JournalRecord<B>) -> bool {
182 let mut next_tail = self.tail + record.external_size() as u64;
183
184 next_tail += END_OF_RECORDS_SIZE as u64;
186
187 next_tail > self.nvm.capacity()
188 }
189
    /// Verifies that `record` (plus the `EndOfRecords` marker) fits in the
    /// free region without overtaking `unreleased_head`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::StorageFull` when the write would collide with
    /// unreleased data.
    fn check_free_space<B: AsRef<[u8]>>(&mut self, record: &JournalRecord<B>) -> Result<()> {
        let write_end = self.tail + (record.external_size() + END_OF_RECORDS_SIZE) as u64;

        // Align up to the NVM block size: the write may touch bytes past
        // the logical end of the record.
        let write_end = self.nvm.block_size().ceil_align(write_end);

        // The free region ends where unreleased data begins — one full lap
        // ahead when the tail has not yet wrapped past the head.
        let free_end = if self.tail < self.unreleased_head {
            self.unreleased_head
        } else {
            self.nvm.capacity() + self.unreleased_head
        };
        track_assert!(
            write_end <= free_end,
            ErrorKind::StorageFull,
            "journal region is full: unreleased_head={}, head={}, tail={}, write_end={}, free_end={}",
            self.unreleased_head,
            self.head,
            self.tail,
            write_end,
            free_end
        );
        Ok(())
    }
216}
217
/// Iterator that replays the entries already persisted in the journal
/// region, updating the ring buffer's `tail` and metrics as it goes.
#[derive(Debug)]
pub struct RestoredEntries<'a, N: 'a + NonVolatileMemory> {
    entries: ReadEntries<'a, N>,
    // Starting offset of the scan (the ring's `head` at construction time).
    head: u64,
    // Borrowed write cursor of the ring; advanced past each restored entry.
    tail: &'a mut u64,
    capacity: u64,
    metrics: &'a JournalQueueMetrics,
}
impl<'a, N: 'a + NonVolatileMemory> RestoredEntries<'a, N> {
    /// Starts restoring entries from `ring`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InconsistentState` unless all three cursors
    /// (`unreleased_head`, `head`, `tail`) coincide — i.e. restoration must
    /// run before anything has been enqueued, dequeued, or released.
    #[allow(clippy::new_ret_no_self)]
    fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
        track_assert_eq!(
            ring.unreleased_head,
            ring.head,
            ErrorKind::InconsistentState
        );
        track_assert_eq!(ring.head, ring.tail, ErrorKind::InconsistentState);

        track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
        let capacity = ring.nvm.capacity();
        Ok(RestoredEntries {
            // Use a large (1 MiB) read buffer: restoration is one long
            // sequential scan over the journal region.
            entries: ReadEntries::with_capacity(&mut ring.nvm, ring.head, 1024 * 1024),
            head: ring.head,
            tail: &mut ring.tail,
            capacity,
            metrics: &ring.metrics,
        })
    }
}
248impl<'a, N: 'a + NonVolatileMemory> Iterator for RestoredEntries<'a, N> {
249 type Item = Result<JournalEntry>;
250 fn next(&mut self) -> Option<Self::Item> {
251 let next = self.entries.next();
252 match next {
253 Some(Ok(ref entry)) => {
254 self.metrics
255 .enqueued_records_at_starting
256 .increment(&entry.record);
257 *self.tail = entry.end().as_u64();
258 }
259 None => {
260 let size = if self.head <= *self.tail {
261 *self.tail - self.head
262 } else {
263 (*self.tail + self.capacity) - self.head
264 };
265 self.metrics.consumed_bytes_at_starting.add_u64(size);
266 }
267 _ => {}
268 }
269 next
270 }
271}
272
/// Iterator that dequeues journal entries, advancing the ring buffer's
/// `head` cursor past each yielded entry.
#[derive(Debug)]
pub struct DequeuedEntries<'a, N: 'a + NonVolatileMemory> {
    entries: ReadEntries<'a, N>,
    // Borrowed read cursor of the ring; moved to the end of each entry.
    head: &'a mut u64,
    metrics: &'a JournalQueueMetrics,
}
impl<'a, N: 'a + NonVolatileMemory> DequeuedEntries<'a, N> {
    /// Starts dequeuing from the ring's current `head` position.
    #[allow(clippy::new_ret_no_self)]
    fn new(ring: &'a mut JournalRingBuffer<N>) -> Result<Self> {
        track_io!(ring.nvm.seek(SeekFrom::Start(ring.head)))?;
        Ok(DequeuedEntries {
            entries: ReadEntries::new(&mut ring.nvm, ring.head),
            head: &mut ring.head,
            metrics: &ring.metrics,
        })
    }
}
290impl<'a, N: 'a + NonVolatileMemory> Iterator for DequeuedEntries<'a, N> {
291 type Item = Result<JournalEntry>;
292 fn next(&mut self) -> Option<Self::Item> {
293 let next = self.entries.next();
294 if let Some(Ok(ref entry)) = next {
295 self.metrics.dequeued_records.increment(&entry.record);
296 *self.head = entry.end().as_u64();
297 }
298 next
299 }
300}
301
/// Low-level sequential reader of journal records, starting at a given
/// offset and following at most one `GoToFront` wrap-around.
#[derive(Debug)]
struct ReadEntries<'a, N: 'a + NonVolatileMemory> {
    reader: BufReader<&'a mut JournalNvmBuffer<N>>,
    // Offset of the next record to be read.
    current: u64,
    // Set once a `GoToFront` has been followed; a second one is corruption.
    is_second_lap: bool,
}
impl<'a, N: 'a + NonVolatileMemory> ReadEntries<'a, N> {
    /// Creates a reader starting at offset `head` with the default
    /// `BufReader` capacity.
    fn new(nvm: &'a mut JournalNvmBuffer<N>, head: u64) -> Self {
        ReadEntries {
            reader: BufReader::new(nvm),
            current: head,
            is_second_lap: false,
        }
    }
    /// Like `new`, but with an explicit buffer capacity (used for the large
    /// sequential scan performed at restoration time).
    fn with_capacity(nvm: &'a mut JournalNvmBuffer<N>, head: u64, capacity: usize) -> Self {
        ReadEntries {
            reader: BufReader::with_capacity(capacity, nvm),
            current: head,
            is_second_lap: false,
        }
    }
    /// Reads the next record, transparently following a `GoToFront` marker
    /// back to offset 0. Returns `Ok(None)` at `EndOfRecords`.
    fn read_record(&mut self) -> Result<Option<JournalRecord<Vec<u8>>>> {
        match track!(JournalRecord::read_from(&mut self.reader))? {
            JournalRecord::EndOfRecords => Ok(None),
            JournalRecord::GoToFront => {
                // A second `GoToFront` in one scan means the ring has no
                // terminator — the journal is corrupted.
                track_assert!(!self.is_second_lap, ErrorKind::StorageCorrupted);
                track_io!(self.reader.seek(SeekFrom::Start(0)))?;
                self.current = 0;
                self.is_second_lap = true;
                self.read_record()
            }
            record => Ok(Some(record)),
        }
    }
}
337impl<'a, N: 'a + NonVolatileMemory> Iterator for ReadEntries<'a, N> {
338 type Item = Result<JournalEntry>;
339 fn next(&mut self) -> Option<Self::Item> {
340 match self.read_record() {
341 Err(e) => Some(Err(e)),
342 Ok(None) => None,
343 Ok(Some(record)) => {
344 let start = Address::from_u64(self.current).expect("Never fails");
345 self.current += record.external_size() as u64;
346 let entry = JournalEntry { start, record };
347 Some(Ok(entry))
348 }
349 }
350 }
351}
352
#[cfg(test)]
mod tests {
    use prometrics::metrics::MetricBuilder;
    use trackable::result::TestResult;

    use super::*;
    use nvm::MemoryNvm;
    use storage::portion::DataPortion;
    use storage::{Address, JournalRecord};
    use ErrorKind;

    // Enqueues a mix of record kinds and checks they dequeue in FIFO order
    // with the expected start addresses and cursor positions.
    #[test]
    fn append_and_read_records() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let records = vec![
            record_put("000", 30, 5),
            record_put("111", 100, 300),
            record_delete("222"),
            record_embed("333", b"foo"),
            record_delete("444"),
            record_delete_range("000", "999"),
        ];
        for record in &records {
            assert!(ring.enqueue(record).is_ok());
        }

        let mut position = Address::from(0);
        for (entry, record) in track!(ring.dequeue_iter())?.zip(records.iter()) {
            let entry = track!(entry)?;
            assert_eq!(entry.record, *record);
            assert_eq!(entry.start, position);
            position = position + Address::from(record.external_size() as u32);
        }

        // Dequeuing advances `head` (but not `unreleased_head`) to the tail.
        assert_eq!(ring.unreleased_head, 0);
        assert_eq!(ring.head, position.as_u64());
        assert_eq!(ring.tail, position.as_u64());

        assert_eq!(track!(ring.dequeue_iter())?.count(), 0);
        Ok(())
    }

    // An `Embed` record's returned portion reads back the embedded bytes.
    #[test]
    fn read_embedded_data() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        track!(ring.enqueue(&record_put("000", 30, 5)))?;
        track!(ring.enqueue(&record_delete("111")))?;

        let (lump_id, portion) =
            track!(ring.enqueue(&record_embed("222", b"foo")))?.expect("Some(_)");
        assert_eq!(lump_id, track_any_err!("222".parse())?);

        let mut buf = vec![0; portion.len as usize];
        track!(ring.read_embedded_data(portion.start.as_u64(), &mut buf))?;
        assert_eq!(buf, b"foo");
        Ok(())
    }

    // When the tail reaches the end of the region, the next enqueue wraps
    // the write cursor to the front of the ring.
    #[test]
    fn go_round_ring_buffer() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 512, &MetricBuilder::new());
        assert_eq!(ring.head, 512);
        assert_eq!(ring.tail, 512);

        let record = record_delete("000");
        for _ in 0..(512 / record.external_size()) {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1016);

        // This enqueue does not fit at the back, so it wraps to offset 0.
        track!(ring.enqueue(&record))?;
        assert_eq!(ring.tail, 21);
        Ok(())
    }

    // Enqueue fails with `StorageFull` until enough space has been released
    // (by moving `unreleased_head` forward).
    #[test]
    fn full() -> TestResult {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let record = record_put("000", 1, 2);
        while ring.tail <= 1024 - record.external_size() as u64 {
            track!(ring.enqueue(&record))?;
        }
        assert_eq!(ring.tail, 1008);

        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );
        assert_eq!(ring.tail, 1008);

        // Releasing up to 511 is still not enough room for the record.
        ring.unreleased_head = 511;
        ring.head = 511;
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        // One more released byte makes the enqueue (with wrap-around) succeed.
        ring.unreleased_head = 512;
        ring.head = 512;
        assert!(ring.enqueue(&record).is_ok());
        assert_eq!(ring.tail, record.external_size() as u64);
        Ok(())
    }

    // A record whose size (plus the end-of-records marker) exceeds the whole
    // region is rejected; one byte smaller fits.
    #[test]
    fn too_large_record() {
        let nvm = MemoryNvm::new(vec![0; 1024]);
        let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());

        let record = record_embed("000", &[0; 997]);
        assert_eq!(record.external_size(), 1020);
        assert_eq!(
            ring.enqueue(&record).err().map(|e| *e.kind()),
            Some(ErrorKind::StorageFull)
        );

        let record = record_embed("000", &[0; 996]);
        assert_eq!(record.external_size(), 1019);
        assert!(ring.enqueue(&record).is_ok());
        assert_eq!(ring.tail, 1019);
    }

    // Helper: builds a `Put` record pointing at the given data portion.
    fn record_put(lump_id: &str, start: u32, len: u16) -> JournalRecord<Vec<u8>> {
        JournalRecord::Put(
            lump_id.parse().unwrap(),
            DataPortion {
                start: Address::from(start),
                len,
            },
        )
    }

    // Helper: parses a lump id from its string form.
    fn lump_id(id: &str) -> LumpId {
        id.parse().unwrap()
    }

    // Helper: builds an `Embed` record carrying `data` inline.
    fn record_embed(id: &str, data: &[u8]) -> JournalRecord<Vec<u8>> {
        JournalRecord::Embed(lump_id(id), data.to_owned())
    }

    // Helper: builds a `Delete` record for a single lump id.
    fn record_delete(id: &str) -> JournalRecord<Vec<u8>> {
        JournalRecord::Delete(lump_id(id))
    }

    // Helper: builds a `DeleteRange` record over `[start, end)`.
    fn record_delete_range(start: &str, end: &str) -> JournalRecord<Vec<u8>> {
        use std::ops::Range;
        JournalRecord::DeleteRange(Range {
            start: lump_id(start),
            end: lump_id(end),
        })
    }
}