1use std::cell::RefCell;
16use std::fs::File;
17use std::io::ErrorKind;
18use std::path::PathBuf;
19use std::time::SystemTime;
20
21use anyhow::anyhow;
22use anyhow::bail;
23use anyhow::Context;
24use anyhow::Result;
25use memmap2::Mmap;
26use memmap2::MmapOptions;
27use slog::warn;
28use slog::Logger;
29
30use crate::compression::Decompressor;
31use crate::deserialize_frame;
32use crate::get_index_files;
33use crate::Crc32;
34use crate::DataFrame;
35use crate::Direction;
36use crate::Format;
37use crate::IndexEntry;
38use crate::IndexEntryFlags;
39use crate::SerializedFrame;
40use crate::INDEX_ENTRY_SIZE;
41use crate::SHARD_TIME;
42
43pub trait Cursor {
45 type Offset;
46 type Item;
47
48 fn get_offset(&self) -> Self::Offset;
50 fn set_offset(&mut self, offset: Self::Offset);
52 fn get(&self) -> Option<Self::Item>;
54 fn advance(&mut self, direction: Direction) -> Result<bool>;
57 fn next(&mut self, direction: Direction) -> Result<Option<Self::Item>> {
60 let offset = self.get_offset();
61 while self.advance(direction)? {
62 if let Some(item) = self.get() {
63 return Ok(Some(item));
64 }
65 }
66 self.set_offset(offset);
67 Ok(None)
68 }
69}
70
71pub trait KeyedCursor<Key: std::cmp::Ord>: Cursor {
74 fn get_key(&self) -> Option<Key>;
76
77 fn jump_near_key(&mut self, _key: &Key, _direction: Direction) {}
82
83 fn jump_to_key(&mut self, key: &Key, direction: Direction) -> Result<bool> {
90 self.jump_near_key(key, direction);
91 let mut curr_key = self.get_key();
94 for curr_dir in &[direction.flip(), direction] {
95 let skip_order = curr_dir.get_skip_order();
96 while curr_key.as_ref().map_or(true, |k| k.cmp(key) == skip_order) {
97 if !self.advance(*curr_dir)? {
98 break;
99 }
100 curr_key = self.get_key();
101 }
102 }
103 Ok(curr_key.map_or(false, |k| k.cmp(key) != direction.get_skip_order()))
105 }
106
107 fn get_near(
115 &mut self,
116 key: &Key,
117 preferred_direction: Direction,
118 ) -> Result<Option<Self::Item>> {
119 self.jump_to_key(key, preferred_direction)?;
120 match self.get() {
121 Some(item) => Ok(Some(item)),
122 None => self.next(preferred_direction),
123 }
124 }
125
126 fn get_next(&mut self, key: &Key, direction: Direction) -> Result<Option<Self::Item>> {
134 if self.jump_to_key(key, direction)? {
135 match self.get() {
136 Some(item) => Ok(Some(item)),
137 None => self.next(direction),
138 }
139 } else {
140 Ok(None)
141 }
142 }
143}
144
/// Cursor over a store directory made of per-shard `index_*` and `data_*`
/// files, which are memory-mapped on demand.
pub struct StoreCursor {
    /// Logger used for warnings about missing, empty or corrupt files/entries.
    logger: Logger,
    /// Directory that contains the index and data files.
    path: PathBuf,
    /// Currently selected shard, if any. Used to build the file names.
    shard: Option<u64>,
    /// Mapping of the current shard's index file; `None` when nothing is
    /// mapped.
    index_mmap: Option<Mmap>,
    /// Mapping of the current shard's data file.
    data_mmap: Option<Mmap>,
    /// Byte offset of the current entry inside `index_mmap`; `None` means the
    /// cursor is not positioned on any entry of the shard.
    index_offset: Option<usize>,
    /// Lazily created decompressor, keyed by `(shard, index offset of the
    /// dictionary frame)`. Wrapped in a `RefCell` so that it can be reused
    /// from methods that only take `&self`.
    decompressor: RefCell<Option<Decompressor<(u64, usize)>>>,
}
165
/// Selects which of a shard's two files to open.
enum StoreFile {
    /// The index file (file name prefix `index`).
    Index,
    /// The data file (file name prefix `data`).
    Data,
}
170
171impl StoreCursor {
172 pub fn new(logger: Logger, path: PathBuf) -> Self {
174 Self {
175 logger,
176 path,
177 shard: None,
178 index_mmap: None,
179 data_mmap: None,
180 index_offset: None,
181 decompressor: RefCell::new(None),
182 }
183 }
184
185 fn get_mmap(&self, file_type: StoreFile, shard: u64) -> Result<Option<Mmap>> {
188 let prefix = match file_type {
189 StoreFile::Index => "index",
190 StoreFile::Data => "data",
191 };
192 let path = self.path.join(format!("{}_{:011}", prefix, shard));
193 let file = match File::open(&path) {
194 Ok(f) => f,
195 Err(e) if e.kind() == ErrorKind::NotFound => {
196 warn!(
197 self.logger,
198 "Expected file does not exist: {}",
199 path.display()
200 );
201 return Ok(None);
202 }
203 Err(e) => {
204 return Err(e).context(format!("Failed while opening file: {}", path.display()));
205 }
206 };
207
208 let mut len = file
209 .metadata()
210 .with_context(|| format!("Failed to get metadata of file: {}", path.display()))?
211 .len() as usize;
212 if let StoreFile::Index = file_type {
213 len = len - len % INDEX_ENTRY_SIZE;
214 }
215 if len == 0 {
216 warn!(self.logger, "0 length file found: {}", path.display());
217 return Ok(None);
218 }
219
220 unsafe {
226 Some(
227 MmapOptions::new()
228 .len(len)
229 .map(&file)
230 .with_context(|| format!("Failed to mmap file {}", path.display())),
231 )
232 .transpose()
233 }
234 }
235
236 fn update_shard(&mut self, shard: u64) -> Result<bool> {
244 let new_index_mmap = match self.get_mmap(StoreFile::Index, shard)? {
247 Some(index_mmap) => index_mmap,
248 None => return Ok(false),
249 };
250 let new_data_mmap = match self.get_mmap(StoreFile::Data, shard)? {
251 Some(data_mmap) => data_mmap,
252 None => return Ok(false),
253 };
254 if self.shard == Some(shard) {
255 let index_mmap_len = self.index_mmap.as_ref().map_or(0, |m| m.len());
256 if new_index_mmap.len() <= index_mmap_len {
257 return Ok(false);
259 }
260 } else {
261 self.shard = Some(shard);
262 self.index_offset = None;
263 }
264 self.index_mmap = Some(new_index_mmap);
265 self.data_mmap = Some(new_data_mmap);
266 Ok(true)
267 }
268
269 fn update_or_advance_shard(&mut self, direction: Direction) -> Result<bool> {
275 let entries = get_index_files(&self.path)?;
276
277 let entries_iter: Box<dyn Iterator<Item = &String>> = match direction {
278 Direction::Forward => Box::new(entries.iter()),
279 Direction::Reverse => Box::new(entries.iter().rev()),
280 };
281 for entry in entries_iter {
282 let v: Vec<&str> = entry.split('_').collect();
283 if v.len() != 2 {
284 warn!(self.logger, "Invalid index file name: {}", entry);
285 continue;
286 }
287
288 let entry_shard = match v[1].parse::<u64>() {
289 Ok(val) => val,
290 _ => {
291 warn!(self.logger, "Cannot parse index shard: {}", entry);
292 continue;
293 }
294 };
295
296 if let Some(shard) = self.shard.as_ref() {
297 if entry_shard.cmp(shard) == direction.get_skip_order() {
298 continue;
299 }
300 }
301
302 if self.update_shard(entry_shard)? {
305 return Ok(true);
306 }
307 }
308 Ok(false)
309 }
310
311 fn advance_index(&mut self, direction: Direction) -> bool {
314 if let Some(index_mmap) = self.index_mmap.as_ref() {
315 debug_assert!(index_mmap.len() > 0);
318 let offset = match self.index_offset {
320 Some(offset) => match direction {
321 Direction::Forward => offset
322 .checked_add(INDEX_ENTRY_SIZE)
323 .filter(|o| o < &index_mmap.len()),
324 Direction::Reverse => offset.checked_sub(INDEX_ENTRY_SIZE),
325 },
326 None => match direction {
328 Direction::Forward => Some(0),
329 Direction::Reverse => index_mmap.len().checked_sub(INDEX_ENTRY_SIZE),
330 },
331 };
332 if offset.is_some() {
333 self.index_offset = offset;
334 return true;
335 }
336 }
337 false
338 }
339
340 fn get_index_entry_at(&self, index_offset: usize) -> Option<&IndexEntry> {
343 let index_mmap = self.index_mmap.as_ref()?;
344 let index_entry_slice =
345 index_mmap.get(index_offset..(index_offset.checked_add(INDEX_ENTRY_SIZE)?))?;
346 let (_, body, _) = unsafe { index_entry_slice.align_to::<IndexEntry>() };
355 assert_eq!(
356 body.len(),
357 1,
358 "bug: Mis-aligned index entry found: shard={} offset={}",
359 self.shard.unwrap(),
360 index_offset,
361 );
362 if index_entry_slice == [0; INDEX_ENTRY_SIZE] {
365 return None;
366 }
367 let index_entry = &body[0];
368 if index_entry.crc32() != index_entry.index_crc {
369 warn!(
370 self.logger,
371 "Corrupted index entry found: shard={} offset={:#x}",
372 self.shard.unwrap(),
373 index_offset,
374 );
375 None
376 } else {
377 Some(index_entry)
378 }
379 }
380
381 fn get_index_entry(&self) -> Option<&IndexEntry> {
383 self.get_index_entry_at(self.index_offset?)
384 }
385
386 fn get_serialized_single_frame<'a>(
389 data_slice: &'a [u8],
390 compressed: bool,
391 decompressor: &mut Option<Decompressor<(u64, usize)>>,
392 ) -> Result<SerializedFrame<'a>> {
393 let serialized_frame = if compressed {
394 SerializedFrame::Owned(
395 decompressor
396 .get_or_insert_with(Decompressor::new)
397 .decompress_with_dict_reset(data_slice)
398 .context("Failed to decompress data frame")?,
399 )
400 } else {
401 SerializedFrame::Borrowed(data_slice)
402 };
403 Ok(serialized_frame)
404 }
405
406 fn get_serialized_chunk_frame(
415 &self,
416 data_slice: &[u8],
417 index_offset: usize,
418 chunk_compress_size_po2: u32,
419 decompressor: &mut Option<Decompressor<(u64, usize)>>,
420 ) -> Result<SerializedFrame> {
421 let chunk_mask = (INDEX_ENTRY_SIZE << chunk_compress_size_po2) - 1;
424 let dict_index_offset = index_offset & !chunk_mask;
425
426 let shard = self.shard.expect("shard should be set");
427 let dict_key = (shard, dict_index_offset);
428
429 let decompressor = match decompressor {
430 Some(d) if d.get_dict_key() == Some(&dict_key) => d,
431 _ => {
432 let (index_entry, data_slice) = self.get_index_and_data_at(dict_index_offset)?;
433 let dict_key_frame = Self::get_serialized_single_frame(
434 data_slice,
435 index_entry.flags.contains(IndexEntryFlags::COMPRESSED),
436 decompressor,
437 )
438 .context("Failed to get serialized dict key frame")?;
439 let d = decompressor.get_or_insert_with(Decompressor::new);
440 d.load_dict(dict_key_frame.into_owned(), dict_key)
441 .context("Failed to set decompressor dict")?;
442 d
443 }
444 };
445
446 let bytes = if index_offset == dict_index_offset {
449 decompressor.get_dict().clone()
450 } else {
451 decompressor
452 .decompress_with_loaded_dict(data_slice)
453 .context("Failed to decompress data frame with dictionary")?
454 };
455 Ok(SerializedFrame::Owned(bytes))
456 }
457
458 fn get_index_and_data_at(&self, index_offset: usize) -> Result<(&IndexEntry, &[u8])> {
460 let index_entry = self
461 .get_index_entry_at(index_offset)
462 .ok_or_else(|| anyhow!("Failed to get index entry at offset {}", index_offset))?;
463 let data_mmap = self
464 .data_mmap
465 .as_ref()
466 .ok_or_else(|| anyhow!("Failed to get mmap"))?;
467 let data_offset = index_entry.offset as usize;
468 let data_len = index_entry.len as usize;
469 let data_slice = data_mmap
470 .get(
471 data_offset
472 ..(data_offset
473 .checked_add(data_len)
474 .ok_or_else(|| anyhow!("overflow"))?),
475 )
476 .ok_or_else(|| anyhow!("Failed to get data slice from mmap"))?;
477
478 if data_slice.crc32() != index_entry.data_crc {
479 bail!(
480 "Corrupted data entry found: ts={} offset={:#x}",
481 index_entry.timestamp,
482 index_entry.offset,
483 );
484 };
485 Ok((index_entry, data_slice))
486 }
487
488 fn get_index_and_serialized_frame_at(
491 &self,
492 index_offset: usize,
493 ) -> Result<(&IndexEntry, SerializedFrame)> {
494 let (index_entry, data_slice) = self.get_index_and_data_at(index_offset)?;
495 let chunk_compress_size_po2 = index_entry.flags.get_chunk_compress_size_po2();
496 let uncompressed_frame = if chunk_compress_size_po2 > 0 {
497 self.get_serialized_chunk_frame(
501 data_slice,
502 index_offset,
503 chunk_compress_size_po2,
504 &mut self.decompressor.borrow_mut(),
505 )
506 .context("Failed to get serialized chunk frame")?
507 } else {
508 Self::get_serialized_single_frame(
509 data_slice,
510 index_entry.flags.contains(IndexEntryFlags::COMPRESSED),
511 &mut self.decompressor.borrow_mut(),
512 )
513 .context("Failed to get serialized single frame")?
514 };
515 Ok((index_entry, uncompressed_frame))
516 }
517}
518
519#[derive(Clone, Debug, Default, PartialEq)]
521pub struct StoreOffset {
522 shard: Option<u64>,
523 index_offset: Option<usize>,
524}
525
526impl StoreOffset {
527 pub fn new(shard: Option<u64>, index_offset: Option<usize>) -> Self {
530 StoreOffset {
531 shard: shard.as_ref().map(|s| s - s % SHARD_TIME),
532 index_offset: shard.and(index_offset.map(|o| o - o % INDEX_ENTRY_SIZE)),
533 }
534 }
535
536 pub fn get_shard(&self) -> Option<u64> {
537 self.shard
538 }
539
540 pub fn get_index_offset(&self) -> Option<usize> {
541 self.index_offset
542 }
543}
544
545impl Cursor for StoreCursor {
546 type Offset = StoreOffset;
547 type Item = (SystemTime, DataFrame);
548
549 fn get_offset(&self) -> StoreOffset {
550 StoreOffset::new(self.shard, self.index_offset)
551 }
552
553 fn set_offset(&mut self, offset: StoreOffset) {
554 if let Some(shard) = offset.get_shard() {
555 if self.shard == Some(shard) || self.update_shard(shard).unwrap_or(false) {
556 self.index_offset = offset.get_index_offset();
557 return;
558 }
559 }
560 self.shard = offset.get_shard();
566 self.index_mmap = None;
567 self.index_offset = offset.get_index_offset();
568 }
569
570 fn advance(&mut self, direction: Direction) -> Result<bool> {
575 while !self.advance_index(direction) {
576 if !self.update_or_advance_shard(direction)? {
577 return Ok(false);
579 }
580 }
581 Ok(true)
582 }
583
584 fn get(&self) -> Option<(SystemTime, DataFrame)> {
589 match self.get_index_and_serialized_frame_at(self.index_offset?) {
590 Ok((index_entry, serialized_data)) => {
591 let format = if index_entry.flags.contains(IndexEntryFlags::CBOR) {
592 Format::Cbor
593 } else {
594 panic!("Unexpected format");
595 };
596 let ts =
597 std::time::UNIX_EPOCH + std::time::Duration::from_secs(index_entry.timestamp);
598 match deserialize_frame(serialized_data.as_ref(), format) {
599 Ok(df) => Some((ts, df)),
600 Err(e) => {
601 warn!(self.logger, "Failed to deserialize data frame: {}", e);
602 None
603 }
604 }
605 }
606 Err(e) => {
607 warn!(
608 self.logger,
609 "Failed to extract serialized data frame: {}", e
610 );
611 None
612 }
613 }
614 }
615}
616
617impl KeyedCursor<u64> for StoreCursor {
620 fn get_key(&self) -> Option<u64> {
622 Some(self.get_index_entry()?.timestamp)
623 }
624
625 fn jump_near_key(&mut self, key: &u64, _direction: Direction) {
628 let time_offset = key % SHARD_TIME;
629 let shard = key - time_offset;
630 self.set_offset(StoreOffset::new(Some(shard), None));
631 if self.advance_index(Direction::Reverse) {
633 if let Some(last_entry) = self.get_index_entry() {
634 let last_entry_index_offset = self
635 .get_offset()
636 .get_index_offset()
637 .expect("get_index_offset should return Some if get_index_entry returns Some");
638 let last_entry_time_offset = last_entry.timestamp % SHARD_TIME;
639 if last_entry_time_offset != 0 {
640 let index_offset_hint = (last_entry_index_offset as f64
643 / last_entry_time_offset as f64
644 * time_offset as f64) as usize;
645 self.set_offset(StoreOffset::new(Some(shard), Some(index_offset_hint)));
646 }
647 }
648 }
649 }
650}
651
#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;
    use std::io::Write;

    use common::util::get_unix_timestamp;
    use slog::Drain;
    use tempfile::TempDir;
    use Direction::Forward;
    use Direction::Reverse;

    use super::*;
    use crate::serialize_frame;
    use crate::ChunkSizePo2;
    use crate::CompressionMode;
    use crate::StoreWriter;

    /// Minimal in-memory cursor used to exercise the default trait methods.
    /// Positions holding `None` have no valid item.
    struct TestCursor<'a> {
        data: &'a [Option<i32>],
        offset: Option<usize>,
    }

    impl Cursor for TestCursor<'_> {
        type Offset = Option<usize>;
        type Item = i32;

        fn get_offset(&self) -> Self::Offset {
            self.offset
        }

        fn set_offset(&mut self, offset: Self::Offset) {
            self.offset = offset;
        }

        fn get(&self) -> Option<Self::Item> {
            self.offset
                .and_then(|o| self.data.get(o).copied().flatten())
        }

        fn advance(&mut self, direction: Direction) -> Result<bool> {
            let len = self.data.len();
            let offset = match (self.offset, direction) {
                (Some(offset), Direction::Forward) => offset.checked_add(1).filter(|o| *o < len),
                (Some(offset), Direction::Reverse) => offset.checked_sub(1),
                // Without a position, start from the matching end.
                (None, Direction::Forward) => Some(0).filter(|o| *o < len),
                (None, Direction::Reverse) => len.checked_sub(1),
            };
            if offset.is_some() {
                self.offset = offset;
                Ok(true)
            } else {
                Ok(false)
            }
        }
    }

    impl KeyedCursor<i32> for TestCursor<'_> {
        fn get_key(&self) -> Option<i32> {
            self.get()
        }
    }

    /// `next` skips positions without an item and keeps the cursor in place
    /// when it runs out of positions.
    #[test]
    fn default_next() {
        let data = vec![None, Some(3), Some(5), None, None, Some(9)];
        let mut cursor = TestCursor {
            data: &data,
            offset: None,
        };
        assert_eq!(cursor.next(Forward).unwrap(), Some(3));
        assert_eq!(cursor.next(Forward).unwrap(), Some(5));
        assert_eq!(cursor.next(Forward).unwrap(), Some(9));
        assert_eq!(cursor.next(Forward).unwrap(), None);
        assert_eq!(cursor.next(Reverse).unwrap(), Some(5));
        assert_eq!(cursor.next(Reverse).unwrap(), Some(3));
        assert_eq!(cursor.next(Reverse).unwrap(), None);
        // The failed `next` left the cursor on the last item it returned.
        assert_eq!(cursor.get(), Some(3));
    }

    /// `jump_to_key` lands on exact keys, on the nearest key in the requested
    /// direction, and reports failure when no such key exists.
    #[test]
    fn default_jump_to_key() {
        let data = vec![None, Some(3), Some(5), None, None, Some(9)];
        let mut cursor = TestCursor {
            data: &data,
            offset: None,
        };
        // Exact keys.
        assert!(cursor.jump_to_key(&3, Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(3));
        assert!(cursor.jump_to_key(&5, Reverse).unwrap());
        assert_eq!(cursor.get_key(), Some(5));
        // Missing keys: nearest one in the given direction.
        assert!(cursor.jump_to_key(&7, Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(9));
        assert!(cursor.jump_to_key(&4, Reverse).unwrap());
        assert_eq!(cursor.get_key(), Some(3));
        // Keys beyond either end.
        assert!(!cursor.jump_to_key(&10, Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(9));
        assert!(!cursor.jump_to_key(&0, Reverse).unwrap());
        assert_eq!(cursor.get_key(), None);
    }

    /// `get_near` returns the closest item even when the key is out of range.
    #[test]
    fn default_get_near() {
        let data = vec![Some(3), Some(5), None, None, Some(9)];
        let mut cursor = TestCursor {
            data: &data,
            offset: None,
        };
        assert_eq!(cursor.get_near(&5, Forward).unwrap(), Some(5));
        assert_eq!(cursor.get_near(&4, Forward).unwrap(), Some(5));
        assert_eq!(cursor.get_near(&4, Reverse).unwrap(), Some(3));
        assert_eq!(cursor.get_near(&2, Reverse).unwrap(), Some(3));
        assert_eq!(cursor.get_near(&10, Forward).unwrap(), Some(9));
    }

    /// `get_next` returns `None` when no item lies in the given direction.
    #[test]
    fn default_get_next() {
        let data = vec![Some(3), Some(5), None, None, Some(9)];
        let mut cursor = TestCursor {
            data: &data,
            offset: None,
        };
        assert_eq!(cursor.get_next(&5, Forward).unwrap(), Some(5));
        assert_eq!(cursor.get_next(&4, Forward).unwrap(), Some(5));
        assert_eq!(cursor.get_next(&4, Reverse).unwrap(), Some(3));
        assert_eq!(cursor.get_next(&2, Reverse).unwrap(), None);
        assert_eq!(cursor.get_next(&10, Forward).unwrap(), None);
    }

    fn get_logger() -> Logger {
        let plain = slog_term::PlainSyncDecorator::new(std::io::stderr());
        Logger::root(slog_term::FullFormat::new(plain).build().fuse(), slog::o!())
    }

    /// Writes one frame with the real `StoreWriter` and reads it back through
    /// a `StoreCursor`.
    fn simple_put_read(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = get_unix_timestamp(SystemTime::now());
        let now = std::time::UNIX_EPOCH + std::time::Duration::from_secs(ts);
        let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
            .expect("Failed to create store");
        let mut frame = DataFrame::default();
        frame.sample.cgroup.memory_current = Some(42);
        writer.put(now, &frame).expect("Failed to store data");

        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let sample = cursor
            .next(Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_eq!(sample, (now, frame));
    }

    #[test]
    fn read_cbor() {
        simple_put_read(CompressionMode::None, Format::Cbor);
    }

    #[test]
    fn read_compressed_cbor() {
        simple_put_read(CompressionMode::Zstd, Format::Cbor);
    }

    #[test]
    fn read_dict_compressed_cbor() {
        simple_put_read(
            CompressionMode::ZstdDictionary(ChunkSizePo2(2)),
            Format::Cbor,
        );
    }

    /// Bare-bones writer that appends default data frames and hand-built index
    /// entries, optionally with a wrong index or data checksum.
    struct TestWriter {
        path: PathBuf,
    }

    impl TestWriter {
        pub fn new<P: AsRef<std::path::Path>>(path: P) -> Self {
            Self {
                path: path.as_ref().to_path_buf(),
            }
        }

        /// Appends a valid entry with the given timestamp.
        pub fn put(&self, timestamp: u64) -> Result<()> {
            self.put_helper(timestamp, false, false)
        }

        /// Appends an entry whose index checksum is left at zero.
        pub fn put_corrupt_index(&self, timestamp: u64) -> Result<()> {
            self.put_helper(timestamp, true, false)
        }

        /// Appends an entry whose recorded data checksum is zero.
        pub fn put_corrupt_data(&self, timestamp: u64) -> Result<()> {
            self.put_helper(timestamp, false, true)
        }

        /// Appends a default data frame to the shard's data file, followed by
        /// the matching index entry in the shard's index file.
        fn put_helper(
            &self,
            timestamp: u64,
            corrupt_index: bool,
            corrupt_data: bool,
        ) -> Result<()> {
            let shard = timestamp - timestamp % SHARD_TIME;
            let open_options = OpenOptions::new().create(true).append(true).clone();

            let data_bytes = serialize_frame(&DataFrame::default(), Format::Cbor)
                .context("Failed to serialize data frame")?;
            let data_crc = if corrupt_data { 0 } else { data_bytes.crc32() };
            let mut data_file = open_options
                .open(self.path.join(format!("data_{:011}", shard)))
                .context("Failed to open data file")?;
            // The frame is appended, so it starts at the current file length.
            let offset = data_file
                .metadata()
                .context("Failed to get metadata of data file")?
                .len();
            data_file
                .write_all(&data_bytes)
                .context("Failed to write to data file")?;

            let mut index_entry = IndexEntry {
                timestamp,
                offset,
                len: data_bytes.len() as u32,
                flags: IndexEntryFlags::CBOR,
                data_crc,
                index_crc: 0,
            };
            if !corrupt_index {
                index_entry.index_crc = index_entry.crc32();
            }
            // SAFETY: `index_entry` is a live local for the whole lifetime of
            // the slice.
            // NOTE(review): assumes `IndexEntry` occupies exactly
            // `INDEX_ENTRY_SIZE` bytes with no uninitialized padding —
            // confirm against its definition.
            let entry_slice = unsafe {
                std::slice::from_raw_parts(
                    &index_entry as *const IndexEntry as *const u8,
                    INDEX_ENTRY_SIZE,
                )
            };
            open_options
                .open(self.path.join(format!("index_{:011}", shard)))
                .context("Failed to open index file")?
                .write_all(entry_slice)
                .context("Failed to write entry to index file")?;
            Ok(())
        }
    }

    /// Shared fixture: a fresh temporary store directory, the current unix
    /// timestamp, a `TestWriter` and a `StoreCursor` on that directory.
    ///
    /// The returned `TempDir` must be kept alive for the duration of the test,
    /// since dropping it removes the directory.
    fn new_test_store() -> (TempDir, u64, TestWriter, StoreCursor) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = get_unix_timestamp(SystemTime::now());
        let writer = TestWriter::new(&dir);
        let cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        (dir, ts, writer, cursor)
    }

    /// Advancing over an empty store fails in both directions.
    #[test]
    fn advance_when_empty() {
        let (_dir, _ts, _writer, mut cursor) = new_test_store();

        assert!(!cursor.advance(Forward).unwrap());
        assert!(cursor.get_key().is_none());
        assert!(!cursor.advance(Reverse).unwrap());
        assert!(cursor.get_key().is_none());
    }

    /// With a single entry, advancing past either end fails and leaves the
    /// cursor on that entry.
    #[test]
    fn advance_at_boundries() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put(ts).unwrap();

        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
        assert!(!cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
        assert!(!cursor.advance(Reverse).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
    }

    /// Advancing walks entries in order, both forward and in reverse.
    #[test]
    fn advance_simple() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put(ts).unwrap();
        writer.put(ts + 5).unwrap();
        writer.put(ts + SHARD_TIME).unwrap();

        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + 5));
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME));
        assert!(cursor.advance(Reverse).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + 5));
        assert!(cursor.advance(Reverse).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
    }

    /// A cursor that reached the end picks up entries written afterwards.
    #[test]
    fn advance_retry() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put(ts).unwrap();
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
        assert!(!cursor.advance(Forward).unwrap());

        // New entry appended after the cursor hit the end.
        writer.put(ts + 5).unwrap();
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + 5));
        assert!(!cursor.advance(Forward).unwrap());

        // New entry written with a timestamp one SHARD_TIME later.
        writer.put(ts + SHARD_TIME).unwrap();
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME));
        assert!(!cursor.advance(Forward).unwrap());
    }

    /// Corrupt entries can be advanced onto but yield no item; a corrupt index
    /// entry yields no key either.
    #[test]
    fn get_corrupt() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put_corrupt_index(ts).unwrap();
        assert!(cursor.advance(Forward).unwrap());
        assert!(cursor.get_key().is_none());
        assert!(cursor.get().is_none());

        writer.put_corrupt_data(ts + 5).unwrap();
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + 5));
        assert!(cursor.get().is_none());
    }

    /// `next` steps over corrupt entries and returns only the valid ones.
    #[test]
    fn skip_corrupt() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put_corrupt_data(ts).unwrap();
        writer.put(ts + 5).unwrap();
        writer.put_corrupt_data(ts + 5 * 2).unwrap();
        writer.put_corrupt_index(ts + 5 * 3).unwrap();
        writer.put_corrupt_data(ts + SHARD_TIME).unwrap();
        writer.put_corrupt_index(ts + SHARD_TIME * 2).unwrap();
        writer.put(ts + SHARD_TIME * 2 + 5).unwrap();
        writer.put_corrupt_data(ts + SHARD_TIME * 3).unwrap();

        assert_eq!(
            get_unix_timestamp(cursor.next(Forward).unwrap().unwrap().0),
            ts + 5
        );
        assert_eq!(
            get_unix_timestamp(cursor.next(Forward).unwrap().unwrap().0),
            ts + SHARD_TIME * 2 + 5
        );
        assert!(cursor.next(Forward).unwrap().is_none());
        assert_eq!(
            get_unix_timestamp(cursor.next(Reverse).unwrap().unwrap().0),
            ts + 5
        );
    }

    /// Offsets reported by the cursor can be set back, and offsets that point
    /// at nothing yield no key.
    #[test]
    fn manipulate_offset() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put(ts).unwrap();
        writer.put(ts + 5).unwrap();
        writer.put(ts + SHARD_TIME * 2 + 5).unwrap();

        let expected_offsets = &[
            StoreOffset::new(Some(ts), Some(0)),
            StoreOffset::new(Some(ts), Some(INDEX_ENTRY_SIZE)),
            StoreOffset::new(Some(ts + SHARD_TIME * 2), Some(0)),
        ];

        assert_eq!(cursor.get_offset(), StoreOffset::default());
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_offset(), expected_offsets[0]);
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_offset(), expected_offsets[1]);
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_offset(), expected_offsets[2]);

        // Offsets that do not point at an entry.
        cursor.set_offset(StoreOffset::default());
        assert!(cursor.get_key().is_none());
        cursor.set_offset(StoreOffset::new(
            Some(ts + SHARD_TIME),
            Some(INDEX_ENTRY_SIZE),
        ));
        assert!(cursor.get_key().is_none());
        cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME * 2), None));
        assert!(cursor.get_key().is_none());

        // Offsets previously reported by the cursor, in arbitrary order.
        cursor.set_offset(expected_offsets[1].clone());
        assert_eq!(cursor.get_key(), Some(ts + 5));
        cursor.set_offset(expected_offsets[0].clone());
        assert_eq!(cursor.get_key(), Some(ts));
        cursor.set_offset(expected_offsets[2].clone());
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2 + 5));
    }

    /// Advancing from an offset that points at nothing reaches the nearest
    /// entry in the given direction, if there is one.
    #[test]
    fn advance_from_invalid_offset() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put(ts).unwrap();
        writer.put(ts + SHARD_TIME * 2).unwrap();

        // Past the end of an existing shard.
        cursor.set_offset(StoreOffset::new(Some(ts), Some(INDEX_ENTRY_SIZE)));
        assert!(cursor.advance(Reverse).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));

        // Inside a shard for which nothing was written.
        cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME), Some(0)));
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));

        // Beyond the last shard: nothing to advance to, offset is unchanged.
        cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME * 4), Some(0)));
        assert!(!cursor.advance(Forward).unwrap());
        assert_eq!(
            cursor.get_offset(),
            StoreOffset::new(Some(ts + SHARD_TIME * 4), Some(0))
        );
    }

    /// `jump_to_key` on a real store: exact keys, keys outside the stored
    /// range, and keys that fall between entries.
    #[test]
    fn jump_to_key() {
        let (_dir, ts, writer, mut cursor) = new_test_store();

        writer.put(ts + 5).unwrap();
        writer.put(ts + 5 * 20).unwrap();
        writer.put(ts + 5 * 21).unwrap();
        writer.put(ts + SHARD_TIME * 2).unwrap();

        // Exact keys.
        cursor.jump_to_key(&(ts + 5), Forward).unwrap();
        assert_eq!(cursor.get_key(), Some(ts + 5));
        cursor.jump_to_key(&(ts + SHARD_TIME * 2), Reverse).unwrap();
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));

        // Keys outside the stored range.
        cursor.jump_to_key(&(ts), Reverse).unwrap();
        assert_eq!(cursor.get_key(), Some(ts + 5));
        cursor.jump_to_key(&(ts + SHARD_TIME * 3), Forward).unwrap();
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));

        // A key between entries, approached from both directions.
        cursor.jump_to_key(&(ts + 5 * 100), Forward).unwrap();
        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));
        cursor.jump_to_key(&(ts + 5 * 100), Reverse).unwrap();
        assert_eq!(cursor.get_key(), Some(ts + 5 * 21));
    }
}