msr_core/storage/
mod.rs

//! Persistence-related types and traits: record storage abstractions and an in-memory implementation
2
3use std::{
4    collections::VecDeque,
5    io::Error as IoError,
6    num::{NonZeroU32, NonZeroU64, NonZeroUsize},
7    path::PathBuf,
8    result::Result as StdResult,
9    time::{Duration, SystemTime},
10};
11
12use base64::prelude::*;
13use thiserror::Error;
14
15use crate::{
16    fs::WriteResult,
17    time::{Interval, SystemInstant},
18};
19
20// TODO: Currently unused
21pub mod field;
22
23#[cfg(feature = "csv-storage")]
24pub mod csv;
25
/// Errors reported by storage operations.
///
/// All variants are transparent wrappers, i.e. both the message and the
/// source are forwarded from the wrapped error.
#[derive(Error, Debug)]
pub enum Error {
    /// An I/O error
    #[error(transparent)]
    Io(#[from] IoError),

    /// An error of the `csv` crate
    ///
    /// Only available if the `csv-storage` feature is enabled.
    #[cfg(feature = "csv-storage")]
    #[error(transparent)]
    Csv(#[from] ::csv::Error),

    /// Any other error
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
38
/// Maps a file-system level CSV error onto the storage [`Error`],
/// keeping the distinction between I/O and CSV failures.
#[cfg(feature = "csv-storage")]
impl From<crate::fs::csv::Error> for Error {
    fn from(err: crate::fs::csv::Error) -> Self {
        use crate::fs::csv::Error as FsCsvError;
        match err {
            FsCsvError::Io(io_err) => Self::Io(io_err),
            FsCsvError::Csv(csv_err) => Self::Csv(csv_err),
        }
    }
}
49
/// Result of a storage operation, with the storage [`Error`] as the error type.
pub type Result<T> = StdResult<T, Error>;
51
/// Maximum pre-allocated capacity
///
/// Caps pre-allocations to avoid allocation errors caused by
/// excessively high capacity or limit parameters.
pub const MAX_PREALLOCATED_CAPACITY_LIMIT: usize = 16_384; // 2^14
55
/// Status of a storage: its descriptor together with optional statistics.
#[derive(Debug, Clone)]
pub struct StorageStatus {
    /// Describes the storage
    pub descriptor: StorageDescriptor,
    /// Statistics (if available)
    pub statistics: Option<StorageStatistics>,
}
61
/// A non-zero time interval
///
/// Whole days are currently the only supported unit.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TimeInterval {
    /// A non-zero number of days
    Days(NonZeroU32),
}
66
/// Number of seconds per day, i.e. 24 hours of 3,600 seconds each
const SECONDS_PER_DAY: u64 = 24 * 3_600;
68
69impl From<TimeInterval> for Duration {
70    fn from(from: TimeInterval) -> Self {
71        use TimeInterval::*;
72        match from {
73            Days(days) => Duration::from_secs(SECONDS_PER_DAY * u64::from(days.get())),
74        }
75    }
76}
77
78impl From<TimeInterval> for Interval {
79    fn from(from: TimeInterval) -> Self {
80        use TimeInterval::*;
81        match from {
82            Days(days) => Interval::Days(days.get()),
83        }
84    }
85}
86
/// A non-zero amount of memory
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum MemorySize {
    /// A non-zero number of bytes
    Bytes(NonZeroU64),
}
91
/// Configuration of a storage
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct StorageConfig {
    /// Retention time of records
    // NOTE(review): presumably records older than this may be dropped
    // during housekeeping - confirm against the storage implementations.
    pub retention_time: TimeInterval,
    /// Configuration of storage segments
    pub segmentation: StorageSegmentConfig,
}
97
/// Configuration of storage segments
// NOTE(review): presumably a new segment is started when either the time
// interval has elapsed or the size limit is reached - confirm against the
// segmented storage implementations.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct StorageSegmentConfig {
    /// Time interval of a segment
    pub time_interval: TimeInterval,
    /// Size limit of a segment
    pub size_limit: MemorySize,
}
103
/// Describes a storage
#[derive(Debug, Clone)]
pub struct StorageDescriptor {
    /// Name of the storage kind, e.g. `"in-memory"` for [`InMemoryRecordStorage`]
    pub kind: String,
    /// Base path of the storage (if any), `None` for [`InMemoryRecordStorage`]
    pub base_path: Option<PathBuf>,
    /// Format of custom, binary data
    pub binary_data_format: BinaryDataFormat,
}
110
/// Statistics of a storage
///
/// Every figure is optional, because not all storages are able to report it.
#[derive(Debug, Clone)]
pub struct StorageStatistics {
    /// The total number of records (if known)
    pub total_records: Option<usize>,

    /// The total size in bytes (if known)
    pub total_bytes: Option<u64>,

    /// Segment statistics (if applicable and available)
    pub segments: Option<Vec<StorageSegmentStatistics>>,
}
122
/// Statistics of a single storage segment
#[derive(Debug, Clone)]
pub struct StorageSegmentStatistics {
    /// The time when the segment has been created
    pub created_at: SystemTime,

    /// The total number of records in the segment
    pub total_records: usize,

    /// The total size in bytes (if known)
    pub total_bytes: Option<u64>,
}
132
/// Read access to the common prelude of a record
pub trait ReadableRecordPrelude {
    /// Returns the creation time of the record as an offset from the
    /// origin of the storage it belongs to.
    fn created_at_offset(&self) -> CreatedAtOffset;
}
136
/// Write access to the common prelude of a record
pub trait WritableRecordPrelude {
    /// Sets the creation time of the record as an offset from the
    /// origin of the storage it belongs to.
    fn set_created_at_offset(&mut self, created_at_offset: CreatedAtOffset);
}
140
/// Raw representation of a [`CreatedAtOffset`] in nanoseconds
pub type CreatedAtOffsetNanos = u64;

/// Creation time of a record as an offset from some origin
///
/// The offset has nanosecond resolution and covers roughly 584 years.
/// The default offset is zero.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub struct CreatedAtOffset {
    nanos: CreatedAtOffsetNanos,
}

impl CreatedAtOffset {
    /// Returns the point in time that is located this offset after `origin`.
    ///
    /// # Panics
    ///
    /// Panics if the resulting point in time cannot be represented
    /// as a [`SystemTime`].
    #[must_use]
    pub fn system_time_from_origin(self, origin: SystemTime) -> SystemTime {
        origin + self.to_duration()
    }

    /// Returns the offset as a [`Duration`]. This conversion is lossless.
    #[must_use]
    pub const fn to_duration(self) -> Duration {
        let Self { nanos } = self;
        Duration::from_nanos(nanos)
    }
}

impl From<CreatedAtOffsetNanos> for CreatedAtOffset {
    /// Wraps a raw number of nanoseconds.
    fn from(nanos: CreatedAtOffsetNanos) -> Self {
        Self { nanos }
    }
}

impl From<CreatedAtOffset> for CreatedAtOffsetNanos {
    /// Unwraps the raw number of nanoseconds.
    fn from(from: CreatedAtOffset) -> Self {
        let CreatedAtOffset { nanos } = from;
        nanos
    }
}

impl From<Duration> for CreatedAtOffset {
    /// Converts a duration into an offset.
    ///
    /// Durations that exceed the representable range saturate at the
    /// maximum offset instead of wrapping around. Wrapping would turn a
    /// huge duration into an arbitrary small offset and thereby break the
    /// ordering of offsets that callers rely on for comparisons.
    fn from(from: Duration) -> Self {
        let max_nanos = u128::from(CreatedAtOffsetNanos::MAX);
        let nanos = from.as_nanos().min(max_nanos);
        Self {
            // Lossless: the value has been clamped to the target range above
            nanos: nanos as CreatedAtOffsetNanos,
        }
    }
}

impl From<CreatedAtOffset> for Duration {
    /// Converts the offset into a duration. This conversion is lossless.
    fn from(from: CreatedAtOffset) -> Self {
        from.to_duration()
    }
}
190
/// Filter criteria that apply to the prelude of records
///
/// The default filter has no bounds.
// NOTE(review): whether the bounds are inclusive or exclusive is not
// defined here - confirm against the code that evaluates the filter.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct RecordPreludeFilter {
    /// Lower bound of the creation time (if any)
    pub since_created_at: Option<SystemTime>,
    /// Upper bound of the creation time (if any)
    pub until_created_at: Option<SystemTime>,
}
196
/// Operations common to all record storages, independent of the record type
pub trait RecordStorageBase {
    /// Returns the descriptor of this storage
    fn descriptor(&self) -> &StorageDescriptor;

    /// Returns the current configuration
    fn config(&self) -> &StorageConfig;

    /// Replaces the current configuration and returns the previous one
    fn replace_config(&mut self, new_config: StorageConfig) -> StorageConfig;

    /// Perform implementation-specific maintenance tasks
    // NOTE(review): the expected duties (e.g. enforcing the retention time)
    // are not specified here - confirm against the implementations.
    fn perform_housekeeping(&mut self) -> Result<()>;

    /// Try to drop records that have been created before the given time
    fn retain_all_records_created_since(&mut self, created_since: SystemTime) -> Result<()>;

    /// Collect statistics about the stored records
    fn report_statistics(&mut self) -> Result<StorageStatistics>;
}
211
/// Format of custom, binary data
///
/// Determines how binary data is converted into a string and back, see
/// [`encode_binary_data_into_string`] and [`decode_binary_data_from_string`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub enum BinaryDataFormat {
    /// Arbitrary binary data
    ///
    /// Serialized as Base64 with standard alphabet and no padding.
    #[default]
    Bytes,

    /// Serialized UTF-8 data
    ///
    /// The bytes are used verbatim as the contents of the string.
    /// Encoding fails if they are not valid UTF-8.
    ///
    /// A typical use case is the tunneling of UTF-8 JSON data.
    Utf8,
}
226
227fn encode_binary_data_bytes(input: impl AsRef<[u8]>) -> String {
228    BASE64_STANDARD_NO_PAD.encode(input)
229}
230
231fn encode_binary_data_utf8(input: Vec<u8>) -> anyhow::Result<String> {
232    String::from_utf8(input).map_err(Into::into)
233}
234
235pub fn encode_binary_data_into_string(
236    input: Vec<u8>,
237    format: BinaryDataFormat,
238) -> anyhow::Result<String> {
239    match format {
240        BinaryDataFormat::Bytes => Ok(encode_binary_data_bytes(&input)),
241        BinaryDataFormat::Utf8 => encode_binary_data_utf8(input),
242    }
243}
244
245fn decode_binary_data_bytes(input: impl AsRef<[u8]>) -> anyhow::Result<Vec<u8>> {
246    BASE64_STANDARD_NO_PAD.decode(input).map_err(Into::into)
247}
248
/// Returns the UTF-8 bytes of the string, reusing its allocation.
fn decode_binary_data_utf8(input: String) -> Vec<u8> {
    Vec::from(input)
}
252
253pub fn decode_binary_data_from_string(
254    input: String,
255    format: BinaryDataFormat,
256) -> anyhow::Result<Vec<u8>> {
257    match format {
258        BinaryDataFormat::Bytes => decode_binary_data_bytes(&input),
259        BinaryDataFormat::Utf8 => Ok(decode_binary_data_utf8(input)),
260    }
261}
262
/// Write access to a record storage
pub trait RecordStorageWrite<R>: RecordStorageBase
where
    R: WritableRecordPrelude,
{
    /// Appends a record that has been created at the given time.
    ///
    /// On success returns the outcome of the write operation together
    /// with the creation time offset of the record.
    fn append_record(
        &mut self,
        created_at: &SystemInstant,
        record: R,
    ) -> Result<(WriteResult, CreatedAtOffset)>;
}
273
/// Read access to a record storage
pub trait RecordStorageRead<R>: RecordStorageBase {
    /// Returns up to `limit` recent records, each paired with a [`SystemTime`].
    // NOTE(review): presumably the time is the creation time of the record
    // and the most recent records are returned - confirm the meaning and
    // the ordering against the implementations.
    fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<(SystemTime, R)>>;
}
277
/// Record storage that keeps all records in memory
// NOTE(review): `Debug` is deliberately not implemented, but the reason is
// not documented - presumably to avoid a `Debug` bound on `R`. Confirm.
#[allow(missing_debug_implementations)]
pub struct InMemoryRecordStorage<R> {
    config: StorageConfig,
    descriptor: StorageDescriptor,
    // Reference point for the creation time offsets of all records
    created_at_origin: SystemInstant,
    // Records in insertion order: appended at the back, dropped from the front
    records: VecDeque<R>,
    // NOTE(review): looks redundant, `records` already refers to `R` - confirm
    _record_phantom: std::marker::PhantomData<R>,
}
286
287impl<R> InMemoryRecordStorage<R>
288where
289    R: Clone,
290{
291    #[must_use]
292    pub fn new(config: StorageConfig) -> Self {
293        let descriptor = StorageDescriptor {
294            kind: "in-memory".to_string(),
295            base_path: None,
296            binary_data_format: Default::default(), // no serialization
297        };
298        Self {
299            config,
300            descriptor,
301            created_at_origin: SystemInstant::now(),
302            records: VecDeque::with_capacity(MAX_PREALLOCATED_CAPACITY_LIMIT),
303            _record_phantom: Default::default(),
304        }
305    }
306
307    pub fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<R>> {
308        let total_count = self.records.len();
309        let limited_count = limit.get().min(total_count);
310        Ok(self
311            .records
312            .iter()
313            .skip(total_count - limited_count)
314            .cloned()
315            .collect())
316    }
317}
318
319impl<R> RecordStorageBase for InMemoryRecordStorage<R>
320where
321    R: ReadableRecordPrelude,
322{
323    fn descriptor(&self) -> &StorageDescriptor {
324        &self.descriptor
325    }
326
327    fn config(&self) -> &StorageConfig {
328        &self.config
329    }
330
331    fn replace_config(&mut self, new_config: StorageConfig) -> StorageConfig {
332        std::mem::replace(&mut self.config, new_config)
333    }
334
335    fn perform_housekeeping(&mut self) -> Result<()> {
336        Ok(())
337    }
338
339    fn retain_all_records_created_since(&mut self, created_since: SystemTime) -> Result<()> {
340        let created_since_offset = created_since
341            .duration_since(self.created_at_origin.system_time())
342            .unwrap_or_default()
343            .into();
344        while let Some(first) = self.records.front() {
345            if first.created_at_offset() >= created_since_offset {
346                break;
347            }
348            self.records.pop_front();
349        }
350        Ok(())
351    }
352
353    fn report_statistics(&mut self) -> Result<StorageStatistics> {
354        Ok(StorageStatistics {
355            total_records: Some(self.records.len()),
356            total_bytes: None,
357            segments: None,
358        })
359    }
360}
361
362impl<R> RecordStorageWrite<R> for InMemoryRecordStorage<R>
363where
364    R: ReadableRecordPrelude + WritableRecordPrelude,
365{
366    fn append_record(
367        &mut self,
368        created_at: &SystemInstant,
369        mut record: R,
370    ) -> Result<(WriteResult, CreatedAtOffset)> {
371        debug_assert!(created_at.instant() >= self.created_at_origin.instant());
372        let created_at_offset =
373            CreatedAtOffset::from(created_at.instant() - self.created_at_origin.instant());
374        debug_assert_eq!(record.created_at_offset(), Default::default()); // not yet initialized
375        record.set_created_at_offset(created_at_offset);
376        self.records.push_back(record);
377        Ok((Ok(()), created_at_offset))
378    }
379}