1#![deny(clippy::all)]
16
17use std::fs;
18use std::fs::File;
19use std::fs::OpenOptions;
20use std::io::ErrorKind;
21use std::io::Write;
22use std::os::unix::io::AsRawFd;
23use std::path::Path;
24use std::path::PathBuf;
25use std::time::SystemTime;
26
27use anyhow::bail;
28use anyhow::Context;
29use anyhow::Result;
30use bitflags::bitflags;
31use common::fileutil::get_dir_size;
32use common::open_source_shim;
33use common::util::get_unix_timestamp;
34use model::Model;
35use serde::Deserialize;
36use serde::Serialize;
37use slog::info;
38use slog::warn;
39use static_assertions::const_assert_eq;
40
41use crate::compression::Compressor;
42use crate::cursor::KeyedCursor;
43use crate::cursor::StoreCursor;
44
// Submodules of the store crate. `test` is only compiled for test builds.
pub mod advance;
pub mod compression;
pub mod cursor;
#[cfg(test)]
mod test;

/// `advance::Advance` specialized to this crate's `DataFrame` and `Model`.
pub type Advance = advance::Advance<DataFrame, Model>;

// NOTE(review): expands to build-specific items defined by `common` — the
// expansion is not visible from this file.
open_source_shim!();
54
/// One stored frame: a thin serde-serializable wrapper around a single
/// `model::Sample`.
#[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct DataFrame {
    /// The sample carried by this frame.
    pub sample: model::Sample,
}
87
/// Width of one shard in seconds (24 hours). `calculate_shard` rounds a
/// timestamp down to a multiple of this value.
const SHARD_TIME: u64 = 24 * 60 * 60;

/// Bit offset of the chunk-compress-size field inside `IndexEntryFlags`.
const CHUNK_COMPRESS_SHIFT: u32 = 2;

/// Largest chunk-compress-size exponent; `0x0F` means the field is 4 bits wide.
const MAX_CHUNK_COMPRESS_SIZE_PO2: u32 = 0x0F;
/// `2^MAX_CHUNK_COMPRESS_SIZE_PO2`; the assertion below pins it to 32768.
pub const MAX_CHUNK_COMPRESS_SIZE: u32 = 1 << MAX_CHUNK_COMPRESS_SIZE_PO2;
const_assert_eq!(MAX_CHUNK_COMPRESS_SIZE, 32768);
97
bitflags! {
    /// Flag bits stored in `IndexEntry::flags` for every frame.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Clone, Copy)]
    struct IndexEntryFlags: u32 {
        /// Set by the writer when the frame bytes were compressed.
        const COMPRESSED = 0x1;
        /// Set by the writer when the frame was serialized as `Format::Cbor`.
        const CBOR = 0x2;
        /// Bit mask (not a single flag) of the 4-bit chunk-compress-size
        /// exponent, located `CHUNK_COMPRESS_SHIFT` bits above bit 0.
        const CHUNK_COMPRESS_SIZE_PO2 = MAX_CHUNK_COMPRESS_SIZE_PO2 << CHUNK_COMPRESS_SHIFT;
    }
}
125
126impl IndexEntryFlags {
127 fn get_chunk_compress_size_po2(&self) -> u32 {
128 (self.bits() & Self::CHUNK_COMPRESS_SIZE_PO2.bits()) >> CHUNK_COMPRESS_SHIFT
129 }
130
131 fn set_chunk_compress_size_po2(&mut self, chunk_compress_size_po2: u32) -> Result<()> {
132 if chunk_compress_size_po2 > MAX_CHUNK_COMPRESS_SIZE_PO2 {
133 bail!(
134 "Chunk compress size po2 should be less than or equal to {}",
135 MAX_CHUNK_COMPRESS_SIZE_PO2
136 );
137 }
138 *self |= IndexEntryFlags::from_bits_retain(chunk_compress_size_po2 << CHUNK_COMPRESS_SHIFT);
139 Ok(())
140 }
141}
142
/// Fixed-size record appended to the index file for every stored frame.
///
/// `#[repr(C)]` pins the field order: the writer and the CRC code both view
/// the struct as raw bytes.
#[repr(C)]
struct IndexEntry {
    /// Frame timestamp, as returned by `get_unix_timestamp`.
    timestamp: u64,
    /// Byte offset of the frame inside the data file.
    offset: u64,
    /// Number of bytes the frame occupies in the data file.
    len: u32,
    /// Serialization / compression flags of the frame.
    flags: IndexEntryFlags,
    /// CRC of the frame bytes as written to the data file.
    data_crc: u32,
    /// CRC of this entry's bytes, excluding this trailing field.
    index_crc: u32,
}
158
/// Size in bytes of one `IndexEntry`; asserted below to be exactly 32.
const INDEX_ENTRY_SIZE: usize = std::mem::size_of::<IndexEntry>();
/// log2 of `INDEX_ENTRY_SIZE` (valid because the size is a power of two).
const INDEX_ENTRY_SIZE_PO2: u32 = INDEX_ENTRY_SIZE.trailing_zeros();
const_assert_eq!(INDEX_ENTRY_SIZE, 32);
162
/// Power-of-two exponent of the dictionary chunk size, counted in index
/// entries: the writer aligns chunk starts to
/// `2^(value + INDEX_ENTRY_SIZE_PO2)` bytes of the index file.
#[derive(Copy, Clone, Debug)]
pub struct ChunkSizePo2(pub u32);
165
/// How `StoreWriter` compresses serialized frames.
#[derive(Copy, Clone, Debug)]
pub enum CompressionMode {
    /// Frames are written exactly as serialized.
    None,
    /// Every frame is compressed on its own (dictionary reset for each frame).
    Zstd,
    /// Frames are grouped in chunks: the first frame of a chunk (the key
    /// frame) is compressed on its own and then loaded as the dictionary for
    /// the remaining frames of that chunk.
    ZstdDictionary(ChunkSizePo2),
}
172
/// Appends frames to the index/data file pair of one shard inside a store
/// directory, switching to a new pair when the shard changes.
pub struct StoreWriter {
    /// Logger used for warnings and informational messages.
    logger: slog::Logger,
    /// Store directory holding the `index_*` / `data_*` files.
    dir: PathBuf,
    /// Index file of the current shard, opened for append and flock'ed
    /// exclusively.
    index: File,
    /// Data file of the current shard, opened for append and flock'ed
    /// exclusively.
    data: File,
    /// Expected length of the data file, i.e. the offset of the next frame.
    data_len: u64,
    /// Shard (start timestamp) whose files are currently open.
    shard: u64,
    /// Compressor carried between writes; `None` until the first compressed
    /// write, and again after a write that failed.
    compressor: Option<Compressor>,
    /// Compression applied to every frame.
    compression_mode: CompressionMode,
    /// Serialization format of every frame.
    format: Format,
}
202
203fn get_index_files(path: &Path) -> Result<Vec<String>> {
206 let mut entries = fs::read_dir(path)
207 .with_context(|| format!("Failed to read directory {}", path.display()))?
208 .filter_map(|res_ent| {
209 res_ent
210 .map(|ent| {
211 ent.file_name()
212 .to_str()
213 .filter(|s| s.starts_with("index"))
214 .map(|s| s.to_string())
215 })
216 .transpose()
217 })
218 .collect::<Result<Vec<_>, std::io::Error>>()
219 .with_context(|| format!("Failed to read directory entries in {}", path.display()))?;
220
221 entries.sort_unstable();
222 Ok(entries)
223}
224
225enum SerializedFrame<'a> {
226 Owned(bytes::Bytes),
227 Borrowed(&'a [u8]),
228}
229
230impl AsRef<[u8]> for SerializedFrame<'_> {
231 fn as_ref(&self) -> &[u8] {
232 match self {
233 SerializedFrame::Owned(b) => b.as_ref(),
234 SerializedFrame::Borrowed(s) => s,
235 }
236 }
237}
238
239impl SerializedFrame<'_> {
240 fn into_owned(self) -> bytes::Bytes {
241 match self {
242 SerializedFrame::Owned(b) => b,
243 SerializedFrame::Borrowed(s) => bytes::Bytes::copy_from_slice(s),
244 }
245 }
246}
247
/// Serialization format of a stored frame.
#[derive(Copy, Clone, Debug)]
pub enum Format {
    /// CBOR, encoded and decoded with `serde_cbor`.
    Cbor,
}
253
254fn serialize_frame(data: &DataFrame, format: Format) -> Result<bytes::Bytes> {
256 match format {
257 Format::Cbor => {
258 let bytes = serde_cbor::to_vec(data)?;
259 Ok(bytes::Bytes::from(bytes))
260 }
261 }
262}
263
264fn deserialize_frame(bytes: &[u8], format: Format) -> Result<DataFrame> {
266 match format {
267 Format::Cbor => {
268 let data_frame = serde_cbor::from_slice(bytes)?;
269 Ok(data_frame)
270 }
271 }
272}
273
274impl StoreWriter {
275 pub fn new<P: AsRef<Path>>(
281 logger: slog::Logger,
282 path: P,
283 compression_mode: CompressionMode,
284 format: Format,
285 ) -> Result<Self> {
286 Self::new_with_timestamp(logger, path, SystemTime::now(), compression_mode, format)
287 }
288
289 pub fn new_with_timestamp<P: AsRef<Path>>(
290 logger: slog::Logger,
291 path: P,
292 timestamp: SystemTime,
293 compression_mode: CompressionMode,
294 format: Format,
295 ) -> Result<Self> {
296 let shard = calculate_shard(timestamp);
297 Self::new_with_shard(logger, path, shard, compression_mode, format)
298 }
299
300 fn new_with_shard<P: AsRef<Path>>(
301 logger: slog::Logger,
302 path: P,
303 shard: u64,
304 compression_mode: CompressionMode,
305 format: Format,
306 ) -> Result<Self> {
307 if !path.as_ref().is_dir() {
308 std::fs::create_dir(&path).with_context(|| {
309 format!("Failed to create store path: {}", path.as_ref().display())
310 })?;
311 }
312
313 let (data_path, index_path) = {
314 let mut data_path = path.as_ref().to_path_buf();
315 let mut index_path = data_path.clone();
316 data_path.push(format!("data_{:011}", shard));
317 index_path.push(format!("index_{:011}", shard));
318 (data_path, index_path)
319 };
320
321 let index = OpenOptions::new()
322 .append(true)
323 .create(true)
324 .open(index_path.as_path())
325 .with_context(|| format!("Failed to open index file: {}", index_path.display()))?;
326 nix::fcntl::flock(
327 index.as_raw_fd(),
328 nix::fcntl::FlockArg::LockExclusiveNonblock,
329 )
330 .with_context(|| {
331 format!(
332 "Failed to acquire file lock on index file: {}",
333 index_path.display(),
334 )
335 })?;
336
337 let data = OpenOptions::new()
338 .append(true)
339 .create(true)
340 .open(data_path.as_path())
341 .with_context(|| format!("Failed to open data file: {}", data_path.display()))?;
342 nix::fcntl::flock(
343 data.as_raw_fd(),
344 nix::fcntl::FlockArg::LockExclusiveNonblock,
345 )
346 .with_context(|| {
347 format!(
348 "Failed to acquire file lock on data file: {}",
349 data_path.display(),
350 )
351 })?;
352
353 let data_len = data
354 .metadata()
355 .with_context(|| {
356 format!(
357 "Failed to get metadata of data file: {}",
358 data_path.display()
359 )
360 })?
361 .len();
362
363 Ok(StoreWriter {
364 logger,
365 dir: path.as_ref().to_path_buf(),
366 index,
367 data,
368 data_len,
369 shard,
370 compressor: None,
372 compression_mode,
373 format,
374 })
375 }
376
377 fn pad_and_get_index_len(index: &mut File, alignment_po2: u32) -> Result<(u64, u64)> {
384 let index_len = index
385 .metadata()
386 .context("Failed to get metadata of index file")?
387 .len();
388 let alignment_mask = (1 << alignment_po2) - 1;
389 let aligned_len = (index_len + alignment_mask) & !alignment_mask;
390 if aligned_len != index_len {
391 index
392 .set_len(aligned_len)
393 .context("Failed to pad index file")?;
394 }
397 Ok((index_len, aligned_len))
398 }
399
400 fn get_bytes_and_flags_for_frame(
408 &self,
409 data_frame: &DataFrame,
410 compressor: &mut Option<Compressor>,
411 is_key_frame: bool,
412 ) -> Result<(bytes::Bytes, IndexEntryFlags)> {
413 let mut flags = match self.format {
414 Format::Cbor => IndexEntryFlags::CBOR,
415 };
416 let frame_bytes =
418 serialize_frame(data_frame, self.format).context("Failed to serialize data frame")?;
419 let serialized = match self.compression_mode {
420 CompressionMode::None => frame_bytes,
421 CompressionMode::Zstd => {
422 flags |= IndexEntryFlags::COMPRESSED;
423 compressor
424 .get_or_insert_with(Compressor::new)
425 .compress_with_dict_reset(&frame_bytes)
426 .context("Failed to compress data")?
427 }
428 CompressionMode::ZstdDictionary(ChunkSizePo2(chunk_size_po2)) => {
429 flags |= IndexEntryFlags::COMPRESSED;
430 flags
431 .set_chunk_compress_size_po2(chunk_size_po2)
432 .expect("bug: invalid chunk compress size");
433 let compressor = compressor.get_or_insert_with(Compressor::new);
434 if is_key_frame {
435 let serialized = compressor
436 .compress_with_dict_reset(&frame_bytes)
437 .context("Failed to compress key frame")?;
438 compressor
439 .load_dict(&frame_bytes)
440 .context("Failed to set key frame as dict")?;
441 serialized
442 } else {
443 compressor
444 .compress_with_loaded_dict(&frame_bytes)
445 .context("Failed to compress data frame")?
446 }
447 }
448 };
449 Ok((serialized, flags))
450 }
451
452 fn put_in_current_shard(&mut self, timestamp: SystemTime, data: &DataFrame) -> Result<()> {
456 let shard = calculate_shard(timestamp);
457 if shard != self.shard {
458 panic!("Can't write data to shard as it belongs to different shard")
459 }
460
461 let chunk_alignment_po2 =
463 if let CompressionMode::ZstdDictionary(ChunkSizePo2(chunk_size_po2)) =
464 self.compression_mode
465 {
466 chunk_size_po2 + INDEX_ENTRY_SIZE_PO2
469 } else {
470 0
471 };
472 let alignment_po2 = if chunk_alignment_po2 != 0 && self.compressor.is_none() {
476 chunk_alignment_po2
477 } else {
478 INDEX_ENTRY_SIZE_PO2
479 };
480 let (index_len, aligned_len) = Self::pad_and_get_index_len(&mut self.index, alignment_po2)
481 .with_context(|| {
482 format!(
483 "Failed to get index length and possibly pad index file: index_{:011}",
484 shard
485 )
486 })?;
487 if index_len != aligned_len {
488 if alignment_po2 == INDEX_ENTRY_SIZE_PO2 {
489 warn!(
490 self.logger,
491 "Index length not a multiple of fixed index entry size: {}. Padded to size: {}",
492 index_len,
493 aligned_len,
494 );
495 } else if alignment_po2 == chunk_alignment_po2 {
496 info!(
498 self.logger,
499 "Padded index so that first entry of block is aligned. Previous len: {}. New len: {}",
500 index_len,
501 aligned_len,
502 );
503 } else {
504 panic!("Unexpected alignment_po2 value");
505 }
506 }
507
508 let mut compressor = self.compressor.take();
513 let is_key_frame =
516 chunk_alignment_po2 != 0 && aligned_len.trailing_zeros() >= chunk_alignment_po2;
517 let (serialized, flags) = self
518 .get_bytes_and_flags_for_frame(data, &mut compressor, is_key_frame)
519 .context("Failed to get serialized frame and flags")?;
520
521 let data_len = self
527 .data
528 .metadata()
529 .with_context(|| {
530 format!(
531 "Failed to get metadata of data file: data_{:011}",
532 self.shard
533 )
534 })?
535 .len();
536 if self.data_len != data_len {
538 warn!(
539 self.logger,
540 "Data length mismatch: {} (expect {})", data_len, self.data_len
541 );
542 self.data_len = data_len;
543 }
544
545 let offset = self.data_len;
546 self.data
551 .write_all(&serialized)
552 .context("Failed to write entry to data file")?;
553 self.data_len += serialized.len() as u64;
554 let data_crc = serialized.crc32();
555
556 let mut index_entry = IndexEntry {
557 timestamp: get_unix_timestamp(timestamp),
558 offset,
559 flags,
560 len: serialized
561 .len()
562 .try_into()
563 .with_context(|| format!("Serialized len={} overflows u32", serialized.len()))?,
564 data_crc,
565 index_crc: 0,
566 };
567 index_entry.index_crc = index_entry.crc32();
568 {
569 let entry_slice = unsafe {
571 std::slice::from_raw_parts(
572 &index_entry as *const IndexEntry as *const u8,
573 INDEX_ENTRY_SIZE,
574 )
575 };
576 self.index
577 .write_all(entry_slice)
578 .context("Failed to write entry to index file")?;
579 }
580
581 self.compressor = compressor;
584 Ok(())
585 }
586
587 pub fn put(&mut self, timestamp: SystemTime, data: &DataFrame) -> Result<bool> {
591 let shard = calculate_shard(timestamp);
592 if shard != self.shard {
593 let mut writer = Self::new_with_shard(
595 self.logger.clone(),
596 self.dir.as_path(),
597 shard,
598 self.compression_mode,
599 self.format,
600 )?;
601 writer.put_in_current_shard(timestamp, data)?;
605 *self = writer;
606 Ok(true)
607 } else {
608 self.put_in_current_shard(timestamp, data)?;
609 Ok(false)
610 }
611 }
612
613 fn discard_until<F>(&self, f: F) -> Result<bool>
617 where
618 F: Fn(u64) -> bool,
619 {
620 let entries = get_index_files(self.dir.as_path())?;
621
622 for entry in entries {
624 let v: Vec<&str> = entry.split('_').collect();
625 if v.len() != 2 {
626 warn!(self.logger, "Invalid index file name: {}", entry);
627 continue;
628 }
629
630 let entry_shard = match v[1].parse::<u64>() {
631 Ok(val) => val,
632 _ => {
633 warn!(self.logger, "Cannot parse index shard: {}", entry);
634 continue;
635 }
636 };
637
638 if f(entry_shard) {
639 return Ok(true);
640 }
641 if entry_shard >= self.shard {
642 return Ok(false);
643 }
644
645 let mut index_path = self.dir.clone();
648 index_path.push(&entry);
649
650 match std::fs::remove_file(&index_path) {
651 Err(e) if e.kind() != ErrorKind::NotFound => {
652 return Err(e).context(format!(
653 "Failed to remove index file: {}",
654 index_path.display()
655 ));
656 }
657 _ => {}
658 };
659
660 let mut data_path = self.dir.clone();
661 data_path.push(format!("data_{:011}", entry_shard));
662
663 match std::fs::remove_file(&data_path) {
664 Err(e) if e.kind() != ErrorKind::NotFound => {
665 return Err(e).context(format!(
666 "Failed to remove data file: {}",
667 data_path.display()
668 ));
669 }
670 _ => {}
671 };
672 }
673 Ok(false)
674 }
675
676 pub fn discard_earlier(&self, timestamp: SystemTime) -> Result<()> {
681 let shard = calculate_shard(timestamp);
682 self.discard_until(|shard_timestamp| shard_timestamp >= shard)?;
683 Ok(())
684 }
685
686 pub fn try_discard_until_size(&self, store_size_limit: u64) -> Result<bool> {
690 let dir = self.dir.clone();
691 self.discard_until(|_| {
692 let size = get_dir_size(dir.clone());
693 size <= store_size_limit
694 })
695 }
696}
697
/// Direction in which a lookup moves through the stored samples.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    Forward,
    Reverse,
}

impl Direction {
    /// Ordering associated with this direction: `Less` for `Forward`,
    /// `Greater` for `Reverse`.
    pub fn get_skip_order(&self) -> std::cmp::Ordering {
        use std::cmp::Ordering;

        match *self {
            Self::Reverse => Ordering::Greater,
            Self::Forward => Ordering::Less,
        }
    }

    /// Returns the opposite direction.
    pub fn flip(&self) -> Self {
        match *self {
            Self::Reverse => Self::Forward,
            Self::Forward => Self::Reverse,
        }
    }
}
720
721pub fn read_next_sample<P: AsRef<Path>>(
725 path: P,
726 timestamp: SystemTime,
727 direction: Direction,
728 logger: slog::Logger,
729) -> Result<Option<(SystemTime, DataFrame)>> {
730 let mut cursor = cursor::StoreCursor::new(logger, path.as_ref().to_path_buf());
731 cursor.get_next(&get_unix_timestamp(timestamp), direction)
732}
733
/// A source of timestamped samples that can be queried by time.
pub trait Store {
    /// Type of the sample this store yields.
    type SampleType;

    /// Looks up a sample relative to `timestamp`, searching in `direction`.
    ///
    /// On success returns the sample together with its own timestamp.
    /// NOTE(review): `Ok(None)` presumably means no sample exists in that
    /// direction; the exact matching rule is up to the implementor.
    fn get_sample_at_timestamp(
        &mut self,
        timestamp: SystemTime,
        direction: Direction,
    ) -> Result<Option<(SystemTime, Self::SampleType)>>;
}
758
/// `Store` backed by a `StoreCursor` over a local store directory.
pub struct LocalStore {
    /// Cursor used to answer every lookup.
    store_cursor: StoreCursor,
}
762
/// `Store` that forwards lookups to `crate::remote_store::RemoteStore`.
pub struct RemoteStore {
    /// Underlying remote store client.
    store: crate::remote_store::RemoteStore,
}
766
767impl LocalStore {
768 pub fn new(logger: slog::Logger, dir: PathBuf) -> Self {
769 Self {
770 store_cursor: StoreCursor::new(logger, dir),
771 }
772 }
773}
774
775impl RemoteStore {
776 pub fn new(host: String, port: Option<u16>) -> Result<Self> {
777 Ok(Self {
778 store: crate::remote_store::RemoteStore::new(host, port)?,
779 })
780 }
781}
782
783impl Store for LocalStore {
784 type SampleType = DataFrame;
785
786 fn get_sample_at_timestamp(
787 &mut self,
788 timestamp: SystemTime,
789 direction: Direction,
790 ) -> Result<Option<(SystemTime, Self::SampleType)>> {
791 self.store_cursor
792 .get_next(&get_unix_timestamp(timestamp), direction)
793 }
794}
795
796impl Store for RemoteStore {
797 type SampleType = DataFrame;
798
799 fn get_sample_at_timestamp(
800 &mut self,
801 timestamp: SystemTime,
802 direction: Direction,
803 ) -> Result<Option<(SystemTime, Self::SampleType)>> {
804 self.store
805 .get_frame(get_unix_timestamp(timestamp), direction)
806 }
807}
808
/// Checksum used for both frame data and index entries.
trait Crc32 {
    /// Returns the checksum of `self`.
    fn crc32(&self) -> u32;
}

/// Byte-wise lookup table for the reflected CRC-32 polynomial `0xEDB88320`,
/// computed at compile time: entry `i` is `i` pushed through eight
/// shift/conditional-xor steps.
const CRC32_TABLE: [u32; 256] = {
    let mut table = [0u32; 256];
    let mut index = 0;
    while index < 256 {
        let mut value = index as u32;
        let mut bit = 0;
        while bit < 8 {
            value = if value & 1 == 1 {
                (value >> 1) ^ 0xEDB8_8320
            } else {
                value >> 1
            };
            bit += 1;
        }
        table[index] = value;
        index += 1;
    }
    table
};

// The checksums are part of the on-disk format: pin a few well-known entries
// so a mistake in the generator fails the build.
const _: () = assert!(
    CRC32_TABLE[1] == 0x7707_3096
        && CRC32_TABLE[128] == 0xEDB8_8320
        && CRC32_TABLE[255] == 0x2D02_EF8D
);

impl Crc32 for [u8] {
    /// Table-driven CRC over the bytes, starting from `0xFFFF_FFFF`.
    ///
    /// The final register value is returned as is, with no final inversion,
    /// so an empty slice yields `0xFFFF_FFFF`.
    fn crc32(&self) -> u32 {
        self.iter().fold(0xFFFF_FFFF_u32, |crc, &byte| {
            let slot = usize::from((crc & 0xFF) as u8 ^ byte);
            (crc >> 8) ^ CRC32_TABLE[slot]
        })
    }
}
858
859impl Crc32 for IndexEntry {
860 fn crc32(&self) -> u32 {
861 let slice = unsafe {
862 std::slice::from_raw_parts(
863 self as *const IndexEntry as *const u8,
864 INDEX_ENTRY_SIZE - std::mem::size_of::<u32>(),
866 )
867 };
868 slice.crc32()
869 }
870}
871
872fn calculate_shard(timestamp: SystemTime) -> u64 {
875 let timestamp_secs = get_unix_timestamp(timestamp);
876 let shard_rem = timestamp_secs % SHARD_TIME;
877 timestamp_secs - shard_rem
878}
879
880#[cfg(test)]
881mod tests {
882 use std::time::Duration;
883
884 use paste::paste;
885 use slog::Drain;
886 use tempfile::TempDir;
887
888 use super::*;
889
890 fn get_logger() -> slog::Logger {
891 let plain = slog_term::PlainSyncDecorator::new(std::io::stderr());
892 slog::Logger::root(slog_term::FullFormat::new(plain).build().fuse(), slog::o!())
893 }
894
    // Asserts that two timestamps are equal at whole-second granularity.
    // Both operands are converted with `duration_since(SystemTime::UNIX_EPOCH)`;
    // the macro panics if either one is earlier than the epoch.
    macro_rules! assert_ts {
        ($a:expr, $b:expr) => {
            let a_dur = $a
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Timestamp earlier than UNIX EPOCH");
            let b_dur = $b
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Timestamp earlier than UNIX EPOCH");
            // Sub-second parts are deliberately ignored.
            assert_eq!(a_dur.as_secs(), b_dur.as_secs());
        };
    }
907
908 macro_rules! store_test {
909 ($name:ident, $func:ident) => {
910 paste! {
911 #[test]
912 fn [<$name _uncompressed_cbor>]() {
913 $func(CompressionMode::None, Format::Cbor);
914 }
915 }
916
917 paste! {
918 #[test]
919 fn [<$name _compressed_cbor>]() {
920 $func(CompressionMode::Zstd, Format::Cbor);
921 }
922 }
923
924 paste! {
925 #[test]
926 fn [<$name _dict_compressed_cbor>]() {
927 $func(CompressionMode::ZstdDictionary(ChunkSizePo2(2)), Format::Cbor);
928 }
929 }
930 };
931 }
932
933 #[test]
934 fn writing_to_already_written_index_with_different_compression_format_works() {
935 use itertools::Itertools;
936
937 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
938 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
939
940 let states = [
943 (CompressionMode::None, Format::Cbor),
944 (CompressionMode::Zstd, Format::Cbor),
945 (
946 CompressionMode::ZstdDictionary(ChunkSizePo2(0)),
947 Format::Cbor,
948 ),
949 (
950 CompressionMode::ZstdDictionary(ChunkSizePo2(1)),
951 Format::Cbor,
952 ),
953 (
954 CompressionMode::ZstdDictionary(ChunkSizePo2(2)),
955 Format::Cbor,
956 ),
957 (
958 CompressionMode::ZstdDictionary(ChunkSizePo2(3)),
959 Format::Cbor,
960 ),
961 ];
962 let state_sequence = states
964 .iter()
965 .cartesian_product(states.iter())
966 .flat_map(|(a, b)| vec![a, b])
967 .collect::<Vec<_>>();
968
969 for (i, (compression_mode, format)) in state_sequence.iter().enumerate() {
970 let mut writer = StoreWriter::new(get_logger(), &dir, *compression_mode, *format)
971 .expect("Failed to create store");
972 let mut frame = DataFrame::default();
973 frame.sample.cgroup.memory_current = Some(i as i64);
974
975 writer
976 .put(ts + Duration::from_secs(i as u64), &frame)
977 .expect("Failed to store data");
978 }
979
980 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
982 for (i, (_compress, _format)) in state_sequence.iter().enumerate() {
983 let frame = store_cursor
984 .get_next(
985 &get_unix_timestamp(ts + Duration::from_secs(i as u64)),
986 Direction::Forward,
987 )
988 .expect("Failed to read sample")
989 .expect("Did not find stored sample");
990 assert_ts!(frame.0, ts + Duration::from_secs(i as u64));
991 assert_eq!(frame.1.sample.cgroup.memory_current, Some(i as i64));
992 }
993 }
994
995 #[test]
996 fn write_index_padding() {
997 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
998 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
1000 {
1002 let mut writer =
1003 StoreWriter::new(get_logger(), &dir, CompressionMode::None, Format::Cbor)
1004 .expect("Failed to create store");
1005 let mut frame = DataFrame::default();
1006 for i in 0..1 {
1007 frame.sample.cgroup.memory_current = Some(i);
1008 writer
1009 .put(ts + Duration::from_secs(i as u64), &frame)
1010 .expect("Failed to store data");
1011 }
1012 assert_eq!(
1013 writer.index.metadata().unwrap().len(),
1014 INDEX_ENTRY_SIZE as u64
1015 );
1016 }
1017
1018 {
1020 let mut writer =
1021 StoreWriter::new(get_logger(), &dir, CompressionMode::None, Format::Cbor)
1022 .expect("Failed to create store");
1023 let mut frame = DataFrame::default();
1024 for i in 1..3 {
1025 frame.sample.cgroup.memory_current = Some(i);
1026 writer
1027 .put(ts + Duration::from_secs(i as u64), &frame)
1028 .expect("Failed to store data");
1029 }
1030 assert_eq!(
1031 writer.index.metadata().unwrap().len(),
1032 3 * INDEX_ENTRY_SIZE as u64
1033 );
1034 }
1035
1036 {
1038 let mut writer =
1039 StoreWriter::new(get_logger(), &dir, CompressionMode::Zstd, Format::Cbor)
1040 .expect("Failed to create store");
1041 let mut frame = DataFrame::default();
1042 for i in 3..5 {
1043 frame.sample.cgroup.memory_current = Some(i);
1044 writer
1045 .put(ts + Duration::from_secs(i as u64), &frame)
1046 .expect("Failed to store data");
1047 }
1048 assert_eq!(
1049 writer.index.metadata().unwrap().len(),
1050 5 * INDEX_ENTRY_SIZE as u64
1051 );
1052 }
1053
1054 {
1057 let mut writer = StoreWriter::new(
1058 get_logger(),
1059 &dir,
1060 CompressionMode::ZstdDictionary(ChunkSizePo2(2)),
1061 Format::Cbor,
1062 )
1063 .expect("Failed to create store");
1064 let mut frame = DataFrame::default();
1065 for i in 5..13 {
1066 frame.sample.cgroup.memory_current = Some(i);
1067 writer
1068 .put(ts + Duration::from_secs(i as u64), &frame)
1069 .expect("Failed to store data");
1070 }
1071 assert_eq!(
1072 writer.index.metadata().unwrap().len(),
1073 16 * INDEX_ENTRY_SIZE as u64
1074 );
1075 }
1076
1077 {
1080 let mut writer = StoreWriter::new(
1081 get_logger(),
1082 &dir,
1083 CompressionMode::ZstdDictionary(ChunkSizePo2(3)),
1084 Format::Cbor,
1085 )
1086 .expect("Failed to create store");
1087 let mut frame = DataFrame::default();
1088 for i in 13..16 {
1089 frame.sample.cgroup.memory_current = Some(i);
1090 writer
1091 .put(ts + Duration::from_secs(i as u64), &frame)
1092 .expect("Failed to store data");
1093 }
1094 assert_eq!(
1095 writer.index.metadata().unwrap().len(),
1096 19 * INDEX_ENTRY_SIZE as u64
1097 );
1098 }
1099
1100 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1101 for i in 0..16 {
1102 let frame = store_cursor
1103 .get_next(
1104 &get_unix_timestamp(ts + Duration::from_secs(i as u64)),
1105 Direction::Forward,
1106 )
1107 .expect("Failed to read sample")
1108 .expect("Did not find stored sample");
1109 assert_ts!(frame.0, ts + Duration::from_secs(i as u64));
1110 assert_eq!(frame.1.sample.cgroup.memory_current, Some(i));
1111 }
1112 }
1113
1114 store_test!(create_writer, _create_writer);
1115 fn _create_writer(compression_mode: CompressionMode, format: Format) {
1116 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1117 StoreWriter::new(get_logger(), &dir, compression_mode, format)
1118 .expect("Failed to create store");
1119 }
1120
1121 store_test!(simple_put_read, _simple_put_read);
1122 fn _simple_put_read(compression_mode: CompressionMode, format: Format) {
1123 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1124 let ts = SystemTime::now();
1125 {
1126 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1127 .expect("Failed to create store");
1128 let mut frame = DataFrame::default();
1129 frame.sample.cgroup.memory_current = Some(333);
1130
1131 writer.put(ts, &frame).expect("Failed to store data");
1132 }
1133
1134 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1135 let frame = store_cursor
1136 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1137 .expect("Failed to read sample")
1138 .expect("Did not find stored sample");
1139 assert_ts!(frame.0, ts);
1140 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1141 }
1142
1143 store_test!(simple_put_read_10, _simple_put_read_10);
1144 fn _simple_put_read_10(compression_mode: CompressionMode, format: Format) {
1145 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1146 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
1148 {
1149 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1150 .expect("Failed to create store");
1151 let mut frame = DataFrame::default();
1152 for i in 0..10 {
1153 frame.sample.cgroup.memory_current = Some(i);
1154 writer
1155 .put(ts + Duration::from_secs(i as u64), &frame)
1156 .expect("Failed to store data");
1157 }
1158 }
1159
1160 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1161 for i in 0..10 {
1162 let frame = store_cursor
1163 .get_next(
1164 &get_unix_timestamp(ts + Duration::from_secs(i as u64)),
1165 Direction::Forward,
1166 )
1167 .expect("Failed to read sample")
1168 .expect("Did not find stored sample");
1169 assert_ts!(frame.0, ts + Duration::from_secs(i as u64));
1170 assert_eq!(frame.1.sample.cgroup.memory_current, Some(i));
1171 }
1172 }
1173
1174 store_test!(put_new_shard, _put_new_shard);
1175 fn _put_new_shard(compression_mode: CompressionMode, format: Format) {
1176 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1177 let now = SystemTime::now();
1178 let ts = if calculate_shard(now) == calculate_shard(now + Duration::from_secs(60)) {
1180 now
1181 } else {
1182 now + Duration::from_secs(60)
1183 };
1184
1185 {
1186 let mut writer =
1187 StoreWriter::new_with_timestamp(get_logger(), &dir, ts, compression_mode, format)
1188 .expect("Failed to create store");
1189 let mut frame = DataFrame::default();
1190 frame.sample.cgroup.memory_current = Some(111);
1191
1192 assert!(!writer.put(ts, &frame).expect("Failed to store data"));
1194
1195 frame.sample.cgroup.memory_current = Some(222);
1196
1197 assert!(
1199 !writer
1200 .put(ts + Duration::from_secs(1), &frame)
1201 .expect("Failed to store data")
1202 );
1203
1204 frame.sample.cgroup.memory_current = Some(333);
1205
1206 assert!(
1208 writer
1209 .put(ts + Duration::from_secs(SHARD_TIME), &frame)
1210 .expect("Failed to store data")
1211 );
1212 }
1213
1214 {
1215 let mut writer = StoreWriter::new_with_timestamp(
1216 get_logger(),
1217 &dir,
1218 ts + Duration::from_secs(SHARD_TIME + 1),
1219 compression_mode,
1220 format,
1221 )
1222 .expect("Failed to create store");
1223 let mut frame = DataFrame::default();
1224 frame.sample.cgroup.memory_current = Some(444);
1225
1226 assert!(
1228 !writer
1229 .put(ts + Duration::from_secs(SHARD_TIME + 1), &frame,)
1230 .expect("Failed to store data")
1231 );
1232 }
1233
1234 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1235 let frame = store_cursor
1236 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1237 .expect("Failed to read sample")
1238 .expect("Did not find stored sample");
1239 assert_ts!(frame.0, ts);
1240 assert_eq!(frame.1.sample.cgroup.memory_current, Some(111));
1241
1242 let frame = store_cursor
1243 .get_next(
1244 &get_unix_timestamp(ts + Duration::from_secs(1)),
1245 Direction::Forward,
1246 )
1247 .expect("Failed to read sample")
1248 .expect("Did not find stored sample");
1249 assert_ts!(frame.0, ts + Duration::from_secs(1));
1250 assert_eq!(frame.1.sample.cgroup.memory_current, Some(222));
1251
1252 let frame = store_cursor
1253 .get_next(
1254 &get_unix_timestamp(ts + Duration::from_secs(SHARD_TIME)),
1255 Direction::Forward,
1256 )
1257 .expect("Failed to read sample")
1258 .expect("Did not find stored sample");
1259 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
1260 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1261
1262 let frame = store_cursor
1263 .get_next(
1264 &get_unix_timestamp(ts + Duration::from_secs(SHARD_TIME + 1)),
1265 Direction::Forward,
1266 )
1267 .expect("Failed to read sample")
1268 .expect("Did not find stored sample");
1269 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME + 1));
1270 assert_eq!(frame.1.sample.cgroup.memory_current, Some(444));
1271 }
1272
1273 store_test!(put_read_corrupt_data, _put_read_corrupt_data);
1274 fn _put_read_corrupt_data(compression_mode: CompressionMode, format: Format) {
1275 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1276 let ts = SystemTime::now();
1277 let ts_next = ts + Duration::from_secs(1);
1278 {
1279 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1280 .expect("Failed to create store");
1281 let mut frame = DataFrame::default();
1282 frame.sample.cgroup.memory_current = Some(333);
1283
1284 writer.put(ts, &frame).expect("Failed to store data");
1285
1286 for entry in fs::read_dir(&dir).expect("Failed to read dir") {
1288 let entry = entry.expect("Failed to list entry");
1289 if let Some(name) = entry.path().file_name() {
1290 if name.to_string_lossy().starts_with("data_") {
1291 OpenOptions::new()
1292 .append(true)
1293 .open(entry.path())
1294 .expect("Failed to open data file")
1295 .write_all(&[0])
1296 .expect("Failed to write to data file");
1297 }
1298 }
1299 }
1300
1301 frame.sample.cgroup.memory_current = Some(222);
1302
1303 writer.put(ts_next, &frame).expect("Failed to store data");
1305 }
1306
1307 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1308 let frame = store_cursor
1309 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1310 .expect("Failed to read sample")
1311 .expect("Did not find stored sample");
1312 assert_ts!(frame.0, ts);
1313 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1314
1315 let frame = store_cursor
1316 .get_next(&get_unix_timestamp(ts_next), Direction::Forward)
1317 .expect("Failed to read sample")
1318 .expect("Did not find stored sample");
1319 assert_ts!(frame.0, ts_next);
1320 assert_eq!(frame.1.sample.cgroup.memory_current, Some(222));
1321 }
1322
1323 store_test!(
1324 read_past_the_end_returns_none,
1325 _read_past_the_end_returns_none
1326 );
1327 fn _read_past_the_end_returns_none(compression_mode: CompressionMode, format: Format) {
1328 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1329 let ts = SystemTime::now();
1330 {
1331 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1332 .expect("Failed to create store");
1333 let mut frame = DataFrame::default();
1334 frame.sample.cgroup.memory_current = Some(333);
1335
1336 writer.put(ts, &frame).expect("Failed to store data");
1337 }
1338
1339 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1340 let frame_opt = store_cursor
1341 .get_next(
1342 &get_unix_timestamp(ts + Duration::from_secs(1)),
1343 Direction::Forward,
1344 )
1345 .expect("Failed to read sample");
1346 assert_eq!(frame_opt, None);
1347 }
1348
1349 store_test!(read_iterates_appropriately, _read_iterates_appropriately);
1350 fn _read_iterates_appropriately(compression_mode: CompressionMode, format: Format) {
1351 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1352 let ts = SystemTime::now();
1353 {
1354 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1355 .expect("Failed to create store");
1356 let mut frame = DataFrame::default();
1357 frame.sample.cgroup.memory_current = Some(333);
1358
1359 writer.put(ts, &frame).expect("Failed to store data");
1360
1361 frame.sample.cgroup.memory_current = Some(666);
1362 writer
1363 .put(ts + Duration::from_secs(5), &frame)
1364 .expect("Failed to store data");
1365 }
1366
1367 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1368 let frame = store_cursor
1369 .get_next(
1370 &get_unix_timestamp(ts + Duration::from_secs(3)),
1371 Direction::Forward,
1372 )
1373 .expect("Failed to read sample")
1374 .expect("Did not find stored sample");
1375 assert_ts!(frame.0, ts + Duration::from_secs(5));
1376 assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
1377 }
1378
1379 store_test!(
1380 put_and_read_work_across_shards,
1381 _put_and_read_work_across_shards
1382 );
1383 fn _put_and_read_work_across_shards(compression_mode: CompressionMode, format: Format) {
1384 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1385 let ts = SystemTime::now();
1386 {
1387 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1388 .expect("Failed to create store");
1389 let mut frame = DataFrame::default();
1390 frame.sample.cgroup.memory_current = Some(333);
1391
1392 writer.put(ts, &frame).expect("Failed to store data");
1393
1394 frame.sample.cgroup.memory_current = Some(666);
1395 writer
1396 .put(ts + Duration::from_secs(SHARD_TIME), &frame)
1397 .expect("Failed to store data");
1398 }
1399
1400 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1401 let frame = store_cursor
1402 .get_next(
1403 &get_unix_timestamp(ts + Duration::from_secs(1)),
1404 Direction::Forward,
1405 )
1406 .expect("Failed to read sample")
1407 .expect("Did not find stored sample");
1408 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
1409 assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
1410 }
1411
1412 store_test!(read_reverse, _read_reverse);
1413 fn _read_reverse(compression_mode: CompressionMode, format: Format) {
1414 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1415 let ts = SystemTime::now();
1416 {
1417 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1418 .expect("Failed to create store");
1419 let mut frame = DataFrame::default();
1420 frame.sample.cgroup.memory_current = Some(333);
1421
1422 writer.put(ts, &frame).expect("Failed to store data");
1423 }
1424
1425 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1426 let frame = store_cursor
1427 .get_next(&get_unix_timestamp(ts), Direction::Reverse)
1428 .expect("Failed to read sample")
1429 .expect("Did not find stored sample");
1430 assert_ts!(frame.0, ts);
1431 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1432 }
1433
1434 store_test!(read_reverse_across_shards, _read_reverse_across_shards);
1435 fn _read_reverse_across_shards(compression_mode: CompressionMode, format: Format) {
1436 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1437 let ts = SystemTime::now();
1438 {
1439 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1440 .expect("Failed to create store");
1441 let mut frame = DataFrame::default();
1442 frame.sample.cgroup.memory_current = Some(333);
1443
1444 writer.put(ts, &frame).expect("Failed to store data");
1445
1446 frame.sample.cgroup.memory_current = Some(666);
1447 writer
1448 .put(ts + Duration::from_secs(SHARD_TIME), &frame)
1449 .expect("Failed to store data");
1450 }
1451
1452 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1453 let frame = store_cursor
1454 .get_next(
1455 &get_unix_timestamp(ts + Duration::from_secs(SHARD_TIME) - Duration::from_secs(1)),
1456 Direction::Reverse,
1457 )
1458 .expect("Failed to read sample")
1459 .expect("Did not find stored sample");
1460 assert_ts!(frame.0, ts);
1461 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1462 }
1463
1464 store_test!(discard_earlier, _discard_earlier);
1465 fn _discard_earlier(compression_mode: CompressionMode, format: Format) {
1466 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1467 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
1468 {
1469 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1470 .expect("Failed to create store");
1471 let mut frame = DataFrame::default();
1472 frame.sample.cgroup.memory_current = Some(333);
1473
1474 writer.put(ts, &frame).expect("Failed to store data");
1475
1476 frame.sample.cgroup.memory_current = Some(666);
1477 writer
1478 .put(ts + Duration::from_secs(1), &frame)
1479 .expect("Failed to store data");
1480
1481 frame.sample.cgroup.memory_current = Some(777);
1482 writer
1483 .put(ts + Duration::from_secs(SHARD_TIME), &frame)
1484 .expect("Failed to store data");
1485
1486 frame.sample.cgroup.memory_current = Some(888);
1487 writer
1488 .put(ts + Duration::from_secs(SHARD_TIME + 1), &frame)
1489 .expect("Failed to store data");
1490
1491 writer
1492 .discard_earlier(ts + Duration::from_secs(SHARD_TIME + 1))
1493 .expect("Failed to discard data");
1494 }
1495
1496 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1497 let frame = store_cursor
1498 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1499 .expect("Failed to read sample")
1500 .expect("Did not find stored sample");
1501 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME));
1502 assert_eq!(frame.1.sample.cgroup.memory_current, Some(777));
1503 }
1504
1505 store_test!(try_discard_until_size, _try_discard_until_size);
1506 fn _try_discard_until_size(compression_mode: CompressionMode, format: Format) {
1507 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1508 let dir_path_buf = dir.path().to_path_buf();
1509 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
1510 let mut shard_sizes = Vec::new();
1511 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1512 .expect("Failed to create store");
1513
1514 let mut write = |timestamp: SystemTime, n: u64| -> u64 {
1517 let dir_size = get_dir_size(dir_path_buf.clone());
1518 let mut frame = DataFrame::default();
1519 for i in 0..n {
1520 frame.sample.cgroup.memory_current = Some(n as i64 + i as i64);
1521 writer
1522 .put(timestamp + Duration::from_secs(i), &frame)
1523 .expect("Failed to store data");
1524 }
1525 let dir_size_after = get_dir_size(dir_path_buf.clone());
1526 assert!(
1527 dir_size_after > dir_size,
1528 "Directory size did not increase. before: {} after: {}: n_samples {}",
1529 dir_size,
1530 dir_size_after,
1531 n,
1532 );
1533 dir_size_after - dir_size
1534 };
1535
1536 let num_shards = 7;
1537 for i in 0..num_shards {
1538 shard_sizes.push(write(ts + Duration::from_secs(SHARD_TIME * i), i + 1));
1539 }
1540 let total_size = shard_sizes.iter().sum::<u64>();
1541
1542 {
1545 let target_size = total_size;
1547 assert!(
1548 writer
1549 .try_discard_until_size(target_size)
1550 .expect("Failed to discard data")
1551 );
1552 let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
1553 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1554 .expect("Failed to read sample")
1555 .expect("Did not find stored sample");
1556 assert_ts!(frame.0, ts);
1557 assert_eq!(frame.1.sample.cgroup.memory_current, Some(1));
1558 }
1559
1560 {
1561 let target_size = total_size - 1;
1563 assert!(
1564 writer
1565 .try_discard_until_size(target_size)
1566 .expect("Failed to discard data")
1567 );
1568 let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
1569 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1570 .expect("Failed to read sample")
1571 .expect("Did not find stored sample");
1572 assert_eq!(frame.1.sample.cgroup.memory_current, Some(2));
1574 }
1575
1576 {
1577 let target_size = total_size - (shard_sizes[0] + shard_sizes[1] + shard_sizes[2]);
1579 assert!(
1580 writer
1581 .try_discard_until_size(target_size)
1582 .expect("Failed to discard data")
1583 );
1584 let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
1585 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1586 .expect("Failed to read sample")
1587 .expect("Did not find stored sample");
1588 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME * 3));
1589 assert_eq!(frame.1.sample.cgroup.memory_current, Some(4));
1590 }
1591
1592 {
1593 let target_size = total_size - (shard_sizes[0] + shard_sizes[1] + shard_sizes[2] +
1596 shard_sizes[3] + shard_sizes[4])
1597 + 1;
1598 assert!(
1599 writer
1600 .try_discard_until_size(target_size)
1601 .expect("Failed to discard data")
1602 );
1603 let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
1604 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1605 .expect("Failed to read sample")
1606 .expect("Did not find stored sample");
1607 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME * 5));
1608 assert_eq!(frame.1.sample.cgroup.memory_current, Some(6));
1609 }
1610
1611 {
1612 assert!(
1615 !writer
1616 .try_discard_until_size(1)
1617 .expect("Failed to discard data"),
1618 );
1619 let frame = StoreCursor::new(get_logger(), dir.path().to_path_buf())
1620 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1621 .expect("Failed to read sample")
1622 .expect("Did not find stored sample");
1623 assert_ts!(frame.0, ts + Duration::from_secs(SHARD_TIME) * 6);
1624 assert_eq!(frame.1.sample.cgroup.memory_current, Some(7));
1625 }
1626 }
1627
1628 store_test!(flock_protects, _flock_protects);
1629 fn _flock_protects(compression_mode: CompressionMode, format: Format) {
1630 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1631 let ts = SystemTime::now();
1632 let shard = calculate_shard(ts);
1633 let mut index_path = dir.path().to_path_buf();
1634 index_path.push(format!("index_{:011}", shard));
1635 let index = OpenOptions::new()
1636 .append(true)
1637 .create(true)
1638 .open(index_path.as_path())
1639 .expect("Failed to create index file");
1640 nix::fcntl::flock(
1641 index.as_raw_fd(),
1642 nix::fcntl::FlockArg::LockExclusiveNonblock,
1643 )
1644 .expect("Failed to acquire flock on index file");
1645
1646 assert!(
1647 StoreWriter::new(get_logger(), &dir, compression_mode, format).is_err(),
1648 "Did not conflict on index lock"
1649 );
1650 }
1651
1652 store_test!(
1653 writing_to_already_written_index_works,
1654 _writing_to_already_written_index_works
1655 );
1656 fn _writing_to_already_written_index_works(compression_mode: CompressionMode, format: Format) {
1657 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1658 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
1659 {
1660 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1661 .expect("Failed to create store");
1662 let mut frame = DataFrame::default();
1663 frame.sample.cgroup.memory_current = Some(333);
1664
1665 writer.put(ts, &frame).expect("Failed to store data");
1666 }
1667 {
1668 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1669 .expect("Failed to create store");
1670 let mut frame = DataFrame::default();
1671 frame.sample.cgroup.memory_current = Some(666);
1672 writer
1673 .put(ts + Duration::from_secs(5), &frame)
1674 .expect("Failed to store data");
1675 }
1676
1677 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1678 let frame = store_cursor
1679 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1680 .expect("Failed to read sample")
1681 .expect("Did not find stored sample");
1682 assert_ts!(frame.0, ts);
1683 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1684
1685 let frame = store_cursor
1686 .get_next(
1687 &get_unix_timestamp(ts + Duration::from_secs(1)),
1688 Direction::Forward,
1689 )
1690 .expect("Failed to read sample")
1691 .expect("Did not find stored sample");
1692 assert_ts!(frame.0, ts + Duration::from_secs(5));
1693 assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
1694 }
1695
1696 store_test!(
1697 read_skips_over_corrupt_index_entry,
1698 _read_skips_over_corrupt_index_entry
1699 );
1700 fn _read_skips_over_corrupt_index_entry(compression_mode: CompressionMode, format: Format) {
1701 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1702 let ts = std::time::UNIX_EPOCH + Duration::from_secs(SHARD_TIME);
1703 {
1704 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1705 .expect("Failed to create store");
1706 let mut frame = DataFrame::default();
1707 frame.sample.cgroup.memory_current = Some(333);
1708
1709 writer.put(ts, &frame).expect("Failed to store data");
1710 }
1711 {
1713 let shard = calculate_shard(ts);
1714 let mut index_path = dir.path().to_path_buf();
1715 index_path.push(format!("index_{:011}", shard));
1716 let mut index = OpenOptions::new()
1717 .append(true)
1718 .create(true)
1719 .open(index_path.as_path())
1720 .expect("Failed to create index file");
1721 index
1722 .write_all(b"This is complete garbage data that is longer than an entry")
1723 .expect("Failed to append to index");
1724 }
1725 {
1726 let mut writer = StoreWriter::new(get_logger(), &dir, compression_mode, format)
1727 .expect("Failed to create store");
1728 let mut frame = DataFrame::default();
1729 frame.sample.cgroup.memory_current = Some(666);
1730 writer
1731 .put(ts + Duration::from_secs(5), &frame)
1732 .expect("Failed to store data");
1733 }
1734
1735 let mut store_cursor = StoreCursor::new(get_logger(), dir.path().to_path_buf());
1736 let frame = store_cursor
1737 .get_next(
1738 &get_unix_timestamp(ts + Duration::from_secs(1)),
1739 Direction::Forward,
1740 )
1741 .expect("Failed to read sample")
1742 .expect("Did not find stored sample");
1743 assert_ts!(frame.0, ts + Duration::from_secs(5));
1744 assert_eq!(frame.1.sample.cgroup.memory_current, Some(666));
1745 }
1746
1747 store_test!(writer_creates_directory, _writer_creates_directory);
1748 fn _writer_creates_directory(compression_mode: CompressionMode, format: Format) {
1749 let dir = TempDir::with_prefix("below_store_test.").expect("tempdir failed");
1750 let mut subdir = dir.path().to_path_buf();
1751 subdir.push("foo");
1752 let ts = SystemTime::now();
1753 {
1754 let mut writer = StoreWriter::new(get_logger(), &subdir, compression_mode, format)
1755 .expect("Failed to create store");
1756 let mut frame = DataFrame::default();
1757 frame.sample.cgroup.memory_current = Some(333);
1758
1759 writer.put(ts, &frame).expect("Failed to store data");
1760 }
1761
1762 let mut store_cursor = StoreCursor::new(get_logger(), subdir);
1763 let frame = store_cursor
1764 .get_next(&get_unix_timestamp(ts), Direction::Forward)
1765 .expect("Failed to read sample")
1766 .expect("Did not find stored sample");
1767 assert_ts!(frame.0, ts);
1768 assert_eq!(frame.1.sample.cgroup.memory_current, Some(333));
1769 }
1770}