use std::cell::RefCell;
use std::fs::File;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::time::SystemTime;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use memmap2::Mmap;
use memmap2::MmapOptions;
use slog::warn;
use slog::Logger;
use crate::compression::Decompressor;
use crate::deserialize_frame;
use crate::get_index_files;
use crate::Crc32;
use crate::DataFrame;
use crate::Direction;
use crate::Format;
use crate::IndexEntry;
use crate::IndexEntryFlags;
use crate::SerializedFrame;
use crate::INDEX_ENTRY_SIZE;
use crate::SHARD_TIME;
/// A bidirectional cursor over a sequence of positions, some of which may
/// hold no retrievable item.
pub trait Cursor {
    /// Opaque position type that can be saved and later restored.
    type Offset;
    /// Type of the items produced by the cursor.
    type Item;

    /// Returns the current position of the cursor.
    fn get_offset(&self) -> Self::Offset;

    /// Moves the cursor to a previously obtained (or constructed) position.
    fn set_offset(&mut self, offset: Self::Offset);

    /// Returns the item at the current position, or `None` if there is none.
    fn get(&self) -> Option<Self::Item>;

    /// Moves one position in `direction`. Returns `Ok(false)` when the cursor
    /// could not move.
    fn advance(&mut self, direction: Direction) -> Result<bool>;

    /// Advances in `direction` until a position with an item is found and
    /// returns that item.
    ///
    /// If the cursor cannot advance any further before finding an item, the
    /// position held on entry is restored and `Ok(None)` is returned. An
    /// error from `advance` is propagated as-is, without restoring the
    /// position.
    fn next(&mut self, direction: Direction) -> Result<Option<Self::Item>> {
        let start = self.get_offset();
        loop {
            if !self.advance(direction)? {
                // Ran out of positions: roll back to where we started.
                self.set_offset(start);
                return Ok(None);
            }
            if let Some(found) = self.get() {
                return Ok(Some(found));
            }
        }
    }
}
/// A [`Cursor`] whose positions carry an ordered key, allowing lookups by
/// key in addition to plain stepping.
pub trait KeyedCursor<Key: std::cmp::Ord>: Cursor {
    /// Returns the key at the current position, or `None` if the position
    /// has no key.
    fn get_key(&self) -> Option<Key>;

    /// Optional hook to move the cursor close to `key` before the linear
    /// search in [`jump_to_key`](Self::jump_to_key). Does nothing by default.
    fn jump_near_key(&mut self, _key: &Key, _direction: Direction) {}

    /// Positions the cursor relative to `key` by scanning first against
    /// `direction`, then along it.
    ///
    /// In each pass the cursor keeps advancing while the current position
    /// has no key or its key compares to `key` as that pass's skip order
    /// (`Direction::get_skip_order`), stopping early when it cannot advance.
    ///
    /// Returns `Ok(true)` when the last key read exists and does not compare
    /// to `key` as the skip order of `direction`; `Ok(false)` otherwise.
    fn jump_to_key(&mut self, key: &Key, direction: Direction) -> Result<bool> {
        self.jump_near_key(key, direction);
        let mut current = self.get_key();
        let passes = [direction.flip(), direction];
        for pass_dir in passes.iter() {
            let skip = pass_dir.get_skip_order();
            loop {
                let keep_going = match current.as_ref() {
                    Some(k) => k.cmp(key) == skip,
                    None => true,
                };
                // Short-circuit: only try to advance while skipping.
                if !keep_going || !self.advance(*pass_dir)? {
                    break;
                }
                current = self.get_key();
            }
        }
        Ok(match current {
            Some(k) => k.cmp(key) != direction.get_skip_order(),
            None => false,
        })
    }

    /// Jumps to `key` and returns the item at the resulting position; if that
    /// position holds no item, falls back to the next item in
    /// `preferred_direction`. The outcome of the jump itself is ignored.
    fn get_near(
        &mut self,
        key: &Key,
        preferred_direction: Direction,
    ) -> Result<Option<Self::Item>> {
        self.jump_to_key(key, preferred_direction)?;
        if let Some(item) = self.get() {
            return Ok(Some(item));
        }
        self.next(preferred_direction)
    }

    /// Like [`get_near`](Self::get_near), but returns `Ok(None)` when
    /// [`jump_to_key`](Self::jump_to_key) reports `false`.
    fn get_next(&mut self, key: &Key, direction: Direction) -> Result<Option<Self::Item>> {
        if !self.jump_to_key(key, direction)? {
            return Ok(None);
        }
        match self.get() {
            Some(item) => Ok(Some(item)),
            None => self.next(direction),
        }
    }
}
/// Cursor over a sharded on-disk store made of `index_*` and `data_*` files
/// under a single directory. Implements [`Cursor`] and `KeyedCursor<u64>`,
/// the key being the timestamp of an index entry.
pub struct StoreCursor {
/// Logger used for warnings about missing, empty or corrupted files/entries.
logger: Logger,
/// Directory that contains the index and data files.
path: PathBuf,
/// Shard the cursor is currently positioned in, if any.
shard: Option<u64>,
/// Memory map of the current shard's index file, once loaded.
index_mmap: Option<Mmap>,
/// Memory map of the current shard's data file, once loaded.
data_mmap: Option<Mmap>,
/// Byte offset of the current entry within the index file; `None` when the
/// cursor is not positioned on an entry.
index_offset: Option<usize>,
/// Lazily created decompressor. Its dictionary key is the pair
/// `(shard, index offset of the dictionary frame)`. Kept in a `RefCell` so it
/// can be mutated from `&self` methods.
decompressor: RefCell<Option<Decompressor<(u64, usize)>>>,
}
/// Selects which of a shard's two files to open; determines the file name
/// prefix used by `StoreCursor::get_mmap`.
enum StoreFile {
/// The index file (`index_<shard>`).
Index,
/// The data file (`data_<shard>`).
Data,
}
impl StoreCursor {
pub fn new(logger: Logger, path: PathBuf) -> Self {
Self {
logger,
path,
shard: None,
index_mmap: None,
data_mmap: None,
index_offset: None,
decompressor: RefCell::new(None),
}
}
fn get_mmap(&self, file_type: StoreFile, shard: u64) -> Result<Option<Mmap>> {
let prefix = match file_type {
StoreFile::Index => "index",
StoreFile::Data => "data",
};
let path = self.path.join(format!("{}_{:011}", prefix, shard));
let file = match File::open(&path) {
Ok(f) => f,
Err(e) if e.kind() == ErrorKind::NotFound => {
warn!(
self.logger,
"Expected file does not exist: {}",
path.display()
);
return Ok(None);
}
Err(e) => {
return Err(e).context(format!("Failed while opening file: {}", path.display()));
}
};
let mut len = file
.metadata()
.with_context(|| format!("Failed to get metadata of file: {}", path.display()))?
.len() as usize;
if let StoreFile::Index = file_type {
len = len - len % INDEX_ENTRY_SIZE;
}
if len == 0 {
warn!(self.logger, "0 length file found: {}", path.display());
return Ok(None);
}
unsafe {
Some(
MmapOptions::new()
.len(len)
.map(&file)
.with_context(|| format!("Failed to mmap file {}", path.display())),
)
.transpose()
}
}
/// Loads (or reloads) the index and data mappings for `shard`.
///
/// Returns `Ok(false)` without touching the cursor when either file cannot
/// be mapped, or when `shard` is already the current shard and its index
/// has not grown. When switching to a different shard the index offset is
/// reset to `None`. Returns `Ok(true)` once the new mappings are installed.
fn update_shard(&mut self, shard: u64) -> Result<bool> {
    let fresh_index = match self.get_mmap(StoreFile::Index, shard)? {
        None => return Ok(false),
        Some(mmap) => mmap,
    };
    let fresh_data = match self.get_mmap(StoreFile::Data, shard)? {
        None => return Ok(false),
        Some(mmap) => mmap,
    };

    if self.shard != Some(shard) {
        self.shard = Some(shard);
        self.index_offset = None;
    } else {
        // Same shard: only worth swapping the mappings if new entries appeared.
        let mapped_len = match self.index_mmap.as_ref() {
            Some(current) => current.len(),
            None => 0,
        };
        if fresh_index.len() <= mapped_len {
            return Ok(false);
        }
    }

    self.index_mmap = Some(fresh_index);
    self.data_mmap = Some(fresh_data);
    Ok(true)
}
/// Refreshes the current shard or moves to the next loadable shard in
/// `direction`.
///
/// Index file names returned by `get_index_files` are visited in order for
/// `Forward` and in reverse for `Reverse`. Names that do not split into
/// exactly two `_`-separated parts, or whose second part is not a `u64`, are
/// skipped with a warning. Shards that compare to the current shard as
/// `direction.get_skip_order()` are skipped; the current shard itself is
/// therefore retried. Returns `Ok(true)` for the first shard that
/// `update_shard` accepts, `Ok(false)` if none does.
fn update_or_advance_shard(&mut self, direction: Direction) -> Result<bool> {
    let names = get_index_files(&self.path)?;
    let ordered: Vec<&String> = match direction {
        Direction::Forward => names.iter().collect(),
        Direction::Reverse => names.iter().rev().collect(),
    };

    for name in ordered {
        let parts: Vec<&str> = name.split('_').collect();
        let shard_text = match parts.as_slice() {
            [_, shard_text] => *shard_text,
            _ => {
                warn!(self.logger, "Invalid index file name: {}", name);
                continue;
            }
        };
        let candidate = match shard_text.parse::<u64>() {
            Ok(parsed) => parsed,
            Err(_) => {
                warn!(self.logger, "Cannot parse index shard: {}", name);
                continue;
            }
        };

        let already_passed = self
            .shard
            .map_or(false, |current| candidate.cmp(&current) == direction.get_skip_order());
        if already_passed {
            continue;
        }

        if self.update_shard(candidate)? {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Moves the index offset by one entry in `direction` within the currently
/// mapped index.
///
/// From an unset offset, `Forward` starts at the first entry and `Reverse`
/// at the last. Returns `false`, leaving the offset untouched, when no index
/// is mapped or the move would leave the mapped index (forward) or underflow
/// (reverse).
fn advance_index(&mut self, direction: Direction) -> bool {
    let index_len = match self.index_mmap.as_ref() {
        Some(mmap) => mmap.len(),
        None => return false,
    };
    debug_assert!(index_len > 0);

    let target = match (self.index_offset, direction) {
        (Some(current), Direction::Forward) => current
            .checked_add(INDEX_ENTRY_SIZE)
            .filter(|&candidate| candidate < index_len),
        (Some(current), Direction::Reverse) => current.checked_sub(INDEX_ENTRY_SIZE),
        (None, Direction::Forward) => Some(0),
        (None, Direction::Reverse) => index_len.checked_sub(INDEX_ENTRY_SIZE),
    };

    match target {
        Some(_) => {
            self.index_offset = target;
            true
        }
        None => false,
    }
}
/// Returns the index entry stored at byte offset `index_offset` of the
/// currently mapped index file.
///
/// Yields `None` when no index is mapped, when the byte range
/// `index_offset..index_offset + INDEX_ENTRY_SIZE` overflows or is out of
/// bounds, when the entry bytes are all zero, or when the entry's stored
/// `index_crc` differs from its computed CRC (only this last case logs a
/// warning).
///
/// # Panics
/// Panics if the entry bytes cannot be viewed as exactly one aligned
/// `IndexEntry`, or, on the corrupted-entry path, if `self.shard` is `None`.
fn get_index_entry_at(&self, index_offset: usize) -> Option<&IndexEntry> {
let index_mmap = self.index_mmap.as_ref()?;
let index_entry_slice =
index_mmap.get(index_offset..(index_offset.checked_add(INDEX_ENTRY_SIZE)?))?;
// Reinterpret the raw bytes as `IndexEntry` values. `align_to` places any
// bytes that do not fit an aligned `IndexEntry` into the prefix/suffix, which
// are discarded here; the assertion below checks exactly one entry remained.
// NOTE(review): soundness relies on `IndexEntry` being valid for arbitrary
// byte patterns — confirm against its definition.
let (_, body, _) = unsafe { index_entry_slice.align_to::<IndexEntry>() };
assert_eq!(
body.len(),
1,
"bug: Mis-aligned index entry found: shard={} offset={}",
self.shard.unwrap(),
index_offset,
);
// An all-zero entry is treated as absent, silently.
if index_entry_slice == [0; INDEX_ENTRY_SIZE] {
return None;
}
let index_entry = &body[0];
if index_entry.crc32() != index_entry.index_crc {
warn!(
self.logger,
"Corrupted index entry found: shard={} offset={:#x}",
self.shard.unwrap(),
index_offset,
);
None
} else {
Some(index_entry)
}
}
/// Returns the index entry at the cursor's current index offset, or `None`
/// when the offset is unset or no valid entry can be read there.
fn get_index_entry(&self) -> Option<&IndexEntry> {
    match self.index_offset {
        Some(offset) => self.get_index_entry_at(offset),
        None => None,
    }
}
/// Produces the serialized frame for an entry that is stored on its own
/// (not as part of a dictionary-compressed chunk).
///
/// Uncompressed data is borrowed as-is. Compressed data is decompressed via
/// `decompress_with_dict_reset`, creating the decompressor on first use, and
/// returned as an owned frame.
fn get_serialized_single_frame<'a>(
    data_slice: &'a [u8],
    compressed: bool,
    decompressor: &mut Option<Decompressor<(u64, usize)>>,
) -> Result<SerializedFrame<'a>> {
    if !compressed {
        return Ok(SerializedFrame::Borrowed(data_slice));
    }
    let inflated = decompressor
        .get_or_insert_with(Decompressor::new)
        .decompress_with_dict_reset(data_slice)
        .context("Failed to decompress data frame")?;
    Ok(SerializedFrame::Owned(inflated))
}
/// Produces the serialized frame for an entry that belongs to a
/// dictionary-compressed chunk.
///
/// The chunk's first entry (the "dict key frame") serves as the dictionary
/// for the other entries of the chunk. The decompressor is reused when it
/// already holds the dictionary for this `(shard, dict index offset)`;
/// otherwise the dictionary frame is read and loaded first.
///
/// # Errors
/// Fails when the dictionary frame cannot be read or loaded, or when
/// decompression fails.
///
/// # Panics
/// Panics if `self.shard` is `None`.
fn get_serialized_chunk_frame(
&self,
data_slice: &[u8],
index_offset: usize,
chunk_compress_size_po2: u32,
decompressor: &mut Option<Decompressor<(u64, usize)>>,
) -> Result<SerializedFrame> {
// A chunk spans `1 << chunk_compress_size_po2` index entries; clearing the
// low bits of the offset yields the offset of the chunk's first entry.
// NOTE(review): the mask arithmetic assumes INDEX_ENTRY_SIZE is a power of
// two — confirm.
let chunk_mask = (INDEX_ENTRY_SIZE << chunk_compress_size_po2) - 1;
let dict_index_offset = index_offset & !chunk_mask;
let shard = self.shard.expect("shard should be set");
let dict_key = (shard, dict_index_offset);
let decompressor = match decompressor {
// Dictionary for this chunk is already loaded: reuse it.
Some(d) if d.get_dict_key() == Some(&dict_key) => d,
_ => {
// Read the chunk's first entry as a stand-alone frame and load it as the
// dictionary, creating the decompressor if needed.
let (index_entry, data_slice) = self.get_index_and_data_at(dict_index_offset)?;
let dict_key_frame = Self::get_serialized_single_frame(
data_slice,
index_entry.flags.contains(IndexEntryFlags::COMPRESSED),
decompressor,
)
.context("Failed to get serialized dict key frame")?;
let d = decompressor.get_or_insert_with(Decompressor::new);
d.load_dict(dict_key_frame.into_owned(), dict_key)
.context("Failed to set decompressor dict")?;
d
}
};
// The requested entry is the dictionary frame itself: return a copy of the
// loaded dictionary instead of decompressing `data_slice`.
let bytes = if index_offset == dict_index_offset {
decompressor.get_dict().clone()
} else {
decompressor
.decompress_with_loaded_dict(data_slice)
.context("Failed to decompress data frame with dictionary")?
};
Ok(SerializedFrame::Owned(bytes))
}
/// Returns the index entry at `index_offset` together with the raw data
/// bytes it points to in the data mapping.
///
/// # Errors
/// Fails when the index entry cannot be read, when no data file is mapped,
/// when `offset + len` overflows or lies outside the data mapping, or when
/// the CRC of the data bytes does not match the entry's `data_crc`.
fn get_index_and_data_at(&self, index_offset: usize) -> Result<(&IndexEntry, &[u8])> {
    let entry = self
        .get_index_entry_at(index_offset)
        .ok_or_else(|| anyhow!("Failed to get index entry at offset {}", index_offset))?;
    let data = self
        .data_mmap
        .as_ref()
        .ok_or_else(|| anyhow!("Failed to get mmap"))?;

    let start = entry.offset as usize;
    let end = start
        .checked_add(entry.len as usize)
        .ok_or_else(|| anyhow!("overflow"))?;
    let payload = data
        .get(start..end)
        .ok_or_else(|| anyhow!("Failed to get data slice from mmap"))?;

    if payload.crc32() == entry.data_crc {
        Ok((entry, payload))
    } else {
        Err(anyhow!(
            "Corrupted data entry found: ts={} offset={:#x}",
            entry.timestamp,
            entry.offset,
        ))
    }
}
/// Returns the index entry at `index_offset` together with its serialized
/// (already decompressed) frame.
///
/// Entries whose flags report a non-zero chunk size are decoded through the
/// chunk/dictionary path; all others through the single-frame path, using
/// the `COMPRESSED` flag to decide whether to decompress.
fn get_index_and_serialized_frame_at(
    &self,
    index_offset: usize,
) -> Result<(&IndexEntry, SerializedFrame)> {
    let (index_entry, data_slice) = self.get_index_and_data_at(index_offset)?;
    let mut decompressor = self.decompressor.borrow_mut();
    let frame = match index_entry.flags.get_chunk_compress_size_po2() {
        0 => Self::get_serialized_single_frame(
            data_slice,
            index_entry.flags.contains(IndexEntryFlags::COMPRESSED),
            &mut decompressor,
        )
        .context("Failed to get serialized single frame")?,
        chunk_compress_size_po2 => self
            .get_serialized_chunk_frame(
                data_slice,
                index_offset,
                chunk_compress_size_po2,
                &mut decompressor,
            )
            .context("Failed to get serialized chunk frame")?,
    };
    Ok((index_entry, frame))
}
}
/// Position of a `StoreCursor`: a shard plus a byte offset into that shard's
/// index file. The default value has neither.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct StoreOffset {
    /// Shard, aligned down to a multiple of `SHARD_TIME`.
    shard: Option<u64>,
    /// Index offset, aligned down to a multiple of `INDEX_ENTRY_SIZE`.
    index_offset: Option<usize>,
}

impl StoreOffset {
    /// Builds a normalized offset.
    ///
    /// `shard` is rounded down to a multiple of `SHARD_TIME` and
    /// `index_offset` to a multiple of `INDEX_ENTRY_SIZE`. An index offset
    /// without a shard is dropped.
    pub fn new(shard: Option<u64>, index_offset: Option<usize>) -> Self {
        let aligned_shard = match shard {
            Some(s) => Some(s - s % SHARD_TIME),
            None => None,
        };
        let aligned_index_offset = match (shard, index_offset) {
            (Some(_), Some(o)) => Some(o - o % INDEX_ENTRY_SIZE),
            _ => None,
        };
        StoreOffset {
            shard: aligned_shard,
            index_offset: aligned_index_offset,
        }
    }

    /// Returns the (aligned) shard, if any.
    pub fn get_shard(&self) -> Option<u64> {
        self.shard
    }

    /// Returns the (aligned) index offset, if any.
    pub fn get_index_offset(&self) -> Option<usize> {
        self.index_offset
    }
}
impl Cursor for StoreCursor {
    type Offset = StoreOffset;
    type Item = (SystemTime, DataFrame);

    /// Returns the current shard and index offset as a normalized
    /// [`StoreOffset`].
    fn get_offset(&self) -> StoreOffset {
        StoreOffset::new(self.shard, self.index_offset)
    }

    /// Moves the cursor to `offset`.
    ///
    /// If the target shard is already current, or can be loaded, only the
    /// index offset is updated. Otherwise (no shard given, shard not
    /// loadable, or loading failed with an error, which is discarded) the
    /// shard and index offset are recorded as given and the index mapping is
    /// dropped.
    fn set_offset(&mut self, offset: StoreOffset) {
        let target_shard = offset.get_shard();
        let shard_ready = match target_shard {
            Some(shard) => {
                self.shard == Some(shard) || self.update_shard(shard).unwrap_or(false)
            }
            None => false,
        };
        if !shard_ready {
            self.shard = target_shard;
            self.index_mmap = None;
        }
        self.index_offset = offset.get_index_offset();
    }

    /// Moves one index entry in `direction`, refreshing or switching shards
    /// as needed. Returns `Ok(false)` when no further entry is available.
    fn advance(&mut self, direction: Direction) -> Result<bool> {
        loop {
            if self.advance_index(direction) {
                return Ok(true);
            }
            if !self.update_or_advance_shard(direction)? {
                return Ok(false);
            }
        }
    }

    /// Reads and deserializes the frame at the current position.
    ///
    /// Returns `None` when the offset is unset, or, after logging a warning,
    /// when the frame cannot be extracted or deserialized.
    ///
    /// # Panics
    /// Panics if the index entry does not carry the `CBOR` flag.
    fn get(&self) -> Option<(SystemTime, DataFrame)> {
        let offset = self.index_offset?;
        let (index_entry, serialized_data) =
            match self.get_index_and_serialized_frame_at(offset) {
                Ok(found) => found,
                Err(e) => {
                    warn!(
                        self.logger,
                        "Failed to extract serialized data frame: {}", e
                    );
                    return None;
                }
            };

        if !index_entry.flags.contains(IndexEntryFlags::CBOR) {
            panic!("Unexpected format");
        }
        let format = Format::Cbor;

        let ts = std::time::UNIX_EPOCH + std::time::Duration::from_secs(index_entry.timestamp);
        match deserialize_frame(serialized_data.as_ref(), format) {
            Ok(df) => Some((ts, df)),
            Err(e) => {
                warn!(self.logger, "Failed to deserialize data frame: {}", e);
                None
            }
        }
    }
}
impl KeyedCursor<u64> for StoreCursor {
    /// Returns the timestamp of the index entry at the current position.
    fn get_key(&self) -> Option<u64> {
        match self.get_index_entry() {
            Some(entry) => Some(entry.timestamp),
            None => None,
        }
    }

    /// Moves the cursor into the shard that would contain `key` and, when
    /// possible, to an estimated position inside it.
    ///
    /// The estimate scales the last entry's index offset by the ratio of the
    /// key's time offset within the shard to the last entry's time offset
    /// within the shard. If the shard has no readable last entry, or that
    /// entry sits exactly on the shard boundary, the cursor is left where the
    /// first steps put it.
    fn jump_near_key(&mut self, key: &u64, _direction: Direction) {
        let time_offset = key % SHARD_TIME;
        let shard = key - time_offset;
        self.set_offset(StoreOffset::new(Some(shard), None));

        // Step onto the last entry of the shard, if there is one.
        if !self.advance_index(Direction::Reverse) {
            return;
        }
        let last_timestamp = match self.get_index_entry() {
            Some(last_entry) => last_entry.timestamp,
            None => return,
        };
        let last_entry_index_offset = self
            .get_offset()
            .get_index_offset()
            .expect("get_index_offset should return Some if get_index_entry returns Some");

        let last_entry_time_offset = last_timestamp % SHARD_TIME;
        if last_entry_time_offset == 0 {
            return;
        }

        let bytes_per_second = last_entry_index_offset as f64 / last_entry_time_offset as f64;
        let index_offset_hint = (bytes_per_second * time_offset as f64) as usize;
        self.set_offset(StoreOffset::new(Some(shard), Some(index_offset_hint)));
    }
}
#[cfg(test)]
mod tests {
use std::fs::OpenOptions;
use std::io::Write;
use common::util::get_unix_timestamp;
use slog::Drain;
use tempfile::TempDir;
use Direction::Forward;
use Direction::Reverse;
use super::*;
use crate::serialize_frame;
use crate::ChunkSizePo2;
use crate::CompressionMode;
use crate::StoreWriter;
/// Minimal in-memory cursor used to exercise the default methods of `Cursor`
/// and `KeyedCursor`.
struct TestCursor<'a> {
/// Backing sequence; `None` slots are positions without an item.
data: &'a Vec<Option<i32>>,
/// Current index into `data`, or `None` when not positioned.
offset: Option<usize>,
}
impl Cursor for TestCursor<'_> {
    type Offset = Option<usize>;
    type Item = i32;

    fn get_offset(&self) -> Self::Offset {
        self.offset
    }

    fn set_offset(&mut self, offset: Self::Offset) {
        self.offset = offset;
    }

    /// Returns the value in the current slot, if the cursor is positioned on
    /// a slot inside `data` that holds a value.
    fn get(&self) -> Option<Self::Item> {
        let index = self.offset?;
        self.data.get(index).cloned().flatten()
    }

    /// Steps one slot in `direction`, staying within `data`. From an unset
    /// offset, forward starts at the first slot and reverse at the last.
    fn advance(&mut self, direction: Direction) -> Result<bool> {
        let len = self.data.len();
        let target = match (self.offset, direction) {
            (Some(current), Direction::Forward) => {
                current.checked_add(1).filter(|&next| next < len)
            }
            (Some(current), Direction::Reverse) => current.checked_sub(1),
            (None, Direction::Forward) => Some(0).filter(|&first| first < len),
            (None, Direction::Reverse) => len.checked_sub(1),
        };
        match target {
            Some(_) => {
                self.offset = target;
                Ok(true)
            }
            None => Ok(false),
        }
    }
}
impl KeyedCursor<i32> for TestCursor<'_> {
fn get_key(&self) -> Option<i32> {
self.get()
}
}
/// `Cursor::next` skips empty slots and keeps its position when it runs out.
#[test]
fn default_next() {
    let data = vec![None, Some(3), Some(5), None, None, Some(9)];
    let mut cursor = TestCursor {
        data: &data,
        offset: None,
    };
    let steps = [
        (Forward, Some(3)),
        (Forward, Some(5)),
        (Forward, Some(9)),
        (Forward, None),
        (Reverse, Some(5)),
        (Reverse, Some(3)),
        (Reverse, None),
    ];
    for (direction, expected) in steps.iter() {
        assert_eq!(cursor.next(*direction).unwrap(), *expected);
    }
    assert_eq!(cursor.get(), Some(3));
}
/// `KeyedCursor::jump_to_key` lands on the expected key and reports whether
/// a suitable key was found.
#[test]
fn default_jump_to_key() {
    let data = vec![None, Some(3), Some(5), None, None, Some(9)];
    let mut cursor = TestCursor {
        data: &data,
        offset: None,
    };
    // (key, direction, expected return value, expected key afterwards)
    let cases = [
        (3, Forward, true, Some(3)),
        (5, Reverse, true, Some(5)),
        (7, Forward, true, Some(9)),
        (4, Reverse, true, Some(3)),
        (10, Forward, false, Some(9)),
        (0, Reverse, false, None),
    ];
    for (key, direction, found, landed_on) in cases.iter() {
        assert_eq!(cursor.jump_to_key(key, *direction).unwrap(), *found);
        assert_eq!(cursor.get_key(), *landed_on);
    }
}
/// `KeyedCursor::get_near` returns the closest item even when nothing lies
/// in the preferred direction.
#[test]
fn default_get_near() {
    let data = vec![Some(3), Some(5), None, None, Some(9)];
    let mut cursor = TestCursor {
        data: &data,
        offset: None,
    };
    let cases = [
        (5, Forward, Some(5)),
        (4, Forward, Some(5)),
        (4, Reverse, Some(3)),
        (2, Reverse, Some(3)),
        (10, Forward, Some(9)),
    ];
    for (key, direction, expected) in cases.iter() {
        assert_eq!(cursor.get_near(key, *direction).unwrap(), *expected);
    }
}
/// `KeyedCursor::get_next` returns `None` when nothing lies at or beyond the
/// key in the requested direction.
#[test]
fn default_get_next() {
    let data = vec![Some(3), Some(5), None, None, Some(9)];
    let mut cursor = TestCursor {
        data: &data,
        offset: None,
    };
    let cases = [
        (5, Forward, Some(5)),
        (4, Forward, Some(5)),
        (4, Reverse, Some(3)),
        (2, Reverse, None),
        (10, Forward, None),
    ];
    for (key, direction, expected) in cases.iter() {
        assert_eq!(cursor.get_next(key, *direction).unwrap(), *expected);
    }
}
/// Builds a logger that writes plain, fully formatted records to stderr.
fn get_logger() -> Logger {
    let decorator = slog_term::PlainSyncDecorator::new(std::io::stderr());
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    Logger::root(drain, slog::o!())
}
/// Writes one frame with `StoreWriter` using the given compression mode and
/// format, then checks that a fresh `StoreCursor` reads back the same
/// timestamp and frame.
fn simple_put_read(compression_mode: CompressionMode, format: Format) {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");

    // Round-trip through whole seconds so the read-back timestamp compares equal.
    let ts = get_unix_timestamp(SystemTime::now());
    let now = std::time::UNIX_EPOCH + std::time::Duration::from_secs(ts);

    let mut frame = DataFrame::default();
    frame.sample.cgroup.memory_current = Some(42);

    let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
        .expect("Failed to create store");
    writer.put(now, &frame).expect("Failed to store data");

    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
    let read_back = cursor
        .next(Forward)
        .expect("Failed to read sample")
        .expect("Did not find stored sample");
    assert_eq!(read_back, (now, frame));
}
/// Round-trips an uncompressed CBOR frame.
#[test]
fn read_cbor() {
    let mode = CompressionMode::None;
    simple_put_read(mode, Format::Cbor);
}
/// Round-trips a zstd-compressed CBOR frame.
#[test]
fn read_compressed_cbor() {
    let mode = CompressionMode::Zstd;
    simple_put_read(mode, Format::Cbor);
}
/// Round-trips a CBOR frame written with dictionary-based zstd compression.
#[test]
fn read_dict_compressed_cbor() {
    let mode = CompressionMode::ZstdDictionary(ChunkSizePo2(2));
    simple_put_read(mode, Format::Cbor);
}
/// Test-only writer that appends raw index/data entries directly to the
/// store files, optionally with deliberately wrong CRCs.
struct TestWriter {
/// Directory in which the `index_*` / `data_*` files are created.
path: PathBuf,
}
impl TestWriter {
    /// Creates a writer that stores its files under `path`.
    pub fn new<P: AsRef<std::path::Path>>(path: P) -> Self {
        let path = path.as_ref().to_path_buf();
        Self { path }
    }

    /// Appends a valid entry with the given timestamp.
    pub fn put(&self, timestamp: u64) -> Result<()> {
        self.put_helper(timestamp, false, false)
    }

    /// Appends an entry whose index CRC is left at zero.
    pub fn put_corrupt_index(&self, timestamp: u64) -> Result<()> {
        self.put_helper(timestamp, true, false)
    }

    /// Appends an entry whose recorded data CRC is zero.
    pub fn put_corrupt_data(&self, timestamp: u64) -> Result<()> {
        self.put_helper(timestamp, false, true)
    }

    /// Appends a default `DataFrame` (CBOR) to the shard's data file and a
    /// matching index entry to its index file, creating either file if
    /// needed. `corrupt_index` / `corrupt_data` replace the respective CRC
    /// with zero instead of the computed value.
    fn put_helper(
        &self,
        timestamp: u64,
        corrupt_index: bool,
        corrupt_data: bool,
    ) -> Result<()> {
        let shard = timestamp - timestamp % SHARD_TIME;
        let mut append_opts = OpenOptions::new();
        append_opts.create(true).append(true);

        let data_bytes = serialize_frame(&DataFrame::default(), Format::Cbor)
            .context("Failed to serialize data frame")?;
        let data_crc = if corrupt_data { 0 } else { data_bytes.crc32() };

        let data_path = self.path.join(format!("data_{:011}", shard));
        let mut data_file = append_opts
            .open(data_path)
            .context("Failed to open data file")?;
        // The new frame starts at the current end of the data file.
        let offset = data_file
            .metadata()
            .context("Failed to get metadata of data file")?
            .len();
        data_file
            .write_all(&data_bytes)
            .context("Failed to write to data file")?;

        let mut index_entry = IndexEntry {
            timestamp,
            offset,
            len: data_bytes.len() as u32,
            flags: IndexEntryFlags::CBOR,
            data_crc,
            index_crc: 0,
        };
        if !corrupt_index {
            index_entry.index_crc = index_entry.crc32();
        }

        // View the entry as raw bytes for writing.
        // NOTE(review): assumes `INDEX_ENTRY_SIZE` equals the size of
        // `IndexEntry` and that the struct has no padding — confirm.
        let entry_slice = unsafe {
            std::slice::from_raw_parts(
                &index_entry as *const IndexEntry as *const u8,
                INDEX_ENTRY_SIZE,
            )
        };

        let index_path = self.path.join(format!("index_{:011}", shard));
        let mut index_file = append_opts
            .open(index_path)
            .context("Failed to open index file")?;
        index_file
            .write_all(entry_slice)
            .context("Failed to write entry to index file")?;
        Ok(())
    }
}
/// Advancing over an empty store fails in both directions and leaves the
/// cursor without a key.
#[test]
fn advance_when_empty() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
    for direction in [Forward, Reverse].iter() {
        assert!(!cursor.advance(*direction).unwrap());
        assert!(cursor.get_key().is_none());
    }
}
/// With a single entry, advancing past either end fails and the cursor
/// stays on that entry.
#[test]
fn advance_at_boundries() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    writer.put(ts).unwrap();

    assert!(cursor.advance(Forward).unwrap());
    assert_eq!(cursor.get_key(), Some(ts));
    for direction in [Forward, Reverse].iter() {
        assert!(!cursor.advance(*direction).unwrap());
        assert_eq!(cursor.get_key(), Some(ts));
    }
}
/// Advancing walks entries in order, crossing shard boundaries in both
/// directions.
#[test]
fn advance_simple() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    for timestamp in [ts, ts + 5, ts + SHARD_TIME].iter() {
        writer.put(*timestamp).unwrap();
    }

    let steps = [
        (Forward, ts),
        (Forward, ts + 5),
        (Forward, ts + SHARD_TIME),
        (Reverse, ts + 5),
        (Reverse, ts),
    ];
    for (direction, expected_key) in steps.iter() {
        assert!(cursor.advance(*direction).unwrap());
        assert_eq!(cursor.get_key(), Some(*expected_key));
    }
}
/// A cursor that hit the end picks up entries written afterwards, both in
/// the same shard and in a new one.
#[test]
fn advance_retry() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    for timestamp in [ts, ts + 5, ts + SHARD_TIME].iter() {
        writer.put(*timestamp).unwrap();
        // The freshly written entry is reachable...
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_key(), Some(*timestamp));
        // ...and nothing lies beyond it yet.
        assert!(!cursor.advance(Forward).unwrap());
    }
}
/// A corrupt index entry yields neither key nor item; a corrupt data entry
/// yields a key but no item.
#[test]
fn get_corrupt() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    // Bad index CRC: the position exists but nothing can be read from it.
    writer.put_corrupt_index(ts).unwrap();
    assert!(cursor.advance(Forward).unwrap());
    assert!(cursor.get_key().is_none());
    assert!(cursor.get().is_none());

    // Bad data CRC: the index entry is readable, the frame is not.
    let second_ts = ts + 5;
    writer.put_corrupt_data(second_ts).unwrap();
    assert!(cursor.advance(Forward).unwrap());
    assert_eq!(cursor.get_key(), Some(second_ts));
    assert!(cursor.get().is_none());
}
/// `Cursor::next` skips corrupt entries, including whole shards that
/// contain nothing readable.
#[test]
fn skip_corrupt() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    // First shard: only the second entry is readable.
    writer.put_corrupt_data(ts).unwrap();
    writer.put(ts + 5).unwrap();
    writer.put_corrupt_data(ts + 5 * 2).unwrap();
    writer.put_corrupt_index(ts + 5 * 3).unwrap();
    // Second shard: nothing readable.
    writer.put_corrupt_data(ts + SHARD_TIME).unwrap();
    // Third shard: only the second entry is readable.
    writer.put_corrupt_index(ts + SHARD_TIME * 2).unwrap();
    writer.put(ts + SHARD_TIME * 2 + 5).unwrap();
    // Fourth shard: nothing readable.
    writer.put_corrupt_data(ts + SHARD_TIME * 3).unwrap();

    let first = cursor.next(Forward).unwrap().unwrap();
    assert_eq!(get_unix_timestamp(first.0), ts + 5);

    let second = cursor.next(Forward).unwrap().unwrap();
    assert_eq!(get_unix_timestamp(second.0), ts + SHARD_TIME * 2 + 5);

    assert!(cursor.next(Forward).unwrap().is_none());

    let back = cursor.next(Reverse).unwrap().unwrap();
    assert_eq!(get_unix_timestamp(back.0), ts + 5);
}
/// Offsets reported while advancing can be fed back through `set_offset`,
/// and offsets that point at nothing leave the cursor without a key.
#[test]
fn manipulate_offset() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    writer.put(ts).unwrap();
    writer.put(ts + 5).unwrap();
    writer.put(ts + SHARD_TIME * 2 + 5).unwrap();

    let expected_offsets = [
        StoreOffset::new(Some(ts), Some(0)),
        StoreOffset::new(Some(ts), Some(INDEX_ENTRY_SIZE)),
        StoreOffset::new(Some(ts + SHARD_TIME * 2), Some(0)),
    ];

    // Walking forward reports the expected offsets in order.
    assert_eq!(cursor.get_offset(), StoreOffset::default());
    for expected in expected_offsets.iter() {
        assert!(cursor.advance(Forward).unwrap());
        assert_eq!(cursor.get_offset(), *expected);
    }

    // Offsets that do not designate an entry give no key.
    let keyless_offsets = [
        StoreOffset::default(),
        StoreOffset::new(Some(ts + SHARD_TIME), Some(INDEX_ENTRY_SIZE)),
        StoreOffset::new(Some(ts + SHARD_TIME * 2), None),
    ];
    for offset in keyless_offsets.iter() {
        cursor.set_offset(offset.clone());
        assert!(cursor.get_key().is_none());
    }

    // Valid offsets can be revisited in any order.
    let revisits = [
        (1, ts + 5),
        (0, ts),
        (2, ts + SHARD_TIME * 2 + 5),
    ];
    for (which, expected_key) in revisits.iter() {
        cursor.set_offset(expected_offsets[*which].clone());
        assert_eq!(cursor.get_key(), Some(*expected_key));
    }
}
/// Advancing from offsets that do not designate an entry still reaches the
/// nearest entry, or fails and leaves the offset unchanged.
#[test]
fn advance_from_invalid_offset() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    writer.put(ts).unwrap();
    writer.put(ts + SHARD_TIME * 2).unwrap();

    // One entry past the end of the first shard: stepping back finds its entry.
    cursor.set_offset(StoreOffset::new(Some(ts), Some(INDEX_ENTRY_SIZE)));
    assert!(cursor.advance(Reverse).unwrap());
    assert_eq!(cursor.get_key(), Some(ts));

    // A shard with no files: stepping forward reaches the next existing shard.
    cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME), Some(0)));
    assert!(cursor.advance(Forward).unwrap());
    assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));

    // Beyond the last shard: nothing to advance to, offset is kept.
    let beyond_end = StoreOffset::new(Some(ts + SHARD_TIME * 4), Some(0));
    cursor.set_offset(beyond_end.clone());
    assert!(!cursor.advance(Forward).unwrap());
    assert_eq!(cursor.get_offset(), beyond_end);
}
/// `jump_to_key` on a real store lands on the nearest entry in the
/// requested direction, or on the closest one available when none lies
/// that way.
#[test]
fn jump_to_key() {
    let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
    let ts = get_unix_timestamp(SystemTime::now());
    let writer = TestWriter::new(&dir);
    let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());

    writer.put(ts + 5).unwrap();
    writer.put(ts + 5 * 20).unwrap();
    writer.put(ts + 5 * 21).unwrap();
    writer.put(ts + SHARD_TIME * 2).unwrap();

    // (key to jump to, direction, key expected afterwards)
    let cases = [
        (ts + 5, Forward, ts + 5),
        (ts + SHARD_TIME * 2, Reverse, ts + SHARD_TIME * 2),
        (ts, Reverse, ts + 5),
        (ts + SHARD_TIME * 3, Forward, ts + SHARD_TIME * 2),
        (ts + 5 * 100, Forward, ts + SHARD_TIME * 2),
        (ts + 5 * 100, Reverse, ts + 5 * 21),
    ];
    for (key, direction, expected_key) in cases.iter() {
        cursor.jump_to_key(key, *direction).unwrap();
        assert_eq!(cursor.get_key(), Some(*expected_key));
    }
}
}