crunchert_storage/
lib.rs

1use dashmap::DashMap;
2use itertools::Itertools;
3use memmap2::Mmap;
4use pco::standalone::simple_decompress;
5use postcard::from_bytes;
6
7use std::cmp::{max, min};
8use std::sync::Arc;
9use std::{fmt::Debug, path::PathBuf};
10use std::{fs, io, usize};
11use tokio::task::JoinSet;
12
13use chrono::Utc;
14use serde::{Deserialize, Serialize};
15use tokio::sync::{Mutex, RwLock};
16
/// File name, inside `StorageConfig::data_storage_dir`, of the postcard-encoded index of all
/// time partitions.
const PARTITIONS_FILE_HEADER_FILENAME: &str = "CruncheRTPartitionsConfig";
// NOTE(review): not referenced in the code shown; presumably the point count from which a
// stream is stored compressed (see `DiskStreamFileHeader::compressed`) — confirm.
const MIN_PTS_TO_COMPRESS: usize = 8192;
/// Requested stream ids per aggregation task: a partition query spawns one task per this many
/// ids, capped by `Storage::num_threads` and never fewer than one.
const MIN_STREAMS_PER_THREAD: usize = 1024;
20
/// How the values of several streams are combined into one chart value per bucket.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Aggregation {
    /// Sum of all sample values in the bucket.
    Sum,
    /// Arithmetic mean of all sample values in the bucket.
    Avg,
    /// Smallest sample value in the bucket.
    Min,
    /// Largest sample value in the bucket.
    Max,
}
28
/// A request for an aggregated chart over a set of streams.
///
/// Chart timestamps are sampled newest first at `stop_unix_s - 1`, `stop_unix_s - 1 - step_s`,
/// ... down to `start_unix_s`, i.e. over the half-open range `[start_unix_s, stop_unix_s)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChartRequest {
    /// Ids of the streams to aggregate; ids a partition does not store are skipped.
    pub stream_ids: Vec<u64>,
    /// Inclusive start of the requested range, in unix seconds.
    pub start_unix_s: i64,
    /// Exclusive end of the requested range, in unix seconds.
    pub stop_unix_s: i64,
    /// Distance between consecutive chart timestamps, in seconds.
    pub step_s: u32,
}
35
/// One point of an aggregated chart.
#[derive(Debug, Copy, Clone)]
pub struct Datapoint {
    /// Timestamp of the chart bucket, in unix seconds.
    pub unix_s: i64,
    /// Aggregated value of the bucket.
    pub value: f32,
}
41
/// Tunables for [`Storage`]; checked by `StorageConfig::validate` when a `Storage` is created.
#[derive(Serialize, Deserialize, Debug)]
pub struct StorageConfig {
    /// Accepted range `MIN_COMPRESSION_LEVEL..=MAX_COMPRESSION_LEVEL`.
    /// NOTE(review): presumably the pco compression level for writes; not used by the visible
    /// read path.
    pub compression_level: usize,
    /// Retention period in seconds. Validated here; enforcement is not in the code shown.
    pub retention_period_s: usize,
    /// Partitions that started more than this many seconds ago are not opened at startup.
    pub cold_storage_after_s: usize,
    /// Presumably the expected interval between samples, in seconds; only validated in the
    /// code shown.
    pub data_frequency_s: usize,
    /// NOTE(review): not referenced in the code shown; presumably how long a decoded stream
    /// stays cached after its last access — confirm.
    pub stream_cache_ttl_s: usize,
    /// Directory holding the partitions index file and the partition data files.
    pub data_storage_dir: PathBuf,
}
51
/// Location of one stream's encoded data inside a partition data file.
///
/// The timestamps occupy bytes `[unix_s_byte_start, unix_s_byte_stop)` and the values
/// `[unix_s_byte_stop, values_byte_stop)` of the file.
#[derive(Serialize, Deserialize, Clone, Copy)]
struct DiskStreamFileHeader {
    stream_id: u64,
    unix_s_byte_start: usize,
    unix_s_byte_stop: usize,
    values_byte_stop: usize,
    /// NOTE(review): not consulted by `read_stream_from_mmap`, which always decompresses;
    /// presumably marks streams stored without compression — confirm against the writer.
    compressed: bool,
}
60
/// Index entry for one time partition, as stored in the partitions index file.
#[derive(Serialize, Deserialize, Clone)]
struct TimePartitionFileHeader {
    /// Unix seconds at which the partition starts.
    start_unix_s: i64,
    /// Data file that is memory-mapped when the partition is opened.
    file_path: PathBuf,
    /// Streams stored in the data file.
    disk_streams: Vec<DiskStreamFileHeader>,
}
67
68impl TimePartitionFileHeader {
69    fn new(config: &StorageConfig) -> Self {
70        let now = Utc::now().timestamp();
71        let file_path = config.data_storage_dir.join(now.to_string());
72        Self {
73            start_unix_s: now,
74            file_path,
75            disk_streams: Default::default(),
76        }
77    }
78}
79
/// Index of every time partition, stored postcard-encoded in the file named
/// `PARTITIONS_FILE_HEADER_FILENAME` inside the data directory.
#[derive(Serialize, Deserialize)]
struct PartitionsFileHeader {
    // Sorted by descending start_unix_s (newest partition first); partition range lookups
    // rely on this order.
    time_partitions: Vec<TimePartitionFileHeader>,
}
85
/// A stream's samples decoded into memory.
///
/// `unix_seconds[i]` and `values[i]` describe the same sample. `unix_seconds` is searched with
/// `binary_search`, so it is expected to be sorted ascending.
#[derive(Default)]
struct HotStream {
    unix_seconds: Vec<i64>,
    values: Vec<f32>,
}
91
/// A stream registered in an opened time partition: where its bytes live in the data file,
/// plus a lazily decoded in-memory copy.
struct Stream {
    /// Byte offsets of this stream inside the partition's memory-mapped data file.
    disk_header: DiskStreamFileHeader,
    /// Decoded samples; `None` until the stream is first read for a chart.
    hot_stream: RwLock<Option<HotStream>>,
    /// Unix seconds (UTC) of the most recent chart read; `None` if never read.
    /// NOTE(review): not read in the code shown; presumably compared against
    /// `stream_cache_ttl_s` to evict `hot_stream` — confirm.
    last_accessed: Mutex<Option<i64>>,
}
97
/// An opened time partition: its data file mapped into memory and its streams indexed by id.
struct TimePartition {
    /// Unix seconds at which this partition starts.
    start_unix_s: i64,
    /// Read-only memory map of the partition's data file.
    mmap: Mmap,
    /// Streams stored in this partition, keyed by stream id.
    streams: DashMap<u64, Stream>,
}
103
/// Time-partitioned stream storage that answers aggregated chart queries.
pub struct Storage {
    /// Validated configuration.
    config: StorageConfig,
    /// Partitions opened at startup (those newer than the cold-storage cutoff), in the order
    /// of `partition_file_header.time_partitions` (newest first).
    partitions: RwLock<Vec<Arc<TimePartition>>>,
    /// Index of every partition, including cold ones that were not opened.
    partition_file_header: PartitionsFileHeader,
    /// Upper bound on the aggregation tasks spawned per partition for one chart query.
    num_threads: usize,
}
110
/// `Copy` summary of a [`ChartRequest`] (everything except the stream ids) plus the
/// precomputed number of chart buckets; passed by value to every aggregation task.
#[derive(Clone, Copy)]
struct ChartReqMetadata {
    start_unix_s: i64,
    stop_unix_s: i64,
    step_s: u32,
    /// Number of chart buckets, as computed by `resolution`.
    resolution: usize,
}
118
/// Number of chart buckets in the half-open range `[start_unix_s, stop_unix_s)` when sampling
/// every `step_s` seconds.
///
/// This equals the number of timestamps `iter_search_ts` yields for the same request,
/// `ceil((stop - start) / step)`, so a trailing partial step still gets its bucket.
/// Degenerate inputs (`stop_unix_s <= start_unix_s` or `step_s == 0`) yield 0 rather than
/// wrapping to a huge count or dividing by zero.
#[inline]
fn resolution(start_unix_s: i64, stop_unix_s: i64, step_s: u32) -> usize {
    if step_s == 0 || stop_unix_s <= start_unix_s {
        return 0;
    }
    // Exact span as u64: no truncation for long ranges, no wrap-around for reversed ones.
    let duration_s = stop_unix_s.abs_diff(start_unix_s);
    let step_s = u64::from(step_s);
    // Ceiling division without the overflow risk of `(d + step - 1) / step`.
    let buckets = duration_s / step_s + u64::from(duration_s % step_s != 0);
    usize::try_from(buckets).unwrap_or(usize::MAX)
}
125
126impl From<&ChartRequest> for ChartReqMetadata {
127    fn from(value: &ChartRequest) -> Self {
128        Self {
129            start_unix_s: value.start_unix_s,
130            stop_unix_s: value.stop_unix_s,
131            step_s: value.step_s,
132            resolution: resolution(value.start_unix_s, value.stop_unix_s, value.step_s),
133        }
134    }
135}
136
137impl Default for StorageConfig {
138    fn default() -> Self {
139        Self {
140            compression_level: 8,
141            retention_period_s: 31104000,  //1y
142            cold_storage_after_s: 7776000, //90d
143            data_frequency_s: 900,
144            stream_cache_ttl_s: 900,
145            data_storage_dir: PathBuf::from("/var/lib/wolfeymetrics"),
146        }
147    }
148}
149
/// Lowest accepted `StorageConfig::compression_level`.
const MIN_COMPRESSION_LEVEL: usize = 4;
/// Highest accepted `StorageConfig::compression_level`.
const MAX_COMPRESSION_LEVEL: usize = 12;
/// Shortest accepted retention period: 15 minutes.
const MIN_RETENTION_PERIOD_S: usize = 900; //15m
/// Floor for `cold_storage_after_s` (90 days); validation lowers it to the retention period
/// when that is shorter.
const MIN_COLD_STORAGE_S: usize = 7776000; //90d
/// Longest accepted retention period: roughly 100 years (100 × 365.25 days ≈ 3 155 760 000 s).
const MAX_RETENTION_PERIOD_S: usize = 3156000000; //100y
/// Largest accepted `data_frequency_s`: 7 days.
const MAX_DATA_FREQUENCY_S: usize = 604800; //7d
156
/// Reasons validation rejects a [`StorageConfig`].
// NOTE(review): "ToLow"/"ToHigh" presumably mean "Too"; the variants are public, so renaming
// them would break callers.
#[derive(thiserror::Error, Debug)]
pub enum StorageConfigError {
    /// `compression_level` is below `MIN_COMPRESSION_LEVEL`.
    #[error("COMPRESSION_LEVEL must be >= {MIN_COMPRESSION_LEVEL}")]
    ToLowCompressionLevel,
    /// `compression_level` is above `MAX_COMPRESSION_LEVEL`.
    #[error("COMPRESSION_LEVEL must be <= {MAX_COMPRESSION_LEVEL}")]
    ToHighCompressionLevel,
    /// `retention_period_s` is below `MIN_RETENTION_PERIOD_S`.
    #[error("RETENTION_PERIOD must be >= {MIN_RETENTION_PERIOD_S}")]
    ToLowRetentionPeriod,
    /// `retention_period_s` is above `MAX_RETENTION_PERIOD_S`.
    #[error("RETENTION_PERIOD must be <= {MAX_RETENTION_PERIOD_S}")]
    ToHighRetentionPeriod,
    /// `cold_storage_after_s` exceeds `retention_period_s`.
    #[error("RETENTION_PERIOD_S must be >= COLD_STORAGE_AFTER_S")]
    ColdStorageCannotBeGreaterThanRetention,
    /// `cold_storage_after_s` is below `min(MIN_COLD_STORAGE_S, retention_period_s)`.
    #[error("COLD_STORAGE_AFTER_S must be >= {MIN_COLD_STORAGE_S} or RETENTION_PERIOD_S")]
    ColdStorageTooLow,
    /// `data_frequency_s` is above `MAX_DATA_FREQUENCY_S`.
    #[error("DATA_FREQUENCY_S must be <= {MAX_DATA_FREQUENCY_S}")]
    DataFrequencyTooHigh,
}
174
175impl StorageConfig {
176    fn validate(self) -> Result<Self, StorageConfigError> {
177        if self.compression_level < MIN_COMPRESSION_LEVEL {
178            return Err(StorageConfigError::ToLowCompressionLevel);
179        }
180
181        if self.compression_level > MAX_COMPRESSION_LEVEL {
182            return Err(StorageConfigError::ToHighCompressionLevel);
183        }
184
185        if self.retention_period_s < MIN_RETENTION_PERIOD_S {
186            return Err(StorageConfigError::ToLowRetentionPeriod);
187        }
188        if self.retention_period_s > MAX_RETENTION_PERIOD_S {
189            return Err(StorageConfigError::ToHighRetentionPeriod);
190        }
191
192        if self.retention_period_s < self.cold_storage_after_s {
193            return Err(StorageConfigError::ColdStorageCannotBeGreaterThanRetention);
194        }
195
196        let min_cold_storage_s = std::cmp::min(MIN_COLD_STORAGE_S, self.retention_period_s);
197
198        if self.cold_storage_after_s < min_cold_storage_s {
199            return Err(StorageConfigError::ColdStorageTooLow);
200        }
201
202        if self.data_frequency_s > MAX_DATA_FREQUENCY_S {
203            return Err(StorageConfigError::DataFrequencyTooHigh);
204        }
205
206        Ok(self)
207    }
208}
209
210impl DiskStreamFileHeader {
211    fn read_stream_from_mmap(&self, mmap: &Mmap) -> HotStream {
212        let unix_s_bytes = &mmap[self.unix_s_byte_start..self.unix_s_byte_stop];
213        let value_bytes = &mmap[self.unix_s_byte_stop..self.values_byte_stop];
214
215        let Ok(unix_s_decompressed) = simple_decompress(unix_s_bytes) else {
216            return HotStream::default();
217        };
218        let Ok(values_decompressed) = simple_decompress(value_bytes) else {
219            return HotStream::default();
220        };
221
222        HotStream {
223            unix_seconds: unix_s_decompressed,
224            values: values_decompressed,
225        }
226    }
227}
228
/// Running aggregate for one chart bucket: the combined value and how many samples went into it.
///
/// A tracker with `count == 0` is empty and its `value` is only a placeholder. The first real
/// input replaces the placeholder instead of being combined with it, because `0.0` is not a
/// neutral element for `Min`/`Max`. Otherwise `min(0.0, 5.0)` would report `0.0` for a bucket
/// that only ever saw `5.0`.
#[derive(Default, Debug, Clone, Copy)]
struct ValueTracker {
    value: f32,
    count: u32,
}

impl ValueTracker {
    /// Merges two partial aggregates of the same bucket (e.g. from different batches).
    ///
    /// An empty side is ignored. Otherwise the values are combined with `agg_fn`. Counts are
    /// always added.
    #[inline]
    fn agg<F>(self, rhs: Self, agg_fn: &F) -> Self
    where
        F: Fn(f32, f32) -> f32,
    {
        let value = match (self.count, rhs.count) {
            (0, _) => rhs.value,
            (_, 0) => self.value,
            _ => agg_fn(self.value, rhs.value),
        };
        Self {
            value,
            count: self.count + rhs.count,
        }
    }

    /// Folds one sample into the aggregate and increments the count.
    #[inline]
    fn apply<F>(self, rhs: f32, agg_fn: &F) -> Self
    where
        F: Fn(f32, f32) -> f32,
    {
        let value = if self.count == 0 {
            rhs
        } else {
            agg_fn(self.value, rhs)
        };
        Self {
            value,
            count: self.count + 1,
        }
    }
}
257#[inline]
258fn agg_to_agg_fn(agg: Aggregation) -> impl Fn(f32, f32) -> f32 {
259    match agg {
260        Aggregation::Sum => std::ops::Add::add,
261        Aggregation::Avg => std::ops::Add::add,
262        Aggregation::Min => f32::min,
263        Aggregation::Max => f32::max,
264    }
265}
266#[inline]
267fn iter_search_ts(req: ChartReqMetadata) -> impl Iterator<Item = i64> {
268    (req.start_unix_s..req.stop_unix_s)
269        .rev()
270        .step_by(req.step_s as usize)
271}
272
273impl HotStream {
274    fn get_chart_values(&self, req: ChartReqMetadata) -> impl Iterator<Item = Option<f32>> + '_ {
275        iter_search_ts(req).map(move |x| {
276            let found_ts_idx = self.unix_seconds.binary_search(&x).unwrap_or_else(|x| x);
277            let found_ts_idx = min(found_ts_idx, self.unix_seconds.len() - 1);
278            let found_ts = self.unix_seconds[found_ts_idx];
279            let diff = x.abs_diff(found_ts);
280            if diff < req.step_s as u64 {
281                let value = self.values[found_ts_idx];
282                Some(value)
283            } else {
284                None
285            }
286        })
287    }
288    fn add_stream_to_chart(
289        &self,
290        req: ChartReqMetadata,
291        aggregated_result: Vec<ValueTracker>,
292        agg: Aggregation,
293    ) -> Vec<ValueTracker> {
294        let chart_values = self.get_chart_values(req);
295        let agg_fn = agg_to_agg_fn(agg);
296        aggregated_result
297            .into_iter()
298            .zip(chart_values)
299            .filter_map(|(x, y)| match y {
300                Some(y) => Some((x, y)),
301                None => None,
302            })
303            .map(|(x, y)| x.apply(y, &agg_fn))
304            .collect()
305    }
306}
307
impl Stream {
    /// Folds this stream's contribution into `aggregated_result` and returns the updated
    /// buckets. The stream is decoded from `mmap` on first use.
    ///
    /// Side effects: records the current time in `last_accessed`. On the first call, stores
    /// the decoded [`HotStream`] in `hot_stream` so later calls skip decoding.
    ///
    /// NOTE(review): `read_stream_from_mmap` returns an empty `HotStream` when decoding fails,
    /// and that empty stream is cached here too, so a failed decode is never retried.
    async fn get_chart_aggregated(
        &self,
        req: ChartReqMetadata,
        mmap: &Mmap,
        aggregated_result: Vec<ValueTracker>,
        agg: Aggregation,
    ) -> Vec<ValueTracker> {
        // Record the access time. The guard is dropped immediately so it is not held across
        // the awaits below.
        let mut last_accessed_lock = self.last_accessed.lock().await;
        *last_accessed_lock = Some(Utc::now().timestamp());
        drop(last_accessed_lock);

        // Fast path: already decoded, so a shared read lock suffices.
        let hot_stream_option = self.hot_stream.read().await;
        if let Some(ref x) = *hot_stream_option {
            return x.add_stream_to_chart(req, aggregated_result, agg);
        }
        drop(hot_stream_option);

        // Slow path: take the write lock to decode and cache the stream.
        let mut writable_hot_stream = self.hot_stream.write().await;

        // Re-check: another task may have decoded the stream between releasing the read lock
        // and acquiring the write lock.
        if let Some(ref x) = *writable_hot_stream {
            return x.add_stream_to_chart(req, aggregated_result, agg);
        }

        // Decoding happens while holding the write lock, so other readers of this stream wait.
        let hot_stream = self.disk_header.read_stream_from_mmap(mmap);

        let res = hot_stream.add_stream_to_chart(req, aggregated_result, agg);
        *writable_hot_stream = Some(hot_stream);
        res
    }
}
339
340async fn get_chart_aggregated_batched(
341    req: Arc<ChartRequest>,
342    agg: Aggregation,
343    meta: ChartReqMetadata,
344    thread_idx: usize,
345    num_threads: usize,
346    time_partition: Arc<TimePartition>,
347) -> Vec<ValueTracker> {
348    let streams_per_thread = req.stream_ids.len() / num_threads;
349    let start_idx = streams_per_thread * thread_idx;
350    let stop_idx = if thread_idx == num_threads - 1 {
351        req.stream_ids.len()
352    } else {
353        streams_per_thread * (thread_idx + 1)
354    };
355    // pretty confident with this math, if it goes wrong, then well shit
356
357    let streams = req.stream_ids[start_idx..stop_idx]
358        .iter()
359        .filter_map(|x| time_partition.streams.get(x));
360
361    let mut aggregated_batch = vec![ValueTracker::default(); meta.resolution];
362    for stream in streams {
363        aggregated_batch = stream
364            .get_chart_aggregated(meta, &time_partition.mmap, aggregated_batch, agg)
365            .await;
366    }
367    return aggregated_batch;
368}
369
370#[inline]
371fn default_final_agg_fn(x: ValueTracker) -> f32 {
372    x.value
373}
374#[inline]
375fn avg_final_agg(x: ValueTracker) -> f32 {
376    x.value / x.count as f32
377}
378
379async fn time_partition_get_agg_chart(
380    time_partition: Arc<TimePartition>,
381    req: Arc<ChartRequest>,
382    agg: Aggregation,
383    num_threads: usize,
384) -> Vec<Datapoint> {
385    let meta: ChartReqMetadata = req.as_ref().into();
386
387    let threads_requested = req.stream_ids.len() / MIN_STREAMS_PER_THREAD;
388    let threads_capped = min(threads_requested, num_threads);
389    let num_threads = max(threads_capped, 1);
390
391    let batches = (0..num_threads).map(|x| {
392        get_chart_aggregated_batched(
393            req.clone(),
394            agg,
395            meta,
396            x,
397            num_threads,
398            time_partition.clone(),
399        )
400    });
401    let agg_fn = agg_to_agg_fn(agg);
402    let batched_agg_chart = JoinSet::from_iter(batches).join_all().await;
403    let reduced = batched_agg_chart.into_iter().reduce(|acc, x| {
404        acc.into_iter()
405            .zip(x)
406            .map(|(x, y)| x.agg(y, &agg_fn))
407            .collect()
408    });
409
410    let Some(reduced) = reduced else {
411        panic!("tried to aggregate nothing");
412    };
413
414    let final_agg = match agg {
415        Aggregation::Avg => avg_final_agg,
416        _ => default_final_agg_fn,
417    };
418
419    reduced
420        .into_iter()
421        .zip(iter_search_ts(meta))
422        .filter(|(x, _)| x.count > 0)
423        .map(|(x, ts)| Datapoint {
424            unix_s: ts,
425            value: final_agg(x),
426        })
427        .collect()
428}
429
430impl TryFrom<&TimePartitionFileHeader> for TimePartition {
431    type Error = io::Error;
432    fn try_from(value: &TimePartitionFileHeader) -> Result<Self, Self::Error> {
433        let hash_map_iter = value.disk_streams.iter().map(|x| {
434            (
435                x.stream_id,
436                Stream {
437                    disk_header: x.clone(),
438                    hot_stream: RwLock::new(None),
439                    last_accessed: Mutex::new(None),
440                },
441            )
442        });
443
444        let file = fs::File::open(&value.file_path)?;
445        let mmap = unsafe { Mmap::map(&file)? };
446        let streams = DashMap::from_iter(hash_map_iter);
447        let start_unix_s = value.start_unix_s;
448        Ok(Self {
449            start_unix_s,
450            streams,
451            mmap,
452        })
453    }
454}
455
456impl PartitionsFileHeader {
457    fn new(config: &StorageConfig) -> Self {
458        Self {
459            time_partitions: vec![TimePartitionFileHeader::new(config)],
460        }
461    }
462    fn thaw(&self, config: &StorageConfig) -> Result<Vec<Arc<TimePartition>>, io::Error> {
463        let now = Utc::now().timestamp();
464        let cutoff = now - config.cold_storage_after_s as i64;
465        self.time_partitions
466            .iter()
467            .filter(|x| x.start_unix_s > cutoff)
468            .map(|x| x.try_into())
469            .process_results(|iter| iter.map(Arc::new).collect())
470    }
471}
472
/// Errors returned by [`Storage::new`].
#[derive(thiserror::Error, Debug)]
pub enum StorageCreationError {
    /// Reading the partitions index file or opening/mapping a partition data file failed.
    #[error("io error")]
    IOError(#[from] io::Error),
    /// The supplied [`StorageConfig`] failed validation.
    #[error("config error")]
    ConfigError(#[from] StorageConfigError),
    /// The partitions index file could not be decoded by postcard.
    #[error("postcard deserialization error")]
    DeserializationError(#[from] postcard::Error),
}
482
483impl Storage {
484    async fn get_partitions_in_range(
485        &self,
486        start_unix_s: i64,
487        stop_unix_s: i64,
488    ) -> Vec<Arc<TimePartition>> {
489        let mut partition_end = Utc::now().timestamp();
490        let mut partitions_in_range = Vec::new();
491        let partitions = self.partitions.read().await;
492        for partition in partitions.iter() {
493            if start_unix_s > partition_end {
494                return partitions_in_range;
495            }
496            partition_end = partition.start_unix_s;
497            if stop_unix_s < partition.start_unix_s {
498                continue;
499            }
500            partitions_in_range.push(partition.clone());
501        }
502        return partitions_in_range;
503    }
504
505    pub async fn get_agg_chart(&self, req: ChartRequest, agg: Aggregation) -> Vec<Datapoint> {
506        let time_partitions = self
507            .get_partitions_in_range(req.start_unix_s, req.stop_unix_s)
508            .await;
509        let arc_req = Arc::new(req);
510        let datapoint_jobs = time_partitions
511            .into_iter()
512            .map(|x| time_partition_get_agg_chart(x, arc_req.clone(), agg, self.num_threads));
513        let datapoints_nested = JoinSet::from_iter(datapoint_jobs).join_all().await;
514        let datapoints_flattened = datapoints_nested.into_iter().flatten().collect();
515        return datapoints_flattened;
516    }
517
518    pub fn new(config: StorageConfig, num_threads: usize) -> Result<Self, StorageCreationError> {
519        let config = config
520            .validate()
521            .map_err(StorageCreationError::ConfigError)?;
522
523        let partitions_file_path = config
524            .data_storage_dir
525            .join(PARTITIONS_FILE_HEADER_FILENAME);
526
527        let partition_file_header: PartitionsFileHeader = if partitions_file_path.exists() {
528            let partitions_file_header_bytes =
529                std::fs::read(partitions_file_path).map_err(StorageCreationError::IOError)?;
530            from_bytes(&partitions_file_header_bytes)
531                .map_err(StorageCreationError::DeserializationError)?
532        } else {
533            PartitionsFileHeader::new(&config)
534        };
535
536        let partitions = partition_file_header
537            .thaw(&config)
538            .map_err(StorageCreationError::IOError)?;
539        let partitions = RwLock::new(partitions);
540
541        Ok(Self {
542            config,
543            partitions,
544            partition_file_header,
545            num_threads,
546        })
547    }
548}