msr_plugin_csv_register_recorder/internal/
context.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    fmt, fs,
4    num::NonZeroUsize,
5    path::{Path, PathBuf},
6};
7
8use msr_core::{
9    register::{
10        recorder::{
11            csv::FileRecordStorage as CsvFileRecordStorage, RecordPrelude, RecordStorage as _,
12            StoredRecordPrelude as StoredRegisterRecordPrelude,
13        },
14        Index as RegisterIndex,
15    },
16    storage::{
17        RecordPreludeFilter, RecordStorageBase, Result as StorageResult, StorageConfig,
18        StorageStatus,
19    },
20    time::{SystemInstant, Timestamp},
21    ScalarType, ScalarValue,
22};
23
24use crate::{
25    api::{
26        ObservedRegisterValues, RegisterGroupId, RegisterRecord, RegisterType, StoredRegisterRecord,
27    },
28    Error, Result,
29};
30
/// Identifier of a storage partition.
///
/// Wraps the string produced by [`PartitionId::encode`]. Within this file it
/// is used as the name of the sub-directory that holds the data of a single
/// register group.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct PartitionId(String);
33
34impl PartitionId {
35    pub(crate) fn encode(s: &str) -> Self {
36        Self(bs58::encode(s).into_string())
37    }
38}
39
40impl AsRef<str> for PartitionId {
41    fn as_ref(&self) -> &str {
42        let Self(inner) = &self;
43        inner
44    }
45}
46
47impl fmt::Display for PartitionId {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        f.write_str(self.as_ref())
50    }
51}
52
53#[derive(Debug, Clone, Eq, PartialEq)]
54pub struct RegisterGroupConfig {
55    pub registers: Vec<(RegisterIndex, RegisterType)>,
56    pub storage: StorageConfig,
57}
58
/// Recording state of the context.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Incoming observations are discarded without being recorded.
    Inactive,
    /// Incoming observations are validated and appended to the storage.
    Active,
}
64
65#[derive(Debug, Clone)]
66pub struct Status {
67    pub state: State,
68    pub register_groups: Option<HashMap<RegisterGroupId, RegisterGroupStatus>>,
69}
70
71#[derive(Debug, Clone, Eq, PartialEq)]
72pub struct Config {
73    pub default_storage: StorageConfig,
74    pub register_groups: HashMap<RegisterGroupId, RegisterGroupConfig>,
75}
76
77pub(crate) struct Context {
78    data_path: PathBuf, // immutable
79
80    file_name_prefix: String,
81
82    config: Config,
83
84    state: State,
85
86    register_groups: HashMap<RegisterGroupId, RegisterGroupContext>,
87
88    event_cb: Box<dyn ContextEventCallback + Send>,
89}
90
91pub(crate) trait ContextEventCallback {
92    fn data_directory_created(&self, register_group_id: &RegisterGroupId, data_dir: &Path);
93}
94
95struct RegisterGroupContext {
96    storage: CsvFileRecordStorage,
97}
98
99#[derive(Debug, Clone)]
100pub struct RegisterGroupStatus {
101    pub storage: StorageStatus,
102}
103
104fn partition_id_as_path(partition_id: &PartitionId) -> &Path {
105    let path = Path::new(partition_id.as_ref());
106    debug_assert!(!path.has_root());
107    debug_assert!(path.is_relative());
108    debug_assert!(path.components().count() == 1);
109    path
110}
111
112impl RegisterGroupContext {
113    fn try_new(
114        register_group_id: &RegisterGroupId,
115        data_path: &Path,
116        file_name_prefix: String,
117        config: RegisterGroupConfig,
118        event_cb: &dyn ContextEventCallback,
119    ) -> Result<Self> {
120        let mut data_path = PathBuf::from(data_path);
121        let partition_id = PartitionId::encode(register_group_id.as_ref());
122        let id_path = partition_id_as_path(&partition_id);
123        data_path.push(id_path);
124        if !data_path.is_dir() {
125            log::info!("Creating non-existent directory {}", data_path.display());
126            fs::create_dir_all(&data_path)?;
127            event_cb.data_directory_created(register_group_id, &data_path);
128        }
129        let storage = CsvFileRecordStorage::try_new(
130            config.storage,
131            data_path,
132            file_name_prefix,
133            config.registers,
134        )
135        .map_err(anyhow::Error::from)?;
136        let context = Self { storage };
137        Ok(context)
138    }
139
140    fn status(&mut self, with_storage_statistics: bool) -> StorageResult<RegisterGroupStatus> {
141        let storage_statistics = if with_storage_statistics {
142            Some(self.storage.report_statistics()?)
143        } else {
144            None
145        };
146        let storage = StorageStatus {
147            descriptor: self.storage.descriptor().clone(),
148            statistics: storage_statistics,
149        };
150        Ok(RegisterGroupStatus { storage })
151    }
152}
153
154pub(crate) trait RecordPreludeGenerator {
155    fn generate_prelude(&self) -> Result<(SystemInstant, RecordPrelude)>;
156}
157
/// Stateless generator that pairs the current instant with a default prelude.
#[derive(Debug)]
struct DefaultRecordPreludeGenerator;
160
161impl RecordPreludeGenerator for DefaultRecordPreludeGenerator {
162    fn generate_prelude(&self) -> Result<(SystemInstant, RecordPrelude)> {
163        Ok((SystemInstant::now(), Default::default()))
164    }
165}
166
167pub(crate) trait RecordRepo {
168    fn append_record(&mut self, record: RegisterRecord) -> Result<()>;
169
170    fn recent_records(&self, limit: NonZeroUsize) -> Result<Vec<StoredRegisterRecord>>;
171
172    fn filter_records(
173        &self,
174        limit: NonZeroUsize,
175        filter: RecordPreludeFilter,
176    ) -> Result<Vec<StoredRegisterRecord>>;
177
178    fn total_record_count(&self) -> usize;
179}
180
181#[allow(clippy::needless_pass_by_value)]
182fn create_register_group_contexts(
183    data_path: &Path,
184    file_name_prefix: String,
185    register_group_configs: HashMap<RegisterGroupId, RegisterGroupConfig>,
186    event_cb: &dyn ContextEventCallback,
187) -> Result<HashMap<RegisterGroupId, RegisterGroupContext>> {
188    let mut register_group_contexts = HashMap::with_capacity(register_group_configs.len());
189    for (register_group_id, register_group_config) in register_group_configs {
190        let register_group_context = RegisterGroupContext::try_new(
191            &register_group_id,
192            data_path,
193            file_name_prefix.clone(),
194            register_group_config.clone(),
195            event_cb,
196        )?;
197        register_group_contexts.insert(register_group_id, register_group_context);
198    }
199    Ok(register_group_contexts)
200}
201
202impl Context {
203    pub(crate) fn try_new(
204        data_path: PathBuf,
205        file_name_prefix: String,
206        initial_config: Config,
207        initial_state: State,
208        event_cb: Box<dyn ContextEventCallback + Send>,
209    ) -> Result<Self> {
210        let register_groups = create_register_group_contexts(
211            &data_path,
212            file_name_prefix.clone(),
213            initial_config.register_groups.clone(),
214            &*event_cb,
215        )?;
216        Ok(Self {
217            data_path,
218            file_name_prefix,
219            config: initial_config,
220            state: initial_state,
221            register_groups,
222            event_cb,
223        })
224    }
225
226    pub(crate) fn config(&self) -> &Config {
227        &self.config
228    }
229
230    pub(crate) fn state(&self) -> State {
231        self.state
232    }
233
234    pub(crate) fn register_group_config(
235        &self,
236        id: &RegisterGroupId,
237    ) -> Option<&RegisterGroupConfig> {
238        self.config.register_groups.get(id)
239    }
240
241    pub(crate) fn status(
242        &mut self,
243        with_register_groups: bool,
244        with_storage_statistics: bool,
245    ) -> Result<Status> {
246        let state = self.state();
247        let register_groups = if with_register_groups {
248            let mut register_groups = HashMap::with_capacity(self.register_groups.len());
249            for (id, context) in &mut self.register_groups {
250                let status = context
251                    .status(with_storage_statistics)
252                    .map_err(Error::MsrStorage)?;
253                register_groups.insert(id.clone(), status);
254            }
255            Some(register_groups)
256        } else {
257            None
258        };
259        Ok(Status {
260            state,
261            register_groups,
262        })
263    }
264
265    pub(crate) fn recent_records(
266        &mut self,
267        register_group_id: &RegisterGroupId,
268        limit: NonZeroUsize,
269    ) -> Result<Vec<StoredRegisterRecord>> {
270        let context = self
271            .register_groups
272            .get_mut(register_group_id)
273            .ok_or(Error::RegisterGroupUnknown)?;
274        Ok(context.storage.recent_records(limit)?)
275    }
276
277    pub(crate) fn filter_records(
278        &mut self,
279        register_group_id: &RegisterGroupId,
280        limit: NonZeroUsize,
281        filter: &RecordPreludeFilter,
282    ) -> Result<Vec<StoredRegisterRecord>> {
283        let context = self
284            .register_groups
285            .get_mut(register_group_id)
286            .ok_or(Error::RegisterGroupUnknown)?;
287        Ok(context.storage.filter_records(limit, filter)?)
288    }
289
290    /// Switch the current configuration
291    ///
292    /// Returns the previous configuration.
293    pub(crate) fn replace_config(&mut self, new_config: Config) -> Result<Config> {
294        if self.config == new_config {
295            return Ok(new_config);
296        }
297        log::debug!(
298            "Replacing configuration: {:?} -> {:?}",
299            self.config,
300            new_config
301        );
302        let new_register_groups = create_register_group_contexts(
303            &self.data_path,
304            self.file_name_prefix.clone(),
305            new_config.register_groups.clone(),
306            &*self.event_cb,
307        )?;
308        // Replace atomically
309        self.register_groups = new_register_groups;
310        Ok(std::mem::replace(&mut self.config, new_config))
311    }
312
313    /// Switch the current configuration of a single register group
314    ///
315    /// Returns the previous configuration.
316    pub(crate) fn replace_register_group_config(
317        &mut self,
318        register_group_id: RegisterGroupId,
319        new_config: RegisterGroupConfig,
320    ) -> Result<Option<RegisterGroupConfig>> {
321        let entry = self.config.register_groups.entry(register_group_id);
322        match entry {
323            Entry::Vacant(vacant) => {
324                let register_group_id = vacant.key().clone();
325                log::debug!(
326                    "Configuring register group {}: {:?}",
327                    register_group_id,
328                    new_config
329                );
330                let register_group_context = RegisterGroupContext::try_new(
331                    &register_group_id,
332                    &self.data_path,
333                    self.file_name_prefix.clone(),
334                    new_config.clone(),
335                    &*self.event_cb,
336                )?;
337                self.register_groups
338                    .insert(register_group_id, register_group_context);
339                vacant.insert(new_config);
340                Ok(None)
341            }
342            Entry::Occupied(mut occupied) => {
343                if occupied.get() == &new_config {
344                    return Ok(Some(new_config));
345                }
346                let register_group_id = occupied.key().clone();
347                log::debug!(
348                    "Replacing configuration of register group {}: {:?} -> {:?}",
349                    register_group_id,
350                    occupied.get(),
351                    new_config
352                );
353                let register_group_context = RegisterGroupContext::try_new(
354                    &register_group_id,
355                    &self.data_path,
356                    self.file_name_prefix.clone(),
357                    new_config.clone(),
358                    &*self.event_cb,
359                )?;
360                self.register_groups
361                    .insert(register_group_id, register_group_context);
362                let old_config = std::mem::replace(occupied.get_mut(), new_config);
363                Ok(Some(old_config))
364            }
365        }
366    }
367
368    /// Switch the current state
369    ///
370    /// Returns the previous state.
371    pub(crate) fn switch_state(&mut self, new_state: State) -> Result<State> {
372        if self.state == new_state {
373            return Ok(new_state);
374        }
375        log::debug!("Switching state: {:?} -> {:?}", self.state, new_state);
376        Ok(std::mem::replace(&mut self.state, new_state))
377    }
378
379    pub(crate) fn record_observed_register_group_values(
380        &mut self,
381        register_group_id: &RegisterGroupId,
382        observed_register_values: ObservedRegisterValues,
383    ) -> Result<Option<StoredRegisterRecordPrelude>> {
384        match self.state {
385            State::Inactive => {
386                log::debug!(
387                    "Discarding new observation for register group {} while inactive: {:?}",
388                    register_group_id,
389                    observed_register_values
390                );
391                Ok(None)
392            }
393            State::Active => {
394                if let Some(config) = self.config.register_groups.get(register_group_id) {
395                    let expected_register_count = config.registers.len();
396                    let actual_register_count = observed_register_values.register_values.len();
397                    if expected_register_count != actual_register_count {
398                        log::warn!(
399                            "Mismatching number of register values in observation for group {}: expected = {}, actual = {}",
400                            register_group_id,
401                            expected_register_count,
402                            actual_register_count);
403                        return Err(Error::DataFormatInvalid);
404                    }
405                    for ((register_index, expected_type), actual_type) in
406                        config.registers.iter().zip(
407                            observed_register_values
408                                .register_values
409                                .iter()
410                                .map(|v| v.as_ref().map(msr_core::Value::to_type)),
411                        )
412                    {
413                        if let Some(actual_type) = actual_type {
414                            if *expected_type != actual_type {
415                                log::warn!(
416                                    "Mismatching register type for register {} in observation for group {}: expected = {}, actual = {}",
417                                    register_index,
418                                    register_group_id,
419                                    expected_type,
420                                    actual_type);
421                            }
422                        }
423                    }
424                } else {
425                    log::warn!(
426                        "Missing configuration for register group {} - rejecting observation",
427                        register_group_id
428                    );
429                    return Err(Error::RegisterGroupUnknown);
430                }
431                let context = self
432                    .register_groups
433                    .get_mut(register_group_id)
434                    .ok_or(Error::RegisterGroupUnknown)?;
435                DefaultRecordPreludeGenerator.generate_prelude().and_then(
436                    |(created_at, prelude)| {
437                        let new_record = RegisterRecord {
438                            prelude,
439                            observation: observed_register_values,
440                        };
441                        log::debug!(
442                            "Recording new observation for register group {}: {:?}",
443                            register_group_id,
444                            new_record
445                        );
446                        let prelude = context.storage.append_record(&created_at, new_record)?;
447                        Ok(Some(prelude))
448                    },
449                )
450            }
451        }
452    }
453
454    // FIXME: Replace with an integration test
455    #[allow(clippy::panic_in_result_fn)] // just a test
456    pub(crate) fn smoke_test(&mut self) -> Result<()> {
457        let register_group_id = RegisterGroupId::from_value("smoke-test-register-group".into());
458        let register_group_config = RegisterGroupConfig {
459            registers: vec![
460                (
461                    RegisterIndex::new(1),
462                    RegisterType::Scalar(ScalarType::Bool),
463                ),
464                (RegisterIndex::new(2), RegisterType::Scalar(ScalarType::I64)),
465                (RegisterIndex::new(3), RegisterType::Scalar(ScalarType::U64)),
466                (RegisterIndex::new(4), RegisterType::Scalar(ScalarType::F64)),
467                (RegisterIndex::new(5), RegisterType::String),
468            ],
469            storage: self.config.default_storage.clone(),
470        };
471        let orig_config =
472            self.replace_register_group_config(register_group_id.clone(), register_group_config)?;
473        let recorded_observations = vec![
474            ObservedRegisterValues {
475                observed_at: Timestamp::now(),
476                register_values: vec![
477                    None,
478                    Some(ScalarValue::I64(0).into()),
479                    Some(ScalarValue::U64(0).into()),
480                    Some(ScalarValue::F64(0.0).into()),
481                    None,
482                ],
483            },
484            ObservedRegisterValues {
485                observed_at: Timestamp::now(),
486                register_values: vec![
487                    Some(ScalarValue::Bool(false).into()),
488                    Some(ScalarValue::I64(-1).into()),
489                    Some(ScalarValue::U64(1).into()),
490                    Some(ScalarValue::F64(-1.125).into()),
491                    Some("Hello".to_owned().into()),
492                ],
493            },
494            ObservedRegisterValues {
495                observed_at: Timestamp::now(),
496                register_values: vec![
497                    Some(ScalarValue::Bool(true).into()),
498                    Some(ScalarValue::I64(1).into()),
499                    None,
500                    Some(ScalarValue::F64(1.125).into()),
501                    Some(", world!".to_owned().into()),
502                ],
503            },
504            ObservedRegisterValues {
505                observed_at: Timestamp::now(),
506                register_values: vec![None, None, None, None, None],
507            },
508        ];
509        for observation in &recorded_observations {
510            self.record_observed_register_group_values(&register_group_id, observation.clone())?;
511        }
512        let recent_records = self.recent_records(
513            &register_group_id,
514            NonZeroUsize::new(recorded_observations.len()).unwrap(),
515        )?;
516        assert_eq!(recent_records.len(), recorded_observations.len());
517        log::info!(
518            "Smoke test recorded observations: {:?}",
519            recorded_observations
520        );
521        log::info!("Smoke test records: {:?}", recent_records);
522        // Restore configuration
523        if let Some(orig_config) = orig_config {
524            self.replace_register_group_config(register_group_id, orig_config)?;
525        }
526        Ok(())
527    }
528}