rtlola_io_plugins/
outputs.rs

1#[cfg(feature = "byte_plugin")]
2pub mod byte_plugin;
3#[cfg(feature = "csv_plugin")]
4pub mod csv_plugin;
5#[cfg(feature = "json_plugin")]
6pub mod json_plugin;
7#[cfg(feature = "log_printer")]
8pub mod log_printer;
9#[cfg(feature = "statistics_plugin")]
10pub mod statistics_plugin;
11
12use std::collections::{HashMap, HashSet};
13use std::convert::Infallible;
14use std::error::Error;
15use std::io::Write;
16use std::marker::PhantomData;
17
18use rtlola_interpreter::monitor::{VerdictRepresentation, Verdicts};
19use rtlola_interpreter::output::{NewVerdictFactory, VerdictFactory};
20use rtlola_interpreter::rtlola_frontend::tag_parser::verbosity_parser::{
21    DebugParser, StreamVerbosity, VerbosityParser,
22};
23use rtlola_interpreter::rtlola_frontend::tag_parser::TagValidator;
24use rtlola_interpreter::rtlola_frontend::{RtLolaError, RtLolaMir};
25use rtlola_interpreter::rtlola_mir::StreamReference;
26use rtlola_interpreter::time::{OutputTimeRepresentation, TimeRepresentation};
27
28/// Struct for a generic factory returning the monitor output
29#[derive(Debug)]
30pub struct VerdictRepresentationFactory<
31    MonitorOutput: VerdictRepresentation,
32    OutputTime: OutputTimeRepresentation,
33> {
34    phantom: PhantomData<(MonitorOutput, OutputTime)>,
35}
36
37impl<MonitorOutput: VerdictRepresentation, OutputTime: OutputTimeRepresentation> Default
38    for VerdictRepresentationFactory<MonitorOutput, OutputTime>
39{
40    fn default() -> Self {
41        Self {
42            phantom: Default::default(),
43        }
44    }
45}
46
47impl<MonitorOutput: VerdictRepresentation, OutputTime: OutputTimeRepresentation>
48    VerdictFactory<MonitorOutput, OutputTime>
49    for VerdictRepresentationFactory<MonitorOutput, OutputTime>
50{
51    type Error = Infallible;
52    type Record = (MonitorOutput, OutputTime::InnerTime);
53
54    fn get_verdict(
55        &mut self,
56        rec: MonitorOutput,
57        ts: OutputTime::InnerTime,
58    ) -> Result<Self::Record, Self::Error> {
59        Ok((rec, ts))
60    }
61}
62
63impl<MonitorOutput: VerdictRepresentation, OutputTime: OutputTimeRepresentation>
64    NewVerdictFactory<MonitorOutput, OutputTime>
65    for VerdictRepresentationFactory<MonitorOutput, OutputTime>
66{
67    type CreationData = ();
68    type CreationError = Infallible;
69
70    fn new(_ir: &RtLolaMir, _data: Self::CreationData) -> Result<Self, Self::Error> {
71        Ok(Self {
72            phantom: Default::default(),
73        })
74    }
75}
76
77/// The main trait that has to be implemented by an output plugin
78pub trait VerdictsSink<V: VerdictRepresentation, T: OutputTimeRepresentation> {
79    /// Error Type of a [VerdictsSink] implementation
80    type Error: Error + 'static;
81    /// Return Type of a [VerdictsSink] implementation
82    type Return;
83    /// Factory Type to convert the monitor output to the required representation
84    type Factory: VerdictFactory<V, T>;
85
86    /// Defines how the verdicts of the monitor needs to be handled
87    fn sink_verdicts(
88        &mut self,
89        verdicts: Verdicts<V, T>,
90    ) -> Result<
91        Vec<Self::Return>,
92        VerdictSinkError<Self::Error, <Self::Factory as VerdictFactory<V, T>>::Error>,
93    > {
94        let Verdicts { timed, event, ts } = verdicts;
95        timed
96            .into_iter()
97            .chain(vec![(ts, event)])
98            .map(|(ts, verdict)| self.sink_verdict(ts, verdict))
99            .collect::<Result<Vec<_>, _>>()
100    }
101    /// Defines how one verdict of the monitor needs to be handled, timed and event-based
102    fn sink_verdict(
103        &mut self,
104        ts: <T as TimeRepresentation>::InnerTime,
105        verdict: V,
106    ) -> Result<
107        Self::Return,
108        VerdictSinkError<Self::Error, <Self::Factory as VerdictFactory<V, T>>::Error>,
109    > {
110        let verdict = self
111            .factory()
112            .get_verdict(verdict, ts)
113            .map_err(VerdictSinkError::Factory)?;
114        self.sink(verdict).map_err(VerdictSinkError::Sink)
115    }
116
117    /// Function to dispatch the converted verdict to the sink
118    fn sink(
119        &mut self,
120        verdict: <Self::Factory as VerdictFactory<V, T>>::Record,
121    ) -> Result<Self::Return, Self::Error>;
122
123    /// Function to return a reference to the Verdictfactory
124    fn factory(&mut self) -> &mut Self::Factory;
125}
126
/// A generic Error to be used by [VerdictsSink]s.
///
/// It records which of the two processing stages of a verdict failed and
/// carries the error of that stage.
#[derive(Debug)]
pub enum VerdictSinkError<SinkError: Error + 'static, FactoryError: Error + 'static> {
    /// The sink failed while handling a converted verdict.
    Sink(SinkError),
    /// The factory failed while converting the monitor output.
    Factory(FactoryError),
}
135
136impl<SinkError: Error, FactoryError: Error> std::fmt::Display
137    for VerdictSinkError<SinkError, FactoryError>
138{
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        match self {
141            VerdictSinkError::Sink(e) => write!(f, "{}", e),
142            VerdictSinkError::Factory(e) => write!(f, "{}", e),
143        }
144    }
145}
146
147impl<SinkError: Error, FactoryError: Error> Error for VerdictSinkError<SinkError, FactoryError> {
148    fn source(&self) -> Option<&(dyn Error + 'static)> {
149        match self {
150            VerdictSinkError::Sink(e) => Some(e),
151            VerdictSinkError::Factory(e) => Some(e),
152        }
153    }
154}
155
156/// Generic VerdictSink that accepts verdicts as bytes and writes them to a Writer.
157#[derive(Debug)]
158pub struct ByteSink<
159    W: Write,
160    Factory: VerdictFactory<MonitorOutput, OutputTime, Record = Verdict>,
161    MonitorOutput: VerdictRepresentation,
162    OutputTime: OutputTimeRepresentation,
163    Verdict: Into<Vec<u8>>,
164> {
165    factory: Factory,
166    writer: W,
167    output: PhantomData<MonitorOutput>,
168    time: PhantomData<OutputTime>,
169}
170impl<
171        W: Write,
172        Factory: VerdictFactory<MonitorOutput, OutputTime, Record = Verdict>,
173        MonitorOutput: VerdictRepresentation,
174        OutputTime: OutputTimeRepresentation,
175        Verdict: Into<Vec<u8>>,
176    > ByteSink<W, Factory, MonitorOutput, OutputTime, Verdict>
177{
178    /// Create a new [ByteSink] that receives bytes and forwards them to a writer
179    pub fn new(writer: W, factory: Factory) -> Self {
180        Self {
181            factory,
182            writer,
183            output: PhantomData,
184            time: PhantomData,
185        }
186    }
187}
188
189impl<
190        W: Write,
191        Factory: VerdictFactory<MonitorOutput, OutputTime, Record = Verdict>,
192        MonitorOutput: VerdictRepresentation,
193        OutputTime: OutputTimeRepresentation,
194        Verdict: Into<Vec<u8>>,
195    > VerdictsSink<MonitorOutput, OutputTime>
196    for ByteSink<W, Factory, MonitorOutput, OutputTime, Verdict>
197{
198    type Error = std::io::Error;
199    type Factory = Factory;
200    type Return = ();
201
202    fn sink(&mut self, verdict: Verdict) -> Result<Self::Return, Self::Error> {
203        let bytes: Vec<u8> = verdict.into();
204        self.writer.write_all(&bytes[..])?;
205        self.writer.flush()?;
206        Ok(())
207    }
208
209    fn factory(&mut self) -> &mut Self::Factory {
210        &mut self.factory
211    }
212}
213
214/// A sink implementation that is completely discarding the verdicts
215#[derive(Copy, Clone, Debug)]
216pub struct DiscardSink<
217    O: OutputTimeRepresentation,
218    V: VerdictRepresentation,
219    F: VerdictFactory<V, O>,
220> {
221    factory: F,
222    verdict: PhantomData<(O, V)>,
223}
224
225impl<
226        O: OutputTimeRepresentation,
227        V: VerdictRepresentation,
228        F: VerdictFactory<V, O, Record = ()>,
229    > DiscardSink<O, V, F>
230{
231    /// Creates a new [DiscardSink] for a factory that returns `()` as a verdict.
232    pub fn new(factory: F) -> Self {
233        Self {
234            factory,
235            verdict: Default::default(),
236        }
237    }
238}
239
240impl<O: OutputTimeRepresentation, V: VerdictRepresentation> Default
241    for DiscardSink<O, V, EmptyFactory>
242{
243    fn default() -> Self {
244        Self::new(EmptyFactory)
245    }
246}
247
248impl<O: OutputTimeRepresentation, V: VerdictRepresentation, F: VerdictFactory<V, O>>
249    VerdictsSink<V, O> for DiscardSink<O, V, F>
250{
251    type Error = Infallible;
252    type Factory = F;
253    type Return = ();
254
255    fn sink(
256        &mut self,
257        _verdict: <Self::Factory as VerdictFactory<V, O>>::Record,
258    ) -> Result<Self::Return, Self::Error> {
259        Ok(())
260    }
261
262    fn factory(&mut self) -> &mut Self::Factory {
263        &mut self.factory
264    }
265}
266
/// A stateless factory that turns every verdict into `()`
#[derive(Debug, Default, Clone, Copy)]
pub struct EmptyFactory;
270
271impl<V: VerdictRepresentation, O: OutputTimeRepresentation> VerdictFactory<V, O> for EmptyFactory {
272    type Error = Infallible;
273    type Record = ();
274
275    fn get_verdict(&mut self, _rec: V, _ts: O::InnerTime) -> Result<Self::Record, Self::Error> {
276        Ok(())
277    }
278}
279
280impl<V: VerdictRepresentation, O: OutputTimeRepresentation> NewVerdictFactory<V, O>
281    for EmptyFactory
282{
283    type CreationData = ();
284    type CreationError = Infallible;
285
286    fn new(_ir: &RtLolaMir, _data: Self::CreationData) -> Result<Self, Self::Error> {
287        Ok(Self)
288    }
289}
290
291/// Represents the verbosity and debug configuration of the streams in the specification to be used with the output plugins.
292#[derive(Debug, Clone)]
293pub struct VerbosityAnnotations {
294    stream_verbosity: HashMap<StreamReference, StreamVerbosity>,
295    debug_streams: HashSet<StreamReference>,
296}
297
298impl VerbosityAnnotations {
299    /// Parses the annotated tags in the specification to build the [CliAnnotations]
300    pub fn new(ir: &RtLolaMir) -> Result<VerbosityAnnotations, RtLolaError> {
301        let verbosity_parser = VerbosityParser;
302        let debug_parser = DebugParser;
303
304        let stream_verbosity_tags = ir.parse_tags(verbosity_parser)?;
305        let stream_verbosity = ir
306            .all_streams()
307            .map(|sr| (sr, *stream_verbosity_tags.local_tags(sr).unwrap()))
308            .collect();
309        let debug_tags = ir.parse_tags(debug_parser)?;
310        let debug_streams = ir
311            .all_streams()
312            .filter(|sr| *debug_tags.local_tags(*sr).unwrap())
313            .collect();
314
315        Ok(Self {
316            stream_verbosity,
317            debug_streams,
318        })
319    }
320
321    /// Parses the annotated tags in the specification and additionally mark all streams in `debug_streams` as debug.
322    pub fn new_with_debug(
323        ir: &RtLolaMir,
324        debug_streams: &[String],
325    ) -> Result<VerbosityAnnotations, RtLolaError> {
326        let annotations = Self::new(ir)?;
327        let debug_streams = debug_streams
328            .into_iter()
329            .map(|sname| {
330                ir.get_stream_by_name(sname.as_str())
331                    .ok_or_else(|| {
332                        format!(
333                            "stream {sname} marked for debugging, but not found in specification"
334                        )
335                    })
336                    .map(|stream| stream.as_stream_ref())
337            })
338            .collect::<Result<Vec<_>, String>>()
339            .unwrap_or_else(|e| {
340                eprintln!("{e}");
341                std::process::exit(1);
342            });
343        Ok(annotations.add_debug_streams(&debug_streams))
344    }
345
346    /// Returns the tag parsers applied by the [Self::new] call
347    pub fn parsers<'a>() -> &'a [&'a dyn TagValidator] {
348        &[&VerbosityParser, &DebugParser]
349    }
350
351    /// Mark a set of streams as additional debug streams
352    pub fn add_debug_streams(mut self, stream: &[StreamReference]) -> Self {
353        self.debug_streams.extend(stream);
354        self
355    }
356
357    /// Returns the verbosity for the given stream
358    pub fn verbosity(&self, sr: StreamReference) -> StreamVerbosity {
359        *self.stream_verbosity.get(&sr).unwrap()
360    }
361
362    /// Returns whether the given stream is marked as debug
363    pub fn debug(&self, sr: StreamReference) -> bool {
364        self.debug_streams.contains(&sr)
365    }
366}