below_store/
lib.rs

// Copyright (c) Facebook, Inc. and its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![deny(clippy::all)]

use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::ErrorKind;
use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::path::PathBuf;
use std::time::SystemTime;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use bitflags::bitflags;
use common::fileutil::get_dir_size;
use common::open_source_shim;
use common::util::get_unix_timestamp;
use model::Model;
use serde::Deserialize;
use serde::Serialize;
use slog::info;
use slog::warn;
use static_assertions::const_assert_eq;

use crate::compression::Compressor;
use crate::cursor::KeyedCursor;
use crate::cursor::StoreCursor;

pub mod advance;
pub mod compression;
pub mod cursor;
#[cfg(test)]
mod test;

pub type Advance = advance::Advance<DataFrame, Model>;

open_source_shim!();

/// This data store works as follows:
///
/// Each data item (e.g. DataFrame) is simply appended to a data file.
///
/// An IndexEntry is appended to a corresponding index file. Each
/// IndexEntry contains the timestamp (e.g. key) of the data item, its
/// offset into the data file, the length of the data entry, and a CRC
/// of the data entry as well as a CRC of itself. It also contains
/// flags that can indicate if the corresponding data is compressed
/// and how it is compressed.
///
/// The CRCs in the index entry give us an atomicity guarantee - if
/// they are not present and correct, we treat it as if the entry
/// never existed.
///
/// In dictionary compression mode, the index file may be padded with
/// zeros (i.e. empty index entries). Thus empty index entries are
/// not considered to be corrupt, but we ignore such entries as they
/// do not point to any data.
///
/// Data and Index files are append-only and never modified (only ever
/// removed).
///
/// Data and Index files are sharded by SHARD_TIME - e.g. any one file
/// only contains data or index entries whose timestamps round down to
/// the same multiple of SHARD_TIME. This allows data and index files
/// to be cleaned up by just unlinking the files.
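///
/// For example (illustrative numbers): with SHARD_TIME = 86400, a sample
/// taken at unix time 90000 belongs to shard 86400 and is therefore written
/// to the files `data_00000086400` and `index_00000086400`.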

#[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct DataFrame {
    pub sample: model::Sample,
}

const SHARD_TIME: u64 = 24 * 60 * 60;

// Number of bits used by other bit flags in IndexEntry before the
// chunk compress flags.
const CHUNK_COMPRESS_SHIFT: u32 = 2;

const MAX_CHUNK_COMPRESS_SIZE_PO2: u32 = 0x0F;
pub const MAX_CHUNK_COMPRESS_SIZE: u32 = 1 << MAX_CHUNK_COMPRESS_SIZE_PO2;
const_assert_eq!(MAX_CHUNK_COMPRESS_SIZE, 32768);

bitflags! {
    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Clone, Copy)]
    struct IndexEntryFlags: u32 {
        /// If set, data item is compressed with zstd.
        const COMPRESSED = 0x1;
        /// If set, data item is serialized as CBOR. If unset,
        /// serialization is set to the default (also CBOR in the
        /// case of the open source build).
        const CBOR = 0x2;
        /// If `COMPRESSED` is set and `CHUNK_COMPRESS_SIZE_PO2` is
        /// non-zero, then zstd dictionary compression is used.
        /// Data is stored in "chunks" of
        /// `2^CHUNK_COMPRESS_SIZE_PO2` entries. The first entry of
        /// each chunk, in its uncompressed form, is used as the zstd
        /// dictionary for the rest of the chunk.
        ///
        /// Chunks are aligned to chunk size. The index is padded
        /// with empty index entries as necessary. For example, if
        /// below is started with a chunk size of 4, and the index
        /// has 5 entries, then the index will be zero-padded to a
        /// length of 8 entries before the first dict key frame is
        /// written. Padding occurs on a restart in recording, but
        /// can also occur if possible data corruption has been
        /// detected.
        const CHUNK_COMPRESS_SIZE_PO2 = MAX_CHUNK_COMPRESS_SIZE_PO2 << CHUNK_COMPRESS_SHIFT;
    }
}

impl IndexEntryFlags {
    fn get_chunk_compress_size_po2(&self) -> u32 {
        (self.bits() & Self::CHUNK_COMPRESS_SIZE_PO2.bits()) >> CHUNK_COMPRESS_SHIFT
    }

    fn set_chunk_compress_size_po2(&mut self, chunk_compress_size_po2: u32) -> Result<()> {
        if chunk_compress_size_po2 > MAX_CHUNK_COMPRESS_SIZE_PO2 {
            bail!(
                "Chunk compress size po2 should be less than or equal to {}",
                MAX_CHUNK_COMPRESS_SIZE_PO2
            );
        }
        *self |= IndexEntryFlags::from_bits_retain(chunk_compress_size_po2 << CHUNK_COMPRESS_SHIFT);
        Ok(())
    }
}
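
// Illustrative round trip of the chunk-compress bits (example values, not
// part of the original source):
//
//     let mut flags = IndexEntryFlags::COMPRESSED;
//     flags.set_chunk_compress_size_po2(3)?;              // chunks of 2^3 = 8 entries
//     assert_eq!(flags.get_chunk_compress_size_po2(), 3);
//     assert!(flags.contains(IndexEntryFlags::COMPRESSED));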

#[repr(C)]
struct IndexEntry {
    /// Timestamp of the data entry
    timestamp: u64,
    /// Offset into the data file that this entry exists at
    offset: u64,
    /// Length of the data entry
    len: u32,
    /// Flags for this data entry
    flags: IndexEntryFlags,
    /// crc32 of the data entry
    data_crc: u32,
    /// crc32 of this entry (e.g. crc32 of all the above members)
    index_crc: u32,
}

const INDEX_ENTRY_SIZE: usize = std::mem::size_of::<IndexEntry>();
const INDEX_ENTRY_SIZE_PO2: u32 = INDEX_ENTRY_SIZE.trailing_zeros();
const_assert_eq!(INDEX_ENTRY_SIZE, 32);
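// 32 bytes: two u64 fields plus four 4-byte fields (8 + 8 + 4 + 4 + 4 + 4),
// so INDEX_ENTRY_SIZE_PO2 is 5, which is the alignment used for the index
// file below.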

#[derive(Copy, Clone, Debug)]
pub struct ChunkSizePo2(pub u32);

#[derive(Copy, Clone, Debug)]
pub enum CompressionMode {
    None,
    Zstd,
    ZstdDictionary(ChunkSizePo2),
}

/// The StoreWriter struct maintains state to put more data in the
/// store. It keeps track of the index and data file it's currently
/// working on so in the common case it can just append data. When it
/// rolls over to a new shard, it will recreate itself.
// #[derive(Debug)]
pub struct StoreWriter {
    logger: slog::Logger,
    /// Directory of the store itself
    dir: PathBuf,
    /// Currently active index file. Append only so the cursor always
    /// points to the end of the file.
    index: File,
    /// Currently active data file. Append only so the cursor always
    /// points to the end of the file.
    data: File,
    /// Current length of the data file (needed to record offsets in
    /// the index)
    data_len: u64,
    /// Active shard
    shard: u64,
    /// Cached compressor for memory efficiency. Compressor also stores key
    /// frame for dict compression.
    compressor: Option<Compressor>,
    /// Unless `CompressionMode::None`, individual frames are compressed
    /// according to `compression_mode`.
    compression_mode: CompressionMode,
    /// Serialization format of data frames
    format: Format,
}

// Given path to the store dir, get a Vec<String> of the index file
// names, sorted
fn get_index_files(path: &Path) -> Result<Vec<String>> {
    let mut entries = fs::read_dir(path)
        .with_context(|| format!("Failed to read directory {}", path.display()))?
        .filter_map(|res_ent| {
            res_ent
                .map(|ent| {
                    ent.file_name()
                        .to_str()
                        .filter(|s| s.starts_with("index"))
                        .map(|s| s.to_string())
                })
                .transpose()
        })
        .collect::<Result<Vec<_>, std::io::Error>>()
        .with_context(|| format!("Failed to read directory entries in {}", path.display()))?;

    entries.sort_unstable();
    Ok(entries)
}

enum SerializedFrame<'a> {
    Owned(bytes::Bytes),
    Borrowed(&'a [u8]),
}

impl AsRef<[u8]> for SerializedFrame<'_> {
    fn as_ref(&self) -> &[u8] {
        match self {
            SerializedFrame::Owned(b) => b.as_ref(),
            SerializedFrame::Borrowed(s) => s,
        }
    }
}

impl SerializedFrame<'_> {
    fn into_owned(self) -> bytes::Bytes {
        match self {
            SerializedFrame::Owned(b) => b,
            SerializedFrame::Borrowed(s) => bytes::Bytes::copy_from_slice(s),
        }
    }
}

/// Serialization format. Currently only Cbor is supported.
#[derive(Copy, Clone, Debug)]
pub enum Format {
    Cbor,
}

/// Serialize a single data frame with `format` format.
fn serialize_frame(data: &DataFrame, format: Format) -> Result<bytes::Bytes> {
    match format {
        Format::Cbor => {
            let bytes = serde_cbor::to_vec(data)?;
            Ok(bytes::Bytes::from(bytes))
        }
    }
}

/// Deserialize a single data frame with `format` format.
fn deserialize_frame(bytes: &[u8], format: Format) -> Result<DataFrame> {
    match format {
        Format::Cbor => {
            let data_frame = serde_cbor::from_slice(bytes)?;
            Ok(data_frame)
        }
    }
}

impl StoreWriter {
    /// Create a new `StoreWriter` that writes data to the `path`
    /// directory. Data is serialized with `format`.
    ///
    /// Unless `compression_mode` is `CompressionMode::None`, data frames
    /// are zstd compressed as specified by `compression_mode`.
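    ///
    /// A minimal usage sketch (path, logger construction, and error
    /// handling here are illustrative):
    ///
    /// ```ignore
    /// let mut writer = StoreWriter::new(
    ///     logger,
    ///     "/var/log/below/store",
    ///     CompressionMode::Zstd,
    ///     Format::Cbor,
    /// )?;
    /// writer.put(SystemTime::now(), &DataFrame::default())?;
    /// ```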
    pub fn new<P: AsRef<Path>>(
        logger: slog::Logger,
        path: P,
        compression_mode: CompressionMode,
        format: Format,
    ) -> Result<Self> {
        Self::new_with_timestamp(logger, path, SystemTime::now(), compression_mode, format)
    }

    pub fn new_with_timestamp<P: AsRef<Path>>(
        logger: slog::Logger,
        path: P,
        timestamp: SystemTime,
        compression_mode: CompressionMode,
        format: Format,
    ) -> Result<Self> {
        let shard = calculate_shard(timestamp);
        Self::new_with_shard(logger, path, shard, compression_mode, format)
    }

    fn new_with_shard<P: AsRef<Path>>(
        logger: slog::Logger,
        path: P,
        shard: u64,
        compression_mode: CompressionMode,
        format: Format,
    ) -> Result<Self> {
        if !path.as_ref().is_dir() {
            std::fs::create_dir(&path).with_context(|| {
                format!("Failed to create store path: {}", path.as_ref().display())
            })?;
        }

        let (data_path, index_path) = {
            let mut data_path = path.as_ref().to_path_buf();
            let mut index_path = data_path.clone();
            data_path.push(format!("data_{:011}", shard));
            index_path.push(format!("index_{:011}", shard));
            (data_path, index_path)
        };

        let index = OpenOptions::new()
            .append(true)
            .create(true)
            .open(index_path.as_path())
            .with_context(|| format!("Failed to open index file: {}", index_path.display()))?;
        nix::fcntl::flock(
            index.as_raw_fd(),
            nix::fcntl::FlockArg::LockExclusiveNonblock,
        )
        .with_context(|| {
            format!(
                "Failed to acquire file lock on index file: {}",
                index_path.display(),
            )
        })?;

        let data = OpenOptions::new()
            .append(true)
            .create(true)
            .open(data_path.as_path())
            .with_context(|| format!("Failed to open data file: {}", data_path.display()))?;
        nix::fcntl::flock(
            data.as_raw_fd(),
            nix::fcntl::FlockArg::LockExclusiveNonblock,
        )
        .with_context(|| {
            format!(
                "Failed to acquire file lock on data file: {}",
                data_path.display(),
            )
        })?;

        let data_len = data
            .metadata()
            .with_context(|| {
                format!(
                    "Failed to get metadata of data file: {}",
                    data_path.display()
                )
            })?
            .len();

        Ok(StoreWriter {
            logger,
            dir: path.as_ref().to_path_buf(),
            index,
            data,
            data_len,
            shard,
            // First compressed write initializes the compressor
            compressor: None,
            compression_mode,
            format,
        })
    }

    /// The index file is padded to the next (1 << alignment_po2) aligned
    /// boundary. Both the original and aligned lengths are then returned.
    /// Mostly used to align the index file with INDEX_ENTRY_SIZE, or with the
    /// chunk size if dictionary compression is used. Misalignment can happen
    /// if we partially wrote an index entry, if a new chunk must be started,
    /// if an external actor modified the index file, etc.
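    ///
    /// For example (illustrative numbers): with dictionary compression at
    /// `chunk_size_po2 = 2`, `alignment_po2` is `2 + INDEX_ENTRY_SIZE_PO2 = 7`,
    /// so an index holding 5 entries (160 bytes) is padded to 256 bytes,
    /// i.e. 8 entries.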
    fn pad_and_get_index_len(index: &mut File, alignment_po2: u32) -> Result<(u64, u64)> {
        let index_len = index
            .metadata()
            .context("Failed to get metadata of index file")?
            .len();
        let alignment_mask = (1 << alignment_po2) - 1;
        let aligned_len = (index_len + alignment_mask) & !alignment_mask;
        if aligned_len != index_len {
            index
                .set_len(aligned_len)
                .context("Failed to pad index file")?;
            // Since file is opened as append only, we don't need to
            // move the cursor to end of file.
        }
        Ok((index_len, aligned_len))
    }

    /// For the given `DataFrame` and an optional Compressor mut ref, returns a
    /// tuple consisting of:
    ///   1) Raw bytes to write to the data file
    ///   2) Flags to write to the index entry
    /// For compressed writes, the Compressor will be initialized if None, and
    /// potentially updated. `is_key_frame` indicates the start of a new
    /// chunk when dictionary compression is enabled.
    fn get_bytes_and_flags_for_frame(
        &self,
        data_frame: &DataFrame,
        compressor: &mut Option<Compressor>,
        is_key_frame: bool,
    ) -> Result<(bytes::Bytes, IndexEntryFlags)> {
        let mut flags = match self.format {
            Format::Cbor => IndexEntryFlags::CBOR,
        };
        // Get serialized data frame
        let frame_bytes =
            serialize_frame(data_frame, self.format).context("Failed to serialize data frame")?;
        let serialized = match self.compression_mode {
            CompressionMode::None => frame_bytes,
            CompressionMode::Zstd => {
                flags |= IndexEntryFlags::COMPRESSED;
                compressor
                    .get_or_insert_with(Compressor::new)
                    .compress_with_dict_reset(&frame_bytes)
                    .context("Failed to compress data")?
            }
            CompressionMode::ZstdDictionary(ChunkSizePo2(chunk_size_po2)) => {
                flags |= IndexEntryFlags::COMPRESSED;
                flags
                    .set_chunk_compress_size_po2(chunk_size_po2)
                    .expect("bug: invalid chunk compress size");
                let compressor = compressor.get_or_insert_with(Compressor::new);
                if is_key_frame {
                    let serialized = compressor
                        .compress_with_dict_reset(&frame_bytes)
                        .context("Failed to compress key frame")?;
                    compressor
                        .load_dict(&frame_bytes)
                        .context("Failed to set key frame as dict")?;
                    serialized
                } else {
                    compressor
                        .compress_with_loaded_dict(&frame_bytes)
                        .context("Failed to compress data frame")?
                }
            }
        };
        Ok((serialized, flags))
    }

    /// Store data with corresponding timestamp in the current shard.
    /// Panics if the data does not belong to the current shard. Errors may
    /// be returned if file operations fail.
    fn put_in_current_shard(&mut self, timestamp: SystemTime, data: &DataFrame) -> Result<()> {
        let shard = calculate_shard(timestamp);
        if shard != self.shard {
            panic!("Can't write data to current shard as it belongs to a different shard")
        }

        // PO2 chunk size in bytes if dict compression is used, otherwise 0.
        let chunk_alignment_po2 =
            if let CompressionMode::ZstdDictionary(ChunkSizePo2(chunk_size_po2)) =
                self.compression_mode
            {
                // chunk_size_po2 is in number of entries. Add the entry size
                // po2 to get the size in bytes po2.
                chunk_size_po2 + INDEX_ENTRY_SIZE_PO2
            } else {
                0
            };
        // If dict compression is used but the Compressor is uninitialized
        // (e.g. new shard, or the previous write failed), pad the index to
        // start a new chunk. Otherwise pad to ensure the index file is
        // aligned with INDEX_ENTRY_SIZE.
        let alignment_po2 = if chunk_alignment_po2 != 0 && self.compressor.is_none() {
            chunk_alignment_po2
        } else {
            INDEX_ENTRY_SIZE_PO2
        };
        let (index_len, aligned_len) = Self::pad_and_get_index_len(&mut self.index, alignment_po2)
            .with_context(|| {
                format!(
                    "Failed to get index length and possibly pad index file: index_{:011}",
                    shard
                )
            })?;
        if index_len != aligned_len {
            if alignment_po2 == INDEX_ENTRY_SIZE_PO2 {
                warn!(
                    self.logger,
                    "Index length not a multiple of fixed index entry size: {}. Padded to size: {}",
                    index_len,
                    aligned_len,
                );
            } else if alignment_po2 == chunk_alignment_po2 {
                // This always happens when below restarts, so log at info level
                info!(
                    self.logger,
                    "Padded index so that first entry of block is aligned. Previous len: {}. New len: {}",
                    index_len,
                    aligned_len,
                );
            } else {
                panic!("Unexpected alignment_po2 value");
            }
        }

        // Take the compressor from self before modifying it. If any write
        // failure occurs, the old compressor (potentially in a bad state)
        // will be discarded and a new one created on the next write. No-op
        // if compression is not used.
        let mut compressor = self.compressor.take();
        // If dict compression is used and the index file is chunk aligned, the
        // current frame is the key frame.
        let is_key_frame =
            chunk_alignment_po2 != 0 && aligned_len.trailing_zeros() >= chunk_alignment_po2;
        let (serialized, flags) = self
            .get_bytes_and_flags_for_frame(data, &mut compressor, is_key_frame)
            .context("Failed to get serialized frame and flags")?;

        // Appends to the data file are large and cannot be atomic. We
        // may have partial writes that increase the file size without
        // updating the stored state. Thus always read the actual data
        // file length. This is less of an issue for the index file
        // but we track it anyway.
        let data_len = self
            .data
            .metadata()
            .with_context(|| {
                format!(
                    "Failed to get metadata of data file: data_{:011}",
                    self.shard
                )
            })?
            .len();
        // Warn about potential data file corruption
        if self.data_len != data_len {
            warn!(
                self.logger,
                "Data length mismatch: {} (expect {})", data_len, self.data_len
            );
            self.data_len = data_len;
        }

        let offset = self.data_len;
        // It doesn't really matter which order we write the data in;
        // most filesystems do not provide ordering guarantees for
        // appends to different files anyway. We just need to handle
        // various failure cases on the read side.
        self.data
            .write_all(&serialized)
            .context("Failed to write entry to data file")?;
        self.data_len += serialized.len() as u64;
        let data_crc = serialized.crc32();

        let mut index_entry = IndexEntry {
            timestamp: get_unix_timestamp(timestamp),
            offset,
            flags,
            len: serialized
                .len()
                .try_into()
                .with_context(|| format!("Serialized len={} overflows u32", serialized.len()))?,
            data_crc,
            index_crc: 0,
        };
        index_entry.index_crc = index_entry.crc32();
        {
            // It is unsafe to view the entry as a raw byte slice, but we
            // need that in order to write it out.
            let entry_slice = unsafe {
                std::slice::from_raw_parts(
                    &index_entry as *const IndexEntry as *const u8,
                    INDEX_ENTRY_SIZE,
                )
            };
            self.index
                .write_all(entry_slice)
                .context("Failed to write entry to index file")?;
        }

        // Set compressor only after successful writes. No-op if not in
        // compression mode
        self.compressor = compressor;
        Ok(())
    }

    /// Store data with corresponding timestamp. Returns true if a new shard
    /// is created and data is written successfully. Errors may be returned if
    /// file operations fail.
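    ///
    /// A minimal call sketch (the writer and frame are assumed to have been
    /// created as in [`StoreWriter::new`]):
    ///
    /// ```ignore
    /// if writer.put(SystemTime::now(), &frame)? {
    ///     // a new shard (index/data file pair) was started for this sample
    /// }
    /// ```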
    pub fn put(&mut self, timestamp: SystemTime, data: &DataFrame) -> Result<bool> {
        let shard = calculate_shard(timestamp);
        if shard != self.shard {
            // We just recreate the StoreWriter since this is a new shard
            let mut writer = Self::new_with_shard(
                self.logger.clone(),
                self.dir.as_path(),
                shard,
                self.compression_mode,
                self.format,
            )?;
            // Set self to the new shard only if we succeed in writing the first
            // frame. If we don't do this, we may "forget" to return true
            // for a new shard where the first write fails.
            writer.put_in_current_shard(timestamp, data)?;
            *self = writer;
            Ok(true)
        } else {
            self.put_in_current_shard(timestamp, data)?;
            Ok(false)
        }
    }

    /// Discard shards from the oldest first until f(shard_timestamp) is true
    /// or we've reached the current shard. Returns true if f(shard_timestamp)
    /// is true for the last shard visited or false otherwise.
    fn discard_until<F>(&self, f: F) -> Result<bool>
    where
        F: Fn(u64) -> bool,
    {
        let entries = get_index_files(self.dir.as_path())?;

        // Entries are sorted with increasing timestamp
        for entry in entries {
            let v: Vec<&str> = entry.split('_').collect();
            if v.len() != 2 {
                warn!(self.logger, "Invalid index file name: {}", entry);
                continue;
            }

            let entry_shard = match v[1].parse::<u64>() {
                Ok(val) => val,
                _ => {
                    warn!(self.logger, "Cannot parse index shard: {}", entry);
                    continue;
                }
            };

            if f(entry_shard) {
                return Ok(true);
            }
            if entry_shard >= self.shard {
                return Ok(false);
            }

            // Removal order doesn't matter at all, it's the
            // responsibility of the read side to handle missing files
            let mut index_path = self.dir.clone();
            index_path.push(&entry);

            match std::fs::remove_file(&index_path) {
                Err(e) if e.kind() != ErrorKind::NotFound => {
                    return Err(e).context(format!(
                        "Failed to remove index file: {}",
                        index_path.display()
                    ));
                }
                _ => {}
            };

            let mut data_path = self.dir.clone();
            data_path.push(format!("data_{:011}", entry_shard));

            match std::fs::remove_file(&data_path) {
                Err(e) if e.kind() != ErrorKind::NotFound => {
                    return Err(e).context(format!(
                        "Failed to remove data file: {}",
                        data_path.display()
                    ));
                }
                _ => {}
            };
        }
        Ok(false)
    }

    /// Discard all data earlier than timestamp
    ///
    /// We do not modify index and data files. We just look for files
    /// which can only contain earlier data and remove them.
    pub fn discard_earlier(&self, timestamp: SystemTime) -> Result<()> {
        let shard = calculate_shard(timestamp);
        self.discard_until(|shard_timestamp| shard_timestamp >= shard)?;
        Ok(())
    }

    /// Discard data until store size is less than limit, or there is only one
    /// shard left. Oldest shards are discarded first. Returns true on success
    /// or false if the current shard size is greater than the limit.
    pub fn try_discard_until_size(&self, store_size_limit: u64) -> Result<bool> {
        let dir = self.dir.clone();
        self.discard_until(|_| {
            let size = get_dir_size(dir.clone());
            size <= store_size_limit
        })
    }
}

/// Direction to scan for next sample
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    Forward,
    Reverse,
}

impl Direction {
    pub fn get_skip_order(&self) -> std::cmp::Ordering {
        match self {
            Direction::Forward => std::cmp::Ordering::Less,
            Direction::Reverse => std::cmp::Ordering::Greater,
        }
    }

    pub fn flip(&self) -> Self {
        match self {
            Direction::Forward => Direction::Reverse,
            Direction::Reverse => Direction::Forward,
        }
    }
}

/// Convenience function to read the first sample at `timestamp`, or the next
/// one in `direction`. Prefer using StoreCursor directly for sequential
/// reads.
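///
/// A minimal call sketch (the store path and logger are illustrative):
///
/// ```ignore
/// let latest = read_next_sample(
///     "/var/log/below/store",
///     SystemTime::now(),
///     Direction::Reverse,
///     logger,
/// )?;
/// ```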
pub fn read_next_sample<P: AsRef<Path>>(
    path: P,
    timestamp: SystemTime,
    direction: Direction,
    logger: slog::Logger,
) -> Result<Option<(SystemTime, DataFrame)>> {
    let mut cursor = cursor::StoreCursor::new(logger, path.as_ref().to_path_buf());
    cursor.get_next(&get_unix_timestamp(timestamp), direction)
}

pub trait Store {
    // We intentionally make this trait generic and not tied to the DataFrame
    // and Model types for ease of testing.
    // For LocalStore and RemoteStore, SampleType will be DataFrame
    // For FakeStore, SampleType will be u64
    type SampleType;

    /// Return the sample time and data frame. Needs to be implemented by
    /// all stores.
    // This function should return the data sample at the provided timestamp.
    // If no sample is available at the given timestamp, it will return the
    // first sample after the timestamp if the direction is forward. Otherwise
    // it will return the last sample before the timestamp. This function should
    // return None in the following situations:
    // * reverse search a target that has timestamp earlier than the first recorded
    //   sample
    // * forward search a target that has timestamp later than the last recorded
    //   sample
    fn get_sample_at_timestamp(
        &mut self,
        timestamp: SystemTime,
        direction: Direction,
    ) -> Result<Option<(SystemTime, Self::SampleType)>>;
}

pub struct LocalStore {
    store_cursor: StoreCursor,
}

pub struct RemoteStore {
    store: crate::remote_store::RemoteStore,
}

impl LocalStore {
    pub fn new(logger: slog::Logger, dir: PathBuf) -> Self {
        Self {
            store_cursor: StoreCursor::new(logger, dir),
        }
    }
}

impl RemoteStore {
    pub fn new(host: String, port: Option<u16>) -> Result<Self> {
        Ok(Self {
            store: crate::remote_store::RemoteStore::new(host, port)?,
        })
    }
}

impl Store for LocalStore {
    type SampleType = DataFrame;

    fn get_sample_at_timestamp(
        &mut self,
        timestamp: SystemTime,
        direction: Direction,
    ) -> Result<Option<(SystemTime, Self::SampleType)>> {
        self.store_cursor
            .get_next(&get_unix_timestamp(timestamp), direction)
    }
}

impl Store for RemoteStore {
    type SampleType = DataFrame;

    fn get_sample_at_timestamp(
        &mut self,
        timestamp: SystemTime,
        direction: Direction,
    ) -> Result<Option<(SystemTime, Self::SampleType)>> {
        self.store
            .get_frame(get_unix_timestamp(timestamp), direction)
    }
}

trait Crc32 {
    fn crc32(&self) -> u32;
}

/// Lookup table for byte-by-byte crc32 computation
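/// (this is the standard reflected CRC-32 table for polynomial 0xEDB88320;
/// note that the `Crc32` impls below omit the final bit inversion, which is
/// fine since CRC values are only ever compared against values computed the
/// same way)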
const CRC32_TABLE: [u32; 256] = [
    0, 0x77073096, 0xEE0E612C, 0x990951BA, 0x076DC419, 0x706AF48F, 0xE963A535, 0x9E6495A3,
    0x0EDB8832, 0x79DCB8A4, 0xE0D5E91E, 0x97D2D988, 0x09B64C2B, 0x7EB17CBD, 0xE7B82D07, 0x90BF1D91,
    0x1DB71064, 0x6AB020F2, 0xF3B97148, 0x84BE41DE, 0x1ADAD47D, 0x6DDDE4EB, 0xF4D4B551, 0x83D385C7,
    0x136C9856, 0x646BA8C0, 0xFD62F97A, 0x8A65C9EC, 0x14015C4F, 0x63066CD9, 0xFA0F3D63, 0x8D080DF5,
    0x3B6E20C8, 0x4C69105E, 0xD56041E4, 0xA2677172, 0x3C03E4D1, 0x4B04D447, 0xD20D85FD, 0xA50AB56B,
    0x35B5A8FA, 0x42B2986C, 0xDBBBC9D6, 0xACBCF940, 0x32D86CE3, 0x45DF5C75, 0xDCD60DCF, 0xABD13D59,
    0x26D930AC, 0x51DE003A, 0xC8D75180, 0xBFD06116, 0x21B4F4B5, 0x56B3C423, 0xCFBA9599, 0xB8BDA50F,
    0x2802B89E, 0x5F058808, 0xC60CD9B2, 0xB10BE924, 0x2F6F7C87, 0x58684C11, 0xC1611DAB, 0xB6662D3D,
    0x76DC4190, 0x01DB7106, 0x98D220BC, 0xEFD5102A, 0x71B18589, 0x06B6B51F, 0x9FBFE4A5, 0xE8B8D433,
    0x7807C9A2, 0x0F00F934, 0x9609A88E, 0xE10E9818, 0x7F6A0DBB, 0x086D3D2D, 0x91646C97, 0xE6635C01,
    0x6B6B51F4, 0x1C6C6162, 0x856530D8, 0xF262004E, 0x6C0695ED, 0x1B01A57B, 0x8208F4C1, 0xF50FC457,
    0x65B0D9C6, 0x12B7E950, 0x8BBEB8EA, 0xFCB9887C, 0x62DD1DDF, 0x15DA2D49, 0x8CD37CF3, 0xFBD44C65,
    0x4DB26158, 0x3AB551CE, 0xA3BC0074, 0xD4BB30E2, 0x4ADFA541, 0x3DD895D7, 0xA4D1C46D, 0xD3D6F4FB,
    0x4369E96A, 0x346ED9FC, 0xAD678846, 0xDA60B8D0, 0x44042D73, 0x33031DE5, 0xAA0A4C5F, 0xDD0D7CC9,
    0x5005713C, 0x270241AA, 0xBE0B1010, 0xC90C2086, 0x5768B525, 0x206F85B3, 0xB966D409, 0xCE61E49F,
    0x5EDEF90E, 0x29D9C998, 0xB0D09822, 0xC7D7A8B4, 0x59B33D17, 0x2EB40D81, 0xB7BD5C3B, 0xC0BA6CAD,
    0xEDB88320, 0x9ABFB3B6, 0x03B6E20C, 0x74B1D29A, 0xEAD54739, 0x9DD277AF, 0x04DB2615, 0x73DC1683,
    0xE3630B12, 0x94643B84, 0x0D6D6A3E, 0x7A6A5AA8, 0xE40ECF0B, 0x9309FF9D, 0x0A00AE27, 0x7D079EB1,
    0xF00F9344, 0x8708A3D2, 0x1E01F268, 0x6906C2FE, 0xF762575D, 0x806567CB, 0x196C3671, 0x6E6B06E7,
    0xFED41B76, 0x89D32BE0, 0x10DA7A5A, 0x67DD4ACC, 0xF9B9DF6F, 0x8EBEEFF9, 0x17B7BE43, 0x60B08ED5,
    0xD6D6A3E8, 0xA1D1937E, 0x38D8C2C4, 0x4FDFF252, 0xD1BB67F1, 0xA6BC5767, 0x3FB506DD, 0x48B2364B,
    0xD80D2BDA, 0xAF0A1B4C, 0x36034AF6, 0x41047A60, 0xDF60EFC3, 0xA867DF55, 0x316E8EEF, 0x4669BE79,
    0xCB61B38C, 0xBC66831A, 0x256FD2A0, 0x5268E236, 0xCC0C7795, 0xBB0B4703, 0x220216B9, 0x5505262F,
    0xC5BA3BBE, 0xB2BD0B28, 0x2BB45A92, 0x5CB36A04, 0xC2D7FFA7, 0xB5D0CF31, 0x2CD99E8B, 0x5BDEAE1D,
    0x9B64C2B0, 0xEC63F226, 0x756AA39C, 0x026D930A, 0x9C0906A9, 0xEB0E363F, 0x72076785, 0x05005713,
    0x95BF4A82, 0xE2B87A14, 0x7BB12BAE, 0x0CB61B38, 0x92D28E9B, 0xE5D5BE0D, 0x7CDCEFB7, 0x0BDBDF21,
    0x86D3D2D4, 0xF1D4E242, 0x68DDB3F8, 0x1FDA836E, 0x81BE16CD, 0xF6B9265B, 0x6FB077E1, 0x18B74777,
    0x88085AE6, 0xFF0F6A70, 0x66063BCA, 0x11010B5C, 0x8F659EFF, 0xF862AE69, 0x616BFFD3, 0x166CCF45,
    0xA00AE278, 0xD70DD2EE, 0x4E048354, 0x3903B3C2, 0xA7672661, 0xD06016F7, 0x4969474D, 0x3E6E77DB,
    0xAED16A4A, 0xD9D65ADC, 0x40DF0B66, 0x37D83BF0, 0xA9BCAE53, 0xDEBB9EC5, 0x47B2CF7F, 0x30B5FFE9,
    0xBDBDF21C, 0xCABAC28A, 0x53B39330, 0x24B4A3A6, 0xBAD03605, 0xCDD70693, 0x54DE5729, 0x23D967BF,
    0xB3667A2E, 0xC4614AB8, 0x5D681B02, 0x2A6F2B94, 0xB40BBE37, 0xC30C8EA1, 0x5A05DF1B, 0x2D02EF8D,
];

impl Crc32 for [u8] {
    fn crc32(&self) -> u32 {
        let mut crc: u32 = 0xFFFF_FFFF;
        for byte in self {
            crc = (crc >> 8) ^ CRC32_TABLE[((crc & 0xFF) as u8 ^ *byte) as usize];
        }
        crc
    }
}

impl Crc32 for IndexEntry {
    fn crc32(&self) -> u32 {
        let slice = unsafe {
            std::slice::from_raw_parts(
                self as *const IndexEntry as *const u8,
                // Make sure to ignore the index_crc itself for this
                INDEX_ENTRY_SIZE - std::mem::size_of::<u32>(),
            )
        };
        slice.crc32()
    }
}

// This is the timestamp rounded down to the nearest
// multiple of SHARD_TIME
fn calculate_shard(timestamp: SystemTime) -> u64 {
    let timestamp_secs = get_unix_timestamp(timestamp);
    let shard_rem = timestamp_secs % SHARD_TIME;
    timestamp_secs - shard_rem
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use paste::paste;
    use slog::Drain;
    use tempfile::TempDir;

    use super::*;

    fn get_logger() -> slog::Logger {
        let plain = slog_term::PlainSyncDecorator::new(std::io::stderr());
        slog::Logger::root(slog_term::FullFormat::new(plain).build().fuse(), slog::o!())
    }

    // Asserts that a and b are equal, to the resolution of one second
    macro_rules! assert_ts {
        ($a:expr, $b:expr) => {
            let a_dur = $a
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Timestamp earlier than UNIX EPOCH");
            let b_dur = $b
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Timestamp earlier than UNIX EPOCH");
            assert_eq!(a_dur.as_secs(), b_dur.as_secs());
        };
    }

    macro_rules! store_test {
        ($name:ident, $func:ident) => {
            paste! {
                #[test]
                fn [<$name _uncompressed_cbor>]() {
                    $func(CompressionMode::None, Format::Cbor);
                }
            }

            paste! {
                #[test]
                fn [<$name _compressed_cbor>]() {
                    $func(CompressionMode::Zstd, Format::Cbor);
                }
            }

            paste! {
                #[test]
                fn [<$name _dict_compressed_cbor>]() {
                    $func(CompressionMode::ZstdDictionary(ChunkSizePo2(2)), Format::Cbor);
                }
            }
        };
    }

    #[test]
    fn writing_to_already_written_index_with_different_compression_format_works() {
        use itertools::Itertools;

        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);

        // States, (compression_mode, format), that we transition between when
        // writing
        let states = [
            (CompressionMode::None, Format::Cbor),
            (CompressionMode::Zstd, Format::Cbor),
            (
                CompressionMode::ZstdDictionary(ChunkSizePo2(0)),
                Format::Cbor,
            ),
            (
                CompressionMode::ZstdDictionary(ChunkSizePo2(1)),
                Format::Cbor,
            ),
            (
                CompressionMode::ZstdDictionary(ChunkSizePo2(2)),
                Format::Cbor,
            ),
            (
                CompressionMode::ZstdDictionary(ChunkSizePo2(3)),
                Format::Cbor,
            ),
        ];
        // State sequence that contains all possible transitions
        let state_sequence = states
            .iter()
            .cartesian_product(states.iter())
            .flat_map(|(a, b)| vec![a, b])
            .collect::<Vec<_>>();

        for (i, (compression_mode, format)) in state_sequence.iter().enumerate() {
            let mut writer = StoreWriter::new(get_logger(), &dir, *compression_mode, *format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(i as i64);

            writer
                .put(ts + Duration::from_secs(i as u64), &frame)
                .expect("Failed to store data");
        }

        // Test reading all the samples
        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        for (i, (_compress, _format)) in state_sequence.iter().enumerate() {
            let frame = store_cursor
                .get_next(
                    &get_unix_timestamp(ts + Duration::from_secs(i as u64)),
                    Direction::Forward,
                )
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts + Duration::from_secs(i as u64));
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(i as i64));
        }
    }

    #[test]
    fn write_index_padding() {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        // Keep test on one shard
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
        // Write 1 frame without compression. Doesn't add padding.
        {
            let mut writer =
                StoreWriter::new(get_logger(), &dir, CompressionMode::None, Format::Cbor)
                    .expect("Failed to create store");
            let mut frame = DataFrame::default();
            for i in 0..1 {
                frame.sample.cgroup.memory_current = Some(i);
                writer
                    .put(ts + Duration::from_secs(i as u64), &frame)
                    .expect("Failed to store data");
            }
            assert_eq!(
                writer.index.metadata().unwrap().len(),
                INDEX_ENTRY_SIZE as u64
            );
        }

        // Write 2 frames without compression. Doesn't add padding.
        {
            let mut writer =
                StoreWriter::new(get_logger(), &dir, CompressionMode::None, Format::Cbor)
                    .expect("Failed to create store");
            let mut frame = DataFrame::default();
            for i in 1..3 {
                frame.sample.cgroup.memory_current = Some(i);
                writer
                    .put(ts + Duration::from_secs(i as u64), &frame)
                    .expect("Failed to store data");
            }
            assert_eq!(
                writer.index.metadata().unwrap().len(),
                3 * INDEX_ENTRY_SIZE as u64
            );
        }

        // Write 2 frames with compression. Doesn't add padding.
        {
            let mut writer =
                StoreWriter::new(get_logger(), &dir, CompressionMode::Zstd, Format::Cbor)
                    .expect("Failed to create store");
            let mut frame = DataFrame::default();
            for i in 3..5 {
                frame.sample.cgroup.memory_current = Some(i);
                writer
                    .put(ts + Duration::from_secs(i as u64), &frame)
                    .expect("Failed to store data");
            }
            assert_eq!(
                writer.index.metadata().unwrap().len(),
                5 * INDEX_ENTRY_SIZE as u64
            );
        }

        // Dict compress with chunk size of 4. Current size of 5 so
        // need to pad by 3.
        {
            let mut writer = StoreWriter::new(
                get_logger(),
                &dir,
                CompressionMode::ZstdDictionary(ChunkSizePo2(2)),
                Format::Cbor,
            )
            .expect("Failed to create store");
            let mut frame = DataFrame::default();
            for i in 5..13 {
                frame.sample.cgroup.memory_current = Some(i);
                writer
                    .put(ts + Duration::from_secs(i as u64), &frame)
                    .expect("Failed to store data");
            }
            assert_eq!(
                writer.index.metadata().unwrap().len(),
                16 * INDEX_ENTRY_SIZE as u64
            );
        }

        // Dict compress with chunk size of 8. Current size of 16 so
        // no padding needed.
        {
            let mut writer = StoreWriter::new(
                get_logger(),
                &dir,
                CompressionMode::ZstdDictionary(ChunkSizePo2(3)),
                Format::Cbor,
            )
            .expect("Failed to create store");
            let mut frame = DataFrame::default();
            for i in 13..16 {
                frame.sample.cgroup.memory_current = Some(i);
                writer
                    .put(ts + Duration::from_secs(i as u64), &frame)
                    .expect("Failed to store data");
            }
            assert_eq!(
                writer.index.metadata().unwrap().len(),
                19 * INDEX_ENTRY_SIZE as u64
            );
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        for i in 0..16 {
            let frame = store_cursor
                .get_next(
                    &get_unix_timestamp(ts + Duration::from_secs(i as u64)),
                    Direction::Forward,
                )
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts + Duration::from_secs(i as u64));
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(i));
        }
    }

    store_test!(create_writer, _create_writer);
    fn _create_writer(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        StoreWriter::new(get_logger(), &dir, compression_mode, format)
            .expect("Failed to create store");
    }

    store_test!(simple_put_read, _simple_put_read);
    fn _simple_put_read(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
    }

    store_test!(simple_put_read_10, _simple_put_read_10);
    fn _simple_put_read_10(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        // Keep test on one shard
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            for i in 0..10 {
                frame.sample.cgroup.memory_current = Some(i);
                writer
                    .put(ts + Duration::from_secs(i as u64), &frame)
                    .expect("Failed to store data");
            }
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        for i in 0..10 {
            let frame = store_cursor
                .get_next(
                    &get_unix_timestamp(ts + Duration::from_secs(i as u64)),
                    Direction::Forward,
                )
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts + Duration::from_secs(i as u64));
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(i));
        }
    }

    store_test!(put_new_shard, _put_new_shard);
    fn _put_new_shard(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let now = SystemTime::now();
        // Ensure that the following writes (within 60s) are to the same shard
        let ts = if calculate_shard(now) == calculate_shard(now + Duration::from_secs(60)) {
            now
        } else {
            now + Duration::from_secs(60)
        };

        {
            let mut writer =
                StoreWriter::new_with_timestamp(get_logger(), &dir, ts, compression_mode, format)
                    .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(111);

            // New StoreWriter, but we're not switching to new shard
            assert!(!writer.put(ts, &frame).expect("Failed to store data"));

            frame.sample.cgroup.memory_current = Some(222);

            // No new shard
            assert!(
                !writer
                    .put(ts + Duration::from_secs(1), &frame)
                    .expect("Failed to store data")
            );

            frame.sample.cgroup.memory_current = Some(333);

            // New shard
            assert!(
                writer
                    .put(ts + Duration::from_secs(SHARD_TIME), &frame)
                    .expect("Failed to store data")
            );
        }

        {
            let mut writer = StoreWriter::new_with_timestamp(
                get_logger(),
                &dir,
                ts + Duration::from_secs(SHARD_TIME + 1),
                compression_mode,
                format,
            )
            .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(444);

            // New StoreWriter but writing to existing shard
            assert!(
                !writer
                    .put(ts + Duration::from_secs(SHARD_TIME + 1), &frame)
                    .expect("Failed to store data")
            );
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(111));

        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(1)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(1));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(222));

        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(SHARD_TIME)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));

        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(SHARD_TIME + 1)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME + 1));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(444));
    }

    store_test!(put_read_corrupt_data, _put_read_corrupt_data);
    fn _put_read_corrupt_data(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        let ts_next = ts + Duration::from_secs(1);
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");

            // Inject an extra byte to corrupt data file
            for entry in fs::read_dir(&dir).expect("Failed to read dir") {
                let entry = entry.expect("Failed to list entry");
                if let Some(name) = entry.path().file_name() {
                    if name.to_string_lossy().starts_with("data_") {
                        OpenOptions::new()
                            .append(true)
                            .open(entry.path())
                            .expect("Failed to open data file")
                            .write_all(&[0])
                            .expect("Failed to write to data file");
                    }
                }
            }

            frame.sample.cgroup.memory_current = Some(222);

            // Write a second sample after the faulty byte
            writer.put(ts_next, &frame).expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));

        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts_next), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts_next);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(222));
    }
1322
1323    store_test!(
1324        read_past_the_end_returns_none,
1325        _read_past_the_end_returns_none
1326    );
1327    fn _read_past_the_end_returns_none(compression_mode: CompressionMode, format: Format) {
1328        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1329        let ts = SystemTime::now();
1330        {
1331            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1332                .expect("Failed to create store");
1333            let mut frame = DataFrame::default();
1334            frame.sample.cgroup.memory_current = Some(333);
1335
1336            writer.put(ts, &frame).expect("Failed to store data");
1337        }
1338
1339        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1340        let frame_opt = store_cursor
1341            .get_next(
1342                &get_unix_timestamp(ts + Duration::from_secs(1)),
1343                Direction::Forward,
1344            )
1345            .expect("Failed to read sample");
1346        assert_eq!(frame_opt, None);
1347    }
1348
    store_test!(read_iterates_appropriately, _read_iterates_appropriately);
    fn _read_iterates_appropriately(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");

            frame.sample.cgroup.memory_current = Some(666);
            writer
                .put(ts + Duration::from_secs(5), &frame)
                .expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(3)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(5));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
    }

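    // A forward read that starts after the only sample in one shard should
    // advance into the following shard and return its sample.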
    store_test!(
        put_and_read_work_across_shards,
        _put_and_read_work_across_shards
    );
    fn _put_and_read_work_across_shards(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");

            frame.sample.cgroup.memory_current = Some(666);
            writer
                .put(ts + Duration::from_secs(SHARD_TIME), &frame)
                .expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(1)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
    }

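    // A reverse read from the exact timestamp of a stored sample should return
    // that sample.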
    store_test!(read_reverse, _read_reverse);
    fn _read_reverse(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Reverse)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
    }

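    // A reverse read that starts in a later shard should fall back to the
    // newest sample in an earlier shard.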
    store_test!(read_reverse_across_shards, _read_reverse_across_shards);
    fn _read_reverse_across_shards(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");

            frame.sample.cgroup.memory_current = Some(666);
            writer
                .put(ts + Duration::from_secs(SHARD_TIME), &frame)
                .expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(SHARD_TIME) - Duration::from_secs(1)),
                Direction::Reverse,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
    }

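    // discard_earlier operates at shard granularity: discarding earlier than a
    // timestamp inside the second shard removes the whole first shard but
    // leaves the second shard intact.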
    store_test!(discard_earlier, _discard_earlier);
    fn _discard_earlier(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");

            frame.sample.cgroup.memory_current = Some(666);
            writer
                .put(ts + Duration::from_secs(1), &frame)
                .expect("Failed to store data");

            frame.sample.cgroup.memory_current = Some(777);
            writer
                .put(ts + Duration::from_secs(SHARD_TIME), &frame)
                .expect("Failed to store data");

            frame.sample.cgroup.memory_current = Some(888);
            writer
                .put(ts + Duration::from_secs(SHARD_TIME + 1), &frame)
                .expect("Failed to store data");

            writer
                .discard_earlier(ts + Duration::from_secs(SHARD_TIME + 1))
                .expect("Failed to discard data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(777));
    }

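    // try_discard_until_size should remove whole shards, oldest first, until
    // the store directory fits the target size, but never remove the shard
    // currently being written.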
    store_test!(try_discard_until_size, _try_discard_until_size);
    fn _try_discard_until_size(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let dir_path_buf = dir.path().to_path_buf();
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
        let mut shard_sizes = Vec::new();
        let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
            .expect("Failed to create store");

        // Write n samples starting at `timestamp`, one second apart, and return
        // the resulting increase in the store directory's size.
        let mut write = |timestamp: SystemTime, n: u64| -> u64 {
            let dir_size = get_dir_size(dir_path_buf.clone());
            let mut frame = DataFrame::default();
            for i in 0..n {
                frame.sample.cgroup.memory_current = Some(n as i64 + i as i64);
                writer
                    .put(timestamp + Duration::from_secs(i), &frame)
                    .expect("Failed to store data");
            }
            let dir_size_after = get_dir_size(dir_path_buf.clone());
            assert!(
                dir_size_after > dir_size,
                "Directory size did not increase. before: {} after: {} n_samples: {}",
                dir_size,
                dir_size_after,
                n,
            );
            dir_size_after - dir_size
        };

        let num_shards = 7;
        for i in 0..num_shards {
            shard_sizes.push(write(ts + Duration::from_secs(SHARD_TIME * i), i + 1));
        }
        let total_size = shard_sizes.iter().sum::<u64>();

        // In the following tests, we use new instances of StoreCursor so that
        // it doesn't continue using the mmap of current files.
        {
            // Nothing is discarded
            let target_size = total_size;
            assert!(
                writer
                    .try_discard_until_size(target_size)
                    .expect("Failed to discard data")
            );
            let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
                .get_next(&get_unix_timestamp(ts), Direction::Forward)
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts);
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(1));
        }

        {
            // Delete first shard
            let target_size = total_size - 1;
            assert!(
                writer
                    .try_discard_until_size(target_size)
                    .expect("Failed to discard data")
            );
            let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
                .get_next(&get_unix_timestamp(ts), Direction::Forward)
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            // assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(2));
        }

        {
            // Delete second and third shards
            let target_size = total_size - (shard_sizes[0] + shard_sizes[1] + shard_sizes[2]);
            assert!(
                writer
                    .try_discard_until_size(target_size)
                    .expect("Failed to discard data")
            );
            let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
                .get_next(&get_unix_timestamp(ts), Direction::Forward)
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME * 3));
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(4));
        }

        {
            // Delete fourth and fifth shards, with a target directory size
            // slightly greater than the resulting directory size
            let target_size = total_size
                - (shard_sizes[0] + shard_sizes[1] + shard_sizes[2] + shard_sizes[3] + shard_sizes[4])
                + /* smaller than a shard */ 1;
            assert!(
                writer
                    .try_discard_until_size(target_size)
                    .expect("Failed to discard data")
            );
            let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
                .get_next(&get_unix_timestamp(ts), Direction::Forward)
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME * 5));
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(6));
        }

        {
            // Delete until size is 1. Verify that the current shard remains
            // (i.e. size > 1).
            assert!(
                !writer
                    .try_discard_until_size(1)
                    .expect("Failed to discard data"),
            );
            let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
                .get_next(&get_unix_timestamp(ts), Direction::Forward)
                .expect("Failed to read sample")
                .expect("Did not find stored sample");
            assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME) * 6);
            assert_eq!(frame.1.sample.cgroup.memory_current, Some(7));
        }
    }

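    // Creating a StoreWriter should fail while the shard's index file is held
    // under an exclusive flock.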
    store_test!(flock_protects, _flock_protects);
    fn _flock_protects(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = SystemTime::now();
        let shard = calculate_shard(ts);
        let mut index_path = dir.path().to_path_buf();
        index_path.push(format!("index_{:011}", shard));
        let index = OpenOptions::new()
            .append(true)
            .create(true)
            .open(index_path.as_path())
            .expect("Failed to create index file");
        nix::fcntl::flock(
            index.as_raw_fd(),
            nix::fcntl::FlockArg::LockExclusiveNonblock,
        )
        .expect("Failed to acquire flock on index file");

        assert!(
            StoreWriter::new(get_logger(), &dir, compression_mode, format).is_err(),
            "Did not conflict on index lock"
        );
    }

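    // Reopening the store and appending to an existing shard's index should
    // preserve the samples written by the previous writer.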
    store_test!(
        writing_to_already_written_index_works,
        _writing_to_already_written_index_works
    );
    fn _writing_to_already_written_index_works(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");
        }
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(666);
            writer
                .put(ts + Duration::from_secs(5), &frame)
                .expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));

        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(1)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(5));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
    }

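    // Garbage appended to an index file is treated as a corrupt entry: reads
    // should skip over it and still find a valid entry written afterwards.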
    store_test!(
        read_skips_over_corrupt_index_entry,
        _read_skips_over_corrupt_index_entry
    );
    fn _read_skips_over_corrupt_index_entry(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");
        }
        // Append garbage to the index file
        {
            let shard = calculate_shard(ts);
            let mut index_path = dir.path().to_path_buf();
            index_path.push(format!("index_{:011}", shard));
            let mut index = OpenOptions::new()
                .append(true)
                .create(true)
                .open(index_path.as_path())
                .expect("Failed to create index file");
            index
                .write_all(b"This is complete garbage data that is longer than an entry")
                .expect("Failed to append to index");
        }
        {
            let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(666);
            writer
                .put(ts + Duration::from_secs(5), &frame)
                .expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
        let frame = store_cursor
            .get_next(
                &get_unix_timestamp(ts + Duration::from_secs(1)),
                Direction::Forward,
            )
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts + Duration::from_secs(5));
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
    }

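    // StoreWriter::new should create the store directory if it does not
    // already exist.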
    store_test!(writer_creates_directory, _writer_creates_directory);
    fn _writer_creates_directory(compression_mode: CompressionMode, format: Format) {
        let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
        let mut subdir = dir.path().to_path_buf();
        subdir.push("foo");
        let ts = SystemTime::now();
        {
            let mut writer = StoreWriter::new(get_logger(), &subdir, compression_mode, format)
                .expect("Failed to create store");
            let mut frame = DataFrame::default();
            frame.sample.cgroup.memory_current = Some(333);

            writer.put(ts, &frame).expect("Failed to store data");
        }

        let mut store_cursor = StoreCursor::new(get_logger(), subdir);
        let frame = store_cursor
            .get_next(&get_unix_timestamp(ts), Direction::Forward)
            .expect("Failed to read sample")
            .expect("Did not find stored sample");
        assert_ts!(frame.0, ts);
        assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
    }
}