use dashmap::DashMap;
use itertools::Itertools;
use memmap2::Mmap;
use pco::standalone::simple_decompress;
use postcard::from_bytes;

use std::cmp::{max, min};
use std::sync::Arc;
use std::{fmt::Debug, path::PathBuf};
use std::{fs, io};
use tokio::task::JoinSet;

use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};

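// MIN_PTS_TO_COMPRESS is presumably consulted by the write path (not shown in
// this section); MIN_STREAMS_PER_THREAD keeps the chart fan-out from spawning
// tasks over tiny batches of streams.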
const PARTITIONS_FILE_HEADER_FILENAME: &str = "CruncheRTPartitionsConfig";
const MIN_PTS_TO_COMPRESS: usize = 8192;
const MIN_STREAMS_PER_THREAD: usize = 1024;

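/// How values that share a chart bucket are combined across streams. `Avg` is
/// computed as a running sum plus a point count, with the division deferred to
/// the end of aggregation (see `avg_final_agg`).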
#[derive(Clone, Copy)]
pub enum Aggregation {
    Sum,
    Avg,
    Min,
    Max,
}

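/// A chart query: which streams to aggregate, over what half-open time range
/// `[start_unix_s, stop_unix_s)`, and at what bucket width.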
pub struct ChartRequest {
    pub stream_ids: Vec<u64>,
    pub start_unix_s: i64,
    pub stop_unix_s: i64,
    pub step_s: u32,
}

#[derive(Debug, Copy, Clone)]
pub struct Datapoint {
    pub unix_s: i64,
    pub value: f32,
}

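/// Storage tuning; every `*_s` field is a duration in seconds.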
#[derive(Serialize, Deserialize, Debug)]
pub struct StorageConfig {
    pub compression_level: usize,
    pub retention_period_s: usize,
    pub cold_storage_after_s: usize,
    pub data_frequency_s: usize,
    pub stream_cache_ttl_s: usize,
    pub data_storage_dir: PathBuf,
}

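/// Locates one stream inside a partition file: timestamps occupy
/// `unix_s_byte_start..unix_s_byte_stop` and values occupy
/// `unix_s_byte_stop..values_byte_stop` (the two regions are adjacent). The
/// `compressed` flag is recorded, but the read path in this section always
/// decompresses.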
#[derive(Serialize, Deserialize, Clone, Copy)]
struct DiskStreamFileHeader {
    stream_id: u64,
    unix_s_byte_start: usize,
    unix_s_byte_stop: usize,
    values_byte_stop: usize,
    compressed: bool,
}

#[derive(Serialize, Deserialize, Clone)]
struct TimePartitionFileHeader {
    start_unix_s: i64,
    file_path: PathBuf,
    disk_streams: Vec<DiskStreamFileHeader>,
}

impl TimePartitionFileHeader {
    fn new(config: &StorageConfig) -> Self {
        let now = Utc::now().timestamp();
        let file_path = config.data_storage_dir.join(now.to_string());
        Self {
            start_unix_s: now,
            file_path,
            disk_streams: Default::default(),
        }
    }
}

#[derive(Serialize, Deserialize)]
struct PartitionsFileHeader {
    time_partitions: Vec<TimePartitionFileHeader>,
}

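/// A stream's decompressed, in-memory form: two parallel arrays of timestamps
/// and values, sorted by timestamp (binary-searched at query time).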
#[derive(Default)]
struct HotStream {
    unix_seconds: Vec<i64>,
    values: Vec<f32>,
}

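/// One stream within a partition. `hot_stream` starts cold and is hydrated
/// from the mmap on first access; `last_accessed` is refreshed on every read
/// so a TTL sweep (driven by `stream_cache_ttl_s`, presumably elsewhere) can
/// evict idle streams.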
struct Stream {
    disk_header: DiskStreamFileHeader,
    hot_stream: RwLock<Option<HotStream>>,
    last_accessed: Mutex<Option<i64>>,
}

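/// A memory-mapped partition file plus the per-stream caches hydrated from it.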
struct TimePartition {
    start_unix_s: i64,
    mmap: Mmap,
    streams: DashMap<u64, Stream>,
}

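/// Top-level handle: validated config, the open (thawed) partitions, the
/// serializable header they were loaded from, and the parallelism budget for
/// chart queries.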
pub struct Storage {
    config: StorageConfig,
    partitions: RwLock<Vec<Arc<TimePartition>>>,
    partition_file_header: PartitionsFileHeader,
    num_threads: usize,
}

#[derive(Clone, Copy)]
struct ChartReqMetadata {
    start_unix_s: i64,
    stop_unix_s: i64,
    step_s: u32,
    resolution: usize,
}

/// Number of step-sized buckets covered by the request.
#[inline]
fn resolution(start_unix_s: i64, stop_unix_s: i64, step_s: u32) -> usize {
    let duration_s = (stop_unix_s - start_unix_s) as u32;
    (duration_s / step_s) as usize
}

impl From<&ChartRequest> for ChartReqMetadata {
    fn from(value: &ChartRequest) -> Self {
        Self {
            start_unix_s: value.start_unix_s,
            stop_unix_s: value.stop_unix_s,
            step_s: value.step_s,
            resolution: resolution(value.start_unix_s, value.stop_unix_s, value.step_s),
        }
    }
}

impl Default for StorageConfig {
    fn default() -> Self {
        Self {
            compression_level: 8,
            retention_period_s: 31104000,  // 360 days
            cold_storage_after_s: 7776000, // 90 days
            data_frequency_s: 900,         // 15 minutes
            stream_cache_ttl_s: 900,
            data_storage_dir: PathBuf::from("/var/lib/wolfeymetrics"),
        }
    }
}

const MIN_COMPRESSION_LEVEL: usize = 4;
const MAX_COMPRESSION_LEVEL: usize = 12;
const MIN_RETENTION_PERIOD_S: usize = 900;
const MIN_COLD_STORAGE_S: usize = 7776000;
const MAX_RETENTION_PERIOD_S: usize = 3156000000;
const MAX_DATA_FREQUENCY_S: usize = 604800;

#[derive(thiserror::Error, Debug)]
pub enum StorageConfigError {
    #[error("COMPRESSION_LEVEL must be >= {MIN_COMPRESSION_LEVEL}")]
    TooLowCompressionLevel,
    #[error("COMPRESSION_LEVEL must be <= {MAX_COMPRESSION_LEVEL}")]
    TooHighCompressionLevel,
    #[error("RETENTION_PERIOD must be >= {MIN_RETENTION_PERIOD_S}")]
    TooLowRetentionPeriod,
    #[error("RETENTION_PERIOD must be <= {MAX_RETENTION_PERIOD_S}")]
    TooHighRetentionPeriod,
    #[error("RETENTION_PERIOD_S must be >= COLD_STORAGE_AFTER_S")]
    ColdStorageCannotBeGreaterThanRetention,
    #[error("COLD_STORAGE_AFTER_S must be >= {MIN_COLD_STORAGE_S} or RETENTION_PERIOD_S")]
    ColdStorageTooLow,
    #[error("DATA_FREQUENCY_S must be <= {MAX_DATA_FREQUENCY_S}")]
    DataFrequencyTooHigh,
}

impl StorageConfig {
    fn validate(self) -> Result<Self, StorageConfigError> {
        if self.compression_level < MIN_COMPRESSION_LEVEL {
            return Err(StorageConfigError::TooLowCompressionLevel);
        }

        if self.compression_level > MAX_COMPRESSION_LEVEL {
            return Err(StorageConfigError::TooHighCompressionLevel);
        }

        if self.retention_period_s < MIN_RETENTION_PERIOD_S {
            return Err(StorageConfigError::TooLowRetentionPeriod);
        }
        if self.retention_period_s > MAX_RETENTION_PERIOD_S {
            return Err(StorageConfigError::TooHighRetentionPeriod);
        }

        if self.retention_period_s < self.cold_storage_after_s {
            return Err(StorageConfigError::ColdStorageCannotBeGreaterThanRetention);
        }

        // Short retention periods may legitimately sit below MIN_COLD_STORAGE_S.
        let min_cold_storage_s = min(MIN_COLD_STORAGE_S, self.retention_period_s);

        if self.cold_storage_after_s < min_cold_storage_s {
            return Err(StorageConfigError::ColdStorageTooLow);
        }

        if self.data_frequency_s > MAX_DATA_FREQUENCY_S {
            return Err(StorageConfigError::DataFrequencyTooHigh);
        }

        Ok(self)
    }
}

impl DiskStreamFileHeader {
    fn read_stream_from_mmap(&self, mmap: &Mmap) -> HotStream {
        let unix_s_bytes = &mmap[self.unix_s_byte_start..self.unix_s_byte_stop];
        let value_bytes = &mmap[self.unix_s_byte_stop..self.values_byte_stop];

        // On decompression failure, fall back to an empty stream rather than
        // failing the whole chart request.
        let Ok(unix_s_decompressed) = simple_decompress(unix_s_bytes) else {
            return HotStream::default();
        };
        let Ok(values_decompressed) = simple_decompress(value_bytes) else {
            return HotStream::default();
        };

        HotStream {
            unix_seconds: unix_s_decompressed,
            values: values_decompressed,
        }
    }
}

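/// Running aggregate for one chart bucket: the folded value so far plus the
/// number of points that contributed. Buckets left at `count == 0` are
/// dropped from the final chart.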
#[derive(Default, Debug, Clone, Copy)]
struct ValueTracker {
    value: f32,
    count: u32,
}

impl ValueTracker {
    #[inline]
    fn agg<F>(self, rhs: Self, agg_fn: &F) -> Self
    where
        F: Fn(f32, f32) -> f32,
    {
        Self {
            value: agg_fn(self.value, rhs.value),
            count: self.count + rhs.count,
        }
    }
    #[inline]
    fn apply<F>(self, rhs: f32, agg_fn: &F) -> Self
    where
        F: Fn(f32, f32) -> f32,
    {
        Self {
            value: agg_fn(self.value, rhs),
            count: self.count + 1,
        }
    }
}
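// Maps an Aggregation to its pairwise combining function. `Avg` intentionally
// maps to addition: values are summed during aggregation and divided by the
// point count once at the end (see `avg_final_agg`).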
#[inline]
fn agg_to_agg_fn(agg: Aggregation) -> impl Fn(f32, f32) -> f32 {
    match agg {
        Aggregation::Sum => std::ops::Add::add,
        Aggregation::Avg => std::ops::Add::add,
        Aggregation::Min => f32::min,
        Aggregation::Max => f32::max,
    }
}
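// Bucket timestamps for the request, newest first; every chart vector in this
// module is built and consumed in this order.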
#[inline]
fn iter_search_ts(req: ChartReqMetadata) -> impl Iterator<Item = i64> {
    (req.start_unix_s..req.stop_unix_s)
        .rev()
        .step_by(req.step_s as usize)
}

impl HotStream {
    fn get_chart_values(&self, req: ChartReqMetadata) -> impl Iterator<Item = Option<f32>> + '_ {
        iter_search_ts(req).map(move |x| {
            // An empty stream (e.g. after a failed decompression) has no
            // points; bail out instead of underflowing `len() - 1`.
            let last_idx = self.unix_seconds.len().checked_sub(1)?;
            let found_ts_idx = self.unix_seconds.binary_search(&x).unwrap_or_else(|idx| idx);
            let found_ts_idx = min(found_ts_idx, last_idx);
            let found_ts = self.unix_seconds[found_ts_idx];
            let diff = x.abs_diff(found_ts);
            if diff < req.step_s as u64 {
                let value = self.values[found_ts_idx];
                Some(value)
            } else {
                None
            }
        })
    }
    fn add_stream_to_chart(
        &self,
        req: ChartReqMetadata,
        aggregated_result: Vec<ValueTracker>,
        agg: Aggregation,
    ) -> Vec<ValueTracker> {
        let chart_values = self.get_chart_values(req);
        let agg_fn = agg_to_agg_fn(agg);
        // Buckets this stream has no point for pass through unchanged;
        // filtering them out would shrink the vector and misalign the buckets
        // of every subsequently folded stream.
        aggregated_result
            .into_iter()
            .zip(chart_values)
            .map(|(x, y)| match y {
                Some(y) => x.apply(y, &agg_fn),
                None => x,
            })
            .collect()
    }
}

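// Lazy hydration with a double-checked lock: the read lock serves the common
// cached case, and the write path re-checks after acquiring the lock so two
// tasks racing on a cold stream decompress it only once.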
impl Stream {
    async fn get_chart_aggregated(
        &self,
        req: ChartReqMetadata,
        mmap: &Mmap,
        aggregated_result: Vec<ValueTracker>,
        agg: Aggregation,
    ) -> Vec<ValueTracker> {
        let mut last_accessed_lock = self.last_accessed.lock().await;
        *last_accessed_lock = Some(Utc::now().timestamp());
        drop(last_accessed_lock);

        let hot_stream_option = self.hot_stream.read().await;
        if let Some(ref x) = *hot_stream_option {
            return x.add_stream_to_chart(req, aggregated_result, agg);
        }
        drop(hot_stream_option);

        let mut writable_hot_stream = self.hot_stream.write().await;

        if let Some(ref x) = *writable_hot_stream {
            return x.add_stream_to_chart(req, aggregated_result, agg);
        }

        let hot_stream = self.disk_header.read_stream_from_mmap(mmap);

        let res = hot_stream.add_stream_to_chart(req, aggregated_result, agg);
        *writable_hot_stream = Some(hot_stream);
        res
    }
}

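// One fan-out task: folds a contiguous slice of the requested stream ids into
// a per-bucket tracker vector; the last task also absorbs the division
// remainder.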
async fn get_chart_aggregated_batched(
    req: Arc<ChartRequest>,
    agg: Aggregation,
    meta: ChartReqMetadata,
    thread_idx: usize,
    num_threads: usize,
    time_partition: Arc<TimePartition>,
) -> Vec<ValueTracker> {
    let streams_per_thread = req.stream_ids.len() / num_threads;
    let start_idx = streams_per_thread * thread_idx;
    let stop_idx = if thread_idx == num_threads - 1 {
        req.stream_ids.len()
    } else {
        streams_per_thread * (thread_idx + 1)
    };
    let streams = req.stream_ids[start_idx..stop_idx]
        .iter()
        .filter_map(|x| time_partition.streams.get(x));

    let mut aggregated_batch = vec![ValueTracker::default(); meta.resolution];
    for stream in streams {
        aggregated_batch = stream
            .get_chart_aggregated(meta, &time_partition.mmap, aggregated_batch, agg)
            .await;
    }
    aggregated_batch
}

#[inline]
fn default_final_agg_fn(x: ValueTracker) -> f32 {
    x.value
}
#[inline]
fn avg_final_agg(x: ValueTracker) -> f32 {
    // `count` is non-zero here: zero-count buckets are filtered out before
    // the final transform runs.
    x.value / x.count as f32
}

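// Fans a chart request out over at most `num_threads` tasks, keeping at least
// MIN_STREAMS_PER_THREAD streams per task, then pairwise-reduces the
// per-bucket trackers and applies the final transform (the deferred divide
// for `Avg`).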
async fn time_partition_get_agg_chart(
    time_partition: Arc<TimePartition>,
    req: Arc<ChartRequest>,
    agg: Aggregation,
    num_threads: usize,
) -> Vec<Datapoint> {
    let meta: ChartReqMetadata = req.as_ref().into();

    let threads_requested = req.stream_ids.len() / MIN_STREAMS_PER_THREAD;
    let threads_capped = min(threads_requested, num_threads);
    let num_threads = max(threads_capped, 1);

    let batches = (0..num_threads).map(|x| {
        get_chart_aggregated_batched(
            req.clone(),
            agg,
            meta,
            x,
            num_threads,
            time_partition.clone(),
        )
    });
    let agg_fn = agg_to_agg_fn(agg);
    let batched_agg_chart = JoinSet::from_iter(batches).join_all().await;
    let reduced = batched_agg_chart.into_iter().reduce(|acc, x| {
        acc.into_iter()
            .zip(x)
            .map(|(x, y)| x.agg(y, &agg_fn))
            .collect()
    });

    // num_threads is clamped to at least 1, so there is always one batch.
    let Some(reduced) = reduced else {
        panic!("tried to aggregate nothing");
    };

    let final_agg = match agg {
        Aggregation::Avg => avg_final_agg,
        _ => default_final_agg_fn,
    };

    reduced
        .into_iter()
        .zip(iter_search_ts(meta))
        .filter(|(x, _)| x.count > 0)
        .map(|(x, ts)| Datapoint {
            unix_s: ts,
            value: final_agg(x),
        })
        .collect()
}

impl TryFrom<&TimePartitionFileHeader> for TimePartition {
    type Error = io::Error;
    fn try_from(value: &TimePartitionFileHeader) -> Result<Self, Self::Error> {
        let hash_map_iter = value.disk_streams.iter().map(|x| {
            (
                x.stream_id,
                Stream {
                    disk_header: *x,
                    hot_stream: RwLock::new(None),
                    last_accessed: Mutex::new(None),
                },
            )
        });

        let file = fs::File::open(&value.file_path)?;
        // SAFETY: Mmap::map is unsound if the file is truncated or modified
        // while mapped; partition files are assumed immutable once written.
        let mmap = unsafe { Mmap::map(&file)? };
        let streams = DashMap::from_iter(hash_map_iter);
        let start_unix_s = value.start_unix_s;
        Ok(Self {
            start_unix_s,
            streams,
            mmap,
        })
    }
}

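// Only partitions newer than the cold-storage cutoff are mmapped back in;
// older partitions stay on disk untouched.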
impl PartitionsFileHeader {
    fn new(config: &StorageConfig) -> Self {
        Self {
            time_partitions: vec![TimePartitionFileHeader::new(config)],
        }
    }
    fn thaw(&self, config: &StorageConfig) -> Result<Vec<Arc<TimePartition>>, io::Error> {
        let now = Utc::now().timestamp();
        let cutoff = now - config.cold_storage_after_s as i64;
        self.time_partitions
            .iter()
            .filter(|x| x.start_unix_s > cutoff)
            .map(|x| x.try_into())
            .process_results(|iter| iter.map(Arc::new).collect())
    }
}

#[derive(thiserror::Error, Debug)]
pub enum StorageCreationError {
    #[error("io error")]
    IOError(#[from] io::Error),
    #[error("config error")]
    ConfigError(#[from] StorageConfigError),
    #[error("postcard deserialization error")]
    DeserializationError(#[from] postcard::Error),
}

impl Storage {
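    // Walks partitions newest-first; each partition implicitly ends where the
    // previous (newer) one begins, so the scan can stop as soon as it steps
    // past the requested start. This relies on `partitions` staying sorted
    // descending by `start_unix_s`.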
    async fn get_partitions_in_range(
        &self,
        start_unix_s: i64,
        stop_unix_s: i64,
    ) -> Vec<Arc<TimePartition>> {
        let mut partition_end = Utc::now().timestamp();
        let mut partitions_in_range = Vec::new();
        let partitions = self.partitions.read().await;
        for partition in partitions.iter() {
            if start_unix_s > partition_end {
                return partitions_in_range;
            }
            partition_end = partition.start_unix_s;
            if stop_unix_s < partition.start_unix_s {
                continue;
            }
            partitions_in_range.push(partition.clone());
        }
        partitions_in_range
    }

    pub async fn get_agg_chart(&self, req: ChartRequest, agg: Aggregation) -> Vec<Datapoint> {
        let time_partitions = self
            .get_partitions_in_range(req.start_unix_s, req.stop_unix_s)
            .await;
        let arc_req = Arc::new(req);
        let datapoint_jobs = time_partitions
            .into_iter()
            .map(|x| time_partition_get_agg_chart(x, arc_req.clone(), agg, self.num_threads));
        let datapoints_nested = JoinSet::from_iter(datapoint_jobs).join_all().await;
        datapoints_nested.into_iter().flatten().collect()
    }

    pub fn new(config: StorageConfig, num_threads: usize) -> Result<Self, StorageCreationError> {
        // The #[from] conversions on StorageCreationError let `?` handle the
        // error mapping throughout.
        let config = config.validate()?;

        let partitions_file_path = config
            .data_storage_dir
            .join(PARTITIONS_FILE_HEADER_FILENAME);

        let partition_file_header: PartitionsFileHeader = if partitions_file_path.exists() {
            let partitions_file_header_bytes = fs::read(partitions_file_path)?;
            from_bytes(&partitions_file_header_bytes)?
        } else {
            PartitionsFileHeader::new(&config)
        };

        let partitions = RwLock::new(partition_file_header.thaw(&config)?);

        Ok(Self {
            config,
            partitions,
            partition_file_header,
            num_threads,
        })
    }
}
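
// A minimal usage sketch (hypothetical values; assumes a tokio runtime and a
// writable data_storage_dir):
//
//     let storage = Storage::new(StorageConfig::default(), 8)?;
//     let req = ChartRequest {
//         stream_ids: vec![1, 2, 3],
//         start_unix_s: 1_700_000_000,
//         stop_unix_s: 1_700_086_400,
//         step_s: 900,
//     };
//     let points = storage.get_agg_chart(req, Aggregation::Avg).await;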