msr_plugin_csv_event_journal/
lib.rs

1// FIXME: Enable `deny(missing_docs)` before release
2//#![deny(missing_docs)]
3
4#![warn(rust_2018_idioms)]
5#![warn(rust_2021_compatibility)]
6#![warn(missing_debug_implementations)]
7#![warn(unreachable_pub)]
8#![warn(unsafe_code)]
9#![warn(rustdoc::broken_intra_doc_links)]
10#![warn(clippy::pedantic)]
11// Additional restrictions
12#![warn(clippy::clone_on_ref_ptr)]
13#![warn(clippy::self_named_module_files)]
14// Exceptions
15#![allow(clippy::default_trait_access)]
16#![allow(clippy::module_name_repetitions)]
17#![allow(clippy::missing_errors_doc)] // TODO
18#![allow(clippy::missing_panics_doc)] // TODO
19#![allow(clippy::unnecessary_wraps)] // TODO
20
21use std::{
22    io::Error as IoError,
23    num::{NonZeroU32, NonZeroU64},
24    path::PathBuf,
25};
26
27use thiserror::Error;
28
29use msr_core::{
30    event_journal::Severity,
31    storage::{BinaryDataFormat, MemorySize, StorageConfig, StorageSegmentConfig, TimeInterval},
32};
33
34use msr_plugin::EventPublisherIndex;
35
36pub mod api;
37
38mod internal;
39use self::internal::message_loop::create_message_loop;
40
41#[derive(Debug, Clone)]
42pub struct Environment {
43    pub event_publisher_index: EventPublisherIndex,
44
45    /// Directory for storing CSV data
46    pub data_dir: PathBuf,
47
48    pub custom_file_name_prefix: Option<String>,
49}
50
51#[must_use]
52pub fn default_storage_config() -> StorageConfig {
53    StorageConfig {
54        retention_time: TimeInterval::Days(NonZeroU32::new(180).unwrap()), // 180 days
55        segmentation: StorageSegmentConfig {
56            time_interval: TimeInterval::Days(NonZeroU32::new(1).unwrap()), // daily
57            size_limit: MemorySize::Bytes(NonZeroU64::new(1_048_576).unwrap()), // 1 MiB
58        },
59    }
60}
61
62#[must_use]
63pub fn default_config() -> api::Config {
64    api::Config {
65        severity_threshold: Severity::Information,
66        storage: default_storage_config(),
67    }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct PluginSetup {
72    pub binary_data_format: BinaryDataFormat,
73    pub initial_config: api::Config,
74    pub initial_state: api::State,
75}
76
77impl Default for PluginSetup {
78    fn default() -> Self {
79        Self {
80            binary_data_format: BinaryDataFormat::Utf8, // assume JSON/UTF-8 data
81            initial_config: default_config(),
82            initial_state: api::State::Inactive,
83        }
84    }
85}
86
87#[derive(Error, Debug)]
88pub enum Error {
89    #[error("missing config")]
90    MissingConfig,
91
92    #[error("invalid state")]
93    InvalidState,
94
95    // TODO: Rename this variant?
96    #[error(transparent)]
97    MsrCore(#[from] msr_core::event_journal::Error),
98
99    #[error(transparent)]
100    Io(#[from] IoError),
101
102    #[error(transparent)]
103    Other(#[from] anyhow::Error),
104}
105
106pub type Result<T> = std::result::Result<T, Error>;
107
108pub type PluginError = msr_plugin::PluginError<Error>;
109pub type PluginResult<T> = msr_plugin::PluginResult<T, Error>;
110
111pub type MessageSender = msr_plugin::MessageSender<api::Message>;
112pub type MessageReceiver = msr_plugin::MessageReceiver<api::Message>;
113
114pub type ResultSender<T> = msr_plugin::ResultSender<T, Error>;
115pub type ResultReceiver<T> = msr_plugin::ResultReceiver<T, Error>;
116
117pub type PublishedEvent = msr_plugin::PublishedEvent<api::Event>;
118pub type EventReceiver = msr_plugin::EventReceiver<api::Event>;
119type EventPubSub = msr_plugin::EventPubSub<api::Event>;
120
121pub type Plugin = msr_plugin::PluginContainer<api::Message, api::Event>;
122pub type PluginPorts = msr_plugin::PluginPorts<api::Message, api::Event>;
123
124pub const DEFAULT_FILE_NAME_PREFIX: &str = "event_journal_records_";
125
126pub fn create_plugin(
127    environment: Environment,
128    plugin_setup: PluginSetup,
129    event_channel_capacity: usize,
130) -> Result<Plugin> {
131    let Environment {
132        event_publisher_index,
133        data_dir,
134        custom_file_name_prefix,
135    } = environment;
136    let PluginSetup {
137        binary_data_format,
138        initial_config,
139        initial_state,
140    } = plugin_setup;
141    let (event_pubsub, event_subscriber) =
142        EventPubSub::new(event_publisher_index, event_channel_capacity);
143    let file_name_prefix =
144        custom_file_name_prefix.unwrap_or_else(|| DEFAULT_FILE_NAME_PREFIX.to_owned());
145    let (message_loop, message_tx) = create_message_loop(
146        data_dir,
147        file_name_prefix,
148        event_pubsub,
149        binary_data_format,
150        initial_config,
151        initial_state,
152    )?;
153    Ok(Plugin {
154        ports: PluginPorts {
155            message_tx,
156            event_subscriber,
157        },
158        message_loop,
159    })
160}