below_store/
cursor.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::fs::File;
17use std::io::ErrorKind;
18use std::path::PathBuf;
19use std::time::SystemTime;
20
21use anyhow::anyhow;
22use anyhow::bail;
23use anyhow::Context;
24use anyhow::Result;
25use memmap2::Mmap;
26use memmap2::MmapOptions;
27use slog::warn;
28use slog::Logger;
29
30use crate::compression::Decompressor;
31use crate::deserialize_frame;
32use crate::get_index_files;
33use crate::Crc32;
34use crate::DataFrame;
35use crate::Direction;
36use crate::Format;
37use crate::IndexEntry;
38use crate::IndexEntryFlags;
39use crate::SerializedFrame;
40use crate::INDEX_ENTRY_SIZE;
41use crate::SHARD_TIME;
42
43/// A read-only Iterator that can move back and forth.
44pub trait Cursor {
45    type Offset;
46    type Item;
47
48    /// Get offset of the cursor.
49    fn get_offset(&self) -> Self::Offset;
50    /// Set offset of the cursor.
51    fn set_offset(&mut self, offset: Self::Offset);
52    /// Get the item pointed to by the cursor. Could be None if item is invalid.
53    fn get(&self) -> Option<Self::Item>;
54    /// Move the cursor one step in the given direction if it's within range.
55    /// Return if the cursor is moved.
56    fn advance(&mut self, direction: Direction) -> Result<bool>;
57    /// Move the cursor in the given direction until a valid item is obtained.
58    /// If no more valid item available, return None and offset unchanged.
59    fn next(&mut self, direction: Direction) -> Result<Option<Self::Item>> {
60        let offset = self.get_offset();
61        while self.advance(direction)? {
62            if let Some(item) = self.get() {
63                return Ok(Some(item));
64            }
65        }
66        self.set_offset(offset);
67        Ok(None)
68    }
69}
70
71/// A cursor that has keys bound to each position in ascending order.
72/// Making this generic so that each Cursor may define multiple Keys.
73pub trait KeyedCursor<Key: std::cmp::Ord>: Cursor {
74    /// Get the key of the current position.
75    fn get_key(&self) -> Option<Key>;
76
77    /// Used by jump_to_keys with same parameters. Should be overridden to make
78    /// jump_to_key more efficient by moving cursor near the key. Notice that
79    /// jump_to_key should work as long as keys are monotonic in same order as
80    /// offsets, even if this function does nothing.
81    fn jump_near_key(&mut self, _key: &Key, _direction: Direction) {}
82
83    /// Move the cursor to the closest valid pos to the given key in the given
84    /// direction. Return if the key at the final pos satisfies the ordering
85    /// requirement. For example, for a list of int keys: [3, 7, 8, 12, 19],
86    /// jumping to key 9 forward moves cursor to index 3 because 12 is the
87    /// smallest int greater than 9 in the list. Jumping to key 0 reverse moves
88    /// cursor to index 0 and returns false because no key is smaller than 3.
89    fn jump_to_key(&mut self, key: &Key, direction: Direction) -> Result<bool> {
90        self.jump_near_key(key, direction);
91        // Move cursor backward to get a position with lower key order, and then
92        // move forward to get the first position with higher key order.
93        let mut curr_key = self.get_key();
94        for curr_dir in &[direction.flip(), direction] {
95            let skip_order = curr_dir.get_skip_order();
96            while curr_key.as_ref().map_or(true, |k| k.cmp(key) == skip_order) {
97                if !self.advance(*curr_dir)? {
98                    break;
99                }
100                curr_key = self.get_key();
101            }
102        }
103        // Check if the last key satisfies the direction order
104        Ok(curr_key.map_or(false, |k| k.cmp(key) != direction.get_skip_order()))
105    }
106
107    /// Convenient function to jump to a key and get the closest valid item to
108    /// the key. Preference is given to the specified direction. Returns None
109    /// only if there are no keys at all.
110    ///
111    /// For example, for a list of int keys: [3, 7, 8, 12, 19],
112    /// `cursor.get_near(0, Direction::Reverse)` will jump
113    /// cursor to 3 and return 3.
114    fn get_near(
115        &mut self,
116        key: &Key,
117        preferred_direction: Direction,
118    ) -> Result<Option<Self::Item>> {
119        self.jump_to_key(key, preferred_direction)?;
120        match self.get() {
121            Some(item) => Ok(Some(item)),
122            None => self.next(preferred_direction),
123        }
124    }
125
126    /// Convenient function to jump to a key and get the closest valid item
127    /// that is at key or in the given direction of key. Returns None if
128    /// no such key exists.
129    ///
130    /// For example, for a list of int keys: [3, 7, 8, 12, 19],
131    /// `cursor.get_next(0, Direction::Reverse)` will return
132    /// `Ok(None)`.
133    fn get_next(&mut self, key: &Key, direction: Direction) -> Result<Option<Self::Item>> {
134        if self.jump_to_key(key, direction)? {
135            match self.get() {
136                Some(item) => Ok(Some(item)),
137                None => self.next(direction),
138            }
139        } else {
140            Ok(None)
141        }
142    }
143}
144
145/// For read-only access to a store. Similar to an iterator, but support moving
146/// back and forth.
147pub struct StoreCursor {
148    logger: Logger,
149    // Path to the store directory that contains index and data files.
150    path: PathBuf,
151    // Current shard this cursor points to.
152    shard: Option<u64>,
153    // Mmap of the index and data files of the current shard. Could be None if
154    // the current shard does not exist.
155    index_mmap: Option<Mmap>,
156    data_mmap: Option<Mmap>,
157    // Current offset into the index mmap. The combination of shard and offset
158    // locates the exact sample of this store. Offset could be None if shard
159    // does not exist or just moved to a newly initialized shard.
160    index_offset: Option<usize>,
161    // Used for extracting compressed frames. If dictionary is used, it's also
162    // cached, along with the shard and dict_index_offset that identify it.
163    decompressor: RefCell<Option<Decompressor<(u64, usize)>>>,
164}
165
/// Selects which of a shard's two on-disk files to open.
enum StoreFile {
    /// The shard's index file (`index_<shard>`).
    Index,
    /// The shard's data file (`data_<shard>`).
    Data,
}
170
171impl StoreCursor {
172    /// Create a new cursor with uninitialized shard.
173    pub fn new(logger: Logger, path: PathBuf) -> Self {
174        Self {
175            logger,
176            path,
177            shard: None,
178            index_mmap: None,
179            data_mmap: None,
180            index_offset: None,
181            decompressor: RefCell::new(None),
182        }
183    }
184
185    /// Get the mmap of a related store file based on the given shard. If the
186    /// file is not found or empty, None will be returned.
187    fn get_mmap(&self, file_type: StoreFile, shard: u64) -> Result<Option<Mmap>> {
188        let prefix = match file_type {
189            StoreFile::Index => "index",
190            StoreFile::Data => "data",
191        };
192        let path = self.path.join(format!("{}_{:011}", prefix, shard));
193        let file = match File::open(&path) {
194            Ok(f) => f,
195            Err(e) if e.kind() == ErrorKind::NotFound => {
196                warn!(
197                    self.logger,
198                    "Expected file does not exist: {}",
199                    path.display()
200                );
201                return Ok(None);
202            }
203            Err(e) => {
204                return Err(e).context(format!("Failed while opening file: {}", path.display()));
205            }
206        };
207
208        let mut len = file
209            .metadata()
210            .with_context(|| format!("Failed to get metadata of file: {}", path.display()))?
211            .len() as usize;
212        if let StoreFile::Index = file_type {
213            len = len - len % INDEX_ENTRY_SIZE;
214        }
215        if len == 0 {
216            warn!(self.logger, "0 length file found: {}", path.display());
217            return Ok(None);
218        }
219
220        // Mmap is unsafe because it allows unrestricted concurrent access. In
221        // our case, we only have one background process (below record) doing
222        // append-only writes to both index and data files. We also use CRC to
223        // verify file content. As long as we do read-only operations here, this
224        // should be Ok.
225        unsafe {
226            Some(
227                MmapOptions::new()
228                    .len(len)
229                    .map(&file)
230                    .with_context(|| format!("Failed to mmap file {}", path.display())),
231            )
232            .transpose()
233        }
234    }
235
236    /// Update the cursor to use the given shard and initialize the mmaps. If
237    /// the current shard's index mmap has grown in length, update to the new
238    /// index mmap so data appended since last update will show up.
239    /// Return if the cursor updated. Could return false if either index or data
240    /// file is empty or does not exist, or if the given shard is the same as
241    /// the current shard and the index file does not grow. Index offset is also
242    /// reset to None if moved to a different shard.
243    fn update_shard(&mut self, shard: u64) -> Result<bool> {
244        // This mmap is always aligned to INDEX_ENTRY_SIZE because
245        // it is page aligned.
246        let new_index_mmap = match self.get_mmap(StoreFile::Index, shard)? {
247            Some(index_mmap) => index_mmap,
248            None => return Ok(false),
249        };
250        let new_data_mmap = match self.get_mmap(StoreFile::Data, shard)? {
251            Some(data_mmap) => data_mmap,
252            None => return Ok(false),
253        };
254        if self.shard == Some(shard) {
255            let index_mmap_len = self.index_mmap.as_ref().map_or(0, |m| m.len());
256            if new_index_mmap.len() <= index_mmap_len {
257                // Nothing is updated if index file does not change.
258                return Ok(false);
259            }
260        } else {
261            self.shard = Some(shard);
262            self.index_offset = None;
263        }
264        self.index_mmap = Some(new_index_mmap);
265        self.data_mmap = Some(new_data_mmap);
266        Ok(true)
267    }
268
269    /// Update current shard or move the cursor to a neighbor valid shard.
270    /// Return if the cursor is updated (current shard is updated with new mmap
271    /// or cursor has moved to next shard). Returning false means there is no
272    /// more shard in the given direction. Retrying may succeed as the store
273    /// directory is scanned on every call.
274    fn update_or_advance_shard(&mut self, direction: Direction) -> Result<bool> {
275        let entries = get_index_files(&self.path)?;
276
277        let entries_iter: Box<dyn Iterator<Item = &String>> = match direction {
278            Direction::Forward => Box::new(entries.iter()),
279            Direction::Reverse => Box::new(entries.iter().rev()),
280        };
281        for entry in entries_iter {
282            let v: Vec<&str> = entry.split('_').collect();
283            if v.len() != 2 {
284                warn!(self.logger, "Invalid index file name: {}", entry);
285                continue;
286            }
287
288            let entry_shard = match v[1].parse::<u64>() {
289                Ok(val) => val,
290                _ => {
291                    warn!(self.logger, "Cannot parse index shard: {}", entry);
292                    continue;
293                }
294            };
295
296            if let Some(shard) = self.shard.as_ref() {
297                if entry_shard.cmp(shard) == direction.get_skip_order() {
298                    continue;
299                }
300            }
301
302            // Try to refresh the current shard (any new entries appended?) or
303            // move to a different shard.
304            if self.update_shard(entry_shard)? {
305                return Ok(true);
306            }
307        }
308        Ok(false)
309    }
310
311    /// Move index offset to next position in given direction if it's valid.
312    /// Return if index offset is updated. False means no more valid position.
313    fn advance_index(&mut self, direction: Direction) -> bool {
314        if let Some(index_mmap) = self.index_mmap.as_ref() {
315            // get_mmap ensures that index_mmap.len() >= INDEX_ENTRY_SIZE, and
316            // thus 0 is always a valid index.
317            debug_assert!(index_mmap.len() > 0);
318            // index offset may be None if overflows
319            let offset = match self.index_offset {
320                Some(offset) => match direction {
321                    Direction::Forward => offset
322                        .checked_add(INDEX_ENTRY_SIZE)
323                        .filter(|o| o < &index_mmap.len()),
324                    Direction::Reverse => offset.checked_sub(INDEX_ENTRY_SIZE),
325                },
326                // Default offsets
327                None => match direction {
328                    Direction::Forward => Some(0),
329                    Direction::Reverse => index_mmap.len().checked_sub(INDEX_ENTRY_SIZE),
330                },
331            };
332            if offset.is_some() {
333                self.index_offset = offset;
334                return true;
335            }
336        }
337        false
338    }
339
340    /// Get index entry at offset. Return None if offset points to
341    /// zero padding, or if the index entry is corrupt.
342    fn get_index_entry_at(&self, index_offset: usize) -> Option<&IndexEntry> {
343        let index_mmap = self.index_mmap.as_ref()?;
344        let index_entry_slice =
345            index_mmap.get(index_offset..(index_offset.checked_add(INDEX_ENTRY_SIZE)?))?;
346        // index_entry_slice is guaranteed to be INDEX_ENTRY_SIZE
347        // bytes. The mmap should also be page aligned, and
348        // index_offset is a multiple of INDEX_ENTRY_SIZE. Thus the
349        // following should always result in empty prefix/suffix
350        // unless there is a bug.
351        //
352        // Treating the slice as an IndexEntry is safe as it's always
353        // validated with crc.
354        let (_, body, _) = unsafe { index_entry_slice.align_to::<IndexEntry>() };
355        assert_eq!(
356            body.len(),
357            1,
358            "bug: Mis-aligned index entry found: shard={} offset={}",
359            self.shard.unwrap(),
360            index_offset,
361        );
362        // Ignore zero padding which can happen in dictionary chunk
363        // compression mode, since chunks need to be aligned.
364        if index_entry_slice == [0; INDEX_ENTRY_SIZE] {
365            return None;
366        }
367        let index_entry = &body[0];
368        if index_entry.crc32() != index_entry.index_crc {
369            warn!(
370                self.logger,
371                "Corrupted index entry found: shard={} offset={:#x}",
372                self.shard.unwrap(),
373                index_offset,
374            );
375            None
376        } else {
377            Some(index_entry)
378        }
379    }
380
381    /// Get the index entry the cursor currently pointing at.
382    fn get_index_entry(&self) -> Option<&IndexEntry> {
383        self.get_index_entry_at(self.index_offset?)
384    }
385
386    /// Get serialized frame from data_slice, decompressing it as
387    /// necessary.
388    fn get_serialized_single_frame<'a>(
389        data_slice: &'a [u8],
390        compressed: bool,
391        decompressor: &mut Option<Decompressor<(u64, usize)>>,
392    ) -> Result<SerializedFrame<'a>> {
393        let serialized_frame = if compressed {
394            SerializedFrame::Owned(
395                decompressor
396                    .get_or_insert_with(Decompressor::new)
397                    .decompress_with_dict_reset(data_slice)
398                    .context("Failed to decompress data frame")?,
399            )
400        } else {
401            SerializedFrame::Borrowed(data_slice)
402        };
403        Ok(serialized_frame)
404    }
405
406    /// Get the serialized, uncompressed frame that is part of a
407    /// chunk. That is, it is either the first frame of the chunk (a
408    /// dictionary key frame) or some other frame in the chunk
409    /// (compressed by dictionary key frame).
410    ///
411    /// Because chunks are aligned, whether the frame is the first
412    /// in a chunk can be determined by `index_offset` and
413    /// `chunk_compress_size`.
414    fn get_serialized_chunk_frame(
415        &self,
416        data_slice: &[u8],
417        index_offset: usize,
418        chunk_compress_size_po2: u32,
419        decompressor: &mut Option<Decompressor<(u64, usize)>>,
420    ) -> Result<SerializedFrame> {
421        // Calculate offset into the chunk. If this is 0, then this
422        // is the first frame and hence key frame of the chunk.
423        let chunk_mask = (INDEX_ENTRY_SIZE << chunk_compress_size_po2) - 1;
424        let dict_index_offset = index_offset & !chunk_mask;
425
426        let shard = self.shard.expect("shard should be set");
427        let dict_key = (shard, dict_index_offset);
428
429        let decompressor = match decompressor {
430            Some(d) if d.get_dict_key() == Some(&dict_key) => d,
431            _ => {
432                let (index_entry, data_slice) = self.get_index_and_data_at(dict_index_offset)?;
433                let dict_key_frame = Self::get_serialized_single_frame(
434                    data_slice,
435                    index_entry.flags.contains(IndexEntryFlags::COMPRESSED),
436                    decompressor,
437                )
438                .context("Failed to get serialized dict key frame")?;
439                let d = decompressor.get_or_insert_with(Decompressor::new);
440                d.load_dict(dict_key_frame.into_owned(), dict_key)
441                    .context("Failed to set decompressor dict")?;
442                d
443            }
444        };
445
446        // First frame in chunk is the dict key frame. Other frames
447        // in the chunk are decompressed using the dict key frame.
448        let bytes = if index_offset == dict_index_offset {
449            decompressor.get_dict().clone()
450        } else {
451            decompressor
452                .decompress_with_loaded_dict(data_slice)
453                .context("Failed to decompress data frame with dictionary")?
454        };
455        Ok(SerializedFrame::Owned(bytes))
456    }
457
458    /// Get index entry at offset and it's corresponding data slice.
459    fn get_index_and_data_at(&self, index_offset: usize) -> Result<(&IndexEntry, &[u8])> {
460        let index_entry = self
461            .get_index_entry_at(index_offset)
462            .ok_or_else(|| anyhow!("Failed to get index entry at offset {}", index_offset))?;
463        let data_mmap = self
464            .data_mmap
465            .as_ref()
466            .ok_or_else(|| anyhow!("Failed to get mmap"))?;
467        let data_offset = index_entry.offset as usize;
468        let data_len = index_entry.len as usize;
469        let data_slice = data_mmap
470            .get(
471                data_offset
472                    ..(data_offset
473                        .checked_add(data_len)
474                        .ok_or_else(|| anyhow!("overflow"))?),
475            )
476            .ok_or_else(|| anyhow!("Failed to get data slice from mmap"))?;
477
478        if data_slice.crc32() != index_entry.data_crc {
479            bail!(
480                "Corrupted data entry found: ts={} offset={:#x}",
481                index_entry.timestamp,
482                index_entry.offset,
483            );
484        };
485        Ok((index_entry, data_slice))
486    }
487
488    /// Get the index entry and uncompressed serialized data at an
489    /// index offset in the current shard.
490    fn get_index_and_serialized_frame_at(
491        &self,
492        index_offset: usize,
493    ) -> Result<(&IndexEntry, SerializedFrame)> {
494        let (index_entry, data_slice) = self.get_index_and_data_at(index_offset)?;
495        let chunk_compress_size_po2 = index_entry.flags.get_chunk_compress_size_po2();
496        let uncompressed_frame = if chunk_compress_size_po2 > 0 {
497            // This frame is dictionary compressed, or it is the
498            // first frame of a chunk which should be stored as
499            // dictionary.
500            self.get_serialized_chunk_frame(
501                data_slice,
502                index_offset,
503                chunk_compress_size_po2,
504                &mut self.decompressor.borrow_mut(),
505            )
506            .context("Failed to get serialized chunk frame")?
507        } else {
508            Self::get_serialized_single_frame(
509                data_slice,
510                index_entry.flags.contains(IndexEntryFlags::COMPRESSED),
511                &mut self.decompressor.borrow_mut(),
512            )
513            .context("Failed to get serialized single frame")?
514        };
515        Ok((index_entry, uncompressed_frame))
516    }
517}
518
519/// Offset of a StoreCursor.
520#[derive(Clone, Debug, Default, PartialEq)]
521pub struct StoreOffset {
522    shard: Option<u64>,
523    index_offset: Option<usize>,
524}
525
526impl StoreOffset {
527    /// Shard and index offset are trimmed to multiples of SHARD_TIME and
528    /// INDEX_ENTRY_SIZE respectively. Index offset is ignored if shard is None.
529    pub fn new(shard: Option<u64>, index_offset: Option<usize>) -> Self {
530        StoreOffset {
531            shard: shard.as_ref().map(|s| s - s % SHARD_TIME),
532            index_offset: shard.and(index_offset.map(|o| o - o % INDEX_ENTRY_SIZE)),
533        }
534    }
535
536    pub fn get_shard(&self) -> Option<u64> {
537        self.shard
538    }
539
540    pub fn get_index_offset(&self) -> Option<usize> {
541        self.index_offset
542    }
543}
544
545impl Cursor for StoreCursor {
546    type Offset = StoreOffset;
547    type Item = (SystemTime, DataFrame);
548
549    fn get_offset(&self) -> StoreOffset {
550        StoreOffset::new(self.shard, self.index_offset)
551    }
552
553    fn set_offset(&mut self, offset: StoreOffset) {
554        if let Some(shard) = offset.get_shard() {
555            if self.shard == Some(shard) || self.update_shard(shard).unwrap_or(false) {
556                self.index_offset = offset.get_index_offset();
557                return;
558            }
559        }
560        // Set the shard even if it does not exist so that advance still finds
561        // the closest valid shard. Clear index_mmap as it no longer corresponds
562        // to this invalid or virtual shard. The index_offset does not matter
563        // much in this case as it will be overwritten once advance is called.
564        // Keep it so get_offset may get the same value back.
565        self.shard = offset.get_shard();
566        self.index_mmap = None;
567        self.index_offset = offset.get_index_offset();
568    }
569
570    /// Move the cursor to a neighbor position. Return if the cursor is updated.
571    /// Returning false means there is no more valid position in the given
572    /// direction, although retrying may succeed. Notice that true means the
573    /// current position is valid, but underlying sample may still be invalid.
574    fn advance(&mut self, direction: Direction) -> Result<bool> {
575        while !self.advance_index(direction) {
576            if !self.update_or_advance_shard(direction)? {
577                // No more shard available
578                return Ok(false);
579            }
580        }
581        Ok(true)
582    }
583
584    /// Get the sample the cursor is currently pointing at. Notice that a store
585    /// may contain holes due to data corruption etc, and thus returns None.
586    /// This does not mean samples are depleted. More could be retrieved by
587    /// advancing further to skip the holes.
588    fn get(&self) -> Option<(SystemTime, DataFrame)> {
589        match self.get_index_and_serialized_frame_at(self.index_offset?) {
590            Ok((index_entry, serialized_data)) => {
591                let format = if index_entry.flags.contains(IndexEntryFlags::CBOR) {
592                    Format::Cbor
593                } else {
594                    panic!("Unexpected format");
595                };
596                let ts =
597                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(index_entry.timestamp);
598                match deserialize_frame(serialized_data.as_ref(), format) {
599                    Ok(df) => Some((ts, df)),
600                    Err(e) => {
601                        warn!(self.logger, "Failed to deserialize data frame: {}", e);
602                        None
603                    }
604                }
605            }
606            Err(e) => {
607                warn!(
608                    self.logger,
609                    "Failed to extract serialized data frame: {}", e
610                );
611                None
612            }
613        }
614    }
615}
616
617/// StoreCursor has each cursor position bound to a SystemTime in monotonic
618/// order. This allows moving the cursor relative to SystemTime instances.
619impl KeyedCursor<u64> for StoreCursor {
620    /// Get timestamp of the current pos.
621    fn get_key(&self) -> Option<u64> {
622        Some(self.get_index_entry()?.timestamp)
623    }
624
625    /// Set the cursor offset near the given timestamp by inferring shard and
626    /// index offset.
627    fn jump_near_key(&mut self, key: &u64, _direction: Direction) {
628        let time_offset = key % SHARD_TIME;
629        let shard = key - time_offset;
630        self.set_offset(StoreOffset::new(Some(shard), None));
631        // Move to the end of the shard.
632        if self.advance_index(Direction::Reverse) {
633            if let Some(last_entry) = self.get_index_entry() {
634                let last_entry_index_offset = self
635                    .get_offset()
636                    .get_index_offset()
637                    .expect("get_index_offset should return Some if get_index_entry returns Some");
638                let last_entry_time_offset = last_entry.timestamp % SHARD_TIME;
639                if last_entry_time_offset != 0 {
640                    // Assume samples are recorded in constant interval and
641                    // scale index offset by time offset
642                    let index_offset_hint = (last_entry_index_offset as f64
643                        / last_entry_time_offset as f64
644                        * time_offset as f64) as usize;
645                    self.set_offset(StoreOffset::new(Some(shard), Some(index_offset_hint)));
646                }
647            }
648        }
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use std::fs::OpenOptions;
655    use std::io::Write;
656
657    use common::util::get_unix_timestamp;
658    use slog::Drain;
659    use tempfile::TempDir;
660    use Direction::Forward;
661    use Direction::Reverse;
662
663    use super::*;
664    use crate::serialize_frame;
665    use crate::ChunkSizePo2;
666    use crate::CompressionMode;
667    use crate::StoreWriter;
668
669    /// Simple cursor to illustrate implementation and test default methods.
670    struct TestCursor<'a> {
671        data: &'a Vec<Option<i32>>,
672        offset: Option<usize>,
673    }
674    impl Cursor for TestCursor<'_> {
675        type Offset = Option<usize>;
676        type Item = i32;
677        fn get_offset(&self) -> Self::Offset {
678            self.offset
679        }
680        fn set_offset(&mut self, offset: Self::Offset) {
681            self.offset = offset;
682        }
683        fn get(&self) -> Option<Self::Item> {
684            self.offset
685                .as_ref()
686                .and_then(|o| self.data.get(*o).cloned().flatten())
687        }
688        fn advance(&mut self, direction: Direction) -> Result<bool> {
689            let offset = match self.offset {
690                Some(offset) => match direction {
691                    Direction::Forward => offset.checked_add(1).filter(|o| o < &self.data.len()),
692                    Direction::Reverse => offset.checked_sub(1),
693                },
694                // Default offsets
695                None => match direction {
696                    Direction::Forward => Some(0).filter(|o| o < &self.data.len()),
697                    Direction::Reverse => self.data.len().checked_sub(1),
698                },
699            };
700            if offset.is_some() {
701                self.offset = offset;
702                Ok(true)
703            } else {
704                Ok(false)
705            }
706        }
707    }
708    impl KeyedCursor<i32> for TestCursor<'_> {
709        fn get_key(&self) -> Option<i32> {
710            self.get()
711        }
712    }
713
714    /// Test default implementation of next(). It should skip invalid items.
715    #[test]
716    fn default_next() {
717        let data = vec![None, Some(3), Some(5), None, None, Some(9)];
718        let mut cursor = TestCursor {
719            data: &data,
720            offset: None,
721        };
722        assert_eq!(cursor.next(Forward).unwrap(), Some(3));
723        assert_eq!(cursor.next(Forward).unwrap(), Some(5));
724        assert_eq!(cursor.next(Forward).unwrap(), Some(9));
725        assert_eq!(cursor.next(Forward).unwrap(), None);
726        assert_eq!(cursor.next(Reverse).unwrap(), Some(5));
727        assert_eq!(cursor.next(Reverse).unwrap(), Some(3));
728        assert_eq!(cursor.next(Reverse).unwrap(), None);
729        // Offset unchanged after reaching boundry
730        assert_eq!(cursor.get(), Some(3));
731    }
732
733    /// Test default implementation of jump_to_key().
734    #[test]
735    fn default_jump_to_key() {
736        let data = vec![None, Some(3), Some(5), None, None, Some(9)];
737        let mut cursor = TestCursor {
738            data: &data,
739            offset: None,
740        };
741        // Exact key
742        assert!(cursor.jump_to_key(&3, Forward).unwrap());
743        assert_eq!(cursor.get_key(), Some(3));
744        assert!(cursor.jump_to_key(&5, Reverse).unwrap());
745        assert_eq!(cursor.get_key(), Some(5));
746        // Closest key
747        assert!(cursor.jump_to_key(&7, Forward).unwrap());
748        assert_eq!(cursor.get_key(), Some(9));
749        assert!(cursor.jump_to_key(&4, Reverse).unwrap());
750        assert_eq!(cursor.get_key(), Some(3));
751        // No key satisfies direction constrain.
752        assert!(!cursor.jump_to_key(&10, Forward).unwrap());
753        assert_eq!(cursor.get_key(), Some(9));
754        assert!(!cursor.jump_to_key(&0, Reverse).unwrap());
755        assert_eq!(cursor.get_key(), None);
756    }
757
758    /// Test default implementation of get_near().
759    #[test]
760    fn default_get_near() {
761        let data = vec![Some(3), Some(5), None, None, Some(9)];
762        let mut cursor = TestCursor {
763            data: &data,
764            offset: None,
765        };
766        // Exact key
767        assert_eq!(cursor.get_near(&5, Forward).unwrap(), Some(5));
768        // Key in direction
769        assert_eq!(cursor.get_near(&4, Forward).unwrap(), Some(5));
770        assert_eq!(cursor.get_near(&4, Reverse).unwrap(), Some(3));
771        // Key in direction but no key there
772        assert_eq!(cursor.get_near(&2, Reverse).unwrap(), Some(3));
773        assert_eq!(cursor.get_near(&10, Forward).unwrap(), Some(9));
774    }
775
776    /// Test default implementation of get_next().
777    #[test]
778    fn default_get_next() {
779        let data = vec![Some(3), Some(5), None, None, Some(9)];
780        let mut cursor = TestCursor {
781            data: &data,
782            offset: None,
783        };
784        // Exact key
785        assert_eq!(cursor.get_next(&5, Forward).unwrap(), Some(5));
786        // Key in direction
787        assert_eq!(cursor.get_next(&4, Forward).unwrap(), Some(5));
788        assert_eq!(cursor.get_next(&4, Reverse).unwrap(), Some(3));
789        // Key in direction but no key there
790        assert_eq!(cursor.get_next(&2, Reverse).unwrap(), None);
791        assert_eq!(cursor.get_next(&10, Forward).unwrap(), None);
792    }
793
794    fn get_logger() -> Logger {
795        let plain = slog_term::PlainSyncDecorator::new(std::io::stderr());
796        Logger::root(slog_term::FullFormat::new(plain).build().fuse(), slog::o!())
797    }
798
799    /// Write a single sample in different ways and read it back.
800    fn simple_put_read(compression_mode: CompressionMode, format: Format) {
801        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
802        let ts = get_unix_timestamp(SystemTime::now());
803        let now = std::time::UNIX_EPOCH + std::time::Duration::from_secs(ts);
804        let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
805            .expect("Failed to create store");
806        let mut frame = DataFrame::default();
807        frame.sample.cgroup.memory_current = Some(42);
808        writer.put(now, &frame).expect("Failed to store data");
809
810        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
811        let sample = cursor
812            .next(Forward)
813            .expect("Failed to read sample")
814            .expect("Did not find stored sample");
815        assert_eq!(sample, (now, frame));
816    }
817
818    #[test]
819    fn read_cbor() {
820        simple_put_read(CompressionMode::None, Format::Cbor);
821    }
822    #[test]
823    fn read_compressed_cbor() {
824        simple_put_read(CompressionMode::Zstd, Format::Cbor);
825    }
826
827    #[test]
828    fn read_dict_compressed_cbor() {
829        simple_put_read(
830            CompressionMode::ZstdDictionary(ChunkSizePo2(2)),
831            Format::Cbor,
832        );
833    }
834
835    /// For writing samples readable by the cursor and injecting corruptions.
836    /// Read correctness is tested above. Following tests only care about
837    /// whether the Cursor trait is implemented correctly, therefore using this
838    /// simplistic writer that only writes default data frame.
839    struct TestWriter {
840        path: PathBuf,
841    }
842
843    impl TestWriter {
844        pub fn new<P: AsRef<std::path::Path>>(path: P) -> Self {
845            Self {
846                path: path.as_ref().to_path_buf(),
847            }
848        }
849
850        pub fn put(&self, timestamp: u64) -> Result<()> {
851            self.put_helper(timestamp, false, false)
852        }
853        pub fn put_corrupt_index(&self, timestamp: u64) -> Result<()> {
854            self.put_helper(timestamp, true, false)
855        }
856        pub fn put_corrupt_data(&self, timestamp: u64) -> Result<()> {
857            self.put_helper(timestamp, false, true)
858        }
859
860        /// Similar to the StoreWriter but only writes default data frame. May
861        /// invalidate index or data entry by zeroing crc if requested.
862        fn put_helper(
863            &self,
864            timestamp: u64,
865            corrupt_index: bool,
866            corrupt_data: bool,
867        ) -> Result<()> {
868            let shard = timestamp - timestamp % SHARD_TIME;
869            let open_options = OpenOptions::new().create(true).append(true).clone();
870
871            let data_bytes = serialize_frame(&DataFrame::default(), Format::Cbor)
872                .context("Failed to serialize data frame")?;
873            let data_crc = if corrupt_data { 0 } else { data_bytes.crc32() };
874            let mut data_file = open_options
875                .open(self.path.join(format!("data_{:011}", shard)))
876                .context("Failed to open data file")?;
877            let offset = data_file
878                .metadata()
879                .context("Failed to get metadata of data file")?
880                .len();
881            data_file
882                .write_all(&data_bytes)
883                .context("Failed to write to data file")?;
884
885            let mut index_entry = IndexEntry {
886                timestamp,
887                offset,
888                len: data_bytes.len() as u32,
889                flags: IndexEntryFlags::CBOR,
890                data_crc,
891                index_crc: 0,
892            };
893            if !corrupt_index {
894                index_entry.index_crc = index_entry.crc32();
895            }
896            let entry_slice = unsafe {
897                std::slice::from_raw_parts(
898                    &index_entry as *const IndexEntry as *const u8,
899                    INDEX_ENTRY_SIZE,
900                )
901            };
902            open_options
903                .open(self.path.join(format!("index_{:011}", shard)))
904                .context("Failed to open index file")?
905                .write_all(entry_slice)
906                .context("Failed to write entry to index file")?;
907            Ok(())
908        }
909    }
910
911    /// Calling advance with nothing in the store and cursor uninitialized.
912    #[test]
913    fn advance_when_empty() {
914        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
915        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
916
917        assert!(!cursor.advance(Forward).unwrap());
918        assert!(cursor.get_key().is_none());
919        assert!(!cursor.advance(Reverse).unwrap());
920        assert!(cursor.get_key().is_none());
921    }
922
923    /// Calling advance in both directions when cursor is at the last pos.
924    #[test]
925    fn advance_at_boundries() {
926        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
927        let ts = get_unix_timestamp(SystemTime::now());
928        let writer = TestWriter::new(&dir);
929        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
930
931        writer.put(ts).unwrap();
932
933        // First sample
934        assert!(cursor.advance(Forward).unwrap());
935        assert_eq!(cursor.get_key(), Some(ts));
936        // No more sample forward. Still at first sample
937        assert!(!cursor.advance(Forward).unwrap());
938        assert_eq!(cursor.get_key(), Some(ts));
939        // No more sample reverse. Still at first sample.
940        assert!(!cursor.advance(Reverse).unwrap());
941        assert_eq!(cursor.get_key(), Some(ts));
942    }
943
944    /// Cursor moves back and forth.
945    #[test]
946    fn advance_simple() {
947        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
948        let ts = get_unix_timestamp(SystemTime::now());
949        let writer = TestWriter::new(&dir);
950        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
951
952        writer.put(ts).unwrap();
953        writer.put(ts + 5).unwrap();
954        writer.put(ts + SHARD_TIME).unwrap();
955
956        // First sample
957        assert!(cursor.advance(Forward).unwrap());
958        assert_eq!(cursor.get_key(), Some(ts));
959        // Second sample in same shard
960        assert!(cursor.advance(Forward).unwrap());
961        assert_eq!(cursor.get_key(), Some(ts + 5));
962        // Third sample across shard
963        assert!(cursor.advance(Forward).unwrap());
964        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME));
965        // Back to second sample
966        assert!(cursor.advance(Reverse).unwrap());
967        assert_eq!(cursor.get_key(), Some(ts + 5));
968        // Back to first sample
969        assert!(cursor.advance(Reverse).unwrap());
970        assert_eq!(cursor.get_key(), Some(ts));
971    }
972
973    /// Retry advance succeeds after updates.
974    #[test]
975    fn advance_retry() {
976        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
977        let ts = get_unix_timestamp(SystemTime::now());
978        let writer = TestWriter::new(&dir);
979        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
980
981        writer.put(ts).unwrap();
982        assert!(cursor.advance(Forward).unwrap());
983        assert_eq!(cursor.get_key(), Some(ts));
984        assert!(!cursor.advance(Forward).unwrap());
985
986        // Advance succeeds after same shard update
987        writer.put(ts + 5).unwrap();
988        assert!(cursor.advance(Forward).unwrap());
989        assert_eq!(cursor.get_key(), Some(ts + 5));
990        assert!(!cursor.advance(Forward).unwrap());
991
992        // Advance succeeds after new shard update
993        writer.put(ts + SHARD_TIME).unwrap();
994        assert!(cursor.advance(Forward).unwrap());
995        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME));
996        assert!(!cursor.advance(Forward).unwrap());
997    }
998
999    /// Get corrupt index/data should return None.
1000    #[test]
1001    fn get_corrupt() {
1002        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1003        let ts = get_unix_timestamp(SystemTime::now());
1004        let writer = TestWriter::new(&dir);
1005        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1006
1007        // Both index and data are None if index alone is corrupted.
1008        writer.put_corrupt_index(ts).unwrap();
1009        assert!(cursor.advance(Forward).unwrap());
1010        assert!(cursor.get_key().is_none());
1011        assert!(cursor.get().is_none());
1012
1013        // Only data is None if data is corrupted but not index.
1014        writer.put_corrupt_data(ts + 5).unwrap();
1015        assert!(cursor.advance(Forward).unwrap());
1016        assert_eq!(cursor.get_key(), Some(ts + 5));
1017        assert!(cursor.get().is_none());
1018    }
1019
1020    /// Calling next skips corrupted index/data entries.
1021    #[test]
1022    fn skip_corrupt() {
1023        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1024        let ts = get_unix_timestamp(SystemTime::now());
1025        let writer = TestWriter::new(&dir);
1026        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1027
1028        // Only 2 valid samples, separated by invalid ones across shards
1029        writer.put_corrupt_data(ts).unwrap();
1030        writer.put(ts + 5).unwrap(); // First valid sample
1031        writer.put_corrupt_data(ts + 5 * 2).unwrap();
1032        writer.put_corrupt_index(ts + 5 * 3).unwrap();
1033        writer.put_corrupt_data(ts + SHARD_TIME).unwrap();
1034        writer.put_corrupt_index(ts + SHARD_TIME * 2).unwrap();
1035        writer.put(ts + SHARD_TIME * 2 + 5).unwrap(); // Second one
1036        writer.put_corrupt_data(ts + SHARD_TIME * 3).unwrap();
1037
1038        // Calling next to move and get valid samples in both directions
1039        assert_eq!(
1040            get_unix_timestamp(cursor.next(Forward).unwrap().unwrap().0),
1041            ts + 5
1042        );
1043        assert_eq!(
1044            get_unix_timestamp(cursor.next(Forward).unwrap().unwrap().0),
1045            ts + SHARD_TIME * 2 + 5
1046        );
1047        // Offset unchanged
1048        assert!(cursor.next(Forward).unwrap().is_none());
1049        assert_eq!(
1050            get_unix_timestamp(cursor.next(Reverse).unwrap().unwrap().0),
1051            ts + 5
1052        );
1053    }
1054
1055    /// Ensure get and set cursor offset work as expected.
1056    #[test]
1057    fn manipulate_offset() {
1058        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1059        let ts = get_unix_timestamp(SystemTime::now());
1060        let writer = TestWriter::new(&dir);
1061        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1062
1063        writer.put(ts).unwrap();
1064        writer.put(ts + 5).unwrap();
1065        writer.put(ts + SHARD_TIME * 2 + 5).unwrap();
1066
1067        let expected_offsets = &[
1068            StoreOffset::new(Some(ts), Some(0)),
1069            StoreOffset::new(Some(ts), Some(INDEX_ENTRY_SIZE)),
1070            StoreOffset::new(Some(ts + SHARD_TIME * 2), Some(0)),
1071        ];
1072
1073        // Verify offset values.
1074        assert_eq!(cursor.get_offset(), StoreOffset::default());
1075        assert!(cursor.advance(Forward).unwrap());
1076        assert_eq!(cursor.get_offset(), expected_offsets[0]);
1077        assert!(cursor.advance(Forward).unwrap());
1078        assert_eq!(cursor.get_offset(), expected_offsets[1]);
1079        assert!(cursor.advance(Forward).unwrap());
1080        assert_eq!(cursor.get_offset(), expected_offsets[2]);
1081
1082        // Get None from default offset.
1083        cursor.set_offset(StoreOffset::default());
1084        assert!(cursor.get_key().is_none());
1085        // Get None from invalid offsets.
1086        cursor.set_offset(StoreOffset::new(
1087            Some(ts + SHARD_TIME),
1088            Some(INDEX_ENTRY_SIZE),
1089        ));
1090        assert!(cursor.get_key().is_none());
1091        cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME * 2), None));
1092        assert!(cursor.get_key().is_none());
1093        // Get values from expected offsets.
1094        cursor.set_offset(expected_offsets[1].clone());
1095        assert_eq!(cursor.get_key(), Some(ts + 5));
1096        cursor.set_offset(expected_offsets[0].clone());
1097        assert_eq!(cursor.get_key(), Some(ts));
1098        cursor.set_offset(expected_offsets[2].clone());
1099        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2 + 5));
1100    }
1101
1102    /// Calling advance after set_offset with invalid offsets.
1103    #[test]
1104    fn advance_from_invalid_offset() {
1105        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1106        let ts = get_unix_timestamp(SystemTime::now());
1107        let writer = TestWriter::new(&dir);
1108        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1109
1110        writer.put(ts).unwrap();
1111        writer.put(ts + SHARD_TIME * 2).unwrap();
1112
1113        // To same shard
1114        cursor.set_offset(StoreOffset::new(Some(ts), Some(INDEX_ENTRY_SIZE)));
1115        assert!(cursor.advance(Reverse).unwrap());
1116        assert_eq!(cursor.get_key(), Some(ts));
1117        // To different shard
1118        cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME), Some(0)));
1119        assert!(cursor.advance(Forward).unwrap());
1120        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));
1121        // Over boundry (offset shouldn't change)
1122        cursor.set_offset(StoreOffset::new(Some(ts + SHARD_TIME * 4), Some(0)));
1123        assert!(!cursor.advance(Forward).unwrap());
1124        assert_eq!(
1125            cursor.get_offset(),
1126            StoreOffset::new(Some(ts + SHARD_TIME * 4), Some(0))
1127        );
1128    }
1129
1130    /// Ensure jump_to_key works as expected.
1131    #[test]
1132    fn jump_to_key() {
1133        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1134        let ts = get_unix_timestamp(SystemTime::now());
1135        let writer = TestWriter::new(&dir);
1136        let mut cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1137
1138        writer.put(ts + 5).unwrap();
1139        writer.put(ts + 5 * 20).unwrap();
1140        writer.put(ts + 5 * 21).unwrap();
1141        writer.put(ts + SHARD_TIME * 2).unwrap();
1142
1143        // Jump to exact key
1144        cursor.jump_to_key(&(ts + 5), Forward).unwrap();
1145        assert_eq!(cursor.get_key(), Some(ts + 5));
1146        cursor.jump_to_key(&(ts + SHARD_TIME * 2), Reverse).unwrap();
1147        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));
1148
1149        // Jump over the boundry gets the last valid samples.
1150        cursor.jump_to_key(&(ts), Reverse).unwrap();
1151        assert_eq!(cursor.get_key(), Some(ts + 5));
1152        cursor.jump_to_key(&(ts + SHARD_TIME * 3), Forward).unwrap();
1153        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));
1154
1155        // Jump to cloest sample
1156        cursor.jump_to_key(&(ts + 5 * 100), Forward).unwrap();
1157        assert_eq!(cursor.get_key(), Some(ts + SHARD_TIME * 2));
1158        cursor.jump_to_key(&(ts + 5 * 100), Reverse).unwrap();
1159        assert_eq!(cursor.get_key(), Some(ts + 5 * 21));
1160    }
1161}