msr_plugin_csv_event_journal/internal/
context.rs

1use std::{num::NonZeroUsize, path::PathBuf, result::Result as StdResult};
2
3use msr_core::{
4    event_journal::{
5        csv::FileRecordStorage as CsvFileRecordStorage, DefaultRecordPreludeGenerator, Entry,
6        Record, RecordFilter, RecordPreludeGenerator, RecordStorage, Result, Severity,
7        StoredRecord, StoredRecordPrelude,
8    },
9    storage::{
10        BinaryDataFormat, RecordStorageBase as _, RecordStorageWrite as _, StorageConfig,
11        StorageStatus,
12    },
13};
14
/// Operating state of the journal context.
///
/// Entries passed to `Context::record_entry` are discarded while the
/// context is [`State::Inactive`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// New entries are discarded instead of being recorded.
    Inactive,
    /// New entries are considered for recording.
    Active,
}
20
21#[derive(Debug, Clone)]
22pub struct Status {
23    pub state: State,
24    pub storage: StorageStatus,
25}
26
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub struct Config {
29    pub severity_threshold: Severity,
30    pub storage: StorageConfig,
31}
32
33pub(crate) struct Context {
34    config: Config,
35
36    state: State,
37
38    storage: CsvFileRecordStorage,
39}
40
41#[derive(Debug)]
42pub struct EntryRecorded(pub StoredRecordPrelude);
43
/// Reason why an entry was deliberately not recorded.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EntryNotRecorded {
    /// The context was inactive when the entry arrived.
    Inactive,
    /// The entry's severity was below the configured threshold.
    SeverityBelowThreshold,
}
49
50pub type RecordEntryOutcome = StdResult<EntryRecorded, EntryNotRecorded>;
51
52impl Context {
53    pub(crate) fn try_new(
54        data_dir: PathBuf,
55        file_name_prefix: String,
56        binary_data_format: BinaryDataFormat,
57        initial_config: Config,
58        initial_state: State,
59    ) -> Result<Self> {
60        let storage = CsvFileRecordStorage::try_new(
61            data_dir,
62            file_name_prefix,
63            binary_data_format,
64            initial_config.storage.clone(),
65        )?;
66        Ok(Self {
67            config: initial_config,
68            state: initial_state,
69            storage,
70        })
71    }
72
73    pub(crate) fn config(&self) -> &Config {
74        &self.config
75    }
76
77    pub(crate) fn state(&self) -> State {
78        self.state
79    }
80
81    pub(crate) fn status(&mut self, with_storage_statistics: bool) -> Result<Status> {
82        let storage_statistics = if with_storage_statistics {
83            Some(self.storage.report_statistics()?)
84        } else {
85            None
86        };
87        let storage = StorageStatus {
88            descriptor: self.storage.descriptor().clone(),
89            statistics: storage_statistics,
90        };
91        Ok(Status {
92            state: self.state(),
93            storage,
94        })
95    }
96
97    pub(crate) fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<StoredRecord>> {
98        self.storage.recent_records(limit)
99    }
100
101    pub(crate) fn filter_records(
102        &mut self,
103        limit: NonZeroUsize,
104        filter: RecordFilter,
105    ) -> Result<Vec<StoredRecord>> {
106        self.storage.filter_records(limit, filter)
107    }
108
109    /// Switch the current configuration
110    ///
111    /// Returns the previous configuration.
112    pub(crate) fn replace_config(&mut self, new_config: Config) -> Result<Config> {
113        if self.config == new_config {
114            return Ok(new_config);
115        }
116        log::debug!("Replacing config: {:?} -> {:?}", self.config, new_config);
117        self.storage.replace_config(new_config.storage.clone());
118        Ok(std::mem::replace(&mut self.config, new_config))
119    }
120
121    /// Switch the current state
122    ///
123    /// Returns the previous state.
124    pub(crate) fn switch_state(&mut self, new_state: State) -> Result<State> {
125        if self.state == new_state {
126            return Ok(new_state);
127        }
128        log::debug!("Switching state: {:?} -> {:?}", self.state, new_state);
129        Ok(std::mem::replace(&mut self.state, new_state))
130    }
131
132    pub(crate) fn record_entry(&mut self, new_entry: Entry) -> Result<RecordEntryOutcome> {
133        match self.state {
134            State::Inactive => {
135                log::debug!("Discarding new entry while inactive: {:?}", new_entry);
136                Ok(Err(EntryNotRecorded::Inactive))
137            }
138            State::Active => {
139                if new_entry.severity < self.config.severity_threshold {
140                    log::debug!(
141                        "Discarding new entry below severity threshold: {:?}",
142                        new_entry
143                    );
144                    return Ok(Err(EntryNotRecorded::SeverityBelowThreshold));
145                }
146                DefaultRecordPreludeGenerator
147                    .generate_prelude()
148                    .map(|(created_at, prelude)| {
149                        (
150                            created_at,
151                            Record {
152                                prelude,
153                                entry: new_entry,
154                            },
155                        )
156                    })
157                    .and_then(|(created_at, recorded_entry)| {
158                        log::debug!("Recording entry: {:?}", recorded_entry);
159                        let prelude = StoredRecordPrelude {
160                            id: recorded_entry.prelude.id.clone(),
161                            created_at: created_at.system_time(),
162                        };
163                        self.storage
164                            .append_record(&created_at, recorded_entry)
165                            .map(|_created_at_offset| Ok(EntryRecorded(prelude)))
166                            .map_err(msr_core::event_journal::Error::Storage)
167                    })
168            }
169        }
170    }
171}