// msr_plugin_csv_event_journal/internal/context.rs

use std::{num::NonZeroUsize, path::PathBuf, result::Result as StdResult};

use msr_core::{
    event_journal::{
        csv::FileRecordStorage as CsvFileRecordStorage, DefaultRecordPreludeGenerator, Entry,
        Record, RecordFilter, RecordPreludeGenerator, RecordStorage, Result, Severity,
        StoredRecord, StoredRecordPrelude,
    },
    storage::{
        BinaryDataFormat, RecordStorageBase as _, RecordStorageWrite as _, StorageConfig,
        StorageStatus,
    },
};

/// Operational state of the event journal.
///
/// The state decides whether new entries are recorded or discarded.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum State {
    /// New entries are discarded instead of being recorded.
    Inactive,
    /// New entries are recorded, provided they pass the severity threshold.
    Active,
}

21#[derive(Debug, Clone)]
22pub struct Status {
23 pub state: State,
24 pub storage: StorageStatus,
25}
26
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub struct Config {
29 pub severity_threshold: Severity,
30 pub storage: StorageConfig,
31}
32
33pub(crate) struct Context {
34 config: Config,
35
36 state: State,
37
38 storage: CsvFileRecordStorage,
39}
40
41#[derive(Debug)]
42pub struct EntryRecorded(pub StoredRecordPrelude);
43
/// Reason why an entry was deliberately not recorded.
///
/// These are regular outcomes, not failures.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum EntryNotRecorded {
    /// The journal was inactive when the entry arrived.
    Inactive,
    /// The entry's severity was below the configured threshold.
    SeverityBelowThreshold,
}

50pub type RecordEntryOutcome = StdResult<EntryRecorded, EntryNotRecorded>;
51
52impl Context {
53 pub(crate) fn try_new(
54 data_dir: PathBuf,
55 file_name_prefix: String,
56 binary_data_format: BinaryDataFormat,
57 initial_config: Config,
58 initial_state: State,
59 ) -> Result<Self> {
60 let storage = CsvFileRecordStorage::try_new(
61 data_dir,
62 file_name_prefix,
63 binary_data_format,
64 initial_config.storage.clone(),
65 )?;
66 Ok(Self {
67 config: initial_config,
68 state: initial_state,
69 storage,
70 })
71 }
72
73 pub(crate) fn config(&self) -> &Config {
74 &self.config
75 }
76
77 pub(crate) fn state(&self) -> State {
78 self.state
79 }
80
81 pub(crate) fn status(&mut self, with_storage_statistics: bool) -> Result<Status> {
82 let storage_statistics = if with_storage_statistics {
83 Some(self.storage.report_statistics()?)
84 } else {
85 None
86 };
87 let storage = StorageStatus {
88 descriptor: self.storage.descriptor().clone(),
89 statistics: storage_statistics,
90 };
91 Ok(Status {
92 state: self.state(),
93 storage,
94 })
95 }
96
97 pub(crate) fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<StoredRecord>> {
98 self.storage.recent_records(limit)
99 }
100
101 pub(crate) fn filter_records(
102 &mut self,
103 limit: NonZeroUsize,
104 filter: RecordFilter,
105 ) -> Result<Vec<StoredRecord>> {
106 self.storage.filter_records(limit, filter)
107 }
108
109 pub(crate) fn replace_config(&mut self, new_config: Config) -> Result<Config> {
113 if self.config == new_config {
114 return Ok(new_config);
115 }
116 log::debug!("Replacing config: {:?} -> {:?}", self.config, new_config);
117 self.storage.replace_config(new_config.storage.clone());
118 Ok(std::mem::replace(&mut self.config, new_config))
119 }
120
121 pub(crate) fn switch_state(&mut self, new_state: State) -> Result<State> {
125 if self.state == new_state {
126 return Ok(new_state);
127 }
128 log::debug!("Switching state: {:?} -> {:?}", self.state, new_state);
129 Ok(std::mem::replace(&mut self.state, new_state))
130 }
131
132 pub(crate) fn record_entry(&mut self, new_entry: Entry) -> Result<RecordEntryOutcome> {
133 match self.state {
134 State::Inactive => {
135 log::debug!("Discarding new entry while inactive: {:?}", new_entry);
136 Ok(Err(EntryNotRecorded::Inactive))
137 }
138 State::Active => {
139 if new_entry.severity < self.config.severity_threshold {
140 log::debug!(
141 "Discarding new entry below severity threshold: {:?}",
142 new_entry
143 );
144 return Ok(Err(EntryNotRecorded::SeverityBelowThreshold));
145 }
146 DefaultRecordPreludeGenerator
147 .generate_prelude()
148 .map(|(created_at, prelude)| {
149 (
150 created_at,
151 Record {
152 prelude,
153 entry: new_entry,
154 },
155 )
156 })
157 .and_then(|(created_at, recorded_entry)| {
158 log::debug!("Recording entry: {:?}", recorded_entry);
159 let prelude = StoredRecordPrelude {
160 id: recorded_entry.prelude.id.clone(),
161 created_at: created_at.system_time(),
162 };
163 self.storage
164 .append_record(&created_at, recorded_entry)
165 .map(|_created_at_offset| Ok(EntryRecorded(prelude)))
166 .map_err(msr_core::event_journal::Error::Storage)
167 })
168 }
169 }
170 }
171}