1use std::{
2 collections::{hash_map::Entry, HashMap},
3 fmt, fs,
4 num::NonZeroUsize,
5 path::{Path, PathBuf},
6};
7
8use msr_core::{
9 register::{
10 recorder::{
11 csv::FileRecordStorage as CsvFileRecordStorage, RecordPrelude, RecordStorage as _,
12 StoredRecordPrelude as StoredRegisterRecordPrelude,
13 },
14 Index as RegisterIndex,
15 },
16 storage::{
17 RecordPreludeFilter, RecordStorageBase, Result as StorageResult, StorageConfig,
18 StorageStatus,
19 },
20 time::{SystemInstant, Timestamp},
21 ScalarType, ScalarValue,
22};
23
24use crate::{
25 api::{
26 ObservedRegisterValues, RegisterGroupId, RegisterRecord, RegisterType, StoredRegisterRecord,
27 },
28 Error, Result,
29};
30
31#[derive(Debug, Clone, Hash, Eq, PartialEq)]
32pub(crate) struct PartitionId(String);
33
34impl PartitionId {
35 pub(crate) fn encode(s: &str) -> Self {
36 Self(bs58::encode(s).into_string())
37 }
38}
39
40impl AsRef<str> for PartitionId {
41 fn as_ref(&self) -> &str {
42 let Self(inner) = &self;
43 inner
44 }
45}
46
47impl fmt::Display for PartitionId {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 f.write_str(self.as_ref())
50 }
51}
52
53#[derive(Debug, Clone, Eq, PartialEq)]
54pub struct RegisterGroupConfig {
55 pub registers: Vec<(RegisterIndex, RegisterType)>,
56 pub storage: StorageConfig,
57}
58
/// Recording state of the recorder.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum State {
    /// Incoming observations are discarded without being stored.
    Inactive,

    /// Incoming observations are validated and appended to storage.
    Active,
}
64
65#[derive(Debug, Clone)]
66pub struct Status {
67 pub state: State,
68 pub register_groups: Option<HashMap<RegisterGroupId, RegisterGroupStatus>>,
69}
70
71#[derive(Debug, Clone, Eq, PartialEq)]
72pub struct Config {
73 pub default_storage: StorageConfig,
74 pub register_groups: HashMap<RegisterGroupId, RegisterGroupConfig>,
75}
76
77pub(crate) struct Context {
78 data_path: PathBuf, file_name_prefix: String,
81
82 config: Config,
83
84 state: State,
85
86 register_groups: HashMap<RegisterGroupId, RegisterGroupContext>,
87
88 event_cb: Box<dyn ContextEventCallback + Send>,
89}
90
91pub(crate) trait ContextEventCallback {
92 fn data_directory_created(&self, register_group_id: &RegisterGroupId, data_dir: &Path);
93}
94
95struct RegisterGroupContext {
96 storage: CsvFileRecordStorage,
97}
98
99#[derive(Debug, Clone)]
100pub struct RegisterGroupStatus {
101 pub storage: StorageStatus,
102}
103
104fn partition_id_as_path(partition_id: &PartitionId) -> &Path {
105 let path = Path::new(partition_id.as_ref());
106 debug_assert!(!path.has_root());
107 debug_assert!(path.is_relative());
108 debug_assert!(path.components().count() == 1);
109 path
110}
111
112impl RegisterGroupContext {
113 fn try_new(
114 register_group_id: &RegisterGroupId,
115 data_path: &Path,
116 file_name_prefix: String,
117 config: RegisterGroupConfig,
118 event_cb: &dyn ContextEventCallback,
119 ) -> Result<Self> {
120 let mut data_path = PathBuf::from(data_path);
121 let partition_id = PartitionId::encode(register_group_id.as_ref());
122 let id_path = partition_id_as_path(&partition_id);
123 data_path.push(id_path);
124 if !data_path.is_dir() {
125 log::info!("Creating non-existent directory {}", data_path.display());
126 fs::create_dir_all(&data_path)?;
127 event_cb.data_directory_created(register_group_id, &data_path);
128 }
129 let storage = CsvFileRecordStorage::try_new(
130 config.storage,
131 data_path,
132 file_name_prefix,
133 config.registers,
134 )
135 .map_err(anyhow::Error::from)?;
136 let context = Self { storage };
137 Ok(context)
138 }
139
140 fn status(&mut self, with_storage_statistics: bool) -> StorageResult<RegisterGroupStatus> {
141 let storage_statistics = if with_storage_statistics {
142 Some(self.storage.report_statistics()?)
143 } else {
144 None
145 };
146 let storage = StorageStatus {
147 descriptor: self.storage.descriptor().clone(),
148 statistics: storage_statistics,
149 };
150 Ok(RegisterGroupStatus { storage })
151 }
152}
153
154pub(crate) trait RecordPreludeGenerator {
155 fn generate_prelude(&self) -> Result<(SystemInstant, RecordPrelude)>;
156}
157
/// Stateless prelude generator used when recording observations.
#[derive(Debug)]
struct DefaultRecordPreludeGenerator;
160
161impl RecordPreludeGenerator for DefaultRecordPreludeGenerator {
162 fn generate_prelude(&self) -> Result<(SystemInstant, RecordPrelude)> {
163 Ok((SystemInstant::now(), Default::default()))
164 }
165}
166
167pub(crate) trait RecordRepo {
168 fn append_record(&mut self, record: RegisterRecord) -> Result<()>;
169
170 fn recent_records(&self, limit: NonZeroUsize) -> Result<Vec<StoredRegisterRecord>>;
171
172 fn filter_records(
173 &self,
174 limit: NonZeroUsize,
175 filter: RecordPreludeFilter,
176 ) -> Result<Vec<StoredRegisterRecord>>;
177
178 fn total_record_count(&self) -> usize;
179}
180
181#[allow(clippy::needless_pass_by_value)]
182fn create_register_group_contexts(
183 data_path: &Path,
184 file_name_prefix: String,
185 register_group_configs: HashMap<RegisterGroupId, RegisterGroupConfig>,
186 event_cb: &dyn ContextEventCallback,
187) -> Result<HashMap<RegisterGroupId, RegisterGroupContext>> {
188 let mut register_group_contexts = HashMap::with_capacity(register_group_configs.len());
189 for (register_group_id, register_group_config) in register_group_configs {
190 let register_group_context = RegisterGroupContext::try_new(
191 ®ister_group_id,
192 data_path,
193 file_name_prefix.clone(),
194 register_group_config.clone(),
195 event_cb,
196 )?;
197 register_group_contexts.insert(register_group_id, register_group_context);
198 }
199 Ok(register_group_contexts)
200}
201
202impl Context {
203 pub(crate) fn try_new(
204 data_path: PathBuf,
205 file_name_prefix: String,
206 initial_config: Config,
207 initial_state: State,
208 event_cb: Box<dyn ContextEventCallback + Send>,
209 ) -> Result<Self> {
210 let register_groups = create_register_group_contexts(
211 &data_path,
212 file_name_prefix.clone(),
213 initial_config.register_groups.clone(),
214 &*event_cb,
215 )?;
216 Ok(Self {
217 data_path,
218 file_name_prefix,
219 config: initial_config,
220 state: initial_state,
221 register_groups,
222 event_cb,
223 })
224 }
225
226 pub(crate) fn config(&self) -> &Config {
227 &self.config
228 }
229
230 pub(crate) fn state(&self) -> State {
231 self.state
232 }
233
234 pub(crate) fn register_group_config(
235 &self,
236 id: &RegisterGroupId,
237 ) -> Option<&RegisterGroupConfig> {
238 self.config.register_groups.get(id)
239 }
240
241 pub(crate) fn status(
242 &mut self,
243 with_register_groups: bool,
244 with_storage_statistics: bool,
245 ) -> Result<Status> {
246 let state = self.state();
247 let register_groups = if with_register_groups {
248 let mut register_groups = HashMap::with_capacity(self.register_groups.len());
249 for (id, context) in &mut self.register_groups {
250 let status = context
251 .status(with_storage_statistics)
252 .map_err(Error::MsrStorage)?;
253 register_groups.insert(id.clone(), status);
254 }
255 Some(register_groups)
256 } else {
257 None
258 };
259 Ok(Status {
260 state,
261 register_groups,
262 })
263 }
264
265 pub(crate) fn recent_records(
266 &mut self,
267 register_group_id: &RegisterGroupId,
268 limit: NonZeroUsize,
269 ) -> Result<Vec<StoredRegisterRecord>> {
270 let context = self
271 .register_groups
272 .get_mut(register_group_id)
273 .ok_or(Error::RegisterGroupUnknown)?;
274 Ok(context.storage.recent_records(limit)?)
275 }
276
277 pub(crate) fn filter_records(
278 &mut self,
279 register_group_id: &RegisterGroupId,
280 limit: NonZeroUsize,
281 filter: &RecordPreludeFilter,
282 ) -> Result<Vec<StoredRegisterRecord>> {
283 let context = self
284 .register_groups
285 .get_mut(register_group_id)
286 .ok_or(Error::RegisterGroupUnknown)?;
287 Ok(context.storage.filter_records(limit, filter)?)
288 }
289
290 pub(crate) fn replace_config(&mut self, new_config: Config) -> Result<Config> {
294 if self.config == new_config {
295 return Ok(new_config);
296 }
297 log::debug!(
298 "Replacing configuration: {:?} -> {:?}",
299 self.config,
300 new_config
301 );
302 let new_register_groups = create_register_group_contexts(
303 &self.data_path,
304 self.file_name_prefix.clone(),
305 new_config.register_groups.clone(),
306 &*self.event_cb,
307 )?;
308 self.register_groups = new_register_groups;
310 Ok(std::mem::replace(&mut self.config, new_config))
311 }
312
313 pub(crate) fn replace_register_group_config(
317 &mut self,
318 register_group_id: RegisterGroupId,
319 new_config: RegisterGroupConfig,
320 ) -> Result<Option<RegisterGroupConfig>> {
321 let entry = self.config.register_groups.entry(register_group_id);
322 match entry {
323 Entry::Vacant(vacant) => {
324 let register_group_id = vacant.key().clone();
325 log::debug!(
326 "Configuring register group {}: {:?}",
327 register_group_id,
328 new_config
329 );
330 let register_group_context = RegisterGroupContext::try_new(
331 ®ister_group_id,
332 &self.data_path,
333 self.file_name_prefix.clone(),
334 new_config.clone(),
335 &*self.event_cb,
336 )?;
337 self.register_groups
338 .insert(register_group_id, register_group_context);
339 vacant.insert(new_config);
340 Ok(None)
341 }
342 Entry::Occupied(mut occupied) => {
343 if occupied.get() == &new_config {
344 return Ok(Some(new_config));
345 }
346 let register_group_id = occupied.key().clone();
347 log::debug!(
348 "Replacing configuration of register group {}: {:?} -> {:?}",
349 register_group_id,
350 occupied.get(),
351 new_config
352 );
353 let register_group_context = RegisterGroupContext::try_new(
354 ®ister_group_id,
355 &self.data_path,
356 self.file_name_prefix.clone(),
357 new_config.clone(),
358 &*self.event_cb,
359 )?;
360 self.register_groups
361 .insert(register_group_id, register_group_context);
362 let old_config = std::mem::replace(occupied.get_mut(), new_config);
363 Ok(Some(old_config))
364 }
365 }
366 }
367
368 pub(crate) fn switch_state(&mut self, new_state: State) -> Result<State> {
372 if self.state == new_state {
373 return Ok(new_state);
374 }
375 log::debug!("Switching state: {:?} -> {:?}", self.state, new_state);
376 Ok(std::mem::replace(&mut self.state, new_state))
377 }
378
379 pub(crate) fn record_observed_register_group_values(
380 &mut self,
381 register_group_id: &RegisterGroupId,
382 observed_register_values: ObservedRegisterValues,
383 ) -> Result<Option<StoredRegisterRecordPrelude>> {
384 match self.state {
385 State::Inactive => {
386 log::debug!(
387 "Discarding new observation for register group {} while inactive: {:?}",
388 register_group_id,
389 observed_register_values
390 );
391 Ok(None)
392 }
393 State::Active => {
394 if let Some(config) = self.config.register_groups.get(register_group_id) {
395 let expected_register_count = config.registers.len();
396 let actual_register_count = observed_register_values.register_values.len();
397 if expected_register_count != actual_register_count {
398 log::warn!(
399 "Mismatching number of register values in observation for group {}: expected = {}, actual = {}",
400 register_group_id,
401 expected_register_count,
402 actual_register_count);
403 return Err(Error::DataFormatInvalid);
404 }
405 for ((register_index, expected_type), actual_type) in
406 config.registers.iter().zip(
407 observed_register_values
408 .register_values
409 .iter()
410 .map(|v| v.as_ref().map(msr_core::Value::to_type)),
411 )
412 {
413 if let Some(actual_type) = actual_type {
414 if *expected_type != actual_type {
415 log::warn!(
416 "Mismatching register type for register {} in observation for group {}: expected = {}, actual = {}",
417 register_index,
418 register_group_id,
419 expected_type,
420 actual_type);
421 }
422 }
423 }
424 } else {
425 log::warn!(
426 "Missing configuration for register group {} - rejecting observation",
427 register_group_id
428 );
429 return Err(Error::RegisterGroupUnknown);
430 }
431 let context = self
432 .register_groups
433 .get_mut(register_group_id)
434 .ok_or(Error::RegisterGroupUnknown)?;
435 DefaultRecordPreludeGenerator.generate_prelude().and_then(
436 |(created_at, prelude)| {
437 let new_record = RegisterRecord {
438 prelude,
439 observation: observed_register_values,
440 };
441 log::debug!(
442 "Recording new observation for register group {}: {:?}",
443 register_group_id,
444 new_record
445 );
446 let prelude = context.storage.append_record(&created_at, new_record)?;
447 Ok(Some(prelude))
448 },
449 )
450 }
451 }
452 }
453
454 #[allow(clippy::panic_in_result_fn)] pub(crate) fn smoke_test(&mut self) -> Result<()> {
457 let register_group_id = RegisterGroupId::from_value("smoke-test-register-group".into());
458 let register_group_config = RegisterGroupConfig {
459 registers: vec![
460 (
461 RegisterIndex::new(1),
462 RegisterType::Scalar(ScalarType::Bool),
463 ),
464 (RegisterIndex::new(2), RegisterType::Scalar(ScalarType::I64)),
465 (RegisterIndex::new(3), RegisterType::Scalar(ScalarType::U64)),
466 (RegisterIndex::new(4), RegisterType::Scalar(ScalarType::F64)),
467 (RegisterIndex::new(5), RegisterType::String),
468 ],
469 storage: self.config.default_storage.clone(),
470 };
471 let orig_config =
472 self.replace_register_group_config(register_group_id.clone(), register_group_config)?;
473 let recorded_observations = vec![
474 ObservedRegisterValues {
475 observed_at: Timestamp::now(),
476 register_values: vec![
477 None,
478 Some(ScalarValue::I64(0).into()),
479 Some(ScalarValue::U64(0).into()),
480 Some(ScalarValue::F64(0.0).into()),
481 None,
482 ],
483 },
484 ObservedRegisterValues {
485 observed_at: Timestamp::now(),
486 register_values: vec![
487 Some(ScalarValue::Bool(false).into()),
488 Some(ScalarValue::I64(-1).into()),
489 Some(ScalarValue::U64(1).into()),
490 Some(ScalarValue::F64(-1.125).into()),
491 Some("Hello".to_owned().into()),
492 ],
493 },
494 ObservedRegisterValues {
495 observed_at: Timestamp::now(),
496 register_values: vec![
497 Some(ScalarValue::Bool(true).into()),
498 Some(ScalarValue::I64(1).into()),
499 None,
500 Some(ScalarValue::F64(1.125).into()),
501 Some(", world!".to_owned().into()),
502 ],
503 },
504 ObservedRegisterValues {
505 observed_at: Timestamp::now(),
506 register_values: vec![None, None, None, None, None],
507 },
508 ];
509 for observation in &recorded_observations {
510 self.record_observed_register_group_values(®ister_group_id, observation.clone())?;
511 }
512 let recent_records = self.recent_records(
513 ®ister_group_id,
514 NonZeroUsize::new(recorded_observations.len()).unwrap(),
515 )?;
516 assert_eq!(recent_records.len(), recorded_observations.len());
517 log::info!(
518 "Smoke test recorded observations: {:?}",
519 recorded_observations
520 );
521 log::info!("Smoke test records: {:?}", recent_records);
522 if let Some(orig_config) = orig_config {
524 self.replace_register_group_config(register_group_id, orig_config)?;
525 }
526 Ok(())
527 }
528}